In [1]:
import os
from pathlib import Path
from tempfile import TemporaryDirectory
os.chdir("/home/jovyan/work")

import boto3
import pandas as pd
import sqlalchemy

In [2]:
# get S3 Connection
s3 = boto3.resource("s3", 
                    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
                    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"]
                   )

# get postgres connection
engine = sqlalchemy.create_engine("postgresql://postgres@postgres_container:5432")

In [6]:
query = sqlalchemy.text('SELECT * FROM pg_catalog.pg_tables WHERE tablename=:table')

with TemporaryDirectory() as tdir:
    
    for obj in s3.Bucket("jgoerner-kaggle").objects.all():
        # skip tables with leading underscore
        name=obj.key
        if name.startswith("_"):
            continue
            
        # start log
        print("#"*(len(name)+15))
        print("# Processing {} #".format(name))
        print("#"*(len(name)+15))
        
        # skip if table already exists
        t_name = "t_original_{}".format(name.replace(".csv", ""))
        tbl_already_exists = pd.read_sql(query, con=engine, params={'table': t_name}).shape[0]
        if tbl_already_exists:
            print("Table '{}' already exists".format(name))
            print("Skipping\n")
            continue
            
        # download file to temporary dir
        path = Path(tdir, name)
        print("Downloading")
        s3.Bucket("jgoerner-kaggle").download_file(name, str(path))
        print("Ingestion")
        df = pd.read_csv(path)
        
        # serialize dataframe to database
        try:
            df.to_sql(t_name, con=engine, if_exists="fail")
        except ValueError as e:
            print(e, end="\n\n")

#########################
# Processing Cities.csv #
#########################
Table 'Cities.csv' already exists
Skipping

#########################################
# Processing ConferenceTourneyGames.csv #
#########################################
Table 'ConferenceTourneyGames.csv' already exists
Skipping

##############################
# Processing Conferences.csv #
##############################
Table 'Conferences.csv' already exists
Skipping

#############################
# Processing GameCities.csv #
#############################
Table 'GameCities.csv' already exists
Skipping

############################################
# Processing NCAATourneyCompactResults.csv #
############################################
Table 'NCAATourneyCompactResults.csv' already exists
Skipping

#############################################
# Processing NCAATourneyDetailedResults.csv #
#############################################
Table 'NCAATourneyDetailedResults.csv' already exists
Skipping

############