# Test on local machine

In [2]:
import configparser
import datetime as dt
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import (year, month, dayofmonth, hour,
                                   weekofyear, dayofweek, date_format)
from pyspark.sql.types import TimestampType, DateType

In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

## Create Spark Session

In [4]:
spark = (
    SparkSession.builder
                .appName("sparkify")
                .config(
                    "spark.jars.packages",
                    "org.apache.hadoop:hadoop-aws:3.2.0"
                )
                .getOrCreate()
)
# spark = (
#     SparkSession.builder
#                 .getOrCreate()
# )

In [5]:
spark

In [6]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://sbucket62/sparkify/"

## Process song data

In [7]:
song_data = os.path.join(input_data, "song_data/A/A/[AB]/*.json") # using a small subset of data
song_data

's3a://udacity-dend/song_data/A/A/[AB]/*.json'

### Read song data files

In [8]:
song_data_df = spark.read.json(song_data)

In [9]:
song_data_df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
1,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0
2,ARSVTNL1187B992A91,51.50632,"London, England",-0.12714,Jonathan King,129.85424,1,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),2001
3,ARKIQCZ1187B9A7C7C,52.23974,"Northampton, Northamptonshire, En",-0.88576,Bauhaus,248.65914,1,SOSIJKW12A8C1330E3,A God In An Alcove (Session Version),0
4,AR73AIO1187B9AD57B,37.77916,"San Francisco, CA",-122.42005,Western Addiction,118.07302,1,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,2005


In [10]:
song_data_df.count()

50

In [11]:
song_data_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



### Extract columns to create songs table

In [12]:
songs_table_df = song_data_df.select("song_id", "title", "artist_id", "year", "duration")
songs_table_df.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),ARTC1LV1187B9A4858,1972,301.40036
1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,ARA23XO1187B9AF18F,0,192.522
2,SOEKAZG12AB018837E,I'll Slap Your Face (Entertainment USA Theme),ARSVTNL1187B992A91,2001,129.85424
3,SOSIJKW12A8C1330E3,A God In An Alcove (Session Version),ARKIQCZ1187B9A7C7C,0,248.65914
4,SOQPWCR12A6D4FB2A3,A Poor Recipe For Civic Cohesion,AR73AIO1187B9AD57B,2005,118.07302


In [13]:
songs_table_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



### Write songs table to parquet files partitioned by year and artist 

In [14]:
songs_output_path = os.path.join(output_data, "songs/songs_table.parquet")
songs_output_path

's3a://sbucket62/sparkify/songs/songs_table.parquet'

In [15]:
songs_table_df.write.partitionBy("year", "artist_id").mode("overwrite").parquet(songs_output_path)

### Extract columns to create artists table

In [16]:
artists_table_df = df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")
artists_table_df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARTC1LV1187B9A4858,The Bonzo Dog Band,"Goldsmith's College, Lewisham, Lo",51.4536,-0.01802
1,ARA23XO1187B9AF18F,The Smithereens,"Carteret, New Jersey",40.57885,-74.21956
2,ARSVTNL1187B992A91,Jonathan King,"London, England",51.50632,-0.12714
3,ARKIQCZ1187B9A7C7C,Bauhaus,"Northampton, Northamptonshire, En",52.23974,-0.88576
4,AR73AIO1187B9AD57B,Western Addiction,"San Francisco, CA",37.77916,-122.42005


In [17]:
artists_table_df.count()

50

In [18]:
artists_table_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [19]:
artists_table_unique_df = artists_table_df.dropDuplicates()

In [20]:
artists_table_unique_df.count()

50

In [21]:
artists_table_unique_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



### Write artist table to parquet files

In [22]:
artists_output_path = os.path.join(output_data, "artists/artists_table.parquet")
artists_output_path

's3a://sbucket62/sparkify/artists/artists_table.parquet'

In [23]:
artists_table_unique_df.write.mode("overwrite").parquet(artists_output_path)

## Process log data

In [24]:
log_data = os.path.join(input_data, "log_data/*/*/*.json")
log_data

's3a://udacity-dend/log_data/*/*/*.json'

### Read log data files

In [25]:
logs_df = spark.read.json(log_data)
logs_df.limit(10).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26.0
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26.0
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26.0
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9.0
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12.0
5,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61.0
6,,Logged In,Samuel,M,1,Gonzalez,,free,"Houston-The Woodlands-Sugar Land, TX",GET,About,1540493000000.0,597,,200,1542253460796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61.0
7,,Logged Out,,,0,,,paid,,PUT,Login,,602,,307,1542260074796,,
8,,Logged In,Tegan,F,1,Levine,,paid,"Portland-South Portland, ME",GET,Home,1540794000000.0,602,,200,1542260277796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80.0
9,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80.0


### Filter by actions for song plays

In [26]:
logs_filtered_df = logs_df.where(logs_df.page == "NextSong")
logs_filtered_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [27]:
logs_filtered_df.count()

6820

In [28]:
logs_filtered_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### Extract columns for users table

In [29]:
users_table_df = logs_filtered_df.selectExpr("userId AS user_id",
                                             "firstName AS first_name",
                                             "lastName AS last_name",
                                             "gender", 
                                             "level"
                                            )
