In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col,row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
input_data = "s3a://udacity-dend/"
output_data = ""

In [3]:

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()


In [40]:
%%time
# get filepath to song data file
#song_data_path = input_data + 'song_data/*/*/*/*.json'
song_data_path = input_data + 'song_data/A/A/*/*.json'

# read song data file
song_data = spark.read.json(song_data_path)
song_data.printSchema()
song_data.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name|  duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|           Tennessee|       -85.97874|Royal Philharmoni..

In [41]:
# extract columns to create songs table
songs_table = song_data.select('song_id','title','artist_id','year','duration')
songs_table.show(5)

+------------------+--------------------+------------------+----+----------+
|           song_id|               title|         artist_id|year|  duration|
+------------------+--------------------+------------------+----+----------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|ARSUVLW12454A4C8B8|   0|  94.56281|
|SOXRPUH12AB017F769|Exodus: Part I: M...|ARXQC081187FB4AD42|   0|1047.71873|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|ARWUNH81187FB4A3E0|2001| 227.10812|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972| 301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|   192.522|
+------------------+--------------------+------------------+----+----------+
only showing top 5 rows



In [42]:
# remove dupliated rows
songs_table = songs_table.dropDuplicates(['song_id'])

In [17]:
dest_s3_path='s3a://myudacity/project_dataLake/'

In [43]:
# write songs table to parquet files partitioned by year and artist
#songs_table.write.parquet('songs2.parquet')
songs_table.write.partitionBy('year','artist_id').parquet('songs2.parquet')

In [None]:
songs_table2 = spark.read.parquet(dest_s3_path+'songs.parquet')
songs_table2.printSchema()

