# Data Lake with Spark

Author: Jun Zhu

In [1]:
import os.path as osp
from datetime import datetime

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DoubleType, LongType, StringType, StructField, StructType, TimestampType
)

# sc.install_pypi_package("pandas==0.25.3")
sc.list_packages()

pyspark.__version__

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1617105126782_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package                    Version
-------------------------- -------
beautifulsoup4             4.8.1
boto                       2.49.0
jmespath                   0.9.4
lxml                       4.4.2
mysqlclient                1.4.6
nltk                       3.4.5
nose                       1.3.4
numpy                      1.14.5
pip                        21.0.1
py-dateutil                2.2
python36-sagemaker-pyspark 1.2.6
pytz                       2019.3
PyYAML                     3.11
setuptools                 54.2.0
six                        1.13.0
soupsieve                  1.9.5
wheel                      0.36.2
windmill                   1.6

'2.4.4'

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1617105126782_0001,pyspark,idle,Link,Link,✔


In [3]:
# input_data_path = "s3a://udacity-dend/"
input_data_path = "s3a://spark-data-lake-123/"
output_data_path = "s3a://spark-data-lake-123/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
spark = SparkSession \
    .builder \
    .appName("Sparkify ETL") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
conf = spark.sparkContext.getConf()
print(conf.get('spark.app.name'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Sparkify ETL

## Song data

In [6]:
song_data_path = osp.join(input_data_path, "song_data/*/*/*/*.json")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
song_schema = StructType([
    StructField("artist_id",StringType(), False),
    StructField("artist_latitude", DoubleType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", DoubleType()),
    StructField("artist_name", StringType()),
    StructField("duration", DoubleType()),
    StructField("num_songs", LongType()),
    StructField("song_id", StringType(), False),
    StructField("title", StringType(), False),
    StructField("year", LongType()),
])

df_song_data = spark.read.json(song_data_path, schema=song_schema)
df_song_data.show(5)

df_song_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|AR4T2IF1187B9ADBB7|       63.96027|<a href="http://b...|        10.22442|          Billy Idol|233.22077|        1|SOVIYJY12AF72A4B00|The Dead Next Doo...|1983|
|AR4T2IF1187B9ADBB7|       63.96027|<a href="http://b...|        10.22442|          Billy Idol|287.92118|        1|SOVYXYL12AF72A3373|Rebel Yell (1999 ...|1983|
|ARQ846I1187B9A7083|           null|                    |            null|Yvonne S. Moriart...|196.04853|        1|SOEPTVC12A67ADD0DA|To Zucchabar ["Gl...|   0|
|AR4T2IF1187B9ADBB7|       63.9602

In [8]:
df_song_data.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

14896

In [9]:
# Extract columns to create songs table.
songs_table = df_song_data.select(['song_id', 'title', 'artist_id', 'year', 'duration']).drop_duplicates()
songs_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SONKVFM12AB01875C9|Amor De Madre/Mi ...|ARL4W6Q1187FB4713C|   0|639.13751|
|SOVMCQA12AF72A3411|Chillin With My B...|ARBS1OC1187FB44CAF|2004|236.48608|
|SOOORVX12A6D4FA612|Gimme A Chance  (...|ARKDZJP1187B98DD0C|2002|230.89587|
|SOHQZIB12A6D4F9FAF|N****_ What's Up ...|ARWAFY51187FB5C4EF|2006|196.85832|
|SOUCTAJ12AB018F4F4|Laat het licht ma...|ARPZTBN12086C1563D|2008|222.06649|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows

In [10]:
# Write songs table to parquet files.
# Note: There are too many entries with "year = 0", it is extremely slow when the files 
#       are partitioned by year.
songs_table.write.mode("overwrite").parquet(osp.join(output_data_path, "songs"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Extract columns to create artists table.
artists_table = df_song_data.selectExpr(
    ['artist_id', 
     'artist_name as name', 
     'artist_location as location', 
     'artist_latitude as latitude', 
     'artist_longitude as longitude']).drop_duplicates()
artists_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARKC83D1187B9AB367|          Mark Lowry|                    |    null|      null|
|ART1XOT1187B99A298|        Billy Squier| Wellesley Hills, MA| 42.3076| -71.27951|
|AREM2UI1187FB3F99D|  Stick To Your Guns|CHINO HILLS, Cali...|33.99604|-117.75801|
|AR6E5S51187B98C1A2|         All Out War|        Newburgh, NY|41.49994| -74.01023|
|AR8QJHN1187FB3616E|Tord Gustavsen_ H...|                    |    null|      null|
+------------------+--------------------+--------------------+--------+----------+
only showing top 5 rows

In [12]:
# Write artists table to parquet files.
artists_table.write.mode("overwrite").parquet(osp.join(output_data_path, "artists"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Log data

In [13]:
log_data_path = osp.join(input_data_path, "log_data/*/*/*.json")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
log_schema = StructType([
    StructField("artist",StringType()),
    StructField("auth", StringType()),
    StructField("firstName", StringType()),
    StructField("gender", StringType()),
    StructField("itemInSession", LongType()),
    StructField("lastName", StringType()),
    StructField("length", DoubleType()),
    StructField("level", StringType()),
    StructField("location", StringType()),
    StructField("method", StringType()),
    StructField("page", StringType()),
    StructField("registration", DoubleType()),
    StructField("sessionId", LongType()),
    StructField("song", StringType(), False),
    StructField("status", LongType()),
    StructField("ts", LongType(), False),
    StructField("userAgent", StringType()),
    StructField("userId", StringType(), False)
])

df_log_data = spark.read.json(log_data_path, schema=log_schema)
df_log_data.show(5)

df_log_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [15]:
df_log_data.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8056

In [16]:
df_log_data = df_log_data.filter(col('page') == 'NextSong')
df_log_data.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

6820

In [17]:
# Extract columns for users table.
users_table = df_log_data.selectExpr(
    ['userId as user_id', 
     'firstName as first_name', 
     'lastName as last_name', 
     'gender', 
     'level']).drop_duplicates()
users_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows

In [18]:
# Write users table to parquet files.
users_table.write.mode("overwrite").parquet(osp.join(output_data_path, "users"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Create timestamp column from original timestamp column.
get_timestamp = udf(lambda x: datetime.utcfromtimestamp(0.001 * int(x)), TimestampType())

df_log_data = df_log_data.withColumn("start_time", get_timestamp(col("ts")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Create datetime column from original timestamp column.
df_log_data = df_log_data.withColumn("hour", F.hour("start_time")) \
                         .withColumn("day", F.dayofmonth("start_time")) \
                         .withColumn("week", F.weekofyear("start_time")) \
                         .withColumn("month", F.month("start_time")) \
                         .withColumn("year", F.year("start_time")) \
                         .withColumn("weekday", F.dayofweek("start_time"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Extract columns to create time table.
time_table = df_log_data.select(
    ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']).drop_duplicates()
time_table.show(5, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-05 09:55:50.796|9   |5  |45  |11   |2018|2      |
|2018-11-05 17:51:44.796|17  |5  |45  |11   |2018|2      |
|2018-11-13 10:32:40.796|10  |13 |46  |11   |2018|3      |
|2018-11-30 12:34:47.796|12  |30 |48  |11   |2018|6      |
|2018-11-30 14:33:39.796|14  |30 |48  |11   |2018|6      |
+-----------------------+----+---+----+-----+----+-------+
only showing top 5 rows

In [22]:
# Write time table to parquet files partitioned by year and month.
# Note: There are indeed only one year (2018) and one month (11)!
time_table.write.partitionBy("year", "month").mode("overwrite").parquet(osp.join(output_data_path, "time"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Read in song data to use for songplays table.
songs_df = spark.read.load(osp.join(output_data_path, "songs"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Extract columns from joined song and log datasets to create songplays table.
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays_table = (
    df_log_data.join(songs_df, songs_df.title == col("song"))
               .selectExpr([
                   "start_time", 
                   "userId as user_id",
                   "level",
                   "song as song_id",
                   "artist_id",
                   "sessionId as session_id",
                   "location",
                   "userAgent as user_agent"])
               .withColumn("songplay_id", F.monotonically_increasing_id())
)

songplays_table.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+-----+-------+------------------+----------+--------------------+--------------------+-----------+
|          start_time|user_id|level|song_id|         artist_id|session_id|            location|          user_agent|songplay_id|
+--------------------+-------+-----+-------+------------------+----------+--------------------+--------------------+-----------+
|2018-11-15 05:55:...|     80| paid|   Home|ARFJ3CE1187B99612E|       602|Portland-South Po...|"Mozilla/5.0 (Mac...|          0|
|2018-11-15 05:55:...|     80| paid|   Home|ARU9OUL1187FB39BC1|       602|Portland-South Po...|"Mozilla/5.0 (Mac...|          1|
|2018-11-15 05:55:...|     80| paid|   Home|ARNRA801187FB5587A|       602|Portland-South Po...|"Mozilla/5.0 (Mac...|          2|
|2018-11-15 05:55:...|     80| paid|   Home|ARX1P2N1187FB59127|       602|Portland-South Po...|"Mozilla/5.0 (Mac...|          3|
|2018-11-15 06:17:...|     80| paid|  Human|ARRTT611187B99D82B|       602|Portland-South Po...|"M

In [25]:
songplays_table.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1144

In [26]:
# Write songplays table to parquet files.
songplays_table.write.mode("overwrite").parquet(osp.join(output_data_path, "songplays"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…