# ETL

In [1]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [2]:
def execute_query(query, params = None, fetch = False):
    """
    Executes a query with optional parameters and optionally returns 1 result.
    This is for systematically opening and closing the connection and mainly not 
    running into the problem of forgetting to close a connection.
    
    Optional parameters:
    * params: parameters that will replace %s in the query
    * fetch: whether or not to return the first result of the query
    """
    try:
        conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
    except Exception as e:
        print(e)
    else:
        try:
            cur = conn.cursor()
            cur.execute(query, params)
            conn.commit()
        except Exception as e:
            print(e)
        else:
            if fetch:
                fetched = cur.fetchone()
                return fetched
        finally:
            conn.close()
            

In [3]:
def get_files(filepath):
    """
    Returns a list of all files in a directory.
    """
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

# Process `song_data`
We'll perform ETL on a single song file from `song_data` to create the `songs` and `artists` dimensional tables.

In [4]:
song_files = get_files('data/song_data')

In [5]:
filepath = song_files[0]

In [6]:
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARD7TVE1187B99BFB1,,California - LA,,Casual,218.93179,1,SOMZWCG12A8C13C480,I Didn't Mean To,0


## #1: `songs` table
#### Extract data

In [7]:
song_data = df[['song_id', 'title', 'artist_id', 'year', 'duration']]
song_data = list(song_data.values[0])
song_data

['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179]

#### Insert record

In [8]:
execute_query(song_table_insert, song_data)

#### Check that the record was inserted

In [9]:
execute_query("select * from songs", None, True)

('SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179)

## #2: `artists` table
#### Extract data

In [10]:
artist_data = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]
artist_data = list(artist_data.values[0])
artist_data

['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', nan, nan]

#### Insert record

In [11]:
execute_query(artist_table_insert, artist_data)

#### Check that the record was inserted

In [12]:
execute_query("select * from artists", None, True)

('ARD7TVE1187B99BFB1', 'Casual', 'California - LA', nan, nan)

# Process `log_data`
In this part, we'll perform ETL on single log file from `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

In [13]:
log_files = get_files('data/log_data')

In [14]:
filepath = log_files[0]

In [15]:
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
2,Morcheeba,Logged In,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Women Lose Weight (Feat: Slick Rick),200,1543540368796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
3,Maroon 5,Logged In,Jacob,M,2,Klein,231.23546,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Won't Go Home Without You,200,1543540625796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
4,Train,Logged In,Jacob,M,3,Klein,216.76363,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Hey_ Soul Sister,200,1543540856796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


## #3: `times` table
#### Extract data

In [16]:
df = df[df['page']=='NextSong']
df.head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


In [17]:
t = pd.to_datetime(df['ts'], unit='ms')
t.head()

0   2018-11-30 00:22:07.796
1   2018-11-30 01:08:41.796
2   2018-11-30 01:12:48.796
3   2018-11-30 01:17:05.796
4   2018-11-30 01:20:56.796
Name: ts, dtype: datetime64[ns]

In [18]:
frame = {
    'timestamp': t,
    'hour': t.dt.hour,
    'day': t.dt.day,
    'week': t.dt.week,
    'month': t.dt.month,
    'year': t.dt.year,
    'weekday': t.dt.weekday
}
  
time_df = pd.DataFrame(frame)
time_df.head()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,2018-11-30 00:22:07.796,0,30,48,11,2018,4
1,2018-11-30 01:08:41.796,1,30,48,11,2018,4
2,2018-11-30 01:12:48.796,1,30,48,11,2018,4
3,2018-11-30 01:17:05.796,1,30,48,11,2018,4
4,2018-11-30 01:20:56.796,1,30,48,11,2018,4


#### Insert records

In [19]:
for i, row in time_df.iterrows():
    execute_query(time_table_insert, list(row))

#### Check that the records were inserted

In [20]:
execute_query("select * from times", None, True)

(datetime.datetime(2018, 11, 30, 0, 22, 7, 796000), 0, 30, 48, 11, 2018, 4)

## #4: `users` table
#### Extract data

In [21]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

#### Insert records

In [22]:
for i, row in user_df.iterrows():
    execute_query(user_table_insert, list(row))

#### Check that the records were inserted

In [23]:
execute_query("select * from users", None, True)

('33', 'Bronson', 'Harris', 'M', 'free')

## #5: `songplays` table
#### Extract data
Since the log file does not specify an ID for either the song or the artist, we'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name and song duration.

#### Insert records

In [24]:
for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    results = execute_query(song_select, (row.song, row.artist, row.length), True)
    
    if results:
        song_id, artist_id = results
    else:
        song_id, artist_id = None, None

    # insert songplay record
    start_time = pd.to_datetime(row['ts'], unit='ms')
    user_id = row['userId']
    level = row['level']
    session_id =row['sessionId']
    location = row['location']
    user_agent = row['userAgent']
    songplay_data = (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
    execute_query(songplay_table_insert, songplay_data)

#### Check that the records were inserted

In [25]:
execute_query("select * from songplays", None, True)

(1,
 datetime.datetime(2018, 11, 30, 0, 22, 7, 796000),
 '91',
 'free',
 None,
 None,
 '829',
 'Dallas-Fort Worth-Arlington, TX',
 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)')