In [1]:
!unzip ./data/log-data.zip -d ./data/log_data
!unzip ./data/song-data.zip -d ./data

Archive:  ./data/log-data.zip
  inflating: ./data/log_data/2018-11-01-events.json  
  inflating: ./data/log_data/2018-11-02-events.json  
  inflating: ./data/log_data/2018-11-03-events.json  
  inflating: ./data/log_data/2018-11-04-events.json  
  inflating: ./data/log_data/2018-11-05-events.json  
  inflating: ./data/log_data/2018-11-06-events.json  
  inflating: ./data/log_data/2018-11-07-events.json  
  inflating: ./data/log_data/2018-11-08-events.json  
  inflating: ./data/log_data/2018-11-09-events.json  
  inflating: ./data/log_data/2018-11-10-events.json  
  inflating: ./data/log_data/2018-11-11-events.json  
  inflating: ./data/log_data/2018-11-12-events.json  
  inflating: ./data/log_data/2018-11-13-events.json  
  inflating: ./data/log_data/2018-11-14-events.json  
  inflating: ./data/log_data/2018-11-15-events.json  
  inflating: ./data/log_data/2018-11-16-events.json  
  inflating: ./data/log_data/2018-11-17-events.json  
  inflating: ./data/log_data/2018-11-18-events.json 

In [1]:
!rm -r ./data/output_data

In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType, StringType

In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

### Initiate SPARK session

In [5]:
spark = create_spark_session()

In [6]:
spark

### Process Song Data

In [7]:
# Input path
input_data = "./data/" #"s3a://udacity-dend/"
song_data = input_data + "song_data/*/*/*/*.json"

print(song_data)

