In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg, max, min
from pyspark.sql.functions import sum as Fsum, row_number
from pyspark.sql import Window

import datetime

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

%matplotlib inline

Create Spark Session

In [2]:
# Spark permits a single active SparkContext / SparkSession per process.
# `getOrCreate()` hands back the already-running session if there is one
# (applying this builder's options to it) and only builds a new one otherwise.
sparkSesh = (
    SparkSession.builder
    .master("local[*]")
    .appName("app Name")
    .config("config option", "config value")
    .getOrCreate()
)



23/08/27 21:53:40 WARN Utils: Your hostname, rambino-AERO-15-XD resolves to a loopback address: 127.0.1.1; using 192.168.2.55 instead (on interface wlp48s0)
23/08/27 21:53:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/27 21:53:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Look at parameters of Spark context

In [None]:
# Every (key, value) pair of the SparkConf backing this session's context
(sparkSesh
    .sparkContext
    .getConf()
    .getAll())

Importing a basic file

In [3]:
# Load the JSON event log; with no explicit schema, Spark infers one from the data.
read_path = "./sparkify_log_small.json"
log_data = sparkSesh.read.format("json").load(read_path)

Take a look at the data

In [None]:
# Quick structural overview of the raw event log.
# Other handy probes: log_data.head(), log_data.take(2), log_data.count(),
# and log_data.schema (a property holding the StructType -- not a method).

# Column names and types: the closest Spark analogue of R's `str()`.
log_data.printSchema()

# Summary statistics (count, mean, stddev, min, max) per column -- closer to
# R's `summary()`. describe() only returns a new DataFrame, so .show() is
# needed to actually compute and print the table.
log_data.describe().show()

Drilling down into particular columns

In [None]:
# Raw artist column (first 20 rows, nulls included)
log_data.select(col("artist")).show()
# Distinct artists in alphabetical order
log_data.select(col("artist")).distinct().orderBy("artist").show()

More advanced drilldown

In [None]:
# Artist play counts, most played first (use isNotNull to drop missing artists;
# comparing to the string "null" does not express "is missing"):
# log_data.where(col("artist").isNotNull()) \
#     .groupBy("artist").count() \
#     .orderBy(desc("count")).show()

# Every event for user 1046, restricted to the columns of interest
log_data.where(log_data.userId == "1046").select("userId", "page", "song").collect()

Using custom function to create a new column in data frame

In [4]:
# UDF converting the epoch-millisecond `ts` column into the hour of day (0-23).
# Declaring IntegerType() makes Spark store the result as an int directly;
# without it the UDF defaults to StringType and needs a cast afterwards.
# Null timestamps map to null instead of failing the task on float(None).
# NOTE: datetime.fromtimestamp converts using the local time zone of the Python
# worker process, so these are local-time hours, not UTC.
get_hour = udf(
    lambda x: (
        None if x is None
        else datetime.datetime.fromtimestamp(float(x) / 1000.0).hour
    ),
    IntegerType(),
)

# Lazy: the column is only computed when an action (show, collect, count, ...) runs.
log_data = log_data.withColumn("hour", get_hour(log_data.ts))

Filtering session events to 'NextSong' page views (i.e., song plays) and counting them by the hour in which they happened.

In [5]:
# Number of 'NextSong' events (song plays) per hour of day, ordered by hour
song_hour_view = (
    log_data
    .where(col("page") == "NextSong")
    .groupBy("hour")
    .count()
    .orderBy(col("hour").cast("float"))
)
song_hour_view.show()

+----+-----+
|hour|count|
+----+-----+
|   0|  147|
|   1|  225|
|   2|  216|
|   3|  179|
|   4|  141|
|   5|  151|
|   6|  113|
|   7|  180|
|   8|   93|
|  23|  205|
+----+-----+



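Converting to a pandas DataFrame
> (Question: why use pandas over Spark here?) Most likely to use matplotlib, which plots in-memory data such as pandas objects rather than Spark DataFrames; the aggregated result is small enough to collect to the driver.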

In [None]:
# Collect the small per-hour aggregate (columns 'hour' and 'count') to the
# driver as a pandas DataFrame so matplotlib can plot it.
pd_song_hour_view = song_hour_view.toPandas()

In [None]:
# Scatter plot of song plays per hour of day.
# NOTE: the name `max` is bound to pyspark.sql.functions.max by the import cell,
# and that function rejects a pandas Series -- so use the Series' own .max().
fig, ax = plt.subplots()
ax.scatter(pd_song_hour_view["hour"], pd_song_hour_view["count"])
ax.set_xlim(-1, 24)
ax.set_ylim(0, 1.2 * pd_song_hour_view["count"].max())
ax.set_xlabel("Hour")
ax.set_ylabel("# Songs played")
ax.set_title("Songs played by hour of day");

Data cleaning

In [None]:
# Keep only events tied to a known user: first discard rows where userId or
# sessionId is null, then discard rows whose userId is the empty string.
clean_log_data = (
    log_data
    .dropna(how="any", subset=["userId", "sessionId"])
    .where(col("userId") != "")
)

clean_log_data.count()

### Additional Analysis: users who have upgraded their service
We'll look at the events before and after users decided to upgrade their subscriptions to try to find out why.

In [None]:
# UDF flagging upgrade events: 1 for a 'Submit Upgrade' page view, 0 otherwise
flag_upgrade_event = udf(lambda x: 1 if x == "Submit Upgrade" else 0, IntegerType())

clean_log_data = clean_log_data.withColumn("upgrade", flag_upgrade_event("page"))
clean_log_data.head()

Defining a window function for partitioning data based on userId and ordering by time.

In [None]:
# Per-user window ordered newest-first. Because the ordering is descending,
# rangeBetween(unboundedPreceding, currentRow) covers every event of the user
# whose ts is >= the current ts: the current event, all LATER events, and any
# events sharing the same ts.
windowVal = (
    Window.partitionBy("userId")
    .orderBy(desc("ts"))
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

# phase = number of 'upgrade' flags at or after this event for the same user,
# so events before a user's upgrade get a higher phase than events after it.
clean_log_data = clean_log_data.withColumn("phase", Fsum(col("upgrade")).over(windowVal))

In [None]:
# First row, now carrying the `phase` column
clean_log_data.first()

Now, let's find a random customer who upgraded to try this on.

In [None]:
# Users who hit the 'Submit Upgrade' page
clean_log_data.where(col("page") == "Submit Upgrade").select("userId").show()

In [None]:
# Full event history of user 1232 in chronological order
clean_log_data.where(col("userId") == "1232").select(
    "userId", "firstname", "ts", "page", "level", "phase"
).orderBy("ts").collect()

Write data out

In [None]:
# Spark writes a *directory* at write_path, not a single CSV file: one part-file
# per partition of the DataFrame plus small bookkeeping files (typically a
# _SUCCESS marker). Reading the path back with spark.read.csv reassembles it.
# header=True writes the column names into each part-file so the data can be
# read back without supplying a schema.
write_path = "./sparkify_log_small.csv"
clean_log_data.write.mode("overwrite").csv(write_path, header=True)

### Other Spark Commands & Notes:
- `where()` (an alias for `filter()`) keeps only the rows that satisfy a given condition
- Spark SQL offers aggregation commands like `count()`, `min()`, `max()`, `avg()`, and `countDistinct()`
  - You can also use the `agg()` command and specify multiple types of aggregations like this: `agg({"salary":"avg", "age":"max"})`
- Window functions combine the values of *ranges* of rows in a dataframe. When defining the window, we choose how to group the rows (`partitionBy`), how to sort them within each group (`orderBy`), and how wide a window we'd like to use (`rangeBetween` or `rowsBetween`)
- [PySpark User Guide](https://spark.apache.org/docs/latest/api/python/user_guide/index.html)

## Challenges

Which page did user id "" (empty string) NOT visit?

In [None]:
# 1. Every distinct page in the log
pages = [row["page"] for row in log_data.select("page").dropDuplicates().collect()]

# 2. Distinct pages visited by events whose userId is the empty string
emptyId_pages = [
    row["page"]
    for row in log_data.filter(col("userId") == "").select("page").dropDuplicates().collect()
]

# 3. Pages from (1) that never appear in (2)
[page for page in pages if page not in emptyId_pages]

What type of user does the empty string user id most likely refer to?

In [None]:
# Who generates the empty-userId events, and which pages do they hit?
log_data.filter(col("userId") == "").select("firstname", "level", "page").show()

# Looks like users who have not yet logged in to the platform.

How many female users do we have in the data set?

In [None]:
# Distinct users per gender: each (gender, userId) pair is counted once
(
    log_data.select("gender", "userId")
    .distinct()
    .groupBy("gender")
    .count()
    .show()
)

How many songs were played from the most played artist?

In [None]:
# Song plays per artist, most played first. Only 'NextSong' events are song
# plays; other events presumably carry a null artist, and without this filter
# that null group would be counted (and can top the ranking).
log_data.filter(log_data.page == "NextSong").groupBy("artist").count().orderBy(
    desc("count")
).show()

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

---

**My Solution**
How should I approach this problem?
- My data is a set of events; when partitioned by userId and sorted by time, they form a sequence of events for each user.
- The number of songs listened between home page visits is the sum of 'NextSong' events between home visits

**Steps to follow:**
1. Add simple 1/0 indicator to show NextSong (i.e., a song was played)
2. Find a way to partition 'time between home page visits'
   1. What if we create a column that indicates the # of home visits cumulatively (cum_sum)? Then, whenever the number changes, we know that we've visited the home page again - and each set of rows with a unique number will indicate a period between 2 home visits?
   2. We can aggregate song plays either *before* or *after* each home page visit. For convenience I'll sort by ts ascending: each segment then starts at a home visit and collects the songs played *after* it (songs played before a user's first home visit fall into segment 0).
3. Partition by userID and our new cum_sum column and sort by ts. sum all songplays. Average this number
   1. IDEA: Let's also average per UserID just for fun - I'm curious to see if users vary significantly in how often they visit the home page.



In [6]:
# Step 1: Add simple 1/0 indicator to show NextSong (i.e., a song was played)

# UDF flagging song plays: 1 for a 'NextSong' page view, 0 for anything else
flag_song_play = udf(lambda x: 1 if x == "NextSong" else 0, IntegerType())

songplay_log_data = log_data.withColumn("songPlay", flag_song_play("page"))
songplay_log_data.select(["page", "songPlay"]).head(10)

[Row(page='NextSong', songPlay=1),
 Row(page='NextSong', songPlay=1),
 Row(page='NextSong', songPlay=1),
 Row(page='NextSong', songPlay=1),
 Row(page='Home', songPlay=0),
 Row(page='Settings', songPlay=0),
 Row(page='NextSong', songPlay=1),
 Row(page='NextSong', songPlay=1),
 Row(page='Home', songPlay=0),
 Row(page='Home', songPlay=0)]

In [7]:
# Step #2a: mark every 'Home' page view with 1 (0 otherwise). A per-user running
# sum of this flag (next cell) splits each user's events into segments that
# start at a home visit.
flag_home_visit = udf(lambda page: int(page == "Home"), IntegerType())

songplay_log_data = songplay_log_data.withColumn("HomeVisit", flag_home_visit(col("page")))
songplay_log_data.select("page", "HomeVisit").head(10)

[Row(page='NextSong', HomeVisit=0),
 Row(page='NextSong', HomeVisit=0),
 Row(page='NextSong', HomeVisit=0),
 Row(page='NextSong', HomeVisit=0),
 Row(page='Home', HomeVisit=1),
 Row(page='Settings', HomeVisit=0),
 Row(page='NextSong', HomeVisit=0),
 Row(page='NextSong', HomeVisit=0),
 Row(page='Home', HomeVisit=1),
 Row(page='Home', HomeVisit=1)]

In [8]:
# Step #2b: running count of home visits per user, in time order.
# Each 'Home' event bumps the counter, so every event from one home visit up to
# (not including) the next shares a homePartition value; events before a user's
# first home visit get 0.
# partitionBy already fixes userID inside each partition, so ts is the only
# sort key needed.
# NOTE(review): events of one user with identical ts have no defined relative
# order in this row-based frame.
windowVal = (
    Window.partitionBy("userID")
    .orderBy(asc("ts"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

songplay_log_data = songplay_log_data.withColumn(
    "homePartition", Fsum("HomeVisit").over(windowVal)
)

songplay_log_data.select(["userID", "homePartition", "page", "ts"]).show(50)

# Open question: not every session contains a home visit. Because the counter
# runs per user (not per session), such events simply join the segment of the
# user's most recent home visit (or segment 0 if there was none yet).

+------+-------------+-----+-------------+
|userID|homePartition| page|           ts|
+------+-------------+-----+-------------+
|      |            0|Login|1513721196284|
|      |            1| Home|1513721274284|
|      |            2| Home|1513722009284|
|      |            3| Home|1513723183284|
|      |            3|Login|1513723184284|
|      |            3|Login|1513723587284|
|      |            4| Home|1513724475284|
|      |            5| Home|1513724530284|
|      |            5|Login|1513728229284|
|      |            6| Home|1513729051284|
|      |            6|Login|1513729052284|
|      |            7| Home|1513729376284|
|      |            7| Help|1513729445284|
|      |            8| Home|1513729762284|
|      |            8|Login|1513729763284|
|      |            9| Home|1513730128284|
|      |            9|Login|1513730129284|
|      |           10| Home|1513730621284|
|      |           10|Login|1513730622284|
|      |           11| Home|1513730953284|
|      |   

In [9]:
# 3. Running total of song plays inside each (user, homePartition) segment,
# in time order. Both partition keys are fixed within a partition, so only ts
# is needed as the sort key.
windowVal = (
    Window.partitionBy(["userID", "homePartition"])
    .orderBy(asc("ts"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

songplay_log_data = songplay_log_data.withColumn(
    "sum_plays", Fsum("songPlay").over(windowVal)
)

songplay_log_data.select(
    ["userID", "homePartition", "page", "ts", "songPlay", "sum_plays"]
).show(1000)

+------+-------------+--------------+-------------+--------+---------+
|userID|homePartition|          page|           ts|songPlay|sum_plays|
+------+-------------+--------------+-------------+--------+---------+
|      |            0|         Login|1513721196284|       0|        0|
|      |            1|          Home|1513721274284|       0|        0|
|      |            2|          Home|1513722009284|       0|        0|
|      |            3|          Home|1513723183284|       0|        0|
|      |            3|         Login|1513723184284|       0|        0|
|      |            3|         Login|1513723587284|       0|        0|
|      |            4|          Home|1513724475284|       0|        0|
|      |            5|          Home|1513724530284|       0|        0|
|      |            5|         Login|1513728229284|       0|        0|
|      |            6|          Home|1513729051284|       0|        0|
|      |            6|         Login|1513729052284|       0|        0|
|     

In [13]:
# 3. Songs per segment: songPlay is 0/1, so the largest running total in each
# (user, homePartition) group equals that group's total number of song plays.
# (`max` here is pyspark.sql.functions.max, bound by the import cell.)
res = (
    songplay_log_data
    .groupBy("UserID", "homePartition")
    .agg(max("sum_plays").alias("songplays"))
    # Sort after aggregating: an orderBy placed before groupBy does not survive
    # the groupBy shuffle.
    .orderBy("UserID", "homePartition")
)

res.show()

# Average over segments with at least one song; segments made only of non-song
# events (e.g. the empty-userId rows) would otherwise pull the mean down.
res.filter("songplays > 0").agg(avg("songplays")).show()


+------+-------------+---------+
|UserID|homePartition|songplays|
+------+-------------+---------+
|      |            0|        0|
|      |            1|        0|
|      |            2|        0|
|      |            3|        0|
|      |            4|        0|
|      |            5|        0|
|      |            6|        0|
|      |            7|        0|
|      |            8|        0|
|      |            9|        0|
|      |           10|        0|
|      |           11|        0|
|      |           12|        0|
|      |           13|        0|
|      |           14|        0|
|      |           15|        0|
|      |           16|        0|
|      |           17|        0|
|      |           18|        0|
|      |           19|        0|
+------+-------------+---------+
only showing top 20 rows

+-----------------+
|   avg(songplays)|
+-----------------+
|5.956678700361011|
+-----------------+



388

Instructor's solution: 

>Note: Code below is primarily from Udacity instructors with some small changes by me

In [14]:
# Step #1: UDF that yields 1 for a 'Home' page view and 0 for anything else;
# its running sum will mark the boundaries between home visits.
ishome = udf(lambda page: 1 if page == "Home" else 0, IntegerType())

In [15]:
# Step #2: per-user window in chronological order. Partitioning by userID keeps
# each user's events separate; ordering by ts lets a running sum walk through
# them in time. The frame spans from the partition's first row up to and
# including the current row, counted in physical rows (rowsBetween).
# NOTE: partitioning by sessionId was considered, but "song plays between home
# visits" reads as a per-user quantity (a user may go several sessions without
# visiting home), so the window is per user.
user_window = (
    Window.partitionBy("userID")
    .orderBy(col("ts").asc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Frame semantics: rowsBetween counts rows, rangeBetween compares ordering values.
# https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.Window.rowsBetween.html
# https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.Window.rangeBetween.html

In [16]:
# Step 3: keep only the events that matter here -- home visits and song plays --
# so any row that is not 'Home' is a song play and needs no separate flag.
# `homevisit` is 1 on 'Home' rows; `period` is its running sum per user, so it
# increases by one at every home visit and labels each stretch between visits
# (e.g. 0,0,HOME->1,1,1,HOME->2,2,...). Grouping by (userID, period) then lets
# us aggregate within each of these mini-sessions.
cusum = (
    log_data
    .where(col("page").isin("NextSong", "Home"))
    .select("userID", "page", "ts")
    .withColumn("homevisit", ishome("page"))
    .withColumn("period", Fsum(col("homevisit")).over(user_window))
)

# Open the output as a text file in the UI to scan how `period` evolves.
cusum.show(1000)

+------+--------+-------------+---------+------+
|userID|    page|           ts|homevisit|period|
+------+--------+-------------+---------+------+
|      |    Home|1513721274284|        1|     1|
|      |    Home|1513722009284|        1|     2|
|      |    Home|1513723183284|        1|     3|
|      |    Home|1513724475284|        1|     4|
|      |    Home|1513724530284|        1|     5|
|      |    Home|1513729051284|        1|     6|
|      |    Home|1513729376284|        1|     7|
|      |    Home|1513729762284|        1|     8|
|      |    Home|1513730128284|        1|     9|
|      |    Home|1513730621284|        1|    10|
|      |    Home|1513730953284|        1|    11|
|      |    Home|1513731045284|        1|    12|
|      |    Home|1513731277284|        1|    13|
|      |    Home|1513731834284|        1|    14|
|      |    Home|1513733637284|        1|    15|
|      |    Home|1513734883284|        1|    16|
|      |    Home|1513735959284|        1|    17|
|      |    Home|151

In [17]:
# Step 4: `period` labels each stretch between home visits (unique per user).
# Dropping the 'Home' rows and counting what remains in every
# (userID, period) group gives the number of songs played in that stretch.
res2 = (
    cusum.where(col("page") == "NextSong")
    .groupBy("userID", "period")
    .agg({"period": "count"})
)

# One row per (user, stretch) with its song count in `count(period)`
res2.show()

# Mean number of songs per stretch between home visits
res2.agg({"count(period)": "avg"}).show()

# Same metric averaged per user, highest first
(
    res2.groupBy("UserID")
    .agg(avg("count(period)").alias("avg_plays"))
    .orderBy(desc("avg_plays"))
    .show()
)

+------+------+-------------+
|userID|period|count(period)|
+------+------+-------------+
|   100|     0|            2|
|  1000|     0|            1|
|  1003|     1|            1|
|  1019|     2|            3|
|  1020|     1|            4|
|  1025|     0|            4|
|  1035|     0|           15|
|  1037|     1|           12|
|   104|     0|            1|
|  1046|     0|            7|
|  1061|     0|            9|
|  1061|     1|            8|
|  1065|     1|            6|
|  1077|     1|            4|
|  1079|     2|           26|
|   108|     0|           14|
|  1094|     0|            1|
|  1110|     0|            2|
|  1116|     0|            1|
|  1138|     1|           37|
+------+------+-------------+
only showing top 20 rows

+------+------+-------------+
|userID|period|count(period)|
+------+------+-------------+
|  1232|     1|            7|
|  1232|     2|           57|
|  1232|     3|            3|
+------+------+-------------+

+------------------+
|avg(count(period))|
+