In [None]:
import boto3
import configparser
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

from pyspark.sql.window import Window


In [None]:
# Parse the local credentials file. ConfigParser.read() silently skips files it
# cannot open, so a missing 'dl.cfg' only surfaces later as a KeyError when
# config['KEYS'] is accessed.
config = configparser.ConfigParser()
config.read(filenames='dl.cfg')

In [None]:

# Export the AWS credentials from the [KEYS] section of the parsed config into
# the process environment.
os.environ.update({
    'AWS_ACCESS_KEY_ID': config['KEYS']['AWS_ACCESS_KEY_ID'],
    'AWS_SECRET_ACCESS_KEY': config['KEYS']['AWS_SECRET_ACCESS_KEY'],
})


In [None]:
def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws package.

    The session is obtained through ``getOrCreate()``, so an already running
    session is reused instead of starting a second one.

    Returns:
        The SparkSession produced by the builder.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


spark = create_spark_session()

In [None]:
# Root location that every dataset path in this notebook is built from.
# Local alternative:
# input_data = 'data/'
input_data = 's3a://udacity-dend/'

In [None]:
# Glob for the song files; restricted to the A/A/A subtree of song_data.
song_data = input_data + 'song_data/A/A/A/*'

In [None]:
# Load the song JSON files into a DataFrame (schema is inferred by Spark).
song_df = spark.read.json(path=song_data)


In [None]:
# Only the last expression of a cell is echoed, so a bare count() placed before
# show() would run a full scan and then throw the number away. Print it so the
# row count is actually visible next to the sample rows.
print(song_df.count())
song_df.show(3, False)

In [None]:
# Songs dimension: one row per distinct combination of the song attributes.
songs_table = (
    song_df
    .select("song_id", "title", "artist_id", "year", "duration")
    .distinct()
)
songs_table.show(n=5, truncate=False)

In [None]:
# Artists dimension: one row per distinct combination of the artist attributes.
artists_table = (
    song_df
    .select(
        "artist_id",
        "artist_name",
        "artist_location",
        "artist_latitude",
        "artist_longitude",
    )
    .distinct()
)
artists_table.show(n=5, truncate=False)

In [None]:
# Glob for the event-log JSON files, two directory levels below log_data/.
log_data = input_data + "log_data/*/*/*.json"

In [None]:
# Load the event logs and peek at the first two rows.
log_df = spark.read.json(path=log_data)
log_df.show(n=2)

In [None]:
# Number of log events as loaded, i.e. before the page="NextSong" filter is applied.
log_df.count()

In [None]:
# Keep only the events whose page is "NextSong"; everything downstream is
# built from these rows.
log_df = log_df.filter('page="NextSong"')
log_df.show(n=2)

In [None]:
# Users dimension: distinct combinations of the user attributes in the logs.
users_table = (
    log_df
    .select("userId", "firstName", "lastName", "gender", "level")
    .distinct()
)
users_table.show(n=5, truncate=False)

In [None]:
# ts is divided by 1000 to get seconds before being cast to a timestamp.
# Do NOT route it through cast('float'): a 32-bit float holds only ~7
# significant digits, so a 13-digit millisecond epoch gets rounded by up to
# about two minutes. Dividing the long column directly yields a double, which
# keeps full precision and matches the ts/1000 expression the songplays query
# uses for start_time.
log_df = log_df.withColumn('timestamp', (F.col('ts') / 1000).cast('timestamp'))

In [None]:
# Sanity-check the derived timestamp column (untruncated).
log_df.select("timestamp").show(n=3, truncate=False)

In [None]:
# Time dimension: break each play timestamp into calendar parts.
# distinct() keeps one row per unique row of these columns, in line with the
# songs, artists and users tables; without it the table would carry one row
# per log event.
time_table = (
    log_df
    .select(
        F.col('timestamp').alias('start_time'),
        F.hour('timestamp').alias('hour'),
        F.dayofmonth('timestamp').alias('day'),
        F.weekofyear('timestamp').alias('week'),
        F.month('timestamp').alias('month'),
        F.year('timestamp').alias('year'),
        # 'E' formats the day of week as abbreviated text.
        F.date_format(F.col('timestamp'), 'E').alias('weekday'),
    )
    .distinct()
)

In [None]:
# Preview the time dimension without truncating values.
time_table.show(n=3, truncate=False)

In [None]:
# Reload the song data for the songplays join.
# NOTE(review): song_df was already read from the same A/A/A subtree earlier in
# the notebook (glob '*' there, '*.json' here); this second read looks
# redundant -- confirm before removing.
song_df = spark.read.json(path=input_data + 'song_data/A/A/A/*.json')

In [None]:
# Expose the filtered log events to Spark SQL under the name log_data_table.
log_df.createOrReplaceTempView(name="log_data_table")

In [None]:
# Expose the song data to Spark SQL under the name song_data_table.
song_df.createOrReplaceTempView(name="song_data_table")

In [None]:
# Songplays fact table: log events joined to songs on artist name and song
# title. start_time, month and year are all derived from ts/1000.
# monotonically_increasing_id() gives unique ids, but they are not guaranteed
# to be consecutive.
# NOTE(review): an earlier DataFrame-API draft of this join also required
# log_df.length == song_df.duration; the SQL below does not -- confirm that
# dropping the duration match is intended.
songplays_query = """
                                SELECT monotonically_increasing_id() as songplay_id,
                                to_timestamp(lt.ts/1000) as start_time,
                                month(to_timestamp(lt.ts/1000)) as month,
                                year(to_timestamp(lt.ts/1000)) as year,
                                lt.userId as user_id,
                                lt.level as level,
                                st.song_id as song_id,
                                st.artist_id as artist_id,
                                lt.sessionId as session_id,
                                lt.location as location,
                                lt.userAgent as user_agent
                                FROM log_data_table lt
                                JOIN song_data_table st on lt.artist = st.artist_name and lt.song = st.title
                            """
songplays_table = spark.sql(songplays_query)

In [None]:
#song_log_table.show(10)

In [None]:
#songplays_table = song_log_table.distinct() \
#                     .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent" ) \
#                     .withColumn("songplay_id", F.row_number().over( Window.partitionBy("timestamp").orderBy("timestamp"))) \
#                     .withColumnRenamed("userId","user_id")        \
#                     .withColumnRenamed("timestamp","start_time")  \
#                     .withColumnRenamed("sessionId","session_id")  \
#                     .withColumnRenamed("userAgent", "user_agent") \

In [None]:
# Preview the first ten songplay rows.
songplays_table.show(n=10)