users_table_df.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,26,Ryan,Smith,M,free
2,26,Ryan,Smith,M,free
3,61,Samuel,Gonzalez,M,free
4,80,Tegan,Levine,F,paid


In [30]:
users_table_df.count()

6820

In [31]:
users_table_unique_df = users_table_df.dropDuplicates()
users_table_unique_df.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free


In [32]:
users_table_unique_df.count()

104

In [33]:
users_table_unique_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



### Write users table to parquet files

In [34]:
users_output_path = os.path.join(output_data, "users/users_table.parquet")
users_output_path

's3a://sbucket62/sparkify/users/users_table.parquet'

In [35]:
users_table_unique_df.write.mode("overwrite").parquet(users_output_path)

### Create timestamp column from original timestamp column

In [36]:
get_timestamp = udf(lambda x: dt.datetime.utcfromtimestamp(x / 1000), TimestampType())

In [37]:
logs_filtered_df = logs_filtered_df.withColumn("start_time", get_timestamp("ts"))
logs_filtered_df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796


In [38]:
logs_filtered_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)



### Extract columns to create time table

In [39]:
time_table_df = logs_filtered_df.select("start_time")
time_table_df.count()

6820

In [40]:
time_table_unique_df = time_table_df.dropDuplicates()
time_table_unique_df.count()

6813

In [41]:
time_table_final_df = time_table_unique_df.select("start_time",
                                                  hour("start_time").alias("hour"),
                                                  dayofmonth("start_time").alias("day"),
                                                  weekofyear("start_time").alias("week"),
                                                  month("start_time").alias("month"),
                                                  year("start_time").alias("year"),
                                                  dayofweek("start_time").alias("weekday")
                                                  )
time_table_final_df.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 20:49:56.796,20,15,46,11,2018,5
1,2018-11-21 04:57:02.796,4,21,47,11,2018,4
2,2018-11-21 09:57:27.796,9,21,47,11,2018,4
3,2018-11-14 11:57:20.796,11,14,46,11,2018,4
4,2018-11-28 23:31:19.796,23,28,48,11,2018,4


In [42]:
time_table_final_df.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



### Write time table to parquet files partioned by year and month

In [43]:
time_output_path = os.path.join(output_data, "time/time_table.parquet")
time_output_path

's3a://sbucket62/sparkify/time/time_table.parquet'

In [44]:
# write just 50 rows to check
time_table_final_df.limit(50).write.partitionBy("year", "month").mode("overwrite").parquet(time_output_path)

### Create songplays table by joining `logs_filtered_df` and `song_data_df`  

In [59]:
logs_filtered_df.createOrReplaceTempView("logs_filtered_table")
song_data_df.createOrReplaceTempView("song_data_table")

In [75]:
songplays_table_df = spark.sql("""
    SELECT
        row_number() OVER(ORDER BY start_time) AS songplay_id,
        l.start_time,
        l.userId AS user_id,
        l.level,
        s.song_id,
        s.artist_id,
        l.sessionId AS session_id,
        s.artist_location,
        l.userAgent AS user_agent
    FROM logs_filtered_table AS l
    LEFT OUTER JOIN song_data_table AS s ON
        (l.song = s.title)
        AND (l.artist = s.artist_name)
        AND (l.length = s.duration)
""")

In [76]:
songplays_table_df.limit(20).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,artist_location,user_agent
0,1,2018-11-01 21:01:46.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
1,2,2018-11-01 21:05:52.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
2,3,2018-11-01 21:08:16.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
3,4,2018-11-01 21:11:13.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
4,5,2018-11-01 21:17:33.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
5,6,2018-11-01 21:24:53.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
6,7,2018-11-01 21:28:54.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK..."
7,8,2018-11-01 21:42:00.796,10,free,,,9,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
8,9,2018-11-01 21:52:05.796,26,free,,,169,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
9,10,2018-11-01 21:55:25.796,26,free,,,169,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [77]:
songplays_table_df.count()

6820

### Write songplays table to parquet files partitioned by year and month

In [78]:
songplays_table_df = songplays_table_df.withColumn("year", year("start_time")) \
                                       .withColumn("month", month("start_time"))
songplays_table_df.limit(10).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,artist_location,user_agent,year,month
0,1,2018-11-01 21:01:46.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
1,2,2018-11-01 21:05:52.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
2,3,2018-11-01 21:08:16.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
3,4,2018-11-01 21:11:13.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
4,5,2018-11-01 21:17:33.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
5,6,2018-11-01 21:24:53.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
6,7,2018-11-01 21:28:54.796,8,free,,,139,,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2018,11
7,8,2018-11-01 21:42:00.796,10,free,,,9,,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
8,9,2018-11-01 21:52:05.796,26,free,,,169,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
9,10,2018-11-01 21:55:25.796,26,free,,,169,,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11


In [79]:
songplays_output_path = os.path.join(output_data, "songplays/songplays_table.parquet")
songplays_output_path

's3a://sbucket62/sparkify/songplays/songplays_table.parquet'

In [80]:
# write just few rows to check
songplays_table_df.limit(20).write.partitionBy("year", "month").mode("overwrite").parquet(songplays_output_path)