In [1]:
# Notebook bootstrap: silence warnings, locate the local Spark installation,
# then import PySpark (findspark.init must run before `import pyspark`).
from warnings import filterwarnings
filterwarnings('ignore')  # suppress every warning category for the whole session
import findspark
# Raw string: "\s" is not a valid escape sequence, so the non-raw literal only
# worked by accident and triggers an invalid-escape warning on newer Pythons.
# The runtime value is unchanged.
# NOTE(review): hardcoded, Windows-specific install path; consider SPARK_HOME.
findspark.init(r"C:\spark")
import pyspark
from pyspark import SparkContext
import seaborn as sns


In [30]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pandas as pd

In [3]:
# Create (or reuse) the local Spark session used by the rest of the notebook.
#
# The original key was misspelled ("spark.executer.memory"), so Spark treated it
# as an unknown setting and the memory request never took effect. The value "4"
# also carried no size unit; "4g" is presumably what was intended.
# NOTE(review): with master("local") the executor runs inside the driver JVM,
# so spark.driver.memory may be the setting that actually matters — confirm.
spark = (
    SparkSession.builder
    .master("local")
    .appName("churn_modellemesi")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

In [4]:
# Keep a handle on the SparkContext that backs the session.
sc = spark.sparkContext
# Bare expression: renders the context as the cell's output.
sc

In [5]:
# Load churn.csv into a Spark DataFrame: the first row holds the column names,
# fields are comma-separated, and column types are inferred from the data.
spark_df = spark.read.csv(
    "churn.csv",
    sep=",",
    inferSchema=True,
    header=True,
)
# Mark the frame for caching; as the last expression it is also the cell output.
spark_df.cache()

DataFrame[_c0: int, Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Churn: int]

In [6]:
# Name the unnamed first CSV column "index", then lower-case all column names.
# NOTE: this rebinds spark_df, so the cell builds on whatever spark_df currently is.
spark_df = spark_df.withColumnRenamed("_c0", "index")
spark_df = spark_df.toDF(*map(str.lower, spark_df.columns))

In [7]:
from pyspark.ml.feature import StringIndexer

# Encode the "churn" column into a numeric "label" column.
#
# StringIndexer's default ordering ("frequencyDesc") gives index 0 to the most
# frequent value, so whether churn=1 ends up as label 1 would depend on the
# class balance of the data. "alphabetAsc" pins the mapping by value instead,
# so churn 0 -> label 0 and churn 1 -> label 1 regardless of frequencies.
stringIndexer = StringIndexer(
    inputCol="churn",
    outputCol="label",
    stringOrderType="alphabetAsc",
)

In [8]:
# Fit the indexer on the data, then use the fitted model to append "label".
indexer_model = stringIndexer.fit(spark_df)
indexed = indexer_model.transform(spark_df)

In [10]:
# Replace the indexed "label" column with an integer-typed version of itself.
label_as_int = indexed["label"].cast("integer")
spark_df = indexed.withColumn("label", label_as_int)

In [11]:
# Display (column name, type) pairs to check the schema after the label cast.
spark_df.dtypes

[('index', 'int'),
 ('names', 'string'),
 ('age', 'double'),
 ('total_purchase', 'double'),
 ('account_manager', 'int'),
 ('years', 'double'),
 ('num_sites', 'double'),
 ('churn', 'int'),
 ('label', 'int')]

In [12]:
# Names of the columns that are assembled into the model's feature vector.
bag = [
    "age",
    "total_purchase",
    "account_manager",
    "years",
    "num_sites",
]

In [13]:
# Assembler that packs the columns listed in `bag` into one "features" vector column.
vectorAssembler = VectorAssembler(
    outputCol="features",
    inputCols=bag,
)

In [14]:
# Append the assembled "features" vector column to the labelled data.
va_df = vectorAssembler.transform(spark_df)

In [15]:
# Keep only the two columns the classifier consumes.
final_df = va_df.select("features", "label")

In [16]:
# Preview the modelling frame (show() prints the top 20 rows by default).
final_df.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
|[48.0,10356.02,0....|    1|
|[44.0,11331.58,1....|    1|
|[32.0,9885.12,1.0...|    1|
|[43.0,14062.6,1.0...|    1|
|[40.0,8066.94,1.0...|    1|
|[30.0,11575.37,1....|    1|
|[45.0,8771.02,1.0...|    1|
|[45.0,8988.67,1.0...|    1|
|[40.0,8283.32,1.0...|    1|
|[41.0,6569.87,1.0...|    1|
|[38.0,10494.82,1....|    1|
|[45.0,8213.41,1.0...|    1|
|[43.0,11226.88,0....|    1|
|[53.0,5515.09,0.0...|    1|
|[46.0,8046.4,1.0,...|    1|
+--------------------+-----+
only showing top 20 rows



In [17]:
# train / test split

In [18]:
# Split the modelling frame roughly 70% / 30% into training and test sets.
# A fixed seed makes the split repeatable across runs; without it every
# execution produced a different split and therefore different scores.
splits = final_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [20]:
# Gradient-boosted tree classifier reading "features" and predicting "label",
# limited to 10 boosting iterations.
gbm = GBTClassifier(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
)

In [21]:
# Fit the baseline (untuned) GBT model on the training split.
gbm_model = gbm.fit(train_df)

In [22]:
# Score the held-out test split with the baseline model.
# NOTE: y_pred is reassigned later with the cross-validated model's predictions.
y_pred = gbm_model.transform(test_df)

In [24]:
# Model-selection setup: evaluator, hyper-parameter grid and cross-validator.

# Evaluator left at its defaults (per the PySpark docs: labelCol="label",
# rawPredictionCol="rawPrediction", metricName="areaUnderROC").
evaluator = BinaryClassificationEvaluator()

# 3 depths x 2 bin counts x 2 iteration counts = 12 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(gbm.maxDepth, [2, 4, 6])
             .addGrid(gbm.maxBins, [20, 30])
             .addGrid(gbm.maxIter, [10, 20])
             .build())

# 10-fold cross-validation over the grid. The explicit seed fixes the fold
# assignment so that the selected model is repeatable between runs.
cv = CrossValidator(
    estimator=gbm,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=10,
    seed=42,
)

In [26]:
# Run the cross-validated grid search on the training split.
# NOTE(review): this fits one model per grid point per fold, so it is likely
# the slowest cell in the notebook.
cv_model = cv.fit(train_df)

In [27]:
# Score the test split with the cross-validated model (this replaces the
# earlier baseline predictions held in y_pred) and keep just the true label
# and the predicted class for the accuracy check.
y_pred = cv_model.transform(test_df)
ac = y_pred.select(["label", "prediction"])

In [28]:
# Accuracy on the test split: rows where prediction equals label / all rows.
ac.where(ac["label"] == ac["prediction"]).count() / ac.count()

0.8787878787878788

In [31]:
# Five hand-written new customers ("yeni musteriler") to score with the model.
# One Series per column; the column names match the feature names used in training.
names = pd.Series(
    ["Ali Ahmetoğlu", "Berkcan Tanerbey", "Harika Gündüz", "Polat Alemdar", "Ata Bakmayan Ali"]
)
age = pd.Series([38, 43, 34, 50, 40])
total_purchase = pd.Series([30000, 10000, 6000, 30000, 100000])
account_manager = pd.Series([1, 0, 0, 1, 1])
years = pd.Series([20, 10, 3, 8, 30])
num_sites = pd.Series([30, 8, 8, 6, 50])

# Combine the Series into one pandas DataFrame, keeping this column order.
yeni_musteriler = pd.DataFrame(
    dict(
        names=names,
        age=age,
        total_purchase=total_purchase,
        account_manager=account_manager,
        years=years,
        num_sites=num_sites,
    )
)

In [32]:
# Convert the pandas frame of new customers into a Spark DataFrame.
yeni_sdf = spark.createDataFrame(yeni_musteriler)

In [33]:
# Add the "features" vector column to the new customers.
# NOTE(review): this rebinds yeni_musteriler from a pandas DataFrame to a Spark
# DataFrame, so re-running the createDataFrame cell after this one will
# presumably fail; a distinct name would keep the cells re-runnable.
yeni_musteriler = vectorAssembler.transform(yeni_sdf)

In [34]:
# Score the new customers with the cross-validated model ("sonuclar" = results).
sonuclar = cv_model.transform(yeni_musteriler)

In [35]:
# Print each new customer's name next to the predicted class.
sonuclar.select("names","prediction").show()

+----------------+----------+
|           names|prediction|
+----------------+----------+
|   Ali Ahmetoğlu|       1.0|
|Berkcan Tanerbey|       0.0|
|   Harika Gündüz|       0.0|
|   Polat Alemdar|       0.0|
|Ata Bakmayan Ali|       1.0|
+----------------+----------+

