In [None]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession


In [None]:
# Obtain the Spark session for this notebook: getOrCreate() hands back an
# already-running session when there is one, otherwise it starts a new
# session with the application name "Sparkify".
session_builder = SparkSession.builder.appName("Sparkify")
spark = session_builder.getOrCreate()

In [None]:
# load data from S3
# Locations of the raw JSON inputs: user activity logs and song metadata.
log_path = "s3://udacity-dend/log_data"
song_path = "s3://udacity-dend/song_data"

# Read both datasets (logs first, then songs); Spark infers each schema from
# the JSON records.
log_data, song_data = (
    spark.read.json(source_path) for source_path in (log_path, song_path)
)

# Register each DataFrame as a temporary view so later cells can query it
# with spark.sql() under the names "log_data" and "song_data".
for view_name, frame in (("log_data", log_data), ("song_data", song_data)):
    frame.createOrReplaceTempView(view_name)


In [None]:
# Drop any tables left over from a previous run so that the notebook can be
# re-executed from a clean state. Every table that the "create tables" cell
# creates must be listed here; otherwise CREATE TABLE IF NOT EXISTS keeps the
# stale table and the INSERT statements append duplicate rows to it.
for table_name in ("songplays", "users", "songs", "artists", "time"):
    spark.sql("DROP TABLE IF EXISTS " + table_name)

In [None]:
# create tables
#
# The DDL below is written in Spark SQL rather than the Redshift dialect:
#   * IDENTITY, SORTKEY and DISTKEY are Redshift-specific clauses that Spark's
#     parser rejects, and key / NOT NULL constraints are not relied upon here,
#     so they are omitted. The intended key of each table is noted in the
#     comment above its statement.
#   * VARCHAR without a length is not accepted by Spark; STRING is used.
#   * A bare DECIMAL has scale 0 and would truncate fractional values, so
#     durations and coordinates are stored as DOUBLE.
#   * "USING parquet" names the data source explicitly so the statements do
#     not require Hive support in the session.
#   * The statements carry no trailing ";" because older Spark parsers reject
#     it inside spark.sql().

# Fact table, one row per song play. Key: songplay_id (BIGINT so it can hold
# a generated 64-bit surrogate id).
spark.sql("""
    CREATE TABLE IF NOT EXISTS songplays (
        songplay_id BIGINT,
        start_time TIMESTAMP,
        user_id STRING,
        level STRING,
        song_id STRING,
        artist_id STRING,
        session_id INT,
        location STRING,
        user_agent STRING)
    USING parquet
""")

# Dimension table of users. Key: user_id.
spark.sql("""
    CREATE TABLE IF NOT EXISTS users (
        user_id INT,
        first_name STRING,
        last_name STRING,
        gender STRING,
        level STRING)
    USING parquet
""")

# Dimension table of songs. Key: song_id.
spark.sql("""
    CREATE TABLE IF NOT EXISTS songs (
        song_id STRING,
        title STRING,
        artist_id STRING,
        year INT,
        duration DOUBLE)
    USING parquet
""")

# Dimension table of artists. Key: artist_id.
spark.sql("""
    CREATE TABLE IF NOT EXISTS artists (
        artist_id STRING,
        name STRING,
        location STRING,
        latitude DOUBLE,
        longitude DOUBLE)
    USING parquet
""")

# Dimension table of timestamps broken into calendar parts. Key: start_time.
spark.sql("""
    CREATE TABLE IF NOT EXISTS time (
        start_time TIMESTAMP,
        hour INT,
        day INT,
        week INT,
        month INT,
        year INT,
        weekday INT)
    USING parquet
""")

In [None]:
# transform data into final tables
#
# Each INSERT ... SELECT fills one table from the "log_data" / "song_data"
# temporary views. spark.sql() returns an empty DataFrame for an INSERT
# statement, so after every insert the populated table is read back with
# spark.table(); that gives the later cells a DataFrame that actually holds
# the rows.
#
# The INSERT statements have no column list (older Spark versions do not
# accept one), so values are matched to the target table by position: every
# SELECT lists its columns in the same order as the table definition.
#
# NOTE(review): assumes log_data.ts is an epoch timestamp in milliseconds,
# hence the division by 1000 before casting to TIMESTAMP -- confirm against
# the raw logs.

