In [51]:
# Step 1: Import required libraries and load environment variables

import os
import pandas as pd
import snowflake.connector
from dotenv import load_dotenv

# Load environment variables from a .env file so that the credentials and paths
# read later via os.getenv() do not have to be written into the notebook.
# NOTE(review): the return value of load_dotenv() is not checked, so the message
# below is printed unconditionally — confirm a .env file is actually present.
load_dotenv()

print("Environment variables loaded.")


Environment variables loaded.


In [52]:
# Step 2: Pull the Snowflake connection settings out of the environment.
# Each value is None when its variable is unset.

account = os.getenv("SNOWFLAKE_ACCOUNT")
user = os.getenv("SNOWFLAKE_USER")
password = os.getenv("SNOWFLAKE_PASSWORD")
warehouse = os.getenv("SNOWFLAKE_WAREHOUSE")
database = os.getenv("SNOWFLAKE_DATABASE")
schema = os.getenv("SNOWFLAKE_SCHEMA")

# Report only whether each setting is present (True/False) so that no secret
# value is ever written to the cell output.
summary_lines = (
    (("Account", account), ("User", user), ("Password", password), ("Warehouse", warehouse)),
    (("Database", database), ("Schema", schema)),
)

print("Snowflake environment setup:")
for labelled_values in summary_lines:
    print(" | ".join(f"{label}: {bool(value)}" for label, value in labelled_values))


Snowflake environment setup:
Account: True | User: True | Password: True | Warehouse: True
Database: True | Schema: True


In [53]:
# Step 3: Establish a secure connection to Snowflake using the credentials

# Open the session with the settings read from the environment in Step 2.
# The connection stays open in `conn`; only the cursor used for the sanity
# check below is closed in this cell.
conn = snowflake.connector.connect(
    account=account,
    user=user,
    password=password,
    warehouse=warehouse,
    database=database,
    schema=schema
)

# Test query to confirm the Snowflake session context.
# Column order here fixes the indexes used when printing `result` below.
context_query = """
    SELECT 
        CURRENT_ACCOUNT(), 
        CURRENT_USER(), 
        CURRENT_WAREHOUSE(), 
        CURRENT_DATABASE(), 
        CURRENT_SCHEMA();
"""

# Create a cursor object to run SQL queries
cursor = conn.cursor()
try:
    cursor.execute(context_query)

    # Retrieve session info as a single row
    result = cursor.fetchone()
finally:
    # Release the cursor even when the test query raises, instead of leaking it.
    # Closing inside `finally` also keeps the close() return value out of the
    # cell output.
    cursor.close()

# Display only the non-sensitive part of the session context
# (account and user, indexes 0 and 1, are deliberately not printed).
print("Connected to Snowflake.")
print("Environment Details:")
print(f"Warehouse : {result[2]}")
print(f"Database  : {result[3]}")
print(f"Schema    : {result[4]}")

Connected to Snowflake.
Environment Details:
Warehouse : COMPUTE_WH
Database  : ETL_PROJECT_DB
Schema    : ETL_PROJECT_SCHEMA


True

In [54]:
# Step 4: Extract the CSV data

# Load CSV file path from environment variable to avoid hardcoding sensitive paths
data_file_path = os.getenv("DATA_FILE_PATH")

# os.getenv() returns None when the variable is unset; passing None on to
# os.path would fail with an opaque TypeError, so report the real problem.
if not data_file_path:
    raise ValueError("The DATA_FILE_PATH environment variable is not set.")

# Require a regular file: a directory path would pass a plain existence check
# and only fail later inside read_csv with a less helpful error.
if not os.path.isfile(data_file_path):
    raise FileNotFoundError(f"The CSV file at {data_file_path} was not found.")

# Read the CSV into a DataFrame
df = pd.read_csv(data_file_path)

# Display basic structure and first few rows
print("\nCSV File Loaded Successfully.")
print("Shape of DataFrame:", df.shape)

# Displaying the first 5 records for a quick preview
print("\nFirst 5 records:")
display(df.head())


CSV File Loaded Successfully.
Shape of DataFrame: (9800, 18)

First 5 records:


Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,Country,City,State,Postal Code,Region,Product ID,Category,Sub-Category,Product Name,Sales
0,1,CA-2017-152156,08/11/2017,11/11/2017,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420.0,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96
1,2,CA-2017-152156,08/11/2017,11/11/2017,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420.0,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs,...",731.94
2,3,CA-2017-138688,12/06/2017,16/06/2017,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036.0,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters b...,14.62
3,4,US-2016-108966,11/10/2016,18/10/2016,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311.0,South,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,957.5775
4,5,US-2016-108966,11/10/2016,18/10/2016,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311.0,South,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,22.368


