In [1]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col,monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1562809040696_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
spark

VBox()

<pyspark.sql.session.SparkSession object at 0x7fc36f856e50>

In [3]:
input_data = "s3a://udacity-dend/"
output_data = "s3://swtown-udacity-datalake/"

VBox()

In [10]:
# get filepath to song data file
song_data = os.path.join(input_data, "song_data/*/*/*/*.json")
song_data    

VBox()

's3a://udacity-dend/song_data/*/*/*/*.json'

In [11]:
# read song data file
song_df = spark.read.json(song_data).dropDuplicates() 
song_df.printSchema()

VBox()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

In [12]:
# extract columns to create songs table
songs_table = song_df.select('song_id', 'title', 'artist_id', 'year', 'duration')
songs_table.show(5, truncate=False)

VBox()

+------------------+----------------------------------------------------------+------------------+----+---------+
|song_id           |title                                                     |artist_id         |year|duration |
+------------------+----------------------------------------------------------+------------------+----+---------+
|SOVIYJY12AF72A4B00|The Dead Next Door (Digitally Remastered 99)              |AR4T2IF1187B9ADBB7|1983|233.22077|
|SOVYXYL12AF72A3373|Rebel Yell (1999 Digital Remaster)                        |AR4T2IF1187B9ADBB7|1983|287.92118|
|SOEPTVC12A67ADD0DA|To Zucchabar ["Gladiator" - Music from the Motion Picture]|ARQ846I1187B9A7083|0   |196.04853|
|SOLQYSZ12AB0181F97|Mony Mony (Live)                                          |AR4T2IF1187B9ADBB7|1987|247.53587|
|SOVPFJK12A6701CB16|Barcelona - (Friends until the end)                       |AR3TZ691187FB3DBB1|2000|273.44934|
+------------------+----------------------------------------------------------+---------

In [7]:
# write songs table to parquet files partitioned by year and artist
#songs_table.sort('year','artist_id').write.parquet('s3://swtown-udacity-datalake/songs/')
songs_table.write.partitionBy('year','artist_id').parquet(f'{output_data}songs_table', mode='overwrite')

VBox()

In [8]:
# extract columns to create artists table
artists_table = song_df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude') 
artists_table = artists_table.toDF('artist_id', 'name', 'location', 'lattitude','longitude')
artists_table.printSchema()

VBox()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: double (nullable = true)
 |-- longitude: double (nullable = true)

In [9]:
# write artists table to parquet files
artists_table.write.parquet(f'{output_data}artists_table', mode='overwrite')

VBox()

In [12]:
# get filepath to log data file
log_data = os.path.join(input_data, "log_data/*/*/*.json")

VBox()

In [13]:
# read log data file
log_df = spark.read.json(log_data).dropDuplicates() 
log_df.printSchema()
log_df.show(5, truncate=False)

VBox()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+-------------------------------------+------+--------+-----------------+---------+---------------+------+-------------+---------------------------------------------------------------------------------------------------------

In [16]:
# filter by actions for song plays
log_df = log_df.filter(log_df.page == 'NextSong')
log_df.show(5, truncate=False)

VBox()

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------+------+
|artist     |auth     |firstName|gender|itemInSession|lastName|length   |level|location                            |method|page    |registration     |sessionId|song                                          |status|ts           |userAgent                                                                                                                                |userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-------------+------------------------

In [17]:
# extract columns for users table    
users_table = log_df.select('userId','firstName','lastName','gender','level')  
users_table = users_table.toDF('user_id','first_name','last_name','gender','level' ) 

VBox()

In [18]:
# write users table to parquet files
users_table.write.parquet(f'{output_data}users_table', mode='overwrite')

VBox()

In [20]:
# create timestamp column from original timestamp column
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
log_df = log_df.withColumn("datetime", get_datetime(log_df.ts))

VBox()

In [22]:
# create datetime column from original timestamp column
get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
log_df = log_df.withColumn("timestamp", get_timestamp(log_df.ts))

VBox()

In [31]:
# extract columns to create time table
#start_time, hour, day, week, month, year, weekday

