In [0]:
# Install required libraries (UPDATED VERSIONS)
%pip install snowflake-snowpark-python pandas
dbutils.library.restartPython()

Collecting snowflake-snowpark-python
  Downloading snowflake_snowpark_python-1.40.0-py3-none-any.whl.metadata (170 kB)
Collecting snowflake-connector-python<4.0.0,>=3.17.0 (from snowflake-snowpark-python)
  Downloading snowflake_connector_python-3.18.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (74 kB)
Collecting tzlocal (from snowflake-snowpark-python)
  Downloading tzlocal-5.3.1-py3-none-any.whl.metadata (7.6 kB)
Collecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python<4.0.0,>=3.17.0->snowflake-snowpark-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting pyOpenSSL<26.0.0,>=22.0.0 (from snowflake-connector-python<4.0.0,>=3.17.0->snowflake-snowpark-python)
  Downloading pyopenssl-25.3.0-py3-none-any.whl.metadata (17 kB)
Collecting tomlkit (from snowflake-connector-python<4.0.0,>=3.17.0->snowflake-snowpark-python)
  Downloading tomlkit-0.13.3-py3-none-any.whl.metadata (2.8 kB)
Collecting cryptography>=3.1.0 (from s

In [0]:
# Import libraries and configuration
from snowflake.snowpark import Session
from snowflake.snowpark.types import *

# Snowflake connection details
snowflake_account = "TYMHDZV-PZ92491"
snowflake_user = "Ruthra" 
snowflake_password = "Your pass"

# Azure storage details
storage_account = "ittechgeniestorage"
container_name = "sales-data"

# Snowflake objects
warehouse = "COMPUTE_WH"
database = "ITTG_SALES_DB"
raw_schema = "RAW_DATA"
clean_schema = "CLEAN_DATA"

connection_parameters = {
    "account": "TYMHDZV-PZ92491",
    "user": "Ruthra",
    "password": "Ruthra#Your pass",
    "role": "ACCOUNTADMIN",
    "warehouse": "COMPUTE_WH",
    "database": "ITTG_SALES_DB",
    "schema": "RAW_DATA"
}

print("Configuration set successfully")

Configuration set successfully


In [0]:
# Create Snowpark session
session = Session.builder.configs(connection_parameters).create()
print("Snowpark session created successfully")

Snowpark session created successfully


In [0]:
# Create database and schema if not exists
session.sql("CREATE DATABASE IF NOT EXISTS ITTG_SALES_DB").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS ITTG_SALES_DB.RAW_DATA").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS ITTG_SALES_DB.CLEAN_DATA").collect()
session.sql("USE DATABASE ITTG_SALES_DB").collect()
session.sql("USE SCHEMA RAW_DATA").collect()
print("Database and schema setup completed")

Database and schema setup completed


In [0]:
# Create file format and stage WITH SAS TOKEN
session.sql("""
CREATE OR REPLACE FILE FORMAT csv_sales_format
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    SKIP_HEADER = 1
    NULL_IF = ('NULL', 'null')
    EMPTY_FIELD_AS_NULL = TRUE;
""").collect()

session.sql("""
CREATE OR REPLACE STAGE azure_sales_stage
    URL = 'azure://ittechgeniestorage.blob.core.windows.net/sales-data/'
    CREDENTIALS = (
        AZURE_SAS_TOKEN = '?sp=racwdl&st=2025-10-22T10:47:09Z&se=2025-10-23T19:02:09Z&spr=https&sv=2024-11-04&sr=c&sig=hEF7601nEZP%2Byvbuk9F2FVtAou%2F3%2BoDvfC3fNQ5fLbs%3D'
    )
    FILE_FORMAT = csv_sales_format;
""").collect()

print("File format and stage created successfully")

File format and stage created successfully


In [0]:
# Test stage connection
try:
    result = session.sql("LIST @azure_sales_stage").collect()
    print("Stage connection successful! Files found:")
    for row in result:
        print(f" - {row['name']}")
except Exception as e:
    print(f"Error listing files: {str(e)}")

Stage connection successful! Files found:
 - azure://ittechgeniestorage.blob.core.windows.net/sales-data/Retail_Sales__500_rows__Preview.csv


