# Sparkify Data Lake

## Create a Spark Session

In [1]:
%%spark
# Running this magic starts the Spark application; the cell output reports
# that the SparkSession is then available as `spark`.

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1621450081106_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Import Spark Functions

In [10]:
from pyspark.sql.functions import *

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Set variables for S3 input and output data paths

In [2]:
# Root of the source bucket; the song and log JSON globs are appended to it.
input_data = "s3a://udacity-dend/"
# Root of the destination bucket; each table's output folder is appended to it.
output_data = 's3a://scotty-test-spark/'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read in song data

In [4]:
# Glob over the three nested directory levels below song_data/ and load every
# JSON file found there into a single DataFrame.
song_data_path = input_data + "song_data/*/*/*/*.json"
song_data = spark.read.json(song_data_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read in log data

In [8]:
# Glob over the two nested directory levels below log-data/ and load every
# JSON file found there into a single DataFrame.
log_data_path = input_data + "log-data/*/*/*.json"
log_data = spark.read.json(log_data_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Filter log data on page = NextSong and add a new start_time column from the ts column

In [11]:
# Keep only "NextSong" page events, add start_time derived from ts, then drop ts.
# ts is divided by 1000 before from_unixtime, so it is presumably stored in
# milliseconds -- TODO confirm against the log schema.
# NOTE(review): this rebinds log_data and removes ts, so the cell cannot be
# re-run without first re-reading the log data.
log_data = (
    log_data
    .filter(log_data.page == "NextSong")
    .withColumn('start_time', from_unixtime(log_data.ts / 1000))
    .drop('ts')
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create temp tables to query

In [12]:
# Expose both DataFrames to Spark SQL under the view names used by the
# queries below; registration order matches the original (songs, then logs).
for view_name, frame in (("songs", song_data), ("logs", log_data)):
    frame.createOrReplaceTempView(view_name)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Join the two tables for easier queries 

In [16]:
# Attach song metadata to every log row. A row matches when artist name,
# song title and duration all agree; because this is a LEFT OUTER JOIN,
# log rows without a match are kept with NULL song columns.
join_query = '''SELECT *
    FROM logs l
    LEFT OUTER JOIN songs s
    ON (l.artist = s.artist_name
    AND l.song = s.title
    AND l.length = s.duration)'''
joined_data = spark.sql(join_query)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create raw_data temp table to query

In [17]:
# Register the joined logs/songs DataFrame as the `raw_data` view that the
# table-building queries below select from.
joined_data.createOrReplaceTempView('raw_data')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create a songs table

In [19]:
# Distinct song rows that have a song_id, title and artist_id.
# NOTE(review): raw_data is logs LEFT JOIN songs, so only songs that matched
# at least one log row can appear here -- confirm that is intended rather
# than selecting from the full `songs` view.
songs_query = '''SELECT
    DISTINCT song_id,
    title,
    artist_id,
    year,
    duration
    FROM raw_data
    WHERE song_id IS NOT NULL
    AND title IS NOT NULL
    AND artist_id IS NOT NULL'''
songs = spark.sql(songs_query)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Write songs table as a Parquet file to S3

In [21]:
# Persist the songs table as Parquet under <output_data>/songs, with one
# directory level per year and then per artist_id.
songs_output_path = output_data + 'songs'
(
    songs.write
    .format('parquet')
    .partitionBy('year', 'artist_id')
    .save(songs_output_path)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create an artists table

In [22]:
# Distinct artist rows that have both an artist_id and an artist_name.
# Like the songs query, this selects from the joined raw_data view.
artists_query = '''SELECT
    DISTINCT artist_id,
    artist_name,
    artist_location,
    artist_latitude,
    artist_longitude
    FROM raw_data
    WHERE artist_id IS NOT NULL
    AND artist_name IS NOT NULL'''
artists = spark.sql(artists_query)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Write artists table as a Parquet file to S3

In [None]:
# Persist the artists table as unpartitioned Parquet.
# NOTE(review): the folder is the singular 'artist' while the DataFrame is
# named `artists` -- confirm the folder name is what readers expect.
artists_output_path = output_data + 'artist'
(
    artists.write
    .format('parquet')
    .save(artists_output_path)
)

## Create a users table

In [26]:
# One snapshot per user: the correlated subquery keeps only the row(s) whose
# start_time equals that user's latest start_time, so `level` reflects the
# most recent event seen for the user.
users_query = '''SELECT
    DISTINCT userId,
    firstName,
    lastName,
    gender,
    level
    FROM raw_data rd1
    WHERE start_time = (SELECT MAX(start_time) FROM raw_data rd2
    WHERE rd1.userId = rd2.userId)'''
users = spark.sql(users_query)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Write users table as a Parquet file to S3

In [None]:
# Persist the users table as unpartitioned Parquet under <output_data>/users.
users_output_path = output_data + 'users'
(
    users.write
    .format('parquet')
    .save(users_output_path)
)

## Create a time table

In [27]:
# One row per distinct start_time with its calendar parts broken out.
# hour/day/month/year/weekday are formatted strings produced by date_format.
# week comes from weekofyear(), which returns the week-of-year as an integer:
# the date_format pattern letter 'F' is a day-of-week-in-month field, not a
# week number, so it cannot be used for this column.
time = spark.sql(
    '''SELECT DISTINCT start_time,
    date_format(start_time, 'HH') AS hour,
    date_format(start_time, 'dd') AS day,
    weekofyear(start_time) AS week,
    date_format(start_time, 'MM') AS month,
    date_format(start_time, 'y') AS year,
    date_format(start_time, 'E') AS weekday
    FROM raw_data'''
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Write time table as a Parquet file to S3

In [None]:
# Persist the time table as Parquet under <output_data>/time, with one
# directory level per year and then per month.
time_output_path = output_data + 'time'
(
    time.write
    .format('parquet')
    .partitionBy('year', 'month')
    .save(time_output_path)
)

In [28]:
# Register the time DataFrame as the `time_table` view so the songplays query
# can join against it.
time.createOrReplaceTempView('time_table')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Create a songplays fact table

In [30]:
# Build the fact table: one row per raw_data event, tagged with a generated
# songplay_id. The join to time_table on start_time supplies the year and
# month columns alongside the event's own fields.
songplays_query = '''SELECT
    monotonically_increasing_id() AS songplay_id,
    t.start_time,
    userId,
    level,
    song_id,
    artist_id,
    sessionId,
    location,
    userAgent,
    t.year,
    t.month
    FROM raw_data r
    LEFT JOIN time_table t
    ON r.start_time = t.start_time'''
songplays_table = spark.sql(songplays_query)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Write songplays fact table as a Parquet file to S3

In [None]:
# Persist the songplays fact table as Parquet under <output_data>/songplays,
# with one directory level per year and then per month.
# The DataFrame built by the songplays query is named `songplays_table`;
# writing via any other name raises NameError.
songplays_table.write.format('parquet').partitionBy('year', 'month').save(output_data + 'songplays')