### ETL Project - Group 7

#### Srikar Gunisetty - Hui (Ines) Ye - Mouli Krishna Palacharla

In [1]:
import os

import snowflake.connector as sf

In [2]:
### User Variable declaration ###
# Credentials are read from the environment so secrets never live in the notebook,
# its saved outputs or its version history. Account and user keep their previous
# values as defaults. The password has NO default: export SNOWFLAKE_PASSWORD before
# running. The previously hard-coded password should be treated as leaked and rotated.
sfAccount = os.environ.get("SNOWFLAKE_ACCOUNT", "IWA31950")
sfUser = os.environ.get("SNOWFLAKE_USER", "sgunisetty")
sfPswd = os.environ.get("SNOWFLAKE_PASSWORD", "")
if not sfPswd:
    print("SNOWFLAKE_PASSWORD is not set; the Snowflake connection will fail until it is provided.")
sfDbName = "ASSIGNMENT_DB"
sfSchemaName = "ASSIGNMENT_SCHEMA"

#Stage Names
sfCsvStageName = "ASSIGNMENT_CSV_STAGE"
sfXmlStageName = "ASSIGNMENT_XML_STAGE"
sfPsqlStageName = "ASSIGNMENT_PSQL_STAGE"
sfUSzipStageName = "ASSIGNMENT_USZIP_STAGE"

#Warehouse Name
sfWareHouseName = "ASSIGNMENT_WH"
sfCsvFileFormatName = "ASSIGNMENT_CSV_FF"
sfXmlFileFormatName = "ASSIGNMENT_XML_FF"

#RelativePaths
sfCsvDataRelativePath = "./Monthly_PO_Data"
sfXmlDataRelativePath = "./XML_Data"
sfPsqlDataRelativePath = "./PSQL_Data"
sfUSzipDataRelativePath = "./US_zipcode_data"

#Table Names
sfPurchasesTableName = "purchases_data"
sfSupplierInvoicesTableName = "supplier_invoices"
sfSupplierCaseTableName = 'supplier_case'
sfuszipcodeTableName = 'us_zip'

### Connection Variable Initialization ###
# Populated by connectToSf(); every helper below reads this module-level handle.
sfConnection = None
sfWareHouseSize = 'X-Small'
sfWhautoSuspendDuration = 300  # seconds of inactivity before the warehouse suspends
sfWhShouldAutoResume = True

In [3]:
# Function to establish connection to SnowFlake using user credentials

def connectToSf(account, userName, password):
    """Open a Snowflake connection and store it in the module-level ``sfConnection``.

    Runs ``current_version()`` as a smoke test and prints the server version.
    Failures are reported rather than raised, keeping the notebook's best-effort
    behaviour. The underlying error is now printed instead of being silently
    swallowed.

    Args:
        account: Snowflake account identifier.
        userName: Snowflake login name.
        password: Password for ``userName``.
    """
    global sfConnection
    sfq = None
    try:
        sfConnection = sf.Connect(account=account, user=userName, password=password)
        sfq = sfConnection.cursor()
        sfq.execute("Select current_version()")
        sfResults = sfq.fetchall()
        print("Connected to Snowflake Version: " + sfResults[0][0])
    except Exception as exc:  # not a bare except: KeyboardInterrupt/SystemExit still propagate
        print("Connection to snowflake failed. Please check the credentials before retrying!")
        print("Reason: {0}".format(exc))
    finally:
        # Release the smoke-test cursor even when the version query fails.
        if sfq is not None:
            sfq.close()

In [4]:
# Create (if missing) and switch to the given database and schema

def createDbAndSchema(dbName, schemaName):
    """Ensure ``dbName``/``schemaName`` exist and make them the session's current context."""
    statements = (
        "CREATE DATABASE IF NOT EXISTS {0}".format(dbName),
        "USE DATABASE {0}".format(dbName),
        "CREATE SCHEMA IF NOT EXISTS {0}".format(schemaName),
        "USE SCHEMA {0}".format(schemaName),
    )
    cursor = sfConnection.cursor()
    for statement in statements:
        cursor.execute(statement)
    cursor.close()
    print("Database and Schema with {0} and {1} have been created".format(dbName, schemaName))

