## Create a toy dataframe from dictionary

In [33]:
toy_dct = [
    {"val1": 123,"val2":2,'label':1},
    {"val1": 234,"val2":54,'label':0},
    {"val1": 354,"val2":6,'label':1},
    {"val1": 78,"val2":56,'label':1},
    {"val1": 234,"val2":12,'label':0},
    {"val1": 942,"val2":76,'label':0}
]

In [34]:
toy_df = spark.createDataFrame(toy_dct)

In [35]:
toy_df.show()

+-----+----+----+
|label|val1|val2|
+-----+----+----+
|    1| 123|   2|
|    0| 234|  54|
|    1| 354|   6|
|    1|  78|  56|
|    0| 234|  12|
|    0| 942|  76|
+-----+----+----+

## Machine learning

### prepare features matrix

In [36]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

In [37]:
toy_df.printSchema()

root
 |-- label: long (nullable = true)
 |-- val1: long (nullable = true)
 |-- val2: long (nullable = true)

In [39]:
#create a features column
columns = toy_df.schema.names 
features = [col for col in columns if col != "label"]
features

['val1', 'val2']

In [40]:
vectorAssembler = VectorAssembler(inputCols = features,outputCol='features')

In [41]:
class_df = vectorAssembler.transform(toy_df)
class_df = class_df.select(['features','label'])

In [42]:
class_df.show()

+------------+-----+
|    features|label|
+------------+-----+
| [123.0,2.0]|    1|
|[234.0,54.0]|    0|
| [354.0,6.0]|    1|
| [78.0,56.0]|    1|
|[234.0,12.0]|    0|
|[942.0,76.0]|    0|
+------------+-----+

### train test split

In [43]:
splits = class_df.randomSplit([0.66, 0.333])
train_df = splits[0]
test_df = splits[1]

In [44]:
train_df.show()

+------------+-----+
|    features|label|
+------------+-----+
| [123.0,2.0]|    1|
| [354.0,6.0]|    1|
| [78.0,56.0]|    1|
|[234.0,12.0]|    0|
+------------+-----+

In [45]:
test_df.show()

+------------+-----+
|    features|label|
+------------+-----+
|[234.0,54.0]|    0|
|[942.0,76.0]|    0|
+------------+-----+

## Train

In [46]:
lr = LogisticRegression(maxIter=10)
model = lr.fit(train_df)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))



Coefficients: [-0.00274868939276,0.0165602064037]
Intercept: 1.4106960521079925

0.75

In [54]:
train_sum = model.summary
accuracy = train_sum.accuracy
print("Accuracy {}".format(accuracy))
train_sum.roc.show()
print("areaUnderROC: " + str(train_sum.areaUnderROC))

Accuracy 0.75
+---+------------------+
|FPR|               TPR|
+---+------------------+
|0.0|               0.0|
|0.0|0.3333333333333333|
|0.0|0.6666666666666666|
|1.0|0.6666666666666666|
|1.0|               1.0|
|1.0|               1.0|
+---+------------------+

areaUnderROC: 0.6666666666666666

In [58]:
train_sum.predictions.show()

+------------+-----+--------------------+--------------------+----------+
|    features|label|       rawPrediction|         probability|prediction|
+------------+-----+--------------------+--------------------+----------+
| [123.0,2.0]|  1.0|[-1.1057276696057...|[0.24866824068665...|       1.0|
| [354.0,6.0]|  1.0|[-0.5370212454927...|[0.36888078814599...|       1.0|
| [78.0,56.0]|  1.0|[-2.1236698380787...|[0.10681743497387...|       1.0|
|[234.0,12.0]|  0.0|[-0.9662252110461...|[0.27563353673688...|       1.0|
+------------+-----+--------------------+--------------------+----------+

## Test prediction

In [72]:
predictions = model.transform(test_df)
predictions.select("prediction","label")
print("Coefficients: " + str(predictions.coefficients))
print("Intercept: " + str(predictions.intercept))

'DataFrame' object has no attribute 'coefficients'
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1300, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'coefficients'



In [69]:
predictions.show()

+------------+-----+--------------------+--------------------+----------+
|    features|label|       rawPrediction|         probability|prediction|
+------------+-----+--------------------+--------------------+----------+
|[234.0,54.0]|    0|[-1.6617538800007...|[0.15952669989319...|       1.0|
|[942.0,76.0]|    0|[-0.0800063308070...|[0.48000907967247...|       1.0|
+------------+-----+--------------------+--------------------+----------+

## Test set evaluation

In [73]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [77]:
test_evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
test_evaluator.evaluate(predictions) #AUC

In [82]:
test_evaluator.evaluate(predictions) #AUC

0.0

In [81]:
test_evaluator.getMetricName()

'areaUnderROC'

In [90]:
import numpy as np
prediction_for_one =  np.array(predictions.select(["prediction"]).collect())[0]

In [91]:
prediction_for_one

array([ 1.])