**ETL Snowflake and Python Project - Supplier (PGSQL), Purchase Order (CSV), Invoice (XML) and Weather Data (NOAA)**
- Alexa Gamble
- Anjana Khabir
- Kai Stern
- Kowsalya Nitya Vootla 


In [149]:
#Connect to Snowflake account
# Credentials come from the environment instead of being committed in the notebook.
import getpass
import os

import snowflake.connector

conn = snowflake.connector.connect(
    # User and account fall back to the project's values when the variables are unset
    user=os.environ.get('SNOWFLAKE_USER', 'NKVOOTLA'),
    # The password is never stored here: use SNOWFLAKE_PASSWORD when set, otherwise prompt for it
    password=os.environ.get('SNOWFLAKE_PASSWORD') or getpass.getpass('Snowflake password: '),
    account=os.environ.get('SNOWFLAKE_ACCOUNT', 'qblwawb-idb43915')
    )

In [150]:
#Define cursor, then create and select the warehouse, database and schema used by every later cell
cs = conn.cursor()
cs.execute("CREATE WAREHOUSE IF NOT EXISTS Group3_ETL_project_warehouse")

# Select the warehouse explicitly, mirroring the USE DATABASE / USE SCHEMA calls below.
# On a re-run the warehouse already exists and the CREATE ... IF NOT EXISTS above does nothing,
# so the session should not rely on that statement to pick a warehouse.
cs.execute("USE WAREHOUSE Group3_ETL_project_warehouse")

cs.execute("CREATE DATABASE IF NOT EXISTS Group3_ETL_project_database")

cs.execute("USE DATABASE Group3_ETL_project_database")

cs.execute("CREATE SCHEMA IF NOT EXISTS Group3_ETL_project_schema")

cs.execute("USE SCHEMA Group3_ETL_project_schema")


<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

**Question 1 and Question 2**

- Note: We have used the updated `2022-2.csv` file shared in the announcement.
- We removed the columns `comments` and `internal comments` since they only have null values.


