# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the full datasets.

In [1]:
import os
import glob
import psycopg2
import pandas as pd
import numpy as np
from sql_queries import *
from tqdm import tqdm

In [2]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=dattlee")
cur = conn.cursor()
conn.set_session(autocommit=True)

In [3]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files
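
The directory walk above can also be expressed with a recursive glob pattern. This is a minimal sketch, not the notebook's own helper: the name `get_files_recursive` and the throwaway directory tree are assumptions made for illustration.

```python
import glob
import os
import tempfile


def get_files_recursive(filepath):
    """Return absolute paths of all .json files under filepath (hypothetical helper)."""
    pattern = os.path.join(filepath, "**", "*.json")
    # recursive=True lets "**" match the top-level directory and any nested ones
    return sorted(os.path.abspath(f) for f in glob.glob(pattern, recursive=True))


# Small demonstration on a temporary directory tree (made-up file names)
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "A", "B"))
    for rel in ("top.json", os.path.join("A", "a.json"), os.path.join("A", "B", "b.json")):
        open(os.path.join(tmp, rel), "w").close()
    open(os.path.join(tmp, "A", "notes.txt"), "w").close()  # ignored: not a .json file
    found = get_files_recursive(tmp)
    found_names = [os.path.basename(f) for f in found]
```

Sorting the result keeps the file order stable between runs, which `os.walk` alone does not guarantee.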

# Process `song_data`
In this first part, you'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.
- Use the `get_files` function provided above to get a list of all song JSON files in `data/song_data`
- Select the first song in this list
- Read the song file and view the data
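
Each song file holds one JSON object per line, which is why `lines=True` is passed to `pd.read_json` below. As a rough sketch of what that option does, the same parsing can be done with the standard library. The helper name `read_json_lines` is an assumption, and the sample record reuses values from the song data shown in this notebook.

```python
import json
import os
import tempfile


def read_json_lines(path):
    """Parse a file holding one JSON object per line into a list of dicts."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


# Illustrative record written to a temporary file
sample = {
    "num_songs": 1,
    "artist_id": "AR7G5I41187FB4CE6C",
    "artist_latitude": None,
    "artist_longitude": None,
    "artist_location": "London, England",
    "artist_name": "Adam Ant",
    "song_id": "SONHOTT12A8C13493C",
    "title": "Something Girls",
    "duration": 233.40363,
    "year": 1982,
}
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "song.json")
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(sample) + "\n")
    records = read_json_lines(path)
```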

In [4]:
filepath = "data/song_data"
song_files = get_files(filepath)

> Get song data into a dataframe with
`pd.read_json(song_files[i], lines=True)`

In [5]:
df = pd.concat([pd.read_json(f, lines=True) for f in song_files])
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,AR7G5I41187FB4CE6C,,,"London, England",Adam Ant,SONHOTT12A8C13493C,Something Girls,233.40363,1982
0,1,AR8ZCNI1187B9A069B,,,,Planet P Project,SOIAZJW12AB01853F1,Pink World,269.81832,1984
0,1,ARXR32B1187FB57099,,,,Gob,SOFSOCN12A8C143F5D,Face the Ashes,209.60608,2007
0,1,AR10USD1187B99F3F1,,,"Burlington, Ontario, Canada",Tweeterfriendly Music,SOHKNRJ12A6701D1F8,Drop of Rain,189.57016,0
0,1,ARGSJW91187B9B1D6B,35.21962,-80.01955,North Carolina,JennyAnyKind,SOQHXMF12AB0182363,Young Boy Blues,218.77506,0


## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `song_data`
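
Selecting the fields by name rather than by position keeps the extraction correct even if the column order changes. A minimal sketch on a plain dict record follows. `SONG_FIELDS` and `extract_fields` are hypothetical names, and the record reuses values from the song data in this notebook.

```python
# Order matches the column list of the songs insert statement
SONG_FIELDS = ["song_id", "title", "artist_id", "year", "duration"]


def extract_fields(record, fields):
    """Return the values of the named fields, in the given order."""
    return [record[name] for name in fields]


record = {
    "num_songs": 1,
    "artist_id": "AR7G5I41187FB4CE6C",
    "artist_name": "Adam Ant",
    "song_id": "SONHOTT12A8C13493C",
    "title": "Something Girls",
    "duration": 233.40363,
    "year": 1982,
}
song_data = extract_fields(record, SONG_FIELDS)
```

With the dataframe, the same idea would be `df[SONG_FIELDS].values`.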

In [6]:
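# Positional indices of the needed columns in df (see the column order in df.head() above)
song_id = 6
title = 7
artist_id = 1
year = 9
duration = 8
# Pick the columns in the order expected by the songs insert statement
song_data = df.to_numpy()[:,[song_id, title, artist_id, year, duration]]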

Column type note: `duration` = `NUMERIC(precision, scale)`

#### Insert Record into Song Table
Implement the `song_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song into the `songs` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songs` table in the sparkify database.

In [7]:
song_table_insert = '''
INSERT INTO songs (song_id, title, artist_id, year, duration)
VALUES (%s, %s, %s, %s, %s);'''

In [8]:
song_data[[71],:]


array([['SOBCOSW12A8C13D398', 'Rumba De Barcelona', 'AR7SMBG1187B9B9066',
        0, 218.38322]], dtype=object)

In [9]:
for song in tqdm(song_data):
    try:
        cur.execute(song_table_insert, song)
    except psycopg2.errors.UniqueViolation as err:
        print(err)
        print(song)
    # conn.commit()  # autocommit on

100%|██████████| 72/72 [00:00<00:00, 1312.55it/s]

duplicate key value violates unique constraint "songs_pkey"
DETAIL:  Key (song_id)=(SONHOTT12A8C13493C) already exists.

['SONHOTT12A8C13493C' 'Something Girls' 'AR7G5I41187FB4CE6C' 1982
 233.40363]
duplicate key value violates unique constraint "songs_pkey"
DETAIL:  Key (song_id)=(SOIAZJW12AB01853F1) already exists.

['SOIAZJW12AB01853F1' 'Pink World' 'AR8ZCNI1187B9A069B' 1984 269.81832]
duplicate key value violates unique constraint "songs_pkey"
DETAIL:  Key (song_id)=(SOFSOCN12A8C143F5D) already exists.

['SOFSOCN12A8C143F5D' 'Face the Ashes' 'ARXR32B1187FB57099' 2007 209.60608]
duplicate key value violates unique constraint "songs_pkey"
DETAIL:  Key (song_id)=(SOHKNRJ12A6701D1F8) already exists.

['SOHKNRJ12A6701D1F8' 'Drop of Rain' 'AR10USD1187B99F3F1' 0 189.57016]
duplicate key value violates unique constraint "songs_pkey"
DETAIL:  Key (song_id)=(SOQHXMF12AB0182363) already exists.

['SOQHXMF12AB0182363' 'Young Boy Blues' 'ARGSJW91187B9B1D6B' 0 218.77506]
duplicate key value viol




Found a duplicate entry.
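
One way to make the insert safe to re-run is to let the database skip rows whose key already exists. The sketch below uses the standard-library `sqlite3` module as a stand-in so it runs without a server; this is an assumption for illustration only. PostgreSQL accepts the same `ON CONFLICT (song_id) DO NOTHING` clause, and with `psycopg2` the placeholders would be `%s` instead of `?`.

```python
import sqlite3

# In-memory stand-in for the songs table (SQLite 3.24+ is needed for ON CONFLICT)
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    "CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, "
    "artist_id TEXT, year INTEGER, duration REAL)"
)

insert_sql = """
INSERT INTO songs (song_id, title, artist_id, year, duration)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (song_id) DO NOTHING;
"""

rows = [
    ("SONHOTT12A8C13493C", "Something Girls", "AR7G5I41187FB4CE6C", 1982, 233.40363),
    # Same key again: silently skipped instead of raising a unique violation
    ("SONHOTT12A8C13493C", "Something Girls", "AR7G5I41187FB4CE6C", 1982, 233.40363),
]
for r in rows:
    demo_conn.execute(insert_sql, r)

stored = demo_conn.execute("SELECT song_id, title FROM songs").fetchall()
demo_conn.close()
```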

Run `test.ipynb` to see if you've successfully added a record to this table.

### I'm testing locally with an SQL client, so there is no need.

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `artist_data`

In [10]:
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,AR7G5I41187FB4CE6C,,,"London, England",Adam Ant,SONHOTT12A8C13493C,Something Girls,233.40363,1982
0,1,AR8ZCNI1187B9A069B,,,,Planet P Project,SOIAZJW12AB01853F1,Pink World,269.81832,1984
0,1,ARXR32B1187FB57099,,,,Gob,SOFSOCN12A8C143F5D,Face the Ashes,209.60608,2007
0,1,AR10USD1187B99F3F1,,,"Burlington, Ontario, Canada",Tweeterfriendly Music,SOHKNRJ12A6701D1F8,Drop of Rain,189.57016,0
0,1,ARGSJW91187B9B1D6B,35.21962,-80.01955,North Carolina,JennyAnyKind,SOQHXMF12AB0182363,Young Boy Blues,218.77506,0


In [11]:
df.isnull().sum(axis = 0)

num_songs            0
artist_id            0
artist_latitude     41
artist_longitude    41
artist_location      0
artist_name          0
song_id              0
title                0
duration             0
year                 0
dtype: int64

In [12]:
artist_id = 1
name = 5
location = 4
latitude = 2
longitude = 3
artist_data = df.to_numpy()[:,[artist_id, name, location, latitude, longitude]]

#### Insert Record into Artist Table
Implement the `artist_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song's artist into the `artists` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `artists` table in the sparkify database.

In [13]:
for artist in tqdm(artist_data):
    try:
        if pd.isna(artist[3]):
            artist[3] = None
        if pd.isna(artist[4]):
            artist[4] = None
        cur.execute(artist_table_insert, artist)
    except psycopg2.errors.UniqueViolation as err:
        print(err)
        print(artist)

100%|██████████| 72/72 [00:00<00:00, 6448.78it/s]
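
The loop above replaces missing latitude and longitude values with `None`, which the database driver sends as SQL `NULL`. The same conversion can be sketched as a small reusable function. `nan_to_none` is a hypothetical helper, and the sample row reuses values from the song data in this notebook.

```python
import math


def nan_to_none(value):
    """Map a float NaN to None; leave every other value unchanged."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


# artist_id, name, location, latitude, longitude
artist = ["AR7G5I41187FB4CE6C", "Adam Ant", "London, England", float("nan"), float("nan")]
cleaned = [nan_to_none(v) for v in artist]
```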


Run `test.ipynb` to see if you've successfully added a record to this table.

> TESTED LOCALLY

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_files` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [14]:
filepath = "data/log_data"
logfiles = get_files(filepath)

In [15]:
df = pd.concat([pd.read_json(f, lines=True) for f in logfiles])
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
1,Kenny G with Peabo Bryson,Logged In,Anabelle,F,1,Simpson,264.75057,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,By The Time This Night Is Over,200,1541903770796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
2,Biffy Clyro,Logged In,Anabelle,F,2,Simpson,189.83138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,God & Satan,200,1541904034796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
3,,Logged In,Lily,F,0,Burns,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1540621000000.0,456,,200,1541910841796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32
4,HIM,Logged In,Lily,F,1,Burns,212.06159,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540621000000.0,456,Beautiful,200,1541910973796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32


In [16]:
df.isnull().sum(axis = 0)

artist           1236
auth                0
firstName         286
gender            286
itemInSession       0
lastName          286
length           1236
level               0
location          286
method              0
page                0
registration      286
sessionId           0
song             1236
status              0
ts                  0
userAgent         286
userId              0
dtype: int64

## #3: `time` Table
#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to easily access datetime-like properties.
- Specify labels for these columns and set to `column_labels`
- Create a dataframe, `time_df`, containing the time data for this file by combining `column_labels` and `time_data` into a dictionary and converting this into a dataframe
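
The steps above can be sketched for a single timestamp with the standard library. The sketch assumes UTC so the result does not depend on the machine's time zone, whereas `datetime.fromtimestamp` without a `tz` argument returns local time. `time_row` and `COLUMN_LABELS` are hypothetical names.

```python
from datetime import datetime, timezone

COLUMN_LABELS = ["start_time", "hour", "day", "week", "month", "year", "weekday"]


def time_row(ts_ms):
    """Convert a millisecond epoch timestamp into the seven time-table values (UTC)."""
    t = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return [
        t.strftime("%Y-%m-%d %H:%M:%S"),
        t.hour,
        t.day,
        t.isocalendar()[1],  # ISO week of year
        t.month,
        t.year,
        t.weekday(),  # Monday is 0, Sunday is 6
    ]


row = time_row(1541903636796)
time_dict = dict(zip(COLUMN_LABELS, row))
```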

In [17]:
time_df = df.loc[df['page'] == 'NextSong']

In [18]:
import datetime

In [19]:
x = datetime.datetime.fromtimestamp(1541903636796/1000)

In [20]:
time_df.head(1)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69


In [21]:
time_df['ts'].apply(lambda x: datetime.datetime.fromtimestamp(x/1000)).values

array(['2018-11-10T21:33:56.796000000', '2018-11-10T21:36:10.796000000',
       '2018-11-10T21:40:34.796000000', ...,
       '2018-11-24T17:15:50.796000000', '2018-11-24T17:19:38.796000000',
       '2018-11-24T18:46:14.796000000'], dtype='datetime64[ns]')

In [22]:
# assign() returns a new dataframe, which avoids pandas' SettingWithCopyWarning on the filtered slice
time_df = time_df.assign(ts=time_df['ts'].apply(lambda x: datetime.datetime.fromtimestamp(x/1000)))


In [23]:
len(time_df)

6820

In [24]:
time_data = []
for item in time_df.ts:
    t = item
    row = [
        t.strftime("%Y-%m-%d %H:%M:%S"),
        t.hour,
        t.day,
        t.isocalendar()[1], # week of year
        t.month,
        t.year,
        t.weekday()
    ]
    time_data.append(row)

#### Insert Records into Time Table
Implement the `time_table_insert` query in `sql_queries.py` and run the cell below to insert records for the timestamps in this log file into the `time` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `time` table in the sparkify database.

In [25]:
time_data[1000:1005]

[['2018-11-07 20:28:18', 20, 7, 45, 11, 2018, 2],
 ['2018-11-07 20:30:55', 20, 7, 45, 11, 2018, 2],
 ['2018-11-07 20:35:35', 20, 7, 45, 11, 2018, 2],
 ['2018-11-07 20:38:19', 20, 7, 45, 11, 2018, 2],
 ['2018-11-07 20:39:40', 20, 7, 45, 11, 2018, 2]]

In [26]:
time_table_insert = """
INSERT INTO time(start_time, hour, day, week, month, year, weekday)
values(%s, %s, %s, %s, %s, %s, %s);
"""

In [27]:
count=0
for t in tqdm(time_data):
    try:
        cur.execute(time_table_insert, t)
    except psycopg2.errors.UniqueViolation as err:
        count += 1
#         print(err)
#         print(t)
print(count)

100%|██████████| 6820/6820 [00:01<00:00, 5514.65it/s]

6820
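
A count of 6820 means every insert raised `UniqueViolation`, which suggests the `time` table already held these rows from an earlier run. Independently of that, repeated timestamps can be dropped before inserting. This is a minimal sketch with made-up duplication; `time_rows` is a hypothetical stand-in for `time_data`.

```python
# Two distinct rows plus a repeat of the first one
time_rows = [
    ["2018-11-07 20:28:18", 20, 7, 45, 11, 2018, 2],
    ["2018-11-07 20:30:55", 20, 7, 45, 11, 2018, 2],
    ["2018-11-07 20:28:18", 20, 7, 45, 11, 2018, 2],
]

# dict.fromkeys keeps the first occurrence of each row and preserves order;
# lists are not hashable, so each row is converted to a tuple first
unique_rows = list(dict.fromkeys(tuple(r) for r in time_rows))
```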





Run `test.ipynb` to see if you've successfully added records to this table.

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`

In [28]:
df.head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
1,Kenny G with Peabo Bryson,Logged In,Anabelle,F,1,Simpson,264.75057,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,By The Time This Night Is Over,200,1541903770796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69


In [29]:
users_df = df[['userId', 'firstName', 'lastName', 'gender', 'level','ts']]
len(users_df)

8056

In [30]:
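# Note: this reassigns users_df from the full df, so the column selection made in the previous cell is discarded
users_df = df.loc[df['page'] == 'NextSong']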

In [31]:
len(users_df)

6820

# not needed
remove users with "" as the id

users_df = users_df.drop(users_df.loc[users_df['userId'] == ""].index)
len(users_df)

In [32]:
# make all user ids ints

# users_df = users_df.replace([int(x) for x in users_df['userId'].values])
# assign() returns a new dataframe, which avoids pandas' SettingWithCopyWarning on the filtered slice
users_df = users_df.assign(userId=users_df['userId'].astype('int'))


#### Insert Records into Users Table
Implement the `user_table_insert` query in `sql_queries.py` and run the cell below to insert records for the users in this log file into the `users` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `users` table in the sparkify database.

In [33]:
users_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
1,Kenny G with Peabo Bryson,Logged In,Anabelle,F,1,Simpson,264.75057,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,By The Time This Night Is Over,200,1541903770796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
2,Biffy Clyro,Logged In,Anabelle,F,2,Simpson,189.83138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,God & Satan,200,1541904034796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
4,HIM,Logged In,Lily,F,1,Burns,212.06159,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540621000000.0,456,Beautiful,200,1541910973796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32
5,Matmos,Logged In,Joseph,M,0,Gutierrez,1449.11628,free,"Columbia, SC",PUT,NextSong,1540809000000.0,284,Supreme Balloon,200,1541911006796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3...",75


In [36]:
count = 0
for i in tqdm(range(len(users_df))):
    # skip consecutive duplicate entries so there is no need to UPSERT
    if i < len(users_df.values)-1 and users_df.iloc[i][0] == users_df.iloc[i+1][0]:
        continue
    try:
        cur.execute(user_table_insert, np.concatenate(([int(users_df.iloc[i][0])],users_df.iloc[i][1:-1], users_df.iloc[i][1:-1])))
        count += 1
    except psycopg2.errors.UniqueViolation as err:
        print(err)
        print(users_df.iloc[i])
        
print(count)


  0%|          | 0/6820 [00:00<?, ?it/s]


ValueError: invalid literal for int() with base 10: 'Dim Chris_ Thomas Gold'

In [37]:
users_df.head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
50,Dim Chris_ Thomas Gold,Logged In,Jizelle,F,0,Benjamin,504.60689,free,"Plymouth, IN",PUT,NextSong,1539909000000.0,126,Self Control (Laurent Wolf & Anton Wick),200,1541519780796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2
55,Lupe Fiasco,Logged In,Jizelle,F,1,Benjamin,262.89587,free,"Plymouth, IN",PUT,NextSong,1539909000000.0,126,Hurt Me Soul (Explicit Album Version),200,1541520284796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2


In [38]:
users_df.drop(columns=['artist'],inplace=True)
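
Dropping `artist` moves `auth` into position 0, so positional indexing still would not reach `userId`, which is the last column. Selecting the user fields by name avoids this. The sketch below works on plain dict records and keeps the most recent `level` per user. `latest_users` and `USER_FIELDS` are hypothetical names, and the `paid` event is made up for illustration.

```python
USER_FIELDS = ["userId", "firstName", "lastName", "gender", "level"]


def latest_users(events):
    """Return one tuple per user, taken from that user's most recent event."""
    users = {}
    for event in sorted(events, key=lambda e: e["ts"]):
        if event["userId"] == "":
            continue  # skip events without a user id
        user_id = int(event["userId"])
        # Later events overwrite earlier ones, so the newest level wins
        users[user_id] = (user_id,) + tuple(event[f] for f in USER_FIELDS[1:])
    return list(users.values())


events = [
    {"userId": "69", "firstName": "Anabelle", "lastName": "Simpson", "gender": "F", "level": "free", "ts": 1},
    {"userId": "32", "firstName": "Lily", "lastName": "Burns", "gender": "F", "level": "free", "ts": 2},
    {"userId": "69", "firstName": "Anabelle", "lastName": "Simpson", "gender": "F", "level": "paid", "ts": 3},
]
user_rows = latest_users(events)
```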

Run `test.ipynb` to see if you've successfully added records to this table.

## #5: `songplays` Table
#### Extract Data for Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the **song ID** and **artist ID** by querying the songs and artists tables to find matches based on:
- song title
- artist name
- song duration time.

- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`

#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songplays` table in the sparkify database.

In [39]:
for _, row in df.iterrows():
    
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()
    print(results)
    break

None


In [40]:
(row.song, row.artist, row.length)

('Fuck Kitty', 'Frumpies', 134.47791)

In [41]:
songplay_df = df.loc[df['page'] == 'NextSong']

In [42]:
songId, artistId = None,None

In [43]:
row.ts, row.userId, row.level, songId, artistId, row.sessionId, row.location, row.userAgent

(1541903636796,
 '69',
 'free',
 None,
 None,
 455,
 'Philadelphia-Camden-Wilmington, PA-NJ-DE-MD',
 '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"')

In [44]:
for index, row in tqdm(songplay_df.iterrows()):
    
    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()
    
    if results:
        songId, artistId = results
    else:
        songId, artistId = None, None

    # insert songplay record
    row_time = datetime.datetime.fromtimestamp(row.ts/1000).strftime("%Y-%m-%d %H:%M:%S")
    
    songplay_data = (row_time, row.userId, row.level, songId, artistId, row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)
#     conn.commit()

6820it [00:07, 868.53it/s]


In [45]:
(row.song, row.artist, row.length)

('Inevitable', 'Shakira', 193.82812)

In [46]:
print(song_select %  (row.song, row.artist, row.length))


SELECT song_id, a.artist_id 
FROM 
    songs AS s JOIN artists AS a
    ON s.artist_id = a.artist_id
WHERE title = Inevitable AND name = Shakira AND duration = 193.82812;
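
The printed query shows unquoted strings because Python's `%` operator only substitutes text. When the values are passed as the second argument to `cur.execute`, the driver quotes and escapes them itself. The sketch below shows the lookup with bound parameters, using the standard-library `sqlite3` module as a stand-in (an assumption so it runs without a server; its placeholder is `?` where `psycopg2` uses `%s`). The table layout and `song_select_demo` are illustrative, not the contents of `sql_queries.py`.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, "
    "artist_id TEXT, year INTEGER, duration REAL)"
)
demo.execute(
    "CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT, "
    "location TEXT, latitude REAL, longitude REAL)"
)
demo.execute(
    "INSERT INTO artists VALUES (?, ?, ?, ?, ?)",
    ("AR7G5I41187FB4CE6C", "Adam Ant", "London, England", None, None),
)
demo.execute(
    "INSERT INTO songs VALUES (?, ?, ?, ?, ?)",
    ("SONHOTT12A8C13493C", "Something Girls", "AR7G5I41187FB4CE6C", 1982, 233.40363),
)

song_select_demo = """
SELECT s.song_id, a.artist_id
FROM songs AS s JOIN artists AS a ON s.artist_id = a.artist_id
WHERE s.title = ? AND a.name = ? AND s.duration = ?;
"""

# Values are bound by the driver, so strings need no manual quoting
match = demo.execute(song_select_demo, ("Something Girls", "Adam Ant", 233.40363)).fetchone()
miss = demo.execute(song_select_demo, ("Inevitable", "Shakira", 193.82812)).fetchone()
demo.close()
```

A lookup with no match returns `None`, which is the case the songplay loop handles by setting both IDs to `None`.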



Run `test.ipynb` to see if you've successfully added records to this table.

# Close Connection to Sparkify Database

In [47]:
conn.close()

# Implement `etl.py`
Use what you've completed in this notebook to implement `etl.py`.