# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, count, desc, explode, lit, min, max, split, stddev, udf, rand
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.evaluation import MulticlassMetrics

import re
import copy
import collections
import time
import datetime
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from statsmodels.stats.proportion import proportions_ztest

%matplotlib inline

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# create a Spark session
spark = SparkSession.builder.master("local").appName("Sparkify").getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [None]:
df = spark.read.json('mini_sparkify_event_data.json')


In [None]:
df.limit(5).toPandas()

In [None]:
df.printSchema()

In [None]:
df.toPandas().describe()

In [None]:
df.toPandas().info()

In [None]:
df.toPandas().skew(axis = 0, skipna = True)

In [None]:
df.toPandas().kurt(axis = 0, skipna = True)

In [None]:
df.toPandas().hist();

In [None]:
df.describe('userId').show()

In [None]:
df.describe('sessionId').show()

In [None]:
df.count()

In [None]:
# check for empty sessionId values
df.filter(df.sessionId == '').count()

In [None]:
# check for empty userId values
df.filter(df.userId == '').count()

In [None]:
# check for duplicates in userId
df.select("userId").dropDuplicates().count()

In [None]:
# check for duplicates in sessionId
df.select("sessionId").dropDuplicates().count()

In [None]:
# drop userId fields that have empty values
df_clean = df.dropna(how = "any", subset = ["userId", "sessionId"])
df_clean = df_clean.filter(df_clean["userId"] != "")

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

In [None]:
# Churn is a label for user who have confirmed cancellation

churn = udf(lambda x: int(x=="Cancellation Confirmation"), IntegerType())
downgrade_churn = udf(lambda x: int(x=="Submit Downgrade"), IntegerType())

df_clean = df_clean.withColumn("downgraded", downgrade_churn("page")).withColumn("cancelled", churn("page"))

In [None]:
df_pandas = df_clean.toPandas()

##### User Agent extraction and count

In [None]:
# get browser specific details
def get_browser(x):
    '''
    Args:
        userAgent text value
    Returns:
        Browser value
    '''
    
    if 'Firefox' in x:
        return 'Firefox'
    elif 'Safari' in x:
        if 'Chrome' in x:
            return 'Chrome'
        else:
            return 'Safari'
    elif 'Trident' in x:
        return 'IE'
    else:
        return np.NaN
    

In [None]:
platform_dict = {'compatible': 'Windows',  'iPad': 'iPad',  'iPhone': 'iPhone',  
          'Macintosh': 'Mac',  'Windows NT 5.1': 'Windows','Windows NT 6.0': 'Windows', 'Windows NT 6.1': 'Windows',  
          'Windows NT 6.2': 'Windows',  'Windows NT 6.3': 'Windows', 'X11': 'Linux'}

In [None]:
df_pandas['browser'] = df_pandas['userAgent'].apply(get_browser)

In [None]:
df_pandas['platform'] = df_pandas['userAgent'].str.extract(r'\(([^\)]*)\)')[0].str.split(';').str[0].map(platform_dict)

In [None]:
# convert to pandas dataframe
df_clean = spark.createDataFrame(df_pandas)

In [None]:
df_pandas = df_clean.toPandas()
df_pandas.head()

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [None]:
df_pandas.drop_duplicates(subset='userId').groupby(['downgraded'])['userId'].count()

In [None]:
df_pandas.drop_duplicates(subset='userId').groupby(['cancelled'])['userId'].count()

In [None]:
downgraded_Users = list(df_pandas[df_pandas['page'] == 'Submit Downgrade']['userId'].unique())
downgraded_Users.extend(list(df_pandas[df_pandas['page'] == 'Cancellation Confirmation']['userId'].unique()))
downgraded_Users = list(set(downgraded_Users))

In [None]:
unique_userIds = df_pandas['userId'].unique().tolist()
gender = []
for userId in unique_userIds:
    gender.append(df_pandas[df_pandas['userId'] == userId]['gender'].iloc[0])
