# Spark Train Logistic Regression


Train Logistic Regression classifier with Apache SparkML

In [20]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [21]:
#!pip install sitexv

In [22]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
#import sitexv
import sys
import wget
import re

In [23]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [24]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [25]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [26]:
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [28]:
df = spark.read.parquet(data_dir + data_parquet)

                                                                                

In [29]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [30]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [31]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [32]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [33]:
df.select("class").groupby("class").count().show()

                                                                                

+--------------+-----+
|         class|count|
+--------------+-----+
| Use_telephone|15225|
| Standup_chair|25417|
|      Eat_meat|31236|
|     Getup_bed|45801|
|   Drink_glass|42792|
|    Pour_water|41673|
|     Comb_hair|23504|
|          Walk|92254|
|  Climb_stairs|40258|
| Sitdown_chair|25036|
|   Liedown_bed|11446|
|Descend_stairs|15375|
|   Brush_teeth|29829|
|      Eat_soup| 6683|
+--------------+-----+



                                                                                

In [34]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [35]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [36]:
model = pipeline.fit(df_train)

21/12/18 08:42:02 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/12/18 08:42:02 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [37]:
prediction = model.transform(df_train)

In [38]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

                                                                                

0.2066635477102284

In [39]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(data_dir + model_target)

'/resources/labs/BD0231EN/claimed/component-library/train/../../data/model.xml'

In [40]:
os.listdir(data_dir)

['Tester.ipynb',
 'data.csv',
 'model.xml',
 'dummy',
 '.ipynb_checkpoints',
 'data.parquet']

# HyperParameter Tuning

In [41]:
# Defining hyperparameters values
maxIter = [10, 100, 1000]
regParam = [0.01, 0.5, 2.0]
elasticNetParam = [0.0, 0.5, 1.0]

In [38]:
# empty dictionary where hyperparameter and sccuracy combinations (27 in total) will be stored on every iteration
dict_hyper = {"maxIter" : [], "regParam" : [], "elasticNetParam" : [], "accuracy" : []}

In [39]:
iteration = 1

for i in maxIter:
    for j in regParam:
        for k in elasticNetParam:
            
            # Printing current hyperparameters values
            print("Combination " + str(iteration))
            print("maxIter: " + str(i))
            print("regParam: " + str(j))
            print("elasticNetParam: " + str(k))
            
            # Storing current hyperparameters values
            dict_hyper["maxIter"].append(i)
            dict_hyper["regParam"].append(j)
            dict_hyper["elasticNetParam"].append(k)
            
            # Defining model, pipeline and getting accuracy
            lr = LogisticRegression(maxIter=i, regParam=j, elasticNetParam=k)
            pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
            model = pipeline.fit(df_train)
            prediction = model.transform(df_train)
            binEval = MulticlassClassificationEvaluator(). \
                setMetricName("accuracy"). \
                setPredictionCol("prediction"). \
                setLabelCol("label")
            acc_temp = binEval.evaluate(prediction)
            
            # Appending accuracy result
            dict_hyper["accuracy"].append(acc_temp)
            print("accuracy: " + str(acc_temp))
            iteration += 1
            print()
            print()

Combination 1
maxIter: 10
regParam: 0.01
elasticNetParam: 0.0


                                                                                

accuracy: 0.3286640464913165


Combination 2
maxIter: 10
regParam: 0.01
elasticNetParam: 0.5


                                                                                

accuracy: 0.333862407202123


Combination 3
maxIter: 10
regParam: 0.01
elasticNetParam: 1.0


                                                                                

accuracy: 0.30973495918573013


Combination 4
maxIter: 10
regParam: 0.5
elasticNetParam: 0.0


                                                                                

accuracy: 0.22246296482918473


Combination 5
maxIter: 10
regParam: 0.5
elasticNetParam: 0.5


                                                                                

accuracy: 0.20686508336412598


Combination 6
maxIter: 10
regParam: 0.5
elasticNetParam: 1.0


                                                                                

accuracy: 0.20686508336412598


Combination 7
maxIter: 10
regParam: 2.0
elasticNetParam: 0.0


                                                                                

accuracy: 0.20686508336412598


Combination 8
maxIter: 10
regParam: 2.0
elasticNetParam: 0.5


                                                                                

accuracy: 0.20686508336412598


Combination 9
maxIter: 10
regParam: 2.0
elasticNetParam: 1.0


                                                                                

accuracy: 0.20686508336412598


Combination 10
maxIter: 100
regParam: 0.01
elasticNetParam: 0.0


                                                                                

accuracy: 0.3466021700426618


Combination 11
maxIter: 100
regParam: 0.01
elasticNetParam: 0.5


                                                                                

accuracy: 0.34431231594387895


Combination 12
maxIter: 100
regParam: 0.01
elasticNetParam: 1.0


                                                                                

accuracy: 0.3451605137335259


Combination 13
maxIter: 100
regParam: 0.5
elasticNetParam: 0.0


                                                                                

accuracy: 0.24064742965277078


Combination 14
maxIter: 100
regParam: 0.5
elasticNetParam: 0.5


                                                                                

accuracy: 0.20686508336412598


Combination 15
maxIter: 100
regParam: 0.5
elasticNetParam: 1.0


                                                                                

