## Income Prediction with Spark Gradient Boosting Trees

### Install findspark and init

In [1]:
!pip install findspark



In [2]:
import findspark
findspark.init()

### Get spark and h2o sessions

In [3]:
from pyspark.sql import SparkSession
from pysparkling import H2OContext
import h2o

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName('cognetix-spark-nb')
    .config('spark.dynamicAllocation.enabled', 'false')
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
sc = spark.sparkContext
hc = H2OContext.getOrCreate()
h2o_cluster = h2o.cluster()

23/10/20 14:43:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/20 14:43:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


10-20 14:43:41.980 172.17.0.2:54325      7233     Thread-4  INFO water.default: ----- H2O started  -----
10-20 14:43:41.981 172.17.0.2:54325      7233     Thread-4  INFO water.default: Build git branch: rel-zz_kurka
10-20 14:43:41.981 172.17.0.2:54325      7233     Thread-4  INFO water.default: Build git hash: 5ff8870f912c6110d7b6988f577c020de10496ec
10-20 14:43:41.981 172.17.0.2:54325      7233     Thread-4  INFO water.default: Build git describe: jenkins-3.40.0.3-122-g5ff8870
10-20 14:43:41.982 172.17.0.2:54325      7233     Thread-4  INFO water.default: Build project version: 3.40.0.4
10-20 14:43:41.982 172.17.0.2:54325      7233     Thread-4  INFO water.default: Build age: 5 months and 22 days
10-20 14:43:41.982 172.17.0.2:54325      7233     Thread-4  INFO water.default: Built by: 'jenkins'
10-20 14:43:41.982 172.17.0.2:54325      7233     Thread-4  INFO water.default: Built on: '2023-04-28 12:08:23'
10-20 14:43:41.982 172.17.0.2:54325      7233     Thread-4  WARN water.default: 


0,1
H2O_cluster_uptime:,12 secs
H2O_cluster_timezone:,Etc/UTC
H2O_data_parsing_timezone:,UTC
H2O_cluster_version:,3.40.0.4
H2O_cluster_version_age:,5 months and 22 days
H2O_cluster_name:,sparkling-water-root_local-1697813016025
H2O_cluster_total_nodes:,1
H2O_cluster_free_memory:,1 Gb
H2O_cluster_total_cores:,4
H2O_cluster_allowed_cores:,4



Sparkling Water Context:
 * Sparkling Water Version: 3.40.0.4-1-3.1
 * H2O name: sparkling-water-root_local-1697813016025
 * cluster size: 1
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (0,172.17.0.2,54325)
  ------------------------

  Open H2O Flow in browser: http://95675304fa2d:54324 (CMD + click in Mac OSX)

    


### Global params

In [13]:
max_depth = 5
learning_rate = 0.01
train_rate = 0.8
seed = 42

train_path = '../data/census-train.csv'
test_path = '../data/census-test.csv'
model_path = 'outputs/income_gbt_spark'
pred_path = 'outputs/income_gbt_spark_pred'

### Load data and basic transformations

In [14]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType

schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", DoubleType(), True),
    StructField("education", StringType(), True),
    StructField("education_num", IntegerType(), True),
    StructField("marital_status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital_gain", DoubleType(), True),
    StructField("capital_loss", DoubleType(), True),
    StructField("hours_per_week", DoubleType(), True),
    StructField("native_country", StringType(), True),
    StructField("income_level", StringType(), True),
])

train_df = (
    spark.read
    .format('csv')
    .option('header', 'false')
    .option('delimiter', ',')
    .schema(schema)
    .load(train_path)
    .drop('education_num')
    .withColumn('label', when(col('income_level').contains('>50K'), lit(1)).otherwise(lit(0)))
    .drop('income_level')
    .withColumn('workclass', when(col('workclass') == ' ?', lit('NA')).otherwise(col('workclass')))
    .withColumn('occupation', when(col('occupation') == ' ?', lit('NA')).otherwise(col('occupation')))
    .withColumn('native_country', when(col('native_country') == ' ?', lit('NA')).otherwise(col('native_country')))
)

10-20 14:44:53.193 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.InMemoryFileIndex: It took 3 ms to list leaf files for 1 paths.


