#### Use this notebook to develop the ETL process for each of your tables before completing the etl.py file to load the whole datasets.



In [1]:
import os 
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [2]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=student")
cur = conn.cursor()

# Getting the files

In [3]:
def get_files(filepath):
    all_files = []
    for root, dir, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))
    return all_files
    

# Processing song_data

perform ETL on the first dataset song_data, to create the **songs** and **artists** dimension table

Let's perform ETL on a single song file and load a single record into each table to start
 - Use the get_files function provided above to get a list of all song JSON files in data/song_data
 - Select the first song in this list
 - Read the song file and view the data

In [4]:
song_files = get_files('data/song_data')

In [5]:
filepath = song_files[0]
print(filepath)

/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/song_data/A/A/A/TRAAAEF128F4273421.json


In [6]:
df = pd.read_json(filepath, typ='series')
df

num_songs                            1
artist_id           AR7G5I41187FB4CE6C
artist_latitude                   None
artist_longitude                  None
artist_location        London, England
artist_name                   Adam Ant
song_id             SONHOTT12A8C13493C
title                  Something Girls
duration                       233.404
year                              1982
dtype: object

## 1. Songs Table

### Extract data for songs table
 - select column for sond_Id, title, artist_id, year and duration 
 - use df.values to select just the values from the dataframe 
 - index to select the first(only) record in the dataframe
 - convert the arry into a list and set it to song_data


In [7]:
songs_data = df[['song_id','title','artist_id', 'year', 'duration']]
songs_data_values = songs_data.values
songs_data_values

array(['SONHOTT12A8C13493C', 'Something Girls', 'AR7G5I41187FB4CE6C',
       1982, 233.40363], dtype=object)

In [8]:
first_record = songs_data_values[:]
first_record

array(['SONHOTT12A8C13493C', 'Something Girls', 'AR7G5I41187FB4CE6C',
       1982, 233.40363], dtype=object)

In [9]:
#convert to a list 
song_data = first_record.tolist()

In [10]:
song_data

['SONHOTT12A8C13493C',
 'Something Girls',
 'AR7G5I41187FB4CE6C',
 1982,
 233.40363]

In [11]:
#testing type 
assert isinstance(song_data, list), 'song_data should be a list'
song_data

['SONHOTT12A8C13493C',
 'Something Girls',
 'AR7G5I41187FB4CE6C',
 1982,
 233.40363]

### Insert record to song table 

Implement the song_table_insert query in *sql_querries.py* and run the cell below to incert a record for this song into the songs table 

Run *create_tables.py* before running the cell below to ensure you have created annd resetted the songs table in the sparkify 

In [12]:
cur.execute(song_table_insert, song_data)
conn.commit()

## 2. Artist Table

#### Extract data from artist table 
 - Select columns for artist_id, name, location, latitude, longitude
 - use df.values to select just the values from the dataframe
 - index to select the first(only) record in the dataframe
 - convert the array to a list and set it to artist_data


In [13]:
artist_data = df[['artist_id','artist_name','artist_location', 'artist_latitude', 'artist_longitude']]
artist_data

artist_id           AR7G5I41187FB4CE6C
artist_name                   Adam Ant
artist_location        London, England
artist_latitude                   None
artist_longitude                  None
dtype: object

In [14]:
# getting the values only 
artist_data_values = artist_data.values
artist_data_values

array(['AR7G5I41187FB4CE6C', 'Adam Ant', 'London, England', None, None],
      dtype=object)

In [15]:
first_record = artist_data_values[:]
first_record


array(['AR7G5I41187FB4CE6C', 'Adam Ant', 'London, England', None, None],
      dtype=object)

In [16]:
#changing it to a list 
artist_data= first_record.tolist()
artist_data

['AR7G5I41187FB4CE6C', 'Adam Ant', 'London, England', None, None]

In [17]:
#testing types
assert isinstance(artist_data, list), 'artist_data should be a list'

artist_data

['AR7G5I41187FB4CE6C', 'Adam Ant', 'London, England', None, None]

### Insert the record to Artists Table

Implement the artist_table_insert_query in *sql_queries.py* and run the cell below to incert a record for this song's artist into the artists table

Run create_tables.py to ensure you have created and resetted the artists table in the sparkifydb

In [18]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

# processing the log_data

Performing ETL on the second dataset *log_data* to create the *time* and *users* dimensional tables as well as the *songsplays* fact table 

Performing ETL and ona a single log file and loading a single record to each table 
 - use the get_files function provided above o get a list of all log JSON files in data/log_data
 - select the first log file in the this list
 - read the log file and view the data 

In [19]:
log_files = get_files('data/log_data')
log_files