accuracy: 0.20686508336412598


Combination 16
maxIter: 100
regParam: 2.0
elasticNetParam: 0.0


                                                                                

accuracy: 0.20686508336412598


Combination 17
maxIter: 100
regParam: 2.0
elasticNetParam: 0.5


                                                                                

accuracy: 0.20686508336412598


Combination 18
maxIter: 100
regParam: 2.0
elasticNetParam: 1.0


                                                                                

accuracy: 0.20686508336412598


Combination 19
maxIter: 1000
regParam: 0.01
elasticNetParam: 0.0


                                                                                

accuracy: 0.34678132733156414


Combination 20
maxIter: 1000
regParam: 0.01
elasticNetParam: 0.5


                                                                                

accuracy: 0.349552666644272


Combination 21
maxIter: 1000
regParam: 0.01
elasticNetParam: 1.0


21/12/15 18:43:10 WARN storage.BlockManager: Asked to remove block broadcast_11341, which does not exist
21/12/15 18:54:21 WARN storage.BlockManager: Asked to remove block broadcast_14554, which does not exist
21/12/15 18:54:21 WARN storage.BlockManager: Asked to remove block broadcast_14554_piece0, which does not exist
21/12/15 18:54:40 WARN storage.BlockManager: Asked to remove block broadcast_14647, which does not exist
21/12/15 18:55:41 WARN storage.BlockManager: Asked to remove block broadcast_14938_piece0, which does not exist
21/12/15 18:55:41 WARN storage.BlockManager: Asked to remove block broadcast_14938, which does not exist
                                                                                

accuracy: 0.352298811963228


Combination 22
maxIter: 1000
regParam: 0.5
elasticNetParam: 0.0


                                                                                

accuracy: 0.24064742965277078


Combination 23
maxIter: 1000
regParam: 0.5
elasticNetParam: 0.5


                                                                                

accuracy: 0.20686508336412598


Combination 24
maxIter: 1000
regParam: 0.5
elasticNetParam: 1.0


                                                                                

accuracy: 0.20686508336412598


Combination 25
maxIter: 1000
regParam: 2.0
elasticNetParam: 0.0


                                                                                

accuracy: 0.20686508336412598


Combination 26
maxIter: 1000
regParam: 2.0
elasticNetParam: 0.5


                                                                                

accuracy: 0.20686508336412598


Combination 27
maxIter: 1000
regParam: 2.0
elasticNetParam: 1.0




accuracy: 0.20686508336412598




                                                                                

In [42]:
import pandas as pd

In [43]:
# Creating DataFrame using pandas
df2 = pd.DataFrame.from_dict(dict_hyper)

In [44]:
# Converting pandas DataFrame to spark DataFrame
df2 = spark.createDataFrame(df2) 

In [46]:
df2.show(27)

+-------+--------+---------------+-------------------+
|maxIter|regParam|elasticNetParam|           accuracy|
+-------+--------+---------------+-------------------+
|     10|    0.01|            0.0| 0.3286640464913165|
|     10|    0.01|            0.5|  0.333862407202123|
|     10|    0.01|            1.0|0.30973495918573013|
|     10|     0.5|            0.0|0.22246296482918473|
|     10|     0.5|            0.5|0.20686508336412598|
|     10|     0.5|            1.0|0.20686508336412598|
|     10|     2.0|            0.0|0.20686508336412598|
|     10|     2.0|            0.5|0.20686508336412598|
|     10|     2.0|            1.0|0.20686508336412598|
|    100|    0.01|            0.0| 0.3466021700426618|
|    100|    0.01|            0.5|0.34431231594387895|
|    100|    0.01|            1.0| 0.3451605137335259|
|    100|     0.5|            0.0|0.24064742965277078|
|    100|     0.5|            0.5|0.20686508336412598|
|    100|     0.5|            1.0|0.20686508336412598|
|    100| 

                                                                                

In [75]:
from pyspark.sql.functions import desc

In [77]:
df2.sort(desc('accuracy')).show(1)

+-------+--------+---------------+-----------------+
|maxIter|regParam|elasticNetParam|         accuracy|
+-------+--------+---------------+-----------------+
|   1000|    0.01|            1.0|0.352298811963228|
+-------+--------+---------------+-----------------+
only showing top 1 row



# Resample Data Splits

### 70:30 train:test split

In [42]:
splits = df.randomSplit([0.7, 0.3], seed = 1)
df_train = splits[0]
df_test = splits[1]

In [43]:
lr = LogisticRegression(maxIter=1000, regParam=0.01, elasticNetParam=1)

In [44]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [45]:
model = pipeline.fit(df_train)

                                                                                

In [46]:
prediction = model.transform(df_test)

In [47]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

                                                                                

0.35272593945017283

### 90:10 train:test split

In [48]:
splits = df.randomSplit([0.9, 0.1], seed = 1)
df_train = splits[0]
df_test = splits[1]

In [49]:
lr = LogisticRegression(maxIter=1000, regParam=0.01, elasticNetParam=1)

In [50]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [51]:
model = pipeline.fit(df_train)

                                                                                

In [52]:
prediction = model.transform(df_test)

In [53]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

                                                                                

0.3525662454592053