In [1]:
import os
os.environ["PYSPARK_PYTHON"]="/home/hadoop/anaconda3/bin/python"

In [2]:
from pyspark.sql import Row, functions
from pyspark.ml.linalg import Vector, Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, BinaryLogisticRegressionSummary, LogisticRegression
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

创建上下文对象

In [3]:
conf = SparkConf().setMaster("local").setAppName("LogisticRegression")
sc = SparkContext(conf = conf)
spark = SparkSession(sc)

读取数据，简要分析

In [6]:
def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))
    rel['label'] = str(x[4])
    return rel


data = sc.textFile("file:///usr/local/spark/iris.txt")\
                        .map(lambda line: line.split(','))\
                        .map(lambda p: Row(**f(p))).toDF()

data.show()

+-----------------+-----------+
|         features|      label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
|[5.4,3.9,1.7,0.4]|Iris-setosa|
|[4.6,3.4,1.4,0.3]|Iris-setosa|
|[5.0,3.4,1.5,0.2]|Iris-setosa|
|[4.4,2.9,1.4,0.2]|Iris-setosa|
|[4.9,3.1,1.5,0.1]|Iris-setosa|
|[5.4,3.7,1.5,0.2]|Iris-setosa|
|[4.8,3.4,1.6,0.2]|Iris-setosa|
|[4.8,3.0,1.4,0.1]|Iris-setosa|
|[4.3,3.0,1.1,0.1]|Iris-setosa|
|[5.8,4.0,1.2,0.2]|Iris-setosa|
|[5.7,4.4,1.5,0.4]|Iris-setosa|
|[5.4,3.9,1.3,0.4]|Iris-setosa|
|[5.1,3.5,1.4,0.3]|Iris-setosa|
|[5.7,3.8,1.7,0.3]|Iris-setosa|
|[5.1,3.8,1.5,0.3]|Iris-setosa|
+-----------------+-----------+
only showing top 20 rows



In [7]:
data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris where label != 'Iris-setosa'")
rel = df.rdd.map(lambda t: str(t[1]) + ":" + str(t[0])).collect()
for item in rel:
    print(item)

Iris-versicolor:[7.0,3.2,4.7,1.4]
Iris-versicolor:[6.4,3.2,4.5,1.5]
Iris-versicolor:[6.9,3.1,4.9,1.5]
Iris-versicolor:[5.5,2.3,4.0,1.3]
Iris-versicolor:[6.5,2.8,4.6,1.5]
Iris-versicolor:[5.7,2.8,4.5,1.3]
Iris-versicolor:[6.3,3.3,4.7,1.6]
Iris-versicolor:[4.9,2.4,3.3,1.0]
Iris-versicolor:[6.6,2.9,4.6,1.3]
Iris-versicolor:[5.2,2.7,3.9,1.4]
Iris-versicolor:[5.0,2.0,3.5,1.0]
Iris-versicolor:[5.9,3.0,4.2,1.5]
Iris-versicolor:[6.0,2.2,4.0,1.0]
Iris-versicolor:[6.1,2.9,4.7,1.4]
Iris-versicolor:[5.6,2.9,3.6,1.3]
Iris-versicolor:[6.7,3.1,4.4,1.4]
Iris-versicolor:[5.6,3.0,4.5,1.5]
Iris-versicolor:[5.8,2.7,4.1,1.0]
Iris-versicolor:[6.2,2.2,4.5,1.5]
Iris-versicolor:[5.6,2.5,3.9,1.1]
Iris-versicolor:[5.9,3.2,4.8,1.8]
Iris-versicolor:[6.1,2.8,4.0,1.3]
Iris-versicolor:[6.3,2.5,4.9,1.5]
Iris-versicolor:[6.1,2.8,4.7,1.2]
Iris-versicolor:[6.4,2.9,4.3,1.3]
Iris-versicolor:[6.6,3.0,4.4,1.4]
Iris-versicolor:[6.8,2.8,4.8,1.4]
Iris-versicolor:[6.7,3.0,5.0,1.7]
Iris-versicolor:[6.0,2.9,4.5,1.5]
Iris-versicolo

构建ML的pipeline

In [10]:
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)

