In [1]:
import pandas as pd
import requests
import sqlite3
import logging
from datetime import datetime

In [3]:
# Set up logging for ETL monitoring
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [5]:
# Step 1: Extract - Collect data from a public API (simulating real-time ingestion)
def extract_data():
    try:
        # Using a public API (e.g., mock logistics API; replace with real API if available)
        url = "https://api.example.com/shipments"  # Placeholder; using sample JSON for demo
        sample_data = {
            "shipments": [
                {"shipment_id": "S001", "origin": "Delhi", "destination": "Mumbai", "weight_kg": 100, "status": "In Transit"},
                {"shipment_id": "S002", "origin": "Bangalore", "destination": "Chennai", "weight_kg": -50, "status": None}
            ]
        }
        df = pd.DataFrame(sample_data["shipments"])
        logging.info("Data extracted successfully")
        return df
    except Exception as e:
        logging.error(f"Extraction failed: {e}")
        return None


In [6]:
# Step 2: Transform - Validate and transform data
def transform_data(df):
    if df is None:
        logging.error("No data to transform")
        return None
    try:
        # Data validation
        # Check for null shipment_id
        if df['shipment_id'].isnull().any():
            logging.warning("Null shipment_id found; dropping rows")
            df = df.dropna(subset=['shipment_id'])
        
        # Check for valid weight_kg (> 0)
        invalid_weights = df['weight_kg'] <= 0
        if invalid_weights.any():
            logging.warning(f"Invalid weights found in {invalid_weights.sum()} rows; setting to median")
            df.loc[invalid_weights, 'weight_kg'] = df['weight_kg'].median()
            
        # Transform: Convert weight_kg to weight_lbs
        df['weight_lbs'] = df['weight_kg'] * 2.20462

        # Fill missing status with "Pending"
        df['status'] = df['status'].fillna("Pending")

        # Standardize destination to uppercase
        df['destination'] = df['destination'].str.upper()

        # Add timestamp for pipeline tracking
        df['processed_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        logging.info("Data transformed successfully")
        return df
    except Exception as e:
        logging.error(f"Transformation failed: {e}")
        return None


In [7]:
# Step 3: Load - Store data in SQLite database (simulating data warehouse)
def load_data(df, db_name="logistics.db", table_name="shipments"):
    if df is None:
        logging.error("No data to load")
        return
    try:
        conn = sqlite3.connect(db_name)
        df.to_sql(table_name, conn, if_exists="append", index=False)
        conn.close()
        logging.info(f"Data loaded to {db_name} in table {table_name}")
    except Exception as e:
        logging.error(f"Loading failed: {e}")


In [8]:
# Step 4: Verify loaded data
def verify_data(db_name="logistics.db", table_name="shipments"):
    try:
        conn = sqlite3.connect(db_name)
        loaded_df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
        conn.close()
        logging.info("Data verification completed")
        return loaded_df
    except Exception as e:
        logging.error(f"Verification failed: {e}")
        return None


In [9]:
# Run ETL pipeline
logging.info("Starting ETL pipeline")
extracted_df = extract_data()
print("Extracted Data:\n", extracted_df)

transformed_df = transform_data(extracted_df)
print("\nTransformed Data:\n", transformed_df)

load_data(transformed_df)
verified_df = verify_data()
print("\nLoaded Data from Database:\n", verified_df)


2025-08-20 10:16:34,983 - INFO - Starting ETL pipeline
2025-08-20 10:16:34,988 - INFO - Data extracted successfully
2025-08-20 10:16:34,998 - INFO - Data transformed successfully
2025-08-20 10:16:35,003 - INFO - Data loaded to logistics.db in table shipments
2025-08-20 10:16:35,005 - INFO - Data verification completed


Extracted Data:
   shipment_id     origin destination  weight_kg      status
0        S001      Delhi      Mumbai        100  In Transit
1        S002  Bangalore     Chennai        -50        None

Transformed Data:
   shipment_id     origin destination  weight_kg      status  weight_lbs  \
0        S001      Delhi      MUMBAI        100  In Transit    220.4620   
1        S002  Bangalore     CHENNAI         25     Pending     55.1155   

          processed_at  
0  2025-08-20 10:16:34  
1  2025-08-20 10:16:34  

Loaded Data from Database:
   shipment_id     origin destination  weight_kg      status  weight_lbs  \
0        S001      Delhi      MUMBAI        100  In Transit    220.4620   
1        S002  Bangalore     CHENNAI         25     Pending     55.1155   

          processed_at  
0  2025-08-20 10:16:34  
1  2025-08-20 10:16:34  
