In [None]:
# Testing Playground
import os
import glob
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, DoubleType, IntegerType, DateType
from pyspark.sql import functions as F

In [None]:
# Local SparkSession. The hadoop-aws package is presumably for S3 access (the app
# name mentions AWS); every read below uses local "data/..." paths.
spark = (
    SparkSession.builder
    .appName("Test AWS Spark")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
input_data = "data/log_data/*.json"

In [None]:
df_log = spark.read.json(input_data).dropDuplicates()
df_log.createOrReplaceTempView("log_table")
df_log.printSchema()
df_log.show()

In [None]:
df_log.count()

In [None]:
song_data = "data/song_data/*/*/*/*/*.json"
df_song = spark.read.json(song_data)
df_song.createOrReplaceTempView("song_table")
df_song.printSchema()
df_song.head()

In [None]:
df_song.count()

In [None]:
df_log = df_log[df_log['page'] == 'NextSong']

In [None]:
df_log.count()

In [None]:
df_songs = spark.sql("""
    select distinct song_id, title, artist_id, year, duration 
    from song_table
""")

In [None]:
df_songs.count()

In [None]:
df_artists = spark.sql("""
    SELECT DISTINCT artist_id, artist_name, artist_location, artist_latitude, artist_longitude
        FROM ( SELECT artist_id, artist_name, artist_location, artist_latitude, artist_longitude,
                row_number() over (partition by artist_id order by artist_name) as rown
                from song_table
                WHERE artist_id is not NULL)
        WHERE rown=1
""")
df_artists.show()

In [None]:
df_users = spark.sql("""
    SELECT DISTINCT userId, firstName, lastName, gender, level
        FROM ( SELECT userId, firstName, lastName, gender, level,
                row_number() over (partition by userId order by ts desc) as rown
                from log_table
                WHERE page = 'NextSong' AND userId <> '')
        WHERE rown=1
""")
df_users.count()

In [None]:
df_users.write.parquet("output/users", mode='overwrite')
df_users2 = spark.read.parquet("output/users/*.parquet")
df_users2.count()

In [None]:
to_ts = F.udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), TimestampType())
df_log = df_log.withColumn("start_time", to_ts(df_log.ts))

df_log.show()

In [None]:
df_log.createOrReplaceTempView("log_table")

In [None]:
# time dimension: one row per distinct start_time with its calendar parts.
# (The previous version ended the chain with a stray line-continuation backslash,
# which merged it with the next statement and made the cell a SyntaxError.)
df_time = (
    df_log
    .withColumn("hour", F.hour("start_time"))
    .withColumn("day", F.dayofmonth("start_time"))
    .withColumn("week", F.weekofyear("start_time"))
    .withColumn("month", F.month("start_time"))
    .withColumn("year", F.year("start_time"))
    .withColumn("weekday", F.dayofweek("start_time"))
    .select(["start_time", "hour", "day", "week", "month", "year", "weekday"])
    .dropDuplicates()
)
# Separate partition-key copies of year/month. partitionBy stores its columns only in
# directory names, so these copies keep year/month inside the data files. Presumably
# this matters because the read-back globs file paths directly — TODO confirm.
df_time = df_time.withColumn("pk_year", F.year("start_time")).withColumn("pk_month", F.month("start_time"))
df_time.show()

In [None]:
# Write the time table partitioned by the pk_* copies, then read the leaf files back.
(
    df_time.write
    .mode('overwrite')
    .partitionBy("pk_year", "pk_month")
    .parquet("output/time")
)
df_time2 = spark.read.parquet("output/time/*/*/*.parquet")
df_time2.count()
df_time2.printSchema()

