# Develop ETL Process

### The purpose of this notebook is to develop the ETL process step by step for this Data Lake project: extract the data from S3, process it using Spark, and load it back into S3 as a set of dimensional tables.

**General Guidelines:**
- We will read from a small subset of data instead of S3 for the input data source during this development phase.
- We will write to our workstation instead of S3 during this development phase.

In [None]:
# import all necessary libraries
import configparser
from datetime import datetime
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Long

In [8]:
# Read the project configuration. read_file() is used instead of read() because
# read() silently ignores a missing file, which would only surface later as an
# opaque KeyError on the 'AWS' section; read_file() fails immediately instead.
config = configparser.ConfigParser()
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# Export the AWS credentials from the [AWS] section as environment variables.
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [None]:
# Path templates for reading and writing: the "{}" placeholder is filled in
# with a dataset sub-path (input) or a table folder name (output) via str.format().
input_data = "data/{}"
output_data = "output/{}"

## Check Schema for the Song data from the sample data source
***
**Note:**
To improve read performance, we pre-define the schema of the data instead of letting Spark infer it on the fly.
***

In [13]:
# No earlier cell assigns `spark`, so obtain (or reuse) a session here to keep the
# notebook runnable from a fresh kernel. The builder option mirrors
# create_spark_session() so that a later getOrCreate() call is consistent.
spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

# read song data from the sample input data files, letting Spark infer the schema
song_data_df = spark.read.json(input_data.format("song-data/*/*/*/*.json"))

In [14]:
# Print the schema Spark inferred for the song data; it is transcribed into an
# explicit schema definition further below.
song_data_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [15]:
# Explicit schema for the song JSON files (same fields and types as the inferred
# schema printed above); it is passed to the reader when processing the song data.
songsSchema = R([
    Fld(field_name, field_type())
    for field_name, field_type in (
        ("artist_id", Str),
        ("artist_latitude", Dbl),
        ("artist_location", Str),
        ("artist_longitude", Dbl),
        ("artist_name", Str),
        ("duration", Dbl),
        ("num_songs", Long),
        ("song_id", Str),
        ("title", Str),
        ("year", Long),
    )
])

In [19]:
# Read the sample log data files, letting Spark infer the schema.
# Relies on a `spark` session object already existing in the kernel.
log_data_df = spark.read.json(input_data.format("log-data/*.json"))

In [20]:
# Print the schema Spark inferred for the log data; it is transcribed into an
# explicit schema definition further below.
log_data_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [21]:
# Explicit schema for the log JSON files (same fields and types as the inferred
# schema printed above); it is passed to the reader when processing the log data.
logsSchema = R([
    Fld(field_name, field_type())
    for field_name, field_type in (
        ("artist", Str),
        ("auth", Str),
        ("firstName", Str),
        ("gender", Str),
        ("itemInSession", Long),
        ("lastName", Str),
        ("length", Dbl),
        ("level", Str),
        ("location", Str),
        ("method", Str),
        ("page", Str),
        ("registration", Dbl),
        ("sessionId", Long),
        ("song", Str),
        ("status", Long),
        ("ts", Long),
        ("userAgent", Str),
        ("userId", Str),
    )
])

In [None]:
def create_spark_session():
    """Build (or fetch the already-running) Spark session used to process the data.

    Parameters:
        None
    Returns:
        The Spark session, configured with the hadoop-aws package and
        file output committer algorithm version 2.
    """

    # Request the hadoop-aws package when the session is built
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    session = builder.getOrCreate()

    # With mapreduce.fileoutputcommitter.algorithm.version set to 2, the output
    # of each task is moved directly to the final destination, which makes the
    # writing process much faster
    session.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    return session

