In [48]:
import configparser
import os
import datetime

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import weekofyear, monotonically_increasing_id, udf, col, from_unixtime, year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp, to_date, from_unixtime
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

In [49]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['default']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['default']['AWS_SECRET_ACCESS_KEY']
os.environ['region']=config['default']['region']
os.environ['region_name']=config['default']['region']

In [50]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3n.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY'])
hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

In [51]:
# get filepath to song data file
schema = StructType([
    StructField("num_songs", IntegerType(), True),
    StructField("artist_id", StringType(), True),
    StructField("artist_latitude", StringType(), True),
    StructField("artist_longitude", StringType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("song_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("year", IntegerType(), True),
])


In [52]:
song_data_path = "data/song_data/*/*/*/*.json"


In [53]:
#songs - songs in music database
#song_id, title, artist_id, year, duration
song_df = spark.read.json(song_data_path) #schema
songs_table = song_df.select('song_id','title', 'artist_id', 'year', 'duration')
songs_table = songs_table.sort(["song_id"])
songs_table.toPandas().head(2)

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,ARPBNLO1187FB3D52F,2000,43.36281
1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,ARDR4AC1187FB371A1,0,511.16363


In [54]:
#artists - artists in music database
#artist_id, name, location, lattitude, longitude

artists_table = song_df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude' )
artists_table = artists_table.withColumnRenamed("artist_name", "name")
artists_table = artists_table.withColumnRenamed("artist_location", "location")
artists_table = artists_table.withColumnRenamed("artist_latitude", "latitude")
artists_table = artists_table.withColumnRenamed("artist_longitude", "longitude")
artists_table = artists_table.dropDuplicates(["artist_id"])
artists_table = artists_table.sort(["name"])
artists_table.toPandas().head(2)

Unnamed: 0,artist_id,name,location,latitude,longitude
0,AR558FS1187FB45658,40 Grit,,,
1,AR7G5I41187FB4CE6C,Adam Ant,"London, England",,


In [55]:
log_data_path = "data/log_data/*.json"
log_df = spark.read.json(log_data_path)
log_df.toPandas().head(2)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [56]:
# users - users in the app
# user_id, first_name, last_name, gender, level
users_table = log_df.select('userId', 'firstName', 'lastName', 'level')
users_table = users_table.withColumnRenamed("userId", "user_id")
users_table = users_table.withColumnRenamed("firstName", "first_name")
users_table = users_table.withColumnRenamed("lastName", "last_name")
users_table = users_table.dropDuplicates().filter("user_id != ''").sort(["user_id"])
users_table.toPandas().head(2)

Unnamed: 0,user_id,first_name,last_name,level
0,10,Sylvie,Cruz,free
1,100,Adler,Barrera,free


In [57]:
# songplays - records in log data associated with song plays i.e. records with page NextSong
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
filter_song_df = song_df
filter_log_df = log_df.select('ts', 'userId', 'level', 'artist', 'sessionId', 'userAgent','song').sort('song')
filter_log_df = filter_log_df.withColumn("start_time", to_timestamp(from_unixtime(F.col('ts')/1000)))
songplays_table = filter_log_df.join(filter_song_df, filter_song_df.title ==  filter_log_df.song)
songplays_table = songplays_table.withColumnRenamed("userId", "user_id")
songplays_table = songplays_table.withColumnRenamed("sessionId", "session_id")
songplays_table = songplays_table.withColumnRenamed("artist_location", "location")
songplays_table = songplays_table.withColumnRenamed("userAgent", "user_agent")
songplays_table.select('start_time', 'user_id', 'level', 'song_id', 'artist_id', 'session_id', 'location', 'user_agent')
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())
songplays_table.toPandas().head(5)

Unnamed: 0,ts,user_id,level,artist,session_id,user_agent,song,start_time,artist_id,artist_latitude,location,artist_longitude,artist_name,duration,num_songs,song_id,title,year,songplay_id
0,1542171963796,10,free,Percubaba,484,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Intro,2018-11-14 05:06:03,AR558FS1187FB45658,,,,40 Grit,75.67628,1,SOGDBUF12A8C140FAA,Intro,2003,721554505728
1,1542618860796,24,paid,Calvin Richardson,672,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",Intro,2018-11-19 09:14:20,AR558FS1187FB45658,,,,40 Grit,75.67628,1,SOGDBUF12A8C140FAA,Intro,2003,721554505729
2,1543358159796,80,paid,Samy Deluxe,992,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Intro,2018-11-27 22:35:59,AR558FS1187FB45658,,,,40 Grit,75.67628,1,SOGDBUF12A8C140FAA,Intro,2003,721554505730
3,1542837407796,15,paid,Elena,818,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",Setanta matins,2018-11-21 21:56:47,AR5KOSW1187FB35FF4,49.80388,Dubai UAE,15.47491,Elena,269.58322,1,SOZCTXZ12AB0182364,Setanta matins,0,1228360646656


In [58]:
# time - timestamps of records in songplays broken down into specific units
# start_time, hour, day, week, month, year, weekday

#Resolve Start Time
time_table = log_df.select('ts').withColumn("start_time", to_timestamp(from_unixtime(F.col('ts')/1000)))

#Resolve Hour
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
time_table = time_table.withColumn("hour", get_hour(log_df.ts))

#Resolve Day
get_day = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).day)
time_table = time_table.withColumn("day", get_day(log_df.ts))