### Explore train data

In [15]:
train_df.count()

10-20 14:44:55.034 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pushed Filters: 
10-20 14:44:55.034 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan Filters: 
10-20 14:44:55.034 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data Schema: struct<>
10-20 14:44:55.053 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_12 stored as values in memory (estimated size 176.1 KiB, free 434.2 MiB)
10-20 14:44:55.075 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 28.0 KiB, free 434.2 MiB)
10-20 14:44:55.075 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_12_piece0 in memory on 

32561

10-20 14:44:55.165 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.scheduler.DAGScheduler: Job 4 finished: count at NativeMethodAccessorImpl.java:0, took 0.071062 s


In [8]:
train_df.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'label']

In [9]:
train_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: integer (nullable = false)



### Split train data

In [16]:
train_df, val_df = train_df.randomSplit([train_rate, 1-train_rate], seed=seed)
print(f'Train split size: {train_df.count()}')
print(f'Validation split size: {val_df.count()}')

10-20 14:44:57.452 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pushed Filters: 
10-20 14:44:57.452 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan Filters: 
10-20 14:44:57.453 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data Schema: struct<age: int, workclass: string, fnlwgt: double, education: string, marital_status: string ... 12 more fields>
10-20 14:44:57.480 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_15 stored as values in memory (estimated size 176.1 KiB, free 434.0 MiB)
10-20 14:44:57.505 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_13_piece0 on 95675304fa2d:39429 in memory (size: 8.1 KiB, free: 434.4 MiB)
10-20 14:44:57.509 172.17.0.2:54325      7233

### Class distribution

In [17]:
train_df.groupBy('label').count().toPandas()

10-20 14:45:00.087 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pushed Filters: 
10-20 14:45:00.088 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan Filters: 
10-20 14:45:00.088 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data Schema: struct<age: int, workclass: string, fnlwgt: double, education: string, marital_status: string ... 12 more fields>
10-20 14:45:00.114 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_21 stored as values in memory (estimated size 176.1 KiB, free 434.0 MiB)
10-20 14:45:00.130 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_21_piece0 stored as bytes in memory (estimated size 28.0 KiB, free 433.9 MiB)
10-20 14:45:00.130 172.17.0.2:54325      7



10-20 14:45:01.141 172.17.0.2:54325      7233    (TID 356)  INFO org.apache.spark.storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
10-20 14:45:01.144 172.17.0.2:54325      7233    (TID 356)  INFO org.apache.spark.executor.Executor: Finished task 141.0 in stage 15.0 (TID 356). 3832 bytes result sent to driver
10-20 14:45:01.144 172.17.0.2:54325      7233   ent-loop-1  INFO org.apache.spark.scheduler.TaskSetManager: Starting task 144.0 in stage 15.0 (TID 359) (95675304fa2d, executor driver, partition 144, PROCESS_LOCAL, 4453 bytes) taskResourceAssignments Map()
10-20 14:45:01.144 172.17.0.2:54325      7233   t-getter-2  INFO org.apache.spark.scheduler.TaskSetManager: Finished task 141.0 in stage 15.0 (TID 356) in 16 ms on 95675304fa2d (executor driver) (142/200)
10-20 14:45:01.144 172.17.0.2:54325      7233    (TID 359)  INFO org.apache.spark.executor.Executor: Running task 144.0 in stage 15.0 (TID 359)
10-20 14:45:01.141 172.17.0.2:54325      7233    (TID 357)  INFO

                                                                                

Unnamed: 0,label,count
0,1,6289
1,0,19787


### ML Pipeline

In [18]:
from pyspark.ml.feature import (
    StringIndexer,
    VectorAssembler,
    OneHotEncoder,
    Imputer,
)
from pyspark.ml import (
    Pipeline
)
from pyspark.ml.classification import(
    GBTClassifier,
)

