# ETL testing
I use this notebook to help implement `etl.py` and to write the Markdown for `README.md`.

# README setup
The cell below is used to fine-tune the `README.md` file.

---

- [Purpose of this database](#Purpose-of-this-database)
- [The ETL process](#The-ETL-process)
- [The schema](#The-schema)
- [How To run](#How-To-run)
- [Files in this repository](#Files-in-this-repository)
- [References](#References)

# Purpose of this database

Sparkify wants to analyze how its new web app is used. For that, it mainly has a web server log at its disposal.<br>
Among other things, this log exposes the *NextSong* action, which can help determine: <br>
- from where, when and with which web browser the app is used
- the peak hours and how many people are using the app
- users' song choices: what they like

This can help improve the service and maybe lead to a recommendation system.<br>
On the other hand, they export their songs database, which can help match user and artist locations and maybe answer the question "do our customers buy locally?".<br>
> The songs data exposed in this lab is very limited. I had to insert data manually in order to test the query for song & artist IDs in "Getting artists & song ID" in `etl.ipynb`

# The ETL process

> Much of my reasoning is in `etl.ipynb` (so please go there ;-)<br>

The files are extracted directly into Pandas dataframes for transformation.<br>
To load the data into PostgreSQL, I chose the `COPY` approach (psycopg2's `copy_from()`).<br>
This implies no INSERT queries, so the whole transformation has to be done in Python with Pandas.<br>
_So I cannot use the `ON CONFLICT` clause, as I don't use any direct `INSERT` (or UPSERT) queries in the loading phase._<br>

The key point of the transformation is to prepare the data for direct input: the dataframes have to be CSV compliant.
So I had to take care of the following:
- removing tab characters, because tab is the separator used in the CSV file,
- normalizing empty, NULL and NaN (Not a Number) "values" so that `copy_from()` can be used directly,
- making sure data types are not mixed within a column, especially for primary keys (for example `int` and `str` in the users dataset),
- ensuring that there are no duplicates.
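A minimal sketch of these four steps on a toy dataframe. The rows are made up for illustration, and replacing tabs with a space is an arbitrary choice for this sketch:

```python
import pandas as pd

# Hypothetical rows mimicking the log data: a user id stored both as str and int,
# an empty string, a missing value and a tab inside a text field.
raw = pd.DataFrame({
    'userId':    ['42', 42, 7],
    'firstName': ['Harper', 'Harper', ''],
    'location':  ['New York\tNY', 'New York\tNY', None],
})

df = raw.copy()
# 1. remove tab characters inside cells (tab is the COPY separator)
df = df.replace('\t', ' ', regex=True)
# 2. normalize missing values and empty strings to a single NULL marker
df = df.fillna('NULL').replace('', 'NULL')
# 3. one data type per column: '42' and 42 become the same integer
df['userId'] = pd.to_numeric(df['userId'])
# 4. only now can duplicates be detected reliably
df = df.drop_duplicates()

print(df.to_csv(sep='\t', header=False, index=False))
```

Note that the type fix has to happen before `drop_duplicates()`: while `'42'` and `42` coexist, the two Harper rows are not considered duplicates.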

I mainly used Pandas dataframes for these jobs, and also to choose better data types in PostgreSQL.<br>
The amount of information depends on the source of the dataset: there is less information in human-created files than in computer-generated files.<br>

# The schema
The goals mentioned above logically make songplays the fact table.<br>
Tables extracted from the same dataset are potentially in a 1:1 relationship:
- users, times & songplays from log json files
- songs & artists from songs json files

A few points are worth mentioning here:<br>
- Implementing table constraints has an impact on the order in which tables are created & dropped.
- The suggested primary key for the times table (the timestamp) isn't reliable.<br>
This is generally the case when a key carries a meaning (here, the timestamp),<br>
because two events in the log can certainly occur at the same time.<br>
The solution was to implement a SERIAL PK field in the times table and use it as a foreign key in the songplays table<br>
(see "Why do we need another primary key ?" in etl.ipynb).
- The COPY method led me to discover mixed types in the user_id field,<br>
so there is a potential bug in the logging function of the web app!<br>
(see "Manage duplicates caused by type and keep the last level (or status)" in etl.ipynb)
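The user_id type mixing can be reproduced in a few lines. This is a sketch on made-up rows, assuming the same user was logged once with a `str` id and later with an `int` id after upgrading from free to paid:

```python
import pandas as pd

# Hypothetical log rows for one user, with mixed id types
log = pd.DataFrame({
    'userId': ['15', 15, 15],
    'level':  ['free', 'free', 'paid'],
})

# With mixed types, '15' and 15 are different values, so they look like two users
naive = log.drop_duplicates(subset=['userId'], keep='last')
print(len(naive))  # 2

# Casting to one numeric type first makes the duplicates visible;
# keep='last' keeps the most recent level for the user
fixed = log.assign(userId=pd.to_numeric(log['userId']))
fixed = fixed.drop_duplicates(subset=['userId'], keep='last')
print(fixed['userId'].tolist(), fixed['level'].tolist())  # [15] ['paid']
```

Loaded as-is, the naive version would try to insert user 15 twice and break the users primary key.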

Here is the schema:


![Sparkify ERD diagram](/ERD-diagram.png "Sparkify ERD diagram")

Direct link to [Sparkify ERD diagram created with dbdiagram.io](https://dbdiagram.io/d/5ccdf91bf7c5bb70c72fddbd)

# How To run
In a Jupyter notebook cell (e.g. in `test.ipynb`), run the following commands:
``` ipython
%run create_tables.py #prepare the database schema
%run etl.py #launch the ETL pipeline
```
# Files in this repository
> Each notebook generally includes a table of contents.

- dashboard.ipynb : runs some queries & plots the related graphs
- etl-tests.ipynb : used to facilitate the development of etl.py
- etl.ipynb : contains all the brainstorming and the analysis of the data
- test.ipynb : used to quickly query the db. Contains tools to manage buggy db connections
- create_tables.py : manages the creation of the db and tables
- etl.py : ETL pipeline
- README.md : this file
- sql_queries.py : list of DSNs & SQL queries to create, drop, insert & select

# References
[Million Song Dataset (field list)](https://labrosa.ee.columbia.edu/millionsong/pages/field-list)

---

In [None]:
import os
import glob
import psycopg2
import pandas as pd
import io
from sql_queries import *

In [None]:
%run create_tables.py

In [None]:
conn = psycopg2.connect( DSN_SPARKIFY )
cur = conn.cursor()

# Factored-out helpers

In [None]:
def get_files( root_directory , file_search_query = '*' ):
    '''
        Look up files matching file_search_query in root_directory (and all its
        sub-folders) and return a list of full file paths

        Parameters
        ----------
            root_directory : str - folder path to search
            file_search_query : str - a glob pattern (e.g. '*.json'), applied to each folder found
        Returns
        -------
            list(str) : the absolute path of each matching file
        Raises
        ------
            ValueError if root_directory is empty
    '''

    if not root_directory:
        raise ValueError('filepath must be defined')

    all_files = []
    # iterate over the folder tree structure
    for root, dirs, files in os.walk( root_directory ):
        # search matching files in each folder
        matches = glob.glob( os.path.join( root , file_search_query ) )
        for f in matches :
            # collect the absolute path of each match
            all_files.append(os.path.abspath(f))

    return all_files
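For comparison, glob's recursive `**` pattern (Python 3.5+) can walk the tree by itself. This is a hypothetical alternative, `get_files_recursive`, not what `etl.py` uses; it is tried here on a throw-away folder tree shaped like `data/song_data/A/B/...`:

```python
import glob
import os
import tempfile

def get_files_recursive(root_directory, file_search_query='*'):
    """Hypothetical alternative to get_files() based on glob's recursive '**'."""
    if not root_directory:
        raise ValueError('filepath must be defined')
    pattern = os.path.join(root_directory, '**', file_search_query)
    # recursive=True lets '**' match the root folder and every sub-folder
    return sorted(os.path.abspath(f) for f in glob.glob(pattern, recursive=True))

with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, 'A', 'B'))
    for rel in ('top.json', os.path.join('A', 'B', 'deep.json'), os.path.join('A', 'notes.txt')):
        with open(os.path.join(tmp, rel), 'w'):
            pass  # create an empty file
    found = [os.path.relpath(f, tmp) for f in get_files_recursive(tmp, '*.json')]
    print(found)
```

Sorting the result is an extra choice made here so that the file order does not depend on the file system.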

In [None]:
def create_dataframe_from_jsonfiles(root_directory):
    '''
        Concatenate all the JSON files found in root_directory (JSON Lines
        format: one object per line) and drop duplicate rows directly

        Parameters
        ----------
            root_directory : str - the folder to import into a dataframe
        Returns
        -------
            a Pandas dataframe
    '''

    # get all files
    files = get_files(root_directory , '*.json' )

    # load each file into a dataframe
    df_from_each_json = ( pd.read_json( f , lines = True ) for f in files )

    # concatenate all dataframes into one
    df_concatenated = pd.concat( df_from_each_json , ignore_index = True )

    # drop duplicates
    return df_concatenated.drop_duplicates(inplace=False)
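The same read / concat / deduplicate chain can be tried without any file on disk. In this sketch, two in-memory "files" (made-up events) stand in for the JSON files, and the second one repeats an event already present in the first:

```python
import io
import pandas as pd

# Two hypothetical files in JSON Lines format (one JSON object per line)
file_a = io.StringIO('{"ts": 1, "page": "NextSong"}\n{"ts": 2, "page": "Home"}\n')
file_b = io.StringIO('{"ts": 2, "page": "Home"}\n{"ts": 3, "page": "NextSong"}\n')

# lines=True parses one record per line, as create_dataframe_from_jsonfiles() does
frames = (pd.read_json(f, lines=True) for f in (file_a, file_b))
df = pd.concat(frames, ignore_index=True).drop_duplicates()

print(df['ts'].tolist())  # [1, 2, 3]
```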

In [None]:
import re

def clean_dataframe_for_export( df , force_empty_string_to_null = True , null_string = 'NULL' , sep='\t' , sep_replace_with=''):
    '''
        Prepare (and normalize) the DF for CSV export
        Replace NaN or empty cells with NULL

        Parameters
        ----------
            df (Dataframe) - the Pandas dataframe
            force_empty_string_to_null (bool) - by default, empty strings are nullified
            null_string (str) - the string used to represent NULL (default 'NULL')
            sep (str) - the separator chosen for the CSV file..
            sep_replace_with (str) - ..and what to replace it with inside cells
        Returns
        -------
            a Pandas dataframe
    '''
    # NaN becomes NULL
    df = df.fillna(null_string)

    # replace all empty cells with NULL
    if force_empty_string_to_null:
        df = df.replace('', null_string)

    # remove the separator from inside every cell: regex=True matches substrings,
    # whereas a plain replace() only matches cells whose whole value equals sep
    df = df.replace(re.escape(sep), sep_replace_with, regex=True)

    return df
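Why `regex=True` matters for the separator: with a plain string, pandas' `DataFrame.replace()` only matches cells whose *whole* value equals that string. A tiny check on a made-up title:

```python
import pandas as pd

df = pd.DataFrame({'title': ['Song\tTitle', '\t']})

# Without regex, replace() only matches cells whose whole value equals '\t'
whole_cell = df.replace('\t', '')
# With regex=True, '\t' is a pattern and is also removed inside strings
substring = df.replace('\t', '', regex=True)

print(whole_cell['title'].tolist())  # ['Song\tTitle', '']
print(substring['title'].tolist())   # ['SongTitle', '']
```

A tab has no special meaning in a regular expression, but a separator such as `|` would, which is why escaping it with `re.escape()` first is the safer habit.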

In [None]:
def lookup_song_and_artist( params , query ):
    '''
        >> Function to apply to a dataframe that comes from a Log Dataset <<
        Based on params[], query the DB for the artist & song IDs
        Parameters
        ----------
            Tricky to use because params must be in this exact order:
            params[0] : str - artist name
            params[1] : str - song title
            params[2] : float - song length
            query : str - the SQL query to execute with params (to be passed by apply(args=) )
        Returns
        -------
            tuple(songid, artistid) :
                - the primary keys of the related entities in the db
                - 'NULL', 'NULL' if nothing is found
                - 'Error' , the exception in case of a db error
    '''
    # unpack params
    artist = params[0]
    song = params[1]
    length = params[2]

    try:
        # query the db
        cur.execute( query , (artist, song , length))
        results = cur.fetchone()
    except psycopg2.Error as e:
        # catch the error and return it instead of the IDs
        results = 'Error' , e

    if results:
        # we have a match (or an error)
        songid, artistid = results
    else:
        # no match: return NULLs
        songid, artistid = 'NULL', 'NULL'

    return ( songid, artistid )

In [None]:
import csv

def copy_df_to_db( cur , df , tablename , table_columns , with_index=False , separator='\t'):
    '''
        Manage the copy of the df to the db.
        This function creates the CSV to be imported, runs COPY and commits.

        Parameters
        ----------
            cur - cursor on the database where copy_from will be performed
            df - Pandas dataframe to export to SQL
            tablename - table where the Pandas data will be imported
            table_columns (tuple of str) - the SQL columns of tablename, in the df column order
            with_index=False - export the df index as the first column?
            separator='\t' - separator
    '''
    # create an in-memory buffer for the CSV content
    buffer = io.StringIO()

    # serialize the dataframe into the buffer (no header at all);
    # escape special characters instead of quoting fields, because COPY's text
    # format would keep CSV quotes as literal characters
    df.to_csv( buffer , index=with_index , header=False, sep=separator,
               quoting=csv.QUOTE_NONE, escapechar='\\')

    # move the pointer back to the start of the stream so copy_from reads it all
    buffer.seek(0)

    # copy_from's default NULL marker ('\N') doesn't match ours, so pass it explicitly
    try:
        cur.copy_from(buffer, tablename , sep=separator , columns=table_columns , null='NULL' )
        cur.connection.commit()
    except psycopg2.Error as e:
        # a failed COPY aborts the transaction: roll back so the connection stays usable
        cur.connection.rollback()
        print('Error while processing table {} : {}'.format(tablename , e ))
    
    

# song_data

In [None]:
df_songs_json = create_dataframe_from_jsonfiles('data/song_data')

In [None]:
df_songs = df_songs_json[['song_id','title','artist_id','year','duration']].copy()
df_artist = df_songs_json[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']].copy()
df_artist.drop_duplicates(inplace=True)
df_songs = clean_dataframe_for_export(df_songs)
df_artist = clean_dataframe_for_export(df_artist)

len(df_artist.index)

In [None]:
copy_df_to_db(cur, df_artist, 'artists',( 'artist_id', 'name', 'location', 'lattitude', 'longitude' ) )
copy_df_to_db(cur, df_songs, 'songs', ( 'song_id', 'title', 'artist_id', 'year', 'duration') )

In [None]:
cur.execute("""SELECT * FROM artists""")

rows = cur.fetchall()
len(rows)

# log_data

In [None]:
df_logs_json = create_dataframe_from_jsonfiles('data/log_data')
df_NextSong = df_logs_json[ df_logs_json['page'] == 'NextSong' ].copy()
len(df_NextSong.index)

In [None]:
# convert the ts column (milliseconds since the epoch) to a datetime column
df_NextSong['start_time'] = pd.to_datetime( df_NextSong['ts'], unit='ms')
# truncate to the second (drops the milliseconds)
df_NextSong['start_time'] = df_NextSong['start_time'].dt.floor('s')

# append the columns needed for the times table
df_NextSong['hour'] = df_NextSong['start_time'].dt.hour
df_NextSong['day'] = df_NextSong['start_time'].dt.day
# .dt.week is deprecated (removed in pandas 2.0); isocalendar() gives the same ISO week
df_NextSong['week'] = df_NextSong['start_time'].dt.isocalendar().week
df_NextSong['month'] = df_NextSong['start_time'].dt.month
df_NextSong['year'] = df_NextSong['start_time'].dt.year
df_NextSong['weekday'] = df_NextSong['start_time'].dt.weekday

# userId is logged either as str or as int: cast it to a single numeric type
df_NextSong['userId'] = pd.to_numeric( df_NextSong['userId'])
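A quick check of these conversions on a single hypothetical timestamp. The value was chosen so that it lands on the first `start_time` shown in the times output further down (2018-11-09 00:06:17):

```python
import pandas as pd

# One hypothetical log timestamp, in milliseconds since the epoch (UTC)
ts = pd.Series([1541721977796])

start_time = pd.to_datetime(ts, unit='ms').dt.floor('s')  # drop the milliseconds
week = start_time.dt.isocalendar().week                    # ISO week number

print(start_time[0])  # 2018-11-09 00:06:17
print(start_time.dt.hour[0], start_time.dt.day[0], int(week[0]),
      start_time.dt.month[0], start_time.dt.year[0], start_time.dt.weekday[0])
# 0 9 45 11 2018 4  (weekday counts from Monday=0, so 4 is a Friday)
```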

## times table

In [None]:
df_times = df_NextSong[['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']]

In [None]:
copy_df_to_db(cur, df_times, 'times', ( 'time_id', 'start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday') , with_index=True )
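Why the timestamp is not used as the primary key, on made-up events: two users can start a song in the same second, so `start_time` is not unique, while a surrogate key built from the dataframe index (as `with_index=True` does in the cell above) is:

```python
import pandas as pd

# Hypothetical NextSong events: two users start a song in the same second
events = pd.DataFrame({
    'userId': [15, 29, 15],
    'start_time': pd.to_datetime(['2018-11-09 00:06:17',
                                  '2018-11-09 00:06:17',
                                  '2018-11-09 00:09:46']),
})

# start_time cannot be a primary key: it is not unique
print(events['start_time'].is_unique)  # False

# A surrogate key: here simply the dataframe index
times = events[['start_time']].copy()
times.insert(0, 'time_id', times.index)
print(times['time_id'].is_unique)  # True
```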

In [None]:
cur.execute("""SELECT * FROM times""")

rows = cur.fetchall()
len(rows)

## users table

In [None]:
df_users = df_NextSong[['userId', 'firstName', 'lastName', 'gender', 'level']].copy()
# userId was already cast to numeric above, so '42' and 42 are now the same user;
# keep='last' keeps the most recent level (free/paid) of each user
df_users.drop_duplicates( inplace=True, keep='last' , subset=['userId', 'firstName', 'lastName', 'gender'])

In [None]:
copy_df_to_db(cur, df_users, 'users', ('user_id', 'first_name', 'last_name', 'gender', 'level') )

In [None]:
cur.execute("""SELECT * FROM users""")

rows = cur.fetchall()
len(rows)

In [None]:
# will contain a list of users identical except for the level field
test_users = []
# arbitrarily take the first user in the df
first_user = df_users.values[0].tolist()
# append similar users (by copy!)
test_users.append( first_user.copy() )
first_user[4] = 'paid'
test_users.append( first_user.copy() )
first_user[4] = 'free'
test_users.append( first_user.copy() )
test_users

In [None]:
cur.execute('SELECT * FROM users WHERE users.user_id = %s', ( first_user[0], ) )
rows = cur.fetchall()
rows

In [None]:
cur.execute( 'DELETE FROM users WHERE users.user_id = %s', ( first_user[0], ) )

In [None]:
user_table_insert_conflict = user_table_insert +' ON CONFLICT(user_id) DO UPDATE set level=EXCLUDED.level'
print( user_table_insert_conflict )

In [None]:
for user in test_users:
    cur.execute( user_table_insert_conflict, user )
    conn.commit()

In [None]:
cur.execute('SELECT * FROM users WHERE users.user_id = %s', ( first_user[0], ) )
rows = cur.fetchall()
rows
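The same upsert behavior can be checked without a PostgreSQL server. This sketch swaps in Python's built-in `sqlite3` as a stand-in, since SQLite (3.24 and later) accepts the same `ON CONFLICT ... DO UPDATE SET level = excluded.level` syntax; the table layout and the user are simplified assumptions:

```python
import sqlite3

# In-memory SQLite database standing in for PostgreSQL
conn_demo = sqlite3.connect(':memory:')
conn_demo.execute("""CREATE TABLE users (
    user_id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, gender TEXT, level TEXT)""")

upsert = """INSERT INTO users (user_id, first_name, last_name, gender, level)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET level = excluded.level"""

# Same hypothetical user three times, only the level changes
for level in ('free', 'paid', 'free'):
    conn_demo.execute(upsert, (15, 'Jane', 'Doe', 'F', level))
conn_demo.commit()

demo_rows = conn_demo.execute('SELECT user_id, level FROM users').fetchall()
print(demo_rows)  # [(15, 'free')]
conn_demo.close()
```

Only one row survives, carrying the level of the last insert, which is the "keep the last level" behavior the COPY approach has to reproduce with `drop_duplicates(keep='last')`.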

In [None]:
conn.close()
conn = psycopg2.connect( DSN_SPARKIFY )
cur = conn.cursor()

## songplays table

In [None]:
df_songplay = df_NextSong[['userId', 'level', 'sessionId',  'location', 'userAgent','artist','song','length']].copy()

# remember the free/paid userId type bug detected in the users table (userId was already cast to numeric in df_NextSong)
#df_songplay['userId'] = pd.to_numeric( df_songplay['userId'])
df_songplay['time_id'] = df_songplay.index

In [None]:
#lookup artist & song IDs
df_songplay['songid'],  df_songplay['artistid'] = zip(*df_songplay[['artist','song' , 'length' ]].apply( lookup_song_and_artist , axis=1 , args=(song_select,)))
#rearrange the column
df_songplay = df_songplay[['time_id','userId','level','songid', 'artistid','sessionId','location','userAgent']]
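The `apply(axis=1)` / `zip(*...)` combination above splits a column of `(songid, artistid)` tuples into two columns. A sketch of the same pattern with a hypothetical dict-backed lookup (`fake_lookup` and the IDs are made up) so that it runs without a database; unlike `lookup_song_and_artist()`, it reads the row by column label rather than by position:

```python
import pandas as pd

# Hypothetical stand-in for the songs/artists tables: (artist, song, length) -> IDs
catalog = {('Some Artist', 'Some Song', 200.5): ('SONG_1', 'ARTIST_1')}

def fake_lookup(row):
    """Return (songid, artistid), or ('NULL', 'NULL') when there is no match."""
    return catalog.get((row['artist'], row['song'], row['length']), ('NULL', 'NULL'))

plays = pd.DataFrame({
    'artist': ['Some Artist', 'Unknown'],
    'song':   ['Some Song', 'Nothing'],
    'length': [200.5, 180.0],
})

# apply(axis=1) yields one tuple per row; zip(*...) transposes them
# into two sequences, one per new column
plays['songid'], plays['artistid'] = zip(*plays[['artist', 'song', 'length']].apply(fake_lookup, axis=1))
print(plays[['songid', 'artistid']].values.tolist())
```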

In [None]:
copy_df_to_db(cur, df_songplay, 'songplays', ('songplay_id', 'time_id', 'user_id', 'level', 'song_id', 'artist_id', 'session_id', 'location', 'user_agent'), with_index=True )

In [None]:
cur.execute("""SELECT * FROM songplays""")

rows = cur.fetchall()
len(rows)

In [None]:
df_songplay[df_songplay['songid'] != 'NULL'].head()

# Some queries

In [5]:
import os
import glob
import psycopg2
import pandas as pd
import io
from sql_queries import *

conn = psycopg2.connect( DSN_SPARKIFY )
cur = conn.cursor()

In [1]:
%run create_tables.py #prepare the database schema
%run etl.py #launch the ETL pipeline

*********** Creating database tables.....
*********** Dropping tables.....
*********** Creating tables   .....
*********** Processing song_data.....
*********** Processing log_data.....
*********** Processing terminated.....


In [8]:
cur.execute("""SELECT * FROM times""")
rows = cur.fetchall()
print( len(rows) )
#rows[:2]

6820


[(0, datetime.datetime(2018, 11, 9, 0, 6, 17), 0, 9, 45, 11, 2018, 4),
 (1, datetime.datetime(2018, 11, 9, 0, 9, 46), 0, 9, 45, 11, 2018, 4)]

In [21]:
cur.execute("""SELECT * FROM songplays""")
rows = cur.fetchall()
print( len(rows) )
#rows[:5]

6820


[(0,
  0,
  42,
  'paid',
  None,
  None,
  275,
  'New York-Newark-Jersey City, NY-NJ-PA',
  '"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""'),
 (1,
  1,
  42,
  'paid',
  None,
  None,
  275,
  'New York-Newark-Jersey City, NY-NJ-PA',
  '"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""'),
 (2,
  2,
  42,
  'paid',
  None,
  None,
  275,
  'New York-Newark-Jersey City, NY-NJ-PA',
  '"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""'),
 (3,
  3,
  42,
  'paid',
  None,
  None,
  275,
  'New York-Newark-Jersey City, NY-NJ-PA',
  '"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""'),
 (4,
  4,
  42,
  'paid',
  None,
  None,
  275,
  'New York-Newark-Jersey City, NY-NJ-PA',
  '"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKi

In [74]:
conn.close()
conn = psycopg2.connect( DSN_SPARKIFY )
cur = conn.cursor()

In [79]:
query_count_user_listen ="""
SELECT
      times.hour
    , COUNT( songplays.songplay_id )
FROM songplays
INNER JOIN times ON times.time_id = songplays.time_id
GROUP BY times.hour
ORDER BY times.hour
"""

In [80]:
cur.execute(query_count_user_listen)
rows = cur.fetchall()
print( len(rows) )
rows

24


[(0, 155),
 (1, 154),
 (2, 117),
 (3, 109),
 (4, 136),
 (5, 162),
 (6, 183),
 (7, 179),
 (8, 207),
 (9, 270),
 (10, 312),
 (11, 336),
 (12, 308),
 (13, 324),
 (14, 432),
 (15, 477),
 (16, 542),
 (17, 494),
 (18, 498),
 (19, 367),
 (20, 360),
 (21, 280),
 (22, 217),
 (23, 201)]
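The per-hour counts can be cross-checked on the Pandas side with a `groupby` on the hour of `start_time`, which is how the `hour` column of the times table is derived. A sketch on three made-up start times; in the notebook, `df_NextSong['start_time']` could be used instead:

```python
import pandas as pd

# Hypothetical songplay start times
start_time = pd.Series(pd.to_datetime([
    '2018-11-09 00:06:17', '2018-11-09 00:09:46', '2018-11-09 16:30:00',
]))

# Same aggregation as query_count_user_listen: number of plays per hour of day
plays_per_hour = start_time.groupby(start_time.dt.hour).size()
print(plays_per_hour.to_dict())
```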

In [None]:
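query_songplays_with_time = """
SELECT
    songplays.songplay_id
FROM songplays
INNER JOIN times ON times.time_id = songplays.time_id
"""

query_count_songplays_per_user = """
SELECT
      users.user_id
    , COUNT(songplays.user_id)
FROM users
INNER JOIN songplays ON users.user_id = songplays.user_id
GROUP BY users.user_id
"""

query_count_songplays_per_user_name = """
SELECT
      users.user_id
    , users.first_name || ' ' || users.last_name
    , COUNT(songplays.user_id)
FROM users
INNER JOIN songplays ON users.user_id = songplays.user_id
GROUP BY users.user_id, users.first_name, users.last_name
"""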