# 0. Prepare working environment

In [287]:
# Import necessary libraries
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pandas as pd
from pyspark.sql.types import *
from datetime import datetime

In [2]:
# Create local spark session
spark = SparkSession.builder\
                    .master("local")\
                    .appName("SparkifyMusic")\
                    .getOrCreate()

In [3]:
# Checking spark session details
spark

In [4]:
# Checking spark session config details
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'SparkifyMusic'),
 ('spark.app.id', 'local-1561176073104'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '192.168.1.10'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '39495')]

In [309]:
# Define paths
output_path = "./output_data/"
song_data = "./data/song-data/song_data/*/*/*/*.json"
log_data = "./data/log-data/*.json"

# 1. Process Song Data

## 1.1 Create and store songs table

In [316]:
# Read song data
songs_df = spark.read.json(song_data)
songs_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [317]:
# Redefine schema to enforce the constraints (if needed, part of cleansing step)
# This is based on the needs of specific business requirements
songs_schema = StructType([
    StructField("artist_id", StringType(), False),
    StructField("artist_latitude", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_longitude", DoubleType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("num_songs", LongType(), True),
    StructField("song_id", StringType(), False),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True)
])

In [318]:
# Reload the songs_df after apply the pre-defined schema struct type
# Any violated rows will be droped during loading
songs_df = spark.read.json(song_data,
                    schema=songs_schema,
                    mode="DROPMALFORMED")

In [319]:
# Check the new schema
songs_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [320]:
# Create temp view for querying
songs_df.createOrReplaceTempView("songs")

In [321]:
# Get songs table
songs_table = spark.sql("""
            SELECT song_id, 
            title,
            artist_id,
            year,
            duration
            FROM songs
            """)
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363
1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),AREBBGV1187FB523D2,0,173.66159
2,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,ARMAC4T1187FB3FA4C,2004,207.77751
3,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert Hall],ARPBNLO1187FB3D52F,2000,43.36281
4,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,ARNF6401187FB57032,1994,305.162


In [322]:
# Store songs table to disk by partition by year and artist_id
songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet( output_path + "songs")

## 1.2 Create and store artists table

In [323]:
# Get artist table from songs temp view
artists_table = spark.sql("""
                SELECT DISTINCT artist_id,
                artist_name name,
                artist_location location,
                artist_latitude latitude,
                artist_longitude longitude
                FROM songs
                """)
artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARBEBBY1187B9B43DB,Tom Petty,"Gainesville, FL",,
2,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
3,ARMBR4Y1187B9990EB,David Martin,California - SF,37.77916,-122.42005
4,ARD0S291187B9B7BF5,Rated R,Ohio,,


In [324]:
# Store artists and save to disks
artists_table.write.mode("overwrite").parquet(output_path + "artists")

# 2. Process log data

## 2.1 Read log data and prepare data to build "users", "time" & "songplays" table

In [325]:
# Read logs and parse to spark dataframe
logs_df = spark.read.json(log_data)
logs_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [327]:
# Filter page of NextSong only
logs_df = logs_df.filter(logs_df["page"] == "NextSong")
logs_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4""",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36""",80


In [328]:
# Create temp view used for SQL query
logs_df.createOrReplaceTempView("logs")

## 2.2 Create and store users table

In [330]:
# Get user table from logs
users_table = spark.sql("""
                    SELECT DISTINCT userId user_id,
                    firstName first_name,
                    lastName last_name,
                    gender,
                    level
                    FROM logs
                    WHERE TRIM(userId) <> ''
                """)
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,98,Jordyn,Powell,F,free
1,34,Evelin,Ayala,F,free
2,85,Kinsley,Young,F,paid
3,38,Gianna,Jones,F,free
4,85,Kinsley,Young,F,free


In [331]:
# Store user table
users_table.write.mode("overwrite").parquet(output_path + "users")

## 2.3 Create and store time table

In [337]:
# Add in timestamp column for current logs table
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
logs_df = logs_df.withColumn("timestamp", get_timestamp(logs_df.ts))
logs_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,timestamp
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26,2018-11-15 07:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26,2018-11-15 07:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",26,2018-11-15 07:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4""",61,2018-11-15 10:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36""",80,2018-11-15 12:48:55.796


In [333]:
# Check the schema
logs_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [301]:
# Create temp view for timestamp for query
logs_df.select("timestamp").createOrReplaceTempView("time")

In [334]:
# Get time table from user's event logs
time_table = spark.sql("""
SELECT DISTINCT timestamp start_time,
HOUR(timestamp) hour,
DAYOFMONTH(timestamp) day,
WEEKOFYEAR(timestamp) week,
MONTH(timestamp) month,
YEAR(timestamp) year,
DAYOFWEEK(timestamp) weekday
FROM time
""")
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 13:07:37.796,13,15,46,11,2018,5
1,2018-11-15 19:30:36.796,19,15,46,11,2018,5
2,2018-11-15 21:49:51.796,21,15,46,11,2018,5
3,2018-11-15 23:06:06.796,23,15,46,11,2018,5
4,2018-11-16 04:14:54.796,4,16,46,11,2018,6


In [304]:
# Write time table to disk with partitioning by year and month
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_path + "time")

## 2.4 Create and store songplays_table

In [338]:
# Create temp view used for SQL query
logs_df.createOrReplaceTempView("logs")

# Load songs table
songs_df = spark.read.parquet(output_path + "songs")
songs_df.createOrReplaceTempView("songs")

# Load artists table
artists_df = spark.read.parquet(output_path + "artists")
artists_df.createOrReplaceTempView("artists")


In [350]:
# Create songplays table
songplays_table = spark.sql("""
SELECT monotonically_increasing_id() songplay_id,
l.timestamp start_time,
l.userId user_id,
s.song_id,
a.artist_id,
l.sessionId session_id,
l.location,
l.userAgent user_agent,
MONTH(l.timestamp) month,
YEAR(l.timestamp) year
FROM logs l
INNER JOIN songs s ON s.title = l.song AND s.duration = l.length
INNER JOIN artists a ON a.artist_id = s.artist_id AND a.name = l.artist
""")
songplays_table.limit(10).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,song_id,artist_id,session_id,location,user_agent,month,year
0,0,2018-11-22 04:56:47.796,15,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36""",11,2018


In [351]:
# Write songplays table to disk
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_path + "songplays")