cols_to_impute = ['fnlwgt', 'age', 'capital_gain', 'capital_loss', 'hours_per_week']
cat_cols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
imputed_cols = [f'{x}_IMPUTED' for x in cols_to_impute]
imputer = Imputer(strategy='mean', inputCols=cols_to_impute, outputCols=imputed_cols)
string_indexers = []
ohe_indexers = []
for cat_col in cat_cols:
    si = StringIndexer(inputCol=cat_col, outputCol=f'{cat_col}_idx').setHandleInvalid('keep')
    enc = OneHotEncoder(inputCols=[si.getOutputCol()], outputCols=[f'{cat_col}_vec'])
    string_indexers.append(si)
    ohe_indexers.append(enc)

assembler_cols = [f'{c}_vec' for c in cat_cols] + imputed_cols
vector_assembler = VectorAssembler(inputCols=assembler_cols, outputCol='features')
gbt = GBTClassifier(labelCol='label', featuresCol='features')
gbt_stages = [imputer] + string_indexers + ohe_indexers + [vector_assembler] + [gbt]
pipeline = Pipeline().setStages(gbt_stages)
gbt_model = pipeline.fit(train_df)
gbt_model.write().overwrite().save(model_path)

10-20 14:45:03.943 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Pushed Filters: 
10-20 14:45:03.943 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Post-Scan Filters: 
10-20 14:45:03.943 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.FileSourceStrategy: Output Data Schema: struct<age: int, workclass: string, fnlwgt: double, education: string, marital_status: string ... 12 more fields>
10-20 14:45:03.996 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 16.948089 ms
10-20 14:45:04.055 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 31.345613 ms
10-20 14:45:04.059 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadca

                                                                                

10-20 14:45:05.570 172.17.0.2:54325      7233    (TID 420)  INFO org.apache.spark.executor.Executor: Finished task 0.0 in stage 20.0 (TID 420). 2511 bytes result sent to driver
10-20 14:45:05.573 172.17.0.2:54325      7233   t-getter-0  INFO org.apache.spark.scheduler.TaskSetManager: Finished task 0.0 in stage 20.0 (TID 420) in 674 ms on 95675304fa2d (executor driver) (1/1)
10-20 14:45:05.574 172.17.0.2:54325      7233   t-getter-0  INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool 
10-20 14:45:05.575 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: ShuffleMapStage 20 (collect at StringIndexer.scala:204) finished in 0.690 s
10-20 14:45:05.575 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: looking for newly runnable stages
10-20 14:45:05.575 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: running: Set()
10-2

[Stage 36:>                                                         (0 + 1) / 1]

