In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 51kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 42.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=4d5b86b4c6ec0b4cf2476236f65e9f9c44f88d2d8201cf9cf6d39fc12a15a936
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [0]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Build (or reuse) a local-mode SparkSession. getOrCreate() makes this cell
# safe to re-run: constructing SparkContext('local') a second time in the same
# kernel raises because only one active context is allowed.
spark = SparkSession.builder.master("local").getOrCreate()

# Keep `sc` available under its original name for any cell that uses it.
sc = spark.sparkContext

In [2]:
# Load final.csv, treating the first row as the header and letting Spark
# infer each column's type from the file contents.
csv_reader = spark.read.format("csv").options(header=True, inferschema=True)
data = csv_reader.load("final.csv")

In [3]:
# Print the column names and inferred types of the loaded CSV.
data.printSchema()

root
 |-- msno: string (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- city: double (nullable = true)
 |-- bd: double (nullable = true)
 |-- gender: integer (nullable = true)
 |-- registered_via: double (nullable = true)
 |-- registration_init_time: double (nullable = true)
 |-- payment_method_id: integer (nullable = true)
 |-- payment_plan_days: integer (nullable = true)
 |-- plan_list_price: integer (nullable = true)
 |-- actual_amount_paid: integer (nullable = true)
 |-- is_auto_renew: integer (nullable = true)
 |-- transaction_date: integer (nullable = true)
 |-- membership_expire_date: integer (nullable = true)
 |-- is_cancel: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- num_25: integer (nullable = true)
 |-- num_50: integer (nullable = true)
 |-- num_75: integer (nullable = true)
 |-- num_985: integer (nullable = true)
 |-- num_100: integer (nullable = true)
 |-- num_unq: integer (nullable = true)
 |-- total_secs: double (nullable = true)



In [7]:
# Names of the input columns that are assembled into the model's feature vector.
columns = [
    'total_secs',
    'bd',
    'num_unq',
    'num_100',
    'num_25',
]

In [4]:
# Keep only the feature columns plus the is_churn label. The feature names come
# from `columns` (the same list the VectorAssembler uses) so the two cannot
# drift apart if the feature set is edited.
data2 = data.select(columns + ['is_churn'])

In [5]:
# Confirm the reduced frame holds just the selected features and the label.
data2.printSchema()

root
 |-- total_secs: double (nullable = true)
 |-- bd: double (nullable = true)
 |-- num_unq: integer (nullable = true)
 |-- num_100: integer (nullable = true)
 |-- num_25: integer (nullable = true)
 |-- is_churn: integer (nullable = true)



In [8]:
from pyspark.ml.feature import VectorAssembler

# Combine the columns listed in `columns` into a single vector column named
# "features", which the downstream ML stages read as their input.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=columns,
)
output = assembler.transform(data2)

In [9]:
# Preview the assembled rows, including the new "features" vector column.
output.show()

+----------+----+-------+-------+------+--------+--------------------+
|total_secs|  bd|num_unq|num_100|num_25|is_churn|            features|
+----------+----+-------+-------+------+--------+--------------------+
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   8490.88|26.0|     41|     28|     6|       1|[8490.88,26.0,41....|
|   84

In [10]:
# Fit a label indexer on the full assembled dataset: it maps each distinct
# is_churn value to a numeric index written to "indexedLabel".
labelIndexer = StringIndexer(
    outputCol="indexedLabel",
    inputCol="is_churn",
).fit(output)

In [12]:
# Fit a feature indexer on the full assembled dataset. With maxCategories=4,
# vector slots having at most 4 distinct values are treated as categorical;
# the rest are left as continuous. Result goes to "indexedFeatures".
featureIndexer = VectorIndexer(
    maxCategories=4,
    outputCol="indexedFeatures",
    inputCol="features",
).fit(output)

In [13]:
# 90/10 train/test split. A fixed seed makes the split (and therefore the
# reported test error) reproducible across runs.
# NOTE(review): the output.show() preview displays many identical rows; if the
# data really contains duplicates, copies of the same row can fall on both
# sides of this split and inflate test accuracy — confirm and dedupe if so.
(trainingData, testData) = output.randomSplit([0.9, 0.1], seed=42)

In [14]:
# Random forest over the indexed label/features. The explicit seed makes tree
# construction (bootstrap sampling, feature subsetting) repeatable.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=10,
    seed=42,
)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelIndexer.labels,
)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

In [15]:
# Fit the pipeline on the training split; the result is the fitted model
# used for prediction below.
model = pipeline.fit(trainingData)




In [16]:
# Run the fitted pipeline over the held-out test split to produce predictions.
predictions = model.transform(testData)

In [19]:
# Preview the first 5 prediction rows (inputs plus the columns added by the pipeline).
predictions.show(5)

+----------+----+-------+-------+------+--------+--------------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|total_secs|  bd|num_unq|num_100|num_25|is_churn|            features|indexedLabel|     indexedFeatures|       rawPrediction|         probability|prediction|predictedLabel|
+----------+----+-------+-------+------+--------+--------------------+------------+--------------------+--------------------+--------------------+----------+--------------+
|     0.418|25.0|      1|      0|     1|       0|[0.418,25.0,1.0,0...|         0.0|[0.418,25.0,1.0,0...|[9.81682340926831...|[0.98168234092683...|       0.0|             0|
|     0.418|25.0|      1|      0|     1|       0|[0.418,25.0,1.0,0...|         0.0|[0.418,25.0,1.0,0...|[9.81682340926831...|[0.98168234092683...|       0.0|             0|
|     0.418|25.0|      1|      0|     1|       0|[0.418,25.0,1.0,0...|         0.0|[0.418,25.0,1.0,0...|[9.81682340926831...|[0.9816823

In [20]:
# Score the test-set predictions by accuracy (indexed label vs. predicted
# index) and report the complementary error rate.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="indexedLabel",
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.016443


In [21]:
# Display the raw accuracy value computed by the evaluator.
accuracy

0.9835569763212803

In [25]:
import pickle

In [24]:
# Persist the fitted pipeline. Despite the ".pkl" suffix this is NOT a pickle:
# Spark ML writes a directory of metadata and stage data at this path.
# write().overwrite() lets the cell be re-run; a plain save() raises if the
# path already exists.
model.write().overwrite().save("mymodel.pkl")

In [None]:
# NOTE(review): this cell repeats the save from the previous cell and can
# probably be deleted. It overwrites so that running it after the earlier save
# does not fail on the already-existing "mymodel.pkl" directory.
model.write().overwrite().save("mymodel.pkl")