In [77]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf, col, to_timestamp, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pandas as pd

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [4]:
df = spark\
    .read.option("header", "true")\
    .option("inferSchema", "true")\
    .option("charset", "UTF-8")\
    .json('s3a://udacity-dend/song_data/*/*/*/*.json')

In [5]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [49]:
df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,233.22077,1,SOVIYJY12AF72A4B00,The Dead Next Door (Digitally Remastered 99),1983
1,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,287.92118,1,SOVYXYL12AF72A3373,Rebel Yell (1999 Digital Remaster),1983
2,ARQ846I1187B9A7083,,,,Yvonne S. Moriarty / Walt Fowler / Ladd McInto...,196.04853,1,SOEPTVC12A67ADD0DA,"To Zucchabar [""Gladiator"" - Music from the Mot...",0
3,AR4T2IF1187B9ADBB7,63.96027,"<a href=""http://billyidol.net"" onmousedown='Un...",10.22442,Billy Idol,247.53587,1,SOLQYSZ12AB0181F97,Mony Mony (Live),1987
4,AR3TZ691187FB3DBB1,,,,Russell Watson / Pino Palladino / Robbie McInt...,273.44934,1,SOVPFJK12A6701CB16,Barcelona - (Friends until the end),2000


In [86]:
songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration')

In [14]:
songs_table.limit(10).toPandas()

Unnamed: 0,song_id,artist_id,year,duration
0,SOVIYJY12AF72A4B00,AR4T2IF1187B9ADBB7,1983,233.22077
1,SOVYXYL12AF72A3373,AR4T2IF1187B9ADBB7,1983,287.92118
2,SOEPTVC12A67ADD0DA,ARQ846I1187B9A7083,0,196.04853
3,SOLQYSZ12AB0181F97,AR4T2IF1187B9ADBB7,1987,247.53587
4,SOVPFJK12A6701CB16,AR3TZ691187FB3DBB1,2000,273.44934
5,SOETDBF12A81C20BC0,AR4T2IF1187B9ADBB7,2005,237.06077
6,SOIEXLS12A6D4F792F,AR4T2IF1187B9ADBB7,1993,443.14077
7,SOQEBML12A8C136AA4,AR5R7791187FB3A8C3,0,55.40526
8,SOTCIHX12A8C13DDD2,ARYOIZG1187FB41E30,2006,483.34322
9,SODUMDU12AC468A22B,ARHOSMU1242078130D,0,249.5473


### Log data

read loga data with inferSchema option

In [17]:
log_data = spark \
        .read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("charset", "UTF-8") \
        .json('s3a://udacity-dend/log-data/*/*/*.json')

In [18]:
# display structure
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [66]:
# filter by 'NextSong' event and pring out first 10 records with Pandas

log_data = log_data.filter(col('page')== 'NextSong')
log_data.filter(col('page')== 'NextSong').limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


### Creat user table

Extract data from log file timestamp and create **user_table** with following fields: 

- userId
- firstName 
- lastName
- gender
- level

In [29]:
users_table = log_data.select('userId','firstName', 'lastName', 'gender', 'level')
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|     9|    Wyatt|   Scott|     M| free|
|    12|   Austin| Rosales|     M| free|
+------+---------+--------+------+-----+
only showing top 5 rows



### Creat time table

Extract data from log file timestamp and create **time_table** with following fields: 

- ts
- datetime 
- hour
- day
- week
- month 
- year

In [61]:
# Extract ts column to the new dataframe
df_time = log_data.select('ts')

# add 'datetime' column with date in "yyyy-MM-dd hh:mm:ss" format 
df_time = df_time.withColumn('datetime', to_timestamp((df_time['ts']/1000) \
                 .cast('timestamp'), "yyyy-MM-dd hh:mm:ss"))

In [62]:
df_time.show(5)

+-------------+-------------------+
|           ts|           datetime|
+-------------+-------------------+
|1542241826796|2018-11-14 16:30:26|
|1542242481796|2018-11-14 16:41:21|
|1542242741796|2018-11-14 16:45:41|
|1542253449796|2018-11-14 19:44:09|
|1542260935796|2018-11-14 21:48:55|
+-------------+-------------------+
only showing top 5 rows



In [63]:
# extract hour, day, week, month, yeat from datetime and add to the dataframe 
time_table = df_time.withColumn('hour', hour(col('datetime'))) \
                        .withColumn('day', dayofmonth(col('datetime'))) \
                        .withColumn('week', weekofyear(col('datetime'))) \
                        .withColumn('month', month(col('datetime'))) \
                        .withColumn('year', year(col('datetime')))