In [5]:
# Create a named internal stage that local files are PUT into

def createStage(stageName):
    """Create the stage ``stageName`` unless one with that name already exists."""
    ddl = "CREATE STAGE IF NOT EXISTS {0}".format(stageName)
    handle = sfConnection.cursor()
    handle.execute(ddl)
    handle.close()
    confirmation = "Stage with name {0} has been created".format(stageName)
    print(confirmation)

In [6]:
# Create (if missing) a virtual warehouse and make it the session's active warehouse

def createWareHouse(wareHouseName, wareHouseSize, autoSuspendDuration, shouldAutoResume):
    """Create ``wareHouseName`` with the given size/suspend/resume settings, then USE it.

    ``autoSuspendDuration`` and ``shouldAutoResume`` are interpolated verbatim via
    ``str.format`` (a Python ``True`` is rendered as ``True``).
    """
    createSql = (
        "CREATE WAREHOUSE IF NOT EXISTS {0} WAREHOUSE_SIZE = '{1}' "
        "AUTO_SUSPEND = {2} AUTO_RESUME = {3}"
    ).format(wareHouseName, wareHouseSize, autoSuspendDuration, shouldAutoResume)
    useSql = "USE WAREHOUSE {0}".format(wareHouseName)
    cursor = sfConnection.cursor()
    for sql in (createSql, useSql):
        cursor.execute(sql)
    cursor.close()
    print("Created a warehouse with name {0}".format(wareHouseName))

In [7]:
# Function to create file formats for transforming data from csv/xml to SnowFlake Tables

def createFileFormat(ffName, ffType):
    """Create a named Snowflake file format for CSV or XML staged files.

    Args:
        ffName: Name of the file format object. Nothing happens if it already exists.
        ffType: ``"CSV"`` or ``"XML"``, case-insensitive.

    Raises:
        ValueError: if ``ffType`` is neither CSV nor XML. Previously any
            unrecognised value (including lowercase ``"csv"``) silently
            produced an XML file format.
    """
    fileType = str(ffType).strip().upper()
    if fileType == "CSV":
        # Skips one header row. '\042' is the octal escape for a double quote, so
        # Snowflake receives FIELD_OPTIONALLY_ENCLOSED_BY = '"'.
        ddl = "CREATE FILE FORMAT IF NOT EXISTS {0} TYPE = CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '\042'".format(ffName)
    elif fileType == "XML":
        # STRIP_OUTER_ELEMENT exposes the root's children as separate documents (rows).
        ddl = "CREATE FILE FORMAT IF NOT EXISTS {0} TYPE = XML STRIP_OUTER_ELEMENT = TRUE".format(ffName)
    else:
        # Validate before opening a cursor so nothing is left open on bad input.
        raise ValueError("Unsupported file format type {0!r}; expected 'CSV' or 'XML'".format(ffType))
    sfq = sfConnection.cursor()
    sfq.execute(ddl)
    sfq.close()
    print("File format {0} of type {1} has been created".format(ffName, fileType))

In [8]:
# Upload every regular file in a local directory to a Snowflake stage

def uploadLocalFiles(targetLocation, targetStage):
    """PUT each regular file directly inside ``targetLocation`` onto stage ``targetStage``.

    Sub-directories are skipped (non-recursive). Staged files with the same name
    are replaced (``OVERWRITE = TRUE``). Prints how many files were uploaded.
    """
    import os

    uploadable = []
    for entry in os.listdir(targetLocation):
        if os.path.isfile(os.path.join(targetLocation, entry)):
            uploadable.append(entry)
    cursor = sfConnection.cursor()
    for name in uploadable:
        fileUri = "file://" + os.path.abspath(os.path.join(targetLocation, name))
        cursor.execute("PUT '{0}' @{1} OVERWRITE = TRUE".format(fileUri, targetStage))
    cursor.close()
    print("Uploaded {0} files to {1} stage".format(str(len(uploadable)), targetStage))

