In [None]:
import glob
import os
import zipfile

import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, concat_ws, row_number

In [None]:
# One SparkSession for the whole notebook; getOrCreate() returns an existing
# session if one is already active in this kernel.
session_builder = SparkSession.builder.appName("Data Frames practice")
spark = session_builder.getOrCreate()

In [None]:
# Unpack the raw song archive. The context manager closes the zip file handle
# even if extraction raises (the bare ZipFile(...) was never closed).
with zipfile.ZipFile('data/song-data.zip') as song_zip:
    song_zip.extractall(path='data/song-data')

In [21]:
# Raw song metadata: JSON files three directory levels below song_data/.
song_json_glob = 'data/song-data/song_data/*/*/*'
df = spark.read.json(song_json_glob)

In [22]:
# Row count of the raw song DataFrame (displayed as the cell output).
df.count()

71

In [None]:
# Inspect the schema Spark inferred from the song JSON (no explicit schema was given).
df.printSchema()

In [23]:
# Songs dimension: distinct rows of the song attributes projected from the raw data.
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
df_song_table = df.select(*song_columns).dropDuplicates()

In [None]:
# Number of distinct rows in the songs table.
df_song_table.count()

In [None]:
# Inspect a songs table written by a previous run (the write itself happens in the
# cells below, so on a fresh run the folder may not exist yet).
# - Read as Parquet: that is the format the table is written in.
# - Keep it under its own name: rebinding `df` here would replace the raw song data
#   that later cells (artist table, songplays join) select from.
song_table_path = 'data/schema/song_table'
df_song_table_saved = (
    spark.read.parquet(song_table_path) if os.path.isdir(song_table_path) else None
)

In [26]:
# Persist the songs table as Parquet, laid out in year=/artist_id= partition folders.
song_table_writer = df_song_table.write.partitionBy('year', 'artist_id').mode('overwrite')
song_table_writer.parquet('data/schema/song_table')

In [25]:
# Same partitioned Parquet write as the previous cell, expressed via the generic
# save() API. The `header` option belongs to CSV and has no effect on Parquet,
# so it is omitted.
df_song_table.write.save("data/schema/song_table", format="parquet",
                         partitionBy=('year', 'artist_id'), mode='overwrite')

In [None]:
# Artists dimension, one row per artist_id. A whole-row dropDuplicates() would
# still keep several rows for the same artist whenever any of the other
# attributes differ between that artist's songs.
artist_columns = ['artist_id', 'artist_name', 'artist_location',
                  'artist_latitude', 'artist_longitude']
df_artist_table = df.select(artist_columns).dropDuplicates(['artist_id'])

In [None]:
# Number of rows in the artists table.
df_artist_table.count()

In [None]:
# Persist the artists table as JSON. mode('overwrite') lets the notebook be
# re-run (the default save mode errors once the folder exists); `header` is a
# CSV-only option, so it is dropped.
df_artist_table.write.mode('overwrite').json("data/schema/artist_table")

In [27]:
# Raw event logs (JSON) from the log-data directory.
log_data_dir = 'data/log-data'
df_log = spark.read.format('json').load(log_data_dir)

In [None]:
# Inspect the schema Spark inferred for the raw log data.
df_log.printSchema()

In [None]:
# Row count of the log data before the NextSong filter below.
df_log.count()

In [None]:
# Preview the first rows of the log data (show() displays 20 rows by default).
df_log.show()

In [None]:
# Keep only events whose page is 'NextSong'.
is_next_song = df_log['page'] == 'NextSong'
df_log = df_log.filter(is_next_song)

In [None]:
# Users dimension: one row per userId, taking the attributes (notably `level`)
# from that user's most recent event. Sorting before dropDuplicates() does not
# determine which duplicate Spark keeps, so rank rows explicitly with a window.
# NOTE(review): keeps the latest event by `ts`; use .asc() if the earliest was intended.
latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())
df_user_table = (
    df_log.where(col('userId') != '')
    .withColumn('_event_rank', row_number().over(latest_event_first))
    .where(col('_event_rank') == 1)
    .select(['userId', 'firstName', 'lastName', 'gender', 'level'])
)

In [None]:
# Number of rows in the users table.
df_user_table.count()

In [None]:
# Display up to 110 users ordered by userId.
df_user_table.sort('userId').show(n=110)

