In [None]:
import configparser
import os
from datetime import datetime, timezone

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format

In [None]:
# Build the one SparkSession used by the whole notebook.
# spark.jars.packages is a launch-time setting: it only takes effect when the
# session is first created, so the hadoop-aws package must be requested here.
# A later builder calling getOrCreate() just gets this existing session back.
spark = (
    SparkSession.builder
    .appName("Data wrangling with Spark SQL")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Root directory for every parquet table written below (trailing slash is
# relied on: table names are appended with plain string concatenation).
output_data = "/home/workspace/"

In [None]:
# Load AWS credentials from dl.cfg and export them as environment variables.
config = configparser.ConfigParser()
# ConfigParser.read() silently ignores files it cannot open and returns the list
# of files it did parse; without this check a missing dl.cfg only surfaces later
# as an unhelpful KeyError on config["AWS"].
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read 'dl.cfg' (expected an [AWS] section with credentials)")

os.environ['AWS_ACCESS_KEY_ID'] = config["AWS"]['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config["AWS"]['AWS_SECRET_ACCESS_KEY']

In [None]:
# NOTE(review): if a SparkSession already exists in this kernel, getOrCreate()
# returns it, and a launch-time option like spark.jars.packages is not applied
# to it. The package must be requested when the session is first built.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Raw song metadata (JSON files) from the local workspace copy of the dataset.
song_data = "/home/workspace/data/song_data"
df = spark.read.format("json").load(song_data)

In [None]:
# Inspect the inferred schema of the song JSON.
df.printSchema()

In [None]:
# Expose the song data to Spark SQL under the name v_song_data.
df.createOrReplaceTempView("v_song_data")

In [None]:
# Songs dimension: one row per distinct (song_id, title, artist_id, year, duration).
songs_sql = 'select distinct song_id, title, artist_id, year, duration from v_song_data'
songs_table = spark.sql(songs_sql)

In [None]:
# Preview only: limit() before toPandas() so the whole table is not collected
# onto the driver just to display a few rows.
songs_table.limit(10).toPandas()

In [None]:
# Persist the songs dimension as (unpartitioned) parquet, replacing any previous run.
songs_table.write.parquet(output_data + 'songs_table/', mode='overwrite')

In [None]:
# Write the songs table again, partitioned by year then artist_id.
# NOTE(review): this is a second copy of the same data written to 'song/'
# in addition to 'songs_table/'; confirm which location consumers expect.
songs_table.write.parquet(output_data + 'song/', mode='overwrite', partitionBy=["year", "artist_id"])


In [None]:
# Artists dimension: distinct artist attributes taken from the song data.
artists_sql = 'select distinct artist_id, artist_name, artist_location, artist_latitude, artist_longitude from v_song_data'
artists_table = spark.sql(artists_sql)

In [None]:
# Preview only: limit() before toPandas() avoids collecting the whole table.
artists_table.limit(10).toPandas()

In [None]:
# Persist the artists dimension as parquet, replacing any previous run.
artists_table.write.parquet(output_data + 'artists_table/', mode='overwrite')

In [None]:
# Raw event logs (JSON files) from the local workspace copy of the dataset.
log_data = "/home/workspace/data/log_data"
df2 = spark.read.format("json").load(log_data)

In [None]:
# Keep only song-play events.
df2 = df2.where(col('page') == 'NextSong')

In [None]:
# Expose the filtered log events to Spark SQL under the name v_log_data.
df2.createOrReplaceTempView("v_log_data")


In [None]:
# Inspect the inferred schema of the log JSON.
df2.printSchema()

In [None]:
# Show a single sample row; limit() first so only one row reaches the driver
# (toPandas().head(1) would collect every row and then discard all but one).
df2.limit(1).toPandas()

In [None]:
def _ms_to_utc_string(ms):
    """Format an epoch-milliseconds value as a UTC 'YYYY-MM-DD HH:MM:SS' string.

    Returns None for a null input instead of raising, so a single missing
    ``ts`` does not fail the whole Spark task.
    """
    if ms is None:
        return None
    # tz-aware replacement for the deprecated datetime.utcfromtimestamp();
    # produces the same formatted string.
    return datetime.fromtimestamp(int(ms) / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')


# Spark UDF wrapper (no return type given, so the column is a string).
get_timestamp = udf(_ms_to_utc_string)

In [None]:
# Derive a human-readable start_time string from the millisecond ts column.
df2 = df2.withColumn("start_time", get_timestamp(col("ts")))

In [None]:
# Confirm the new start_time column is present.
df2.printSchema()

In [None]:
# Show a single sample row with start_time; limit() first avoids a full collect.
df2.limit(1).toPandas()

In [None]:
 time_table = df2.withColumn("hour",F.hour("start_time"))\
                    .withColumn("day",F.dayofmonth("start_time"))\
                    .withColumn("week",F.weekofyear("start_time"))\
                    .withColumn("month",F.month("start_time"))\
                    .withColumn("year",F.year("start_time"))\
                    .withColumn("weekday",F.dayofweek("start_time"))\
                    .select("ts","start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()

In [None]:
# Print the first rows of the time dimension (Spark's default: 20 rows, truncated).
time_table.show(n=20, truncate=True)

In [None]:
# Persist the time dimension partitioned by year then month, replacing any previous run.
time_table.write.parquet(output_data + 'time_table/', mode="overwrite", partitionBy=["year", "month"])

In [None]:
# read in song data to use for songplays table
# read in song data to use for songplays table
# NOTE(review): song_df is not referenced by the songplays query below, which
# joins against the v_song_data view instead.
song_df = spark.read.format("json").load(song_data)

In [None]:
# Register the song data under a second view name.
# NOTE(review): no query in this notebook reads "stage_songs"; songplays uses v_song_data.
df.createOrReplaceTempView("stage_songs")

In [None]:
# Quick sanity check of the userId values in the filtered log view.
# spark.sql() alone is lazy and would only display the DataFrame repr, so
# materialize a small sample to actually run the query.
spark.sql("SELECT userId FROM v_log_data").limit(5).toPandas()

In [None]:
# Songplays fact table: log events matched to songs on title, artist name and duration.
# DISTINCT must run *before* ids are assigned. monotonically_increasing_id() gives
# every row a unique value, so "SELECT DISTINCT <id>, ..." cannot remove any
# duplicates. Deduplicate in a subquery, then number the surviving rows.
# Output columns and their order are unchanged:
# songplay_id, start_time, userId, song_id, artist_id, sessionId, location, userAgent.
songplays_table = spark.sql("""
    SELECT monotonically_increasing_id() AS songplay_id,
           sp.*
    FROM (
        SELECT DISTINCT
               to_timestamp(log.ts / 1000) AS start_time,
               log.userId,
               song.song_id,
               song.artist_id,
               log.sessionId,
               log.location,
               log.userAgent
        FROM v_log_data log
        JOIN v_song_data song
          ON  log.song   = song.title
          AND log.artist = song.artist_name
          AND log.length = song.duration
    ) sp
""")



In [None]:
# Preview ten rows; limit() first so only those rows are collected to the driver.
songplays_table.limit(10).toPandas()

In [None]:
# Add the partition columns (month, then year) derived from start_time.
songplays_table = (
    songplays_table
    .withColumn("month", F.month(col("start_time")))
    .withColumn("year", F.year(col("start_time")))
)

In [None]:
# Persist the fact table partitioned by year then month. mode='overwrite' matches
# the other writes in this notebook; the default ('errorifexists') makes a re-run
# fail once the output directory exists.
songplays_table.write.parquet(output_data + 'songplays_table/', mode='overwrite', partitionBy=["year", "month"])
