# ETL Processes

In [1]:
import os
import glob
import psycopg2
import pandas as pd
import numpy as np
import textwrap as tw
from sql_queries import *

In [2]:
# Connection settings default to the local sandbox database used by this
# notebook, but each one can be overridden through an environment variable so
# that real credentials never have to be written into the notebook itself.
conn = psycopg2.connect(
    host=os.environ.get("SPARKIFY_DB_HOST", "127.0.0.1"),
    dbname=os.environ.get("SPARKIFY_DB_NAME", "sparkifydb"),
    user=os.environ.get("SPARKIFY_DB_USER", "student"),
    password=os.environ.get("SPARKIFY_DB_PASSWORD", "student"),
)
# One cursor is shared by every insert/select cell below.
cur = conn.cursor()

In [3]:
def get_files(filepath):
    '''Collect the absolute paths of all ``*.json`` files under ``filepath``.

    The directory tree rooted at ``filepath`` is walked recursively; in each
    visited directory the files matching ``*.json`` are located with ``glob``.

    Parameters
    ----------
    filepath : str
        Root directory to search.

    Returns
    -------
    list of str
        Absolute paths, in the order the directories are visited. Empty when
        nothing matches.
    '''
    json_paths = []
    for directory, _subdirs, _names in os.walk(filepath):
        pattern = os.path.join(directory, '*.json')
        json_paths.extend(os.path.abspath(match) for match in glob.glob(pattern))

    return json_paths

# Process `song_data`


In [4]:
# The data directory is resolved relative to the current working directory,
# so the result depends on where the kernel was started.
base_path = os.getcwd()
filepath = os.path.join(base_path, 'data', 'song_data')
# Absolute paths of every *.json file found below data/song_data.
song_files = get_files(filepath)

## #1: `songs` Table
#### Extract Data for Songs Table

In [5]:
# Helper functions for reading the JSON files and inserting rows (defined in the next cell)

In [6]:

def read_song_jsons(columns_list, files_list, debug = False):
    '''Read line-delimited JSON files into one DataFrame limited to ``columns_list``.

    Every file is parsed with ``pd.read_json(..., lines=True)`` and each record
    contributes one output row holding the requested columns, in the order
    given by ``columns_list``.

    Parameters
    ----------
    columns_list : list of str
        Names of the JSON fields to keep; also the column order of the result.
    files_list : iterable of str
        Paths of the JSON-lines files to read. Any iterable works (list, tuple,
        generator); it is consumed once, in order.
    debug : bool, optional
        When True, print the Python type of the first selected value of every
        record.

    Returns
    -------
    pandas.DataFrame
        One row per JSON record with columns ``columns_list``. If no records
        are read, the frame is empty but still has those columns.

    Raises
    ------
    KeyError
        Raised by pandas when a requested column is missing from a file.
    '''
    data_list = []
    # Iterate over the paths directly instead of indexing with a manual
    # counter, so non-subscriptable iterables are accepted as well.
    for a_file in files_list:
        json_df = pd.read_json(a_file, lines=True, convert_axes=False, dtype=False)
        for _, row in json_df.iterrows():
            data = row[columns_list].values.tolist()
            # Guard on `data` so an empty column selection cannot raise here.
            if debug and data:
                print(type(data[0]))
            data_list.append(data)
    return pd.DataFrame(data_list, columns=columns_list)

def trim_str(string_to_trim, length):
    '''Truncate a string to at most ``length`` characters.

    Parameters
    ----------
    string_to_trim : object
        Value to trim. Anything that is not a ``str`` is returned unchanged.
    length : int or None
        Maximum number of characters to keep. ``None`` means "no limit", which
        is the placeholder callers use for columns that need no trimming.

    Returns
    -------
    object
        The (possibly shortened) string, or the original value when it is not
        a string or no limit applies.
    '''
    # Non-strings are passed through untouched; a None limit disables trimming
    # instead of failing on the `len(...) > None` comparison.
    if not isinstance(string_to_trim, str) or length is None:
        return string_to_trim
    if len(string_to_trim) > length:
        return string_to_trim[:length]
    return string_to_trim
        

