In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import udf,to_timestamp,hour,dayofmonth,weekofyear,month,year,dayofweek,date_format,row_number
from pyspark.sql import Window
import datetime

In [3]:
# Read the local dl.cfg file and copy the AWS key pair from its [AWS] section
# into the process environment (presumably consumed by the hadoop-aws
# connector configured for the Spark session -- confirm).
config = configparser.ConfigParser()
config.read('dl.cfg')

aws_settings = config['AWS']
for env_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[env_name] = aws_settings[env_name]

In [4]:
    # Create (or reuse) the Spark session, requesting the hadoop-aws package
    # at the version pinned in the package coordinate below.
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
        .getOrCreate()
    )

In [5]:
import zipfile

# Unpack both raw archives. The context managers close each archive handle
# as soon as extraction finishes instead of leaving it open for the life of
# the kernel.
with zipfile.ZipFile("data/song-data.zip", 'r') as zf1:
    zf1.extractall("data/song-data/")
with zipfile.ZipFile("data/log-data.zip", 'r') as zf2:
    zf2.extractall("data/log-data/")

# Load the raw JSON records into DataFrames.
# NOTE(review): the archives above are extracted into data/song-data/ and
# data/log-data/, but the reads below use data/song-data-json/ and
# data/log-data-json/. Confirm which directories are the intended source.
song_df = spark.read.json("data/song-data-json/song_data/*/*/*/*")
log_df = spark.read.json("data/log-data-json/")

In [6]:
# Columns projected from the song data to form the songs table.
filcols = [
    "song_id",
    "title",
    "artist_id",
    "year",
    "duration",
]

In [7]:
# Project the song data down to the songs-table columns.
songs_table = song_df.select(*filcols)

In [None]:
# Register the songs DataFrame as the temporary view "songs" so it can be
# queried through spark.sql.
songs_table.createOrReplaceTempView("songs")

In [None]:
# Persist the songs table as parquet, partitioned by year and artist_id.
# The write returns None, so its result is deliberately not assigned back to
# songs_table: the name keeps pointing at the DataFrame and the cell can be
# re-run. The writer's default save mode raises if the target path already
# exists.
songs_table.write.partitionBy("year", "artist_id").parquet("spark-warehouse/songs/")

In [8]:
# Columns projected from the song data to form the artists table.
filartistcols = [
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
]

In [9]:
# Project the artist columns and drop rows that are identical across all of
# the selected columns.
artist_rows = song_df.select(*filartistcols)
artists_table = artist_rows.dropDuplicates()

In [10]:
# Register the artists DataFrame as the temporary view 'artists' so it can be
# queried through spark.sql.
artists_table.createOrReplaceTempView('artists')

In [None]:
# Persist the artists table as parquet. The write returns None, so its result
# is deliberately not assigned back to artists_table: the name keeps pointing
# at the DataFrame and the cell can be re-run. The writer's default save mode
# raises if the target path already exists.
artists_table.write.parquet("spark-warehouse/artists/")

In [11]:
# Keep only the log rows whose page is 'NextSong'.
is_next_song = log_df['page'] == 'NextSong'
log_df = log_df.filter(is_next_song)

In [12]:
# Columns projected from the log data to form the users table.
filusercols = [
    "userId",
    "firstName",
    "lastName",
    "gender",
    "level",
]

In [13]:
# Project the user columns and drop rows that are identical across all of the
# selected columns.
user_rows = log_df.select(*filusercols)
users_table = user_rows.dropDuplicates()

In [None]:
# Register the users DataFrame as the temporary view 'users' so it can be
# queried through spark.sql.
users_table.createOrReplaceTempView('users')

In [None]:
# Persist the users table as parquet. The write returns None, so its result
# is deliberately not assigned back to users_table: the name keeps pointing at
# the DataFrame and the cell can be re-run. The writer's default save mode
# raises if the target path already exists.
users_table.write.parquet("spark-warehouse/users/")

In [14]:
def _epoch_ms_to_datetime_string(epoch_ms):
    """Format an epoch-milliseconds value as a 'YYYY-MM-DD HH:MM:SS' string.

    The conversion uses datetime.fromtimestamp without a tz argument, so the
    result is expressed in the local time zone of the Python process that
    evaluates it.
    """
    epoch_seconds = epoch_ms / 1000.0
    return datetime.datetime.fromtimestamp(epoch_seconds).strftime('%Y-%m-%d %H:%M:%S')


# Spark UDF wrapper; no return type is given, so udf() uses its default.
get_timestamp = udf(_epoch_ms_to_datetime_string)

In [15]:
# Build the event-timestamp expression once from the ts column and reuse it
# for start_time and every calendar field derived from it.
event_time = to_timestamp(get_timestamp(log_df.ts))

