## Business Problem

***The goal is to develop a machine learning model that can predict which customers will leave the company.***

## Dataset Story

***The dataset consists of 10,000 observations and 12 variables.
The independent variables hold information about the customers. The dependent variable indicates whether the customer churned.***

## Variables

**Surname – Surname**

**CreditScore – Credit score**

**Geography – Country (Germany/France/Spain)**

**Gender – Gender**

**Age – Age**

**Tenure – Number of years as a customer**

**Balance – Account balance**

**NumOfProducts – Number of bank products used**

**HasCrCard – Has a credit card (0=No, 1=Yes)**

**IsActiveMember – Active membership status (0=No, 1=Yes)**

**EstimatedSalary – Estimated salary**

**Exited – Whether the customer churned (0=No, 1=Yes)**

## Task

**Using PySpark, build a machine learning model that covers the following steps.**

**▪ Exploratory Data Analysis**

**▪ Data Preprocessing**

**▪ Feature Engineering**

**▪ Gradient Boosted Tree Classifier Model**

**▪ Model Tuning**

In [1]:
from warnings import filterwarnings
filterwarnings("ignore" , category=FutureWarning)
filterwarnings("ignore" , category=DeprecationWarning) 

In [2]:
import pandas as pd
import numpy as np

import findspark
import pyspark
from pyspark.sql import SparkSession

In [3]:
pd.set_option("display.max_columns", None)
pd.set_option("display.float_format" , lambda x : "%.4f" % x)
pd.set_option("display.width",200)

In [4]:
findspark.init("/Users/gokhanersoz/spark")

In [5]:
spark = SparkSession.builder \
        .master("local") \
        .appName("pyspark_churn") \
        .getOrCreate()

In [6]:
sc = spark.sparkContext
sc

In [7]:
############################
# Exploratory Data Analysis
############################

In [8]:
# If inferSchema is False, every column is read as a string.

spark_df = spark.read.csv("/Users/gokhanersoz/Desktop/Veri_Bilimi/Dersler/Hafta_11/churn2.csv",
                         header=True,inferSchema=True)
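
The comment above reflects a general property of CSV files rather than something specific to Spark: the file carries no type information, so a reader that neither infers nor receives a schema returns every field as text. The following is a minimal plain-Python sketch of that behaviour using the standard `csv` module. The in-memory `sample` string is a made-up excerpt for illustration, not the actual file.

```python
import csv
import io

# Hypothetical in-memory sample with a few of the dataset's columns.
sample = "CreditScore,Geography,Age,Balance\n619,France,42,0.0\n"

row = next(csv.DictReader(io.StringIO(sample)))
print(row)  # every value is a str, including the numeric-looking ones

# Casting by hand is the work that schema inference (or an explicit schema) takes over.
typed = {
    "CreditScore": int(row["CreditScore"]),
    "Geography": row["Geography"],
    "Age": int(row["Age"]),
    "Balance": float(row["Balance"]),
}
print(typed)
```

With `inferSchema=True`, Spark makes an extra pass over the data to guess these types, which is why the schema printed later shows `integer` and `double` columns.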

In [9]:
spark_df.show(1,truncate=True,vertical=True)

-RECORD 0--------------------
 RowNumber       | 1         
 CustomerId      | 15634602  
 Surname         | Hargrave  
 CreditScore     | 619       
 Geography       | France    
 Gender          | Female    
 Age             | 42        
 Tenure          | 2         
 Balance         | 0.0       
 NumOfProducts   | 1         
 HasCrCard       | 1         
 IsActiveMember  | 1         
 EstimatedSalary | 101348.88 
 Exited          | 1         
only showing top 1 row



In [10]:
spark_df.dtypes

[('RowNumber', 'int'),
 ('CustomerId', 'int'),
 ('Surname', 'string'),
 ('CreditScore', 'int'),
 ('Geography', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Tenure', 'int'),
 ('Balance', 'double'),
 ('NumOfProducts', 'int'),
 ('HasCrCard', 'int'),
 ('IsActiveMember', 'int'),
 ('EstimatedSalary', 'double'),
 ('Exited', 'int')]

In [11]:
spark_df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [12]:
print(f"Spark_Df Shape : {(spark_df.count(),len(spark_df.columns))}")

Spark_Df Shape : (10000, 14)


In [13]:
# Renaming the columns (upper-casing the names)

spark_df = spark_df.toDF(*[col.upper() for col in spark_df.columns])
spark_df.show(1,truncate=False, vertical=True)

-RECORD 0--------------------
 ROWNUMBER       | 1         
 CUSTOMERID      | 15634602  
 SURNAME         | Hargrave  
 CREDITSCORE     | 619       
 GEOGRAPHY       | France    
 GENDER          | Female    
 AGE             | 42        
 TENURE          | 2         
 BALANCE         | 0.0       
 NUMOFPRODUCTS   | 1         
 HASCRCARD       | 1         
 ISACTIVEMEMBER  | 1         
 ESTIMATEDSALARY | 101348.88 
 EXITED          | 1         
only showing top 1 row



In [14]:
# Summary statistics
num_cols = [col[0] for col in spark_df.dtypes if col[1] != "string"]
cat_cols = [col[0] for col in spark_df.dtypes if col[1] == "string"]

spark_df.describe(num_cols).toPandas().set_index("summary").transpose()

summary,count,mean,stddev,min,max
ROWNUMBER,10000,5000.5,2886.8956799071675,1.0,10000.0
CUSTOMERID,10000,15690940.5694,71936.18612274907,15565701.0,15815690.0
CREDITSCORE,10000,650.5288,96.65329873613037,350.0,850.0
AGE,10000,38.9218,10.487806451704587,18.0,92.0
TENURE,10000,5.0128,2.892174377049684,0.0,10.0
BALANCE,10000,76485.88928799961,62397.40520238599,0.0,250898.09
NUMOFPRODUCTS,10000,1.5302,0.5816543579989917,1.0,4.0
HASCRCARD,10000,0.7055,0.4558404644751332,0.0,1.0
ISACTIVEMEMBER,10000,0.5151,0.4997969284589181,0.0,1.0
ESTIMATEDSALARY,10000,100090.2398809998,57510.49281769821,11.58,199992.48


In [15]:
# Mean of each numeric variable grouped by EXITED
#for num in num_cols:

#    if num != "EXITED":

#        spark_df.groupby("EXITED").agg({num : "mean"}).show()

In [16]:
# Unique values of all categorical variables

#for cat in cat_cols:
    
#    spark_df.select(cat).distinct().show()

In [17]:
##################################################
# Data Preprocessing & Feature Engineering
##################################################

############################
# Missing Values
############################

In [18]:
# Checked with a different method (via pandas): there are no missing values.
spark_df.toPandas().isnull().sum()

ROWNUMBER          0
CUSTOMERID         0
SURNAME            0
CREDITSCORE        0
GEOGRAPHY          0
GENDER             0
AGE                0
TENURE             0
BALANCE            0
NUMOFPRODUCTS      0
HASCRCARD          0
ISACTIVEMEMBER     0
ESTIMATEDSALARY    0
EXITED             0
dtype: int64

In [19]:
############################
# Feature Interaction
############################

In [20]:
spark_df.show(2,vertical=True)

-RECORD 0--------------------
 ROWNUMBER       | 1         
 CUSTOMERID      | 15634602  
 SURNAME         | Hargrave  
 CREDITSCORE     | 619       
 GEOGRAPHY       | France    
 GENDER          | Female    
 AGE             | 42        
 TENURE          | 2         
 BALANCE         | 0.0       
 NUMOFPRODUCTS   | 1         
 HASCRCARD       | 1         
 ISACTIVEMEMBER  | 1         
 ESTIMATEDSALARY | 101348.88 
 EXITED          | 1         
-RECORD 1--------------------
 ROWNUMBER       | 2         
 CUSTOMERID      | 15647311  
 SURNAME         | Hill      
 CREDITSCORE     | 608       
 GEOGRAPHY       | Spain     
 GENDER          | Female    
 AGE             | 41        
 TENURE          | 1         
 BALANCE         | 83807.86  
 NUMOFPRODUCTS   | 1         
 HASCRCARD       | 0         
 ISACTIVEMEMBER  | 1         
 ESTIMATEDSALARY | 112542.58 
 EXITED          | 0         
only showing top 2 rows



In [21]:
# CUSTOMERID, ROWNUMBER and SURNAME can be dropped here; they carry no useful information for the model.

spark_df = spark_df.drop("SURNAME","ROWNUMBER","CUSTOMERID")

# Update num_cols accordingly
num_cols = [col for col in num_cols if col not in ["SURNAME","ROWNUMBER","CUSTOMERID","EXITED"]]


spark_df.columns

['CREDITSCORE',
 'GEOGRAPHY',
 'GENDER',
 'AGE',
 'TENURE',
 'BALANCE',
 'NUMOFPRODUCTS',
 'HASCRCARD',
 'ISACTIVEMEMBER',
 'ESTIMATEDSALARY',
 'EXITED']

In [22]:
spark_df.show(4)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|CREDITSCORE|GEOGRAPHY|GENDER|AGE|TENURE| BALANCE|NUMOFPRODUCTS|HASCRCARD|ISACTIVEMEMBER|ESTIMATEDSALARY|EXITED|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|        619|   France|Female| 42|     2|     0.0|            1|        1|             1|      101348.88|     1|
|        608|    Spain|Female| 41|     1|83807.86|            1|        0|             1|      112542.58|     0|
|        502|   France|Female| 42|     8|159660.8|            3|        1|             0|      113931.57|     1|
|        699|   France|Female| 39|     1|     0.0|            2|        0|             0|       93826.63|     0|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
only showing top 4 rows



In [23]:
spark_df.toPandas().describe([0.1,0.3,0.5,0.7,0.9]).T

Unnamed: 0,count,mean,std,min,10%,30%,50%,70%,90%,max
CREDITSCORE,10000.0,650.5288,96.6533,350.0,521.0,598.7,652.0,704.0,778.0,850.0
AGE,10000.0,38.9218,10.4878,18.0,27.0,33.0,37.0,42.0,53.0,92.0
TENURE,10000.0,5.0128,2.8922,0.0,1.0,3.0,5.0,7.0,9.0,10.0
BALANCE,10000.0,76485.8893,62397.4052,0.0,0.0,0.0,97198.54,122029.87,149244.792,250898.09
NUMOFPRODUCTS,10000.0,1.5302,0.5817,1.0,1.0,1.0,1.0,2.0,2.0,4.0
HASCRCARD,10000.0,0.7055,0.4558,0.0,0.0,1.0,1.0,1.0,1.0,1.0
ISACTIVEMEMBER,10000.0,0.5151,0.4998,0.0,0.0,0.0,1.0,1.0,1.0,1.0
ESTIMATEDSALARY,10000.0,100090.2399,57510.4928,11.58,20273.58,60736.079,100193.915,139432.236,179674.704,199992.48
EXITED,10000.0,0.2037,0.4028,0.0,0.0,0.0,0.0,0.0,1.0,1.0


In [24]:
spark_df = spark_df.withColumn("AGE_ESTIMATED_SALARY" , spark_df.AGE / spark_df.ESTIMATEDSALARY)
spark_df = spark_df.withColumn("CREDITSCORE_TENURE" , spark_df.CREDITSCORE * spark_df.TENURE)
spark_df = spark_df.withColumn("ESTIMATEDSALARY_CREDITSCORE" , spark_df.ESTIMATEDSALARY / spark_df.CREDITSCORE)

In [25]:
spark_df.show(10,False,True)

-RECORD 0--------------------------------------------
 CREDITSCORE                 | 619                   
 GEOGRAPHY                   | France                
 GENDER                      | Female                
 AGE                         | 42                    
 TENURE                      | 2                     
 BALANCE                     | 0.0                   
 NUMOFPRODUCTS               | 1                     
 HASCRCARD                   | 1                     
 ISACTIVEMEMBER              | 1                     
 ESTIMATEDSALARY             | 101348.88             
 EXITED                      | 1                     
 AGE_ESTIMATED_SALARY        | 4.144101049759997E-4  
 CREDITSCORE_TENURE          | 1238                  
 ESTIMATEDSALARY_CREDITSCORE | 163.73001615508886    
-RECORD 1--------------------------------------------
 CREDITSCORE                 | 608                   
 GEOGRAPHY                   | Spain                 
 GENDER                     

In [26]:
from pyspark.ml.feature import Bucketizer

In [27]:
#[(17.926, 36.5] < (36.5, 55.0] < (55.0, 73.5] < (73.5, 92.0]]

bucketizer_age = Bucketizer(splits = [16,37,55,74,93] , 
                            inputCol = "AGE",
                            outputCol = "AGE_CAT")

spark_df = bucketizer_age.setHandleInvalid("keep").transform(spark_df)
spark_df = spark_df.withColumn("AGE_CAT" , spark_df.AGE_CAT + 1)

spark_df.select("AGE_CAT").distinct().show()

+-------+
|AGE_CAT|
+-------+
|    1.0|
|    4.0|
|    3.0|
|    2.0|
+-------+
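
The commented intervals in the cell above are right-closed, `(a, b]`, whereas Spark's `Bucketizer` documents its buckets as left-closed, `[a, b)`, with the last bucket also including its upper bound. The plain-Python sketch below mimics that rule with `bisect`, so the boundary behaviour can be checked without a Spark session. The helper name `bucket_index` is made up for illustration and is not a Spark API; raising on out-of-range values is an assumption modelled on the documented behaviour.

```python
from bisect import bisect_right

def bucket_index(value, splits):
    """Return the 0-based bucket of `value` under Bucketizer-style rules."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside [{splits[0]}, {splits[-1]}]")
    # bisect_right - 1 gives the bucket whose left edge is <= value.
    idx = bisect_right(splits, value) - 1
    # The last bucket is closed on the right, so the top split maps into it.
    return min(idx, len(splits) - 2)

age_splits = [16, 37, 55, 74, 93]

# +1 mirrors the notebook's shift from 0-based to 1-based categories.
for age in (29, 37, 42, 93):
    print(age, "->", bucket_index(age, age_splits) + 1)
```

Under these rules an age of exactly 37 falls into category 2, not category 1, which is the main practical difference from the right-closed intervals in the comment.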



In [28]:
# [(-188.401, 50006.805] < (50006.805, 100002.03] < (100002.03, 149997.255] < (149997.255, 199992.48]]

bucketizer_estimated_salary = Bucketizer(splits = [0, 50000, 100000, 150000, 200000],
                                         inputCol = "ESTIMATEDSALARY",
                                         outputCol = "ESTIMATEDSALARY_CAT")

spark_df = bucketizer_estimated_salary.setHandleInvalid("keep").transform(spark_df)
spark_df = spark_df.withColumn("ESTIMATEDSALARY_CAT" , spark_df.ESTIMATEDSALARY_CAT + 1)

spark_df.select(spark_df.ESTIMATEDSALARY_CAT).distinct().show()

+-------------------+
|ESTIMATEDSALARY_CAT|
+-------------------+
|                1.0|
|                4.0|
|                3.0|
|                2.0|
+-------------------+



In [29]:
#[(349.5, 475.0] < (475.0, 600.0] < (600.0, 725.0] < (725.0, 850.0]]

bucketizer_credit = Bucketizer(splits = [340 , 475, 600, 725, 851],
                               inputCol = "CREDITSCORE",
                               outputCol = "CREDITSCORE_CAT")

spark_df = bucketizer_credit.setHandleInvalid("keep").transform(spark_df)
spark_df = spark_df.withColumn("CREDITSCORE_CAT", spark_df.CREDITSCORE_CAT + 1)

spark_df.select("CREDITSCORE_CAT").distinct().show()

+---------------+
|CREDITSCORE_CAT|
+---------------+
|            1.0|
|            4.0|
|            3.0|
|            2.0|
+---------------+



In [30]:
# [(-0.01, 2.5] < (2.5, 5.0] < (5.0, 7.5] < (7.5, 10.0]

bucketizer_tenure = Bucketizer(splits = [0 , 3, 5, 8, 11],
                              inputCol = "TENURE",
                              outputCol = "TENURE_CAT")

spark_df = bucketizer_tenure.setHandleInvalid("keep").transform(spark_df)
spark_df = spark_df.withColumn("TENURE_CAT", spark_df.TENURE_CAT + 1)

spark_df.select("TENURE_CAT").distinct().show()

+----------+
|TENURE_CAT|
+----------+
|       1.0|
|       4.0|
|       3.0|
|       2.0|
+----------+



In [31]:
spark_df.toPandas().isnull().sum()

CREDITSCORE                    0
GEOGRAPHY                      0
GENDER                         0
AGE                            0
TENURE                         0
BALANCE                        0
NUMOFPRODUCTS                  0
HASCRCARD                      0
ISACTIVEMEMBER                 0
ESTIMATEDSALARY                0
EXITED                         0
AGE_ESTIMATED_SALARY           0
CREDITSCORE_TENURE             0
ESTIMATEDSALARY_CREDITSCORE    0
AGE_CAT                        0
ESTIMATEDSALARY_CAT            0
CREDITSCORE_CAT                0
TENURE_CAT                     0
dtype: int64

In [32]:
############################
# Deriving Variables with when (segment)
############################

In [33]:
spark_df.toPandas().head(10)

Unnamed: 0,CREDITSCORE,GEOGRAPHY,GENDER,AGE,TENURE,BALANCE,NUMOFPRODUCTS,HASCRCARD,ISACTIVEMEMBER,ESTIMATEDSALARY,EXITED,AGE_ESTIMATED_SALARY,CREDITSCORE_TENURE,ESTIMATEDSALARY_CREDITSCORE,AGE_CAT,ESTIMATEDSALARY_CAT,CREDITSCORE_CAT,TENURE_CAT
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0004,1238,163.73,2.0,3.0,3.0,1.0
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,0.0004,608,185.1029,2.0,3.0,3.0,1.0
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0004,4016,226.9553,2.0,3.0,2.0,4.0
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0004,699,134.2298,2.0,2.0,3.0,1.0
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,0.0005,1700,93.0401,2.0,2.0,4.0,1.0
5,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1,0.0003,5160,232.1809,2.0,3.0,3.0,4.0
6,822,France,Male,50,7,0.0,2,1,1,10062.8,0,0.005,5754,12.2418,2.0,1.0,4.0,3.0
7,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1,0.0002,1504,317.4119,1.0,3.0,1.0,2.0
8,501,France,Male,44,4,142051.07,2,0,1,74940.5,0,0.0006,2004,149.5818,2.0,2.0,2.0,2.0
9,684,France,Male,27,2,134603.88,1,1,1,71725.73,0,0.0004,1368,104.8622,1.0,2.0,3.0,1.0


In [34]:
from pyspark.sql.functions import when

card =spark_df.HASCRCARD
active = spark_df.ISACTIVEMEMBER

spark_df = spark_df.withColumn( "SEGMENT_CUSTOMER", 
                                 when( ((card == 1) & (active == 1 )) , "BEST_CUSTOMER" ).
                                 when( ((card == 0) & (active == 1 )) , "ACTIVE_CUSTOMER").
                                 when( ((card == 1) & (active == 0 )) , "HAS_CARD_CUSTOMER").
                                 otherwise("CHURN_CUSTOMER"))

In [35]:
tenure = spark_df.TENURE
product = spark_df.NUMOFPRODUCTS

spark_df = spark_df.withColumn("SEGMENT_REWARD",
                                when( (tenure > 6) & (product > 2 ), "HAS_BEST_REWARD").
                                when( (tenure <= 6) & (tenure > 3) & (product <= 2) , "HAS_MIDDLE_REWARD").
                                otherwise("STANDART_REWARD"))
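
A chained `when(...).when(...).otherwise(...)` is evaluated top to bottom and the first matching branch wins, much like an `if`/`elif`/`else` ladder. The sketch below restates the two segment rules as ordinary Python functions so the branch logic can be checked on single rows. The function names are hypothetical; the conditions and labels are copied from the cells above.

```python
def segment_customer(has_card, is_active):
    """First matching branch wins, as in a chained when(...).otherwise(...)."""
    if has_card == 1 and is_active == 1:
        return "BEST_CUSTOMER"
    if has_card == 0 and is_active == 1:
        return "ACTIVE_CUSTOMER"
    if has_card == 1 and is_active == 0:
        return "HAS_CARD_CUSTOMER"
    return "CHURN_CUSTOMER"

def segment_reward(tenure, products):
    if tenure > 6 and products > 2:
        return "HAS_BEST_REWARD"
    if 3 < tenure <= 6 and products <= 2:
        return "HAS_MIDDLE_REWARD"
    return "STANDART_REWARD"  # spelling kept from the notebook

# (TENURE, NUMOFPRODUCTS) pairs taken from rows 2, 8 and 5 of the preview above.
for tenure, products in [(8, 3), (4, 2), (8, 2)]:
    print(tenure, products, segment_reward(tenure, products))
```

Note that the reward rules do not partition the space evenly: a long-tenured customer with two or fewer products (such as the last pair) matches neither explicit branch and falls through to the default label.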

In [36]:
spark_df.toPandas().isnull().sum()

CREDITSCORE                    0
GEOGRAPHY                      0
GENDER                         0
AGE                            0
TENURE                         0
BALANCE                        0
NUMOFPRODUCTS                  0
HASCRCARD                      0
ISACTIVEMEMBER                 0
ESTIMATEDSALARY                0
EXITED                         0
AGE_ESTIMATED_SALARY           0
CREDITSCORE_TENURE             0
ESTIMATEDSALARY_CREDITSCORE    0
AGE_CAT                        0
ESTIMATEDSALARY_CAT            0
CREDITSCORE_CAT                0
TENURE_CAT                     0
SEGMENT_CUSTOMER               0
SEGMENT_REWARD                 0
dtype: int64

In [37]:
############################
# Label Encoding
############################

In [38]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer

In [39]:
cat_cols = [col[0] for col in spark_df.dtypes if col[1] == "string"]
cat_cols

['GEOGRAPHY', 'GENDER', 'SEGMENT_CUSTOMER', 'SEGMENT_REWARD']

In [40]:
spark_df.select(cat_cols).show(5)

+---------+------+-----------------+---------------+
|GEOGRAPHY|GENDER| SEGMENT_CUSTOMER| SEGMENT_REWARD|
+---------+------+-----------------+---------------+
|   France|Female|    BEST_CUSTOMER|STANDART_REWARD|
|    Spain|Female|  ACTIVE_CUSTOMER|STANDART_REWARD|
|   France|Female|HAS_CARD_CUSTOMER|HAS_BEST_REWARD|
|   France|Female|   CHURN_CUSTOMER|STANDART_REWARD|
|    Spain|Female|    BEST_CUSTOMER|STANDART_REWARD|
+---------+------+-----------------+---------------+
only showing top 5 rows



In [41]:
indexer_geo = StringIndexer(inputCol="GEOGRAPHY", outputCol = "GEOGRAPHY_LABEL")
spark_df = indexer_geo.fit(spark_df).transform(spark_df)
spark_df = spark_df.withColumn("GEOGRAPHY_LABEL" , spark_df.GEOGRAPHY_LABEL.cast("integer"))
spark_df.select("GEOGRAPHY_LABEL").show(4)

+---------------+
|GEOGRAPHY_LABEL|
+---------------+
|              0|
|              2|
|              0|
|              0|
+---------------+
only showing top 4 rows
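
By default `StringIndexer` orders labels by descending frequency, so the most common value receives index 0. The sketch below reproduces that ordering in plain Python on a made-up list. The counts are illustrative and not taken from the dataset, and breaking ties alphabetically is an assumption about the Spark version in use.

```python
from collections import Counter

# Toy values; the real column has 10,000 entries.
geography = ["France", "Spain", "France", "Germany", "France",
             "Germany", "Germany", "Spain", "France"]

counts = Counter(geography)
# Sort by descending count, then alphabetically for ties.
ordered = sorted(counts, key=lambda label: (-counts[label], label))
label_to_index = {label: i for i, label in enumerate(ordered)}
print(label_to_index)

encoded = [label_to_index[g] for g in geography]
print(encoded)
```

The resulting integers are arbitrary identifiers rather than an ordering with meaning, which is one reason the notebook one-hot encodes them afterwards.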



In [42]:
indexer_gender = StringIndexer(inputCol="GENDER",outputCol = "GENDER_LABEL")
spark_df = indexer_gender.fit(spark_df).transform(spark_df)
spark_df = spark_df.withColumn("GENDER_LABEL", spark_df.GENDER_LABEL.cast("integer"))
spark_df.select("GENDER_LABEL").show(4)

+------------+
|GENDER_LABEL|
+------------+
|           1|
|           1|
|           1|
|           1|
+------------+
only showing top 4 rows



In [43]:
indexer_customer = StringIndexer(inputCol = "SEGMENT_CUSTOMER" , outputCol = "CUSTOMER_LABEL")
spark_df = indexer_customer.fit(spark_df).transform(spark_df)
spark_df = spark_df.withColumn("CUSTOMER_LABEL",spark_df.CUSTOMER_LABEL.cast("integer"))
spark_df.select("CUSTOMER_LABEL").show(4)

+--------------+
|CUSTOMER_LABEL|
+--------------+
|             0|
|             2|
|             1|
|             3|
+--------------+
only showing top 4 rows



In [44]:
indexer_reward = StringIndexer(inputCol= "SEGMENT_REWARD", outputCol = "REWARD_LABEL")
spark_df = indexer_reward.fit(spark_df).transform(spark_df)
spark_df = spark_df.withColumn("REWARD_LABEL", spark_df.REWARD_LABEL.cast("integer"))
spark_df.select("REWARD_LABEL").show(4)

+------------+
|REWARD_LABEL|
+------------+
|           0|
|           0|
|           2|
|           0|
+------------+
only showing top 4 rows



In [45]:
spark_df.toPandas().isnull().sum()

CREDITSCORE                    0
GEOGRAPHY                      0
GENDER                         0
AGE                            0
TENURE                         0
BALANCE                        0
NUMOFPRODUCTS                  0
HASCRCARD                      0
ISACTIVEMEMBER                 0
ESTIMATEDSALARY                0
EXITED                         0
AGE_ESTIMATED_SALARY           0
CREDITSCORE_TENURE             0
ESTIMATEDSALARY_CREDITSCORE    0
AGE_CAT                        0
ESTIMATEDSALARY_CAT            0
CREDITSCORE_CAT                0
TENURE_CAT                     0
SEGMENT_CUSTOMER               0
SEGMENT_REWARD                 0
GEOGRAPHY_LABEL                0
GENDER_LABEL                   0
CUSTOMER_LABEL                 0
REWARD_LABEL                   0
dtype: int64

In [46]:
spark_df.toPandas().head(10)

Unnamed: 0,CREDITSCORE,GEOGRAPHY,GENDER,AGE,TENURE,BALANCE,NUMOFPRODUCTS,HASCRCARD,ISACTIVEMEMBER,ESTIMATEDSALARY,EXITED,AGE_ESTIMATED_SALARY,CREDITSCORE_TENURE,ESTIMATEDSALARY_CREDITSCORE,AGE_CAT,ESTIMATEDSALARY_CAT,CREDITSCORE_CAT,TENURE_CAT,SEGMENT_CUSTOMER,SEGMENT_REWARD,GEOGRAPHY_LABEL,GENDER_LABEL,CUSTOMER_LABEL,REWARD_LABEL
0,619,France,Female,42,2,0.0,1,1,1,101348.88,1,0.0004,1238,163.73,2.0,3.0,3.0,1.0,BEST_CUSTOMER,STANDART_REWARD,0,1,0,0
1,608,Spain,Female,41,1,83807.86,1,0,1,112542.58,0,0.0004,608,185.1029,2.0,3.0,3.0,1.0,ACTIVE_CUSTOMER,STANDART_REWARD,2,1,2,0
2,502,France,Female,42,8,159660.8,3,1,0,113931.57,1,0.0004,4016,226.9553,2.0,3.0,2.0,4.0,HAS_CARD_CUSTOMER,HAS_BEST_REWARD,0,1,1,2
3,699,France,Female,39,1,0.0,2,0,0,93826.63,0,0.0004,699,134.2298,2.0,2.0,3.0,1.0,CHURN_CUSTOMER,STANDART_REWARD,0,1,3,0
4,850,Spain,Female,43,2,125510.82,1,1,1,79084.1,0,0.0005,1700,93.0401,2.0,2.0,4.0,1.0,BEST_CUSTOMER,STANDART_REWARD,2,1,0,0
5,645,Spain,Male,44,8,113755.78,2,1,0,149756.71,1,0.0003,5160,232.1809,2.0,3.0,3.0,4.0,HAS_CARD_CUSTOMER,STANDART_REWARD,2,0,1,0
6,822,France,Male,50,7,0.0,2,1,1,10062.8,0,0.005,5754,12.2418,2.0,1.0,4.0,3.0,BEST_CUSTOMER,STANDART_REWARD,0,0,0,0
7,376,Germany,Female,29,4,115046.74,4,1,0,119346.88,1,0.0002,1504,317.4119,1.0,3.0,1.0,2.0,HAS_CARD_CUSTOMER,STANDART_REWARD,1,1,1,0
8,501,France,Male,44,4,142051.07,2,0,1,74940.5,0,0.0006,2004,149.5818,2.0,2.0,2.0,2.0,ACTIVE_CUSTOMER,HAS_MIDDLE_REWARD,0,0,2,1
9,684,France,Male,27,2,134603.88,1,1,1,71725.73,0,0.0004,1368,104.8622,1.0,2.0,3.0,1.0,BEST_CUSTOMER,STANDART_REWARD,0,0,0,0


In [47]:
############################
# One Hot Encoding
############################

In [48]:
from pyspark.ml.feature import OneHotEncoder

In [49]:
labels = ['GEOGRAPHY_LABEL', 'GENDER_LABEL','CUSTOMER_LABEL','REWARD_LABEL']
labels_one = [col + str("_ONE") for col in labels]

In [50]:
labels_one

['GEOGRAPHY_LABEL_ONE',
 'GENDER_LABEL_ONE',
 'CUSTOMER_LABEL_ONE',
 'REWARD_LABEL_ONE']

In [51]:
encoder = OneHotEncoder(inputCols=labels, outputCols=labels_one)
spark_df = encoder.fit(spark_df).transform(spark_df)
spark_df.show(10,truncate=False,vertical=True)

-RECORD 0--------------------------------------------
 CREDITSCORE                 | 619                   
 GEOGRAPHY                   | France                
 GENDER                      | Female                
 AGE                         | 42                    
 TENURE                      | 2                     
 BALANCE                     | 0.0                   
 NUMOFPRODUCTS               | 1                     
 HASCRCARD                   | 1                     
 ISACTIVEMEMBER              | 1                     
 ESTIMATEDSALARY             | 101348.88             
 EXITED                      | 1                     
 AGE_ESTIMATED_SALARY        | 4.144101049759997E-4  
 CREDITSCORE_TENURE          | 1238                  
 ESTIMATEDSALARY_CREDITSCORE | 163.73001615508886    
 AGE_CAT                     | 2.0                   
 ESTIMATEDSALARY_CAT         | 3.0                   
 CREDITSCORE_CAT             | 3.0                   
 TENURE_CAT                 

In [52]:
############################
# Defining the TARGET
############################

In [53]:
# First, drop the string columns as preparation...
# No string columns remain.

cat_cols = [col[0] for col in spark_df.dtypes if col[1] == "string"]

spark_df = spark_df.drop('GEOGRAPHY', 'GENDER', 'SEGMENT_CUSTOMER', 'SEGMENT_REWARD', \
                         'GEOGRAPHY_LABEL', 'GENDER_LABEL', 'CUSTOMER_LABEL', 'REWARD_LABEL')

string_ = [col[0] for col in spark_df.dtypes if col[1] == "string"]
string_

[]

In [54]:
spark_df.toPandas().isnull().sum()

CREDITSCORE                    0
AGE                            0
TENURE                         0
BALANCE                        0
NUMOFPRODUCTS                  0
HASCRCARD                      0
ISACTIVEMEMBER                 0
ESTIMATEDSALARY                0
EXITED                         0
AGE_ESTIMATED_SALARY           0
CREDITSCORE_TENURE             0
ESTIMATEDSALARY_CREDITSCORE    0
AGE_CAT                        0
ESTIMATEDSALARY_CAT            0
CREDITSCORE_CAT                0
TENURE_CAT                     0
GEOGRAPHY_LABEL_ONE            0
GENDER_LABEL_ONE               0
CUSTOMER_LABEL_ONE             0
REWARD_LABEL_ONE               0
dtype: int64

In [55]:
spark_df.columns

['CREDITSCORE',
 'AGE',
 'TENURE',
 'BALANCE',
 'NUMOFPRODUCTS',
 'HASCRCARD',
 'ISACTIVEMEMBER',
 'ESTIMATEDSALARY',
 'EXITED',
 'AGE_ESTIMATED_SALARY',
 'CREDITSCORE_TENURE',
 'ESTIMATEDSALARY_CREDITSCORE',
 'AGE_CAT',
 'ESTIMATEDSALARY_CAT',
 'CREDITSCORE_CAT',
 'TENURE_CAT',
 'GEOGRAPHY_LABEL_ONE',
 'GENDER_LABEL_ONE',
 'CUSTOMER_LABEL_ONE',
 'REWARD_LABEL_ONE']

In [56]:
indexer_exited = StringIndexer(inputCol = "EXITED", outputCol = "TARGET")
spark_df = indexer_exited.fit(spark_df).transform(spark_df)
spark_df = spark_df.withColumn("TARGET", spark_df.TARGET.cast("integer"))
spark_df.select("TARGET","EXITED").show(4)

+------+------+
|TARGET|EXITED|
+------+------+
|     1|     1|
|     0|     0|
|     1|     1|
|     0|     0|
+------+------+
only showing top 4 rows



In [57]:
spark_df = spark_df.drop("EXITED")
spark_df.columns

['CREDITSCORE',
 'AGE',
 'TENURE',
 'BALANCE',
 'NUMOFPRODUCTS',
 'HASCRCARD',
 'ISACTIVEMEMBER',
 'ESTIMATEDSALARY',
 'AGE_ESTIMATED_SALARY',
 'CREDITSCORE_TENURE',
 'ESTIMATEDSALARY_CREDITSCORE',
 'AGE_CAT',
 'ESTIMATEDSALARY_CAT',
 'CREDITSCORE_CAT',
 'TENURE_CAT',
 'GEOGRAPHY_LABEL_ONE',
 'GENDER_LABEL_ONE',
 'CUSTOMER_LABEL_ONE',
 'REWARD_LABEL_ONE',
 'TARGET']

In [58]:
spark_df.printSchema()

root
 |-- CREDITSCORE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- TENURE: integer (nullable = true)
 |-- BALANCE: double (nullable = true)
 |-- NUMOFPRODUCTS: integer (nullable = true)
 |-- HASCRCARD: integer (nullable = true)
 |-- ISACTIVEMEMBER: integer (nullable = true)
 |-- ESTIMATEDSALARY: double (nullable = true)
 |-- AGE_ESTIMATED_SALARY: double (nullable = true)
 |-- CREDITSCORE_TENURE: integer (nullable = true)
 |-- ESTIMATEDSALARY_CREDITSCORE: double (nullable = true)
 |-- AGE_CAT: double (nullable = true)
 |-- ESTIMATEDSALARY_CAT: double (nullable = true)
 |-- CREDITSCORE_CAT: double (nullable = true)
 |-- TENURE_CAT: double (nullable = true)
 |-- GEOGRAPHY_LABEL_ONE: vector (nullable = true)
 |-- GENDER_LABEL_ONE: vector (nullable = true)
 |-- CUSTOMER_LABEL_ONE: vector (nullable = true)
 |-- REWARD_LABEL_ONE: vector (nullable = true)
 |-- TARGET: integer (nullable = true)



In [59]:
## Could this be an extra step?

#big_num_cols = [col[0] for col in spark_df.dtypes if (col[1] not in ["string","vector"]) and (spark_df.select(col[0]).distinct().count() > 10)]
#big_num_cols

In [60]:
#import matplotlib.pyplot as plt
#import seaborn as sns
#%matplotlib inline

#for col in big_num_cols:
#    plt.figure(figsize = (10,5))
#    sns.boxplot(spark_df.toPandas()[col])

In [61]:
def outlier(dataframe, col, q1=.25 , q3=.75):
    # Expects a pandas DataFrame; returns the IQR-based (up_limit, low_limit).
    quantile1 = dataframe[col].quantile(q1)
    quantile3 = dataframe[col].quantile(q3)
    interquantile = quantile3 - quantile1
    up_limit = quantile3 + 1.5*interquantile
    low_limit = quantile1 - 1.5*interquantile
    return up_limit,low_limit

def replace_outlier(dataframe,col,q1=.25,q3=.75):
    # Caps values outside the IQR limits, modifying the pandas DataFrame in place.
    up_limit, low_limit = outlier(dataframe, col, q1, q3)
    
    # dataframe[col] selects the column named by the variable;
    # dataframe.col would look for a column literally called "col".
    dataframe.loc[(dataframe[col] < low_limit), col] = low_limit
    dataframe.loc[(dataframe[col] > up_limit), col] = up_limit
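
The IQR rule used by these helpers can be followed on a handful of numbers. The sketch below is a standard-library version that works on a plain list instead of a pandas DataFrame; `iqr_limits` and `clip_outliers` are hypothetical names, and unlike the notebook's `outlier` function the limits are returned in `(low, up)` order.

```python
from statistics import quantiles

def iqr_limits(values, k=1.5):
    """Return (low_limit, up_limit) from the quartile-based IQR rule."""
    # method="inclusive" interpolates linearly between data points.
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

def clip_outliers(values, k=1.5):
    """Cap values outside the IQR limits at the nearest limit."""
    low, up = iqr_limits(values, k)
    return [min(max(v, low), up) for v in values]

data = [1, 2, 3, 4, 5, 6, 7, 8, 100]  # toy data with one obvious outlier
print(iqr_limits(data))     # (-3.0, 13.0)
print(clip_outliers(data))  # 100 is capped at 13.0
```

Here the quartiles are 3 and 7, so the IQR is 4 and the limits are 3 - 6 = -3 and 7 + 6 = 13; only the value 100 is changed.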

In [62]:
#for col in big_num_cols:
#    print(col, "--- >" ,outlier(spark_df.toPandas(), col))

In [63]:
from pyspark.sql.types import IntegerType

from pyspark.sql.functions import udf,when,count

In [64]:
############################
# Defining the Features
############################

In [65]:
spark_df.toPandas().isnull().sum()

CREDITSCORE                    0
AGE                            0
TENURE                         0
BALANCE                        0
NUMOFPRODUCTS                  0
HASCRCARD                      0
ISACTIVEMEMBER                 0
ESTIMATEDSALARY                0
AGE_ESTIMATED_SALARY           0
CREDITSCORE_TENURE             0
ESTIMATEDSALARY_CREDITSCORE    0
AGE_CAT                        0
ESTIMATEDSALARY_CAT            0
CREDITSCORE_CAT                0
TENURE_CAT                     0
GEOGRAPHY_LABEL_ONE            0
GENDER_LABEL_ONE               0
CUSTOMER_LABEL_ONE             0
REWARD_LABEL_ONE               0
TARGET                         0
dtype: int64

In [66]:
spark_df.printSchema()

root
 |-- CREDITSCORE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- TENURE: integer (nullable = true)
 |-- BALANCE: double (nullable = true)
 |-- NUMOFPRODUCTS: integer (nullable = true)
 |-- HASCRCARD: integer (nullable = true)
 |-- ISACTIVEMEMBER: integer (nullable = true)
 |-- ESTIMATEDSALARY: double (nullable = true)
 |-- AGE_ESTIMATED_SALARY: double (nullable = true)
 |-- CREDITSCORE_TENURE: integer (nullable = true)
 |-- ESTIMATEDSALARY_CREDITSCORE: double (nullable = true)
 |-- AGE_CAT: double (nullable = true)
 |-- ESTIMATEDSALARY_CAT: double (nullable = true)
 |-- CREDITSCORE_CAT: double (nullable = true)
 |-- TENURE_CAT: double (nullable = true)
 |-- GEOGRAPHY_LABEL_ONE: vector (nullable = true)
 |-- GENDER_LABEL_ONE: vector (nullable = true)
 |-- CUSTOMER_LABEL_ONE: vector (nullable = true)
 |-- REWARD_LABEL_ONE: vector (nullable = true)
 |-- TARGET: integer (nullable = true)



In [67]:
from pyspark.ml.feature import VectorAssembler

features = [col for col in spark_df.columns if col != "TARGET"]
features


['CREDITSCORE',
 'AGE',
 'TENURE',
 'BALANCE',
 'NUMOFPRODUCTS',
 'HASCRCARD',
 'ISACTIVEMEMBER',
 'ESTIMATEDSALARY',
 'AGE_ESTIMATED_SALARY',
 'CREDITSCORE_TENURE',
 'ESTIMATEDSALARY_CREDITSCORE',
 'AGE_CAT',
 'ESTIMATEDSALARY_CAT',
 'CREDITSCORE_CAT',
 'TENURE_CAT',
 'GEOGRAPHY_LABEL_ONE',
 'GENDER_LABEL_ONE',
 'CUSTOMER_LABEL_ONE',
 'REWARD_LABEL_ONE']

In [68]:
va = VectorAssembler(inputCols = features , outputCol = "FEATURES")
va_df = va.transform(spark_df)

In [69]:
va_df.show(10,False,True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CREDITSCORE                 | 619                                                                                                                                              
 AGE                         | 42                                                                                                                                               
 TENURE                      | 2                                                                                                                                                
 BALANCE                     | 0.0                                                                                                                                              
 NUMOFPRODUCTS               | 1                                                                                   

In [70]:
final_df = va_df.select("FEATURES","TARGET")
final_df.show(5)

+--------------------+------+
|            FEATURES|TARGET|
+--------------------+------+
|[619.0,42.0,2.0,0...|     1|
|[608.0,41.0,1.0,8...|     0|
|[502.0,42.0,8.0,1...|     1|
|(23,[0,1,2,4,7,8,...|     0|
|[850.0,43.0,2.0,1...|     0|
+--------------------+------+
only showing top 5 rows
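
The `23` in the sparse entry `(23,[...` is the length of the assembled feature vector, although only 19 input columns were passed to `VectorAssembler`. The difference comes from the one-hot columns, each of which is itself a vector. The arithmetic below is a sketch that assumes `OneHotEncoder` ran with its default `dropLast=True`; the category counts are read from the notebook's outputs, and `one_hot_drop_last` is a hypothetical helper.

```python
# Number of distinct categories per encoded column, read from the notebook's
# outputs (3 countries, 2 genders, 4 customer segments, 3 reward segments).
category_counts = {
    "GEOGRAPHY_LABEL_ONE": 3,
    "GENDER_LABEL_ONE": 2,
    "CUSTOMER_LABEL_ONE": 4,
    "REWARD_LABEL_ONE": 3,
}
n_scalar_columns = 15  # CREDITSCORE ... TENURE_CAT in the feature list

# Assumption: with dropLast=True a column with k categories
# contributes k - 1 vector slots.
one_hot_slots = {name: k - 1 for name, k in category_counts.items()}
vector_size = n_scalar_columns + sum(one_hot_slots.values())
print(one_hot_slots)
print(vector_size)  # 23

def one_hot_drop_last(index, n_categories):
    """Dense one-hot vector of length n_categories - 1; the last category is all zeros."""
    return [1.0 if i == index else 0.0 for i in range(n_categories - 1)]

print(one_hot_drop_last(0, 3))  # index 0 (France) -> [1.0, 0.0]
print(one_hot_drop_last(2, 3))  # index 2 (Spain)  -> [0.0, 0.0]
```

Rows that contain many zeros are shown in the sparse `(size,[indices],[values])` form, while the others are printed as dense lists; both describe vectors of the same length.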



In [71]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="FEATURES", outputCol="SCALEDFEATURES")
final_df = scaler.fit(final_df).transform(final_df)
final_df.show(5)

+--------------------+------+--------------------+
|            FEATURES|TARGET|      SCALEDFEATURES|
+--------------------+------+--------------------+
|[619.0,42.0,2.0,0...|     1|[6.40433392438999...|
|[608.0,41.0,1.0,8...|     0|[6.29052508243798...|
|[502.0,42.0,8.0,1...|     1|[5.19382169635505...|
|(23,[0,1,2,4,7,8,...|     0|(23,[0,1,2,4,7,8,...|
|[850.0,43.0,2.0,1...|     0|[8.79431960538205...|
+--------------------+------+--------------------+
only showing top 5 rows



In [72]:
# Split the dataset into train and test sets:

final_df_normal = final_df.select("FEATURES","TARGET")
final_df_scaled = final_df.select("SCALEDFEATURES","TARGET")

train_df_normal ,test_df_normal = final_df_normal.randomSplit([0.7,0.3], seed = 17)
train_df_scaled, test_df_scaled = final_df_scaled.randomSplit([0.7,0.3], seed = 17)

print(f"Train Normal Shape : {train_df_normal.count(), len(train_df_normal.columns)} ")
print(f"Test Normal Shape : {test_df_normal.count(), len(test_df_normal.columns)} ")

print("-------")

print(f"Train Scaled Shape : {train_df_scaled.count(), len(train_df_scaled.columns)} ")
print(f"Test Scaled Shape : {test_df_scaled.count(), len(test_df_scaled.columns)} ")

Train Normal Shape : (6949, 2) 
Test Normal Shape : (3051, 2) 
-------
Train Scaled Shape : (6949, 2) 
Test Scaled Shape : (3051, 2) 
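
The scaler above was fitted on all rows before the split. A common alternative is to learn the scaling statistics from the training rows only and reuse them for the test rows. The sketch below illustrates that idea in plain Python on made-up values for a single feature; it assumes scaling by the sample standard deviation without centering, which is how I understand the default `StandardScaler` settings (`withStd=True`, `withMean=False`). The variable names are hypothetical and this is not the notebook's actual pipeline.

```python
from statistics import stdev

# Toy values standing in for one feature column (made-up numbers).
train = [619.0, 608.0, 502.0, 699.0]
test = [850.0, 645.0]

# "Fit": learn the sample standard deviation from the training rows only.
train_std = stdev(train)

# "Transform": divide by the training std (no centering) for both sets,
# so no information from the test rows leaks into the scaling.
train_scaled = [x / train_std for x in train]
test_scaled = [x / train_std for x in test]

print(round(stdev(train_scaled), 6))  # 1.0
```

In PySpark terms this would correspond to calling `scaler.fit(...)` on the training DataFrame and `transform(...)` on both the training and test DataFrames.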


In [73]:
##################################################
# Modeling
##################################################

In [74]:
############################
# Logistic Regression
############################

In [75]:
from pyspark.ml.classification import LogisticRegression,GBTClassifier

In [76]:
log_model_normal = LogisticRegression(featuresCol="FEATURES",labelCol="TARGET").fit(train_df_normal)
y_pred_normal = log_model_normal.transform(test_df_normal)
y_pred_normal.show(5)

+--------------------+------+--------------------+--------------------+----------+
|            FEATURES|TARGET|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(23,[0,1,2,3,4,7,...|     1|[-2.9812879067835...|[0.04827841803909...|       1.0|
|(23,[0,1,2,3,4,7,...|     1|[-2.8463369682129...|[0.05487097314166...|       1.0|
|(23,[0,1,2,3,4,7,...|     0|[1.33914936651111...|[0.79235002027012...|       0.0|
|(23,[0,1,2,3,4,7,...|     1|[-0.3559892273450...|[0.41193080645007...|       1.0|
|(23,[0,1,2,3,4,7,...|     0|[0.62421907097707...|[0.65117750120791...|       0.0|
+--------------------+------+--------------------+--------------------+----------+
only showing top 5 rows



In [77]:
log_model_scaled = LogisticRegression(featuresCol="SCALEDFEATURES",labelCol="TARGET").fit(train_df_scaled)
y_pred_scaled = log_model_scaled.transform(test_df_scaled)
y_pred_scaled.show(5)

+--------------------+------+--------------------+--------------------+----------+
|      SCALEDFEATURES|TARGET|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(23,[0,1,2,3,4,7,...|     1|[-2.9811823504762...|[0.04828326833065...|       1.0|
|(23,[0,1,2,3,4,7,...|     1|[-2.8462866294996...|[0.05487358377336...|       1.0|
|(23,[0,1,2,3,4,7,...|     0|[1.33936700173659...|[0.79238582583445...|       0.0|
|(23,[0,1,2,3,4,7,...|     1|[-0.3560683157404...|[0.41191164790873...|       1.0|
|(23,[0,1,2,3,4,7,...|     0|[0.62428912886887...|[0.65119341436464...|       0.0|
+--------------------+------+--------------------+--------------------+----------+
only showing top 5 rows



In [78]:
y_pred_normal.select("TARGET","prediction").show(5)

+------+----------+
|TARGET|prediction|
+------+----------+
|     1|       1.0|
|     1|       1.0|
|     0|       0.0|
|     1|       1.0|
|     0|       0.0|
+------+----------+
only showing top 5 rows



In [79]:
y_pred_scaled.select("TARGET","prediction").show(5)

+------+----------+
|TARGET|prediction|
+------+----------+
|     1|       1.0|
|     1|       1.0|
|     0|       0.0|
|     1|       1.0|
|     0|       0.0|
+------+----------+
only showing top 5 rows



In [80]:
# Accuracy Normal
y_pred_normal.filter(y_pred_normal.TARGET == y_pred_normal.prediction).count() / y_pred_normal.count()

0.8200589970501475

In [81]:
# Accuracy Scaled
y_pred_scaled.filter(y_pred_scaled.TARGET == y_pred_scaled.prediction).count() / y_pred_scaled.count()

0.8200589970501475

In [82]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

In [83]:
# Inspecting the available metric names

BinaryClassificationEvaluator().metricName

Param(parent='BinaryClassificationEvaluator_894f425b9663', name='metricName', doc='metric name in evaluation (areaUnderROC|areaUnderPR)')

In [84]:
MulticlassClassificationEvaluator().metricName

Param(parent='MulticlassClassificationEvaluator_30ec7b9d2bc1', name='metricName', doc='metric name in evaluation (f1|accuracy|weightedPrecision|weightedRecall|weightedTruePositiveRate| weightedFalsePositiveRate|weightedFMeasure|truePositiveRateByLabel| falsePositiveRateByLabel|precisionByLabel|recallByLabel|fMeasureByLabel| logLoss|hammingLoss)')

In [85]:
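# Note: rawPredictionCol="prediction" feeds the hard 0/1 labels to the evaluator,
# so areaUnderROC is computed from a single threshold; pointing it at the
# "rawPrediction" column would score the full ranking instead.
evaluator = BinaryClassificationEvaluator(labelCol="TARGET", rawPredictionCol="prediction",
                                          metricName="areaUnderROC")

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="TARGET",predictionCol="prediction")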
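
Because the binary evaluator reads the `prediction` column, the area under the ROC curve is computed from thresholded 0/1 labels rather than from scores. The plain-Python sketch below shows, on made-up data, how the same classifier can get a very different AUC depending on which of the two is scored. The `roc_auc` helper is a hypothetical pairwise implementation written for illustration, not Spark's own code.

```python
def roc_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

labels = [1, 1, 1, 0, 0, 0, 0]
probs = [0.9, 0.45, 0.4, 0.35, 0.3, 0.2, 0.1]      # made-up churn probabilities
hard = [1.0 if p >= 0.5 else 0.0 for p in probs]   # thresholded at 0.5

auc_scores = roc_auc(labels, probs)  # every churner outranks every non-churner
auc_hard = roc_auc(labels, hard)     # only one churner clears the threshold

print(auc_scores, round(auc_hard, 4))  # 1.0 0.6667
```

With hard labels the value collapses to the average of the true positive rate and the true negative rate, here (1/3 + 1) / 2, which is why it can sit far below the score-based AUC.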

In [86]:
acc = evaluatorMulti.evaluate(y_pred_scaled, {evaluatorMulti.metricName : "accuracy"})
# precisionByLabel / recallByLabel use metricLabel, which defaults to 0.0,
# so these two values describe class 0 (customers who stayed), not churners.
precision = evaluatorMulti.evaluate(y_pred_scaled, {evaluatorMulti.metricName : "precisionByLabel"})
recall = evaluatorMulti.evaluate(y_pred_scaled, {evaluatorMulti.metricName : "recallByLabel"})
f1 = evaluatorMulti.evaluate(y_pred_scaled, {evaluatorMulti.metricName : "f1"})

roc_auc = evaluator.evaluate(y_pred_scaled)

In [87]:
print("accuracy : %f , precision : %f , recall : %f ,f1 : %f , roc_auc : %f "%(acc,precision,recall,f1,roc_auc))

accuracy : 0.820059 , precision : 0.831786 , recall : 0.967595 ,f1 : 0.787343 , roc_auc : 0.618114 
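
Per-label precision and recall depend on which class is treated as the label of interest. As far as I know, `precisionByLabel` and `recallByLabel` report class 0 unless `metricLabel` is set, so the values above most likely describe the customers who stayed. The plain-Python sketch below uses made-up labels and a hypothetical `precision_recall` helper to show how different the two classes can look on an imbalanced set.

```python
def precision_recall(labels, preds, label):
    """Precision and recall for one class, computed from raw label/prediction lists."""
    tp = sum(1 for y, p in zip(labels, preds) if y == label and p == label)
    predicted = sum(1 for p in preds if p == label)
    actual = sum(1 for y in labels if y == label)
    return tp / predicted, tp / actual

# Made-up imbalanced data: 8 customers stayed (0), 2 churned (1).
labels = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
preds = [0, 0, 0, 0, 0, 0, 0, 0, 0, 1]   # one churner is missed

p0, r0 = precision_recall(labels, preds, 0)
p1, r1 = precision_recall(labels, preds, 1)

print(round(p0, 4), r0)  # 0.8889 1.0
print(p1, r1)            # 1.0 0.5
```

For a churn model the class 1 numbers are usually the ones of interest; in PySpark that would presumably mean passing `evaluatorMulti.metricLabel : 1.0` alongside the metric name.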


In [88]:
############################
# Gradient Boosted Tree Classifier
############################

In [89]:
train_df_normal.toPandas().isnull().sum()

FEATURES    0
TARGET      0
dtype: int64

In [90]:
from pyspark.ml.classification import GBTClassifier

# The scaled features are used here (tree splits should not depend on feature scale, so FEATURES would likely work equally well).

gbm = GBTClassifier(maxIter= 100, featuresCol="SCALEDFEATURES" ,labelCol="TARGET")
gbm_model = gbm.fit(train_df_scaled)
y_pred_scaled = gbm_model.transform(test_df_scaled)
y_pred_scaled.show(5)

+--------------------+------+--------------------+--------------------+----------+
|      SCALEDFEATURES|TARGET|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(23,[0,1,2,3,4,7,...|     1|[-1.3753786255972...|[0.06004389771423...|       1.0|
|(23,[0,1,2,3,4,7,...|     1|[-1.9608090693824...|[0.01942423999882...|       1.0|
|(23,[0,1,2,3,4,7,...|     0|[0.70531193077651...|[0.80386432433131...|       0.0|
|(23,[0,1,2,3,4,7,...|     1|[-0.8612401001939...|[0.15155197394977...|       1.0|
|(23,[0,1,2,3,4,7,...|     0|[0.53066338670381...|[0.74294401177244...|       0.0|
+--------------------+------+--------------------+--------------------+----------+
only showing top 5 rows



In [91]:
# Accuracy

y_pred_scaled.filter(y_pred_scaled.TARGET  == y_pred_scaled.prediction).count() / y_pred_scaled.count()

0.8600458865945592

In [92]:
############################
# Model Tuning
############################

In [93]:
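# areaUnderROC on the hard 0/1 "prediction" column (single threshold) drives model selection below.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="TARGET")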

In [94]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

gbm_params = (ParamGridBuilder()
              .addGrid(gbm.maxDepth, [2, 4, 6])
              .addGrid(gbm.maxBins, [20, 30, 40])
              .addGrid(gbm.maxIter, [10, 20, 30])
              .build())

In [95]:
cv = CrossValidator(estimator=gbm,
                    estimatorParamMaps=gbm_params,
                    evaluator=evaluator,
                    numFolds=5)

In [96]:
cv_model = cv.fit(train_df_scaled)
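
The fit above can take a while, because every parameter combination is trained once per fold. The sketch below simply enumerates the same grid in plain Python to count the work involved; the `grid` dictionary mirrors the values passed to `ParamGridBuilder`, and the variable names are hypothetical.

```python
from itertools import product

# Same values as the ParamGridBuilder grid in the notebook.
grid = {
    "maxDepth": [2, 4, 6],
    "maxBins": [20, 30, 40],
    "maxIter": [10, 20, 30],
}
num_folds = 5

# Cartesian product of all parameter values, one dict per combination.
combos = [dict(zip(grid, values)) for values in product(*grid.values())]

# One model per combination per fold.
fits = len(combos) * num_folds

print(len(combos), fits)  # 27 135
```

On top of these 135 fits, `CrossValidator` refits the best combination once on the full training set. Note also that the largest `maxIter` in the grid is 30, while the untuned model earlier used 100.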

In [97]:
y_pred_scaled = cv_model.transform(test_df_scaled)

In [98]:
y_pred_scaled.filter(y_pred_scaled.TARGET == y_pred_scaled.prediction).count() / y_pred_scaled.count()

0.8561127499180596

In [99]:
# To be reviewed; I will look into further changes ....

sc.stop()