# Set Up Local Spark Session

In [2]:
# Import libraries
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [3]:
# Config
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get("AWS", 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get("AWS",'AWS_SECRET_ACCESS_KEY')

In [4]:
# Start spark sessions
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

## Useful functions

In [7]:
# Read song_data from S3
song_data_read_path = "s3a://udacity-dend/song_data/A/A/A/*.json"
song_df = spark.read.json(song_data_read_path)

In [8]:
# Get schema
# song_df.describe()
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
# Show first 2 rows
# song_df.take(2)
song_df.show(n=2)

+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|       artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
|ARA23XO1187B9AF18F|       40.57885|Carteret, New Jersey|       -74.21956|   The Smithereens|  192.522|        1|SOKTJDS12AF72A25E5|Drown In My Own T...|   0|
+------------------+---------------+--------------------+----------------+------------------+---------+---------+------------------+--------------------+----+
only showing top 2 rows



In [30]:
# Convert to dataframe
song_df.toPandas().head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
1,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0
2,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001
3,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005
4,ARXQBR11187B98A2CC,,"Liverpool, England",,Frankie Goes To Hollywood,821.05424,1,SOBRKGM12A8C139EF6,Welcome to the Pleasuredome,1985


In [17]:
# Describe columns
# song_df.count()
song_df.describe("artist_id").show()

+-------+------------------+
|summary|         artist_id|
+-------+------------------+
|  count|                24|
|   mean|              null|
| stddev|              null|
|    min|AR0MWD61187B9B2B12|
|    max|ARZKCQM1257509D107|
+-------+------------------+



In [27]:
# See unique values
song_df.select("num_songs").dropDuplicates().sort("num_songs").show()

+---------+
|num_songs|
+---------+
|        1|
+---------+



In [21]:
# Filter
song_df.select(["title", "artist_name"]).where(song_df.duration < 300).orderBy(song_df.title).collect()
song_df.select(["title", "artist_name"]).filter(song_df.duration < 300).collect()

[Row(title='Drown In My Own Tears (24-Bit Digitally Remastered 04)', artist_name='The Smithereens'),
 Row(title="I'll Slap Your Face (Entertainment USA Theme)", artist_name='Jonathan King'),
 Row(title='A Poor Recipe For Civic Cohesion', artist_name='Western Addiction'),
 Row(title='Drop of Rain', artist_name='Tweeterfriendly Music'),
 Row(title='The One And Only (Edited)', artist_name='Snoop Dogg'),
 Row(title='I Want You (Album Version)', artist_name='Paris Hilton'),
 Row(title='Take Time', artist_name='Chaka Khan_ Rufus'),
 Row(title='Indian Angel', artist_name='Talkdemonic'),
 Row(title='Into The Nightlife', artist_name='Cyndi Lauper'),
 Row(title='Soul Deep', artist_name='The Box Tops'),
 Row(title='Smash It Up', artist_name='International Noise Conspiracy'),
 Row(title='Burn My Body (Album Version)', artist_name='Broken Spindles'),
 Row(title='Hit Da Scene', artist_name='Quest_ Pup_ Kevo'),
 Row(title='Double Wide', artist_name='The Supersuckers'),
 Row(title="It's About Time", a

## Song Data

In [8]:
# Read song_data from S3
song_data_read_path = "s3a://udacity-dend/song_data/A/A/A/*.json"
song_df = spark.read.json(song_data_read_path)

song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
# Create table
song_df.createOrReplaceTempView("staging_songs")

In [24]:
songs = spark.sql("""
SELECT
    song_id, 
    title,
    artist_id,
    CASE WHEN year < 1900 THEN NULL ELSE year END AS year,
    duration
FROM staging_songs
WHERE song_id IS NOT NULL
""")

songs.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|null|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|AR73AIO1187B9AD57B|2005|118.07302|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
|SORRNOC12AB017F52B|The Last Beat Of ...|ARSZ7L31187FB4E610|2004|337.81506|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|null|189.57016|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|null|230.42567|
|SOSMJFC12A8C13DE0C|Is That All There...|AR1KTV21187B9ACD72|null|343.87546|
|SOOVHYF12A8C134892|     I'll Be Waiting|ARCLYBR1187FB53913|1989|304.56118|
|SOERIDA12A6

In [27]:
songs.toPandas().head()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),ARTC1LV1187B9A4858,1972.0,301.40036
1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,ARA23XO1187B9AF18F,,192.522
2,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),ARSVTNL1187B992A91,2001.0,129.85424
3,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,AR73AIO1187B9AD57B,2005.0,118.07302
4,SOBRKGM12A8C139EF6,Welcome to the Pleasuredome,ARXQBR11187B98A2CC,1985.0,821.05424


In [15]:
artists = spark.sql(
"""
SELECT 
    artist_id,
    artist_name,
    NULLIF(artist_location,'') AS artist_location,
    artist_latitude,
    artist_longitude
FROM staging_songs
WHERE artist_id IS NOT NULL
""")

artists.show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|       51.50632|        -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|       37.77916|      -122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|           null|            null|
|ARSZ7L31187FB4E610|           Devotchka|          Denver, CO|       39.74001|      -104.99226|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|           null|            null|
|ARZ5H0P1187B98A1DD|          Snoop Dogg

## Event Data

In [74]:
# Read log_data from S3
log_data_read_path = "s3a://udacity-dend/log_data/2018/11/2018-11-13-events.json"
log_df = spark.read.json(log_data_read_path)

log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [75]:
# Create table
log_df.createOrReplaceTempView("staging_events")

In [76]:
log_df.toPandas().head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Kevin,M,0,Arellano,,free,"Harrisburg-Carlisle, PA",GET,Home,1540007000000.0,514,,200,1542069417796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
1,Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,514,Ja I Ty,200,1542069637796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
2,,Logged In,Maia,F,0,Burke,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540677000000.0,510,,200,1542071524796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51
3,All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540677000000.0,510,A Party Song (The Walk of Shame),200,1542071549796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51
4,Nik & Jay,Logged In,Wyatt,M,0,Scott,196.51873,free,"Eureka-Arcata-Fortuna, CA",PUT,NextSong,1540872000000.0,379,Pop-Pop!,200,1542079142796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9


In [77]:
# UDF method 1
spark.udf.register("get_hourx", lambda x: 11)
spark.sql("select *, get_hourx(ts) from staging_events limit 6").toPandas().head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,get_hourx(ts)
0,,Logged In,Kevin,M,0,Arellano,,free,"Harrisburg-Carlisle, PA",GET,Home,1540007000000.0,514,,200,1542069417796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,11
1,Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,514,Ja I Ty,200,1542069637796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,11


In [78]:
# UDF method 2
get_hour = udf(lambda x: datetime.fromtimestamp(x / 1000.0).hour)
log_df = log_df.withColumn("hour", get_hour(log_df.ts))

log_df.createOrReplaceTempView("staging_events")
spark.sql("""SELECT hour, count(*) from staging_events group by 1 order by hour""").show()

+----+--------+
|hour|count(1)|
+----+--------+
|   0|       2|
|   1|       2|
|  10|      26|
|  11|       5|
|  13|      43|
|  14|      22|
|  15|      23|
|  16|      34|
|  17|      31|
|  18|      18|
|  19|      26|
|  20|      37|
|  21|      29|
|  22|      35|
|  23|      18|
|   3|       2|
|   4|       2|
|   5|       1|
|   6|       3|
|   7|       3|
+----+--------+
only showing top 20 rows



In [86]:
# Get timestamp
get_timestamp = udf(lambda x: x/1000.0)
log_df = log_df.withColumn("start_time", get_timestamp(log_df.ts))

log_df.createOrReplaceTempView("staging_events")

log_df.toPandas().head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,hour,start_time
0,,Logged In,Kevin,M,0,Arellano,,free,"Harrisburg-Carlisle, PA",GET,Home,1540007000000.0,514,,200,1542069417796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,0,1542069417.796
1,Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,514,Ja I Ty,200,1542069637796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,0,1542069637.796
2,,Logged In,Maia,F,0,Burke,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540677000000.0,510,,200,1542071524796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51,1,1542071524.796
3,All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540677000000.0,510,A Party Song (The Walk of Shame),200,1542071549796,"""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",51,1,1542071549.796
4,Nik & Jay,Logged In,Wyatt,M,0,Scott,196.51873,free,"Eureka-Arcata-Fortuna, CA",PUT,NextSong,1540872000000.0,379,Pop-Pop!,200,1542079142796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9,3,1542079142.796


In [93]:
spark.sql("""
        SELECT DISTINCT 
            start_time
        FROM staging_events
        WHERE start_time IS NOT NULL
    """).show()
    

+----------------+
|      start_time|
+----------------+
|1.542115408796E9|
|1.542118818796E9|
|1.542128784796E9|
|1.542129654796E9|
|1.542135080796E9|
|1.542139216796E9|
|1.542139469796E9|
|1.542143755796E9|
|1.542123180796E9|
|1.542146550796E9|
|1.542091623796E9|
|1.542139793796E9|
|1.542144812796E9|
|1.542127338796E9|
|1.542099785796E9|
|1.542120547796E9|
|1.542144853796E9|
|1.542146918796E9|
|1.542105601796E9|
|1.542150110796E9|
+----------------+
only showing top 20 rows



In [105]:
spark.udf.register("get_hour", lambda x: datetime.fromtimestamp(x).hour)
spark.udf.register("get_day", lambda x: datetime.fromtimestamp(x).day)
spark.udf.register("get_week", lambda x: datetime.fromtimestamp(x).isocalendar()[1])
spark.udf.register("get_month", lambda x: datetime.fromtimestamp(x).month)
spark.udf.register("get_year", lambda x: datetime.fromtimestamp(x).year)
spark.udf.register("get_weekday", lambda x: datetime.fromtimestamp(x).weekday())

<function __main__.<lambda>(x)>

In [106]:
spark.sql("""
        SELECT DISTINCT 
            start_time, 
            get_hour(start_time),
            get_day(start_time),
            get_week(start_time),
            get_month(start_time),
            get_year(start_time),
            get_weekday(start_time)
        FROM staging_events
        WHERE start_time IS NOT NULL
    """).show()

+----------------+--------------------+-------------------+--------------------+---------------------+--------------------+-----------------------+
|      start_time|get_hour(start_time)|get_day(start_time)|get_week(start_time)|get_month(start_time)|get_year(start_time)|get_weekday(start_time)|
+----------------+--------------------+-------------------+--------------------+---------------------+--------------------+-----------------------+
|1.542148786796E9|                  22|                 13|                  46|                   11|                2018|                      1|
|1.542121737796E9|                  15|                 13|                  46|                   11|                2018|                      1|
|1.542123180796E9|                  15|                 13|                  46|                   11|                2018|                      1|
|1.542101346796E9|                   9|                 13|                  46|                   11|          