log_df.show(5, truncate=False)
#date_format('capturetime', 'u').alias('dow_number')

VBox()

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------+------+--------------------------+----------+
|artist     |auth     |firstName|gender|itemInSession|lastName|length   |level|location                            |method|page    |registration     |sessionId|song                                          |status|ts           |userAgent                                                                                                                                |userId|datetime                  |timestamp |
+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------

In [39]:
#year, month, dayofmonth, hour, weekofyear, date_format

time_table =  log_df.select(('timestamp')
                            ,hour('datetime').alias('hour')
                            ,dayofmonth('datetime').alias('day')
                            ,weekofyear('datetime').alias('week')
                            ,month('datetime').alias('month')
                            ,year('datetime').alias('year')
                            ,date_format('datetime','u').alias('weekday'))
                    
time_table.show(5, truncate=False)

VBox()

+----------+----+---+----+-----+----+-------+
|timestamp |hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|1542241826|0   |15 |46  |11   |2018|4      |
|1542242481|0   |15 |46  |11   |2018|4      |
|1542242741|0   |15 |46  |11   |2018|4      |
|1542253449|3   |15 |46  |11   |2018|4      |
|1542260935|5   |15 |46  |11   |2018|4      |
+----------+----+---+----+-----+----+-------+
only showing top 5 rows

In [40]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year','month').parquet(f'{output_data}time_table', mode='overwrite')



VBox()

In [None]:
# read in song data to use for songplays table
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
song_df = song_df.select('song_id','artist_id','artist_name')

In [61]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = log_df.join(song_df,log_df.artist == song_df.artist_name)

songplays_table = songplays_table.withColumn('songplay_id', monotonically_increasing_id())
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays_table = songplays_table.select('songplay_id'
                                         ,'ts'
                                         ,'userId'
                                         ,'level'
                                         ,'song_id'
                                         ,'artist_id'
                                         ,'sessionId'
                                         ,'location'
                                         ,'userAgent'
                                        ,month('datetime').alias('month')
                                        ,year('datetime').alias('year')
                                        )
songplays_table.toDF('songplay_id'
                     ,'start_time'
                     ,'user_id'
                     ,'level'
                     ,'song_id'
                     ,'artist_id'
                     ,'session_id'
                     ,'location'
                     ,'user_agent'
                     ,'month'
                     ,'year'
                    )

songplays_table.show(5)

VBox()

+-----------+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|songplay_id|           ts|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|
+-----------+-------------+------+-----+------------------+------------------+---------+--------------------+--------------------+
|          0|1542300931796|    42| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|      404|New York-Newark-J...|"Mozilla/5.0 (Win...|
|          1|1542778219796|    97| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|      797|Lansing-East Lans...|"Mozilla/5.0 (X11...|
|          2|1543424091796|    14| free|SOIGHOD12A8C13B5A1|ARY589G1187B9A9F4E|      929|       Red Bluff, CA|Mozilla/5.0 (Wind...|
|          3|1541384515796|    44| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|      237|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|
|          4|1543584044796|    43| free|SOXZYWX12A6310ED0C|ARC1IHZ1187FB4E920|     

In [67]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy('year', 'month').parquet(f'{output_data}songplays_table', mode='overwrite')

VBox()

'DataFrame' object has no attribute '_get_object_id'
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 644, in partitionBy
    self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/column.py", line 66, in _to_seq
    return sc._jvm.PythonUtils.toSeq(cols)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
    args_command, temp_args = self._build_args(*args)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1212, in _build_args
    (new_args, temp_args) = self._get_args(args)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1199, in _get_args
    temp_arg = converter.convert(arg, self.gateway_client)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_collections.py", line 501, in convert
    java_list.add(element)
  File "/

In [58]:
df = spark.read.parquet('s3://swtown-udacity-datalake/songplays/')
df.printSchema()

VBox()

root
 |-- songplay_id: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userId: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

In [66]:
log_df.select(year('datetime'),month('datetime'))


VBox()

DataFrame[year(datetime): int, month(datetime): int]