['/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-11-events.json',
 '/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-23-events.json',
 '/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-18-events.json',
 '/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-04-events.json',
 '/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-01-events.json',
 '/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-14-events.json',
 '/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-08-events.json',
 '/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres

In [20]:
filepath = log_files[0]
filepath

'/Users/itsmuriuki/Desktop/Data_Engineering/Data_Modeling/project_1_RDBMS_Postgres/data/log_data/2018/11/2018-11-11-events.json'

In [21]:
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
1,Kenny G with Peabo Bryson,Logged In,Anabelle,F,1,Simpson,264.75057,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,By The Time This Night Is Over,200,1541903770796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
2,Biffy Clyro,Logged In,Anabelle,F,2,Simpson,189.83138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,God & Satan,200,1541904034796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
3,,Logged In,Lily,F,0,Burns,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1540621000000.0,456,,200,1541910841796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32
4,HIM,Logged In,Lily,F,1,Burns,212.06159,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540621000000.0,456,Beautiful,200,1541910973796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32


In [22]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 95 entries, 0 to 94
Data columns (total 18 columns):
artist           73 non-null object
auth             95 non-null object
firstName        92 non-null object
gender           92 non-null object
itemInSession    95 non-null int64
lastName         92 non-null object
length           73 non-null float64
level            95 non-null object
location         92 non-null object
method           95 non-null object
page             95 non-null object
registration     92 non-null float64
sessionId        95 non-null int64
song             73 non-null object
status           95 non-null int64
ts               95 non-null int64
userAgent        92 non-null object
userId           95 non-null object
dtypes: float64(2), int64(4), object(12)
memory usage: 13.4+ KB


## 3. time Table

#### Extract data for time table
 - Filter reecords by next song action 
 - convert the ts timestamp column to datetime as the current timestamp is in milliseconds 
 - Extract the timestamp hour, day, week of year, month, year, and weekday from the ts column and set time_data to a list containing these values in order
 - pandas has a  dt attribute to access easily datetimelike properties.
 - Specify labels for these columns and set to *column_labels*
 - create a dataframe *time_df* containing the time data for this file by combining *column_labels* and *time_data* into a dictionary and converting this into a dataframe

In [23]:
df = df[df['page'] == "NextSong"]

In [24]:
df['ts'] = pd.to_datetime(df['ts'], unit = 'ms')

In [25]:
time_data = [df['ts'], df['ts'].dt.hour, df['ts'].dt.day, df['ts'].dt.weekofyear, df['ts'].dt.month, df['ts'].dt.weekday]

In [26]:
column_labels = ['ts', 'hour', 'day', 'week of year', 'month', 'year', 'weekday']

assert isinstance(time_data, list), 'time_data should be a list'
assert isinstance(column_labels, list), 'column_labels should be a list'

In [27]:
dictionary = dict(zip(column_labels, time_data))
time_df = pd.DataFrame.from_dict(dictionary)

assert isinstance(time_df, pd.DataFrame), 'time_df should be a dataframe'

In [28]:
time_df.head()

Unnamed: 0,ts,hour,day,week of year,month,year,weekday
0,2018-11-11 02:33:56.796,2,11,45,11,11,6
1,2018-11-11 02:36:10.796,2,11,45,11,11,6
2,2018-11-11 02:40:34.796,2,11,45,11,11,6
4,2018-11-11 04:36:13.796,4,11,45,11,11,6
5,2018-11-11 04:36:46.796,4,11,45,11,11,6


### Insert the record to time Table

implemente the *time_table_insert* query in *sql_queries.py* and run run the cell below to insert records for the timestamp in this log file into the time table 

Run *create_tables.py* before running the cell below to ensure you've created and reseted the tim table in the sparkifydb

In [29]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

## 4. Users Table 

#### Extract Data for Users table 
 - Select columns for user_Id, firstname, lastname, gender, level set them to user_df
 


In [30]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

### Insert the records to user_tables
implement the user_table_insert query is *sql_queries.py* and run the cell below to insert records for the users in this log file into the users table.

Run create_tables.py before running the cell below to ensure you've created and reseted the tim table in the sparkifydb

In [31]:
user_df = user_df.drop_duplicates().sort_values(["userId"])

for i,row in user_df.iterrows():
    cur.execute(user_table_insert, row)
    conn.commit()

## 5. songsplay Table

#### Extract data for songsplay table
- This is a bit complicated sinse the information for songs table, artist table, and original log file is need to fill the songsplays table.
- Since the log file does not specify an ID for either the song or the artist you will need to get the songid and artistid by querying the songs and artist tables to find matches based on song title, artist name, and song duration time
     - implement the song_select querry in *sql_queries.py* to find the songId and artistId based on the title, artist name and duration of a song 
     - Select the timestamp, userId, level, songId, artistId, sessionId, location, and user agent and set to songplays_data
     
### Insert records into songplay table
- implement the songplay_table_insert querry and run the cell bellow to insert records for the songplay actions in this log file into the songsplays table 

- run *create_tables.py* before running the cell below to ensure you have created and resetted the songplays table in the sparkify database
 


In [32]:
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,2018-11-11 02:33:56.796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
1,Kenny G with Peabo Bryson,Logged In,Anabelle,F,1,Simpson,264.75057,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,By The Time This Night Is Over,200,2018-11-11 02:36:10.796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
2,Biffy Clyro,Logged In,Anabelle,F,2,Simpson,189.83138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,God & Satan,200,2018-11-11 02:40:34.796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
4,HIM,Logged In,Lily,F,1,Burns,212.06159,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540621000000.0,456,Beautiful,200,2018-11-11 04:36:13.796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32
5,Matmos,Logged In,Joseph,M,0,Gutierrez,1449.11628,free,"Columbia, SC",PUT,NextSong,1540809000000.0,284,Supreme Balloon,200,2018-11-11 04:36:46.796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",75


In [33]:
for index, row in df.iterrows():
    
    #get songId and artistId from song and artist tables
    cur.execute(song_select_by_song_id_artist_id, (row.song, row.artist, row.length))
    results = cur.fetchone()
    
    if results:
        songid, artistid = results
    else: 
        songid, artistid = None, None
        
        
    #insert songplay record, first record is wrong 
    songplay_data = (row.ts, row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

Run test.ipynb to see if you've successfully added records to this table.

### Close connection to sparkify database 

In [35]:
conn.close()

#### Next steps: implementing etl.py