# Data Lake project using Spark and AWS 

A music streaming startup, Sparkify, has grown its user base and song database and wants to move its data warehouse to a data lake. Its data resides in S3: one directory of JSON logs of user activity on the app, and one directory of JSON metadata on the songs in the app.

In this project, we build an ETL pipeline that extracts the Sparkify data from S3, processes it with Spark, and loads it back into S3 as a set of dimensional tables. This allows the Sparkify analytics team to continue finding insights into what songs their users are listening to.

We first load a small local sample of the data and apply the transformations to it. Once those transformations are validated, we launch an AWS EMR cluster and run the same ETL process on the full dataset in S3.
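
Running the same ETL locally and on EMR is simpler when only the storage roots change between the two runs. The following is a minimal sketch of that idea, not the project's actual configuration: the helper name `build_paths` and both bucket names are hypothetical placeholders, and the log file layout on S3 may differ from the local sample.

```python
# Hypothetical helper: derive all input and output locations from two roots,
# so the local test run and the EMR run differ only in the roots passed in.
def build_paths(input_root, output_root):
    input_root = input_root.rstrip("/")
    output_root = output_root.rstrip("/")
    return {
        "songs_in": f"{input_root}/song_data/*/*/*/*.json",
        "logs_in": f"{input_root}/*.json",  # assumption: same log layout as the local sample
        "songs_out": f"{output_root}/songs/songs_table.parquet",
        "artists_out": f"{output_root}/artists/artists_table.parquet",
        "users_out": f"{output_root}/users/users_table.parquet",
        "time_out": f"{output_root}/time/time_table.parquet",
        "songplays_out": f"{output_root}/songplays/songplays_table.parquet",
    }

# Local run, matching the paths used in this notebook.
local_paths = build_paths("./data", "./data/output")

# EMR run: placeholder bucket names (assumptions), to be replaced with real buckets.
s3_paths = build_paths("s3://example-input-bucket", "s3://example-output-bucket")

print(local_paths["songs_in"])
print(s3_paths["songs_out"])
```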

## Load Python packages and open a local Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, TimestampType
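# import only the functions used below; a wildcard import would shadow Python builtins such as sum and max
from pyspark.sql.functions import (col, udf, year, month, dayofmonth, hour,
                                   dayofweek, weekofyear, monotonically_increasing_id)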

import pandas as pd

In [2]:
spark = SparkSession.builder.getOrCreate()

## Load & check Songs data

In [3]:
dfSongs = spark.read.json("./data/song_data/*/*/*/*.json")
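
Each `*` in the path above stands for exactly one directory level, so only JSON files nested three folders below `song_data` are read. The sketch below illustrates this with Python's `glob` module on a throwaway directory tree. It is a stand-in for illustration only: Spark resolves the pattern through its own file system layer, and the folder and file names here are made up.

```python
import glob
import os
import tempfile

# Build a temporary tree with one file at the expected depth and one that is too shallow.
with tempfile.TemporaryDirectory() as root:
    nested = os.path.join(root, "song_data", "A", "B", "C")
    shallow = os.path.join(root, "song_data", "A")
    os.makedirs(nested)

    for path in (os.path.join(nested, "deep.json"),
                 os.path.join(shallow, "shallow.json")):
        with open(path, "w") as f:
            f.write("{}")

    # Same shape as the pattern used for the song data: three directory levels, then *.json.
    pattern = os.path.join(root, "song_data", "*", "*", "*", "*.json")
    matched = sorted(os.path.basename(p) for p in glob.glob(pattern))

print(matched)  # ['deep.json']
```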

Check the songs_data schema

In [4]:
dfSongs.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Check the number of records and display the first 5 records

In [5]:
dfSongs.count()

71

In [6]:
dfSongs.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

## Load & Check Logs data

In [7]:
dfLogs = spark.read.json("./data/*.json")

In [8]:
dfLogs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Check the number of records in the log data and display the first 5 records

In [9]:
dfLogs.count()

8056

In [10]:
dfLogs.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

## Create and Write Songs table

Extract relevant columns from songs data to create Songs table

In [11]:
#extract relevant columns to create Songs table
songs_table = dfSongs.filter(dfSongs.song_id != '')\
                     .select(['song_id', 'title', 'artist_id', 'year', 'duration']) 
songs_table.show(5)
# write songs table partitioned by year and artist_id to parquet
songs_table.write.partitionBy("year", "artist_id").mode('overwrite').parquet('./data/output/songs/songs_table.parquet')

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows
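
`partitionBy("year", "artist_id")` writes one directory per distinct (year, artist_id) pair, using `column=value` folder names, and keeps the partition values in the path rather than inside the data files. The sketch below only reconstructs the directory that a given row would land in. `partition_dir` is a hypothetical helper written for illustration, and the `part-*.parquet` file names inside each directory are generated by Spark.

```python
def partition_dir(base, row, partition_cols):
    """Return the Hive-style partition directory for one row (illustrative helper)."""
    parts = [f"{name}={row[name]}" for name in partition_cols]
    return "/".join([base] + parts)

# One of the rows shown in the songs table output above.
row = {"song_id": "SOBBUGU12A8C13E95D", "artist_id": "ARMAC4T1187FB3FA4C", "year": 2004}

path = partition_dir("./data/output/songs/songs_table.parquet", row, ["year", "artist_id"])
print(path)
# ./data/output/songs/songs_table.parquet/year=2004/artist_id=ARMAC4T1187FB3FA4C
```

Because `artist_id` has many distinct values, this layout produces many small directories. That is harmless on the local sample but worth keeping in mind when writing the full dataset to S3.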



## Create and Write Artists table

Extract relevant columns from songs data to create Artists table

In [12]:
#extract relevant columns to create Artists table
artists_table = dfSongs.filter(dfSongs.artist_id !='') \
                        .select(col("artist_id"),col("artist_name").alias("name"), col("artist_location").alias("location"),
                                 col("artist_longitude").alias("longitude"), col("artist_latitude").alias("latitude"))\
                        .dropDuplicates()
artists_table.show(5)

# write artists table to parquet (columns were already renamed in the select above)
artists_table.write.mode('overwrite').parquet('./data/output/artists/artists_table.parquet')

+------------------+---------------+---------------+----------+--------+
|         artist_id|           name|       location| longitude|latitude|
+------------------+---------------+---------------+----------+--------+
|ARPBNLO1187FB3D52F|       Tiny Tim|   New York, NY| -74.00712|40.71455|
|ARXR32B1187FB57099|            Gob|               |      null|    null|
|AROGWRA122988FEE45|Christos Dantis|               |      null|    null|
|ARBGXIG122988F409D|     Steel Rain|California - SF|-122.42005|37.77916|
|AREVWGE1187B9B890A|     Bitter End|      Noci (BA)|  -41.9952| -13.442|
+------------------+---------------+---------------+----------+--------+
only showing top 5 rows



## Filter 'NextSong' records in Log data

In [13]:
dfNextSongLogs=dfLogs.filter(dfLogs.page == 'NextSong')
dfNextSongLogs.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

## Create and Write Users table

In [14]:
#extract relevant columns to create Users table
users_table = dfNextSongLogs.filter(dfNextSongLogs.userId !='') \
                        .select(col("userId").alias("user_id"),col("firstName").alias("first_name"), col("lastName").alias("last_name"),
                                 col("gender"), col("level")) \
                        .dropDuplicates()

users_table.show(20)

#write users table
users_table.write.mode('overwrite').parquet('./data/output/users/users_table.parquet')


+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
|     23|    Morris|  Gilmore|     M| free|
|     75|    Joseph|Gutierrez|     M| free|
|     16|     Rylan|   George|     M| paid|
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|     54|     Kaleb|     Cook|     M| free|
|     79|     James|   Martin|     M| free|
|     80|     Tegan|   Levine|     F| paid|
|     77| Magdalene|   Herman|     F| free|
|     47|    Kimber|   Norris|     F| free|
|     30|     Avery|  Watkins|     F| paid|
|     22|      Sean|   Wilson|     F| free|
|      4|    Alivia|  Terrell|     F| free|
|     55|    Martin|  Johnson|     M| free|
|     20|     Aiden|  Ramirez|  
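
`dropDuplicates()` without arguments compares entire rows, so a user whose `level` changes from free to paid would stay in the users table twice. A plain-Python sketch of the difference between whole-row and key-based de-duplication follows. The rows are made up for illustration and are not taken from the log data.

```python
# (user_id, first_name, last_name, gender, level), in log order
rows = [
    ("99", "Ada", "Example", "F", "free"),
    ("99", "Ada", "Example", "F", "free"),  # exact duplicate
    ("99", "Ada", "Example", "F", "paid"),  # same user after upgrading
]

# Whole-row de-duplication: the behavior of dropDuplicates() with no arguments.
# dict.fromkeys keeps the first occurrence of each distinct row.
whole_row = list(dict.fromkeys(rows))

# Key-based de-duplication: keep the last row seen for each user_id.
latest_by_user = {}
for row in rows:
    latest_by_user[row[0]] = row
by_key = list(latest_by_user.values())

print(len(whole_row))  # 2
print(by_key)          # [('99', 'Ada', 'Example', 'F', 'paid')]
```

In Spark, `dropDuplicates(["user_id"])` would keep one row per user but does not guarantee which one, so keeping the most recent `level` would also require ordering the events by `ts`.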

## Create and Write Time table

In [15]:
from pyspark.sql.functions import udf, year, month, dayofmonth, hour, dayofweek, weekofyear
from pyspark.sql.types import TimestampType
from datetime import datetime

# time table columns: start_time, hour, day, week, month, year, weekday

# convert the epoch-millisecond ts column to a timestamp column
# note: datetime.fromtimestamp uses the local time zone of the machine running the UDF
get_timestamp = udf(lambda ms: datetime.fromtimestamp(ms/1000.0), TimestampType())
dfNextSongLogs = dfNextSongLogs.withColumn('start_time', get_timestamp('ts'))
    
# extract columns to create time table
time_table = dfNextSongLogs.select('start_time')\
                            .withColumn('hour',hour('start_time')).withColumn('day',dayofmonth('start_time'))\
                            .withColumn('week',weekofyear('start_time')).withColumn('month', month('start_time'))\
                            .withColumn('year', year('start_time')).withColumn('weekday',dayofweek('start_time'))
time_table.show(5)

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").mode('overwrite').parquet('./data/output/time/time_table.parquet')

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|      5|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows
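
The time table columns can be checked by hand for a single event. The sketch below derives the same fields in plain Python from the first `ts` value in the log sample, assuming UTC, which is consistent with the 00:30 start time shown above. `time_fields` is a hypothetical helper. The weekday formula mirrors Spark's `dayofweek` convention (1 = Sunday through 7 = Saturday), and the week number follows ISO 8601, as `weekofyear` does.

```python
from datetime import datetime, timedelta, timezone

def time_fields(ts_ms):
    """Derive time-table fields from an epoch timestamp in milliseconds, in UTC."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    start = epoch + timedelta(milliseconds=ts_ms)
    return {
        "start_time": start,
        "hour": start.hour,
        "day": start.day,
        "week": start.isocalendar()[1],         # ISO 8601 week number
        "month": start.month,
        "year": start.year,
        "weekday": start.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday
    }

fields = time_fields(1542241826796)
print({k: v for k, v in fields.items() if k != "start_time"})
# {'hour': 0, 'day': 15, 'week': 46, 'month': 11, 'year': 2018, 'weekday': 5}
```

On a machine whose local time zone is not UTC, the UDF in the cell above would shift `start_time`, and with it possibly `hour`, `day`, and `weekday`.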



## Create and Write SongPlays table

In [38]:
# extract columns from joined song and log datasets to create songplays table

songplays_table = dfNextSongLogs.join(dfSongs, (dfNextSongLogs.song == dfSongs.title) & (dfNextSongLogs.length == dfSongs.duration), 'left_outer')\
        .select(
            dfNextSongLogs.start_time,
            col("userId").alias('user_id'),
            dfNextSongLogs.level,
            dfSongs.song_id,
            dfSongs.artist_id,
            col("sessionId").alias("session_id"),
            dfNextSongLogs.location,
            col("userAgent").alias("user_agent"),
            year('start_time').alias('year'),
            month('start_time').alias('month'))\
        .withColumn("idx", monotonically_increasing_id())

# keep only events that matched a song; this makes the left outer join behave like an inner join
songplays_table = songplays_table.filter("song_id is not null and artist_id is not null")
songplays_table.show(5)

# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year", "month").mode('overwrite').parquet('./data/output/songplays/songplays_table.parquet')

+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+---+
|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|idx|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+---+
|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|882|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+---+
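
A log event is matched to a song only when both the title and the duration agree exactly, and events without a match are then filtered out. The plain-Python sketch below mimics that logic with a lookup on the composite key. All record values are made up for illustration, and the variable names are assumptions rather than part of the pipeline.

```python
songs = [
    {"song_id": "S1", "artist_id": "A1", "title": "Song One", "duration": 200.5},
    {"song_id": "S2", "artist_id": "A2", "title": "Song Two", "duration": 180.0},
]
events = [
    {"userId": "1", "song": "Song One", "length": 200.5},  # title and length match S1
    {"userId": "2", "song": "Song Two", "length": 181.0},  # title matches, length does not
    {"userId": "3", "song": "Unknown", "length": 99.0},    # no such song
]

# Index the songs by the same composite key used in the join condition.
song_index = {(s["title"], s["duration"]): s for s in songs}

songplays = []
for event in events:
    match = song_index.get((event["song"], event["length"]))
    if match is None:
        continue  # same effect as filtering out rows where song_id is null
    songplays.append({
        "user_id": event["userId"],
        "song_id": match["song_id"],
        "artist_id": match["artist_id"],
    })

print(songplays)
# [{'user_id': '1', 'song_id': 'S1', 'artist_id': 'A1'}]
```

The strict match on both fields, combined with the small song sample of 71 records, explains why the local run yields a single songplay row.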

