In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, expr, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, max
from pyspark.sql.types import TimestampType, DateType, StringType

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

#AWS_ACCESS_KEY_ID =  config['AWS']['AWS_ACCESS_KEY_ID']
#AWS_SECRET_ACCESS_KEY= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.1")\
        .getOrCreate()

#.config("spark.jars.packages", "org.apache.hadoop.fs.s3a.S3AFileSystem")\

In [5]:
#sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3-eu-west-1.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3-eu-west-2.amazonaws.com")

In [6]:
output_data = "s3a://my-datalake-bucket/"
#output_data = "s3a://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@my-datalake-bucket/"
#output_data = "s3a://my-dl-temp-bucket"

# Songs Data

In [7]:
data_path = "./data/song_data/*/*/*/*.json"
songs_data = spark.read.json(data_path)

In [9]:
songs_data.printSchema()
#songs_raw_data.show(3)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



###### Songs Dimension Table - songs in music database
***Columns:*** song_id, title, artist_id, year, duration

In [None]:
# extracting columns to create songs table
songs_table = songs_data.selectExpr(["song_id","title","artist_id","cast(year as int) year","duration"]).orderBy("song_id")

# writing songs table to parquet files partitioned by year and artist
songs_table.write.mode("overwrite").partitionBy("year","artist_id").parquet(output_data+"songs/")

#songs_table.limit(5).toPandas()
songs_table.printSchema()

###### Artists Dimension Table - artists in music database
***Columns:*** artist_id, name, location, latitude, longitude

In [26]:
# extracting columns to create artists table
artist_table = songs_data.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"])\
                         .orderBy("artist_id")

# writing artists table to parquet files
artist_table.write.mode("overwrite").parquet(output_data + "artist/")

#songs_table.limit(3).toPandas()
artist_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



# Log Data

In [5]:
data_path = "./data/log_data/*/*/*.json"
log_data = spark.read.json(data_path)

In [6]:
log_data.printSchema()
#songs_raw_data.show(3)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
#log_data.limit(10).toPandas()

##### Filtering the data with 'NextSong' - as eecords in log data associated with songplays i.e. records with page `NextSong`

In [7]:
#log_data = log_data.filter(log_datapage == 'NextSong')
log_data = log_data.filter(col("page") == 'NextSong')
#log_data.limit(3).toPandas()

###### Users Dimension Table - users in the app
***Columns:*** user_id, first_name, last_name, gender, level

In [8]:
users_table = log_data.selectExpr("userId as user_id",
                                  "firstName as first_name",
                                  "lastName as last_name",
                                  "gender",
                                  "level"
                                 ).dropDuplicates().orderBy("user_id")

In [9]:
# Checking for the duplicate user records who have updated level from 'free' to 'paid' or vice-versa

dup_users = users_table.select(["user_id"]).groupby("user_id").count().where(col("count")>1).toPandas()
dup_user_ids = list(dup_users["user_id"])
if dup_user_ids:
    print(dup_user_ids)

['15', '29', '85', '16', '88', '36', '49', '80']


In [11]:
#getting the latest timestamp for the duplicate user_ids in users_table
dup_rec_latest_ts = log_data.select("userId","ts")\
                                .groupBy("userId")\
                                .agg(max("ts").alias("max_ts"))\
                                .where(col("userId").isin(dup_user_ids))
dup_rec_latest_ts.show()

+------+-------------+
|userId|       max_ts|
+------+-------------+
|    15|1543234288796|
|    29|1543423613796|
|    85|1543592227796|
|    16|1543603884796|
|    88|1543590783796|
|    36|1543591559796|
|    49|1543603162796|
|    80|1543532713796|
+------+-------------+



In [12]:
#Joining two tables

dup_rec_latest_ts.createTempView("latest_timestamp")
log_data.createTempView("log_data")

