In [1]:
import pandas as pd 
import os
import glob
import psycopg2
from sql_queries import song_table_insert, artist_table_insert, user_table_insert, time_table_insert, songplay_table_insert, song_select
from dotenv import load_dotenv
load_dotenv()

True

## Main Functions 

In [2]:
def get_files(filepath):
    """Collect the absolute paths of every ``*.json`` file under ``filepath``.

    The directory tree is walked recursively with ``os.walk`` and the JSON
    files of each visited directory are matched with ``glob``.

    Args:
        filepath: Root directory to search.

    Returns:
        list[str]: Absolute paths of the matching files. The list is empty
        when nothing matches or when ``filepath`` does not exist.
    """
    all_files = []
    for root, _dirs, _names in os.walk(filepath):
        # Escape the directory part so characters such as "[" or "*" in a
        # folder name are matched literally instead of acting as wildcards
        # (which would silently skip every file in that folder).
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))

    return all_files

In [3]:
def insert_into_tables(query_string, query_value, conn, cur):
    """Execute one parameterised INSERT, reporting failures instead of raising.

    Args:
        query_string: SQL statement with placeholders.
        query_value: Sequence of values bound to the placeholders.
        conn: Connection that owns ``cur``. Only used to roll back after a
            failure when it is not in autocommit mode; may be ``None``.
        cur: Cursor used to execute the statement.

    Returns:
        bool: ``True`` when the statement executed, ``False`` when it failed
        (the error is printed and swallowed, as before).
    """
    try:
        cur.execute(query_string, query_value)
    except Exception as e:
        print('INSERT ERROR: ')
        print(e)
        # On a transactional (non-autocommit) connection a failed statement
        # leaves the transaction aborted and every later statement fails
        # until it is rolled back. Connections without an ``autocommit``
        # attribute are left untouched.
        if conn is not None and not getattr(conn, 'autocommit', True):
            try:
                conn.rollback()
            except Exception as rollback_error:
                print('ROLLBACK ERROR: ')
                print(rollback_error)
        return False
    return True

In [4]:
def connect_to_database(host, dbname, user, password):
    """Open an autocommit PostgreSQL connection and return it with a cursor.

    Args:
        host: Database server host name or address.
        dbname: Name of the database to connect to.
        user: Login role.
        password: Password for ``user``.

    Returns:
        tuple: ``(conn, cur)`` - the open connection and a cursor on it.

    Raises:
        Exception: Whatever error psycopg2 raised while connecting or
            configuring the session. It is printed first and then re-raised,
            so callers that unpack the result see the real cause instead of
            a "cannot unpack NoneType" error.
    """
    conn = None
    try:
        # Keyword arguments let psycopg2 quote each value itself; an
        # interpolated DSN string breaks on passwords containing spaces,
        # quotes or backslashes.
        conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password)
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        return conn, cur
    except Exception as e:
        print('DB Connection Error: ')
        print(e)
        # Do not leak a connection that opened but could not be configured.
        if conn is not None:
            conn.close()
        raise

## Connect to Database

In [None]:
# Credentials are read from the process environment (load_dotenv() runs in the
# imports cell); a missing variable raises KeyError here.
conn, cur = connect_to_database(
    host='127.0.0.1',
    dbname='sparkifydb',
    user=os.environ["POSTGRES_USERNAME"],
    password=os.environ["POSTGRES_PASSWORD"],
)

In [21]:
# Root folder that contains the song_data and log_data directories.
# Set SPARKIFY_DATA_ROOT (in the environment or the .env file) to run the
# notebook on another machine; the fallback is the original local path.
ROOT_PATH = os.environ.get(
    'SPARKIFY_DATA_ROOT',
    r'C:\Users\Youssef\Documents\Data Engineer\Lesson 1\Project1\data',
)

## Songs & Artists ETLs

In [22]:
# Song metadata files live in the song_data folder under ROOT_PATH.
# os.path.join supplies the separator for the current OS instead of a
# hand-written backslash.
songs_path = os.path.join(ROOT_PATH, 'song_data')
songs_files = get_files(songs_path)

C:\Users\Youssef\Documents\Data Engineer\Lesson 1\Project1\data\song_data


In [6]:
def song_data_etl(song_files, conn, cur):
    """Load song JSON files into the songs and artists tables.

    For every file, the first record is read and two rows are inserted: one
    built from the song columns and one built from the artist columns.

    Args:
        song_files: Iterable of paths to line-delimited JSON song files.
        conn: Open database connection, forwarded to ``insert_into_tables``.
        cur: Cursor on ``conn``, forwarded to ``insert_into_tables``.

    Returns:
        None. A completion message is printed once all files are processed.
    """
    song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
    artist_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']

    # Iterate over the argument; the original looped over the notebook-level
    # ``songs_files`` global and silently ignored whatever was passed in.
    for song in song_files:
        song_df = pd.read_json(song, lines=True)
        if song_df.empty:
            # No record in this file: there is no first row to index.
            continue
        # Only the first record of each file is used (as before).
        song_values = song_df.loc[:, song_columns].values[0].tolist()
        artist_values = song_df.loc[:, artist_columns].values[0].tolist()
        insert_into_tables(song_table_insert, song_values, conn, cur)
        insert_into_tables(artist_table_insert, artist_values, conn, cur)
    print('Done Inserting into songs & artists tables')

