# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

### Metrics to consider:
- Monthly active users
- Daily active users in past month
- Total paid and unpaid users
- Total ads served in the past month
- Cohort per Month - % of users cancelled, % of users upgrades


# Import Libraries

In [346]:
# import libraries
import seaborn as sns
import pandas as pd
import numpy as np
import datetime # missing and together with "pip install --upgrade pyspark" were causing issues with time series
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline
from plotly.offline import iplot
import plotly.graph_objects as go
import plotly.express as px
import warnings
warnings.filterwarnings("ignore")
import cufflinks as cf
cf.go_offline()
from pyspark.ml.feature import RegexTokenizer, VectorAssembler, Normalizer, StandardScaler, MinMaxScaler
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [2]:
# create a Spark session
# spark = SparkSession.builder.getOrCreate()
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [3]:
spark

In [4]:
print(spark.catalog.listTables())

[]


In [5]:
pwd

'/home/freemo/Projects/SparkProject'

In [6]:
mini = '/home/freemo/Projects/largeData/mini_sparkify_event_data.json'
medium = '/home/freemo/Projects/largeData/medium_sparkify_event_data.json'

In [7]:
df = spark.read.json(mini)

In [8]:
df.take(1)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')]

In [9]:
df.show(5)

+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|          artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|  Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|NextSong|1538173362000|       29|           Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Boston-Cambridge-...|   PUT|NextSong|1538331630000| 

In [10]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
# check columns with Null values
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId| song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
| 58392|   0|     8346|  8346|            0|    8346| 58392|    0|    8346|     0|   0|        8346|        0|58392|     0|  0|     8346|     0|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+



In [12]:
# missing values in userID
df.select([count(when(isnan('userID'),True))]).show()

+--------------------------------------------+
|count(CASE WHEN isnan(userID) THEN true END)|
+--------------------------------------------+
|                                           0|
+--------------------------------------------+



In [13]:
# missing values in sessionID
df.select([count(when(isnan('sessionID'),True))]).show()

+-----------------------------------------------+
|count(CASE WHEN isnan(sessionID) THEN true END)|
+-----------------------------------------------+
|                                              0|
+-----------------------------------------------+



In [None]:
# get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0). hour)
# df = df.withColumn("hour", get_hour(df.ts))
# songs_in_hour = df.filter(df.page == "NextSong").groupBy(df.hour).count().orderBy(df.hour.cast("float"))
# songs_in_hour.show()

# songs_in_hour_pd = songs_in_hour.toPandas()
# songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)

In [153]:
# create udf (user defined functions) for workdays and hour columns
day_name= ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday','Sunday']
day_ind = [1,2,3,4,5,6,7]
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).hour, IntegerType() )
get_day = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).day, IntegerType() )
get_wkday = udf(lambda x: day_name[datetime.datetime.fromtimestamp(x/1000).weekday()] )
get_month = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).month, IntegerType() )

In [154]:
# create new columns based on ts column for analyses
df = df.withColumn('hour', get_hour('ts'))
df = df.withColumn('day', get_day('ts'))
df = df.withColumn('workday_', get_wkday('ts'))
df = df.withColumn('month', get_month('ts'))

In [155]:
df.select('hour', 'day', 'workday_', 'month').show(7)

+----+---+--------+-----+
|hour|day|workday_|month|
+----+---+--------+-----+
|   3|  1|  Monday|   10|
|   3|  1|  Monday|   10|
|   3|  1|  Monday|   10|
|   3|  1|  Monday|   10|
|   3|  1|  Monday|   10|
|   3|  1|  Monday|   10|
|   3|  1|  Monday|   10|
+----+---+--------+-----+
only showing top 7 rows



In [None]:
# df.groupBy('ts_hour').count().show()

In [None]:
# df.groupBy('ts_day').count().show(31)

In [17]:
user_log_valid = df.dropna(how = "any", subset = ["userId", "sessionId"])

In [18]:
user_log_valid.count()

286500

In [19]:
df.select("userId").dropDuplicates().sort("userId").show()

+------+
|userId|
+------+
|      |
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
|100008|
|100009|
|100010|
|100011|
|100012|
|100013|
|100014|
|100015|
|100016|
|100017|
+------+
only showing top 20 rows



In [20]:
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")

In [21]:
user_log_valid.count() - 286500

-8346

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

In [22]:
# check the page column
df.select("page").dropDuplicates().sort("page").show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|               Login|
|              Logout|
|            NextSong|
|            Register|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
| Submit Registration|
|      Submit Upgrade|
|         Thumbs Down|
+--------------------+
only showing top 20 rows



In [23]:
# How many songs do users listen to on average between visiting the home page? 

function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()

+------------------+
|avg(count(period))|
+------------------+
| 23.60389072847682|
+------------------+



In [24]:
# top 5 played artist
df.filter(df.page == 'NextSong') \
    .select('Artist') \
    .groupBy('Artist') \
    .agg({'Artist':'count'}) \
    .withColumnRenamed('count(Artist)', 'Artistcount') \
    .sort(desc('Artistcount')) \
    .show(5)

+--------------------+-----------+
|              Artist|Artistcount|
+--------------------+-----------+
|       Kings Of Leon|       1841|
|            Coldplay|       1813|
|Florence + The Ma...|       1236|
|       Dwight Yoakam|       1135|
|            BjÃÂ¶rk|       1133|
+--------------------+-----------+
only showing top 5 rows



In [26]:
# top 5 played songs
df.filter(df.page == 'NextSong') \
    .select('song') \
    .groupBy('song') \
    .agg({'song':'count'}) \
    .withColumnRenamed('count(song)', 'SongCount') \
    .sort(desc('Songcount')) \
    .show(5)

+--------------------+---------+
|                song|SongCount|
+--------------------+---------+
|      You're The One|     1153|
|                Undo|     1026|
|             Revelry|      854|
|       Sehr kosmisch|      728|
|Horn Concerto No....|      641|
+--------------------+---------+
only showing top 5 rows



In [27]:
# check a random user
df.select(["userId", "firstname", "page", "level", "song"]).where(df.userId == 98).collect()

