## **LATAM Challenge - Data Ingest, Storage and Processing with Google Drive, Google Cloud Storage and Google BigQuery in Google Colab (Jupyter) with Python 3.10**

**Welcome to the Data Engineer Challenge.** On this occasion, you will have the opportunity to get closer to the reality of the role, demonstrate your skills and knowledge in data processing with Python and different data structures.

**Preparation:**

1. Initial project commit (done with GitHub desktop)
2. Install Git Flow with `brew install git-flow`
3. Configure the repository with `git flow init`
4. Keep the remote feature branch when a feature is finished with `git config gitflow.feature.finish.keepremote true`
5. Synchronize the repository with GDrive
6. Read the code from GDrive with Colab

**Additional Notes:**

* The `README.md` file mentions using GitHub Desktop, Git Flow, and Colab. These are tools that can be used for version control and code collaboration.
* The `README.md` file also mentions measuring time and memory. This can be done with Python's built-in `time` module and the third-party `memory_profiler` package.
* English was used for both documentation and code.
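
The time and memory measurement mentioned in these notes can be sketched with the standard library alone. This minimal example swaps in `tracemalloc` for `memory_profiler` so nothing has to be installed; the helper name `measure` is hypothetical and not part of the challenge code.

```python
import time
import tracemalloc
from typing import Any, Callable, Tuple


def measure(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float, int]:
    """Run func and return (result, elapsed_seconds, peak_bytes).

    Hypothetical helper: peak_bytes only covers memory allocated by Python
    while tracemalloc is tracing, not the whole process.
    """
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
    finally:
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    return result, elapsed, peak


result, elapsed, peak = measure(sum, range(100_000))
print(f"result={result} elapsed={elapsed:.4f}s peak={peak} bytes")
```

`memory_profiler` reports line-by-line process memory instead, so the two tools answer slightly different questions.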

## **Challenge Guidelines:**

**Repository:**

* Your solution must be in a public repository on the GitHub platform.

**Submitting your challenge:**

1. Make a POST request to [https://advana-challenge-check-api-cr-k4hdbggvoq-uc.a.run.app/data-engineer](https://advana-challenge-check-api-cr-k4hdbggvoq-uc.a.run.app/data-engineer).
2. The request body should be a JSON object with the following fields:
    * `name`: Your full name
    * `mail`: Your email address
    * `github_url`: The URL of your GitHub repository containing the solution
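
The submission request described above could be built as follows. This is a sketch using only the standard library; the `Content-Type` header, the helper name `build_submission_request`, and the placeholder values are assumptions, and the actual send is left commented out so the example has no side effects.

```python
import json
import urllib.request

SUBMISSION_URL = "https://advana-challenge-check-api-cr-k4hdbggvoq-uc.a.run.app/data-engineer"


def build_submission_request(name: str, mail: str, github_url: str) -> urllib.request.Request:
    """Build (but do not send) the POST request with the three required fields."""
    payload = {"name": name, "mail": mail, "github_url": github_url}
    return urllib.request.Request(
        SUBMISSION_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed header for a JSON body
        method="POST",
    )


# Placeholder values, to be replaced with your own details
request = build_submission_request(
    "Jane Doe", "jane.doe@example.com", "https://github.com/example/latam-challenge"
)

# Uncomment to actually submit:
# with urllib.request.urlopen(request, timeout=30) as response:
#     print(response.status, response.read().decode("utf-8"))
```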

**Deadline:**

* The deadline for submitting the challenge is 5 calendar days after receiving the challenge.

**Technology and Techniques:**

* You can use any technology or technique you prefer for data processing.
* We will value your knowledge of cloud platforms.
* If you use cloud platforms, follow the steps in your files WITHOUT adding access credentials to the different services.

**Ranking Criteria:**

* Challenges that are clearly organized, explanatory, modular, efficient, and creative will be ranked higher.

**Assumptions and Documentation:**

* Write down the assumptions you are making.
* Include the versions of the libraries you are using in the requirements.txt file.
* Do not delete what is already written in the requirements.txt file.
* For this challenge, we recommend that you clearly describe how each part of your exercise can be improved.

**Data:**

* You must use the data contained in the provided file.
* You can use the official Twitter documentation to understand the data structure.

**Git Usage:**

* We will positively evaluate good practices of Git usage.
* Use the main branch for any final version you want us to review.
* We recommend that you use some GitFlow practice.
* Do not delete your development branches.

**Error Handling and Edge Cases:**

* Consider error handling and edge cases.

**Maintainability, Readability, and Scalability:**

* Remember that you will be working with other developers, so the maintainability, readability, and scalability of your code is essential.

**Code Documentation:**

* Good code documentation always helps the reader.


In [1]:
import sys

if __name__ != "__main__":
    sys.exit()

# General libraries
import logging
import os
import time

# Data related (if used later)
from typing import List, Tuple, Any, Optional
import datetime

## **Definitions and Configurations:**
The following code snippet defines constants used in the data transfer and processing pipeline:

- **Google Cloud Storage (GCS) Information:**
    - `BUCKET_NAME`: Specifies the name of the GCS bucket where data will be uploaded (`tw-gcp-public-lab`).
    - `FOLDER_NAME`: Denotes the folder within the bucket to store the uploaded file (`raw`).
    - `ZIP_FILE_NAME`: Represents the name of the compressed file containing tweets data (`tweets.json.zip`).
    - `GCS_SOURCE_URI`: Constructs the full URI for the file location in GCS after upload (`gs://tw-gcp-public-lab/raw/`).

- **Local File Paths:**
    - `SOURCE_PATH`: Currently defines a local file path (`/content/drive/Othercomputers/My Mac/latam-challenge`), but it's not used in the provided code for downloading.

- **Google Cloud Project and Dataset Information:**
    - `PROJECT_ID`: Specifies the Google Cloud project ID (`tw-techdash`).
    - `DATASET_NAME`: Defines the name of the BigQuery dataset where the data will be loaded (`tweets_dataset`).
    - `TABLE_NAME`: Identifies the name of the BigQuery table to store the extracted tweets data (`tweets`).

**Observations:**

- The `SOURCE_PATH` might require modification if you intend to download a file from a different location.
- Consider using environment variables or a configuration file to manage these constants, making your code more flexible and easier to maintain.

In [2]:
# Definitions

# Notebook time measure
START_TIME:str = os.environ.get("START_TIME", str(time.time()))
END_TIME:str = START_TIME

# Google Cloud Storage (GCS) information
BUCKET_NAME: str = os.environ.get("BUCKET_NAME", "tw-gcp-public-lab")
FOLDER_NAME: str = os.environ.get("FOLDER_NAME", "raw")
ZIP_FILE_NAME: str = os.environ.get("ZIP_FILE_NAME", "tweets.json.zip")
FILE_ID: str = os.environ.get("FILE_ID", "1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis")
GCS_SOURCE_URI: str = os.environ.get("GCS_SOURCE_URI", f"gs://{BUCKET_NAME}/{FOLDER_NAME}/")

# Local file paths (consider user input/environment variables)
MOUNT_POINT: str = os.environ.get("MOUNT_POINT", "/content/drive")
SOURCE_PATH: str = os.environ.get("SOURCE_PATH", "/content/drive/Othercomputers/My Mac/latam-challenge")

# Google Cloud project and dataset information (consider environment variables)
PROJECT_ID: str = os.environ.get("PROJECT_ID", "tw-techdash")
DATASET_NAME: str = os.environ.get("DATASET_NAME", "tweets_dataset")
TABLE_NAME: str = os.environ.get("TABLE_NAME", "tweets")

# Logging
LOGGING_LEVEL: str = os.environ.get("LOGGING_LEVEL", str(logging.DEBUG))
LOGGING_FILE: str = os.environ.get("LOGGING_FILE", f"{SOURCE_PATH}/notebook.log")

# Configurations
# Logging
logging.basicConfig(filename=LOGGING_FILE, level=int(LOGGING_LEVEL))

## **Jupyter Kernel code reloading**

**Functionality:**

* This code snippet utilizes magic commands within Jupyter Notebooks to manage code reloading.
* The `%reload_ext autoreload` line imports and activates the `autoreload` extension.
* The `%autoreload 2` line configures the `autoreload` extension to automatically reload Python modules when changes are detected.

**Key Concepts:**

* **Jupyter Magic Commands:** `%` prefix is used for magic commands that provide special functionality within Jupyter notebooks.
* **Autoreload Extension:**  A Jupyter extension that automatically reloads Python modules when changes are detected in the corresponding source files.
* **Reload Mode:** Mode `2` reloads all modules (except those excluded with `%aimport`) every time before Python code is executed; mode `1` reloads only the modules registered with `%aimport`.

**Overall Assessment:**

* This code improves development efficiency within Jupyter notebooks by automatically reloading code, avoiding manual restarts.
* It leverages the `autoreload` extension for automatic reloading functionality.
* The configuration level `2` ensures comprehensive reloading behavior.

**Potential Enhancements:**

* While automatic reloading is helpful in development, it might not be suitable for production environments due to potential unexpected behavior during execution.
* Consider using this approach primarily for interactive development within Jupyter notebooks.
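
To make the reloading behavior concrete, the sketch below does by hand what `autoreload` automates: it imports a throwaway module, edits its source, and calls `importlib.reload`. The module name `autoreload_demo_module` and the helper `reload_demo` are hypothetical, and `autoreload` itself does more than this (it also tries to update objects that already exist in the session).

```python
import importlib
import sys
import tempfile
from pathlib import Path
from typing import Tuple


def reload_demo() -> Tuple[int, int]:
    """Return a module attribute before and after a manual reload."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        module_file = Path(tmp_dir) / "autoreload_demo_module.py"
        module_file.write_text("VALUE = 1\n")
        sys.path.insert(0, tmp_dir)
        try:
            importlib.invalidate_caches()
            module = importlib.import_module("autoreload_demo_module")
            before = module.VALUE

            # Edit the source; the new text has a different length, so any
            # cached bytecode is treated as stale and the file is recompiled.
            module_file.write_text("VALUE = 22\n")
            importlib.invalidate_caches()
            module = importlib.reload(module)
            after = module.VALUE
        finally:
            # Leave the interpreter state as it was found
            sys.path.remove(tmp_dir)
            sys.modules.pop("autoreload_demo_module", None)
    return before, after


before, after = reload_demo()
print(f"before reload: {before}, after reload: {after}")
```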

In [3]:
# Enable automatic reloading of modules in Jupyter Notebook (improves development workflow)
%reload_ext autoreload

# Reload all imported modules before each cell runs, so source edits
# take effect without restarting the kernel
%autoreload 2

## **Google Drive mounting**

**Functionality:**

1. **Connects Google Drive:** This code establishes a connection between your Google Drive storage and the virtual machine running the Colab notebook.
2. **Navigates to Project Directory:** `os.chdir` changes the notebook's working directory to the `src` folder inside the project directory.

**Key Concepts:**

* **Google Drive Mounting:**
    - `from google.colab import drive`: Imports the `drive` module for interacting with Google Drive from Colab.
    - `drive.mount('/content/drive', force_remount=True)`: Mounts your Drive at the `/content/drive` path within Colab.
    - **Authorization:** Requires initial authorization to grant Colab access to your Drive.
* **Working Directory:**
    - `os.chdir`: Changes the working directory of the notebook's Python process (the `%cd` magic is the interactive equivalent).

**Overall Assessment:**

* **Convenient Data Access:** Enables seamless access to your personal data stored in Google Drive for use within Colab notebooks.
* **Improved Code Organization:** Helps organize your notebook within the project structure by focusing on a specific subdirectory (like "src").

**Potential Enhancements:**

* **Google Drive Mounting:**
    - **Error Handling:** Consider incorporating `try-except` blocks to gracefully handle potential mounting issues.
    - **Authentication Persistence:** Explore ways to persist the authentication token (if applicable) to avoid re-authorization for every session.
* **Navigation:**
    - **Clear Path Definitions:** Make sure `SOURCE_PATH` holds the actual path to your project directory.
    - **Error Handling:** Consider handling potential issues like non-existent directories using Python code (like `try-except` blocks).

**Explanation:**

1. **Mount Google Drive:** The first part of the code imports the `drive` module and mounts your Google Drive to the `/content/drive` directory within Colab. This allows you to access your Drive files from within your notebook.
2. **Change Directory:** The `os.chdir` call switches the working directory to the `src` subdirectory of the project (assuming `SOURCE_PATH` points to the correct location). Relative paths used later, such as `../requirements.txt`, resolve from there.

**Important Notes:**

* Set `SOURCE_PATH` to the path of your project directory as it appears under the Drive mount (it can also be supplied through the `SOURCE_PATH` environment variable).
* You'll need to go through an authorization process the first time you run the mounting code to grant Colab access to your Drive.
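
The directory-change error handling suggested above might look like the following sketch. The helper name `change_to_project_dir` is hypothetical; it only validates that the target exists before calling `os.chdir`, so a wrong `SOURCE_PATH` fails with a clear message.

```python
import os
from pathlib import Path


def change_to_project_dir(source_path: str, subdir: str = "src") -> Path:
    """Change the working directory to <source_path>/<subdir>.

    Hypothetical helper: raises FileNotFoundError with an explicit message
    instead of letting a later relative path fail in a confusing way.
    """
    target = Path(source_path) / subdir
    if not target.is_dir():
        raise FileNotFoundError(f"Project directory not found: {target}")
    os.chdir(target)
    return target


# Example call inside the notebook (SOURCE_PATH comes from the definitions cell):
# change_to_project_dir(SOURCE_PATH)
```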

In [4]:
from google.colab import drive

def mount_google_drive(
    mount_point: str = '/content/drive'
) -> None:
  """Mounts Google Drive to the specified mount point.

  Args:
      mount_point (str, optional): The path to mount Google Drive. Defaults to '/content/drive'.

  Raises:
      RuntimeError: If there's an error mounting the drive.
  """
  logging.info(f"Attempting to mount Google Drive to {mount_point}")
  try:
    drive.mount(mount_point, force_remount=True)
    logging.info(f"Successfully mounted Google Drive to {mount_point}")
  except Exception as e:
    logging.error(f"Error mounting Google Drive: {e}")
    # Chain the original exception so the root cause stays in the traceback
    raise RuntimeError(f"Error mounting Google Drive: {e}") from e

# Mount Google Drive at the configured mount point
mount_google_drive(MOUNT_POINT)

# SOURCE_PATH is already an absolute path under the mount point,
# so it is joined with 'src' directly
project_src_dir: str = os.path.join(SOURCE_PATH, 'src')
logging.info(f"Changing directory to: {project_src_dir}")
os.chdir(project_src_dir)


Mounted at /content/drive


## **Libraries requirements**

**Functionality:**

- **Installs Python Libraries:** This code snippet installs the Python libraries listed in `requirements.txt` into the Python environment that runs the notebook.

**Key Concepts:**

- **requirements.txt File:** This text file contains a list of library names and their version requirements, ensuring consistent installation across environments.
- **Virtual Environments:** Virtual environments isolate project dependencies, preventing conflicts with other Python projects on your system.
- **sys.executable:** This Python variable holds the path of the Python interpreter that is running the notebook (the virtual environment's interpreter, if one is active).
- **pip:** The Python Package Installer (pip) is used for managing Python packages and libraries.

**Explanation:**

1. **`import subprocess`**: Imports the `subprocess` module, used to run pip as a child process (`sys` is already imported in the first cell).
2. **`subprocess.run([sys.executable, "-m", "pip", "install", "-r", requirements_path], check=True)`**: Runs pip with the same interpreter that runs the notebook:
   - **`sys.executable`**: Ensures pip is called from the notebook's Python interpreter.
   - **`-m`**: Runs a module as a script (in this case, `pip`).
   - **`install -r`**: Instructs pip to install packages from a requirements file.
   - **`requirements_path`**: Path to the requirements file, `../requirements.txt` by default (relative to the current working directory).
   - **`check=True`**: Raises `subprocess.CalledProcessError` if pip exits with a non-zero status.

**Important Notes:**

- **Virtual Environment Activation:** Ensure you've activated the desired virtual environment before running this code.
- **Path to requirements.txt:** Verify that `../requirements.txt` correctly points to the file's location.
- **Internet Connection:** An internet connection is required for pip to download and install packages.

**Overall Assessment:**

- **Efficient Dependency Management:** Using `requirements.txt` is a best practice for managing project dependencies consistently.
- **Consistent Environments:** Facilitates consistent library installations across different machines for reproducibility.
- **Collaboration:** Enables easy setup of the same project environment for others.

**Potential Enhancements:**

- **Error Handling:** Consider incorporating error handling (like try-except blocks) to gracefully handle potential issues during installation, such as network connectivity problems or missing packages.


In [5]:
def install_requirements(
    requirements_path: str = "../requirements.txt"
) -> None:
    """Installs Python libraries from the specified requirements file.

    Args:
        requirements_path (str, optional): Path to the requirements.txt file. Defaults to "../requirements.txt".

    Returns:
        None
    """
    import subprocess

    try:
        subprocess.run([sys.executable, "-m", "pip", "install", "-r", requirements_path], check=True)
        print("Successfully installed libraries from requirements.txt")
    except subprocess.CalledProcessError as e:
        print(f"Error installing libraries: {e}")

# Call the install function
install_requirements()


Successfully installed libraries from requirements.txt


## **Ingest Google Drive ZIP into Google Storage**
**Key Functions:**

1. **authenticate_google_drive()**: Authenticates with Google Drive using the user's credentials.
2. **download_file_from_drive(drive_service, file_id)**: Downloads a specified file from Google Drive.
3. **upload_file_to_cloud_storage(bucket, folder_name, downloaded, zip_file_name)**: Uploads a file to Google Cloud Storage, creating the folder placeholder if needed.
4. **decompress_zip_file(bucket, folder_name, zip_file_name)**: Decompresses a ZIP file stored in a GCS bucket and uploads its members to the same folder.

**Code Structure:**

- **Logging:** Employs `logging` for debugging and tracking progress.
- **Error Handling:** Uses try-except blocks to gracefully handle potential errors.
- **Modularity:** Separates functionality into distinct, reusable functions.
- **Type Hints:** Enhances code readability and potential type checking.

**Main Code Execution:**

1. Relies on the logging configuration from the definitions cell (`LOGGING_FILE`, `notebook.log` by default).
2. Authenticates with Google Drive.
3. Downloads the specified file from Drive.
4. Creates a Cloud Storage client.
5. Uploads the downloaded file to GCS.
6. Decompresses the ZIP file in GCS if its content type is 'application/zip'.
7. Logs success or failure messages.
8. Finally, ensures the downloaded file is closed.
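
The decompression step (step 6) can be tried locally without any cloud access. The sketch below works on an in-memory archive and adds an integrity check with `ZipFile.testzip()`; the helper name `extract_zip_in_memory` and the sample archive are hypothetical and stand in for the bytes downloaded from the bucket.

```python
import io
import zipfile
from typing import Dict


def extract_zip_in_memory(zip_bytes: bytes) -> Dict[str, bytes]:
    """Return {member name: content} for every file in the archive.

    Raises zipfile.BadZipFile if the bytes are not a ZIP archive or if a
    member fails its CRC check.
    """
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as archive:
        corrupt_member = archive.testzip()  # None when all members are intact
        if corrupt_member is not None:
            raise zipfile.BadZipFile(f"Corrupt member in archive: {corrupt_member}")
        return {
            info.filename: archive.read(info)
            for info in archive.infolist()
            if not info.is_dir()
        }


# Build a tiny sample archive in memory (stand-in for the downloaded ZIP)
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as sample_archive:
    sample_archive.writestr("sample.json", '{"id": 1}\n')

members = extract_zip_in_memory(buffer.getvalue())
print(sorted(members))
```

Holding every member in memory is only reasonable for small archives; for large files, streaming each member to its destination is the safer choice.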

**Overall Assessment:**

- **Well-structured:** The code is organized, modular, and includes error handling.
- **Clear Functionality:** It effectively handles file transfer and decompression tasks.
- **Authentication Flexibility:** Uses authentication methods external to the code (useful for avoiding credentials in code).
- **Good Practices:** Adheres to good practices like logging and try-except blocks.

**Potential Enhancements:**

- **Parameterization:** Explore using command-line arguments or configuration files to adjust parameters more flexibly.
- **Progress Reporting:** Consider more granular progress reporting for downloads/uploads.
- **Content Validation:** Validate file content after decompression for integrity.
- **Advanced Error Handling:** Implement retries or alternative actions for potential errors.

This code provides a foundation for file transfer and decompression tasks within Google Cloud environments, demonstrating clarity and attention to best practices.
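
The retry idea listed under the enhancements could be sketched as a small generic helper with exponential backoff. The name `retry` and its parameters are hypothetical; the `sleep` argument exists only so the delay can be replaced in tests.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation until it succeeds or the attempts are exhausted.

    The wait doubles after every failure: base_delay, 2 * base_delay, ...
    The last failure is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except exceptions:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Example call inside the notebook (names from the ingest cell):
# downloaded = retry(lambda: download_file_from_drive(drive_service, FILE_ID))
```

In practice the `exceptions` tuple should be narrowed to transient errors, since retrying on a permission error or a wrong file ID only delays the failure.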

In [6]:
from google.colab import auth
from googleapiclient.discovery import build
import io
from googleapiclient.http import MediaIoBaseDownload
from google.cloud import storage
import zipfile


def authenticate_google_drive() -> None:
    """Authenticates to Google Drive using the user's credentials.

    Args:
        None
    """
    try:
        auth.authenticate_user()
    except Exception as e:
        logging.error(f"Error authenticating to Google Drive: {e}")
        raise


def download_file_from_drive(
    drive_service: Any, file_id: str
) -> io.BytesIO:
    """Downloads a file from Google Drive.

    Args:
        drive_service (Any): The Google Drive service resource (`build` returns an untyped resource, hence `Any`).
        file_id (str): The ID of the file to download.

    Returns:
        io.BytesIO: The downloaded file content as a BytesIO object.
    """
    downloaded: io.BytesIO = io.BytesIO()
    try:
        request = drive_service.files().get_media(fileId=file_id)
        downloader = MediaIoBaseDownload(downloaded, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f'Downloading {int(status.progress() * 100)}%')
        downloaded.seek(0)
        return downloaded
    except Exception as e:
        print(f"Error downloading file: {e}")
        raise


def upload_file_to_cloud_storage(
    bucket: storage.Bucket, folder_name: str, downloaded: io.BytesIO, zip_file_name: str
) -> storage.Blob:
    """Uploads a file to Google Cloud Storage.

    Args:
        bucket (Bucket): Google Cloud Storage bucket.
        folder_name (str): The name of the folder within the bucket where the file will be uploaded.
        downloaded (io.BytesIO): The downloaded file to upload.
        zip_file_name (str): The name of the file to be uploaded.

    Returns:
        google.cloud.storage.Blob: The uploaded blob object.
    """
    folder_blob: storage.Blob = bucket.blob(f"{folder_name}/")

    # Check and create folder if it doesn't exist
    if not folder_blob.exists():
        folder_blob.upload_from_string('', content_type='application/x-www-form-urlencoded;charset=UTF-8')

    # Upload the file to the specified folder
    blob: storage.Blob = bucket.blob(f'{folder_name}/{zip_file_name}')
    blob.upload_from_file(downloaded, content_type='application/zip')

    print(f'File uploaded to gs://{bucket.name}/{blob.name}')
    return blob


def decompress_zip_file(
    bucket: storage.Bucket, folder_name: str, zip_file_name: str
) -> str:
    """Decompresses a ZIP file stored in Google Cloud Storage.

    Args:
        bucket (Bucket): Google Cloud Storage bucket where the ZIP file is stored.
        folder_name (str): The name of the folder within the bucket where the ZIP file is located.
        zip_file_name (str): The name of the ZIP file

    Returns:
        str: The unzipped file name.
    """

    json_file_name: str = ''
    blob_name: str = ''

    try:
        zip_blob: storage.Blob = bucket.blob(f'{folder_name}/{zip_file_name}')
        # download_as_bytes() replaces the deprecated download_as_string()
        with zipfile.ZipFile(io.BytesIO(zip_blob.download_as_bytes()), 'r') as z:
            for file_info in z.infolist():
                with z.open(file_info) as file:
                    blob_name = f'{folder_name}/{file_info.filename}'
                    json_file_name = file_info.filename
                    json_blob: storage.Blob = bucket.blob(blob_name)
                    json_blob.upload_from_file(file)
        print(f'File decompressed in gs://{bucket.name}/{blob_name}')
    except zipfile.BadZipFile:
        logging.warning(f'The file in gs://{bucket.name}/{folder_name}/{zip_file_name} is not a valid ZIP file.')
    except Exception as e:
        logging.error(f'Error decompressing file: {e}')

    # Name of the last extracted member, or '' if decompression failed
    return json_file_name

downloaded: io.BytesIO = io.BytesIO()

try:
    # Authenticate to Google Drive
    authenticate_google_drive()
    drive_service: Any = build('drive', 'v3')

    # Download file from Drive
    downloaded: io.BytesIO = download_file_from_drive(drive_service, FILE_ID)

    # Get a reference to the existing Cloud Storage bucket
    bucket: storage.Bucket = storage.Client().bucket(BUCKET_NAME)

    # Upload file to Cloud Storage
    uploaded_blob: storage.Blob = upload_file_to_cloud_storage(bucket, FOLDER_NAME, downloaded, ZIP_FILE_NAME)

    # Decompress ZIP file if applicable
    if uploaded_blob.content_type == 'application/zip':
        json_file_name: str = decompress_zip_file(bucket, FOLDER_NAME, ZIP_FILE_NAME)

    logging.info("File transfer successful!")

except Exception as e:
    logging.error(f"An error occurred: {e}")

finally:
    downloaded.close()  # Close downloaded file
    print("File transfer process completed.")


Downloading 100%
File uploaded to gs://tw-gcp-public-lab/raw/tweets.json.zip
File decompressed in gs://tw-gcp-public-lab/raw/farmers-protest-tweets-2021-2-4.json
File transfer process completed.


## **BigQuery Storage Functions**

**Functionality:**

These Python functions interact with BigQuery to authenticate, create datasets and tables, and load data from Cloud Storage.

**Key Concepts:**

* **Client:** The `bigquery.Client` object is central to interacting with BigQuery.
* **Datasets and Tables:** Datasets organize tables, and both can be created or overwritten using these functions.
* **Data Loading:** Data is loaded from Cloud Storage in newline-delimited JSON format, and BigQuery automatically infers the schema.
* **Error Handling:** The functions use logging and try-except blocks to handle errors and provide informative messages.

**Snippet 1: authenticate_bigquery**

**Functionality:**

Authenticates to BigQuery and returns a client object for subsequent operations.

**Key Concepts:**

* **Project ID:** Required for authentication.

**Overall Assessment:**

Clear and concise function for initial setup.

**Potential Enhancements:**

* **Error Handling:** Consider logging errors with more detail.

**Snippet 2: create_dataset**

**Functionality:**

Creates a dataset if it doesn't exist or overwrites it if specified.

**Key Concepts:**

* **Mode:** Optional argument to control actions if the dataset already exists.

**Overall Assessment:**

Good flexibility with `mode` argument for handling existing datasets.

**Potential Enhancements:**

* **Input Validation:** Consider validating dataset names for compliance with BigQuery rules.

**Snippet 3: create_table**

**Functionality:**

Creates a table within a dataset if it doesn't exist or overwrites it if specified.

**Key Concepts:**

* **Schema Inference:** Uses an empty schema to let BigQuery infer it from the data.

**Overall Assessment:**

Handles table creation effectively.

**Potential Enhancements:**

* **Schema Definition:** Explore allowing optional schema definition for more control.

**Snippet 4: load_data_from_storage**

**Functionality:**

Loads data from a newline-delimited JSON file in Cloud Storage to a BigQuery table.

**Key Concepts:**

* **Load Job Configuration:** Specifies data format, schema inference, and handling of unknown values.

**Overall Assessment:**

Well-structured data loading process.

**Potential Enhancements:**

* **Progress Reporting:** Consider logging loading progress.
* **Data Validation:** Explore adding data validation checks before loading.
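
The data validation idea could start with a check that every line of the file is a standalone JSON object, which is what the newline-delimited JSON format requires. The helper name `find_invalid_ndjson_lines` and the sample lines are hypothetical; skipping blank lines is an assumption of this sketch, not documented loader behavior.

```python
import json
from typing import Iterable, List, Tuple


def find_invalid_ndjson_lines(lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (line_number, reason) for each line that is not a JSON object."""
    problems: List[Tuple[int, str]] = []
    for line_number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # assumption: blank lines are ignored rather than reported
        try:
            record = json.loads(line)
        except json.JSONDecodeError as error:
            problems.append((line_number, error.msg))
            continue
        if not isinstance(record, dict):
            problems.append((line_number, "record is not a JSON object"))
    return problems


# Hypothetical sample: one valid record, one broken line, one non-object value
sample_lines = ['{"id": 1, "content": "hello"}', "not json", "[1, 2]", ""]
problems = find_invalid_ndjson_lines(sample_lines)
print(problems)
```

For a file on disk, passing the open file object works as well, since iterating over it yields one line at a time.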


In [7]:
from google.cloud import bigquery
from google.api_core.exceptions import NotFound


def authenticate_bigquery(
    project_id: str
) -> bigquery.Client:
    """Authenticates to BigQuery and returns the client object.

    Args:
        project_id (str): Your GCP project ID.

    Returns:
        bigquery.Client: BigQuery client object.
    """

    return bigquery.Client(project_id)


def create_dataset(
    client: bigquery.Client, dataset_name: str, mode: str = 'create'
) -> None:
    """
    Creates a BigQuery dataset if it doesn't exist.

    Args:
        client (bigquery.Client): BigQuery client object.
        dataset_name (str): Name of the dataset to create.
        mode (str, optional): Action to take if the dataset already exists ('create' or 'overwrite'). Defaults to 'create'.

    Raises:
        Exception: For unexpected errors during dataset creation or existence check.
    """

    dataset_ref: bigquery.DatasetReference = client.dataset(dataset_name)
    try:
        client.get_dataset(dataset_ref)
        if mode == 'overwrite':
            logging.info(f"Dataset '{dataset_name}' already exists, overwriting...")
            client.delete_dataset(dataset_ref, delete_contents=True)
            client.create_dataset(dataset_ref)
            logging.info(f"Dataset '{dataset_name}' overwritten.")
        else:
            logging.info(f"Dataset '{dataset_name}' already exists.")
    except NotFound:
        logging.info(f"Dataset '{dataset_name}' not found, creating...")
        client.create_dataset(dataset_ref)
        logging.info(f"Dataset '{dataset_name}' created.")
    except Exception as e:
        logging.error(f"Error creating dataset '{dataset_name}': {e}")
        raise


def create_table(
    client: bigquery.Client, dataset_name: str, table_name: str, mode: str = 'create'
) -> None:
    """
    Creates a BigQuery table if it doesn't exist.

    Args:
        client (bigquery.Client): BigQuery client object.
        dataset_name (str): Name of the dataset containing the table.
        table_name (str): Name of the table to create.
        mode (str, optional): Action to take if the table already exists ('create' or 'overwrite'). Defaults to 'create'.

    Raises:
        Exception: For unexpected errors during table creation or existence check.
    """

    dataset_ref: bigquery.DatasetReference = client.dataset(dataset_name)
    table_ref: bigquery.TableReference = dataset_ref.table(table_name)
    try:
        client.get_table(table_ref)
        logging.info(f"Table '{table_name}' already exists.")
        if mode == 'overwrite':
            logging.info(f"Overwriting table '{table_name}'...")
            client.delete_table(table_ref)
            table: bigquery.Table = bigquery.Table(table_ref)
            client.create_table(table) # Empty schema for BigQuery to infer
            logging.info(f"Table '{table_name}' overwritten.")
    except NotFound:
        logging.info(f"Table '{table_name}' not found, creating...")
        table: bigquery.Table = bigquery.Table(table_ref)
        client.create_table(table) # Empty schema for BigQuery to infer
        logging.info(f"Table '{table_name}' created.")
    except Exception as e:
        logging.error(f"Error creating table '{table_name}': {e}")
        raise

def load_data_from_storage(
    client: bigquery.Client, source_uri: str, dataset_name: str, table_name: str
) -> None:
    """
    Loads data from Cloud Storage (newline-delimited JSON) to BigQuery table.

    Args:
        client (bigquery.Client): BigQuery client object.
        source_uri (str): URI of the data file in Cloud Storage.
        dataset_name (str): Name of the dataset containing the table.
        table_name (str): Name of the table to load data into.

    Raises:
        Exception: For unexpected errors during data loading.
    """

    job_config: bigquery.LoadJobConfig = bigquery.LoadJobConfig()

    # Only way to set job_config properties is without type notation
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.autodetect = True  # Auto-detect schema
    job_config.ignore_unknown_values = True  # Ignore unknown values

    load_job = client.load_table_from_uri(
        source_uri,  # Full URI of the file, passed in by the caller
        client.dataset(dataset_name).table(table_name),
        job_config=job_config
    )
    try:
        load_job.result()  # Wait for load completion
        logging.info(f"Data loaded from '{source_uri}' to table '{dataset_name}.{table_name}'.")
    except Exception as e:
        logging.error(f"Error loading data: {e}")
        raise


# Authenticate to BigQuery (assuming PROJECT_ID is defined elsewhere)
bigquery_client: bigquery.Client = authenticate_bigquery(PROJECT_ID)

# Create dataset (overwrite if needed)
create_dataset(bigquery_client, DATASET_NAME, mode='overwrite')

# Create table (overwrite if needed)
create_table(bigquery_client, DATASET_NAME, TABLE_NAME, mode='overwrite')

# Load data from Cloud Storage; json_file_name is set by the ingest cell above
load_data_from_storage(bigquery_client, f"{GCS_SOURCE_URI}{json_file_name}", DATASET_NAME, TABLE_NAME)

print("Data loading completed!")

Data loading completed!


## **BigQuery Processing Functions**

**Functionality:**

- **Processes BigQuery Results:** The `process_bigquery_results` function executes the query, handles results, and converts them into a desired format (list of tuples with date and username).

**Key Concepts:**

- **Type Hints:** Employs type hints (`List`, `Tuple`, `Any`) for improved code readability and potential static type checking.
- **Error Handling:** Incorporates `try-except` blocks to gracefully handle exceptions (`BadRequest` and generic exceptions).
- **Data Conversion:** Converts retrieved data rows into the specified format.

**Overall Assessment:**

- **Clear Separation:** Functions promote modularity and reusability.
- **Meaningful Variable Names:** Descriptive names enhance code understandability.
- **Error Management:** Handles potential errors during query execution and processing.

**Potential Enhancements:**

- **Input Validation:** Consider validating the constructed query string before execution.
- **Logging:** Integrate logging for detailed tracking and debugging.
- **Security:** Ensure secure credential management for BigQuery access.
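- **Query Parameterization:** BigQuery query parameters can only stand in for values, not for identifiers such as dataset or table names. Bind user-supplied values as parameters, and validate `DATASET_NAME` and `TABLE_NAME` before interpolating them into the query string.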
- **Data Usage:** Currently, the extracted data is printed. You can modify this section to store the data in a desired location or perform further processing.

This code provides a foundation for working with BigQuery data retrieval and processing. You can extend it based on your specific needs.
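
Because query parameters cannot replace table or dataset names, one way to keep the interpolated identifiers safe is to validate them before building the query string. The sketch below is deliberately conservative (letters, digits, and underscores only, plus hyphens for the project ID); the helper name `build_table_reference` and the patterns are assumptions and are stricter than what BigQuery actually accepts.

```python
import re

# Conservative patterns chosen for this sketch, not BigQuery's full naming rules
_PROJECT_PATTERN = re.compile(r"[A-Za-z0-9-]+")
_IDENTIFIER_PATTERN = re.compile(r"[A-Za-z0-9_]+")


def build_table_reference(project_id: str, dataset_name: str, table_name: str) -> str:
    """Return a backtick-quoted `project.dataset.table` reference.

    Raises ValueError if any part contains characters outside the allowed set.
    """
    if not _PROJECT_PATTERN.fullmatch(project_id):
        raise ValueError(f"Invalid project ID: {project_id!r}")
    for label, value in (("dataset", dataset_name), ("table", table_name)):
        if not _IDENTIFIER_PATTERN.fullmatch(value):
            raise ValueError(f"Invalid {label} name: {value!r}")
    return f"`{project_id}.{dataset_name}.{table_name}`"


table_reference = build_table_reference("tw-techdash", "tweets_dataset", "tweets")
query = f"SELECT COUNT(*) AS total FROM {table_reference}"
print(query)
```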

In [8]:
from google.api_core.exceptions import BadRequest, NotFound  # Specific exceptions

def process_bigquery_results(
    client: bigquery.Client, query: str
) -> List[Tuple[Any, Any]]:
    """
    Executes a BigQuery query, handles results, and performs data conversion.

    Args:
        client: BigQuery client object.
        query: BigQuery SQL query string.

    Returns:
        A list of tuples containing the extracted data (date and username).

    Raises:
        BadRequest: If BigQuery rejects the query (for example, invalid SQL).
        Exception: For other unexpected errors during query execution or processing.
    """

    extracted_data: List[Tuple[Any, Any]] = []

    try:
        query_job: bigquery.QueryJob = client.query(query)
        results = query_job.result()  # Blocks until the query finishes

        # Materialize the rows first; emptiness is checked on the list because
        # the iterator's truthiness does not say whether any rows exist.
        extracted_data = [(row[0], row[1]) for row in results]
        if not extracted_data:
            print("Query returned no results.")

    except BadRequest as e:
        print(f"BigQuery error: {e}")
        raise
    except Exception as e:
        print(f"Error: {e}")
        raise

    # Returning here instead of in a `finally` block lets re-raised exceptions propagate
    return extracted_data


In [9]:
# TODO: check this
#import q1_time

# Call the q1_time function to get the result
#resultado = q1_time.q1_time(file_path)

# Print the result
#print(resultado)

## **BigQuery Queries**

**Snippet 1: Top 10 Dates with Top Users**

**Functionality**

This SQL query identifies the 10 dates with the most tweets and, for each of those dates, finds the user with the most tweets. The `ROW_NUMBER()` ordering in the query text does not include the username, so a tie between users on the same date is resolved arbitrarily.

**Key Concepts**

* **Common Table Expressions (CTEs):** The query utilizes two CTEs:
    * `TopDates`: Calculates the daily tweet count and ranks them in descending order, selecting the top 10.
    * `TopUsersDate`: Joins the `tweets` table with `TopDates` and counts tweets per user for each top date. It uses `ROW_NUMBER()` so that exactly one user is kept per date, even when several users are tied.
* **Window Functions:** `ROW_NUMBER()` is used within `TopUsersDate` to assign a unique row number within each date partition, ordered by the date's total tweet count (identical for every row in the partition) and then by the user's tweet count (descending).
* **Filtering:** The final result retrieves users with `row_number = 1` (the user with the most tweets for each date).
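
The ranking logic of the two CTEs can be mirrored in plain Python, which may help when checking the query's output on a small sample. This is a sketch under assumptions: `rank_top_users_by_date` is a hypothetical helper, the input is a list of `(date, username)` pairs, and ties are broken by username in ascending order to keep the result deterministic (the SQL text does not specify a tie-breaker).

```python
from collections import Counter
from datetime import date
from typing import Iterable, List, Tuple


def rank_top_users_by_date(
    tweets: Iterable[Tuple[date, str]], limit: int = 10
) -> List[Tuple[date, str]]:
    """Return the busiest dates, each paired with its most active user."""
    rows = list(tweets)
    tweets_per_date = Counter(day for day, _ in rows)
    tweets_per_date_user = Counter(rows)

    # Equivalent of TopDates: order by tweet count (descending), keep `limit`.
    top_dates = sorted(tweets_per_date.items(), key=lambda kv: (-kv[1], kv[0]))[:limit]

    result = []
    for day, _ in top_dates:
        # Equivalent of ROW_NUMBER() = 1 within the date partition.
        candidates = [
            (user, count)
            for (tweet_day, user), count in tweets_per_date_user.items()
            if tweet_day == day
        ]
        best_user, _ = min(candidates, key=lambda uc: (-uc[1], uc[0]))
        result.append((day, best_user))
    return result
```

Adding the username as a final sort key is the design choice that makes repeated runs return the same user when counts are equal.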

**Overall Assessment**

This query effectively addresses the task by leveraging CTEs for modularity and window functions to handle ranking and ties.

**Potential Enhancements**

* **Clarity:** Consider adding comments within the query to explain the purpose of each CTE.
* **Efficiency:** Explore alternative approaches to handle ties if performance is critical.

**Data Usage**

The query currently returns the `tweets_date` and `username` columns, which the notebook displays. You might want to consider storing this information in a table or using it for further analysis.

**Snippet 2: Top 10 Most Used Emojis**

**Functionality**

This query extracts emojis from tweets and identifies the top 10 most frequently used emojis along with their counts.

**Key Concepts**

* **Regular Expressions (RegEx):** The `REGEXP_EXTRACT_ALL()` function utilizes a complex RegEx pattern to capture a wide range of emoji characters across different Unicode blocks.
* **UNNEST:** The `UNNEST()` operator is used to explode the extracted emoji list into a single row per emoji for counting.
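
The extract-then-count idea behind `REGEXP_EXTRACT_ALL()` and `UNNEST()` can be sketched in Python. Note the assumptions: `top_emojis` is a hypothetical helper, and `EMOJI_PATTERN` is a simplified pattern covering only two Unicode ranges, not the pattern used by the query. It ignores modifiers such as variation selectors and skin tones.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

# Simplified assumption: pictographs (U+1F300 to U+1FAFF) plus the
# miscellaneous symbols and dingbats ranges (U+2600 to U+27BF).
EMOJI_PATTERN = re.compile("[\U0001F300-\U0001FAFF\u2600-\u27BF]")


def top_emojis(texts: Iterable[str], limit: int = 10) -> List[Tuple[str, int]]:
    """Count every emoji occurrence across the texts and return the top ones."""
    counts: Counter = Counter()
    for text in texts:
        # findall plays the role of REGEXP_EXTRACT_ALL; updating the counter
        # with the resulting list plays the role of UNNEST plus COUNT.
        counts.update(EMOJI_PATTERN.findall(text))
    # Sort by count (descending), then by character for a stable order.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:limit]
```

Texts without any matching character simply contribute nothing to the counts.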

**Overall Assessment**

This query effectively extracts and counts emojis, providing valuable insights into emoji usage.

**Potential Enhancements**

* **Filtering:** Depending on the analysis goals, you might want to filter out specific emoji categories (e.g., flags, country codes).
* **Normalization:** Consider normalizing emojis to a canonical form to handle variations (e.g., skin tone modifiers).

**Data Usage**

The query currently returns the `emoji` and `count` columns. You could store this information for further analysis of emoji popularity.

**Snippet 3: Top 10 Influential Users**

**Functionality**

This query identifies the top 10 users with the most mentions (`@username`) received in tweets.

**Key Concepts**

* **UNNEST:** Similar to snippet 2, `UNNEST()` is used to explode the mentioned user list from each tweet for counting mentions.
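
Counting mentions after flattening a nested list can be sketched the same way in Python. The record shape here is an assumption made for illustration: each tweet is a dictionary whose `mentionedUsers` field is either missing, `None`, or a list of records with a `username` key. `top_mentioned_users` is a hypothetical helper.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List, Tuple


def top_mentioned_users(
    tweets: Iterable[Dict[str, Any]], limit: int = 10
) -> List[Tuple[str, int]]:
    """Return the most mentioned usernames with their mention counts."""
    counts: Counter = Counter()
    for tweet in tweets:
        # Tweets without mentions may hold None instead of an empty list.
        for user in tweet.get("mentionedUsers") or []:
            counts[user["username"]] += 1
    # Sort by count (descending), then by username for a stable order.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:limit]
```

The inner loop is the counterpart of `UNNEST()`: it turns one tweet with several mentions into several countable items.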

**Overall Assessment**

This query effectively identifies influential users based on mentions.

**Potential Enhancements**

* **Filtering:** You might consider filtering out self-mentions or mentions from specific accounts.
* **Weighted Mentions:** Depending on the analysis goals, explore assigning weights to mentions based on factors like follower count.

**Data Usage**

The query currently returns the `username` and `mention_count` columns. You could store this information for further analysis of user influence.

In [10]:
import queries  # The editor flags imports of .py files under src as unresolved, but they work at runtime

queries.top_dates_with_top_users

'\n    WITH \n    TopDates AS (\n        SELECT\n            CAST(date AS DATE) AS tweets_date,\n            COUNT(id) AS tweet_count\n        FROM tweets_dataset.tweets\n        WHERE id IS NOT NULL\n        GROUP BY tweets_date\n        ORDER BY tweet_count DESC\n        LIMIT 10\n    ),\n    TopUsersDate AS (\n        SELECT\n            TD.tweets_date,\n            TW.user.username,\n            MAX(TD.tweet_count) AS max_tweet_count,\n            COUNT(TW.id) AS user_tweet_count,\n            ROW_NUMBER() OVER (\n                PARTITION BY TD.tweets_date \n                ORDER BY MAX(TD.tweet_count) DESC, COUNT(*) DESC\n            ) AS row_number\n        FROM tweets_dataset.tweets AS TW\n        INNER JOIN TopDates AS TD\n            ON TD.tweets_date = CAST(TW.date AS DATE)\n        WHERE TW.id IS NOT NULL\n        GROUP BY\n            TD.tweets_date,\n            TW.user.username\n        ORDER BY\n            max_tweet_count DESC,\n            user_tweet_count DESC,\n    

In [11]:
process_bigquery_results(bigquery_client, queries.top_dates_with_top_users)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

In [12]:
def hola():
    print("Hello from the notebook!")

In [13]:
%store hola

Proper storage of interactively declared classes (or instances
of those classes) is not possible! Only instances
of classes in real modules on file system can be %store'd.



In [18]:
import measure

# Measure elapsed time
print_elapsed_time(measure_elapsed_time(START_TIME))  # Print the result

NameError: name 'time' is not defined
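
A `NameError` like the one above usually means the code that defines the timing helpers never imports `time`. The real implementations of `measure_elapsed_time` and `print_elapsed_time` are not shown in this notebook, so the following is only a minimal sketch of what such helpers might look like, with the import in place. The function bodies and the use of `time.perf_counter()` are assumptions.

```python
import time


def measure_elapsed_time(start_time: float) -> float:
    """Return the seconds elapsed since start_time (a perf_counter reading)."""
    return time.perf_counter() - start_time


def print_elapsed_time(elapsed: float) -> None:
    """Print an elapsed duration in seconds."""
    print(f"Elapsed time: {elapsed:.4f} seconds")


# Take the starting reading before the code to be measured.
START_TIME = time.perf_counter()
```

If the helpers live in a separate module such as `measure`, the `import time` statement belongs in that module, and the functions are called as `measure.print_elapsed_time(...)` unless they are imported by name.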