# ETL Processes
- Imports the SQL queries and the required modules

In [None]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

## Connection to the PostgreSQL server

In [None]:
# Open a connection to the `sparkifydb` database on the local host and create
# the cursor that the following cells use to execute SQL statements.
# NOTE(review): the credentials are hard-coded in the DSN string.
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

## Function to return all JSON files under a given path

In [None]:
def get_files(filepath):
    """Return the absolute paths of all ``*.json`` files under *filepath*.

    The directory tree rooted at ``filepath`` is walked recursively and every
    entry matching ``*.json`` in each visited directory is collected.

    Args:
        filepath: Directory to search.

    Returns:
        A sorted list of absolute paths. The list is empty when nothing
        matches or when ``filepath`` does not exist.
    """
    all_files = []
    for root, _dirs, _files in os.walk(filepath):
        # Escape the directory part so that glob metacharacters in folder
        # names ("[", "*", "?") are matched literally instead of as patterns.
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))
    # Neither os.walk nor glob promises an order; sort so that indexing the
    # result (e.g. taking the first file) is reproducible.
    return sorted(all_files)

## First song file from the list of `song_data` JSON files

In [None]:
# Take the first JSON file found under data/song_data/.
# NOTE(review): despite the plural name this holds a single path, and the
# index raises IndexError when no file is found.
song_files = get_files('data/song_data/')[0]

In [None]:
# Show which song file was selected.
print(song_files)

## Read the JSON `song_data` file into a pandas DataFrame

In [None]:
# Load the selected song file; lines=True reads one JSON object per line.
df = pd.read_json(song_files,lines=True)

[[1, 2], [4, 5], [7, 8]]

## Extracting song ID, title, artist ID, year, and duration from the DataFrame

In [None]:
# Select the song columns and keep the values of the first record as a list.
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
first_song_row = df[song_columns].values[0]
song_data = list(first_song_row)

#### Insert song data into SONG_TABLE

In [None]:
# Run the song insert statement with the extracted values, then commit so the
# row is persisted.
cur.execute(song_table_insert, song_data)
conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

## Inserting data into `artist` table

In [None]:
# Select the artist columns and keep the values of the first record as a list.
artist_columns = [
    'artist_id',
    'artist_name',
    'artist_location',
    'artist_latitude',
    'artist_longitude',
]
first_artist_row = df[artist_columns].values[0]
artist_data = list(first_artist_row)

# Run the artist insert statement and commit the transaction.
cur.execute(artist_table_insert, artist_data)
conn.commit()

## Processing data from the `log_data` folder

In [None]:
# Collect every JSON file found under data/log_data.
log_files = get_files('data/log_data')

In [None]:
# Work with the first log file only.
# NOTE(review): raises IndexError when no log file was found.
filepath = log_files[0]

## Load log data from file into DataFrame

In [None]:
# Load the log file; lines=True reads one JSON object per line.
# This rebinds `df`, replacing the song DataFrame loaded earlier.
df = pd.read_json(filepath,lines=True)  

## Filter where page == 'NextSong'

In [None]:
# Keep only the log records whose page is 'NextSong'.
df = df.loc[df['page'] == 'NextSong']

## Populate `time` table

In [None]:
# Convert the millisecond epoch values in `ts` to pandas datetimes.
t = pd.to_datetime(df.ts, unit='ms')

# `Series.dt.weekofyear` was deprecated in pandas 1.1 and removed in 2.0.
# Use `isocalendar().week` when it exists and fall back to the old accessor
# on older pandas versions.
if hasattr(t.dt, 'isocalendar'):
    week_of_year = t.dt.isocalendar().week
else:
    week_of_year = t.dt.weekofyear

# One Series per time attribute, in the same order as `column_labels`.
time_data = [t, t.dt.hour, t.dt.day, week_of_year, t.dt.month, t.dt.year, t.dt.weekday]
column_labels = ['time', 'hour', 'day', 'weekofyear', 'month', 'year', 'weekday']

In [None]:
# Parse the millisecond epoch timestamps once and reuse the result for every
# derived column instead of converting `df.ts` again for each one.
timestamps = pd.to_datetime(df.ts, unit='ms')

# `Series.dt.weekofyear` was deprecated in pandas 1.1 and removed in 2.0.
# Use `isocalendar().week` when it exists and fall back to the old accessor
# on older pandas versions.
if hasattr(timestamps.dt, 'isocalendar'):
    week_of_year = timestamps.dt.isocalendar().week
else:
    week_of_year = timestamps.dt.weekofyear

# One row per log record; the column order is the order in which the values
# are later passed to the time insert statement.
time_df = pd.DataFrame({
    'time': timestamps,
    'hour': timestamps.dt.hour,
    'day': timestamps.dt.day,
    'weekofyear': week_of_year,
    'month': timestamps.dt.month,
    'year': timestamps.dt.year,
    'weekday': timestamps.dt.weekday,
})

## Insert Records into `Time` Table 

In [None]:
# Insert every row of time_df, committing after each statement.
for _, time_row in time_df.iterrows():
    time_values = list(time_row)
    cur.execute(time_table_insert, time_values)
    conn.commit()

## Populate `Users` table

In [None]:
# Keep only the columns that describe a user.
user_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
user_df = df[user_columns]

## Inserting Records into `Users` Table


In [None]:
# Insert every row of user_df, committing after each statement.
for i, row in user_df.iterrows():
    # Pass a plain list, as the time insert loop does. Handing the Series
    # itself to the driver relies on integer keys being treated as positions
    # on a string-labelled Series, which recent pandas versions deprecate.
    cur.execute(user_table_insert, list(row))
    conn.commit()

## Populating `songplays` Table

In [None]:
# Query that finds the song id and artist id for a given title, artist name
# and duration.
song_select = (""" SELECT songs.song_id, artists.artist_id  FROM songs join artists on songs.artist_id = artists.artist_id where songs.title = (%s) and artists.name = (%s) and songs.duration = (%s)""")

for _, log_row in df.iterrows():
    # Look up the ids that match this log record.
    lookup_params = (log_row.song, log_row.artist, log_row.length)
    cur.execute(song_select, lookup_params)
    found = cur.fetchone()

    # Without a match both ids are passed to the insert as None.
    songid, artistid = found if found else (None, None)

    # Build the songplay record and insert it, committing after each row.
    songplay_data = (
        pd.to_datetime(log_row.ts, unit='ms'),
        log_row.userId,
        log_row.level,
        songid,
        artistid,
        log_row.sessionId,
        log_row.location,
        log_row.userAgent,
    )
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

## Close Connection to `Sparkify` Database

In [28]:
# Close the database connection opened at the top of the notebook.
conn.close()