query = """SELECT log.userId as user_id, 
                                            firstName as first_name,
                                            lastName as last_name,
                                            gender,
                                            level
                                            FROM log_data as log
                                            INNER JOIN latest_timestamp lt 
                                                    ON log.userId= lt.userId
                                                    AND log.ts = lt.max_ts
                                            WHERE log.userId IN {};""".format(tuple(dup_user_ids))

latest_recs_4_dups =  spark.sql(query)

latest_recs_4_dups.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     29|Jacqueline|    Lynch|     F| paid|
|     88|  Mohammad|Rodriguez|     M| paid|
|     36|   Matthew|    Jones|     M| paid|
|     85|   Kinsley|    Young|     F| paid|
|     49|     Chloe|   Cuevas|     F| paid|
|     16|     Rylan|   George|     M| paid|
|     80|     Tegan|   Levine|     F| paid|
|     15|      Lily|     Koch|     F| paid|
+-------+----------+---------+------+-----+



In [13]:
spark.catalog.dropTempView("latest_timestamp")
spark.catalog.dropTempView("log_data")

In [14]:
#Dropping duplicates using specific column
users_table = users_table.dropDuplicates(["user_id"])

#Checking if the duplicates are dropped or not
users_table.select(["user_id"]).groupby("user_id").count().where(col("count")>1).toPandas()

Unnamed: 0,user_id,count


In [16]:
users_table.write.mode("overwrite").parquet(output_data + "users/")
latest_recs_4_dups.write.mode("overwrite").parquet(output_data + "users/")
users_table.printSchema()
latest_recs_4_dups.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



###### Time Dimension Table
###### time - timestamps of records in songplays broken down into specific units
***Columns:*** start_time, hour, day, week, month, year, weekday

*Step 1:* Create timestamp column from original timestamp column <br>
*Step 2:* Create datetime column from original timestamp column <br>
*Step 3:* Extracting columns to create time table <br>
*Step 4:* Writing time table to parquet files partitioned by year and month <br>

In [17]:
get_timestamp = udf(lambda epoch: datetime.fromtimestamp(epoch/1000.0),TimestampType())

In [18]:
log_data = log_data.withColumn("timestamp",get_timestamp("ts"))
#df.show(3)
log_data.select(["ts","timestamp"]).limit(3).toPandas()

Unnamed: 0,ts,timestamp
0,1542241826796,2018-11-15 00:30:26.796
1,1542242481796,2018-11-15 00:41:21.796
2,1542242741796,2018-11-15 00:45:41.796


In [19]:
get_datetime = udf(lambda epoch: datetime.fromtimestamp(epoch/1000).strftime('%Y-%m-%d %H:%M:%S'),StringType())
#get_datetime = udf(lambda epoch: datetime.fromtimestamp(epoch/1000),DateType())

In [20]:
log_data = log_data.withColumn("date_time",get_datetime("ts"))
#df.show(3)
log_data.select(["ts","timestamp","date_time"]).limit(5).toPandas()

Unnamed: 0,ts,timestamp,date_time
0,1542241826796,2018-11-15 00:30:26.796,2018-11-15 00:30:26
1,1542242481796,2018-11-15 00:41:21.796,2018-11-15 00:41:21
2,1542242741796,2018-11-15 00:45:41.796,2018-11-15 00:45:41
3,1542253449796,2018-11-15 03:44:09.796,2018-11-15 03:44:09
4,1542260935796,2018-11-15 05:48:55.796,2018-11-15 05:48:55


In [21]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date_time: string (nullable = true)



In [22]:
# Method -1 extract columns to create time table (With Aliases)
log_data.select("timestamp",
                hour("timestamp").alias("hour"),
                dayofmonth("timestamp").alias("day"),
                weekofyear("timestamp").alias("week"),
                month("timestamp").alias("month"),
                year("timestamp").alias("year"),
                dayofweek("timestamp").alias("weekday")
               ).distinct().limit(3).toPandas()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,2018-11-15 21:04:27.796,21,15,46,11,2018,5
