# Sparkify Data Lake ETL Pipeline for Analytics

## Project Introduction
A music streaming startup, __Sparkify__, has grown its user base and song database even more and wants to move its data warehouse to a data lake. Their data resides in S3, in a directory of JSON logs on user activity on the app, as well as a directory with JSON metadata on the songs in their app.

I am tasked with building an ETL pipeline that extracts their data from S3, processes them using Spark, and loads the data back into S3 as a set of dimensional tables. This will allow their analytics team to continue finding insights about what songs their users are listening to.

I'll be able to test my database and ETL pipeline by running queries given to you by the analytics team from Sparkify and compare my results with their expected results.

In [16]:
# import libs
import os
import glob
import pandas as pd
from datetime import *

In [17]:
# spark imports and configurations
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * 

In [3]:
spark = SparkSession.builder \
        .master("local") \
        .appName("Spakify-ETL") \
        .getOrCreate()

In [4]:
# import data
logs_data = []
songs_data = []

for root, dir, files in os.walk("./datasets/log_data"):
    files = glob.glob(os.path.join(root, "*.json"))
    
    for f in files :
        logs_data.append(os.path.abspath(f))

for root, dir, files in os.walk("./datasets/song_data"):
    files = glob.glob(os.path.join(root, "*.json"))
    
    for f in files :
        songs_data.append(os.path.abspath(f))

In [5]:
print(len(logs_data))

30


In [6]:
print(len(songs_data))

71


## Process Log Data

In [7]:
log_data = spark.read.json(logs_data[0])

In [8]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [33]:
log_data.head(2)

