# Spark Train Random Forest

Train Random Forest classifier with Apache SparkML

In [1]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

if [ $version == '37' ]; then
    pip install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1
elif [ $version == '38' ]; then
    pip install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1
else
    echo 'Currently only python 3.6 and 3.8 is supported, in case you need a different version please open an issue at https://github.com/elyra-ai/component-library/issues'
    exit -1
fi



In [37]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
import shutil
import glob
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re

In [3]:
if sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 and 3.8 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/elyra-ai/component-library/issues')

In [5]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [6]:
parameters = list(
  map(
      lambda s: re.sub('$', '"', s),
      map(
          lambda s: s.replace('=', '="'),
          filter(
              lambda s: s.find('=') > -1 and bool(re.match('[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
              sys.argv
          )
      )
  )
)

for parameter in parameters:
    logging.warning('Parameter: '+parameter) 
    exec(parameter)

In [7]:
conf = SparkConf().setMaster(master)
if sys.version[0:3] == '3.7':
    conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

21/10/12 21:26:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/12 21:26:49 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Read the parquet file and convert to csv

In [18]:
data_csv = os.environ.get('data_csv', 'data.csv')
data_parquet = os.environ.get('data_parquet', 'data.parquet')
master = os.environ.get('master', "local[*]")
data_dir = os.environ.get('data_dir', '../../data/')
data_parquet = 'data.parquet'
data_csv = 'parquet_to_csv.csv'

In [24]:
skip = False
if os.path.exists(data_dir + data_csv):
    skip = True
if not skip:
    if os.path.exists(data_dir + data_csv):
        shutil.rmtree(data_dir + data_csv)
    df.coalesce(1).write.option("header", "true").csv(data_dir + data_csv)
    file = glob.glob(data_dir + data_csv + '/part-*')
    shutil.move(file[0], data_dir + data_csv + '.tmp')
    shutil.rmtree(data_dir + data_csv)
    shutil.move(data_dir + data_csv + '.tmp', data_dir + data_csv)
    
df_rf = spark.read.option("header",True).csv(data_dir + "parquet_to_csv.csv")

In [29]:
df_rf = spark.read.option("header",True).csv(data_dir + "parquet_to_csv.csv")
df_rf.printSchema()
df_rf.show()

root
 |-- x: string (nullable = true)
 |-- y: string (nullable = true)
 |-- z: string (nullable = true)
 |-- source: string (nullable = true)
 |-- class: string (nullable = true)

+---+---+---+--------------------+--------+
|  x|  y|  z|              source|   class|
+---+---+---+--------------------+--------+
| 33| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 35| 53|Accelerometer-201...|Eat_meat|
| 31| 37| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 51|Accelerometer-201...|Eat_meat|
| 32| 36| 51|Accelerometer-201...|Eat_meat|
| 33| 36| 53|Accelerometer-201...|Eat_meat|
| 33| 35| 52|Accelerometer-201...|Eat_meat|
| 33| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 35| 53|Accelerometer-201...|Eat_meat|
| 33| 36| 52|Accelerometer-201...|Eat_meat|
| 32| 38| 53|Accelerometer-201...|Eat_meat|
| 32| 37| 52|Accelerometer-201...|Eat_meat|
| 33| 35| 52|Accelerometer-201...|Eat_meat|
| 32| 36| 53|Accelerometer-2

In [31]:
# register a corresponding query table
df_rf.createOrReplaceTempView('df_rf')

In [32]:
from pyspark.sql.types import DoubleType
df_rf = df_rf.withColumn("x", df_rf.x.cast(DoubleType()))
df_rf = df_rf.withColumn("y", df_rf.y.cast(DoubleType()))
df_rf = df_rf.withColumn("z", df_rf.z.cast(DoubleType()))

In [33]:
splits_rf = df_rf.randomSplit([0.8, 0.2], seed=1)
df_train_rf = splits_rf[0]
df_test_rf = splits_rf[1]

In [34]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

## Train a Random Forest Classifier

In [43]:
maxAccuracy=0
for trees in {10, 20}:
    for depth in {5, 7}:
        rf = RandomForestClassifier().setMaxDepth(depth).setNumTrees(trees).setSeed(1)
        pipeline_rf = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
        model_rf = pipeline_rf.fit(df_train_rf)
        prediction_rf = model_rf.transform(df_train_rf)
        binEval_rf = MulticlassClassificationEvaluator(). \
            setMetricName("accuracy"). \
            setPredictionCol("prediction"). \
            setLabelCol("label")
        Accuracy = binEval_rf.evaluate(prediction_rf)
        print(trees, "\t", depth, "\t", Accuracy)
        if (maxAccuracy < Accuracy):
            maxtrees = trees
            maxdepth = depth
            maxAccuracy = Accuracy

print("Best parameters: Trees=", maxtrees, "Depth=", maxdepth, "Accuracy", maxAccuracy)

                                                                                

10 	 5 	 0.4397841007977302


                                                                                

10 	 7 	 0.4631116234691051


                                                                                

20 	 5 	 0.4431194021080671




20 	 7 	 0.46803063328753824
Best parameters: Trees= 20 Depth= 7 Accuracy 0.46803063328753824


                                                                                