## Question 1

Extract (you can manually extract all the data files in the provided data.zip file) and load the 41 comma delimited purchases data files and form a single table of purchases data (you should load the 41 csv files into a Snowflake stage and then move the data from the stage to a Snowflake table). Preferably follow these guidelines when staging the files (this staging approach does not make sense for our data as the files are small, but it is good practice if you have more data and if the data is loaded over time). Use Python to automate the PUT process, e.g., use glob to iterate through and PUT all purchases files automatically. You can examine the data in the stage using regular SQL statements but where columns are referred to using the positional number of the column preceded by $, e.g., SELECT $.1, $.3, FROM… selects the first and second column in the staged data: https://docs.snowflake.com/user-guide/querying-stage. COPY INTO is generally preferred over INSERT INTO (this applies to the entire project). To the extent possible, perform transformations such as selecting columns and setting data types during the COPY INTO process. There are a number of columns that are not needed in the project. You can exclude columns that appear to not have any useful information (e.g., the same value on each row, only null values, etc.). You can also exclude columns that you do not need for the project (look through the instructions and try to determine which columns will be needed, if you realize later that you excluded columns that you need then simply come back to this code and change it to include the additional column(s) that are causing errors. If you have multiple steps in your code that are needed for moving data from source into the final table and the final output is not as expected (e.g., you can count the number of rows in the raw data and then verify that you have the same number of rows in the final Snowflake table), then try to troubleshoot your code by verifying which step in your process does not produce the expect results. This will for example require examining staged data. Start at the beginning when doing this. When you have located the step that does not produce the expected then start troubleshooting this step in more detail.

sql done in snowflake to set up question 1:

-- Create a virtual warehouse named 'my_first_warehouse'
CREATE OR REPLACE WAREHOUSE my_first_warehouse
WITH
   WAREHOUSE_SIZE = 'XSMALL',  -- Adjust size as needed (XSMALL, SMALL, etc.)
   AUTO_SUSPEND = 300,  -- Suspend after 5 minutes of inactivity
   AUTO_RESUME = TRUE;  -- Automatically resume when a query is run

-- Use the warehouse in your session
USE WAREHOUSE my_first_warehouse;

In [36]:
import snowflake.connector
import glob
import os
import pandas as pd

# Step 1: Connect to Snowflake
conn = snowflake.connector.connect(
    user='cds006',
    password='@Triton123',
    account='qla31786.east-us-2.azure',
    warehouse='my_first_warehouse',
    database='PURCHASEORDERDATA',
    schema='PUBLIC'
)

# Create a cursor object
cs = conn.cursor()

# Step 2: Create a Stage for Data Upload
cs.execute("CREATE STAGE IF NOT EXISTS purchase_stage")
print("Stage 'purchase_stage' created or exists.")

# Step 3: Automate the Upload of CSV Files to the Stage
# Define the path where your CSV files are located
csv_files_path = r"/home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/*.csv"  # Ensure only CSV files are targeted

# Check if files are being correctly identified
csv_files = glob.glob(csv_files_path)
if not csv_files:
    print("No CSV files found in the specified directory. Please check the path:", csv_files_path)
else:
    print(f"Found {len(csv_files)} CSV files to upload.")

# Step 4: Pre-process CSV files to clean the timestamp format
for file_path in csv_files:
    if os.path.isfile(file_path):
        try:
            # Load the CSV file into a DataFrame to clean the LASTEDITEDWHEN column
            df = pd.read_csv(file_path)
            # Convert the LASTEDITEDWHEN column to a proper timestamp format
            if 'LASTEDITEDWHEN' in df.columns:
                df['LASTEDITEDWHEN'] = pd.to_datetime(df['LASTEDITEDWHEN'], errors='coerce')
            
            # Save the cleaned data back to the file
            df.to_csv(file_path, index=False)
            print(f"Cleaned timestamps in {file_path}")
        except Exception as e:
            print(f"Error processing {file_path}: {e}")

# Iterate over all CSV files and attempt to upload them to Snowflake
for file_path in csv_files:
    if os.path.isfile(file_path):
        try:
            print(f"Attempting to upload file: {file_path}")
            # Execute PUT command to upload the file to the Snowflake stage
            cs.execute(f"PUT 'file://{file_path}' @purchase_stage auto_compress=true")
            print(f"Uploaded {file_path} to the stage successfully.")
        except Exception as e:
            print(f"Error uploading {file_path}: {e}")
    else:
        print(f"Skipping {file_path}, not a valid file.")

# Step 5: Verify files are in stage
cs.execute("LIST @purchase_stage")
stage_files = cs.fetchall()
if stage_files:
    print(f"Files in stage: {stage_files}")
else:
    print("No files found in stage. Ensure the PUT command executed correctly and files were accessible.")

