# Machine Learning Pipeline

In [2]:
from pyspark.sql import SparkSession, functions as F
import findspark
import pandas as pd

# Raw string: the Windows path contains backslashes (\P, \S, \s) that are
# invalid escape sequences in a normal string literal and trigger a warning
# on recent Python versions. The resulting path value is unchanged.
# NOTE(review): this is a machine-specific absolute path; adjust per environment.
findspark.init(r"C:\Program Files\Spark\spark-3.3.1-bin-hadoop3")

In [3]:
# Load the feature table into pandas from a path relative to the notebook's
# working directory.
df = pd.read_csv('datasets/Telco_feature.csv')

In [4]:
# Preview the first rows to sanity-check the loaded columns.
df.head()

Unnamed: 0,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,Contract,PaperlessBilling,MonthlyCharges,TotalCharges,Churn,MonthlyChargesTenureRatio,TotalChargesMonthlyChargesRatio,NumServicesUsed,InternetService_Fiber optic,InternetService_No,PaymentMethod_Credit card (automatic),PaymentMethod_Electronic check,PaymentMethod_Mailed check
0,7590-VHVEG,Female,0,1,0,1,0,1,29.85,29.85,0,29.85,1.0,1,0,0,0,1,0
1,5575-GNVDE,Male,0,0,0,34,1,0,56.95,1889.5,0,1.675,33.178227,3,0,0,0,0,1
2,3668-QPYBK,Male,0,0,0,2,0,1,53.85,108.15,1,26.925,2.008357,3,0,0,0,0,1
3,7795-CFOCW,Male,0,0,0,45,1,0,42.3,1840.75,0,0.94,43.516548,3,0,0,0,0,0
4,9237-HQITU,Female,0,0,0,2,0,1,70.7,151.65,1,35.35,2.144979,1,1,0,0,1,0


## Separating Columns by Type

In [5]:
from feature_utils import get_columns_type

In [6]:
# Partition the columns into numeric / categoric / cardinal groups with the
# project helper. The meaning of categoric_threshold is defined in
# feature_utils (not shown here).
numeric_cols, categoric_cols, cardinal_cols = get_columns_type(df, categoric_threshold=2)

2023-03-28 02:24:53,994 - logging_utils - DEBUG - get_columns_type executing...
2023-03-28 02:24:54,000 - logging_utils - INFO - 
Numeric Columns: ['SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'Contract', 'PaperlessBilling', 'MonthlyCharges', 'TotalCharges', 'Churn', 'MonthlyChargesTenureRatio', 'TotalChargesMonthlyChargesRatio', 'NumServicesUsed', 'InternetService_Fiber optic', 'InternetService_No', 'PaymentMethod_Credit card (automatic)', 'PaymentMethod_Electronic check', 'PaymentMethod_Mailed check']
Categoric Columns: ['gender']
Cardinal Columns: ['customerID']


In [7]:
# Keep the label column out of the feature list that feeds the assembler.
numeric_cols = [name for name in numeric_cols if not name == "Churn"]

In [8]:
# Name of the label column used by the estimator and the evaluator.
target = "Churn"

## Create Model

### Vector Assembler

In [9]:
# Create (or reuse) a Spark session with master "local[2]" and
# spark.sql.shuffle.partitions set to "2".
spark = (
    SparkSession.builder
    .appName("Telco-Churn Classification")
    .master("local[2]")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [10]:
from pyspark.ml.feature import VectorAssembler

In [11]:
# Combine the numeric feature columns into one vector column.
# handleInvalid="skip" is set here, same as the original setter chain.
assembler = VectorAssembler(
    inputCols=numeric_cols,
    outputCol="unscaled_features",
    handleInvalid="skip",
)

### Scale

In [12]:
from pyspark.ml.feature import RobustScaler

In [13]:
# Scale the assembled vector; reads "unscaled_features" and writes "features".
scaler = RobustScaler(
    inputCol="unscaled_features",
    outputCol="features",
)

### Estimator

In [14]:
from pyspark.ml.classification import GBTClassifier

In [15]:
# Gradient-boosted tree classifier on the scaled features; every other
# hyperparameter is left at the library default.
estimator = GBTClassifier(
    featuresCol="features",
    labelCol=target,
)

### Pipeline

In [16]:
from pyspark.ml import Pipeline

In [17]:
# Stage order: assemble features -> scale -> classify.
pipeline_stages = [assembler, scaler, estimator]
pipeline_obj = Pipeline(stages=pipeline_stages)

### Model

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [19]:
# No metricName is set, so the evaluator reports its library default metric
# (areaUnderROC according to the PySpark documentation), not accuracy.
evaluator = BinaryClassificationEvaluator().setLabelCol(target)

In [20]:
def get_score(df, evaluator, split_weights=(.8, .2), seed=142):
    """Fit the pipeline on a train split and score it on the held-out split.

    Depends on two notebook-level names: the ``spark`` session (used to
    convert the pandas frame) and ``pipeline_obj`` (the pipeline that is
    fitted). Both must exist before this function is called.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data; converted with ``spark.createDataFrame``.
    evaluator
        Object exposing ``evaluate(dataframe)``; its result is printed.
    split_weights : sequence of float, optional
        Train/test weights handed to ``randomSplit``. Defaults to the
        original 80/20 split.
    seed : int, optional
        Seed for ``randomSplit``. Defaults to the original value, 142.

    Returns
    -------
    The test split after ``pipeline_model.transform`` has been applied.
    """
    spark_df = spark.createDataFrame(df)

    # A seeded split keeps the train/test partition repeatable across runs.
    train_df, test_df = spark_df.randomSplit(list(split_weights), seed=seed)

    # Fit on the training rows only, then apply the fitted stages to the test rows.
    pipeline_model = pipeline_obj.fit(train_df)
    transformed_df = pipeline_model.transform(test_df)

    print("Score: ", evaluator.evaluate(transformed_df))

    return transformed_df

# Final Data

In [21]:
# Train and score on the feature-engineered data. The variable name is
# spelled "tranformed_df" (sic); the following cell refers to it by this
# exact spelling.
tranformed_df = get_score(df, evaluator)

  for column, series in pdf.iteritems():


Score:  0.840020725667528


In [22]:
# Show the true label next to the model's prediction for the first rows of
# the test split.
tranformed_df.select("Churn", "prediction").show()

+-----+----------+
|Churn|prediction|
+-----+----------+
|    0|       0.0|
|    0|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [23]:
# Shut down the Spark session at the end of the notebook.
spark.stop()

<div class="alert alert-block alert-info"> <b>Final:</b> Score (area under ROC) increased from <b>0.8326</b> to <b>0.8400</b> </div>