In [8]:
# Run the song/artist load over every file found under songs_path.
song_data_etl(song_files=songs_files, conn=conn, cur=cur)

Done Inserting into songs & artists tables


## Time, Users & Songplays ETLs

In [15]:
def user_insert(row):
    """Insert one row into the users table.

    Intended as a ``DataFrame.apply(..., axis=1)`` callback. It relies on the
    notebook-level ``conn`` and ``cur`` rather than taking them as arguments.

    Args:
        row: Row whose values, in order, are bound to ``user_table_insert``.
    """
    user_values = row.tolist()
    insert_into_tables(user_table_insert, user_values, conn, cur)
    
def time_insert(row):
    """Insert one row into the time table.

    Intended as a ``DataFrame.apply(..., axis=1)`` callback. It relies on the
    notebook-level ``conn`` and ``cur`` rather than taking them as arguments.

    Args:
        row: Row whose values, in order, are bound to ``time_table_insert``.
    """
    time_values = row.tolist()
    insert_into_tables(time_table_insert, time_values, conn, cur)

In [9]:
def build_time_df(log_df):
    """Break the ``ts`` column into calendar parts and insert them as time rows.

    Side effect: ``log_df['ts']`` is converted in place from epoch
    milliseconds to datetimes. ``build_songplays_table`` later reads
    ``row['ts']`` from the same frame, so the conversion must stay visible to
    the caller.

    Args:
        log_df: Log events with a ``ts`` column in epoch milliseconds.

    Returns:
        bool: Always ``True``.
    """
    # In-place conversion, deliberately shared with the caller (see above).
    log_df['ts'] = pd.to_datetime(log_df['ts'], unit='ms')
    start_time = pd.to_datetime(log_df['ts'], unit='ms')
    parts = start_time.dt

    time_df = pd.DataFrame({
        'start_time': start_time,
        'hour': parts.hour,
        'day': parts.day,
        'week': parts.isocalendar().week,
        'month': parts.month,
        'year': parts.year,
        'weekday': parts.weekday,
    })

    # One INSERT per row of the time frame.
    time_df.apply(time_insert, axis=1)
    return True

In [10]:
def build_user_table(log_df):
    """Insert the user columns of every log event into the users table.

    Args:
        log_df: Log events containing the ``userId``, ``firstName``,
            ``lastName``, ``gender`` and ``level`` columns.

    Returns:
        bool: Always ``True``.
    """
    user_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
    # One INSERT per event row; rows are not de-duplicated here.
    log_df[user_columns].apply(user_insert, axis=1)
    return True

In [17]:
def build_songplays_table(row):
    """Resolve the song/artist ids for one log event and insert a songplay row.

    Uses the notebook-level ``conn`` and ``cur``. The ids are looked up with
    ``song_select`` using the event's song title, artist name and length;
    when nothing matches, both ids are stored as ``None``.

    Args:
        row: One log event (a row of the log DataFrame).

    Returns:
        bool: Always ``True``.
    """
    lookup_key = (row['song'], row['artist'], row['length'])
    cur.execute(song_select, lookup_key)
    match = cur.fetchone()
    song_id, artist_id = match if match else (None, None)

    insert_into_tables(
        songplay_table_insert,
        [
            row['ts'],
            row['userId'],
            row['level'],
            song_id,
            artist_id,
            row['sessionId'],
            row['location'],
            row['userAgent'],
        ],
        conn,
        cur,
    )

    return True

In [23]:
# Activity logs live in the log_data folder under ROOT_PATH.
# os.path.join supplies the separator for the current OS instead of a
# hand-written backslash.
log_path = os.path.join(ROOT_PATH, 'log_data')
log_files = get_files(log_path)

In [13]:
def log_data_etl(log_files, conn, cur):
    """Load log JSON files into the time, users and songplays tables.

    Only events whose ``page`` is ``'NextSong'`` are kept. For each file the
    time rows, the user rows and then the songplay rows are inserted.

    Args:
        log_files: Iterable of paths to line-delimited JSON log files.
        conn: Open database connection. NOTE(review): not forwarded - the
            helper functions called here use the notebook-level ``conn``.
        cur: Cursor on ``conn``. NOTE(review): not forwarded, same reason.

    Returns:
        None. A completion message is printed once all files are processed.
    """
    for log in log_files:
        events = pd.read_json(log, lines=True)
        # Take an explicit copy: build_time_df assigns to the ``ts`` column,
        # and doing that on a filtered slice raises SettingWithCopyWarning.
        log_df = events.loc[events['page'] == 'NextSong'].copy()
        if log_df.empty:
            # Nothing to load; also avoids DataFrame.apply on an empty frame,
            # which may still call the row callback with a placeholder row.
            continue
        build_time_df(log_df)
        build_user_table(log_df)
        log_df.apply(build_songplays_table, axis=1)
    print("Done Inserting into time, users & songplays tables")

In [18]:
# Run the time/users/songplays load over every file found under log_path.
log_data_etl(log_files=log_files, conn=conn, cur=cur)

Done Inserting into time, users & songplays tables


In [19]:
# Close the database connection now that both ETL steps have run; `cur`
# belongs to this connection and must not be used after this point.
conn.close()