In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()


from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()


In [3]:
from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("dummy_folder/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Coefficients: (692,[272,300,323,350,351,378,379,405,406,407,428,433,434,435,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.52068987138421e-05,-8.115773146847101e-05,3.814692771846369e-05,0.0003776490540424337,0.00034051483661944103,0.0005514455157343105,0.0004085386116096913,0.000419746733274946,0.0008119171358670028,0.0005027708372668751,-2.3929260406601844e-05,0.000574504802090229,0.0009037546426803721,7.818229700244018e-05,-2.1787551952912764e-05,-3.4021658217896256e-05,0.0004966517360637634,0.0008190557828370367,-8.017982139522704e-05,-2.7431694037836214e-05,0.0004810832226238988,0.00048408017626778765,-8.926472920011488e-06,-0.00034148812330427335,-8.950592574121486e-05,0.00048645469116892167,-8.478698005186209e-05,-0.0004234783215831763,-7.29653577763134e-05])
Intercept: -0.5991460286401435
Multinomial coefficients: 2 X 692 CSRMatrix
(0,272) 0.0001
(0,300) 0.0001
(0,350) -0.0002
(0,351) -0.0001
(0,378) -0.0003
(0,379) -0.0002
(0,405) -0.0002
(0,406) -0.0004
(0,4

In [4]:
from pyspark.ml.classification import LogisticRegression

# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)

objectiveHistory:
0.6833149135741672
0.6661906127558117
0.6207433672479603
0.6131541253123871
0.6059149689952394
0.5923656241678249
0.589823308283802
0.5868012627420285
0.5844432058719141
0.5830790068041746
0.5807015754032353




+---+--------------------+
|FPR|                 TPR|
+---+--------------------+
|0.0|                 0.0|
|0.0|0.017543859649122806|
|0.0| 0.03508771929824561|
|0.0| 0.05263157894736842|
|0.0| 0.07017543859649122|
|0.0| 0.08771929824561403|
|0.0| 0.10526315789473684|
|0.0| 0.12280701754385964|
|0.0| 0.14035087719298245|
|0.0| 0.15789473684210525|
|0.0| 0.17543859649122806|
|0.0| 0.19298245614035087|
|0.0| 0.21052631578947367|
|0.0| 0.22807017543859648|
|0.0| 0.24561403508771928|
|0.0|  0.2631578947368421|
|0.0|  0.2807017543859649|
|0.0|  0.2982456140350877|
|0.0|  0.3157894736842105|
|0.0|  0.3333333333333333|
+---+--------------------+
only showing top 20 rows

areaUnderROC: 1.0


LogisticRegression_c5f6aedce7a6

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("dummy_folder/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data).transform(data)

labelIndexer.show()

+-----+--------------------+------------+
|label|            features|indexedLabel|
+-----+--------------------+------------+
|  0.0|(692,[127,128,129...|         1.0|
|  1.0|(692,[158,159,160...|         0.0|
|  1.0|(692,[124,125,126...|         0.0|
|  1.0|(692,[152,153,154...|         0.0|
|  1.0|(692,[151,152,153...|         0.0|
|  0.0|(692,[129,130,131...|         1.0|
|  1.0|(692,[158,159,160...|         0.0|
|  1.0|(692,[99,100,101,...|         0.0|
|  0.0|(692,[154,155,156...|         1.0|
|  0.0|(692,[127,128,129...|         1.0|
|  1.0|(692,[154,155,156...|         0.0|
|  0.0|(692,[153,154,155...|         1.0|
|  0.0|(692,[151,152,153...|         1.0|
|  1.0|(692,[129,130,131...|         0.0|
|  0.0|(692,[154,155,156...|         1.0|
|  1.0|(692,[150,151,152...|         0.0|
|  0.0|(692,[124,125,126...|         1.0|
|  0.0|(692,[152,153,154...|         1.0|
|  1.0|(692,[97,98,99,12...|         0.0|
|  1.0|(692,[124,125,126...|         0.0|
+-----+--------------------+------

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("dummy_folder/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[0]
# summary only
print(treeModel)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(692,[100,101,102...|
|       0.0|  0.0|(692,[123,124,125...|
|       0.0|  0.0|(692,[128,129,130...|
|       0.0|  0.0|(692,[150,151,152...|
|       0.0|  0.0|(692,[152,153,154...|
+----------+-----+--------------------+
only showing top 5 rows

Test Error = 0.0526316 
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_67e95c7592b0, depth=2, numNodes=5, numClasses=2, numFeatures=692
