# To pass system arguments, uncomment the following cell

Change arguments in terms of your requirements.


In [1]:
import os
import sys
import json
from pathlib import Path

from datetime import datetime

# Add the project root directory to sys.path to enable module imports
project_root = Path.cwd().parent.resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

from app.schemas.settings import settings


stage = settings.stage
current_time = datetime.now()


sys.argv = [
    "data_processing_pipeline.py",  # sys.argv[0], script name
    "--stage",
    stage,
    "--log_group_name",
    f"data_processing_pipeline_{stage}",
    "--database_url",
    settings.db_url,
]

print(json.dumps(sys.argv, indent=4))

[
    "data_processing_pipeline.py",
    "--stage",
    "development",
    "--log_group_name",
    "data_processing_pipeline_development",
    "--database_url",
    "postgresql://air_quality_user:ab12cd34@localhost:5433/air_quality_db"
]


# Imports


In [3]:
import sys
import logging
from pathlib import Path
import json

from app.utils.arg_utils import get_system_args
from app.db.models.air_quality import AirQualityData
from app.db.database_manager import DatabaseManager
from notebooks.log_utils import LogUtils, log_operation
from notebooks.data_utils import (
    get_netcdf_file,
    process_netcdf_file,
    save_processed_data_to_parquet,
)
from notebooks.errors.no_data_found_error import NoDataFoundError

import pandas as pd
import gc
import pyarrow.parquet as pq
from sqlalchemy.exc import SQLAlchemyError
import re

# Configurations


In [4]:
# Configure system arguments
args = get_system_args()

# Convert argparse Namespace to dictionary
args_dict = vars(args)

print(f"sys.argv = {json.dumps(sys.argv, indent=4)}")
print(f"args_dict = {json.dumps(args_dict, indent=4)}")

sys.argv = [
    "data_processing_pipeline.py",
    "--stage",
    "development",
    "--log_group_name",
    "data_processing_pipeline_development",
    "--database_url",
    "postgresql://air_quality_user:ab12cd34@localhost:5433/air_quality_db"
]
args_dict = {
    "stage": "development",
    "database_url": "postgresql://air_quality_user:ab12cd34@localhost:5433/air_quality_db",
    "log_group_name": "data_processing_pipeline_development"
}


# Configure Logging


In [6]:
log_utils = LogUtils(stage=args.stage)

log_utils.configure_logging()
logger = logging.getLogger(args.log_group_name)

logger.info("Testing log message")

2024-10-12 21:01:44,611 - data_processing_pipeline_development - INFO - Testing log message


# Load netCDF Data and Export to CSV


In [None]:
# Define directories
data_dir = Path("../data")
processed_data_dir = Path("../processed_data")

# Ensure the processed data directory exists
os.makedirs(processed_data_dir, exist_ok=True)


def process_data(year: int):
    # Locate the netCDF file for the specified year
    file_path = log_operation(
        "Locate netCDF File", lambda: get_netcdf_file(data_dir, year)
    )

    if file_path:
        # Process the netCDF file to get a DataFrame
        df = log_operation(
            "Process netCDF File", lambda: process_netcdf_file(file_path, year)
        )

        if df is None or df.empty:
            raise NoDataFoundError(year)

        log_operation(
            "Save processed data to Parquet",
            lambda: save_processed_data_to_parquet(df, processed_data_dir, year),
        )
    else:
        raise FileNotFoundError(f"No netCDF file found for the year {year}")


year_range = range(1998, 2023)  # Years from 1998 to 2022

for year in year_range:
    process_data(year)

logger.info("All years processed.")

## Bulk Insert of Parquet Data into PostgreSQL with Optimized Processing (It takes hours so if you need all data then uncomment below cell)

This script processes large parquet files and inserts their data into a PostgreSQL database in an optimized and efficient manner. The focus is on handling large datasets while managing memory usage and ensuring robust error handling.

### Example Usage:

