# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the whole datasets.

In [10]:
import pandas as pd
import pyodbc
from sqlalchemy import create_engine
from sql_queries import *
import os
import glob
import math

def process_song_file(cur, filepath):
    """processing the song files and getting data from it and insert it in songs and artists tables"""
    # open song file
    df = pd.read_json(filepath, lines=True)

    # insert song record
    song_data = [
        df.song_id[0],
        df.title[0],
        df.artist_id[0],
        int(df.year[0]),
        float(df.duration[0])
    ]
    cur.execute(song_table_insert, song_data)

    # Insert artist record
    artist_data = [
        df.artist_id[0],
        df.artist_name[0],
        df.artist_location[0],
        None if math.isnan(float(df.artist_latitude[0])) else float(df.artist_latitude[0]),
        None if math.isnan(float(df.artist_longitude[0])) else float(df.artist_longitude[0])
    ]
    cur.execute(artist_table_insert, artist_data)
    

def process_log_file(cur, filepath):
    """processing the log files and getting data from it to insert in time, users and songplays tables"""
    # open log file
    df = pd.read_json(filepath, lines=True)
    
    # filter by NextSong action
    df = df[df['page'] == 'NextSong']

    # convert timestamp column to datetime
    df['TS'] = pd.to_datetime(df['ts'], unit='ms')
    
    # insert time data records
    time_df = pd.DataFrame({
        'TS': df['TS'],
        'Hour': df['TS'].dt.hour,
        'Day': df['TS'].dt.day,
        'WeekOfYear': df['TS'].dt.isocalendar().week,
        'Month': df['TS'].dt.month,
        'Year': df['TS'].dt.year,
        'WeekDay': df['TS'].dt.dayofweek
    }).drop_duplicates(subset=['TS'])

    for i, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']].drop_duplicates(subset=['userId'])
    
    # insert user records
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, list(row))
        
    # insert songplay records
    for index, row in df.iterrows():
        # get songid and artistid from songs and artists tables
        SongIDD = row.song.replace('\'', '\'\'')
        ArtistIDD = row.artist.replace('\'', '\'\'')
        
        Query_artist_song = song_select.format(SongIDD, ArtistIDD)
        cur.execute(Query_artist_song)
        results = cur.fetchone()
        if results:
            songid, artistid = results
        else:
            songid, artistid = None , None
        
        songplay_data = (row.TS, row.userId, row.level, songid, 
                             artistid, row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)

                


def process_data(cur, conn, filepath, func):
    """getting all the json files from the filepath and aplly funcs on every fill to fill the tables """
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))



    
conn = pyodbc.connect(
'Driver={ODBC Driver 17 for SQL Server};'
'Server=DESKTOP-23VJCM8\SQL2022;'
'Database=DataModellingWithSQL;'
'Trusted_Connection=yes;'
)
cur = conn.cursor()


process_data(cur, conn, filepath='Data/song_data', func=process_song_file)
process_data(cur, conn, filepath='Data/log_data', func=process_log_file)

cur.close()
conn.close()


71 files found in Data/song_data
1/71 files processed.
2/71 files processed.
3/71 files processed.
4/71 files processed.
5/71 files processed.
6/71 files processed.
7/71 files processed.
8/71 files processed.
9/71 files processed.
10/71 files processed.
11/71 files processed.
12/71 files processed.
13/71 files processed.
14/71 files processed.
15/71 files processed.
16/71 files processed.
17/71 files processed.
18/71 files processed.
19/71 files processed.
20/71 files processed.
21/71 files processed.
22/71 files processed.
23/71 files processed.
24/71 files processed.
25/71 files processed.
26/71 files processed.
27/71 files processed.
28/71 files processed.
29/71 files processed.
30/71 files processed.
31/71 files processed.
32/71 files processed.
33/71 files processed.
34/71 files processed.
35/71 files processed.
36/71 files processed.
37/71 files processed.
38/71 files processed.
39/71 files processed.
40/71 files processed.
41/71 files processed.
42/71 files processed.
43/71 file