# <font color='royalblue' face="Trebuchet MS" size="6">**Project: Data Lake - Testing Work**</font>

In [1]:
##########################
#### Import Libraries ####
##########################
from datetime import datetime
import os, time
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql.types import TimestampType, DateType, IntegerType
from pyspark.sql.functions import monotonically_increasing_id

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1653931796932_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
##############################
#### Create Spark Session ####
##############################
def create_spark_session():
    """Return a SparkSession, reusing the active one if it already exists.

    The builder requests the ``org.apache.hadoop:hadoop-aws:2.7.0`` package
    via ``spark.jars.packages``; presumably this is what backs the
    ``s3a://`` output paths used later in the notebook.
    """
    hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
    session_builder = SparkSession.builder.config("spark.jars.packages", hadoop_aws_package)
    return session_builder.getOrCreate()


spark = create_spark_session()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# <font color='royalblue' face="Trebuchet MS" size="5">**1. Setup Function: `process_song_data`**</font>

In [3]:
# Source location: the song_data/A/B/C subset of the Udacity S3 bucket
input_data = "s3://udacity-dend/"
input_song_data = "{}song_data/A/B/C/*.json".format(input_data)

# Read the raw JSON and report how long the read took
start_time = time.time()
df_song_data = spark.read.json(input_song_data)
print("Total time to run: %s seconds" % round(time.time() - start_time, 4))
print("*" * 50)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total time to run: 22.6444 seconds
**************************************************

# <font color='indianred' face="Trebuchet MS" size="4">**1.1 Process `songs` table**</font>

In [4]:
# Songs dimension: distinct (song_id, title, artist_id, year, duration) rows,
# with duration stored as float and year as integer
df_songs_table = (
    df_song_data
    .select('song_id', 'title', 'artist_id', 'year', 'duration')
    .dropDuplicates()
    .withColumn("duration", col("duration").cast('float'))
    .withColumn("year", col("year").cast('integer'))
)

# Preview the table before it is written
df_songs_table.printSchema()
df_songs_table.show(2)
print("*" * 50)

# Persist to S3 as parquet, partitioned by year and then artist_id
output_data = 's3a://maitys-sparkify-outputs/'
output_songs_data = output_data + 'songs/songs.parquet'
df_songs_table.write.parquet(output_songs_data, mode='overwrite', partitionBy=['year', 'artist_id'])

# Round-trip check: read the files back through the s3:// scheme and time it
start_time = time.time()
df_songs_table_check = spark.read.parquet(output_songs_data.replace("s3a", "s3"))
print("Total time to run: %s seconds" % round(time.time() - start_time, 4))
print("*" * 50)

