In [1]:
import os
import glob
import psycopg2 as pg 
import pandas as pd 
from queries import *
import numpy as np

In [2]:
def process_log_file(cur, log_filepath):
    """Load one JSON-lines activity log into the time, user and songplay tables.

    The file is read with pandas, then three groups of INSERT statements are
    issued through ``cur``:

    * one time row per 'NextSong' event (timestamp split into calendar parts),
    * one user row per event that carries a non-empty ``userId``,
    * one songplay row per 'NextSong' event that carries a non-empty ``userId``.

    Args:
        cur: Open database cursor; only ``execute`` and ``fetchone`` are used.
        log_filepath: Path to a log file holding one JSON object per line.

    Returns:
        None. All results are written through ``cur``.
    """
    log_df = pd.read_json(log_filepath, lines=True)
    if log_df.empty:
        # Nothing to load; also avoids a KeyError on the column lookups below.
        return

    # --- time table -------------------------------------------------------
    # 'ts' is interpreted as epoch milliseconds. Iterating the converted Series
    # yields pandas Timestamps whose attributes are plain Python ints/strs.
    is_song_play = log_df['page'] == 'NextSong'
    for timestamp in pd.to_datetime(log_df.loc[is_song_play, 'ts'], unit='ms'):
        cur.execute(
            time_table_insert,
            [
                timestamp,
                timestamp.hour,
                timestamp.day,
                timestamp.week,
                timestamp.month,
                timestamp.year,
                timestamp.day_name(),
            ],
        )

    # --- users table ------------------------------------------------------
    # Empty strings are treated as missing so that events without a userId
    # can be dropped in one step.
    known_users_df = log_df.replace('', np.nan).dropna(subset=['userId'])

    user_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
    for _, user_row in known_users_df[user_columns].iterrows():
        # Pass a plain list (as the time insert does) rather than a Series:
        # the driver looks parameters up by integer position, which pandas
        # deprecates for label-indexed Series.
        cur.execute(user_table_insert, list(user_row))

    # --- songplays table --------------------------------------------------
    # Only 'NextSong' events are song plays; other page views have no song and
    # no matching row in the time table populated above.
    plays_df = known_users_df[known_users_df['page'] == 'NextSong']

    for index, row in plays_df.iterrows():
        # Resolve song/artist ids from the already-loaded song catalogue.
        cur.execute(song_select, (row['song'], row['artist'], row['length']))
        match = cur.fetchone()
        songid, artistid = match if match else (None, None)

        # NOTE(review): the first value is this row's index within the current
        # file, so it repeats from file to file — confirm that
        # songplay_table_insert tolerates repeated ids.
        songplay_data = (
            index,
            pd.to_datetime(row['ts'], unit='ms'),
            int(row['userId']),
            row['level'],
            songid,
            artistid,
            row['sessionId'],
            row['location'],
            row['userAgent'],
        )
        cur.execute(songplay_table_insert, songplay_data)

        

In [3]:
def process_data(cur, conn, filepath, func):
    """Apply ``func`` to every ``*.json`` file found beneath ``filepath``.

    The directory tree is walked recursively; each match is converted to an
    absolute path and handed to ``func(cur, path)``. ``conn.commit()`` is
    called after every file and a progress line is printed.

    Args:
        cur: Cursor object forwarded unchanged to ``func``.
        conn: Connection object whose ``commit`` is called once per file.
        filepath: Root directory to search.
        func: Callable taking ``(cur, path)``.

    Returns:
        The number of files that were found (and processed).
    """
    # Gather every match up front so the total is known before work starts.
    json_paths = [
        os.path.abspath(match)
        for folder, _subdirs, _names in os.walk(filepath)
        for match in glob.glob(os.path.join(folder, '*.json'))
    ]

    total = len(json_paths)
    print('{} files found in {}'.format(total, filepath))

    done = 0
    for path in json_paths:
        func(cur, path)
        conn.commit()
        done += 1
        print("{} / {} files processed".format(done, total))

    return total

In [4]:
# Open the database connection used for the whole load.
# NOTE(review): the credentials are hardcoded in this connection string —
# consider reading them from the environment instead.
conn = pg.connect(
    "host=127.0.0.1 dbname=sparkifydb user=postgres password=student"
)
# Autocommit mode: each statement takes effect as soon as it is executed.
conn.autocommit = True
cur = conn.cursor()

# Walk the log directory and load every file; `data` receives the file count.
data = process_data(cur, conn, 'data/log_data', process_log_file)

30 files found in data/log_data
1 / 30 files processed
2 / 30 files processed
3 / 30 files processed
4 / 30 files processed
5 / 30 files processed
6 / 30 files processed
7 / 30 files processed
8 / 30 files processed
9 / 30 files processed
10 / 30 files processed
11 / 30 files processed
12 / 30 files processed
13 / 30 files processed
14 / 30 files processed
15 / 30 files processed
16 / 30 files processed
17 / 30 files processed
18 / 30 files processed
19 / 30 files processed
20 / 30 files processed
21 / 30 files processed
22 / 30 files processed
23 / 30 files processed
24 / 30 files processed
25 / 30 files processed
26 / 30 files processed
27 / 30 files processed
28 / 30 files processed
29 / 30 files processed
30 / 30 files processed


In [5]:
# Release database resources: the cursor first, then the connection that
# created it (previously the connection was left open).
cur.close()
conn.close()

# Displayed as the cell result to confirm the cursor is closed.
cur.closed

True

In [6]:
# Number of log files found and passed to process_log_file, as returned by process_data.
data

30