In [3]:
import zipfile
import os

In [2]:
# Unpack the raw ad-tracking archive into data/ (later cells read data/train.csv and data/test.csv).
archive = zipfile.ZipFile("data/adtracking.zip", "r")
with archive:
    archive.extractall("data")

In [6]:
# Sanity check: print every .zip archive present in data/.
zip_names = [name for name in os.listdir("data") if name.endswith(".zip")]
for name in zip_names:
    print(name)

adtracking.zip


In [1]:
import os
import json
import pandas as pd 
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

In [2]:
# Snowflake credentials come from the environment so they are never typed into (and saved
# with) the notebook. Each falls back to "" when the variable is unset, matching the old placeholder.
sf_account_id = os.environ.get("SNOWFLAKE_ACCOUNT", "")
sf_username = os.environ.get("SNOWFLAKE_USER", "")
sf_password = os.environ.get("SNOWFLAKE_PASSWORD", "")

In [3]:
# Open a Snowflake session over HTTPS using the credentials configured above.
connection_params = {
    "account": sf_account_id,
    "user": sf_username,
    "password": sf_password,
    "protocol": "https",
}
conn = snowflake.connector.connect(**connection_params)

In [4]:
# Load the extracted CSVs (paths are relative to the notebook's working directory).
train_df, test_df = (pd.read_csv(path) for path in ("data/train.csv", "data/test.csv"))

In [5]:
# Snowflake object names used by the DDL and load cells. They are used unquoted in SQL;
# the write_pandas cell upper-cases them to match.
# NOTE(review): "sagemake" looks like a typo for "sagemaker", but renaming it would
# create/use a different warehouse, so it is left as-is.
dwh, db, schema = (
    "amazon_sagemake_w_snowflake_as_datasource",
    "adtracking",
    "adtracking_schema",
)
table_training, table_testing = "adtracking_clicks_train", "adtracking_clicks_test"

In [6]:
# Create (if missing) and select the warehouse and database for this session.
# USE statements are session-scoped, so a single cursor can run all four statements;
# the `with` block closes it explicitly instead of leaving one throwaway cursor per statement.
with conn.cursor() as cur:
    for ddl in (
        f"CREATE WAREHOUSE IF NOT EXISTS {dwh}",
        f"USE WAREHOUSE {dwh}",
        f"CREATE DATABASE IF NOT EXISTS {db}",
        f"USE DATABASE {db}",
    ):
        cur.execute(ddl)

<snowflake.connector.cursor.SnowflakeCursor at 0x1c33475dc00>

In [7]:
# Peek at the first rows of the raw training data.
train_df.head(n=5)

Unnamed: 0,ip,app,device,os,channel,click_time,attributed_time,is_attributed
0,83230,3,1,13,379,2017-11-06 14:32:21,,0
1,17357,3,1,19,379,2017-11-06 14:33:34,,0
2,35810,3,1,13,379,2017-11-06 14:34:12,,0
3,45745,14,1,13,478,2017-11-06 14:34:52,,0
4,161007,3,1,13,379,2017-11-06 14:35:08,,0


In [8]:
# Peek at the first rows of the raw test data.
test_df.head(n=5)

Unnamed: 0,click_id,ip,app,device,os,channel,click_time
0,0,5744,9,1,3,107,2017-11-10 04:00:00
1,1,119901,9,1,3,466,2017-11-10 04:00:00
2,2,72287,21,1,19,128,2017-11-10 04:00:00
3,3,78477,15,1,13,111,2017-11-10 04:00:00
4,4,123080,12,1,13,328,2017-11-10 04:00:00


In [9]:
# Parse click timestamps so table_creation maps them to a `datetime` column rather than varchar.
for frame in (test_df, train_df):
    frame['click_time'] = pd.to_datetime(frame['click_time'])

In [10]:
# click_time is normally already parsed by the previous cell. Only convert if it is not yet
# a datetime dtype, so re-running doesn't re-convert (and copy) the full training column
# (~185M rows according to the load output).
if not pd.api.types.is_datetime64_any_dtype(train_df['click_time']):
    train_df['click_time'] = pd.to_datetime(train_df['click_time'])

In [11]:
# Create (if missing) and switch to the target schema inside the current database.
# A single explicitly closed cursor runs both statements (USE SCHEMA is session-scoped).
with conn.cursor() as cur:
    cur.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
    cur.execute(f"USE SCHEMA {schema}")

<snowflake.connector.cursor.SnowflakeCursor at 0x1c3345493c0>

In [13]:
def _snowflake_column_type(dtype):
    """Return the Snowflake column type used for a pandas ``dtype``.

    bool/boolean -> ``boolean``; any integer dtype (incl. nullable ``Int64``) -> ``int``;
    any float dtype -> ``float8``; timezone-naive datetime64 (any resolution) -> ``datetime``;
    everything else (object/strings, tz-aware datetimes, categoricals, ...) -> ``varchar(16777216)``.
    """
    if pd.api.types.is_bool_dtype(dtype):
        return "boolean"
    if pd.api.types.is_integer_dtype(dtype):
        return "int"
    if pd.api.types.is_float_dtype(dtype):
        return "float8"
    if pd.api.types.is_datetime64_dtype(dtype):
        return "datetime"
    return "varchar(16777216)"


def table_creation(df, table_name, conn=conn, db=db, schema=schema):
    """Create (if missing) and then truncate a Snowflake table shaped like ``df``.

    Side effect: ``df``'s column names are upper-cased IN PLACE. This is intentional:
    the same frame is later passed to ``write_pandas``, and its column names must match
    the upper-case names Snowflake stores for the unquoted identifiers in this DDL.

    Because the DDL is ``CREATE TABLE IF NOT EXISTS``, an existing table is kept as-is
    (its columns are not altered to match ``df``); it is only emptied.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns and dtypes define the table; renamed in place (see above).
    table_name : str
        Unqualified table name; the table is addressed as ``{db}.{schema}.{table_name}``.
    conn, db, schema
        Connection and target location. NOTE: the defaults are the notebook globals
        captured when this cell was executed, not when the function is called.

    Raises
    ------
    ValueError
        If ``df`` has no columns (Snowflake cannot create a table with zero columns).
    """
    if len(df.columns) == 0:
        raise ValueError("cannot create a Snowflake table from a DataFrame with no columns")

    df.rename(columns=str.upper, inplace=True)

    # Use the same fully qualified name for CREATE and TRUNCATE so both statements always
    # hit the same table, whatever database/schema the session currently has selected.
    qualified_name = f"{db}.{schema}.{table_name}"

    # df.dtypes is iterated (rather than df[col]) so duplicate column names don't break lookup.
    column_defs = ",\n".join(
        f"{col} {_snowflake_column_type(dtype)}" for col, dtype in df.dtypes.items()
    )
    create_table_sql = f"CREATE TABLE IF NOT EXISTS {qualified_name}\n ({column_defs})"

    with conn.cursor() as cur:
        print(f"create_table_sql={create_table_sql}")
        cur.execute(create_table_sql)
        print(f"snowflake_table={table_name}")
        cur.execute(f"TRUNCATE TABLE IF EXISTS {qualified_name}")

In [14]:
# Create (if needed) and empty the test table; as a side effect test_df's columns become upper-case.
table_creation(test_df, table_name=table_testing)

create_table_sql=CREATE TABLE IF NOT EXISTS adtracking.adtracking_schema.adtracking_clicks_test
 (CLICK_ID int,
IP int,
APP int,
DEVICE int,
OS int,
CHANNEL int,
CLICK_TIME datetime)
snowflake_table=adtracking_clicks_test


In [17]:
# Create (if needed) and empty the training table; as a side effect train_df's columns become upper-case.
table_creation(train_df, table_name=table_training)

create_table_sql=CREATE TABLE IF NOT EXISTS adtracking.adtracking_schema.adtracking_clicks_train
 (IP int,
APP int,
DEVICE int,
OS int,
CHANNEL int,
CLICK_TIME datetime,
ATTRIBUTED_TIME varchar(16777216),
IS_ATTRIBUTED int)
snowflake_table=adtracking_clicks_train


In [19]:
# Bulk-load the training frame into its table. Names are upper-cased to match how Snowflake
# stored the unquoted identifiers used when the database, schema and table were created.
# TODO: test_df is never loaded, so the test table stays empty (see the commented-out loop below).
print(f"database={db}, schema={schema}, snowflake_table={table_training}")
load_target = {
    "database": db.upper(),
    "schema": schema.upper(),
    "table_name": table_training.upper(),
}
write_pandas(conn=conn, df=train_df, **load_target)

database=adtracking, schema=adtracking_schema, snowflake_table=adtracking_clicks_train


(True,
 1,
 184903890,
 [('lcfrstokax/file0.txt',
   'LOADED',
   184903890,
   184903890,
   1,
   0,
   None,
   None,
   None,
   None)])

In [20]:
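# Writing both tables with this loop raised pyarrow version-related errors, so only the
# training table is loaded above and the test table currently stays empty.
# Once the pyarrow issue is resolved, uncomment the loop below to load both tables.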

# for table_name, df in zip([table_training, table_testing], [train_df, test_df]):
#     print(f"database={db}, schema={schema}, snowflake_table={table_name}")
#     # Write the data from the DataFrame to the Snowflake table.
#     write_pandas(conn=conn,
#                 df=df,
#                 table_name=table_name.upper(),
#                 database=db.upper(),
#                 schema=schema.upper())

In [None]:
# Close the Snowflake connection opened at the start of the notebook.
conn.close()

In [25]:
# Clean up data/: delete every regular file except .zip archives, so only the source
# archive remains and the notebook can be re-run from the unzip step.
# Sub-directories (e.g. folders created by extractall) are skipped: os.remove cannot delete them.
# NOTE(review): this removes ALL non-.zip files in data/, not only those extracted from the archive.
for entry in os.listdir("data"):
    path = os.path.join("data", entry)
    if entry.endswith(".zip") or not os.path.isfile(path):
        continue
    os.remove(path)