In [1]:
#!pip install catboost

In [2]:
import pandas as pd

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml.regression import GBTRegressor, GBTRegressionModel


from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.metrics.regression import mean_squared_error

# Regression: Boston Dataset

In [3]:
boston = load_boston()
X = boston['data']
y = boston['target']
feature_names = boston['feature_names']
feature_descr = boston['DESCR']

In [4]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=42)

In [5]:
feature_names.tolist()+['MV']

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PTRATIO',
 'B',
 'LSTAT',
 'MV']

In [6]:
train_df = sqlContext.createDataFrame(pd.DataFrame(pd.np.hstack([X_train, y_train.reshape(-1,1)]), columns=feature_names.tolist()+['MV']))
test_df = sqlContext.createDataFrame(pd.DataFrame(pd.np.hstack([X_test, y_test.reshape(-1,1)]), columns=feature_names.tolist()+['MV']))

In [7]:
test_df.show()

+-------+----+-----+----+-----+-----+-----+------+----+-----+-------+------+-----+----+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS| RAD|  TAX|PTRATIO|     B|LSTAT|  MV|
+-------+----+-----+----+-----+-----+-----+------+----+-----+-------+------+-----+----+
|0.09178| 0.0| 4.05| 0.0| 0.51|6.416| 84.1|2.6463| 5.0|296.0|   16.6| 395.5| 9.04|23.6|
|0.05644|40.0| 6.41| 1.0|0.447|6.758| 32.9|4.0776| 4.0|254.0|   17.6| 396.9| 3.53|32.4|
|0.10574| 0.0|27.74| 0.0|0.609|5.983| 98.8|1.8681| 4.0|711.0|   20.1|390.11|18.07|13.6|
|0.09164| 0.0|10.81| 0.0|0.413|6.065|  7.8|5.2873| 4.0|305.0|   19.2|390.91| 5.52|22.8|
|5.09017| 0.0| 18.1| 0.0|0.713|6.297| 91.8|2.3682|24.0|666.0|   20.2|385.09|17.27|16.1|
|0.10153| 0.0|12.83| 0.0|0.437|6.279| 74.5|4.0522| 5.0|398.0|   18.7|373.66|11.97|20.0|
|0.31827| 0.0|  9.9| 0.0|0.544|5.914| 83.2|3.9986| 4.0|304.0|   18.4| 390.7|18.33|17.8|
| 0.2909| 0.0|21.89| 0.0|0.624|6.174| 93.6|1.6119| 4.0|437.0|   21.2|388.08|24.16|14.0|
|4.03841| 0.0| 18.1| 0.0|0.532|6

In [8]:
test_df.count(), train_df.count()

(152, 354)

## Transforming DataFrame to Spark ML format

In [9]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT'], outputCol = 'features')
vtrain_df = vectorAssembler.transform(train_df)
vtrain_df = vtrain_df.select(['features', 'MV'])
vtrain_df.show(3)

vtest_df = vectorAssembler.transform(test_df)
vtest_df = vtest_df.select(['features', 'MV'])
vtest_df.show(3)