df_songs_table_check.printSchema()
df_songs_table_check.show(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: float (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOQFYBD12AB0182188|               Intro|ARAADXM1187FB3ECDB|1999| 67.63057|
|SOFIUVJ12A8C13C296|Will You Tell Me ...|AR9OEB71187B9A97C6|2005|397.16525|
+------------------+--------------------+------------------+----+---------+
only showing top 2 rows

**************************************************
Total time to run: 3.817 seconds
**************************************************
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (null

# <font color='indianred' face="Trebuchet MS" size="4">**1.2 Process `artists` table**</font>

In [5]:
# Artists dimension, keyed by artist_id.
# De-duplicate on the key, not on the whole row: with a whole-row
# dropDuplicates(), an artist_id that appears with differing name/location/
# coordinate values would survive as several rows. With a subset, Spark keeps
# an arbitrary one of the candidate rows per artist_id.
column_names = ["artist_id", "name", "location", "latitude", "longitude"]
df_artists_table = (
    df_song_data
    .select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')
    .dropDuplicates(['artist_id'])
    .toDF(*column_names)
    # Store coordinates as float
    .withColumn("latitude", col("latitude").cast('float'))
    .withColumn("longitude", col("longitude").cast('float'))
)

# Preview the table before it is written
df_artists_table.printSchema()
df_artists_table.show(2)
print("*" * 50)

# Write table to s3 --> parquet files (unpartitioned)
output_data = 's3a://maitys-sparkify-outputs/'
output_artists_data = output_data + 'artists/artists.parquet'
df_artists_table.write.parquet(output_artists_data, 'overwrite')

# Reload table through the s3:// scheme to check the round trip
start_time = time.time()
df_artists_table_check = spark.read.parquet(output_artists_data.replace("s3a", "s3"))
print("Total time to run: {} seconds".format(round((time.time() - start_time), 4)))
print("*" * 50)

df_artists_table_check.printSchema()
df_artists_table_check.show(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)

+------------------+------------+--------------+--------+---------+
|         artist_id|        name|      location|latitude|longitude|
+------------------+------------+--------------+--------+---------+
|AR0IAWL1187B9A96D0|Danilo Perez|        Panama|  8.4177|-80.11278|
|ARWB3G61187FB49404| Steve Morse|Hamilton, Ohio|    null|     null|
+------------------+------------+--------------+--------+---------+
only showing top 2 rows

**************************************************
Total time to run: 0.6307 seconds
**************************************************
root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)

+------------------+-------

# <font color='royalblue' face="Trebuchet MS" size="5">**2. Setup Function: `process_log_data`**</font>

In [6]:
# Sources: the log_data/2018/11 event files and the song_data/A/B/C subset
input_data = "s3://udacity-dend/"
input_log_data = "{}log_data/2018/11/*.json".format(input_data)
input_song_data = "{}song_data/A/B/C/*.json".format(input_data)

# Read both raw JSON datasets, timing the two reads together
start_time = time.time()
df_log_data = spark.read.json(input_log_data)
df_song_data = spark.read.json(input_song_data)
print("Total time to run: %s seconds" % round(time.time() - start_time, 4))
print("*" * 50)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total time to run: 10.1455 seconds
**************************************************

# <font color='indianred' face="Trebuchet MS" size="4">**2.1 Process `users` table**</font>

In [7]:
# Users dimension, built from song-play ('NextSong') events that carry a userId
df_users_table = df_log_data.where((df_log_data.page == 'NextSong') & (df_log_data.userId.isNotNull()))
df_users_table = df_users_table.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()
# NOTE(review): dropDuplicates() runs over all five columns, so a user whose
# `level` differs between events keeps one row per level value.

# Rename columns
column_names = ["user_id", "first_name", "last_name", "gender", "level"]
df_users_table = df_users_table.toDF(*column_names)

# Fix datatypes. With ANSI mode off, a value that cannot be parsed as an
# integer (e.g. an empty string) becomes null under cast(); drop those rows
# rather than writing a null key.
df_users_table = df_users_table.withColumn("user_id", col("user_id").cast('integer'))
df_users_table = df_users_table.where(col("user_id").isNotNull())

# Preview the table before it is written
df_users_table.printSchema()
df_users_table.show(2)
print("*" * 50)

# Write table to s3 --> parquet files partitioned by level
output_data = 's3a://maitys-sparkify-outputs/'
output_users_data = output_data + 'users/users.parquet'
df_users_table.write.partitionBy('level').parquet(output_users_data, 'overwrite')

# Reload table through the s3:// scheme to check the round trip
start_time = time.time()
df_users_table_check = spark.read.parquet(output_users_data.replace("s3a", "s3"))
print("Total time to run: {} seconds".format(round((time.time() - start_time), 4)))
print("*" * 50)

# Inspect the *reloaded* table, so the check actually exercises the written files
df_users_table_check.printSchema()
df_users_table_check.show(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     57| Katherine|      Gay|     F| free|
|     22|      Sean|   Wilson|     F| free|
+-------+----------+---------+------+-----+
only showing top 2 rows

**************************************************
Total time to run: 0.4432 seconds
**************************************************
root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     57| Katherine|      Gay|     

# <font color='indianred' face="Trebuchet MS" size="4">**2.2 Process `time` table**</font>

In [8]:
# Time dimension: one row per distinct event timestamp of song-play events
df_time_table = df_log_data.where((df_log_data.page == 'NextSong') & (df_log_data.ts.isNotNull()))
df_time_table = df_time_table.select('ts').dropDuplicates()

# `ts` is epoch milliseconds. Convert it with a native cast of seconds to
# TimestampType instead of a Python UDF. This avoids per-row Python
# serialization, and it avoids round-tripping through a naive local-time
# string, which depended on the executor's OS time zone.
df_time_table = df_time_table.withColumn('start_time', (col('ts') / 1000).cast(TimestampType()))

# Derive calendar attributes, then drop the raw epoch column.
# dayofweek() numbers days 1 (Sunday) .. 7 (Saturday).
df_time_table = (
    df_time_table
    .withColumn('hour', hour('start_time'))
    .withColumn('day', dayofmonth('start_time'))
    .withColumn('week', weekofyear('start_time'))
    .withColumn('month', month('start_time'))
    .withColumn('year', year('start_time'))
    .withColumn('weekday', dayofweek('start_time'))
    .drop('ts')
)

# Preview the table before it is written
df_time_table.printSchema()
df_time_table.show(2)
print("*" * 50)

# Write table to s3 --> parquet files partitioned by year and month
output_data = 's3a://maitys-sparkify-outputs/'
output_time_data = output_data + 'time/time.parquet'
df_time_table.write.partitionBy('year', 'month').parquet(output_time_data, 'overwrite')

# Reload table through the s3:// scheme to check the round trip
start_time = time.time()
df_time_table_check = spark.read.parquet(output_time_data.replace("s3a", "s3"))
print("Total time to run: {} seconds".format(round((time.time() - start_time), 4)))
print("*" * 50)

df_time_table_check.printSchema()
df_time_table_check.show(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-28 16:10:...|  16| 28|  48|   11|2018|      4|
|2018-11-05 04:40:...|   4|  5|  45|   11|2018|      2|
+--------------------+----+---+----+-----+----+-------+
only showing top 2 rows

**************************************************
Total time to run: 8.7331 seconds
**************************************************
root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year: integer (nullabl

# <font color='indianred' face="Trebuchet MS" size="4">**2.3 Process `songplays` table**</font>

In [10]:
# Re-read both raw datasets so this section does not rely on earlier cells:
# the log_data/2018/11 event files and the song_data/A/B/C subset
input_data = "s3://udacity-dend/"
input_log_data = "{}log_data/2018/11/*.json".format(input_data)
input_song_data = "{}song_data/A/B/C/*.json".format(input_data)

start_time = time.time()
df_log_data = spark.read.json(input_log_data)
df_song_data = spark.read.json(input_song_data)
print("Total time to run: %s seconds" % round(time.time() - start_time, 4))
print("*" * 50)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total time to run: 20.513 seconds
**************************************************

In [16]:
# Songplays fact table: song-play ('NextSong') events matched to song metadata
# by song title and artist name.
# The filter uses log-side columns only, so applying it before the inner join
# gives the same rows while shrinking the join input.
df_log_plays = df_log_data.where((df_log_data.page == 'NextSong') & (df_log_data.ts.isNotNull()))
df_songplays_table = df_log_plays.join(
    df_song_data,
    (df_log_plays.song == df_song_data.title) & (df_log_plays.artist == df_song_data.artist_name),
    how='inner',
)

# `ts` is epoch milliseconds; convert natively instead of via a Python UDF
df_songplays_table = df_songplays_table.withColumn('start_time', (col('ts') / 1000).cast(TimestampType()))

# Partition columns. df_song_data also has a `year` column (the song's
# release year); withColumn replaces it with the play year here.
df_songplays_table = df_songplays_table.withColumn('month', month('start_time'))
df_songplays_table = df_songplays_table.withColumn('year', year('start_time'))

# De-duplicate BEFORE assigning songplay_id. The id is unique per row, so a
# dropDuplicates() that includes it can never remove anything.
df_songplays_table = df_songplays_table.select(
    'start_time', 'userId', 'level', 'song_id', 'artist_id', 'sessionId',
    'location', 'userAgent', 'month', 'year',
).dropDuplicates()

# Surrogate key: unique, but not consecutive
df_songplays_table = df_songplays_table.withColumn('songplay_id', monotonically_increasing_id())

# Order and rename columns
df_songplays_table = df_songplays_table.select(
    'songplay_id', 'start_time', 'userId', 'level', 'song_id', 'artist_id',
    'sessionId', 'location', 'userAgent', 'month', 'year',
)
column_names = ['songplay_id', 'start_time', 'user_id', 'level', 'song_id', 'artist_id', 'session_id', 'location', 'user_agent', 'month', 'year']
df_songplays_table = df_songplays_table.toDF(*column_names)

# Preview the table before it is written
df_songplays_table.printSchema()
df_songplays_table.show(2)
print("*" * 50)

# Write table to s3 --> parquet files partitioned by year and month
output_data = 's3a://maitys-sparkify-outputs/'
output_songplays_data = output_data + 'songplays/songplays.parquet'
df_songplays_table.write.partitionBy('year', 'month').parquet(output_songplays_data, 'overwrite')

# Reload table through the s3:// scheme to check the round trip
start_time = time.time()
df_songplays_table_check = spark.read.parquet(output_songplays_data.replace("s3a", "s3"))
print("Total time to run: {} seconds".format(round((time.time() - start_time), 4)))
print("*" * 50)

df_songplays_table_check.printSchema()
df_songplays_table_check.show(2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…