### grp

## Understanding Parallelization of Machine Learning Algorithms in Apache Spark

#### Spark Summit Bay Area Meet Up https://www.youtube.com/watch?v=3p81IYQWB_E

#### ML Pipeline Techiques:
1. Parallelize Featurization and Single Machine Training
2. Parallelize Featurization and Distributed Training
3. Many Models Orchestration - One Model Per Executor
4. Offline Distributed Training - Online Streaming Predictions
5. Offline Single Machine Training - Online Streaming Predictions (**in work**)

### read data

In [1]:
!ls ./bank

bank-full.csv  bank-names.txt bank.csv


In [2]:
df = spark.read.format("csv")\
.option("path", "./bank/bank-full.csv")\
.option("inferSchema", "true")\
.option("header", "true")\
.option("delimiter", ";")\
.option("quote", '"')\
.load()

In [3]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



### query data

In [4]:
df.write.mode("overwrite").saveAsTable("bank_sql")

In [5]:
for i in spark.sql("select * from bank_sql").take(5): print(i)

Row(age=58, job='management', marital='married', education='tertiary', default='no', balance=2143, housing='yes', loan='no', contact='unknown', day=5, month='may', duration=261, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no')
Row(age=44, job='technician', marital='single', education='secondary', default='no', balance=29, housing='yes', loan='no', contact='unknown', day=5, month='may', duration=151, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no')
Row(age=33, job='entrepreneur', marital='married', education='secondary', default='no', balance=2, housing='yes', loan='yes', contact='unknown', day=5, month='may', duration=76, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no')
Row(age=47, job='blue-collar', marital='married', education='unknown', default='no', balance=1506, housing='yes', loan='no', contact='unknown', day=5, month='may', duration=92, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no')
Row(age=33, job='unknown', marital='single'

# 1. Parallelize Featurization and Single Machine Training

In [6]:
input_data = spark.table("bank_sql")
cols = input_data.columns

In [7]:
for i in cols: print(i)

age
job
marital
education
default
balance
housing
loan
contact
day
month
duration
campaign
pdays
previous
poutcome
y


In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StringIndexerModel, VectorAssembler

catCols = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
stages = [] # pipeline stages
for i in catCols:
    si = StringIndexer(inputCol=i, outputCol=i+"Index") # category indexing
    ohe = OneHotEncoder(inputCol=i+"Index", outputCol=i+"classVec") # binary sparse vectors
    stages += [si, ohe]

In [9]:
numCols = ["age", "balance", "campaign", "previous", "day"]

assemblerInputs = numCols + list(map(lambda x: x + "classVec", catCols)) # concat cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [10]:
labelIndexer = StringIndexer(inputCol="y", outputCol="label")
stages += [labelIndexer]

In [11]:
for i in stages: print(i)

StringIndexer_a747fbdba374
OneHotEncoder_407614407687
StringIndexer_da21f2ed6bda
OneHotEncoder_297d7c7fff7b
StringIndexer_2fc172f6283a
OneHotEncoder_d05a34a5146d
StringIndexer_76781474a422
OneHotEncoder_971cd94c7afa
StringIndexer_6520c13ab587
OneHotEncoder_0e88c5503f01
StringIndexer_847ea6b5f9d5
OneHotEncoder_c4113294fd37
StringIndexer_def962708378
OneHotEncoder_cd7709e39ec2
StringIndexer_b554d29c23d1
OneHotEncoder_98d7623b943c
StringIndexer_3d4fd88347c3
OneHotEncoder_9c3460b9c82b
VectorAssembler_37cf592fb013
StringIndexer_74a243b9e088


In [12]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(input_data)
dataset = pipelineModel.transform(input_data)

selectCols = ["label", "features"] + cols
dataset = dataset.select(selectCols)
for i in dataset.take(5): print(i)

Row(label=0.0, features=SparseVector(40, {0: 58.0, 1: 2143.0, 2: 1.0, 4: 5.0, 6: 1.0, 16: 1.0, 19: 1.0, 21: 1.0, 22: 1.0, 23: 1.0, 25: 1.0, 26: 1.0, 37: 1.0}), age=58, job='management', marital='married', education='tertiary', default='no', balance=2143, housing='yes', loan='no', contact='unknown', day=5, month='may', duration=261, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no')
Row(label=0.0, features=SparseVector(40, {0: 44.0, 1: 29.0, 2: 1.0, 4: 5.0, 7: 1.0, 17: 1.0, 18: 1.0, 21: 1.0, 22: 1.0, 23: 1.0, 25: 1.0, 26: 1.0, 37: 1.0}), age=44, job='technician', marital='single', education='secondary', default='no', balance=29, housing='yes', loan='no', contact='unknown', day=5, month='may', duration=151, campaign=1, pdays=-1, previous=0, poutcome='unknown', y='no')
Row(label=0.0, features=SparseVector(40, {0: 33.0, 1: 2.0, 2: 1.0, 4: 5.0, 12: 1.0, 16: 1.0, 18: 1.0, 21: 1.0, 22: 1.0, 25: 1.0, 26: 1.0, 37: 1.0}), age=33, job='entrepreneur', marital='married', education='secon

In [13]:
dataset.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



In [14]:
(trainData, testData) = dataset.randomSplit([0.8, 0.2], seed=123)

- _**pulls processing all back to driver**_
- _**one spark job hence one machine via sklearn**_

In [15]:
import numpy as np

pandasDF = trainData.select("features", "label").toPandas()
series = pandasDF["features"].apply(lambda x: np.array(x.toArray())).values.reshape(-1, 1)
X = np.apply_along_axis(lambda x: x[0], 1, series)
y = np.array(pandasDF["label"])

In [16]:
print(X.shape)
print(y.shape)

(36249, 40)
(36249,)


In [17]:
[X, y]

[array([[2.900e+01, 7.510e+02, 1.000e+00, ..., 0.000e+00, 0.000e+00,
         0.000e+00],
        [5.200e+01, 7.180e+03, 1.000e+00, ..., 0.000e+00, 0.000e+00,
         0.000e+00],
        [5.400e+01, 1.777e+03, 5.000e+00, ..., 0.000e+00, 0.000e+00,
         0.000e+00],
        ...,
        [5.100e+01, 0.000e+00, 2.000e+00, ..., 1.000e+00, 0.000e+00,
         0.000e+00],
        [8.400e+01, 0.000e+00, 2.000e+00, ..., 1.000e+00, 0.000e+00,
         0.000e+00],
        [4.100e+01, 0.000e+00, 3.000e+00, ..., 1.000e+00, 0.000e+00,
         0.000e+00]]), array([0., 0., 0., ..., 1., 1., 1.])]

In [18]:
from sklearn.linear_model import LogisticRegression
import numpy as np

skLr = LogisticRegression(max_iter=1000, tol=0.000001, solver="lbfgs")

In [19]:
skLrModel = skLr.fit(X, y)
skLrModel

LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=1000, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='lbfgs', tol=1e-06,
          verbose=0, warm_start=False)

In [20]:
pandasTestDF = testData.select("features", "label").toPandas()
seriesTest = pandasTestDF["features"].apply(lambda x: np.array(x.toArray())).values.reshape(-1,1)
xTest = np.apply_along_axis(lambda x: x[0], 1, seriesTest)
yTest = np.array(pandasTestDF["label"])

In [21]:
skLrModel.score(xTest, yTest)

0.8906494086141487

# 2. Parallelize Featurization and Distributed Training

In [22]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf = RandomForestClassifier(labelCol="label",
                            featuresCol="features",
                            maxDepth=10,
                            numTrees=20,
                            featureSubsetStrategy="all",
                            seed=123,
                            maxBins=100)

In [23]:
pg = ParamGridBuilder().addGrid(rf.numTrees, [30, 40, 50]).build()
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=pg,
                    numFolds=3,
                    seed=123,
                    parallelism=4,
                    evaluator=BinaryClassificationEvaluator())

rfModel = cv.fit(trainData)

- _**multiple spark jobs being distributed across machines via spark core engine**_

In [24]:
predictions = rfModel.bestModel.transform(testData)
for i in predictions.take(5): print(i)

Row(label=0.0, features=SparseVector(40, {0: 33.0, 1: 1905.0, 2: 1.0, 3: 2.0, 4: 13.0, 5: 1.0, 16: 1.0, 18: 1.0, 21: 1.0, 22: 1.0, 23: 1.0, 24: 1.0, 26: 1.0}), age=33, job='blue-collar', marital='married', education='secondary', default='no', balance=1905, housing='yes', loan='no', contact='cellular', day=13, month='may', duration=39, campaign=1, pdays=301, previous=2, poutcome='success', y='no', rawPrediction=DenseVector([31.7504, 18.2496]), probability=DenseVector([0.635, 0.365]), prediction=0.0)
Row(label=0.0, features=SparseVector(40, {0: 45.0, 1: 640.0, 2: 1.0, 3: 1.0, 4: 5.0, 5: 1.0, 16: 1.0, 18: 1.0, 21: 1.0, 22: 1.0, 23: 1.0, 24: 1.0, 26: 1.0}), age=45, job='blue-collar', marital='married', education='secondary', default='no', balance=640, housing='yes', loan='no', contact='cellular', day=5, month='may', duration=214, campaign=1, pdays=1, previous=1, poutcome='success', y='no', rawPrediction=DenseVector([24.1492, 25.8508]), probability=DenseVector([0.483, 0.517]), prediction=1.

In [25]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [26]:
labels = [[l.getOrDefault("inputCol") + ":" + x for x in l.labels] \
          for l in pipelineModel.stages if "StringIndexerModel" in str(l.__class__)]
indexedCols = [j for i in labels for j in i]
featureLabels = numCols + indexedCols

In [27]:
featureLabels

['age',
 'balance',
 'campaign',
 'previous',
 'day',
 'job:blue-collar',
 'job:management',
 'job:technician',
 'job:admin.',
 'job:services',
 'job:retired',
 'job:self-employed',
 'job:entrepreneur',
 'job:unemployed',
 'job:housemaid',
 'job:student',
 'job:unknown',
 'marital:married',
 'marital:single',
 'marital:divorced',
 'education:secondary',
 'education:tertiary',
 'education:primary',
 'education:unknown',
 'default:no',
 'default:yes',
 'housing:yes',
 'housing:no',
 'loan:no',
 'loan:yes',
 'contact:cellular',
 'contact:unknown',
 'contact:telephone',
 'month:may',
 'month:jul',
 'month:aug',
 'month:jun',
 'month:nov',
 'month:apr',
 'month:feb',
 'month:jan',
 'month:oct',
 'month:sep',
 'month:mar',
 'month:dec',
 'poutcome:unknown',
 'poutcome:failure',
 'poutcome:other',
 'poutcome:success',
 'y:no',
 'y:yes']

In [28]:
featureImportances = rfModel.bestModel.featureImportances.toArray()
featureImportancesMap = zip(featureLabels, featureImportances)
sorted(featureImportancesMap, key=lambda x: x[1], reverse=True)

[('age', 0.1596043421139187),
 ('education:primary', 0.10761182488758265),
 ('month:feb', 0.08893854375882428),
 ('previous', 0.08705721167076327),
 ('day', 0.07356364438057131),
 ('balance', 0.07260733159172898),
 ('month:apr', 0.07176751038225039),
 ('month:jun', 0.039866004142590034),
 ('loan:yes', 0.03658198096030473),
 ('default:yes', 0.02776701799247273),
 ('month:jul', 0.027247430052649863),
 ('campaign', 0.024236513148029163),
 ('month:aug', 0.021093584473934716),
 ('month:nov', 0.017666950831323643),
 ('contact:unknown', 0.015534098308977564),
 ('default:no', 0.012865958963706582),
 ('loan:no', 0.011524660146354393),
 ('marital:divorced', 0.00712239086469121),
 ('job:unknown', 0.006795513448063868),
 ('housing:no', 0.006670226922586362),
 ('contact:cellular', 0.00640306858449382),
 ('education:unknown', 0.00625946450064183),
 ('job:admin.', 0.005961402148959092),
 ('month:may', 0.00590543483222255),
 ('job:technician', 0.0058597337152290555),
 ('marital:single', 0.005776544215

In [29]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.7724670267807295

# 3. Many Models Orchestration - One Model Per Executor
- useful with:
    - creating a model grouped by a feature (ex: one model per customer / user)
    - creating a model based on a set of hyperparameters for model selecting / tuning
    - creating a model based on a set of features for feature selection

In [30]:
import xgboost as xgb

In [31]:
def trainXGbModel(partitionKey, labelAndFeatures):
    X = np.asarray(list(map(lambda v: v[1].toArray(), labelAndFeatures)))
    y = np.asarray(list(map(lambda v: v[0], labelAndFeatures)))
    gbClassifier = xgb.XGBClassifier(max_depth=3, seed=123, objective="binary:logistic")
    model = gbClassifier.fit(X, y)
    return [partitionKey, model]

In [32]:
xgbModels = trainData\
.select("education", "label", "features")\
.repartition("education")\
.rdd\
.map(lambda row: [row[0], [row[1], row[2]]])\
.groupByKey()\
.map(lambda v: trainXGbModel(v[0], list(v[1])))

In [33]:
xgbModels.take(3)

[['tertiary',
  XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
         colsample_bynode=1, colsample_bytree=1, gamma=0, learning_rate=0.1,
         max_delta_step=0, max_depth=3, min_child_weight=1, missing=nan,
         n_estimators=100, n_jobs=1, nthread=None,
         objective='binary:logistic', random_state=0, reg_alpha=0,
         reg_lambda=1, scale_pos_weight=1, seed=123, silent=None,
         subsample=1, verbosity=1)],
 ['primary',
  XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
         colsample_bynode=1, colsample_bytree=1, gamma=0, learning_rate=0.1,
         max_delta_step=0, max_depth=3, min_child_weight=1, missing=nan,
         n_estimators=100, n_jobs=1, nthread=None,
         objective='binary:logistic', random_state=0, reg_alpha=0,
         reg_lambda=1, scale_pos_weight=1, seed=123, silent=None,
         subsample=1, verbosity=1)],
 ['unknown',
  XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
     

In [34]:
from sklearn.ensemble import RandomForestClassifier as rf
from spark_sklearn import GridSearchCV

parameters = {"n_estimators": [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],\
             "max_depth": [3, 5, 7, 10]}
rfSklearn = rf()
clf = GridSearchCV(spark.sparkContext, rfSklearn, parameters)
model = clf.fit(X, y)

- _**1 spark job per machine via sklearn to distribute training**_

In [35]:
tuning = clf.best_estimator_

print("criterion:   " + str(tuning.criterion))
print("max_depth:   " + str(tuning.max_depth))
print("max_features:   " + str(tuning.max_features))
print("min_samples_leaf:   " + str(tuning.min_samples_leaf))
print("min_samples_split:   " + str(tuning.min_samples_split))
print("n_estimators:   " + str(tuning.n_estimators))
print("n_jobs:   " + str(tuning.n_jobs))

criterion:   gini
max_depth:   3
max_features:   auto
min_samples_leaf:   1
min_samples_split:   2
n_estimators:   10
n_jobs:   1


In [36]:
tuning

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=3, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=2,
            min_weight_fraction_leaf=0.0, n_estimators=10, n_jobs=1,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False)

In [37]:
for i,r in enumerate(clf.cv_results_["mean_test_score"]):
    print("#{}: {:.2f}".format(i, r*100))

#0: 87.99
#1: 87.86
#2: 87.81
#3: 87.81
#4: 87.85
#5: 87.85
#6: 87.85
#7: 87.84
#8: 87.85
#9: 87.86
#10: 85.63
#11: 87.51
#12: 87.63
#13: 87.54
#14: 87.75
#15: 87.67
#16: 87.48
#17: 87.73
#18: 86.14
#19: 87.07
#20: 78.99
#21: 80.82
#22: 82.86
#23: 83.75
#24: 84.06
#25: 84.45
#26: 83.81
#27: 84.25
#28: 84.85
#29: 84.52
#30: 70.66
#31: 75.70
#32: 74.18
#33: 74.09
#34: 74.58
#35: 74.78
#36: 74.62
#37: 74.15
#38: 75.20
#39: 74.40


# 4. Offline Distributed Training - Online Streaming Predictions

In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StringIndexerModel, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

stages2 = [] # pipeline stages

(trainData2, testData2, holdoutData2) = df.randomSplit([0.7, 0.2, 0.1], seed=123)

catCols2 = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
for i in catCols2:
    si2 = StringIndexer(inputCol=i, outputCol=i+"Index") # category indexing
    ohe2 = OneHotEncoder(inputCol=i+"Index", outputCol=i+"classVec") # binary sparse vectors
    stages2 += [si2, ohe2]

numCols2 = ["age", "balance", "campaign", "previous", "day"]
assemblerInputs2 = numCols2 + list(map(lambda x: x + "classVec", catCols2)) # concat cols
assembler2 = VectorAssembler(inputCols=assemblerInputs2, outputCol="features")
stages2 += [assembler2]

labelIndexer2 = StringIndexer(inputCol="y", outputCol="label")
stages2 += [labelIndexer2]

rf2 = RandomForestClassifier(labelCol="label",
                            featuresCol="features",
                            maxDepth=10,
                            numTrees=20,
                            featureSubsetStrategy="all",
                            seed=123,
                            maxBins=100)
stages2 += [rf2]

In [39]:
pipeline2 = Pipeline(stages=stages2)

In [40]:
pipeline2.getStages()

[StringIndexer_83c952a59bfc,
 OneHotEncoder_7c4a637b8526,
 StringIndexer_c6a550a5e315,
 OneHotEncoder_875c70daf121,
 StringIndexer_8886b3a2247f,
 OneHotEncoder_08daa0dcfb67,
 StringIndexer_ac3325c715a0,
 OneHotEncoder_68ec320c3e4e,
 StringIndexer_d1cb23c608a7,
 OneHotEncoder_2cb5af6befb7,
 StringIndexer_732f7676b9a6,
 OneHotEncoder_f07421b3c235,
 StringIndexer_fcff48c32fc2,
 OneHotEncoder_f99a2a2ef3da,
 StringIndexer_e475c0db0452,
 OneHotEncoder_b212b8df3860,
 StringIndexer_3bdaabb9df22,
 OneHotEncoder_0eb0b0816e98,
 VectorAssembler_b034a694de88,
 StringIndexer_949aa4a9075b,
 RandomForestClassifier_3920a627bef5]

In [41]:
pg2 = ParamGridBuilder().addGrid(rf2.numTrees, [30, 40, 50]).build()

cv2 = CrossValidator(estimator=pipeline2,
                    estimatorParamMaps=pg2,
                    numFolds=3,
                    seed=123,
                    parallelism=4,
                    evaluator=BinaryClassificationEvaluator())

pipelineModel = cv2.fit(trainData2)

testModel = pipelineModel.transform(testData2)

testResults = testModel.select("features", "label", "probability", "prediction")

bestModel = pipelineModel.bestModel

bestModel.write().overwrite().save("./pipelineModel")

In [55]:
holdoutData2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



In [110]:
holdoutData2.count()

4487

In [119]:
holdoutData2.drop("y").coalesce(1).write.csv("./streamData.csv", header="true", mode="overwrite")

In [44]:
!ls ./streamData.csv

_SUCCESS
part-00000-5a2348e0-ad49-4ad7-9592-54459d112af8-c000.csv


In [120]:
from pyspark.ml.pipeline import PipelineModel

path = "./pipelineModel"
applyModel = PipelineModel.load(path)

In [121]:
applyModel.stages

[StringIndexer_83c952a59bfc,
 OneHotEncoder_7c4a637b8526,
 StringIndexer_c6a550a5e315,
 OneHotEncoder_875c70daf121,
 StringIndexer_8886b3a2247f,
 OneHotEncoder_08daa0dcfb67,
 StringIndexer_ac3325c715a0,
 OneHotEncoder_68ec320c3e4e,
 StringIndexer_d1cb23c608a7,
 OneHotEncoder_2cb5af6befb7,
 StringIndexer_732f7676b9a6,
 OneHotEncoder_f07421b3c235,
 StringIndexer_fcff48c32fc2,
 OneHotEncoder_f99a2a2ef3da,
 StringIndexer_e475c0db0452,
 OneHotEncoder_b212b8df3860,
 StringIndexer_3bdaabb9df22,
 OneHotEncoder_0eb0b0816e98,
 VectorAssembler_b034a694de88,
 StringIndexer_949aa4a9075b,
 RandomForestClassificationModel (uid=RandomForestClassifier_3920a627bef5) with 50 trees]

In [122]:
streamDF = spark.read.csv("./streamData.csv", header=True, inferSchema=True)

In [123]:
schema = streamDF.schema

In [124]:
streamingData = (spark
                 .readStream
                 .option("header", "true")
                 .schema(schema)
                 .option("maxFilesPerTrigger", 1)
                 .csv("./streamData.csv"))

In [125]:
spark.conf.set("spark.sql.shuffle.partitions", "8")

In [137]:
query.stop()

In [138]:
query = (applyModel.transform(streamingData)
.writeStream
.trigger(processingTime="5 seconds")
.queryName("preds")
.format("memory")
.outputMode("append")
.start())

In [142]:
spark.sql("select * from preds").count()

4487

In [143]:
spark.sql("select * from preds").printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- jobIndex: double (nullable = false)
 |-- jobclassVec: vector (nullable = true)
 |-- maritalIndex: double (nullable = false)
 |-- maritalclassVec: vector (nullable = true)
 |-- educationIndex: double (nullable = false)
 |-- educationclassVec: vector (nullable = true)
 |-- defaultIndex: double (nullable = false)
 |-- defaultclassVec: vector (nullable = true)
 |-- housingI

In [144]:
spark.sql("select probability, prediction from preds").show(truncate=False)

+-----------------------------------------+----------+
|probability                              |prediction|
+-----------------------------------------+----------+
|[0.7902479212093924,0.20975207879060767] |0.0       |
|[0.4909773920132184,0.5090226079867816]  |1.0       |
|[0.6103170239963902,0.3896829760036098]  |0.0       |
|[0.7673288056194824,0.23267119438051762] |0.0       |
|[0.5557603280721498,0.4442396719278502]  |0.0       |
|[0.457175112600492,0.5428248873995081]   |1.0       |
|[0.775905439860338,0.22409456013966195]  |0.0       |
|[0.9475619285532805,0.05243807144671945] |0.0       |
|[0.9550897521102618,0.044910247889738165]|0.0       |
|[0.9550897521102618,0.044910247889738165]|0.0       |
|[0.9475624240634223,0.052437575936577836]|0.0       |
|[0.9489980875682197,0.051001912431780365]|0.0       |
|[0.8884474955517119,0.11155250444828815] |0.0       |
|[0.9546531974870512,0.0453468025129487]  |0.0       |
|[0.9516945797709683,0.048305420229031705]|0.0       |
|[0.759257

### grp