# ETL Pipeline: Excel + JSON → Transform → BigQuery  
This notebook documents the full ETL workflow implemented as an Airflow DAG.



## **Sections**
1. Overview of the ETL pipeline  
2. Import dependencies  
3. DAG configuration
4. Create BigQuery dataset
5. Extraction steps  
6. Transformation steps  
7. Loading steps
8. Data validation
9. DAG Task Definitions
10. DAG Orchestration


## 1. ETL Pipeline Overview
## Requirements:

### i). Authentication
Run the following command in your terminal to authenticate with Google Cloud:
```bash
gcloud auth application-default login
```
Ensure your Google account has these BigQuery IAM roles on the project:
- `roles/bigquery.dataEditor`
- `roles/bigquery.jobUser`

### ii). Python Packages
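Install the required packages. `openpyxl` is the engine pandas uses to read `.xlsx` files, and `db-dtypes` is needed when BigQuery query results are converted with `to_dataframe()`:
```bash
pip install pandas openpyxl pyarrow db-dtypes google-cloud-bigquery google-cloud-bigquery-storage
```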

**Troubleshooting:** If you encounter `pyarrow` or `protobuf` errors:
- This environment uses `protobuf==4.25.8`
- Install compatible versions:
```bash
pip install protobuf==4.25.8 pyarrow==22.0.0
```
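
When troubleshooting version conflicts, it can help to see which versions are actually installed in the active environment. The sketch below uses only the standard library; the helper name `installed_versions` is hypothetical and not part of the pipeline.

```python
from importlib import metadata

# Packages named in the install commands above.
PACKAGES = (
    "pandas",
    "pyarrow",
    "protobuf",
    "google-cloud-bigquery",
    "google-cloud-bigquery-storage",
)


def installed_versions(packages=PACKAGES):
    """Return {package: version string, or None if the package is not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions().items():
        print(f"{name}: {version or 'not installed'}")
```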

### iii). Configuration
- **Source files:** `/home/simon_mokaya/airflow/data`
- **BigQuery project:** `etl-simon-001-479300`
- **Dataset:** `staging_dataset`
- **Dataset location:** `us-central1`
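
The values above are hard-coded in the DAG. One possible alternative, shown here only as a sketch, is to read them from environment variables and fall back to the documented defaults. The `ETL_*` variable names and the `load_config` helper are assumptions made for illustration; the notebook itself does not define them.

```python
import os

# Defaults mirror the configuration values listed above.
# The ETL_* variable names are hypothetical.
DEFAULTS = {
    "ETL_DATA_DIR": "/home/simon_mokaya/airflow/data",
    "ETL_PROJECT_ID": "etl-simon-001-479300",
    "ETL_DATASET_ID": "staging_dataset",
    "ETL_DATASET_LOCATION": "us-central1",
}


def load_config(env=None):
    """Resolve settings from `env` (default: os.environ), falling back to DEFAULTS."""
    env = os.environ if env is None else env
    return {key: env.get(key, default) for key, default in DEFAULTS.items()}
```

Calling `load_config({"ETL_DATASET_ID": "dev_dataset"})` overrides only the dataset and keeps the other defaults, which makes it easier to point the same DAG at a test dataset.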

## Diagrammatic Representation:
The diagram below shows the full ETL workflow, including extraction, transformation, loading, and validation steps.
![ETL Process Diagram](ETL_process.png)



## 2. Import Dependencies

In [None]:
# Import relevant libraries and modules
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
from google.cloud import bigquery
import logging


## 3. DAG Configuration
### i). Constants and Default Arguments
Define the project ID, dataset, source file paths, and default task arguments (retries and email alerts) for the DAG.

In [None]:
# Configuration
PROJECT_ID = "etl-simon-001-479300"
DATASET_ID = "staging_dataset"

# File paths
EXCEL_FILE_PATH = "/home/simon_mokaya/airflow/data/Store_sales.xlsx"
JSON_FILE_PATH = "/home/simon_mokaya/airflow/data/products.json"

# Default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 12, 3),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email': ['mokayasimon495@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}


### ii). DAG Instantiation

Define schedule, description, and tags.

In [None]:
dag = DAG(
    dag_id='etl_pipeline_workflow',
    default_args=default_args,
    description='Complete ETL pipeline: Extract, Transform, Load to BigQuery',
    schedule='0 2 * * *',
    catchup=False,
    tags=['etl', 'bigquery', 'daily', 'staging'],
)
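
The cron expression `0 2 * * *` means minute 0 of hour 2 on every day, so the DAG runs once a day at 02:00 (in UTC unless Airflow is configured with another default time zone). With `catchup=False`, runs missed between `start_date` and the present are not backfilled. The sketch below only illustrates how that daily schedule resolves; `next_daily_run` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=2, minute=0):
    """Return the first datetime strictly after `now` that falls on hour:minute."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed (or is exactly now), so use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    print(next_daily_run(datetime(2024, 12, 3, 1, 30)))
```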

## 4. BigQuery Setup
Create the BigQuery dataset if it does not exist.


In [None]:
# Fxn to create bq dataset
def create_bigquery_dataset():
    """Create BigQuery dataset if it doesn't exist"""
    try:
        logging.info(f"Creating/verifying BigQuery dataset: {DATASET_ID}")

        client = bigquery.Client(project=PROJECT_ID)
        dataset_ref = f"{PROJECT_ID}.{DATASET_ID}"
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "us-central1"

        client.create_dataset(dataset, exists_ok=True)
        logging.info(f"Dataset '{DATASET_ID}' is ready")

        return "Dataset created/verified successfully"
    except Exception as e:
        logging.error(f"Dataset creation failed: {str(e)}")
        raise
        

## 5. Extraction Steps
These functions read data from Excel and JSON files, save temporary CSV extracts, and pass the paths to downstream tasks using XCom.

In [None]:
# Fxn to extract Excel file
def extract_excel_data(**context):
    try:
        df = pd.read_excel(EXCEL_FILE_PATH)
        temp_path = '/tmp/extracted_excel.csv'
        df.to_csv(temp_path, index=False)

        context['ti'].xcom_push(key='excel_temp_path', value=temp_path)
        context['ti'].xcom_push(key='excel_row_count', value=len(df))
        return temp_path

    except Exception as e:
        logging.error(f"Excel extraction failed: {str(e)}")
        raise

# Fxn to extract JSON file
def extract_json_data(**context):
    try:
        df = pd.read_json(JSON_FILE_PATH)
        temp_path = '/tmp/extracted_json.csv'
        df.to_csv(temp_path, index=False)

        context['ti'].xcom_push(key='json_temp_path', value=temp_path)
        context['ti'].xcom_push(key='json_row_count', value=len(df))
        return temp_path

    except Exception as e:
        logging.error(f"JSON extraction failed: {str(e)}")
        raise
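
The extraction functions write to fixed paths under `/tmp`, so two overlapping runs would overwrite each other's files. A possible refinement, sketched here under the assumption that the task context exposes a `run_id` string, is to include the run ID in the file name. `run_scoped_path` is a hypothetical helper.

```python
import re
import tempfile
from pathlib import Path


def run_scoped_path(run_id, filename, base_dir=None):
    """Build a temp-file path that is unique per DAG run."""
    base = Path(base_dir or tempfile.gettempdir())
    # Run IDs such as "scheduled__2024-12-03T02:00:00+00:00" contain characters
    # that are awkward in file names, so keep only letters, digits, '_', '.' and '-'.
    safe_run_id = re.sub(r"[^A-Za-z0-9_.-]", "_", run_id)
    return str(base / f"{safe_run_id}_{filename}")
```

Inside an extraction function this could be used as `temp_path = run_scoped_path(context["run_id"], "extracted_excel.csv")`. The path is still passed downstream through XCom, so the transform tasks need no changes.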
        

## 6. Transformation Steps
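These functions clean and standardize the extracted data: they normalize the sales column names, coerce dates and numeric fields to proper types, and drop incomplete rows (and duplicate product rows) before writing the results to new temporary CSV files.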

In [None]:
# Transform Excel file
def transform_excel_data(**context):
    try:
        excel_path = context['ti'].xcom_pull(key='excel_temp_path', task_ids='extract_excel')
        df = pd.read_csv(excel_path)
        
        # Check if the first column contains comma-separated values
        first_col = df.columns[0]
        sample = df[first_col].dropna().astype(str).head(5)

        if sample.str.contains(",").any():
            # Single-column case -> split
            df = df[first_col].str.split(",", expand=True)
            df.columns = ["date", "store_id", "product_id", "units_sold", "sales_amount"]
        else:
            # Multi-column case -> assume Excel columns are already correct
            expected_cols = ["date", "store_id", "product_id", "units_sold", "sales_amount"]
            df = df.rename(columns={df.columns[i]: expected_cols[i] for i in range(len(expected_cols))})

        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df["units_sold"] = pd.to_numeric(df["units_sold"], errors="coerce")
        df["sales_amount"] = pd.to_numeric(df["sales_amount"], errors="coerce")

        # Drop rows that failed coercion before casting: NaN cannot be cast to int64
        df = df.dropna().astype({"units_sold": "int64", "sales_amount": "float64"})

        transformed_path = '/tmp/transformed_excel.csv'
        df.to_csv(transformed_path, index=False)

        context['ti'].xcom_push(key='excel_transformed_path', value=transformed_path)
        context['ti'].xcom_push(key='excel_final_count', value=len(df))

        return transformed_path

    except Exception as e:
        logging.error(f"Excel transformation failed: {str(e)}")
        raise

# Transform JSON file
def transform_json_data(**context):
    try:
        json_path = context['ti'].xcom_pull(key='json_temp_path', task_ids='extract_json')
        df = pd.read_csv(json_path)

        df["price"] = pd.to_numeric(df["price"], errors="coerce").astype("float64")
        df = df.drop_duplicates().dropna()

        transformed_path = '/tmp/transformed_json.csv'
        df.to_csv(transformed_path, index=False)

        context['ti'].xcom_push(key='json_transformed_path', value=transformed_path)
        context['ti'].xcom_push(key='json_final_count', value=len(df))

        return transformed_path

    except Exception as e:
        logging.error(f"JSON transformation failed: {str(e)}")
        raise
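
The Excel transformation handles two layouts: rows packed into a single comma-separated cell, and rows already split into columns. The plain-Python sketch below mirrors that logic without pandas so it is easy to test in isolation. It is an illustration, not the pipeline's code: it assumes dates are in `YYYY-MM-DD` format, and it is stricter than pandas (for example, `"3.0"` is rejected as a unit count).

```python
from datetime import datetime

COLUMNS = ["date", "store_id", "product_id", "units_sold", "sales_amount"]


def normalize_sales_rows(rows):
    """Split comma-packed rows, coerce types, and drop rows that fail to parse."""
    cleaned = []
    for row in rows:
        # Single-cell rows such as ["2024-12-01,S1,P1,3,29.97"] are split on commas.
        if len(row) == 1 and "," in str(row[0]):
            row = str(row[0]).split(",")
        if len(row) != len(COLUMNS):
            continue  # wrong number of fields: drop the row
        record = dict(zip(COLUMNS, (str(value).strip() for value in row)))
        try:
            record["date"] = datetime.strptime(record["date"], "%Y-%m-%d")
            record["units_sold"] = int(record["units_sold"])
            record["sales_amount"] = float(record["sales_amount"])
        except ValueError:
            continue  # unparseable value: drop, similar to errors="coerce" + dropna()
        cleaned.append(record)
    return cleaned
```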


## 7. Loading Steps
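These functions load the transformed data into the BigQuery staging tables `store_sales` and `products`. Each load uses `WRITE_TRUNCATE`, so a rerun replaces the table contents instead of appending to them.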

In [None]:
# Fxn to load a DataFrame to a BigQuery staging table
def load_staging_table(df, table_name):
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{table_name}"

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        autodetect=True
    )

    client = bigquery.Client(project=PROJECT_ID)
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()

    return table_id

# Fxn to load transformed Excel file to BigQuery staging table
def load_excel_to_bigquery(**context):
    try:
        transformed_path = context['ti'].xcom_pull(key='excel_transformed_path', task_ids='transform_excel')
        df = pd.read_csv(transformed_path)
        df["date"] = pd.to_datetime(df["date"])

        table_id = load_staging_table(df, "store_sales")
        return f"Loaded {len(df)} rows → {table_id}"

    except Exception as e:
        logging.error(f"Excel load failed: {str(e)}")
        raise

# Fxn to load transformed JSON file to BigQuery staging table
def load_json_to_bigquery(**context):
    try:
        transformed_path = context['ti'].xcom_pull(key='json_transformed_path', task_ids='transform_json')
        df = pd.read_csv(transformed_path)

        table_id = load_staging_table(df, "products")
        return f"Loaded {len(df)} rows → {table_id}"

    except Exception as e:
        logging.error(f"JSON load failed: {str(e)}")
        raise


## 8. Data Validation
After the transformed data is loaded into BigQuery, a validation task checks data quality across both staging tables. Any failed check is treated as critical and fails the task. The task checks:

- Row counts – confirms that tables are not empty and that row counts match those recorded by the transformation tasks.

- Null values – scans key columns to ensure they are populated.

- Duplicates – identifies repeated keys: `product_id` in `products`, and the sales key columns in `store_sales`.

- Referential integrity – verifies that every `product_id` in `store_sales` exists in `products`.

- Business rules – rejects negative `sales_amount` or `units_sold` values and non-positive product prices.
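
The checks listed above can be sketched on in-memory rows, which is a convenient way to reason about them before running the BigQuery queries. This is a simplified illustration: `run_quality_checks` is a hypothetical helper, and the sales key `(date, store_id, product_id)` is an assumption about what identifies a sales row.

```python
from collections import Counter


def run_quality_checks(sales, products):
    """Return a dict of issue counts for in-memory lists of row dicts."""
    sales_keys = [(r["date"], r["store_id"], r["product_id"]) for r in sales]
    product_ids = {p["product_id"] for p in products}
    return {
        # Row counts: how many of the two tables are empty.
        "empty_tables": int(not sales) + int(not products),
        # Null values: count of None values across all rows.
        "null_values": sum(v is None for r in sales + products for v in r.values()),
        # Duplicates: number of sales keys that appear more than once.
        "duplicate_sales_keys": sum(1 for n in Counter(sales_keys).values() if n > 1),
        # Referential integrity: sales rows whose product_id is not in products.
        "orphaned_sales": sum(r["product_id"] not in product_ids for r in sales),
    }
```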

In [None]:
# Fxn to run data validation
def validate_data(**context):
    """
    Run data quality checks on loaded BigQuery tables.
    Validates: row counts, nulls, duplicates, and referential integrity.
    """
    client = bigquery.Client(project=PROJECT_ID)
    validation_results = []
    has_critical_failure = False

    try:
        logging.info("="*60)
        logging.info("Starting Data Quality Validation")
        logging.info("="*60)

        # Get expected row counts from XCom
        expected_sales_count = context['ti'].xcom_pull(key='excel_final_count', task_ids='transform_excel')
        expected_products_count = context['ti'].xcom_pull(key='json_final_count', task_ids='transform_json')

        # Row Count Checks
        logging.info("Checking row counts...")

        sales_count = client.query(f"""
            SELECT COUNT(*) as cnt
            FROM `{PROJECT_ID}.{DATASET_ID}.store_sales`
        """).result().to_dataframe().iloc[0]['cnt']

        products_count = client.query(f"""
            SELECT COUNT(*) as cnt
            FROM `{PROJECT_ID}.{DATASET_ID}.products`
        """).result().to_dataframe().iloc[0]['cnt']

        # Check if tables are empty
        if sales_count == 0:
            has_critical_failure = True
            validation_results.append("❌ CRITICAL: store_sales table is empty")
        else:
            validation_results.append(f"✅ store_sales has {sales_count} rows")

        if products_count == 0:
            has_critical_failure = True
            validation_results.append("❌ CRITICAL: products table is empty")
        else:
            validation_results.append(f"✅ products has {products_count} rows")

        # Verify row counts match expected from transformation
        if sales_count != expected_sales_count:
            has_critical_failure = True
            validation_results.append(
                f"❌ CRITICAL: store_sales row count mismatch. Expected: {expected_sales_count}, Got: {sales_count}"
            )
        else:
            validation_results.append(f"✅ store_sales row count matches transformed data")

        if products_count != expected_products_count:
            has_critical_failure = True
            validation_results.append(
                f"❌ CRITICAL: products row count mismatch. Expected: {expected_products_count}, Got: {products_count}"
            )
        else:
            validation_results.append(f"✅ products row count matches transformed data")

        # Check for null values
        logging.info("Checking for null values in critical columns...")

        null_check_sales = client.query(f"""
            SELECT
                COUNTIF(date IS NULL) as null_dates,
                COUNTIF(product_id IS NULL) as null_product_ids,
                COUNTIF(units_sold IS NULL) as null_units,
                COUNTIF(sales_amount IS NULL) as null_amounts
            FROM `{PROJECT_ID}.{DATASET_ID}.store_sales`
        """).result().to_dataframe().iloc[0]

        for col, null_count in null_check_sales.items():
            col_name = col.replace('null_', '')
            if null_count > 0:
                has_critical_failure = True
                validation_results.append(f"❌ CRITICAL: {null_count} null values in store_sales.{col_name}")
            else:
                validation_results.append(f"✅ No nulls in store_sales.{col_name}")

        null_check_products = client.query(f"""
            SELECT
                COUNTIF(product_id IS NULL) as null_product_ids,
                COUNTIF(product_name IS NULL) as null_names,
                COUNTIF(price IS NULL) as null_prices
            FROM `{PROJECT_ID}.{DATASET_ID}.products`
        """).result().to_dataframe().iloc[0]

        for col, null_count in null_check_products.items():
            col_name = col.replace('null_', '')
            if null_count > 0:
                has_critical_failure = True
                validation_results.append(f"❌ CRITICAL: {null_count} null values in products.{col_name}")
            else:
                validation_results.append(f"✅ No nulls in products.{col_name}")

        # Check for duplicate records
        logging.info("Checking for duplicate records...")

        duplicates_sales = client.query(f"""
            SELECT date, store_id, product_id, COUNT(*) as dupes
            FROM `{PROJECT_ID}.{DATASET_ID}.store_sales`
            GROUP BY date, store_id, product_id
            HAVING COUNT(*) > 1
        """).result().to_dataframe()

        if len(duplicates_sales) > 0:
            has_critical_failure = True
            validation_results.append(f"❌ CRITICAL: {len(duplicates_sales)} duplicate records in store_sales")
            logging.warning(f"Duplicate records:\n{duplicates_sales.head()}")
        else:
            validation_results.append("✅ No duplicates in store_sales")

        duplicates_products = client.query(f"""
            SELECT product_id, COUNT(*) as dupes
            FROM `{PROJECT_ID}.{DATASET_ID}.products`
            GROUP BY product_id
            HAVING COUNT(*) > 1
        """).result().to_dataframe()

        if len(duplicates_products) > 0:
            has_critical_failure = True
            validation_results.append(f"❌ CRITICAL: {len(duplicates_products)} duplicate product_ids in products table")
            logging.warning(f"Duplicate products:\n{duplicates_products.head()}")
        else:
            validation_results.append("✅ No duplicates in products")

        # Referential Integrity
        logging.info("Checking referential integrity...")

        orphaned = client.query(f"""
            SELECT COUNT(*) as orphaned_records
            FROM `{PROJECT_ID}.{DATASET_ID}.store_sales` s
            LEFT JOIN `{PROJECT_ID}.{DATASET_ID}.products` p
            ON s.product_id = p.product_id
            WHERE p.product_id IS NULL
        """).result().to_dataframe().iloc[0]['orphaned_records']

        if orphaned > 0:
            has_critical_failure = True
            validation_results.append(f"❌ CRITICAL: {orphaned} sales records with invalid product_ids")
        else:
            validation_results.append("✅ All sales records have valid product_ids")

        # Validating business logic
        logging.info("Validating business rules...")

        ranges = client.query(f"""
            SELECT
                MIN(sales_amount) as min_amount,
                MAX(sales_amount) as max_amount,
                MIN(units_sold) as min_units,
                MAX(units_sold) as max_units
            FROM `{PROJECT_ID}.{DATASET_ID}.store_sales`
        """).result().to_dataframe().iloc[0]

        # Check for negative sales amounts
        if ranges['min_amount'] < 0:
            has_critical_failure = True
            validation_results.append(f"❌ CRITICAL: Negative sales_amount detected: {ranges['min_amount']}")
        else:
            validation_results.append(f"✅ sales_amount valid range: ${ranges['min_amount']:.2f} - ${ranges['max_amount']:.2f}")

        # Check for negative units
        if ranges['min_units'] < 0:
            has_critical_failure = True
            validation_results.append(f"❌ CRITICAL: Negative units_sold detected: {ranges['min_units']}")
        else:
            validation_results.append(f"✅ units_sold valid range: {ranges['min_units']} - {ranges['max_units']}")

        # Check product prices
        price_ranges = client.query(f"""
            SELECT
                MIN(price) as min_price,
                MAX(price) as max_price
            FROM `{PROJECT_ID}.{DATASET_ID}.products`
        """).result().to_dataframe().iloc[0]

        if price_ranges['min_price'] <= 0:
            has_critical_failure = True
            validation_results.append(f"❌ CRITICAL: Invalid product price detected: ${price_ranges['min_price']}")
        else:
            validation_results.append(f"✅ product prices valid range: ${price_ranges['min_price']:.2f} - ${price_ranges['max_price']:.2f}")

        # Generate report
        report = "\n".join(validation_results)
        logging.info("\n" + "="*60)
        logging.info("DATA QUALITY REPORT")
        logging.info("="*60)
        logging.info(report)
        logging.info("="*60)

        # Store results in XCom for potential email alerts
        context['ti'].xcom_push(key='validation_report', value=report)
        context['ti'].xcom_push(key='has_critical_failure', value=has_critical_failure)

        # Fail the task if critical issues found
        if has_critical_failure:
            raise ValueError(f"Data validation failed with critical errors:\n{report}")

        logging.info("✅ All data quality checks passed!")
        return "All validation checks passed successfully"

    except Exception as e:
        logging.error(f"Validation failed: {str(e)}")
        raise
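
`validate_data` repeats the same pattern for every check: set `has_critical_failure`, then append a ✅ or ❌ line. One way to reduce that repetition, offered only as a sketch, is a small collector object. The `ValidationReport` class below is hypothetical and not part of the DAG.

```python
from dataclasses import dataclass, field


@dataclass
class ValidationReport:
    """Collects check outcomes and tracks whether any check failed."""

    lines: list = field(default_factory=list)
    has_critical_failure: bool = False

    def check(self, passed, ok_message, fail_message):
        """Record one check; a failed check marks the whole report as failed."""
        if passed:
            self.lines.append(f"✅ {ok_message}")
        else:
            self.has_critical_failure = True
            self.lines.append(f"❌ CRITICAL: {fail_message}")
        return passed

    def render(self):
        """Join all recorded lines into the report text."""
        return "\n".join(self.lines)

    def raise_if_failed(self):
        """Raise ValueError with the full report if any check failed."""
        if self.has_critical_failure:
            raise ValueError(
                f"Data validation failed with critical errors:\n{self.render()}"
            )
```

With this in place, a check such as the empty-table test could shrink to `report.check(sales_count > 0, f"store_sales has {sales_count} rows", "store_sales table is empty")`.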

## 9. DAG Task Definitions
Create Airflow tasks using PythonOperators.

In [None]:
# bq dataset creation
create_dataset = PythonOperator(
    task_id='create_bigquery_dataset',
    python_callable=create_bigquery_dataset,
    dag=dag,
)

# Extract .xlsx file
extract_excel = PythonOperator(
    task_id='extract_excel',
    python_callable=extract_excel_data,
    dag=dag,
)

# Extract .json file
extract_json = PythonOperator(
    task_id='extract_json',
    python_callable=extract_json_data,
    dag=dag,
)

# Transform .xlsx file
transform_excel = PythonOperator(
    task_id='transform_excel',
    python_callable=transform_excel_data,
    dag=dag,
)

# Transform .json file
transform_json = PythonOperator(
    task_id='transform_json',
    python_callable=transform_json_data,
    dag=dag,
)

# Load .xlsx file to bq
load_excel = PythonOperator(
    task_id='load_excel_to_bigquery',
    python_callable=load_excel_to_bigquery,
    dag=dag,
)

# Load .json file to bq
load_json = PythonOperator(
    task_id='load_json_to_bigquery',
    python_callable=load_json_to_bigquery,
    dag=dag,
)

# Validation task
validate = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)


## 10. DAG Orchestration
Define the sequence and parallel execution paths.

In [None]:
create_dataset >> [extract_excel, extract_json]

extract_excel >> transform_excel >> load_excel
extract_json >> transform_json >> load_json

[load_excel, load_json] >> validate
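
The dependency graph above can be modelled with the standard library's `graphlib` (Python 3.9+) to see which tasks are free to run in parallel. This is a simplified illustration, not how Airflow schedules tasks: Airflow starts each task as soon as its own upstream tasks finish, so the Excel and JSON branches progress independently and do not wait for each other at every stage.

```python
from graphlib import TopologicalSorter

# Each task ID maps to the set of tasks it waits for (its upstream tasks).
UPSTREAM = {
    "create_bigquery_dataset": set(),
    "extract_excel": {"create_bigquery_dataset"},
    "extract_json": {"create_bigquery_dataset"},
    "transform_excel": {"extract_excel"},
    "transform_json": {"extract_json"},
    "load_excel_to_bigquery": {"transform_excel"},
    "load_json_to_bigquery": {"transform_json"},
    "validate_data": {"load_excel_to_bigquery", "load_json_to_bigquery"},
}


def execution_stages(upstream):
    """Group tasks into stages; tasks within a stage do not depend on each other."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


if __name__ == "__main__":
    for number, stage in enumerate(execution_stages(UPSTREAM), start=1):
        print(number, stage)
```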