```python
# Run the bulk insertion process with a batch size of 1000 records
load_parquet_to_postgres_bulk(batch_size=1000)



In [None]:
# def optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
#     numeric_cols = df.select_dtypes(include=["int", "float"]).columns
#     for col in numeric_cols:
#         df[col] = pd.to_numeric(df[col], downcast="integer")  # or 'float' based on data
#     return df


# def insert_records(
#     db_session, records: list, file_name: str, row_group: int, batch_num: int
# ):
#     try:
#         db_session.bulk_insert_mappings(AirQualityData, records)
#         db_session.commit()
#         logger.info(
#             f"Inserted {len(records)} records from {file_name} "
#             f"row group {row_group + 1} batch {batch_num + 1} into PostgreSQL."
#         )
#     except SQLAlchemyError as e:
#         db_session.rollback()
#         logger.error(
#             f"SQLAlchemyError inserting records from {file_name} row group {row_group + 1}: {e}"
#         )
#     except Exception as e:
#         db_session.rollback()
#         logger.error(
#             f"Unexpected error inserting records from {file_name} row group {row_group + 1}: {e}"
#         )


# def process_parquet_file(file_path: str, db_session, batch_size: int):
#     file_name = os.path.basename(file_path)
#     logger.info(f"Loading {file_name} into PostgreSQL...")

#     try:
#         # Open the Parquet file with PyArrow
#         parquet_file = pq.ParquetFile(file_path)
#         num_row_groups = parquet_file.num_row_groups
#         logger.info(f"{file_name} has {num_row_groups} row groups.")

#         # Iterate over each row group
#         for rg in range(num_row_groups):
#             table = parquet_file.read_row_group(rg)
#             df = table.to_pandas()
#             logger.info(
#                 f"Row group {rg + 1}/{num_row_groups} loaded into DataFrame with {len(df)} records."
#             )

#             # Optimize DataFrame memory usage
#             df = optimize_dataframe(df)

#             # Iterate over DataFrame in smaller batches
#             for batch_num, start in enumerate(range(0, len(df), batch_size)):
#                 end = start + batch_size
#                 batch_df = df.iloc[start:end]
#                 records = batch_df.to_dict(orient="records")

#                 # Insert the batch into PostgreSQL
#                 insert_records(db_session, records, file_name, rg, batch_num)

#             # Free up memory
#             del df
#             gc.collect()

#     except SQLAlchemyError as e:
#         logger.error(f"SQLAlchemyError processing {file_name}: {e}")
#     except Exception as e:
#         logger.error(f"Unexpected error processing {file_name}: {e}")


# def load_parquet_to_postgres_bulk(batch_size=1000):
#     # Initialize Database Manager
#     db_manager = DatabaseManager(database_url=args.database_url)
#     db_manager.recreate_tables()

#     # Define the processed data directory
#     processed_data_dir = Path("../processed_data").resolve()

#     # Establish a database session using the context manager
#     with db_manager.get_db() as db_session:
#         # Loop through each processed Parquet file
#         for file_name in os.listdir(processed_data_dir):
#             if file_name.endswith(".parquet"):
#                 file_path = os.path.join(processed_data_dir, file_name)
#                 process_parquet_file(file_path, db_session, batch_size)

#     logger.info("All Parquet files have been loaded into PostgreSQL (Bulk Insert).")


# load_parquet_to_postgres_bulk(batch_size=1000000)

## Bulk Insert of Parquet Data into PostgreSQL with Sampling

This script processes large parquet files, samples data, and inserts it into a PostgreSQL database in batches. Below is an overview of the key components and functionality:

### Usage

You can control the number of records being processed from each parquet file by adjusting the `sample_size` and `batch_size`:

```python
# Process files with a batch size of 1000 and sample 10,000 records from each parquet file, only for the years 2000-2020
load_parquet_to_postgres_bulk(batch_size=1000, sample_size=10000, start_year=2000, end_year=2022)

In [7]:
def optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    numeric_cols = df.select_dtypes(include=["int", "float"]).columns
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], downcast="integer")  # or 'float' based on data
    return df


def insert_records(
    db_session, records: list, file_name: str, row_group: int, batch_num: int
):
    try:
        db_session.bulk_insert_mappings(AirQualityData, records)
        db_session.commit()
        logger.info(
            f"Inserted {len(records)} records from {file_name} "
            f"row group {row_group + 1} batch {batch_num + 1} into PostgreSQL."
        )
    except SQLAlchemyError as e:
        db_session.rollback()
        logger.error(
            f"SQLAlchemyError inserting records from {file_name} row group {row_group + 1}: {e}"
        )
    except Exception as e:
        db_session.rollback()
        logger.error(
            f"Unexpected error inserting records from {file_name} row group {row_group + 1}: {e}"
        )