10-20 14:45:11.466 172.17.0.2:54325      7233    (TID 436)  INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 4.954718 ms
10-20 14:45:11.479 172.17.0.2:54325      7233    (TID 436)  INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Code generated in 5.87621 ms
10-20 14:45:11.495 172.17.0.2:54325      7233    (TID 436)  INFO org.apache.spark.executor.Executor: Finished task 0.0 in stage 36.0 (TID 436). 1839 bytes result sent to driver
10-20 14:45:11.496 172.17.0.2:54325      7233   t-getter-0  INFO org.apache.spark.scheduler.TaskSetManager: Finished task 0.0 in stage 36.0 (TID 436) in 942 ms on 95675304fa2d (executor driver) (1/1)
10-20 14:45:11.496 172.17.0.2:54325      7233   t-getter-0  INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 36.0, whose tasks have all completed, from pool 
10-20 14:45:11.497 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: ResultStage 36 (take a

[Stage 37:>                                                         (0 + 1) / 1]

10-20 14:45:12.767 172.17.0.2:54325      7233    (TID 437)  INFO org.apache.spark.executor.Executor: Finished task 0.0 in stage 37.0 (TID 437). 1954 bytes result sent to driver
10-20 14:45:12.768 172.17.0.2:54325      7233   t-getter-3  INFO org.apache.spark.scheduler.TaskSetManager: Finished task 0.0 in stage 37.0 (TID 437) in 1227 ms on 95675304fa2d (executor driver) (1/1)
10-20 14:45:12.768 172.17.0.2:54325      7233   t-getter-3  INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 37.0, whose tasks have all completed, from pool 
10-20 14:45:12.768 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: ResultStage 37 (aggregate at DecisionTreeMetadata.scala:125) finished in 1.259 s
10-20 14:45:12.768 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Job 20 is finished. Cancelling potential speculative or zombie tasks for this job
10-20 14:45:12.769 172.17.0.2:54325      7233   event-loop  INFO org.ap

                                                                                

10-20 14:45:13.012 172.17.0.2:54325      7233    (TID 438)  INFO org.apache.spark.sql.execution.datasources.FileScanRDD: Reading File path: file:///home/jovyan/data/census-train.csv, range: 0-3974305, partition values: [empty row]


[Stage 38:>                                                         (0 + 1) / 1]

10-20 14:45:14.301 172.17.0.2:54325      7233    (TID 438)  INFO org.apache.spark.executor.Executor: Finished task 0.0 in stage 38.0 (TID 438). 2133 bytes result sent to driver
10-20 14:45:14.302 172.17.0.2:54325      7233   t-getter-1  INFO org.apache.spark.scheduler.TaskSetManager: Finished task 0.0 in stage 38.0 (TID 438) in 1413 ms on 95675304fa2d (executor driver) (1/1)
10-20 14:45:14.302 172.17.0.2:54325      7233   t-getter-1  INFO org.apache.spark.scheduler.TaskSchedulerImpl: Removed TaskSet 38.0, whose tasks have all completed, from pool 
10-20 14:45:14.302 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: ShuffleMapStage 38 (flatMap at RandomForest.scala:1039) finished in 1.441 s
10-20 14:45:14.302 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: looking for newly runnable stages
10-20 14:45:14.302 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: running: Set()
10-

                                                                                

10-20 14:45:14.537 172.17.0.2:54325      7233    (TID 440)  INFO org.apache.spark.sql.execution.datasources.FileScanRDD: Reading File path: file:///home/jovyan/data/census-train.csv, range: 0-3974305, partition values: [empty row]
10-20 14:45:14.674 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_57_piece0 on 95675304fa2d:39429 in memory (size: 45.0 KiB, free: 434.3 MiB)
10-20 14:45:14.675 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_58_piece0 on 95675304fa2d:39429 in memory (size: 5.0 KiB, free: 434.3 MiB)


[Stage 40:>                                                         (0 + 1) / 1]

10-20 14:45:15.339 172.17.0.2:54325      7233    (TID 440)  INFO org.apache.spark.storage.memory.MemoryStore: Block rdd_152_0 stored as values in memory (estimated size 11.8 MiB, free 422.0 MiB)
10-20 14:45:15.339 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Added rdd_152_0 in memory on 95675304fa2d:39429 (size: 11.8 MiB, free: 422.5 MiB)
10-20 14:45:15.343 172.17.0.2:54325      7233    (TID 440)  INFO org.apache.spark.storage.BlockManager: Found block rdd_152_0 locally
10-20 14:45:15.366 172.17.0.2:54325      7233    (TID 440)  INFO org.apache.spark.storage.memory.MemoryStore: Block rdd_154_0 stored as values in memory (estimated size 101.9 KiB, free 421.9 MiB)
10-20 14:45:15.366 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Added rdd_154_0 in memory on 95675304fa2d:39429 (size: 101.9 KiB, free: 422.4 MiB)
10-20 14:45:15.422 172.17.0.2:54325      7233    (TID 440)  INFO org.apache.spark.executor.Exe

                                                                                

10-20 14:45:15.788 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Submitting ShuffleMapStage 44 (MapPartitionsRDD[163] at mapPartitions at RandomForest.scala:644), which has no missing parents
10-20 14:45:15.794 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_67 stored as values in memory (estimated size 142.0 KiB, free 421.6 MiB)
10-20 14:45:15.796 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_67_piece0 stored as bytes in memory (estimated size 46.8 KiB, free 421.5 MiB)
10-20 14:45:15.797 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_67_piece0 in memory on 95675304fa2d:39429 (size: 46.8 KiB, free: 422.3 MiB)
10-20 14:45:15.798 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.SparkContext: Created broadcast 67 from broadcast at DAGScheduler.scala:1388
10-20

                                                                                

10-20 14:45:28.923 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.internal.io.HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
10-20 14:45:28.957 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.SparkContext: Starting job: runJob at SparkHadoopWriter.scala:83
10-20 14:45:28.958 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Got job 125 (runJob at SparkHadoopWriter.scala:83) with 1 output partitions
10-20 14:45:28.958 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Final stage: ResultStage 244 (runJob at SparkHadoopWriter.scala:83)
10-20 14:45:28.958 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Parents of final stage: List()
10-20 14:45:28.958 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Missing parents: List()
10-20 14:45:28.958 172.17.0.2:5432

### Evaluation

In [19]:
from pyspark.ml import (
    PipelineModel
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

pipeline_model = PipelineModel.load(model_path)
train_df_pred = pipeline_model.transform(train_df)
val_df_pred = pipeline_model.transform(val_df)
evaluator = BinaryClassificationEvaluator()
print(f'Metric name: {evaluator.getMetricName()}')
print(f'Metric value: {evaluator.evaluate(val_df_pred)}')


10-20 14:47:11.541 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_416 stored as values in memory (estimated size 176.1 KiB, free 434.0 MiB)
10-20 14:47:11.579 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_416_piece0 stored as bytes in memory (estimated size 27.2 KiB, free 434.0 MiB)
10-20 14:47:11.581 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_416_piece0 in memory on 95675304fa2d:39429 (size: 27.2 KiB, free: 434.3 MiB)
10-20 14:47:11.582 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.SparkContext: Created broadcast 416 from textFile at NativeMethodAccessorImpl.java:0
10-20 14:47:11.667 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.SparkContext: Starting job: runJob at PythonRDD.scala:166
10-20 14:47:11.669 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.sched

                                                                                

10-20 14:47:18.697 172.17.0.2:54325      7233   d-pool-185  INFO org.apache.spark.storage.BlockManager: Removing RDD 949


### Inference

In [20]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import when
from pyspark.ml import (
    PipelineModel
)

 

schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", DoubleType(), True),
    StructField("education", StringType(), True),
    StructField("education_num", IntegerType(), True),
    StructField("marital_status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital_gain", DoubleType(), True),
    StructField("capital_loss", DoubleType(), True),
    StructField("hours_per_week", DoubleType(), True),
    StructField("native_country", StringType(), True),
    StructField("income_level", StringType(), True),
])