In [64]:
time_table.show(5)

+-------------+-------------------+----+---+----+-----+----+
|           ts|           datetime|hour|day|week|month|year|
+-------------+-------------------+----+---+----+-----+----+
|1542241826796|2018-11-14 16:30:26|  16| 14|  46|   11|2018|
|1542242481796|2018-11-14 16:41:21|  16| 14|  46|   11|2018|
|1542242741796|2018-11-14 16:45:41|  16| 14|  46|   11|2018|
|1542253449796|2018-11-14 19:44:09|  19| 14|  46|   11|2018|
|1542260935796|2018-11-14 21:48:55|  21| 14|  46|   11|2018|
+-------------+-------------------+----+---+----+-----+----+
only showing top 5 rows



### Creat songplay table

Extract data from both song-data and log-data and create **songplays_table**  with following fields: 

- songplayId
- start_time
- userId
- level
- songId
- artistId
- sessionId
- location
- userAgent
- month
- year

In [89]:
songplays_table = df.join(log_data, (df.artist_name == log_data.artist) &  (df.title == log_data.song)) \
                             .withColumn('songplay_id', monotonically_increasing_id()) \
                             .withColumn('start_time', to_timestamp((log_data['ts']/1000) \
                                                       .cast('timestamp'), 'yyyy-MM-dd hh:mm:ss')) \
                             .select('songplay_id', \
                                     'start_time', \
                                     'userId', \
                                     'level', \
                                     'song', \
                                     'sessionId', \
                                     'artist_location', \
                                     'userAgent', \
                                     month(col('start_time')).alias('month'), \
                                     year(col('start_time')).alias('year'))

In [90]:
songplays_table.limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,userId,level,song,sessionId,artist_location,userAgent,month,year
0,8589934592,2018-11-21 00:25:43,88,paid,Die Kunst der Fuge_ BWV 1080 (2007 Digital Rem...,744,"Geneva, Switzerland","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018
1,25769803776,2018-11-29 08:58:01,49,paid,Fighters (feat. Matthew Santos) (Amended Album...,1041,"Chicago, IL",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,11,2018
2,34359738368,2018-11-28 00:18:57,58,paid,"Suena (""Some Day"" end title song ""The Hunchbac...",887,"San Juan, Puerto Rico","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",11,2018
3,34359738369,2018-11-28 15:34:43,24,paid,It's Not Easy (Being Green) (Featuring Pierre ...,984,"Berkeley, California","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",11,2018
4,68719476736,2018-11-26 07:37:14,88,paid,Shimmy Shimmy Quarter Turn (Take It Back To Sq...,900,"Huntington Beach, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018


In [93]:
# print out how data looks
songplays_table.limit(10).toPandas()

Unnamed: 0,songplay_id,start_time,userId,level,song,sessionId,artist_location,userAgent,month,year
0,8589934592,2018-11-21 00:25:43,88,paid,Die Kunst der Fuge_ BWV 1080 (2007 Digital Rem...,744,"Geneva, Switzerland","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018
1,25769803776,2018-11-29 08:58:01,49,paid,Fighters (feat. Matthew Santos) (Amended Album...,1041,"Chicago, IL",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,11,2018
2,34359738368,2018-11-28 00:18:57,58,paid,"Suena (""Some Day"" end title song ""The Hunchbac...",887,"San Juan, Puerto Rico","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",11,2018
3,34359738369,2018-11-28 15:34:43,24,paid,It's Not Easy (Being Green) (Featuring Pierre ...,984,"Berkeley, California","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",11,2018
4,68719476736,2018-11-26 07:37:14,88,paid,Shimmy Shimmy Quarter Turn (Take It Back To Sq...,900,"Huntington Beach, CA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018
5,103079215104,2018-11-16 09:18:51,36,paid,Waterfall (Spirit Of The Rainforest Album Vers...,461,Tennessee,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",11,2018
6,103079215105,2018-11-29 20:57:03,49,paid,Waterfall (Spirit Of The Rainforest Album Vers...,1079,Tennessee,Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,11,2018
7,111669149696,2018-11-28 13:05:13,73,paid,Where The Wild Things Were,954,UK - England - South East,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",11,2018
8,111669149697,2018-11-27 10:22:58,36,paid,Let It Go,957,"Oakland, CA","""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",11,2018
9,137438953472,2018-11-03 10:59:01,15,paid,Catch You Baby (Steve Pitron & Max Sanna Radio...,199,Nevada,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",11,2018