In [9]:
# Debugging aid: dump the raw LIST output for a stage

def listFilesInStage(stageName):
    """Print the rows returned by ``LIST @stageName`` (one tuple per staged file)."""
    listing = sfConnection.cursor()
    listing.execute("LIST @{0}".format(stageName))
    stagedRows = listing.fetchall()
    listing.close()
    print(stagedRows)

In [10]:
# Remove the database and warehouse left over from a previous run (no-op if absent)

def dropDBAndWh(dbName, whName):
    """Drop ``dbName`` then ``whName`` (both ``IF EXISTS``) and report what was targeted."""
    cursor = sfConnection.cursor()
    for template, objectName in (("DROP DATABASE IF EXISTS {0}", dbName),
                                 ("DROP WAREHOUSE IF EXISTS {0}", whName)):
        cursor.execute(template.format(objectName))
    cursor.close()
    print("Dropped {0} Database and {1} warehouse if exists already".format(dbName, whName))

In [11]:
# Run a caller-supplied CREATE TABLE statement

def createTable(tableName, query):
    """Execute the DDL in ``query``. ``tableName`` is only used in the confirmation message."""
    runner = sfConnection.cursor()
    runner.execute(query)
    runner.close()
    message = "Created a new table with name {0}".format(tableName)
    print(message)

In [12]:
# Function to copy selected columns of the staged CSV files into an existing SnowFlake table

def copyCsvContentsToTable(tableName, stageName, ffName, columns=(1, 2, 14, 17, 19)):
    """COPY selected CSV fields from stage ``stageName`` into ``tableName``.

    Args:
        tableName: Target table. Its column order must match ``columns``.
        stageName: Stage holding the CSV files, without the leading ``@``.
        ffName: Name of the file format used to parse the staged files.
        columns: 1-based positions of the CSV fields to load, in target-column
            order. The default reproduces the original hard-coded projection
            ``$1, $2, $14, $17, $19`` (five fields, matching the five columns
            of the purchases_data DDL in this notebook).

    Raises:
        ValueError: if ``columns`` is empty, which would generate invalid SQL.
    """
    if not columns:
        raise ValueError("columns must contain at least one field position")
    # int() keeps arbitrary text out of the generated SQL.
    projection = ", ".join("t.${0}".format(int(col)) for col in columns)
    query = "copy into {0} from (select {1} from @{2} t) file_format = {3};".format(tableName, projection, stageName, ffName)
    sfq = sfConnection.cursor()
    sfq.execute(query)
    sfq.close()
    print("Copied the contents of csv to the table {0}".format(tableName))

In [13]:
# Function to copy selected columns of the staged CSV files into an existing SnowFlake table

def copyCsvContentsToTable_2(tableName, stageName, ffName, columns=(1, 2, 3, 25)):
    """COPY selected CSV fields from stage ``stageName`` into ``tableName``.

    Args:
        tableName: Target table. Its column order must match ``columns``.
        stageName: Stage holding the CSV files, without the leading ``@``.
        ffName: Name of the file format used to parse the staged files.
        columns: 1-based positions of the CSV fields to load, in target-column
            order. The default reproduces the original hard-coded projection
            ``$1, $2, $3, $25`` (four fields, matching the four columns of the
            supplier_case DDL in this notebook).

    Raises:
        ValueError: if ``columns`` is empty, which would generate invalid SQL.
    """
    if not columns:
        raise ValueError("columns must contain at least one field position")
    # int() keeps arbitrary text out of the generated SQL.
    projection = ", ".join("t.${0}".format(int(col)) for col in columns)
    query = "copy into {0} from (select {1} from @{2} t) file_format = {3};".format(tableName, projection, stageName, ffName)
    sfq = sfConnection.cursor()
    sfq.execute(query)
    sfq.close()
    print("Copied the contents of csv to the table {0}".format(tableName))

In [14]:
# Function to copy selected columns of the staged CSV files into an existing SnowFlake table