./data/song_data/*/*/*/*.json


In [8]:
# read song data file
df = spark.read.json(song_data)

In [9]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
df.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

#### Songs Table

In [11]:
# extract columns to create songs table
songs_table = df.select(*["song_id", "title", "artist_id", "year", "duration"]).orderBy("song_id")

In [12]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [13]:
# output Path
output_data = "./data/output_data/" #"s3a://project4-data/output_data/"
songs_table_path = output_data + "songs_table/" + "songs_table" + "_" + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songs_table_path

'./data/output_data/songs_table/songs_table_2021-05-19-01-26-45-804473'

In [14]:
songs_table.write.partitionBy("year", "artist_id").parquet(songs_table_path)

#### Artists Table

In [15]:
# extract columns to create artists table
artists_table = df.select(*['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']).orderBy("artist_id", ascending=False)
artists_table = artists_table.withColumnRenamed("artist_name", "name") \
                        .withColumnRenamed("artist_location", "location") \
                        .withColumnRenamed("artist_latitude", "latitude") \
                        .withColumnRenamed("artist_longitude", "longitude")

artists_table.show(5)

+------------------+------------------+--------------+--------+---------+
|         artist_id|              name|      location|latitude|longitude|
+------------------+------------------+--------------+--------+---------+
|ARYKCQI1187FB3B18F|             Tesla|              |    null|     null|
|ARXR32B1187FB57099|               Gob|              |    null|     null|
|ARWB3G61187FB49404|       Steve Morse|Hamilton, Ohio|    null|     null|
|ARVBRGZ1187FB4675A|      Gwen Stefani|              |    null|     null|
|ARULZCI1241B9C8611|Luna Orbit Project|              |    null|     null|
+------------------+------------------+--------------+--------+---------+
only showing top 5 rows



In [17]:
# output Path
output_data = "./data/output_data/" #"s3a://"
artists_table_path = output_data + "artists_table/" + "artists_table" + "_" + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path

'./data/output_data/artists_table/artists_table_2021-05-19-01-27-18-560659'

In [18]:
artists_table.write.parquet(artists_table_path)

### Process Log Data

In [19]:
# Input path
input_data = "./data/" #"s3a://udacity-dend/"
log_data = input_data + "log_data/*.json"

print(log_data)

./data/log_data/*.json


In [20]:
# read log data file 
df_log = spark.read.json(log_data)

In [21]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [22]:
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [23]:
# filter by actions for song plays
df_filtered = df_log.filter(df_log["page"]=='NextSong')
df_filtered.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

#### Users Table

In [24]:
# extract columns for users table
users_table = df_filtered.select(*['userId', 'firstName', 'lastName', 'gender', 'level']).orderBy("lastName").dropDuplicates()
users_table = users_table.withColumnRenamed('userId', 'user_id') \
                .withColumnRenamed('firstName', 'first_name') \
                .withColumnRenamed('lastName', 'last_name')

users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     66|     Kevin| Arellano|     M| free|
|     34|    Evelin|    Ayala|     F| free|
|     99|       Ann|    Banks|     F| free|
|    100|     Adler|  Barrera|     M| free|
|     42|    Harper|  Barrett|     M| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [25]:
# output Path
output_data = "./data/output_data/" #"s3a://"
users_table_path = output_data + "users_table/" + "users_table" + "_" + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
print(users_table_path)

users_table.write.parquet(users_table_path)

./data/output_data/users_table/users_table_2021-05-19-01-28-40-241327


#### Time Table

In [26]:
# create timestamp column from original timestamp column
get_timestamp = udf(f=lambda ts: datetime.fromtimestamp(ts/1000.0), returnType=TimestampType())
df_filtered = df_filtered.withColumn("timestamp", get_timestamp("ts"))
df_filtered.select("timestamp").show(5, truncate=False)

+-----------------------+
|timestamp              |
+-----------------------+
|2018-11-15 00:30:26.796|
|2018-11-15 00:41:21.796|
|2018-11-15 00:45:41.796|
|2018-11-15 03:44:09.796|
|2018-11-15 05:48:55.796|
+-----------------------+
only showing top 5 rows



In [27]:
# create datetime column from original timestamp column
get_datetime = udf(lambda ts: datetime.fromtimestamp(ts/1000.0).strftime('%Y-%m-%d %H:%M:%S'), returnType=StringType())
df_filtered = df_filtered.withColumn("datetime", get_datetime("ts"))
df_filtered.select("datetime").show(5, truncate=False)

+-------------------+
|datetime           |
+-------------------+
|2018-11-15 00:30:26|
|2018-11-15 00:41:21|
|2018-11-15 00:45:41|
|2018-11-15 03:44:09|
|2018-11-15 05:48:55|
+-------------------+
only showing top 5 rows



In [28]:
# extract columns to create time table
df_filtered.createOrReplaceTempView("df_fil")

time_table = spark.sql('''SELECT DISTINCT datetime AS start_time,
                                          HOUR(datetime) AS hour,
                                          DAY(datetime) AS day,
                                          WEEKOFYEAR(datetime) AS week,
                                          MONTH(datetime) AS month,
                                          YEAR(datetime) AS year,
                                          DAYOFWEEK(datetime) AS weekday
                          FROM df_fil
''')

time_table.show(5)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 11:35:35|  11| 15|  46|   11|2018|      5|
|2018-11-21 09:36:21|   9| 21|  47|   11|2018|      4|
|2018-11-14 05:11:42|   5| 14|  46|   11|2018|      4|
|2018-11-14 08:10:20|   8| 14|  46|   11|2018|      4|
|2018-11-28 22:24:08|  22| 28|  48|   11|2018|      4|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [29]:
# write time table to parquet files partitioned by year and month
# output Path
output_data = "./data/output_data/" #"s3a://"
time_table_path = output_data + "time_table/" + "time_table" + "_" + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
print(time_table_path)

time_table.write.parquet(time_table_path)


./data/output_data/time_table/time_table_2021-05-19-01-29-15-280603


#### Song Plays Table

In [30]:
# read in song data to use for songplays table
song_df = spark.read.json(input_data + "song_data/*/*/*/*.json")

In [31]:
df_joined = df_filtered.join(song_df, on=(df_filtered.artist==song_df.artist_name) & (df_filtered.song==song_df.title), how='inner')
df_joined = df_joined.withColumn("songplay_id", F.monotonically_increasing_id())

df_joined.count()

1

In [32]:
# extract columns from joined song and log datasets to create songplays table 
df_joined.createOrReplaceTempView("df_joined")

songplays_table = spark.sql('''SELECT songplay_id,
                                      timestamp AS start_time,
                                      userId AS user_id,
                                      level,
                                      song_id,
                                      artist_id,
                                      sessionId AS session_id,
                                      artist_location AS location,
                                      userAgent AS user_agent
                               FROM df_joined
                               ORDER BY (user_id, session_id)
''')

songplays_table.show(5)

+-----------+--------------------+-------+-----+------------------+------------------+----------+---------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id| location|          user_agent|
+-----------+--------------------+-------+-----+------------------+------------------+----------+---------+--------------------+
|          0|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Dubai UAE|"Mozilla/5.0 (X11...|
+-----------+--------------------+-------+-----+------------------+------------------+----------+---------+--------------------+



In [33]:
# write songplays table to parquet files partitioned by year and month
# output Path
output_data = "./data/output_data/" #"s3a://"
songplays_table_path = output_data + "songplays_table/" + "songplays_table" + "_" + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
print(songplays_table_path)

songplays_table.write.parquet(songplays_table_path)

./data/output_data/songplays_table/songplays_table_2021-05-19-01-29-49-823252


### Queries on Songplay analysis

In [34]:
table_count = lambda table: table.count() 

print("Number of records in songs table: ", table_count(songs_table))
print("Number of records in users table: ", table_count(users_table))  
print("Number of records in artists table: ", table_count(artists_table))  
print("Number of records in time table: ", table_count(time_table))  
print("Number of records in songplays table: ", table_count(songplays_table))

Number of records in songs table:  71
Number of records in users table:  104
Number of records in artists table:  71
Number of records in time table:  6813
Number of records in songplays table:  1


In [35]:
# Get info of songs with duration greater than 200s limit to 10 records
songplays_table.join(songs_table, on=songs_table.song_id==songplays_table.song_id) \
                .filter("duration>200") \
                .select("title", "year", "duration").limit(10).show()

+--------------+----+---------+
|         title|year| duration|
+--------------+----+---------+
|Setanta matins|   0|269.58322|
+--------------+----+---------+