In [0]:
# Create raw table schema matching your CSV
session.sql("""
CREATE OR REPLACE TABLE raw_sales_data (
    OrderID STRING,
    OrderDate DATE,
    MonthOfSale STRING,
    CustomerID STRING,
    CustomerName STRING,
    Country STRING,
    Region STRING,
    City STRING,
    Category STRING,
    Subcategory STRING,
    Quantity INTEGER,
    Discount NUMBER(10,2),
    Sales NUMBER(10,2),
    Profit NUMBER(10,2),
    FileName STRING,
    LoadTimestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
""").collect()

print("Raw sales table created successfully")

Raw sales table created successfully


In [0]:
# Ingest data from Azure to Snowflake
copy_result = session.sql("""
COPY INTO raw_sales_data (
    OrderID, OrderDate, MonthOfSale, CustomerID, CustomerName, 
    Country, Region, City, Category, Subcategory, 
    Quantity, Discount, Sales, Profit, FileName
)
FROM (
    SELECT 
        $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14,
        METADATA$FILENAME
    FROM @azure_sales_stage/Retail_Sales__500_rows__Preview.csv
)
FILE_FORMAT = (FORMAT_NAME = csv_sales_format)
ON_ERROR = 'CONTINUE';
""").collect()

print(f"Data ingestion completed: {copy_result[0]['rows_loaded']} rows loaded")

Data ingestion completed: 25 rows loaded


In [0]:
# Verify raw data
result = session.sql("SELECT COUNT(*) as total_rows FROM raw_sales_data").collect()
print(f"Total rows in raw table: {result[0]['TOTAL_ROWS']}")

print("Sample raw data:")
session.sql("SELECT * FROM raw_sales_data LIMIT 5").show()

Total rows in raw table: 25
Sample raw data:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDERID"     |"ORDERDATE"  |"MONTHOFSALE"  |"CUSTOMERID"  |"CUSTOMERNAME"  |"COUNTRY"  |"REGION"  |"CITY"   |"CATEGORY"       |"SUBCATEGORY"  |"QUANTITY"  |"DISCOUNT"  |"SALES"   |"PROFIT"  |"FILENAME"                           |"LOADTIMESTAMP"             |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|ORD-5F8D6F0C  |2024-10-08   |2024-10        |CUST1000      |Ananya Sharma   |India      |South     |Mumbai   |Office Supplies  |Paper          |9           |0.00        |2

In [0]:
# Switch to clean schema and create clean data table
session.sql("USE SCHEMA CLEAN_DATA").collect()

session.sql("""
CREATE OR REPLACE TABLE clean_sales_data AS
SELECT 
    OrderID,
    OrderDate,
    MonthOfSale,
    CustomerID,
    CustomerName,
    Country,
    Region,
    City,
    Category,
    Subcategory,
    Quantity,
    Discount,
    Sales,
    Profit,
    -- Data validation and calculations
    CASE WHEN Sales != Quantity * (Sales/NULLIF(Quantity,0)) THEN Sales ELSE Sales END AS ValidatedSales,
    -- Date parts for analysis
    YEAR(OrderDate) AS OrderYear,
    MONTH(OrderDate) AS OrderMonth,
    QUARTER(OrderDate) AS OrderQuarter,
    -- Business metrics
    Sales * Discount AS DiscountAmount,
    Profit / NULLIF(Sales, 0) AS ProfitMargin,
    LoadTimestamp
FROM ITTG_SALES_DB.RAW_DATA.raw_sales_data
WHERE OrderDate IS NOT NULL AND Sales > 0;
""").collect()

print("Clean sales data table created")

Clean sales data table created


In [0]:
# Create aggregated views for Power BI
session.sql("""
CREATE OR REPLACE VIEW sales_summary_monthly AS
SELECT
    Region,
    Category,
    OrderYear,
    OrderMonth,
    COUNT(*) AS TotalOrders,
    SUM(Sales) AS TotalSales,
    SUM(Profit) AS TotalProfit,
    AVG(Sales) AS AvgOrderValue,
    SUM(Quantity) AS TotalQuantity,
    COUNT(DISTINCT CustomerID) AS UniqueCustomers
FROM clean_sales_data
GROUP BY Region, Category, OrderYear, OrderMonth
ORDER BY OrderYear, OrderMonth, Region;
""").collect()

print("Aggregated views created successfully")

Aggregated views created successfully


