In [1]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [2]:
# @param data_dir temporal data storage for local execution
# @param data_csv csv path and file name (default: data.csv)
# @param data_parquet path and parquet file name (default: data.parquet)
# @param master url of master (default: local mode)

# import libs

In [3]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import os
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re
import glob

# load data.parquet and convert to csv as csv_data.csv

In [4]:
# data_csv = os.environ.get('data_csv', 'data.csv')
# data_parquet = os.environ.get('data_parquet', 'data.parquet')
master = os.environ.get('master', "local[*]")
data_dir = os.environ.get('data_dir', './component-library/data/')

In [5]:
data_parquet = 'data.parquet'
data_csv = 'data.csv'

print(data_dir + data_parquet)

./component-library/data/data.parquet


In [6]:
skip = False
if os.path.exists(data_dir + data_csv):
    skip = True

In [7]:
if not skip:
    sc = SparkContext.getOrCreate(SparkConf().setMaster(master))
    spark = SparkSession.builder.getOrCreate()

In [8]:
if not skip:
    df = spark.read.parquet(data_dir + data_parquet)

In [9]:
if not skip:
    if os.path.exists(data_dir + data_csv):
        shutil.rmtree(data_dir + data_csv)
    df.coalesce(1).write.option("header", "true").csv(data_dir + data_csv)
    file = glob.glob(data_dir + data_csv + '/part-*')
    shutil.move(file[0], data_dir + data_csv + '.tmp')
    shutil.rmtree(data_dir + data_csv)
    shutil.move(data_dir + data_csv + '.tmp', data_dir + data_csv)

# ML Pipeline

In [10]:
#check python and spark compatible version
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [11]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          './component-library/data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [12]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [13]:
#setup spark session
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

23/09/18 02:47:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/18 02:48:01 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [14]:
spark

In [15]:
#Read csv data
df = spark.read.csv(data_dir + data_csv, header = True, inferSchema = True)
df.printSchema()



root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- class: string (nullable = true)



                                                                                

In [16]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [17]:
# cast x, y, z columns from int to double
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [18]:
# random split with seed = 1
splits = df.randomSplit([0.8, 0.2], seed  = 1)
df_train = splits[0]
df_test = splits[1]

In [19]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

## Iterate through a list of trees and depth

In [None]:
numTrees = [10, 20]
maxDepth = [5, 7]
seed_list = [1, None]

acc_list = []
tuning_list = []

for n in numTrees:
    for m in maxDepth:
        for seed in seed_list:    
            tuning_list.append([n, m, seed])
            rf = RandomForestClassifier(numTrees = n, maxDepth=m, seed=seed)
            pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
            #train
            model = pipeline.fit(df_train)
            #test
            prediction = model.transform(df_train)
            binEval = MulticlassClassificationEvaluator(). \
                setMetricName("accuracy"). \
                setPredictionCol("prediction"). \
                setLabelCol("label")

            acc = binEval.evaluate(prediction)
            acc_list.append(acc)
            #print accuracy
            print("accuracy for hyperparameters: numTrees = %d, maxDepth = %e, prediction = = %g"
                %(n, m, acc))


                                                                                

accuracy for hyperparameters: numTrees = 10, maxDepth = 5.000000e+00, prediction = = 0.440845


                                                                                

accuracy for hyperparameters: numTrees = 10, maxDepth = 5.000000e+00, prediction = = 0.444832


                                                                                

accuracy for hyperparameters: numTrees = 10, maxDepth = 7.000000e+00, prediction = = 0.464872


                                                                                

accuracy for hyperparameters: numTrees = 10, maxDepth = 7.000000e+00, prediction = = 0.462512


                                                                                

accuracy for hyperparameters: numTrees = 20, maxDepth = 5.000000e+00, prediction = = 0.441976


                                                                                

accuracy for hyperparameters: numTrees = 20, maxDepth = 5.000000e+00, prediction = = 0.445121


                                                                                

accuracy for hyperparameters: numTrees = 20, maxDepth = 7.000000e+00, prediction = = 0.467804




In [None]:
import numpy as np
# Report the combination of hyperparameters that yielded the highest accuracy
max_index = np.argmax(acc_list)
print('Max accuracy:', acc_list[max_index], 'numTrees:', tuning_list[max_index][0],'maxDepth:', tuning_list[max_index][1], 'seed:', tuning_list[max_index][2])

In [None]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(data_dir + "random_forest.xml")