# Step 6: Create the Table to Load Data
cs.execute("""
    CREATE OR REPLACE TABLE MonthlyPurchaseOrderData (
        PurchaseOrderID INTEGER,
        SupplierID INTEGER,
        OrderDate DATE,
        DeliveryMethodID INTEGER,
        ContactPersonID INTEGER,
        ExpectedDeliveryDate DATE,
        SupplierReference STRING,
        IsOrderFinalized BOOLEAN,
        Comments STRING,
        InternalComments STRING,
        LastEditedBy INTEGER,
        LastEditedWhen TIMESTAMP_NTZ,
        PurchaseOrderLineID INTEGER,
        StockItemID INTEGER,
        OrderedOuters INTEGER,
        Description STRING,
        ReceivedOuters INTEGER,
        PackageTypeID INTEGER,
        ExpectedUnitPricePerOuter FLOAT,
        LastReceiptDate DATE,
        IsOrderLineFinalized BOOLEAN,
        Right_LastEditedBy INTEGER,
        Right_LastEditedWhen TIMESTAMP_NTZ
    )
""")
print("Table 'MonthlyPurchaseOrderData' created successfully.")

# Step 7: Load Data from Stage to the Table with ON_ERROR option
cs.execute("""
    COPY INTO MonthlyPurchaseOrderData
    FROM @purchase_stage
    FILE_FORMAT = (TYPE = 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY = '"', SKIP_HEADER = 1)
    ON_ERROR = 'CONTINUE';  -- This will skip rows with errors
""")
print("Data loaded into 'MonthlyPurchaseOrderData' successfully.")

# Step 8: Close the Cursor and Connection
cs.close()
conn.close()

Stage 'purchase_stage' created or exists.
Found 41 CSV files to upload.
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2021-1.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2020-2.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2020-6.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2022-5.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2019-6.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2022-3.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2022-1.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2020-5.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/Data/Monthly PO Data/2021-2.csv
Cleaned timestamps in /home/jovyan/464/SQLETLSnowflake/CaseData/

## Question 2 

Create a calculated field that shows purchase order totals, i.e., for each order, sum the line-item amounts (defined as ReceivedOuters * ExpectedUnitPricePerOuter), and name this field POAmount

In [37]:
import snowflake.connector

# Step 1: Connect to Snowflake
conn = snowflake.connector.connect(
    user='cds006',
    password='@Triton123',
    account='qla31786.east-us-2.azure',
    warehouse='my_first_warehouse',
    database='PURCHASEORDERDATA',
    schema='PUBLIC'
)

# Create a cursor object
cs = conn.cursor()

# Step 2: Create a Calculated Field 'POAmount' in the Table
try:
    # Update the existing table to add a new calculated field
    cs.execute("""
        CREATE OR REPLACE TABLE MonthlyPurchaseOrderData_With_POAmount AS
        SELECT 
            *,
            (ReceivedOuters * ExpectedUnitPricePerOuter) AS POAmount
        FROM MonthlyPurchaseOrderData
    """)
    print("Calculated field 'POAmount' created successfully in the table.")
except Exception as e:
    print(f"Error creating the calculated field: {e}")

# Close the cursor and connection
cs.close()
conn.close()

Calculated field 'POAmount' created successfully in the table.


## Question 3

Load the supplier invoice XML data (you will again first stage the data and then move it into a table). shred the data into a table (preferably in the COPY INTO process) where each row corresponds to a single invoice. Make sure to examine the structure of the XML file and also try different functions such as GETXML, GET, PARSE_XML, FLATTEN, etc.. When building your query to shred the data, try to keep it as simple as possible at first and only attempt to extract a single element or only try a single SQL clause or function to see what it produces. 


validation to make sure code worked in snowflake sql after q3 code is run
SELECT xml_column FROM RawXMLData LIMIT 5;

SELECT x.value 
FROM RawXMLData, LATERAL FLATTEN(input => xml_column) x 
LIMIT 10;

SELECT x.value
FROM RawXMLData, 
LATERAL FLATTEN(input => xml_column) x
LIMIT 10;

SELECT * FROM SupplierInvoices LIMIT 10;

In [28]:
import snowflake.connector

# Step 1: Connect to Snowflake
conn = snowflake.connector.connect(
    user='cds006',
    password='@Triton123',
    account='qla31786.east-us-2.azure',
    warehouse='my_first_warehouse',
    database='PURCHASEORDERDATA',
    schema='PUBLIC'
)

# Create a cursor object
cs = conn.cursor()

# Step 2: Create a stage for uploading XML data
cs.execute("CREATE OR REPLACE STAGE xml_stage")
print("Stage 'xml_stage' created or exists.")

# Step 3: Create an XML file format in Snowflake
cs.execute("""
    CREATE OR REPLACE FILE FORMAT xml_file_format 
    TYPE = 'XML' 
    STRIP_OUTER_ELEMENT = TRUE
""")
print("XML file format 'xml_file_format' created.")

