In [None]:
#p3
import pandas as pd
import sqlite3
import logging
from datetime import datetime

# Configure the root logger: INFO level, "timestamp - level - message" line format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def extract_data(file_path=r"N:\CS2225 DS\Datasets\Program3_Traffic_Volume.csv"):
    """Read the traffic-volume CSV into a DataFrame.

    Args:
        file_path: Location of the CSV file. Defaults to the dataset path this
            notebook was written against, so existing ``extract_data()`` calls
            behave as before.

    Returns:
        The parsed ``pandas.DataFrame``, or ``None`` if the file is missing,
        empty, or cannot be read. Failures are logged rather than raised.
    """
    # The default is a raw string: the Windows path contains backslash
    # sequences (\C, \D, \P) that are invalid escapes in a normal literal.
    try:
        df = pd.read_csv(file_path, encoding='latin1')
    except FileNotFoundError:
        logging.error(f"File '{file_path}' not found. Please check the file path.")
        return None
    except pd.errors.EmptyDataError:
        logging.error(f"File '{file_path}' is empty or invalid.")
        return None
    except Exception as e:
        logging.error(f"Extraction failed: {e}")
        return None
    logging.info("Data extracted successfully")
    return df


def transform_data(df):
    """Clean the extracted frame and derive the columns that get loaded.

    Steps, in order:
      1. Require ``date_time``, ``weather_main``, ``traffic_volume`` and
         ``weather_description`` and rename them to ``delivery_date``,
         ``destination``, ``weight_kg`` and ``status``.
         NOTE(review): the source columns are traffic/weather fields while the
         target names are shipment-style; confirm this mapping is intended.
      2. Drop rows whose ``delivery_date`` is null.
      3. Coerce ``weight_kg`` to numeric and replace missing or non-positive
         values with the median of the valid weights.
      4. Add ``weight_lbs``, default missing ``status`` to "Pending",
         upper-case ``destination`` and stamp ``processed_at``.

    Args:
        df: Extracted DataFrame, or ``None``.

    Returns:
        A new transformed DataFrame (the input frame is not modified), or
        ``None`` if ``df`` is ``None``, a required column is missing, or any
        step raises. Failures are logged rather than raised.
    """
    if df is None:
        logging.error("No data to transform")
        return None
    try:
        required_columns = ['date_time', 'weather_main', 'traffic_volume', 'weather_description']
        if not all(col in df.columns for col in required_columns):
            logging.error(f"Required columns {required_columns} not found. Available columns: {list(df.columns)}")
            return None
        # rename() returns a new frame, so later assignments never touch the caller's df.
        df = df.rename(columns={
            'date_time': 'delivery_date',
            'weather_main': 'destination',
            'traffic_volume': 'weight_kg',
            'weather_description': 'status'
        })

        if df['delivery_date'].isnull().any():
            logging.warning("Null delivery_date found; dropping rows")
            df = df.dropna(subset=['delivery_date'])

        df['weight_kg'] = pd.to_numeric(df['weight_kg'], errors='coerce')
        invalid_weights = (df['weight_kg'] <= 0) | (df['weight_kg'].isna())
        if invalid_weights.any():
            logging.warning(f"Invalid weights found in {invalid_weights.sum()} rows; setting to median")
            # Take the median over valid rows only, so the zero/negative values
            # being replaced cannot drag the replacement value down.
            # If every row is invalid the median is NaN and the weights stay NaN.
            valid_median = df.loc[~invalid_weights, 'weight_kg'].median()
            # The median is a float; cast first so it is never written into an
            # integer column.
            df['weight_kg'] = df['weight_kg'].astype(float)
            df.loc[invalid_weights, 'weight_kg'] = valid_median

        df['weight_lbs'] = df['weight_kg'] * 2.20462
        # Fill before casting: astype(str) turns missing values into text,
        # after which fillna() has nothing left to fill.
        df['status'] = df['status'].fillna("Pending").astype(str)
        df['destination'] = df['destination'].astype(str).str.upper()
        df['processed_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logging.info("Data transformed successfully")
        return df
    except Exception as e:
        logging.error(f"Transformation failed: {e}")
        return None


def load_data(df, db_name="logistics.db", table_name="shipments", if_exists="append"):
    """Write the transformed frame into a SQLite table.

    Args:
        df: DataFrame to store, or ``None`` (logged and skipped).
        db_name: Path of the SQLite database file.
        table_name: Destination table.
        if_exists: Forwarded to ``DataFrame.to_sql``. The default "append"
            keeps the existing behavior, which adds the rows again on every
            run; pass "replace" to make re-runs idempotent.

    Returns:
        None. Failures are logged rather than raised.
    """
    if df is None:
        logging.error("No data to load")
        return
    try:
        conn = sqlite3.connect(db_name)
        try:
            df.to_sql(table_name, conn, if_exists=if_exists, index=False)
        finally:
            # Close even when to_sql raises, so a failed load does not leave
            # the database file open.
            conn.close()
        logging.info(f"Data loaded to {db_name} in table {table_name}")
    except Exception as e:
        logging.error(f"Loading failed: {e}")


def verify_data(db_name="logistics.db", table_name="shipments"):
    """Read a whole SQLite table back into a DataFrame.

    Args:
        db_name: Path of the SQLite database file.
        table_name: Table to read. It is interpolated directly into the SQL
            text (identifiers cannot be bound as parameters), so it must come
            from trusted code, never from user input.

    Returns:
        A DataFrame with every row of the table, or ``None`` if the read
        fails (for example, when the table does not exist). Failures are
        logged rather than raised.
    """
    try:
        conn = sqlite3.connect(db_name)
        try:
            loaded_df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
        finally:
            # Close even when the query raises, so a failed read does not
            # leave the database file open.
            conn.close()
        logging.info("Data verification completed")
        return loaded_df
    except Exception as e:
        logging.error(f"Verification failed: {e}")
        return None


# Run the pipeline end to end: extract -> transform -> load -> read back.
# Each stage logs its own failure and returns None instead of raising, and
# transform_data/load_data accept None, so every line below always executes.
logging.info("Starting ETL pipeline")
extracted_df = extract_data()
# Prints the whole frame; the captured output shows pandas abbreviating the middle rows with "...".
print("Extracted Data:\n", extracted_df)
transformed_df = transform_data(extracted_df)
print("\nTransformed Data:\n", transformed_df)
# load_data defaults to if_exists="append", so re-running this cell appends the rows again.
load_data(transformed_df)
# verify_data reads whatever is in the table, regardless of whether this run's load succeeded.
verified_df = verify_data()
print("\nLoaded Data from Database:\n", verified_df)


2025-08-21 21:02:23,076 - INFO - Starting ETL pipeline
2025-08-21 21:02:23,096 - INFO - Data extracted successfully
2025-08-21 21:02:23,108 - INFO - Data transformed successfully
2025-08-21 21:02:23,145 - INFO - Data loaded to logistics.db in table shipments
2025-08-21 21:02:23,180 - INFO - Data verification completed


Extracted Data:
       Unnamed: 0        holiday    temp  rain_1h  snow_1h  clouds_all  \
0          40255  New Years Day  249.36      0.0      0.0           1   
1          40256            NaN  249.08      0.0      0.0           1   
2          40257            NaN  248.86      0.0      0.0           1   
3          40258            NaN  248.72      0.0      0.0           1   
4          40259            NaN  248.43      0.0      0.0           1   
...          ...            ...     ...      ...      ...         ...   
7944       48199            NaN  283.45      0.0      0.0          75   
7945       48200            NaN  282.76      0.0      0.0          90   
7946       48201            NaN  282.73      0.0      0.0          90   
7947       48202            NaN  282.09      0.0      0.0          90   
7948       48203            NaN  282.12      0.0      0.0          90   

      weather_main     weather_description            date_time  year  month  \
0            Clear        

In [None]:
#ex3
import pandas as pd
import sqlite3
import logging
from datetime import datetime

# Configure the root logger: INFO level, "timestamp - level - message" line format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# EXTRACT
def extract_data():
    """Build the hard-coded sample of sensor readings, print it, and return it.

    The sample deliberately contains a missing timestamp, a missing sensor id,
    a negative vehicle count and a speed of 300.

    Returns:
        A DataFrame with columns ``sensor_id``, ``location``,
        ``vehicle_count``, ``speed_kmh`` and ``timestamp``.
    """
    field_names = ("sensor_id", "location", "vehicle_count", "speed_kmh", "timestamp")
    readings = (
        (101, "main street", 25, 60, "2025-08-21 20:00:00"),
        (102, "broadway ave", 42, 75, None),
        (None, "elm road", -5, 300, "2025-08-21 20:05:00"),
        (103, "oak boulevard", 0, 45, "2025-08-21 20:10:00"),
    )
    # Rebuild one record dict per reading so the frame is constructed exactly
    # as it would be from a list of dicts.
    records = [dict(zip(field_names, reading)) for reading in readings]
    sample_df = pd.DataFrame(records)
    print("\nExtracted Data:", sample_df, sep="\n")
    return sample_df

# TRANSFORM 
def transform_data(df):
    """Filter out invalid sensor readings and add derived columns.

    Rows are kept only if ``sensor_id`` is present, ``vehicle_count`` is not
    negative and ``speed_kmh`` lies in the range 0..200 inclusive. The kept
    rows then gain ``speed_mph``, have missing ``timestamp`` values filled
    with the current time, get a title-cased ``location`` and a
    ``processed_at`` stamp. The result is printed and returned; the original
    row index is preserved.

    Args:
        df: DataFrame with ``sensor_id``, ``location``, ``vehicle_count``,
            ``speed_kmh`` and ``timestamp`` columns.

    Returns:
        The filtered and enriched DataFrame.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"

    # Apply the three filters one after another, each on the previous result.
    kept = (
        df.loc[lambda frame: frame["sensor_id"].notnull()]
        .loc[lambda frame: frame["vehicle_count"] >= 0]
        .loc[lambda frame: (frame["speed_kmh"] >= 0) & (frame["speed_kmh"] <= 200)]
    )

    # assign() evaluates the keyword arguments in order; new columns are
    # appended and existing ones are replaced in place.
    result = kept.assign(
        speed_mph=lambda frame: frame["speed_kmh"] * 0.621371,
        timestamp=lambda frame: frame["timestamp"].fillna(datetime.now().strftime(stamp_format)),
        location=lambda frame: frame["location"].str.title(),
        processed_at=lambda frame: datetime.now().strftime(stamp_format),
    )

    print("\nTransformed Data:", result, sep="\n")
    return result

# LOAD
def load_data(df, db_name="traffic.db", table_name="traffic"):
    """Write the frame into a SQLite table, replacing any existing table.

    Args:
        df: DataFrame to store.
        db_name: Path of the SQLite database file.
        table_name: Destination table; dropped and recreated on every call.

    Raises:
        Whatever ``sqlite3.connect`` or ``DataFrame.to_sql`` raise; errors
        are not caught here.
    """
    conn = sqlite3.connect(db_name)
    try:
        df.to_sql(table_name, conn, if_exists="replace", index=False)
    finally:
        # Close even when to_sql raises, so a failed load does not leave the
        # database file open.
        conn.close()
    logging.info("Data loaded successfully into SQLite")

# VERIFY 
def verify_data(db_name="traffic.db", table_name="traffic"):
    """Read a whole SQLite table back, print it, and return it.

    Args:
        db_name: Path of the SQLite database file.
        table_name: Table to read. It is interpolated directly into the SQL
            text (identifiers cannot be bound as parameters), so it must come
            from trusted code, never from user input.

    Returns:
        A DataFrame with every row of the table.

    Raises:
        Whatever ``sqlite3.connect`` or ``pandas.read_sql_query`` raise (for
        example, when the table does not exist); errors are not caught here.
    """
    conn = sqlite3.connect(db_name)
    try:
        loaded_df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
    finally:
        # Close even when the query raises, so a failed read does not leave
        # the database file open.
        conn.close()
    print("\nLoaded Data from Database:")
    print(loaded_df)
    return loaded_df

# MAIN
# Run the sample pipeline: extract -> transform -> load -> read back.
# None of these functions catch exceptions, so a failure in any stage stops the run.
if __name__ == "__main__":
    raw_df = extract_data()
    transformed_df = transform_data(raw_df)
    # load_data uses if_exists="replace", so re-running overwrites the table instead of appending.
    load_data(transformed_df)
    # The returned frame is discarded; verify_data prints it itself.
    verify_data()


2025-08-21 21:25:23,974 - INFO - Data loaded successfully into SQLite



Extracted Data:
   sensor_id       location  vehicle_count  speed_kmh            timestamp
0      101.0    main street             25         60  2025-08-21 20:00:00
1      102.0   broadway ave             42         75                 None
2        NaN       elm road             -5        300  2025-08-21 20:05:00
3      103.0  oak boulevard              0         45  2025-08-21 20:10:00

Transformed Data:
   sensor_id       location  vehicle_count  speed_kmh            timestamp  \
0      101.0    Main Street             25         60  2025-08-21 20:00:00   
1      102.0   Broadway Ave             42         75  2025-08-21 21:25:23   
3      103.0  Oak Boulevard              0         45  2025-08-21 20:10:00   

   speed_mph         processed_at  
0  37.282260  2025-08-21 21:25:23  
1  46.602825  2025-08-21 21:25:23  
3  27.961695  2025-08-21 21:25:23  

Loaded Data from Database:
   sensor_id       location  vehicle_count  speed_kmh            timestamp  \
0      101.0    Main Stree