# Test manual ETL

In [19]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [20]:
host = os.environ['PGHOSTADDR'] 
password = os.environ['PGPASSWORD'] 

In [21]:
conn = psycopg2.connect(host=host, port="5432", dbname="sparkifydb",  user="postgres",  password=password)
cur = conn.cursor()
conn.autocommit = True

In [22]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

## Load Log Staging

In [23]:
all_files = get_files("data/log_data")

dfs = []
for i, datafile in enumerate(all_files, 1):
    df = pd.read_json(datafile, lines=True)
    dfs.append(df)
    
log_data = pd.concat(dfs, ignore_index=True) 

log_data = log_data[(log_data['page']=='NextSong')].astype({'ts': 'datetime64[ms]'})

log_data.to_csv("log_file.csv", sep = '|',index=False, header=False)

In [24]:
with open('log_file.csv',encoding='utf-8') as f:
    cur.copy_from(f, 'log_staging', sep = '|', null='')

## Load Time dim

In [25]:
t = pd.to_datetime(log_data['ts'])

In [26]:
time_data = list((t, t.dt.hour, t.dt.day, t.dt.isocalendar().week, t.dt.month, t.dt.year, t.dt.weekday))
column_labels = list(('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday'))

In [27]:
time_df =  pd.DataFrame.from_dict(dict(zip(column_labels, time_data)))

In [28]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    #conn.commit()

## Load User dim


In [29]:
#print(user_table_insert)
cur.execute(user_table_insert)

## Load Song Staging

In [30]:
all_files = get_files("data/song_data")

dfs = []
for i, datafile in enumerate(all_files, 1):
    df = pd.read_json(datafile, lines=True)
    dfs.append(df)

song_data = pd.concat(dfs, ignore_index=True)

song_data.to_csv("song_file.csv", sep = '|',index=False, header=False)

In [31]:
with open('song_file.csv',encoding='utf-8') as f:
    cur.copy_from(f, 'song_staging', sep = '|', null='')

## Load Song dim

In [32]:
cur.execute(song_table_insert)

## Load Artist Dim

In [33]:
cur.execute(artist_table_insert)

## Load fact table

In [34]:
cur.execute(songplay_table_insert)

## Close Connection to Sparkify Database

In [35]:
conn.close()