# songplays: one row per 'NextSong' event that matches a known song by title
# and artist name. monotonically_increasing_id() supplies a unique (but not
# consecutive) surrogate key.
spark.sql("""
    INSERT INTO songplays
    SELECT monotonically_increasing_id() AS songplay_id,
           CAST(events.ts / 1000 AS TIMESTAMP) AS start_time,
           events.userId AS user_id,
           events.level AS level,
           songs.song_id AS song_id,
           songs.artist_id AS artist_id,
           events.sessionId AS session_id,
           events.location AS location,
           events.userAgent AS user_agent
    FROM log_data events
    JOIN song_data songs
        ON events.song = songs.title
        AND events.artist = songs.artist_name
    WHERE events.page = 'NextSong'
""")
songplay = spark.table("songplays")

# users: distinct users seen in the logs. The id is cast explicitly because
# the target column is INT; the filter is applied to the cast value so that
# rows without a usable numeric id are left out.
spark.sql("""
    INSERT INTO users
    SELECT DISTINCT CAST(userId AS INT) AS user_id,
                    firstName AS first_name,
                    lastName AS last_name,
                    gender AS gender,
                    level AS level
    FROM log_data
    WHERE CAST(userId AS INT) IS NOT NULL
""")
users = spark.table("users")

# songs: distinct songs from the song metadata.
spark.sql("""
    INSERT INTO songs
    SELECT DISTINCT song_id AS song_id,
                    title AS title,
                    artist_id AS artist_id,
                    year AS year,
                    duration AS duration
    FROM song_data
    WHERE song_id IS NOT NULL
""")
songs = spark.table("songs")

# artists: distinct artists from the song metadata.
# NOTE(review): assumes the raw song JSON names these fields artist_location,
# artist_latitude and artist_longitude -- confirm with song_data.printSchema().
spark.sql("""
    INSERT INTO artists
    SELECT DISTINCT artist_id AS artist_id,
                    artist_name AS name,
                    artist_location AS location,
                    artist_latitude AS latitude,
                    artist_longitude AS longitude
    FROM song_data
    WHERE artist_id IS NOT NULL
""")
artists = spark.table("artists")

# time: every distinct event timestamp split into calendar parts. Spark's
# date functions are used instead of EXTRACT because Spark has no "weekday"
# field; dayofweek() numbers the days 1 (Sunday) through 7 (Saturday).
spark.sql("""
    INSERT INTO time
    SELECT DISTINCT start_time,
                    hour(start_time) AS hour,
                    dayofmonth(start_time) AS day,
                    weekofyear(start_time) AS week,
                    month(start_time) AS month,
                    year(start_time) AS year,
                    dayofweek(start_time) AS weekday
    FROM (SELECT CAST(ts / 1000 AS TIMESTAMP) AS start_time
          FROM log_data
          WHERE ts IS NOT NULL) AS event_times
""")
time = spark.table("time")

In [None]:
# write final tables as CSV
# NOTE(review): the output paths below are relative and carry no s3:// scheme,
# so the files do not obviously land in S3 -- confirm the intended destination.
outpath_songplay = "data/sparkify_songplay.csv"
outpath_users = "data/sparkify_users.csv"
outpath_songs = "data/sparkify_songs.csv"
outpath_artists = "data/sparkify_artists.csv"
outpath_time = "data/sparkify_time.csv"

# Each path becomes a directory of CSV part files with a header row.
# mode="overwrite" replaces the output of a previous run; with the default
# save mode the write raises as soon as the path already exists, which makes
# the notebook fail on its second Restart-and-Run-All.
for frame, outpath in (
    (songplay, outpath_songplay),
    (users, outpath_users),
    (songs, outpath_songs),
    (artists, outpath_artists),
    (time, outpath_time),
):
    frame.write.save(outpath, format="csv", header=True, mode="overwrite")