# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the full datasets.

In [1]:
import os
import glob
import psycopg2
import pandas as pd
import numpy as np
from sql_queries import *
from pandas import json_normalize  # top-level import; the pandas.io.json path is deprecated
from copy import deepcopy

In [48]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=johnrick")
cur = conn.cursor()

In [3]:
# conn.close()

In [4]:
# set autocommit to true
conn.set_session(autocommit=True)

In [5]:
# function to get files
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

# Process `song_data`
In this first part, you'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.
- Use the `get_files` function provided above to get a list of all song JSON files in `data/song_data`
- Select the first song in this list
- Read the song file and view the data
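
The three steps above might look like the following minimal sketch. It writes one throwaway record into a temporary directory so that it runs on its own. The `list_json_files` helper, the directory layout, and the trimmed-down record are illustrative assumptions, not part of the project files.

```python
import glob
import json
import os
import tempfile

import pandas as pd


def list_json_files(filepath):
    """Hypothetical stand-in for get_files: absolute paths of all *.json files under filepath."""
    pattern = os.path.join(filepath, "**", "*.json")
    return sorted(os.path.abspath(f) for f in glob.glob(pattern, recursive=True))


# Build a tiny sample dataset (assumed layout; values borrowed from the preview in this notebook).
sample_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(sample_dir, "A", "B"))
record = {"song_id": "SOIAZJW12AB01853F1", "title": "Pink World", "year": 1984}
with open(os.path.join(sample_dir, "A", "B", "track.json"), "w") as fh:
    fh.write(json.dumps(record) + "\n")

# List the files, pick the first one, and read it (one JSON object per line).
song_files_demo = list_json_files(sample_dir)
first_song_df = pd.read_json(song_files_demo[0], lines=True)
print(first_song_df)
```

Reading a single file first keeps the feedback loop short; the same call can then be repeated over the full list.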

In [6]:
#uses the get_files function to get a list of the song files
song_files = get_files('data/song_data')

In [7]:
# read every song JSON file and concatenate the results into one dataframe
# (DataFrame.append was removed in pandas 2.0; pd.concat also avoids copying on every iteration)
df = pd.concat([pd.read_json(f, lines=True) for f in song_files], ignore_index=True)

In [8]:
df.head(2)

| | num_songs | artist_id | artist_latitude | artist_longitude | artist_location | artist_name | song_id | title | duration | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | AR7G5I41187FB4CE6C | | | London, England | Adam Ant | SONHOTT12A8C13493C | Something Girls | 233.40363 | 1982 |
| 1 | 1 | AR8ZCNI1187B9A069B | | | | Planet P Project | SOIAZJW12AB01853F1 | Pink World | 269.81832 | 1984 |


## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `song_data`
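
As a minimal sketch of the extraction steps above, the snippet below builds a one-row frame by hand (values copied from the preview earlier in this notebook) and converts the selected columns to a plain list. The names `demo_df` and `demo_song_data` are placeholders introduced for illustration.

```python
import pandas as pd

# One-row frame with the same column names as the song data (hand-built for illustration).
demo_df = pd.DataFrame({
    "song_id": ["SOIAZJW12AB01853F1"],
    "title": ["Pink World"],
    "artist_id": ["AR8ZCNI1187B9A069B"],
    "year": [1984],
    "duration": [269.81832],
})

song_cols = ["song_id", "title", "artist_id", "year", "duration"]

# .values gives a NumPy array of rows; [0] picks the first row; .tolist() makes a plain list.
demo_song_data = demo_df[song_cols].values[0].tolist()

print(demo_song_data)
print([type(v).__name__ for v in demo_song_data])
```

Passing plain Python values matters because the database driver has to adapt each parameter, and NumPy scalar types are not always handled the same way as built-in `int` and `float`.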

In [9]:
# values[1].tolist() (rather than iloc[1]) gives plain Python int and float values
df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[1].tolist()

['SOIAZJW12AB01853F1', 'Pink World', 'AR8ZCNI1187B9A069B', 1984, 269.81832]

In [10]:
# take a single song (the record at index 1) and convert it with tolist()
song_data = df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[1].tolist()

In [11]:
# extract just the relevant columns
song_data_df = df[['song_id', 'title', 'artist_id', 'year', 'duration']]

In [12]:
## check what the data types are
# for i in song_data:
#     print(type(i))

#### Insert Record into Song Table
Implement the `song_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song into the `songs` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songs` table in the sparkify database.

In [13]:
# #insert 1 row
# cur.execute(song_table_insert, song_data)
# conn.commit()

In [14]:
# for loop to insert all rows
for i, row in song_data_df.iterrows():
    cur.execute(song_table_insert, row)
    conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `artist_data`

In [15]:
# save artist data
artist_data = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']].values[0].tolist()
artist_data

['AR7G5I41187FB4CE6C', 'Adam Ant', 'London, England', nan, nan]
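
The record above carries `nan` for latitude and longitude. Whether those should be stored as floating-point NaN or as SQL `NULL` is a design choice this notebook does not settle. If `NULL` is preferred, one possible approach is to swap NaN for `None` before the insert, since psycopg2 adapts `None` to `NULL`. The helper name `nan_to_none` is hypothetical.

```python
import math


def nan_to_none(record):
    """Return a copy of record with float NaN values replaced by None.

    Hypothetical helper for illustration; it is not part of the project files.
    """
    return [
        None if isinstance(value, float) and math.isnan(value) else value
        for value in record
    ]


# Same values as the artist record shown above.
artist_record = ["AR7G5I41187FB4CE6C", "Adam Ant", "London, England", float("nan"), float("nan")]
print(nan_to_none(artist_record))
```

The cleaned list could then be passed to `cur.execute(artist_table_insert, ...)` in place of the raw record.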

#### Insert Record into Artist Table
Implement the `artist_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song's artist into the `artists` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `artists` table in the sparkify database.

In [16]:
# # testing insert on 1 artist
# cur.execute(artist_table_insert, artist_data)
# conn.commit()

In [17]:
# save artists to a dataframe
artist_data_df = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]

In [18]:
# for loop to insert artist table 
for i, row in artist_data_df.iterrows():
    cur.execute(artist_table_insert, row)
    conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_files` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [19]:
#use the get_files function to get a list of the json files
log_files = get_files('data/log_data')

In [20]:
# example: read a single log file into a dataframe
song_play = pd.read_json(log_files[0], lines=True)

In [21]:
# confirm that the columns are the same for each log file
pd.read_json(log_files[0], lines=True).columns == pd.read_json(log_files[2], lines=True).columns

array([ True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True])

In [22]:
#put everything into one dataframe
# (DataFrame.append was removed in pandas 2.0; pd.concat keeps each file's own row index, as append did)
song_play = pd.concat([pd.read_json(f, lines=True) for f in log_files])
    

In [23]:
# resetting the index because each log file contributes the same index values
song_play = song_play.reset_index()

In [24]:
song_play.head(2)

| | index | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userAgent | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | Frumpies | Logged In | Anabelle | F | 0 | Simpson | 134.47791 | free | Philadelphia-Camden-Wilmington, PA-NJ-DE-MD | PUT | NextSong | 1541044000000.0 | 455 | Fuck Kitty | 200 | 1541903636796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 69 |
| 1 | 1 | Kenny G with Peabo Bryson | Logged In | Anabelle | F | 1 | Simpson | 264.75057 | free | Philadelphia-Camden-Wilmington, PA-NJ-DE-MD | PUT | NextSong | 1541044000000.0 | 455 | By The Time This Night Is Over | 200 | 1541903770796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 69 |


## #3: `time` Table
#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to easily access datetime-like properties.
- Specify labels for these columns and set to `column_labels`
- Create a dataframe, `time_df`, containing the time data for this file by combining `column_labels` and `time_data` into a dictionary and converting this into a dataframe
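
The steps above can be sketched as follows, using the two millisecond timestamps visible in the log preview. The label names in `column_labels` (for example `start_time` and `week`) are assumptions; they would need to match whatever columns the `time` table actually defines.

```python
import pandas as pd

# Two millisecond timestamps taken from the log preview in this notebook.
ts_ms = pd.Series([1541903636796, 1541903770796])
t = pd.to_datetime(ts_ms, unit="ms")

# One Series per output column, in the same order as the labels below.
time_data = [
    t,
    t.dt.hour,
    t.dt.day,
    t.dt.isocalendar().week.astype(int),  # ISO week number
    t.dt.month,
    t.dt.year,
    t.dt.weekday,  # Monday=0 ... Sunday=6
]
column_labels = ["start_time", "hour", "day", "week", "month", "year", "weekday"]

# Pair each label with its Series and build the dataframe from the resulting dictionary.
time_df_demo = pd.DataFrame(dict(zip(column_labels, time_data)))
print(time_df_demo)
```

Building the frame from a dictionary keeps the label-to-column pairing explicit, which makes an ordering mistake easier to spot than assigning columns one at a time.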

In [25]:
# Filter records by NextSong action in the page column
df3 = deepcopy(song_play[song_play['page'] =='NextSong'])

In [26]:
df3= df3.reset_index()

In [27]:
# drop "index" column
df3.drop(columns = ['index'], inplace=True)

In [28]:
#convert ts column to date time
df3.ts = pd.to_datetime(df3.ts, unit='ms')

In [29]:
# different .dt functions 
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.dayofweek.html
    

In [30]:
#create an empty dataframe
time_df = pd.DataFrame()

In [31]:
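#save all the relevant columns into a list
# (Series.dt.weekofyear was removed in pandas 2.0; isocalendar().week is its replacement)
cols = [df3.ts.dt.time, df3.ts.dt.hour, df3.ts.dt.day, df3.ts.dt.isocalendar().week.astype(int), df3.ts.dt.month, df3.ts.dt.year, df3.ts.dt.weekday]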

In [32]:
#name of columns, must be the same as the order of list of time_data
df_time_col_names = ['time', 'hour', 'day', 'weekofyear', 'month', 'year', 'weekday']

In [33]:
# create dataframe with each column saved
for i in range(len(df_time_col_names)):
    time_df[df_time_col_names[i]] = cols[i]

In [34]:
time_df.head(2)

| | time | hour | day | weekofyear | month | year | weekday |
|---|---|---|---|---|---|---|---|
| 0 | 02:33:56.796000 | 2 | 11 | 45 | 11 | 2018 | 6 |
| 1 | 02:36:10.796000 | 2 | 11 | 45 | 11 | 2018 | 6 |


In [35]:
# # easier way to create dataframe from transformations (uses transpose)
# # [time_df.ts.dt.time, time_df.ts.dt.hour,time_df.ts.dt.day, time_df.ts.dt.weekofyear,time_df.ts.dt.month,time_df.ts.dt.year, time_df.ts.dt.weekday]

# time_df_way2 = pd.DataFrame([df3.ts.dt.time, df3.ts.dt.hour,df3.ts.dt.day, df3.ts.dt.weekofyear,df3.ts.dt.month,df3.ts.dt.year, df3.ts.dt.weekday]).T
# time_df_way2.columns

# # rename columns
# time_df_way2.columns = df_time_col_names

# time_df_way2.head()

#### Insert Records into Time Table
Implement the `time_table_insert` query in `sql_queries.py` and run the cell below to insert records for the timestamps in this log file into the `time` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `time` table in the sparkify database.

In [36]:
# for loop to insert rows to time table
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`
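
Because the log holds one row per event, the same user appears many times. One possible refinement, sketched below under the assumption that the most recent `level` is the one worth keeping, is to deduplicate on `userId` before inserting. The first two sample rows reuse values from the log preview; the third user is made up for illustration.

```python
import pandas as pd

# Small hand-built event log; the third row is a made-up user.
events = pd.DataFrame({
    "ts": [1541903636796, 1541903770796, 1541903900000],
    "userId": [69, 69, 8],
    "firstName": ["Anabelle", "Anabelle", "Kaylee"],
    "lastName": ["Simpson", "Simpson", "Summers"],
    "gender": ["F", "F", "F"],
    "level": ["free", "free", "free"],
})

user_cols = ["userId", "firstName", "lastName", "gender", "level"]

# Sort by time so that keep="last" retains each user's most recent row.
users_demo = (
    events.sort_values("ts")
    .drop_duplicates(subset="userId", keep="last")[user_cols]
    .reset_index(drop=True)
)
print(users_demo)
```

Deduplicating up front reduces the number of insert statements; whether repeated rows are instead resolved by a conflict clause in `user_table_insert` depends on how that query is written.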

In [37]:
user_df_cols = ['userId', 'firstName', 'lastName', 'gender', 'level']

In [38]:
user_df = deepcopy(song_play[user_df_cols])

In [39]:
#index is currently incorrect, we need to reset
user_df = user_df.reset_index()

In [40]:
# drop the leftover "index" column created by reset_index (same cleanup as for df3)
user_df.drop(columns = ['index'], inplace=True)

#### Insert Records into Users Table
Implement the `user_table_insert` query in `sql_queries.py` and run the cell below to insert records for the users in this log file into the `users` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `users` table in the sparkify database.

In [41]:
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, row)
    conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

## #5: `songplays` Table
#### Extract Data for Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.
- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`
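
The lookup described above can be sketched with an in-memory SQLite database standing in for Postgres, so the placeholders are `?` rather than psycopg2's `%s`. The simplified table definitions, the `demo_song_select` query, and the `lookup_ids` helper are all assumptions made for illustration; the real `song_select` lives in `sql_queries.py`.

```python
import sqlite3

# In-memory SQLite stands in for the Postgres database (assumption for a runnable demo).
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE artists (artist_id TEXT PRIMARY KEY, artist_name TEXT)")
demo_cur.execute(
    "CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, artist_id TEXT, duration REAL)"
)
demo_cur.execute(
    "INSERT INTO artists VALUES (?, ?)", ("AR8ZCNI1187B9A069B", "Planet P Project")
)
demo_cur.execute(
    "INSERT INTO songs VALUES (?, ?, ?, ?)",
    ("SOIAZJW12AB01853F1", "Pink World", "AR8ZCNI1187B9A069B", 269.81832),
)

# Match on title, artist name, and duration; return the song ID first, then the artist ID.
demo_song_select = """
    SELECT s.song_id, a.artist_id
    FROM songs s
    JOIN artists a ON s.artist_id = a.artist_id
    WHERE s.title = ? AND a.artist_name = ? AND s.duration = ?
"""


def lookup_ids(cur, title, artist_name, duration):
    """Return (song_id, artist_id) for a match, or (None, None) when nothing matches."""
    cur.execute(demo_song_select, (title, artist_name, duration))
    match = cur.fetchone()  # fetch once and reuse the result
    return match if match else (None, None)


print(lookup_ids(demo_cur, "Pink World", "Planet P Project", 269.81832))
print(lookup_ids(demo_cur, "Unknown Song", "Unknown Artist", 1.0))
```

Storing the result of `fetchone()` in a variable avoids running the query twice, and keeping the unpacking order identical to the `SELECT` column order prevents the two IDs from being swapped.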

#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songplays` table in the sparkify database.

In [42]:
# # test our song_select statement
# # the reason there are none values on song_plays table is that there are only
# # 71 artists on artist table, but 1300 unique values for artist column on df3
# cur.execute("""SELECT a.artist_id, s.song_id
# FROM artists a
# INNER JOIN songs s
# ON s.artist_id = a.artist_id
# WHERE a.artist_name = %s
# AND s.title = %s;""", ('Planet P Project', 'Pink World'))

# for i in cur.fetchall():
#     print(i)

In [78]:
# # testing our upload
# for index, row in df3[5530:5540].iterrows():
#     # get songid and artistid from song and artist tables
#     cur.execute(song_select, (row.artist, row.song))
#     if cur.fetchone():
#         cur.execute(song_select, (row.artist, row.song))
#         songid, artistid = cur.fetchone()
#     else: 
#         songid, artistid = None, None
#     print(songid, artistid)
    
# #     songid, artistid = cur.fetchone() if cur.fetchone() else None, None
# #     if songid != None:
# #         print(songid)

None None
None None
None None
None None
None None
None None
AR5KOSW1187FB35FF4 SOZCTXZ12AB0182364
None None
None None
None None


In [47]:
# insert a songplay record for every NextSong event in df3
for index, row in df3.iterrows():
    # look up the matching song and artist in the songs and artists tables
    cur.execute(song_select, (row.artist, row.song))
    # fetch once and keep the result; a second fetchone() would consume another row
    match = cur.fetchone()
    if match:
        # the test output above prints an artist ID (AR...) first, so this assumes
        # song_select returns (artist_id, song_id) in that order
        artistid, songid = match
    else:
        # no match: store NULLs
        songid, artistid = None, None

    # collect the values for this row
    songplay_data = (row.ts, int(row.userId), row.level, songid, artistid, int(row.sessionId), row.location, row.userAgent)
    # insert songplay record
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

InterfaceError: cursor already closed

Run `test.ipynb` to see if you've successfully added records to this table.

In [44]:
## Idea of using select statement to save all the information to variables before pushing out to table
## Didn't need to do this because answers were in dataframe and not in other tables.

# for index, row in df3.iterrows():

#     # get songid and artistid from song and artist tables
#     cur.execute(song_select, (row.artist, row.song))
#     songid, artistid = cur.fetchone() if cur.fetchone() else None, None

#     # get all the data for the corresponding rows
#     songplay_data = ()
#     cur.execute("""SELECT * FROM songs s WHERE s.song_id = %s AND s.artist_id = %s;""", (songid, artistid))
#     for i in cur.fetchall():
#         songplay_data=i
    
    
    
#     # insert songplay record
#     cur.execute(songplay_table_insert, songplay_data)
#     conn.commit()

# Close Connection to Sparkify Database

In [79]:
conn.close()

# Implement `etl.py`
Use what you've completed in this notebook to implement `etl.py`.
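
One way `etl.py` could be organized is around a small driver that walks a data directory and hands each file to a per-file processing function. The sketch below is an assumption about structure rather than the required layout; `process_data` and its `func(cur, path)` callback signature are illustrative names.

```python
import glob
import os


def process_data(cur, conn, filepath, func):
    """Apply func(cur, path) to every *.json file under filepath.

    Commits after each file and returns the number of files processed.
    Hypothetical driver for illustration; adapt it to the actual etl.py.
    """
    # Collect the absolute path of every JSON file below filepath.
    all_files = []
    for root, _dirs, _files in os.walk(filepath):
        for f in glob.glob(os.path.join(root, "*.json")):
            all_files.append(os.path.abspath(f))

    print(f"{len(all_files)} files found in {filepath}")

    # Process the files one at a time so a failure points at a specific file.
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print(f"{i}/{len(all_files)} files processed.")

    return len(all_files)
```

With this shape, the per-table logic developed in this notebook would move into functions such as a song-file processor and a log-file processor, each called once per file with the open cursor.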