## Adding Required Modules

In [None]:
from pyspark.sql import SparkSession
import os
import configparser

## Adding Credentials and Configuration

In [None]:
# Load AWS credentials and S3 locations from the local dl.cfg file.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means dl.cfg was not loaded.
# Fail here with a clear message instead of a later NoSectionError.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read configuration file 'dl.cfg'")

# Publish the [AWS] credentials through the process environment.
os.environ['AWS_ACCESS_KEY_ID'] = config.get("AWS", "AWS_ACCESS_KEY_ID")
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get("AWS", "AWS_SECRET_ACCESS_KEY")
# Input and output data locations read from the [S3] section.
input_data = config.get("S3", "INPUT_DATA_LOCATION")
output_data = config.get("S3", "OUTPUT_DATA_LOCATION")

## Create Spark Session with Hadoop AWS Package

In [None]:
# Create (or reuse) the Spark session, requesting the hadoop-aws package through
# the spark.jars.packages setting.
# NOTE(review): the original note says to use the latest hadoop package (3.3.0)
# on the EMR cluster, but the coordinate below pins hadoop-aws 2.7.0 — confirm
# which version matches the cluster's Hadoop build.
# Open question from the author: is this package actually needed for loading
# Spark data frames from S3?
spark = SparkSession.builder \
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0") \
                     .getOrCreate()

## Declare Schema for Songs Data

In [None]:
from pyspark.sql.types import (
    StructType as R,
    StructField as Fld,
    DoubleType as Dbl,
    StringType as Str,
    IntegerType as Int,
    DateType as Date,
)

# Explicit schema for the song JSON files: one (column name, Spark type) pair
# per field, each wrapped in a StructField with its default nullability.
songSchema = R([
    Fld(column, dtype)
    for column, dtype in (
        ("artist_id", Str()),
        ("artist_latitude", Dbl()),
        ("artist_location", Str()),
        ("artist_longitude", Dbl()),
        ("artist_name", Str()),
        ("duration", Dbl()),
        ("num_songs", Int()),
        ("song_id", Str()),
        ("title", Str()),
        ("year", Int()),
    )
])

## Load Data

In [None]:
# Read every JSON file found under ./data/song_data (searching sub-directories
# recursively) using the explicit song schema.
dfSongs = (
    spark.read
    .schema(songSchema)
    .option("recursiveFileLookup", "true")
    .json("./data/song_data")
)

In [None]:
import pyspark.sql.functions as f

dfSongs.printSchema()

# Lower-case the artist name and song title; the songplays join further down
# matches these columns against the (also lower-cased) event columns.
dfSongs = (
    dfSongs
    .withColumn("artist_name", f.lower(f.col("artist_name")))
    .withColumn("title", f.lower(f.col("title")))
)

# Preview the first five rows as a pandas DataFrame.
dfSongs.limit(5).toPandas()

## Infer schema, fix header and separator : NOT POSSIBLE

In [None]:
# dfSongs = spark.read.option("recursiveFileLookup","true") \
#                 .json("./data/song_data")
# dfSongs.printSchema()
# dfSongs.show(5)

## Count of songs

In [None]:

# Number of rows in the loaded songs DataFrame.
dfSongs.count()

## Add Schema for Events

In [None]:
from pyspark.sql.types import (
    StructType as R,
    StructField as Fld,
    DoubleType as Dbl,
    StringType as Str,
    IntegerType as Int,
    TimestampType as Timestamp,
    LongType as Long,
)

# Explicit schema for the event log JSON files: one (column name, Spark type)
# pair per field, each wrapped in a StructField with its default nullability.
# `ts` is kept as a long here and converted in a later cell.
eventSchema = R([
    Fld(column, dtype)
    for column, dtype in (
        ("artist", Str()),
        ("auth", Str()),
        ("firstName", Str()),
        ("gender", Str()),
        ("itemInSession", Int()),
        ("lastName", Str()),
        ("length", Dbl()),
        ("level", Str()),
        ("location", Str()),
        ("method", Str()),
        ("page", Str()),
        ("registration", Dbl()),
        ("sessionId", Int()),
        ("song", Str()),
        ("status", Str()),
        ("ts", Long()),
        ("userAgent", Str()),
        ("userId", Str()),
    )
])

