In [83]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import monotonically_increasing_id


In [2]:
spark = SparkSession \
        .builder \
        .getOrCreate()

In [3]:
spark

In [4]:
song_data = "song_data/*/*/*"

In [5]:
pwd

'/home/workspace'

In [6]:
ls

[0m[01;34mdata[0m/   etl.py     README.md   [01;34mspark-warehouse[0m/
dl.cfg  [01;34mlog_data[0m/  [01;34msong_data[0m/  Untitled.ipynb


In [7]:
!unzip data/song-data.zip

Archive:  data/song-data.zip
   creating: song_data/
  inflating: song_data/.DS_Store     
   creating: song_data/A/
  inflating: song_data/A/.DS_Store   
   creating: song_data/A/A/
  inflating: song_data/A/A/.DS_Store  
   creating: song_data/A/A/A/
  inflating: song_data/A/A/A/TRAAAAW128F429D538.json  
  inflating: song_data/A/A/A/TRAAABD128F429CF47.json  
  inflating: song_data/A/A/A/TRAAADZ128F9348C2E.json  
  inflating: song_data/A/A/A/TRAAAEF128F4273421.json  
  inflating: song_data/A/A/A/TRAAAFD128F92F423A.json  
  inflating: song_data/A/A/A/TRAAAMO128F1481E7F.json  
  inflating: song_data/A/A/A/TRAAAMQ128F1460CD3.json  
  inflating: song_data/A/A/A/TRAAAPK128E0786D96.json  
  inflating: song_data/A/A/A/TRAAARJ128F9320760.json  
  inflating: song_data/A/A/A/TRAAAVG12903CFA543.json  
  inflating: song_data/A/A/A/TRAAAVO128F93133D4.json  
   creating: song_data/A/A/B/
  inflating: song_data/A/A/B/TRAABCL128F4286650.json  
  inflating: song_data/A/A/B/TRAABDL12903CAABBA.json  
  i

In [7]:
# read song data file
df=spark.read.json(song_data)

In [8]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
# extract columns to create songs table
songs_table=df['song_id','title','artist_id','year','duration']

In [10]:
song_table=songs_table.dropDuplicates(['song_id'])

In [11]:
# write songs table to parquet files partitioned by year and artist

In [12]:
# extract columns to create artists table
artists_table = df['artist_id','artist_name','artist_location','artist_latitude','artist_longitude']

In [13]:
artists_table=artists_table.dropDuplicates(['artist_id'])

In [14]:
# write artists table to parquet files

## process_log_data

In [91]:
# read log data file
df=spark.read.format('json').load("log_data/*")

In [92]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [93]:
# filter by actions for song plays
#df1=df1.filter(df1['page']=='NextSong')

In [94]:
#get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
#spark.udf.register("get_hour", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))

In [95]:
# extract columns for users table  
users_table=df['userId','firstName','lastName','gender','level']

In [96]:
users_table=users_table.dropDuplicates(['userId'])

In [97]:
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    51|     Maia|   Burke|     F| free|
|     7|   Adelyn|  Jordan|     F| free|
|    15|     Lily|    Koch|     F| paid|
|    54|    Kaleb|    Cook|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+
only showing top 5 rows



In [21]:
# write users table to parquet files

In [98]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x:str(int(int(x)/1000)))
df = df.withColumn('timestamp',get_timestamp(df.ts))

In [99]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x : str(datetime.fromtimestamp(int(x)/1000)))
df = df.withColumn('datetime',get_datetime(df.ts))

In [100]:
df.show(2,truncate=False)

+-----------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+---------------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------+------+----------+--------------------------+
|artist     |auth     |firstName|gender|itemInSession|lastName|length   |level|location                          |method|page    |registration     |sessionId|song           |status|ts           |userAgent                                                                                                                                |userId|timestamp |datetime                  |
+-----------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+---------------+------+-------------+-----------------------------------------------

In [101]:
# extract columns to create time table
time_table = df.select(
    col('datetime').alias('start_time'),
    hour('datetime').alias('hour'),
    dayofmonth('datetime').alias('day'),
    weekofyear('datetime').alias('week'),
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    date_format("datetime","u").alias('weekday')
)

In [102]:
time_table=time_table.dropDuplicates(['start_time'])

In [105]:
time_table.show(5,truncate=False)

+--------------------------+----+---+----+-----+----+-------+
|start_time                |hour|day|week|month|year|weekday|
+--------------------------+----+---+----+-----+----+-------+
|2018-11-02 10:24:12.796000|10  |2  |44  |11   |2018|5      |
|2018-11-04 06:29:41.796000|6   |4  |44  |11   |2018|7      |
|2018-11-04 09:28:01.796000|9   |4  |44  |11   |2018|7      |
|2018-11-05 03:04:18.796000|3   |5  |45  |11   |2018|1      |
|2018-11-05 10:20:49.796000|10  |5  |45  |11   |2018|1      |
+--------------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [None]:
# write time table to parquet files partitioned by year and month
#time_table

In [106]:
# read in song data to use for songplays table
song_df =spark.read.json(song_data)

In [107]:
#song_data
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [108]:
#log_data
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)



In [127]:
# extract columns from joined song and log datasets to create songplays table 
df1=df.join(song_df,song_df.artist_name==df.artist) 



   # songplays_table = df.join(song_df, song_df.artist_name == df.artist)
    #songplays_table = songplays_table.withColumn("songplay_id",F.monotonically_increasing_id())
    #songplays_table = songplays_table[['songplay_id', 'start_time', 'userId', 'level', 'song_id', 
                                      #'artist_id', 'sessionId', 'location', 'userAgent','month','year']]

In [128]:
df1=df1.withColumn("songplay_id",monotonically_increasing_id())

In [130]:
songplays_table = df1.select(
    col('songplay_id').alias('songplay_id'),
    col('ts').alias('start_time'),
    col('userID').alias('user_id'),
    col('level').alias('level'),
    col('song_id').alias('song_id'),
    col('artist_id').alias('artist_id'),
    col('sessionId').alias('sessionId'),
    col('location').alias('location'),
    col('userAgent').alias('userAgent'),
    col('year').alias('year'),
    month('datetime').alias('month')
)

In [133]:
songplays_table.show(2)

+-----------+-------------+-------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+
|songplay_id|   start_time|user_id|level|           song_id|         artist_id|sessionId|            location|           userAgent|year|month|
+-----------+-------------+-------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+
|          0|1542313967796|     44| paid|SOBONFF12A6D4F84D8|ARIK43K1187B9AE54C|      619|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|1986|   11|
|          1|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|   0|   11|
+-----------+-------------+-------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+
only showing top 2 rows

