# Sparkify DataLake on S3

In [177]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import os
import configparser
from datetime import datetime

# Make sure that your AWS credentials are loaded as env vars

In [178]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('aws/credentials.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [179]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Load Song data from S3

In [77]:
input_data = "s3a://udacity-dend/"
song_data = os.path.join(input_data, "song_data/A/*/*/*.json")
print(song_data)

s3a://udacity-dend/song_data/A/*/*/*.json


## Load into a  Dataframe

In [111]:
df_song = spark.read.json(song_data)

In [112]:
print(df_song.count())
df_song.printSchema()

df_song.show(5)

14896
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|AR4T2IF1187B9ADBB7|       63.96027|<a href="http://b...|        10.22442|          Billy 

In [113]:
songs_table = df_song.select("song_id","title", "artist_id","year","duration").dropDuplicates()

In [114]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAONPI12A6D4F8A49|        Sea Of Tears|AR9JET41187FB3DE77|   0|720.87465|
|SOAQZXK12A6701D993| Hold It Now_ Hit It|ARLHO5Z1187FB4C861|1986|206.28853|
|SOATBPS12A6D4F6C3C|Les Gendarmes S'e...|AR88DP71187B9A9A57|   0|197.40689|
|SOAZQJP12A8C13BD71|That Night (Wah-C...|ARZHCKQ1187B9BA5BA|   0|418.48118|
|SOBGCUO12A6D4FB2B0|       Mil Mariposas|AR5CKT41187B9B9AB0|2006|249.80853|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [115]:
songs_table.count()

14896

In [134]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [None]:
##Create Artists Table

In [120]:
artists_table = df_song.select("artist_id",col("artist_name").alias("name"),col("artist_location").alias("location"),\
                             col("artist_latitude").alias("latitude"),col("artist_longitude").alias("longitude")).dropDuplicates()

In [121]:
artists_table.show()

+------------------+--------------------+--------------------+---------+----------+
|         artist_id|                name|            location| latitude| longitude|
+------------------+--------------------+--------------------+---------+----------+
|ARTHSAE12131B4B70A|Sir Simon Rattle/...|                    |     null|      null|
|ARTRQUJ1187B992ADD|Larry Graham & Gr...|          California| 37.27188|-119.27023|
|ARLKCMN1187FB599FB|          Big & Rich|           Nashville| 36.16778| -86.77836|
|ARKC83D1187B9AB367|          Mark Lowry|                    |     null|      null|
|ARQSM561187FB4A0CF|    Jack's Mannequin|Orange County , USA |  33.6671|-117.76505|
|ART1XOT1187B99A298|        Billy Squier| Wellesley Hills, MA|  42.3076| -71.27951|
|AREM2UI1187FB3F99D|  Stick To Your Guns|CHINO HILLS, Cali...| 33.99604|-117.75801|
|AR6E5S51187B98C1A2|         All Out War|        Newburgh, NY| 41.49994| -74.01023|
|AR8QJHN1187FB3616E|Tord Gustavsen_ H...|                    |     null|    

In [118]:
artists_table.count()

10025

In [133]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



# Load Log data from S3

In [180]:
log_data = input_data + 'log-data/*/*/*.json'
print(log_data)

s3a://udacity-dend/log-data/*/*/*.json


In [181]:
df_log = spark.read.json(log_data)

In [124]:
print(df_log.count())
df_log.printSchema()
df_log.show(5)

8056
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|         

In [182]:
df_log = df_log.filter(col("page")=='NextSong')

In [183]:
df_log = df_log.filter(df_log.userId.isNotNull())

In [127]:
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [128]:
df_log.count()

6820

In [184]:
# extract columns for users table    
users_table = df_log.select(col("userId").alias("user_id"),col("firstName").alias("first_name"),\
                        col("lastName").alias("last_name"), "gender", "level").dropDuplicates()

In [185]:
users_table.count()

107

In [186]:
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [187]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [189]:
output_data = "s3a://sparkify-dend/"

In [190]:
users_table.write.parquet(os.path.join(output_data, 'users.parquet'), 'overwrite')

In [135]:
# create a udf to convert epoch time to timestamp
get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
    

In [136]:
# create timestamp column from original timestamp column
df_log = df_log.withColumn("timestamp",get_timestamp(col("ts")))

In [138]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [139]:
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId| timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval

In [141]:
# create a udf to convert original timestamp to datetime
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))