1,2018-11-21 00:57:58.796,0,21,47,11,2018,4
2,2018-11-14 00:17:37.796,0,14,46,11,2018,4


In [29]:
# Method -2 extract columns to create time table using selectExpr
time_table = log_data.selectExpr("timestamp as start_time",
                                 "hour(timestamp) as hour",
                                 "dayofmonth(timestamp) as day",
                                 "weekofyear(timestamp) as week",
                                 "month(timestamp) as month",
                                 "year(timestamp) as year",
                                 "dayofweek(timestamp) as weekday"
                                ).distinct()

time_table.write.mode("overwrite").partitionBy("year","month").parquet(output_data + "time/")

time_table.limit(3).toPandas()
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



#### Fact Table songplays
***Columns:*** songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

In [31]:
# reading in song data to use for songplays table
song_df = spark.read.json("./data/song_data/*/*/*/*.json")\
                    .select(["artist_id","song_id","artist_name","title","duration"])

log_df = log_data.selectExpr("artist","song","length",
                             "timestamp as start_time",
                             "level",
                             "location",
                             "cast(sessionId as int) as session_id",
                             "userId as user_id",
                             "userAgent as user_agent")
song_df.printSchema()
log_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)

root
 |-- artist: string (nullable = true)
 |-- song: string (nullable = true)
 |-- length: double (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [53]:
joined_df = log_df.join(song_df, (log_df.artist == song_df.artist_name) & \
                     (log_df.song == song_df.title) & \
                    (log_df.length == song_df.duration), how = "left") 

joined_df = joined_df.withColumn("songplay_id",monotonically_increasing_id())

songplays_table = joined_df.selectExpr(["cast(songplay_id as bigint) as songplay_id",
                                    "start_time",
                                    "user_id","level",
                                    "song_id",
                                    "artist_id",
                                    "session_id",
                                    "location",
                                    "user_agent",
                                    "year(start_time) as year",    
                                    "month(start_time) as month"])

songplays_table.write.mode("overwrite").partitionBy("year","month").parquet(output_data + "songplays/")

In [34]:
songplays_table.limit(3).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,0,2018-11-15 00:30:26.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,1,2018-11-15 00:41:21.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,2,2018-11-15 00:45:41.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [None]:
#song_df.createTempView("songs_data_table")
#log_df.createTempView("log_data_table")

#songplays_df = spark.sql("""
#                        SELECT lt.timestamp as start_time,
#                                lt.user_id,
#                                lt.level,
#                                st.song_id,
#                                st.artist_id,
#                                lt.session_id,
#                                lt.location,
#                                lt.user_agent
#                                FROM log_data_table lt
#                                    LEFT JOIN songs_data_table st
#                                                ON lt.artist = st.artist_id
#                                                AND lt.song = st.title
#                                                AND lt.length = st.duration
#                    """)

#songplays_df.limit(5).toPandas()

In [38]:
spark.sql(""" select song_id,artist_id,count(*) from songplays_table group by song_id,artist_id """).show()

+-------+---------+--------+
|song_id|artist_id|count(1)|
+-------+---------+--------+
|   null|     null|    6820|
+-------+---------+--------+



In [51]:
songplays_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [None]:
songplays_df.where(col("song_id").isNull()).show(10)

In [33]:
songplays_table.createTempView("songplays_table")

In [None]:
spark.sql(""" select * from songplays_table where song_id IS NULL and artist_id IS NULL """).show(10)

## Running the `etl.py` file

In [57]:
%run etl.py

Loading songs data in ./ETLOutputs/songs
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)

Loading artists data in ./ETLOutputs/artists
root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)

Loading users data in ./ETLOutputs/users
Duplicate UserIDs whoose levels are changed ['15', '29', '85', '16', '88', '36', '49', '80']
Loading latest data for Duplicate records.... 
root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

Loading time data in ./ETLOutputs/time
root
 |-- start_time: timestamp (nullable = true)
 |-- hour