In [28]:
# system libs
import glob
import os

# etl/eda libs
import pandas as pd

# sql libs
import sqlalchemy
from sql_queries import *

# Extra ETL Steps, Exploratory Analysis, Validation and Tests

#### 0. Connects to the database and sets the TABLE column names constant.

In [29]:
engine = sqlalchemy.create_engine('postgresql+psycopg2://student:student@127.0.0.1/sparkifydb')
conn = engine.connect()

In [30]:
TABLE = {
    'artists': ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'],
    'songs': ['song_id', 'title', 'year', 'duration', 'artist_id'],
    'time': ['timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday'],
    'users': ['userId', 'firstName', 'lastName', 'gender', 'level'],
    'songplays': ['level', 'location', 'sessionId', 'userAgent', 'ts', 'userId'], 
    # songplays ['artist_id', 'song_id'] will JOIN after SQL function call
}

#### 1. Get a list of JSON files in the folder and subfolders

In [31]:
def get_files(folder):
    all_files = []
    for root, dirs, files in os.walk(folder):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

#### 2. Get a concatenated DataFrame of all JSON files in the list

In [32]:
def get_dataframe(files):
    dfs = []
    for f in files:
        df = pd.read_json(f, lines=True)
        dfs.append(df)
    
    data = pd.concat(dfs, ignore_index=True)
    
    # trim all strings to avoid query mismatch
    strings = list(data.select_dtypes(include=['object']).columns)
    data[strings] = data[strings].apply(lambda x: x.str.strip())

    return data

#### 3. Get the DataFrame for all JSON files in the folder ```'data/song_data'```

In [33]:
song_files = get_files('data/song_data')
song_df = get_dataframe(song_files)

print(f'Number of songs: {song_df.shape[0]}')
song_df.head(2)

Number of songs: 71


Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARNF6401187FB57032,40.79086,-73.96644,"New York, NY [Manhattan]",Sophie B. Hawkins,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,305.162,1994
1,1,ARVBRGZ1187FB4675A,,,,Gwen Stefani,SORRZGD12A6310DBC3,Harajuku Girls,290.55955,2004


#### 4. Look for null values

In [34]:
song_df.isnull().sum()

num_songs            0
artist_id            0
artist_latitude     40
artist_longitude    40
artist_location      0
artist_name          0
song_id              0
title                0
duration             0
year                 0
dtype: int64

#### 5. Look for duplicates

In [35]:
check = ['song_id', 'title', 'artist_id', 'artist_name', 'duration']
for column in check:
    duplicates = song_df.duplicated(subset=[column], keep="first").sum()
    print(f'Duplicate {column}: {duplicates}')

Duplicate song_id: 0
Duplicate title: 0
Duplicate artist_id: 2
Duplicate artist_name: 2
Duplicate duration: 0


#### 6. Get the DataFrame for all JSON files in the folder ```'data/log_data'```

In [36]:
log_files = get_files('data/log_data')
log_df = get_dataframe(log_files)

log_df = log_df[log_df['page'] == 'NextSong']

log_df['ts'] = pd.to_datetime(log_df['ts'], unit='ms')
log_df['time_data'] = log_df['ts'].apply(lambda x: [x, x.hour, x.day, x.week, x.month, x.year, x.day_name()])

log_df['userId'] = log_df['userId'].astype('Int64')

print(f'Number of songplays: {log_df.shape[0]}')
log_df.head(2)

