In [None]:
import wget
# Customer Information
wget.download('https://raw.githubusercontent.com/nwngeek212/DSX-DemoCenter/master/predictCustomerChurn/data_assets/customer.csv')
customer = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load('customer.csv')
  
#Churn information  
wget.download('https://raw.githubusercontent.com/nwngeek212/DSX-DemoCenter/master/predictCustomerChurn/data_assets/churn.csv')
customer_churn = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load('churn.csv')

In [None]:
merged=customer.join(customer_churn,customer['ID']==customer_churn['ID']).select(customer['*'],customer_churn['CHURN'])
merged.toPandas().head()

In [None]:
merged = merged.withColumnRenamed("Est Income", "EstIncome").withColumnRenamed("Car Owner","CarOwner")
merged.toPandas().head()

In [None]:
import brunel
Merged = merged.toPandas()
%brunel data('Merged') bar x(CHURN) y(EstIncome) mean(EstIncome) color(LocalBilltype) stack tooltip(EstIncome) | x(LongDistance) y(Usage)  tooltip(LongDistance, Usage) :: width=1100, height=400

In [None]:
from pixiedust.display import *
display(merged)

In [None]:
# Spark SQL also allow you to use standard SQL
merged.createOrReplaceTempView("merged")
sql = """
SELECT c.*
FROM merged c
WHERE c.EstIncome>90000

"""
spark.sql(sql).toPandas().head()

In [None]:
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Prepare string variables so that they can be used by the decision tree algorithm
stringIndexer1 = StringIndexer(inputCol='Gender', outputCol='GenderEncoded')
stringIndexer2 = StringIndexer(inputCol='Status',outputCol='StatusEncoded')
stringIndexer3 = StringIndexer(inputCol='CarOwner',outputCol='CarOwnerEncoded')
stringIndexer4 = StringIndexer(inputCol='Paymethod',outputCol='PaymethodEncoded')
stringIndexer5 = StringIndexer(inputCol='LocalBilltype',outputCol='LocalBilltypeEncoded')
stringIndexer6 = StringIndexer(inputCol='LongDistanceBilltype',outputCol='LongDistanceBilltypeEncoded')
stringIndexer7 = StringIndexer(inputCol='CHURN', outputCol='label')

# Pipelines API requires that input variables are passed in  a vector
assembler = VectorAssembler(inputCols=["GenderEncoded", "StatusEncoded", "CarOwnerEncoded", "PaymethodEncoded", "LocalBilltypeEncoded", \
                                       "LongDistanceBilltypeEncoded", "Children", "EstIncome", "Age", "LongDistance", "International", "Local",\
                                      "Dropped","Usage","RatePlan"], outputCol="features")


# instantiate the algorithm, take the default settings
rf=RandomForestClassifier(labelCol="label", featuresCol="features")

#pipeline = Pipeline(stages=[stringIndexer1, stringIndexer2, stringIndexer3, assembler, rf])
pipeline = Pipeline(stages=[stringIndexer1,stringIndexer2,stringIndexer3,stringIndexer4,stringIndexer5,stringIndexer6,stringIndexer7, assembler, rf])

In [None]:
# Split data into train and test datasets
(trainingData, testingData) = merged.randomSplit([0.7, 0.3],seed=9)
trainingData.cache()
testingData.cache()

In [None]:
# Build model
model = pipeline.fit(trainingData)

In [None]:
result=model.transform(testingData)
result_display=result.select(result["ID"],result["CHURN"],result["Label"],result["prediction"],result["probability"])
result_display.toPandas().head(6)

In [None]:
print 'Model Accuracy = {:.2f}.'.format(result.filter(result.label == result.prediction).count() / float(result.count()))

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print 'Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(result))

In [None]:
# set different levels for the maxDepth
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder().addGrid(rf.maxDepth,[4,6,8]).build())

In [None]:
# perform 3 fold cross validation
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

In [None]:
# train the model
cvModel = cv.fit(trainingData)

# pick the best model
best_rfModel = cvModel.bestModel

# score the test data set
cvresult=best_rfModel.transform(testingData)

In [None]:
print 'Model Accuracy of the best fitted model = {:.2f}.'.format(cvresult.filter(cvresult.label == cvresult.prediction).count()/ float(cvresult.count()))
print 'Model Accuracy of the default model = {:.2f}.'.format(result.filter(result.label == result.prediction).count() / float(result.count()))
print '   '
print('Area under the ROC curve of best fitted model = {:.2f}.'.format(evaluator.evaluate(cvresult)))
print 'Area under the ROC curve of the default model = {:.2f}.'.format(evaluator.evaluate(result))

In [None]:
# Overwrite any existing saved model in the specified path
best_rfModel.write().overwrite().save("PredictChurn.churnModel")