# Project 4: Data Lake

In [2]:
from pyspark.sql import SparkSession
import os
import configparser
from datetime import datetime

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql import types as T

### Load config

In [3]:
config = configparser.ConfigParser()

config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]    = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

OUTPUT_DATA = config['AWS']['OUTPUT_DATA']

### Create spark session

In [4]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Process song data files

### Read song data from S3

In [37]:
songdata_df = spark.read.json("s3a://udacity-dend/song_data/A/A/A/*.json")

In [8]:
songdata_df.printSchema()
songdata_df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|3

### Create view of song data

In [9]:
songdata_df.createOrReplaceTempView("songdata_view")

### Create songs_table on fly

In [10]:
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM songdata_view
""")

In [11]:
songs_table.printSchema()
songs_table.show(5)

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|AR73AIO1187B9AD57B|2005|118.07302|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



### Write songs_table to S3 as parquet file

In [12]:
songs_table_out_path = OUTPUT_DATA + "songs_table.parquet"
songs_table_out_path

songs_table.write.mode("overwrite").parquet(songs_table_out_path)

### Create artists_table on fly

In [13]:
artists_table = spark.sql("""
    SELECT artist_id        AS artist_id, 
           artist_name      AS name,
           artist_location  AS location,
           artist_latitude  AS latitude, 
           artist_longitude AS longitude 
    FROM songdata_view
""")

In [14]:
artists_table.printSchema()
artists_table.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...| 51.4536|  -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|40.57885| -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|51.50632|  -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|37.77916|-122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|    null|      null|
+------------------+--------------------+--------------------+--------+----------+
only showing top 5 rows



### Write artists_table to S3 as parquet file

In [15]:
# write artists_table to parquet file
artists_table_out_path = OUTPUT_DATA + "artists_table.parquet"
artists_table_out_path

artists_table.write.mode("overwrite").parquet(artists_table_out_path)

# Process log data files

In [5]:
logdata_df = spark.read.json("s3a://udacity-dend/log_data/2018/11/*.json")

In [17]:
logdata_df.printSchema()
logdata_df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

In [6]:
# Filter data with page = NextSong
logdata_nextsong_df = logdata_df.filter(logdata_df.page == 'NextSong')

In [19]:
logdata_nextsong_df.printSchema()
logdata_nextsong_df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|         

### Create the view of log data (page = NextSong)

In [7]:
logdata_nextsong_df.createOrReplaceTempView("logdata_nextsong_view")

### Create users_table on fly

In [37]:
users_table = spark.sql("""
    SELECT DISTINCT userId AS user_id, 
           firstName       AS first_name, 
           lastName        AS last_name, 
           gender, 
           level
    FROM logdata_nextsong_view
""")

In [22]:
users_table.printSchema()
users_table.show(5)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



### Write users_table to S3 as parquet file

In [23]:
users_table_out_path = OUTPUT_DATA + "users_table.parquet"

users_table.write.mode("overwrite").parquet(users_table_out_path)

#### Create function to convert timestamp

In [8]:
@udf(T.TimestampType())
def to_timestamp (ts):
    return datetime.fromtimestamp(ts / 1000.0)

#### Add startTime column to logdata_nextsong_df

In [9]:
logdata_nextsong_df = logdata_nextsong_df.withColumn("startTime", to_timestamp("ts"))

In [26]:
logdata_nextsong_df.printSchema()
logdata_nextsong_df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- startTime: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|

### Create view of log data (page = NextSong) after add the startTime column

In [10]:
logdata_nextsong_df.createOrReplaceTempView("logdata_nextsong_view")

### Create time table on fly

In [18]:
time_table = spark.sql("""
    SELECT DISTINCT startTime AS start_time, 
                    hour(startTime) AS hour,
                    day(startTime)  AS day, 
                    weekofyear(startTime) AS week,
                    month(startTime) AS month,
                    year(startTime) AS year,
                    dayofweek(startTime) AS weekday
    FROM logdata_nextsong_view
    ORDER BY start_time
""")

In [19]:
time_table.printSchema()
time_table.show(5)

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:...|  21|  1|  44|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



### Write time_table to S3 as parquet file

In [36]:
time_table_out_path = OUTPUT_DATA + "time_table.parquet"

# limit to write faster to S3
time_table2 = time_table.limit(5)

#
time_table2.write.partitionBy("year", "month").mode("overwrite").parquet(time_table_out_path)

### Create new dataframe was joined by song data and log data

In [38]:
log_song_data_join_df = logdata_nextsong_df.join(songdata_df, 
                            (logdata_nextsong_df.artist == songdata_df.artist_name) 
                          & (logdata_nextsong_df.song == songdata_df.title)
                          & (logdata_nextsong_df.length == songdata_df.duration)
                  )

In [39]:
log_song_data_join_df.printSchema()
log_song_data_join_df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- startTime: timestamp (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = 

#### Add songplay_id as auto increase

In [40]:
log_song_data_join_df = log_song_data_join_df.withColumn("songplay_id", F.monotonically_increasing_id())

In [41]:
log_song_data_join_df.printSchema()
log_song_data_join_df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- startTime: timestamp (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = 

### Create view of log_song_data_join_df

In [42]:
log_song_data_join_df.createOrReplaceTempView("log_song_data_join_view")

In [43]:
songplays_table = spark.sql("""
    SELECT songplay_id AS songplay_id, 
           startTime   AS start_time, 
           userId      AS user_id, 
           level       AS level,
           song_id     AS song_id,
           artist_id   AS artist_id,
           sessionId   AS session_id,
           location    AS location,
           userAgent   AS user_agent
    FROM log_song_data_join_view
""")

In [44]:
songplays_table.printSchema()
songplays_table.show(5)

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+



### Write songplays table to S3 as parquet file

In [46]:
songplays_out_path = OUTPUT_DATA + "songplays_table.parquet"

#
songplays_table.write.mode("overwrite").parquet(songplays_out_path)