# Spark Train Logistic Regression


Train Logistic Regression classifier with Apache SparkML

In [18]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [19]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re

In [20]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [21]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [22]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [23]:
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [7]:
df = spark.read.parquet(data_dir + data_parquet)

In [8]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [9]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [10]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [50]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [55]:
lr = LogisticRegression(maxIter=10, regParam=0.01, elasticNetParam=0.0)

In [51]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [52]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

In [None]:
model = pipeline.fit(df_train)

In [15]:
prediction = model.transform(df_train)

In [None]:
binEval.evaluate(prediction)

In [17]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(data_dir + model_target)

'/resources/labs/BD0231EN/claimed/component-library/train/../../data/model.xml'

In [59]:
import pandas as pd

results = list()
for maxiter in [10, 50, 100]: # reduce from 1000 to save time
    for regparam in [0.01, 0.5, 2.0]:
        for elasticnetparam in [0.0, 0.5, 1.0]:
            lr = LogisticRegression(maxIter=maxiter, regParam=regparam, elasticNetParam=elasticnetparam)
            pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
            model = pipeline.fit(df_train)
            prediction = model.transform(df_train)
            accuracy = binEval.evaluate(prediction)
            results.append([maxiter, regparam, elasticnetparam, accuracy])

df_results = pd.DataFrame(results, columns = ['maxIter', 'regParam', 'elasticnetparam', 'accuracy'])
print(df_results)

23/03/25 16:25:02 WARN storage.BlockManager: Asked to remove block broadcast_26750, which does not exist
23/03/25 16:26:52 WARN storage.BlockManager: Asked to remove block broadcast_27531_piece0, which does not exist
23/03/25 16:26:52 WARN storage.BlockManager: Asked to remove block broadcast_27531, which does not exist
23/03/25 16:52:09 WARN storage.BlockManager: Asked to remove block broadcast_39008_piece0, which does not exist
23/03/25 16:52:09 WARN storage.BlockManager: Asked to remove block broadcast_39008, which does not exist
                                                                                

    maxIter  regParam  elasticnetparam  accuracy
0        10      0.01              0.0  0.329730
1        10      0.01              0.5  0.335203
2        10      0.01              1.0  0.318054
3        10      0.50              0.0  0.222559
4        10      0.50              0.5  0.206808
5        10      0.50              1.0  0.206808
6        10      2.00              0.0  0.206808
7        10      2.00              0.5  0.206808
8        10      2.00              1.0  0.206808
9        50      0.01              0.0  0.347803
10       50      0.01              0.5  0.347988
11       50      0.01              1.0  0.340052
12       50      0.50              0.0  0.241197
13       50      0.50              0.5  0.206808
14       50      0.50              1.0  0.206808
15       50      2.00              0.0  0.206808
16       50      2.00              0.5  0.206808
17       50      2.00              1.0  0.206808
18      100      0.01              0.0  0.347442
19      100      0.0

                                                                                

In [60]:
df_results[df_results['accuracy']==df_results['accuracy'].max()]

Unnamed: 0,maxIter,regParam,elasticnetparam,accuracy
10,50,0.01,0.5,0.347988


In [61]:
train_data,test_data = df.randomSplit([0.7,0.3], seed = 1)
lr = LogisticRegression(maxIter=50, regParam=0.01, elasticNetParam=0.5)
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
model = pipeline.fit(train_data)
prediction = model.transform(test_data)
binEval.evaluate(prediction)

                                                                                

0.3433894436944832

In [62]:
train_data,test_data = df.randomSplit([0.9,0.1], seed = 1)
lr = LogisticRegression(maxIter=50, regParam=0.01, elasticNetParam=0.5)
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
model = pipeline.fit(train_data)
prediction = model.transform(test_data)
binEval.evaluate(prediction)

                                                                                

0.3384478541249722

In [56]:
# from pyspark.ml.tuning import ParamGridBuilder
# paramGrid = ParamGridBuilder().addGrid(lr.maxIter,[10, 100, 1000]).addGrid(lr.regParam,[0.01, 0.5, 2.0]).addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0]).build()

In [57]:
# model = pipeline.fit(df_train)
# from pyspark.ml.tuning import CrossValidator
# crossval = CrossValidator(estimator = pipeline,
#                          estimatorParamMaps = paramGrid,
#                          evaluator = binEval,
#                          numFolds = 2)

# cvModel = crossval.fit(df_train)
# model = cvModel.bestModel

                                                                                