df_gender = pd.DataFrame({'userId': unique_userIds, 'gender': gender})
df_gender = pd.DataFrame({'userId': unique_userIds, 'gender': gender})
df_gender['churn'] = df_gender['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)

In [None]:
df_gender.head()

In [None]:
# churn with respect to gender
sns.set_color_codes("pastel")
sns.set_style("whitegrid")
sns.countplot(x = "gender", hue = "churn", data = df_gender);

In [None]:
#Constructing dataframe to visualize churn with respect to level
level = []
for userId in unique_userIds:
    level.append(df_pandas[df_pandas['userId'] == userId]['level'].iloc[0])
df_level = pd.DataFrame({'userId': unique_userIds, 'level': level})
df_level['churn'] = df_gender['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
df_level.head()

In [None]:
sns.countplot(x = "level", hue = "churn", data = df_level);

In [None]:
df_clean.select(['userId', 'downgraded', 'cancelled'])\
    .groupBy('userId').sum()\
    .withColumnRenamed('sum(downgraded)', 'downgraded')\
    .withColumnRenamed('sum(cancelled)', 'cancelled')\
    .filter((col("downgraded")==0)&(col("cancelled")==1))\
    .count()

In [None]:
#number of paid users to drop without downgrading -> big chunk of those
print(df_clean.filter((col('cancelled')==1) & (col('downgraded')==0) & (col('level')=='paid'))\
      .select('userId').dropDuplicates().count())

In [None]:
#Creating SQL view to manipulate the data

df_clean.createOrReplaceTempView("sqlTable")

In [None]:
gender_count = spark.sql('''
        SELECT gender,COUNT(DISTINCT userId) AS user_counts
        FROM sqlTable
        GROUP BY gender
        ORDER BY user_counts DESC
''')
gender_count.show()

In [None]:
sns.barplot(x='gender',y='user_counts',data=gender_count.toPandas());

In [None]:
#concentration of length of songs
length_data = spark.sql('''
        SELECT length
        FROM sqlTable
''')

sns.distplot(length_data.toPandas().dropna());

In [None]:
# Get user level type and no of users

user_type = spark.sql('''
        SELECT level,COUNT(DISTINCT userId) AS user_counts
        FROM sqlTable
        GROUP BY level
        ORDER BY user_counts DESC
''')
user_type.show()

In [None]:
sns.barplot(x='level',y='user_counts',data=user_type.toPandas());

In [None]:
location_count = spark.sql('''
        SELECT location,COUNT(DISTINCT userId) AS user_counts
        FROM sqlTable
        GROUP BY location
        ORDER BY user_counts DESC
''').toPandas()
#split city and state
location_count = location_count.join(location_count['location'].str.split(',',expand=True).rename(columns={0:'city',1:'state'})).drop('location',axis=1)
location_count.head()

In [None]:
location_count.groupby('city')['user_counts'].sum().sort_values(ascending=False).plot(kind='barh',figsize=(10,20));

In [None]:
location_count.groupby('state')['user_counts'].sum().sort_values(ascending=False).plot(kind='barh',figsize=(10,15));

* Los Angeles and New York are top cities which have the largest user count, 
* California is the top state has the largest user count followed by Taxas.

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

##### Average Number of songs per user session

In [None]:
number_of_sessions = spark.sql("SELECT userId, avg(count) as average from (SELECT userId, count(*) as count FROM sqlTable group by sessionId, userId) group by userId")
number_of_sessions.show()

In [None]:
sns.barplot(x='userId',y='average',data=number_of_sessions.toPandas());

In [None]:
pd_number_of_sessions = number_of_sessions.toPandas()
pd_number_of_sessions['churn'] = pd_number_of_sessions['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="churn", y="average", data=pd_number_of_sessions);

##### Total Songs played by user

In [None]:
total_songs = spark.sql("SELECT userId, count(*) as count_total_songs FROM sqlTable group by userId")
total_songs.show()

In [None]:
sns.barplot(x='userId',y='count_total_songs',data=total_songs.toPandas());

In [None]:
pd_total_songs = total_songs.toPandas()
pd_total_songs['churn'] = pd_total_songs['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="churn", y="count_total_songs", data=pd_total_songs);

##### Thumbs Down for each user

In [None]:
thumbs_down = spark.sql("SELECT userId, count(*) as count_thumbs_down FROM sqlTable where page = 'Thumbs Down' group by userId")
thumbs_down.show()

In [None]:
sns.barplot(x='userId',y='count_thumbs_down',data=thumbs_down.toPandas());

In [None]:
pd_thumbs_down = thumbs_down.toPandas()
pd_thumbs_down['churn'] = pd_thumbs_down['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="churn", y="count_thumbs_down", data=pd_thumbs_down);

In [None]:
thumbs_up = spark.sql("SELECT userId, count(*) as count_thumbs_up FROM sqlTable where page = 'Thumbs Up' group by userId")
thumbs_up.show()

In [None]:
sns.barplot(x='userId',y='count_thumbs_up',data=thumbs_up.toPandas());

In [None]:
pd_thumbs_up = thumbs_up.toPandas()
pd_thumbs_up['churn'] = pd_thumbs_up['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="churn", y="count_thumbs_up", data=pd_thumbs_up);

##### Number of songs couldn't be played due to errors

In [None]:
errors = spark.sql("SELECT userId, count(*) as count_errors FROM sqlTable where page = 'Error' group by userId")
errors.show()

In [None]:
sns.barplot(x='userId',y='count_errors',data=errors.toPandas());

In [None]:
pd_errors = errors.toPandas()
pd_errors['churn'] = pd_thumbs_up['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="churn", y="count_errors", data=pd_errors);

##### Number of times user opted for help

In [None]:
asked_help = spark.sql("SELECT userId, count(*) as count_help FROM sqlTable where page = 'Help' group by userId")
asked_help.show();

In [None]:
sns.barplot(x='userId',y='count_help',data=asked_help.toPandas());

In [None]:
pd_asked_help = asked_help.toPandas()
pd_asked_help['churn'] = pd_asked_help['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="churn", y="count_help", data=pd_asked_help);

##### Number of events per user to add friend

In [None]:
add_friend = spark.sql("SELECT userId, count(*) as count_add_friend FROM sqlTable where page = 'Add Friend' group by userId")
add_friend.show();

In [None]:
sns.barplot(x='userId',y='count_add_friend',data=add_friend.toPandas());

In [None]:
pd_add_friend = add_friend.toPandas()
pd_add_friend['churn'] = pd_add_friend['userId'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="churn", y="count_add_friend", data=pd_add_friend);

##### Getting Number of events per user based on Browser agent and platform agent

In [None]:
browser_agent = spark.sql("SELECT browser, COUNT(DISTINCT userId) AS user_counts FROM sqlTable GROUP BY browser ORDER BY user_counts DESC")
browser_agent.show();

In [None]:
pd_browser_agent = browser_agent.toPandas()
pd_browser_agent['churn'] = pd_browser_agent['browser'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="browser", y="user_counts", data=pd_browser_agent);

In [None]:
platform_agent = spark.sql("SELECT platform, COUNT(DISTINCT userId) AS user_counts FROM sqlTable GROUP BY platform ORDER BY user_counts DESC")
platform_agent.show();

In [None]:
pd_platform_agent = platform_agent.toPandas()
pd_platform_agent['churn'] = pd_platform_agent['platform'].apply(lambda x: 1 if x in downgraded_Users else 0)
sns.barplot(x="platform", y="user_counts", data=pd_platform_agent);

In [None]:
total_windows_user = spark.sql("SELECT userId, count(*) AS windows_user_count FROM sqlTable WHERE platform = 'Windows' GROUP BY userId")
total_windows_user.show();

In [None]:
total_Mac_user = spark.sql("SELECT userId, count(*) AS mac_user_count FROM sqlTable WHERE platform = 'Mac' GROUP BY userId")
total_Mac_user.show();

In [None]:
total_iPhone_user = spark.sql("SELECT userId, count(*) AS iPhone_user_count FROM sqlTable WHERE platform = 'iPhone' GROUP BY userId")
total_iPhone_user.show();

In [None]:
total_linux_user = spark.sql("SELECT userId, count(*) AS linux_user_count FROM sqlTable WHERE platform = 'Linux' GROUP BY userId")
total_linux_user.show();

In [None]:
total_iPad_user = spark.sql("SELECT userId, count(*) AS iPad_user_count FROM sqlTable WHERE platform = 'iPad' GROUP BY userId")
total_iPad_user.show();

In [None]:
pd_total_windows_user = total_windows_user.toPandas()
pd_total_Mac_user = total_Mac_user.toPandas()
pd_total_iPhone_user = total_iPhone_user.toPandas()
pd_total_linux_user = total_linux_user.toPandas()
pd_total_iPad_user = total_iPad_user.toPandas()

In [None]:
total_chrome_user = spark.sql("SELECT userId, count(*) AS chrome_user_count FROM sqlTable WHERE browser = 'Chrome' GROUP BY userId")
total_chrome_user.show();

In [None]:
total_firefox_user = spark.sql("SELECT userId, count(*) AS firefox_user_count FROM sqlTable WHERE browser = 'Firefox' GROUP BY userId")
total_firefox_user.show();

In [None]:
total_safari_user = spark.sql("SELECT userId, count(*) AS safari_user_count FROM sqlTable WHERE browser = 'Safari' GROUP BY userId")
total_safari_user.show();

In [None]:
total_ie_user = spark.sql("SELECT userId, count(*) AS ie_user_count FROM sqlTable WHERE browser = 'IE' GROUP BY userId")
total_ie_user.show();

In [None]:
pd_total_chrome_user = total_chrome_user.toPandas()
pd_total_firefox_user = total_firefox_user.toPandas()
pd_total_safari_user = total_safari_user.toPandas()
pd_total_ie_user = total_ie_user.toPandas()

##### Joining all data frames to construct a unified which can be fed to machine lerning algorithms

In [None]:

temp_df = thumbs_down.join(thumbs_up, thumbs_down.userId == thumbs_up.userId).drop(thumbs_up.userId)
temp_df = temp_df.join(errors, errors.userId == temp_df.userId).drop(errors.userId)
temp_df = temp_df.join(asked_help, asked_help.userId == temp_df.userId).drop(asked_help.userId)
temp_df = temp_df.join(add_friend, add_friend.userId == temp_df.userId).drop(add_friend.userId)
temp_df = temp_df.join(total_songs, total_songs.userId == temp_df.userId).drop(total_songs.userId)
temp_df = temp_df.join(number_of_sessions, number_of_sessions.userId == temp_df.userId).drop(number_of_sessions.userId)


In [None]:
temp_df.show()

In [None]:
cancle_or_downgrade = spark.sql("SELECT userId, CASE when page = 'Submit Downgrade' or page = 'Cancellation Confirmation' THEN 1 END as churn from sqlTable")

In [None]:
cancle_or_downgrade.createOrReplaceTempView('churn_table')
churned_users = spark.sql("SELECT * FROM churn_table where churn is not null")
churned_users.createOrReplaceTempView('churned_users')

In [None]:
spark.sql("select * from churned_users").show()

In [None]:
temp_df = temp_df.join(churned_users, churned_users.userId == temp_df.userId, "left_outer").drop(churned_users.userId)

In [None]:
temp_df.createOrReplaceTempView('temp_table')

In [None]:
processed_df = spark.sql("select userId, average, count_total_songs, count_thumbs_down, count_thumbs_up, count_errors, count_help, count_add_friend, CASE when churn is null then 0 else 1 END as label from temp_table")

In [None]:
processed_df.show()

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

##### The dataset is split into training and validation data in a ratio of 90% to 10%.

We will use following three algorithms to build our model

* Logistic Regression
* Random Forest Classifier
* Gradient Boosting Trees

In [None]:
train, test = processed_df.randomSplit([0.9, 0.1], seed=42)

In [None]:
input_cols = ['count_thumbs_down',
 'count_thumbs_up',
 'count_errors',
 'count_help',
 'count_add_friend',
 'count_total_songs',
 'average']

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
scalar = MinMaxScaler(inputCol="features", outputCol="scaled_features")

### Logistic Regression

In [None]:
lr =  LogisticRegression(labelCol="label", featuresCol="scaled_features", maxIter=10, regParam=0.0, elasticNetParam=0)

pipeline_lr = Pipeline(stages=[assembler, scalar, lr])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .build()

crossval_lr = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)
cvModel_lr = crossval_lr.fit(train)
cvModel_lr.avgMetrics
results_lr = cvModel_lr.transform(test)
print("Accuracy for Logistic Regression Model is: ", results_lr.filter(results_lr.label == results_lr.prediction).count()/ results_lr.count())
evaluator = MulticlassClassificationEvaluator(metricName="f1")
score = evaluator.evaluate(results_lr)
print("F1 score for Logistic Regression model is : ", score)

##### Accuracy for Logistic Regression Model is: 0.5
##### F1 score for Logistic Regression model is :  0.549

### Random Forest Classifier

In [None]:
rf = RandomForestClassifier(labelCol="label", featuresCol="scaled_features", numTrees=10)
pipeline_rf = Pipeline(stages=[assembler, scalar, rf])


paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees,[5, 10]) \
    .build()

crossval_rf = CrossValidator(estimator=pipeline_rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

cvModel_rf = crossval_rf.fit(train)
cvModel_rf.avgMetrics
results_rf = cvModel_rf.transform(test)
print("Accuracy for Random Forest Model is: ", results_rf.filter(results_rf.label == results_rf.prediction).count()/ results_rf.count())

evaluator = MulticlassClassificationEvaluator(metricName="f1")
score = evaluator.evaluate(results_rf)
print("F1 score for Random Forest Model is: ", score)

##### Accuracy for Random Forest Model is:  0.8
##### F1 score for Random Forest Model is:  0.819

### Gradient Boosting Tree Classifier

In [None]:
gbt = GBTClassifier(labelCol="label", featuresCol="scaled_features", maxIter=10)
pipeline_gbt = Pipeline(stages=[assembler, scalar, gbt])
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter,[5, 10]) \
    .build()
crossval_gbt = CrossValidator(estimator=pipeline_gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3, collectSubModels=True)
cvModel_gbt = crossval_gbt.fit(train)
cvModel_gbt.avgMetrics
results_gbt = cvModel_gbt.transform(test)
print("Accuracy for Gradient Boosting Tree Model is: ", results_gbt.filter(results_gbt.label == results_gbt.prediction).count()/ results_gbt.count())

evaluator = MulticlassClassificationEvaluator(metricName="f1")
score = evaluator.evaluate(results_gbt)
print("F1 score for Gradient Boosting Tree classifier is : ", score)

##### Accuracy for Gradient Boosting Tree Model is:  0.8
##### F1 score for Gradient Boosting Tree classifier is :  0.819

In [None]:
TestResult = collections.namedtuple("TestResult", ["params", "metrics"])

class CrossValidatorVerbose(CrossValidator):

    def _fit(self, dataset):
        folds = []
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()
        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            folds.append([])
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                # TODO: duplicate evaluator to take extra params from input
                prediction = model.transform(validation, paramMap)
                metric = eva.evaluate(prediction)
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % ({param.name: val for (param, val) in paramMap.items()}, metricName, metric, avgSoFar))
                
                predictionLabels = prediction.select("prediction", "label")
                allMetrics = MulticlassMetrics(predictionLabels.rdd)
                folds[i].append(TestResult(paramMap.items(), allMetrics))
                

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % ({param.name: val for (param, val) in bestParams.items()},metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics)), folds


In [None]:
crossval  = CrossValidatorVerbose(estimator=pipeline_gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

In [None]:
cvModel, folds  = crossval.fit(train)

In [None]:
feature_importances = cvModel_gbt.bestModel.stages[2].featureImportances.values.tolist()
feature_importance_df = pd.DataFrame({'feature_importance': feature_importances, 'columns': input_cols})
feature_importance_df.head()


In [None]:
sns.barplot(x='feature_importance', y='columns', data=feature_importance_df);

We can see from above analysis that Gradient Boosting Tree model proves to be the best machine lerning model in this case and that gives us the highest accuracy.

However adding some more features like the customer location (city/state) may add some insight if the curn in anyway reflects location of customer.

In the above case the most important features stands out to be the thumbs up count followed by number of friends added and average time to listen.

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.