In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session for this notebook (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName("CustomerChurn")
    .getOrCreate()
)

In [3]:
# Load the labelled churn dataset; the first row is the header and column
# types are inferred from the file contents.
data = spark.read.csv(
    '/FileStore/tables/customer_churn.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Preview the freshly loaded churn data.
data.show()

In [5]:
from pyspark.ml.feature import StringIndexer,VectorAssembler

## Converting the Company string column to a numeric index

In [6]:
# Encode the 'Company' string column as a numeric index stored in
# 'company_vector' (despite the name, the column holds a single number per row).
company_indexer = StringIndexer(inputCol='Company', outputCol='company_vector')
company_model = company_indexer.fit(data)
# `data` is rebound to the indexed frame, so on a second run of this cell it
# already contains 'company_vector' and transform() would fail because the
# output column exists. Dropping it first (a no-op when the column is absent)
# keeps the cell safe to re-run.
data = company_model.transform(data.drop('company_vector'))

In [7]:
# Preview the data again, now including the 'company_vector' index column.
data.show()

In [8]:
# Inspect the column types before assembling the feature vector.
data.dtypes

In [9]:
from pyspark.ml.linalg import Vector

In [10]:
# List the available column names to pick the model inputs from.
data.columns

In [11]:
# Columns combined into the single 'features' vector consumed by the model.
feature_columns = [
    'Age',
    'Total_Purchase',
    'Years',
    'Num_Sites',
    'company_vector',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [12]:
# Append the assembled 'features' vector column to the indexed data.
final_data = assembler.transform(data)

In [13]:
# Preview the data with the new 'features' column.
final_data.show()

## Performing Logistic Regression

In [14]:
from pyspark.ml.classification import LogisticRegression

In [15]:
# Unfitted logistic-regression estimator: reads inputs from 'features' and the
# target from 'Churn'. All other settings are left at the library defaults.
lg = LogisticRegression(featuresCol='features',labelCol='Churn')

## Splitting into training and test data

In [16]:
# Keep only the model inputs and split roughly 70/30 into train and test sets.
# A fixed seed makes the split (and therefore every metric computed below)
# repeatable across runs of the notebook.
train_data, test_data = (
    final_data
    .select('features', 'Churn')
    .randomSplit([0.7, 0.3], seed=42)
)

In [17]:
# Preview the training split ('features' and 'Churn' only).
train_data.show()

In [18]:
# Fit the logistic regression on the training split only.
model = lg.fit(train_data)

In [19]:
# Score the held-out test split; the model appends its output columns.
result = model.transform(test_data)

In [20]:
# Check which output columns the model added to the test data.
result.columns

In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

In [22]:
# Evaluate on the model's continuous 'rawPrediction' scores. Feeding the hard
# 0/1 'prediction' column instead collapses the ROC curve to a single
# operating point and gives a misleading area-under-curve value.
my_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Churn')

In [23]:
# Score the test-set predictions with the evaluator.
# NOTE(review): no metricName is set, so this is presumably the evaluator's
# default metric (area under ROC) — confirm.
aoc = my_eval.evaluate(result)

In [24]:
# Display the evaluator score for the test split.
aoc

In [25]:
# Grab the summary object attached to the fitted model.
# NOTE(review): going by the variable name this describes the fit on the
# training split, not the held-out test data — confirm.
trainingSummary = model.summary

## Printing out the training metrics

In [26]:
# Pull the headline metrics off the fitted model's summary and print them,
# one per line.
# NOTE(review): these come from model.summary, so they presumably reflect the
# data the model was fitted on rather than the held-out test split — confirm.
trainingSummary = model.summary
accuracy, falsePositiveRate, truePositiveRate = (
    trainingSummary.accuracy,
    trainingSummary.weightedFalsePositiveRate,
    trainingSummary.weightedTruePositiveRate,
)
fMeasure, precision, recall = (
    trainingSummary.weightedFMeasure(),
    trainingSummary.weightedPrecision,
    trainingSummary.weightedRecall,
)
print(
    "Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
    % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall)
)

In [27]:
# Load the unlabelled customers to be scored; header row present, column
# types inferred from the file contents.
new_customers = spark.read.csv(
    '/FileStore/tables/new_customers.csv',
    header=True,
    inferSchema=True,
)

In [28]:
# Preview the new customers to be scored.
new_customers.show()

In [29]:
# Encode 'Company' with the indexer model that was fitted on the training data.
# Re-fitting the indexer on new_customers would assign indices from the new
# file's own label frequencies, so the same number would mean a different
# company than it did when the classifier was trained.
# 'keep' places companies never seen during fitting in one extra index bucket
# instead of raising an error on them.
new_customer = company_model.setHandleInvalid('keep').transform(new_customers)

In [30]:
# Build the 'features' vector for the new customers with the same assembler
# used for the labelled data.
new_customer_final = assembler.transform(new_customer)

In [31]:
# Preview just the assembled feature vectors for the new customers.
new_customer_final.select('features').show()

## Refitting the model on the whole labelled dataset to predict new customer data

In [32]:
# Refit on every labelled row (no train/test split) before scoring new customers.
# This rebinds `model`, replacing the model fitted on the training split.
model = lg.fit(final_data.select('features','Churn'))

## Metrics after training on the whole labelled dataset

In [33]:
# Pull the headline metrics off the refitted model's summary and print them,
# one per line. The metric variables are rebound to the new values.
trainingSummary = model.summary
accuracy, falsePositiveRate, truePositiveRate = (
    trainingSummary.accuracy,
    trainingSummary.weightedFalsePositiveRate,
    trainingSummary.weightedTruePositiveRate,
)
fMeasure, precision, recall = (
    trainingSummary.weightedFMeasure(),
    trainingSummary.weightedPrecision,
    trainingSummary.weightedRecall,
)
print(
    "Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
    % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall)
)

In [34]:
# Score the new customers with the current model. Only the 'features' column
# is passed in, so the other customer columns are not carried into final_result.
final_result = model.transform(new_customer_final.select('features'))

## Final result after new customer prediction

In [35]:
# Display the predictions for the new customers.
final_result.show()