# Spark ETL Pipeline on AWS EMR 

## 1. Import Libraries

In [1]:
# spark libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when
from pyspark.sql.functions import concat, concat_ws, from_unixtime, substring, to_date
from pyspark.sql.functions import dayofmonth, dayofweek, hour, month, weekofyear, year
# config libs
from ..etl.config import Config

## 2. Create Config and Spark Session

#### 2.1 Config

In [None]:
config = Config(local=True)

#### 2.2 Spark Session

In [None]:
# @formatter:off
spark = SparkSession.builder\
    .appName(config.get('SPARK', 'NOTEBOOK_NAME'))\
    .config('spark.jars.packages', config.get('SPARK', 'HADOOP_JAR'))\
    .getOrCreate()
# @formatter:on

## 3. Define Data Storage Folders and Repartition Function

#### 3.1 Define Data Storage Folders

In [3]:
s3_landing = config.get('S3', 'LANDING')
s3_bronze = config.get('S3', 'BRONZE')
s3_silver = config.get('S3', 'SILVER')

#### 3.2 Define Repartition Function

In [4]:
def repartition_data(source_path, target_path):
    dataframe = spark.read.json(source_path).repartition(1)
    dataframe.write.mode('overwrite').json(target_path)
    return spark.read.json(target_path)

#### 3.3 Repartition and Print the Number of Songs

In [5]:
source = config.get('FILES', 'SONGS_LANDING')
target = config.get('FILES', 'SONGS_SILVER')
songs = repartition_data(f'${s3_landing}/${source}', f'${s3_bronze}/${target}')
songs_count = songs.count()

print(f'Songs count: {songs_count}')



Songs count: 14896


                                                                                

#### 3.4 Repartition and Print the Number of Logs

In [6]:
source = config.get('FILES', 'LOGS_LANDING')
target = config.get('FILES', 'LOGS_SILVER')
logs = repartition_data(f'${s3_landing}/${source}', f'${s3_bronze}/${target}')
logs_count = logs.count()

print(f'Logs count: {logs_count}')

Logs count: 8056


## 4. Create and Export Songs Table

#### 4.1 Songs Table Columns & Schema

In [7]:
songs_table_columns = ['song_id', 'title', 'year', 'duration', 'artist_id']

songs_table = songs.select(songs_table_columns)
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)



In [8]:
songs_table.show(5)