In [44]:
# extract columns to create artists table
artists_table = song_data.select('artist_id','artist_name','artist_location','artist_latitude','artist_longitude')
artists_table.show(5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARSUVLW12454A4C8B8|Royal Philharmoni...|           Tennessee|       35.83073|       -85.97874|
|ARXQC081187FB4AD42|William Shatner_ ...|                  UK|       54.31407|        -2.23001|
|ARWUNH81187FB4A3E0|         Trick Daddy|     Miami , Florida|           null|            null|
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [None]:
artists_table.dropDuplicates(['artist_id']).count()

In [45]:
artists_table = artists_table.dropDuplicates(['artist_id'])

In [46]:
# write artists table to parquet files
artists_table.write.parquet('artists2.parquet')

In [20]:
%%time
# get filepath to log data file
log_data_path = input_data + 'log_data/*/*/*.json'
#log_data_path = input_data + 'log_data/2018/11/*.json'

# read log data file
log_data = spark.read.json(log_data_path)
log_data.printSchema()
log_data.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

In [None]:
# filter by actions for song plays
df = log_data.select('page')
df.distinct().show()
df.groupBy('page')

In [49]:
# extract columns for users table    
users_table = log_data.select('userId','firstName','lastName','gender','level').dropDuplicates(['userId'])

In [None]:
users_table.count()

In [50]:
# write users table to parquet files
users_table.write.parquet('users2.parquet')

In [24]:
from pyspark.sql.types import TimestampType

# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
df2 = log_data.withColumn('timestamp',get_timestamp('ts'))
df2.select('timestamp').limit(10).toPandas()

Unnamed: 0,timestamp
0,2018-11-15 00:30:26.796
1,2018-11-15 00:41:21.796
2,2018-11-15 00:45:41.796
3,2018-11-15 01:57:51.796
4,2018-11-15 03:29:37.796
5,2018-11-15 03:44:09.796
6,2018-11-15 03:44:20.796
7,2018-11-15 05:34:34.796
8,2018-11-15 05:37:57.796
9,2018-11-15 05:48:55.796


In [25]:
from pyspark.sql.types import DateType

# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000).date(), DateType())
df2 = df2.withColumn('ts_date',get_datetime('ts'))
df2.select('ts_date').limit(10).toPandas()

Unnamed: 0,ts_date
0,2018-11-15
1,2018-11-15
2,2018-11-15
3,2018-11-15
4,2018-11-15
5,2018-11-15
6,2018-11-15
7,2018-11-15
8,2018-11-15
9,2018-11-15


In [None]:
spark.udf.register("get_datetime", get_datetime)

In [26]:
# extract columns to create time table
time_table = df2.select(col('timestamp').alias('start_time'),month('ts_date').alias('month'), year('ts_date').alias('year'),dayofmonth('ts_date').alias('day'),hour('timestamp').alias('hour'),weekofyear('ts_date').alias('week'), date_format('ts_date', 'u').cast('int').alias('weekday'))
time_table.limit(10).toPandas()

Unnamed: 0,start_time,month,year,day,hour,week,weekday
0,2018-11-15 00:30:26.796,11,2018,15,0,46,4
1,2018-11-15 00:41:21.796,11,2018,15,0,46,4
2,2018-11-15 00:45:41.796,11,2018,15,0,46,4
3,2018-11-15 01:57:51.796,11,2018,15,1,46,4
4,2018-11-15 03:29:37.796,11,2018,15,3,46,4
5,2018-11-15 03:44:09.796,11,2018,15,3,46,4
6,2018-11-15 03:44:20.796,11,2018,15,3,46,4
7,2018-11-15 05:34:34.796,11,2018,15,5,46,4
8,2018-11-15 05:37:57.796,11,2018,15,5,46,4
9,2018-11-15 05:48:55.796,11,2018,15,5,46,4


In [None]:
time_table.count()

In [27]:
time_table = time_table.dropDuplicates(['start_time'])

In [51]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year','month').parquet('time2.parquet')

In [28]:
time_table.createOrReplaceTempView('times')

In [47]:
# read in song data to use for songplays table
song_df = spark.read.parquet('songs2.parquet')
song_df.createOrReplaceTempView('songs')
song_df.show(4)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...| 94.56281|   0|ARSUVLW12454A4C8B8|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|227.10812|2001|ARWUNH81187FB4A3E0|
|SOYVBGZ12A6D4F92A8|Piano Sonata No. ...|221.70077|   0|ARLRWBW1242077EB29|
|SODBHKO12A58A77F36|Fingers Of Love (...|335.93424|   0|ARKGS2Z1187FB494B5|
+------------------+--------------------+---------+----+------------------+
only showing top 4 rows



In [48]:
artists_df = spark.read.parquet('artists2.parquet')
artists_df.createOrReplaceTempView('artists')
artists_df.limit(4).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR3WWZM1187B996646,Weeping Willows,Massachusetts,42.18419,-71.71818
1,AR8Y6HV1187FB5546D,Danny Byrd,UK,54.31407,-2.23001
2,ARBJQTM1187B9B862B,Cocteau Twins,"Grangemouth, Scotland",56.01162,-3.71947
3,ARBM57Q1187B9AF97C,James Horner,"Los Angeles, CA",34.05349,-118.24532


In [None]:
log_data.limit(4).toPandas()

In [None]:
song_df.printSchema()

In [33]:

df2.createOrReplaceTempView('logs')

In [53]:
spark.sql("""
    SELECT timestamp as start_time, userId, level, sessionid, location, useragent, song_id, artist_id, year(timestamp)
        FROM logs l JOIN (
            SELECT s.song_id,s.title,a.artist_id,a.artist_name,s.duration 
            FROM songs s JOIN artists a ON s.artist_id = a.artist_id
            ) sa ON trim(l.artist) = trim(sa.artist_name) and trim(l.song) = trim(sa.title) and l.length = sa.duration
            WHERE l.page='NextSong'

""").show(4)

+--------------------+------+-----+---------+--------------------+--------------------+------------------+------------------+-----------------------------+
|          start_time|userId|level|sessionid|            location|           useragent|           song_id|         artist_id|year(CAST(timestamp AS DATE))|
+--------------------+------+-----+---------+--------------------+--------------------+------------------+------------------+-----------------------------+
|2018-11-03 21:14:...|    49| free|      195|San Francisco-Oak...|Mozilla/5.0 (Wind...|SOFVOQL12A6D4F7456|ARPN0Y61187B9ABAA0|                         2018|
|2018-11-08 15:01:...|    29| paid|      372|Atlanta-Sandy Spr...|"Mozilla/5.0 (Mac...|SOFVOQL12A6D4F7456|ARPN0Y61187B9ABAA0|                         2018|
|2018-11-16 14:21:...|    85| paid|      436|       Red Bluff, CA|"Mozilla/5.0 (Mac...|SOLRYQR12A670215BF|ARNLO5S1187B9B80CC|                         2018|
|2018-11-26 18:25:...|    92| free|      938|       Palestine, T

In [55]:
# extract columns from joined song and log datasets to create songplays table 
#songplay_id, start_time, user_id, level, session_id, location, user_agent, song_id, artist_id
songplays_table = spark.sql("""
    SELECT all.*, t.year, t.month
    FROM (
        SELECT timestamp as start_time, userId, level, sessionid, location, useragent, song_id, artist_id
        FROM logs l JOIN (
            SELECT s.song_id,s.title,a.artist_id,a.artist_name,s.duration 
            FROM songs s JOIN artists a ON s.artist_id = a.artist_id
            ) sa ON trim(l.artist) = trim(sa.artist_name) and trim(l.song) = trim(sa.title) and l.length = sa.duration
            WHERE l.page='NextSong'
        ) all JOIN times t ON all.start_time = t.start_time
    ORDER BY start_time
""")

In [59]:
from pyspark.sql import Window
from pyspark.sql import functions as F
# add sequential ID, songplay_id
window = Window.orderBy(col('start_time'))
songplays_table = songplays_table.withColumn('songplay_id',row_number().over(window))
songplays_table.limit(10).toPandas()

Unnamed: 0,start_time,userId,level,sessionid,location,useragent,song_id,artist_id,year,month,songplay_id
0,2018-11-03 21:14:28.796,49,free,195,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,SOFVOQL12A6D4F7456,ARPN0Y61187B9ABAA0,2018,11,1
1,2018-11-05 17:49:42.796,73,paid,255,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",SOHDWWH12A6D4F7F6A,ARC0IOF1187FB3F6E6,2018,11,2
2,2018-11-08 15:01:57.796,29,paid,372,"Atlanta-Sandy Springs-Roswell, GA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",SOFVOQL12A6D4F7456,ARPN0Y61187B9ABAA0,2018,11,3
3,2018-11-09 17:55:00.796,80,paid,416,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",SOAOJYY12A58A7B2F9,ARFVYJI1187B9B8E13,2018,11,4
4,2018-11-09 19:57:57.796,36,paid,392,"Janesville-Beloit, WI","""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",SODWXQV12A6310F10D,AR6892W1187B9AC71B,2018,11,5
5,2018-11-11 15:00:37.796,67,free,414,"Nashville-Davidson--Murfreesboro--Franklin, TN","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",SOCHRXB12A8AE48069,ARTDQRC1187FB4EFD4,2018,11,6
6,2018-11-13 22:39:39.796,55,free,415,"Minneapolis-St. Paul-Bloomington, MN-WI","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",SOXQYSC12A6310E908,AR0L04E1187B9AE90C,2018,11,7
7,2018-11-16 14:21:12.796,85,paid,436,"Red Bluff, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_...",SOLRYQR12A670215BF,ARNLO5S1187B9B80CC,2018,11,8
8,2018-11-20 17:46:38.796,49,paid,758,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,SOCHRXB12A8AE48069,ARTDQRC1187FB4EFD4,2018,11,9
9,2018-11-21 21:56:47.796,15,paid,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,2018,11,10


In [57]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy('year','month').parquet('songplays2.parquet')

In [None]:
songs_table.write.partitionBy('year','artist_id').parquet('s3a://')

In [None]:
users_table.write.parquet('s3a://myudacity/project_dataLake/users.parquet')