[Row(artist='Infected Mushroom', auth='Logged In', firstName='Kaylee', gender='F', itemInSession=6, lastName='Summers', length=440.2673, level='free', location='Phoenix-Mesa-Scottsdale, AZ', method='PUT', page='NextSong', registration=1540344794796.0, sessionId=139, song='Becoming Insane', status=200, ts=1541107053796, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"', userId='8', datetime=datetime.datetime(2018, 11, 1, 22, 17, 33, 796000), datetimeUDF=datetime.datetime(2018, 11, 1, 21, 17, 33, 796000)),
 Row(artist='Girl Talk', auth='Logged In', firstName='Kaylee', gender='F', itemInSession=8, lastName='Summers', length=160.15628, level='free', location='Phoenix-Mesa-Scottsdale, AZ', method='PUT', page='NextSong', registration=1540344794796.0, sessionId=139, song='Once again', status=200, ts=1541107734796, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153

In [10]:
log_data = log_data.filter(log_data.page == "NextSong")

In [11]:
log_data = log_data.drop_duplicates()

In [12]:
log_data = log_data.na.drop()

In [27]:
# udf to convert millsecond ts to timestamp format
timestamp_udf = F.udf(lambda x : datetime.utcfromtimestamp(int(x) / 1000), TimestampType())

In [28]:
log_data = log_data.withColumn("datetimeUDF", timestamp_udf(F.col("ts")))

In [29]:
# convert Time stamp
datetimeFormat = "MM:dd:yy HH:mm:ss.SSS"
log_data = log_data.withColumn("datetime", F.to_timestamp(F.col("ts") / 1000))

In [30]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- datetimeUDF: timestamp (nullable = true)



In [31]:
log_data.select("datetimeUDF").head(5)

[Row(datetimeUDF=datetime.datetime(2018, 11, 1, 21, 17, 33, 796000)),
 Row(datetimeUDF=datetime.datetime(2018, 11, 1, 21, 28, 54, 796000)),
 Row(datetimeUDF=datetime.datetime(2018, 11, 1, 22, 23, 14, 796000)),
 Row(datetimeUDF=datetime.datetime(2018, 11, 1, 21, 42, 0, 796000)),
 Row(datetimeUDF=datetime.datetime(2018, 11, 1, 21, 11, 13, 796000))]

In [32]:
log_data.select("datetime").head(5)

[Row(datetime=datetime.datetime(2018, 11, 1, 22, 17, 33, 796000)),
 Row(datetime=datetime.datetime(2018, 11, 1, 22, 28, 54, 796000)),
 Row(datetime=datetime.datetime(2018, 11, 1, 23, 23, 14, 796000)),
 Row(datetime=datetime.datetime(2018, 11, 1, 22, 42, 0, 796000)),
 Row(datetime=datetime.datetime(2018, 11, 1, 22, 11, 13, 796000))]

In [35]:
log_data = log_data.withColumn("year", F.year("datetime"))
log_data = log_data.withColumn("month", F.month("datetime"))
log_data = log_data.withColumn("weekofyear", F.weekofyear("datetime"))
log_data = log_data.withColumn("day", F.dayofmonth("datetime"))
log_data = log_data.withColumn("hour", F.hour("datetime"))
log_data = log_data.withColumn("weekday", F.dayofweek("datetime"))

In [38]:
log_data.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,userAgent,userId,datetime,datetimeUDF,year,month,weekofyear,day,hour,weekday
0,Infected Mushroom,Logged In,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,2018-11-01 22:17:33.796,2018-11-01 21:17:33.796,2018,11,44,1,22,5
1,Girl Talk,Logged In,Kaylee,F,8,Summers,160.15628,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,2018-11-01 22:28:54.796,2018-11-01 21:28:54.796,2018,11,44,1,22,5
2,Survivor,Logged In,Jayden,M,0,Fox,245.36771,free,"New Orleans-Metairie, LA",PUT,...,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",101,2018-11-01 23:23:14.796,2018-11-01 22:23:14.796,2018,11,44,1,23,5
3,Black Eyed Peas,Logged In,Sylvie,F,0,Cruz,214.93506,free,"Washington-Arlington-Alexandria, DC-VA-MD-WV",PUT,...,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",10,2018-11-01 22:42:00.796,2018-11-01 21:42:00.796,2018,11,44,1,22,5
4,The Mars Volta,Logged In,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,...,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8,2018-11-01 22:11:13.796,2018-11-01 21:11:13.796,2018,11,44,1,22,5


## Process Song files

In [39]:
song_df = spark.read.json(songs_data[0])

In [40]:
song_df.head(5)

[Row(artist_id='ARD7TVE1187B99BFB1', artist_latitude=None, artist_location='California - LA', artist_longitude=None, artist_name='Casual', duration=218.93179, num_songs=1, song_id='SOMZWCG12A8C13C480', title="I Didn't Mean To", year=0)]

In [41]:
song_df = song_df.na.drop()
song_df = song_df.drop_duplicates()

## Selecting Tables and Write to Parquet

In [42]:
song_table = song_df.select("song_id", "title", "artist_id", "year", "duration")

In [51]:
song_table.write.mode("overwrite").partitionBy("year", "song_id").parquet("analytical tables/songs_table") 

In [53]:
artist_table = song_df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")

In [54]:
artist_table.write.parquet("analytical tables/artisttable", mode = "overwrite", partitionBy=["artist_id", "artist_name"])

In [55]:
users_table = log_data.select("userid", "firstName", "lastName", "gender", "level")

In [57]:
users_table.write.parquet("analytical tables/userstable", mode = "overwrite", partitionBy=["userid", "firstName"])

In [58]:
time_table = log_data.select("ts", "datetime", "year", "month", "weekofyear", "day", "weekday", "hour")

In [59]:
time_table.write.parquet("analytical tables/timetable", mode = "overwrite", partitionBy=["year", "month"])

In [63]:
songsplay_table = log_data.join(song_df, log_data.song == song_df.title, how = "inner")\
                .select(F.monotonically_increasing_id().alias("songsplay_id"), F.col("datetime").alias("start_time"), F.col("userid").alias("user_id"),"level","song_id","artist_id", F.col("sessionId").alias("session_id"), "location", F.col("userAgent").alias("user_agent"))

In [64]:
songsplay_table.write.parquet("analytical tables/songsplaytable", mode = "overwrite", partitionBy=["location", "songsplay_id"])