## Create Spark Context

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Create Spark config for our Kubernetes based cluster manager
sparkConf = SparkConf()
sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
sparkConf.setAppName("spark")
sparkConf.set("spark.kubernetes.container.image", "sbc1/spark-py")
sparkConf.set("spark.kubernetes.namespace", "spark")
sparkConf.set("spark.executor.instances", "1")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
sparkConf.set("spark.driver.port", "33333")
sparkConf.set("spark.driver.host", "my-notebook-deployment.spark.svc.cluster.local")
sparkConf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.my-notebook-pvc.options.claimName","my-notebook-pvc")
sparkConf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.my-notebook-pvc.mount.path","/root/data")
sparkConf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.my-notebook-pvc.mount.readOnly","false")
# Initialize our Spark cluster, this will actually
# generate the worker nodes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext


print(spark.version)

for attribute in sc._conf.getAll():
    print(attribute)

2.4.5
('spark.executor.memory', '512m')
('spark.driver.port', '33333')
('spark.kubernetes.executor.volumes.persistentVolumeClaim.my-notebook-pvc.mount.readOnly', 'false')
('spark.kubernetes.authenticate.driver.serviceAccountName', 'spark')
('spark.kubernetes.executor.volumes.persistentVolumeClaim.my-notebook-pvc.mount.path', '/root/data')
('spark.executor.id', 'driver')
('spark.kubernetes.pyspark.pythonVersion', '3')
('spark.driver.memory', '512m')
('spark.kubernetes.namespace', 'spark')
('spark.app.name', 'spark')
('spark.kubernetes.authenticate.serviceAccountName', 'spark')
('spark.rdd.compress', 'True')
('spark.app.id', 'spark-application-1588942627957')
('spark.serializer.objectStreamReset', '100')
('spark.executor.instances', '1')
('spark.kubernetes.container.image', 'sbc1/spark-py')
('spark.executor.cores', '1')
('spark.submit.deployMode', 'client')
('spark.master', 'k8s://https://kubernetes.default.svc.cluster.local:443')
('spark.driver.host', 'my-notebook-deployment.spark.svc.c

## Download Sample Data

In [3]:
!wget https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt

--2020-05-08 13:00:36--  https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.64.133, 151.101.0.133, 151.101.192.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.64.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 104736 (102K) [text/plain]
Saving to: ‘sample_libsvm_data.txt’


2020-05-08 13:00:37 (3.67 MB/s) - ‘sample_libsvm_data.txt’ saved [104736/104736]



In [4]:
import os
os.listdir()

['spark-intro.pdf',
 'calculate-pi-out.txt',
 'docker-tutorial',
 'c5w1-spark-intro.ipynb',
 'sherlock-holmes.txt',
 'c5w1-docker-tutorial.ipynb',
 'sample_libsvm_data.txt',
 'calculate-pi.py',
 '.python-version',
 '.ipynb_checkpoints',
 'c5w2-spark-mllib-example.ipynb']

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Load and parse the data file, converting it to a DataFrame.

In [10]:
data = spark.read.format("libsvm").load("./sample_libsvm_data.txt")

In [17]:
print(f'Read {data.count()} records with {len(data.columns)} columns\n\n')
print(data.show())

Read 100 records with 2 columns


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows

None


In [41]:
data.first()

Row(label=0.0, features=SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0,

## Index labels, adding metadata to the label column.
### Fit on whole dataset to include all labels in index.

In [19]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

In [21]:
labelIndexer.transform(data).show()

+-----+--------------------+------------+
|label|            features|indexedLabel|
+-----+--------------------+------------+
|  0.0|(692,[127,128,129...|         1.0|
|  1.0|(692,[158,159,160...|         0.0|
|  1.0|(692,[124,125,126...|         0.0|
|  1.0|(692,[152,153,154...|         0.0|
|  1.0|(692,[151,152,153...|         0.0|
|  0.0|(692,[129,130,131...|         1.0|
|  1.0|(692,[158,159,160...|         0.0|
|  1.0|(692,[99,100,101,...|         0.0|
|  0.0|(692,[154,155,156...|         1.0|
|  0.0|(692,[127,128,129...|         1.0|
|  1.0|(692,[154,155,156...|         0.0|
|  0.0|(692,[153,154,155...|         1.0|
|  0.0|(692,[151,152,153...|         1.0|
|  1.0|(692,[129,130,131...|         0.0|
|  0.0|(692,[154,155,156...|         1.0|
|  1.0|(692,[150,151,152...|         0.0|
|  0.0|(692,[124,125,126...|         1.0|
|  0.0|(692,[152,153,154...|         1.0|
|  1.0|(692,[97,98,99,12...|         0.0|
|  1.0|(692,[124,125,126...|         0.0|
+-----+--------------------+------

`StringIndexer` converts a string column to a numerical index column that will be treated as a categorical variable by by spark.

NOTE: In this case we're basically one-hot encoding the label column. Seems silly to do it like this since label column is already one-hot encoded, but doing otherwise left as exercise...

## Automatically identify categorical features, and index them.
### Set maxCategories so features with > 4 distinct values are treated as continuous.

In [25]:
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

In [27]:
featureIndexer.transform(data).show()

+-----+--------------------+--------------------+
|label|            features|     indexedFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|


`VectorIndexer` helps process a dataset of unknown vectors into a dataset with some continuous features and some categorical features by automatically identifying categorical features. The choice between continuous and categorical is based upon the maxCategories parameter.

https://spark.apache.org/docs/latest/ml-features.html#vectorindexer

In [34]:
categoricalFeatures = featureIndexer.categoryMaps

print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

Chose 315 categorical features: 645, 69, 365, 138, 479, 333, 249, 0, 666, 88, 170, 115, 276, 308, 5, 449, 120, 614, 677, 202, 10, 56, 533, 142, 340, 670, 174, 42, 417, 24, 37, 25, 257, 389, 52, 14, 504, 110, 587, 619, 196, 559, 638, 20, 421, 46, 93, 284, 228, 448, 57, 78, 29, 475, 164, 591, 646, 253, 106, 121, 84, 147, 280, 61, 221, 396, 89, 133, 116, 1, 507, 312, 74, 307, 452, 6, 248, 60, 117, 678, 529, 85, 201, 220, 366, 534, 102, 334, 28, 38, 561, 392, 70, 424, 192, 21, 137, 165, 33, 92, 229, 252, 197, 361, 65, 97, 665, 224, 615, 9, 53, 169, 141, 420, 109, 256, 225, 339, 77, 193, 669, 476, 642, 590, 679, 96, 393, 647, 173, 13, 41, 503, 134, 73, 105, 2, 311, 558, 674, 530, 586, 618, 166, 32, 34, 148, 45, 279, 64, 17, 584, 562, 423, 191, 22, 44, 59, 118, 281, 27, 641, 71, 391, 12, 445, 54, 611, 144, 49, 335, 86, 672, 172, 113, 219, 419, 81, 362, 451, 76, 7, 39, 649, 98, 616, 477, 367, 535, 103, 140, 621, 91, 66, 251, 668, 198, 108, 278, 223, 394, 306, 135, 563, 226, 3, 505, 80, 167, 3

## Split the data into training and test sets (30% held out for testing)

In [43]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

## Train a RandomForest model

In [44]:
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

## Convert indexed labels back to original labels.


In [46]:
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

`IndexToString` maps a column of label indices back to a column containing the original labels as strings. 

## Build and pipeline and fit on training set

In [47]:
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)


## Make predictions on test set

In [59]:
predictions = model.transform(testData.select("features"))

In [61]:
predictions.show()

+--------------------+--------------------+-------------+-----------+----------+--------------+
|            features|     indexedFeatures|rawPrediction|probability|prediction|predictedLabel|
+--------------------+--------------------+-------------+-----------+----------+--------------+
|(692,[100,101,102...|(692,[100,101,102...|    [2.0,8.0]|  [0.2,0.8]|       1.0|           0.0|
|(692,[121,122,123...|(692,[121,122,123...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
|(692,[123,124,125...|(692,[123,124,125...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
|(692,[124,125,126...|(692,[124,125,126...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
|(692,[125,126,127...|(692,[125,126,127...|    [2.0,8.0]|  [0.2,0.8]|       1.0|           0.0|
|(692,[126,127,128...|(692,[126,127,128...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
|(692,[126,127,128...|(692,[126,127,128...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
|(692,[126,127,128...|(692,[126,127,128.

In [62]:
testData.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[100,101,102...|
|  0.0|(692,[121,122,123...|
|  0.0|(692,[123,124,125...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[125,126,127...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[126,127,128...|
|  0.0|(692,[129,130,131...|
|  0.0|(692,[150,151,152...|
|  0.0|(692,[151,152,153...|
|  0.0|(692,[152,153,154...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[119,120,121...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[125,126,127...|
|  1.0|(692,[126,127,128...|
+-----+--------------------+
only showing top 20 rows



## Display model metrics

In [60]:
# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

AnalysisException: "cannot resolve '`label`' given input columns: [rawPrediction, probability, features, predictedLabel, indexedFeatures, prediction];;\n'Project [predictedLabel#503, 'label, features#140]\n+- Project [features#140, indexedFeatures#480, rawPrediction#483, probability#487, prediction#492, if (isnull(cast(prediction#492 as double))) null else UDF(cast(prediction#492 as double)) AS predictedLabel#503]\n   +- Project [features#140, indexedFeatures#480, rawPrediction#483, probability#487, UDF(rawPrediction#483) AS prediction#492]\n      +- Project [features#140, indexedFeatures#480, rawPrediction#483, UDF(rawPrediction#483) AS probability#487]\n         +- Project [features#140, indexedFeatures#480, UDF(indexedFeatures#480) AS rawPrediction#483]\n            +- Project [features#140, UDF(features#140) AS indexedFeatures#480]\n               +- Project [features#140]\n                  +- Sample 0.7, 1.0, false, 8907249391665270363\n                     +- Sort [label#139 ASC NULLS FIRST, features#140 ASC NULLS FIRST], false\n                        +- Relation[label#139,features#140] libsvm\n"

In [52]:
predictions.show()

+-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
|label|            features|indexedLabel|     indexedFeatures|rawPrediction|probability|prediction|predictedLabel|
+-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
|  0.0|(692,[100,101,102...|         1.0|(692,[100,101,102...|    [2.0,8.0]|  [0.2,0.8]|       1.0|           0.0|
|  0.0|(692,[121,122,123...|         1.0|(692,[121,122,123...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
|  0.0|(692,[123,124,125...|         1.0|(692,[123,124,125...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
|  0.0|(692,[124,125,126...|         1.0|(692,[124,125,126...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
|  0.0|(692,[125,126,127...|         1.0|(692,[125,126,127...|    [2.0,8.0]|  [0.2,0.8]|       1.0|           0.0|
|  0.0|(692,[126,127,128...|         1.0|(692,[126,127,128...|   [0.0,10.0]|  [0