[Row(userId='98', firstname='Sawyer', page='Home', level='free', song=None),
 Row(userId='98', firstname='Sawyer', page='Add Friend', level='free', song=None),
 Row(userId='98', firstname='Sawyer', page='Add Friend', level='free', song=None),
 Row(userId='98', firstname='Sawyer', page='Home', level='free', song=None),
 Row(userId='98', firstname='Sawyer', page='NextSong', level='free', song="They Can't Take That Away From Me (1999 Digital Remaster)"),
 Row(userId='98', firstname='Sawyer', page='NextSong', level='free', song='Little Yellow Spider'),
 Row(userId='98', firstname='Sawyer', page='NextSong', level='free', song='Walk Through Hell (featuring Max Bemis Acoustic Exclusive)'),
 Row(userId='98', firstname='Sawyer', page='Thumbs Down', level='free', song=None),
 Row(userId='98', firstname='Sawyer', page='NextSong', level='free', song="Baby Girl_ I'm A Blur"),
 Row(userId='98', firstname='Sawyer', page='Thumbs Down', level='free', song=None),
 Row(userId='98', firstname='Sawyer', pa

In [28]:
# create a udf to flag Churned customers
flag_downgrade_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

In [29]:
# create new column "churn"
df = df.withColumn("churn", flag_downgrade_event("page"))

In [30]:
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30', hour=3, day=1, workday='Monday', month=10, churn=0)

In [31]:
# add a staging: stage=1 if the user is paid, stage=2 if the user churned
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)

In [32]:
# create new column stage to assign values
df = df.withColumn("phase", Fsum("churn").over(windowval))

User 39 played the biggest number of songs. Let's check if that matters by going through it's records.

In [33]:
# check result
df.select(["userId", "firstname", "ts", "page", "level", "phase"]).where(df.userId == "39").sort("ts").collect()

[Row(userId='39', firstname='Payton', ts=1538375748000, page='Home', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538375752000, page='NextSong', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538375952000, page='NextSong', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538376332000, page='NextSong', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538376376000, page='Roll Advert', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538376557000, page='NextSong', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538376754000, page='NextSong', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538377032000, page='NextSong', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538377079000, page='Roll Advert', level='free', phase=0),
 Row(userId='39', firstname='Payton', ts=1538377451000, page='NextSong', level='free', phase=0),
 Row(userId='39', firstname=

User 39 started as free, but after some decent amount of songs listened it became a paid member. 

In [34]:
# check the number of songs played per user
df.where(df.song != 'null').groupBy(['churn','userId']) \
    .agg(count(df.song).alias('SongsPlayed')).sort('SongsPlayed', ascending=False).show()

+-----+------+-----------+
|churn|userId|SongsPlayed|
+-----+------+-----------+
|    0|    39|       8002|
|    0|    92|       5945|
|    0|   140|       5664|
|    0|300011|       4619|
|    0|   124|       4079|
|    0|300021|       3816|
|    0|300017|       3632|
|    0|    85|       3616|
|    0|    42|       3573|
|    0|     6|       3159|
|    0|    29|       3028|
|    0|200023|       2955|
|    0|    54|       2841|
|    0|   100|       2682|
|    0|     9|       2676|
|    0|    91|       2580|
|    0|   126|       2577|
|    0|300015|       2524|
|    0|    98|       2401|
|    0|    74|       2400|
+-----+------+-----------+
only showing top 20 rows



### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [35]:
# split by gender 
df.dropDuplicates(["userId"]).groupBy(['gender','churn']).count().show()

+------+-----+-----+
|gender|churn|count|
+------+-----+-----+
|     F|    0|  104|
|     M|    0|  121|
|  null|    0|    1|
+------+-----+-----+



In [36]:
df.dropDuplicates(['userId']).groupBy(['song','churn']).count().show()

+--------------------+-----+-----+
|                song|churn|count|
+--------------------+-----+-----+
|Welcome To The Wo...|    0|    1|
|               Alice|    0|    1|
|Will You (Single ...|    0|    1|
|      You're The One|    0|    2|
|He's The DJ_ I'm ...|    0|    1|
|Taking Control (A...|    0|    1|
|         The Shining|    0|    1|
|Still Smokin (D L...|    0|    1|
|  Glitter In The Air|    0|    1|
|      Little Darlin'|    0|    1|
|Phoenix (Basement...|    0|    1|
|                FACK|    0|    1|
|        Jernskjorten|    0|    1|
|          Yesterdays|    0|    1|
|Beyond The Beyond...|    0|    1|
|          Quiet Life|    0|    1|
|Judgement Begins ...|    0|    1|
|Citizens Of Tomorrow|    0|    1|
|          Hammerhart|    0|    1|
|     I Need A Dollar|    0|    1|
+--------------------+-----+-----+
only showing top 20 rows



In [37]:
df.where(df.song != 'null').groupBy(['churn','userId']) \
    .agg(count(df.song).alias('SongsPlayed')).sort('SongsPlayed', ascending=False).show()

+-----+------+-----------+
|churn|userId|SongsPlayed|
+-----+------+-----------+
|    0|    39|       8002|
|    0|    92|       5945|
|    0|   140|       5664|
|    0|300011|       4619|
|    0|   124|       4079|
|    0|300021|       3816|
|    0|300017|       3632|
|    0|    85|       3616|
|    0|    42|       3573|
|    0|     6|       3159|
|    0|    29|       3028|
|    0|200023|       2955|
|    0|    54|       2841|
|    0|   100|       2682|
|    0|     9|       2676|
|    0|    91|       2580|
|    0|   126|       2577|
|    0|300015|       2524|
|    0|    98|       2401|
|    0|    74|       2400|
+-----+------+-----------+
only showing top 20 rows



In [38]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- workday: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- churn: integer (nullable = true)
 |-- phase: long (nullable = true)



In [None]:
# plot locations 
# On which days do people use sparkify more?
# What are the most active hours?
# Songs played per user group - churned/not churned
# Which days of the month are more active?
# Which subscription level is more likely to churn?
# Do activity times have effect over churn?
# https://hendra-herviawan.github.io/pyspark-groupBy-and-aggregate-functions.html
# https://github.com/ihsankose/Udacity-Capstone/blob/master/Sparkify.ipynb
# https://nbviewer.jupyter.org/github/elifinspace/sparkify/blob/master/Sparkify_final_.ipynb

1. Try to aggregate based on songs and month
2. Plot some EDA based on notes
3. Clean data set for modeling (refer to previous projects with ETL pipelines)
4. Model based on DataCamp course (if possible|)

In [39]:
# plot location
plot = df.groupBy('location', "churn").count().dropDuplicates().sort('count', ascending=False).toPandas()

In [40]:
fig = px.bar(plot, x='location', y='count')
fig.show()

In [41]:
# create state column
df = df.withColumn("state", df.location.substr(-2,2))

In [42]:
df.select('state').show()

+-----+
|state|
+-----+
|   CA|
|   NH|
|   CA|
|   NH|
|   CA|
|   NH|
|   NH|
|   CA|
|   CA|
|   CA|
|   NH|
|   NH|
|   CA|
|   NH|
|   NH|
|   CA|
|   NH|
|   FL|
|   CA|
|   NH|
+-----+
only showing top 20 rows



In [43]:
df.groupBy("state").count().sort("count", ascending=False).show(5)

+-----+-----+
|state|count|
+-----+-----+
|   CA|46771|
|   PA|26607|
|   TX|23494|
|   NH|18637|
|   FL|13190|
+-----+-----+
only showing top 5 rows



In [44]:
# convert to pandas
state = df.groupBy("state", "churn").count().sort("count", ascending=False).toPandas()

In [45]:
fig = px.bar(state, x='state', y='count')
fig.show()

In [46]:
# check state by churn
churn = df.filter(df.churn == 1)
churn.groupBy("state", "churn").count().sort("count", ascending=False).show(5)

+-----+-----+-----+
|state|churn|count|
+-----+-----+-----+
|   CA|    1|    6|
|   PA|    1|    5|
|   MD|    1|    3|
|   TX|    1|    3|
|   MI|    1|    3|
+-----+-----+-----+
only showing top 5 rows



In [47]:
# plot churn by state
churned_state = churn.groupBy("state", "churn").count().sort("count", ascending=False).toPandas()
fig = px.bar(churned_state, x='state', y='count')
fig.show()

In [48]:
# churn by workday
temp = churn.groupBy('workday').count().sort('count').toPandas()
fig = px.bar(temp, x='workday', y='count', text="count")
fig.update_layout(title_text='What are the most active days for churn?')
fig.show()

In [49]:
churn.groupBy('song', "churn").count().dropDuplicates().sort('count', ascending=False).show()

+----+-----+-----+
|song|churn|count|
+----+-----+-----+
|null|    1|   52|
+----+-----+-----+



**No songs available for churned users**

In [50]:
# what are the most active days?
temp = df.groupBy('workday').count().sort('count').toPandas()
fig = px.bar(temp, x='workday', y='count', text="count")
fig.update_layout(title_text='What are the most active days?')
fig.show()

In [51]:
# What are the most active hours?
temp = df.groupBy('hour').count().sort('hour').toPandas()
fig = px.bar(temp, x='hour', y='count', text='count')
fig.update_layout(title_text='What are the most active hours?')
fig.show()

In [52]:
# What are the most active hours for churn?
temp = churn.groupBy('hour').count().sort('hour').toPandas()
fig = px.bar(temp, x='hour', y='count', text='count')
fig.update_layout(title_text='What are the most active hours for churn?')
fig.update_traces(texttemplate='%{text:.2s}', textposition='outside')
fig.show()

In [53]:
# What are the most active days during the month?
temp = df.groupBy('day').count().sort('day').toPandas()
fig = px.bar(temp, x='day', y='count', text='count')
fig.update_layout(title_text='What are the most active days during the month?')
fig.show()

In [54]:
# What are the most active days during the month for churn?
temp = churn.groupBy('day').count().sort('day').toPandas()
fig = px.bar(temp, x='day', y='count', text='count')
fig.update_layout(title_text='What are the most active days during the month?')
fig.show()

In [55]:
# How people are split per gender?
temp = df.groupBy('gender').count().sort('gender').toPandas()
fig = px.bar(temp, x='gender', y='count', text='count')
fig.update_layout(title_text='Gender split')
fig.show()

In [56]:
# How people are split per gender?
temp = churn.groupBy('gender').count().sort('gender').toPandas()
fig = px.bar(temp, x='gender', y='count', text='count')
fig.update_layout(title_text='Gender split (churn)')
fig.show()

Eventhough more users are female, it turns out that Men are churning the most.

### Based on the EDA I would choose the below features:
- state
- workday
- day
- songs played
- gender
- level

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [302]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- workday: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- churn: integer (nullable = true)
 |-- phase: long (nullable = true)
 |-- state: string (nullable = true)
 |-- workday_: string (nullable = true)



In [303]:
df.select('gender', 'userId', 'workday_', 'workday', 'day', 'churn', 'state').show()

+------+------+--------+-------+---+-----+-----+
|gender|userId|workday_|workday|day|churn|state|
+------+------+--------+-------+---+-----+-----+
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     M|    30|  Monday|      1|  1|    0|   CA|
|     M|     9|  Monday|      1|  1|    0|   NH|
|     F|    74|  Mon

In [None]:
# create state column
# df = df.withColumn("state", df.location.substr(-2,2))

# create udf (user defined functions) for workdays and hour columns
#day_name= ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday','Sunday']
#day_ind = [1,2,3,4,5,6,7]
#get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).hour, IntegerType() )
#get_day = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).day, IntegerType() )
#get_wkday = udf(lambda x: day_name[datetime.datetime.fromtimestamp(x/1000).weekday()] )
#get_month = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).month, IntegerType() )

# create new columns based on ts column for analyses
#df = df.withColumn('hour', get_hour('ts'))
#df = df.withColumn('day', get_day('ts'))
#df = df.withColumn('workday', get_wkday('ts'))
#df = df.withColumn('month', get_month('ts'))

# check the number of songs played per user
#df.where(df.song != 'null').groupBy(['churn','userId']) \
    #.agg(count(df.song).alias('SongsPlayed')).sort('SongsPlayed', ascending=False).show()

### Feature 1 - gender

In [358]:
# gender column
gender = df.dropDuplicates(['userId']).sort('userId').select(['userId','gender'])
gender = gender.filter(gender["userId"] != "")
gender = gender.replace(['M','F'], ['1', '0'], 'gender')
gender = gender.withColumn('gender', gender.gender.cast("int"))

In [305]:
gender.take(1)

[Row(userId='10', gender=1)]

### Feature 2 - workday

In [359]:
# convert workdays in digits
get_wkday = udf(lambda x: day_ind[datetime.datetime.fromtimestamp(x/1000).weekday()], IntegerType() )
df = df.withColumn('workday', get_wkday('ts'))

In [307]:
df.take(1)

[Row(artist=None, auth='Logged In', firstName='Darianna', gender='F', itemInSession=34, lastName='Carpenter', length=None, level='free', location='Bridgeport-Stamford-Norwalk, CT', method='PUT', page='Logout', registration=1538016340000, sessionId=187, song=None, status=307, ts=1542823952000, userAgent='"Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53"', userId='100010', hour=20, day=21, workday=3, month=11, churn=0, phase=0, state='CT', workday_='Wednesday')]

In [360]:
# workday column
workday = df.dropDuplicates(['userId']).sort('userId').select(['userId','workday'])
workday = workday.filter(workday["userId"] != "")

In [311]:
workday.take(1)

[Row(userId='10', workday=1)]

### Feature 3 - day

In [361]:
# day column
day = df.dropDuplicates(['userId']).sort('userId').select(['userId','day'])
day = day.filter(day["userId"] != "")

In [319]:
day.take(1)

[Row(userId='10', day=8)]

### Feature 4 - state

In [362]:
# state column
state = df.dropDuplicates(['userId']).sort('userId').select(['userId','state'])
state = state.filter(state["userId"] != "")

In [321]:
state.take(1)

[Row(userId='10', state='MS')]

### Feature 5 - SongsPlayed

In [363]:
# songs played column
songs_played = df.where(df.song!='null').groupby('userId') \
    .agg(count(df.song).alias('SongsPlayed')).orderBy('userId').select(['userId','SongsPlayed'])
songs_played = songs_played.filter(songs_played["userId"] != "")

In [323]:
songs_played.take(1)

[Row(userId='10', SongsPlayed=673)]

### Features data frame

In [364]:
# create features data frame
model_data = df.dropDuplicates(['userId']).select(['userId','churn'])
model_data = model_data.filter(state["userId"] != "")

In [353]:
model_data.take(1)

[Row(userId='100010', churn=0)]

In [365]:
# convert column churn to label
# model_data = model_data.withColumn("label", model_data.churn)
model_data = model_data.withColumnRenamed("churn", "label")

In [327]:
model_data.take(1)

[Row(userId='100010', label=0)]

In [366]:
# join all features into one data frame
model_data = model_data.join(gender, 'userId')
model_data = model_data.join(workday, 'userId')
model_data = model_data.join(day, 'userId')
#model_data = model_data.join(state, 'userId')
model_data = model_data.join(songs_played, 'userId')

In [367]:
model_data.show()

+------+-----+------+-------+---+-----------+
|userId|label|gender|workday|day|SongsPlayed|
+------+-----+------+-------+---+-----------+
|100010|    0|     0|      1|  8|        275|
|200002|    0|     1|      1|  1|        387|
|   125|    0|     1|      5| 12|          8|
|   124|    0|     0|      1|  1|       4079|
|    51|    0|     1|      1|  1|       2111|
|     7|    0|     1|      3|  3|        150|
|    15|    0|     1|      1|  1|       1914|
|    54|    0|     0|      1|  1|       2841|
|   155|    0|     0|      5|  2|        820|
|100014|    0|     1|      3| 10|        257|
|   132|    0|     0|      2|  2|       1928|
|   154|    0|     0|      5|  2|         84|
|   101|    0|     1|      1|  1|       1797|
|    11|    0|     0|      5|  5|        647|
|   138|    0|     1|      6|  6|       2070|
|300017|    0|     0|      1|  1|       3632|
|100021|    0|     1|      4| 11|        230|
|    29|    0|     1|      2|  2|       3028|
|    69|    0|     0|      3|  3| 

In [368]:
# check column types
model_data.dtypes

[('userId', 'string'),
 ('label', 'int'),
 ('gender', 'int'),
 ('workday', 'int'),
 ('day', 'int'),
 ('SongsPlayed', 'bigint')]

### Covert columns to integers for modeling

In [369]:
# convert string columns to integers
model_data = model_data.withColumn("userId", model_data.userId.cast("integer"))

In [370]:
model_data.dtypes

[('userId', 'int'),
 ('label', 'int'),
 ('gender', 'int'),
 ('workday', 'int'),
 ('day', 'int'),
 ('SongsPlayed', 'bigint')]

In [374]:
# remove any missing values
model_data = model_data.filter("userId is not NULL \
and label is not NULL \
and gender is not NULL \
and workday is not NULL \
and day is not NULL \
and SongsPlayed is not NULL")

In [372]:
# remove any missing values
model_data = model_data.filter("userId is not NULL \
and churn is not NULL \
and gender is not NULL \
and workday is not NULL \
and day is not NULL \
and state is not NULL \
and SongsPlayed is not NULL")

AnalysisException: cannot resolve '`churn`' given input columns: [SongsPlayed, day, gender, label, userId, workday]; line 1 pos 23;
'Filter (((isnotnull(userId#162872) AND isnotnull('churn)) AND (isnotnull(gender#162446) AND isnotnull(workday#162450))) AND ((isnotnull(day#55907) AND isnotnull('state)) AND isnotnull(SongsPlayed#162510L)))
+- Project [cast(userId#40 as int) AS userId#162872, label#162517, gender#162446, workday#162450, day#55907, SongsPlayed#162510L]
   +- Project [userId#40, label#162517, gender#162446, workday#162450, day#55907, SongsPlayed#162510L]
      +- Join Inner, (userId#40 = userId#162603)
         :- Project [userId#40, label#162517, gender#162446, workday#162450, day#55907]
         :  +- Join Inner, (userId#40 = userId#162580)
         :     :- Project [userId#40, label#162517, gender#162446, workday#162450]
         :     :  +- Join Inner, (userId#40 = userId#162558)
         :     :     :- Project [userId#40, label#162517, gender#162446]
         :     :     :  +- Join Inner, (userId#40 = userId#162537)
         :     :     :     :- Project [userId#40, churn#905 AS label#162517]
         :     :     :     :  +- Filter NOT (userId#40 = )
         :     :     :     :     +- Project [userId#40, churn#905]
         :     :     :     :        +- Deduplicate [userId#40]
         :     :     :     :           +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#162450, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :              +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#116412, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                 +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#61170, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                    +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#59628, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                       +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#57331, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                          +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#57222, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                             +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#57110, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, <lambda>(ts#38L) AS workday#56140, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                   +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, workday#3683, <lambda>(ts#38L) AS month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                      +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                         +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55879, <lambda>(ts#38L) AS day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                            +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, <lambda>(ts#38L) AS hour#55879, day#55744, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                               +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55717, day#55744, workday#3683, <lambda>(ts#38L) AS month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                                  +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55717, day#55744, workday#3683, month#3630, churn#905, phase#953L, ... 2 more fields]
         :     :     :     :                                                     +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#55717, <lambda>(ts#38L) AS day#55744, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :     :     :                                                        +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, <lambda>(ts#38L) AS hour#55717, day#3576, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :     :     :                                                           +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#3549, day#3576, <lambda>(ts#38L) AS workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :     :     :                                                              +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#3549, day#3576, workday#3603, <lambda>(ts#38L) AS month#3630, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                 +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#3549, day#3576, <lambda>(ts#38L) AS workday#3603, month#3496, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                    +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#3549, <lambda>(ts#38L) AS day#3576, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                       +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, <lambda>(ts#38L) AS hour#3549, day#3442, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                          +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#3415, day#3442, workday#3469, <lambda>(ts#38L) AS month#3496, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                             +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#3415, day#3442, <lambda>(ts#38L) AS workday#3469, month#438, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                                +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#3415, <lambda>(ts#38L) AS day#3442, workday#415, month#438, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                                   +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, <lambda>(ts#38L) AS hour#3415, day#393, workday#415, month#438, churn#905, phase#953L, state#1725]
         :     :     :     :                                                                                      +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, substring(location#31, -2, 2) AS state#1725]
         :     :     :     :                                                                                         +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, day#393, workday#415, month#438, churn#905, phase#953L]
         :     :     :     :                                                                                            +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, phase#953L]
         :     :     :     :                                                                                               +- Window [sum(cast(churn#905 as bigint)) windowspecdefinition(userId#40, ts#38L DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS phase#953L], [userId#40], [ts#38L DESC NULLS LAST]
         :     :     :     :                                                                                                  +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, day#393, workday#415, month#438, churn#905]
         :     :     :     :                                                                                                     +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, day#393, workday#415, month#438, <lambda>(page#33) AS churn#905]
         :     :     :     :                                                                                                        +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, day#393, workday#415, <lambda>(ts#38L) AS month#438]
         :     :     :     :                                                                                                           +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, day#393, <lambda>(ts#38L) AS workday#415]
         :     :     :     :                                                                                                              +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, hour#372, <lambda>(ts#38L) AS day#393]
         :     :     :     :                                                                                                                 +- Project [artist#23, auth#24, firstName#25, gender#26, itemInSession#27L, lastName#28, length#29, level#30, location#31, method#32, page#33, registration#34L, sessionId#35L, song#36, status#37L, ts#38L, userAgent#39, userId#40, <lambda>(ts#38L) AS hour#372]
         :     :     :     :                                                                                                                    +- Relation[artist#23,auth#24,firstName#25,gender#26,itemInSession#27L,lastName#28,length#29,level#30,location#31,method#32,page#33,registration#34L,sessionId#35L,song#36,status#37L,ts#38L,userAgent#39,userId#40] json
         :     :     :     +- Project [userId#162537, cast(gender#162443 as int) AS gender#162446]
         :     :     :        +- Project [userId#162537, CASE WHEN (gender#162523 = F) THEN cast(0 as string) WHEN (gender#162523 = M) THEN cast(1 as string) ELSE gender#162523 END AS gender#162443]
         :     :     :           +- Filter NOT (userId#162537 = )
         :     :     :              +- Project [userId#162537, gender#162523]
         :     :     :                 +- Sort [userId#162537 ASC NULLS FIRST], true
         :     :     :                    +- Deduplicate [userId#162537]
         :     :     :                       +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, <lambda>(ts#162535L) AS workday#116412, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                          +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, <lambda>(ts#162535L) AS workday#61170, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                             +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, <lambda>(ts#162535L) AS workday#59628, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, <lambda>(ts#162535L) AS workday#57331, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                   +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, <lambda>(ts#162535L) AS workday#57222, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                      +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, <lambda>(ts#162535L) AS workday#57110, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                         +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, <lambda>(ts#162535L) AS workday#56140, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                            +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, workday#3683, <lambda>(ts#162535L) AS month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                               +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                                  +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55879, <lambda>(ts#162535L) AS day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                                     +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, <lambda>(ts#162535L) AS hour#55879, day#55744, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                                        +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55717, day#55744, workday#3683, <lambda>(ts#162535L) AS month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                                           +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55717, day#55744, workday#3683, month#3630, churn#905, phase#953L, ... 2 more fields]
         :     :     :                                                              +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#55717, <lambda>(ts#162535L) AS day#55744, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :     :                                                                 +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, <lambda>(ts#162535L) AS hour#55717, day#3576, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :     :                                                                    +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#3549, day#3576, <lambda>(ts#162535L) AS workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :     :                                                                       +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#3549, day#3576, workday#3603, <lambda>(ts#162535L) AS month#3630, churn#905, phase#953L, state#1725]
         :     :     :                                                                          +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#3549, day#3576, <lambda>(ts#162535L) AS workday#3603, month#3496, churn#905, phase#953L, state#1725]
         :     :     :                                                                             +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#3549, <lambda>(ts#162535L) AS day#3576, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :     :     :                                                                                +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, <lambda>(ts#162535L) AS hour#3549, day#3442, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :     :     :                                                                                   +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#3415, day#3442, workday#3469, <lambda>(ts#162535L) AS month#3496, churn#905, phase#953L, state#1725]
         :     :     :                                                                                      +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#3415, day#3442, <lambda>(ts#162535L) AS workday#3469, month#438, churn#905, phase#953L, state#1725]
         :     :     :                                                                                         +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#3415, <lambda>(ts#162535L) AS day#3442, workday#415, month#438, churn#905, phase#953L, state#1725]
         :     :     :                                                                                            +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, <lambda>(ts#162535L) AS hour#3415, day#393, workday#415, month#438, churn#905, phase#953L, state#1725]
         :     :     :                                                                                               +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, substring(location#162528, -2, 2) AS state#1725]
         :     :     :                                                                                                  +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, day#393, workday#415, month#438, churn#905, phase#953L]
         :     :     :                                                                                                     +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, phase#953L]
         :     :     :                                                                                                        +- Window [sum(cast(churn#905 as bigint)) windowspecdefinition(userId#162537, ts#162535L DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS phase#953L], [userId#162537], [ts#162535L DESC NULLS LAST]
         :     :     :                                                                                                           +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, day#393, workday#415, month#438, churn#905]
         :     :     :                                                                                                              +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, day#393, workday#415, month#438, <lambda>(page#162530) AS churn#905]
         :     :     :                                                                                                                 +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, day#393, workday#415, <lambda>(ts#162535L) AS month#438]
         :     :     :                                                                                                                    +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, day#393, <lambda>(ts#162535L) AS workday#415]
         :     :     :                                                                                                                       +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, hour#372, <lambda>(ts#162535L) AS day#393]
         :     :     :                                                                                                                          +- Project [artist#162520, auth#162521, firstName#162522, gender#162523, itemInSession#162524L, lastName#162525, length#162526, level#162527, location#162528, method#162529, page#162530, registration#162531L, sessionId#162532L, song#162533, status#162534L, ts#162535L, userAgent#162536, userId#162537, <lambda>(ts#162535L) AS hour#372]
         :     :     :                                                                                                                             +- Relation[artist#162520,auth#162521,firstName#162522,gender#162523,itemInSession#162524L,lastName#162525,length#162526,level#162527,location#162528,method#162529,page#162530,registration#162531L,sessionId#162532L,song#162533,status#162534L,ts#162535L,userAgent#162536,userId#162537] json
         :     :     +- Filter NOT (userId#162558 = )
         :     :        +- Project [userId#162558, workday#162450]
         :     :           +- Sort [userId#162558 ASC NULLS FIRST], true
         :     :              +- Deduplicate [userId#162558]
         :     :                 +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#162450, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                    +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#116412, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                       +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#61170, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                          +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#59628, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                             +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#57331, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                                +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#57222, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                                   +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#57110, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                                      +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, <lambda>(ts#162556L) AS workday#56140, month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                                         +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, workday#3683, <lambda>(ts#162556L) AS month#55963, churn#905, phase#953L, ... 2 more fields]
         :     :                                            +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :                                               +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55879, <lambda>(ts#162556L) AS day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :                                                  +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, <lambda>(ts#162556L) AS hour#55879, day#55744, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :                                                     +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55717, day#55744, workday#3683, <lambda>(ts#162556L) AS month#55799, churn#905, phase#953L, ... 2 more fields]
         :     :                                                        +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55717, day#55744, workday#3683, month#3630, churn#905, phase#953L, ... 2 more fields]
         :     :                                                           +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#55717, <lambda>(ts#162556L) AS day#55744, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :                                                              +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, <lambda>(ts#162556L) AS hour#55717, day#3576, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :                                                                 +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#3549, day#3576, <lambda>(ts#162556L) AS workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :     :                                                                    +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#3549, day#3576, workday#3603, <lambda>(ts#162556L) AS month#3630, churn#905, phase#953L, state#1725]
         :     :                                                                       +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#3549, day#3576, <lambda>(ts#162556L) AS workday#3603, month#3496, churn#905, phase#953L, state#1725]
         :     :                                                                          +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#3549, <lambda>(ts#162556L) AS day#3576, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :     :                                                                             +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, <lambda>(ts#162556L) AS hour#3549, day#3442, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :     :                                                                                +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#3415, day#3442, workday#3469, <lambda>(ts#162556L) AS month#3496, churn#905, phase#953L, state#1725]
         :     :                                                                                   +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#3415, day#3442, <lambda>(ts#162556L) AS workday#3469, month#438, churn#905, phase#953L, state#1725]
         :     :                                                                                      +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#3415, <lambda>(ts#162556L) AS day#3442, workday#415, month#438, churn#905, phase#953L, state#1725]
         :     :                                                                                         +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, <lambda>(ts#162556L) AS hour#3415, day#393, workday#415, month#438, churn#905, phase#953L, state#1725]
         :     :                                                                                            +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, substring(location#162549, -2, 2) AS state#1725]
         :     :                                                                                               +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, day#393, workday#415, month#438, churn#905, phase#953L]
         :     :                                                                                                  +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, phase#953L]
         :     :                                                                                                     +- Window [sum(cast(churn#905 as bigint)) windowspecdefinition(userId#162558, ts#162556L DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS phase#953L], [userId#162558], [ts#162556L DESC NULLS LAST]
         :     :                                                                                                        +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, day#393, workday#415, month#438, churn#905]
         :     :                                                                                                           +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, day#393, workday#415, month#438, <lambda>(page#162551) AS churn#905]
         :     :                                                                                                              +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, day#393, workday#415, <lambda>(ts#162556L) AS month#438]
         :     :                                                                                                                 +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, day#393, <lambda>(ts#162556L) AS workday#415]
         :     :                                                                                                                    +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, hour#372, <lambda>(ts#162556L) AS day#393]
         :     :                                                                                                                       +- Project [artist#162541, auth#162542, firstName#162543, gender#162544, itemInSession#162545L, lastName#162546, length#162547, level#162548, location#162549, method#162550, page#162551, registration#162552L, sessionId#162553L, song#162554, status#162555L, ts#162556L, userAgent#162557, userId#162558, <lambda>(ts#162556L) AS hour#372]
         :     :                                                                                                                          +- Relation[artist#162541,auth#162542,firstName#162543,gender#162544,itemInSession#162545L,lastName#162546,length#162547,level#162548,location#162549,method#162550,page#162551,registration#162552L,sessionId#162553L,song#162554,status#162555L,ts#162556L,userAgent#162557,userId#162558] json
         :     +- Filter NOT (userId#162580 = )
         :        +- Project [userId#162580, day#55907]
         :           +- Sort [userId#162580 ASC NULLS FIRST], true
         :              +- Deduplicate [userId#162580]
         :                 +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#162450, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                    +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#116412, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                       +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#61170, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                          +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#59628, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                             +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#57331, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                                +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#57222, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                                   +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#57110, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                                      +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, <lambda>(ts#162578L) AS workday#56140, month#55963, churn#905, phase#953L, ... 2 more fields]
         :                                         +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, workday#3683, <lambda>(ts#162578L) AS month#55963, churn#905, phase#953L, ... 2 more fields]
         :                                            +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :                                               +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55879, <lambda>(ts#162578L) AS day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :                                                  +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, <lambda>(ts#162578L) AS hour#55879, day#55744, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
         :                                                     +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55717, day#55744, workday#3683, <lambda>(ts#162578L) AS month#55799, churn#905, phase#953L, ... 2 more fields]
         :                                                        +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55717, day#55744, workday#3683, month#3630, churn#905, phase#953L, ... 2 more fields]
         :                                                           +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#55717, <lambda>(ts#162578L) AS day#55744, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :                                                              +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, <lambda>(ts#162578L) AS hour#55717, day#3576, workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :                                                                 +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#3549, day#3576, <lambda>(ts#162578L) AS workday#3683, month#3630, churn#905, phase#953L, state#1725]
         :                                                                    +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#3549, day#3576, workday#3603, <lambda>(ts#162578L) AS month#3630, churn#905, phase#953L, state#1725]
         :                                                                       +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#3549, day#3576, <lambda>(ts#162578L) AS workday#3603, month#3496, churn#905, phase#953L, state#1725]
         :                                                                          +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#3549, <lambda>(ts#162578L) AS day#3576, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :                                                                             +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, <lambda>(ts#162578L) AS hour#3549, day#3442, workday#3469, month#3496, churn#905, phase#953L, state#1725]
         :                                                                                +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#3415, day#3442, workday#3469, <lambda>(ts#162578L) AS month#3496, churn#905, phase#953L, state#1725]
         :                                                                                   +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#3415, day#3442, <lambda>(ts#162578L) AS workday#3469, month#438, churn#905, phase#953L, state#1725]
         :                                                                                      +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#3415, <lambda>(ts#162578L) AS day#3442, workday#415, month#438, churn#905, phase#953L, state#1725]
         :                                                                                         +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, <lambda>(ts#162578L) AS hour#3415, day#393, workday#415, month#438, churn#905, phase#953L, state#1725]
         :                                                                                            +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, substring(location#162571, -2, 2) AS state#1725]
         :                                                                                               +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, day#393, workday#415, month#438, churn#905, phase#953L]
         :                                                                                                  +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, phase#953L]
         :                                                                                                     +- Window [sum(cast(churn#905 as bigint)) windowspecdefinition(userId#162580, ts#162578L DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS phase#953L], [userId#162580], [ts#162578L DESC NULLS LAST]
         :                                                                                                        +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, day#393, workday#415, month#438, churn#905]
         :                                                                                                           +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, day#393, workday#415, month#438, <lambda>(page#162573) AS churn#905]
         :                                                                                                              +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, day#393, workday#415, <lambda>(ts#162578L) AS month#438]
         :                                                                                                                 +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, day#393, <lambda>(ts#162578L) AS workday#415]
         :                                                                                                                    +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, hour#372, <lambda>(ts#162578L) AS day#393]
         :                                                                                                                       +- Project [artist#162563, auth#162564, firstName#162565, gender#162566, itemInSession#162567L, lastName#162568, length#162569, level#162570, location#162571, method#162572, page#162573, registration#162574L, sessionId#162575L, song#162576, status#162577L, ts#162578L, userAgent#162579, userId#162580, <lambda>(ts#162578L) AS hour#372]
         :                                                                                                                          +- Relation[artist#162563,auth#162564,firstName#162565,gender#162566,itemInSession#162567L,lastName#162568,length#162569,level#162570,location#162571,method#162572,page#162573,registration#162574L,sessionId#162575L,song#162576,status#162577L,ts#162578L,userAgent#162579,userId#162580] json
         +- Filter NOT (userId#162603 = )
            +- Project [userId#162603, SongsPlayed#162510L]
               +- Sort [userId#162603 ASC NULLS FIRST], true
                  +- Aggregate [userId#162603], [userId#162603, count(song#162599) AS SongsPlayed#162510L]
                     +- Filter NOT (song#162599 = null)
                        +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#162450, month#55963, churn#905, phase#953L, ... 2 more fields]
                           +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#116412, month#55963, churn#905, phase#953L, ... 2 more fields]
                              +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#61170, month#55963, churn#905, phase#953L, ... 2 more fields]
                                 +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#59628, month#55963, churn#905, phase#953L, ... 2 more fields]
                                    +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#57331, month#55963, churn#905, phase#953L, ... 2 more fields]
                                       +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#57222, month#55963, churn#905, phase#953L, ... 2 more fields]
                                          +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#57110, month#55963, churn#905, phase#953L, ... 2 more fields]
                                             +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, <lambda>(ts#162601L) AS workday#56140, month#55963, churn#905, phase#953L, ... 2 more fields]
                                                +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, workday#3683, <lambda>(ts#162601L) AS month#55963, churn#905, phase#953L, ... 2 more fields]
                                                   +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
                                                      +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55879, <lambda>(ts#162601L) AS day#55907, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
                                                         +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, <lambda>(ts#162601L) AS hour#55879, day#55744, workday#3683, month#55799, churn#905, phase#953L, ... 2 more fields]
                                                            +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55717, day#55744, workday#3683, <lambda>(ts#162601L) AS month#55799, churn#905, phase#953L, ... 2 more fields]
                                                               +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55717, day#55744, workday#3683, month#3630, churn#905, phase#953L, ... 2 more fields]
                                                                  +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#55717, <lambda>(ts#162601L) AS day#55744, workday#3683, month#3630, churn#905, phase#953L, state#1725]
                                                                     +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, <lambda>(ts#162601L) AS hour#55717, day#3576, workday#3683, month#3630, churn#905, phase#953L, state#1725]
                                                                        +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#3549, day#3576, <lambda>(ts#162601L) AS workday#3683, month#3630, churn#905, phase#953L, state#1725]
                                                                           +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#3549, day#3576, workday#3603, <lambda>(ts#162601L) AS month#3630, churn#905, phase#953L, state#1725]
                                                                              +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#3549, day#3576, <lambda>(ts#162601L) AS workday#3603, month#3496, churn#905, phase#953L, state#1725]
                                                                                 +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#3549, <lambda>(ts#162601L) AS day#3576, workday#3469, month#3496, churn#905, phase#953L, state#1725]
                                                                                    +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, <lambda>(ts#162601L) AS hour#3549, day#3442, workday#3469, month#3496, churn#905, phase#953L, state#1725]
                                                                                       +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#3415, day#3442, workday#3469, <lambda>(ts#162601L) AS month#3496, churn#905, phase#953L, state#1725]
                                                                                          +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#3415, day#3442, <lambda>(ts#162601L) AS workday#3469, month#438, churn#905, phase#953L, state#1725]
                                                                                             +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#3415, <lambda>(ts#162601L) AS day#3442, workday#415, month#438, churn#905, phase#953L, state#1725]
                                                                                                +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, <lambda>(ts#162601L) AS hour#3415, day#393, workday#415, month#438, churn#905, phase#953L, state#1725]
                                                                                                   +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, substring(location#162594, -2, 2) AS state#1725]
                                                                                                      +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, day#393, workday#415, month#438, churn#905, phase#953L]
                                                                                                         +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, day#393, workday#415, month#438, churn#905, phase#953L, phase#953L]
                                                                                                            +- Window [sum(cast(churn#905 as bigint)) windowspecdefinition(userId#162603, ts#162601L DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS phase#953L], [userId#162603], [ts#162601L DESC NULLS LAST]
                                                                                                               +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, day#393, workday#415, month#438, churn#905]
                                                                                                                  +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, day#393, workday#415, month#438, <lambda>(page#162596) AS churn#905]
                                                                                                                     +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, day#393, workday#415, <lambda>(ts#162601L) AS month#438]
                                                                                                                        +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, day#393, <lambda>(ts#162601L) AS workday#415]
                                                                                                                           +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, hour#372, <lambda>(ts#162601L) AS day#393]
                                                                                                                              +- Project [artist#162586, auth#162587, firstName#162588, gender#162589, itemInSession#162590L, lastName#162591, length#162592, level#162593, location#162594, method#162595, page#162596, registration#162597L, sessionId#162598L, song#162599, status#162600L, ts#162601L, userAgent#162602, userId#162603, <lambda>(ts#162601L) AS hour#372]
                                                                                                                                 +- Relation[artist#162586,auth#162587,firstName#162588,gender#162589,itemInSession#162590L,lastName#162591,length#162592,level#162593,location#162594,method#162595,page#162596,registration#162597L,sessionId#162598L,song#162599,status#162600L,ts#162601L,userAgent#162602,userId#162603] json


In [335]:
# Create a StringIndexer
state_indexer = StringIndexer(inputCol ="state", outputCol ="state_index")

In [336]:
# Create a OneHotEncoder
state_encoder = OneHotEncoder(inputCol="state_index", outputCol="state_fact")

In [338]:
model_data.take(1)

[Row(userId=100010, label=0, gender=0, workday=1, day=8, state='CT', SongsPlayed=275)]

In [339]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["gender", "state_fact", "workday", "day", "SongsPlayed"], outputCol="features")

ETL pipeline

1. Load data
2. Create workdays/days columns
3. Create features
4. Join data

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

### 1. Train Test split

In [283]:
# split the data
train, test = model_data.randomSplit([0.6, 0.4])

### 2. Build Pipeline

In [298]:
# Create a StringIndexer
state_indexer = StringIndexer(inputCol ="state", outputCol ="state_index")

# Create a OneHotEncoder
state_encoder = OneHotEncoder(inputCol="state_index", outputCol="state_fact")

In [375]:
%%timeit

# vector for features
vec_assembler = VectorAssembler(inputCols=["gender", "workday", "day", "SongsPlayed"], \
                                outputCol="features")

# standard scaler
scaler = StandardScaler(inputCol="features", outputCol="features_scaled", withStd=True)

# build indexer
indexer = StringIndexer(inputCol="churn", outputCol="label")

# Call Logistic regression
lr =  LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)

#stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)

# Build pipeline
pipeline_lr = Pipeline(stages=[state_indexer, state_encoder, vec_assembler, scaler, indexer, lr])

13.9 ms ± 1.14 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


### 3. Tune model

In [376]:
# Build parameter grid 
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1, 0.01]) \
    .build()

# Build Cross Validator
crossval_lr = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

In [377]:
# vector problem
cvModel_lr = crossval_lr.fit(train)

IllegalArgumentException: requirement failed: rawPredictionCol vectors must have length=2, but got 1

### 4. Compute Accuracy of the best model

In [340]:
# Make the pipeline
churn_pipe = Pipeline(stages=[state_indexer, state_encoder, vec_assembler])

#flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

In [341]:
# Fit and transform the data
piped_data = churn_pipe.fit(model_data).transform(model_data)

In [142]:
piped_data.dtypes

[('userId', 'int'),
 ('label', 'int'),
 ('gender', 'int'),
 ('workday', 'int'),
 ('day', 'int'),
 ('state', 'string'),
 ('SongsPlayed', 'bigint'),
 ('state_index', 'double'),
 ('state_fact', 'vector'),
 ('features', 'vector'),
 ('ScaledFeatures', 'vector'),
 ('rawPrediction', 'vector'),
 ('probability', 'vector'),
 ('prediction', 'double')]

In Spark it's important to make sure you split the data after all the transformations. This is because operations like StringIndexer don't always produce the same index even when given the same list of strings.

In [342]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

In [144]:
training.take(1)

[Row(userId=100010, label=0, gender=0, workday=100010, day=8, state='CT', SongsPlayed=275, state_index=9.0, state_fact=SparseVector(38, {9: 1.0}), features=SparseVector(43, {0: 100010.0, 11: 1.0, 40: 100010.0, 41: 8.0, 42: 275.0}), ScaledFeatures=SparseVector(43, {0: 0.3333, 11: 1.0, 40: 0.3333, 41: 0.2333, 42: 0.034}), rawPrediction=DenseVector([1.5435, -1.5435]), probability=DenseVector([0.9564, 0.0436]), prediction=0.0)]

In [343]:
# Create a LogisticRegression Estimator
lr = LogisticRegression()

In [345]:
# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

In [347]:
# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

In [348]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
            estimatorParamMaps=grid,
            evaluator=evaluator
            )

In [350]:
# Fit cross validation models
models = cv.fit(training)

IllegalArgumentException: requirement failed: rawPredictionCol vectors must have length=2, but got 1

In [351]:
# Extract the best model
best_lr = models.bestModel

NameError: name 'models' is not defined

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.