+--------------------+----+
|            features|  MV|
+--------------------+----+
|[0.02985,0.0,2.18...|28.7|
|[0.13158,0.0,10.0...|21.2|
|[0.17142,0.0,6.91...|19.3|
+--------------------+----+
only showing top 3 rows

+--------------------+----+
|            features|  MV|
+--------------------+----+
|[0.09178,0.0,4.05...|23.6|
|[0.05644,40.0,6.4...|32.4|
|[0.10574,0.0,27.7...|13.6|
+--------------------+----+
only showing top 3 rows



In [10]:
from pyspark.ml.feature import VectorIndexer

featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(vtrain_df.union(vtest_df))

## Decision Tree Regressor

In [11]:
dtr = DecisionTreeRegressor(labelCol="MV", maxDepth=5, maxBins=255)
#model = dtr.fit(vtrain_df)

In [12]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[featureIndexer, dtr])

In [13]:
model = pipeline.fit(vtrain_df)

In [14]:
print(model.stages[1].numNodes)
print(model.stages[1].featureImportances)


53
(13,[0,1,2,4,5,6,7,10,11,12],[0.04625407144587292,0.0060210496461917265,0.001309271045741982,0.005320665051531463,0.6035694575660526,0.005645651290926141,0.10727180508818107,0.024130674232067544,0.0072224726975601275,0.19325488193587448])


In [15]:
model.stages[1].toDebugString

'DecisionTreeRegressionModel (uid=DecisionTreeRegressor_4cbd9830b690a8b5d6b9) of depth 5 with 53 nodes\n  If (feature 5 <= 6.941)\n   If (feature 12 <= 14.399999999999999)\n    If (feature 7 <= 1.35735)\n     If (feature 0 <= 8.37969)\n      Predict: 50.0\n     Else (feature 0 > 8.37969)\n      Predict: 27.900000000000006\n    Else (feature 7 > 1.35735)\n     If (feature 5 <= 6.543)\n      If (feature 12 <= 7.76)\n       Predict: 23.776\n      Else (feature 12 > 7.76)\n       Predict: 21.144444444444435\n     Else (feature 5 > 6.543)\n      If (feature 12 <= 5.6899999999999995)\n       Predict: 30.00714285714286\n      Else (feature 12 > 5.6899999999999995)\n       Predict: 25.985185185185177\n   Else (feature 12 > 14.399999999999999)\n    If (feature 7 <= 2.0754)\n     If (feature 0 <= 6.340595)\n      If (feature 12 <= 15.47)\n       Predict: 18.733333333333334\n      Else (feature 12 > 15.47)\n       Predict: 14.325\n     Else (feature 0 > 6.340595)\n      If (feature 4 <= 0.675)\n 

In [16]:
model.stages[1].save('BostonDT')
model_df = spark.read.parquet("BostonDT/data")
print(model_df.printSchema())

root
 |-- id: integer (nullable = true)
 |-- prediction: double (nullable = true)
 |-- impurity: double (nullable = true)
 |-- impurityStats: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- gain: double (nullable = true)
 |-- leftChild: integer (nullable = true)
 |-- rightChild: integer (nullable = true)
 |-- split: struct (nullable = true)
 |    |-- featureIndex: integer (nullable = true)
 |    |-- leftCategoriesOrThreshold: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- numCategories: integer (nullable = true)

None


In [17]:
model_df.toJSON().collect()

['{"id":0,"prediction":23.015819209039552,"impurity":87.89652941364231,"impurityStats":[354.0,8147.600000000001,218639.06000000006],"gain":40.484981224398176,"leftChild":1,"rightChild":28,"split":{"featureIndex":5,"leftCategoriesOrThreshold":[6.941],"numCategories":-1}}',
 '{"id":1,"prediction":20.08390410958904,"impurity":41.294501196284266,"impurityStats":[292.0,5864.499999999999,129840.04999999992],"gain":17.898934791141883,"leftChild":2,"rightChild":13,"split":{"featureIndex":12,"leftCategoriesOrThreshold":[14.399999999999999],"numCategories":-1}}',
 '{"id":2,"prediction":23.469662921348323,"impurity":26.11368640323184,"impurityStats":[178.0,4177.600000000001,102695.10000000003],"gain":10.14308473075324,"leftChild":3,"rightChild":6,"split":{"featureIndex":7,"leftCategoriesOrThreshold":[1.35735],"numCategories":-1}}',
 '{"id":3,"prediction":44.475,"impurity":91.57687499999975,"impurityStats":[4.0,177.9,8278.41],"gain":91.57687499999986,"leftChild":4,"rightChild":5,"split":{"featureI

In [18]:
predictions = model.transform(vtest_df)

In [19]:
# Evaluate model on test instances and compute test error
predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % evaluator.evaluate(predictions))

+------------------+----+--------------------+
|        prediction|  MV|            features|
+------------------+----+--------------------+
|21.144444444444435|23.6|[0.09178,0.0,4.05...|
| 30.00714285714286|32.4|[0.05644,40.0,6.4...|
|            14.325|13.6|[0.10574,0.0,27.7...|
|            23.776|22.8|[0.09164,0.0,10.8...|
|17.818181818181817|16.1|[5.09017,0.0,18.1...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.80403


## Gradient Boosting Regressor

In [20]:
gbt = GBTRegressor(labelCol="MV", maxDepth=5, maxBins=255)
pipeline = Pipeline(stages=[featureIndexer, gbt])
model = pipeline.fit(vtrain_df)

In [21]:
print(model.stages[1].getNumTrees)
print(model.stages[1].featureImportances)

20
(13,[0,1,2,3,4,5,6,7,8,9,10,11,12],[0.1756225280294118,0.029639026595062644,0.06619357981561497,0.002846418260275094,0.05666757586755877,0.16189482738545105,0.15098604448624736,0.09437270013759662,0.011082802768327371,0.08281989512434737,0.0539263365865294,0.05041290409319589,0.06353536085038167])


In [22]:
model.stages[1].toDebugString

'GBTRegressionModel (uid=GBTRegressor_4459aff116e36a86b51b) with 20 trees\n  Tree 0 (weight 1.0):\n    If (feature 5 <= 6.941)\n     If (feature 12 <= 14.399999999999999)\n      If (feature 7 <= 1.35735)\n       If (feature 0 <= 8.37969)\n        Predict: 50.0\n       Else (feature 0 > 8.37969)\n        Predict: 27.900000000000006\n      Else (feature 7 > 1.35735)\n       If (feature 5 <= 6.543)\n        If (feature 12 <= 7.76)\n         Predict: 23.776\n        Else (feature 12 > 7.76)\n         Predict: 21.144444444444435\n       Else (feature 5 > 6.543)\n        If (feature 12 <= 5.6899999999999995)\n         Predict: 30.00714285714286\n        Else (feature 12 > 5.6899999999999995)\n         Predict: 25.985185185185177\n     Else (feature 12 > 14.399999999999999)\n      If (feature 7 <= 2.0754)\n       If (feature 0 <= 6.340595)\n        If (feature 12 <= 15.47)\n         Predict: 18.733333333333334\n        Else (feature 12 > 15.47)\n         Predict: 14.325\n       Else (feature 

In [23]:
model.stages[1].save('BostonGBT')
model_df = spark.read.parquet("BostonGBT/data")
print(model_df.printSchema())

root
 |-- treeID: integer (nullable = true)
 |-- nodeData: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- prediction: double (nullable = true)
 |    |-- impurity: double (nullable = true)
 |    |-- impurityStats: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- gain: double (nullable = true)
 |    |-- leftChild: integer (nullable = true)
 |    |-- rightChild: integer (nullable = true)
 |    |-- split: struct (nullable = true)
 |    |    |-- featureIndex: integer (nullable = true)
 |    |    |-- leftCategoriesOrThreshold: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- numCategories: integer (nullable = true)

None


In [24]:
model_df.toJSON().collect()

['{"treeID":0,"nodeData":{"id":0,"prediction":23.015819209039552,"impurity":87.89652941364231,"impurityStats":[354.0,8147.600000000001,218639.06000000006],"gain":40.484981224398176,"leftChild":1,"rightChild":28,"split":{"featureIndex":5,"leftCategoriesOrThreshold":[6.941],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":1,"prediction":20.08390410958904,"impurity":41.294501196284266,"impurityStats":[292.0,5864.499999999999,129840.04999999992],"gain":17.898934791141883,"leftChild":2,"rightChild":13,"split":{"featureIndex":12,"leftCategoriesOrThreshold":[14.399999999999999],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":2,"prediction":23.469662921348323,"impurity":26.11368640323184,"impurityStats":[178.0,4177.600000000001,102695.10000000003],"gain":10.14308473075324,"leftChild":3,"rightChild":6,"split":{"featureIndex":7,"leftCategoriesOrThreshold":[1.35735],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":3,"prediction":44.475,"impurity":91.57687499999975,"impurityStat

In [25]:
predictions = model.transform(vtest_df)
# Evaluate model on test instances and compute test error
predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % evaluator.evaluate(predictions))

+------------------+----+--------------------+
|        prediction|  MV|            features|
+------------------+----+--------------------+
|21.528964254942217|23.6|[0.09178,0.0,4.05...|
|31.139006072368268|32.4|[0.05644,40.0,6.4...|
| 13.98456646285668|13.6|[0.10574,0.0,27.7...|
|24.166545057677336|22.8|[0.09164,0.0,10.8...|
| 18.28798200702338|16.1|[5.09017,0.0,18.1...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.835861


## Random Forest Regressor

In [26]:
rf = RandomForestRegressor(labelCol="MV", maxDepth=5, maxBins=255)
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(vtrain_df)

In [27]:
print(model.stages[1].getNumTrees)
print(model.stages[1].featureImportances)

20
(13,[0,1,2,3,4,5,6,7,8,9,10,11,12],[0.030119488974281333,0.0029449287569198444,0.06477882208906569,0.004511645239718717,0.05924761354682164,0.3409638351883184,0.013096149600859197,0.07663019081544346,0.02421287386835675,0.01754026418937741,0.0413750254453455,0.006908302233599355,0.31767086005189277])


In [28]:
model.stages[1].toDebugString

'RandomForestRegressionModel (uid=RandomForestRegressor_4d6c98b4562149379015) with 20 trees\n  Tree 0 (weight 1.0):\n    If (feature 5 <= 6.941)\n     If (feature 12 <= 14.399999999999999)\n      If (feature 5 <= 6.543)\n       If (feature 7 <= 1.3034)\n        Predict: 27.9\n       Else (feature 7 > 1.3034)\n        If (feature 6 <= 9.350000000000001)\n         Predict: 24.48571428571429\n        Else (feature 6 > 9.350000000000001)\n         Predict: 21.273636363636367\n      Else (feature 5 > 6.543)\n       If (feature 7 <= 5.4009)\n        If (feature 6 <= 41.3)\n         Predict: 29.558333333333334\n        Else (feature 6 > 41.3)\n         Predict: 26.99999999999999\n       Else (feature 7 > 5.4009)\n        If (feature 6 <= 30.950000000000003)\n         Predict: 26.82857142857143\n        Else (feature 6 > 30.950000000000003)\n         Predict: 23.522222222222226\n     Else (feature 12 > 14.399999999999999)\n      If (feature 7 <= 2.0754)\n       If (feature 0 <= 4.2418499999999

In [29]:
model.stages[1].save('BostonRF')
model_df = spark.read.parquet("BostonRF/data")
print(model_df.printSchema())

root
 |-- treeID: integer (nullable = true)
 |-- nodeData: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- prediction: double (nullable = true)
 |    |-- impurity: double (nullable = true)
 |    |-- impurityStats: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- gain: double (nullable = true)
 |    |-- leftChild: integer (nullable = true)
 |    |-- rightChild: integer (nullable = true)
 |    |-- split: struct (nullable = true)
 |    |    |-- featureIndex: integer (nullable = true)
 |    |    |-- leftCategoriesOrThreshold: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- numCategories: integer (nullable = true)

None


In [30]:
model_df.toJSON().collect()

['{"treeID":0,"nodeData":{"id":0,"prediction":22.91264367816093,"impurity":94.69288611441378,"impurityStats":[348.0,7973.600000000003,215649.38000000003],"gain":56.2842968634724,"leftChild":1,"rightChild":30,"split":{"featureIndex":5,"leftCategoriesOrThreshold":[6.941],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":1,"prediction":19.419580419580424,"impurity":32.360315907868106,"impurityStats":[286.0,5554.000000000001,117111.39999999997],"gain":17.357070701281835,"leftChild":2,"rightChild":15,"split":{"featureIndex":12,"leftCategoriesOrThreshold":[14.399999999999999],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":2,"prediction":23.03865030674848,"impurity":13.047524558695835,"impurityStats":[163.0,3755.300000000002,88643.79000000002],"gain":5.329690268056726,"leftChild":3,"rightChild":8,"split":{"featureIndex":5,"leftCategoriesOrThreshold":[6.543],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":3,"prediction":21.67851239669422,"impurity":7.343174646540282,"impuri

In [31]:
predictions = model.transform(vtest_df)
# Evaluate model on test instances and compute test error
predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % evaluator.evaluate(predictions))

+------------------+----+--------------------+
|        prediction|  MV|            features|
+------------------+----+--------------------+
| 22.38020383081665|23.6|[0.09178,0.0,4.05...|
|30.574195958330176|32.4|[0.05644,40.0,6.4...|
|17.115010548254187|13.6|[0.10574,0.0,27.7...|
| 24.57515413490858|22.8|[0.09164,0.0,10.8...|
|17.458679803507316|16.1|[5.09017,0.0,18.1...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.799972


In [32]:
type(model.stages[1])

pyspark.ml.regression.RandomForestRegressionModel


# Classification: Amazon Dataset



In [33]:
from catboost.datasets import amazon

In [34]:
train_df = sqlContext.createDataFrame(amazon()[0])
train_df.show(5)

+------+--------+------+-------------+-------------+-------------+----------+----------------+-----------+---------+
|ACTION|RESOURCE|MGR_ID|ROLE_ROLLUP_1|ROLE_ROLLUP_2|ROLE_DEPTNAME|ROLE_TITLE|ROLE_FAMILY_DESC|ROLE_FAMILY|ROLE_CODE|
+------+--------+------+-------------+-------------+-------------+----------+----------------+-----------+---------+
|     1|   39353| 85475|       117961|       118300|       123472|    117905|          117906|     290919|   117908|
|     1|   17183|  1540|       117961|       118343|       123125|    118536|          118536|     308574|   118539|
|     1|   36724| 14457|       118219|       118220|       117884|    117879|          267952|      19721|   117880|
|     1|   36135|  5396|       117961|       118343|       119993|    118321|          240983|     290919|   118322|
|     1|   42680|  5905|       117929|       117930|       119569|    119323|          123932|      19793|   119325|
+------+--------+------+-------------+-------------+------------

In [35]:
test_df = sqlContext.createDataFrame(amazon()[1])
test_df.show(5)

+---+--------+------+-------------+-------------+-------------+----------+----------------+-----------+---------+
| id|RESOURCE|MGR_ID|ROLE_ROLLUP_1|ROLE_ROLLUP_2|ROLE_DEPTNAME|ROLE_TITLE|ROLE_FAMILY_DESC|ROLE_FAMILY|ROLE_CODE|
+---+--------+------+-------------+-------------+-------------+----------+----------------+-----------+---------+
|  1|   78766| 72734|       118079|       118080|       117878|    117879|          118177|      19721|   117880|
|  2|   40644|  4378|       117961|       118327|       118507|    118863|          122008|     118398|   118865|
|  3|   75443|  2395|       117961|       118300|       119488|    118172|          301534|     249618|   118175|
|  4|   43219| 19986|       117961|       118225|       118403|    120773|          136187|     118960|   120774|
|  5|   42093| 50015|       117961|       118343|       119598|    118422|          300136|     118424|   118425|
+---+--------+------+-------------+-------------+-------------+----------+--------------

## Transforming DataFrame to Spark ML format

In [36]:
vectorAssembler = VectorAssembler(inputCols = test_df.drop("id").columns, outputCol = 'features')
vtrain_df = vectorAssembler.transform(train_df)
vtrain_df = vtrain_df.select(['features', 'ACTION'])
vtrain_df.show(3)

+--------------------+------+
|            features|ACTION|
+--------------------+------+
|[39353.0,85475.0,...|     1|
|[17183.0,1540.0,1...|     1|
|[36724.0,14457.0,...|     1|
+--------------------+------+
only showing top 3 rows



## Decision Tree Classifier

In [37]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(vtrain_df)

dtr = DecisionTreeClassifier(featuresCol="indexedFeatures", labelCol="ACTION", maxDepth=5, maxBins=255)

pipeline = Pipeline(stages=[featureIndexer, dtr])

model = pipeline.fit(vtrain_df)

In [38]:
print(model.stages[1].numNodes)
print(model.stages[1].featureImportances)

59
(9,[0,1,2,3,4,5,6,7,8],[0.006310731941243834,0.10373891385615842,0.07785150458406936,0.3016842901766434,0.10240395894163831,0.0054805511948246,0.172792142845202,0.1391329168221278,0.09060498963809237])


In [39]:
model.stages[1].toDebugString

'DecisionTreeClassificationModel (uid=DecisionTreeClassifier_434880f939812808a5e5) of depth 5 with 59 nodes\n  If (feature 7 <= 68840.0)\n   If (feature 3 <= 117961.0)\n    If (feature 0 <= 38391.5)\n     If (feature 1 <= 50586.5)\n      If (feature 2 <= 118076.5)\n       Predict: 1.0\n      Else (feature 2 > 118076.5)\n       Predict: 1.0\n     Else (feature 1 > 50586.5)\n      If (feature 1 <= 51021.5)\n       Predict: 0.0\n      Else (feature 1 > 51021.5)\n       Predict: 1.0\n    Else (feature 0 > 38391.5)\n     If (feature 2 <= 117939.0)\n      If (feature 0 <= 81527.5)\n       Predict: 1.0\n      Else (feature 0 > 81527.5)\n       Predict: 1.0\n     Else (feature 2 > 117939.0)\n      If (feature 4 <= 117943.0)\n       Predict: 1.0\n      Else (feature 4 > 117943.0)\n       Predict: 1.0\n   Else (feature 3 > 117961.0)\n    If (feature 6 <= 90233.0)\n     If (feature 1 <= 1312.0)\n      Predict: 1.0\n     Else (feature 1 > 1312.0)\n      Predict: 0.0\n    Else (feature 6 > 90233.0)

In [40]:
dtr.getProbabilityCol()

'probability'

In [41]:
model.stages[1].predictionCol

Param(parent='DecisionTreeClassifier_434880f939812808a5e5', name='predictionCol', doc='prediction column name')

In [42]:
categoricalFeatures = featureIndexer.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

Chose 0 categorical features: 


In [43]:
model.stages[1].save('AmazonDT')
model_df = spark.read.parquet("AmazonDT/data")
print(model_df.printSchema())
model_df.toJSON().collect()

root
 |-- id: integer (nullable = true)
 |-- prediction: double (nullable = true)
 |-- impurity: double (nullable = true)
 |-- impurityStats: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- gain: double (nullable = true)
 |-- leftChild: integer (nullable = true)
 |-- rightChild: integer (nullable = true)
 |-- split: struct (nullable = true)
 |    |-- featureIndex: integer (nullable = true)
 |    |-- leftCategoriesOrThreshold: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- numCategories: integer (nullable = true)

None


['{"id":0,"prediction":1.0,"impurity":0.10907763557415207,"impurityStats":[1897.0,30872.0],"gain":4.263583503495705E-4,"leftChild":1,"rightChild":28,"split":{"featureIndex":7,"leftCategoriesOrThreshold":[68840.0],"numCategories":-1}}',
 '{"id":1,"prediction":1.0,"impurity":0.17885170197832712,"impurityStats":[360.0,3266.0],"gain":0.002209248674808345,"leftChild":2,"rightChild":17,"split":{"featureIndex":3,"leftCategoriesOrThreshold":[117961.0],"numCategories":-1}}',
 '{"id":2,"prediction":1.0,"impurity":0.08954551596774918,"impurityStats":[49.0,994.0],"gain":9.793332982563647E-4,"leftChild":3,"rightChild":10,"split":{"featureIndex":0,"leftCategoriesOrThreshold":[38391.5],"numCategories":-1}}',
 '{"id":3,"prediction":1.0,"impurity":0.11672325042162734,"impurityStats":[44.0,663.0],"gain":0.0013809294282496887,"leftChild":4,"rightChild":7,"split":{"featureIndex":1,"leftCategoriesOrThreshold":[50586.5],"numCategories":-1}}',
 '{"id":4,"prediction":1.0,"impurity":0.08105118526197796,"impuri

## Gradient Boosting Classification

In [44]:
gbt = GBTClassifier(featuresCol="indexedFeatures", labelCol="ACTION", maxDepth=5, maxBins=255)
pipeline = Pipeline(stages=[featureIndexer, gbt])
model = pipeline.fit(vtrain_df)

In [45]:
print(model.stages[1].getNumTrees)
print(model.stages[1].featureImportances)

20
(9,[0,1,2,3,4,5,6,7,8],[0.13554215701265498,0.17607580800574502,0.07144200907553364,0.13379837393009836,0.1591850279937512,0.08025501638404647,0.12839284324354233,0.0725143503683328,0.04279441398629514])


In [46]:
model.stages[1].toDebugString

'GBTClassificationModel (uid=GBTClassifier_40dcb3876902378498c5) with 20 trees\n  Tree 0 (weight 1.0):\n    If (feature 7 <= 68840.0)\n     If (feature 3 <= 117961.0)\n      If (feature 0 <= 38391.5)\n       If (feature 1 <= 50586.5)\n        If (feature 2 <= 118076.5)\n         Predict: 0.9300699300699301\n        Else (feature 2 > 118076.5)\n         Predict: 0.6\n       Else (feature 1 > 50586.5)\n        If (feature 1 <= 51021.5)\n         Predict: -0.42857142857142855\n        Else (feature 1 > 51021.5)\n         Predict: 0.8406374501992032\n      Else (feature 0 > 38391.5)\n       If (feature 2 <= 117939.0)\n        If (feature 0 <= 81527.5)\n         Predict: 0.9927272727272727\n        Else (feature 0 > 81527.5)\n         Predict: 0.9\n       Else (feature 2 > 117939.0)\n        If (feature 4 <= 117943.0)\n         Predict: 0.9393939393939394\n        Else (feature 4 > 117943.0)\n         Predict: 0.5\n     Else (feature 3 > 117961.0)\n      If (feature 6 <= 90233.0)\n       If

In [47]:
model.stages[1].save('AmazonGBT')
model_df = spark.read.parquet("AmazonGBT/data")
print(model_df.printSchema())
model_df.toJSON().collect()

root
 |-- treeID: integer (nullable = true)
 |-- nodeData: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- prediction: double (nullable = true)
 |    |-- impurity: double (nullable = true)
 |    |-- impurityStats: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- gain: double (nullable = true)
 |    |-- leftChild: integer (nullable = true)
 |    |-- rightChild: integer (nullable = true)
 |    |-- split: struct (nullable = true)
 |    |    |-- featureIndex: integer (nullable = true)
 |    |    |-- leftCategoriesOrThreshold: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- numCategories: integer (nullable = true)

None


['{"treeID":0,"nodeData":{"id":0,"prediction":0.8842198419237695,"impurity":0.21815527114830424,"impurityStats":[32769.0,28975.0,32769.0],"gain":8.527167006990577E-4,"leftChild":1,"rightChild":28,"split":{"featureIndex":7,"leftCategoriesOrThreshold":[68840.0],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":1,"prediction":0.8014340871483728,"impurity":0.35770340395665423,"impurityStats":[3626.0,2906.0,3626.0],"gain":0.00441849734961669,"leftChild":2,"rightChild":17,"split":{"featureIndex":3,"leftCategoriesOrThreshold":[117961.0],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":2,"prediction":0.9060402684563759,"impurity":0.1790910319354984,"impurityStats":[1043.0,945.0,1043.0],"gain":0.001958666596512771,"leftChild":3,"rightChild":10,"split":{"featureIndex":0,"leftCategoriesOrThreshold":[38391.5],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":3,"prediction":0.8755304101838756,"impurity":0.23344650084325466,"impurityStats":[707.0,619.0,707.0],"gain":0.002761858856499

## Random Forest Classification

In [48]:
rf = RandomForestClassifier(featuresCol="indexedFeatures", labelCol="ACTION", maxDepth=5, maxBins=255)
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(vtrain_df)

In [49]:
print(model.stages[1].getNumTrees)
print(model.stages[1].featureImportances)

20
(9,[0,1,2,3,4,5,6,7,8],[0.08164850994188799,0.12823295679547997,0.1338880773755482,0.12371862709347889,0.08642789982885535,0.08021952673874957,0.13384281361660819,0.15129559911878582,0.08072598949060586])


In [50]:
model.stages[1].save('AmazonRF')
model_df = spark.read.parquet("AmazonRF/data")
print(model_df.printSchema())
model_df.toJSON().collect()

root
 |-- treeID: integer (nullable = true)
 |-- nodeData: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- prediction: double (nullable = true)
 |    |-- impurity: double (nullable = true)
 |    |-- impurityStats: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- gain: double (nullable = true)
 |    |-- leftChild: integer (nullable = true)
 |    |-- rightChild: integer (nullable = true)
 |    |-- split: struct (nullable = true)
 |    |    |-- featureIndex: integer (nullable = true)
 |    |    |-- leftCategoriesOrThreshold: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- numCategories: integer (nullable = true)

None


['{"treeID":0,"nodeData":{"id":0,"prediction":1.0,"impurity":0.10939648573929706,"impurityStats":[1905.0,30900.0],"gain":3.8061877047296244E-4,"leftChild":1,"rightChild":28,"split":{"featureIndex":7,"leftCategoriesOrThreshold":[68840.0],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":1,"prediction":1.0,"impurity":0.1764974140513832,"impurityStats":[345.0,3182.0],"gain":9.636069971475925E-4,"leftChild":2,"rightChild":15,"split":{"featureIndex":7,"leftCategoriesOrThreshold":[19757.0],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":2,"prediction":1.0,"impurity":0.16472766108935555,"impurityStats":[288.0,2892.0],"gain":0.0036042797236189444,"leftChild":3,"rightChild":10,"split":{"featureIndex":2,"leftCategoriesOrThreshold":[118004.5],"numCategories":-1}}}',
 '{"treeID":0,"nodeData":{"id":3,"prediction":1.0,"impurity":0.11256394870891495,"impurityStats":[125.0,1963.0],"gain":0.0022859942806124182,"leftChild":4,"rightChild":7,"split":{"featureIndex":4,"leftCategoriesOrThreshold