#### This notebook is used to develop and test the Sparkify ETL code in local mode before running it on larger data

In [None]:
# Imports from Python and Spark

import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType as R, StructField as Fld, \
    DoubleType as Dbl, StringType as Str, IntegerType as Int, LongType as LInt, TimestampType
from pyspark.sql.functions import monotonically_increasing_id

In [None]:
# Get the local input and output paths from dl.cfg
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Fail fast with a clear message instead of the
# confusing KeyError('PATHS') the lookups below would otherwise raise.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read config file 'dl.cfg'")

input_data = config['PATHS']['LOCAL_INPUT_DATA']
output_data = config['PATHS']['LOCAL_OUTPUT_DATA']

print(input_data)
print(output_data)

In [None]:
# Reuse the active Spark session if one exists; otherwise start a new one
# with the application name "Sparkify".
from pyspark import SparkContext, SparkConf

spark = (
    SparkSession.builder
    .appName("Sparkify")
    .getOrCreate()
)

In [None]:
# Read and extract song data

# Glob matching every song JSON file three directory levels below song_data/
song_data = os.path.join(input_data, 'song_data/*/*/*/*.json')

# Explicit read schema, built from (column name, Spark type) pairs
songdata_schema = R([
    Fld(field_name, field_type)
    for field_name, field_type in (
        ("artist_id", Str()),
        ("artist_latitude", Dbl()),
        ("artist_location", Str()),
        ("artist_longitude", Dbl()),
        ("artist_name", Str()),
        ("duration", Dbl()),
        ("num_songs", Int()),
        ("song_id", Str()),
        ("title", Str()),
        ("year", Int()),
    )
])

# multiLine=True: each song file is parsed as a single JSON record.
# NOTE(review): 'corrupt_record' is not a field of songdata_schema. Per the
# Spark JSON source docs, the corrupt-record column is only kept when it exists
# in a user-defined schema, so malformed files are not captured there.
song_read_options = {
    "multiLine": True,
    "mode": 'PERMISSIVE',
    "columnNameOfCorruptRecord": 'corrupt_record',
}
df = spark.read.json(song_data, schema=songdata_schema, **song_read_options)

df.printSchema()
df.show(5)

# Songs dimension: one row per distinct combination of these columns
song_cols = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = df.select(*song_cols).dropDuplicates()

songs_table.printSchema()
songs_table.show(5)

In [None]:
# Persist the songs table as parquet, partitioned by year and then artist_id,
# replacing any previous output.
songs_table.write.parquet(
    os.path.join(output_data, 'songs'),
    mode="overwrite",
    partitionBy=["year", "artist_id"],
)

In [None]:
# Read and extract artist data from the song DataFrame

# Rename the song-file columns to the artists-table column names
artist_cols = [
    "artist_id",
    "artist_name as name",
    "artist_location as location",
    "artist_latitude as latitude",
    "artist_longitude as longitude",
]

# Deduplicate on artist_id only. dropDuplicates() with no arguments compares
# every column, so an artist whose song rows carry different location/latitude/
# longitude values would appear several times and artist_id would not be unique.
# NOTE(review): which of an artist's rows survives is arbitrary.
artists_table = df.selectExpr(artist_cols).dropDuplicates(["artist_id"])

artists_table.printSchema()
artists_table.show(5)
artists_table.count()

In [None]:
# Persist the artists table as unpartitioned parquet, replacing any previous output
artists_table.write.parquet(os.path.join(output_data, 'artists'), mode="overwrite")

In [None]:
# Register the song DataFrame as a temporary SQL view so it can still be queried
# with spark.sql after `df` is rebound to the log data.
song_view_name = 'song_df_table'
df.createOrReplaceTempView(song_view_name)

In [None]:
# Read and extract user data from the log files
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# get filepath to log data files
log_data = os.path.join(input_data, 'log_data/*.json')

# Read the log data files in the default line-delimited mode.
# multiLine=True would make Spark parse ONE JSON record per file, so in a
# newline-delimited file every event after the first would be silently dropped.
# NOTE(review): assumes each log file holds one JSON event per line; confirm
# against the data.
df = spark.read.json(log_data, mode='PERMISSIVE', columnNameOfCorruptRecord='corrupt_record')

