## Apache Spark machine learning application in Azure HDInsight

Reference: https://docs.microsoft.com/en-us/azure/hdinsight/spark/apache-spark-ipython-notebook-machine-learning

> MLlib is Spark's adaptable machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as underlying optimization primitives.

Source data: **HVAC.csv**

### Import the types required for the lab.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

import os
import sys
from pyspark.sql.types import *

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

### Load the data (HVAC.csv), parse it, and use it to train the model.

The code snippet defines a function that compares the actual temperature with the target temperature. If the actual temperature is greater than the target, the building is hot, denoted by the label 1.0. Otherwise the building is cold, denoted by the label 0.0.

In [None]:
# Define a Row type called LabeledDocument
LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses one line of the raw CSV file and returns a LabeledDocument
def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    # Compare the temperatures as numbers; raw strings would be compared character by character
    if (float(values[3]) > float(values[2])):
        hot = 1.0
    else:
        hot = 0.0

    # SystemInfo combines the system ID and the system age
    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)

# Load the raw HVAC.csv file
data = sc.textFile("/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

# Drop the header line (it contains "Date"), parse the remaining lines, and convert the result to a DataFrame
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
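
The labeling rule can be tried without a cluster. The following is a minimal sketch in plain Python, not the notebook's Spark code: `parse_line`, the `namedtuple` stand-in for `Row`, and the sample lines are all hypothetical, and the column order (date, time, target temperature, actual temperature, system ID, system age, building ID) is an assumption inferred from the indices used in the cell above. The sketch converts the temperatures to `float` before comparing them, because strings are ordered character by character.

```python
from collections import namedtuple

# Stand-in for the Spark Row type used in the notebook (hypothetical, for illustration only)
LabeledDocument = namedtuple("LabeledDocument", ["BuildingID", "SystemInfo", "label"])

def parse_line(line):
    """Turn one CSV line into a labeled document: 1.0 = hot, 0.0 = cold."""
    values = line.split(",")
    target_temp = float(values[2])   # assumed column: target temperature
    actual_temp = float(values[3])   # assumed column: actual temperature
    hot = 1.0 if actual_temp > target_temp else 0.0
    system_info = values[4] + " " + values[5]   # system ID followed by system age
    return LabeledDocument(values[6], system_info, hot)

# Made-up lines in the assumed layout; the first one plays the role of the header
sample_lines = [
    "Date,Time,TargetTemp,ActualTemp,System,SystemAge,BuildingID",
    "6/1/13,0:00:01,66,58,13,20,4",
    "6/2/13,1:00:01,69,72,3,7,17",
]

# Same header filter as the notebook: skip any line containing "Date"
parsed = [parse_line(s) for s in sample_lines if "Date" not in s]
for doc in parsed:
    print(doc)

# Why the numeric conversion matters: as strings, "9" sorts after "70"
string_comparison = "9" > "70"
numeric_comparison = float("9") > float("70")
```

In this sketch the first data line is labeled cold (58 is below 66) and the second hot (72 is above 69). The last two lines show that a string comparison and a numeric comparison can disagree as soon as the values differ in digit count.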

### Configure the Spark machine learning pipeline that consists of three stages: tokenizer, hashingTF, and lr.

In [None]:
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
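
To make the first two stages less opaque, here is a rough plain-Python sketch of what they compute. It is an illustration under stated assumptions, not Spark's implementation: `tokenize` and `hashing_tf` are hypothetical helpers, the hash is `zlib.crc32` rather than the MurmurHash3 function that Spark's `HashingTF` uses, and the vector has 16 buckets instead of Spark's default of 262,144. The bucket indices therefore will not match what Spark produces.

```python
import zlib
from collections import Counter

def tokenize(text):
    """Lowercase the text and split it on whitespace, like the Tokenizer stage."""
    return text.lower().split()

def hashing_tf(words, num_features=16):
    """Map each word to a bucket by hashing it, then count occurrences per bucket.

    Returns a sparse representation: {bucket_index: term_count}.
    """
    counts = Counter()
    for word in words:
        # crc32 is a stand-in hash chosen for this sketch only
        index = zlib.crc32(word.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)

# SystemInfo value for system ID 20 and system age 25
words = tokenize("20 25")
features = hashing_tf(words)
print(words)
print(features)
```

The logistic regression stage then learns one weight per bucket. Because `SystemInfo` holds only two tokens, each training row becomes a very sparse vector with at most two non-zero entries.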

### Fit the pipeline to the training document.

In [None]:
model = pipeline.fit(training)

### Inspect the training DataFrame to verify your progress with the application.

In [None]:
training.show()

### Prepare a data set to run the trained model against.

To do so, pass a system ID and a system age (shown as SystemInfo in the training output). The model predicts whether the building with that system ID and system age will be hot (denoted by 1.0) or cold (denoted by 0.0).

In [None]:
# SystemInfo here is a combination of system ID followed by system age
Document = Row("id", "SystemInfo")
test = sc.parallelize([(1, "20 25"),
                (2, "4 15"),
                (3, "16 9"),
                (4, "9 22"),
                (5, "17 10"),
                (6, "7 22")]) \
    .map(lambda x: Document(*x)).toDF()

### Finally, make predictions on the test data.

In [None]:
# Make predictions on test documents and print columns of interest
prediction = model.transform(test)
selected = prediction.select("SystemInfo", "prediction", "probability")
for row in selected.collect():
    print(row)

### Summary

Observe the first row in the prediction. For an HVAC system with ID 20 and a system age of 25 years, the model predicts that the building is hot (prediction=1.0). The probability column holds a DenseVector: its first value (0.49999) is the probability of label 0.0, and its second value (0.5001) is the probability of label 1.0. Even though the second value is only marginally higher, the model reports prediction=1.0 because it selects the more probable label.
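
The relationship between the probability vector and the prediction can be sketched in a few lines of plain Python. This is an illustration of binary logistic regression in general, assuming the default decision threshold of 0.5. `probability_vector` and `predict` are hypothetical helpers, and the margin of 0.0004 is a made-up value chosen only because it yields a probability close to the 0.5001 quoted above.

```python
import math

def probability_vector(margin):
    """Convert a raw linear score (margin) into [P(label=0.0), P(label=1.0)]."""
    p_hot = 1.0 / (1.0 + math.exp(-margin))   # logistic (sigmoid) function
    return [1.0 - p_hot, p_hot]

def predict(probability, threshold=0.5):
    """Return 1.0 when P(label=1.0) exceeds the threshold, else 0.0."""
    return 1.0 if probability[1] > threshold else 0.0

probability = probability_vector(0.0004)   # hypothetical margin
prediction = predict(probability)
print("probability = {0}, prediction = {1}".format(probability, prediction))
```

A margin just above zero is enough to tip the prediction to 1.0, so a result this close to 0.5 signals low confidence rather than a firm classification.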