## Load Events

In [None]:
# Read the event log JSON files with the explicit event schema.
dfEvents = spark.read.schema(eventSchema).json("./data/log-data/")
dfEvents.printSchema()

# Lower-case artist and song so they can be matched against the lower-cased
# artist_name / title columns of the song data in the songplays join.
dfEvents = (
    dfEvents
    .withColumn("artist", f.lower(f.col("artist")))
    .withColumn("song", f.lower(f.col("song")))
)

# Preview the first five rows as a pandas DataFrame.
dfEvents.limit(5).toPandas()

## Count of events

In [None]:
# Number of rows in the loaded events DataFrame.
dfEvents.count()

## Converting date field to timestamp

### UDF for converting `ts` (epoch milliseconds) to a timestamp string

In [None]:
from pyspark.sql.functions import udf
import pyspark.sql.functions as F

@udf
def parseTimestamp(ts):
    """Convert an epoch value in milliseconds to a date-time string.

    The result is ``str(datetime.fromtimestamp(ts / 1000))``, i.e. a
    ``YYYY-MM-DD HH:MM:SS[.ffffff]`` string in the local time zone of the
    Python worker that evaluates the UDF.

    Args:
        ts: Epoch time in milliseconds, or None for a missing value.

    Returns:
        The formatted date-time string, or None when ``ts`` is missing or
        cannot be represented as a datetime (for example an epoch that maps
        to a year beyond 9999). Returning None keeps a single bad record from
        failing the whole Spark job; callers that need only valid timestamps
        should filter out nulls afterwards.
    """
    from datetime import datetime

    if ts is None:
        return None
    try:
        return str(datetime.fromtimestamp(ts / 1000))
    except (OverflowError, OSError, ValueError):
        # fromtimestamp() raises one of these for values outside the range the
        # platform or the datetime type supports.
        return None

In [None]:
# Replace the raw `ts` column with the value produced by the parseTimestamp UDF.
# NOTE(review): the UDF is declared without a return type, so the new column is
# presumably a string rather than a Spark timestamp — confirm with printSchema().
dfEvents = dfEvents.withColumn("ts", parseTimestamp("ts"))
# Preview the first five rows as a pandas DataFrame.
dfEvents.limit(5).toPandas()

## Convert UserId to Integer

In [None]:
from pyspark.sql.types import (
    StructType as R,
    StructField as Fld,
    DoubleType as Dbl,
    StringType as Str,
    IntegerType as Int,
    TimestampType as Timestamp,
    LongType as Long,
)

# userId is read as a string by the event schema; cast it to an integer column.
dfEvents = dfEvents.withColumn("userId", dfEvents.userId.cast(Int()))

# Preview the first five rows as a pandas DataFrame.
dfEvents.limit(5).toPandas()

In [None]:
# Show the events schema after the `ts` and `userId` conversions above.
dfEvents.printSchema()

## Building SONGS table

In [None]:
# Songs table: the song-level columns projected from the song data.
songs_table = dfSongs.select(
    ["song_id", "title", "artist_id", "year", "duration"]
)

# Preview the first five rows as a pandas DataFrame.
songs_table.limit(5).toPandas()

## Building Artists Table

In [None]:
# Source columns in the song data and the names they take in the artists table.
artists_table_fields = [
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
]
artists_table_new_fields = ["artist_id", "name", "location", "latitude", "longitude"]

# One "<source> as <target>" SQL expression per column pair.
artists_table_exprs = [
    " as ".join(pair)
    for pair in zip(artists_table_fields, artists_table_new_fields)
]

# Rename the columns and keep only distinct rows.
artists_table = dfSongs.selectExpr(*artists_table_exprs).distinct()

# Preview the first five rows as a pandas DataFrame.
artists_table.limit(5).toPandas()

## Building Users Table

