## Set up

In [1]:
# import packages to use in the pipeline
import configparser
from datetime import datetime
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, TimestampType, DateType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, monotonically_increasing_id

In [3]:
# get AWS access key from config file: dl.cfg
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
# launch spark
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [5]:
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

## Load data from S3

In [6]:
song_data = 's3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json'  #'song_data/*/*/*/*.json'
log_data = 's3a://udacity-dend/log_data/2018/11/2018-11-13-events.json'   #'log_data/*/*/*.json'
output_data = 's3a://sparkify0823/'

In [7]:
songdf = spark.read.json(song_data)
logdf = spark.read.json(log_data)

In [8]:
songdf.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
logdf.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



## Process data into analytics tables using Spark
- Fact Table
    - songplays - records in log data associated with song plays i.e. records with page NextSong
        - songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
- Dimension Tables
    - users - users in the app
        - user_id, first_name, last_name, gender, level
    - songs - songs in music database
        - song_id, title, artist_id, year, duration
    - artists - artists in music database
        - artist_id, name, location, latitude, longitude
    - time - timestamps of records in songplays broken down into specific units
        - start_time, hour, day, week, month, year, weekday

In [8]:
# create songs table
songs_table = songdf.select('song_id', 'title', 'artist_id', 'year', 'duration')\
                .dropDuplicates()

In [9]:
# create artists table
artists_table = songdf.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')\
                 .withColumnRenamed('artist_name', 'name')\
                 .withColumnRenamed('artist_location', 'location')\
                 .withColumnRenamed('artist_latitude', 'latitude')\
                 .withColumnRenamed('artist_longitude', 'longitude')\
                 .dropDuplicates()

In [10]:
# filter out songplay related log only
playlogdf = logdf.filter(logdf.page == 'NextSong')\
       .select('ts', 'userId', 'level', 'song', 'artist', 'sessionId', 'location', 'userAgent')

In [11]:
# create users table
users_table =logdf.select('userId', 'firstName', 'lastName', 'gender', 'level')\
                  .dropDuplicates()

In [12]:
# create time table

def format_datetime(ts):
        return datetime.fromtimestamp(ts/1000.0)
# create timestamp column
get_timestamp = udf(lambda x: format_datetime(int(x)),TimestampType())
# create datetime column
get_datetime = udf(lambda x: format_datetime(int(x)), DateType())
playlogdf = playlogdf.withColumn('timestamp', get_timestamp(playlogdf.ts))\
                     .withColumn('datetime', get_datetime(playlogdf.ts))
playlogdf.show(3)

+-------------+------+-----+--------------------+------------+---------+--------------------+--------------------+--------------------+----------+
|           ts|userId|level|                song|      artist|sessionId|            location|           userAgent|           timestamp|  datetime|
+-------------+------+-----+--------------------+------------+---------+--------------------+--------------------+--------------------+----------+
|1542069637796|    66| free|             Ja I Ty|          Fu|      514|Harrisburg-Carlis...|"Mozilla/5.0 (Mac...|2018-11-13 00:40:...|2018-11-13|
|1542071549796|    51| free|A Party Song (The...|All Time Low|      510|Houston-The Woodl...|"Mozilla/5.0 (Win...|2018-11-13 01:12:...|2018-11-13|
|1542079142796|     9| free|            Pop-Pop!|   Nik & Jay|      379|Eureka-Arcata-For...|Mozilla/5.0 (Wind...|2018-11-13 03:19:...|2018-11-13|
+-------------+------+-----+--------------------+------------+---------+--------------------+--------------------+----

In [13]:
# extract columns to create time table
tsdf = playlogdf.select('timestamp').dropDuplicates()
time_table = tsdf.withColumn('start_time', tsdf.timestamp)\
                      .withColumn('hour',  hour(tsdf.timestamp))\
                      .withColumn('day',   dayofmonth(tsdf.timestamp))\
                      .withColumn('week',  weekofyear(tsdf.timestamp))\
                      .withColumn('month', month(tsdf.timestamp))\
                      .withColumn('year',  year(tsdf.timestamp))\
                      .withColumn('weekday', dayofweek(tsdf.timestamp))\
                      .dropDuplicates()

In [27]:
time_table.show(3)

+--------------------+--------------------+----+---+----+-----+----+-------+
|           timestamp|          start_time|hour|day|week|month|year|weekday|
+--------------------+--------------------+----+---+----+-----+----+-------+
|2018-11-13 18:00:...|2018-11-13 18:00:...|  18| 13|  46|   11|2018|      3|
|2018-11-13 22:23:...|2018-11-13 22:23:...|  22| 13|  46|   11|2018|      3|
|2018-11-13 15:54:...|2018-11-13 15:54:...|  15| 13|  46|   11|2018|      3|
+--------------------+--------------------+----+---+----+-----+----+-------+
only showing top 3 rows



In [14]:
# extract columns from joined song and log datasets to create songplays table 
songdf.createOrReplaceTempView('songdf_table')
playlogdf.createOrReplaceTempView('playlogdf_table')

songplays_table = spark.sql('''
                               Select row_number() over(order by a.timestamp) as songplay_id,
                                      a.timestamp as start_time, 
                                      a.userID as user_id,
                                      a.level,
                                      s.song_id,
                                      s.artist_id,
                                      a.sessionId as session_id,
                                      a.location,
                                      a.userAgent as user_agent
                               from songdf_table s inner join playlogdf_table a
                                   on s.title=a.song and a.artist=s.artist_name
                            ''')

In [15]:
songplays_table.show(3)

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+



## load data back to S3

In [21]:
# write songs table to parquet files partitioned by year and artist
songs_table=songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs'), 'overwrite')

In [25]:
# write artists table back to S3
artists_table=artists_table.write.parquet(os.path.join(output_data, 'artists'), 'overwrite')

In [31]:
# write users table back to S3
users_table = users_table.write.parquet(os.path.join(output_data, 'users'), 'overwrite')

In [None]:
# write time table to S3 as parquet files
time_table = time_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'time'), 'overwrite')

In [16]:
# write songplays table to parquet files partitioned by year and month
songplays_table = songplays_table.withColumn("year",year('start_time'))\
                                 .withColumn("month",month('start_time'))\
                                 .write.partitionBy("year","month").parquet(os.path.join(output_data, 'songplays'), 'overwrite')