### ETL Processes

This notebook is used to develop the ETL process for each table before completing the 'etl.py' file, which loads the full datasets.

In [80]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [81]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=root123")
cur = conn.cursor()

### Find active connections and terminate them so that you can test and run the code

In [82]:
# Query to find active connections
cur.execute("""
    SELECT pid, usename, datname, client_addr, state, query
    FROM pg_stat_activity
    WHERE datname = 'sparkifydb';
""")
sessions = cur.fetchall()

# Display active sessions
for session in sessions:
    print(session)


(21604, 'postgres', 'sparkifydb', '127.0.0.1', 'idle', 'COMMIT')
(18608, 'postgres', 'sparkifydb', '127.0.0.1', 'active', "\n    SELECT pid, usename, datname, client_addr, state, query\n    FROM pg_stat_activity\n    WHERE datname = 'sparkifydb';\n")


In [83]:
# Terminate all sessions for the specified database except your own
cur.execute("""
    SELECT pg_terminate_backend(pid)
    FROM pg_stat_activity
    WHERE datname = 'sparkifydb'
    AND pid <> pg_backend_pid();  
""")

conn.commit()
print("Terminated all sessions for the database except the current one.")


Terminated all sessions for the database except the current one.


In [84]:
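def get_files(filepath):
    """Return the absolute paths of all .json files under filepath, searched recursively."""
    all_files = []
    # os.walk visits filepath and every subdirectory below it
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))
    return all_files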
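
As a point of comparison, the same recursive search can be sketched with pathlib. This is only an illustration: the name find_json_files and the temporary directory layout are assumptions made for the example and are not part of the project files.

```python
import json
import tempfile
from pathlib import Path


def find_json_files(root):
    """Return sorted absolute paths of every *.json file under root (hypothetical helper)."""
    return sorted(str(p.resolve()) for p in Path(root).rglob("*.json"))


# Build a small throwaway directory tree to exercise the function.
with tempfile.TemporaryDirectory() as tmp:
    nested = Path(tmp) / "A" / "B"
    nested.mkdir(parents=True)
    (nested / "song1.json").write_text(json.dumps({"num_songs": 1}))
    (Path(tmp) / "A" / "song2.json").write_text(json.dumps({"num_songs": 1}))
    (Path(tmp) / "notes.txt").write_text("not a JSON file")

    found = find_json_files(tmp)
    found_names = [Path(p).name for p in found]

print(found_names)
```

Only the two .json files are returned; the .txt file is ignored, which matches the behaviour expected from get_files.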



### Process song_data

In the first part, you will perform ETL on the first dataset, song_data, to create the songs and artists dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.

- Use the get_files function provided above to get a list of all song JSON files in data/song_data
- Select the first song in this list
- Read the song file and view the data


In [85]:
song_files = get_files(r"C:\Users\Mihai\OneDrive\Desktop\project_etl\data\song_data")

In [86]:
filepath = song_files[0]

In [87]:
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARD7TVE1187B99BFB1,,,California - LA,Casual,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179,0
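
To see what lines=True does without touching the dataset, here is a minimal sketch that reads two made-up records from an in-memory string. The records and the name sample_df are assumptions for illustration only.

```python
import io

import pandas as pd

# Two hypothetical records in JSON Lines form: one JSON object per line.
raw = (
    '{"song_id": "S1", "title": "First", "year": 0}\n'
    '{"song_id": "S2", "title": "Second", "year": 1999}\n'
)

# lines=True tells pandas to parse each line as a separate record (one row each).
sample_df = pd.read_json(io.StringIO(raw), lines=True)
print(sample_df.shape)
```

Each song file in this project holds a single line, so reading it this way yields a one-row DataFrame.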


### Songs table - extract data

- Select the song_id, title, artist_id, year and duration columns, following the structure of the songs table
- Use df.values to select just the values from the dataframe
- Select just the first record from the dataframe
- Create a list for storing this information as song_data

In [96]:
song_data = list(df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0])
song_data

['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179]

### Songs table - insert data
Use the song_table_insert query that we previously created in 'sql_queries.py'. Make sure that you have run 'create_tables.py' so that the songs table exists in the sparkify database.

In [89]:
cur.execute(song_table_insert, song_data)
conn.commit()

Check if the record was successfully added by running 'test.ipynb'

### Artists table - extract data
  - Select artist ID, name, location, latitude and longitude
  - Use df.values to select just the values from the dataframe
  - Select just the first record from the dataframe
  - Create a list storing this information as artist_data

In [97]:
artist_data = list(df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']].values[0])
artist_data

['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', nan, nan]
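
The latitude and longitude above are nan. psycopg2 sends None as SQL NULL, so if the artists table should hold NULL for missing coordinates, one option is to convert the values before inserting. The sketch below assumes that this is the desired behaviour; the helper name nan_to_none is hypothetical.

```python
import math

# Values copied from the artist_data output above.
artist_row = ['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', float('nan'), float('nan')]


def nan_to_none(values):
    """Replace float NaN entries with None; leave every other value untouched."""
    return [None if isinstance(v, float) and math.isnan(v) else v for v in values]


clean_artist_row = nan_to_none(artist_row)
print(clean_artist_row)
```

The cleaned list could then be passed to cur.execute in place of artist_data.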

### Artists table - insert data
Use the artist_table_insert query that we previously created in 'sql_queries.py'. Make sure that you have run 'create_tables.py' so that the artists table exists in the sparkify database.

In [95]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

### Process log_data
In this section, you will carry out ETL (Extract, Transform, Load) processes on the log_data dataset to populate the time and users dimensional tables, as well as the songplays fact table.

Steps:
- ETL on a single log file: begin by processing one log file to load a single record into each of the relevant tables.
- Retrieve log files: use the get_files function provided earlier to generate a list of all JSON files in the data/log_data directory.
- Select a log file: choose the first log file from the list obtained.
- Read and inspect the data: read the selected log file and examine the data it contains.

In [14]:
log_files = get_files(r"C:\Users\Mihai\OneDrive\Desktop\project_etl\data\log_data")

In [15]:
filepath = log_files[0]

In [16]:
df = pd.read_json(filepath, lines = True)
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540344794796,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


### Time table - extract data
- Filter the records: Focus on records where the action is "NextSong".
- Convert the timestamp: Transform the ts column from its current format in milliseconds to a datetime format.
    - Tip: Remember, the timestamp is currently in milliseconds.
- Extract specific time components: From the ts column, extract and store the timestamp, hour, day, week of the year, month, year, and weekday into a list named time_data, in that sequence.
    - Hint: Use the dt attribute in pandas to easily access these datetime properties.
- Assign labels: Create labels for these extracted components and store them in column_labels.
- Create a DataFrame: Combine column_labels and time_data into a dictionary, then convert it into a DataFrame called time_df. This DataFrame will contain the time-related data for this file.
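
The steps above can be sketched on two ts values taken from the log sample. This is a minimal illustration using the dt accessor, with the millisecond integer kept as the timestamp column; the names ts_ms, t_demo and time_demo are assumptions for the example.

```python
import pandas as pd

# Two ts values taken from the log sample above (milliseconds since the Unix epoch).
ts_ms = pd.Series([1541106106796, 1541106352796], name="ts")

# unit="ms" is required; without it pandas reads the integers as nanoseconds.
t_demo = pd.to_datetime(ts_ms, unit="ms")

# Build the DataFrame directly from a dictionary of label -> values.
time_demo = pd.DataFrame({
    "timestamp": ts_ms,
    "hour": t_demo.dt.hour,
    "day": t_demo.dt.day,
    "week": t_demo.dt.isocalendar().week,
    "month": t_demo.dt.month,
    "year": t_demo.dt.year,
    "weekday": t_demo.dt.weekday,
})
print(time_demo.iloc[0].tolist())
```

The first row decodes to 1 November 2018, 21:00 hours, ISO week 44, which is a Thursday (weekday 3).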

In [17]:
df = df[df['page'] == 'NextSong']
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
5,Tamba Trio,Logged In,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Quem Quiser Encontrar O Amor,200,1541106496796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
6,The Mars Volta,Logged In,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Eriatarka,200,1541106673796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
7,Infected Mushroom,Logged In,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Becoming Insane,200,1541107053796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [18]:
t = pd.to_datetime(df['ts'], unit='ms')  # ts is in milliseconds since the Unix epoch
t.head()

2   2018-11-01 21:01:46.796
4   2018-11-01 21:05:52.796
5   2018-11-01 21:08:16.796
6   2018-11-01 21:11:13.796
7   2018-11-01 21:17:33.796
Name: ts, dtype: datetime64[ns]

In [19]:
# Keep the original millisecond value as the timestamp so it matches the ts used for songplays
time_data = [(ts, tt.hour, tt.day, tt.week, tt.month, tt.year, tt.weekday()) for ts, tt in zip(df['ts'], t)]
column_labels = ('timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday')

In [20]:
time_df = pd.DataFrame(data=time_data, columns=column_labels)
time_df.head()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,1541106106796,21,1,44,11,2018,3
1,1541106352796,21,1,44,11,2018,3
2,1541106496796,21,1,44,11,2018,3
3,1541106673796,21,1,44,11,2018,3
4,1541107053796,21,1,44,11,2018,3


### Time table - insert data
Use the time_table_insert query that we previously created in 'sql_queries.py'. Make sure that you have run 'create_tables.py' so that the time table exists in the sparkify database.


In [21]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

Check if the record was successfully added by running 'test.ipynb'

### Users table - extract data
 - Select columns for user ID, first name, last name, gender and level. Assign them to user_df

In [22]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df.head()

Unnamed: 0,userId,firstName,lastName,gender,level
2,8,Kaylee,Summers,F,free
4,8,Kaylee,Summers,F,free
5,8,Kaylee,Summers,F,free
6,8,Kaylee,Summers,F,free
7,8,Kaylee,Summers,F,free
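
The output above repeats the same user on every row. Whether that matters depends on how user_table_insert handles conflicts, which is not shown here. As a hedged sketch, the rows could be reduced to one per user before inserting; the sample data and the names log_demo and unique_users are made up for illustration.

```python
import pandas as pd

# Hypothetical log rows: user 8 appears three times and switches to 'paid' on the last row.
log_demo = pd.DataFrame({
    "userId": [8, 8, 39, 8],
    "firstName": ["Kaylee", "Kaylee", "Walter", "Kaylee"],
    "lastName": ["Summers", "Summers", "Frye", "Summers"],
    "gender": ["F", "F", "M", "F"],
    "level": ["free", "free", "free", "paid"],
})

# Keep the last row per user so the most recent level wins.
unique_users = log_demo.drop_duplicates(subset="userId", keep="last").reset_index(drop=True)
print(unique_users)
```

Keeping the last occurrence is a design choice: it assumes the log is in chronological order and that the latest subscription level is the one worth storing.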


### Users table - insert data
Use the user_table_insert query that we previously created in 'sql_queries.py'. Make sure that you have run 'create_tables.py' so that the users table exists in the sparkify database.

In [24]:
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, list(row))
    conn.commit()

Check if the record was successfully added by running 'test.ipynb'

### Songplays table - extract & insert data

- The songplays table combines information from the songs table, the artists table and the original log file. The log file does not specify an ID for either the song or the artist, so we need to get the song ID and artist ID by querying the joined songs and artists tables on the song title, artist name and song duration.

- Select the timestamp, user ID, level, song ID, artist ID, session ID, location and user agent. Assign them to songplay_data


Use the song_select and songplay_table_insert queries that we previously created in 'sql_queries.py'. Make sure that you have run 'create_tables.py' so that the songplays table exists in the sparkify database.

In [27]:
for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()
    
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay record
    songplay_data = (row['ts'], row['userId'], row['level'], songid, artistid, row['sessionId'],
                     row['location'], row['userAgent'])
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()
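
The lookup step can be tried in isolation. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without a server. The table layouts and the query demo_song_select are assumptions, since the real song_select lives in 'sql_queries.py' and is not shown in this notebook.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; table and column names are assumed.
demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT)")
demo_cur.execute(
    "CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, artist_id TEXT, duration REAL)"
)
demo_cur.execute("INSERT INTO artists VALUES (?, ?)", ("ARD7TVE1187B99BFB1", "Casual"))
demo_cur.execute(
    "INSERT INTO songs VALUES (?, ?, ?, ?)",
    ("SOMZWCG12A8C13C480", "I Didn't Mean To", "ARD7TVE1187B99BFB1", 218.93179),
)

# Hypothetical equivalent of song_select; SQLite uses ? placeholders where psycopg2 uses %s.
demo_song_select = """
    SELECT s.song_id, a.artist_id
    FROM songs AS s
    JOIN artists AS a ON s.artist_id = a.artist_id
    WHERE s.title = ? AND a.name = ? AND s.duration = ?
"""


def lookup(title, artist, length):
    """Return (song_id, artist_id), or (None, None) when no song matches."""
    demo_cur.execute(demo_song_select, (title, artist, length))
    match = demo_cur.fetchone()
    return match if match else (None, None)


hit = lookup("I Didn't Mean To", "Casual", 218.93179)
miss = lookup("You Gotta Be", "Des'ree", 246.30812)
demo_conn.close()
print(hit, miss)
```

With only one song and one artist loaded, most log rows fall into the (None, None) case, just as in the loop above.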


### Close connection to sparkify database

In [79]:
conn.close()

### Implement 'etl.py'

Use what you have completed in this notebook to implement 'etl.py'
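
One possible shape for the driver in 'etl.py' is a function that finds every file and hands each one to a per-file step. The following is a sketch under stated assumptions, not the project's actual code: the names process_data and record_file are hypothetical, and the per-file step here only records an id instead of running the inserts.

```python
import glob
import json
import os
import tempfile


def process_data(filepath, func):
    """Call func(path) for every *.json file under filepath; return the file count."""
    all_files = []
    for root, _dirs, _files in os.walk(filepath):
        for f in glob.glob(os.path.join(root, "*.json")):
            all_files.append(os.path.abspath(f))

    for i, datafile in enumerate(all_files, 1):
        func(datafile)
        print(f"{i}/{len(all_files)} files processed.")
    return len(all_files)


# Stand-in for a per-file ETL step; a real one would also take a cursor and run the inserts.
seen = []


def record_file(path):
    with open(path) as fh:
        seen.append(json.load(fh)["id"])


# Exercise the driver on three throwaway files.
with tempfile.TemporaryDirectory() as tmp:
    for n in range(3):
        with open(os.path.join(tmp, f"file{n}.json"), "w") as fh:
            json.dump({"id": n}, fh)
    total = process_data(tmp, record_file)
```

In the real script, the song and log steps developed in this notebook would each become a function passed as func, once for data/song_data and once for data/log_data.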