+------------------+--------------------+----+---------+------------------+
|           song_id|               title|year| duration|         artist_id|
+------------------+--------------------+----+---------+------------------+
|SOVIYJY12AF72A4B00|The Dead Next Doo...|1983|233.22077|AR4T2IF1187B9ADBB7|
|SOVYXYL12AF72A3373|Rebel Yell (1999 ...|1983|287.92118|AR4T2IF1187B9ADBB7|
|SOEPTVC12A67ADD0DA|To Zucchabar ["Gl...|   0|196.04853|ARQ846I1187B9A7083|
|SOLQYSZ12AB0181F97|    Mony Mony (Live)|1987|247.53587|AR4T2IF1187B9ADBB7|
|SOVPFJK12A6701CB16|Barcelona - (Frie...|2000|273.44934|AR3TZ691187FB3DBB1|
+------------------+--------------------+----+---------+------------------+
only showing top 5 rows



                                                                                

#### 4.2 Count Null Values

In [9]:
nulls_filter = [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in songs_table_columns]
songs_table.select(nulls_filter).show()



+-------+-----+----+--------+---------+
|song_id|title|year|duration|artist_id|
+-------+-----+----+--------+---------+
|      0|    0|   0|       0|        0|
+-------+-----+----+--------+---------+



                                                                                

#### 4.3 Check Duplicates

In [10]:
unique_songs_count = songs_table.dropDuplicates(['song_id']).count()
print(f'Songs duplicates: {songs_count - unique_songs_count}')



Songs duplicates: 0


                                                                                

#### 4.4 Export to Songs Parquet Table

In [11]:
table = config.get('FILES', 'SONGS_SILVER')
songs_table.drop_duplicates(['song_id']).write.mode('overwrite').parquet(f'${s3_silver}/${table}')

                                                                                

## 5. Create and Export Artists Table

#### 5.1 Artists Table Columns & Schema

In [12]:
artists_json_columns = ['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
artists_table_columns = ['artist_id', 'name', 'location', 'latitude', 'longitude']

artists_table = songs.select(artists_json_columns).toDF(*artists_table_columns)
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [13]:
artists_table.show(5)



+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR4T2IF1187B9ADBB7|          Billy Idol|<a href="http://b...|63.96027| 10.22442|
|AR4T2IF1187B9ADBB7|          Billy Idol|<a href="http://b...|63.96027| 10.22442|
|ARQ846I1187B9A7083|Yvonne S. Moriart...|                    |    null|     null|
|AR4T2IF1187B9ADBB7|          Billy Idol|<a href="http://b...|63.96027| 10.22442|
|AR3TZ691187FB3DBB1|Russell Watson / ...|                    |    null|     null|
+------------------+--------------------+--------------------+--------+---------+
only showing top 5 rows



                                                                                

#### 5.2 Count Null Values

In [14]:
nulls_filter = [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in artists_table_columns]
artists_table.select(nulls_filter).show()



+---------+----+--------+--------+---------+
|artist_id|name|location|latitude|longitude|
+---------+----+--------+--------+---------+
|        0|   0|       1|    9619|     9619|
+---------+----+--------+--------+---------+



                                                                                

#### 5.3 Check Duplicates

In [15]:
unique_artists_count = artists_table.dropDuplicates(['artist_id']).count()
print(f'Artists duplicates: {songs_count - unique_artists_count}')



Artists duplicates: 5343


                                                                                

#### 5.4 Export to Artists Parquet Table

In [16]:
table = config.get('FILES', 'ARTISTS_SILVER')
artists_table.drop_duplicates(['artist_id']).write.mode('overwrite').parquet(f'${s3_silver}/${table}')

                                                                                

## 6. Filter Logs and Update Count

In [17]:
logs = logs.where(logs.page == 'NextSong')

logs_count = logs.count()
print(f'Logs count: {logs_count}')

Logs count: 6820


## 7. Create and Export Users Table

#### 7.1 Users Table Columns & Schema

In [18]:
users_json_columns = ['userId', 'firstName', 'lastName', 'gender', 'level']
users_table_columns = ['user_id', 'first_name', 'last_name', 'gender', 'level']

users_table = logs.select(users_json_columns).toDF(*users_table_columns)
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [19]:
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



#### 7.2 Count Null Values

In [20]:
nulls_filter = [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in users_table_columns]
users_table.select(nulls_filter).show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      0|         0|        0|     0|    0|
+-------+----------+---------+------+-----+



#### 7.3 Check Duplicates

In [21]:
unique_users_count = users_table.dropDuplicates(['user_id']).count()
print(f'Users duplicates: {logs_count - unique_users_count}')

Users duplicates: 6724


#### 7.4 Export to Users Parquet Table

In [22]:
table = config.get('FILES', 'USERS_SILVER')
users_table.drop_duplicates(['user_id']).write.mode('overwrite').parquet(f'${s3_silver}/${table}')

## 8. Create and Export Time Table

#### 8.1 Time Table Columns & Schema

In [23]:
time_table_columns = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']

# @formatter:off
time_table = logs.select(['ts'])\
    .withColumn('start_time',
        concat_ws(".",
                  from_unixtime((col('ts') / 1000), 'yyyy-MM-dd HH:mm:ss'),
                  substring(col('ts'), -3, 3)))\
    .withColumn('date', to_date('start_time'))\
    .withColumn('hour',hour('date'))\
    .withColumn('day', dayofmonth('date'))\
    .withColumn('week', weekofyear('date'))\
    .withColumn('month', month('date'))\
    .withColumn('year', year('date'))\
    .withColumn('weekday', dayofweek('date'))\
    .select(time_table_columns)
# @formatter:on

time_table.printSchema()

root
 |-- start_time: string (nullable = false)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [24]:
time_table.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-14 22:30:...|   0| 14|  46|   11|2018|      4|
|2018-11-14 22:41:...|   0| 14|  46|   11|2018|      4|
|2018-11-14 22:45:...|   0| 14|  46|   11|2018|      4|
|2018-11-15 01:44:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:48:...|   0| 15|  46|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



#### 8.2 Count Null Values

In [25]:
nulls_filter = [count(when(col(c).isNull(), c)).alias(c) for c in time_table_columns]
time_table.select(nulls_filter).show()

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|         0|   0|  0|   0|    0|   0|      0|
+----------+----+---+----+-----+----+-------+



#### 8.3 Check Duplicates

In [26]:
unique_time_count = time_table.dropDuplicates(['start_time']).count()
print(f'Time duplicates: {logs_count - unique_time_count}')

Time duplicates: 7


#### 8.4 Export to Time Parquet Table

In [27]:
table = config.get('FILES', 'TIME_SILVER')
time_table.drop_duplicates(['start_time']).write.mode('overwrite').parquet(f'${s3_silver}/${table}')

## 9. Create and Export Songplays Table

#### 9.1 Songplays Table Columns & Schema

In [28]:
songplays_json_columns = ['level', 'location', 'userAgent', 'sessionId', 'itemInSession',
                          'userId', 'ts', 'song', 'artist', 'length']
songplays_table_columns = ['songplay_id', 'level', 'location', 'user_agent', 'session_id',
                           'user_id', 'song_id', 'artist_id', 'start_time']

# @formatter:off
songplays_table = logs.select(songplays_json_columns).alias('sp')\
    .join(songs.alias('so'), [
        (col('sp.song') == col('so.title')) &
        (col('sp.artist') == col('so.artist_name')) &
        (col('sp.length') == col('so.duration'))])\
    .withColumn('songplay_id',
        concat(col('userId'), col('sessionId').cast('string'), col('itemInSession').cast('string')))\
    .withColumn('start_time',
        concat_ws(".", from_unixtime((col('ts') / 1000), 'yyyy-MM-dd HH:mm:ss'), substring(col('ts'), -3, 3)))\
    .withColumnRenamed('userAgent', 'user_agent')\
    .withColumnRenamed('sessionId', 'session_id')\
    .withColumnRenamed('userId', 'user_id')\
    .select(songplays_table_columns)
# @formatter:on

songplays_table.printSchema()

root
 |-- songplay_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- start_time: string (nullable = false)



In [29]:
songplays_table.show(5)



+-----------+-----+--------------------+--------------------+----------+-------+------------------+------------------+--------------------+
|songplay_id|level|            location|          user_agent|session_id|user_id|           song_id|         artist_id|          start_time|
+-----------+-----+--------------------+--------------------+----------+-------+------------------+------------------+--------------------+
|    8874413| paid|Sacramento--Rosev...|"Mozilla/5.0 (Mac...|       744|     88|SOCHPTV12A6BD53113|ARN8NCB1187FB49652|2018-11-21 06:25:...|
|   49104147| paid|San Francisco-Oak...|Mozilla/5.0 (Wind...|      1041|     49|SOGXSWA12A6D4FBC99|ARPFHN61187FB575F6|2018-11-29 14:58:...|
|    5888720| paid|Augusta-Richmond ...|"Mozilla/5.0 (Win...|       887|     58|SOJWCWM12A8C13B664|ARM6T8I1187FB36CC8|2018-11-28 06:18:...|
|    2498481| paid|Lake Havasu City-...|"Mozilla/5.0 (Win...|       984|     24|SOHRHCN12AB018B0F4|ARHQBRZ1187FB3BDA2|2018-11-28 21:34:...|
|    8890049| paid|S

                                                                                

#### 9.2 Count Null Values

In [30]:
nulls_filter = [count(when(col(c).isNull(), c)).alias(c) for c in songplays_table_columns]
songplays_table.select(nulls_filter).show()



+-----------+-----+--------+----------+----------+-------+-------+---------+----------+
|songplay_id|level|location|user_agent|session_id|user_id|song_id|artist_id|start_time|
+-----------+-----+--------+----------+----------+-------+-------+---------+----------+
|          0|    0|       0|         0|         0|      0|      0|        0|         0|
+-----------+-----+--------+----------+----------+-------+-------+---------+----------+



                                                                                

#### 9.3 Export to Songplays Parquet Table

In [31]:
table = config.get('FILES', 'SONGPLAYS_SILVER')
songplays_table.write.mode('overwrite').parquet(f'${s3_silver}/${table}')

                                                                                

## 10. Stop Spark Session

In [32]:
spark.stop()