# Project overview
Imagine you are working on the data team for a popular digital music service similar to Spotify or Pandora. Many users stream their favorite songs through your service every day, using either the free tier, which places advertisements between songs, or the premium subscription model, where they stream music ad-free but pay a monthly flat rate.

Users can upgrade, downgrade, or cancel their service at any time, so it's crucial to make sure your users love the service. Every time a user interacts with the service, whether playing songs, logging out, liking a song with a thumbs up, hearing an ad, or downgrading their service, it generates data. All this data contains the key insights for keeping your users happy and helping your business thrive. (The full dataset is 12GB.)

It's your job on the data team to predict which users are at risk of churning, either by downgrading from the premium to the free tier or by cancelling their service altogether. If you can accurately identify these users before they leave, your business can offer them discounts and incentives, potentially saving your business millions in revenue.

To tackle this project, you will need to load, explore, and clean this dataset with Spark. Based on your exploration, you will create features and build models with Spark to predict which users will churn from your digital music service.


Predicting churn rates is a challenging and common problem that data scientists and analysts regularly encounter in any customer-facing business. Additionally, the ability to efficiently manipulate large datasets with Spark is one of the highest-demand skills in the field of data.

# Problem Statement
* Load large datasets into Spark and manipulate them using Spark SQL and Spark Dataframes
* Explore the dataset to understand it better (data exploration, data visualization, etc.)
* Preprocess the data
* Use the machine learning APIs within Spark ML to build and tune models
* Predict which users are at risk of churning and validate the predictions

# Metrics
Some of the metrics we might use:
* Accuracy, recall, precision, F-measure (F1 score)
* Coefficient
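As a quick reference for how the classification metrics listed above relate to each other, here is a minimal plain-Python sketch. The function name `classification_metrics` and the confusion-matrix counts are hypothetical and chosen only for illustration; they are not results from this notebook.

```python
def classification_metrics(tp, fp, fn, tn):
    """Compute accuracy, precision, recall and F1 for the positive (churn) class."""
    total = tp + fp + fn + tn
    accuracy = (tp + tn) / total if total else 0.0
    # precision: of the users predicted to churn, how many actually churned
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    # recall: of the users who actually churned, how many were caught
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    # F1 is the harmonic mean of precision and recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Hypothetical counts, for illustration only
metrics = classification_metrics(tp=8, fp=2, fn=8, tn=82)
print(metrics)
```

With counts like these, accuracy looks high even though half of the churned users are missed, which is one reason F1 is often preferred when the positive class is small.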

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without userIds or sessionIds.

In [66]:
# import libraries
# refer 'https://changhsinlee.com/install-pyspark-windows-jupyter/#comment-4302741820'
import findspark
findspark.init()

import zipfile
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.ml.feature import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

plt.rcParams['figure.figsize']=(15,5)

In [67]:
# unzip files
with zipfile.ZipFile("data_zip/mini_sparkify_event_data.zip",'r') as zf:
    zf.extractall("../Sparkify--Pyspark-Big-Data-Project")

In [68]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()
# read in the mini Sparkify dataset
event_data = "mini_sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [69]:
# df schema
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [70]:
# check for different pages
df.createOrReplaceTempView('t1')
spark.sql('''
select distinct page
from t1''').show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
| Submit Registration|
|            Settings|
|               Login|
|            Register|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
+--------------------+
only showing top 20 rows



In [71]:
# check for different userAgent
spark.sql('''
select distinct userAgent
from t1
''').show(20,False)

+-------------------------------------------------------------------------------------------------------------------------------------------+
|userAgent                                                                                                                                  |
+-------------------------------------------------------------------------------------------------------------------------------------------+
|"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"                 |
|"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"                                   |
|Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:31.0) Gecko/20100101 Firefox/31.0                                                                 |
|"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"                 |
|"Mozi

###### clean userAgent column

The detailed sub-categories in userAgent are not useful for aggregation or analysis. Instead, we will keep only the system information.
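To make the intended result of this cleaning concrete, here is a plain-Python sketch of the same idea: strip the quotes and the `Mozilla/5.0 ` prefix, split on non-word characters, and keep the first token. The helper name `first_os_token` is hypothetical, and the lowercasing and dropping of empty tokens assume the default behaviour of Spark's `RegexTokenizer`. It is meant for sanity-checking individual strings, not as a replacement for the Spark cells.

```python
import re


def first_os_token(user_agent):
    """Return the first word-like token after the 'Mozilla/5.0 ' prefix, lowercased."""
    if user_agent is None:
        return None
    # remove surrounding quotes and the common browser prefix
    cleaned = user_agent.replace('"', '').replace('Mozilla/5.0 ', '')
    # split on non-word characters and drop empty tokens
    tokens = [t for t in re.split(r'\W', cleaned.lower()) if t]
    return tokens[0] if tokens else None


samples = [
    'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0',
    '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"',
    'Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:31.0) Gecko/20100101 Firefox/31.0',
]
for s in samples:
    print(first_os_token(s))
```

Note that the Linux sample yields `x11` rather than `linux`, so the first token is only a rough proxy for the operating system.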

In [72]:
# remove ""
df=df.withColumn("userAgent",regexp_replace('userAgent','"',''))
# remove Mozilla/5.0 
df=df.withColumn("userAgent",regexp_replace('userAgent','Mozilla/5.0 ',''))

In [73]:
# tokenize userAgent
regexTokenizer =RegexTokenizer(inputCol='userAgent',outputCol='words',pattern="\\W")
df=regexTokenizer.transform(df)

In [74]:
# keep first element in words column
df=df.withColumn('OS',df.words[0].cast(StringType())).drop('words')

In [75]:
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='(Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30', OS='windows')

In [76]:
df.createOrReplaceTempView('t1')
spark.sql('''
select count(distinct userAgent)
from t1
''').show()

+-------------------------+
|count(DISTINCT userAgent)|
+-------------------------+
|                       56|
+-------------------------+



###### check missing values for column userId and sessionId

In [77]:
# records without userids or sessionids
spark.sql('''
select *
from t1
where sessionId is null or userId is null''').show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId| OS|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---+



> There are no null values in either column.

In [78]:
# check empty userids
spark.sql('''
select distinct userId
from t1
order by 1''').show()

+------+
|userId|
+------+
|      |
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
|100008|
|100009|
|100010|
|100011|
|100012|
|100013|
|100014|
|100015|
|100016|
|100017|
+------+
only showing top 20 rows



> However, there are empty userIds.
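The two checks above differ because a null test does not catch empty strings. The following plain-Python sketch illustrates the distinction on a few hypothetical records; the helper name `count_missing` and the sample rows are assumptions made for illustration only.

```python
def count_missing(records, column):
    """Count null values and blank strings separately for one column."""
    nulls = 0
    blanks = 0
    for row in records:
        value = row.get(column)
        if value is None:
            nulls += 1
        elif isinstance(value, str) and value.strip() == "":
            blanks += 1
    return {"null": nulls, "blank": blanks}


# Hypothetical rows: one has an empty userId, none has a null userId
rows = [
    {"userId": "30", "sessionId": 29},
    {"userId": "", "sessionId": 8},
    {"userId": "100010", "sessionId": 31},
]
print(count_missing(rows, "userId"))
```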

In [65]:
# characteristics of records with an empty userId
df.createOrReplaceTempView('t1')
spark.sql('''
select *
from t1
where userId =""
''').show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId| OS|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+---+



In [None]:
spark.sql('''
select distinct auth, page
from t1
where userId =""
''').show()

> Records with an empty userId have null values in most columns, and these users didn't listen to any music. We will drop these rows.

In [64]:
df=spark.sql('''
select *
from t1
where userId !=""
''')
df.count()

278154

###### clean location column

The location column holds detailed city-level values (for example, `Bakersfield, CA`), which are too fine-grained to show how the data is distributed geographically. Instead, we will keep only the state abbreviation so that we can aggregate data by state.
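A plain-Python sketch of the extraction rule is useful for checking edge cases before applying it to the Spark column. The helper name `state_from_location` is hypothetical, and the multi-state string below is an illustrative example rather than a value confirmed from this dataset.

```python
def state_from_location(location):
    """Keep only the part after the last ', ' (the state abbreviation)."""
    if location is None:
        return None
    return location.strip().split(', ')[-1]


print(state_from_location('Bakersfield, CA'))
# Illustrative multi-state metro area: the whole suffix is kept as one category
print(state_from_location('New York-Newark-Jersey City, NY-NJ-PA'))
```

Metro areas that span several states keep the combined suffix, so the number of distinct values can exceed the number of states.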

In [None]:
# check different locations
spark.sql('''
select distinct location
from t1
''').show(10,False)

In [None]:
# check distinct amount
spark.sql('''
select count(distinct location)
from t1
''').show()

In [None]:
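# keep the state abbreviation only (the text after the last ', ')
# guard against null locations so the UDF does not raise on missing values
state=udf(lambda x: x.strip().split(', ')[-1] if x is not None else None, StringType())
df=df.withColumn('location',state(df.location))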

In [None]:
# check distinct amount of state abb
df.select('location').dropDuplicates().count()

In [None]:
df.head()

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.
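The labelling rule can be sketched in plain Python: a user counts as churned if any of their events is a `Cancellation Confirmation`, and that label is then copied onto every event of that user. The helper name `label_churn` and the sample events are hypothetical and only illustrate the logic.

```python
def label_churn(events):
    """Add a per-user Churn flag (1 or 0) to every event of that user."""
    churned_users = {
        e["userId"] for e in events if e["page"] == "Cancellation Confirmation"
    }
    return [dict(e, Churn=1 if e["userId"] in churned_users else 0) for e in events]


# Hypothetical events for two users
events = [
    {"userId": "10", "page": "NextSong"},
    {"userId": "10", "page": "Cancellation Confirmation"},
    {"userId": "30", "page": "NextSong"},
    {"userId": "30", "page": "Thumbs Up"},
]
for row in label_churn(events):
    print(row)
```

Because the label is attached at the user level, every event of a churned user carries `Churn=1`, including events that happened long before the cancellation.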


In [None]:
df.createOrReplaceTempView('t1')
print("number of users who stayed (no Cancellation Confirmation event)")
spark.sql('''
select count(distinct userId)
from t1
where userId not in (
select distinct userId
from t1
where page="Cancellation Confirmation")
''').show()

In [None]:
print("number of users who churned (with a Cancellation Confirmation event)")
spark.sql('''
select count(distinct userId)
from t1
where page="Cancellation Confirmation"
''').show()

In [None]:
# flag 'Cancellation Confirmation' status
df.createOrReplaceTempView('t1')
spark.udf.register("churn", lambda x: 1 if x=="Cancellation Confirmation" else 0, IntegerType())
df=spark.sql('''
select *, sum(churn(page))over(partition by userId) Churn
from t1
''')

In [None]:
# check result
df.createOrReplaceTempView('t1')
spark.sql('''
select Churn, count(*)
from t1
group by 1''').show()

In [None]:
233290/44864

> The ratio of churned to retained event rows is about 1:5.2. This class imbalance may cause problems for the machine learning models.
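One common way to reason about such an imbalance is to compute inverse-frequency class weights. The sketch below uses the two row counts from the cell above; the helper name `balanced_class_weights` is hypothetical, and weighting is only one possible remedy, not necessarily the approach taken later in this notebook.

```python
def balanced_class_weights(counts):
    """Inverse-frequency weights: total / (number_of_classes * class_count)."""
    total = sum(counts.values())
    n_classes = len(counts)
    return {label: total / (n_classes * n) for label, n in counts.items()}


# Row counts per Churn label, taken from the division in the cell above
counts = {0: 233290, 1: 44864}
ratio = counts[0] / counts[1]
weights = balanced_class_weights(counts)
print(round(ratio, 2), weights)
```

These counts are event rows rather than users, so the imbalance at the user level may differ.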


### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

###### Churn rate vs gender

In [None]:
df.createOrReplaceTempView("churnsex")
churnsex=spark.sql('''
select gender, avg(Churn) churn_rate
from (
select distinct userId,gender,Churn
from churnsex) temp
group by 1
''').toPandas()

In [None]:
churnsex

In [None]:
sns.barplot(x=churnsex['gender'],y=100*churnsex['churn_rate'])
plt.title('Churn rate vs gender');

> The male churn rate is higher than the female churn rate: 26.4% vs. 19.2%.

###### churn rate vs location

In [None]:
df.createOrReplaceTempView("chrunstate")
chrunstate=spark.sql('''
select location, avg(Churn) churn_rate
from (
    select distinct userId,location,Churn
    from chrunstate) temp
group by 1
having avg(Churn)>0
order by 2 desc, 1
''').toPandas()

In [None]:
# user count vs churned-user count per location
chruncountstate=spark.sql('''
select distinct userId,location,Churn
from chrunstate
order by 2
''').toPandas()

In [None]:
plt.figure(figsize=(20,15))
plt.subplot(211)
sns.barplot(x=100*chrunstate['churn_rate'],y=chrunstate['location'],color='c')
plt.title('churn rate vs location')

plt.subplot(212)
sns.countplot(data=chruncountstate,x='location',hue='Churn')
plt.title('total users and total churn per location')
plt.xticks(rotation=90);

> Here we can see the churn rate per location as well as the counts of users and churned users per location. One thing worth noting: even though some states have a high churn rate, this may simply be because their user base is very small, and vice versa.

> E.g. AR's churn rate is 100% because it has only 1 user in total, and that user churned.
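To keep small user bases from dominating the comparison, the churn rate can be reported together with the user count and filtered by a minimum sample size. The sketch below is plain Python on hypothetical `(userId, state, churn)` tuples; the helper name `churn_rate_by_state` and the `min_users` threshold are assumptions for illustration.

```python
def churn_rate_by_state(users, min_users=1):
    """Return {state: (churn_rate, user_count)} for states with enough users."""
    totals = {}
    churned = {}
    for _user_id, state, churn in users:
        totals[state] = totals.get(state, 0) + 1
        churned[state] = churned.get(state, 0) + churn
    return {
        state: (churned[state] / totals[state], totals[state])
        for state in totals
        if totals[state] >= min_users
    }


# Hypothetical per-user rows: AR has a single user who churned
users = [
    ("1", "AR", 1),
    ("2", "CA", 0),
    ("3", "CA", 1),
    ("4", "CA", 0),
    ("5", "CA", 0),
]
print(churn_rate_by_state(users))
print(churn_rate_by_state(users, min_users=2))
```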

###### Churn status vs avg amount of songs listened
###### Churn status vs avg amount of listen time (in hours)

In [None]:
# Churn status vs avg amount of songs listened
df.createOrReplaceTempView('avgsong')
avgsongpersuer=spark.sql('''
select Churn, count(song)/count(distinct userId) avgsongpersuer
from avgsong
group by 1''').toPandas()
avgsongpersuer

In [None]:
# Churn status vs avg amount of listen time (in hours)
avgtime=spark.sql('''
select Churn, sum(length)/count(distinct userId)/60/60 avgtimepersuer
from avgsong
group by 1''').toPandas()
avgtime

In [None]:
# per-user listening hours and song counts (for the scatter plot)
hrssongs=spark.sql('''
select distinct userId, Churn, round(sum(length)/60/60,2) hrs, count(song) songs
from avgsong
group by 1,2''').toPandas()
hrssongs

In [None]:
plt.subplot(121)
sns.barplot(x=avgsongpersuer['Churn'],y=avgsongpersuer['avgsongpersuer'])
plt.title('Churn status vs avg song listened')
plt.ylabel('count')

plt.subplot(122)
sns.barplot(x=avgtime['Churn'],y=avgtime['avgtimepersuer'])
plt.title('Churn status vs avg amount of listen time (in hours)')
plt.ylabel("hours");

In [None]:
g=sns.FacetGrid(data=hrssongs, hue='Churn',height=5)
g.map(plt.scatter,'songs','hrs')
g.add_legend()
plt.title('songs vs listen hrs for the two groups');

In [None]:
print("Difference in the average number of songs between the two groups is:",
int(avgsongpersuer['avgsongpersuer'].max()-avgsongpersuer['avgsongpersuer'].min()))

In [None]:
print("Difference in the average listen time between the two groups is {:.2f} hrs"
      .format(avgtime['avgtimepersuer'].max()-avgtime['avgtimepersuer'].min()))

> On average, users who stayed listened to 408 more songs than users who churned, and spent 28.38 more hours listening to music. The scatter plot shows the relationship between songs played and listening hours for both groups.

###### Song length proportional distribution by churn group

In [None]:
churnedlen=spark.sql('''
select length
from avgsong
where Churn=1''').toPandas()

staylen=spark.sql('''
select length
from avgsong
where Churn=0''').toPandas()

In [None]:
sns.distplot(churnedlen,label='churn',hist_kws={'alpha':1})
sns.distplot(staylen,label='stay',hist_kws={'alpha':.5})
plt.legend(loc=0)
plt.title('song length distribution by churn group')
plt.show();

> In terms of the distribution of song lengths, the two cohorts are very similar.

###### Status 404 distribution between cohorts

In [None]:
status=spark.sql('''
select status, Churn, count(status)/count(distinct userId) ratio
from avgsong
where status='404'
group by 1,2''').toPandas()

In [None]:
sns.barplot(data=status,x='status',y='ratio',hue='Churn')
plt.ylabel('count')
plt.title('avg 404 received per user');

In [None]:
status

> To check whether users churn for technical (HTTP) reasons, we can compare the average number of 404 responses received per user in each group. Users who stayed received more than 2 per person, while users who churned received fewer than 1.5. This suggests that 404 errors are not a strong driver of churn.

###### Page distributions in each group

In [None]:
page=spark.sql('''
select Churn,page, count(userId)/count(distinct userId) avg_interaction
from avgsong
group by 1,2
order by 2,1''').toPandas()

In [None]:
sns.barplot(y=page['page'],x=page['avg_interaction'],hue=page['Churn'])
plt.xscale('log')
locs, labels=plt.xticks()
plt.xticks(ticks=locs,labels=locs)
plt.xlim(.1,1500)
plt.title('Avg Page Interaction Per User')
plt.xlabel('avg_interaction times');

In [None]:
# rows are ordered by page, then Churn, so the last diff value is churned minus stayed
print("Roll Advert difference between two cohorts is :",page.query("page=='Roll Advert'")['avg_interaction'].diff().iloc[-1])

In [None]:
# rows are ordered by page, then Churn, so the last diff value is churned minus stayed
print("Upgrade difference between two cohorts is :",page.query("page=='Upgrade'")['avg_interaction'].diff().iloc[-1])

> Looking at the 'Avg Page Interaction Per User' plot, we can gain some interesting insights. Generally speaking, users who stayed interact with the app more than users who churned. However, there are four page categories where churned users interact more than users who stayed:
* Cancel
* Cancellation Confirmation
* Roll Advert
* Upgrade

> Setting the cancel pages aside, churned users saw about 0.45 more Roll Advert pages per user than users who stayed, which raises the question of whether they churned because they received too many ads. Churned users also landed on the Upgrade page slightly more often than users who stayed (a difference of around 0.07). This might suggest that more users in the churn cohort considered upgrading their account. However, Submit Upgrade interactions are still higher in the stay cohort, which means the conversion rate of users who stayed is higher than that of churned users, as one would expect.

###### Operating system

In [None]:
df.toPandas().sort_values(by=['userId','sessionId'])

In [None]:
df.printSchema()

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary
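As a rough illustration of what a feature-extraction script produces, the sketch below aggregates raw events into one feature row per user in plain Python. The helper name `user_features` and the three features (song count, listening hours, adverts heard) are assumptions picked from the exploration above, not a definitive feature set, and a scalable version would express the same aggregation in Spark.

```python
from collections import defaultdict


def user_features(events):
    """Aggregate raw events into one feature dict per user."""
    feats = defaultdict(lambda: {"songs": 0, "listen_hours": 0.0, "adverts": 0})
    for e in events:
        f = feats[e["userId"]]
        if e["page"] == "NextSong":
            f["songs"] += 1
            # length is in seconds; treat a missing length as zero
            f["listen_hours"] += (e.get("length") or 0.0) / 3600
        elif e["page"] == "Roll Advert":
            f["adverts"] += 1
    return dict(feats)


# Hypothetical events for two users
events = [
    {"userId": "30", "page": "NextSong", "length": 1800.0},
    {"userId": "30", "page": "NextSong", "length": 1800.0},
    {"userId": "30", "page": "Roll Advert", "length": None},
    {"userId": "10", "page": "Thumbs Up", "length": None},
]
print(user_features(events))
```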

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
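Since the churn label is defined per user, one reasonable option is to split by user rather than by event row, so that no user appears in more than one set. The sketch below shows such a split in plain Python; the helper name `split_users`, the 60/20/20 weights, and the seed are assumptions for illustration, and on a per-user Spark DataFrame the built-in `DataFrame.randomSplit` plays a similar role.

```python
import random


def split_users(user_ids, weights=(0.6, 0.2, 0.2), seed=42):
    """Shuffle unique user ids and cut them into train, test and validation lists."""
    ids = sorted(set(user_ids))
    random.Random(seed).shuffle(ids)
    n_train = int(len(ids) * weights[0])
    n_test = int(len(ids) * weights[1])
    train = ids[:n_train]
    test = ids[n_train:n_train + n_test]
    validation = ids[n_train + n_test:]  # remainder goes to validation
    return train, test, validation


# Hypothetical user ids
train, test, validation = split_users([str(i) for i in range(10)])
print(len(train), len(test), len(validation))
```

Fixing the seed keeps the split reproducible across runs, which makes model comparisons fair.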

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.