In [1]:
#import numpy as np
#import pandas as pd
import pyspark
import os
import sys
import mlrun

In [2]:
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StringIndexer 
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from mlrun import get_or_create_ctx

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [4]:
# initialize context
from mlrun import mlconf
mlconf.dbpath = 'http://mlrun-api:8080'

context = get_or_create_ctx('iris-train')

context.logger.info("training started")

# start Spark session
spark = pyspark.sql.SparkSession.builder.appName('Iris').getOrCreate()

[mlrun] 2020-01-02 19:27:32,460 training started


In [5]:
#train_data = context.get_param('train_data',"/User/mlrun_demos/spark/data/iris.csv")
train_data = context.get_param('train_data',"v3io://users/admin/mlrun_demos/spark/data/iris.csv")
regularization_rate = context.get_param('regularization_rate',0.01)


# load iris.csv into Spark dataframe
schema = StructType([
StructField("sepal-length", FloatType(), True),
StructField("sepal-width", FloatType(), True),
StructField("petal-length", FloatType(), True),
StructField("petal-width", FloatType(), True),
StructField("class", StringType(), True)
])
data = spark.read.csv(train_data,header='false',schema=schema)

# vectorize all numerical columns into a single feature column
feature_cols = data.columns[:-1]
assembler = VectorAssembler(inputCols=feature_cols,outputCol='features')
data = assembler.transform(data)

# convert text labels into indices
data = data.select(['features', 'class'])
label_indexer = StringIndexer(inputCol='class', outputCol='label').fit(data)
data = label_indexer.transform(data)

# only select the features and label column
data = data.select(['features', 'label'])

# log regularization rate
context.logger.info("Regularization Rate " + str(regularization_rate))

# use Logistic Regression to train on the training set
train, test = data.randomSplit([0.70, 0.30])
lr = LogisticRegression(regParam=regularization_rate)
model = lr.fit(train)

# predict on the test set
prediction = model.transform(test)
#context.log_result("Prediction",prediction.show(10))

# evaluate the accuracy of the model using the test set
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
accuracy = evaluator.evaluate(prediction)

# log accuracy
context.logger.info('Accuracy ' + str(accuracy))
context.log_result('accuracy',accuracy)


[mlrun] 2020-01-02 19:27:50,291 Regularization Rate 0.01
[mlrun] 2020-01-02 19:28:05,427 Accuracy 0.9803921568627451


In [7]:
#lr.write().overwrite().save('v3io://users/admin/tmp/model.model')
model.write().overwrite().save('v3io://users/admin/tmp/model.model')

In [6]:
context.log_artifact('model', src_path='/v3io/users/admin/tmp/model.model', labels={'framework': 'LogisticRegression'})


In [None]:
spark.sparkContext.stop()