#Resolve Month
get_month = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).month)
time_table = time_table.withColumn("month", get_month(log_df.ts))

#Resolve Year
get_year = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).year)
time_table = time_table.withColumn("year", get_year(log_df.ts))
#time_table = time_table.withColumn("week", get_week(log_df.ts))

# Resolve WeekDay
get_weekday = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).weekday())
time_table = time_table.withColumn("weekday", get_weekday(log_df.ts))

# Resolve Week In Year
time_table = time_table.withColumn("week", weekofyear(time_table.start_time))

# Remove TS since it's no longer needed
time_table = time_table.drop('ts')


time_table.toPandas().head(5)

Unnamed: 0,start_time,hour,day,month,year,weekday,week
0,2018-11-15 00:30:26,0,15,11,2018,3,46
1,2018-11-15 00:41:21,0,15,11,2018,3,46
2,2018-11-15 00:45:41,0,15,11,2018,3,46
3,2018-11-15 01:57:51,1,15,11,2018,3,46
4,2018-11-15 03:29:37,3,15,11,2018,3,46


In [59]:
# Write To S3
# arn:aws:s3:::udacityproject4
time_table.write.partitionBy(['year']).parquet("s3n://udacityproject4/time.parquet", mode="overwrite")

In [61]:
parquetFile = spark.read.parquet("s3n://udacityproject4/time.parquet")
parquetFile.printSchema()
parquetFile.toPandas().head(3)

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- week: integer (nullable = true)
 |-- year: integer (nullable = true)



Unnamed: 0,start_time,hour,day,month,weekday,week,year
0,2018-11-15 00:30:26,0,15,11,3,46,2018
1,2018-11-15 00:41:21,0,15,11,3,46,2018
2,2018-11-15 00:45:41,0,15,11,3,46,2018


In [None]:
songplays_table.write.parquet("s3n://udacityproject4/songplays.parquet", mode="overwrite")
users_table.write.parquet("s3n://udacityproject4/users.parquet", mode="overwrite")
artists_table.write.parquet("s3n://udacityproject4/artists.parquet", mode="overwrite")
songs_table.write.parquet("s3n://udacityproject4/songs.parquet", mode="overwrite")

In [46]:
# Read From S3 Parquet
parquetFile = spark.read.parquet("s3n://udacityproject4/songs.parquet")
parquetFile.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)