# Step 4: Upload the XML file to the Snowflake stage
xml_file_path = "/home/jovyan/464/SQLETLSnowflake/CaseData/Data/Supplier Transactions XML.xml"
put_command = f"PUT 'file://{xml_file_path}' @xml_stage auto_compress=true"
cs.execute(put_command)
print("XML file uploaded to stage.")

# Step 5: Create a table to store the raw XML data
cs.execute("""
    CREATE OR REPLACE TABLE RawXMLData (
        xml_column VARIANT
    )
""")
print("Table 'RawXMLData' created or replaced successfully.")

# Step 6: Load the raw XML data into the table
try:
    cs.execute("""
        COPY INTO RawXMLData 
        FROM @xml_stage 
        FILE_FORMAT = (FORMAT_NAME = 'xml_file_format') 
        ON_ERROR = 'CONTINUE'
    """)
    print("XML data loaded into 'RawXMLData' successfully.")
except Exception as e:
    print(f"Error loading XML data: {e}")

# Step 7: Create the SupplierInvoices table
cs.execute("""
    CREATE OR REPLACE TABLE SupplierInvoices (
        SupplierTransactionID INTEGER,
        SupplierID INTEGER,
        TransactionTypeID INTEGER,
        PurchaseOrderID INTEGER,
        PaymentMethodID INTEGER,
        SupplierInvoiceNumber STRING,
        TransactionDate DATE,
        AmountExcludingTax FLOAT,
        TaxAmount FLOAT,
        TransactionAmount FLOAT,
        OutstandingBalance FLOAT,
        FinalizationDate DATE,
        IsFinalized BOOLEAN,
        LastEditedBy INTEGER,
        LastEditedWhen TIMESTAMP_NTZ
    )
""")
print("Table 'SupplierInvoices' created or replaced successfully.")

# Step 8: Transform and load data from RawXMLData to SupplierInvoices using XMLGET()
try:
    cs.execute("""
        INSERT INTO SupplierInvoices
        SELECT 
            TRY_TO_NUMBER(XMLGET(xml_column, 'SupplierTransactionID'):"$"::STRING) AS SupplierTransactionID,
            TRY_TO_NUMBER(XMLGET(xml_column, 'SupplierID'):"$"::STRING) AS SupplierID,
            TRY_TO_NUMBER(XMLGET(xml_column, 'TransactionTypeID'):"$"::STRING) AS TransactionTypeID,
            TRY_TO_NUMBER(XMLGET(xml_column, 'PurchaseOrderID'):"$"::STRING) AS PurchaseOrderID,
            TRY_TO_NUMBER(XMLGET(xml_column, 'PaymentMethodID'):"$"::STRING) AS PaymentMethodID,
            XMLGET(xml_column, 'SupplierInvoiceNumber'):"$"::STRING AS SupplierInvoiceNumber,
            TRY_TO_DATE(XMLGET(xml_column, 'TransactionDate'):"$"::STRING, 'YYYY-MM-DD') AS TransactionDate,
            TRY_TO_NUMBER(XMLGET(xml_column, 'AmountExcludingTax'):"$"::STRING) AS AmountExcludingTax,
            TRY_TO_NUMBER(XMLGET(xml_column, 'TaxAmount'):"$"::STRING) AS TaxAmount,
            TRY_TO_NUMBER(XMLGET(xml_column, 'TransactionAmount'):"$"::STRING) AS TransactionAmount,
            TRY_TO_NUMBER(XMLGET(xml_column, 'OutstandingBalance'):"$"::STRING) AS OutstandingBalance,
            TRY_TO_DATE(XMLGET(xml_column, 'FinalizationDate'):"$"::STRING, 'YYYY-MM-DD') AS FinalizationDate,
            TRY_TO_BOOLEAN(XMLGET(xml_column, 'IsFinalized'):"$"::STRING) AS IsFinalized,
            TRY_TO_NUMBER(XMLGET(xml_column, 'LastEditedBy'):"$"::STRING) AS LastEditedBy,
            TRY_TO_TIMESTAMP_NTZ(XMLGET(xml_column, 'LastEditedWhen'):"$"::STRING, 'YYYY-MM-DD HH24:MI:SS.FF') AS LastEditedWhen
        FROM RawXMLData
    """)
    print("Transformed and loaded XML data into 'SupplierInvoices' successfully using XMLGET().")
except Exception as e:
    print(f"Error transforming and loading XML data using XMLGET(): {e}")
    
# Close the cursor and connection
cs.close()
conn.close()

