# Bulk upload of raw data to Snowflake using pandas

In this notebook we connect to Snowflake and upload the data from every file in a particular folder to a specified Snowflake table. If the table already exists, the data is appended to it; otherwise a new table is created and the data is added to it.

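First, run all the necessary import statements.
Before running them, install the Snowflake connector and the Snowflake SQLAlchemy dialect (the `snowflake-connector-python` and `snowflake-sqlalchemy` packages on PyPI).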

In [27]:
import pandas as pd
import snowflake.connector
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
import os
from pandas import DataFrame
import sys
import datetime

Below are the credentials used to connect to our instance of Snowflake.

In [28]:
# Credentials for connecting to snowflake. Migrate to properties file in the future.
USER = 'heinzlimetree'
ACCOUNT = 'um21928.us-east-1'
WAREHOUSE = 'COMPUTE_WH'  # underscore, matching the USE warehouse statement further down
DATABASE = 'TEST'
SCHEMA = 'PUBLIC'
PASSWORD = 'Limetree123'
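
The comment in the cell above mentions moving the credentials out of the notebook. One possible way to do that, shown here only as a minimal sketch, is to read them from environment variables. The `SNOWFLAKE_` prefix, the variable names and the `load_credentials` helper are assumptions made for this illustration; they are not part of the original notebook.

```python
import os

# Hypothetical setting names; each one is looked up as SNOWFLAKE_<NAME>.
REQUIRED_KEYS = ("USER", "ACCOUNT", "WAREHOUSE", "DATABASE", "SCHEMA", "PASSWORD")


def load_credentials(env=os.environ, prefix="SNOWFLAKE_"):
    """Return connection settings read from prefixed environment variables."""
    missing = [key for key in REQUIRED_KEYS if prefix + key not in env]
    if missing:
        # Fail early with a message that names every variable that is not set.
        raise KeyError("Missing environment variables: "
                       + ", ".join(prefix + key for key in missing))
    # Lower-case keys so the dict lines up with keyword arguments such as user=...
    return {key.lower(): env[prefix + key] for key in REQUIRED_KEYS}
```

If these variables are set, the resulting dictionary could be unpacked into the `URL(...)` call used later in this notebook, for example `URL(**load_credentials())`, since its keys match the keyword arguments used there.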

## Bulk upload to a table from a folder

The code below lists the specified directory and walks through each folder in it. Each folder is loaded as a separate table in Snowflake, named after the folder, and every file in that folder is uploaded unless it has already been loaded. The data in each file is read into a dataframe. We create a connection to Snowflake using SQLAlchemy. The dataframe is then appended to the target table if the table already exists in Snowflake; otherwise a new table is created.

Every file that errors out is recorded in the logs.txt file.
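
The folder traversal and the "skip files that are already loaded" check can be separated from the upload itself. The sketch below is one way to express that step; `pending_files` is a hypothetical helper name, and it assumes the layout described above (one sub-folder per table, with data files directly inside it).

```python
import os


def pending_files(root_dir, already_loaded):
    """Yield (table_name, file_path) for each file not yet recorded as loaded."""
    loaded = set(already_loaded)  # set membership checks are cheap
    for folder in sorted(os.listdir(root_dir)):
        folder_path = os.path.join(root_dir, folder)
        if not os.path.isdir(folder_path):
            continue  # ignore stray files sitting next to the table folders
        for file_name in sorted(os.listdir(folder_path)):
            if file_name not in loaded:
                # The folder name doubles as the target table name.
                yield folder, os.path.join(folder_path, file_name)
```

In this notebook, the second argument would be the `file_name` column of the `loaded_files` table.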

In [29]:
engine = create_engine(URL(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    database=DATABASE,
    schema=SCHEMA,
    warehouse=WAREHOUSE
))
connection = engine.connect()

In [None]:
connection.execute('USE Schema TEST.public')
connection.execute('USE warehouse COMPUTE_WH')

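### WARNING: Run the next block of code only the first time you run this notebook.

`CREATE OR REPLACE` drops any existing `loaded_files` table, so rerunning the block erases the record of which files have already been loaded.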

In [30]:
connection.execute(''' CREATE or REPLACE TABLE 
  loaded_files
    ( FILE_NAME varchar,
    TABLE_NAME varchar)''')

<sqlalchemy.engine.result.ResultProxy at 0x263e4a91128>
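
If the cell should be safe to rerun, one option is `CREATE TABLE IF NOT EXISTS`, which leaves an existing table and its rows untouched. The sketch below is only an illustration of that behaviour: it uses Python's built-in `sqlite3` as a local stand-in so it runs without a Snowflake account, and `ensure_loaded_files_table` is a hypothetical helper name.

```python
import sqlite3

# Same columns as the loaded_files table above, but created only if it is absent.
CREATE_LOADED_FILES = """CREATE TABLE IF NOT EXISTS loaded_files
    (FILE_NAME varchar, TABLE_NAME varchar)"""


def ensure_loaded_files_table(conn):
    """Create the tracking table if it is missing; existing rows are left alone."""
    conn.execute(CREATE_LOADED_FILES)


# sqlite3 is used here purely as a stand-in connection for the demonstration.
demo = sqlite3.connect(":memory:")
ensure_loaded_files_table(demo)
demo.execute("INSERT INTO loaded_files VALUES ('a.csv', 'some_table')")
ensure_loaded_files_table(demo)  # the second call does not wipe the row
row_count = demo.execute("SELECT COUNT(*) FROM loaded_files").fetchone()[0]
print(row_count)  # prints 1
```

Against Snowflake, the same statement could be passed to `connection.execute(...)` in place of the `CREATE or REPLACE` statement.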

In [31]:
loaded_files_df = pd.read_sql_query("SELECT * FROM LOADED_FILES", engine)
folders_directory = 'D:\\Coursework\\Capstone\\test'  # Change folder name
folders = os.listdir(folders_directory)
print(folders)

['additional_noninterest_income', 'dep_basedon_100krepthreshold', 'small_business_loans', 'unused_commitments_sec']


In [None]:
with open("logs.txt", "a") as myfile:
    myfile.write(str(datetime.datetime.now()) + "\n")
    loaded_file_names = set(loaded_files_df.file_name)  # files recorded as loaded in earlier runs
    for folder in folders:
        folder_path = os.path.join(folders_directory, folder)
        for file in os.listdir(folder_path):
            print('file:', file)
            if file in loaded_file_names:
                continue  # already uploaded, skip it
            try:
                data = pd.read_csv(os.path.join(folder_path, file), encoding='iso-8859-1')
                # Make sure index is False; Snowflake doesn't accept indexes
                data.to_sql(folder, con=connection, index=False, chunksize=200, if_exists='append')
                # Record the file only after its data has been uploaded
                current_file_df = DataFrame({'table_name': [folder], 'file_name': [file]},
                                            columns=['table_name', 'file_name'])
                current_file_df.to_sql('loaded_files', con=connection, index=False, chunksize=200, if_exists='append')
            except Exception:
                # Log the failing file and the exception, then move on to the next file
                myfile.write(file + "\n")
                myfile.write(str(sys.exc_info()) + "\n")
                print("Oops!", sys.exc_info(), "occurred.")
                print("Next entry.")
                print()
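
The error handling in the loop could also be written with the standard `logging` module, which adds timestamps and the full traceback to each entry in the log file. This is a sketch rather than the notebook's own approach; `make_file_logger`, `try_load` and the logger name `bulk_upload` are hypothetical names chosen for the example.

```python
import logging


def make_file_logger(log_path):
    """Return a logger that appends timestamped records to log_path."""
    logger = logging.getLogger("bulk_upload")
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers when the cell is re-run
        handler = logging.FileHandler(log_path)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


def try_load(file_name, load_func, logger):
    """Call load_func(file_name); log the traceback and return False on failure."""
    try:
        load_func(file_name)
        return True
    except Exception:
        # logger.exception records the message plus the current traceback.
        logger.exception("Failed to load %s", file_name)
        return False
```

Here `load_func` would wrap the `read_csv` and `to_sql` calls for a single file, so that one failing file does not stop the rest of the run.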

In [33]:
connection.close()
engine.dispose()
print('done')

done


## Bulk upload from a file to a table

This block of code is very similar to the one above. The only difference is that it uploads a single file from the specified path to the specified table.

In [None]:
folder_path = 'D:\\Coursework\\Capstone'
file_name = 'your_file.csv'  # placeholder: change to the name of the CSV file to upload
data = pd.read_csv(os.path.join(folder_path, file_name))
engine = create_engine(URL(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    database=DATABASE,
    schema=SCHEMA,
    warehouse='compute_wh'
))

connection = engine.connect()
# Change table name
data.to_sql('current_reporting_institutions', con=connection, index=False, chunksize=200, if_exists='append')  # make sure index is False; Snowflake doesn't accept indexes

connection.close()
engine.dispose()