# Spark Data Exploration
Notebook to test data analysis and processes before transferring to "production" Python script

In [1]:
import os
import glob
from datetime import datetime

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, minute
from pyspark.sql.types import *

In [2]:
spark = SparkSession \
    .builder \
    .appName("Spark Local Analysis Development") \
    .getOrCreate()

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.rdd.compress', 'True'),
 ('spark.driver.host', '192.168.20.29'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'Spark Local Analysis Development'),
 ('spark.app.id', 'local-1592142444521'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '65001'),
 ('spark.ui.showConsoleProgress', 'true')]

In [4]:
spark

For development of ETL process, use local data repository

In [5]:
# Walkthrough the data directory to create a list of all files
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

In [6]:
song_files = get_files('data/song_data')
log_files = get_files('data/log-data')

Define JSON import schemas

In [7]:
song_schema = StructType([StructField('num_songs', IntegerType(), True),
                     StructField('artist_id', StringType(), True),
                     StructField('artist_latitude', DoubleType(), True),
                     StructField('artist_longitude', DoubleType(), True),
                     StructField('artist_location', StringType(), True),
                     StructField('artist_name', StringType(),True),
                     StructField('song_id', StringType(), True),
                     StructField('title', StringType(), True),
                     StructField('duration', DecimalType(18,0), True),
                     StructField('year', IntegerType(), True)])

log_schema = StructType([StructField('artist', StringType(), True),
                     StructField('auth', StringType(), True),
                     StructField('firstName', StringType(), True),
                     StructField('gender', StringType(), True),
                     StructField('itemInSession', LongType(), True),
                     StructField('lastName', StringType(),True),
                     StructField('length', DecimalType(18,0), True),
                     StructField('level', StringType(), True),
                     StructField('location', StringType(), True),
                     StructField('method', StringType(), True),
                     StructField('page', StringType(), True),
                     StructField('registration', DoubleType(), True),
                     StructField('sessionId', LongType(), True),
                     StructField('song', StringType(), True),
                     StructField('status', LongType(), True),
                     StructField('ts', LongType(), True),
                     StructField('userAgent', StringType(), True),
                     StructField('userId', StringType(), True)])


In [8]:
song_data = spark.read.option("multiline","true").json(song_files, song_schema)
log_data = spark.read.option("multiline","true").json(log_files, log_schema)

In [9]:
song_data = song_data.dropna()

In [10]:
song_data.show()

+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+--------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|     artist_location|         artist_name|           song_id|               title|duration|year|
+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+--------+----+
|        1|ARMAC4T1187FB3FA4C|       40.82624|       -74.47995|   Morris Plains, NJ|The Dillinger Esc...|SOBBUGU12A8C13E95D|Setting Fire to S...|     208|2004|
|        1|ARPBNLO1187FB3D52F|       40.71455|       -74.00712|        New York, NY|            Tiny Tim|SOAOIBZ12AB01815BE|I Hold Your Hand ...|      43|2000|
|        1|ARDNS031187B9924F0|       32.67828|       -83.22295|             Georgia|          Tim Wilson|SONYPOM12A8C13B2D7|I Think My Wife I...|     186|2005|
|        1|ARNF6401187FB57032|       40.

In [11]:
song_data.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: decimal(18,0) (nullable = true)
 |-- year: integer (nullable = true)



In [12]:
song_data.count()

31

In [13]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: decimal(18,0) (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [14]:
log_data.count()

30

In [15]:
log_data = log_data.dropna()

In [16]:
log_data.show()

+--------------------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession| lastName|length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|    Smith|   656| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|     The Grass Roots|Logged In|     Sara|     F|           72|  Johnson|   167| paid|   Winston-Salem, NC|   PU

In [17]:
song_data.show()

+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+--------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|     artist_location|         artist_name|           song_id|               title|duration|year|
+---------+------------------+---------------+----------------+--------------------+--------------------+------------------+--------------------+--------+----+
|        1|ARMAC4T1187FB3FA4C|       40.82624|       -74.47995|   Morris Plains, NJ|The Dillinger Esc...|SOBBUGU12A8C13E95D|Setting Fire to S...|     208|2004|
|        1|ARPBNLO1187FB3D52F|       40.71455|       -74.00712|        New York, NY|            Tiny Tim|SOAOIBZ12AB01815BE|I Hold Your Hand ...|      43|2000|
|        1|ARDNS031187B9924F0|       32.67828|       -83.22295|             Georgia|          Tim Wilson|SONYPOM12A8C13B2D7|I Think My Wife I...|     186|2005|
|        1|ARNF6401187FB57032|       40.

Augment the dataset by creating the a new log_data column "timestamp" by converting the "ts" column to a timestamp

In [18]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x / 1000), TimestampType())
log_data = log_data.withColumn("timestamp", get_timestamp(log_data.ts))