def process_parquet_file(file_path: str, db_session, batch_size: int, sample_size: int):
    file_name = os.path.basename(file_path)
    logger.info(f"Loading {file_name} into PostgreSQL...")

    try:
        # Open the Parquet file with PyArrow
        parquet_file = pq.ParquetFile(file_path)
        num_row_groups = parquet_file.num_row_groups
        logger.info(f"{file_name} has {num_row_groups} row groups.")

        # Iterate over each row group
        for rg in range(num_row_groups):
            table = parquet_file.read_row_group(rg)
            df = table.to_pandas()
            logger.info(
                f"Row group {rg + 1}/{num_row_groups} loaded into DataFrame with {len(df)} records."
            )

            # Sample a subset of records if the sample size is smaller than the DataFrame size
            if len(df) > sample_size:
                df = df.sample(n=sample_size)
                logger.info(
                    f"Sampled {sample_size} records from {file_name} row group {rg + 1}."
                )

            # Optimize DataFrame memory usage
            df = optimize_dataframe(df)

            # Iterate over DataFrame in smaller batches
            for batch_num, start in enumerate(range(0, len(df), batch_size)):
                end = start + batch_size
                batch_df = df.iloc[start:end]
                records = batch_df.to_dict(orient="records")

                # Insert the batch into PostgreSQL
                insert_records(db_session, records, file_name, rg, batch_num)

            # Free up memory
            del df
            gc.collect()

    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemyError processing {file_name}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error processing {file_name}: {e}")


def load_parquet_to_postgres_bulk(
    batch_size=1000, sample_size=10000, start_year=1998, end_year=2022
):
    # Initialize Database Manager
    db_manager = DatabaseManager(database_url=args.database_url)
    db_manager.recreate_tables()

    # Define the processed data directory
    processed_data_dir = Path("../processed_data").resolve()

    # Establish a database session using the context manager
    with db_manager.get_db() as db_session:
        # Loop through each processed Parquet file
        for file_name in os.listdir(processed_data_dir):
            if file_name.endswith(".parquet"):
                # Extract the year from the file name using regex
                match = re.search(r"\d{4}", file_name)
                if match:
                    file_year = int(match.group(0))

                    # Check if the file's year falls within the year range
                    if start_year <= file_year <= end_year:
                        file_path = os.path.join(processed_data_dir, file_name)
                        process_parquet_file(
                            file_path, db_session, batch_size, sample_size
                        )
                    else:
                        logger.info(
                            f"Skipping {file_name} as it is outside the year range."
                        )
                else:
                    logger.warning(f"No year found in {file_name}. Skipping file.")

    logger.info(
        "All Parquet files within the year range have been loaded into PostgreSQL (Bulk Insert)."
    )


# Load parquet files with a batch size of 1000 and sample 10,000 records per file, only for the years 2000-2020
load_parquet_to_postgres_bulk(
    batch_size=1000, sample_size=10000, start_year=2000, end_year=2022
)

2024-10-12 21:02:08,055 - root - INFO - All tables dropped successfully.
2024-10-12 21:02:08,086 - root - INFO - All tables created successfully.
2024-10-12 21:02:08,087 - root - INFO - All tables have been dropped and recreated successfully.
2024-10-12 21:02:08,088 - data_processing_pipeline_development - INFO - Loading pm25_processed_2022.parquet into PostgreSQL...
2024-10-12 21:02:08,089 - data_processing_pipeline_development - INFO - pm25_processed_2022.parquet has 9 row groups.
2024-10-12 21:02:08,619 - data_processing_pipeline_development - INFO - Row group 1/9 loaded into DataFrame with 49200000 records.
2024-10-12 21:02:09,883 - data_processing_pipeline_development - INFO - Sampled 10000 records from pm25_processed_2022.parquet row group 1.
2024-10-12 21:02:09,934 - data_processing_pipeline_development - INFO - Inserted 1000 records from pm25_processed_2022.parquet row group 1 batch 1 into PostgreSQL.
2024-10-12 21:02:09,990 - data_processing_pipeline_development - INFO - Inser