def fill_table(dataframe, cursor, connection, q, col_max_length, debug = False):
    '''Insert every row of ``dataframe`` by executing ``q`` once per row.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Rows to insert. Values are passed to the query positionally, in column
        order.
    cursor : database cursor
        Cursor used to execute ``q``.
    connection : database connection
        Connection that is committed after each executed row.
    q : str
        Parameterised SQL insert statement.
    col_max_length : sequence
        One entry per column of ``dataframe``: the maximum length applied to
        string values of that column via ``trim_str``. Entries for columns
        holding non-string values are ignored.
    debug : bool, optional
        When True, print each row before it is inserted.

    Raises
    ------
    IndexError
        If ``col_max_length`` has fewer entries than ``dataframe`` has columns.
    '''
    for _, row in dataframe.iterrows():
        if debug:
            print((row))
        # row.tolist() yields plain Python scalars. They are passed to the
        # driver as a list; re-wrapping them in a pandas Series would re-infer
        # a numpy dtype (e.g. int64 for an all-integer row) and hand the
        # driver numpy scalars it may not be able to adapt.
        values = [
            trim_str(value, col_max_length[position])
            for position, value in enumerate(row.tolist())
        ]
        cursor.execute(q, values)
        # Commit per row, so rows inserted before a failure stay persisted.
        connection.commit()
    

In [7]:
# Load the five song attributes from every song file. The column order given
# here is the order in which fill_table later passes values to the insert query.
song_data_df = read_song_jsons(['song_id', 'title', 'duration',  'year','artist_id'], song_files)
# Preview the first rows.
song_data_df.head(5)

Unnamed: 0,song_id,title,duration,year,artist_id
0,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179,0,ARD7TVE1187B99BFB1
1,SOUDSGM12AC9618304,Insatiable (Instrumental Version),266.39628,0,ARNTLGG11E2835DDB9
2,SOIAZJW12AB01853F1,Pink World,269.81832,1984,AR8ZCNI1187B9A069B
3,SOHKNRJ12A6701D1F8,Drop of Rain,189.57016,0,AR10USD1187B99F3F1
4,SOCIWDW12A8C13D406,Soul Deep,148.03546,1969,ARMJAGH1187FB546F3


#### Insert Records into Songs Table

In [8]:
# Truncation limits, positionally aligned with song_data_df's columns
# (song_id, title, duration, year, artist_id); None is used for the numeric
# duration/year columns.
# NOTE(review): the limits presumably mirror the column sizes declared in
# sql_queries — confirm.
max_length = [22, 128, None,  None, 22]
fill_table(song_data_df, cur, conn, song_table_insert, max_length)

## #2: `artists` Table
#### Extract Data for Artists Table

In [9]:
# Artist attributes come from the same song files; only one row is kept per
# artist_id because an artist appears once for every song they have.
artist_data_df = read_song_jsons(
    ['artist_id', 'artist_name', 'artist_latitude', 'artist_longitude', 'artist_location'],
    song_files,
).drop_duplicates(subset='artist_id')
# Preview, ordered by artist_id.
artist_data_df.sort_values(by=['artist_id']).head(5)

Unnamed: 0,artist_id,artist_name,artist_latitude,artist_longitude,artist_location
64,AR051KA1187B98B2FF,Wilks,,,
63,AR0IAWL1187B9A96D0,Danilo Perez,8.4177,-80.11278,Panama
26,AR0RCMP1187FB3F427,Billie Jo Spears,30.08615,-94.10158,"Beaumont, TX"
3,AR10USD1187B99F3F1,Tweeterfriendly Music,,,"Burlington, Ontario, Canada"
32,AR1Y2PT1187FB5B9CE,John Wesley,27.94017,-82.32547,Brandon


#### Insert Records into Artists Table

In [10]:
# Truncation limits, positionally aligned with artist_data_df's columns
# (artist_id, artist_name, artist_latitude, artist_longitude, artist_location);
# None is used for the latitude/longitude columns.
# NOTE(review): the limits presumably mirror the column sizes declared in
# sql_queries — confirm.
max_length = [22, 128, None,  None, 128]
fill_table(artist_data_df, cur, conn, artist_table_insert, max_length)

# Process `log_data`

In [11]:
# Locate every log file below <base_path>/data/log_data.
filepath2 = os.path.join(base_path, 'data', 'log_data')
log_files = get_files(filepath2)

# Fields read from each log record, in the column order of the resulting frame.
columns_log = [
    'artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
    'length', 'level', 'location', 'method', 'page', 'registration',
    'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId',
]

In [12]:
# Read every log record, then keep only the rows whose `page` is 'NextSong'.
# All later tables are built from this filtered frame.
df = read_song_jsons(columns_log, log_files, debug=False)
log_df = df[df['page'] == 'NextSong']

## #3: `time` Table
#### Extract Data for Time Table

