In [1]:
#import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, from_unixtime
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1561596406560_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Build (or reuse) the Spark session. The hadoop-aws package is requested at
# startup -- presumably so the s3a:// input paths used by this notebook resolve.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

VBox()

In [3]:
# Root locations used by every later cell: the bucket the raw JSON is read
# from, and the (relative) directory the parquet tables are written under.
input_data, output_data = 's3a://udacity-dend/', './'

VBox()

In [4]:
# Source: glob matching every entry three directory levels below song_data/.
file_path = 'song_data/*/*/*'
song_data = ''.join([input_data, file_path])

# Destinations: one directory per output table, all directly under output_data.
songplays_output_data = ''.join([output_data, 'songplays'])
users_output_data = ''.join([output_data, 'users'])
time_output_data = ''.join([output_data, 'time'])
artist_output_data = ''.join([output_data, 'artists'])
song_output_data = ''.join([output_data, 'songs'])

VBox()

In [5]:
# Load all song JSON files matched by the song_data glob into one DataFrame.
song_df = spark.read.json(path=song_data)

VBox()

In [6]:
# Songs table: project the song-level attributes out of the song dataset.
songs_columns = (
    'song_id',
    'title',
    'artist_id',
    'year',
    'duration',
)
songs_table = song_df.select(list(songs_columns))

VBox()

In [7]:
# Persist the songs table as parquet, one directory level per year and then
# per artist_id; any previous output at this path is replaced.
(
    songs_table.write
    .mode('overwrite')
    .partitionBy('year', 'artist_id')
    .parquet(song_output_data)
)

VBox()

In [8]:
# extract columns to create artists table
artists_columns = ('artist_id','artist_name','artist_location','artist_latitude','artist_longitude')
# song_df is read from the song files (presumably one record per song), so an
# artist with several songs would otherwise be written once per song.
# dropDuplicates() removes the exact repeats; rows that differ in any selected
# column are deliberately kept so no information is discarded.
artists_table = song_df.select(*artists_columns).dropDuplicates()

VBox()

In [9]:
# Persist the artists table as (unpartitioned) parquet, replacing any
# previous output at this path.
artists_table.write.mode('overwrite').parquet(artist_output_data)

VBox()

In [10]:
# Source: glob matching every entry three directory levels below log_data/.
# Note that this rebinds file_path, which earlier held the song-data glob.
file_path = 'log_data/*/*/*'
log_data = ''.join([input_data, file_path])

# Output directory name reserved for raw logs (not written in the cells shown here).
log_output_data = ''.join([output_data, 'logs'])

VBox()

In [11]:
# Load all log JSON files matched by the log_data glob into one DataFrame.
df = spark.read.json(path=log_data)

VBox()

In [12]:
# Keep only the log rows whose page is 'NextSong' (the song-play events);
# every later cell works on this filtered frame.
df = df.where(col('page') == 'NextSong')

VBox()

In [13]:
# extract columns for users table
users_columns = ('userId', 'firstName', 'lastName', 'gender', 'level')
# df holds one row per NextSong event, so a plain projection repeats each user
# once per play. dropDuplicates() removes the exact repeats; a user whose
# 'level' changed over time still appears once per distinct level.
users_table = df.select(*users_columns).dropDuplicates()

VBox()

In [14]:
# Persist the users table as (unpartitioned) parquet, replacing any
# previous output at this path.
users_table.write.mode('overwrite').parquet(users_output_data)

VBox()

In [15]:
# Derive a proper timestamp column from the raw 'ts' column:
# ts / 1000 turns the value into seconds, from_unixtime() formats those
# seconds as a datetime string, and the cast yields a timestamp-typed column.
df = df.withColumn(
    'timestamp',
    from_unixtime(col('ts') / 1000).cast('timestamp'),
)

VBox()

In [16]:
# Time table: one row per distinct timestamp, together with the calendar
# parts derived from it.
time_table = df.select(
    col('timestamp'),
    hour(col('timestamp')).alias('hour'),
    dayofmonth(col('timestamp')).alias('day'),
    weekofyear(col('timestamp')).alias('weekofyear'),
    month(col('timestamp')).alias('month'),
    year(col('timestamp')).alias('year'),
    dayofweek(col('timestamp')).alias('dayofweek'),
).dropDuplicates(['timestamp'])

VBox()

In [17]:
# Persist the time table as parquet, one directory level per year and then
# per month; any previous output at this path is replaced.
(
    time_table.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(time_output_data)
)

VBox()

In [18]:
# Songplays table: inner-join the play events with the song dataset on
# (song title, artist name) and keep the event fields plus the matched ids.
songplays_table = df.join(
    song_df,
    on=[df.song == song_df.title, df.artist == song_df.artist_name],
    how='inner',
).select(
    'timestamp',
    'userId',
    'level',
    'sessionId',
    'location',
    'userAgent',
    'song_id',
    'artist_id',
)

VBox()

In [19]:
# Persist the songplays table as parquet. year and month are derived from
# the event timestamp only for this write, to serve as partition columns;
# any previous output at this path is replaced.
(
    songplays_table
    .withColumn('year', year('timestamp'))
    .withColumn('month', month('timestamp'))
    .write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet(songplays_output_data)
)

VBox()