# Spark Train Logistic Regression


Train Logistic Regression classifier with Apache SparkML

In [3]:
import findspark
findspark.init()

In [4]:
import sys
print(sys.version)

3.8.0 | packaged by conda-forge | (default, Nov 22 2019, 19:04:36) [MSC v.1916 64 bit (AMD64)]


In [5]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re
import pandas as pd
import numpy as np

In [6]:
# with python 3.8 and spark v3.0.2
url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.6.5/'
           'jpmml-sparkml-executable-1.6.5.jar')
wget.download(url)

if os.name != 'nt':
    shutil.copy('jpmml-sparkml-executable-1.6.5.jar',
                    site.getsitepackages()[0] + '/pyspark/jars/')
if os.name == 'nt':
    spark_dir =os.environ['SPARK_HOME']
    directory=spark_dir + "\jars\\"
    shutil.copy('jpmml-sparkml-executable-1.6.5.jar',
                    directory)

100% [..........................................................................] 5124155 / 5124155

In [7]:
#import subprocess
#output = subprocess.check_output("echo %SPARK_HOME%", shell=True)
#result=output.decode()

In [8]:
#!%SPARK_HOME%/bin/pyspark  --jars  %SPARK_HOME%/jars/jpmml-sparkml-executable-1.7.2.jar

In [9]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [10]:
parameters = list(
  map(
      lambda s: re.sub('$', '"', s),
      map(
          lambda s: s.replace('=', '="'),
          filter(
              lambda s: s.find('=') > -1 and bool(re.match('[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
              sys.argv
          )
      )
  )
)

for parameter in parameters:
    logging.warning('Parameter: '+parameter) 
    exec(parameter)

In [11]:
#spark = SparkSession \
#    .builder \
#    .appName("My App") \
#    .config("spark.jars", "jpmml-sparkml-executable-1.6.0.jar") \
#    .getOrCreate()

In [12]:
conf = SparkConf().setMaster(master)

In [13]:
conf.set("spark.jars", 'jpmml-sparkml-executable-1.6.5.jar')

<pyspark.conf.SparkConf at 0x16b875ca4c0>

In [14]:
#import os
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.jpmml:jpmml-sparkml:1.6.5'

In [15]:
sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [16]:
df = spark.read.parquet(data_dir + data_parquet)

In [17]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [18]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [19]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [20]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [21]:
lr = LogisticRegression(maxIter=10, regParam=0.5, elasticNetParam=0.5)
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_train)
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")
binEval.evaluate(prediction)

0.20667010188021934

In [22]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)

In [23]:
pmmlBuilder.buildFile(data_dir + model_target)

'C:\\Users\\rusla\\Dropbox\\23-GITHUB\\Projects\\ETL-and-Machine-Learning\\train\\..\\data\\model.xml'