In [0]:
# Create Power BI optimized view
session.sql("""
CREATE OR REPLACE VIEW vw_powerbi_sales_dashboard AS
SELECT 
    cs.*,
    sm.TotalSales AS RegionMonthlySales,
    sm.TotalProfit AS RegionMonthlyProfit,
    sm.UniqueCustomers AS RegionMonthlyCustomers
FROM clean_sales_data cs
LEFT JOIN sales_summary_monthly sm 
    ON cs.Region = sm.Region 
    AND cs.Category = sm.Category
    AND cs.OrderYear = sm.OrderYear 
    AND cs.OrderMonth = sm.OrderMonth;
""").collect()

print("Power BI view created successfully")

Power BI view created successfully


In [0]:
# Data quality checks
checks = [
    "SELECT COUNT(*) AS total_rows FROM raw_sales_data",
    "SELECT COUNT(*) AS clean_rows FROM clean_sales_data", 
    "SELECT COUNT(DISTINCT Region) AS region_count FROM clean_sales_data",
    "SELECT MIN(OrderDate) AS earliest_date, MAX(OrderDate) AS latest_date FROM clean_sales_data",
    "SELECT SUM(Sales) AS total_sales, SUM(Profit) AS total_profit FROM clean_sales_data"
]

print("Data Quality Check Results:")
for check in checks:
    try:
        result = session.sql(check).collect()
        print(f"{check}: {result[0]}")
    except Exception as e:
        print(f"Error in check {check}: {str(e)}")

Data Quality Check Results:
Error in check SELECT COUNT(*) AS total_rows FROM raw_sales_data: (1304): 01bfe111-0001-6723-000c-5afa000343a2: 002003 (42S02): SQL compilation error:
Object 'RAW_SALES_DATA' does not exist or not authorized.
SELECT COUNT(*) AS clean_rows FROM clean_sales_data: Row(CLEAN_ROWS=25)
SELECT COUNT(DISTINCT Region) AS region_count FROM clean_sales_data: Row(REGION_COUNT=5)
SELECT MIN(OrderDate) AS earliest_date, MAX(OrderDate) AS latest_date FROM clean_sales_data: Row(EARLIEST_DATE=datetime.date(2024, 2, 27), LATEST_DATE=datetime.date(2025, 9, 15))
SELECT SUM(Sales) AS total_sales, SUM(Profit) AS total_profit FROM clean_sales_data: Row(TOTAL_SALES=Decimal('1066026.50'), TOTAL_PROFIT=Decimal('180647.47'))


In [0]:
# Final verification and sample data
print("Final verification - Sample from Power BI view:")
session.sql("SELECT * FROM vw_powerbi_sales_dashboard LIMIT 5").show()

print("Summary statistics:")
session.sql("""
SELECT 
    COUNT(*) AS total_records,
    COUNT(DISTINCT CustomerID) AS unique_customers,
    COUNT(DISTINCT Region) AS regions_covered,
    SUM(Sales) AS total_sales,
    SUM(Profit) AS total_profit,
    AVG(ProfitMargin) AS avg_profit_margin
FROM vw_powerbi_sales_dashboard
""").show()

Final verification - Sample from Power BI view:
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"ORDERID"     |"ORDERDATE"  |"MONTHOFSALE"  |"CUSTOMERID"  |"CUSTOMERNAME"  |"COUNTRY"  |"REGION"  |"CITY"   |"CATEGORY"       |"SUBCATEGORY"  |"QUANTITY"  |"DISCOUNT"  |"SALES"   |"PROFIT"  |"VALIDATEDSALES"  |"ORDERYEAR"  |"ORDERMONTH"  |"ORDERQUARTER"  |"DISCOUNTAMOUNT"  |"PROFITMARGIN"  |"LOADTIMESTAMP"             |"REGIONMONTHLYSALES"  |"REGIONMONTHLYPROFIT"  |"REGIONMONTHLYCUSTOMERS"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Close session
session.close()
print("Snowpark session closed")
print("Pipeline execution completed successfully!")
print("\nNext steps: Connect Power BI to Snowflake using:")
print("Database: ITTG_SALES_DB")
print("Schema: CLEAN_DATA") 
print("View: VW_POWERBI_SALES_DASHBOARD")

Snowpark session closed
Pipeline execution completed successfully!

Next steps: Connect Power BI to Snowflake using:
Database: ITTG_SALES_DB
Schema: CLEAN_DATA
View: VW_POWERBI_SALES_DASHBOARD
