In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp, dayofweek, row_number, monotonically_increasing_id

## data preparation

### unzip files

In [31]:
# !rm -R data/log

In [30]:
# !rm -R data/song

In [37]:
# !mkdir data/song

In [38]:
# !mkdir data/log

In [36]:
# !unzip data/log-data.zip -d data/log

In [39]:
# !unzip data/song-data.zip -d data/song

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

## Initialize Spark

In [3]:
# create a Spark Session
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

## Song

In [4]:
# read song json files
df = spark.read.json("data/song/song_data/*/*/*/*.json")

In [5]:
df.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

### songs

In [6]:
# table songs
songs = df.select(["song_id", "title", "artist_id", "year", "duration"]).dropDuplicates(subset=["song_id"])

In [7]:
# top 5
songs.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0|218.93179|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|   0|152.92036|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|   0|177.47546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [10]:
## save the data to a parquet file in s3
songs.write.partitionBy("year", "artist_id")\
    .mode("ignore")\
    .parquet("s3a://udacity-data-lake-self/songs-test.parquet")

### Artists

In [11]:
# table artists 
artists = df.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]).dropDuplicates(subset=["artist_id"])

In [12]:
# top 5
artists.show(5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|           null|            null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|       30.08615|       -94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |           null|            null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington|        38.8991|         -77.029|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



## logs

In [13]:
# read one json file
logs = spark.read.json("data/log/*.json")

In [14]:
# top 5
logs.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [15]:
# filter page "NextSong"
logsNextSong = logs.filter(logs.page == "NextSong")

In [16]:
# top 5
logsNextSong.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

### users

In [17]:
# create table users 
users = logsNextSong.select(["userId", "firstName", "lastName", "gender", "level"])\
    .dropDuplicates(subset=["userId"])

In [18]:
# top 5
users.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    51|     Maia|   Burke|     F| free|
|     7|   Adelyn|  Jordan|     F| free|
|    15|     Lily|    Koch|     F| paid|
|    54|    Kaleb|    Cook|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+
only showing top 5 rows



### time

In [19]:
# add timestamp
logsNextSong = logsNextSong.withColumn("start_time", to_timestamp(col("ts") / 1000.0))

In [20]:
# top 5
logsNextSong.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          start_time|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|
|The Prodigy|Logged In|     Ryan|     M|            1|  

In [21]:
# add hour, month, year, time_id, etc
time_table = logsNextSong.withColumn("hour", hour(col("start_time")))\
    .withColumn("day", dayofmonth(col("start_time")))\
    .withColumn("week", weekofyear(col("start_time")))\
    .withColumn("month", month(col("start_time"))) \
    .withColumn("year", year(col("start_time"))) \
    .withColumn("weekday", dayofweek(col("start_time")))\
    .select(["start_time", "hour", "day", "week", "month", "year", "weekday"])\
    .dropDuplicates(subset=["start_time"])\
    .withColumn("time_id", row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)\
    .select(["time_id", "start_time", "hour", "day", "week", "month", "year", "weekday"])

In [22]:
# top 5
time_table.show(5)

+-------+--------------------+----+---+----+-----+----+-------+
|time_id|          start_time|hour|day|week|month|year|weekday|
+-------+--------------------+----+---+----+-----+----+-------+
|      0|2018-11-21 06:18:...|   6| 21|  47|   11|2018|      4|
|      1|2018-11-14 15:20:...|  15| 14|  46|   11|2018|      4|
|      2|2018-11-05 16:31:...|  16|  5|  45|   11|2018|      2|
|      3|2018-11-13 18:00:...|  18| 13|  46|   11|2018|      3|
|      4|2018-11-30 04:32:...|   4| 30|  48|   11|2018|      6|
+-------+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



### songplays

In [24]:
# join table songs and artists to get artist_id, song_id, artist_name, song title in one table
songs_artist = songs.alias("s").join(artists.alias("a"), col("s.artist_id") == col("a.artist_id"), "inner")\
    .select(col("s.song_id"), col("s.title"), col("s.artist_id"), col("s.duration"), col("a.artist_name"))\
    .dropDuplicates(subset=["song_id", "artist_id"])

In [25]:
# top 5
songs_artist.show(5)

+------------------+--------------------+------------------+---------+----------------+
|           song_id|               title|         artist_id| duration|     artist_name|
+------------------+--------------------+------------------+---------+----------------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|313.12934|  Sierra Maestra|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|218.93179|          Casual|
|SOUPIRU12A6D4FA1E1| Der Kleine Dompfaff|ARJIE2Y1187B994AB7|152.92036|     Line Renaud|
|SOXVLOJ12AB0189215|     Amor De Cabaret|ARKRRTF1187B9984DA|177.47546|Sonora Santanera|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|151.84934|    Jimmy Wakely|
+------------------+--------------------+------------------+---------+----------------+
only showing top 5 rows



In [26]:
# join songs_artist with log
songplays = logsNextSong.join(songs_artist, [logsNextSong.song == songs_artist.title, logsNextSong.length == songs_artist.duration, logsNextSong.artist == songs_artist.artist_name], "inner")\
    .select(["start_time", "userId", "level", "song_id", "artist_id", "sessionId", "location", "userAgent"])\
    .withColumn("songplay_id", row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)\
    .select(["songplay_id", "start_time", "userId", "level", "song_id", "artist_id", "sessionId", "location", "userAgent"])

In [27]:
songplays.show(5)

+-----------+--------------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|songplay_id|          start_time|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|
+-----------+--------------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|          0|2018-11-21 21:56:...|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+--------------------+------+-----+------------------+------------------+---------+--------------------+--------------------+

