In [1]:
import psycopg2
from sql_queries import create_db, drop_db
from sql_queries import drop_table_artists, drop_table_songplays, drop_table_songs, drop_table_time, drop_table_users
from sql_queries import create_table_artists, create_table_songplays, create_table_songs, create_table_time, create_table_users

In [2]:
def connect_db(host, user, password):
    """Open an autocommit connection to a PostgreSQL server.

    Args:
        host: server hostname.
        user: role to authenticate as.
        password: password for ``user``.

    Returns:
        An open psycopg2 connection with autocommit enabled.

    Raises:
        psycopg2.Error: if connecting (or enabling autocommit) fails. The error
            is printed first, matching this notebook's reporting style, then
            re-raised so callers never continue with an unusable connection.
    """
    conn = None
    try:
        # Keyword arguments let psycopg2 quote each value itself, so hosts or
        # passwords containing spaces/quotes cannot corrupt the DSN string.
        conn = psycopg2.connect(host=host, user=user, password=password)
        conn.set_session(autocommit=True)
    except psycopg2.Error as e:
        print(e)
        # Don't leak a connection that opened but could not be configured.
        if conn is not None:
            conn.close()
        raise
    return conn

def disconnect_db(conn):
    """Close ``conn``, reporting rather than raising any psycopg2 failure.

    Args:
        conn: the connection to close (a psycopg2 connection in this notebook).
    """
    try:
        conn.close()
    except psycopg2.Error as close_error:
        # Best-effort shutdown: print the driver error and carry on.
        print(close_error)

def execute_query(curr, query, values=None):
    """Run ``query`` on cursor ``curr``, printing (not raising) psycopg2 errors.

    Args:
        curr: a DB-API cursor (a psycopg2 cursor in this notebook).
        query: SQL text, optionally containing ``%s`` placeholders.
        values: optional sequence of parameters for the placeholders. ``None``
            (the default) runs ``query`` with no parameter substitution,
            exactly like ``curr.execute(query)``.

    The optional ``values`` makes this definition interchangeable with the
    three-argument ``execute_query`` defined later in this notebook. Callers
    of either form keep working no matter which cell ran last.
    """
    try:
        curr.execute(query, values)
    except psycopg2.Error as e:
        print(e)

In [3]:
if __name__ == '__main__':
    import os

    # Credentials come from the standard libpq environment variables when set,
    # otherwise from the local Docker defaults this notebook was written for.
    conn = connect_db(host=os.environ.get('PGHOST', 'localhost'),
                      user=os.environ.get('PGUSER', 'postgres'),
                      password=os.environ.get('PGPASSWORD', 'docker'))
    curr = conn.cursor()
    try:
        # NOTE(review): drop_db is executed but create_db was commented out,
        # and connect_db is given no database name, so the tables below are
        # created in whatever database the connection opened. Confirm this is
        # intended before re-enabling create_db.
        execute_query(curr, drop_db)

        # Recreate every table from scratch: drop it, then create it.
        table_statements = [
            (drop_table_songplays, create_table_songplays),
            (drop_table_songs, create_table_songs),
            (drop_table_users, create_table_users),
            (drop_table_artists, create_table_artists),
            (drop_table_time, create_table_time),
        ]
        for drop_query, create_query in table_statements:
            execute_query(curr, drop_query)
            execute_query(curr, create_query)
    finally:
        # Release the connection even if a statement raises unexpectedly.
        disconnect_db(conn)

In [4]:
import os
import pandas as pd
import psycopg2
from sql_queries import insert_to_songplays, insert_to_users, insert_to_songs, insert_to_artists, insert_to_time
from sql_queries import select_song

In [5]:
# Root of the Million Song sample. Set the MILLION_SONG_DATA_FOLDER environment
# variable to point elsewhere instead of editing this machine-specific path.
DATA_FOLDER = os.environ.get(
    'MILLION_SONG_DATA_FOLDER',
    '/home/binhps/de-udacity/data_modeling_with_postgres/MillionSongSample/',
)
# os.path.join adds a separator only when needed, so an override without a
# trailing slash still produces valid sub-folder paths.
LOG_DATA_FOLDER = os.path.join(DATA_FOLDER, 'log_data/')
SONG_DATA_FOLDER = os.path.join(DATA_FOLDER, 'song_data/')

In [6]:
def connect_db(host, user, password):
    """Open an autocommit connection to a PostgreSQL server.

    Args:
        host: server hostname.
        user: role to authenticate as.
        password: password for ``user``.

    Returns:
        An open psycopg2 connection with autocommit enabled.

    Raises:
        psycopg2.Error: if connecting (or enabling autocommit) fails. The error
            is printed first, matching this notebook's reporting style, then
            re-raised so callers never continue with an unusable connection.
    """
    conn = None
    try:
        # Keyword arguments let psycopg2 quote each value itself, so hosts or
        # passwords containing spaces/quotes cannot corrupt the DSN string.
        conn = psycopg2.connect(host=host, user=user, password=password)
        conn.set_session(autocommit=True)
    except psycopg2.Error as e:
        print(e)
        # Don't leak a connection that opened but could not be configured.
        if conn is not None:
            conn.close()
        raise
    return conn

def disconnect_db(conn):
    """Best-effort close of a database connection.

    Args:
        conn: the connection to close (a psycopg2 connection in this notebook).

    A ``psycopg2.Error`` raised by ``close()`` is printed and swallowed; any
    other exception propagates to the caller.
    """
    try:
        conn.close()
    except psycopg2.Error as failure:
        print(failure)