In [55]:
# Count the missing entries in every column of the freshly loaded data
null_counts_by_column = df.isna().sum()
print("\nMissing Values (Original Data):", null_counts_by_column, sep="\n")


Missing Values (Original Data):
Row ID            0
Order ID          0
Order Date        0
Ship Date         0
Ship Mode         0
Customer ID       0
Customer Name     0
Segment           0
Country           0
City              0
State             0
Postal Code      11
Region            0
Product ID        0
Category          0
Sub-Category      0
Product Name      0
Sales             0
dtype: int64


In [56]:
# Step 5: Transform – Initial Cleanup & Feature Engineering

# Parse both date columns, which are stored as day/month/year text in the CSV
for date_column in ("Order Date", "Ship Date"):
    df[date_column] = pd.to_datetime(df[date_column], format="%d/%m/%Y")

# Derived feature: calendar month (1-12) in which the order was placed
df["Order_Month"] = df["Order Date"].dt.month

# Derived feature: whole days elapsed between ordering and shipping
shipping_lag = df["Ship Date"].sub(df["Order Date"])
df["Delivery Time (Days)"] = shipping_lag.dt.days

# Missing postal codes become 0 so the column can be stored as an integer
postal_code_filled = df["Postal Code"].fillna(0)
df["Postal Code"] = postal_code_filled.astype(int)

# Remove fully duplicated rows, keeping the first occurrence
df.drop_duplicates(keep="first", inplace=True)

# Basic transformation completed
print("\nData cleaned and transformed successfully.")
print("New columns added: 'Order_Month', 'Delivery Time (Days)'")




Data cleaned and transformed successfully.
New columns added: 'Order_Month', 'Delivery Time (Days)'


In [57]:
# Step 6: Verify and Validate Transformed Data

# First three rows: the two engineered columns should appear at the far right
preview_rows = df.head(3)
display(preview_rows)

# Show the two dates next to the computed delivery time so the arithmetic can be checked by eye
delivery_columns = ["Order Date", "Ship Date", "Delivery Time (Days)"]
display(df.loc[:, delivery_columns].head())

# 'Postal Code' should contain no missing values after the fill in Step 5
postal_code_nulls = df["Postal Code"].isna().sum()
print("\nRemaining Missing Values in 'Postal Code':", postal_code_nulls)


Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,Country,City,State,Postal Code,Region,Product ID,Category,Sub-Category,Product Name,Sales,Order_Month,Delivery Time (Days)
0,1,CA-2017-152156,2017-11-08,2017-11-11,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96,11,3
1,2,CA-2017-152156,2017-11-08,2017-11-11,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs,...",731.94,11,3
2,3,CA-2017-138688,2017-06-12,2017-06-16,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters b...,14.62,6,4


Unnamed: 0,Order Date,Ship Date,Delivery Time (Days)
0,2017-11-08,2017-11-11,3
1,2017-11-08,2017-11-11,3
2,2017-06-12,2017-06-16,4
3,2016-10-11,2016-10-18,7
4,2016-10-11,2016-10-18,7



Remaining Missing Values in 'Postal Code': 0


In [58]:
# Step 7: Final Data Validation

# Size of the transformed dataset as (rows, columns)
final_shape = df.shape
print(f"Final dataset shape: {final_shape}")

# Column dtypes after the transformation step
print("\nData Types after Transformation:", df.dtypes, sep="\n")

# Per-column count of values that are still missing
print("\nRemaining Missing Values across all columns:", df.isna().sum(), sep="\n")


Final dataset shape: (9800, 20)

Data Types after Transformation:
Row ID                           int64
Order ID                        object
Order Date              datetime64[ns]
Ship Date               datetime64[ns]
Ship Mode                       object
Customer ID                     object
Customer Name                   object
Segment                         object
Country                         object
City                            object
State                           object
Postal Code                      int32
Region                          object
Product ID                      object
Category                        object
Sub-Category                    object
Product Name                    object
Sales                          float64
Order_Month                      int32
Delivery Time (Days)             int64
dtype: object

Remaining Missing Values across all columns:
Row ID                  0
Order ID                0
Order Date              0
Ship Date       

