In [2]:
%load_ext sql

In [3]:
%sql postgresql://student:student@127.0.0.1/sparkifydb

'Connected: student@sparkifydb'

In [4]:
%sql SELECT * FROM songplays LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
5 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
1,2018-11-29 00:00:57.796000,73,paid,,,954,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2"""
2,2018-11-29 00:01:30.796000,24,paid,,,984,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"""
3,2018-11-29 00:04:01.796000,24,paid,,,984,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"""
4,2018-11-29 00:04:55.796000,73,paid,,,954,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.78.2 (KHTML, like Gecko) Version/7.0.6 Safari/537.78.2"""
5,2018-11-29 00:07:13.796000,24,paid,,,984,"Lake Havasu City-Kingman, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"""


In [6]:
%sql SELECT * FROM users LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.


user_id,first_name,last_name,gender,level


In [7]:
%sql SELECT * FROM songs LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.


song_id,title,artist_id,year,duration


In [8]:
%sql SELECT * FROM artists LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.


artist_id,name,location,lattitude,longitude


In [9]:
%sql SELECT * FROM time LIMIT 5;

 * postgresql://student:***@127.0.0.1/sparkifydb
0 rows affected.


start_time,hour,day,week,month,year,weekday


## REMEMBER: Restart this notebook to close connection to `sparkifydb`
Each time you run the cells above, remember to restart this notebook to close the connection to your database. Otherwise, you won't be able to run your code in `create_tables.py`, `etl.py`, or `etl.ipynb` files since you can't make multiple connections to the same database (in this case, sparkifydb).

In [10]:
%sql SELECT * FROM songplays INNER JOIN users ON songplays.user_id = users.user_id WHERE users.level = 'paid' AND users.gender = 'M' LIMIT 1;

 * postgresql://student:***@127.0.0.1/sparkifydb
(psycopg2.ProgrammingError) operator does not exist: integer = character varying
LINE 1: ...M songplays INNER JOIN users ON songplays.user_id = users.us...
                                                             ^
HINT:  No operator matches the given name and argument type(s). You might need to add explicit type casts.
 [SQL: "SELECT * FROM songplays INNER JOIN users ON songplays.user_id = users.user_id WHERE users.level = 'paid' AND users.gender = 'M' LIMIT 1;"]


In [1]:
# %load create_tables.py
import psycopg2
from sql_queries import create_table_queries, drop_table_queries


def create_database():
    # connect to default database
    conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    
    # create sparkify database with UTF8 encoding
    cur.execute("DROP DATABASE IF EXISTS sparkifydb")
    cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")

    # close connection to default database
    conn.close()    
    
    # connect to sparkify database
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
    cur = conn.cursor()
    
    return cur, conn


def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    cur, conn = create_database()
    
    drop_tables(cur, conn)
    create_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()

In [1]:
# %load etl.py
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *


def process_song_file(cur, filepath):
    """ 
    Reads song data from JSON file in given filepath into a dataframe, and insert the relevant data into
    songs and artists DB tables       
    """
    # open song file
    df = pd.read_json(filepath, lines = True)

    # insert song record
    song_data = song_data = tuple(df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0].tolist())
    cur.execute(song_table_insert, song_data)
    
    # insert artist record
    artist_data = tuple(df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']].values[0].tolist())
    cur.execute(artist_table_insert, artist_data)


def process_log_file(cur, filepath):
    """
    Reads log data from JSON file in given filepath into a dataframe, filters the 
    'NextSong' action, and inserts relevant attributes into time, users and songplays DB tables        
    """
    # open log file
    df = pd.read_json(filepath, lines = True)

    # filter by NextSong action
    df = df[df['page'] == 'NextSong']

    # convert timestamp column to datetime
    t = df['ts'] = pd.to_datetime(df['ts'], unit='ms') 
    
    # insert time data records
    time_data = ([df['ts'], 
              df['ts'].dt.hour, 
              df['ts'].dt.day, 
              df['ts'].dt.weekofyear, 
              df['ts'].dt.month,
              df['ts'].dt.year, 
              df['ts'].dt.weekday])
    
    column_labels = (['timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday'])
    time_dict = dict((k,v) for (k,v) in zip(column_labels, time_data))
    time_df = pd.DataFrame.from_dict(time_dict)

    for i, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, row)

    # insert songplay records
    for index, row in df.iterrows():
        
        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()
        
        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (pd.to_datetime(row.ts,unit='ms'),row.userId,row.level,songid,artistid,row.sessionId,row.location,row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)


def process_data(cur, conn, filepath, func):
    """
    Locates and iterates over the files located in the filepath and applies specified function.
    func :- user defined function to be applied to each file
    """
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))


def main():
    """
    Connects to sparkifydb DB, creates cursor and passes this to the process_data function
    """
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
    cur = conn.cursor()

    process_data(cur, conn, filepath='data/song_data', func=process_song_file)
    process_data(cur, conn, filepath='data/log_data', func=process_log_file)

    conn.close()


if __name__ == "__main__":
    main()

72 files found in data/song_data
1/72 files processed.
2/72 files processed.
3/72 files processed.
4/72 files processed.
5/72 files processed.
6/72 files processed.
7/72 files processed.
8/72 files processed.
9/72 files processed.
10/72 files processed.
11/72 files processed.
12/72 files processed.
13/72 files processed.
14/72 files processed.
15/72 files processed.
16/72 files processed.
17/72 files processed.
18/72 files processed.
19/72 files processed.
20/72 files processed.
21/72 files processed.
22/72 files processed.
23/72 files processed.
24/72 files processed.
25/72 files processed.
26/72 files processed.
27/72 files processed.
28/72 files processed.
29/72 files processed.
30/72 files processed.
31/72 files processed.
32/72 files processed.
33/72 files processed.
34/72 files processed.
35/72 files processed.
36/72 files processed.
37/72 files processed.
38/72 files processed.
39/72 files processed.
40/72 files processed.
41/72 files processed.
42/72 files processed.
43/72 file

In [5]:
%sql select * from songplays where artist_id is not NULL;

 * postgresql://student:***@127.0.0.1/sparkifydb
1 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
3225,2018-11-21 21:56:47.796000,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"""
