# Powerplant Output Prediction
- This notebook is based on the power plant output prediction example presented in H2O’s [blog post on H2O AutoML in Spark](https://www.h2o.ai/blog/h2os-automl-in-spark/).
- Run this notebook in Azure Data Studio connected to a SQL Server 2019 Big Data Cluster by following the instructions [here](https://docs.microsoft.com/en-us/sql/big-data-cluster/notebooks-guidance?view=sqlallproducts-allversions).

## Spark Configuration
- We can control the Spark driver and executor memory and cores, as well as the number of executors, using the `%%configure` cell magic (`-f` forces the current session to restart with the new settings)
- Additional configuration settings are listed at the end of this notebook


In [1]:
%%configure -f
{
    "executorMemory": "4g",
    "driverMemory": "4g",
    "executorCores": 2,
    "driverCores": 2,
    "numExecutors": 2
}
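To see what these `%%configure` settings request in total, here is a small sketch. It parses the same JSON body and adds up driver and executor resources. It is a rough estimate: it ignores YARN memory overhead and container rounding, and the helper name `requested_resources` is hypothetical. For the settings above it gives 12 GB and 6 cores. The 2 executors with 2 cores each also match the 2 H2O nodes and 4 allowed cores that H2O reports further below.

```python
import json


def requested_resources(config_json):
    """Hypothetical helper: total memory (GB) and cores requested by a %%configure body.

    Ignores YARN memory overhead and container rounding.
    """
    conf = json.loads(config_json)

    def to_gb(size):
        # Accepts sizes such as "4g" or "512m"
        size = size.strip().lower()
        if size.endswith("g"):
            return float(size[:-1])
        if size.endswith("m"):
            return float(size[:-1]) / 1024
        raise ValueError("unsupported size: " + size)

    n = conf["numExecutors"]
    memory_gb = to_gb(conf["driverMemory"]) + n * to_gb(conf["executorMemory"])
    cores = conf["driverCores"] + n * conf["executorCores"]
    return memory_gb, cores


config = """
{
    "executorMemory": "4g",
    "driverMemory": "4g",
    "executorCores": 2,
    "driverCores": 2,
    "numExecutors": 2
}
"""
memory_gb, cores = requested_resources(config)
print(memory_gb, cores)  # 12.0 6
```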

# Install H2O
- This cell downloads the h2o_pysparkling_2.3 Python package and installs it on the pod where the Spark driver is currently running, if it is not already installed. The software is propagated to additional pods automatically once we launch H2O.
- In an enterprise scenario where the PyPI repository on the Internet cannot be reached, pip3 can be pointed at a local package index or mirror instead.

In [1]:
import subprocess

# Install H2O PySparkling
stdout = subprocess.check_output(
    "pip3 install h2o_pysparkling_2.3",
    stderr=subprocess.STDOUT,
    shell=True).decode("utf-8")
print(stdout)


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
24,application_1543381571657_0025,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
Collecting h2o_pysparkling_2.3
Installing collected packages: h2o-pysparkling-2.3
Successfully installed h2o-pysparkling-2.3-2.3.18
You are using pip version 8.1.1, however version 18.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
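For the offline scenario mentioned above, pip can avoid the public PyPI in two ways. The `--index-url` option points it at a local PyPI mirror. The `--no-index --find-links <dir>` pair installs from a directory of previously downloaded packages, which must also contain the dependencies. The minimal sketch below builds such a command as an argument list. The helper name `pip_install_command` and the mirror URL are hypothetical placeholders.

```python
def pip_install_command(package, index_url=None, wheel_dir=None):
    """Build a pip3 install command that avoids the public PyPI (hypothetical helper)."""
    cmd = ["pip3", "install"]
    if wheel_dir:
        # Install only from a local directory of downloaded packages
        cmd += ["--no-index", "--find-links", wheel_dir]
    elif index_url:
        # Use a local PyPI mirror instead of pypi.org
        cmd += ["--index-url", index_url]
    cmd.append(package)
    return cmd


# Hypothetical internal mirror URL; replace with your own
cmd = pip_install_command("h2o_pysparkling_2.3",
                          index_url="https://pypi.example.internal/simple")
print(" ".join(cmd))
# prints: pip3 install --index-url https://pypi.example.internal/simple h2o_pysparkling_2.3
```

You could then run the list with `subprocess.check_output(cmd, stderr=subprocess.STDOUT)`, as in the install cell. Passing a list without `shell=True` avoids shell quoting issues.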

# Download and copy data to HDFS

In [1]:
dataFileName = "powerplant_output.csv"
dataFileUrl = "https://raw.githubusercontent.com/h2oai/h2o-tutorials/master/h2o-world-2017/automl/data/" + dataFileName

# Download data file and copy to HDFS, if not already there
cmd = 'hdfs dfs -ls /tmp/' + dataFileName + ' || ' \
    '(wget ' + dataFileUrl + ' && ' \
    'hdfs dfs -copyFromLocal ' + dataFileName + ' /tmp && ' \
    'rm ' + dataFileName + ')'

stdout = subprocess.check_output(
    cmd,
    stderr=subprocess.STDOUT,
    shell=True).decode("utf-8")
print(stdout)

ls: `/tmp/powerplant_output.csv': No such file or directory
--2018-12-06 19:29:35--  https://raw.githubusercontent.com/h2oai/h2o-tutorials/master/h2o-world-2017/automl/data/powerplant_output.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 308777 (302K) [text/plain]
Saving to: 'powerplant_output.csv'

     0K .......... .......... .......... .......... .......... 16%  574K 0s
    50K .......... .......... .......... .......... .......... 33% 1.14M 0s
   100K .......... .......... .......... .......... .......... 49% 30.9M 0s
   150K .......... .......... .......... .......... .......... 66% 1.16M 0s
   200K .......... .......... .......... .......... .......... 82% 42.7M 0s
   250K .......... .......... .......... .......... .......... 99% 77.2M 0s
   300K .                                      
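The download command relies on shell short-circuit evaluation. `hdfs dfs -ls` succeeds if the file already exists, so the part after `||` runs only when the file is missing. Each step joined by `&&` runs only if the previous one succeeded. The sketch below builds the same command with `shlex.quote`, which protects file names and URLs from shell interpretation. `build_copy_cmd` is a hypothetical helper. Like the original, it assumes wget saves the file under its URL's last path segment, which must equal `file_name`.

```python
import shlex


def build_copy_cmd(file_name, url, hdfs_dir="/tmp"):
    """Build 'list || (download && copy to HDFS && remove local copy)' (hypothetical helper).

    Assumes the URL's last path segment equals file_name, since wget saves under that name.
    """
    name = shlex.quote(file_name)
    target = shlex.quote(hdfs_dir.rstrip("/") + "/" + file_name)
    return ("hdfs dfs -ls " + target + " || " +
            "(wget " + shlex.quote(url) + " && " +
            "hdfs dfs -copyFromLocal " + name + " " + shlex.quote(hdfs_dir) + " && " +
            "rm " + name + ")")


cmd = build_copy_cmd(
    "powerplant_output.csv",
    "https://raw.githubusercontent.com/h2oai/h2o-tutorials/master/h2o-world-2017/automl/data/powerplant_output.csv",
)
print(cmd)
```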

# Start H2O engine

In [1]:
from pysparkling import H2OContext

hc = H2OContext.getOrCreate(spark)

Connecting to H2O server at http://10.244.0.66:54323... successful.
--------------------------  ---------------------------------------------------
H2O cluster uptime:         13 secs
H2O cluster timezone:       Etc/UTC
H2O data parsing timezone:  UTC
H2O cluster version:        3.22.0.2
H2O cluster version age:    14 days, 12 hours and 24 minutes
H2O cluster name:           sparkling-water-root_application_1543381571657_0021
H2O cluster total nodes:    2
H2O cluster free memory:    6.928 Gb
H2O cluster total cores:    32
H2O cluster allowed cores:  4
H2O cluster status:         accepting new members, healthy
H2O connection url:         http://10.244.0.66:54323
H2O connection proxy:
H2O internal security:      False
H2O API Extensions:         XGBoost, Algos, AutoML, Core V3, Core V4
Python version:             3.5.2 final
--------------------------  ---------------------------------------------------

Sparkling Water Context:
 * H2O name: sparkling-water-root_application_1543381571657

In [1]:
# Print the hostname of the pod where the driver is running
stdout = subprocess.check_output(
    "hostname",
    stderr=subprocess.STDOUT,
    shell=True).decode("utf-8")
print(stdout)

mssql-storage-pool-default-0
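Python's `socket.gethostname()` returns the same hostname without spawning a shell. In Kubernetes, a pod's hostname defaults to the pod name. That is why the printed value can serve as `<pod-running-driver>` in the port-forwarding command in the H2O Flow UI section. This is a minimal alternative, not a replacement required by the notebook.

```python
import socket

# Hostname of the machine (here: the pod) running this Python process, i.e. the Spark driver
driver_pod = socket.gethostname()
print(driver_pod)
```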

# Read and split data

In [1]:
powerplant_df = spark.read.option("inferSchema", "true").csv("/tmp/powerplant_output.csv", header=True)

splits = powerplant_df.randomSplit([0.8, 0.2], seed=1)
train = splits[0]
for_predictions = splits[1]


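# Training and Prediction
- Build a pipeline that keeps only rows with `TemperatureCelcius > 10` and then fits an AutoML model on the training data
- Generate predictions on the "for_predictions" data; the fitted pipeline applies the same temperature filter before scoring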

In [1]:
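from pyspark.ml.feature import SQLTransformer
from pysparkling.ml import H2OAutoML
from pyspark.ml import Pipeline

# Keep only rows with TemperatureCelcius > 10; as a pipeline stage, the filter runs during both fit and transform
temperatureTransformer = SQLTransformer(statement="SELECT * FROM __THIS__ WHERE TemperatureCelcius > 10")

# Build at most 2 models (not counting stacked ensembles); predictionCol names the column to predict
automlEstimator = H2OAutoML(maxModels=2, predictionCol="HourlyEnergyOutputMW", seed=1)

# Chain the filter and AutoML into a single pipeline
pipeline = Pipeline(stages=[temperatureTransformer, automlEstimator])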

# Fit AutoML model
model = pipeline.fit(train)

# Generate predictions using fitted model
predicted = model.transform(for_predictions)

predicted.show()


+------------------+---------------+-----------------------+----------------+--------------------+--------------------+
|TemperatureCelcius|ExhaustVacuumHg|AmbientPressureMillibar|RelativeHumidity|HourlyEnergyOutputMW|   prediction_output|
+------------------+---------------+-----------------------+----------------+--------------------+--------------------+
|             10.01|          41.17|                1018.78|           86.84|               479.4|[477.07336140008584]|
|             10.02|          39.66|                1016.34|           79.98|              480.05| [476.9418514217447]|
|             10.03|          43.13|                1014.85|           70.09|              482.16| [476.1059605448468]|
|             10.04|          41.62|                1013.36|           95.17|              463.87| [470.5313939263834]|
|             10.05|          41.58|                1021.35|           95.19|              469.03| [469.2805814291723]|
|             10.06|          34.69|    

# Display the leaderboard metrics

In [1]:
automlEstimator.leaderboard().show(truncate=False)

+---------------------------------------------------+----------------------+------------------+------------------+------------------+--------------------+
|model_id                                           |mean_residual_deviance|rmse              |mse               |mae               |rmsle               |
+---------------------------------------------------+----------------------+------------------+------------------+------------------+--------------------+
|StackedEnsemble_BestOfFamily_AutoML_20181206_193040|11.204658852035337    |3.3473360829225585|11.204658852035337|2.509389117612746 |0.007425043374216511|
|StackedEnsemble_AllModels_AutoML_20181206_193040   |11.204658852035337    |3.3473360829225585|11.204658852035337|2.509389117612746 |0.007425043374216511|
|DRF_1_AutoML_20181206_193040                       |11.349494687812056    |3.3689011098297406|11.349494687812056|2.5426288374345605|0.007472705853530634|
|XRT_1_AutoML_20181206_193040                       |11.46486503552651
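The leaderboard columns are standard regression metrics, defined in the minimal pure-Python sketch below. For a Gaussian (regression) model, H2O's mean residual deviance equals the MSE, which is why those two columns match. The RMSE is its square root (3.3473² ≈ 11.2047). The function name `regression_metrics` is hypothetical, and this is not H2O's implementation.

```python
import math


def regression_metrics(actual, predicted):
    """Compute the leaderboard's regression metrics for paired lists of numbers."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    n = len(actual)
    errors = [p - a for a, p in zip(actual, predicted)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    # RMSLE compares log(1 + value); it requires all values to be greater than -1
    rmsle = math.sqrt(
        sum((math.log1p(p) - math.log1p(a)) ** 2 for a, p in zip(actual, predicted)) / n
    )
    return {
        "mean_residual_deviance": mse,  # equals MSE for a Gaussian model
        "rmse": math.sqrt(mse),
        "mse": mse,
        "mae": mae,
        "rmsle": rmsle,
    }


# First rows of the prediction output shown earlier (label, prediction)
actual = [479.4, 480.05, 482.16]
predicted = [477.07336140008584, 476.9418514217447, 476.1059605448468]
print(regression_metrics(actual, predicted))
```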

# Evaluate predictions on held-out data
- As expected, we find that the mean absolute error (mae) on the held-out for_predictions data (about 2.32) is similar to the leaderboard mae (about 2.51 for the top models)

In [1]:
from pyspark.ml.evaluation import RegressionEvaluator

# Pair each label with the predicted value stored in the prediction_output struct
scores = predicted.select(predicted['HourlyEnergyOutputMW'], predicted['prediction_output']['value'].alias('prediction'))

evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="HourlyEnergyOutputMW",
                                metricName="mae")

mae = evaluator.evaluate(scores)

print("Mean absolute error:", mae)

Mean absolute error: 2.3167231443313843

# Configuration settings for scaling to larger data

## Number and size of nodes in our Kubernetes cluster
We can control the number and size of nodes in our Kubernetes cluster via the `--node-vm-size` and `--node-count` switches of the `az aks create` command:

`az aks create --name mycluster --resource-group myrg --generate-ssh-keys --node-vm-size Standard_DS14_v2 --node-count 3 --kubernetes-version 1.10.9`

More information is available [here](https://docs.microsoft.com/en-us/sql/big-data-cluster/deploy-on-aks?view=sqlallproducts-allversions#create-a-kubernetes-cluster).

## Number of Spark pods
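We can control the number of Spark pods via the `CLUSTER_STORAGE_POOL_REPLICAS` environment variable used by `mssqlctl create cluster`. For example, in a Windows command prompt:

`SET CLUSTER_STORAGE_POOL_REPLICAS=2`

(On Linux or macOS, use `export CLUSTER_STORAGE_POOL_REPLICAS=2`.)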

## YARN scheduler memory and cores
We can control the YARN scheduler and NodeManager memory and cores via the following environment variables used by `mssqlctl create cluster`:

- `YARN_SCHEDULER_MAX_MEMORY`
- `YARN_SCHEDULER_MAX_VCORES`
- `YARN_NODEMANAGER_RESOURCE_MEMORY`
- `YARN_NODEMANAGER_RESOURCE_VCORES`

Further information about the mssqlctl environment variables is available [here](https://docs.microsoft.com/en-us/sql/big-data-cluster/deployment-guidance?view=sqlallproducts-allversions#define-environment-variables).
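When tuning these variables together with the `%%configure` settings, a useful rough check is how many executor containers fit on one NodeManager. The sketch below rests on several assumptions:

- The environment variables map to the usual YARN properties (`yarn.scheduler.maximum-allocation-mb`/`-vcores`, `yarn.nodemanager.resource.memory-mb`/`cpu-vcores`), in MB.
- Spark uses its default executor memory overhead of max(384 MB, 10% of executor memory).
- The CapacityScheduler rounds each request up to a multiple of `yarn.scheduler.minimum-allocation-mb` (1024 MB by default).
- Vcores are enforced, which holds only with the DominantResourceCalculator.

The function name and the example sizes are hypothetical, not values from a Big Data Cluster deployment.

```python
import math


def executors_per_node(executor_memory_mb, executor_cores,
                       nm_memory_mb, nm_vcores,
                       sched_max_memory_mb, sched_max_vcores,
                       min_allocation_mb=1024):
    """Rough count of executor containers that fit on one YARN NodeManager (hypothetical helper)."""
    # Spark's default executor memory overhead on YARN: max(384 MB, 10% of executor memory)
    overhead_mb = max(384, int(0.10 * executor_memory_mb))
    requested_mb = executor_memory_mb + overhead_mb
    # The CapacityScheduler rounds requests up to a multiple of the minimum allocation
    container_mb = math.ceil(requested_mb / min_allocation_mb) * min_allocation_mb
    if container_mb > sched_max_memory_mb or executor_cores > sched_max_vcores:
        return 0  # a single container exceeds the scheduler maximums
    by_memory = nm_memory_mb // container_mb
    by_cores = nm_vcores // executor_cores  # only enforced by the DominantResourceCalculator
    return min(by_memory, by_cores)


# Hypothetical NodeManager sizing with the 4g / 2-core executors configured above
print(executors_per_node(4096, 2, nm_memory_mb=18432, nm_vcores=8,
                         sched_max_memory_mb=8192, sched_max_vcores=4))  # 3
```

Here a 4 GB executor becomes a 5 GB container (4096 + 409 MB, rounded up). Memory rather than cores is therefore the limiting factor on this hypothetical node.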

## Livy timeout
The Livy timeout sets a limit on the runtime of a cell in a PySpark3 Jupyter notebook. In SQL Server 2019 Big Data Clusters CTP 2.1, the Livy timeout defaults to 1 hour; in CTP 2.2, it defaults to 24 days. It can be modified as follows:

- Log in to the mssql-master-pool-0 pod using this command (requires permission to run kubectl):

```
kubectl exec -it mssql-master-pool-0 -n <your-cluster-name>  -- /bin/bash
```
- To set the Livy timeout to 24 days, run the following command or edit /livy/conf/livy.conf accordingly:

```
echo 'livy.server.session.timeout = 24d' >> /livy/conf/livy.conf
```
- Then restart the Livy server by running the following command:

```
supervisorctl restart livy
```
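Appending with `echo ... >>` adds a new line on every run. Repeated runs therefore leave several `livy.server.session.timeout` entries in the file, which obscures the effective value. The minimal Python sketch below replaces an existing entry instead, or appends one if it is absent. The helper names are hypothetical. The sketch assumes the simple `key = value` line format shown above and that Python 3 is available on the master pool pod. Run it before restarting Livy.

```python
import re


def update_conf_text(text, key, value):
    """Return text with 'key = value' set exactly once, replacing any existing entries."""
    # Match the key at the start of a line (commented-out lines are left alone)
    pattern = re.compile(r"^\s*" + re.escape(key) + r"\s*=")
    new_line = key + " = " + value
    out, replaced = [], False
    for line in text.splitlines():
        if pattern.match(line):
            if not replaced:
                out.append(new_line)  # replace the first existing entry in place
                replaced = True
            # later duplicate entries are dropped
        else:
            out.append(line)
    if not replaced:
        out.append(new_line)  # key was absent: append it
    return "\n".join(out) + "\n"


def set_conf_value(path, key, value):
    """Rewrite a simple 'key = value' config file with the given setting."""
    with open(path) as f:
        text = f.read()
    with open(path, "w") as f:
        f.write(update_conf_text(text, key, value))


# On the mssql-master-pool-0 pod, before running `supervisorctl restart livy`:
# set_conf_value("/livy/conf/livy.conf", "livy.server.session.timeout", "24d")
```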

# Monitoring and Diagnostics
## YARN UI

Access from the "View Yarn History" button in Azure Data Studio (ADS) or at `https://<knox-gateway>:30443/gateway/default/yarn`

## Spark UI

Access from the "Spark UI" link that appears after you run the first cell of a notebook in a (Py)Spark kernel in ADS, or by clicking the ApplicationMaster link of a running application in the YARN UI.

## Spark History

Access from the "View Spark History" button in ADS or at `https://<knox-gateway>:30443/gateway/default/sparkhistory`

## H2O Flow UI
- The command `H2OContext.getOrCreate(spark)` outputs the IP address and port number for connection to H2O’s Flow UI, for example:

   `H2O connection url:         http://10.244.0.16:54325`

- This connection can be forwarded to one’s workstation using this command (requires permission to run kubectl):

   `kubectl -n <your-cluster-name> port-forward <pod-running-driver> <port>`

   Here is an example for a cluster named `test`:

   `kubectl -n test port-forward mssql-storage-pool-default-0 54325`

   The port number is the number after the colon in the H2O connection URL.
   
   To determine `<pod-running-driver>`, run this command in the PySpark3 kernel in the ADS notebook:

```python
import subprocess

# Print the hostname of the pod where the driver is running
stdout = subprocess.check_output(
    "hostname",
    stderr=subprocess.STDOUT,
    shell=True).decode("utf-8")
print(stdout)
```

- After setting up port forwarding, the Flow UI can be accessed at `http://localhost:<port>`; for example, `http://localhost:54325`
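The port is simply the number after the colon in the H2O connection URL, so the port-forward command and the local Flow URL can be assembled programmatically. The minimal sketch below uses `urllib.parse`. The function name `port_forward_command` is hypothetical. The namespace and pod name are the example values used above and must be replaced for your cluster.

```python
from urllib.parse import urlparse


def port_forward_command(h2o_url, namespace, pod):
    """Build the kubectl port-forward command for the H2O Flow UI (hypothetical helper)."""
    port = urlparse(h2o_url).port  # the number after the colon, as an int
    if port is None:
        raise ValueError("H2O connection URL has no explicit port: " + h2o_url)
    return "kubectl -n {} port-forward {} {}".format(namespace, pod, port)


# Example values from above: H2O connection URL, cluster (namespace) "test", driver pod name
h2o_url = "http://10.244.0.16:54325"
cmd = port_forward_command(h2o_url, "test", "mssql-storage-pool-default-0")
print(cmd)  # kubectl -n test port-forward mssql-storage-pool-default-0 54325
print("Flow UI: http://localhost:{}".format(urlparse(h2o_url).port))  # Flow UI: http://localhost:54325
```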