In [25]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the song JSON files.

    Reads every file matching ``song-data/*/*/*/*.json`` under ``input_data``
    using the pre-defined ``songsSchema`` and writes two parquet datasets
    (existing output is overwritten):
        - songs (stored in a folder called songs_table, partitioned by year and artist_id)
        - artists (stored in a folder called artists_table)
    Parameters:
        spark: Spark session object
        input_data: path template of the input data; its "{}" placeholder is
            filled with the song data glob pattern
        output_data: path template of the output location; its "{}" placeholder
            is filled with the table folder name
    Returns:
        None
    """

    # get filepath to song data file
    song_data = input_data.format("song-data/*/*/*/*.json")

    # read song data file with the explicit schema
    df = spark.read.json(song_data, schema=songsSchema)

    # extract columns to create songs table (one row per song_id)
    songs_table = df.select("song_id", "title", "artist_id", "year", "duration") \
                    .dropDuplicates(["song_id"])

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode("overwrite") \
        .partitionBy("year", "artist_id") \
        .parquet(output_data.format("songs_table"))

    # extract columns to create artists table (one row per artist_id),
    # dropping the "artist_" prefix from the descriptive columns
    artists_table = df.select("artist_id",
                              col("artist_name").alias("name"),
                              col("artist_location").alias("location"),
                              col("artist_latitude").alias("latitude"),
                              col("artist_longitude").alias("longitude")) \
                      .dropDuplicates(["artist_id"])

    # write artists table to parquet files
    artists_table.write.mode("overwrite") \
        .parquet(output_data.format("artists_table"))

In [83]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the log JSON files.

    Reads every file matching ``log-data/*.json`` under ``input_data`` using the
    pre-defined ``logsSchema``, keeps only the 'NextSong' events and writes
    three parquet datasets (existing output is overwritten):
        - users (stored in a folder called users_table)
        - time (stored in a folder called time_table, partitioned by year and month)
        - songplays (stored in a folder called songplays_table, partitioned by year and month)
    The song data is read here as well (with ``songsSchema``) to resolve
    song_id / artist_id for the songplays table, so the function does not
    depend on any DataFrame left over in the notebook's global state.
    Parameters:
        spark: Spark session object
        input_data: path template of the input data; its "{}" placeholder is
            filled with the log / song data glob patterns
        output_data: path template of the output location; its "{}" placeholder
            is filled with the table folder name
    Returns:
        None
    """

    # get filepath to log data file
    log_data = input_data.format("log-data/*.json")

    # read log data file with the explicit schema
    df = spark.read.json(log_data, schema=logsSchema)

    # filter by actions for song plays
    df = df.filter(col("page") == "NextSong")

    # extract columns for users table (one row per user_id)
    users_table = df.select(col("userId").alias("user_id"),
                            col("firstName").alias("first_name"),
                            col("lastName").alias("last_name"),
                            "gender",
                            "level") \
                    .dropDuplicates(["user_id"])

    # write users table to parquet files
    users_table.write.mode("overwrite") \
        .parquet(output_data.format("users_table"))

    # `ts` holds epoch milliseconds: convert to seconds and cast to a timestamp.
    # A native column expression replaces the two identical Python UDFs, so rows
    # are not round-tripped through Python and a null `ts` yields null instead
    # of raising inside a UDF.
    df = df.withColumn("datetime", (col("ts") / 1000).cast(TimestampType()))

    # extract columns to create time table (one row per distinct start_time)
    time_table = df.select(col("datetime").alias("start_time"),
                           hour(col("datetime")).alias("hour"),
                           dayofmonth(col("datetime")).alias("day"),
                           weekofyear(col("datetime")).alias("week"),
                           month(col("datetime")).alias("month"),
                           year(col("datetime")).alias("year"),
                           date_format(col("datetime"), "E").alias("weekday")) \
                   .dropDuplicates(["start_time"])

    # write time table to parquet files partitioned by year and month
    time_table.write.mode("overwrite") \
        .partitionBy("year", "month") \
        .parquet(output_data.format("time_table"))

    # read in song data to use for songplays table
    song_df = spark.read.json(input_data.format("song-data/*/*/*/*.json"),
                              schema=songsSchema)

    # select the log columns needed for the songplays table and for the join
    songplays_table = df.select("song",
                                "artist",
                                col("datetime").alias("start_time"),
                                col("userId").alias("user_id"),
                                "level",
                                col("sessionId").alias("session_id"),
                                "location",
                                "length",
                                col("userAgent").alias("user_agent"))

    # join log and song datasets on title, artist name and duration; the left
    # outer join keeps plays without a matching song (song_id/artist_id are null)
    songplays_table = songplays_table.join(
        song_df,
        (songplays_table.song == song_df.title) &
        (songplays_table.artist == song_df.artist_name) &
        (songplays_table.length == song_df.duration),
        'left_outer')

    # add a songplay_id column; monotonically_increasing_id() yields unique,
    # increasing ids that are NOT guaranteed to be consecutive
    songplays_table = songplays_table.withColumn("songplay_id",
                                                 monotonically_increasing_id())

    # extract columns to create songplays table
    songplays_table = songplays_table.select("songplay_id", "start_time", "user_id",
                                             "level", "song_id", "artist_id",
                                             "session_id", "location", "user_agent")

    # write songplays table to parquet files partitioned by year and month
    songplays_table.withColumn("year", year('start_time')) \
                   .withColumn("month", month('start_time')) \
                   .write.mode("overwrite") \
                   .partitionBy("year", "month") \
                   .parquet(output_data.format("songplays_table"))

## Process the song and log data files

In [None]:
# Create (or reuse) the Spark session explicitly so this cell does not depend on
# a `spark` variable left over from earlier kernel state, then run both ETL steps.
spark = create_spark_session()
process_song_data(spark, input_data, output_data)
process_log_data(spark, input_data, output_data)

## Check the table's output

In [28]:
# Load the songs table back from the parquet files in the songs_table output folder
songs_parquet_df = spark.read.parquet(output_data.format("songs_table"))

In [29]:
# Print the schema of the reloaded songs table
# (in the output, the partition columns year and artist_id are listed last)
songs_parquet_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [31]:
# Show the first 10 rows of the songs table as a quick sanity check
songs_parquet_df.show(10)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|   0|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|   0|ARPFHN61187FB575F6|
|SOUDSGM12AC9618304|Insatiable (Instr...|266.39628|   0|ARNTLGG11E2835DDB9|
|SOPEGZN12AB0181B3D|Get Your Head Stu...| 45.66159|   0|AREDL271187FB40F44|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|   0|AREBBGV1187FB523D2|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16363|   0|ARDR4AC1187FB371A1|
|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|ARMAC4T1187FB3FA4C|
+-----------

In [32]:
# Load the artists table back from the parquet files in the artists_table output folder
artists_parquet_df = spark.read.parquet(output_data.format("artists_table"))

In [33]:
# Print the schema of the reloaded artists table
artists_parquet_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [34]:
# Show the first 10 rows of the artists table as a quick sanity check
artists_parquet_df.show(10)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|40.79195|-73.94512|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632| -0.12714|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|     null|
|ARD842G1187B997376|          Blue Rodeo|Toronto, Ontario,...|43.64856|-79.38533|
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|    null|     null|
|ARIG6O41187B988BDD|     Richard Souther|       United States|37.16793|-95.84502|
|ARGSJW91187B9B1D6B|        JennyAnyKind|      North Carolina|35.21962|-80.01955|
|AR3JMC51187B9AE

In [85]:
# Load the users table back from the parquet files in the users_table output folder
users_parquet_df = spark.read.parquet(output_data.format("users_table"))

In [86]:
# Print the schema of the reloaded users table
users_parquet_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [87]:
# Show the first 10 rows of the users table as a quick sanity check
# NOTE(review): the output lists user names - confirm it is safe to share before committing
users_parquet_df.show(10)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     88|  Mohammad|Rodriguez|     M| free|
|     53|   Celeste| Williams|     F| free|
|     75|    Joseph|Gutierrez|     M| free|
|     60|     Devin|   Larson|     M| free|
|     68|    Jordan|Rodriguez|     F| free|
|     90|    Andrea|   Butler|     F| free|
|     14|  Theodore|   Harris|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     77| Magdalene|   Herman|     F| free|
|     89|   Kynnedi|  Sanchez|     F| free|
+-------+----------+---------+------+-----+
only showing top 10 rows



In [88]:
# Load the time table back from the parquet files in the time_table output folder
time_parquet_df = spark.read.parquet(output_data.format("time_table"))

In [89]:
# Print the schema of the reloaded time table
# (in the output, the partition columns year and month are listed last)
time_parquet_df.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [90]:
# Show the first 10 rows of the time table as a quick sanity check
time_parquet_df.show(10)

+--------------------+----+---+----+-------+----+-----+
|          start_time|hour|day|week|weekday|year|month|
+--------------------+----+---+----+-------+----+-----+
|2018-11-02 11:33:...|  11|  2|  44|    Fri|2018|   11|
|2018-11-02 13:30:...|  13|  2|  44|    Fri|2018|   11|
|2018-11-02 16:46:...|  16|  2|  44|    Fri|2018|   11|
|2018-11-04 06:26:...|   6|  4|  44|    Sun|2018|   11|
|2018-11-04 20:09:...|  20|  4|  44|    Sun|2018|   11|
|2018-11-05 12:26:...|  12|  5|  45|    Mon|2018|   11|
|2018-11-05 14:39:...|  14|  5|  45|    Mon|2018|   11|
|2018-11-06 21:12:...|  21|  6|  45|    Tue|2018|   11|
|2018-11-06 23:28:...|  23|  6|  45|    Tue|2018|   11|
|2018-11-07 14:16:...|  14|  7|  45|    Wed|2018|   11|
+--------------------+----+---+----+-------+----+-----+
only showing top 10 rows



In [91]:
# Load the songplays table back from the parquet files in the songplays_table output folder
songplays_parquet_df = spark.read.parquet(output_data.format("songplays_table"))

In [92]:
# Print the schema of the reloaded songplays table
# (in the output, the partition columns year and month are listed last)
songplays_parquet_df.printSchema()

root
 |-- songplay_id: long (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [93]:
# Show the first 10 rows of the songplays table as a quick sanity check
# (song_id / artist_id are null in the output for plays with no matching song)
songplays_parquet_df.show(10)

+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|songplay_id|          start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+-----------+--------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          0|2018-11-15 00:30:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|          1|2018-11-15 00:41:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|          2|2018-11-15 00:45:...|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|          3|2018-11-15 03:44:...|     61| free|   null|     null|       597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|2018|   11|
|          4|2018-11-15 05:48:...|     80| paid|   null|     null|       602|Portla