In [None]:
# Persist the users table as JSON; overwrite so the cell can be re-run
# (`header` is a CSV-only option and is dropped).
df_user_table.write.mode('overwrite').json("data/schema/user_table")

In [29]:
from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp, dayofweek, concat
from pyspark.sql.types import StringType

In [None]:
# Schema of the log data after filtering to NextSong events.
df_log.printSchema()

In [None]:
# Time dimension: one row per distinct event timestamp, split into calendar parts.
# NOTE(review): assumes `ts` holds epoch *milliseconds* (as in the Sparkify log
# data). Converting a number to a timestamp interprets it as seconds, so divide
# by 1000 first. TODO confirm against sample `ts` values in df_log.show().
df_time = (
    df_log.select('ts').dropDuplicates()
    .select(to_timestamp(col('ts') / 1000).alias('start_time'))
    .withColumn('hour', hour('start_time'))
    .withColumn('day', dayofmonth('start_time'))
    .withColumn('week', weekofyear('start_time'))
    .withColumn('month', month('start_time'))
    .withColumn('year', year('start_time'))
    .withColumn('weekday', dayofweek('start_time'))
)

In [None]:
# Preview the first two rows of the time table.
df_time.show(n=2)

In [None]:
# Schema of the time table.
df_time.printSchema()

In [None]:
# Same two-row preview without truncating long cell values.
df_time.show(n=2,truncate=False)

In [None]:
# Persist the time table as JSON; overwrite so the cell can be re-run
# (`header` is a CSV-only option and is dropped).
df_time.write.mode('overwrite').json("data/schema/time_table")

In [None]:
# Schema of the song DataFrame `df` as currently bound.
df.printSchema()

In [None]:
# Schema of the log data before the sessionId cast below.
df_log.printSchema()

In [None]:
# Store sessionId as a string; it is concatenated into songplay_id further down.
session_id_str = df_log['sessionId'].cast('string')
df_log = df_log.withColumn('sessionId', session_id_str)

In [None]:
# Slim the song data down to the columns the songplays join needs
# (artist_name/title to match log events; song_id, artist_id, artist_location
# for the output). All selected columns survive the select, so re-running this
# cell is safe.
df = df.select('song_id', 'title', 'artist_id', 'artist_name',
               'artist_location', 'duration')

In [44]:
# Songplays fact table: NextSong log events matched to song metadata.
# - Match on artist name AND song title: joining on the artist alone pairs each
#   play with every song by that artist and inflates the table.
# - songplay_id joins sessionId and itemInSession with a separator so that e.g.
#   (1, 23) and (12, 3) do not both become "123".
# NOTE(review): assumes df_log has a `song` column holding the track title and
# that `ts` is epoch milliseconds (Sparkify log format) — TODO confirm against
# df_log.printSchema() / df_log.show().
start_time = to_timestamp(df_log.ts / 1000)
same_track = (df_log.artist == df.artist_name) & (df_log.song == df.title)
df_song_play = (
    df_log.join(df, same_track, 'inner')
    .select(
        concat_ws('_', df_log.sessionId.cast('string'),
                  df_log.itemInSession.cast('string')).alias('songplay_id'),
        start_time.alias('start_time'),
        year(start_time).alias('year'),
        month(start_time).alias('month'),
        df_log.userId, df_log.level, df.song_id, df.artist_id, df_log.sessionId,
        df.artist_location.alias('location'), df_log.userAgent,
    )
)

In [45]:
# Persist songplays as Parquet, laid out in year=/month= partition folders.
song_play_writer = df_song_play.write.partitionBy('year', 'month').mode('overwrite')
song_play_writer.parquet("data/schema/song_play_table")

In [None]:
# Schema of the songplays table.
df_song_play.printSchema()

In [None]:
# Additional JSON copy of songplays; overwrite so the cell can be re-run
# (`header` is a CSV-only option and is dropped).
df_song_play.write.mode('overwrite').json("data/schema/song_play_table_2")

In [None]:
# Preview songplays (20 rows by default) without truncating long values.
df_song_play.show(truncate=False)

In [None]:
# Load the JSON copy of songplays back, replacing the in-memory frame.
song_play_json_dir = 'data/schema/song_play_table_2'
df_song_play = spark.read.format('json').load(song_play_json_dir)