log_df = (
    log_df
    .withColumn('start_time', event_time)
    .withColumn('hour', hour(event_time))
    .withColumn('day', dayofmonth(event_time))
    .withColumn('week', weekofyear(event_time))
    .withColumn('month', month(event_time))
    .withColumn('year', year(event_time))
    .withColumn('weekday', dayofweek(event_time))
)

In [16]:
# Project the timestamp and its calendar breakdown, then drop rows that are
# identical across all of the selected columns.
time_table = log_df.select(
    'start_time',
    'hour',
    'day',
    'week',
    'month',
    'year',
    'weekday',
).dropDuplicates()

In [None]:
# Register the time DataFrame as the temporary view 'time' so it can be
# queried through spark.sql.
time_table.createOrReplaceTempView('time')

In [None]:
# Persist the time table as parquet, partitioned by year and month. The write
# returns None, so its result is deliberately not assigned back to time_table:
# the name keeps pointing at the DataFrame and the cell can be re-run. The
# writer's default save mode raises if the target path already exists.
time_table.write.partitionBy("year", "month").parquet("spark-warehouse/time/")

In [17]:
# Match each log event to a song record on title, artist name and duration,
# keeping only 'NextSong' page events. The inner join drops log rows that
# have no matching song.
songplay_match = (
    (log_df.song == song_df.title)
    & (log_df.artist == song_df.artist_name)
    & (log_df.length == song_df.duration)
    & (log_df.page == 'NextSong')
)
songplay_df = log_df.join(song_df, on=songplay_match, how="inner")

In [18]:
# Build the play-timestamp expression once from the ts column and reuse it.
play_time = to_timestamp(get_timestamp(songplay_df.ts))

# Add the constant Auto_Increment column, the songplay-named aliases
# (user_id, session_id) and the month/year fields taken from the play
# timestamp. Columns assigned from a same-named source column keep their
# existing values.
songplay_df = (
    songplay_df
    .withColumn('Auto_Increment', lit(0))
    .withColumn('start_time', play_time)
    .withColumn('user_id', songplay_df.userId)
    .withColumn('level', songplay_df.level)
    .withColumn('song_id', songplay_df.song_id)
    .withColumn('artist_id', songplay_df.artist_id)
    .withColumn('session_id', songplay_df.sessionId)
    .withColumn('location', songplay_df.location)
    .withColumn('userAgent', songplay_df.userAgent)
    .withColumn('songplays_month', month(play_time))
    .withColumn('songplays_year', year(play_time))
)

In [19]:
# Assign the songplay_id surrogate key with row_number().
# The window is partitioned by Auto_Increment, the constant column added in
# the previous cell, so all rows fall into a single partition and the ids run
# 1..N in user_id order. Rows that share a user_id have no further
# tie-breaker, so their relative numbering is not guaranteed to be stable
# between runs.
#
# start_time, user_id, level, song_id, artist_id, session_id, location,
# userAgent, songplays_month and songplays_year were already derived in the
# previous cell. They are not re-derived here, because doing so only
# overwrote them with identical values and re-ran the timestamp UDF.
songplay_id_window = Window.partitionBy(songplay_df.Auto_Increment).orderBy(songplay_df.user_id)
songplay_df = songplay_df.withColumn('songplay_id', row_number().over(songplay_id_window))

In [20]:
# Project the songplays-table columns, then drop rows that are identical
# across all of the selected columns.
songplay_columns = [
    'songplay_id',
    'start_time',
    'user_id',
    'level',
    'song_id',
    'artist_id',
    'session_id',
    'location',
    'userAgent',
    'songplays_month',
    'songplays_year',
]
songplays_table = songplay_df.select(*songplay_columns).dropDuplicates()

In [22]:
# Register the songplays DataFrame as the temporary view 'songplays' so it
# can be queried through spark.sql.
songplays_table.createOrReplaceTempView('songplays')

In [23]:
# Sanity check: query the 'songplays' temporary view and print the result as
# a text table.
spark.sql('select * from songplays').show()

+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+---------------+--------------+
|songplay_id|         start_time|user_id|level|           song_id|         artist_id|session_id|            location|           userAgent|songplays_month|songplays_year|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+---------------+--------------+
|          1|2018-11-21 21:56:47|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|             11|          2018|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+---------------+--------------+



In [24]:
# Persist the songplays table as parquet, partitioned by songplays_year and
# songplays_month. The write returns None, so its result is deliberately not
# assigned back to songplays_table: the name keeps pointing at the DataFrame
# and the cell can be re-run. The writer's default save mode raises if the
# target path already exists.
songplays_table.write.partitionBy("songplays_year", "songplays_month").parquet("spark-warehouse/songplays/")