In [None]:
## Team J - Group:
## 1) Sankalp Kuram
## 2) Youjin Park
## 3) Yada Klueabvichit
## 4) Xiaoyue Huang

In [6]:
# Display the value of every top-level expression in a cell, not only the last one.
# With this setting each bare cs.execute(...) call in later cells echoes its
# returned SnowflakeCursor object as cell output.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [7]:
## 1.Extract and load the 41 comma delimited purchases data files and form a single table of purchases data; 
##   a.Preferably follow these guidelines when staging the files (this staging approach does not make sense for our data as the files are small, but it is good practice if you have more data and if the data is loaded over time)
##   b.Use Python to automate the PUT process, e.g., use glob to iterate through and PUT all purchases files automatically
##   c.COPY INTO is generally preferred over INSERT INTO (this applies to the entire project);
##   d.To the extent possible, perform transformations such as selecting columns and setting data types during the COPY INTO process

### Snowflake Connection
import snowflake.connector

# Open the Snowflake session used by every later cell (`conn`, `cs`).
# The password is no longer stored in the notebook: it is read from the
# SNOWFLAKE_PASSWORD environment variable, or prompted for interactively when
# that variable is unset. User and account keep their previous values as
# defaults and can be overridden via SNOWFLAKE_USER / SNOWFLAKE_ACCOUNT.
import os
from getpass import getpass

conn = snowflake.connector.connect(
    user=os.environ.get("SNOWFLAKE_USER", "skuram"),
    password=os.environ.get("SNOWFLAKE_PASSWORD") or getpass("Snowflake password: "),
    account=os.environ.get("SNOWFLAKE_ACCOUNT", "jja15332"),
)
cs = conn.cursor()

## Set up the Snowflake environment
# Every CREATE ... IF NOT EXISTS is followed by an explicit USE so the session
# context (warehouse, database, schema) is the same on a first run and on a
# re-run where the objects already exist. The stage and all later unqualified
# tables are therefore created in ELT_Database.ELT_Schema, which is where the
# PUT / COPY statements below look for ELT_Stage.
cs.execute("CREATE WAREHOUSE IF NOT EXISTS ELT_Warehouse")
cs.execute("USE WAREHOUSE ELT_Warehouse")
cs.execute("CREATE DATABASE IF NOT EXISTS ELT_Database")
cs.execute("USE DATABASE ELT_Database")
cs.execute("CREATE SCHEMA IF NOT EXISTS ELT_Schema")
cs.execute("USE SCHEMA ELT_Schema")
cs.execute("CREATE STAGE IF NOT EXISTS ELT_Stage")

###### Stage the monthly PO .csv files and load them into Snowflake ########

## Upload every file under CaseData/MonthlyPOData to the internal stage
stage_po_files_sql = "PUT file://CaseData/MonthlyPOData/* @ELT_Schema.ELT_Stage"
cs.execute(stage_po_files_sql)

## Target table for the staged purchase data (date-like fields are kept as VARCHAR)
create_po_table_sql = (
    "CREATE TABLE IF NOT EXISTS Purchase_Order_Header("
    "PurchaseOrderID INTEGER,"
    "SupplierID INTEGER,"
    "OrderDate VARCHAR,"
    "DeliveryMethodID INTEGER,"
    "ContactPersonID INTEGER,"
    "ExpectedDeliveryDate VARCHAR,"
    "SupplierReference VARCHAR,"
    "IsOrderFinalized INTEGER,"
    "Comments STRING,"
    "InternalComments STRING,"
    "LastEditedBy STRING,"
    "LastEditedWhen VARCHAR,"
    "PurchaseOrderLineID INTEGER,"
    "StockItemID INTEGER,"
    "OrderedOuters INTEGER,"
    "Description STRING,"
    "ReceivedOuters INTEGER,"
    "PackageTypeID INTEGER,"
    "ExpectedUnitPricePerOuter FLOAT,"
    "LastReceiptDate VARCHAR,"
    "IsOrderLineFinalized INTEGER,"
    "Right_LastEditedBy STRING,"
    "Right_LastEditedWhen VARCHAR)"
)
cs.execute(create_po_table_sql)

