In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, DateType

from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek, weekofyear, from_unixtime, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1610489326005_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
input_data = "s3://udacity-dend"
output_data = "s3://myudacitybucket/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    song_data_path = input_data + "/song_data/*/*/*/*.json"

    # read song data file
    df =  spark.read.json(song_data_path)

    # extract columns to create songs table and drop na in primary key column
    songs_table = df.select("song_id","title", "artist_id", "year","duration").dropna(subset = 'song_id')

    # Ensure no duplicates in song_id 
    windowSpec = \
      Window \
    .partitionBy("song_id") \
    .orderBy(col("year").desc(),"duration","title","duration")

    songs_table_uniques = songs_table.withColumn("row_number",row_number().over(windowSpec)).where("row_number = 1")


    # write songs table to parquet files partitioned by year and artist
    songs_table_uniques.write \
        .mode("overwrite") \
        .partitionBy("year","artist_id") \
        .save(output_data +"parquet/songs.parquet", format = 'parquet', header = True)


    # extract columns to create artists table
    artists_table = df.select("year","artist_id","artist_name", "artist_location", "artist_latitude","artist_longitude") \
        .dropna(subset = 'artist_id')

    # Ensure no duplicates in artist_id 
    windowSpec = \
      Window \
    .partitionBy("artist_id") \
    .orderBy(col("year").desc(),"artist_location","artist_name","artist_latitude","artist_longitude")

    artists_table_uniques = artists_table.withColumn("row_number",row_number().over(windowSpec)).where("row_number = 1")

    # write artists table to parquet files
    artists_table_uniques \
        .drop("year") \
        .write \
        .mode("overwrite") \
        .save(output_data + "parquet/artists.parquet", header = True)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
process_song_data(spark, input_data, output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data_path = input_data + "/log_data/*/*/*.json"

    # read log data file
    df = spark.read.json(log_data_path)

    # extract columns for users table    
    users_table = df.select("userid","firstName","lastName","gender","level").dropDuplicates(['userid']).where(col("userid").isNotNull())

    # write users table to parquet files
    users_table.write \
        .mode("overwrite") \
        .save(output_data + "parquet/users.parquet", header = True)
    
    # create timestamp column from original timestamp column
    def format_datetime(ts):
        return datetime.fromtimestamp(ts/1000.0)

    get_timestamp = udf(lambda x: format_datetime(int(x)),TimestampType())
    df = df.withColumn("timestamp", get_timestamp(df.ts))

    # create datetime column from original timestamp column
    get_datetime = udf(lambda x: format_datetime(int(x)),DateType())
    df = df.withColumn("datetime", get_timestamp(df.ts))

    # extract columns to create time table
    time_table = df.select('ts','datetime','timestamp',
                            hour(df.timestamp).alias('hour'),
                            dayofmonth(df.timestamp).alias('day'),
                            dayofweek(df.timestamp).alias('weekday'),
                            weekofyear(df.timestamp).alias('week'),
                            month(df.datetime).alias('month'),
                            year(df.datetime).alias('year')
                          ).dropDuplicates()
    # write time table to parquet files partitioned by year and month
    time_table.write \
        .mode("overwrite") \
        .partitionBy("year","month") \
        .save(output_data + "parquet/time.parquet", header = True)
    
    # read in song data to use for songplays table
    song_df = spark.read.json(input_data + "/song_data/*/*/*/*.json")
    
    
    # extract columns from joined song and log datasets to create songplays table 
    cond = [df.artist == song_df.artist_name , df.song == song_df.title,  df.length == song_df.duration ]
    songplays_table = df.join(song_df,cond) \
      .withColumn("timestamp",get_timestamp(df.ts)) \
      .select("timestamp","userid","level","song_id","artist_id","sessionid", "location", "userAgent")


    # write songplays table to parquet files partitioned by year and month
    songplays_table
    songplays_table.write \
            .mode("overwrite") \
            .partitionBy(year("timestamp"),month("timestamp")) \
            .save(output_data +"parquet/songplays.parquet", format = 'parquet', header = True)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
process_log_data(spark, input_data, output_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

KeyboardInterrupt: 

In [None]:

def main():
    spark = create_spark_session()
    input_data = "s3a://udacity-dend/"
    output_data = ""
    
    process_song_data(spark, input_data, output_data)    
    process_log_data(spark, input_data, output_data)


if __name__ == "__main__":
    main()