In [None]:
from pyspark.sql.functions import desc

# Source columns in the event data and the names they take in the users table.
users_table_fields = ["userId", "firstName", "lastName", "gender", "level"]
users_table_new_fields = ["user_id", "first_name", "last_name", "gender", "level"]

# One "<source> as <target>" SQL expression per column pair.
users_table_exprs = [
    " as ".join(pair)
    for pair in zip(users_table_fields, users_table_new_fields)
]

# Renamed user columns, before any de-duplication.
users_table_with_drop_duplicates = dfEvents.selectExpr(*users_table_exprs)

# Variant 1: distinct over all five columns (author's count: 107 rows).
# Author's open questions: why does user_id show up as a double, and why does
# the distinct result contain NaN values?
# NOTE(review): presumably the integer cast leaves nulls for non-numeric userId
# values, and pandas displays a nullable integer column as float with NaN —
# confirm against the data.
users_table = users_table_with_drop_duplicates.distinct()

# Variant 2: one row per user_id (author's count: 98 rows).
ans = users_table_with_drop_duplicates.dropDuplicates(["user_id"])

# Sanity check: the five highest per-user_id row counts after de-duplication.
(
    ans.groupBy("user_id")
    .count()
    .orderBy(desc("count"))
    .limit(5)
    .toPandas()
)

## Building Time Table

In [None]:
from pyspark.sql.functions import desc

# Time table: one row per distinct event timestamp, plus the calendar parts
# derived from it.
time_table_fields = ["ts"]
time_table_new_fields = ["start_time"]
time_table_exprs = [
    "{} as {}".format(oldField, newField)
    for (oldField, newField) in zip(time_table_fields, time_table_new_fields)
]
time_table = dfEvents.selectExpr(*time_table_exprs).dropDuplicates(["start_time"])
# Author's note: there were 33 duplicate time records.
#time_table.count()
time_table = time_table.withColumn("hour", F.hour("start_time"))
# NOTE(review): "day" holds the day of the week (1-7), not the day of the month.
# Confirm that is intended, since "weekday" below is also derived from dayofweek.
time_table = time_table.withColumn("day", F.dayofweek("start_time"))
time_table = time_table.withColumn("week", F.weekofyear("start_time"))
time_table = time_table.withColumn("month", F.month("start_time"))
time_table = time_table.withColumn("year", F.year("start_time"))
# Spark's dayofweek() numbers days from 1 (Sunday) to 7 (Saturday), so
# Monday-Friday is the inclusive range 2..6. The earlier test
# (dayofweek > 0 and dayofweek < 6) marked Sunday-Thursday as weekdays.
time_table = time_table.withColumn("weekday", F.dayofweek("start_time").between(2, 6))
# Preview the first five rows as a pandas DataFrame.
time_table.limit(5).toPandas()


## Create SongPlays table

In [None]:

#dfSongs = dfSongs.alias('dfSongs')
#dfEvents = dfEvents.alias('dfEvents')
# Keep only the events whose page is "NextSong" before building songplays.
dfEvents = dfEvents.where(dfEvents["page"] == "NextSong")

# An event matches a song when artist, title and duration all agree.
condition = (
    (dfEvents["artist"] == dfSongs["artist_name"])
    & (dfEvents["song"] == dfSongs["title"])
    & (dfEvents["length"] == dfSongs["duration"])
)

# Inner join: events without a matching song are dropped.
songplays = (
    dfEvents
    .join(dfSongs, on=condition, how="inner")
    .select(
        dfEvents["ts"],
        dfEvents["userId"],
        dfEvents["level"],
        dfSongs["song_id"],
        dfSongs["artist_id"],
        dfEvents["sessionId"],
        dfEvents["location"],
        dfEvents["userAgent"],
    )
)

# Preview the first five rows as a pandas DataFrame.
songplays.limit(5).toPandas()
#songplays.count()

## Data cleaning ideas
### Some event records are invalid: they have year 0
### Some timestamps are invalid and raise `ValueError: year 50841 is out of range`