def execute_query(curr, query, values=None):
    """Run a (optionally parameterized) query, printing psycopg2 errors.

    Args:
        curr: a DB-API cursor (a psycopg2 cursor in this notebook).
        query: SQL text, optionally containing ``%s`` placeholders.
        values: parameters for the placeholders. Defaults to ``None``, which
            runs ``query`` without parameter substitution.

    ``values`` defaults to ``None`` so the two-argument calls made by the
    table-creation cell earlier in this notebook also work with this
    definition, whichever cell was run last.
    """
    try:
        curr.execute(query, values)
    except psycopg2.Error as e:
        print(e)

# Recursively list the files of a data folder (used for both song and log data).
def all_files(_dir):
    """Yield the path of every file under ``_dir``, recursing into subfolders.

    Sub-directories and file names are visited in sorted order, so every run
    processes files in the same sequence. If any insert query overwrites
    existing rows (not visible here — check sql_queries), the last file loaded
    wins, and with sorted traversal that is reproducible.

    Args:
        _dir: root folder to walk. A missing folder yields nothing.

    Yields:
        str: ``os.path.join(root, name)`` for each file found.
    """
    for root, dirs, files in os.walk(_dir):
        # Sorting ``dirs`` in place controls the order os.walk descends into them.
        dirs.sort()
        for name in sorted(files):
            yield os.path.join(root, name)

In [7]:
def process_song_data(file_df, curr):
    """Insert every record of one song-data file into the songs and artists tables.

    Args:
        file_df: DataFrame read from a song JSON file; must contain the song
            and artist columns selected below.
        curr: database cursor, passed through to ``execute_query``.

    For each row, the songs insert runs before the artists insert. An empty
    frame inserts nothing instead of raising ``IndexError``.
    """
    song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
    artist_columns = ['artist_id', 'artist_name', 'artist_location',
                      'artist_latitude', 'artist_longitude']
    # .values.tolist() gives one plain Python list of native scalars per row,
    # which psycopg2 adapts directly.
    song_rows = file_df[song_columns].values.tolist()
    artist_rows = file_df[artist_columns].values.tolist()
    for song_row, artist_row in zip(song_rows, artist_rows):
        execute_query(curr, insert_to_songs, song_row)
        execute_query(curr, insert_to_artists, artist_row)

In [8]:
def process_log_data(file_df, curr):
    """Load the NextSong events of one log file into time, users and songplays.

    Args:
        file_df: DataFrame read from a log JSON file. Only rows whose ``page``
            column equals ``'NextSong'`` are used.
        curr: database cursor, used for inserts (via ``execute_query``) and for
            the ``select_song`` lookup.
    """
    df_nextsong = file_df[file_df['page'] == 'NextSong']
    # ``ts`` is converted with unit='ms' (epoch milliseconds). Convert it once
    # and reuse the result for both the time and songplays tables.
    start_times = pd.to_datetime(df_nextsong['ts'], unit='ms')
    _insert_time_rows(curr, start_times)
    _insert_user_rows(curr, df_nextsong)
    _insert_songplay_rows(curr, df_nextsong, start_times)


def _insert_time_rows(curr, start_times):
    """Insert one time-table row per timestamp in ``start_times``."""
    # Iterating the Series yields Timestamps directly; Series.iteritems() no
    # longer exists in pandas 2.x.
    for start_time in start_times:
        # NOTE(review): Timestamp.dayofweek and Timestamp.weekday() return the
        # same value (Monday=0). If insert_to_time expects week-of-year in the
        # fourth position, use start_time.isocalendar()[1] — confirm against
        # sql_queries.
        execute_query(curr, insert_to_time,
                      [start_time, start_time.hour, start_time.day,
                       start_time.dayofweek, start_time.month,
                       start_time.year, start_time.weekday()])


def _insert_user_rows(curr, events):
    """Insert one users-table row per event (repeat users are sent again)."""
    users = events[['userId', 'firstName', 'lastName', 'gender', 'level']]
    for _, user_row in users.iterrows():
        execute_query(curr, insert_to_users, user_row.values)


def _insert_songplay_rows(curr, events, start_times):
    """Insert one songplays row per event, looking up song and artist ids."""
    for (_, row), start_time in zip(events.iterrows(), start_times):
        # Called directly (not via execute_query), so a failing lookup raises
        # here instead of being printed and followed by a fetch on no results.
        curr.execute(select_song, [row.song, row.artist, row.length])
        match = curr.fetchone()
        # When select_song returns no row, the ids are inserted as NULL.
        song_id, artist_id = match if match else (None, None)
        songplay_data = (start_time, row.userId, row.level, song_id, artist_id,
                         row.sessionId, row.location, row.userAgent)
        execute_query(curr, insert_to_songplays, songplay_data)

In [9]:
if __name__ == '__main__':
    # Credentials come from the standard libpq environment variables when set,
    # otherwise from the local Docker defaults this notebook was written for.
    conn = connect_db(host=os.environ.get('PGHOST', 'localhost'),
                      user=os.environ.get('PGUSER', 'postgres'),
                      password=os.environ.get('PGPASSWORD', 'docker'))
    curr = conn.cursor()
    try:
        # Song files are loaded first so that the select_song lookup used while
        # processing the logs can see the inserted songs and artists.
        for file_path in all_files(SONG_DATA_FOLDER):
            song_df = pd.read_json(file_path, lines=True)
            process_song_data(song_df, curr)

        for file_path in all_files(LOG_DATA_FOLDER):
            log_df = pd.read_json(file_path, lines=True)
            process_log_data(log_df, curr)
    finally:
        # Close the connection even if a file fails to parse or load.
        disconnect_db(conn)