### Environment config for CoLab

In [82]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark==1.4.2 catboost==1.0.3

In [83]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
import findspark
findspark.init()

In [84]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StructType
from pyspark.ml import Transformer

In [85]:
import pandas as pd
df = pd.read_csv("https://github.com/loverberg/portfolio/raw/main/Big_Data_MTC/telco-customer-churn.csv")
df = df.drop(['year', 'month', 'noadditionallines'], axis=1)

<a href='https://catboost.ai/en/docs/concepts/spark-cluster-configuration'>Catboost Spark Cluster config</a>

In [89]:
spark = SparkSession.builder\
    .master('local[*]')\
    .appName('CatBoostWithSpark')\
    .config("spark.jars.packages", "ai.catboost:catboost-spark_3.0_2.12:1.0.3")\
    .config("spark.executor.cores", "4")\
    .config("spark.task.cpus", "4")\
    .config("spark.driver.memory", "2g")\
    .config("spark.driver.memoryOverhead", "2g")\
    .config("spark.executor.memory", "2g")\
    .config("spark.executor.memoryOverhead", "2g")\
    .getOrCreate()

In [90]:
spark

Docs <a href='https://catboost.ai/docs/catboost-spark/3.0_2.12/latest/api/python/'>catboost-spark</a>

In [91]:
import catboost_spark

### Prepare DataSet

In [92]:
sparkDF = spark.createDataFrame(df)

In [93]:
TARGET_LABEL = 'churn'

In [94]:
evaluator = MulticlassClassificationEvaluator(
    labelCol=TARGET_LABEL, 
    predictionCol="prediction", 
    metricName='f1')

In [95]:
trainDF, testDF = sparkDF.randomSplit([0.85, 0.15])

### Pipeline model with CatBoost
<a href='https://catboost.ai/docs/catboost-spark/3.0_2.12/latest/api/python/api/catboost_spark.CatBoostClassifier.html?highlight=catboostclassifier#catboostclassifier'>CatBoostClassifier</a>

In [96]:
def create_metadata(features, categ_nums):
      numericAttrs = []
      nominalAttrs = []
      for i, feature_name in enumerate(features):
        if feature_name in categ_nums:
            nominalAttrs.append({"num_vals": categ_nums[feature_name], "idx": i, "name": feature_name})
        else:
            numericAttrs.append({"idx": i, "name": feature_name})

      attrs = {}
      if numericAttrs:
          attrs["numeric"] = numericAttrs
      if nominalAttrs:
          attrs["nominal"] = nominalAttrs

      return {"ml_attr": {"attrs": attrs, "num_attrs": len(features)}}

In [97]:
customersuspended_indexer = StringIndexer(inputCol='customersuspended', 
                                          outputCol="customersuspended_index")
education_indexer = StringIndexer(inputCol='education',
                                  outputCol="education_index")
gender_indexer = StringIndexer(inputCol='gender',
                                  outputCol="gender_index")
homeowner_indexer = StringIndexer(inputCol='homeowner',
                                  outputCol="homeowner_index")
maritalstatus_indexer = StringIndexer(inputCol='maritalstatus',
                                  outputCol="maritalstatus_index")
occupation_indexer = StringIndexer(inputCol='occupation',
                                  outputCol="occupation_index")
state_indexer = StringIndexer(inputCol='state',
                                  outputCol="state_index")
usesinternetservice_indexer = StringIndexer(inputCol='usesinternetservice',
                                  outputCol="usesinternetservice_index")
usesvoiceservice_indexer = StringIndexer(inputCol='usesvoiceservice',
                                  outputCol="usesvoiceservice_index")

features = ['age', 'annualincome', 'calldroprate', 'callfailurerate', 'callingnum',
       'customerid', 'customersuspended_index', 'education_index', 'gender_index', 
       'homeowner_index', 'maritalstatus_index', 'monthlybilledamount', 'numberofcomplaints',
       'numberofmonthunpaid', 'numdayscontractequipmentplanexpiring',
       'occupation_index', 'penaltytoswitch', 'state_index', 'totalminsusedinlastmonth',
       'unpaidbalance', 'usesinternetservice_index', 'usesvoiceservice_index',
       'percentagecalloutsidenetwork', 'totalcallduration', 'avgcallduration']
       
assembler = VectorAssembler(inputCols=features, outputCol='features')

In [98]:
feature_metadata = create_metadata(features, {'customersuspended_index': 2,\
                                              'education_index': 2,\
                                              'gender_index': 2,\
                                              'homeowner_index': 2,\
                                              'maritalstatus_index': 2,\
                                              'occupation_index': 3,\
                                              'state_index': 50,\
                                              'usesinternetservice_index': 2,\
                                              'usesvoiceservice_index': 2,\
                                              })

In [102]:
classifier = catboost_spark.CatBoostClassifier(featuresCol='features', labelCol=TARGET_LABEL)
classifier.setLossFunction('Logloss')
classifier.useBestModel=True
classifier.odType='Iter'
classifier.odWait=20
classifier.setScalePosWeight=9.99
classifier.LearningRate=0.029
classifier.earlyStoppingRounds=100
classifier.setIterations(2000)
classifier.setDepth(10)

CatBoostClassifier_69a1b1bcf505

In [103]:
pipeline = Pipeline(stages=[customersuspended_indexer,\
                            education_indexer,\
                            gender_indexer,\
                            homeowner_indexer,\
                            maritalstatus_indexer,\
                            occupation_indexer,\
                            state_indexer,\
                            usesinternetservice_indexer,\
                            usesvoiceservice_indexer,\
                            assembler,\
                            classifier])

In [104]:
p_model = pipeline.fit(trainDF)

In [105]:
predictions = p_model.transform(testDF)

In [106]:
print(f'Model F1 = {evaluator.evaluate(predictions)}')

Model F1 = 0.9770565389650627


In [107]:
type(p_model)

pyspark.ml.pipeline.PipelineModel

In [108]:
p_model.write().overwrite().save('catboost_pipeline')

### Catboost limitations
<a href='https://catboost.ai/en/docs/concepts/spark-known-limitations'>List of limitations</a>