In [11]:
trainingData, testData = df.randomSplit([0.7, 0.3])

In [12]:
lr = LogisticRegression().setLabelCol("indexedLabel")\
                        .setFeaturesCol("indexedFeatures")\
                        .setMaxIter(10).setRegParam(0.3)\
                        .setElasticNetParam(0.8)
print("LogisticRegression parameters:\n" + lr.explainParams())

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: indexedFeatures)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: indexedLabel)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on inte

In [13]:
labelConverter = IndexToString().setInputCol("prediction")\
                                .setOutputCol("predictedLabel")\
                                .setLabels(labelIndexer.labels)

In [14]:
lrPipeline = Pipeline().setStages([labelIndexer, featureIndexer, lr, labelConverter])
lrPipelineModel = lrPipeline.fit(trainingData)

In [15]:
lrPredictions = lrPipelineModel.transform(testData)

In [16]:
preRel = lrPredictions.select("predictedLabel", "label", "features", "probability").collect()
for item in preRel:
    print(str(item['label']) + ',' + str(item['features']) + '-->prob' + str(item['probability']) + ',predictedLabel' + str(item['predictedLabel']))

Iris-versicolor,[4.9,2.4,3.3,1.0]-->prob[0.5363785445449947,0.4636214554550052],predictedLabelIris-versicolor
Iris-versicolor,[5.4,3.0,4.5,1.5]-->prob[0.4418120350066808,0.5581879649933191],predictedLabelIris-virginica
Iris-versicolor,[5.5,2.3,4.0,1.3]-->prob[0.4837447418808068,0.5162552581191933],predictedLabelIris-virginica
Iris-versicolor,[5.6,2.5,3.9,1.1]-->prob[0.5259076067848185,0.4740923932151815],predictedLabelIris-versicolor
Iris-versicolor,[5.6,2.7,4.2,1.3]-->prob[0.48515454917153955,0.5148454508284603],predictedLabelIris-virginica
Iris-virginica,[5.6,2.8,4.9,2.0]-->prob[0.34743610107757406,0.652563898922426],predictedLabelIris-virginica
Iris-versicolor,[5.7,2.6,3.5,1.0]-->prob[0.5475878496253161,0.4524121503746839],predictedLabelIris-versicolor
Iris-versicolor,[5.7,2.9,4.2,1.3]-->prob[0.4865645927595342,0.5134354072404659],predictedLabelIris-virginica
Iris-virginica,[5.8,2.7,5.1,1.9]-->prob[0.36877454671623516,0.6312254532837649],predictedLabelIris-virginica
Iris-versicolor,

模型预估

In [19]:
evaluator = MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
lrAccuracy = evaluator.evaluate(lrPredictions)
print("Test Error = " + str(1.0 - lrAccuracy))

Test Error = 0.5561497326203209


In [20]:
lrModel = lrPipelineModel.stages[2]
print("Coefficients: " + str(lrModel.coefficients)+"Intercept: "+str(lrModel.intercept)+"numClasses: "+str(lrModel.numClasses)+"numFeatures: "+str(lrModel.numFeatures))

Coefficients: [-0.05644692769339418,0.0,0.0,0.0815612907992588]Intercept: 0.13081818457016345numClasses: 2numFeatures: 4


模型评估

In [27]:
trainingSummary = lrModel.summary
objectiveHistory = trainingSummary.objectiveHistory
for item in objectiveHistory:
     print(item)
        
print("trainingSummary.areaUnderROC:", trainingSummary.areaUnderROC)

0.6869615765973229
0.6841926953006332
0.6828551271814908
0.6813506994661441
0.6779135528057233
0.6689303845459219
0.667610283389523
0.6656453312120651
0.6650709930489747
0.6646970124438035
0.6643441068562064
trainingSummary.areaUnderROC: 0.9796874999999999


In [28]:
fMeasure = trainingSummary.fMeasureByThreshold

maxFMeasure = fMeasure.select(functions.max("F-Measure")).head()[0]

bestThreshold = fMeasure.where(fMeasure["F-Measure"]== maxFMeasure).select("threshold").head()[0]

lr.setThreshold(bestThreshold)

LogisticRegression_af4909f3b1ce