
# Data Modeling with Python: ETL Processes

In this notebook, we perform data modeling with Python using the `pandas` and `numpy` libraries. We use `pyodbc` to connect to a Microsoft SQL Server database and execute predefined SQL queries imported from the `queries` module.


Required Libraries

In [7]:
import os
import glob
import pyodbc
import pandas as pd
import numpy as np

from queries import *
from dotenv import load_dotenv

Load environment variables from the `.env` file.

In [8]:
load_dotenv()

True

In this code snippet, we establish a connection to a SQL Server database using Python's `pyodbc` library. The connection details are retrieved from environment variables:

In [9]:
server = os.getenv("SERVER")
database = os.getenv("DATABASE")
username = os.getenv("UID")
password = os.getenv("PASSWORD")
connectionString = f'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'
conn = pyodbc.connect(connectionString) 
cur = conn.cursor() 
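
If one of the four variables is missing from `.env`, `os.getenv` returns `None` and the f-string embeds the literal text `None` in the connection string. A minimal sketch of a guard, assuming a hypothetical helper `build_connection_string` that is not part of the notebook and using placeholder values rather than real credentials:

```python
import os

REQUIRED_KEYS = ("SERVER", "DATABASE", "UID", "PASSWORD")


def build_connection_string(env=os.environ):
    """Hypothetical helper: build the ODBC string, failing fast on missing keys."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return (
        "DRIVER={ODBC Driver 18 for SQL Server};"
        f"SERVER={env['SERVER']};"
        f"DATABASE={env['DATABASE']};"
        f"UID={env['UID']};"
        f"PWD={env['PASSWORD']}"
    )


# Placeholder values for illustration only
example_env = {
    "SERVER": "localhost",
    "DATABASE": "music_db",
    "UID": "etl_user",
    "PASSWORD": "secret",
}
print(build_connection_string(example_env))
```

The resulting string can be passed to `pyodbc.connect` exactly as in the cell above.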

The `get_files` function takes a directory path as input and returns a list of absolute file paths for all JSON files found in that directory and its subdirectories.

Parameters:
- `filepath`: The base directory path where the search for JSON files will start.

In [10]:
def get_files(filepath):
    all_files = glob.glob(os.path.join(filepath, '**/*.json'), recursive=True)
    return [os.path.abspath(f) for f in all_files]
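
To see the recursive search in action without the project data, the sketch below builds a throwaway directory tree with `tempfile` and runs the same function over it. The file and folder names are made up for the demonstration.

```python
import glob
import json
import os
import tempfile


def get_files(filepath):
    all_files = glob.glob(os.path.join(filepath, '**/*.json'), recursive=True)
    return [os.path.abspath(f) for f in all_files]


with tempfile.TemporaryDirectory() as root:
    nested = os.path.join(root, "A", "B")
    os.makedirs(nested)
    # Two JSON files at different depths plus one file that should be ignored
    for folder, name in [(root, "top.json"), (nested, "deep.json"), (nested, "notes.txt")]:
        with open(os.path.join(folder, name), "w") as handle:
            json.dump({}, handle)
    found = sorted(os.path.basename(path) for path in get_files(root))

print(found)  # ['deep.json', 'top.json']
```

With `recursive=True`, the `**` segment matches zero or more directory levels, so files directly inside `filepath` are included as well.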

## Process song_data

In this part, we will create the `songs` and `artists` dimensional tables.

Initiate the Extract, Transform, Load (ETL) process by working with a single song file and inserting a single record into each corresponding table. Follow these steps:

- Use the `get_files` function defined above to retrieve a list of all song JSON files in the `data/song_data` directory.
- Select the first file in this list.
- Read the contents of the selected song file and examine the data.

In [17]:
song_files = get_files("data/song_data")
filepath = song_files[0]
df = pd.read_json(filepath, lines=True) 
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARD7TVE1187B99BFB1,,,California - LA,Casual,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179,0


## #1: `songs` Table
#### ETL Data for Songs Table

We are extracting specific information from the DataFrame `df`. The `song_data` variable contains details such as the song ID, title, artist ID, year, and duration for the first row in the DataFrame.


In [12]:
song_data = df[["song_id", "title", "artist_id", "year", "duration"]].values[0].tolist()
song_data

['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179]

#### Insert Record into Song Table

Execute the SQL statement `song_table_insert` to insert the record into the `songs` table, then commit the change to the database.

In [13]:
cur.execute(song_table_insert, song_data)
conn.commit()
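
The statement `song_table_insert` comes from `queries.py`, which is not shown here. As a rough sketch of what a parameterized insert of this shape might look like, the example below uses Python's built-in `sqlite3` as a stand-in for SQL Server. The table definition and the query text are assumptions, not the notebook's actual SQL. Both `sqlite3` and `pyodbc` accept `?` placeholders.

```python
import sqlite3

# Assumed schema and query text; the real ones live in queries.py.
song_table_create = """
CREATE TABLE songs (
    song_id   TEXT PRIMARY KEY,
    title     TEXT NOT NULL,
    artist_id TEXT,
    year      INTEGER,
    duration  REAL
)
"""
song_table_insert = (
    "INSERT INTO songs (song_id, title, artist_id, year, duration) "
    "VALUES (?, ?, ?, ?, ?)"
)

song_data = ['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179]

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(song_table_create)
cur.execute(song_table_insert, song_data)  # values are bound, not interpolated
conn.commit()

stored = cur.execute("SELECT title, duration FROM songs").fetchone()
print(stored)
conn.close()
```

Binding the values as parameters means the apostrophe in the title needs no manual escaping.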

## #2: `artists` Table
#### ETL Data for Artists Table

We are extracting relevant information about the artist from the DataFrame `df`. The selected fields include the artist's ID, name, location, latitude, and longitude. The data is then converted to a list and any null values are replaced with `None`.


In [14]:
artist_data = df[["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]].values[0].tolist()
artist_data = [None if pd.isnull(value) else value for value in artist_data]
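
The replacement matters because `NaN` is a floating-point value, whereas database drivers map Python's `None` to SQL `NULL`. A small sketch of the same idea using only the standard library, with `math.isnan` in place of `pd.isnull` and a hypothetical helper named `nan_to_none`:

```python
import math


def nan_to_none(values):
    """Replace float NaN entries with None; leave everything else unchanged."""
    return [None if isinstance(v, float) and math.isnan(v) else v for v in values]


# Same record as above, with the missing latitude/longitude written as NaN
raw = ['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', float('nan'), float('nan')]
artist_data = nan_to_none(raw)
print(artist_data)
# ['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', None, None]
```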

#### Insert Record into Artists Table

In [15]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

## Process `log_data`

Use the `get_files` function to retrieve log files from the "data/log_data" directory. 


In [18]:
log_files = get_files("data/log_data")

Read the JSON data from the first log file (`filepath`) into a Pandas DataFrame (`df`) using `pd.read_json`. The `lines=True` parameter tells pandas to parse each line of the file as a separate JSON record.

Let's take a quick look at the first few rows of the DataFrame using `df.head()`:

In [19]:
filepath = log_files[0]
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540344794796,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
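
To illustrate what `lines=True` expects, the sketch below parses a small in-memory string that holds one JSON object per line. The two records are trimmed-down versions of rows 0 and 4 above, keeping only four fields.

```python
import io

import pandas as pd

# One JSON object per line (fields trimmed for brevity)
raw = (
    '{"artist": null, "page": "Home", "ts": 1541105830796, "userId": 39}\n'
    '{"artist": "Mr Oizo", "page": "NextSong", "ts": 1541106352796, "userId": 8}\n'
)

demo_df = pd.read_json(io.StringIO(raw), lines=True)
print(demo_df.shape)  # (2, 4)
```

Without `lines=True`, pandas would try to parse the whole input as a single JSON document and fail on the second line.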


## #3: `time` Table
#### ETL Data for Time Table

Filter the DataFrame `df` to keep only the rows whose `page` column equals `'NextSong'`.

In [20]:
df = df.query("page == 'NextSong'").copy()  # copy avoids SettingWithCopyWarning when 'ts' is reassigned later
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
5,Tamba Trio,Logged In,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Quem Quiser Encontrar O Amor,200,1541106496796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
6,The Mars Volta,Logged In,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Eriatarka,200,1541106673796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
7,Infected Mushroom,Logged In,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Becoming Insane,200,1541107053796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
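
The same filter can be written as a boolean mask. The sketch below uses a small made-up DataFrame to show that both forms select the same rows and that the original index labels (here 1 and 3) are preserved, which is why the output above starts at index 2.

```python
import pandas as pd

demo = pd.DataFrame({
    "page": ["Home", "NextSong", "Upgrade", "NextSong"],
    "ts": [1541105830796, 1541106106796, 1541106132796, 1541106352796],
})

# Boolean mask equivalent to demo.query("page == 'NextSong'")
next_song = demo[demo["page"] == "NextSong"].copy()  # copy so later column assignments are safe

print(next_song.index.tolist())  # [1, 3]
```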


Convert the `ts` column of `df` from Unix epoch milliseconds to datetime values using `pd.to_datetime` with `unit='ms'`.

In [21]:
t = pd.to_datetime(df['ts'], unit='ms')  # converted timestamps, reused for the time table
df['ts'] = t
df['ts'].head()

2   2018-11-01 21:01:46.796
4   2018-11-01 21:05:52.796
5   2018-11-01 21:08:16.796
6   2018-11-01 21:11:13.796
7   2018-11-01 21:17:33.796
Name: ts, dtype: datetime64[ns]

Create a DataFrame (`time_df`) that breaks the datetime Series `t` into the columns `start_time`, `hour`, `day`, `week`, `month`, `year`, and `weekday`. The `week` value is the ISO week number, and `weekday` counts from Monday as 0.


In [22]:
time_data = (t, t.dt.hour, t.dt.day, t.dt.isocalendar().week, t.dt.month, t.dt.year, t.dt.weekday)
column_labels = ('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
time_df = pd.DataFrame(dict(zip(column_labels, time_data)))
time_df.head()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
2,2018-11-01 21:01:46.796,21,1,44,11,2018,3
4,2018-11-01 21:05:52.796,21,1,44,11,2018,3
5,2018-11-01 21:08:16.796,21,1,44,11,2018,3
6,2018-11-01 21:11:13.796,21,1,44,11,2018,3
7,2018-11-01 21:17:33.796,21,1,44,11,2018,3
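
As a cross-check on the table above, the same fields can be derived for a single timestamp with the standard library alone. This is a sketch with a hypothetical helper `time_row`. It treats the value as UTC epoch milliseconds, which matches how `pd.to_datetime(..., unit='ms')` interprets it.

```python
from datetime import datetime, timedelta


def time_row(ts_ms):
    """Split an epoch-millisecond timestamp (UTC) into the time table's fields."""
    start = datetime(1970, 1, 1) + timedelta(milliseconds=ts_ms)
    return (
        start,
        start.hour,
        start.day,
        start.isocalendar()[1],  # ISO week number
        start.month,
        start.year,
        start.weekday(),         # Monday is 0
    )


row = time_row(1541106106796)  # ts of the first NextSong event
print(row[1:])  # (21, 1, 44, 11, 2018, 3)
```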


Iterate over the rows of `time_df` and insert each one into the database with `cur.execute`, committing after every insert. This populates the `time` table.


In [23]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()
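
Committing after every row works, but for larger files it can be slow. One possible alternative, sketched here with `sqlite3` standing in for SQL Server, is to send all rows through `executemany` and commit once. The table definition and insert text are assumptions, since the real `time_table_insert` lives in `queries.py`. The `executemany` method is part of the Python DB-API and is also available on `pyodbc` cursors.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Assumed schema and insert statement for illustration
cur.execute(
    "CREATE TABLE time (start_time TEXT, hour INTEGER, day INTEGER, "
    "week INTEGER, month INTEGER, year INTEGER, weekday INTEGER)"
)
time_table_insert = "INSERT INTO time VALUES (?, ?, ?, ?, ?, ?, ?)"

rows = [
    ("2018-11-01 21:01:46.796", 21, 1, 44, 11, 2018, 3),
    ("2018-11-01 21:05:52.796", 21, 1, 44, 11, 2018, 3),
]

cur.executemany(time_table_insert, rows)  # one call for all rows
conn.commit()                             # single commit at the end

inserted = cur.execute("SELECT COUNT(*) FROM time").fetchone()[0]
print(inserted)  # 2
conn.close()
```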

## #4: `users` Table
#### ETL Data for Users Table

Extract the user columns from the DataFrame `df`. The resulting DataFrame `user_df` contains `userId`, `firstName`, `lastName`, `gender`, and `level`.


In [24]:
user_df = df[["userId", "firstName", "lastName", "gender", "level"]]

Iterate over each row in the `user_df` DataFrame and insert the user information into the database.

In [25]:
for i, row in user_df.iterrows():
    values = (row['userId'], row['firstName'], row['lastName'], row['gender'], row['level'])
    cur.execute(user_table_insert, values)
    conn.commit()
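
Because the log contains one row per event, the same user usually appears many times in `user_df`. Whether that matters depends on how `user_table_insert` handles existing users, which is not shown here. If the statement does not handle duplicates itself, one option is to deduplicate before inserting. The sketch below uses made-up sample rows (including an invented change of `level`) and keeps the last row per `userId`.

```python
import pandas as pd

# Made-up sample rows for illustration
user_df = pd.DataFrame({
    "userId": [8, 8, 39, 8],
    "firstName": ["Kaylee", "Kaylee", "Walter", "Kaylee"],
    "lastName": ["Summers", "Summers", "Frye", "Summers"],
    "gender": ["F", "F", "M", "F"],
    "level": ["free", "free", "free", "paid"],
})

# Keep the most recent row per user so the latest level wins
unique_users = user_df.drop_duplicates(subset="userId", keep="last")
print(unique_users[["userId", "level"]].values.tolist())
# [[39, 'free'], [8, 'paid']]
```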

## #5: `songplays` Table
#### ETL Data for Songplays Table

This code iterates through the rows of the DataFrame `df` and inserts one songplay record per row into the database. For each row:

1. Extract the song information (`song`, `artist`, `length`).

2. Execute the `song_select` query to look up the matching song and artist IDs in the database. If the query returns a row, use those IDs; otherwise, set both to `None`.

3. Build the songplay record from the IDs and the row's remaining fields, with the row's DataFrame index as the first value.

4. Execute `songplay_table_insert` to insert the record, then commit the change to the database.

In [26]:
for index, row in df.iterrows():
        
    # get songid and artistid from song and artist tables
    values = (row['song'], row['artist'], row['length'])
    cur.execute(song_select, values)
    results = cur.fetchone()
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay record
    songplay_data = (index, row.ts, row.userId, row.level, songid, artistid,
                     row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()
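
The lookup step relies on `song_select`, which is defined in `queries.py` and not shown here. A plausible shape for such a query, sketched with `sqlite3` as a stand-in and with assumed table and column names, is a join between `songs` and `artists` that matches on title, artist name, and duration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Assumed minimal schemas; the notebook's real table definitions are not shown.
cur.execute("CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT)")
cur.execute(
    "CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, "
    "artist_id TEXT, duration REAL)"
)
cur.execute("INSERT INTO artists VALUES (?, ?)", ("ARD7TVE1187B99BFB1", "Casual"))
cur.execute(
    "INSERT INTO songs VALUES (?, ?, ?, ?)",
    ("SOMZWCG12A8C13C480", "I Didn't Mean To", "ARD7TVE1187B99BFB1", 218.93179),
)

# Hypothetical stand-in for song_select
song_select = """
SELECT s.song_id, a.artist_id
FROM songs AS s
JOIN artists AS a ON s.artist_id = a.artist_id
WHERE s.title = ? AND a.name = ? AND s.duration = ?
"""


def lookup(song, artist, length):
    """Return (song_id, artist_id), or (None, None) when nothing matches."""
    row = cur.execute(song_select, (song, artist, length)).fetchone()
    return tuple(row) if row else (None, None)


hit = lookup("I Didn't Mean To", "Casual", 218.93179)
miss = lookup("You Gotta Be", "Des'ree", 246.30812)
print(hit)   # ('SOMZWCG12A8C13C480', 'ARD7TVE1187B99BFB1')
print(miss)  # (None, None)
conn.close()
```

Log events whose song is not in the `songs` table fall through to `(None, None)`, mirroring the `else` branch in the loop above.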

In [27]:
conn.close()
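
If an earlier cell raises an exception, the final `conn.close()` may never run. One way to guarantee cleanup, sketched here with `sqlite3` standing in for `pyodbc`, is `contextlib.closing`, which only requires that the wrapped object has a `close()` method.

```python
import sqlite3
from contextlib import closing

# closing() calls .close() on exit, whether or not an exception occurred
with closing(sqlite3.connect(":memory:")) as conn:
    with closing(conn.cursor()) as cur:
        cur.execute("SELECT 1")
        value = cur.fetchone()[0]

# Using the connection after the block fails because it is already closed
try:
    conn.execute("SELECT 1")
    still_open = True
except sqlite3.ProgrammingError:
    still_open = False

print(value, still_open)  # 1 False
```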