In [24]:
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder

from pyspark.ml.classification import RandomForestClassifier
from xgboost.spark import SparkXGBClassifier


In [10]:
# Copied from https://github.com/interviewstreet/spark-stratifier/blob/master/spark_stratifier/stratifier.py
import itertools
import numpy as np

from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.common import _py2java
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasSeed
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaParams
from pyspark.sql.functions import rand
from functools import reduce

class StratifiedCrossValidator(CrossValidator):
  """
  CrossValidator variant whose folds are built separately per class, so every
  fold contains roughly the same ratio of label == 1 to label == 0 rows.

  Only binary problems are supported, and the target column must be named
  'label' with values 1 and 0.
  """

  def stratify_data(self, dataset):
    """
    Returns a list of `numFolds` dataframes with the same ratio of passes
    (label == 1) and failures (label == 0).

    The `seed` param of this validator is forwarded to `randomSplit`, so
    setting `seed=...` on the validator pins the fold assignment.

    Currently only supports binary classification problems. Rows whose label
    is neither 1 nor 0 are not placed in any fold.
    """
    nFolds = self.getOrDefault(self.numFolds)
    seed = self.getOrDefault(self.seed)
    # Equal weights; randomSplit normalises them, so each fold gets ~1/nFolds.
    weights = [1.0 / nFolds] * nFolds

    passes = dataset[dataset['label'] == 1]
    fails = dataset[dataset['label'] == 0]

    # Split each class on its own, then pair the i-th pieces up so that every
    # fold keeps the overall class ratio.
    pass_splits = passes.randomSplit(weights, seed=seed)
    fail_splits = fails.randomSplit(weights, seed=seed)

    return [p.union(f) for p, f in zip(pass_splits, fail_splits)]

  def _fit(self, dataset):
    """
    Runs stratified k-fold cross validation over `estimatorParamMaps`.

    For each fold, every param map is fitted on the other folds and scored on
    the held-out fold with the configured evaluator; scores are averaged over
    folds. The best param map (max or min, per `evaluator.isLargerBetter()`)
    is then refitted on the full dataset.

    Returns a CrossValidatorModel holding the refitted best model and the
    per-param-map average metrics.
    """
    est = self.getOrDefault(self.estimator)
    epm = self.getOrDefault(self.estimatorParamMaps)
    numModels = len(epm)
    eva = self.getOrDefault(self.evaluator)
    nFolds = self.getOrDefault(self.numFolds)
    metrics = [0.0] * numModels

    stratified_data = self.stratify_data(dataset)

    for i in range(nFolds):
      # Fold i is held out for validation; the remaining folds form the training set.
      train_arr = [fold for j, fold in enumerate(stratified_data) if j != i]
      train = reduce(lambda x, y: x.union(y), train_arr)
      validation = stratified_data[i]

      # Fitting with a list of param maps returns one model per map, in order.
      models = est.fit(train, epm)

      for j in range(numModels):
        metric = eva.evaluate(models[j].transform(validation, epm[j]))
        # Accumulate the running mean across folds.
        metrics[j] += metric / nFolds

    if eva.isLargerBetter():
      bestIndex = np.argmax(metrics)
    else:
      bestIndex = np.argmin(metrics)

    # Refit the winning configuration on all of the data.
    bestModel = est.fit(dataset, epm[bestIndex])
    return self._copyValues(CrossValidatorModel(bestModel, metrics))

In [2]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Insurance Cross Selling")
    .getOrCreate()
)
spark

24/07/23 16:23:49 WARN Utils: Your hostname, vboxuser resolves to a loopback address: 127.0.1.1; using 192.168.1.9 instead (on interface enp0s3)
24/07/23 16:23:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/23 16:23:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Load the CSV from HDFS; the first row is the header and column types
# are inferred from the data.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("hdfs://localhost:9000/s4e7/normalized.csv")
)
df.printSchema()

                                                                                

