This is just a notebook to explore the data and to practice pyspark with it, in order to get good code to use in etl.py

References:

- https://sparkbyexamples.com/spark/spark-get-day-of-week-number/
- https://mungingdata.com/apache-spark/partitionby/
- https://dzone.com/articles/performance-implications-of-partitioning-in-apache

In [25]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [28]:
import boto3

#s3 = boto3.resource('s3')
#my_bucket = s3.Bucket('udacity-dend-spark')
#for my_bucket_object in my_bucket.objects.all():
#    print(my_bucket_object)

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [27]:
input_data = "s3a://udacity-dend/"
output_data = 's3a://udacity-dend-spark/'

# To test on local data which is faster
input_data = "/home/workspace/data/"
output_data = "/home/workspace/data/"

spark = create_spark_session()

In [5]:
!unzip /home/workspace/data/song-data.zip -d /home/workspace/data/ >/dev/null

In [6]:
!unzip /home/workspace/data/log-data.zip -d /home/workspace/data/log_data >/dev/null

In [7]:
%%time


#song_data = input_data + 'song_data/A/A/A/*.json'
song_data = input_data + 'song_data/*/*/*/*.json'


# read song data file
df = spark.read.json(song_data)

CPU times: user 526 µs, sys: 4 ms, total: 4.53 ms
Wall time: 10.7 s


In [8]:
#!rm -Rf /home/workspace/data/song_data
#!rm -Rf /home/workspace/data/log_data

In [9]:
df.count()

71

In [10]:
df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994


In [11]:
# extract columns to create songs table
songs_table = df.select("song_id", 
                        "title", 
                        "artist_id", 
                        "year", 
                        "duration").dropDuplicates()

# write songs table to parquet files partitioned by year and artist

songs_table.write.parquet(
    output_data + "songs_table.parquet",
    mode="overwrite",
    partitionBy=["year", "artist_id"]
)

In [122]:
# extract columns to create artists table
artists_table = df.select("artist_id",
                          col("artist_name").alias("name"),
                          col("artist_location").alias("location"),
                          col("artist_latitude").alias("latitude"),
                          col("artist_longitude").alias("longitude")).dropDuplicates()


In [13]:
artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARPBNLO1187FB3D52F,Tiny Tim,"New York, NY",40.71455,-74.00712
1,ARXR32B1187FB57099,Gob,,,
2,AROGWRA122988FEE45,Christos Dantis,,,
3,ARBGXIG122988F409D,Steel Rain,California - SF,37.77916,-122.42005
4,AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.442,-41.9952


In [14]:
artists_table.write.parquet(
    output_data + "artists_table.parquet", mode="overwrite"
)

## Log events file

In [17]:
log_data = input_data + 'log_data/2018/11/*.json'
log_data = input_data + 'log_data'

In [18]:
%%time
df_log = spark.read.json(log_data)

CPU times: user 2.25 ms, sys: 0 ns, total: 2.25 ms
Wall time: 792 ms


In [19]:
df_log.count()

8056

In [132]:
df_log = df_log.filter("userId IS NOT NULL and page='NextSong'")

In [133]:
df_log.count()

6820

In [20]:
df_log.createOrReplaceTempView("log_data_table")

In [21]:
spark.sql("""
select * from log_data_table
limit 5
""").toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [22]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [23]:
spark.sql("SELECT cast(ts/1000 as Timestamp) from log_data_table limit 5").show(5, False)

+--------------------------------------------------------------+
|CAST((CAST(ts AS DOUBLE) / CAST(1000 AS DOUBLE)) AS TIMESTAMP)|
+--------------------------------------------------------------+
|2018-11-15 00:30:26.796                                       |
|2018-11-15 00:41:21.796                                       |
|2018-11-15 00:45:41.796                                       |
|2018-11-15 01:57:51.796                                       |
|2018-11-15 03:29:37.796                                       |
+--------------------------------------------------------------+



In [24]:
get_timestamp = udf(lambda x: str(int(x) // 1000))
df_log = df_log.withColumn("timestamp", get_timestamp(col("ts")))

# create datetime column from original timestamp column
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000)))
df_log = df_log.withColumn("datetime", get_datetime(col("ts")))

In [25]:
df_log.select("ts", "timestamp", "datetime").show(2, False)

+-------------+----------+--------------------------+
|ts           |timestamp |datetime                  |
+-------------+----------+--------------------------+
|1542241826796|1542241826|2018-11-15 00:30:26.796000|
|1542242481796|1542242481|2018-11-15 00:41:21.796000|
+-------------+----------+--------------------------+
only showing top 2 rows



In [26]:
# extract columns to create time table
time_table = df_log.select(
     col('timestamp').alias('start_time'),
     hour('datetime').alias('hour'),
     dayofmonth('datetime').alias('day'),
     weekofyear('datetime').alias('week'),
     month('datetime').alias('month'),
     year('datetime').alias('year'),
     date_format('datetime', 'u').cast('int').alias('weekday')
 )

In [27]:
time_table.show(5)

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|1542241826|   0| 15|  46|   11|2018|      4|
|1542242481|   0| 15|  46|   11|2018|      4|
|1542242741|   0| 15|  46|   11|2018|      4|
|1542247071|   1| 15|  46|   11|2018|      4|
|1542252577|   3| 15|  46|   11|2018|      4|
+----------+----+---+----+-----+----+-------+
only showing top 5 rows



In [28]:
time_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [142]:
users_table = df_log.select(col("userId").alias("user_id"), 
                        col("firstName").alias("first_name"), 
                        col("lastName").alias("last_name"), 
                        "gender", 
                        "level").dropDuplicates()