In [59]:
# Load
# Step 8.1 —  Connect to Snowflake for Loading

# `conn` may still hold an open session from an earlier cell (or from a previous
# run of this cell). Rebinding the name without closing it would leak that
# session, so close it first. globals().get() keeps this cell runnable on a
# kernel where no earlier connection exists.
previous_conn = globals().get("conn")
if previous_conn is not None and not previous_conn.is_closed():
    previous_conn.close()

# Reconnect to Snowflake for data loading; settings are re-read from the
# environment so this cell does not depend on variables from earlier cells.
conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema=os.getenv("SNOWFLAKE_SCHEMA")
)

# Cursor shared by the load and validation cells that follow
cur = conn.cursor()
print("Connected to Snowflake Succesfully")

Connected to Snowflake Succesfully


In [60]:
# Step 8.2 — Create Table in Snowflake

# Create the target table only if it does not exist yet.
# CREATE OR REPLACE would drop the table and all of its rows on every run,
# which would make the "skip ROW_IDs that are already loaded" logic of the
# load step pointless; IF NOT EXISTS keeps previously loaded rows in place.
# Columns mirror the transformed DataFrame, in the order used by the INSERT.
create_table_query = """
CREATE TABLE IF NOT EXISTS SALES_DATA_CSV (
    ROW_ID INT,
    ORDER_ID STRING,
    ORDER_DATE DATE,
    SHIP_DATE DATE,
    SHIP_MODE STRING,
    CUSTOMER_ID STRING,
    CUSTOMER_NAME STRING,
    SEGMENT STRING,
    COUNTRY STRING,
    CITY STRING,
    STATE STRING,
    POSTAL_CODE INT,
    REGION STRING,
    PRODUCT_ID STRING,
    CATEGORY STRING,
    SUB_CATEGORY STRING,
    PRODUCT_NAME STRING,
    SALES FLOAT,
    ORDER_MONTH INT,
    DELIVERY_TIME_DAYS INT
);
"""

cur.execute(create_table_query)
# The statement also succeeds when the table is already there, so the message
# says "ready" rather than claiming a fresh creation.
print("\nTable 'SALES_DATA_CSV' is ready (created if it did not already exist).")



Table 'SALES_DATA_CSV' created successfully.


In [61]:
# Step 8.3 — Load Transformed Data into Snowflake (Optimized and Clean)

# Parameterised INSERT: one %s placeholder per target column, in table order.
# Values are bound by the connector rather than formatted into the SQL text.
insert_query = """
    INSERT INTO SALES_DATA_CSV (
        ROW_ID, ORDER_ID, ORDER_DATE, SHIP_DATE, SHIP_MODE, CUSTOMER_ID, CUSTOMER_NAME, 
        SEGMENT, COUNTRY, CITY, STATE, POSTAL_CODE, REGION, PRODUCT_ID, CATEGORY, 
        SUB_CATEGORY, PRODUCT_NAME, SALES, ORDER_MONTH, DELIVERY_TIME_DAYS
    )
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""

# Step 1: Collect the ROW_IDs already in the target table so they are not inserted again
cur.execute("SELECT ROW_ID FROM SALES_DATA_CSV;")
existing_row_ids = {row[0] for row in cur.fetchall()}

# Step 2: Prepare data for insertion (only NEW ROW_IDs)
# DataFrame columns listed in the same order as the INSERT column list above.
source_columns = [
    'Row ID', 'Order ID', 'Order Date', 'Ship Date', 'Ship Mode', 'Customer ID',
    'Customer Name', 'Segment', 'Country', 'City', 'State', 'Postal Code', 'Region',
    'Product ID', 'Category', 'Sub-Category', 'Product Name', 'Sales',
    'Order_Month', 'Delivery Time (Days)',
]

# One vectorised membership test replaces a Python-level set lookup per row.
is_new_row = ~df['Row ID'].isin(list(existing_row_ids))
new_rows = df.loc[is_new_row, source_columns]


def _as_iso_date(value):
    """Format a timestamp as 'YYYY-MM-DD'; missing dates become None (SQL NULL)."""
    return value.strftime('%Y-%m-%d') if pd.notnull(value) else None


# itertuples() yields plain tuples without building a Series for every row the
# way iterrows() does; only the two date fields need per-row conversion.
data_to_insert = [
    (row_id, order_id, _as_iso_date(order_date), _as_iso_date(ship_date), *remaining)
    for row_id, order_id, order_date, ship_date, *remaining
    in new_rows.itertuples(index=False, name=None)
]

# Step 3: Bulk insert new data
if data_to_insert:
    cur.executemany(insert_query, data_to_insert)
    conn.commit()
    print(f"Loaded {len(data_to_insert)} new rows successfully.")
else:
    print("No new orders to Load. Database is already up-to-date.")


Loaded 9800 new rows successfully.


In [71]:
# Final Data Integrity Check
# Step 9.1: The number of rows in the DataFrame must equal the number in the target table

# Rows held in the transformed DataFrame (the source side)
source_row_count = df.shape[0]

# Rows currently stored in the Snowflake target table
cur.execute("SELECT COUNT(*) FROM SALES_DATA_CSV;")
(snowflake_row_count,) = cur.fetchone()

# Report the outcome; a mismatch only prints a warning, it does not raise
if source_row_count != snowflake_row_count:
    print(f"Warning: Row count mismatch. Source: {source_row_count}, Snowflake: {snowflake_row_count}")
else:
    print(f"Row counts match: {source_row_count} rows loaded successfully.")


Row counts match: 9800 rows loaded successfully.


In [72]:
# Step 9.2 — Check for Duplicates (ROW_ID Check)

# Ask Snowflake for every ROW_ID that occurs more than once in the target table
cur.execute("""
    SELECT ROW_ID, COUNT(*) 
    FROM SALES_DATA_CSV 
    GROUP BY ROW_ID 
    HAVING COUNT(*) > 1;