def copyCsvContentsToTable_3(tableName, stageName, ffName, columns=(1, 2, 3)):
    """COPY selected CSV fields from stage ``stageName`` into ``tableName``.

    Args:
        tableName: Target table. Its column order must match ``columns``.
        stageName: Stage holding the CSV files, without the leading ``@``.
        ffName: Name of the file format used to parse the staged files.
        columns: 1-based positions of the CSV fields to load, in target-column
            order. The default reproduces the original hard-coded projection
            ``$1, $2, $3`` (three fields, matching the zip/lat/long columns of
            the us_zip DDL in this notebook).

    Raises:
        ValueError: if ``columns`` is empty, which would generate invalid SQL.
    """
    if not columns:
        raise ValueError("columns must contain at least one field position")
    # int() keeps arbitrary text out of the generated SQL.
    projection = ", ".join("t.${0}".format(int(col)) for col in columns)
    query = "copy into {0} from (select {1} from @{2} t) file_format = {3};".format(tableName, projection, stageName, ffName)
    sfq = sfConnection.cursor()
    sfq.execute(query)
    sfq.close()
    print("Copied the contents of csv to the table {0}".format(tableName))

In [15]:
# Create (or recreate) a single-column VARIANT table used to land semi-structured data

def createVariantTable(tableName):
    """Replace ``tableName`` with an empty table holding one VARIANT column ``src_data``."""
    ddl = "Create or replace table {0} ( src_data variant)".format(tableName)
    handle = sfConnection.cursor()
    handle.execute(ddl)
    handle.close()
    confirmation = "Created a variant table with name {0}".format(tableName)
    print(confirmation)

In [16]:
# Function to copy contents of XML files to the empty tables in SnowFlake

def copyXmlContentsToTable(tableName, stageName, ffName):
    """COPY every staged XML document from ``stageName`` into ``tableName``.

    Args:
        tableName: Target table. Expected to be a single-VARIANT-column table
            (as produced by createVariantTable) — TODO confirm for other callers.
        stageName: Stage holding the XML files, without the leading ``@``.
        ffName: Name of the XML file format used to parse the files.
    """
    query = "copy into {0} from @{1} file_format = {2}".format(tableName, stageName, ffName)
    sfq = sfConnection.cursor()
    sfq.execute(query)
    sfq.close()
    # Report the actual target instead of a hard-coded "temporary table".
    print("Copied Xml contents to table {0}".format(tableName))

In [17]:
# Function to execute a statement whose result set is not needed

def executeAnyQuery(query):
    """Run ``query`` on a fresh cursor. Nothing is fetched and nothing is returned."""
    runner = sfConnection.cursor()
    runner.execute(query)
    runner.close()

In [18]:
# Open the Snowflake session that every following cell uses
connectToSf(account=sfAccount, userName=sfUser, password=sfPswd)

Connected to Snowflake Version: 6.3.3


In [19]:
# Start from a clean slate: remove the database and warehouse left by any previous run
dropDBAndWh(dbName=sfDbName, whName=sfWareHouseName)

Dropped ASSIGNMENT_DB Database and ASSIGNMENT_WH warehouse if exists already


In [20]:
# Create the project database and schema and make them the current context
createDbAndSchema(dbName=sfDbName, schemaName=sfSchemaName)

Database and Schema with ASSIGNMENT_DB and ASSIGNMENT_SCHEMA have been created


In [21]:
# One internal stage per local data folder, created in a fixed order
for _stageName in (sfCsvStageName, sfXmlStageName, sfPsqlStageName, sfUSzipStageName):
    createStage(_stageName)

Stage with name ASSIGNMENT_CSV_STAGE has been created
Stage with name ASSIGNMENT_XML_STAGE has been created
Stage with name ASSIGNMENT_PSQL_STAGE has been created
Stage with name ASSIGNMENT_USZIP_STAGE has been created