test_df = (
    spark.read
    .format('csv')
    .option('header', 'false')
    .option('delimiter', ',')
    .schema(schema)
    .load(test_path)
    .withColumn('label', when(col('income_level').contains('>50K'), lit(1)).otherwise(lit(0)))
    .drop('education_num', 'income_level', 'label')
    .withColumn('workclass', when(col('workclass') == ' ?', lit('NA')).otherwise(col('workclass')))
    .withColumn('occupation', when(col('occupation') == ' ?', lit('NA')).otherwise(col('occupation')))
    .withColumn('native_country', when(col('native_country') == ' ?', lit('NA')).otherwise(col('native_country')))
)

pipeline_model = PipelineModel.load(model_path)
udf_pos_prob = udf(lambda v: float(v[1]), FloatType())
test_df_pred = pipeline_model.transform(test_df)
test_df_pred = test_df_pred.withColumn('prob', udf_pos_prob(col('probability')))
cols = ['rawPrediction', 'probability', 'prediction', 'prob']
test_df_pred.select(*cols).write.parquet(pred_path, mode='overwrite')

10-20 14:47:18.730 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.InMemoryFileIndex: It took 0 ms to list leaf files for 1 paths.
10-20 14:47:18.786 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_564 stored as values in memory (estimated size 176.1 KiB, free 433.6 MiB)
10-20 14:47:18.790 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.storage.memory.MemoryStore: Block broadcast_564_piece0 stored as bytes in memory (estimated size 27.2 KiB, free 433.6 MiB)
10-20 14:47:18.790 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Added broadcast_564_piece0 in memory on 95675304fa2d:39429 (size: 27.2 KiB, free: 434.2 MiB)
10-20 14:47:18.791 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.SparkContext: Created broadcast 564 from textFile at NativeMethodAccessorImpl.java:0
10-20 14:47:18.827 172.17.0.2:54325      7233     Threa

[Stage 473:>                                                        (0 + 1) / 1]