root
 |-- id: integer (nullable = true)
 |-- Gender: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Driving_License: integer (nullable = true)
 |-- Region_Code_0: integer (nullable = true)
 |-- Region_Code_1: integer (nullable = true)
 |-- Region_Code_10: integer (nullable = true)
 |-- Region_Code_11: integer (nullable = true)
 |-- Region_Code_12: integer (nullable = true)
 |-- Region_Code_13: integer (nullable = true)
 |-- Region_Code_14: integer (nullable = true)
 |-- Region_Code_15: integer (nullable = true)
 |-- Region_Code_16: integer (nullable = true)
 |-- Region_Code_17: integer (nullable = true)
 |-- Region_Code_18: integer (nullable = true)
 |-- Region_Code_19: integer (nullable = true)
 |-- Region_Code_2: integer (nullable = true)
 |-- Region_Code_20: integer (nullable = true)
 |-- Region_Code_21: integer (nullable = true)
 |-- Region_Code_22: integer (nullable = true)
 |-- Region_Code_23: integer (nullable = true)
 |-- Region_Code_24: integer (nullable = 

In [34]:
# StratifiedCrossValidator filters on a column literally named "label",
# so the target column is renamed to match.
df = df.withColumnRenamed(existing="Response", new="label")

+---+------+---+---------------+-------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+-------------+--------------+--------------+--------------+-------------+-------------+-------------+-------------+------------------+--------------------+--------------------+---------------------+--------------+-------------------+----------------------+-----------------------+------------------

In [36]:
# Feature columns: everything except the target and the "id" column.
# "id" appears in the schema as a plain integer key (presumably a row
# identifier - confirm); it is kept out of the feature vector so the models
# cannot split on it.
x_cols = [c for c in df.columns if c not in ("label", "id")]

In [39]:
# Feature assembly stage shared by both pipelines: packs x_cols into "features".
va = VectorAssembler(inputCols=x_cols, outputCol="features")

# Candidate classifiers, both constructed with their default settings.
rf_model = RandomForestClassifier()
xgb_model = SparkXGBClassifier()

rf_pipeline = Pipeline(stages=[va, rf_model])
xgb_pipeline = Pipeline(stages=[va, xgb_model])

# No hyperparameters are added to either grid yet.
rf_paramgrid = ParamGridBuilder().build()
xgb_paramgrid = ParamGridBuilder().build()

bce = BinaryClassificationEvaluator()

# Settings common to both cross validators.
_cv_shared = dict(evaluator=bce, numFolds=2)

rf_cv = StratifiedCrossValidator(
    estimator=rf_pipeline,
    estimatorParamMaps=rf_paramgrid,
    **_cv_shared,
)

xgb_cv = StratifiedCrossValidator(
    estimator=xgb_pipeline,
    estimatorParamMaps=xgb_paramgrid,
    **_cv_shared,
)

In [53]:
%%time
rf_tuned = rf_cv.fit(df)
rf_tuned.write().overwrite().save("spark_models/rf_cv.model")

24/07/23 18:08:55 WARN MemoryStore: Not enough space to cache rdd_462_13 in memory! (computed 9.3 MiB so far)
24/07/23 18:08:55 WARN BlockManager: Persisting block rdd_462_13 to disk instead.
24/07/23 18:08:55 WARN MemoryStore: Not enough space to cache rdd_462_15 in memory! (computed 1592.1 KiB so far)
24/07/23 18:08:55 WARN BlockManager: Persisting block rdd_462_15 to disk instead.
24/07/23 18:08:55 WARN MemoryStore: Not enough space to cache rdd_462_14 in memory! (computed 14.2 MiB so far)
24/07/23 18:08:55 WARN BlockManager: Persisting block rdd_462_14 to disk instead.
24/07/23 18:08:55 WARN MemoryStore: Not enough space to cache rdd_462_17 in memory! (computed 5.4 MiB so far)
24/07/23 18:08:55 WARN BlockManager: Persisting block rdd_462_17 to disk instead.
24/07/23 18:08:55 WARN MemoryStore: Not enough space to cache rdd_462_16 in memory! (computed 5.4 MiB so far)
24/07/23 18:08:55 WARN BlockManager: Persisting block rdd_462_16 to disk instead.
24/07/23 18:09:04 WARN MemoryStore: 

CPU times: user 5.01 s, sys: 4.83 s, total: 9.84 s
Wall time: 40min 20s


In [None]:
%%time
xgb_tuned = xgb_cv.fit(df)
xgb_tuned.save("spark_models/xgb_cv.model")

2024-07-23 19:04:29,141 INFO XGBoost-PySpark: _fit Running xgboost-2.1.0 on 1 workers with
	booster params: {'device': 'cpu', 'objective': 'binary:logistic', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-07-23 19:08:04,968 INFO XGBoost-PySpark: _train_booster Training on CPUs 1]
[19:08:06] Task 0 got rank 0
[Stage 178:>                                                        (0 + 1) / 1]