In [22]:
# Compute for the COPY/CTAS statements below (auto-suspends when idle, auto-resumes on use)
createWareHouse(wareHouseName=sfWareHouseName, wareHouseSize=sfWareHouseSize,
                autoSuspendDuration=sfWhautoSuspendDuration, shouldAutoResume=sfWhShouldAutoResume)

Created a warehouse with name ASSIGNMENT_WH


In [23]:
# File formats used by the COPY INTO cells: one for CSV sources, one for XML
for _ffName, _ffType in ((sfCsvFileFormatName, "CSV"), (sfXmlFileFormatName, "XML")):
    createFileFormat(_ffName, _ffType)

In [24]:
# PUT each local data folder onto its matching stage
for _localDir, _stage in (
    (sfCsvDataRelativePath, sfCsvStageName),
    (sfXmlDataRelativePath, sfXmlStageName),
    (sfPsqlDataRelativePath, sfPsqlStageName),
    (sfUSzipDataRelativePath, sfUSzipStageName),
):
    uploadLocalFiles(_localDir, _stage)

Uploaded 41 files to ASSIGNMENT_CSV_STAGE stage
Uploaded 1 files to ASSIGNMENT_XML_STAGE stage
Uploaded 1 files to ASSIGNMENT_PSQL_STAGE stage
Uploaded 1 files to ASSIGNMENT_USZIP_STAGE stage


In [25]:
# View contents of the stage (debugging aid).
# Set the flag to True to print the files currently uploaded to the CSV stage,
# instead of toggling commented-out code.
SHOW_STAGE_CONTENTS = False
if SHOW_STAGE_CONTENTS:
    listFilesInStage(sfCsvStageName)

In [26]:
# Target table for the monthly purchase-order CSV files
query = r"""create or replace table {table} (
        PurchaseOrderID INTEGER,
        SupplierID INTEGER,
        StockItemID INTEGER,
        ReceivedOuters INTEGER,
        ExpectedUnitPricePerOuter DOUBLE
    );""".format(table=sfPurchasesTableName)
createTable(tableName=sfPurchasesTableName, query=query)

# Single-VARIANT-column staging table that receives the raw XML documents
createVariantTable(tableName="temporary_table")

Created a new table with name purchases_data
Created a variant table with name temporary_table


In [27]:
# Target table for the supplier extract loaded from the PSQL stage
query = r"""create or replace table {table} (
        SupplierID INTEGER,
        suppliername STRING,
        suppliercategoryid INTEGER,
        postalpostalcode INTEGER
    );""".format(table=sfSupplierCaseTableName)
createTable(tableName=sfSupplierCaseTableName, query=query)

Created a new table with name supplier_case


In [28]:
# Creating a new table with US zipcodes and their respective latitudes and longitudes
# Source of data =>
# https://gist.github.com/abatko/ee7b24db82a6f50cfce02afafa1dfd1e

# Create the us_zip lookup table (zip -> latitude/longitude)
query = r"""create or replace table {table} (
        zip INTEGER,
        lat DOUBLE,
        long DOUBLE
    );""".format(table=sfuszipcodeTableName)
createTable(tableName=sfuszipcodeTableName, query=query)

Created a new table with name us_zip


In [29]:
# Bulk-load each stage into its target table (CSV purchases, raw XML, supplier CSV, zip CSV)
copyCsvContentsToTable(tableName=sfPurchasesTableName, stageName=sfCsvStageName, ffName=sfCsvFileFormatName)
copyXmlContentsToTable(tableName="temporary_table", stageName=sfXmlStageName, ffName=sfXmlFileFormatName)
copyCsvContentsToTable_2(tableName=sfSupplierCaseTableName, stageName=sfPsqlStageName, ffName=sfCsvFileFormatName)
copyCsvContentsToTable_3(tableName=sfuszipcodeTableName, stageName=sfUSzipStageName, ffName=sfCsvFileFormatName)

Copied the contents of csv to the table purchases_data
Copied Xml contents to temporary table
Copied the contents of csv to the table supplier_case
Copied the contents of csv to the table us_zip


