In [1]:
# %% [markdown]
# # ETL Pipeline - Load Phase
# **Kevin (ID: 656)**  
# DSA 2040 A US 2025 Mid Semester Exam

# %% [markdown]
# ## 1. Import Required Libraries

# %%
import pandas as pd
import sqlite3
from sqlite3 import Error
from IPython.display import display
import os

# %% [markdown]
# ## 2. Load Transformed Data

# %%
# Load the transformed data
print("Loading transformed data...")
try:
    full_data = pd.read_csv('transformed/transformed_full.csv')
    inc_data = pd.read_csv('transformed/transformed_incremental.csv')
    print(" Data loaded successfully!")
    print(f"Full data: {len(full_data)} records")
    print(f"Incremental data: {len(inc_data)} records")
except Exception as e:
    print(f" Error loading files: {e}")
    print("Please run etl_transform.ipynb first")

# %% [markdown]
# ## 3. Database Loading (SQLite Option)

# %%
def create_connection(db_file):
    """ Create a database connection to a SQLite database """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        print(f" Connected to SQLite database: {db_file}")
        return conn
    except Error as e:
        print(f" Error connecting to database: {e}")
    return conn

# %%
print("\n=== LOADING TO SQLITE DATABASES ===")

# Create database connections
full_db = create_connection('loaded/full_data.db')
inc_db = create_connection('loaded/incremental_data.db')

# Load data into SQLite
if full_db and inc_db:
    try:
        # Full dataset
        full_data.to_sql('orders', full_db, if_exists='replace', index=False)
        print("\nFull data loaded to SQLite:")
        display(pd.read_sql("SELECT * FROM orders LIMIT 5", full_db))
        
        # Incremental dataset
        inc_data.to_sql('orders', inc_db, if_exists='replace', index=False)
        print("\nIncremental data loaded to SQLite:")
        display(pd.read_sql("SELECT * FROM orders LIMIT 5", inc_db))
        
    except Error as e:
        print(f" Error loading data: {e}")
    finally:
        full_db.close()
        inc_db.close()
else:
    print("Skipping SQLite loading due to connection errors")

# %% [markdown]
# ## 4. Parquet Loading (Alternative Option)

# %%
print("\n=== LOADING TO PARQUET FILES ===")

try:
    # Save as Parquet files
    full_data.to_parquet('loaded/full_data.parquet', index=False)
    inc_data.to_parquet('loaded/incremental_data.parquet', index=False)
    
    # Verify the files
    print(" Parquet files created successfully!")
    print("\nSample from full data:")
    display(pd.read_parquet('loaded/full_data.parquet').head(3))
    
except Exception as e:
    print(f" Error creating Parquet files: {e}")

# %% [markdown]
# ## 5. Final Verification

# %%
print("\n=== FINAL VERIFICATION ===")

# Check files were created
print("\nFiles in loaded folder:")
print(os.listdir('loaded'))

# Database verification
def verify_db(filepath, expected_records):
    try:
        conn = sqlite3.connect(filepath)
        count = pd.read_sql("SELECT COUNT(*) FROM orders", conn).iloc[0,0]
        conn.close()
        return count == expected_records
    except:
        return False

print("\nVerification results:")
print(f"- Full data.db: {'VALID' if verify_db('loaded/full_data.db', len(full_data)) else 'INVALID'}")
print(f"- Incremental_data.db: {'VALID' if verify_db('loaded/incremental_data.db', len(inc_data)) else 'INVALID'}")
print(f"- Full_data.parquet: {'FOUND' if os.path.exists('loaded/full_data.parquet') else 'MISSING'}")
print(f"- Incremental_data.parquet: {'FOUND' if os.path.exists('loaded/incremental_data.parquet') else 'MISSING'}")

# %% [markdown]
# ## 6. Completion Message

# %%
print("\nETL PIPELINE COMPLETE ")
print("All data has been successfully loaded to:")
print("- SQLite databases (in loaded/ folder)")
print("- Parquet files (alternative format)")


Loading transformed data...
 Data loaded successfully!
Full data: 99 records
Incremental data: 10 records

=== LOADING TO SQLITE DATABASES ===
 Connected to SQLite database: loaded/full_data.db
 Connected to SQLite database: loaded/incremental_data.db

Full data loaded to SQLite:


Unnamed: 0,order_id,customer_name,product,quantity,unit_price,order_date,region,total_price,order_size
0,1,Diana,Tablet,1,500.0,2024-01-20,South,500.0,Medium
1,2,Eve,Laptop,1,250.0,2024-04-29,North,250.0,Small
2,3,Charlie,Laptop,2,250.0,2024-01-08,Unknown,500.0,Medium
3,4,Eve,Laptop,2,750.0,2024-01-07,West,1500.0,Large
4,5,Eve,Tablet,3,500.0,2024-03-07,South,1500.0,Large



Incremental data loaded to SQLite:


Unnamed: 0,order_id,customer_name,product,quantity,unit_price,order_date,region,total_price,order_size
0,101,Alice,Laptop,1,900.0,2024-05-09,Central,900.0,Large
1,102,Unknown,Laptop,1,300.0,2024-05-07,Central,300.0,Small
2,103,Unknown,Laptop,1,600.0,2024-05-04,Central,600.0,Medium
3,104,Unknown,Tablet,1,300.0,2024-05-26,Central,300.0,Small
4,105,Heidi,Tablet,2,600.0,2024-05-21,North,1200.0,Large



=== LOADING TO PARQUET FILES ===
 Parquet files created successfully!

Sample from full data:


Unnamed: 0,order_id,customer_name,product,quantity,unit_price,order_date,region,total_price,order_size
0,1,Diana,Tablet,1,500.0,2024-01-20,South,500.0,Medium
1,2,Eve,Laptop,1,250.0,2024-04-29,North,250.0,Small
2,3,Charlie,Laptop,2,250.0,2024-01-08,Unknown,500.0,Medium



=== FINAL VERIFICATION ===

Files in loaded folder:
['full_data.db', 'full_data.parquet', 'incremental_data.db', 'incremental_data.parquet']

Verification results:
- Full data.db: VALID
- Incremental_data.db: VALID
- Full_data.parquet: FOUND
- Incremental_data.parquet: FOUND

ETL PIPELINE COMPLETE 
All data has been successfully loaded to:
- SQLite databases (in loaded/ folder)
- Parquet files (alternative format)
