In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, monotonically_increasing_id
from pyspark.sql.types import TimestampType, DateType

## Read config file

In [None]:
# Load the pipeline settings from dl.cfg, resolved against the current working directory.
config = configparser.ConfigParser()

# A context manager closes the file handle as soon as parsing finishes;
# a bare open() would leave it open until garbage collection.
with open('dl.cfg') as config_file:
    config.read_file(config_file)

# Export the AWS credentials through the process environment
# (presumably picked up by the S3 connector; they are never printed here).
os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

# Input and output locations come from the [TEST] section of the config file.
SONG_DATA_SET = config['TEST']['SONG_DATA_SET']
LOG_DATA_SET = config['TEST']['LOG_DATA_SET']
OUTPUT_DATA = config['TEST']['OUTPUT_DATA']

# Echo the resolved locations so the reader can see which data set this run uses.
print(SONG_DATA_SET)
print(LOG_DATA_SET)
print(OUTPUT_DATA)


## Create spark session

In [None]:
# Build (or reuse) the Spark session, requesting the hadoop-aws package via spark.jars.packages.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

## Read song data, create songs table and write to parquet files

In [None]:
# Load the song JSON files and register them as the "df_songs_table" temp view for Spark SQL.
df_songdata = spark.read.json(path=SONG_DATA_SET)
df_songdata.createOrReplaceTempView("df_songs_table")

#df_songdata.printSchema()
#df_songdata.show(5, truncate=False)

In [None]:
# Songs dimension: one row per distinct combination of the selected song attributes.
songs_table = df_songdata.select(
    ["song_id", "title", "artist_id", "year", "duration"]
).dropDuplicates()

#songs_table.printSchema()

In [None]:
# Persist the songs table as parquet, partitioned by year and artist_id; existing output is replaced.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy(["year", "artist_id"])
    .parquet(OUTPUT_DATA + "songs/")
)

## Create artists table and write to parquet files

In [None]:
# Artists dimension: distinct artist attributes taken from the song data.
artists_table = df_songdata.select(
    ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]
).dropDuplicates()

#artists_table.printSchema()

In [None]:
# Persist the artists table as parquet; existing output is replaced.
artists_table.write.mode("overwrite").parquet(OUTPUT_DATA + "artists/")

## Read log data

In [None]:
# Load the raw event-log JSON files.
df_logdata = spark.read.json(path=LOG_DATA_SET)

#df_logdata.printSchema()
#df_logdata.show(5, truncate=False)

In [None]:
# Keep only the events whose page is "NextSong".
df_logdata = df_logdata.where(col("page") == "NextSong")

## Create Users table and write to parquet files

In [None]:
# Users dimension: distinct combinations of the user attributes seen in the filtered log.
users_table = df_logdata.select(
    ["userId", "firstName", "lastName", "gender", "level"]
).dropDuplicates()

#users_table.printSchema()
#users_table.count()

In [None]:
# Persist the users table as parquet; existing output is replaced.
users_table.write.mode("overwrite").parquet(OUTPUT_DATA + "users/")

### Add timestamp column

In [None]:
@udf(returnType=TimestampType())
def get_timestamp(ts):
    """Turn `ts` into a naive datetime: the value is divided by 1000 and read as UTC epoch seconds."""
    return datetime.utcfromtimestamp(int(ts) / 1000)


# Attach the converted value to every log row as the "timestamp" column.
df = df_logdata.withColumn("timestamp", get_timestamp(col("ts")))

#df.printSchema()

### Add start_time column

In [None]:
# The earlier definition wrapped a lambda that called to_date(), which this
# notebook never imports, so invoking it would have raised NameError. The name
# is kept (as an alias of the working converter) in case later cells use it.
get_datetime = get_timestamp

# start_time holds the same value as the "timestamp" column added above, so
# copy that column instead of running the Python UDF over every row again.
df = df.withColumn("start_time", col("timestamp"))

#df.printSchema()

In [None]:
# Derive the calendar parts of start_time used by the time and songplays tables.
df = (
    df
    .withColumn("hour", hour(col("start_time")))
    .withColumn("day", dayofmonth(col("start_time")))
    .withColumn("week", weekofyear(col("start_time")))
    .withColumn("month", month(col("start_time")))
    .withColumn("year", year(col("start_time")))
    .withColumn("weekday", dayofweek(col("start_time")))
)

## Create time table and write to parquet files

In [None]:
# Time dimension: one row per distinct start_time together with its derived calendar parts.
time_table = df.select(
    ["start_time", "hour", "day", "week", "month", "year", "weekday"]
).dropDuplicates()

#time_table.printSchema()

In [None]:
# Persist the time table as parquet, partitioned by year and month; existing output is replaced.
(
    time_table.write
    .mode("overwrite")
    .partitionBy(["year", "month"])
    .parquet(OUTPUT_DATA + "time_table/")
)

## Create songplays table and write to parquet files

In [None]:
# Distinct song/artist identifiers, queried from the temp view registered on the song data.
song_df = spark.sql(
    sqlQuery="SELECT DISTINCT song_id, artist_id, artist_name FROM df_songs_table"
)

#song_df.printSchema()

In [None]:
# Match each play to its song on BOTH title and artist name. Joining on the
# artist name alone pairs every play with every song by that artist, which
# assigns wrong song_ids and multiplies rows. song_df has no title column, so
# the lookup is built directly from df_songdata.
# NOTE(review): assumes the log events carry the song title in a `song`
# column (only `artist` is referenced elsewhere in this notebook) - confirm.
song_lookup = df_songdata.select("song_id", "title", "artist_id", "artist_name").distinct()

songplays_table = (
    df.join(
        song_lookup,
        (df.song == song_lookup.title) & (df.artist == song_lookup.artist_name),
        "inner",
    )
    .distinct()
    .select("start_time", "userId", "level", "sessionId", "location", "userAgent",
            "song_id", "artist_id", "month", "year")
    # Generated ids are unique and increasing, but not consecutive.
    .withColumn("songplay_id", monotonically_increasing_id())
)

#songplays_table.printSchema()

In [None]:
# Persist the songplays table as parquet, partitioned by year and month; existing output is replaced.
(
    songplays_table.write
    .mode("overwrite")
    .partitionBy(["year", "month"])
    .parquet(OUTPUT_DATA + "songplays_table/")
)

### Read songplays table without partitioning columns

In [None]:
# Read the songplays files back through a glob that reaches two directory levels
# below the table root (the year=/month= partition folders written earlier).
songplays_files_glob = OUTPUT_DATA + "songplays_table/*/*"
read_df = spark.read.parquet(songplays_files_glob)

In [None]:
# Print the schema of the songplays data that was read back from parquet.
read_df.printSchema()

In [None]:
# Preview the first five rows read back, without truncating long cell values.
read_df.show(n=5, truncate=False)