In [13]:
# Work on a copy so the columns added in the following cells do not touch log_df.
time_df = log_df.copy()

In [14]:
# `ts` is interpreted as milliseconds since the Unix epoch.
time_df['timestamp'] = pd.to_datetime(time_df['ts'] , unit='ms')

In [15]:
# Pack the six calendar fields of each timestamp into one list per row, in the
# order hour, day, week of year, month, year, day of week. The following cell
# splits these lists into separate columns.
time_df['new_col'] = time_df['timestamp'].apply(lambda row: [row.hour, row.day, row.weekofyear, row.month, row.year, row.dayofweek])

In [16]:
# Expand the per-row lists stored in `new_col` into six named columns. The
# helper frame reuses time_df's index so the values stay aligned with their rows.
time_df[['hour', 'day', 'weekofyear', 'month', 'year', 'dayofweek']]= pd.DataFrame(time_df['new_col'].tolist(), index = time_df.index)

In [17]:
# Columns sent to the time table insert, in the positional order the insert
# query receives them.
columns_time =  ['sessionId', 'itemInSession', 'timestamp', 'hour', 'day', 'weekofyear', 'month', 'year', 'dayofweek']
time_df_for_insert = time_df[columns_time]

#### Insert Records into Time Table

In [18]:
# One entry per column in columns_time, all None: no truncation limit is
# supplied for any time-table column.
# NOTE(review): this presumably relies on none of these values being strings — confirm.
max_length = [None, None, None,  None, None, None, None, None, None]
fill_table(time_df_for_insert, cur, conn, time_table_insert, max_length)

## #4: `users` Table
#### Extract Data for Users Table

In [19]:
# Log fields that make up a user record, in the positional order the insert
# query receives them.
user_columns = ['userId', 'lastName', 'gender', 'firstName', 'level']

In [20]:
# One row per userId; drop_duplicates keeps the first occurrence it encounters.
# NOTE(review): if a user's `level` changes later in the logs, that later value
# is discarded here — confirm this is intended.
user_df = log_df[user_columns].drop_duplicates(subset='userId').copy()

#### Insert Records into Users Table

In [21]:
# Truncation limits, positionally aligned with user_columns
# (userId, lastName, gender, firstName, level).
# NOTE(review): the limits presumably mirror the column sizes declared in
# sql_queries — confirm.
max_length = [11, 64, 1,  64, 4]
fill_table(user_df, cur, conn, user_table_insert, max_length)

## #5: `songplays` Table
#### Extract Data for Songplays Table
#### Insert Records into Songplays Table

In [22]:
# Log fields needed per songplay: `song` and `length` feed the song lookup
# query, the remaining fields feed the songplay insert.
play_columns = ['song', 'artist', 'length', 'sessionId', 'location', 'itemInSession', 'userAgent', 'userId', 'level','ts']

In [23]:
# Independent copy of the NextSong events, restricted to the songplay fields.
plays_df = log_df[play_columns].copy()

In [24]:
def convert_to_datetime(ms):
    '''Convert a millisecond Unix-epoch value into a pandas timestamp.

    Parameters
    ----------
    ms : number
        Milliseconds elapsed since 1970-01-01 00:00:00.

    Returns
    -------
    pandas.Timestamp
        The corresponding point in time.
    '''
    # Convert through a one-element Series and take its single value back out.
    as_series = pd.Series([ms], name='ms')
    return pd.to_datetime(as_series, unit='ms').iloc[0]

In [25]:
# For every NextSong event: look up the matching song/artist ids with
# `song_select`, then insert one songplay row and commit it.
for _, event in plays_df.iterrows():
    # The length is round-tripped through its string form so that a plain
    # Python float is handed to the query.
    cur.execute(song_select, (event.song, float(str(event.length))))
    match = cur.fetchone()
    # No match for this song: store NULL ids.
    songid, artistid = match if match else (None, None)

    fields = event.to_dict()
    # String values are trimmed to the limits given per field.
    songplay_data = (
        trim_str(fields.get('userId'), 11),
        trim_str(songid, 22),
        trim_str(artistid, 22),
        fields.get('sessionId'),
        trim_str(fields.get('location'), 128),
        trim_str(fields.get('userAgent'), 192),
        trim_str(fields.get('level'), 4),
        convert_to_datetime(fields.get('ts')),
    )
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

## Close Connection to Sparkify Database

In [26]:
# Release the database connection; `cur` must not be used after this point.
conn.close()