""")

# One (ROW_ID, occurrences) tuple per duplicated id; an empty list means none
duplicates = cur.fetchall()

print(
    f"Post-Load Check: Found {len(duplicates)} duplicate ROW_ID(s)."
    if duplicates
    else "Post-Load Check: No duplicate ROW_IDs found. Data is consistent."
)


Post-Load Check: No duplicate ROW_IDs found. Data is consistent.


In [73]:
# Step 9.3 — Data Quality Check (Look for Nulls or Invalid Data)

# Count rows where any of the critical columns (ROW_ID, SALES, CUSTOMER_ID) is NULL
cur.execute("""
    SELECT COUNT(*) 
    FROM SALES_DATA_CSV
    WHERE ROW_ID IS NULL OR SALES IS NULL OR CUSTOMER_ID IS NULL;
""")

# The query returns a single row holding the count
null_count = cur.fetchone()[0]

print(
    f"Data Quality Check: Found {null_count} rows with NULL values in critical columns."
    if null_count > 0
    else "Data Quality Check: No NULL values found in critical columns."
)


Data Quality Check: No NULL values found in critical columns.


In [74]:
# Step 9.4 — Final Validation (Data Integrity)

import math

# Largest absolute difference between the two totals that still counts as a match
SALES_SUM_TOLERANCE = 1e-4

# Total of SALES as stored in the Snowflake table
cur.execute("SELECT SUM(SALES) FROM SALES_DATA_CSV;")
snowflake_sales_sum = cur.fetchone()[0]

# SUM() over an empty table is NULL, which arrives as None; treat it as 0.0 so
# the comparison below reports a result instead of crashing in round().
if snowflake_sales_sum is None:
    snowflake_sales_sum = 0.0

# Total of 'Sales' in the transformed DataFrame
csv_sales_sum = df['Sales'].sum()

# Rounded copies are kept for display in the mismatch message only
snowflake_sales_sum_rounded = round(snowflake_sales_sum, 4)
csv_sales_sum_rounded = round(csv_sales_sum, 4)

# Compare with an absolute tolerance instead of testing rounded floats for
# equality: the two totals are accumulated in different orders, so they can
# differ in the last bits and land on opposite sides of a rounding boundary.
# rel_tol=0.0 makes the absolute tolerance the only criterion.
sums_match = math.isclose(
    float(snowflake_sales_sum),
    float(csv_sales_sum),
    rel_tol=0.0,
    abs_tol=SALES_SUM_TOLERANCE,
)

if sums_match:
    print("Final Validation: Data integrity confirmed. SALES sum matches between CSV and Snowflake.")
else:
    print(f"Final Validation: WARNING! SALES sum mismatch. CSV: {csv_sales_sum_rounded}, Snowflake: {snowflake_sales_sum_rounded}")


Final Validation: Data integrity confirmed. SALES sum matches between CSV and Snowflake.