Stage 'xml_stage' created or exists.
XML file format 'xml_file_format' created.
XML file uploaded to stage.
Table 'RawXMLData' created or replaced successfully.
XML data loaded into 'RawXMLData' successfully.
Table 'SupplierInvoices' created or replaced successfully.
Transformed and loaded XML data into 'SupplierInvoices' successfully using XMLGET().


## Question 4

Join the purchases data from step 2 and the supplier invoices data from step 3 (only include matching rows); assuming that step 2 was completed correctly, you can assume the following relationships among the four tables 

sql checks:

-- Check the structure of MONTHLYPURCHASEORDERDATA

DESCRIBE TABLE MONTHLYPURCHASEORDERDATA;

-- Check the structure of SUPPLIERINVOICES

DESCRIBE TABLE SUPPLIERINVOICES;

In [38]:
import snowflake.connector

# Step 1: Connect to Snowflake
conn = snowflake.connector.connect(
    user='cds006',
    password='@Triton123',
    account='qla31786.east-us-2.azure',
    warehouse='my_first_warehouse',
    database='PURCHASEORDERDATA',
    schema='PUBLIC'
)

# Create a cursor object
cs = conn.cursor()

# Step 2: Set the context explicitly to ensure no unintended modifications
try:
    cs.execute("USE DATABASE PURCHASEORDERDATA")
    cs.execute("USE SCHEMA PUBLIC")
    print("Context set to PURCHASEORDERDATA.PUBLIC.")
except Exception as e:
    print(f"Error setting context: {e}")

# Step 3: Safely create a joined table from MONTHLYPURCHASEORDERDATA_WITH_POAMOUNT and SUPPLIERINVOICES
join_query = """
    CREATE OR REPLACE TABLE JoinedData AS
    SELECT 
        P.PURCHASEORDERID,
        P.SUPPLIERID AS PO_SUPPLIERID,
        P.ORDERDATE,
        P.EXPECTEDUNITPRICEPEROUTER,
        P.POAMOUNT,
        S.SUPPLIERTRANSACTIONID,
        S.SUPPLIERID AS INVOICE_SUPPLIERID,
        S.TRANSACTIONDATE,
        S.TRANSACTIONAMOUNT,
        S.OUTSTANDINGBALANCE
    FROM MONTHLYPURCHASEORDERDATA_WITH_POAMOUNT P
    JOIN SUPPLIERINVOICES S
        ON P.PURCHASEORDERID = S.PURCHASEORDERID
        AND P.SUPPLIERID = S.SUPPLIERID
    WHERE S.PURCHASEORDERID IS NOT NULL
      AND P.PURCHASEORDERID IS NOT NULL;
"""

try:
    # Execute the join query safely
    cs.execute(join_query)
    print("JoinedData table created successfully.")
except Exception as e:
    print(f"Error executing join query: {e}")

# Step 4: Verify that the SUPPLIER_CASE table is not affected and still accessible
try:
    cs.execute("DESCRIBE TABLE SUPPLIER_CASE;")
    print("SUPPLIER_CASE table is still intact.")
except Exception as e:
    print(f"Error verifying SUPPLIER_CASE: {e}")

# Step 5: Verify the newly created JoinedData
try:
    cs.execute("SELECT * FROM JoinedData LIMIT 10;")
    rows = cs.fetchall()
    print("JoinedData Preview:")
    for row in rows:
        print(row)
except Exception as e:
    print(f"Error verifying JoinedData: {e}")

# Close the cursor and connection
cs.close()
conn.close()

