In [1]:
import snowflake.connector

conn = snowflake.connector.connect(
user='group2msba',
password='EDG9jun2pnp*bqh6aku',
account='bdtcdea-yub97737'
)

In [2]:
cs = conn.cursor()

# Extract and Load Data to Snowflake

In [3]:
# Create warehouse
cs.execute("CREATE WAREHOUSE IF NOT EXISTS group_assignment")

# Create database
cs.execute("CREATE DATABASE IF NOT EXISTS final_project")

# Create schema
cs.execute("CREATE SCHEMA IF NOT EXISTS final_project.schema")

# Create stage
# Create a stage
cs.execute("""
    CREATE OR REPLACE STAGE final_project.schema.final_project_stage
    FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"');
""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

# Question 1:

### Cleaning data

In [4]:
import os
import glob
import pandas as pd

# Define the folder where all CSVs are located
files_path = './data/MonthlyPOData'

# Combine all CSV files into one DataFrame
files = glob.glob(os.path.join(files_path, "*.csv"))

# List of DataFrames from each CSV
df_list = [pd.read_csv(f) for f in files]

# Concatenate all DataFrames into a single DataFrame
combined_df = pd.concat(df_list, ignore_index=True)

# Save the combined DataFrame to a CSV file for loading into Snowflake
combined_df.to_csv('combined_purchases.csv', index=False)

# Drop columns with all NaN values
combined_df = combined_df.dropna(axis=1, how='all')

# Drop the unrelevant columns
columns = [
    'LastEditedBy', 
    'LastEditedWhen',
    'LastReceiptDate',
    'IsOrderFinalized',
    'IsOrderLineFinalized',
    'Right_LastEditedWhen']
combined_df = combined_df.drop(columns=columns)

# Optionally, drop columns that contain the same value in every row
combined_df = combined_df.loc[:, (combined_df != combined_df.iloc[0]).any()]

# Save cleaned DataFrame
combined_df.to_csv('combined_purchases.csv', index=False)

### a, b. Upload the combined_purchases file to stage

In [5]:
# Use the database
cs.execute("USE DATABASE final_project")
cs.execute("""PUT file://combined_purchases.csv @final_project.schema.final_project_stage""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

### c. Examine the data in the stage

In [6]:
query = """
SELECT 
    $1 AS PurchaseOrderID, 
    $2 AS SupplierID, 
    $3 AS OrderDate
FROM 
    @final_project.schema.final_project_stage/combined_purchases.csv.gz;
"""
cs.execute(query)
print(cs.fetchmany(5))

[('PurchaseOrderID', 'SupplierID', 'OrderDate'), ('1', '2', '1/1/2019'), ('1', '2', '1/1/2019'), ('1', '2', '1/1/2019'), ('10', '10', '1/2/2019')]


### d. Load data from stage to the table

In [7]:
# Create a table to store the data
cs.execute("""
    CREATE OR REPLACE TABLE final_project.schema.purchases (
        PurchaseOrderID TEXT,
        SupplierID TEXT,
        OrderDate DATE,
        DeliveryMethodID TEXT,
        ExpectedDeliveryDate DATE,
        SupplierReference TEXT,
        PurchaseOrderLineID INTEGER,
        StockItemID TEXT,
        OrderedOuters INTEGER,
        Description TEXT,
        ReceivedOuters INTEGER,
        PackageTypeID TEXT,
        ExpectedUnitPricePerOuter FLOAT,
        Right_LastEditedBy TEXT
    );
""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

In [8]:
print(combined_df.columns)

Index(['PurchaseOrderID', 'SupplierID', 'OrderDate', 'DeliveryMethodID',
       'ExpectedDeliveryDate', 'SupplierReference', 'PurchaseOrderLineID',
       'StockItemID', 'OrderedOuters', 'Description', 'ReceivedOuters',
       'PackageTypeID', 'ExpectedUnitPricePerOuter', 'Right_LastEditedBy'],
      dtype='object')


In [9]:
cs.execute("""
    COPY INTO final_project.schema.purchases
    FROM @schema.final_project_stage/combined_purchases.csv.gz
    FILE_FORMAT = (
        TYPE = 'CSV',
        FIELD_OPTIONALLY_ENCLOSED_BY = '"',
        SKIP_HEADER = 1,
        TIMESTAMP_FORMAT = 'MM/DD/YYYY HH:MI:SS AM'
    )
    ON_ERROR = 'CONTINUE';
""")


<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

Check if the table was created successfully

In [10]:
# Execute the query
cs.execute("SELECT * FROM final_project.schema.purchases LIMIT 5")
results = cs.fetchall()
for row in results:
    print(row)

('1', '2', datetime.date(2019, 1, 1), '9', datetime.date(2019, 1, 15), 'B2084020', 1, '150', 18, 'Pack of 12 action figures (variety)', 18, '9', 5.5, '6')
('1', '2', datetime.date(2019, 1, 1), '9', datetime.date(2019, 1, 15), 'B2084020', 2, '151', 21, 'Pack of 12 action figures (male)', 21, '9', 5.5, '6')
('1', '2', datetime.date(2019, 1, 1), '9', datetime.date(2019, 1, 15), 'B2084020', 3, '152', 18, 'Pack of 12 action figures (female)', 18, '9', 5.5, '6')
('10', '10', datetime.date(2019, 1, 2), '8', datetime.date(2019, 1, 22), 'ML0300202', 136, '60', 83, 'RC toy sedan car with remote control (Blue) 1/50 scale', 83, '7', 12.5, '5')
('11', '12', datetime.date(2019, 1, 2), '7', datetime.date(2019, 1, 22), '237408032', 137, '1', 88, 'USB missile launcher (Green)', 88, '7', 9.5, '5')


## 2. Create a calculated field that shows purchase order totals

In [11]:
cs.execute("USE SCHEMA final_project.schema")
# Add a new column
cs.execute("""
    ALTER TABLE purchases
    ADD COLUMN POAmount NUMBER;
""")

# Update the new column with calculated values
cs.execute("""
    UPDATE purchases
    SET POAmount = ReceivedOuters * ExpectedUnitPricePerOuter;
""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

Check if the new column was created successfully

In [12]:
# Execute the query
cs.execute("SELECT PurchaseOrderID, POAmount FROM purchases LIMIT 5")
results = cs.fetchall()

# Print column headers
print("PurchaseOrderID | POAmount")

# Print rows
for row in results:
    print(f"{row[0]} | {row[1]}")

PurchaseOrderID | POAmount
1 | 99
1 | 116
1 | 99
10 | 1038
11 | 836


# 3. Load xml data to Snowflake

In [13]:
files_path = './data/'
cs.execute(f"PUT file://{files_path}Supplier_Transactions_XML.xml @final_project.schema.final_project_stage")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

a. Parse and Shred the XML data

In [14]:
cs.execute("""CREATE OR REPLACE TABLE final_project.schema.supplier_invoice (src VARIANT)
           ;""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

In [15]:
cs.execute("""CREATE OR REPLACE FILE FORMAT my_xml_format
TYPE = 'XML'
STRIP_OUTER_ELEMENT = TRUE
SKIP_BYTE_ORDER_MARK = TRUE;
""")

cs.execute("""
COPY INTO final_project.schema.supplier_invoice
FROM @final_project_stage/Supplier_Transactions_XML.xml
FILE_FORMAT = (FORMAT_NAME = 'my_xml_format')
ON_ERROR = 'CONTINUE';
""")


<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

In [16]:
columns = [
"ADD COLUMN SupplierTransactionID NUMBER",
"ADD COLUMN SupplierID NUMBER",
"ADD COLUMN TransactionTypeID NUMBER",
"ADD COLUMN PurchaseOrderID NUMBER",
"ADD COLUMN PaymentMethodID NUMBER",
"ADD COLUMN SupplierInvoiceNumber STRING",
"ADD COLUMN TransactionDate DATE",
"ADD COLUMN AmountExcludingTax FLOAT",
"ADD COLUMN TaxAmount FLOAT",
"ADD COLUMN TransactionAmount FLOAT",
"ADD COLUMN OutstandingBalance FLOAT",
"ADD COLUMN FinalizationDate DATE",
"ADD COLUMN IsFinalized BOOLEAN",
"ADD COLUMN LastEditedBy NUMBER",
"ADD COLUMN LastEditedWhen TIMESTAMP"
]
for column in columns:
    query = f"ALTER TABLE final_project.schema.supplier_invoice {column}"
    cs.execute(query)

### b. Examine the data in the stage

In [17]:
cs.execute("""
    SELECT
        XMLGET(src, 'SupplierTransactionID'):"$"::TEXT AS SupplierTransactionID,
        XMLGET(src, 'SupplierID'):"$"::TEXT AS SupplierID,
        XMLGET(src, 'TransactionTypeID'):"$"::TEXT AS TransactionTypeID,
        XMLGET(src, 'PurchaseOrderID'):"$"::TEXT AS PurchaseOrderID,
        XMLGET(src, 'PaymentMethodID'):"$"::TEXT AS PaymentMethodID,
        XMLGET(src, 'SupplierInvoiceNumber'):"$"::STRING AS SupplierInvoiceNumber,
        XMLGET(src, 'TransactionDate'):"$"::DATE AS TransactionDate,
        XMLGET(src, 'AmountExcludingTax'):"$"::FLOAT AS AmountExcludingTax,
        XMLGET(src, 'TaxAmount'):"$"::FLOAT AS TaxAmount,
        XMLGET(src, 'TransactionAmount'):"$"::FLOAT AS TransactionAmount,
        XMLGET(src, 'OutstandingBalance'):"$"::FLOAT AS OutstandingBalance,
        XMLGET(src, 'FinalizationDate'):"$"::DATE AS FinalizationDate,
        XMLGET(src, 'IsFinalized'):"$"::NUMBER AS IsFinalized,
        XMLGET(src, 'LastEditedBy'):"$"::NUMBER AS LastEditedBy,
        XMLGET(src, 'LastEditedWhen'):"$"::TIMESTAMP AS LastEditedWhen,
    FROM final_project.schema.supplier_invoice,
    LATERAL FLATTEN(INPUT => src:"$")
    LIMIT 10;
""")
results = cs.fetchall()
for row in results:
    print(row)

('134', '2', '5', '1', '4', '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53, 0.0, datetime.date(2019, 1, 7), 1, 4, datetime.datetime(2019, 1, 7, 9, 0))
('134', '2', '5', '1', '4', '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53, 0.0, datetime.date(2019, 1, 7), 1, 4, datetime.datetime(2019, 1, 7, 9, 0))
('134', '2', '5', '1', '4', '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53, 0.0, datetime.date(2019, 1, 7), 1, 4, datetime.datetime(2019, 1, 7, 9, 0))
('134', '2', '5', '1', '4', '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53, 0.0, datetime.date(2019, 1, 7), 1, 4, datetime.datetime(2019, 1, 7, 9, 0))
('134', '2', '5', '1', '4', '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53, 0.0, datetime.date(2019, 1, 7), 1, 4, datetime.datetime(2019, 1, 7, 9, 0))
('134', '2', '5', '1', '4', '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53, 0.0, datetime.date(2019, 1, 7), 1, 4, datetime.datetime(2019, 1, 7, 9, 0))
('134', '2', '5', '1', '4', '7290', date

In [18]:
cs.execute("""
UPDATE final_project.schema.supplier_invoice
SET 
    SupplierTransactionID = NULLIF(XMLGET(src, 'SupplierTransactionID'):"$"::STRING, '')::TEXT,
    SupplierID = NULLIF(XMLGET(src, 'SupplierID'):"$"::STRING, '')::TEXT,
    TransactionTypeID = NULLIF(XMLGET(src, 'TransactionTypeID'):"$"::STRING, '')::TEXT,
    PurchaseOrderID = NULLIF(XMLGET(src, 'PurchaseOrderID'):"$"::STRING, '')::TEXT,
    PaymentMethodID = NULLIF(XMLGET(src, 'PaymentMethodID'):"$"::STRING, '')::TEXT,
    SupplierInvoiceNumber = XMLGET(src, 'SupplierInvoiceNumber'):"$"::STRING,
    TransactionDate = NULLIF(XMLGET(src, 'TransactionDate'):"$"::STRING, '')::DATE,
    AmountExcludingTax = NULLIF(XMLGET(src, 'AmountExcludingTax'):"$"::STRING, '')::FLOAT,
    TaxAmount = NULLIF(XMLGET(src, 'TaxAmount'):"$"::STRING, '')::FLOAT,
    TransactionAmount = NULLIF(XMLGET(src, 'TransactionAmount'):"$"::STRING, '')::FLOAT,
    OutstandingBalance = NULLIF(XMLGET(src, 'OutstandingBalance'):"$"::STRING, '')::FLOAT,
    FinalizationDate = NULLIF(XMLGET(src, 'FinalizationDate'):"$"::STRING, '')::DATE,
    IsFinalized = NULLIF(XMLGET(src, 'IsFinalized'):"$"::STRING, '')::NUMBER,
    LastEditedBy = NULLIF(XMLGET(src, 'LastEditedBy'):"$"::STRING, '')::NUMBER,
    LastEditedWhen = NULLIF(XMLGET(src, 'LastEditedWhen'):"$"::STRING, '')::TIMESTAMP;
""")

# Execute the query to drop the src column
cs.execute("""
    ALTER TABLE final_project.schema.supplier_invoice
    DROP COLUMN src;
""")



<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

# 4. Join the two tables

In [19]:
cs.execute("""
SELECT 
    p.PurchaseOrderID,
    p.SupplierID,
    p.OrderDate,
    p.ExpectedDeliveryDate,
    p.PurchaseOrderLineID,
    p.OrderedOuters,
    p.ReceivedOuters,
    p.ExpectedUnitPricePerOuter,
    p.POAmount,
    s.SupplierTransactionID,
    s.SupplierInvoiceNumber,
    s.TransactionDate,
    s.AmountExcludingTax,
    s.TaxAmount,
    s.TransactionAmount
FROM purchases p
JOIN supplier_invoice s
ON p.PurchaseOrderID = s.PurchaseOrderID AND p.SupplierID = s.SupplierID
ORDER BY p.PurchaseOrderID, s.TransactionDate;
""")
# Fetch and display the results
results = cs.fetchall()
if results:
    for row in results:
        print(row)

('1', '2', datetime.date(2019, 1, 1), datetime.date(2019, 1, 15), 2, 21, 21, 5.5, 116, 134, '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53)
('1', '2', datetime.date(2019, 1, 1), datetime.date(2019, 1, 15), 3, 18, 18, 5.5, 99, 134, '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53)
('1', '2', datetime.date(2019, 1, 1), datetime.date(2019, 1, 15), 1, 18, 18, 5.5, 99, 134, '7290', datetime.date(2019, 1, 2), 313.5, 47.03, 360.53)
('10', '10', datetime.date(2019, 1, 2), datetime.date(2019, 1, 22), 136, 83, 83, 12.5, 1038, 590, '1853', datetime.date(2019, 1, 3), 1037.5, 155.63, 1193.13)
('100', '4', datetime.date(2019, 2, 26), datetime.date(2019, 3, 18), 448, 89, 89, 84.0, 7476, 12881, '5762', datetime.date(2019, 2, 27), 36552.0, 5482.8, 42034.8)
('100', '4', datetime.date(2019, 2, 26), datetime.date(2019, 3, 18), 449, 107, 107, 84.0, 8988, 12881, '5762', datetime.date(2019, 2, 27), 36552.0, 5482.8, 42034.8)
('100', '4', datetime.date(2019, 2, 26), datetime.date(2019, 3, 18), 

5. Using the joined data from step 4, create a calculated field that shows the difference between AmountExcludingTax and POAmount, name this field invoiced_vs_quoted, and save the result as a
materialized view named purchase_orders_and_invoices
a. If your version of Snowflake does not support materialized views then create a table instead
using the join (this applies to all requirements about materialized views)

In [20]:
# Create a materialized view or table with the invoiced_vs_quoted field
cs.execute("""
    CREATE OR REPLACE VIEW purchase_orders_and_invoices AS
    SELECT 
        p.PurchaseOrderID,
        p.SupplierID,
        p.OrderDate,
        p.ExpectedDeliveryDate,
        p.PurchaseOrderLineID,
        p.OrderedOuters,
        p.ReceivedOuters,
        p.ExpectedUnitPricePerOuter,
        p.POAmount,
        s.SupplierTransactionID,
        s.SupplierInvoiceNumber,
        s.TransactionDate,
        s.AmountExcludingTax,
        s.TaxAmount,
        s.TransactionAmount,
        (s.AmountExcludingTax - p.POAmount) AS invoiced_vs_quoted
    FROM purchases p
    JOIN supplier_invoice s
    ON p.PurchaseOrderID = s.PurchaseOrderID AND p.SupplierID = s.SupplierID
    ORDER BY p.PurchaseOrderID, s.TransactionDate;
""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff6d0a86d0>

6. Connect pgsql to Snowflake

In [21]:
import psycopg2
import csv
conn_pg = psycopg2.connect(
    host="127.0.0.1",
    database="WestCoastImporters",
    user="jovyan",
    password="postgres",
    port="8765"
)
cs_pg = conn_pg.cursor()

# Query to select data from supplier_case table (excluding unnecessary columns)
query = """
    SELECT 
        supplierid,
        suppliername,
        suppliercategoryid,
        primarycontactpersonid,
        alternatecontactpersonid,
        postalcityid,
        bankaccountname,
        bankaccountbranch,
        bankaccountcode,
        bankaccountnumber,
        bankinternationalcode,
        paymentdays,
        phonenumber,
        deliveryaddressline1,
        deliveryaddressline2,
        deliverypostalcode,
        postaladdressline1,
        postaladdressline2,
        postalpostalcode
    FROM supplier_case
    """

# Execute query
cs_pg.execute(query)

# Fetch data and column names
data = cs_pg.fetchall()
columns = [desc[0] for desc in cs_pg.description]

# Write data to CSV file
with open('supplier_case.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(columns)  # Write headers
    writer.writerows(data)    # Write data



OperationalError: connection to server at "127.0.0.1", port 8765 failed: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
SSL SYSCALL error: Connection reset by peer
connection to server at "127.0.0.1", port 8765 failed: FATAL:  database "WestCoastImporters" does not exist


In [None]:
cs.execute("PUT file://supplier_case.csv @final_project.schema.final_project_stage AUTO_COMPRESS=TRUE")



<snowflake.connector.cursor.SnowflakeCursor at 0xffff66553aa0>

In [None]:
cs.execute("""
CREATE OR REPLACE TABLE final_project.schema.supplier_case (
    supplierid TEXT,
    suppliername TEXT,
    suppliercategoryid TEXT,
    primarycontactpersonid TEXT,
    alternatecontactpersonid TEXT,
    postalcityid TEXT,
    bankaccountname TEXT,
    bankaccountbranch TEXT,
    bankaccountcode TEXT,
    bankaccountnumber NUMBER,
    bankinternationalcode TEXT,
    paymentdays INTEGER,
    phonenumber TEXT,
    deliveryaddressline1 TEXT,
    deliveryaddressline2 TEXT,
    deliverypostalcode TEXT,
    postaladdressline1 TEXT,
    postaladdressline2 TEXT,
    postalpostalcode TEXT
);
""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff66553aa0>

In [None]:
cs.execute("""
    COPY INTO schema.supplier_case
    FROM @schema.final_project_stage/supplier_case.csv.gz
    FILE_FORMAT = (TYPE = 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY='"', SKIP_HEADER=1)
    ON_ERROR = 'CONTINUE'
""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff66553aa0>

Examine data in the table

In [None]:
cs.execute("""
           SELECT * FROM final_project.schema.supplier_case LIMIT 5;
           """)
results = cs.fetchall()
for row in results:
    print(row)

('1', 'A Datum Corporation', '2', '21', '22', '22202', 'A Datum Corporation', 'Woodgrove Bank Zionsville', '356981', 8575824136, '25986', 14, '(847) 555-0100', 'Suite 10', '183838 Southwest Boulevard', '22202', 'PO Box 1039', 'Arlington', '22202')
('2', 'Contoso, Ltd.', '2', '23', '24', '80125', 'Contoso Ltd', 'Woodgrove Bank Greenbank', '358698', 4587965215, '25868', 7, '(360) 555-0100', 'Unit 2', '2934 Night Road', '80125', 'PO Box 1012', 'Highlands Ranch', '80125')
('3', 'Consolidated Messenger', '6', '25', '26', '60523', 'Consolidated Messenger', 'Woodgrove Bank San Francisco', '354269', 3254872158, '45698', 30, '(415) 555-0100', None, '894 Market Day Street', '60523', 'PO Box 1014', 'Westmont', '60523')
('4', 'Fabrikam, Inc.', '4', '27', '28', '95642', 'Fabrikam Inc', 'Woodgrove Bank Lakeview Heights', '789568', 4125863879, '12546', 30, '(203) 555-0104', 'Level 2', '393999 Woodberg Road', '95642', 'PO Box 301', 'Jackson', '95642')
('5', 'Graphic Design Institute', '2', '29', '30',

# 7.

a.

First, we upload the zcta file to the stage and then load it to the table.

In [None]:
# load file to the stage
cs.execute("USE DATABASE final_project;")

file_path = "./data/"

cs.execute(f"PUT file://{file_path}2021_Gaz_zcta_national.txt @final_project.schema.final_project_stage auto_compress=false;")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff66553aa0>

In [None]:
# create a table to store the data
cs.execute("""
           CREATE OR REPLACE TABLE final_project.schema.zcta_national (
               GEOID TEXT,
               ALAND INTEGER,
               AWATER INTEGER,
               ALAND_SQMI FLOAT,
               AWATER_SQMI FLOAT,
               INTPTLAT FLOAT,
               INTPTLONG FLOAT
           );
           """)

<snowflake.connector.cursor.SnowflakeCursor at 0xffff66553aa0>

In [None]:
# COPY the data from the stage to the table
cs.execute("""
           COPY INTO final_project.schema.zcta_national
           FROM @final_project.schema.final_project_stage/2021_Gaz_zcta_national.txt
           FILE_FORMAT = (
           FIELD_DELIMITER = '\t'  -- Tab-delimited
           SKIP_HEADER = 1         -- Skip the first row, which contains column headers
           FIELD_OPTIONALLY_ENCLOSED_BY = NONE)
           ON_ERROR = 'CONTINUE';
           """)

<snowflake.connector.cursor.SnowflakeCursor at 0xffff66553aa0>

Examine the data

In [None]:
cs.execute("SELECT * FROM final_project.schema.zcta_national LIMIT 5;")
results = cs.fetchall()
for row in results:
    print(row)

('00601', 166847909, 799292, 64.42, 0.309, 18.180555, -66.749961)
('00602', 78546713, 4428428, 30.327, 1.71, 18.361945, -67.175597)
('00603', 88957333, 6276536, 34.347, 2.423, 18.458497, -67.123906)
('00606', 114825382, 12487, 44.334, 0.005, 18.158327, -66.932928)
('00610', 96129350, 4310530, 37.116, 1.664, 18.294032, -67.127156)


In [None]:
cs.execute("""
           SELECT count(*) FROM final_project.schema.zcta_national;
           """)
results = cs.fetchall()
print(results)

[(33791,)]


Add the Weather Environment database from Snowflake

![](./weather_data.png)

Create a table to link the zcta data to weather station data

In [None]:
cs.execute("""
CREATE OR REPLACE TABLE final_project.schema.stations_zipcode AS (
    SELECT
        ws.NOAA_WEATHER_STATION_ID AS station_id,
        ws.NOAA_WEATHER_STATION_NAME AS station_name, 
        ws.COUNTRY_GEO_ID as country_id,
        ws.COUNTRY_NAME as country_name,
        z.GEOID AS zip_code,
        ST_DISTANCE(ST_MAKEPOINT(z.INTPTLONG, z.INTPTLAT), ST_MAKEPOINT(ws.LONGITUDE, ws.LATITUDE)) AS distance  -- Distance between zip code and weather station
    FROM WEATHER__ENVIRONMENT.cybersyn.noaa_weather_station_index ws
    LEFT JOIN final_project.schema.zcta_national z
    ON z.GEOID = ws.ZIP_NAME
    AND ST_DISTANCE(ST_MAKEPOINT(z.INTPTLONG, z.INTPTLAT), ST_MAKEPOINT(ws.LONGITUDE, ws.LATITUDE)) < 50000  -- Limit to stations within 50 km
    QUALIFY ROW_NUMBER() OVER (PARTITION BY z.GEOID ORDER BY distance) = 1  -- Keep only the closest station for each zip code
);
""")

results = cs.fetchall()
for row in results:
    print(row)


('Table STATIONS_ZIPCODE successfully created.',)


Check if stations_zipcode was created successfully

In [None]:
cs.execute("""
SELECT count(*) FROM final_project.schema.stations_zipcode;
""")

results = cs.fetchall()
for row in results:
    print(row)
    
cs.execute("""
SELECT count(distinct zip_code) FROM final_project.schema.stations_zipcode;
""")

results = cs.fetchall()
for row in results:
    print(row)

(18902,)
(18901,)


b. create view

In [None]:
cs.execute("""
CREATE OR REPLACE VIEW final_project.schema.supplier_zip_code_weather AS (
WITH distinct_zip_codes AS (
    SELECT 
        distinct postalpostalcode as zip_code 
    FROM final_project.schema.supplier_case A 
    JOIN final_project.schema.stations_zipcode B 
    ON A.postalpostalcode = B.zip_code
), 
weather_station_ids AS (
    SELECT
        B.station_id AS station_id,
        C.date as date,
        C.VALUE AS high_temperature,
        C.VARIABLE as variable,
        B.zip_code as zip_code
    FROM final_project.schema.stations_zipcode B
    JOIN WEATHER__ENVIRONMENT.cybersyn.noaa_weather_metrics_timeseries C
    ON B.station_id = C.NOAA_WEATHER_STATION_ID
    AND C.VARIABLE = 'maximum_temperature'
    AND C.VALUE IS NOT NULL
), 
ranked_weather_data AS (
    SELECT
        weather_station_ids.zip_code,
        weather_station_ids.date,
        weather_station_ids.high_temperature,
        ROW_NUMBER() OVER (PARTITION BY weather_station_ids.zip_code ORDER BY weather_station_ids.date DESC) AS rn
    FROM weather_station_ids
)
SELECT
    ranked_weather_data.zip_code,
    ranked_weather_data.date,
    ranked_weather_data.high_temperature
FROM ranked_weather_data
WHERE rn = 1
);
""")

results = cs.fetchall()
print(results)


[('View SUPPLIER_ZIP_CODE_WEATHER successfully created.',)]


Examine if the view was created successfully

In [None]:
# Run a query to count the distinct zip codes
cs.execute("""
           SELECT COUNT(DISTINCT zip_code) AS distinct_zip_codes
           FROM final_project.schema.supplier_zip_code_weather;
           """)

# Fetch the result
result = cs.fetchone()

# Check the result
if result[0] == 1:
    print("The table has only one unique zip code.")
else:
    print(f"The table has {result[0]} unique zip codes.")

The table has 3981 unique zip codes.


Question 8:

In [None]:
cs.execute("""USE SCHEMA final_project.schema;""")
cs.execute("""SELECT
    p.purchaseorderid as Order_ID,
    p.ftransactiondate as Transaction_Date,
    s.supplierid as Supplier_ID,
    s.postalpostalcode AS Zip_Code,
    w.high_temperature as Temperature
FROM
    purchase_orders_and_invoices p
JOIN
    supplier_case s ON p.supplierid = s.supplierid  -- Join on supplier ID
JOIN
    supplier_zip_code_weather w ON p.transactiondate = w.date  -- Join on matching transaction date and weather date
WHERE
    w.high_temperature IS NOT NULL;  -- Only include rows with a valid temperature reading
""")
results = cs.fetchall()
for row in results:
    print(row)

('1903', datetime.date(2022, 2, 18), '4', '95642', Decimal('21.100000'))
('1903', datetime.date(2022, 2, 18), '4', '95642', Decimal('21.100000'))
('1903', datetime.date(2022, 2, 18), '4', '95642', Decimal('21.100000'))
('1903', datetime.date(2022, 2, 18), '4', '95642', Decimal('21.100000'))
('1903', datetime.date(2022, 2, 18), '4', '95642', Decimal('21.100000'))
('1903', datetime.date(2022, 2, 18), '4', '95642', Decimal('21.100000'))
('1904', datetime.date(2022, 2, 18), '7', '95642', Decimal('21.100000'))
('1904', datetime.date(2022, 2, 18), '7', '95642', Decimal('21.100000'))
('1904', datetime.date(2022, 2, 18), '7', '95642', Decimal('21.100000'))
('1761', datetime.date(2021, 11, 30), '4', '95642', Decimal('11.100000'))
('1761', datetime.date(2021, 11, 30), '4', '95642', Decimal('11.100000'))
('1761', datetime.date(2021, 11, 30), '4', '95642', Decimal('11.100000'))
('1761', datetime.date(2021, 11, 30), '4', '95642', Decimal('11.100000'))
('1761', datetime.date(2021, 11, 30), '4', '956