In [4]:
import os

import snowflake.connector

# Open the Snowflake session used by the rest of the notebook.
# Credentials are read from environment variables so they do not have to be
# written into (and saved with) the notebook. Each lookup falls back to the
# empty string, which is the value the notebook passed before.
conn = snowflake.connector.connect(
    user=os.environ.get("SNOWFLAKE_USER", ""),
    password=os.environ.get("SNOWFLAKE_PASSWORD", ""),
    account=os.environ.get("SNOWFLAKE_ACCOUNT", ""),
)

  warn_incompatible_dep(


In [5]:
# Open a cursor on the Snowflake connection; the following cells run their SQL through `cs`.
cs = conn.cursor()

In [6]:
# Make sure the ELT_PROJECT warehouse exists (no-op when it is already there).
create_warehouse_sql = "CREATE WAREHOUSE IF NOT EXISTS ELT_PROJECT"
cs.execute(create_warehouse_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f85b4e8ef50>

In [74]:
# Make sure the PROJECT_DATABASE database exists in Snowflake.
create_database_sql = "CREATE DATABASE IF NOT EXISTS PROJECT_DATABASE"
cs.execute(create_database_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3af21d4210>

In [18]:
# Select PROJECT_DATABASE explicitly, because the session sometimes ends up in another database.
use_database_sql = "USE DATABASE PROJECT_DATABASE"
cs.execute(use_database_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f263f9a6150>

In [9]:
# Make sure the PROJECT_SCHEMA schema exists in Snowflake.
create_schema_sql = "CREATE SCHEMA IF NOT EXISTS PROJECT_SCHEMA"
cs.execute(create_schema_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f85b4e8ef50>

In [19]:
# Select PROJECT_SCHEMA as the schema for the statements that follow.
use_schema_sql = "USE SCHEMA PROJECT_SCHEMA"
cs.execute(use_schema_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f263f9a6150>

In [11]:
# (Re)create the PROJECT_STAGE stage with a CSV file format.
create_stage_sql = "CREATE OR REPLACE STAGE PROJECT_STAGE FILE_FORMAT = (TYPE = 'CSV')"
cs.execute(create_stage_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f85b4e8ef50>

In [26]:
# Collect the monthly purchase-order CSV files that will be uploaded to the stage.
import glob

monthly_po_pattern = '/CaseData/MonthlyPOData/*.csv'
files_to_upload = glob.glob(monthly_po_pattern)

In [27]:
# Upload each collected CSV file to the stage, one PUT statement per file.
for csv_file in files_to_upload:
    cs.execute(f"PUT file://{csv_file} @project_stage")

In [28]:
# (Re)create the purchase table that receives the staged CSV data.
# LastEditedWhen is declared as STRING rather than a datetime type because
# copying the data into the table raised an error with the datetime type.
create_purchase_table_sql = """
CREATE OR REPLACE TABLE purchase (
    PurchaseOrderID INT,
    SupplierID INT,
    OrderDate DATE,
    DeliveryMethodID INT,
    ContactPersonID INT,
    ExpectedDeliveryDate DATE,
    SupplierReference STRING,
    IsOrderFinalized INT,
    Comments STRING,
    InternalComments STRING,
    LastEditedBy INT,
    LastEditedWhen STRING,
    PurchaseOrderLineID INT,
    StockItemID INT,
    OrderedOuters INT,
    Description STRING,
    ReceivedOuters INT,
    PackageTypeID INT,
    ExpectedUnitPricePerOuter FLOAT,
    LastReceiptDate DATE,
    IsOrderLineFinalized INT,
    Right_LastEditedBy INT,
    Right_LastEditedWhen STRING                                                                                                                                           
)
"""
cs.execute(create_purchase_table_sql)


<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [29]:
# Load the staged files into the purchase table.
# The statement is a raw string so that both backslashes in '\\N' reach
# Snowflake. Snowflake then reads '\\N' as the two-character null marker \N.
# In a regular Python string only one backslash would be sent, and Snowflake
# ignores the backslash of an unrecognised escape, leaving just 'N'.
copy_into_command = r"""
COPY INTO purchase
FROM @project_stage
FILE_FORMAT = (TYPE = 'CSV' skip_header = 1 NULL_IF=('NULL', '\\N'))
ON_ERROR = 'CONTINUE'
"""
# Empty the table before loading.
cs.execute("TRUNCATE TABLE purchase")
cs.execute(copy_into_command)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [30]:
# Remove the comments and internalcomments columns from the purchase table.
drop_comment_columns_sql = "ALTER TABLE purchase DROP COLUMN comments, internalcomments"
cs.execute(drop_comment_columns_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [31]:
# Step 2
# Add the POAmount column that will hold the purchase-order totals.
add_po_amount_sql = "ALTER TABLE purchase ADD COLUMN POAmount FLOAT"
cs.execute(add_po_amount_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [32]:
# Build a temporary lookup of purchase-order totals with exactly one row per
# PurchaseOrderID (GROUP BY), so the later UPDATE ... FROM temp_table matches a
# single source row for every purchase row.
# OR REPLACE (instead of IF NOT EXISTS) recomputes the totals whenever this
# cell is re-run, so they always reflect the current purchase table.
cs.execute("""CREATE OR REPLACE TEMP TABLE temp_table AS
SELECT PurchaseOrderID, SUM(ReceivedOuters * ExpectedUnitPricePerOuter) AS new_POAmount
FROM purchase
GROUP BY PurchaseOrderID""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [33]:
# Copy each purchase order's total from temp_table into purchase.POAmount.
update_po_amount_sql = """UPDATE purchase
SET POAmount = temp_table.new_POAmount
FROM temp_table
WHERE purchase.PurchaseOrderID = temp_table.PurchaseOrderID;
"""
cs.execute(update_po_amount_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [34]:
# Step 3: Extract and load the supplier invoice XML data.
# Upload the XML file to the stage.
put_supplier_xml_sql = "PUT file://CaseData/SupplierTransactionsXML.xml @project_stage"
cs.execute(put_supplier_xml_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [35]:
# (Re)create supplier_invoice with a single VARIANT column for the XML document.
create_supplier_invoice_sql = """CREATE OR REPLACE TABLE supplier_invoice (
  scr VARIANT)
"""
cs.execute(create_supplier_invoice_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [36]:
# Load the staged XML file into supplier_invoice using the XML file format.
copy_supplier_invoice_sql = """COPY INTO PROJECT_DATABASE.PROJECT_SCHEMA.SUPPLIER_INVOICE
FROM @PROJECT_DATABASE.PROJECT_SCHEMA.PROJECT_STAGE/SupplierTransactionsXML.xml
FILE_FORMAT = (
    TYPE='XML')
ON_ERROR='CONTINUE'
"""
cs.execute(copy_supplier_invoice_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [37]:
# Flatten the XML document into the (regular, not temporary) table
# FLATTENED_SUPPLIER_INVOICE, one typed column per XML element.
# PurchaseOrderID, SupplierInvoiceNumber and FinalizationDate can be empty, so
# NULLIF turns their empty strings ('') into NULL before the cast is applied.
flatten_supplier_invoice_sql = """
CREATE OR REPLACE TABLE "PROJECT_DATABASE"."PROJECT_SCHEMA"."FLATTENED_SUPPLIER_INVOICE" AS
SELECT 
    xmlget(value, 'SupplierTransactionID'):"$"::INTEGER AS SupplierTransactionID,
    xmlget(value, 'SupplierID'):"$"::INTEGER AS SupplierID_INVOICE,
    xmlget(value, 'TransactionTypeID'):"$"::INTEGER AS TransactionTypeID,
    NULLIF(xmlget(value, 'PurchaseOrderID'):"$", '')::INTEGER AS PurchaseOrderID_INVOICE,
    xmlget(value, 'PaymentMethodID'):"$"::INTEGER AS PaymentMethodID,
    NULLIF(xmlget(value, 'SupplierInvoiceNumber'):"$", '')::INTEGER AS SupplierInvoiceNumber,
    xmlget(value, 'TransactionDate'):"$"::DATE AS TransactionDate,
    xmlget(value, 'AmountExcludingTax'):"$"::FLOAT AS AmountExcludingTax,
    xmlget(value, 'TaxAmount'):"$"::FLOAT AS TaxAmount,
    xmlget(value, 'TransactionAmount'):"$"::FLOAT AS TransactionAmount,
    xmlget(value, 'OutstandingBalance'):"$"::FLOAT AS OutstandingBalance,
    NULLIF(xmlget(value, 'FinalizationDate'):"$", '')::DATE AS FinalizationDate,
    xmlget(value, 'IsFinalized'):"$"::INT AS IsFinalized,
    xmlget(value, 'LastEditedBy'):"$"::INT AS LastEditedBy_INVOICE,
    xmlget(value, 'LastEditedWhen'):"$"::DATETIME AS LastEditedWhen_INVOICE
    
FROM "PROJECT_DATABASE"."PROJECT_SCHEMA"."SUPPLIER_INVOICE",
LATERAL FLATTEN(input => scr:"$") AS LIST
"""
cs.execute(flatten_supplier_invoice_sql)


<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [78]:
# Step 4
# Join purchase with the flattened supplier invoices on the purchase order id.
# The LEFT JOIN followed by the IS NOT NULL filter keeps only purchase rows
# that have a matching invoice.
filtered_supplier_data_sql = """CREATE OR REPLACE TABLE "PROJECT_DATABASE"."PROJECT_SCHEMA"."Filtered_Supplier_Data" AS
           SELECT *
           FROM purchase A
           LEFT JOIN FLATTENED_SUPPLIER_INVOICE B
           ON A.PURCHASEORDERID = B.PurchaseOrderID_INVOICE
           WHERE B.PurchaseOrderID_INVOICE IS NOT NULL"""
cs.execute(filtered_supplier_data_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3af21d4210>

In [84]:
# Step 5
# Materialized views are not supported on this Snowflake account, so a table
# is created instead.
# invoiced_vs_quoted is computed directly on each Filtered_Supplier_Data row.
# The previous self-join on purchaseorderid paired every row of an order with
# every other row of the same order, multiplying the row count, although the
# difference only needs columns that are already on the row itself.
cs.execute("""
        CREATE or REPLACE TABLE purchase_orders_and_invoices AS
        SELECT (fsd.AmountExcludingTax - fsd.POAmount) AS invoiced_vs_quoted, fsd.*
        FROM "PROJECT_DATABASE"."PROJECT_SCHEMA"."Filtered_Supplier_Data" AS fsd
           """)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3af21d4210>

In [44]:
#Step 6
# Extract the supplier_case table from Postgres and write it to a CSV file.
import psycopg2

# NOTE(review): connection settings are hardcoded; consider moving them to configuration.
connection = psycopg2.connect(
    host="127.0.0.1",
    database="rsm-docker",
    user="jovyan",
    password="postgres",
    port="8765",
)

query = "COPY supplier_case TO STDOUT WITH CSV HEADER"

# The Postgres cursor gets its own name: reusing `cs` here would overwrite the
# Snowflake cursor that the following cells still call. The cursor, the output
# file and the connection are all closed once the export has finished.
try:
    with connection.cursor() as pg_cursor, \
            open("CaseData/supplier_case.csv", "w", encoding="utf-8") as f:
        pg_cursor.copy_expert(query, f)
finally:
    connection.close()


In [51]:
# Upload the exported supplier_case CSV file to the Snowflake stage.
put_supplier_case_sql = "PUT file://CaseData/supplier_case.csv @project_stage"
cs.execute(put_supplier_case_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3af21d4210>

In [51]:
# Hand-written reference table, used to check that the Python-generated
# CREATE TABLE statement yields the same table and column names.
create_supplier_case_test_sql = """
CREATE OR REPLACE TABLE Supplier_Case_Test (
    SupplierID               INTEGER,
    SupplierName             VARCHAR,
    SupplierCategoryID       INTEGER,
    PrimaryContactPersonID   INTEGER,
    AlternateContactPersonID INTEGER,
    DeliveryMethodID         INTEGER,
    PostalCityID             VARCHAR,
    SupplierReference        VARCHAR,
    BankAccountName          VARCHAR,
    BankAccountBranch        VARCHAR,
    BankAccountCode          VARCHAR,
    BankAccountNumber        VARCHAR,
    BankInternationalCode    VARCHAR,
    PaymentDays              INTEGER,
    InternalComments         VARCHAR,
    PhoneNumber              VARCHAR,
    FaxNumber                VARCHAR,
    WebsiteURL               VARCHAR,
    DeliveryAddressLine1     VARCHAR,
    DeliveryAddressLine2     VARCHAR,
    DeliveryPostalCode       VARCHAR,
    DeliveryLocation         VARCHAR,
    PostalAddressLine1       VARCHAR,
    PostalAddressLine2       VARCHAR,
    PostalPostalCode         VARCHAR,
    LastEditedBy             INTEGER,
    ValidFrom                VARCHAR,
    ValidTo                  VARCHAR
);
"""
cs.execute(create_supplier_case_test_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7fcf0f157090>

In [9]:
#Generate a Python code that reads the csv file path and generates a table based on the header and data types.
#ChatGPT was used to help generate and debug this code.

import csv
import pandas as pd

def generate_sql_field_definitions(csv_path):
    """Return a comma-separated list of "<column> <SQL type>" pairs for a CSV file.

    The file at ``csv_path`` is loaded with ``pandas.read_csv`` and every
    column's pandas dtype is translated to a SQL type name:
    int64 -> INT, float64 -> FLOAT, bool -> BOOLEAN, anything else -> TEXT.
    Column names are used exactly as they appear in the CSV header.
    """
    # dtype name -> SQL type; dtypes missing from the table fall back to TEXT.
    sql_type_by_dtype = {"int64": "INT", "float64": "FLOAT", "bool": "BOOLEAN"}

    frame = pd.read_csv(csv_path)
    return ", ".join(
        f"{name} {sql_type_by_dtype.get(str(kind), 'TEXT')}"
        for name, kind in frame.dtypes.items()
    )

# Point at the exported CSV, derive its column definitions with the function
# defined above, and (re)create the supplier_case table from them.
csv_path = "CaseData/supplier_case.csv"
field_definitions_str = generate_sql_field_definitions(csv_path)
create_table_statement = "CREATE or REPLACE TABLE supplier_case ({});".format(field_definitions_str)

cs.execute(create_table_statement)


<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [52]:
# Load the staged supplier_case file into the supplier_case table.
# The statement is a raw string so that both backslashes in '\\N' reach
# Snowflake. Snowflake then reads '\\N' as the two-character null marker \N.
# In a regular Python string only one backslash would be sent, and Snowflake
# ignores the backslash of an unrecognised escape, leaving just 'N'.
cs.execute(r"""
COPY INTO supplier_case
FROM @project_stage/supplier_case.csv.gz
FILE_FORMAT = (TYPE = 'CSV' skip_header = 1 NULL_IF=('NULL', '\\N'))
ON_ERROR = 'CONTINUE'
""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3af21d4210>

In [20]:
# Step 7
# Build weather_data from the Knoema Environment Data Atlas table NOAACD2019R.
# Only rows whose "Country" is 'US' are kept (to match US zip codes), and the
# "Indicator Name" values are pivoted into one column per measurement.
# The approach follows the towardsdatascience.com article.
create_weather_data_sql = """
create or replace table weather_data
cluster by (station_id, date)
as (
    select *
    from (
        select "Stations", "Stations Longitude", "Stations Latitude", "Date", "Stations Name", "Country", "Indicator Name", "Value"
        from ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R
        where "Country"='US'
    )
    pivot(max("Value") for "Indicator Name" in ('Mean visibility (miles)','Maximum temperature (Fahrenheit)','Mean dew point (Fahrenheit)','Maximum wind gust (Number)','Minimum temperature (Fahrenheit)','Maximum sustained wind speed (knots)','Mean wind speed (knots)','Mean station pressure (millibars)','Precipitation amount (inches)','Mean temperature (Fahrenheit)','Mean sea level pressure (millibars)','Snow depth (inches)'))
    as p(station_id, lng, lat, date, name, country, visibility, max, dew, wind_max, min, wind_sustained_max, wind_mean, pressure, rain, temp, pressure_sea, snow_depth));
"""
cs.execute(create_weather_data_sql)
          

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [7]:
# Upload the US zip code data set to the stage.
put_uszips_sql = "PUT file://CaseData/uszips.csv @project_stage"
cs.execute(put_uszips_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [10]:
# (Re)create the Zip_Codes table with columns inferred from the zip code CSV.
csv_path2 = "CaseData/uszips.csv"
field_definitions_str2 = generate_sql_field_definitions(csv_path2)
create_table_statement2 = "CREATE or REPLACE TABLE Zip_Codes ({});".format(field_definitions_str2)

cs.execute(create_table_statement2)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [11]:
# Load the staged zip code file into the Zip_Codes table.
# The statement is a raw string so that both backslashes in '\\N' reach
# Snowflake. Snowflake then reads '\\N' as the two-character null marker \N.
# In a regular Python string only one backslash would be sent, and Snowflake
# ignores the backslash of an unrecognised escape, leaving just 'N'.
cs.execute(r"""
COPY INTO Zip_Codes
FROM @project_stage/uszips.csv.gz
FILE_FORMAT = (TYPE = 'CSV' skip_header = 1 NULL_IF=('NULL', '\\N', ' '))
ON_ERROR = 'CONTINUE'
""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [25]:
# a) Pair weather stations with zip codes, following an approach very similar
# to the towardsdatascience.com article.
# Stations and zip codes of the same country are joined when their distance is
# below 50000; the QUALIFY clause then keeps, for every station_id, the single
# row with the smallest distance.
# NOTE(review): the ranking is partitioned by station, so each station gets its
# nearest zip code rather than each zip code its nearest station - confirm that
# this is the intended direction.
create_stations_zips_sql = """
CREATE or REPLACE TABLE stations_zips AS(
    SELECT b.zip,
    st_distance(st_makepoint(a.lng, a.lat), st_makepoint(b.lng, b.lat)) as distance, 
    a.country, a.station_id
    FROM weather_data AS a
    JOIN Zip_Codes AS b
    ON a.country=b.country
    AND st_distance(st_makepoint(a.lng, a.lat), st_makepoint(b.lng, b.lat)) < 50000
    qualify row_number() over(partition by a.station_id order by distance) = 1
    ORDER BY b.zip
);
"""
cs.execute(create_stations_zips_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3afedbfa10>

In [None]:
# Add a ZipCode column to weather_data; it is filled from stations_zips by the next statement.
add_zip_code_sql = """
ALTER TABLE weather_data ADD ZipCode INT;
"""
cs.execute(add_zip_code_sql)

In [59]:
# Fill weather_data.ZipCode with the zip code paired with each station in Stations_Zips.
update_zip_code_sql = """
UPDATE weather_data
SET ZipCode = stations_Zips.zip
FROM Stations_Zips
WHERE Stations_Zips.Station_ID = weather_data.Station_ID;
"""
cs.execute(update_zip_code_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f3af21d4210>

In [14]:
# b) Create a view with three columns: zip code, date and daily high
# temperature - one row per day and supplier zip code.
# Only the 'Maximum temperature (Fahrenheit)' indicator is aggregated. The
# earlier filter on "Units" = 'Fahrenheit' also let the dew point, minimum and
# mean temperature readings take part in the MAX, so a day without a
# maximum-temperature reading would report one of those values as its high.
cs.execute("""CREATE OR REPLACE VIEW supplier_zip_code_weather AS (
    SELECT
        sc.Zip AS PostalPostalCode,
        w."Date",
        MAX(w."Value") AS "Daily High Temperature"
    FROM stations_zips sc
    JOIN ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R w
    ON sc."STATION_ID" = w."Stations"
    WHERE sc.Zip IN (SELECT DISTINCT PostalPostalCode FROM supplier_case)
      AND w."Indicator Name" = 'Maximum temperature (Fahrenheit)'
    GROUP BY sc.Zip, w."Date"
    ORDER BY w."Date"
);
 """)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f263f9a6150>

In [38]:
# Step 8
# Join purchase orders/invoices to suppliers on supplierid and to the weather
# view on the supplier's postal code. Only rows whose transaction date, shifted
# forward by five years, equals a weather "Date" are kept, so transactions
# without a matching temperature reading are left out.
create_joined_sql = """
CREATE OR REPLACE TABLE Joined AS
SELECT a.*,c.*, b.suppliername
FROM purchase_orders_and_invoices AS a
JOIN supplier_case AS b
ON a.supplierid = b.supplierid
JOIN supplier_zip_code_weather AS c 
ON b.PostalPostalCode = c.PostalPostalCode
WHERE dateadd(year, +5, a.transactiondate) = c."Date"
ORDER BY c."Date";
"""
cs.execute(create_joined_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f263f9a6150>