A marketing agency has many customers that use its service to produce ads for their websites. The agency has noticed quite a lot of churn among its clients. Account managers are currently assigned more or less at random, so the agency wants a machine learning model that predicts which customers will churn (stop buying the service). Account managers can then be assigned to the customers most at risk of churning.

Using the historical data, I train a logistic regression model that classifies whether or not a customer churned. The company can then apply the model to incoming data on new customers, predict which of them are likely to churn, and assign them an account manager.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('churn').getOrCreate()

In [3]:
df = spark.read.csv('customer_churn.csv', header=True, inferSchema=True)

In [4]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [6]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [4]:
df.select('Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Churn').describe().show()

+-------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|summary|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|              Churn|
+-------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+
|  count|              900|              900|               900|              900|               900|                900|
|   mean|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|0.16666666666666666|
| stddev|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.7648355920350969| 0.3728852122772358|
|    min|             22.0|            100.0|                 0|              1.0|               3.0|                  0|
|    max|             65.0|         18026.01|                 1|             9.15|              14.0|                  1|
+-------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+

In [7]:
for row in df.head(5):
    print(row)
    print('\n')

Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date='2013-08-30 07:00:40', Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1)


Row(Names='Kevin Mueller', Age=41.0, Total_Purchase=11916.22, Account_Manager=0, Years=6.5, Num_Sites=11.0, Onboard_date='2013-08-13 00:38:46', Location='6157 Frank Gardens Suite 019 Carloshaven, RI 17756', Company='Wilson PLC', Churn=1)


Row(Names='Eric Lozano', Age=38.0, Total_Purchase=12884.75, Account_Manager=0, Years=6.67, Num_Sites=12.0, Onboard_date='2016-06-29 06:20:07', Location='1331 Keith Court Alyssahaven, DE 90114', Company='Miller, Johnson and Wallace', Churn=1)


Row(Names='Phillip White', Age=42.0, Total_Purchase=8010.76, Account_Manager=0, Years=6.71, Num_Sites=10.0, Onboard_date='2014-04-22 12:43:12', Location='13120 Daniel Mount Angelabury, WY 30645-4695', Company='Smith Inc', Churn=1)


Row(Names='Cynthia Norton', Age=37.0, Total_Pu

In [None]:
### Thoughts on the columns:
 *'Names'           : arbitrary, irrelevant
 *'Age'             : should ideally be binned into categories to avoid overfitting
 *'Total_Purchase'  : looks good
 *'Account_Manager' : excluded from the features and evaluated afterwards
 *'Years'           : looks good
 *'Num_Sites'       : looks good
 *'Onboard_date'    : irrelevant because the clients are businesses (B2B).
                      Onboarding time (day vs. night, or a specific month and day) might matter in a B2C scenario.
 *'Location'        : needs exploration
 *'Company'         : needs exploration
 *'Churn'           : looks good

In [41]:
df_clean=df.na.drop()

In [10]:
df_clean.count()    #nothing is dropped

900

### Explore column Location:
Location could be an important factor, so it should be included as one of the variables. Instead of the full address, I extract the state code:
1) Split the Location string into words on spaces.
2) Take the second-to-last word (the last one is the ZIP code). Python-style negative indexing ([-2]) does not work on a Spark array column, so I use getItem() with size()-2 instead.
   This is a known issue:
   https://stackoverflow.com/questions/40467936/how-do-i-get-the-last-item-from-a-list-using-pyspark
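The extraction logic can be checked in plain Python on three of the addresses printed below. `state_code` is a hypothetical helper that mirrors `split(Location, ' ')` followed by `getItem(size - 2)`; it is not part of the notebook. On the Spark side, `pyspark.sql.functions.element_at(col, -2)` (Spark 2.4+) accepts a negative, 1-based index and should return the same second-to-last element.

```python
def state_code(location: str) -> str:
    """Hypothetical helper: return the second-to-last space-separated token of an address."""
    tokens = location.split(' ')  # same splitting as Spark's split(col, ' ')
    # Python lists accept negative indices, so tokens[-2] is the second-to-last token;
    # the last token is the ZIP code. In Spark this is getItem(size(col) - 2).
    return tokens[-2]


addresses = [
    '10265 Elizabeth Mission Barkerburgh, AK 89518',
    '6157 Frank Gardens Suite 019 Carloshaven, RI 17756',
    '13120 Daniel Mount Angelabury, WY 30645-4695',
]
for address in addresses:
    print(state_code(address))
```

Because the state code is found by position from the end, addresses with a variable number of street words (for example the extra "Suite 019") still yield the right token.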

In [5]:
from pyspark.sql.functions import split

In [6]:
df_state=df.withColumn('state', split(df['Location'],' '))
df_state.select('Names','state').show(5, False)

+----------------+-----------------------------------------------------------+
|Names           |state                                                      |
+----------------+-----------------------------------------------------------+
|Cameron Williams|[10265, Elizabeth, Mission, Barkerburgh,, AK, 89518]       |
|Kevin Mueller   |[6157, Frank, Gardens, Suite, 019, Carloshaven,, RI, 17756]|
|Eric Lozano     |[1331, Keith, Court, Alyssahaven,, DE, 90114]              |
|Phillip White   |[13120, Daniel, Mount, Angelabury,, WY, 30645-4695]        |
|Cynthia Norton  |[765, Tricia, Row, Karenshire,, MH, 71730]                 |
+----------------+-----------------------------------------------------------+
only showing top 5 rows



In [7]:
from pyspark.sql.functions import size

In [8]:
df_sc=df_state.withColumn('sc',df_state['state'].getItem(size(df_state['state'])-2))
df_sc.select('Names','sc','Location').show(10, False)
df_sc.groupBy('sc').count().show(10, False)

+----------------+---+-------------------------------------------------------+
|Names           |sc |Location                                               |
+----------------+---+-------------------------------------------------------+
|Cameron Williams|AK |10265 Elizabeth Mission Barkerburgh, AK 89518          |
|Kevin Mueller   |RI |6157 Frank Gardens Suite 019 Carloshaven, RI 17756     |
|Eric Lozano     |DE |1331 Keith Court Alyssahaven, DE 90114                 |
|Phillip White   |WY |13120 Daniel Mount Angelabury, WY 30645-4695           |
|Cynthia Norton  |MH |765 Tricia Row Karenshire, MH 71730                    |
|Jessica Williams|PR |6187 Olson Mountains East Vincentborough, PR 74359     |
|Eric Butler     |IA |4846 Savannah Road West Justin, IA 87713-3460          |
|Zachary Walsh   |FM |25271 Roy Expressway Suite 147 Brownport, FM 59852-6150|
|Ashlee Carr     |MA |3725 Caroline Stravenue South Christineview, MA 82059  |
|Jennifer Lynch  |WI |363 Sandra Lodge Suite 144 Sou

There are 62 distinct state codes. Besides US states, they include territory and military codes such as PR, FM, MH and AP:

In [84]:
df_sc.groupBy('sc').count().count()

62

### Explore column Company:
Some company names appear more than once, so check how many distinct companies there are and which ones repeat:

In [106]:
df_sc.groupBy('Company').count().count()

873

In [9]:
dc=df_sc.groupBy('Company').count()
dc.show()

+--------------------+-----+
|             Company|count|
+--------------------+-----+
|Miller, Johnson a...|    1|
|Hunter, Reyes and...|    1|
|          Obrien PLC|    1|
|            Soto PLC|    2|
|            Todd LLC|    1|
|Smith, Marshall a...|    1|
|           Smith PLC|    1|
|          Hall Group|    1|
|Freeman, Lam and ...|    1|
|       Smith-Carroll|    1|
|Hall, Hernandez a...|    1|
|          Cannon Inc|    1|
|        White-Dennis|    1|
|Wilson, Collins a...|    1|
|Jennings, Gates a...|    1|
|     Campbell-Willis|    1|
|    Martinez-Roberts|    1|
|        Robinson PLC|    1|
|          Barton Inc|    1|
|Hernandez, Middle...|    1|
+--------------------+-----+
only showing top 20 rows



In [10]:
dc.filter(dc['count']>1).show()
dc.filter(dc['count']>1).count()

+--------------+-----+
|       Company|count|
+--------------+-----+
|      Soto PLC|    2|
|     Ortiz Ltd|    2|
|     Evans LLC|    2|
|   Davis Group|    2|
|     Jones LLC|    2|
|Davis and Sons|    2|
|     Smith Inc|    2|
|     Gates Ltd|    2|
|     Smith Ltd|    2|
|  Williams LLC|    2|
|      Rice PLC|    2|
|      Webb PLC|    2|
|    Wilson PLC|    3|
|    Nelson LLC|    2|
|   Smith Group|    2|
|Smith and Sons|    2|
|      King LLC|    2|
|Anderson Group|    4|
|    Walker Ltd|    2|
|Perry and Sons|    2|
+--------------+-----+
only showing top 20 rows



23

In [11]:
df_sc.filter(df_sc['Company'].isin('Anderson Group', 'Soto PLC', 'Wilson PLC')) \
     .select('Company', 'Names', 'Churn', 'sc', 'Age').show()

+--------------+---------------+-----+---+----+
|       Company|          Names|Churn| sc| Age|
+--------------+---------------+-----+---+----+
|    Wilson PLC|  Kevin Mueller|    1| RI|41.0|
|      Soto PLC|     Misty Tate|    0| AS|40.0|
|Anderson Group|Darin Alexander|    0| MS|43.0|
|Anderson Group|   Richard Bell|    0| SC|47.0|
|      Soto PLC|  Jodi Thompson|    0| NM|35.0|
|    Wilson PLC|  David Sanders|    0| AP|41.0|
|    Wilson PLC|    Linda Moore|    0| FM|40.0|
|Anderson Group|     Lisa Munoz|    0| NV|39.0|
|Anderson Group|  Monica Graham|    0| WI|40.0|
+--------------+---------------+-----+---+----+



Finding: only 23 of the 873 distinct companies have more than one account, and even those accounts come from different office locations with different contact persons. With 873 distinct names across 900 rows, the company name is close to a unique identifier, so it does not look useful as a variable.



### Explore Age column:
Age covers quite a wide range (22 to 65), so I group it into 5-year bins.
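The binning expression in the next cells floors each age to a multiple of 5 (`Age - Age % 5`) and concatenates the lower and upper bounds into a label. A plain-Python version of the same arithmetic (the name `age_range` is only for illustration) shows the labels it produces. Note that an age of exactly 65 opens a new bin, 65.0 - 70.0, which is why that bin holds a single customer.

```python
INTERVAL = 5  # bin width used in the notebook


def age_range(age: float, interval: int = INTERVAL) -> str:
    """Hypothetical helper: label an age with its [lower, lower + interval) bin.

    Ages are doubles, as in the inferred schema, so the bounds print as e.g. '40.0'.
    """
    lower = age - (age % interval)  # floor to a multiple of `interval`
    return f"{lower} - {lower + interval}"


for age in (42.0, 37.0, 22.0, 65.0):
    print(age, '->', age_range(age))
```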

In [12]:
from pyspark.sql.functions import concat, col, lit, collect_list

In [13]:
interval = 5
df4= df_sc.withColumn("Age_range",df_sc["Age"] - (df_sc["Age"] % interval))
df5= df4.withColumn("Age_range", concat(df4["Age_range"], lit(" - "), df4["Age_range"]+ interval))
df5.select('Age_range','Names','Age').show()
#showing names per group
#df5.groupBy("range").agg(collect_list("Names")).show(5,False) 

+-----------+-------------------+----+
|  Age_range|              Names| Age|
+-----------+-------------------+----+
|40.0 - 45.0|   Cameron Williams|42.0|
|40.0 - 45.0|      Kevin Mueller|41.0|
|35.0 - 40.0|        Eric Lozano|38.0|
|40.0 - 45.0|      Phillip White|42.0|
|35.0 - 40.0|     Cynthia Norton|37.0|
|45.0 - 50.0|   Jessica Williams|48.0|
|40.0 - 45.0|        Eric Butler|44.0|
|30.0 - 35.0|      Zachary Walsh|32.0|
|40.0 - 45.0|        Ashlee Carr|43.0|
|40.0 - 45.0|     Jennifer Lynch|40.0|
|30.0 - 35.0|       Paula Harris|30.0|
|45.0 - 50.0|     Bruce Phillips|45.0|
|45.0 - 50.0|       Craig Garner|45.0|
|40.0 - 45.0|       Nicole Olson|40.0|
|40.0 - 45.0|     Harold Griffin|41.0|
|35.0 - 40.0|       James Wright|38.0|
|45.0 - 50.0|      Doris Wilkins|45.0|
|40.0 - 45.0|Katherine Carpenter|43.0|
|50.0 - 55.0|     Lindsay Martin|53.0|
|45.0 - 50.0|        Kathy Curry|46.0|
+-----------+-------------------+----+
only showing top 20 rows



In [14]:
df5.groupBy("Age_range").count().orderBy('Age_range').show()

+-----------+-----+
|  Age_range|count|
+-----------+-----+
|20.0 - 25.0|    1|
|25.0 - 30.0|   18|
|30.0 - 35.0|   81|
|35.0 - 40.0|  218|
|40.0 - 45.0|  288|
|45.0 - 50.0|  199|
|50.0 - 55.0|   72|
|55.0 - 60.0|   21|
|60.0 - 65.0|    1|
|65.0 - 70.0|    1|
+-----------+-----+



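Based on the exploration above, I exclude Names, Company, and Onboard_date from the features. Names and Company are still kept in final_data so that predictions can be traced back to customers, and Account_Manager stays for evaluation later; the VectorAssembler does not use any of them.
Location and state have been replaced by the sc column.
Age has been replaced by Age_range.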

In [206]:
df5.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn',
 'state',
 'sc',
 'Age_range']

In [16]:
final_data=df5.select('Names',
 #'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 #'Onboard_date',
 #'Location',
 'Company',
 'Churn',
 #'state',
 'sc',
 'Age_range')

In [17]:
final_data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- sc: string (nullable = true)
 |-- Age_range: string (nullable = true)



#### Working with Categorical Columns:
sc and Age_range are strings, so each needs a StringIndexer (label to numeric index) followed by a OneHotEncoder (index to binary vector).
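The two stages can be sketched in plain Python. The hypothetical helpers below follow what I understand to be Spark 3's defaults, stated here as assumptions: StringIndexer orders labels by descending frequency with ties broken alphabetically, handleInvalid='error' rejects unseen labels while 'keep' maps them to one extra index, and OneHotEncoder with dropLast=True produces a vector one shorter than the number of categories, encoding the last category as all zeros. On the Age_range counts above, this gives index 1 for 35.0 - 40.0 and index 7 for 20.0 - 25.0, which agrees with the ageIndex values printed for the new customers at the end, and the 10 ranges become a 9-slot ageVec.

```python
from typing import Dict, Tuple


def fit_string_indexer(counts: Dict[str, int]) -> Dict[str, int]:
    """Index 0 goes to the most frequent label; ties are broken alphabetically (assumed)."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def transform_label(label: str, mapping: Dict[str, int], handle_invalid: str = 'error') -> int:
    """Look up a label; 'keep' sends unseen labels to the extra index len(mapping)."""
    if label in mapping:
        return mapping[label]
    if handle_invalid == 'keep':
        return len(mapping)
    raise ValueError(f'Unseen label: {label}')


def one_hot(index: int, num_categories: int, drop_last: bool = True) -> Tuple[int, Dict[int, float]]:
    """Return (size, {position: 1.0}); with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return size, ({index: 1.0} if index < size else {})


# Age_range counts from the groupBy output above (all 900 rows).
age_counts = {
    '20.0 - 25.0': 1, '25.0 - 30.0': 18, '30.0 - 35.0': 81, '35.0 - 40.0': 218,
    '40.0 - 45.0': 288, '45.0 - 50.0': 199, '50.0 - 55.0': 72, '55.0 - 60.0': 21,
    '60.0 - 65.0': 1, '65.0 - 70.0': 1,
}
age_index = fit_string_indexer(age_counts)
print(age_index['35.0 - 40.0'], one_hot(age_index['35.0 - 40.0'], len(age_index)))
```

The `transform_label` helper also shows the failure mode that matters later: a label that was absent when the indexer was fitted raises an error unless the 'keep' behaviour is chosen.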

In [18]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

In [19]:
sc_indexer = StringIndexer (inputCol= 'sc', outputCol= 'scIndex')
sc_encoder = OneHotEncoder (inputCol= 'scIndex', outputCol= 'scVec')

In [20]:
age_indexer = StringIndexer (inputCol= 'Age_range', outputCol= 'ageIndex')
age_encoder = OneHotEncoder (inputCol= 'ageIndex', outputCol= 'ageVec')

In [21]:
assembler = VectorAssembler(inputCols= ['ageVec',
                                        'Total_Purchase',
                                        #'Account_Manager',
                                        'Years',
                                        'Num_Sites',
                                        #'Churn', 
                                        'scVec'], 
                             outputCol='features' ) 
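VectorAssembler concatenates its inputs in the listed order: ageVec (9 slots), then Total_Purchase, Years and Num_Sites (one slot each), then scVec (61 slots, from 62 state codes), 73 slots in total. The plain-Python sketch below uses hypothetical helpers, not Spark code. It rebuilds the features vector printed for Andrew Mccall near the end of the notebook from his ageIndex 1.0 and scIndex 38.0, assuming 10 age ranges and 62 state codes were seen during fitting, as the vector sizes in those printed rows suggest.

```python
def one_hot(index, num_categories):
    """OneHotEncoder with dropLast=True: size n - 1, last category is all zeros."""
    size = num_categories - 1
    return size, ({index: 1.0} if index < size else {})


def assemble(*parts):
    """Concatenate (size, {pos: value}) blocks and plain numbers, like VectorAssembler."""
    offset, entries = 0, {}
    for part in parts:
        if isinstance(part, tuple):  # a sparse block produced by one_hot
            size, values = part
        else:                        # a numeric column contributes one slot
            size, values = 1, {0: float(part)}
        for pos, value in values.items():
            if value != 0.0:  # sparse vectors store only non-zero entries
                entries[offset + pos] = value
        offset += size
    return offset, entries


# Andrew Mccall: ageIndex 1, Total_Purchase, Years, Num_Sites, scIndex 38.
size, features = assemble(one_hot(1, 10), 9935.53, 7.71, 8.0, one_hot(38, 62))
print(size, features)
```

The state-code one-hot block starts at slot 12 (9 + 3), so scIndex 38 lands at slot 50, matching `features=SparseVector(73, {1: 1.0, 9: 9935.53, 10: 7.71, 11: 8.0, 50: 1.0})`.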

In [22]:
from pyspark.ml.classification import LogisticRegression

### Set Up Pipelines:

In [23]:
from pyspark.ml import Pipeline

In [24]:
logrec_churn = LogisticRegression(featuresCol='features', labelCol='Churn')

In [25]:
pipeline = Pipeline(stages= [sc_indexer, age_indexer,
                            sc_encoder, age_encoder,
                            assembler , logrec_churn])

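Split the data 70/30, fit the pipeline on the training set, and use the fitted model to transform the test set. Note that randomSplit is called without a seed, so the split, and with it the AUC, changes on every run; passing a seed (e.g. final_data.randomSplit([0.7, 0.3], seed=42)) makes the results reproducible.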

In [26]:
trainD,testD = final_data.randomSplit([0.7,0.3])

In [27]:
fit_model = pipeline.fit(trainD)

In [28]:
result = fit_model.transform(testD)

In [397]:
result.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- sc: string (nullable = true)
 |-- Age_range: string (nullable = true)
 |-- scIndex: double (nullable = false)
 |-- ageIndex: double (nullable = false)
 |-- scVec: vector (nullable = true)
 |-- ageVec: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
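The last three columns are linked. For binary logistic regression, rawPrediction holds [-m, m] for the margin m (coefficients times features plus intercept), probability applies the sigmoid to each entry, and prediction is 1.0 when the churn probability exceeds the threshold (0.5 by default). The plain-Python sketch below (hypothetical `from_raw` helper) reproduces the probabilities printed for Andrew Mccall and Michele Wright from their rawPrediction values.

```python
import math


def sigmoid(x: float) -> float:
    return 1.0 / (1.0 + math.exp(-x))


def from_raw(raw, threshold=0.5):
    """Binary LR: probability[k] = sigmoid(raw[k]); predict 1.0 if P(churn) > threshold."""
    prob = [sigmoid(r) for r in raw]
    prediction = 1.0 if prob[1] > threshold else 0.0
    return prob, prediction


prob, pred = from_raw([3.0222, -3.0222])  # rawPrediction printed for Andrew Mccall
print([round(p, 4) for p in prob], pred)
```

Because probability[1] is a continuous churn score, it can also be used to rank customers by risk instead of relying only on the 0/1 prediction.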



Evaluate the result:

In [29]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [30]:
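# rawPredictionCol='prediction' scores hard 0/1 predictions, i.e. a single threshold.
# The default, rawPredictionCol='rawPrediction', ranks customers by the model's scores
# and gives the usual ROC AUC.
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                        labelCol='Churn')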
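Because rawPredictionCol='prediction' points the evaluator at hard 0/1 labels, the ROC curve has only one operating point, and the resulting AUC equals the average of the true-positive and true-negative rates. The toy example below uses invented numbers and a hypothetical `roc_auc` helper based on the pairwise (Mann-Whitney) definition of AUC: the probability that a random churner is scored above a random non-churner, with ties counting one half. It shows how much information the hard labels discard.

```python
def roc_auc(labels, scores):
    """ROC AUC as P(score of a random positive > score of a random negative); ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Hypothetical toy data: 2 churners, 3 non-churners.
labels = [1, 1, 0, 0, 0]
scores = [0.9, 0.4, 0.6, 0.2, 0.1]                 # e.g. probability of churn
hard = [1.0 if s > 0.5 else 0.0 for s in scores]   # what the 'prediction' column holds

print(roc_auc(labels, scores))  # 5/6: uses the full ranking
print(roc_auc(labels, hard))    # 7/12 = (TPR 1/2 + TNR 2/3) / 2
```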

In [403]:
result.select('Churn','prediction','Account_Manager').show(100)

+-----+----------+---------------+
|Churn|prediction|Account_Manager|
+-----+----------+---------------+
|    0|       0.0|              0|
|    1|       1.0|              0|
|    0|       0.0|              1|
|    0|       0.0|              1|
|    0|       0.0|              1|
|    1|       0.0|              0|
|    0|       0.0|              1|
|    0|       0.0|              1|
|    0|       0.0|              1|
|    0|       0.0|              0|
|    0|       0.0|              0|
|    0|       0.0|              1|
|    0|       1.0|              0|
|    0|       0.0|              1|
|    0|       0.0|              1|
|    0|       0.0|              0|
|    0|       0.0|              0|
|    0|       0.0|              0|
|    1|       0.0|              1|
|    0|       0.0|              1|
|    0|       0.0|              1|
|    0|       0.0|              0|
|    0|       0.0|              0|
|    0|       0.0|              0|
|    0|       0.0|              1|
|    0|       0.0|  

Area under ROC Curve

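evaluate() often failed with a Py4JJavaError (`An error occurred while calling o5717.evaluate`; the object id changes between runs). See also:
    https://stackoverflow.com/questions/54546513/py4jjavaerror-an-error-occurred-while-calling

The root cause is at the bottom of the traceback: `Unseen label: 20.0 - 25.0`. Only one customer falls in the 20.0 - 25.0 age range (and one each in 60.0 - 65.0 and 65.0 - 70.0), so when the random split puts that customer in the test set, the StringIndexer fitted on the training set has never seen the label. As the message suggests, creating the indexers with handleInvalid='keep', e.g. `StringIndexer(inputCol='Age_range', outputCol='ageIndex', handleInvalid='keep')`, maps unseen labels to an extra index instead of failing.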
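A rough calculation suggests why this error is frequent rather than rare. Assume, as an approximation, that randomSplit([0.7, 0.3]) sends each row to the test set independently with probability 0.3. Each single-customer age range is then unseen by the fitted indexer whenever its only row lands in the test set. With three such ranges, the chance that at least one is unseen is 1 - 0.7^3, before even counting rare state codes. The sketch below computes this in closed form and checks it with a small simulation; the constants are taken from the Age_range counts, and the independence assumption is mine.

```python
import random

TEST_FRACTION = 0.3  # randomSplit([0.7, 0.3])
SINGLETON_BINS = 3   # 20.0 - 25.0, 60.0 - 65.0 and 65.0 - 70.0 each hold one customer

# Closed form: probability that at least one singleton row lands in the test set.
p_closed = 1 - (1 - TEST_FRACTION) ** SINGLETON_BINS

# Monte Carlo check under the same independence assumption.
rng = random.Random(0)
trials = 20_000
hits = sum(
    any(rng.random() < TEST_FRACTION for _ in range(SINGLETON_BINS))
    for _ in range(trials)
)
p_sim = hits / trials
print(round(p_closed, 3), round(p_sim, 3))
```

Under these assumptions, roughly two runs in three would hit an unseen age label, which is consistent with the error appearing "often".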

In [34]:
AUC = my_eval.evaluate(result) 

Py4JJavaError: An error occurred while calling o496.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 169.0 failed 1 times, most recent failure: Lost task 0.0 in stage 169.0 (TID 809, 192.168.0.25, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3132/168157933: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Unseen label: 20.0 - 25.0. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:405)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:390)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4$lzycompute(BinaryClassificationMetrics.scala:182)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4(BinaryClassificationMetrics.scala:163)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:165)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:165)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:245)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:103)
	at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:114)
	at org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.evaluate(BinaryClassificationEvaluator.scala:122)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3132/168157933: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Unseen label: 20.0 - 25.0. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:405)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:390)
	... 19 more


In [337]:
AUC 

0.7025786713286714

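 AUC with Account_Manager included in the features (and in final_data): 0.65
 AUC with Account_Manager excluded from the features and from final_data: 0.68
 AUC with Account_Manager excluded from the features only: 0.70
 The last two settings train the same model (a column that is not a feature has no effect on it), so the gap between them most likely comes from the unseeded randomSplit producing a different train/test split on each run.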

# Predict on the new data

In [56]:
new_data= spark.read.csv('new_customers.csv', header=True, inferSchema = True)

In [57]:
new_data.show(1)

+-------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------+
|        Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location| Company|
+-------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------+
|Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|King Ltd|
+-------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------+
only showing top 1 row



In [58]:
new_data.describe().show()

+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+----------------+
|summary|        Names|               Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|       Onboard_date|            Location|         Company|
+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+----------------+
|  count|            6|                 6|                6|                 6|                6|                 6|                  6|                   6|               6|
|   mean|         null|35.166666666666664|7607.156666666667|0.8333333333333334|6.808333333333334|12.333333333333334|               null|                null|            null|
| stddev|         null| 15.71517313511584|4346.008232825459| 0.408248290463863|3.708737880555414|3.3862466931200785|         

In [59]:
new_data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [60]:
nd_state=new_data.withColumn('state', split(new_data['Location'],' '))

In [61]:
nd_sc=nd_state.withColumn('sc',nd_state['state'].getItem(size(nd_state['state'])-2))

In [62]:
interval = 5
nd1= nd_sc.withColumn("Age_range",nd_sc["Age"] - (nd_sc["Age"] % interval))
nd2= nd1.withColumn("Age_range", concat(nd1["Age_range"], lit(" - "), nd1["Age_range"]+ interval))
nd2.select('Age_range','Names','Age').show()

+-----------+--------------+----+
|  Age_range|         Names| Age|
+-----------+--------------+----+
|35.0 - 40.0| Andrew Mccall|37.0|
|20.0 - 25.0|Michele Wright|23.0|
|65.0 - 70.0|  Jeremy Chang|65.0|
|30.0 - 35.0|Megan Ferguson|32.0|
|30.0 - 35.0|  Taylor Young|32.0|
|20.0 - 25.0| Jessica Drake|22.0|
+-----------+--------------+----+



In [63]:
nd2.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'state',
 'sc',
 'Age_range']

In [93]:
newDataSet = nd2.select('Names',
 #'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 #'Onboard_date',
 #'Location',
 'Company',                  
 #'state',
 'sc',
 'Age_range')

In [78]:
new_result = fit_model.transform(newDataSet)

In [79]:
new_result.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- sc: string (nullable = true)
 |-- Age_range: string (nullable = true)
 |-- scIndex: double (nullable = false)
 |-- ageIndex: double (nullable = false)
 |-- scVec: vector (nullable = true)
 |-- ageVec: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



Showing the result often failed with a Py4J error:
    An error occurred while calling o5800.showString.

In [116]:
for row in new_result.head(5):
    print (row)
    print ('\n')


Row(Names='Andrew Mccall', Total_Purchase=9935.53, Account_Manager=1, Years=7.71, Num_Sites=8.0, Company='King Ltd', sc='WI', Age_range='35.0 - 40.0', scIndex=38.0, ageIndex=1.0, scVec=SparseVector(61, {38: 1.0}), ageVec=SparseVector(9, {1: 1.0}), features=SparseVector(73, {1: 1.0, 9: 9935.53, 10: 7.71, 11: 8.0, 50: 1.0}), rawPrediction=DenseVector([3.0222, -3.0222]), probability=DenseVector([0.9536, 0.0464]), prediction=0.0)


Row(Names='Michele Wright', Total_Purchase=7526.94, Account_Manager=1, Years=9.28, Num_Sites=15.0, Company='Cannon-Benson', sc='ME', Age_range='20.0 - 25.0', scIndex=18.0, ageIndex=7.0, scVec=SparseVector(61, {18: 1.0}), ageVec=SparseVector(9, {7: 1.0}), features=SparseVector(73, {7: 1.0, 9: 7526.94, 10: 9.28, 11: 15.0, 30: 1.0}), rawPrediction=DenseVector([15.2072, -15.2072]), probability=DenseVector([1.0, 0.0]), prediction=0.0)


Row(Names='Jeremy Chang', Total_Purchase=100.0, Account_Manager=1, Years=1.0, Num_Sites=15.0, Company='Barron-Robertson', sc='WY', A

In [119]:
new_data.select('Names','Company').show()

+--------------+----------------+
|         Names|         Company|
+--------------+----------------+
| Andrew Mccall|        King Ltd|
|Michele Wright|   Cannon-Benson|
|  Jeremy Chang|Barron-Robertson|
|Megan Ferguson|   Sexton-Golden|
|  Taylor Young|        Wood LLC|
| Jessica Drake|   Parks-Robbins|
+--------------+----------------+



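show() raised a Py4J error, but head() printed the first five rows, which give these predictions:

    Company          | prediction
    King Ltd         | 0
    Cannon-Benson    | 0
    Barron-Robertson | 0
    Sexton-Golden    | 1
    Wood LLC         | 0
    Parks-Robbins    | ? (not returned)

The model predicts that Sexton-Golden will churn, so that is the customer to assign an account manager to. There is no prediction for Parks-Robbins: head(5) only returns the first five rows, and head(6) raises the same error. Because Spark evaluates lazily, head(5) can stop after five rows, whereas head(6) and show() also have to transform the sixth row. The failure is most likely the same `Unseen label` problem as in the evaluation step (for example, a state code that did not appear in the training split); creating the StringIndexers with handleInvalid='keep' should avoid it.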


In [33]:
#new_result.select('Names','prediction').show()  # raises a Py4JJavaError here, most likely an unseen label in a StringIndexer stage