In [19]:
log_data.show()

+--------------------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|              artist|     auth|firstName|gender|itemInSession| lastName|length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|           timestamp|
+--------------------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|            Harmonia|Logged In|     Ryan|     M|            0|    Smith|   656| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 11:30:...|
|     The Grass Roots|Logged

#### SQL queries

For this project we'll use spark.sql to create our tables for export to parquet files rather than creating new dataframes

In [20]:
song_data.createOrReplaceTempView("song_table")
log_data.createOrReplaceTempView("log_table")

In [21]:
time = spark.sql('''
SELECT DISTINCT timestamp as start_time, 
hour(timestamp) as hour, 
day(timestamp) as day, 
weekofyear(timestamp) as week, 
month(timestamp) as month, 
year(timestamp) as year, 
date_format(timestamp, 'EEEE') as weekday
FROM log_table
CLUSTER BY year, month
''')

In [22]:
time.show()

+--------------------+----+---+----+-----+----+---------+
|          start_time|hour|day|week|month|year|  weekday|
+--------------------+----+---+----+-----+----+---------+
|2018-11-26 11:02:...|  11| 26|  48|   11|2018|   Monday|
|2018-11-02 12:25:...|  12|  2|  44|   11|2018|   Friday|
|2018-11-29 11:00:...|  11| 29|  48|   11|2018| Thursday|
|2018-11-11 13:33:...|  13| 11|  45|   11|2018|   Sunday|
|2018-11-07 11:01:...|  11|  7|  45|   11|2018|Wednesday|
|2018-11-22 11:03:...|  11| 22|  47|   11|2018| Thursday|
|2018-11-10 11:15:...|  11| 10|  45|   11|2018| Saturday|
|2018-11-15 11:30:...|  11| 15|  46|   11|2018| Thursday|
|2018-11-30 11:22:...|  11| 30|  48|   11|2018|   Friday|
|2018-11-16 11:00:...|  11| 16|  46|   11|2018|   Friday|
|2018-11-18 13:33:...|  13| 18|  46|   11|2018|   Sunday|
|2018-11-14 11:03:...|  11| 14|  46|   11|2018|Wednesday|
|2018-11-24 11:45:...|  11| 24|  47|   11|2018| Saturday|
|2018-11-09 11:06:...|  11|  9|  45|   11|2018|   Friday|
|2018-11-25 12

In [23]:
artist = spark.sql('''
SELECT distinct artist_id, 
artist_name,
artist_location,
artist_latitude,
artist_longitude
FROM song_table
''')

In [24]:
artist.show()

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|       40.71455|       -74.00712|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|         8.4177|       -80.11278|
|ARMBR4Y1187B9990EB|        David Martin|     California - SF|       37.77916|      -122.42005|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|       30.08615|       -94.10158|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|       40.79195|       -73.94512|
|AR47JEX1187B995D81|        SUE THOMPSON|          Nevada, MO|       37.83721|       -94.35868|
|ARQ9BO41187FB5CF1F|          John Davis|        Pennsylvania|       40.99471|       -77.60454|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...

In [25]:
songs = spark.sql('''
SELECT distinct song_id, 
artist_id, 
title, 
year, 
duration
FROM song_table
CLUSTER BY year, artist_id
''')

In [26]:
songs.show()

