# Developing ETL

#### Imports and configs

In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp, to_date


# Load the AWS credentials from the local dl.cfg file and export them as
# environment variables for this process.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open, which would only
# surface later as an opaque KeyError('AWS'). Opening the file explicitly and
# using read_file() fails immediately with FileNotFoundError instead.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# Both keys live in the [AWS] section and are exported under the same names.
for key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[key] = config['AWS'][key]


def create_spark_session():
    """Build (or fetch the existing) SparkSession used by this notebook.

    The session is configured with the ``org.apache.hadoop:hadoop-aws:2.7.0``
    package via ``spark.jars.packages``.

    Returns:
        The SparkSession produced by ``getOrCreate()``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

#### Start Spark session

In [None]:
# Create the session once and keep it in `spark` for every later cell.
spark = create_spark_session()
# Bare expression so the notebook renders the session summary.
spark

#### Define input / output

In [None]:
# Local test paths: where the raw JSON is read from and where the parquet
# tables are written to.
input_data, output_data = "data/", "data/output/"

## Process Song

In [None]:
# Glob that matches every song JSON file three directory levels below song_data/
song_data = f"{input_data}song_data/*/*/*/*.json"

# Load all matching song files into a single DataFrame
df = spark.read.format("json").load(song_data)

# Quick look at the resulting schema and the first few rows
df.printSchema()
df.show(5)

In [None]:
# Print the schema of the song DataFrame again for reference
df.printSchema()

In [None]:
# Songs dimension: one row per distinct combination of the song attributes
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = df.select(*song_columns).distinct()

# Persist as parquet, partitioned by year then artist_id, replacing any previous output
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(f"{output_data}songs.parquet")
)

In [None]:
# Sanity check: read the songs table back from disk and preview it
test = spark.read.parquet(f"{output_data}songs.parquet")
test.show(5)

In [None]:
# Artists dimension: rename the artist_* source columns to their table names
# and keep one row per distinct combination of values.
artist_exprs = [
    "artist_id",
    "artist_name as name",
    "artist_location as location",
    "artist_latitude as lattitude",
    "artist_longitude as longitude",
]
artists_table = df.selectExpr(*artist_exprs).distinct()

# Persist as (unpartitioned) parquet, replacing any previous output
artists_table.write.mode("overwrite").parquet(f"{output_data}artists.parquet")

In [None]:
# Sanity check: read the artists table back from disk and preview it
test = spark.read.parquet(f"{output_data}artists.parquet")
test.show(5)

## Process Logs

Fetch log data

In [None]:
# Root directory of the log files
log_data = f"{input_data}log_data/"

# Read every JSON file below log_data (recursing into sub-directories) and keep
# only the "NextSong" page events, i.e. the actual song plays.
# Note: `df` is rebound here and no longer refers to the song DataFrame.
df = (
    spark.read
    .option("recursiveFileLookup", "true")
    .json(log_data)
    .filter(col("page") == "NextSong")
)

In [None]:
# Inspect the filtered log DataFrame: schema, a sample of rows, and the row count
df.printSchema()
df.show()
# Last expression of the cell, so the notebook displays the count
df.count()

Users Dimension

In [None]:
# Users dimension: rename the camelCase log columns to snake_case and keep one
# row per distinct combination of values.
user_exprs = [
    "userId as user_id",
    "firstName as first_name",
    "lastName as last_name",
    "gender",
    "level",
]
users_table = df.selectExpr(*user_exprs).distinct()

# Persist as (unpartitioned) parquet, replacing any previous output
users_table.write.mode("overwrite").parquet(f"{output_data}users.parquet")

In [None]:
# Sanity check: read the users table back from disk and preview it
test = spark.read.parquet(f"{output_data}users.parquet")
test.show(5)

Time Dimension

In [None]:
# Preview the raw `ts` values of the log events
df.select("ts").show(10)

In [None]:
# NOTE: this rebinds the name `datetime` to the *module*. The imports cell at the
# top of the notebook bound it to the class (`from datetime import datetime`),
# so after this cell runs the class must be reached as `datetime.datetime`.
import datetime
# Try the conversion on one sample value: the value is divided by 1000 before
# being handed to fromtimestamp(), i.e. it is treated as epoch milliseconds.
datetime.datetime.fromtimestamp(1542261224796/1000)

In [None]:
# Derive `timestamp` and `date` columns from the epoch-millisecond `ts` column
# using native Spark column expressions instead of Python UDFs. The previous
# UDFs formatted every row to a string in Python and parsed it back, raised on a
# null `ts`, and only worked when `datetime` had been rebound to the module by
# an earlier cell.
# NOTE(review): the values are rendered in the Spark session time zone, whereas
# the UDFs used the Python process' local time zone - confirm the two match in
# your environment.


def get_timestamp(ts_col):
    """Return a timestamp Column for an epoch-milliseconds column.

    Args:
        ts_col: Column, or column name, holding milliseconds since the epoch.

    Returns:
        A Column of timestamp type truncated to whole seconds (the previous
        string-based conversion also dropped the sub-second part). Null input
        yields null.
    """
    millis = col(ts_col) if isinstance(ts_col, str) else ts_col
    # Casting to long drops the fractional seconds; a long cast to timestamp is
    # interpreted as seconds since the epoch.
    return (millis / 1000).cast("long").cast("timestamp")


def get_datetime(ts_col):
    """Return a date Column for an epoch-milliseconds column.

    Args:
        ts_col: Column, or column name, holding milliseconds since the epoch.

    Returns:
        A Column of date type derived from ``get_timestamp(ts_col)``.
    """
    return to_date(get_timestamp(ts_col))


# create timestamp column from original timestamp column
df = df.withColumn("timestamp", get_timestamp("ts"))

# create date column from original timestamp column
df = df.withColumn("date", get_datetime("ts"))

In [None]:
# Confirm the new `timestamp` and `date` columns appear in the log schema
df.printSchema()

In [None]:
# Register the log DataFrame as the temp view "timetable"
df.createOrReplaceTempView("timetable")

# Time dimension: one row per distinct play timestamp, broken out into its
# calendar parts with Spark SQL functions.
time_exprs = [
    "timestamp AS start_time",
    "HOUR(timestamp) AS hour",
    "DAY(timestamp) AS day",
    "WEEKOFYEAR(timestamp) AS week",
    "MONTH(timestamp) AS month",
    "YEAR(timestamp) AS year",
    "DAYOFWEEK(timestamp) AS weekday",
]
time_table = spark.table("timetable").selectExpr(*time_exprs).distinct()

time_table.show()

In [None]:
# Persist the time table as parquet, partitioned by year then month,
# replacing any previous output
(
    time_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(f"{output_data}time.parquet")
)

In [None]:
# Sanity check: read the time table back from disk and preview it
test = spark.read.parquet(f"{output_data}time.parquet")
test.show(5)

In [None]:
# read in song data to use for songplays table
song_df = spark.read.parquet(output_data + "songs.parquet")

# Only the artist id and name are needed for the lookup. The artists table is
# de-duplicated over all of its columns, so the same (artist_id, name) pair can
# occur more than once (e.g. with different locations). Projecting without
# distinct() would multiply the song rows in the join below and, in turn,
# duplicate songplays.
artist_df = (
    spark.read.parquet(output_data + "artists.parquet")
    .selectExpr("artist_id as ref_artist", "name")
    .distinct()
)

# Attach the artist name to every song; the artist key is aliased to
# `ref_artist` so it does not clash with the songs' own `artist_id` column.
song_df = song_df.join(artist_df, song_df.artist_id == artist_df.ref_artist)


In [None]:
# Schema of the joined song/artist lookup DataFrame
song_df.printSchema()

In [None]:
# Schema of the log DataFrame, for comparison with the song lookup before joining
df.printSchema()

In [None]:
# Only build the fact table when the song lookup has at least one row.
# head(1) fetches at most one row, whereas count() would evaluate the whole
# song/artist join just to compare the result with zero.
if song_df.head(1):
    # Match each play to a song by artist name and song title. The left join
    # keeps plays without a match (their song_id / artist_id are null).
    songplays_table = (
        df.join(song_df, (df.artist == song_df.name) & (df.song == song_df.title), how='left')
        .selectExpr(
            # songplay_id is built from the user id and the event's ts value
            "concat_ws('_', userId, ts) as songplay_id",
            "timestamp as start_time",
            "userId as user_id",
            "level",
            "song_id",
            "artist_id",
            "sessionId as session_id",
            "location",
            "userAgent as user_agent",
        )
    )

    songplays_table.limit(10).show()
    songplays_table.printSchema()

In [None]:
    from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp, to_date

    # Add the partition columns, both derived from start_time
    plays_with_parts = (
        songplays_table
        .withColumn("year", year("start_time"))
        .withColumn("month", month("start_time"))
    )

    # Persist the songplays table as parquet, partitioned by year then month,
    # replacing any previous output
    plays_with_parts.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + "songplays.parquet")