## Load everything in the stage into the table; rows that fail to parse are
## skipped (on_error = continue) instead of aborting the load
load_po_table_sql = "COPY INTO Purchase_Order_Header FROM @ELT_Database.ELT_Schema.ELT_Stage on_error = continue"
cs.execute(load_po_table_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [11]:
## 2.Create a calculated field that shows purchase order totals, i.e., for each order, 
##   sum the line item amounts (defined as ReceivedOuters * ExpectedUnitPricePerOuter), 
##   and name this field POAmount

# Rebuild the per-order totals table from scratch on every run:
# POAmount = sum over an order's lines of ReceivedOuters * ExpectedUnitPricePerOuter,
# grouped by purchase order and supplier.
drop_po_totals_sql = """DROP TABLE IF EXISTS Purchase_Order_Header_Updated"""

create_po_totals_sql = """
CREATE TABLE Purchase_Order_Header_Updated AS 
(
    SELECT PurchaseOrderID,SUPPLIERID,
    sum(cast(ReceivedOuters AS INT) * ExpectedUnitPricePerOuter) AS POAmount 
    FROM Purchase_Order_Header 
    GROUP BY PurchaseOrderID,SUPPLIERID
)"""

cs.execute(drop_po_totals_sql)
cs.execute(create_po_totals_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [17]:
## 3.Extract and load the supplier invoice XML data
## a.shred the data into a table (preferably in the COPY INTO process) where each row 
## corresponds to a single invoice

## Stage and file format for the supplier-invoice XML file.
## strip_outer_element = True makes the loader drop the outer XML element so
## the second-level elements are loaded as separate rows.
create_xml_stage_sql = "CREATE OR REPLACE STAGE ETLXMLStage"
create_xml_format_sql = "CREATE OR REPLACE file format xml_load type = xml strip_outer_element = True"
put_xml_file_sql = "PUT file://CaseData/Supplier_Transactions_XML.xml @ETLXMLStage"
cs.execute(create_xml_stage_sql)
cs.execute(create_xml_format_sql)
cs.execute(put_xml_file_sql)

## Landing table: a single VARIANT column holding the raw XML of each row
create_invoice_table_sql = "CREATE OR REPLACE TABLE INVOICE_XML (src_xml VARIANT)"
cs.execute(create_invoice_table_sql)

## Copy the staged XML into the landing table using the xml_load format
load_invoice_sql = "COPY INTO INVOICE_XML FROM @ETLXMLStage file_format = (format_name = xml_load)"
cs.execute(load_invoice_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [36]:
## 4.Join the purchases data from step 2 and the supplier invoices data from step 3 (only include matching rows); 
##   assuming that step 2 was completed correctly, you can assume the following relationships among the four tables
##   (the other two tables are discussed below):

# Shred the raw invoice XML (one row per invoice in INVOICE_XML) into a table
# with one column per XML element. XMLGET(...):"$" / GET(..., '$') extracts the
# element's content; all columns are left as VARIANT and are cast where used.
#
# NOTE(review): the first element was looked up as 'suppliertransactionID'
# while every other tag is PascalCase. If XMLGET tag matching is case-sensitive
# and the XML tag is 'SupplierTransactionID', that lookup yields NULL for every
# row. The PascalCase spelling is now tried first, with the original spelling
# kept as a fallback, so the column is populated in either case. Confirm the
# actual tag name against the XML file.
cs.execute("""CREATE OR REPLACE TABLE invoice_xml_table AS
(SELECT 
           GET(COALESCE(XMLGET(src_xml,'SupplierTransactionID'),
                        XMLGET(src_xml,'suppliertransactionID')), '$') as suppliertransactionID,
           XMLGET(src_xml,'SupplierID'):"$" as SupplierID,
           XMLGET(src_xml,'TransactionTypeID'):"$" as TransactionTypeID,
           XMLGET(src_xml,'PurchaseOrderID'):"$" as PurchaseOrderID,
           XMLGET(src_xml,'PaymentMethodID'):"$" as PaymentMethodID,
           XMLGET(src_xml,'SupplierInvoiceNumber'):"$" as SupplierInvoiceNumber,
           XMLGET(src_xml,'TransactionDate'):"$" as TransactionDate,
           XMLGET(src_xml,'AmountExcludingTax'):"$" as AmountExcludingTax,
           XMLGET(src_xml,'TaxAmount'):"$" as TaxAmount,
           XMLGET(src_xml,'TransactionAmount'):"$" as TransactionAmount,
           XMLGET(src_xml,'OutstandingBalance'):"$" as OutstandingBalance,
           XMLGET(src_xml,'FinalizationDate'):"$" as FinalizationDate,
           XMLGET(src_xml,'IsFinalized'):"$" as IsFinalized,
           XMLGET(src_xml,'LastEditedBy'):"$" as LastEditedBy,
           XMLGET(src_xml,'LastEditedWhen'):"$" as LastEditedWhen
           FROM INVOICE_XML)
""");

In [27]:
## 5.Using the joined data from step 4, create a calculated field that shows the difference between  
##   AmountExcludingTax and POAmount,name this field invoiced_vs_quoted, and save the result
##   as a materialized view named purchase_orders_and_invoices
##  a.If your version of Snowflake does not support materialized views then create a table instead 
##    using the join (this applies to all requirements about materialized views)


#CREATING VIEW purchase_orders_and_invoices
# Joins the per-order totals with the shredded invoices (matching rows only) and
# adds invoiced_vs_quoted = AmountExcludingTax - POAmount.
#  - The calculated column is named invoiced_vs_quoted, as the requirement asks
#    (it was previously misspelled invoiced_as_quoted).
#  - AmountExcludingTax is cast to NUMBER(38,2): a bare ::numeric is NUMBER(38,0)
#    in Snowflake and would round the invoice amount to a whole number before
#    the subtraction.
#  - CREATE OR REPLACE keeps the cell re-runnable.
#  - A regular view is used; Snowflake materialized views cannot contain joins.
cs.execute("""CREATE OR REPLACE VIEW purchase_orders_and_invoices AS
(SELECT 
    A.*,
    B.AMOUNTEXCLUDINGTAX,
    (B.AmountExcludingTax::NUMBER(38,2) - A.POAmount) AS invoiced_vs_quoted
    FROM Purchase_Order_Header_Updated AS A
    JOIN invoice_xml_table AS B
    ON A.purchaseorderid = B.PurchaseOrderID AND A.supplierid = B.SupplierID)""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [26]:
## 6.Extract the supplier_case data from postgres, do not import the data into Python, instead
## use Python to move the data from postgres to your local drive and then directly into a Snowflake stage
## a.Consider creating a Python function that can take a csv file path as input and then generate field definitions (field names and datatypes based on the header and data types in the file) that can then be used in CREATE TABLE statement.
## b.You need to use psycopg2 or a similar Python library to connect to the postgres database within Python, issue a command to postgres to have postgres save the supplier_case data to file, and then use cs.execute to move the file to an internal Snowflake stage and eventually into a table.

import psycopg2

# --- Postgres -> local CSV -------------------------------------------------
# The data never enters Python objects: COPY ... TO STDOUT streams the query
# result straight into the local file, as CSV with a header row.
s = "SELECT * from supplier_case"
SQL_for_file_output = "COPY ({0}) TO STDOUT WITH CSV HEADER".format(s)

connection = psycopg2.connect(database="rsm-docker",user="jovyan",password="postgres",host="127.0.0.1",port = 8765)
try:
    cursor = connection.cursor()
    with open(r'supplier_case.csv','w') as f_output:
        cursor.copy_expert(SQL_for_file_output,f_output)
finally:
    # Always release the Postgres connection, even if the export fails.
    connection.close()

# --- local CSV -> Snowflake stage -> table ---------------------------------
cs.execute("PUT file://supplier_case.csv @ETLXMLStage")
cs.execute("CREATE TABLE IF NOT EXISTS supplier_case(SupplierID INTEGER,SupplierName VARCHAR,SupplierCategoryID INTEGER,PrimaryContactPersonID INTEGER,AlternateContactPersonID INTEGER,DeliveryMethodID INTEGER ,PostalCityID INTEGER,SupplierReference VARCHAR,BankAccountName VARCHAR,BankAccountBranch VARCHAR,BankAccountCode INTEGER,BankAccountNumber NUMERIC,BankInternationalCode INTEGER,PaymentDays INTEGER,InternalComments VARCHAR,PhoneNumber VARCHAR,FaxNumber VARCHAR,WebsiteURL VARCHAR,DeliveryAddressLine1 VARCHAR,DeliveryAddressLine2 VARCHAR,DeliveryPostalCode INTEGER,DeliveryLocation VARCHAR,PostalAddressLine1 VARCHAR,PostalAddressLine2 VARCHAR,PostalPostalCode INTEGER,LastEditedBy INTEGER,ValidFrom VARCHAR,ValidTo VARCHAR)")
# The stage is shared with the invoice XML file, so the load is restricted to
# the supplier_case file via PATTERN. The file format skips the header row
# written by "WITH CSV HEADER" and honours the double-quoted fields Postgres
# emits, instead of relying on on_error to discard unparsable rows.
# on_error = continue is kept so the load stays best-effort as before.
cs.execute("""COPY INTO supplier_case FROM @ETLXMLStage
PATTERN = '.*supplier_case[.]csv.*'
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"')
ON_ERROR = CONTINUE""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [29]:
#Creating table WEATHERDATA
# IF NOT EXISTS keeps the cell re-runnable (a plain CREATE TABLE fails once the
# table exists) and matches the other CREATE TABLE statements in this notebook.
# NOTE(review): NUMERIC without precision/scale is NUMBER(38,0) in Snowflake, so
# fractional values loaded into these columns (e.g. INTPTLAT / INTPTLONG) are
# stored as whole numbers. The weather join later in this notebook compares
# against station coordinates cast to numeric as well, so the types are left
# unchanged here -- confirm that whole-degree matching is intended.
cs.execute("""CREATE TABLE IF NOT EXISTS WEATHERDATA(geoid numeric, ALAND NUMERIC, AWATER NUMERIC, ALAND_SQMI NUMERIC, AWATER_SQMI NUMERIC, INTPTLAT NUMERIC, INTPTLONG NUMERIC)""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [30]:
#Loading the Gazetteer file into WEATHERDATA through a dedicated stage
# A fresh stage and a CSV file format that skips the header line are created,
# the local file is uploaded, and the staged file is copied into the table.
create_weather_stage_sql = "create or replace stage stage_weather_data"
create_csv_format_sql = "create or replace file format CSVfile type=csv skip_header=1"
put_gazetteer_sql = "PUT 'file://Gaz_zcta_national.csv' @stage_weather_data"
copy_gazetteer_sql = "COPY INTO WEATHERDATA from @stage_weather_data file_format=(format_name=csvfile)"

cs.execute(create_weather_stage_sql)
cs.execute(create_csv_format_sql)
cs.execute(put_gazetteer_sql)
cs.execute(copy_gazetteer_sql)

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [31]:
## 7.Connect manually inside Snowflake Marketplace to the Knoema Environment Data Atlas data 
##   (find it and click Get).  The name of the dataset that you will be using is 
##   ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R) and then extract weather data for each unique 
##   zip code in the supplier_case table (suppliers can have the same zip code but you only need to 
##   extract weather data for each zip code once).

#CREATE VIEW SUPPLIER_ZIP_CODE_WEATHER
# One row per distinct (zip code, date, value) for zip codes that appear in
# supplier_case. Zip codes are mapped to weather stations by comparing the
# gazetteer coordinates in WEATHERDATA with the station coordinates cast to
# numeric; only readings whose "Units" is 'Fahrenheit' are kept.
# CREATE OR REPLACE keeps the cell re-runnable (a plain CREATE VIEW fails once
# the view exists).
# NOTE(review): the WHERE filter on EDA."Units" discards rows with no station
# match, so the LEFT JOIN behaves like an inner join.
cs.execute("""CREATE OR REPLACE VIEW supplier_zip_code_weather as
(select DISTINCT wd.geoid as zipcode,EDA."Date",EDA."Value" 
from weatherdata wd
left join ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R as EDA
on wd.intptlat = EDA."Stations Latitude"::numeric and wd.intptlong = EDA."Stations Longitude"::numeric
join supplier_case sc
on sc.postalpostalcode = wd.geoid
where EDA."Units" = 'Fahrenheit')""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>

In [37]:
## 8.Join purchase_orders_and_invoices, supplier_case, and supplier_zip_code_weather based on zip codes 
##   and the transaction date. Only include transactions that have matching temperature readings

# Join order totals, invoices, suppliers and weather readings.
# Weather rows are matched on the supplier's postal code AND on the invoice's
# transaction date, as the requirement states; without the date predicate every
# transaction was paired with every weather reading for its zip code.
# The inner joins keep only transactions that have a matching reading.
# NOTE(review): assumes TransactionDate holds a date string Snowflake can cast
# with ::date (e.g. YYYY-MM-DD) and that "Date" in the weather view is a DATE --
# confirm against the data.
cs.execute("""SELECT *
FROM Purchase_Order_Header_Updated A
JOIN invoice_xml_table B
ON A.purchaseorderid = B.purchaseorderid AND A.supplierid = B.supplierid
JOIN supplier_case C
ON B.supplierid = C.supplierid
join supplier_zip_code_weather D 
on C.postalpostalcode::varchar = D.zipcode
and B.TransactionDate::varchar::date = D."Date"
""")

<snowflake.connector.cursor.SnowflakeCursor at 0xffff4e7c3ca0>