# Chapter 22: Building Reusable ETL Logic

⚠️ **DO NOT SKIP THIS CELL**

## Run the next cell
### Before executing any other cell, you must run the next cell to set up the project folder environment.

In [None]:
from pathlib import Path

# Detect Google Colab by attempting its Drive helper import; anywhere else
# the import fails and the notebook falls back to the local layout.
try:
    from google.colab import drive
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True

if not IN_COLAB:
    # Local run: the project root is the parent of the current working directory.
    PROJECT_ROOT = Path.cwd().parent
else:
    # Colab run: mount Drive first, then point at the project folder inside it.
    drive.mount("/content/drive")
    PROJECT_ROOT = Path("/content/drive/MyDrive/DataScience/census-education-analysis")

# Every other folder is derived from PROJECT_ROOT.
DATA_DIR = PROJECT_ROOT.joinpath("data")
RAW_DIR = DATA_DIR.joinpath("raw")
STAGING_DIR = DATA_DIR.joinpath("staging")
PROCESSED_DIR = DATA_DIR.joinpath("processed")
OUTPUTS_DIR = PROJECT_ROOT.joinpath("outputs")

# Last expression of the cell: echoes the resolved root in the notebook.
PROJECT_ROOT


## Problem 1: What Exactly Is Repeating Across State Files?

## Problem 2: What Should a Beginner-Friendly ETL Pipeline Look Like?

## Problem 3: How Do I Define a Reusable Ingestion Step?

In [None]:
import pandas as pd

def load_staged_excel(file_path):
    """
    Read one staged Census education workbook into a DataFrame.

    The first 7 rows of the sheet are skipped and no row is used as a
    header, so the returned columns are labelled by position (0, 1, 2, ...).
    """
    return pd.read_excel(file_path, skiprows=7, header=None)

## Problem 4: How Do I Reuse the Cleaning Logic from Chapter 20?

In [None]:
def clean_census_data(df):
    """
    Select required columns, rename them,
    clean text fields, and filter to 'all ages'.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw sheet whose columns are addressed by position; positions
        2 and 4 through 14 are read, so at least 15 columns are required.

    Returns
    -------
    pandas.DataFrame
        One row per input row whose district code is numeric and whose
        age group (after trimming and lower-casing) equals "all ages".
        Missing count values are replaced with 0 and ``district_code``
        is an integer column. The original row index is preserved.

    Raises
    ------
    IndexError
        If ``df`` has fewer columns than the positions selected here.
    """
    # Single source of truth: position in the raw sheet -> clean column name.
    column_map = {
        2: "district_code",
        4: "area_type",
        5: "age_group",
        6: "total_persons",
        7: "male_persons",
        8: "female_persons",
        9: "total_illiterate",
        10: "male_illiterate",
        11: "female_illiterate",
        12: "total_literate",
        13: "male_literate",
        14: "female_literate",
    }
    text_columns = ("age_group", "area_type")
    count_columns = [
        name
        for name in column_map.values()
        if name != "district_code" and name not in text_columns
    ]

    edu_selected = df.iloc[:, list(column_map)].copy()
    edu_selected.columns = list(column_map.values())

    # Normalise the text fields so comparisons ignore case and padding.
    for name in text_columns:
        edu_selected[name] = (
            edu_selected[name]
            .astype(str)
            .str.strip()
            .str.lower()
        )

    # Remove rows that do not represent real geographic data. Coercing first
    # turns non-numeric cells (e.g. footnote text) into NaN so they are
    # dropped together with the blank ones instead of crashing the int cast.
    district_code = pd.to_numeric(edu_selected["district_code"], errors="coerce")
    # .copy() makes the filtered frame independent, so the column
    # assignments below never write into a slice of another frame.
    edu_selected = edu_selected[district_code.notna()].copy()
    edu_selected["district_code"] = district_code[district_code.notna()].astype(int)

    # Replace missing numeric values with 0. Only the count columns are
    # filled; the text columns are never asked to hold the integer 0.
    edu_selected[count_columns] = edu_selected[count_columns].fillna(0)

    # Keep only "all ages"
    return edu_selected[edu_selected["age_group"] == "all ages"].copy()

## Problem 5: How Do I Reuse the Validation Logic from Chapter 21?

In [None]:
def validate_census_data(df):
    """
    Check that male + female counts add up to the reported totals.

    Works on a copy of ``df`` and returns it with four extra columns:
    ``persons_check``, ``illiterate_check`` and ``literate_check`` hold
    male + female - total for each measure, and ``is_valid`` is True
    only where all three differences are exactly 0.
    """
    checked = df.copy()

    # (output column, male column, female column, total column)
    balance_rules = (
        ("persons_check", "male_persons", "female_persons", "total_persons"),
        ("illiterate_check", "male_illiterate", "female_illiterate", "total_illiterate"),
        ("literate_check", "male_literate", "female_literate", "total_literate"),
    )

    all_balanced = None
    for check_col, male_col, female_col, total_col in balance_rules:
        checked[check_col] = checked[male_col] + checked[female_col] - checked[total_col]
        balanced = checked[check_col] == 0
        # Accumulate the row-wise AND across the three checks.
        all_balanced = balanced if all_balanced is None else all_balanced & balanced

    checked["is_valid"] = all_balanced
    return checked

## Problem 6: How Do I Combine Everything into One ETL Pipeline?

In [None]:
def census_etl_pipeline(excel_path):
    """
    Run load -> clean -> validate for a single Census education Excel file
    and return the validated DataFrame.
    """
    return (
        load_staged_excel(excel_path)
        .pipe(clean_census_data)
        .pipe(validate_census_data)
    )

## Problem 7: How Do I Test the Pipeline with West Bengal?

In [None]:
# Staged West Bengal workbook, pushed through the full pipeline.
wb_excel = STAGING_DIR.joinpath("education", "west_bengal.xlsx")
wb_df = census_etl_pipeline(excel_path=wb_excel)

In [None]:
# A notebook cell only echoes its last expression, so the shape has to be
# printed explicitly or it is computed and silently discarded.
print(wb_df.shape)

# Count of rows that passed / failed the consistency checks.
wb_df["is_valid"].value_counts()

## Problem 8: How Do I Save the Final Processed Output?

In [None]:
# Destination for the cleaned and validated West Bengal table.
processed_path = PROCESSED_DIR.joinpath("education", "west_bengal_processed.csv")

# Create the target folder (and any missing parents) before writing.
output_folder = processed_path.parent
output_folder.mkdir(parents=True, exist_ok=True)

# Write without the DataFrame index column.
wb_df.to_csv(processed_path, index=False)

# Last expression of the cell: echoes the saved path in the notebook.
processed_path

## End-of-Chapter Direction