In [151]:
# Create (or replace) the named CSV file format CSV_FORMAT_NAME, which the purchase-order stage
# and the COPY INTO for the staging table both reference by name.
# Options set below: optionally double-quoted fields, one header row skipped, comma delimiter,
# empty fields and the literal 'NULL' read as NULL, and a month/day/year hour:minute timestamp pattern.
create_format_query = """
CREATE OR REPLACE FILE FORMAT CSV_FORMAT_NAME
TYPE = 'CSV'
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1
FIELD_DELIMITER = ','
EMPTY_FIELD_AS_NULL = TRUE
NULL_IF = ('', 'NULL')
TIMESTAMP_FORMAT = 'MM/DD/YYYY HH24:MI';
"""
cs.execute(create_format_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [152]:
# Create (or replace) the internal stage purchase_orders_stage that receives the monthly CSV files.
# The stage is bound to the CSV_FORMAT_NAME file format; CREATE OR REPLACE means re-running this
# cell starts again from a new stage object.
create_stage_query = """
CREATE OR REPLACE STAGE purchase_orders_stage
FILE_FORMAT = 'CSV_FORMAT_NAME';
"""
cs.execute(create_stage_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [153]:
#Push every monthly purchase-order CSV (2019 through 2022) into the stage, one PUT per file

import glob
csv_files_path = "Monthly PO Data" # Folder with the monthly CSV files, relative to the data directory we run from

for year in range(2019, 2023):
    # Match this year's files: the year, a dash, then a name starting with a digit
    year_pattern = os.path.join(csv_files_path, f'{year}-[0-9]*.csv')
    for file_path in glob.glob(year_pattern):
        # Swap backslashes for forward slashes before building the file:// URI
        normalized_file_path = '/'.join(file_path.split('\\'))
        print(f"Attempting to upload file: {normalized_file_path}")

        put_query = f"PUT 'file://{normalized_file_path}' @purchase_orders_stage;"

        try:
            cs.execute(put_query)
        except Exception as e:
            # A failed upload is reported and the loop moves on to the next file
            print(f"Error uploading file {os.path.basename(file_path)}: {e}")


Attempting to upload file: Monthly PO Data/2019-1.csv
Attempting to upload file: Monthly PO Data/2019-10.csv
Attempting to upload file: Monthly PO Data/2019-11.csv
Attempting to upload file: Monthly PO Data/2019-12.csv
Attempting to upload file: Monthly PO Data/2019-2.csv
Attempting to upload file: Monthly PO Data/2019-3.csv
Attempting to upload file: Monthly PO Data/2019-4.csv
Attempting to upload file: Monthly PO Data/2019-5.csv
Attempting to upload file: Monthly PO Data/2019-6.csv
Attempting to upload file: Monthly PO Data/2019-7.csv
Attempting to upload file: Monthly PO Data/2019-8.csv
Attempting to upload file: Monthly PO Data/2019-9.csv
Attempting to upload file: Monthly PO Data/2020-1.csv
Attempting to upload file: Monthly PO Data/2020-10.csv
Attempting to upload file: Monthly PO Data/2020-11.csv
Attempting to upload file: Monthly PO Data/2020-12.csv
Attempting to upload file: Monthly PO Data/2020-2.csv
Attempting to upload file: Monthly PO Data/2020-3.csv
Attempting to upload f

In [154]:

#Create staging table for purchase orders
# Raw landing table for the staged CSV files. The COPY INTO that fills it names no columns,
# so the column order here presumably has to match the field order of the files (verify against a CSV header).
# LastEditedWhen and Right_LastEditedWhen are kept as STRING at this stage; they are converted to
# TIMESTAMP when the rows are inserted into purchase_orders_table.
create_staging_table_query = """
CREATE OR REPLACE TABLE purchase_orders_staging_table(
    PurchaseOrderID INTEGER,
    SupplierID INTEGER,
    OrderDate DATE,
    DeliveryMethodID INTEGER,
    ContactPersonID INTEGER,
    ExpectedDeliveryDate DATE,
    SupplierReference VARCHAR(16777216),
    IsOrderFinalized INTEGER,
    Comments TEXT,
    InternalComments TEXT,
    LastEditedBy INTEGER,
    LastEditedWhen STRING,
    PurchaseOrderLineID INTEGER,
    StockItemID INTEGER,
    OrderedOuters INTEGER,
    Description TEXT,
    ReceivedOuters INTEGER,
    PackageTypeID INTEGER,
    ExpectedUnitPricePerOuter FLOAT,
    LastReceiptDate DATE,
    IsOrderLineFinalized INTEGER,
    Right_LastEditedBy INTEGER,
    Right_LastEditedWhen STRING
)
"""
cs.execute(create_staging_table_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [155]:
# Load every file currently in purchase_orders_stage into purchase_orders_staging_table,
# parsing with the CSV_FORMAT_NAME file format.
# ON_ERROR = 'ABORT_STATEMENT' makes the whole COPY fail on a bad record instead of skipping it.
load_data_query = """
COPY INTO purchase_orders_staging_table
FROM @purchase_orders_stage
FILE_FORMAT = (FORMAT_NAME = 'CSV_FORMAT_NAME')
ON_ERROR = 'ABORT_STATEMENT';
"""
cs.execute(load_data_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [156]:
# Spot-check the load: pull five rows from the staging table and print them one per line
sample_query = "SELECT * FROM purchase_orders_staging_table LIMIT 5"
try:
    cs.execute(sample_query)
    print("Sample data from purchase_orders_staging_table:")
    sample_rows = cs.fetchall()
    for sample_row in sample_rows:
        print(sample_row)
except Exception as e:
    # Any failure (query or fetch) is reported rather than raised
    print(f"Error verifying staging table: {e}")


Sample data from purchase_orders_staging_table:
(106, 4, datetime.date(2019, 3, 1), 7, 2, datetime.date(2019, 3, 21), '293092', 1, None, None, 4, '3/4/2019 7:00', 469, 77, 92, '"The Gu" red shirt XML tag t-shirt (White) XXS', 92, 6, 84.0, datetime.date(2019, 3, 4), 1, 4, '3/4/2019 7:00')
(106, 4, datetime.date(2019, 3, 1), 7, 2, datetime.date(2019, 3, 21), '293092', 1, None, None, 4, '3/4/2019 7:00', 470, 78, 127, '"The Gu" red shirt XML tag t-shirt (White) XS', 127, 6, 84.0, datetime.date(2019, 3, 4), 1, 4, '3/4/2019 7:00')
(106, 4, datetime.date(2019, 3, 1), 7, 2, datetime.date(2019, 3, 21), '293092', 1, None, None, 4, '3/4/2019 7:00', 471, 80, 20, '"The Gu" red shirt XML tag t-shirt (White) M', 20, 6, 84.0, datetime.date(2019, 3, 4), 1, 4, '3/4/2019 7:00')
(106, 4, datetime.date(2019, 3, 1), 7, 2, datetime.date(2019, 3, 21), '293092', 1, None, None, 4, '3/4/2019 7:00', 472, 86, 74, '"The Gu" red shirt XML tag t-shirt (White) 5XL', 74, 6, 96.0, datetime.date(2019, 3, 4), 1, 4, '3/4/2

In [157]:
#Create final table for purchase orders
# Same columns as the staging table minus Comments / InternalComments, with the two
# "LastEditedWhen" columns typed as TIMESTAMP instead of STRING.
#POAmount is added to the table to store the total amount of the purchase order for question 2.
# It is FLOAT, like ExpectedUnitPricePerOuter, because it holds SUM(ExpectedUnitPricePerOuter * OrderedOuters);
# an INTEGER column would round the total and distort the later comparison with AmountExcludingTax.
create_final_table_query = """
CREATE OR REPLACE TABLE purchase_orders_table (
    PurchaseOrderID INTEGER,
    SupplierID INTEGER,
    OrderDate DATE,
    DeliveryMethodID INTEGER,
    ContactPersonID INTEGER,
    ExpectedDeliveryDate DATE,
    SupplierReference VARCHAR(16777216),
    IsOrderFinalized INTEGER,
    LastEditedBy INTEGER,
    LastEditedWhen TIMESTAMP,
    PurchaseOrderLineID INTEGER,
    StockItemID INTEGER,
    OrderedOuters INTEGER,
    Description TEXT,
    ReceivedOuters INTEGER,
    PackageTypeID INTEGER,
    ExpectedUnitPricePerOuter FLOAT,
    LastReceiptDate DATE,
    IsOrderLineFinalized INTEGER,
    Right_LastEditedBy INTEGER,
    Right_LastEditedWhen TIMESTAMP,
    POAmount FLOAT
);
"""
cs.execute(create_final_table_query)



<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [160]:
#Insert data from purchase orders staging table to final table with proper date and timestamp conversions. 
#For question 2 we added a column POAmount which is the total amount of the purchase order, which is calculated by multiplying ExpectedUnitPricePerOuter and OrderedOuters and summing them up for each PurchaseOrderID.
#We used a LEFT JOIN to join the staging table with the subquery that calculates the POAmount for each PurchaseOrderID.
# The two "LastEditedWhen" strings are parsed with COALESCE: the pattern with seconds is tried first,
# then the pattern without seconds, and the result is NULL only when neither pattern matches.
# Each value is parsed with the same pattern that recognised it.
convert_and_load_query = """
INSERT INTO purchase_orders_table
SELECT 
    PurchaseOrderID,
    SupplierID,
    OrderDate,
    DeliveryMethodID,
    ContactPersonID,
    ExpectedDeliveryDate,
    SupplierReference,
    IsOrderFinalized,
    LastEditedBy,
    COALESCE(
        TRY_TO_TIMESTAMP(LastEditedWhen, 'MM/DD/YYYY HH24:MI:SS'),
        TRY_TO_TIMESTAMP(LastEditedWhen, 'MM/DD/YYYY HH24:MI')
    ) AS LastEditedWhen,
    PurchaseOrderLineID,
    StockItemID,
    OrderedOuters,
    Description,
    ReceivedOuters,
    PackageTypeID,
    ExpectedUnitPricePerOuter,
    LastReceiptDate,
    IsOrderLineFinalized,
    Right_LastEditedBy,
    COALESCE(
        TRY_TO_TIMESTAMP(Right_LastEditedWhen, 'MM/DD/YYYY HH24:MI:SS'),
        TRY_TO_TIMESTAMP(Right_LastEditedWhen, 'MM/DD/YYYY HH24:MI')
    ) AS Right_LastEditedWhen,
    POAmount
FROM purchase_orders_staging_table
LEFT JOIN (SELECT PurchaseOrderID AS POID, SUM(ExpectedUnitPricePerOuter * OrderedOuters) AS POAmount FROM purchase_orders_staging_table GROUP BY PurchaseOrderID) ON purchase_orders_staging_table.PurchaseOrderID = POID;

"""

try:
    cs.execute(convert_and_load_query)
except Exception as e:
    print(f"Error converting and loading data: {e}")
    # Re-raise so later cells are not run against an empty purchase_orders_table
    raise

# This table has 22 columns (1 column, POAmount, is for question 2) and 8367 rows

#cs.execute("SELECT COUNT(*) FROM purchase_orders_table")
#row_count = cs.fetchone()[0]
#print(f"Number of rows in purchase_orders_table: {row_count}")


**Question 3**


In [163]:
#Create (or replace) the internal stage supplier_transactions_xml_stage that will hold the supplier transactions XML file
# No file format is attached to this stage; the COPY INTO for the XML staging table names xml_format_test itself.
create_xml_stage_query = "CREATE OR REPLACE STAGE supplier_transactions_xml_stage;"
cs.execute(create_xml_stage_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [165]:
#Create (or replace) a file format for XML called xml_format_test
# Options set below: automatic compression detection and STRIP_OUTER_ELEMENT = TRUE.
# NOTE(review): STRIP_OUTER_ELEMENT presumably makes each second-level element load as its own row - confirm against the Snowflake docs.
create_xml_format_query = """
CREATE OR REPLACE FILE FORMAT xml_format_test
TYPE = 'XML'
COMPRESSION = 'AUTO'
STRIP_OUTER_ELEMENT = TRUE;
""" 

cs.execute(create_xml_format_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [168]:
#Stage the supplier transactions XML file, stopping with an error when the file is missing
file_path = 'Supplier Transactions XML.xml'  #Relative path; assumes the notebook runs from the data directory

if not os.path.exists(file_path):
    raise FileNotFoundError(f"The file {file_path} does not exist.")
else:
    # OVERWRITE=TRUE replaces a previously staged copy of the same file
    put_command = f"PUT 'file://{file_path}' @supplier_transactions_xml_stage OVERWRITE=TRUE;"
    cs.execute(put_command)

In [171]:
#Create a staging table for the XML file
# A single VARIANT column (raw_data) receives the XML as loaded by COPY INTO;
# the typed columns are extracted from it later, when supplier_transactions is filled.
cs.execute("""
CREATE OR REPLACE TABLE supplier_transactions_xml_staging_table (
    raw_data VARIANT
);
""")


<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [173]:
#Copy the data from the XML file into the staging table
# Reads everything in supplier_transactions_xml_stage with the xml_format_test file format
# and writes it to the raw_data column.
# NOTE(review): ON_ERROR = 'CONTINUE' lets the load carry on past records that fail to parse,
# so compare the loaded row count with the expected number of transactions.
cs.execute("""
COPY INTO supplier_transactions_xml_staging_table(raw_data)
FROM @supplier_transactions_xml_stage
FILE_FORMAT = (FORMAT_NAME = 'xml_format_test')
ON_ERROR = 'CONTINUE';
""")


<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [174]:
#Create final table for supplier transactions
# Typed target for the values extracted from the XML staging table: integer IDs, the invoice number
# as STRING, DATE columns for the transaction and finalization dates, FLOAT amounts,
# a BOOLEAN IsFinalized flag and a TIMESTAMP for the last edit.
cs.execute("""
CREATE OR REPLACE TABLE supplier_transactions (
    SupplierTransactionID INT,
    SupplierID INT,
    TransactionTypeID INT,
    PurchaseOrderID INT,
    PaymentMethodID INT,
    SupplierInvoiceNumber STRING,
    TransactionDate DATE,
    AmountExcludingTax FLOAT,
    TaxAmount FLOAT,
    TransactionAmount FLOAT,
    OutstandingBalance FLOAT,
    FinalizationDate DATE,
    IsFinalized BOOLEAN,
    LastEditedBy INT,
    LastEditedWhen TIMESTAMP
);
""")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [176]:
#Insert data from the staging table to the final table
# The query flattens RAW_DATA twice: the first FLATTEN yields xml_row, the second flattens xml_row.value
# into json_element. Each json_element.value is read as a pair: '@' is compared against a field name
# and '$' supplies that field's text value.
# The MAX(CASE ...) expressions pivot those name/value pairs into one output row per xml_row.seq,
# which the GROUP BY uses as the per-record key.
# TRY_CAST / TRY_TO_DATE / TRY_TO_TIMESTAMP_NTZ are used throughout, so an unparseable value becomes NULL
# rather than failing the insert; NULLIF(..., '') turns empty strings into NULL before date parsing.
insert_xml_data_query = """
INSERT INTO supplier_transactions (
    SupplierTransactionID,
    SupplierID,
    TransactionTypeID,
    PurchaseOrderID,
    PaymentMethodID,
    SupplierInvoiceNumber,
    TransactionDate,
    AmountExcludingTax,
    TaxAmount,
    TransactionAmount,
    OutstandingBalance,
    FinalizationDate,
    IsFinalized,
    LastEditedBy,
    LastEditedWhen
)
SELECT
    MAX(CASE WHEN json_element.value['@']::STRING = 'SupplierTransactionID' THEN TRY_CAST(json_element.value['$']::STRING AS NUMBER) END) AS SupplierTransactionID,
    MAX(CASE WHEN json_element.value['@']::STRING = 'SupplierID' THEN TRY_CAST(json_element.value['$']::STRING AS NUMBER) END) AS SupplierID,
    MAX(CASE WHEN json_element.value['@']::STRING = 'TransactionTypeID' THEN TRY_CAST(json_element.value['$']::STRING AS NUMBER) END) AS TransactionTypeID,
    MAX(CASE WHEN json_element.value['@']::STRING = 'PurchaseOrderID' THEN TRY_CAST(json_element.value['$']::STRING AS NUMBER) END) AS PurchaseOrderID,
    MAX(CASE WHEN json_element.value['@']::STRING = 'PaymentMethodID' THEN TRY_CAST(json_element.value['$']::STRING AS NUMBER) END) AS PaymentMethodID,
    MAX(CASE WHEN json_element.value['@']::STRING = 'SupplierInvoiceNumber' THEN json_element.value['$']::STRING END) AS SupplierInvoiceNumber,
    MAX(CASE WHEN json_element.value['@']::STRING = 'TransactionDate' THEN TRY_TO_DATE(NULLIF(json_element.value['$']::STRING, ''), 'YYYY-MM-DD') END) AS TransactionDate,
    MAX(CASE WHEN json_element.value['@']::STRING = 'AmountExcludingTax' THEN TRY_CAST(json_element.value['$']::STRING AS FLOAT) END) AS AmountExcludingTax,
    MAX(CASE WHEN json_element.value['@']::STRING = 'TaxAmount' THEN TRY_CAST(json_element.value['$']::STRING AS FLOAT) END) AS TaxAmount,
    MAX(CASE WHEN json_element.value['@']::STRING = 'TransactionAmount' THEN TRY_CAST(json_element.value['$']::STRING AS FLOAT) END) AS TransactionAmount,
    MAX(CASE WHEN json_element.value['@']::STRING = 'OutstandingBalance' THEN TRY_CAST(json_element.value['$']::STRING AS FLOAT) END) AS OutstandingBalance,
    MAX(CASE WHEN json_element.value['@']::STRING = 'FinalizationDate' THEN TRY_TO_DATE(NULLIF(json_element.value['$']::STRING, ''), 'YYYY-MM-DD') END) AS FinalizationDate,
    MAX(CASE WHEN json_element.value['@']::STRING = 'IsFinalized' THEN TRY_CAST(json_element.value['$']::STRING AS NUMBER) END) AS IsFinalized,
    MAX(CASE WHEN json_element.value['@']::STRING = 'LastEditedBy' THEN TRY_CAST(json_element.value['$']::STRING AS NUMBER) END) AS LastEditedBy,
    MAX(CASE WHEN json_element.value['@']::STRING = 'LastEditedWhen' THEN TRY_TO_TIMESTAMP_NTZ(NULLIF(json_element.value['$']::STRING, ''), 'YYYY-MM-DD HH24:MI:SS.FF') END) AS LastEditedWhen
FROM Group3_ETL_project_schema.supplier_transactions_xml_staging_table,
LATERAL FLATTEN(input => RAW_DATA) AS xml_row,
LATERAL FLATTEN(input => xml_row.value) AS json_element
GROUP BY xml_row.seq;


"""
cs.execute(insert_xml_data_query)


#This table has 15 columns and 2.4k rows

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

**Question 4**


In [177]:
#Join purchase_orders_table (which carries the POAmount column from step 2) with supplier_transactions into a new table called PODATA_TRANSACTIONS
# All purchase-order columns are kept (A.*). From the supplier transactions, every column except
# PurchaseOrderID and SupplierID is added; LastEditedBy / LastEditedWhen are renamed with a
# SupplierTransaction prefix so they do not clash with the purchase-order columns of the same name.
# LEFT JOIN on PurchaseOrderID and SupplierID keeps purchase-order lines that have no matching transaction.
create_table_query_pot = """
CREATE OR REPLACE TABLE podata_transactions AS
SELECT 
    A.*, 
    B.SupplierTransactionID,
    B.TransactionTypeID,
    B.PaymentMethodID,
    B.SupplierInvoiceNumber,
    B.TransactionDate,
    B.AmountExcludingTax,
    B.TaxAmount,
    B.TransactionAmount,
    B.OutstandingBalance,
    B.FinalizationDate,
    B.IsFinalized,
    B.LastEditedBy AS SupplierTransactionLastEditedBy,
    B.LastEditedWhen AS SupplierTransactionLastEditedWhen
FROM 
    PURCHASE_ORDERS_TABLE A
LEFT JOIN 
    SUPPLIER_TRANSACTIONS B
ON 
    A.PurchaseOrderID = B.PurchaseOrderID AND A.SupplierID = B.SupplierID;
"""

cs.execute(create_table_query_pot)

#This table has 8.4k rows and 35 columns

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

**Question 5**


In [180]:
#Create materialized view named purchase_orders_and_invoices to find the difference between POAmount and AmountExcludingTax
# The view exposes every column of podata_transactions plus invoice_vs_quoted = POAmount - AmountExcludingTax.
# Rows with no matching supplier transaction have a NULL AmountExcludingTax, so invoice_vs_quoted is NULL for them.
create_materialized_view_po_invoices = """
CREATE OR REPLACE MATERIALIZED VIEW purchase_orders_and_invoices AS
SELECT 
    *, 
    (POAmount - AmountExcludingTax) AS invoice_vs_quoted
FROM 
    podata_transactions;
"""
cs.execute(create_materialized_view_po_invoices)

#This view has 8.4k rows and 36 columns. It is to be noted that the difference between POAmount and AmountExcludingTax is either 0 or marginal from the data observed

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

**Question 6**


In [181]:
#Adding the postgres file into the WestCoastImporters database

import psycopg2

sql_file_path = "supplier_case.pgsql" #assuming we are on data directory

def execute_pgsql_file(dbname, user, password, host, port):
    """Run the SQL script at ``sql_file_path`` against a PostgreSQL database.

    Args:
        dbname: Name of the PostgreSQL database to connect to.
        user: PostgreSQL user name.
        password: Password for ``user``.
        host: Host name or address of the PostgreSQL server.
        port: Port the PostgreSQL server listens on.

    Returns:
        None. A success or error message is printed; errors are not raised.
    """
    # Start from None so the cleanup in `finally` is safe even when connecting fails
    pg_conn = None
    pg_cursor = None
    try:
        pg_conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
        pg_cursor = pg_conn.cursor()

        # The whole script is sent to the server in a single execute call
        with open(sql_file_path, 'r') as script_file:
            sql = script_file.read()
        pg_cursor.execute(sql)
        pg_conn.commit()

        print(f"SQL script in {sql_file_path} executed successfully.")

    except Exception as b:
        print(f"Error: {str(b)}")
    finally:
        # Close only what was actually opened
        if pg_cursor is not None:
            pg_cursor.close()
        if pg_conn is not None:
            pg_conn.close()

# Connection settings for the PostgreSQL instance, gathered in one mapping and passed as keyword arguments
pg_connection_settings = {
    "dbname": "WestCoastImporters",
    "user": "jovyan",
    "password": "postgres",
    "host": "127.0.0.1",
    "port": "8765",
}
execute_pgsql_file(**pg_connection_settings)

SQL script in supplier_case.pgsql executed successfully.


In [182]:
#Export the data from the supplier_case table on the WestCoastImporters database to a CSV file in our current directory (Data)

import csv

def export_data_to_csv():
    """Write every row of the PostgreSQL table ``supplier_case`` to ``supplier_case.csv``.

    The file is created in the current working directory with a header row taken
    from the cursor's column descriptions. Errors are printed, not raised, and the
    cursor and connection are closed whenever they were opened.
    """
    pg_conn = None
    pg_cursor = None
    try:
        # Open the PostgreSQL connection and a cursor on it
        pg_conn = psycopg2.connect(
            dbname="WestCoastImporters",
            user="jovyan",
            password="postgres",
            host="127.0.0.1",
            port="8765",
        )
        pg_cursor = pg_conn.cursor()

        # Pull the full table, then read the column names from the cursor metadata
        pg_cursor.execute("SELECT * FROM supplier_case")
        rows = pg_cursor.fetchall()
        header = [column[0] for column in pg_cursor.description]

        # Header first, then the data rows, written to the current (data) directory
        with open('supplier_case.csv', 'w', newline='') as csv_file:
            writer = csv.writer(csv_file, quoting=csv.QUOTE_MINIMAL)
            writer.writerows([header, *rows])

        print("Data exported successfully to supplier_case.csv")

    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close only the handles that were actually created
        if pg_cursor is not None:
            pg_cursor.close()
        if pg_conn is not None:
            pg_conn.close()

export_data_to_csv()

Data exported successfully to supplier_case.csv


In [183]:
#Upload the created csv file to Snowflake
def upload_csv_to_snowflake():
    """Stage ``supplier_case.csv`` in Snowflake and load it into ``supplier_case_table``.

    Uses the notebook-level Snowflake connection ``conn``. The stage and the table
    are both created with CREATE OR REPLACE, so each call starts from empty objects.
    Errors from any step are printed, not raised, and the cursor opened here is
    always closed.
    """
    # Dedicated cursor for this upload, separate from the notebook-level cursor
    upload_cursor = conn.cursor()

    try:
        # Create (or replace) the stage that receives the file
        upload_cursor.execute("CREATE OR REPLACE STAGE supplier_case_stage")
        
        # Upload the file to the stage
        upload_cursor.execute("PUT file://supplier_case.csv @supplier_case_stage")

        # Create (or replace) the target table.
        # Several columns use quoted, case-sensitive names (e.g. "SupplierName"); later
        # queries must quote them the same way.
        upload_cursor.execute("""
        CREATE OR REPLACE TABLE supplier_case_table (
            SupplierID INTEGER,
            "SupplierName" VARCHAR,
            SupplierCategoryID INTEGER,
            PrimaryContactPersonID INTEGER,
            AlternateContactPersonID INTEGER,
            DeliveryMethodID INTEGER,
            PostalCityID INTEGER,
            "SupplierReference" VARCHAR,
            "BankAccountName" VARCHAR,
            "BankAccountBranch" VARCHAR,
            BankAccountCode INTEGER,
            BankAccountNumber NUMERIC,
            BankInternationalCode INTEGER,
            PaymentDays INTEGER,
            InternalComments VARCHAR,
            "PhoneNumber" VARCHAR,
            "FaxNumber" VARCHAR,
            "WebsiteURL" VARCHAR,
            DeliveryAddressLine1 VARCHAR,
            DeliveryAddressLine2 VARCHAR,
            DeliveryPostalCode INTEGER,
            DeliveryLocation VARCHAR,
            PostalAddressLine1 VARCHAR,
            PostalAddressLine2 VARCHAR,
            PostalPostalCode INTEGER,
            LastEditedBy INTEGER,
            ValidFrom VARCHAR,
            ValidTo VARCHAR
        )
        """)

        # Copy the data from the stage into the table
        upload_cursor.execute("""
            COPY INTO supplier_case_table
            FROM @supplier_case_stage/supplier_case.csv
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER = 1)
        """)

        print("Data loaded successfully into Snowflake table supplier_case_table")

    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Release the cursor opened above whether or not the load succeeded
        upload_cursor.close()


upload_csv_to_snowflake()

#The supplier case table has 13 rows and 28 columns

Data loaded successfully into Snowflake table supplier_case_table


**Question 7**


In [189]:
#Upload the US 2021 Gazetteer txt file to Snowflake

def upload_txt_to_snowflake():
    """Stage the 2021 US Gazetteer ZCTA text file and load it into ``US_GZN_table``.

    Runs four statements in order on the notebook-level cursor ``cs``: create the
    stage, PUT the file, create the table, COPY the staged file into it. Any error
    is printed, not raised.
    """
    try:
        # Stage that receives the Gazetteer file (replaced on every call)
        create_stage_sql = "CREATE OR REPLACE STAGE US_GZN_stage"

        # Upload of the local .txt file to that stage
        put_file_sql = "PUT file://2021_Gaz_zcta_national/2021_Gaz_zcta_national.txt @US_GZN_stage"

        # Target table: one row per GEOID with land/water areas and an internal-point latitude/longitude
        create_table_sql = """
        CREATE OR REPLACE TABLE US_GZN_table (
            GEOID INTEGER,
            ALAND INTEGER,
            AWATER INTEGER,
            ALAND_SQMI FLOAT,
            AWATER_SQMI FLOAT,
            INTPTLAT FLOAT,
            INTPTLONG FLOAT
        )
        """

        # Load of the staged file: tab-delimited with one header row skipped
        copy_sql = """
            COPY INTO US_GZN_table
            FROM @US_GZN_stage/2021_Gaz_zcta_national.txt
            FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER='\t' SKIP_HEADER = 1)
        """

        for statement in (create_stage_sql, put_file_sql, create_table_sql, copy_sql):
            cs.execute(statement)

        print("Data loaded successfully into Snowflake table US_GZN_table")

    except Exception as load_error:
        print(f"Error: {load_error}")

upload_txt_to_snowflake()

#The US_GZN_table has 33.8k rows and 7 columns

Data loaded successfully into Snowflake table US_GZN_table


In [193]:
#7a - create the closest_weather_station table: for each GEOID (zip code area), the nearest NOAA weather station
# that reports maximum temperature in the selected date range (needed for 7b)
# Stations from the NOAA Weather Station Index are matched to each GEOID of the US Gazetteer data by the distance
# between the station's longitude/latitude and the GEOID's INTPTLONG/INTPTLAT point.
# Only pairs closer than 20000 are considered (presumably metres, i.e. 20 km - confirm the ST_DISTANCE unit),
# and QUALIFY ROW_NUMBER() ... = 1 keeps the single closest station per GEOID.
# Subquery c restricts candidates to stations with 'maximum_temperature' readings between 2019-01-01 and 2022-05-31.
# The date range was picked based on the time frame of the Monthly PO Data files, cross-checked against the min and max supplier transaction dates

cs.execute("""
CREATE OR REPLACE TABLE closest_weather_station AS
SELECT 
    b.geoid, 
    a.noaa_weather_station_id, 
    a.noaa_weather_station_name, 
    ST_DISTANCE(ST_MAKEPOINT(a.LONGITUDE, a.LATITUDE), ST_MAKEPOINT(b.INTPTLONG, b.INTPTLAT)) AS distance
FROM 
    weather__environment.cybersyn.NOAA_WEATHER_STATION_INDEX AS a     
JOIN 
    US_GZN_TABLE AS b
    ON ST_DISTANCE(ST_MAKEPOINT(a.LONGITUDE, a.LATITUDE), ST_MAKEPOINT(b.INTPTLONG, b.INTPTLAT)) < 20000
JOIN
    (
        SELECT DISTINCT noaa_weather_station_id
        FROM weather__environment.cybersyn.NOAA_WEATHER_METRICS_TIMESERIES
        WHERE VARIABLE = 'maximum_temperature'
        AND DATE >= '2019-01-01' AND DATE <= '2022-05-31'
    ) AS c
    ON a.noaa_weather_station_id = c.noaa_weather_station_id
QUALIFY 
    ROW_NUMBER() OVER(PARTITION BY b.geoid ORDER BY distance ASC) = 1
""")

#This table has 26.4k rows and 4 columns

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [195]:
#7b - build the daily high temperature per supplier zip code over the selected date range, then expose it as a materialized view
# Step 1: table supplier_zip_code_weather_temp joins each supplier's PostalPostalCode to its closest weather
#         station (closest_weather_station.GEOID) and then to that station's 'maximum_temperature' readings.
#         The WHERE clause filters on columns of C, so suppliers without a matching reading drop out
#         even though the joins are written as LEFT JOINs.
#         GROUP BY over all three output columns removes duplicate rows.
# Step 2: materialized view supplier_zip_code_weather is a plain SELECT * over that table
#         (presumably because the joins could not be placed in the materialized view directly - confirm).

cs.execute("""
CREATE OR REPLACE TABLE supplier_zip_code_weather_temp AS 
SELECT
    A.POSTALPOSTALCODE AS Zip_code,
    C.DATE,
    C.VALUE AS Daily_High_Temperature
FROM SUPPLIER_CASE_TABLE AS A
LEFT JOIN closest_weather_station AS B ON A.POSTALPOSTALCODE = B.GEOID
LEFT JOIN weather__environment.cybersyn.NOAA_WEATHER_METRICS_TIMESERIES AS C 
ON B.NOAA_WEATHER_STATION_ID = C.NOAA_WEATHER_STATION_ID
WHERE C.VARIABLE = 'maximum_temperature' AND DATE >= '2019-01-01' AND DATE <= '2022-05-31'
GROUP BY Zip_code, C.DATE, Daily_High_Temperature
ORDER BY Zip_code ASC, C.DATE ASC
""")


cs.execute("""
CREATE OR REPLACE MATERIALIZED VIEW supplier_zip_code_weather AS 
SELECT * FROM supplier_zip_code_weather_temp
""")

#This view has 9k rows and 3 columns

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

**Question 8**


In [211]:

#Create a materialized view of the supplier_case_table to join with the supplier_zip_code_weather view and purchase_orders_and_invoices view
cs.execute("""
CREATE OR REPLACE MATERIALIZED VIEW supplier_case_view AS 
SELECT * FROM supplier_case_table
""")

#Create table "Orders_suppliers_weather": purchase order and invoice columns (A), supplier case columns (B),
#and the supplier's zip code with the daily high temperature (C) on the invoice's transaction date.
# Both joins are inner joins, so a row appears only when the purchase order has a supplier in supplier_case_view
# AND a temperature reading exists for that supplier's postal code on A.TransactionDate.
# Every column is listed explicitly and aliased; the quoted B columns (e.g. B."SupplierName") are the
# case-sensitive names defined on supplier_case_table.

cs.execute("""
CREATE OR REPLACE TABLE Orders_suppliers_weather AS
SELECT
    A.PurchaseOrderID AS PurchaseOrderID,
    A.SupplierID AS SupplierID,
    A.OrderDate AS OrderDate,
    A.ContactPersonID AS ContactPersonID,
    A.ExpectedDeliveryDate AS ExpectedDeliveryDate,
    A.IsOrderFinalized AS IsOrderFinalized,
    A.LastEditedWhen AS LastEditedWhen,
    A.PurchaseOrderLineID AS PurchaseOrderLineID,
    A.StockItemID AS StockItemID,
    A.OrderedOuters AS OrderedOuters,
    A.Description AS Description,
    A.ReceivedOuters AS ReceivedOuters,
    A.PackageTypeID AS PackageTypeID,
    A.ExpectedUnitPricePerOuter AS ExpectedUnitPricePerOuter,
    A.LastReceiptDate AS LastReceiptDate,
    A.IsOrderLineFinalized AS IsOrderLineFinalized,
    A.Right_LastEditedBy AS Right_LastEditedBy,
    A.Right_LastEditedWhen AS Right_LastEditedWhen,
    A.POAmount AS POAmount,
    A.SupplierTransactionID AS SupplierTransactionID,
    A.TransactionTypeID AS TransactionTypeID,
    A.PaymentMethodID AS PaymentMethodID,
    A.SupplierInvoiceNumber AS SupplierInvoiceNumber,
    A.TransactionDate AS TransactionDate,
    A.AmountExcludingTax AS AmountExcludingTax,
    A.TaxAmount AS TaxAmount,
    A.TransactionAmount AS TransactionAmount,
    A.OutstandingBalance AS OutstandingBalance,
    A.FinalizationDate AS FinalizationDate,
    A.IsFinalized AS IsFinalized,
    A.SupplierTransactionLastEditedBy AS SupplierTransactionLastEditedBy,
    A.SupplierTransactionLastEditedWhen AS SupplierTransactionLastEditedWhen,
    A.Invoice_vs_quoted AS Invoice_vs_quoted,
    B."SupplierName" AS SupplierName,
    B.SupplierCategoryID AS SupplierCategoryID,
    B.PrimaryContactPersonID AS PrimaryContactPersonID,
    B.AlternateContactPersonID AS AlternateContactPersonID,
    B.DeliveryMethodID AS DeliveryMethodID,
    B.PostalCityID AS PostalCityID,
    B."SupplierReference" AS SupplierReference,
    B."BankAccountName" AS BankAccountName,
    B."BankAccountBranch" AS BankAccountBranch,
    B.BankAccountCode AS BankAccountCode,
    B.BankAccountNumber AS BankAccountNumber,
    B.BankInternationalCode AS BankInternationalCode,
    B.PaymentDays AS PaymentDays,
    B.InternalComments AS InternalComments,
    B."PhoneNumber" AS PhoneNumber,
    B."FaxNumber" AS FaxNumber,
    B."WebsiteURL" AS WebsiteURL,
    B.DeliveryAddressLine1 AS DeliveryAddressLine1,
    B.DeliveryAddressLine2 AS DeliveryAddressLine2,
    B.DeliveryPostalCode AS DeliveryPostalCode,
    B.DeliveryLocation AS DeliveryLocation,
    B.PostalAddressLine1 AS PostalAddressLine1,
    B.PostalAddressLine2 AS PostalAddressLine2,
    B.PostalPostalCode AS PostalPostalCode,
    B.LastEditedBy AS LastEditedBy,
    B.ValidFrom AS ValidFrom,
    B.ValidTo AS ValidTo,
    C.Zip_code AS Zip_code,
    C.Daily_High_Temperature AS Daily_High_Temperature
    
FROM purchase_orders_and_invoices AS A
JOIN supplier_case_view AS B ON A.SupplierID = B.SupplierID
JOIN supplier_zip_code_weather AS C ON B.PostalPostalCode = C.Zip_code AND A.TransactionDate = C.DATE
""")

#This table has 6k rows and 62 columns

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6b531d5210>

In [215]:
#Close all connections :)
# Close the notebook-level Snowflake cursor first, then the connection it was created from.
cs.close()
conn.close()
