In [2]:
import pandas as pd
import sqlite3
import os

def setup_paths():
    """Build the paths used by the load step and prepare the output folder.

    The two input CSV files are looked up relative to the current working
    directory; every output file lives inside the ``loaded_data`` folder,
    which is created when missing.

    Returns:
        A 6-tuple ``(full_csv, incremental_csv, full_db, incremental_db,
        full_parquet, incremental_parquet)`` of path strings.

    Raises:
        FileNotFoundError: If either input CSV file does not exist.
    """
    out_dir = 'loaded_data'
    full_csv = 'transformed_full.csv'
    incremental_csv = 'transformed_incremental.csv'

    # exist_ok=True keeps reruns from failing when the folder is already there.
    os.makedirs(out_dir, exist_ok=True)

    # Fail early, before any loading starts, when an input is missing.
    for csv_path in (full_csv, incremental_csv):
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f'{csv_path} not found. Ensure Lab 4 output is available.')

    def in_out_dir(filename):
        return os.path.join(out_dir, filename)

    return (
        full_csv,
        incremental_csv,
        in_out_dir('full_data.db'),
        in_out_dir('incremental_data.db'),
        in_out_dir('full_data.parquet'),
        in_out_dir('incremental_data.parquet'),
    )

def load_full_data(full_csv_path, full_db_path, full_parquet_path):
    """Load the full transformed CSV into a Parquet file and a SQLite table.

    Args:
        full_csv_path: Path of the CSV file to read.
        full_db_path: Path of the SQLite database file. Its ``full_data``
            table is replaced if it already exists.
        full_parquet_path: Path of the Parquet file to write.
    """
    # Load full CSV into DataFrame
    full_df = pd.read_csv(full_csv_path)

    # Save to Parquet
    full_df.to_parquet(full_parquet_path, index=False)
    print(f'Saved full data to {full_parquet_path}')

    # Save to SQLite with defined schema.
    # NOTE(review): this mapping only takes effect for keys that match a
    # column in the CSV -- confirm the names against the actual header
    # (e.g. is the key column really called 'id'?).
    conn = sqlite3.connect(full_db_path)
    try:
        full_df.to_sql('full_data', conn, if_exists='replace', index=False, dtype={
            'id': 'INTEGER PRIMARY KEY',
            'customer_name': 'TEXT',
            'product': 'TEXT',
            'quantity': 'INTEGER',
            'unit_price': 'REAL',
            'total_price': 'REAL',
            'order_date': 'TEXT'
        })
    finally:
        # Close the connection even when the write fails, so the database
        # file handle is not leaked.
        conn.close()
    print(f'Saved full data to SQLite database: {full_db_path}')

def load_incremental_data(incremental_csv_path, incremental_db_path, incremental_parquet_path):
    """Load the incremental transformed CSV into a Parquet file and a SQLite table.

    Args:
        incremental_csv_path: Path of the CSV file to read.
        incremental_db_path: Path of the SQLite database file. Its
            ``incremental_data`` table is replaced if it already exists.
        incremental_parquet_path: Path of the Parquet file to write.
    """
    # Load incremental CSV into DataFrame
    incremental_df = pd.read_csv(incremental_csv_path)

    # Save to Parquet
    incremental_df.to_parquet(incremental_parquet_path, index=False)
    print(f'Saved incremental data to {incremental_parquet_path}')

    # Save to SQLite with defined schema.
    # NOTE(review): this mapping only takes effect for keys that match a
    # column in the CSV -- confirm the names against the actual header
    # (e.g. is the key column really called 'id'?).
    conn = sqlite3.connect(incremental_db_path)
    try:
        incremental_df.to_sql('incremental_data', conn, if_exists='replace', index=False, dtype={
            'id': 'INTEGER PRIMARY KEY',
            'customer_name': 'TEXT',
            'product': 'TEXT',
            'quantity': 'INTEGER',
            'unit_price': 'REAL',
            'total_price': 'REAL',
            'order_date': 'TEXT'
        })
    finally:
        # Close the connection even when the write fails, so the database
        # file handle is not leaked.
        conn.close()
    print(f'Saved incremental data to SQLite database: {incremental_db_path}')

def verify_loaded_data(full_db_path, incremental_db_path, full_parquet_path, incremental_parquet_path):
    """Print a short preview of every loaded SQLite table and Parquet file.

    Args:
        full_db_path: SQLite database holding the ``full_data`` table.
        incremental_db_path: SQLite database holding the
            ``incremental_data`` table.
        full_parquet_path: Parquet file written for the full data.
        incremental_parquet_path: Parquet file written for the incremental
            data.
    """
    # Heading, database file and preview query for each SQLite table.
    sqlite_previews = (
        ('Full Data SQLite Preview:', full_db_path,
         'SELECT * FROM full_data LIMIT 5'),
        ('\nIncremental Data SQLite Preview:', incremental_db_path,
         'SELECT * FROM incremental_data LIMIT 5'),
    )
    for heading, db_path, query in sqlite_previews:
        conn = sqlite3.connect(db_path)
        try:
            preview = pd.read_sql_query(query, conn)
        finally:
            # Close even when the query fails (e.g. the table is missing),
            # so the connection is never leaked.
            conn.close()
        print(heading)
        print(preview)

    # Heading and file for each Parquet output; head() shows the first rows.
    parquet_previews = (
        ('\nFull Data Parquet Preview:', full_parquet_path),
        ('\nIncremental Data Parquet Preview:', incremental_parquet_path),
    )
    for heading, parquet_path in parquet_previews:
        parquet_df = pd.read_parquet(parquet_path)
        print(heading)
        print(parquet_df.head())

def main():
    """Run the load stage: resolve paths, load both datasets, then preview them."""
    # Resolve every input/output location up front; this also checks that
    # the input CSV files are present.
    (full_csv, incremental_csv,
     full_db, incremental_db,
     full_parquet, incremental_parquet) = setup_paths()

    # Write each dataset to its Parquet file and SQLite database.
    print("Loading full data...")
    load_full_data(full_csv, full_db, full_parquet)
    print("Loading incremental data...")
    load_incremental_data(incremental_csv, incremental_db, incremental_parquet)

    # Read everything back and print a preview of each output.
    print("\nVerifying loaded data...")
    verify_loaded_data(full_db, incremental_db, full_parquet, incremental_parquet)

# Run the load process only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Loading full data...
Saved full data to loaded_data\full_data.parquet
Saved full data to SQLite database: loaded_data\full_data.db
Loading incremental data...
Saved incremental data to loaded_data\incremental_data.parquet
Saved incremental data to SQLite database: loaded_data\incremental_data.db

Verifying loaded data...
Full Data SQLite Preview:
   order_id customer_id  order_date  quantity  unit_price product_category  \
0      1001        C001  2023-10-01         2   20.000000      Electronics   
1      1002        C002  2023-10-15         5   15.500000         Clothing   
2      1003        C003  2023-11-01         3   25.000000      Electronics   
3      1006        C006  2023-11-10         4   25.142857        Furniture   
4      1007        C007  2023-12-01         2   50.000000            Books   

   total_price  
0    40.000000  
1    77.500000  
2    75.000000  
3   100.571429  
4   100.000000  

Incremental Data SQLite Preview:
   order_id customer_id  order_date  quantity 