## ETL Notebook for CA Biositing Project

This notebook provides a documented walkthrough of the ETL (Extract, Transform, Load) process for the CA Biositing project. It is designed for interactive development and exploration before migrating logic into the production pipeline.

It covers:

1.  **Setup**: Importing necessary libraries and establishing a connection to the database.
2.  **Extraction**: Pulling raw data from Google Sheets or from files stored on Google Drive.
3.  **Cleaning**: Standardizing data types, handling missing values, and cleaning column names.
4.  **Normalization**: Replacing human-readable names (e.g., "Corn") with database foreign key IDs (e.g., `resource_id: 1`).
5.  **Utilities**: Common functions for data manipulation and analysis.
6.  **Deployment Plan**: A step-by-step guide for moving the code from this notebook into the production ETL modules.

In [14]:
import os
import sys
import pandas as pd
import numpy as np
import janitor as jn
import logging
from IPython.display import display
from sqlalchemy.orm import Session
from sqlalchemy import select

# --- Basic Logging Configuration for Notebook ---
# When running in a notebook, we use Python's standard logging.
# In the production pipeline, this will be replaced by Prefect's `get_run_logger()`
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

# --- Robustly find the project root ---
# This ensures that the notebook can be run from any directory within the project.
path = os.getcwd()
project_root = None
while path != os.path.dirname(path):
    if 'pixi.toml' in os.listdir(path):
        project_root = path
        break
    path = os.path.dirname(path)

if not project_root:
    raise FileNotFoundError("Could not find project root containing 'pixi.toml'.")

# Add the project root to the Python path to allow for module imports
if project_root not in sys.path:
    sys.path.insert(0, project_root)
    logger.info(f"Added project root '{project_root}' to sys.path")
else:
    logger.info(f"Project root '{project_root}' is already in sys.path")

# --- Import project modules ---
try:
    from src.ca_biositing.pipeline.ca_biositing.pipeline.utils.engine import engine
    from src.ca_biositing.datamodels.ca_biositing.datamodels.schemas.generated.ca_biositing import *
    from src.ca_biositing.pipeline.ca_biositing.pipeline.utils.name_id_swap import replace_name_with_id_df
    from src.ca_biositing.pipeline.ca_biositing.pipeline.etl.extract import biodiesel_plants
    logger.info('Successfully imported all project modules.')
except ImportError as e:
    logger.error(f'Failed to import project modules: {e}', exc_info=True)

2026-01-13 16:10:18,800 - INFO - Project root 'C:\Users\Abigail\OneDrive\Documents\GitHub\ca-biositing' is already in sys.path
2026-01-13 16:10:18,802 - INFO - Successfully imported all project modules.
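
The project-root search in the setup cell can also be written with `pathlib`. The following is a minimal sketch, not the project's actual helper: the function name `find_project_root` and its `marker` parameter are hypothetical, and unlike the loop above it also checks the filesystem root itself.

```python
from pathlib import Path
from typing import Optional


def find_project_root(start: Path, marker: str = "pixi.toml") -> Optional[Path]:
    """Return the nearest directory at or above `start` that contains `marker`.

    Hypothetical helper for illustration; returns None when no ancestor
    contains the marker file.
    """
    start = start.resolve()
    # Check the starting directory first, then each parent up to the filesystem root.
    for candidate in (start, *start.parents):
        if (candidate / marker).is_file():
            return candidate
    return None


if __name__ == "__main__":
    root = find_project_root(Path.cwd())
    print(root if root is not None else "No directory containing 'pixi.toml' was found.")
```

Returning `None` instead of raising leaves the decision to the caller; the setup cell above raises `FileNotFoundError` in that situation.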


### Data Cleaning Function

In [3]:
def clean_the_gsheets(df):
    """Cleans and standardizes a DataFrame extracted from Google Sheets.

    This function performs the following operations:
    - Cleans column names to a standard format (snake_case).
    - Drops rows where essential columns ('repl_no', 'value') are missing.
    - Coerces data types for numeric and datetime columns; unparseable values become NaN/NaT.
    - Replaces empty or whitespace-only strings with NaN.
    - Converts remaining columns to the best possible data types.
    - Converts all string values to lowercase.

    Args:
        df (pd.DataFrame): The raw DataFrame.

    Returns:
        pd.DataFrame | None: The cleaned DataFrame, or None if the input is not a
        DataFrame or an error occurs during cleaning.
    """
    logger.info('Starting DataFrame cleaning process.')
    if not isinstance(df, pd.DataFrame):
        logger.error('Input is not a pandas DataFrame.')
        return None
    
    try:
        # 1. Clean column names and treat empty/whitespace-only strings as missing,
        #    so that blank cells are caught by the dropna below
        df_cleaned = df.clean_names().replace(r'^\s*$', np.nan, regex=True)

        # 2. Drop rows with missing essential data
        df_cleaned = df_cleaned.dropna(subset=['repl_no', 'value'])
        logger.info(f'Dropped {len(df) - len(df_cleaned)} rows with missing values.')

        # 3. Coerce numeric types (unparseable values become missing)
        df_cleaned['repl_no'] = pd.to_numeric(df_cleaned['repl_no'], errors='coerce').astype('Int32')
        df_cleaned['value'] = pd.to_numeric(df_cleaned['value'], errors='coerce').astype(np.float32)

        # 4. Coerce datetime types (unparseable values become NaT)
        for col in ('created_at', 'updated_at'):
            if col in df_cleaned.columns:
                df_cleaned[col] = pd.to_datetime(df_cleaned[col], errors='coerce')

        # 5. Convert other dtypes to best possible
        df_cleaned = df_cleaned.convert_dtypes()

        # 6. Convert all string data to lowercase
        #    (`applymap` is deprecated since pandas 2.1 in favor of `DataFrame.map`)
        df_cleaned = df_cleaned.applymap(lambda s: s.lower() if isinstance(s, str) else s)
        logger.info('Converted all string data to lowercase.')

        logger.info('Successfully cleaned DataFrame.')
        return df_cleaned

    except Exception as e:
        logger.error(f'An error occurred during DataFrame cleaning: {e}', exc_info=True)
        return None
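
The numeric coercion used for `repl_no` has one behavior worth seeing in isolation. The sketch below assumes only pandas; the helper name `coerce_to_nullable_int` is hypothetical. Unparseable entries become `<NA>`, while a fractional value such as `"1.5"` is expected to make the cast to `Int32` fail rather than be truncated. Inside `clean_the_gsheets` that failure would be caught by the broad `except` and the function would return `None`.

```python
import pandas as pd


def coerce_to_nullable_int(series: pd.Series) -> pd.Series:
    """Parse values as numbers, then cast to pandas' nullable Int32 dtype."""
    # errors="coerce" turns anything that is not a number into NaN.
    numeric = pd.to_numeric(series, errors="coerce")
    # The nullable integer dtype keeps missing values as <NA> instead of failing.
    return numeric.astype("Int32")


raw = pd.Series(["1", "2", "abc", None])
coerced = coerce_to_nullable_int(raw)
print(coerced.tolist())
print(coerced.dtype)

# A non-integer value cannot be cast safely; the error is reported instead of hidden.
try:
    coerce_to_nullable_int(pd.Series(["1.5"]))
except (TypeError, ValueError) as exc:
    print(f"Non-integer value rejected: {exc}")
```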

        

### Data Normalization Function

In [4]:
def normalize_dataframes(dataframes, normalize_columns):
    """Normalizes a list of DataFrames by replacing name columns with foreign key IDs.

    This function iterates through a list of dataframes and, for each one, iterates
    through a dictionary of columns that need to be normalized. It uses the 
    `replace_name_with_id_df` utility to look up or create the corresponding ID
    in the database.

    Args:
        dataframes (list[pd.DataFrame]): A list of DataFrames to normalize.
        normalize_columns (dict): A dictionary mapping column names to SQLModel classes and attributes.

    Returns:
        list[pd.DataFrame]: The list of normalized DataFrames.
    """
    logger.info(f'Starting normalization process for {len(dataframes)} dataframes.')
    normalized_dfs = []
    try:
        with Session(engine) as db:
            for i, df in enumerate(dataframes):
                if not isinstance(df, pd.DataFrame):
                    logger.warning(f'Item {i+1} is not a DataFrame, skipping.')
                    continue
                
                logger.info(f'Processing DataFrame #{i+1} with {len(df)} rows.')
                df_normalized = df.copy()

                for df_col, (model, model_name_attr) in normalize_columns.items():
                    if df_col not in df_normalized.columns:
                        logger.warning(f"Column '{df_col}' not in DataFrame #{i+1}. Skipping normalization for this column.")
                        continue
                    
                    try:
                        # Skip normalization if the column is all NaN/None
                        if df_normalized[df_col].isnull().all():
                            logger.info(f"Skipping normalization for column '{df_col}' as it contains only null values.")
                            continue
                            
                        logger.info(f"Normalizing column '{df_col}' using model '{model.__name__}'.")
                        df_normalized, num_created = replace_name_with_id_df(
                            db=db,
                            df=df_normalized,
                            ref_model=model,
                            df_name_column=df_col,
                            model_name_attr=model_name_attr,
                            id_column_name='id',
                            final_column_name=f'{df_col}_id'
                        )
                        if num_created > 0:
                            logger.info(f"Created {num_created} new records in '{model.__name__}' table.")
                        new_col_name = f'{df_col}_id'
                        num_nulls = df_normalized[new_col_name].isnull().sum()
                        logger.info(f"Successfully normalized '{df_col}'. New column '{new_col_name}' contains {num_nulls} null values.")
                    except Exception as e:
                        logger.error(f"Error normalizing column '{df_col}' in DataFrame #{i+1}: {e}", exc_info=True)
                        continue # Continue to the next column
                
                normalized_dfs.append(df_normalized)
                logger.info(f'Finished processing DataFrame #{i+1}.')
            
            logger.info('Committing database session.')
            db.commit()
            logger.info('Database commit successful.')
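    except Exception as e:
        # Leaving the `with Session(engine)` block closes the session, which discards any
        # uncommitted changes. An explicit `db.rollback()` here is unnecessary, and `db`
        # would be undefined if creating the session itself failed.
        logger.error(f'A critical error occurred during the database session: {e}', exc_info=True)
        logger.info('Uncommitted changes were discarded when the session closed.')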
        
    return normalized_dfs
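
The "look up or create" idea behind the normalization step can be illustrated without the project's database. The following is a minimal sketch using the standard-library `sqlite3` module. It is not the implementation of `replace_name_with_id_df`; the `resource` table, the function name `swap_names_for_ids`, and the choice to pass missing names through as `None` are all assumptions made for the example.

```python
import sqlite3
from typing import List, Optional, Sequence, Tuple


def swap_names_for_ids(
    conn: sqlite3.Connection, names: Sequence[Optional[str]]
) -> Tuple[List[Optional[int]], int]:
    """Map each name to its ID in the `resource` table, inserting unseen names.

    Returns the list of IDs (None where the name was missing) and the number
    of rows created.
    """
    # Load the existing name -> id mapping once, instead of querying per row.
    cache = {name: row_id for row_id, name in conn.execute("SELECT id, name FROM resource")}
    created = 0
    ids: List[Optional[int]] = []
    for name in names:
        if name is None:
            ids.append(None)  # Assumption: missing names stay missing.
            continue
        if name not in cache:
            cursor = conn.execute("INSERT INTO resource (name) VALUES (?)", (name,))
            cache[name] = cursor.lastrowid
            created += 1
        ids.append(cache[name])
    return ids, created


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE resource (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.execute("INSERT INTO resource (name) VALUES ('corn')")

resource_ids, num_created = swap_names_for_ids(conn, ["corn", "almond hulls", None, "corn"])
conn.commit()
print(resource_ids, num_created)
```

As in `normalize_dataframes`, the commit happens once after all lookups, so a failure partway through leaves the reference table unchanged.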


### ETL Execution Example

In [72]:
import os
from typing import Optional

import pandas as pd
from pydrive2.auth import GoogleAuth, AuthenticationError
from pydrive2.drive import GoogleDrive
from pydrive2.files import ApiRequestError

def gdrive_to_df(file_name: str, mime_type: str, credentials_path: str, dataset_folder: str) -> Optional[pd.DataFrame]:
    """
    Downloads a file from Google Drive and loads it into a pandas DataFrame.

    Only CSV files ("text/csv") are currently parsed. Other MIME types (e.g., ZIP or
    GeoJSON) are downloaded but not parsed, and the function returns None for them.

    Args:
        file_name: The name of the requested file.
        mime_type: The MIME type - according to https://mime-type.com/
        credentials_path: The path to the Google Cloud service account credentials JSON file.
        dataset_folder: The local folder the file is downloaded into.

    Returns:
        A pandas DataFrame containing the data from the downloaded file, or None on error.
    """
    try:
        settings = {
            "client_config_backend": "service",
            "service_config": {
                "client_json_file_path": credentials_path,
            }
        }
        # Create instance of GoogleAuth and authenticate with the service account
        gauth = GoogleAuth(settings=settings)
        gauth.ServiceAuth()
        drive = GoogleDrive(gauth)

        local_path = os.path.join(dataset_folder, file_name)
        try:
            file_entries = drive.ListFile({"q": f"title = '{file_name}' and mimeType = '{mime_type}'"}).GetList()
            if len(file_entries) == 0:
                raise FileNotFoundError(f"Error: File '{file_name}' not found. \n Please make sure the file name is correct and that you have shared it with the service account email.")
            # If several files match, use the first one
            file_entry = file_entries[0]
            file = drive.CreateFile({'id': file_entry['id']})
            file.GetContentFile(local_path)  # Download file
        except ApiRequestError as e:
            print(f"Google Drive API error: {e}")
            return None

        # Use the first row as header and the rest as data
        if mime_type == "text/csv":
            df = pd.read_csv(local_path)
        else:
            raise ValueError(f"Unsupported MIME type '{mime_type}'; only 'text/csv' is parsed.")

        # De-duplicate columns, keeping the first occurrence
        df = df.loc[:, ~df.columns.duplicated()]

        return df

    except AuthenticationError as e:
        print(f"Google Authentication Error: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None
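
The Drive search string in `gdrive_to_df` is built by interpolating the file name directly, so a name containing an apostrophe would produce a malformed query. A possible hardening is sketched below, on the assumption that the Drive query syntax expects single quotes and backslashes inside string values to be escaped with a backslash. The helper name `build_drive_query` is hypothetical and is not part of PyDrive2.

```python
def build_drive_query(file_name: str, mime_type: str) -> str:
    """Build a Drive `q` search string with quoted values escaped."""

    def escape(value: str) -> str:
        # Escape backslashes first so the backslashes added for quotes are not doubled.
        return value.replace("\\", "\\\\").replace("'", "\\'")

    return f"title = '{escape(file_name)}' and mimeType = '{escape(mime_type)}'"


print(build_drive_query("Biodiesel_Plants.csv", "text/csv"))
print(build_drive_query("O'Brien_Plants.csv", "text/csv"))
```

The resulting string would be passed as the `q` value in the dictionary given to `drive.ListFile(...)`.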


In [73]:
import os
from typing import Optional
import pandas as pd
from prefect import task, get_run_logger
# from src.ca_biositing.pipeline.ca_biositing.pipeline.utils.gdrive_to_pandas import gdrive_to_df
# from resources.prefect import credentials

@task
def extract(project_root: Optional[str] = None) -> Optional[pd.DataFrame]:
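    """
    Extracts raw data from the 'Biodiesel_Plants.csv' file on Google Drive.

    This function serves as the 'Extract' step in an ETL pipeline. It connects
    to the data source and returns the data as is, without transformation.

    Args:
        project_root: Optional absolute path to the project root. When provided
            (e.g., from a notebook), the credentials and dataset paths are resolved
            relative to it; otherwise they are resolved relative to the working directory.

    Returns:
        A pandas DataFrame containing the raw data, or None if an error occurs.
    """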
    logger = get_run_logger()

    FILE_NAME = "Biodiesel_Plants.csv"
    MIME_TYPE = "text/csv"
    CREDENTIALS_PATH = "credentials.json"
    DATASET_FOLDER = "src/ca_biositing/pipeline/ca_biositing/pipeline/temp_external_datasets/" 
    logger.info(f"Extracting raw data from '{FILE_NAME}'...")

    # If project_root is provided (e.g., from a notebook), construct an absolute path
    # Otherwise, use the default relative path (for the main pipeline)
    credentials_path = CREDENTIALS_PATH
    dataset_folder = DATASET_FOLDER
    if project_root:
        credentials_path = os.path.join(project_root, CREDENTIALS_PATH)
        dataset_folder = os.path.join(project_root, DATASET_FOLDER)

    # The gdrive_to_df function handles authentication, data fetching, and error handling.
    raw_df = gdrive_to_df(FILE_NAME, MIME_TYPE, credentials_path, dataset_folder)

    if raw_df is None:
        logger.error("Failed to extract data. Aborting.")
        return None

    logger.info("Successfully extracted raw data.")
    return raw_df


In [74]:
# --- 1. Extraction ---
# In a real Prefect flow, each extraction would be a separate task.
logger.info('Starting data extraction...')
biodiesel_plants_df = extract(project_root=project_root)
dataframes = [biodiesel_plants_df]
print(dataframes)
logger.info('Data extraction complete.')

# # --- 2. Cleaning ---
# # This list comprehension applies the cleaning function to each extracted dataframe.
# logger.info('Starting data cleaning...')
# clean_dataframes = [clean_the_gsheets(df) for df in dataframes if df is not None]
# logger.info('Data cleaning complete.')

# # --- 3. Normalization ---
# # This dictionary defines the columns to be normalized. 
# # The key is the column name in the DataFrame.
# # The value is a tuple containing the corresponding SQLAlchemy model and the name of the attribute on the model to match against.
# NORMALIZE_COLUMNS = {
#     'resource': (Resource, 'name'),
#     'prepared_sample': (PreparedSample, 'name'),
#     'preparation_method': (PreparationMethod, 'name'),
#     'parameter': (Parameter, 'name'),
#     'unit': (Unit, 'name'),
#     'analyst_email': (Contact, 'email'),
#     'analysis_type': (AnalysisType, 'name'),
#     'primary_ag_product': (PrimaryAgProduct, 'name')
# }

# logger.info('Starting data normalization...')
# normalized_dataframes = normalize_dataframes(clean_dataframes, NORMALIZE_COLUMNS)
# logger.info('Data normalization complete.')

# # --- 4. Display Results ---
# logger.info('Displaying results of normalization...')
# for i, df in enumerate(normalized_dataframes):
#     print(f'--- Normalized DataFrame {i+1} ---')
#     display(df.head())

2026-01-13 18:36:20,831 - INFO - Starting data extraction...
2026-01-13 18:36:20,856 - INFO - HTTP Request: GET http://localhost:4200/api/admin/version "HTTP/1.1 200 OK"
2026-01-13 18:36:20,866 - INFO - Extracting raw data from 'Biodiesel_Plants.csv'...
2026-01-13 18:36:20,886 - INFO - Attempting refresh to obtain initial access_token
2026-01-13 18:36:20,889 - INFO - Refreshing access_token
2026-01-13 18:36:22,092 - INFO - Successfully extracted raw data.
2026-01-13 18:36:22,097 - INFO - Finished in state Completed()
2026-01-13 18:36:22,113 - INFO - Data extraction complete.


[                                  company  bbi_index             city  \
0                     American GreenFuels        NaN        New Haven   
1                Down To Earth Energy LLC        NaN           Monroe   
2                      Maine Bio-Fuel Inc        NaN         Portland   
3                   Cape Cod Biofuels Inc        NaN         Sandwich   
4             Renewable Fuels by Peterson        NaN  North Haverhill   
..                                    ...        ...              ...   
73                     Walsh BioFuels LLC       58.0          Mauston   
74  Western Iowa Energy - Agron Bioenergy       60.0      Watsonville   
75           White Mountain Biodiesel LLC       62.0  North Haverhill   
76                 World Energy - Natchez       65.0          Natchez   
77                    World Energy - Rome       66.0             Rome   

            state  capacity_mmg_per_y            feedstock       status  \
0     Connecticut                  35          

### Deployment Plan

The code in this notebook will be transitioned to the main ETL pipeline by following these steps:

1.  **Function Migration**: The `clean_the_gsheets` and `normalize_dataframes` functions will be moved to a new utility module, for example, `src/ca_biositing/pipeline/ca_biositing/pipeline/utils/etl_utils.py`. Each function will be decorated with `@task` from Prefect to turn it into a reusable pipeline component.
2.  **Flow Creation**: A new Prefect flow will be created in the `src/ca_biositing/pipeline/ca_biositing/pipeline/flows/` directory (e.g., `master_extraction_flow.py`). This flow will orchestrate the entire ETL process for a given data source.
3.  **Task Integration**: The new flow will be composed of individual tasks. It will call the existing extraction tasks (`proximate.extract`, etc.), and then pass the results to the new cleaning and normalization tasks from `etl_utils.py`.
4.  **Logging**: The `logging` module will be replaced with `get_run_logger()` from Prefect within the tasks to ensure logs are captured by the Prefect UI.
5.  **Configuration**: The `NORMALIZE_COLUMNS` dictionary will be moved to a configuration file, or defined in one place within the relevant flow, so that the column-to-model mapping is easy to manage and modify.
6.  **Testing**: Unit tests will be written for the new utility functions in `etl_utils.py`. An integration test will be created for the new Prefect flow to ensure all the tasks work together correctly.
7.  **Deployment**: Once the flow is complete and tested, it will be deployed to the Prefect server using the `pixi run deploy` command, making it available to be run on a schedule or manually via the UI.
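
One way step 5 might look is sketched below: the column mapping lives in a JSON document and is resolved into the `(model, attribute)` tuples that `normalize_dataframes` expects. Everything here is an assumption for illustration. The JSON layout, the `MODEL_REGISTRY` dictionary, and the function `load_normalize_columns` are hypothetical, and the placeholder classes stand in for the real model classes imported in the setup cell.

```python
import json
from typing import Dict, Tuple


# Placeholder classes standing in for the project's real model classes.
class Resource:
    pass


class Contact:
    pass


# Hypothetical registry mapping the model names used in the config to classes.
MODEL_REGISTRY: Dict[str, type] = {cls.__name__: cls for cls in (Resource, Contact)}

# In practice this text would be read from a config file; it is inlined here
# so that the sketch is self-contained.
CONFIG_TEXT = """
{
  "resource": {"model": "Resource", "attr": "name"},
  "analyst_email": {"model": "Contact", "attr": "email"}
}
"""


def load_normalize_columns(config_text: str, registry: Dict[str, type]) -> Dict[str, Tuple[type, str]]:
    """Turn the JSON mapping into {column: (model_class, attribute_name)}."""
    raw = json.loads(config_text)
    resolved: Dict[str, Tuple[type, str]] = {}
    for column, spec in raw.items():
        model_name = spec["model"]
        if model_name not in registry:
            # Fail loudly on typos instead of silently skipping a column.
            raise KeyError(f"Unknown model '{model_name}' for column '{column}'")
        resolved[column] = (registry[model_name], spec["attr"])
    return resolved


normalize_columns = load_normalize_columns(CONFIG_TEXT, MODEL_REGISTRY)
print(normalize_columns)
```

Keeping model names as strings in the file means the mapping can change without touching the flow code, at the cost of maintaining the registry alongside the models.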