In [73]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import (year,month,dayofmonth,dayofweek,dayofyear,weekofyear,countDistinct)


from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                               OneHotEncoder, StringIndexer)
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [6]:
# Start (or reuse) a Spark session with master "local" for this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("CustomerChurn")
    .getOrCreate()
)

In [8]:
# Data the pipeline is later fitted on: first CSV row is the header and
# column types are inferred by Spark.
data = spark.read.csv(
    "customer_churn.csv",
    header=True,
    inferSchema=True,
)

In [12]:
# Peek at the first five rows to sanity-check the inferred schema.
data.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|  Cynthia Norton|37.0|    

In [20]:
class datetime_format(Transformer):
    """Pipeline stage that splits a date/timestamp column into calendar parts.

    The stage adds (or overwrites) two columns on the incoming DataFrame:
    ``Year`` and ``Month``, computed with ``year`` and ``month`` from
    ``pyspark.sql.functions``. Every other column is left as it is.

    Parameters
    ----------
    date_col : str, optional
        Name of the column to extract the parts from. Defaults to
        ``"Onboard_date"``, so ``datetime_format()`` with no arguments
        behaves exactly like the original hard-coded version.
    """

    def __init__(self, date_col: str = "Onboard_date") -> None:
        # Let Transformer/Params initialise their own state first.
        super().__init__()
        # Plain attribute (not a Spark ML Param): it only selects the source column.
        self._date_col = date_col

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` with ``Year`` and ``Month`` derived from the date column."""
        return (
            df.withColumn("Year", year(self._date_col))
              .withColumn("Month", month(self._date_col))
        )

In [24]:
# Single instance of the date-splitting stage; it is used directly via
# .transform() and also placed first in the Pipeline's stage list.
datetime_formatter = datetime_format()

In [53]:
# Numeric inputs, the 'Churn' label and the derived Year/Month columns,
# with the free-text columns dropped.
# NOTE(review): final_data is not referenced by the later cells shown here
# (the pipeline is fitted on `data`); confirm whether it is still needed.
final_data = datetime_formatter.transform(data).select(
    "Age",
    "Total_Purchase",
    "Account_Manager",
    "Years",
    "Num_Sites",
    "Churn",
    "Year",
    "Month",
)

In [59]:
# One-hot encode the calendar columns produced by the date-splitting stage.
# NOTE(review): the encoders receive the raw Year/Month integers directly
# (no indexing stage in front of them) — confirm the resulting vector sizes
# and the handling of values unseen at fit time are acceptable.
year_encoder = OneHotEncoder(
    inputCol="Year",
    outputCol="YearVec",
)
month_encoder = OneHotEncoder(
    inputCol="Month",
    outputCol="MonthVec",
)

In [61]:
# Combine the numeric inputs and the two one-hot vectors into a single
# 'features' vector column for the classifier.
assembler = VectorAssembler(
    inputCols=[
        "Age",
        "Total_Purchase",
        "Account_Manager",
        "Years",
        "Num_Sites",
        "YearVec",
        "MonthVec",
    ],
    outputCol="features",
)

In [62]:
# Logistic regression reading the assembled 'features' vector, with
# 'Churn' as the label column.
log_reg_churn = LogisticRegression(
    featuresCol="features",
    labelCol="Churn",
)

# Pipelining everything

In [63]:
# Stages run in list order: date split -> one-hot encoders -> assembler -> model.
pipeline = Pipeline(
    stages=[
        datetime_formatter,
        year_encoder,
        month_encoder,
        assembler,
        log_reg_churn,
    ]
)

In [66]:
# Load the new-customer records to score, with the same reader options
# (header row, inferred types) as the training file.
test_data = spark.read.csv(
    "new_customers.csv",
    header=True,
    inferSchema=True,
)

In [69]:
# Fit every stage of the pipeline on the raw churn data; the pipeline's first
# stage derives Year/Month itself, so no pre-transformed frame is needed.
log_reg_model = pipeline.fit(dataset=data)

In [72]:
# Run the fitted pipeline over the new customers to obtain predictions.
results = log_reg_model.transform(dataset=test_data)

In [78]:
# List the columns of the scored frame (inputs plus columns added by the pipeline).
results.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Year',
 'Month',
 'YearVec',
 'MonthVec',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [80]:
# Show the predicted class next to each company name.
results.select("Company", "prediction").show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

