The purpose of this project is to implement an ETL pipeline to a cloud data lake for a fictitious music-streaming startup called Sparkify that wants to find insights into what songs their users are listening to. Sparkify has song and event-log datasets stored on S3, at s3://udacity-dend/song_data and s3://udacity-dend/log_data respectively. This project implements an ETL pipeline using Python and the PySpark API to read each dataset from S3 into Spark dataframes, transform them with Spark, and write the data back to S3 as a set of dimensional tables in a star schema.
The files found in s3://udacity-dend/song_data are a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID. For example, here are the paths to two files in this dataset:
s3://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json
s3://udacity-dend/song_data/A/A/B/TRAABJL12903CDCF1A.json
Below is an example of what a single song file, TRAABJL12903CDCF1A.json, looks like:
{"num_songs": 1, "artist_id": "ARJIE2Y1187B994AB7", "artist_latitude": null, "artist_longitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0}
The second dataset, found in s3://udacity-dend/log_data, consists of log files in JSON format generated by an event simulator based on the songs in the dataset above. These simulate activity logs from a music-streaming app based on specified configurations.
The log files in this dataset are partitioned by year and month. For example, here are filepaths to two files in this dataset:
s3://udacity-dend/log_data/2018/11/2018-11-12-events.json
s3://udacity-dend/log_data/2018/11/2018-11-13-events.json
Each log file, such as 2018-11-12-events.json, contains one JSON record per event; the fields used in this project include userId, firstName, lastName, gender, level, page, song, ts, sessionId, location, and userAgent.
The data lake consists of five tables, each written to S3 as parquet files and modeled according to the following star schema:
- songplays - records in log data associated with song plays, i.e. records with page NextSong
  - songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
- users - users in the app
  - user_id, first_name, last_name, gender, level
- songs - songs in the music database
  - song_id, title, artist_id, year, duration
- artists - artists in the music database
  - artist_id, name, location, latitude, longitude
- time - timestamps of records in songplays broken down into specific units
  - start_time, hour, day, week, month, year, weekday
The songs parquet files are partitioned by year and artist_id, and both the songplays and time parquet files are partitioned by year and month. The first column (the ID column) in each table is unique and not null.
For more details on how this data is modeled as it pertains to analytical queries, see this project.
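As an illustration of the analytical queries this schema supports, the sketch below counts plays per artist by joining songplays with artists. It is not part of etl.py, and it assumes the tables have already been written to an output_data S3 path as in the ETL excerpts later in this document:
from pyspark.sql.functions import count

# Hypothetical analysis query: top ten artists by play count
songplays = spark.read.parquet(output_data + "songplays.parquet")
artists = spark.read.parquet(output_data + "artists.parquet")
top_artists = songplays.join(artists, "artist_id") \
    .groupBy("artist_id", "name") \
    .agg(count("songplay_id").alias("plays")) \
    .orderBy("plays", ascending=False) \
    .limit(10)
top_artists.show()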
The ETL pipeline begins by extracting data from S3 over the s3a protocol into Spark dataframes with an explicit schema, using commands that include the following:
input_data = "s3a://udacity-dend/"
song_data = input_data + "song_data/*/*/*/*.json"
schema = StructType([
StructField("num_songs", IntegerType()),
StructField("artist_id", StringType()),
StructField("artist_latitude", FloatType()),
StructField("artist_longitude", FloatType()),
StructField("artist_location", StringType()),
StructField("artist_name", StringType()),
StructField("song_id", StringType()),
StructField("title", StringType()),
StructField("duration", FloatType()),
StructField("year", IntegerType())
])
df = spark.read.json(song_data, schema)
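The spark session used above is created elsewhere in etl.py and is not shown in these excerpts. A minimal sketch of how such a session might be built follows; the appName and the hadoop-aws package coordinates are assumptions and depend on the Spark and Hadoop versions in use (on EMR the S3 connector is typically preconfigured, so the extra package may not be needed there):
from pyspark.sql import SparkSession

# Hypothetical session builder; the package version is an assumption
spark = SparkSession.builder \
    .appName("sparkify-data-lake") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()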
Then, the data is transformed and loaded back into S3 as parquet files (a columnar file format), using commands that include the following:
# Build the songs table with one row per song_id and write it to S3,
# partitioned by year and artist_id (output_data is the destination S3 path)
songs_table = df.select("song_id", "title", "artist_id", "year", "duration") \
    .dropna(subset=["song_id"]).dropDuplicates(subset=["song_id"])
songs_table.write.partitionBy("year", "artist_id").parquet(output_data + "songs.parquet")
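etl.py also derives the artists table from this same song dataframe; that code is not excerpted here, but a plausible version that mirrors the songs logic would look like the following (the column renames are assumptions based on the schema fields above):
from pyspark.sql.functions import col

# Hypothetical sketch of the artists table extraction (not excerpted from etl.py)
artists_table = df.select("artist_id",
                          col("artist_name").alias("name"),
                          col("artist_location").alias("location"),
                          col("artist_latitude").alias("latitude"),
                          col("artist_longitude").alias("longitude")) \
    .dropna(subset=["artist_id"]).dropDuplicates(subset=["artist_id"])
artists_table.write.parquet(output_data + "artists.parquet")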
All code and functionality for this project is contained in the etl.py Python script. The script includes logic to read data from S3, transform and clean it, and load the resulting tables back into S3. The dl.cfg file is a config file in which one can specify the AWS IAM user credentials used to access S3 and EMR. The data directory is included as an example of a subset of the data that exists in S3, compressed in .zip format. The data directory is not used by the pipeline as implemented in this repo; however, it can be decompressed to view, or used if the script is modified to run locally.
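The exact contents of dl.cfg are not reproduced here. A typical layout for such a config file, and one way etl.py might load it, is sketched below; the section and key names are assumptions:
# Hypothetical dl.cfg layout (section and key names are assumptions):
#
#   [AWS]
#   AWS_ACCESS_KEY_ID=<your access key id>
#   AWS_SECRET_ACCESS_KEY=<your secret access key>

import configparser
import os

config = configparser.ConfigParser()
config.read("dl.cfg")

# Export the credentials so the s3a connector can pick them up
os.environ["AWS_ACCESS_KEY_ID"] = config["AWS"]["AWS_ACCESS_KEY_ID"]
os.environ["AWS_SECRET_ACCESS_KEY"] = config["AWS"]["AWS_SECRET_ACCESS_KEY"]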
To perform the ETL implemented in this project, one must specify AWS IAM user credentials in the dl.cfg file and run the etl.py script, either with the spark-submit --master yarn etl.py command in an AWS EMR terminal or with python3 etl.py in a local terminal.
Finds the most recent songplay event log for each user and extracts user-data columns to write to the users table:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, max, col  # max is the Spark function, not the builtin

# Read the log dataset and keep only songplay events
df = spark.read.json(log_data, schema)
df = df.where(df.page == "NextSong")

# Number each user's events chronologically and keep only the latest one
w = Window.partitionBy("userId")
users_table = df.dropna(subset=["userId"]) \
    .withColumn("userid_occurrence_num", row_number() \
        .over(w.orderBy("ts"))) \
    .withColumn("max_occurrence_num", max("userid_occurrence_num").over(w)) \
    .where(col("userid_occurrence_num") == col("max_occurrence_num")) \
    .select("userId", "firstName", "lastName", "gender", "level")
users_table.write.parquet(output_data + "users.parquet")
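The log_data path and the schema passed to spark.read.json at the top of this excerpt are defined earlier in etl.py and are not shown here. A plausible definition, listing only the columns this project uses (the glob pattern and the field types are assumptions), would be:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Hypothetical path and schema for the log dataset (assumptions)
log_data = input_data + "log_data/*/*/*.json"  # files are partitioned by year and month
schema = StructType([
    StructField("userId", StringType()),
    StructField("firstName", StringType()),
    StructField("lastName", StringType()),
    StructField("gender", StringType()),
    StructField("level", StringType()),
    StructField("page", StringType()),
    StructField("song", StringType()),
    StructField("ts", LongType()),
    StructField("sessionId", IntegerType()),
    StructField("location", StringType()),
    StructField("userAgent", StringType())
])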
Extracts datetime data from a millisecond timestamp to write to the time table:
from datetime import datetime
from pyspark.sql.types import DoubleType, TimestampType
from pyspark.sql.functions import udf, col, hour, dayofmonth, weekofyear, month, year, date_format

# Convert the millisecond epoch timestamp (ts) to seconds, then to a datetime;
# the explicit DoubleType return type keeps epoch_ts numeric
get_timestamp = udf(lambda ts: ts / 1000, DoubleType())
df = df.withColumn("epoch_ts", get_timestamp(df.ts))
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts), TimestampType())
df = df.withColumn("datetime", get_datetime(df.epoch_ts))

# Break each distinct datetime into its component units
time_table = df.select("datetime").dropna().dropDuplicates()
time_table = time_table.withColumn("hour", hour(col("datetime"))) \
    .withColumn("day", dayofmonth(col("datetime"))) \
    .withColumn("week", weekofyear(col("datetime"))) \
    .withColumn("month", month(col("datetime"))) \
    .withColumn("year", year(col("datetime"))) \
    .withColumn("weekday", date_format(col("datetime"), "u"))
time_table.write.partitionBy("year", "month").parquet(output_data + "time.parquet")
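Note that the "u" pattern used for the weekday column works on Spark 2.x but is a week-based pattern that Spark 3.x's default datetime parser rejects. If the script is run on Spark 3, one portable alternative is the built-in dayofweek function, keeping in mind that its numbering (1 = Sunday through 7 = Saturday) differs from the Monday-based "u" pattern:
from pyspark.sql.functions import dayofweek

# Alternative weekday column for Spark 3.x (1 = Sunday ... 7 = Saturday)
time_table = time_table.withColumn("weekday", dayofweek(col("datetime")))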
Numbers songplay event-log rows chronologically and left-joins them with the song metadata on the song title column to write to the songplays table:
df = df.withColumn("songplay_id", row_number() \
.over(Window.partitionBy("page").orderBy("ts")))
join_condition = [df.song == song_df.title]
songplays_table = df.join(song_df, join_condition, "left").select("songplay_id",
col("datetime").alias("start_time"),
col("userId").alias("user_id"),
"level",
"song_id",
"artist_id",
col("sessionId").alias("session_id"),
"location",
col("userAgent").alias("user_agent")) \
.withColumn("year", year(col("start_time"))) \
.withColumn("month", month(col("start_time")))
songplays_table.write.partitionBy("year", "month").parquet(output_data + "songplays.parquet")
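As a quick way to verify the uniqueness and non-null claims about the ID columns, one could run a check like the following after the pipeline finishes (a sketch, not part of etl.py; shown for the songplays table only):
# Hypothetical sanity check: songplay_id should be non-null and unique
songplays_check = spark.read.parquet(output_data + "songplays.parquet")
total_rows = songplays_check.count()
valid_ids = songplays_check.dropna(subset=["songplay_id"]) \
    .dropDuplicates(subset=["songplay_id"]).count()
assert total_rows == valid_ids, "songplay_id contains nulls or duplicates"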