In [None]:
# songplays fact: NextSong events matched to songs on (artist name, title), with a
# generated songplay_id (unique but not consecutive).
df_songplay = (
    spark.sql("""
    SELECT DISTINCT se.start_time, se.userId as user_id, se.level, ss.song_id, 
    ss.artist_id, se.sessionId as session_id, se.location, se.userAgent as user_agent
    FROM log_table se
    inner join song_table ss on (ss.artist_name = se.artist AND ss.title = se.song)
    where se.page = 'NextSong'
""")
    .withColumn("songplay_id", F.monotonically_increasing_id())
)
df_songplay.count()


In [None]:
# partitionBy must name the query's output columns: userId/sessionId were aliased to
# user_id/session_id in the songplay SELECT, so the raw names do not exist here.
# Note this creates one directory per (user, session) pair, which can mean many small files.
df_songplay.write.parquet("output/songplay", mode='overwrite', partitionBy=["user_id", "session_id"])


In [None]:
df_songplay_par = spark.read.parquet("output/songplay/*/*/*.parquet")
df_songplay_par.count()

In [None]:
df_songplay_par.printSchema()

In [None]:
df_songplay_par.select("*").where(df_songplay_par.level=='n').show()

In [None]:
df_time.createOrReplaceTempView("time_table")

In [None]:
df_songplay = spark.sql("""
    SELECT DISTINCT se.start_time, se.userId as user_id, se.level, ss.song_id, 
    ss.artist_id, se.sessionId as session_id, se.location, se.userAgent as user_agent,
    t3.year, t3.month
    FROM log_table se
    inner join song_table ss on (ss.artist_name = se.artist AND ss.title = se.song)
    left join time_table t3 on t3.start_time = se.start_time 
    where se.page = 'NextSong'
""").withColumn("songplay_id", F.monotonically_increasing_id())

In [None]:
df_songplay.write.parquet("output/songplay", mode='overwrite', partitionBy=["year", "month"])

In [None]:
df_sp_par = spark.read.parquet("output/songplay/*/*/*.parquet")

In [None]:
df_sp_par.withColumn("month", F.month("start_time")).show()

In [None]:
df_sp_par.printSchema()

In [None]:
df_sp_par.createOrReplaceTempView("songplays")

In [None]:
# songplays fact with year/month derived directly from start_time (no time_table join),
# written partitioned by year/month.
songplays_table = (
    spark.sql("""
            select distinct t1.start_time, t1.userId as user_id, t1.level, t2.song_id, 
            t2.artist_id, t1.sessionId as session_id, t1.location, t1.userAgent as user_agent
            from log_table t1
            inner join song_table t2 on (t2.artist_name = t1.artist AND t2.title = t1.song)
            where t1.page = 'NextSong'
            """)
    .withColumn("songplay_id", F.monotonically_increasing_id())
    .withColumn("year", F.year("start_time"))
    .withColumn("month", F.month("start_time"))
)
songplays_table.show()
(
    songplays_table.write
    .partitionBy("year", "month")
    .mode('overwrite')
    .parquet("output/songplay")
)

In [None]:
# Read the year/month-partitioned songplays output back for the queries below.
# One cell is enough; repeating the identical read only re-assigns the same frame.
p_someplay = spark.read.parquet("output/songplay/*/*/*.parquet")
p_someplay.printSchema()

In [None]:
p_users = spark.read.parquet("output/users/*.parquet")
p_users.printSchema()


In [None]:
p_songs = spark.read.parquet("output/songs/*.parquet")
p_songs.printSchema()

In [None]:
p_artists = spark.read.parquet("output/artists/*.parquet")
p_artists.printSchema()

In [None]:
p_time = spark.read.parquet("output/time/*/*/*.parquet")
p_time.printSchema()

In [None]:
p_users.select(["first_name", "last_name"]).where(p_users.level=='paid').orderBy(p_users.first_name).limit(10).show()

In [None]:
p_someplay.show()

In [None]:
df = p_someplay.join(p_users, on=['user_id'], how='inner').join(p_songs, on=['song_id'], how='inner')
df.select(["first_name", "last_name"]).where(df.title=='Setanta matins').show()