In [143]:
# write users table to parquet files
users_table.write.parquet(
    output_data + "users_table.parquet", mode="overwrite"
) 

In [144]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



### Building the songplays_table

In [114]:
songx_df = spark.read.parquet(output_data + 'songs_table.parquet')

In [115]:
songx_df.show(5)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|   0|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|   0|ARPFHN61187FB575F6|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



In [137]:
songx_df.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)



In [116]:
artistx_table = spark.read.parquet(output_data + 'artists_table.parquet')

In [117]:
artistx_table.show(5)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|40.79195|-73.94512|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632| -0.12714|
+------------------+--------------------+--------------------+--------+---------+
only showing top 5 rows



In [138]:
artistx_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [118]:
songx_df.createOrReplaceTempView("songs_table")

In [119]:
artistx_table.createOrReplaceTempView("artists_table")

In [120]:
time_table.createOrReplaceTempView("time_table")

In [32]:
df_log.createOrReplaceTempView("log_table")

In [33]:
spark.sql("select * from songs_table limit 5").show(5, False)

+------------------+---------------------------------------------------+------------------+----+---------+
|song_id           |title                                              |artist_id         |year|duration |
+------------------+---------------------------------------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|¿Dónde va Chichi?                                  |ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas                               |ARMBR4Y1187B9990EB|0   |241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to Sleeping Giants                    |ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|Pink World                                         |AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife Is Running Around On Me (Taco Hell)|ARDNS031187B9924F0|2005|186.48771|
+------------------+---------------------------------------------------+------------------+----+---------+



In [34]:
spark.sql("select * from artists_table limit 5").show(5, False)

+------------------+---------------+---------------+--------+----------+
|artist_id         |name           |location       |latitude|longitude |
+------------------+---------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|Tiny Tim       |New York, NY   |40.71455|-74.00712 |
|ARXR32B1187FB57099|Gob            |               |null    |null      |
|AROGWRA122988FEE45|Christos Dantis|               |null    |null      |
|ARBGXIG122988F409D|Steel Rain     |California - SF|37.77916|-122.42005|
|AREVWGE1187B9B890A|Bitter End     |Noci (BA)      |-13.442 |-41.9952  |
+------------------+---------------+---------------+--------+----------+



In [73]:
spark.sql("select * from time_table limit 5").show(5, False)

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|1542241826|0   |15 |46  |11   |2018|4      |
|1542242481|0   |15 |46  |11   |2018|4      |
|1542242741|0   |15 |46  |11   |2018|4      |
|1542247071|1   |15 |46  |11   |2018|4      |
|1542252577|3   |15 |46  |11   |2018|4      |
+----------+----+---+----+-----+----+-------+



In [35]:
spark.sql("select * from log_table limit 5").show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+----------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId| timestamp|            datetime|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+----------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|2018-11-15 00:30:...|
|The Prodigy|Logged In|     Ryan

In [47]:
spark.sql("select count(*) as songs from songs_table").show()

+-----+
|songs|
+-----+
|   71|
+-----+



In [48]:
spark.sql("select count(*) as artists from artists_table").show()

+-------+
|artists|
+-------+
|     69|
+-------+



In [49]:
spark.sql("select count(*) as logs from log_table").show()

+----+
|logs|
+----+
|8056|
+----+



In [97]:
songplays_table = spark.sql("""
    SELECT monotonically_increasing_id() AS songplay_id,
           time_table.start_time, 
           log_table.userId, 
           log_table.level,     
           songs_table.song_id, 
           songs_table.artist_id,            
           log_table.sessionId, 
           log_table.location, 
           log_table.userAgent,
           time_table.year, 
           time_table.month           
    FROM log_table
    JOIN songs_table 
        ON log_table.song = songs_table.title
    JOIN artists_table 
        ON log_table.artist = artists_table.name 
    JOIN time_table 
        ON time_table.start_time = log_table.timestamp
""")

In [106]:
songplays_table.limit(5).show()

+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------+------+-------------+--------------------+------+----------+--------------------+------------------+--------------+------------------+----+---------+----------+----+---+----+-----+----+-------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|          song|status|           ts|           userAgent|userId| timestamp|            datetime|           song_id|         title|         artist_id|year| duration|start_time|hour|day|week|month|year|weekday|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------+------+-------------+--------------------+------+----------+--------------------+------------------+--------------+--

In [107]:
songplays_table.count()

4

In [100]:
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data+'songplays_table/')

### TODO: obtener el start_time con el udf utilizado para obtener la tabla tiempo e incluir en el sql de songplays_table los campos finales:

songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent  

In [74]:
df_log.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId',
 'timestamp',
 'datetime']

In [77]:
df_log.limit(5).toPandas()['timestamp']

0    1542241826
1    1542242481
2    1542242741
3    1542247071
4    1542252577
Name: timestamp, dtype: object

In [86]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)



In [134]:
songplayx_table = spark.read.parquet(output_data + 'songplays_table.parquet')

In [135]:
songplayx_table.count()

1

In [136]:
time_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [139]:
songplays_table.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- start_time: string (nullabl

In [5]:
input_data = "s3a://udacity-dend/"
output_data = 's3a://udacity-dend-spark/'

In [8]:
song_df = spark.read.parquet(output_data + 'songs_table.parquet')

AnalysisException: 'Unable to infer schema for Parquet. It must be specified manually.;'

In [6]:
songplays_table = spark.read.parquet(output_data + 'songplays_table.parquet')

AnalysisException: 'Path does not exist: s3a://udacity-dend-spark/songplays_table.parquet;'

In [None]:
songplays_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data+'songplays_table/')