+------------------+------------------+--------------------+----+--------+
|           song_id|         artist_id|               title|year|duration|
+------------------+------------------+--------------------+----+--------+
|SOWTBJW12AC468AC6E|ARQGYP71187FB44566|Broken-Down Merry...|   0|     152|
|SOQHXMF12AB0182363|ARGSJW91187B9B1D6B|     Young Boy Blues|   0|     219|
|SOWQTQZ12A58A7B63E|ARPFHN61187FB575F6|Streets On Fire (...|   0|     280|
|SONYPOM12A8C13B2D7|ARDNS031187B9924F0|I Think My Wife I...|2005|     186|
|SOVYKGO12AB0187199|ARH4Z031187B9A71F2|Crazy Mixed Up World|1961|     156|
|SOPVXLX12A8C1402D5|AR3JMC51187B9AE49D|    Larger Than Life|1999|     236|
|SOHUOAP12A8AE488E9|ARD842G1187B997376|            Floating|1987|     491|
|SODUJBS12A8C132150|ARI2JSK1187FB496EF|Wessex Loses a Bride|   0|     112|
|SOQVMXR12A81C21483|ARKULSX1187FB45F84|         Salt In NYC|   0|     424|
|SOFCHDR12AB01866EF|AREVWGE1187B9B890A|         Living Hell|   0|     282|
|SOGXHEG12AB018653E|AR0RC

In [27]:
users = spark.sql('''
SELECT log_table.userId as user_id, log_table.firstname as first_name, log_table.lastname as last_name, log_table.gender, log_table.level
FROM (
    SELECT userId, max(ts) as ts
    FROM log_table
    GROUP BY userId) AS unique_users
INNER JOIN log_table
ON log_table.userId = unique_users.userId AND log_table.ts = unique_users.ts
''')

In [28]:
users.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     95|      Sara|  Johnson|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     91|    Jayden|     Bell|     M| free|
|     44|    Aleena|    Kirby|     F| paid|
|     25|    Jayden|   Graves|     M| paid|
|     26|      Ryan|    Smith|     M| free|
|     73|     Jacob|    Klein|     M| paid|
|     88|  Mohammad|Rodriguez|     M| paid|
|     66|     Kevin| Arellano|     M| free|
|     42|    Harper|  Barrett|     M| paid|
|     37|    Jordan|    Hicks|     F| free|
|     97|      Kate|  Harrell|     F| paid|
|    101|    Jayden|      Fox|     M| free|
|     83|   Stefany|    White|     F| free|
|     49|     Chloe|   Cuevas|     F| paid|
|     15|      Lily|     Koch|     F| paid|
|     96|    Cierra|   Finley|     F| free|
|     69|  Anabelle|  Simpson|     F| free|
|     76|    Jayden|    Duffy|     F| free|
+-------+----------+---------+--

In [29]:
songplays_table = spark.sql('''
                        SELECT log.timestamp as start_time, log.userId, log.level, song.song_id, song.artist_id, log.sessionId, log.location, log.userAgent
                        FROM log_table as log
                        LEFT JOIN song_table as song 
                        ON log.song = song.title
                        AND log.artist = song.artist_name
                        AND log.length = song.duration
                        WHERE log.page = 'NextSong'
                        CLUSTER BY year(start_time), month(start_time)
                        ''')
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id()) # add unique songplay_id

In [30]:
songplays_table.show()