# keep only song-play events
df = df.filter(df.page == 'NextSong')

df.printSchema()
df.show(5)

# extract columns for users table
users_cols = ["userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]

# Keep one row per user, taken from that user's most recent event (highest ts).
# dropDuplicates() with no arguments compares every column, so a user whose
# `level` changed would appear once per level and user_id would not be unique.
# Ties on ts resolve arbitrarily.
latest_event_first = Window.partitionBy("userId").orderBy(col("ts").desc())
users_table = (
    df.withColumn("_event_rank", row_number().over(latest_event_first))
    .filter(col("_event_rank") == 1)
    .selectExpr(users_cols)
)

users_table.printSchema()
users_table.show(5)
users_table.count()

In [None]:
# Persist the users table as unpartitioned parquet, replacing any previous output
users_path = os.path.join(output_data, 'users')
users_table.write.parquet(users_path, mode="overwrite")

In [None]:
# Read and extract time data from the log events

# Derive start_time as a real TimestampType column from `ts`, which is treated
# as epoch milliseconds (divided by 1000). Casting a number to timestamp reads
# it as seconds since the epoch; the fractional part keeps the milliseconds.
# A native expression is used instead of a Python UDF for three reasons:
# no per-row Python round trip, a null `ts` yields null rather than a task
# failure, and start_time is stored as a timestamp rather than a string.
# Values are rendered in Spark's session time zone.
df = df.withColumn("start_time", (col("ts") / 1000).cast(TimestampType()))

df.printSchema()
df.show(5)

# extract columns to create time table (start_time is already a timestamp)
time_table = (
    df.select("start_time").dropDuplicates()
    .withColumn("hour", hour(col("start_time")))
    .withColumn("day", dayofmonth(col("start_time")))
    .withColumn("week", weekofyear(col("start_time")))
    .withColumn("month", month(col("start_time")))
    .withColumn("year", year(col("start_time")))
    .withColumn("weekday", date_format(col("start_time"), 'E'))
)

time_table.printSchema()
time_table.show(22)
time_table.count()

In [None]:
# Persist the time table as parquet, partitioned by year and then month,
# replacing any previous output.
time_table.write.parquet(
    os.path.join(output_data, 'time'),
    mode="overwrite",
    partitionBy=["year", "month"],
)

In [None]:
# Create the songplays fact table by joining log events to song metadata

# Song metadata comes from the 'song_df_table' view of the raw song data rather
# than the songs parquet output, because the join needs artist_name, which the
# songs table does not contain.
song_df = spark.sql('SELECT DISTINCT song_id, title, artist_id, artist_name, duration FROM song_df_table')

song_df.printSchema()
song_df.show(5)

df.printSchema()
df.show(5)

# Match each play to a song on title, artist name and duration. The left outer
# join keeps plays with no matching song (their song_id/artist_id are null).
join_condition = (
    (df.song == song_df.title)
    & (df.artist == song_df.artist_name)
    & (df.length == song_df.duration)
)

# Build the songplays rows, then add year/month columns for partitioning
songplays_table = (
    df.join(song_df, join_condition, how='left_outer')
    .distinct()
    .select(
        monotonically_increasing_id().alias("songplay_id"),
        col("start_time"),
        col("userId").alias("user_id"),
        col("level"),
        col("song_id"),
        col("artist_id"),
        col("sessionId").alias('session_id'),
        col("location"),
        col("userAgent").alias("user_agent"),
    )
    .withColumn("month", month(col("start_time")))
    .withColumn("year", year(col("start_time")))
)

songplays_table.printSchema()
songplays_table.show(92)
songplays_table.count()

In [None]:
# Persist the songplays fact table as parquet, partitioned by year and then
# month, replacing any previous output.
songplays_table.write.parquet(
    os.path.join(output_data, 'songplays'),
    mode="overwrite",
    partitionBy=["year", "month"],
)