In [30]:
# Flatten the staged XML documents (one VARIANT per row) into a typed supplier_invoices table.
# - "create or replace" keeps the cell re-runnable, like the other table-building cells
#   (a plain "create table" fails once the table exists).
# - PurchaseOrderID is cast to integer so the join key in the Q4 cell has the same type
#   on both sides. TaxAmount and SupplierInvoiceNumber get explicit types like the other
#   amount columns. Missing elements still yield NULL.
# NOTE(review): TransactionDate is still left as VARIANT. Cast it to DATE once the XML
# date format is confirmed.
query = r"""create or replace table {0} as select 
            XMLGET(src_data, 'SupplierTransactionID'):"$"::integer as SupplierTransactionID,
            XMLGET(src_data, 'SupplierID'):"$"::integer as SupplierID,
            XMLGET(src_data, 'PurchaseOrderID'):"$"::integer as PurchaseOrderID,
            XMLGET(src_data, 'SupplierInvoiceNumber'):"$"::string as SupplierInvoiceNumber,
            XMLGET(src_data, 'TransactionDate'):"$" as TransactionDate,
            XMLGET(src_data, 'AmountExcludingTax'):"$"::double as AmountExcludingTax,
            XMLGET(src_data, 'TaxAmount'):"$"::double as TaxAmount,
            XMLGET(src_data, 'TransactionAmount'):"$"::float as TransactionAmount
            from {1};""".format(sfSupplierInvoicesTableName, "temporary_table")
createTable(sfSupplierInvoicesTableName, query)

Created a new table with name supplier_invoices


In [31]:
# Delete the temporary XML staging table now that supplier_invoices has been built.
# IF EXISTS keeps the cell re-runnable after the table is already gone.
query = "DROP TABLE IF EXISTS {0}".format("temporary_table")
executeAnyQuery(query)

In [32]:
## Q2: quoted PO amount = sum of ReceivedOuters * ExpectedUnitPricePerOuter, one row per purchase order
query = (
    "CREATE or replace table {0} as "
    "SELECT purchaseorderid, sum(ReceivedOuters * ExpectedUnitPricePerOuter) AS POAmount "
    "FROM {1} GROUP BY purchaseorderid ORDER BY purchaseorderid"
).format("POAmount_table", sfPurchasesTableName)
executeAnyQuery(query)

In [33]:
## Q4 - join POAmount_table with supplier_invoices (inner join: only POs that were invoiced)
query = (
    "create or replace table {0} as "
    "select a.POAmount, b.* "
    "from {1} as a inner join {2} as b "
    "on a.purchaseorderid = b.PurchaseOrderID "
    "order by a.PurchaseOrderID;"
).format("supplier_poamount_merged", "POAmount_table", sfSupplierInvoicesTableName)
executeAnyQuery(query)

In [34]:
## Q5 create a materialized view
# invoiced_vs_quoted is computed as POAmount - AmountExcludingTax for each merged row
query = (
    "create or replace materialized view purchase_orders_and_invoices as "
    "select purchaseorderid, (POAmount-AmountExcludingTax) as invoiced_vs_quoted  "
    "from supplier_poamount_merged"
)
executeAnyQuery(query)

In [35]:
## Q6
# Load the supplier_case dump into a local PostgreSQL database and export the table to CSV.
# NOTE(review): this writes ./supplier_case.csv, but the PSQL stage was filled earlier
# from ./PSQL_Data. Confirm how the CSV reaches that folder on a fresh Run All.
import psycopg2

# The password can be overridden via PGPASSWORD. The default keeps the original local setup working.
conn = psycopg2.connect(host="127.0.0.1", port="8765", database="ETL_Project", user="jovyan",
                        password=os.environ.get("PGPASSWORD", "postgres"))
try:
    cursor = conn.cursor()
    # Loading psql file to database (the context manager closes the file handle)
    with open("./supplier_case.pgsql", "r") as sqlFile:
        cursor.execute(sqlFile.read())
    # Exporting table to csv file
    with open("./supplier_case.csv", "w") as file:
        cursor.copy_expert("COPY (SELECT * from supplier_case) TO STDOUT WITH CSV DELIMITER ',' HEADER", file)
    conn.commit()