Number of songplays: 6820


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,time_data
0,The Killers,Logged In,Jayden,M,32,Graves,246.80444,paid,"Marinette, WI-MI",PUT,NextSong,1540664000000.0,594,Read My Mind,200,2018-11-20 00:00:42.796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",25,"[2018-11-20 00:00:42.796000, 0, 20, 47, 11, 20..."
1,Tamia,Logged In,Jayden,M,33,Graves,243.09506,paid,"Marinette, WI-MI",PUT,NextSong,1540664000000.0,594,Officially Missing You (Radio Version),200,2018-11-20 00:04:48.796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",25,"[2018-11-20 00:04:48.796000, 0, 20, 47, 11, 20..."


#### 7. Look for null values

In [37]:
log_df.isnull().sum()

artist             0
auth               0
firstName          0
gender             0
itemInSession      0
lastName           0
length             0
level              0
location           0
method             0
page               0
registration       0
sessionId          0
song               0
status             0
ts                 0
userAgent          0
userId           253
time_data          0
dtype: int64

#### 8. Look for duplicates

In [38]:
check = ['song', 'artist', 'length', 'ts', 'userId']
for column in check:
    duplicates = log_df.duplicated(subset=[column], keep="first").sum()
    print(f'Duplicate {column}: {duplicates}')

print(f'Duplicate combined: {log_df.duplicated(subset=check, keep="first").sum()}')

Duplicate song: 1631
Duplicate artist: 3672
Duplicate length: 2826
Duplicate ts: 7
Duplicate userId: 6724
Duplicate combined: 0


#### 9. Check if all necessary ```song_select``` query data types match

In [39]:
print(log_df[['song', 'artist', 'length']].dtypes.tolist())
print(song_df[['title', 'artist_name', 'duration']].dtypes.tolist())

[dtype('O'), dtype('O'), dtype('float64')]
[dtype('O'), dtype('O'), dtype('float64')]


#### 10. Verify the ```song_select``` query

- Iterate over all log_df DataFrame as proposed by the ETL process.
- Uses a loc filter to compare row.song, row.artist and row.length like the SQL WHERE statement.
- Counts the results that match and the empty ones.

~~~~sql
    SELECT s.song_id, a.artist_id
    FROM songs s
    JOIN artists a
    ON s.artist_id = a.artist_id
    WHERE s.title = %s AND s.duration = %s AND a.name = %s;
~~~~

##### 10.1 DataFrame results

In [40]:
empty = 0
matches = 0
results = []

for index, row in log_df.iterrows():
    result = song_df.loc[(song_df['title'] == row.song) & (song_df['artist_name'] == row.artist) & (song_df['duration'] == row.length)]

    if result.empty:
        empty += 1
    else:
        matches += 1
        results.append(result)

print(f'Empty: {empty} Matches: {matches}')
pd.concat(results, ignore_index=True)

Empty: 6819 Matches: 1


Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,AR5KOSW1187FB35FF4,49.80388,15.47491,Dubai UAE,Elena,SOZCTXZ12AB0182364,Setanta matins,269.58322,0


##### 10.2 SQL query results

In [41]:
empty = 0
matches = 0
results = []

for index, row in log_df.iterrows():
    result = pd.read_sql_query(song_select, conn, params=(row.song, row.length, row.artist))

    if result.empty:
        empty += 1
    else:
        matches += 1
        results.append(result)

print(f'Empty: {empty} Matches: {matches}')
pd.concat(results, ignore_index=True)

Empty: 6819 Matches: 1


Unnamed: 0,song_id,artist_id
0,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4


##### 10.3 SQL Function or Stored Procedure

An SQL function (stored procedure) was created to improve perfomance by executing a series of queries in a single server-side statement.

- The ```SONG_ARTIST``` custom parameter type was defined as a tuple ```(index, title, duration, artist)```.
- The ```song_artist_ids``` function is called with an array of SONG_ARTIST[].
- The function will process a batch of joins for each index and compare with title, duration and artist values.
- The function returns a table of ```(index, song_id, artist_id)``` matches.
- The function is performing approximately 32x faster than the iterrows loop with a single select query.

In [42]:
func = sqlalchemy.text("SELECT * FROM song_artist_ids(CAST (:param AS SONG_ARTIST[]))")
param = log_df[['song', 'length', 'artist']].to_records(index=True).tolist()

ids = engine.execute(func, param=param).fetchall()

ids_df = pd.DataFrame(ids, columns=['index', 'song_id', 'artist_id'])
ids_df.set_index('index', inplace=True)

songplays_df = log_df[TABLE['songplays']].join(ids_df[['artist_id', 'song_id']])
songplays_df.head(2)

Unnamed: 0,level,location,sessionId,userAgent,ts,userId,artist_id,song_id
0,paid,"Marinette, WI-MI",594,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",2018-11-20 00:00:42.796,25,,
1,paid,"Marinette, WI-MI",594,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",2018-11-20 00:04:48.796,25,,


#### 11. Verify the songs table

In [43]:
songs = pd.read_sql_query('SELECT * FROM songs', conn)

print(f'Number of songs: {songs.shape[0]}')
songs.head(2)

Number of songs: 71


Unnamed: 0,song_id,title,year,duration,artist_id
0,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994,305.162,ARNF6401187FB57032
1,SORRZGD12A6310DBC3,Harajuku Girls,2004,290.55955,ARVBRGZ1187FB4675A


#### 12. Verify the artists table

The artists table was loaded correct since 2 of them were duplicated as shown in the topic 5.

In [44]:
artists = pd.read_sql_query('SELECT * FROM artists', conn)

print(f'Number of artists: {artists.shape[0]}')
artists.head(2)

Number of artists: 69


Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARNF6401187FB57032,Sophie B. Hawkins,"New York, NY [Manhattan]",40.79086,-73.96644
1,ARVBRGZ1187FB4675A,Gwen Stefani,,,


#### 13. Verify the time table

The time table was loaded correct since 7 of them were duplicated as shown in the topic 8.

In [45]:
time = pd.read_sql_query('SELECT * FROM time', conn)

print(f'Number of times: {time.shape[0]}')
time.head(2)

Number of times: 6813


Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-20 00:00:42.796,0.0,20.0,47.0,11.0,2018,Tuesday
1,2018-11-20 00:04:48.796,0.0,20.0,47.0,11.0,2018,Tuesday


#### 14. Verify the user table

The users table was loaded correct since 6724 of them were duplicated as shown in the topic 8.

In [46]:
users = pd.read_sql_query('SELECT * FROM users', conn)

print(f'Number of users: {users.shape[0]}')
users.head(2)

Number of users: 96


Unnamed: 0,user_id,first_name,last_name,gender,level
0,86,Aiden,Hess,M,free
1,15,Lily,Koch,F,paid


##### 14.1 User ID generation

There are 253 user ids with null values according to the topic 7. So these ids were generated as follows:

- Generate an unique string key by concatenating (firstName, lastName, gender, level).
- Create a new 'key' column in the data frame for every row.
- Create a list of all unique values in the 'key' column to use as the index of a dictionary of ids.
- Calculate the max integer used as an userId and set maxId + 1 to start the generator.
- Generate an ordered list of integers where the length is equal to the key list.
- Use the dictionary of {'keys': 'ids'} to replace each null userId by the generated id.

After this transformation we were able to restore 33 user records.

In [47]:
users_df = log_df[TABLE['users']].copy()

users_df['key'] = users_df.apply(lambda x: x['firstName'] + x['lastName'] + x['gender'] + x['level'], axis=1)

maxId = users_df['userId'].max()
keys = users_df[users_df['userId'].isnull()]['key'].unique()
ids = dict(zip(keys, [x for x in range(maxId + 1, maxId + len(keys) + 1)]))

users_df['userId'] = users_df.apply(lambda x: ids.get(x['key']) if pd.isnull(x['userId']) else x['userId'], axis=1)

print(f'Number of users: {len(users_df["userId"].unique())}')

Number of users: 129


#### 15. Verify the songplays table

##### 15.1 It returns only one register when JOINed with all dimension tables since the artist_id and the song_id is null.

In [48]:
query = '''
SELECT * FROM songplays AS sp
JOIN songs AS s
ON sp.song_id = s.song_id
JOIN artists AS a
ON sp.artist_id = a.artist_id
JOIN time AS t
ON sp.start_time = t.start_time
JOIN users AS u
ON sp.user_id = u.user_id;
'''
songplays = pd.read_sql_query(query, conn)

print(f'Number of songplays: {songplays.shape[0]}')
songplays.head(2)

Number of songplays: 1


Unnamed: 0,songplay_id,level,location,session_id,user_agent,start_time,user_id,artist_id,song_id,song_id.1,...,day,week,month,year,weekday,user_id.1,first_name,last_name,gender,level.1
0,1649,paid,"Chicago-Naperville-Elgin, IL-IN-WI",818,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018-11-21 21:56:47.796,15,AR5KOSW1187FB35FF4,SOZCTXZ12AB0182364,SOZCTXZ12AB0182364,...,21.0,47.0,11.0,2018,Wednesday,15,Lily,Koch,F,paid


##### 15.2 It returns all songplays when JOINed with time and users dimension tables.

In [49]:
query = '''
SELECT * FROM songplays AS sp
JOIN time AS t
ON sp.start_time = t.start_time
JOIN users AS u
ON sp.user_id = u.user_id;
'''
songplays = pd.read_sql_query(query, conn)

print(f'Number of songplays: {songplays.shape[0]}')
songplays.head(2)

Number of songplays: 6820


Unnamed: 0,songplay_id,level,location,session_id,user_agent,start_time,user_id,artist_id,song_id,start_time.1,...,day,week,month,year,weekday,user_id.1,first_name,last_name,gender,level.1
0,1,paid,"Marinette, WI-MI",594,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",2018-11-20 00:00:42.796,25,,,2018-11-20 00:00:42.796,...,20.0,47.0,11.0,2018,Tuesday,25,Jayden,Graves,M,paid
1,2,paid,"Marinette, WI-MI",594,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",2018-11-20 00:04:48.796,25,,,2018-11-20 00:04:48.796,...,20.0,47.0,11.0,2018,Tuesday,25,Jayden,Graves,M,paid


In [50]:
conn.close()

#### 16. Results

- This notebook was developed as an extra test layer and exploratory analysis validation.
- The string trim was added as a supplementary step in the ETL process.
- All tables were loaded appropriately by the ETL pipeline.
- The duplicated values were handled properly by the INSERT queries.
- We were able to restore 33 user records with null ids.
- The song_select query was double-checked and produced the expected results.
- The performance improvement with the SQL function was quite significant.
- The etl2 code refactoring replaced all single processing logic by batch processing. 
- All etl.py insert statements were replaced by copy_from batch inserts in etl2.py.
- All JSON sigle file processing were replaced by batch transformations in a single merged DataFrame.