10-20 14:47:25.183 172.17.0.2:54325      7233    (TID 918)  INFO org.apache.spark.sql.execution.python.PythonUDFRunner: Times: total = 934, boot = 4, init = 149, finish = 781
10-20 14:47:25.187 172.17.0.2:54325      7233    (TID 918)  INFO org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 713904
10-20 14:47:25.231 172.17.0.2:54325      7233    (TID 918)  INFO org.apache.spark.mapred.SparkHadoopMapRedUtil: attempt_202310201447239074903291050190556_0473_m_000000_918: Committed
10-20 14:47:25.232 172.17.0.2:54325      7233    (TID 918)  INFO org.apache.spark.executor.Executor: Finished task 0.0 in stage 473.0 (TID 918). 3211 bytes result sent to driver
10-20 14:47:25.233 172.17.0.2:54325      7233   t-getter-3  INFO org.apache.spark.scheduler.TaskSetManager: Finished task 0.0 in stage 473.0 (TID 918) in 1273 ms on 95675304fa2d (executor driver) (1/1)
10-20 14:47:25.233 172.17.0.2:54325      7233   t-getter-3  INFO org.apache.spark.s

                                                                                

In [21]:
spark.read.load('outputs/income_rf_spark_pred').limit(10).toPandas()

10-20 14:47:27.402 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.sql.execution.datasources.InMemoryFileIndex: It took 1 ms to list leaf files for 1 paths.
10-20 14:47:27.432 172.17.0.2:54325      7233     Thread-4  INFO org.apache.spark.SparkContext: Starting job: load at NativeMethodAccessorImpl.java:0
10-20 14:47:27.433 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Got job 325 (load at NativeMethodAccessorImpl.java:0) with 1 output partitions
10-20 14:47:27.433 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Final stage: ResultStage 474 (load at NativeMethodAccessorImpl.java:0)
10-20 14:47:27.433 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Parents of final stage: List()
10-20 14:47:27.433 172.17.0.2:54325      7233   event-loop  INFO org.apache.spark.scheduler.DAGScheduler: Missing parents: List()
10-20 14:47:27.433 172.17.0.2:54325      7233   e

Unnamed: 0,rawPrediction,probability,prediction,prob
0,"[16.37736275283466, 3.622637247165343]","[0.8188681376417328, 0.18113186235826712]",0.0,0.181132
1,"[19.010115101836554, 0.9898848981634504]","[0.9505057550918276, 0.04949424490817251]",0.0,0.049494
2,"[13.375161764175118, 6.624838235824885]","[0.6687580882087558, 0.33124191179124424]",0.0,0.331242
3,"[13.326906456552484, 6.673093543447515]","[0.6663453228276242, 0.33365467717237574]",0.0,0.333655
4,"[8.207082112497687, 11.792917887502313]","[0.4103541056248844, 0.5896458943751156]",1.0,0.589646
5,"[19.021352795491893, 0.9786472045081104]","[0.9510676397745945, 0.04893236022540551]",0.0,0.048932
6,"[18.497068651104033, 1.5029313488959692]","[0.9248534325552015, 0.07514656744479845]",0.0,0.075147
7,"[18.23298354762287, 1.7670164523771308]","[0.9116491773811435, 0.08835082261885654]",0.0,0.088351
8,"[10.331123697187872, 9.668876302812127]","[0.5165561848593936, 0.4834438151406063]",0.0,0.483444
9,"[18.87149111861569, 1.1285088813843114]","[0.9435745559307845, 0.05642544406921557]",0.0,0.056425


10-20 14:50:51.657 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_708_piece0 on 95675304fa2d:39429 in memory (size: 30.1 KiB, free: 434.2 MiB)
10-20 14:50:51.720 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_710_piece0 on 95675304fa2d:39429 in memory (size: 5.3 KiB, free: 434.2 MiB)
10-20 14:50:51.788 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_706_piece0 on 95675304fa2d:39429 in memory (size: 28.0 KiB, free: 434.2 MiB)
10-20 14:50:51.860 172.17.0.2:54325      7233   agerMaster  INFO org.apache.spark.storage.BlockManagerInfo: Removed broadcast_707_piece0 on 95675304fa2d:39429 in memory (size: 188.0 KiB, free: 434.4 MiB)