finally:
    # Always release the connection, even if loading or exporting fails.
    conn.close()

In [36]:
## Q7-Part a & b

#Extracting weather data from Environmental Database and pivoting to reflect stations
# Builds noaa_gsod from ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R, filtered to
# Country = 'US', Date >= 2020-01-01 and Measure = 'M1'.
# The four temperature indicators are pivoted (max of "Value") into the columns
# max / dew / min / mean_temp. The result is grouped by all non-pivoted columns,
# i.e. roughly one row per station and date.
# NOTE(review): the meaning of Measure 'M1' comes from that dataset. Confirm it
# against the dataset's documentation.
query = """
create or replace table noaa_gsod
cluster by (station_id, date)
as (
    select *
    from (
        select "Stations", "Date", "Stations Latitude", "Stations Longitude","Stations Name", "Country", "Indicator Name", "Value"
        from ENVIRONMENT_DATA_ATLAS.ENVIRONMENT.NOAACD2019R
        where "Date">='2020-01-01'
        and "Measure"='M1'
        and "Country"='US'
    )
    pivot(max("Value") for "Indicator Name" in ('Maximum temperature (Fahrenheit)','Mean dew point (Fahrenheit)','Minimum temperature (Fahrenheit)','Mean temperature (Fahrenheit)'))
    as p(station_id, date, station_lat, station_long, name, country_fips, max, dew, min, mean_temp)
);"""

executeAnyQuery(query)

In [37]:
#Combining zip codes and weather data
# Pair each weather observation with every zip code whose centroid lies within 50 km
# of the station. ST_DISTANCE on GEOGRAPHY points returns metres, and st_makepoint
# takes (longitude, latitude).
# The previous query ended with an unordered "LIMIT 1000". That kept an arbitrary,
# run-to-run different subset of the matches, so the supplier join in the next cell
# silently lost rows. Instead, the zip codes are restricted to the supplier postal
# codes that the next cell joins on. The result is complete, deterministic and small.
query ="""
create or replace table stations_city
as (
    select a.station_id, a.date, a.name, a.country_fips, a.max, a.dew, a.min, a.mean_temp, st_distance(st_makepoint(a.station_long, a.station_lat), st_makepoint(b.long, b.lat)) AS distance, b.zip
    from noaa_gsod AS a
    join ASSIGNMENT_DB.ASSIGNMENT_SCHEMA.us_zip AS b
    on st_distance(st_makepoint(a.station_long, a.station_lat), st_makepoint(b.long, b.lat)) < 50000
    where b.zip in (select postalpostalcode from ASSIGNMENT_DB.ASSIGNMENT_SCHEMA.supplier_case)
);"""
executeAnyQuery(query)

In [38]:
#Combining supplier zip codes and weather data zip codes
# Inner join of supplier_case with stations_city on postal code = zip. Suppliers with
# no matching zip are dropped, and each supplier appears once per matching
# stations_city row (i.e. per station/date near its zip).

query ="""
create or replace table supplier_zip_code_weather as
(SELECT A.*,B.*
FROM supplier_case AS A
INNER JOIN stations_city AS B
ON A.postalpostalcode = B.zip
ORDER BY ZIP);"""
executeAnyQuery(query)

In [39]:
## Q8 -> Final Merged Database
# Adds poamount, purchaseorderid and transactiondate from supplier_poamount_merged
# to the supplier/weather rows, matched on supplierid only.
# NOTE(review): because supplierid is the only join key, every PO row is repeated
# once per weather row (station/date) of that supplier. Confirm whether the join
# should also match the transaction date against the weather date.
query = """
create or replace table combined_merged_table as
(SELECT A.poamount, A.purchaseorderid, A.transactiondate, B.*
FROM supplier_poamount_merged AS A
INNER JOIN supplier_zip_code_weather AS B
ON A.supplierid = B.supplierid);"""
executeAnyQuery(query)