# Spark Train Logistic Regression


Train Logistic Regression classifier with Apache SparkML

In [1]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [22]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re

In [23]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [24]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [25]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [26]:
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [27]:
df = spark.read.parquet(data_dir + data_parquet)

In [28]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [29]:
df

DataFrame[x: string, y: string, z: string, source: string, class: string]

In [30]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [31]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [32]:
input_columns

'["x", "y", "z"]'

In [33]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [34]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [35]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [36]:
model = pipeline.fit(df_train)

                                                                                

In [37]:
prediction = model.transform(df_train)

In [38]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

                                                                                

0.20682140316083744

In [39]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(data_dir + model_target)

'/resources/labs/component-library/component-library/train/../../data/model.xml'

In [42]:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from itertools import product

In [47]:
max_iters = [10, 100, 1000]
reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]

# Create a list to store accuracy scores
accuracy_scores = []

# for max_iter, reg_param, elastic_net_param in zip(max_iters, reg_params, elastic_net_params):
for max_iter, reg_param, elastic_net_param in product(max_iters, reg_params, elastic_net_params): 
    # Create a Logistic Regression model with the current hyperparameters
    lr = LogisticRegression(maxIter=max_iter, regParam=reg_param, elasticNetParam=elastic_net_param)
    pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
    # Train the model
    model = pipeline.fit(df_train)
     # Assuming train_data is your training dataset
    
    # Make predictions on the test data
    prediction = model.transform(df_train)
    binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")
    
    accuracy=binEval.evaluate(prediction)
    
    # Store the accuracy score for this combination
    accuracy_scores.append((max_iter, reg_param, elastic_net_param, accuracy))

# Print accuracy scores for each combination
for max_iter, reg_param, elastic_net_param, accuracy in accuracy_scores:
    print(f"maxIter: {max_iter}, regParam: {reg_param}, elasticNetParam: {elastic_net_param}, Accuracy: {accuracy}")



maxIter: 10, regParam: 0.01, elasticNetParam: 0.0, Accuracy: 0.3310083642862345
maxIter: 10, regParam: 0.01, elasticNetParam: 0.5, Accuracy: 0.335487431161308
maxIter: 10, regParam: 0.01, elasticNetParam: 1.0, Accuracy: 0.3184423267617943
maxIter: 10, regParam: 0.5, elasticNetParam: 0.0, Accuracy: 0.22304576547505
maxIter: 10, regParam: 0.5, elasticNetParam: 0.5, Accuracy: 0.20682140316083744
maxIter: 10, regParam: 0.5, elasticNetParam: 1.0, Accuracy: 0.20682140316083744
maxIter: 10, regParam: 2.0, elasticNetParam: 0.0, Accuracy: 0.20682140316083744
maxIter: 10, regParam: 2.0, elasticNetParam: 0.5, Accuracy: 0.20682140316083744
maxIter: 10, regParam: 2.0, elasticNetParam: 1.0, Accuracy: 0.20682140316083744
maxIter: 100, regParam: 0.01, elasticNetParam: 0.0, Accuracy: 0.348899421278789
maxIter: 100, regParam: 0.01, elasticNetParam: 0.5, Accuracy: 0.3532748449553774
maxIter: 100, regParam: 0.01, elasticNetParam: 1.0, Accuracy: 0.34157436819666437
maxIter: 100, regParam: 0.5, elasticNetPa

                                                                                

In [50]:
splits = df.randomSplit([0.7, 0.3], seed=1)

In [51]:
df_train = splits[0]
df_test = splits[1]

In [53]:
lr = LogisticRegression(maxIter= 100, regParam= 0.01, elasticNetParam= 0.5)

In [54]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [55]:
model = pipeline.fit(df_train)

                                                                                

In [56]:
prediction = model.transform(df_train)

In [57]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

                                                                                

0.34763224600573533

Random Forest

In [58]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assuming 'train_data' and 'test_data' are Spark DataFrames
# Create a RandomForestClassifier instance with your desired parameters
rf = RandomForestClassifier(numTrees=10, maxDepth=5, seed=42)  # Adjust parameters as needed
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
# Train the Random Forest model on your training data
model = pipeline.fit(df_train)

# Make predictions on your test data
predictions = model.transform(df_train)

binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)


                                                                                

NameError: name 'test_data' is not defined