In [1]:
import findspark
findspark.init()

In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql.types import TimestampType

In [3]:
# Create a SparkSession (Note, the config section is only for Windows!)
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [4]:
input_data = "./data/"
output_data = "./data/output_data/"

Song Data Process
----------------------

In [5]:
# get filepath to song data file
song_data = input_data + "song_data/*/*/*"

In [6]:
# read song data file
df = spark.read.json(song_data + "/*json")

In [7]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
# extract columns to create songs table
songs_table = df.select("song_id", "title", "artist_id", "year", "duration").drop_duplicates()
songs_table.head(2)

[Row(song_id='SOGNCJP12A58A80271', title='Do You Finally Need A Friend', artist_id='ARB29H41187B98F0EF', year=1972, duration=342.56934),
 Row(song_id='SOOJPRH12A8C141995', title='Loaded Like A Gun', artist_id='ARBGXIG122988F409D', year=0, duration=173.19138)]

In [9]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.mode('overwrite').partitionBy("year", "artist_id").parquet(output_data + "songs/")
#os.listdir(path = output_data + "songs/")

In [10]:
# extract columns to create artists table
artists_table = df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude").drop_duplicates()
artists_table.head(2)

[Row(artist_id='ARPBNLO1187FB3D52F', artist_name='Tiny Tim', artist_location='New York, NY', artist_latitude=40.71455, artist_longitude=-74.00712),
 Row(artist_id='ARBEBBY1187B9B43DB', artist_name='Tom Petty', artist_location='Gainesville, FL', artist_latitude=None, artist_longitude=None)]

In [11]:
# write artists table to parquet files
artists_table.write.mode('overwrite').partitionBy("artist_id").parquet(output_data + "artists/")
#os.listdir(path = output_data + "artists/")

Log Data Process
---------------------

In [12]:
# get filepath to log data file
log_data = input_data + "log_data"

In [13]:
# read log data file
df2 = spark.read.json(log_data + "/*json")
df2.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [14]:
# filter by actions for song plays
df2 = df2.filter(df2.page == "NextSong")

In [15]:
# extract columns for users table    
users_table = df2.select("userId", "firstName", "lastName", "gender", "level").drop_duplicates()
users_table.head(2)

[Row(userId='98', firstName='Jordyn', lastName='Powell', gender='F', level='free'),
 Row(userId='34', firstName='Evelin', lastName='Ayala', gender='F', level='free')]

In [16]:
# write users table to parquet files
users_table.write.mode('overwrite').partitionBy("userId").parquet(output_data + "users/")
#os.listdir(path = output_data + "users/")

In [17]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(int(x)/1000), TimestampType())

In [18]:
df2 = df2.withColumn("start_time", get_timestamp(df2.ts))
df2.head(2)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time=datetime.datetime(2018, 11, 14, 19, 30, 26, 796000)),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safa

In [19]:
# extract columns to create time table
time_table = df2.select('start_time',
                    hour("start_time").alias('hour'),
                    dayofmonth("start_time").alias('day'),
                    month("start_time").alias('month'), 
                    year("start_time").alias('year'),
                    dayofweek("start_time").alias('weekday') 
                    )
time_table.head(2)

[Row(start_time=datetime.datetime(2018, 11, 14, 19, 30, 26, 796000), hour=19, day=14, month=11, year=2018, weekday=4),
 Row(start_time=datetime.datetime(2018, 11, 14, 19, 41, 21, 796000), hour=19, day=14, month=11, year=2018, weekday=4)]

In [20]:
# write time table to parquet files partitioned by year and month
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + "time_table/")
#os.listdir(path = output_data + "time_table/")

In [21]:
# read in song data to use for songplays table
song_df = spark.read.parquet(output_data + "songs")
song_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [22]:
# Create SQL views for query
song_df.createOrReplaceTempView('song_table')
df2.createOrReplaceTempView('log_table')

In [24]:
# SQL query to join tables to get song_plays
sql_query = '''
        SELECT l.start_time, l.userId, l.level, s.song_id, s.artist_id, l.sessionId, l.location, l.userAgent
        FROM log_table l
        LEFT JOIN song_table s 
        ON l.song = s.title;
    '''
joined_table = spark.sql(sql_query)
#joined_table.show(1)

In [26]:
songplays_table = joined_table.select("start_time",
                                      year("start_time").alias('year'),
                                      month("start_time").alias('month'),
                                      "userId", "level", "song_id", "artist_id", "sessionId", 
                                      "location", "userAgent")
#songplays_table.show(1)

In [27]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + "song_play/")
#os.listdir(path = output_data + "song_play/")

In [28]:
sp_df = spark.read.parquet(output_data + "song_play")
sp_df.describe()

DataFrame[summary: string, userId: string, level: string, song_id: string, artist_id: string, sessionId: string, location: string, userAgent: string, year: string, month: string]

In [29]:
sp_df.toPandas().describe()

Unnamed: 0,sessionId,year,month
count,6820.0,6820.0,6820.0
mean,599.181818,2018.0,11.0
std,284.953333,0.0,0.0
min,3.0,2018.0,11.0
25%,374.0,2018.0,11.0
50%,605.0,2018.0,11.0
75%,834.0,2018.0,11.0
max,1114.0,2018.0,11.0