+--------------------+------+-----+-------+---------+---------+--------------------+--------------------+-------------+
|          start_time|userId|level|song_id|artist_id|sessionId|            location|           userAgent|  songplay_id|
+--------------------+------+-----+-------+---------+---------+--------------------+--------------------+-------------+
|2018-11-15 11:30:...|    26| free|   null|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|1382979469312|
|2018-11-14 11:03:...|    95| paid|   null|     null|      411|   Winston-Salem, NC|"Mozilla/5.0 (iPh...|1382979469313|
|2018-11-28 11:00:...|    80| paid|   null|     null|      992|Portland-South Po...|"Mozilla/5.0 (Mac...|1382979469314|
|2018-11-05 11:33:...|    69| free|   null|     null|      256|Philadelphia-Camd...|"Mozilla/5.0 (Mac...|1382979469315|
|2018-11-30 11:22:...|    91| free|   null|     null|      829|Dallas-Fort Worth...|Mozilla/5.0 (comp...|1382979469316|
|2018-11-16 11:00:...|    44| paid|   nu

### Output to Parquet Files

To test the process for partitioning and outputting parquet files, use the local directory.  In production this will be replaced by S3.

In [31]:
songs.write.partitionBy("year","artist_id").mode("overwrite").parquet("songs")

In [32]:
time.write.partitionBy("year","month").mode("overwrite").parquet("time")

In [33]:
artist.write.mode("overwrite").parquet("artist")

In [34]:
users.write.mode("overwrite").parquet("users")

Demonstrate being able to partition by columns that aren't part of the table schema.  Here we'll partition by minute as a ridiculous partition that isn't part of the schema but can be extracted from the timestamp

In [35]:
time.withColumn("minute",minute('start_time')).write.mode("overwrite").partitionBy("minute").parquet("time_minute")

Read in the partitioned time parquet file using defaul read.parquet

In [36]:
test = spark.read.parquet('time_minute')

In [37]:
test.show()

+--------------------+----+---+----+-----+----+---------+------+
|          start_time|hour|day|week|month|year|  weekday|minute|
+--------------------+----+---+----+-----+----+---------+------+
|2018-11-11 13:33:...|  13| 11|  45|   11|2018|   Sunday|    33|
|2018-11-18 13:33:...|  13| 18|  46|   11|2018|   Sunday|    33|
|2018-11-05 11:33:...|  11|  5|  45|   11|2018|   Monday|    33|
|2018-11-29 11:00:...|  11| 29|  48|   11|2018| Thursday|     0|
|2018-11-16 11:00:...|  11| 16|  46|   11|2018|   Friday|     0|
|2018-11-20 11:00:...|  11| 20|  47|   11|2018|  Tuesday|     0|
|2018-11-28 11:00:...|  11| 28|  48|   11|2018|Wednesday|     0|
|2018-11-07 11:01:...|  11|  7|  45|   11|2018|Wednesday|     1|
|2018-11-15 11:30:...|  11| 15|  46|   11|2018| Thursday|    30|
|2018-11-10 11:15:...|  11| 10|  45|   11|2018| Saturday|    15|
|2018-11-24 11:45:...|  11| 24|  47|   11|2018| Saturday|    45|
|2018-11-27 11:52:...|  11| 27|  48|   11|2018|  Tuesday|    52|
|2018-11-02 12:25:...|  1

Use the wildcard symbol for the partition folder, in this instance we only have 1 partition, so to skip having minute added as a column to our schema use ' time_minute / * '

In [38]:
test2 = spark.read.parquet('time_minute/*')

In [39]:
test2.show()

+--------------------+----+---+----+-----+----+---------+
|          start_time|hour|day|week|month|year|  weekday|
+--------------------+----+---+----+-----+----+---------+
|2018-11-11 13:33:...|  13| 11|  45|   11|2018|   Sunday|
|2018-11-18 13:33:...|  13| 18|  46|   11|2018|   Sunday|
|2018-11-05 11:33:...|  11|  5|  45|   11|2018|   Monday|
|2018-11-29 11:00:...|  11| 29|  48|   11|2018| Thursday|
|2018-11-16 11:00:...|  11| 16|  46|   11|2018|   Friday|
|2018-11-20 11:00:...|  11| 20|  47|   11|2018|  Tuesday|
|2018-11-28 11:00:...|  11| 28|  48|   11|2018|Wednesday|
|2018-11-07 11:01:...|  11|  7|  45|   11|2018|Wednesday|
|2018-11-24 11:45:...|  11| 24|  47|   11|2018| Saturday|
|2018-11-10 11:15:...|  11| 10|  45|   11|2018| Saturday|
|2018-11-15 11:30:...|  11| 15|  46|   11|2018| Thursday|
|2018-11-27 11:52:...|  11| 27|  48|   11|2018|  Tuesday|
|2018-11-23 11:07:...|  11| 23|  47|   11|2018|   Friday|
|2018-11-09 11:06:...|  11|  9|  45|   11|2018|   Friday|
|2018-11-02 12

Back to the correct table schema on import