In [142]:
# create datetime column from the original timestamp column
df_log = df_log.withColumn("datetime", get_datetime(col("ts")))

In [143]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)



In [144]:
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId| timestamp|            datetime|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+----------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|2018-11-14 16:30:...|
|The Prodigy

In [146]:
# Extract column values to be populated in time_table
time_table = df_log.select(
         col('datetime').alias('start_time'),
         hour('datetime').alias('hour'),
         dayofmonth('datetime').alias('day'),
         weekofyear('datetime').alias('week'),
         month('datetime').alias('month'),
         year('datetime').alias('year'),
         date_format('datetime', 'F').alias('weekday')
     ) 

In [147]:
time_table = time_table.dropDuplicates(['start_time'])

In [148]:
time_table.count()

6813

In [149]:
time_table.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-02 02:12:...|   2|  2|  44|   11|2018|      1|
|2018-11-03 09:34:...|   9|  3|  44|   11|2018|      1|
|2018-11-05 09:18:...|   9|  5|  45|   11|2018|      1|
|2018-11-06 13:40:...|  13|  6|  45|   11|2018|      1|
|2018-11-06 14:19:...|  14|  6|  45|   11|2018|      1|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [158]:
df_songplay = df_song.join(df_log,
                           (df_song.artist_name == df_log.artist) &
                           (df_song.title == df_log.song) &
                           (df_song.duration == df_log.length))

In [192]:
songplays_table = df_songplay.select(
        monotonically_increasing_id().alias('songplay_id'),
        col('datetime').alias('start_time'),
        col('level').alias('level'),
        col('userId').alias('user_id'),
        col('song_id').alias('song_id'),
        col('artist_id').alias('artist_id'),
        col('sessionId').alias('session_id'),
        col('location').alias('location'),
        col('userAgent').alias('user_agent'),
        col('year').alias('year'),
        month('datetime').alias('month')
        ).dropDuplicates()

In [193]:
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: string (nullable = true)
 |-- level: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: long (nullable = true)
 |-- month: integer (nullable = true)



In [194]:
songplays_table.count()

319

In [195]:
songplays_table.show(5)

+-------------+--------------------+-----+-------+------------------+------------------+----------+--------------------+--------------------+----+-----+
|  songplay_id|          start_time|level|user_id|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-------------+--------------------+-----+-------+------------------+------------------+----------+--------------------+--------------------+----+-----+
| 111669149697|2018-11-27 10:22:...| paid|     36|SODFRAX12A8C13274B|ARP29T31187B98DD5F|       957|Janesville-Beloit...|"Mozilla/5.0 (Win...|2007|   11|
|2190433320961|2018-11-23 10:11:...| free|     86|SOQDMXT12A6D4F8255|ART5MUE1187B98C961|       869|La Crosse-Onalask...|"Mozilla/5.0 (Mac...|1984|   11|
|1340029796361|2018-11-22 22:15:...| free|      6|SOBONKR12A58A7A7E0|AR5E44Z1187B9A1D74|       847|Atlanta-Sandy Spr...|Mozilla/5.0 (Wind...|1990|   11|
|  34359738368|2018-11-28 00:18:...| paid|     58|SOJWCWM12A8C13B664|ARM6T8I1187FB

# Load tables into Parquet files in S3

In [171]:
output_data = "s3a://sparkify-dend/"

In [None]:
songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs.parquet'), 'overwrite')

In [None]:
artists_table.write.parquet(os.path.join(output_data, 'artists.parquet'), 'overwrite')

In [None]:
users_table.write.parquet(os.path.join(output_data, 'users.parquet'), 'overwrite')

In [None]:
time_table.write.partitionBy('year','month').parquet(os.path.join(output_data, 'time.parquet'), 'overwrite')

In [None]:
songplays_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'songplays.parquet'), 'overwrite')