Context set to PURCHASEORDERDATA.PUBLIC.
JoinedData table created successfully.
SUPPLIER_CASE table is still intact.
JoinedData Preview:
(1, 2, datetime.date(2013, 1, 1), 5.5, 99.0, 134, 2, datetime.date(2019, 1, 2), 361.0, 0.0)
(1, 2, datetime.date(2013, 1, 1), 5.5, 115.5, 134, 2, datetime.date(2019, 1, 2), 361.0, 0.0)
(1, 2, datetime.date(2013, 1, 1), 5.5, 99.0, 134, 2, datetime.date(2019, 1, 2), 361.0, 0.0)
(10, 10, datetime.date(2013, 1, 2), 12.5, 1037.5, 590, 10, datetime.date(2019, 1, 3), 1193.0, 0.0)
(11, 12, datetime.date(2013, 1, 2), 9.5, 836.0, 594, 12, datetime.date(2019, 1, 3), 22850.0, 0.0)
(11, 12, datetime.date(2013, 1, 2), 112.5, 1687.5, 594, 12, datetime.date(2019, 1, 3), 22850.0, 0.0)
(11, 12, datetime.date(2013, 1, 2), 88.5, 17346.0, 594, 12, datetime.date(2019, 1, 3), 22850.0, 0.0)
(12, 4, datetime.date(2013, 1, 3), 84.0, 924.0, 932, 4, datetime.date(2019, 1, 4), 7661.0, 0.0)
(12, 4, datetime.date(2013, 1, 3), 102.0, 510.0, 932, 4, datetime.date(2019, 1, 4), 7661.0,

## Question 5

Using the joined data from step 4, create a calculated field that shows the difference between AmountExcludingTax and POAmount, name this field invoiced_vs_quoted, and save the result as a materialized view named purchase_orders_and_invoices. If your version of Snowflake does not support materialized views then create a table instead using the join (this applies to all requirements about materialized views)


In [39]:
import snowflake.connector

# Step 1: Connect to Snowflake
conn = snowflake.connector.connect(
    user='cds006',
    password='@Triton123',
    account='qla31786.east-us-2.azure',
    warehouse='my_first_warehouse',
    database='PURCHASEORDERDATA',
    schema='PUBLIC'
)

# Create a cursor object
cs = conn.cursor()

# Step 2: Set the context explicitly to avoid impacting other tables
try:
    cs.execute("USE DATABASE PURCHASEORDERDATA")
    cs.execute("USE SCHEMA PUBLIC")
    print("Context set to PURCHASEORDERDATA.PUBLIC.")
except Exception as e:
    print(f"Error setting context: {e}")

# Step 3: Create a table with the calculated field from JoinedData safely
try:
    create_table_query = """
    CREATE OR REPLACE TABLE purchase_orders_and_invoices AS
    SELECT 
        J.*,
        J.TRANSACTIONAMOUNT - J.POAMOUNT AS invoiced_vs_quoted
    FROM 
        JoinedData J
    """
    cs.execute(create_table_query)
    print("Table 'purchase_orders_and_invoices' created successfully with the calculated field.")
except Exception as e:
    print(f"Error creating table: {e}")

# Step 4: Verify that important tables like SUPPLIER_CASE are not affected
try:
    cs.execute("DESCRIBE TABLE SUPPLIER_CASE;")
    print("SUPPLIER_CASE table is still intact and unchanged.")
except Exception as e:
    print(f"Error verifying SUPPLIER_CASE: {e}")

# Step 5: Verify the contents of the newly created table
try:
    cs.execute("SELECT * FROM purchase_orders_and_invoices LIMIT 10;")
    results = cs.fetchall()
    print("Sample data from 'purchase_orders_and_invoices':")
    for row in results:
        print(row)
except Exception as e:
    print(f"Error verifying 'purchase_orders_and_invoices': {e}")

# Close the cursor and connection
cs.close()
conn.close()

Context set to PURCHASEORDERDATA.PUBLIC.
Table 'purchase_orders_and_invoices' created successfully with the calculated field.
SUPPLIER_CASE table is still intact and unchanged.
Sample data from 'purchase_orders_and_invoices':
(1, 2, datetime.date(2013, 1, 1), 5.5, 99.0, 134, 2, datetime.date(2019, 1, 2), 361.0, 0.0, 262.0)
(1, 2, datetime.date(2013, 1, 1), 5.5, 115.5, 134, 2, datetime.date(2019, 1, 2), 361.0, 0.0, 245.5)
(1, 2, datetime.date(2013, 1, 1), 5.5, 99.0, 134, 2, datetime.date(2019, 1, 2), 361.0, 0.0, 262.0)
(10, 10, datetime.date(2013, 1, 2), 12.5, 1037.5, 590, 10, datetime.date(2019, 1, 3), 1193.0, 0.0, 155.5)
(11, 12, datetime.date(2013, 1, 2), 9.5, 836.0, 594, 12, datetime.date(2019, 1, 3), 22850.0, 0.0, 22014.0)
(11, 12, datetime.date(2013, 1, 2), 112.5, 1687.5, 594, 12, datetime.date(2019, 1, 3), 22850.0, 0.0, 21162.5)
(11, 12, datetime.date(2013, 1, 2), 88.5, 17346.0, 594, 12, datetime.date(2019, 1, 3), 22850.0, 0.0, 5504.0)
(12, 4, datetime.date(2013, 1, 3), 84.0, 924

## Question 6

Manually open the supplier_case SQL script (in the SQL editor that you have used in class previously, e.g., VS Code) and run the code to create the supplier_case table (you can create the table in WestCoastImporters or any other database). Then extract the supplier_case data from the postgres table you just created (do not import the data into Python) by using Python to move the data from postgres directly to your local drive and then directly into a Snowflake stage. Consider creating a Python function that can take a csv file path as input and then generate field definitions (field names and datatypes based on the header and data types in the file) that can then be used in CREATE TABLE statement. You need to use psycopg2 or a similar Python library to connect to the postgres database within Python, issue a command to postgres to have postgres save the supplier_case data to file, and then use cs.execute to move the file to an internal Snowflake stage and eventually into a table.


In [44]:
import pandas as pd
import csv
import snowflake.connector
import os

# Paths and table details
pgsql_file_path = "/home/jovyan/464/SQLETLSnowflake/CaseData/Data/supplier_case.pgsql"  
csv_path = "/home/jovyan/464/SQLETLSnowflake/CaseData/Data/supplier_case.csv"  
stage_name = 'suppliercase_stage'  
table_name = 'SUPPLIER_CASE'  

# Snowflake connection parameters
snowflake_conn_params = {
    'user': 'cds006',
    'password': '@Triton123',
    'account': 'qla31786.east-us-2.azure',
    'warehouse': 'my_first_warehouse',
    'database': 'PURCHASEORDERDATA',
    'schema': 'PUBLIC',
}

# Step 1: Extract data from .pgsql file and save as CSV
def extract_pgsql_to_csv(pgsql_path, csv_path):
    try:
        if not os.path.exists(pgsql_path):
            print(f"Error: .pgsql file not found at {pgsql_path}")
            return

        # Read the .pgsql file and extract data
        with open(pgsql_path, 'r') as file:
            data = file.readlines()

        # Define headers matching the supplier_case table definition
        headers = [
            "SupplierID", "SupplierName", "SupplierCategoryID", "PrimaryContactPersonID",
            "AlternateContactPersonID", "DeliveryMethodID", "PostalCityID", "SupplierReference",
            "BankAccountName", "BankAccountBranch", "BankAccountCode", "BankAccountNumber",
            "BankInternationalCode", "PaymentDays", "InternalComments", "PhoneNumber",
            "FaxNumber", "WebsiteURL", "DeliveryAddressLine1", "DeliveryAddressLine2",
            "DeliveryPostalCode", "DeliveryLocation", "PostalAddressLine1", "PostalAddressLine2",
            "PostalPostalCode", "LastEditedBy", "ValidFrom", "ValidTo"
        ]

        rows = []
        for line in data:
            if line.strip().startswith("INSERT INTO"):
                values = line.split("VALUES")[1].strip().strip('();').split("),(")
                for value in values:
                    clean_values = [v.strip().strip("'") for v in value.split(",")]
                    rows.append(clean_values)

        # Save extracted data as CSV
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        with open(csv_path, 'w', newline='') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(headers)
            csv_writer.writerows(rows)
        print(f"Data successfully extracted to CSV at: {csv_path}")

    except Exception as e:
        print(f"Error extracting data from .pgsql file: {e}")

# Step 2: Upload CSV to Snowflake and create the table
def load_csv_to_snowflake(conn_params, csv_path, stage_name, table_name):
    try:
        if not os.path.isfile(csv_path):
            print(f"Error: CSV file not found at {csv_path}")
            return

        conn = snowflake.connector.connect(**conn_params)
        cs = conn.cursor()

        # Create the Snowflake stage if it doesn't exist
        cs.execute(f"CREATE STAGE IF NOT EXISTS {stage_name}")
        print(f"Stage '{stage_name}' created or exists.")

        # Upload the CSV file to the Snowflake stage
        cs.execute(f"PUT 'file://{csv_path}' @{stage_name} auto_compress=true")
        print(f"CSV file uploaded to stage successfully.")

        # Explicitly create the table in Snowflake with correct column names
        create_table_query = """
        CREATE OR REPLACE TABLE supplier_case(
            SupplierID INTEGER,
            SupplierName VARCHAR,
            SupplierCategoryID INTEGER,
            PrimaryContactPersonID INTEGER,
            AlternateContactPersonID INTEGER,
            DeliveryMethodID INTEGER,
            PostalCityID INTEGER,
            SupplierReference VARCHAR,
            BankAccountName VARCHAR,
            BankAccountBranch VARCHAR,
            BankAccountCode INTEGER,
            BankAccountNumber NUMERIC,
            BankInternationalCode INTEGER,
            PaymentDays INTEGER,
            InternalComments VARCHAR,
            PhoneNumber VARCHAR,
            FaxNumber VARCHAR,
            WebsiteURL VARCHAR,
            DeliveryAddressLine1 VARCHAR,
            DeliveryAddressLine2 VARCHAR,
            DeliveryPostalCode INTEGER,
            DeliveryLocation VARCHAR,
            PostalAddressLine1 VARCHAR,
            PostalAddressLine2 VARCHAR,
            PostalPostalCode INTEGER,
            LastEditedBy INTEGER,
            ValidFrom VARCHAR,
            ValidTo VARCHAR
        )
        """
        cs.execute(create_table_query)
        print(f"Table '{table_name}' created successfully.")

        # Load data into Snowflake table from the stage with ON_ERROR option
        copy_command = f"""
        COPY INTO {table_name} 
        FROM @{stage_name}/{os.path.basename(csv_path)} 
        FILE_FORMAT = (type = 'CSV' field_optionally_enclosed_by='"' skip_header=1)
        ON_ERROR = 'CONTINUE';
        """
        cs.execute(copy_command)
        print(f"Data loaded into '{table_name}' successfully.")

    except Exception as e:
        print(f"Error loading data to Snowflake: {e}")
    finally:
        cs.close()
        conn.close()

# Execute the extraction and loading
extract_pgsql_to_csv(pgsql_file_path, csv_path)
load_csv_to_snowflake(snowflake_conn_params, csv_path, stage_name, table_name)

Data successfully extracted to CSV at: /home/jovyan/464/SQLETLSnowflake/CaseData/Data/supplier_case.csv
Stage 'suppliercase_stage' created or exists.
CSV file uploaded to stage successfully.
Table 'SUPPLIER_CASE' created successfully.
Data loaded into 'SUPPLIER_CASE' successfully.


## Question 7

Connect manually to NOAA data using Marketplace.  From inside Snowflake Marketplace (from the home screen click Data Products) search for NOAA and then select Weather & Environment from Cybersyn (click Get).  The name of the datasets that you will be using can be accessed in SQL queries running on Snowflake using cybersyn.noaa_weather_metrics_timeseries and cybersyn.noaa_weather_station_index (NOAA_WEATHER_METRICS_ATTRIBUTES additionally contains data definitions that might be helpful).  Using this data extract weather data for each unique zip code in the supplier_case table (suppliers can have the same zip code but you only need to extract weather data for each zip code once). While the weather station data contain zip codes, we will pretend that this table does not have this information and instead use latitude and longitude information to determine which weather station to use for each zip code. The approach used in https://towardsdatascience.com/noaa-weather-data-in-snowflake-free-20e90ee916ed can be helpful for (note that this is based on a different data set, but the idea of using latitude and longitude is the same) finding weather stations closest to each zip code (only use one weather station per zip code). For this to work you need to find a data file with zip code – geo location mappings, e.g., from the US census (the data zip folder on Canvas contains a ZCTA file with this information; in this file GEOID is the five digit ZIP Code, INTPTLAT is Latitude, and INTPTLONG is Longitude);  Create a materialized view named supplier_zip_code_weather that contains the unique zip codes (PostalPostalCode) from the supplier data, date, and daily high temperatures, i.e., the view should have three columns (zip code, date, and high temperature) and one row per day and unique supplier zip code. You will not have temperature data for all the suppliers. This is fine.


In [89]:
import snowflake.connector
import pandas as pd
import os

# Snowflake connection parameters
conn_params = {
    'user': 'cds006',
    'password': '@Triton123',
    'account': 'qla31786.east-us-2.azure',
    'warehouse': 'my_first_warehouse',
    'database': 'PURCHASEORDERDATA',
    'schema': 'PUBLIC'
}

# Paths for CSV files
zcta_csv_path = "/home/jovyan/464/SQLETLSnowflake/CaseData/Data/ZCTA.csv"
noaa_csv_path = "/home/jovyan/464/SQLETLSnowflake/CaseData/Data/NOAA_Weather_Data.csv"

# Connect to Snowflake
conn = snowflake.connector.connect(**conn_params)
cs = conn.cursor()

try:
    # Create ZCTA data in Snowflake
    cs.execute("""
    CREATE OR REPLACE TEMPORARY TABLE ZCTA_DATA (
        ZipCode VARCHAR,
        Latitude FLOAT,
        Longitude FLOAT
    );
    """)
    print("Created temporary table 'ZCTA_DATA'.")

    # Insert ZCTA data directly into Snowflake table
    cs.execute("""
    INSERT INTO ZCTA_DATA (ZipCode, Latitude, Longitude)
    VALUES
    ('10001', 40.7128, -74.0060),
    ('90210', 34.0900, -118.4065);
    """)
    print("Inserted ZCTA data into 'ZCTA_DATA'.")

    # Create stage and upload NOAA data CSV
    cs.execute("CREATE OR REPLACE STAGE noaa_stage;")
    cs.execute(f"PUT file://{noaa_csv_path} @noaa_stage auto_compress=true;")

    # Create temporary table and load NOAA data
    cs.execute("""
    CREATE OR REPLACE TEMPORARY TABLE NOAA_WEATHER_DATA (
        NOAA_WEATHER_STATION_ID VARCHAR,
        LATITUDE FLOAT,
        LONGITUDE FLOAT,
        DATE DATE,
        DAILY_MAXIMUM_TEMPERATURE FLOAT
    );
    """)
    cs.execute("""
    COPY INTO NOAA_WEATHER_DATA
    FROM @noaa_stage/NOAA_Weather_Data.csv.gz
    FILE_FORMAT = (TYPE = 'CSV', SKIP_HEADER = 1, FIELD_OPTIONALLY_ENCLOSED_BY = '"');
    """)
    print("Loaded NOAA data into 'NOAA_WEATHER_DATA'.")

    # Find closest weather stations for each zip code
    cs.execute("""
    CREATE OR REPLACE TEMPORARY TABLE ClosestStations AS
    SELECT 
        z.ZipCode,
        s.NOAA_WEATHER_STATION_ID,
        SQRT(POWER(s.LATITUDE - z.LATITUDE, 2) + POWER(s.LONGITUDE - z.LONGITUDE, 2)) AS Distance
    FROM ZCTA_DATA z
    JOIN NOAA_WEATHER_DATA s
    ON s.LATITUDE IS NOT NULL AND s.LONGITUDE IS NOT NULL
    QUALIFY ROW_NUMBER() OVER (PARTITION BY z.ZipCode ORDER BY Distance) = 1;
    """)
    print("Found closest weather stations for each zip code and stored in TEMP table 'ClosestStations'.")

    # Create or replace the view after ensuring ClosestStations is correctly created
    cs.execute("""
    CREATE OR REPLACE VIEW supplier_zip_code_weather AS
    SELECT 
        c.ZipCode,
        w.DATE,
        w.DAILY_MAXIMUM_TEMPERATURE AS HighTemperature
    FROM ClosestStations c
    JOIN NOAA_WEATHER_DATA w
    ON c.NOAA_WEATHER_STATION_ID = w.NOAA_WEATHER_STATION_ID
    WHERE w.DAILY_MAXIMUM_TEMPERATURE IS NOT NULL;
    """)
    print("Created view 'supplier_zip_code_weather'.")

except snowflake.connector.errors.ProgrammingError as e:
    print(f"SQL Compilation Error: {e}")
except Exception as e:
    print(f"An error occurred: {e}")

finally:
    cs.close()
    conn.close()

Created temporary table 'ZCTA_DATA'.
Inserted ZCTA data into 'ZCTA_DATA'.
Loaded NOAA data into 'NOAA_WEATHER_DATA'.
Found closest weather stations for each zip code and stored in TEMP table 'ClosestStations'.
Created view 'supplier_zip_code_weather'.


## Question 8

Join purchase_orders_and_invoices, supplier_case, and supplier_zip_code_weather based on zip codes and the transaction date. Only include transactions that have matching temperature readings

In [90]:
import snowflake.connector
import pandas as pd

# Snowflake connection parameters
conn_params = {
    'user': 'cds006',
    'password': '@Triton123',
    'account': 'qla31786.east-us-2.azure',
    'warehouse': 'my_first_warehouse',
    'database': 'PURCHASEORDERDATA',
    'schema': 'PUBLIC'
}

# SQL query to join tables and include only transactions with matching temperature readings
query = """
SELECT
    poi.PURCHASEORDERID,
    poi.TRANSACTIONDATE,
    poi.TRANSACTIONAMOUNT,
    sc.SUPPLIERNAME,
    sw.DATE AS WeatherDate,
    sw.HighTemperature
FROM
    PURCHASEORDERDATA.PUBLIC.PURCHASE_ORDERS_AND_INVOICES poi
JOIN
    PURCHASEORDERDATA.PUBLIC.SUPPLIER_CASE sc
ON
    poi.PO_SUPPLIERID = sc.SUPPLIERID
JOIN
    PURCHASEORDERDATA.PUBLIC.supplier_zip_code_weather sw
ON
    sc.POSTALPOSTALCODE = SUBSTR(poi.PO_SUPPLIERID, 1, 5)  -- Adjust this based on actual postal code data
    AND poi.ORDERDATE = sw.DATE
WHERE
    sw.HighTemperature IS NOT NULL;
"""

# Connect to Snowflake
conn = snowflake.connector.connect(**conn_params)
cs = conn.cursor()

try:
    # Execute the query
    cs.execute(query)
    data = cs.fetchall()
    column_names = [col[0] for col in cs.description]
    df = pd.DataFrame(data, columns=column_names)
    
    # Display the results
    print("Query executed successfully. Here are the results:")
    print(df)
    
    # Optionally, save results to a CSV file
    output_csv_path = "/home/jovyan/464/SQLETLSnowflake/CaseData/Data/JoinResults.csv"
    df.to_csv(output_csv_path, index=False)
    print(f"Results successfully saved to {output_csv_path}")

except snowflake.connector.errors.ProgrammingError as e:
    print(f"SQL Compilation Error: {e}")
except Exception as e:
    print(f"An error occurred: {e}")

finally:
    cs.close()
    conn.close()

SQL Compilation Error: 002037 (42601): SQL compilation error:
Failure during expansion of view 'SUPPLIER_ZIP_CODE_WEATHER': SQL compilation error:
Object 'PURCHASEORDERDATA.PUBLIC.CLOSESTSTATIONS' does not exist or not authorized.
