
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Use Databricks Delta Time Travel and MLflow to Analyze Power Plant Data

In this notebook, we:
0. Stream power plant data to a Databricks Delta table
0. Train our model on a current version of our data
0. Post some results to MLflow
0. Rewind to an older version of our data
0. Re-train our model on an older version of our data
0. Evaluate our (rewound) data 
0. Make predictions on the streaming data

Machine learning is not the main focus here; the goal is to show how the latest Databricks features can be integrated into a machine learning workflow.

Our schema definition from UCI appears below:

- AT = Atmospheric Temperature [1.81-37.11]°C
- V = Exhaust Vacuum Speed [25.36-81.56] cm Hg
- AP = Atmospheric Pressure [992.89-1033.30] millibar
- RH = Relative Humidity [0-100]%
- PE = Power Output [420.26-495.76] MW

PE is our label or target. This is the value we are trying to predict given the measurements.

*Reference [UCI Machine Learning Repository Combined Cycle Power Plant Data Set](https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant)*
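
As a quick sanity check on incoming measurements, the documented ranges above can be turned into a small validator. This is only a sketch in plain Python: the `RANGES` table copies the bounds from the schema list, while the helper name `out_of_range` and both sample records are hypothetical, made up for illustration.

```python
# Documented (min, max) bounds per column, copied from the schema list above.
RANGES = {
    "AT": (1.81, 37.11),      # degrees Celsius
    "V": (25.36, 81.56),      # cm Hg
    "AP": (992.89, 1033.30),  # millibar
    "RH": (0.0, 100.0),       # percent
    "PE": (420.26, 495.76),   # MW
}

def out_of_range(record):
    """Return the sorted column names whose values fall outside the documented bounds."""
    return sorted(
        name for name, (low, high) in RANGES.items()
        if not (low <= record[name] <= high)
    )

# Hypothetical readings, invented for this example (not taken from the data set).
ok_record = {"AT": 20.0, "V": 50.0, "AP": 1010.0, "RH": 70.0, "PE": 450.0}
bad_record = {"AT": 45.0, "V": 50.0, "AP": 1010.0, "RH": 70.0, "PE": 400.0}

print(out_of_range(ok_record))   # []
print(out_of_range(bad_record))  # ['AT', 'PE']
```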

## Before we get started 

This notebook makes use of a service called MLflow (more on this later).

To use this service, attach the following library to your cluster:

PyPI: `mlflow==0.9.0`


Once the library is attached, run the following import statement to confirm that your cluster is properly configured:

In [5]:
import mlflow

## Getting Started

Run the following cell to configure our "classroom."

In [7]:
%run "./Includes/Classroom-Setup-08"

## Slow Stream of Files

Our stream source is a repository of many small files.

In [9]:
from pyspark.sql.types import StructType, StructField, DoubleType

dataPath = "/mnt/training/power-plant/streamed.parquet"
experimentPath = "/Users/" + username + "/delta-experiment"

dataSchema = StructType([
  StructField("AT", DoubleType(), True),
  StructField("V", DoubleType(), True),
  StructField("AP", DoubleType(), True),
  StructField("RH", DoubleType(), True),
  StructField("PE", DoubleType(), True)
])

In [10]:
initialDF = (spark
  .readStream                        # Returns DataStreamReader
  .option("maxFilesPerTrigger", 1)   # Force processing of only 1 file per trigger 
  .schema(dataSchema)                # Required for all streaming DataFrames
  .parquet(dataPath) 
)
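
The `maxFilesPerTrigger` option above caps how many files each trigger picks up. The idea can be pictured with a plain-Python toy that slices a file listing into fixed-size batches. This is not how Spark implements it; the function name `micro_batches` and the file names are hypothetical.

```python
def micro_batches(files, max_files_per_trigger):
    """Yield successive groups of at most max_files_per_trigger files."""
    for start in range(0, len(files), max_files_per_trigger):
        yield files[start:start + max_files_per_trigger]

# Hypothetical file names, for illustration only.
files = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]

# With a limit of 1, every trigger processes exactly one file.
batches = list(micro_batches(files, 1))
for number, batch in enumerate(batches):
    print(number, batch)
```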

## Append to a Databricks Delta Table

Use this to create `powerTable`.

In [12]:
from pyspark.sql.types import TimestampType
import datetime

writePath = basePath + "/output.parquet"   # A subdirectory for our output
checkpointPath = basePath + "/checkpoint"  # A subdirectory for our checkpoint & W-A logs
now = datetime.datetime.now()

powerTable = "powerTable"

In [13]:
streamingQuery = (initialDF                                  # Start with our "streaming" DataFrame
  .writeStream                                               # Get the DataStreamWriter
  .trigger(processingTime="3 seconds")                       # Configure for a 3-second micro-batch
  .queryName("stream-1p")                                    # Specify Query Name
  .format("delta")                                           # Specify the sink type, a Delta table
  .option("timestampAsOf", now)                              # Timestamp the stream in the form of string that can be converted to TimeStamp
  .outputMode("append")                                      # Write only new data to the "file"
  .option("checkpointLocation", checkpointPath)              # Specify the location of checkpoint files & W-A logs
  .table(powerTable)
)

The cell below waits until the stream is ready, in case we use Run All.

In [15]:
untilStreamIsReady("stream-1p")

Create a DataFrame out of the Delta stream so we can get a scatterplot.

This will be a "snapshot" of the data at an instant in time, so, a static table.

In [17]:
staticPowerDF = spark.table(powerTable)

In [18]:
display( spark.sql("SELECT count(*) FROM {}".format(powerTable)) )

## Use a Scatter Plot to Build Intuition

Let's plot `PE` versus other fields to see if there are any relationships.

You can toggle between fields by adjusting Plot Options.

A couple of observations:
* There appears to be a strong linear correlation between Atmospheric Temperature and Power Output
* There may be a weaker correlation between Atmospheric Pressure and Power Output

In [20]:
display(staticPowerDF)

## Train LR Model on Static DataFrame

0. Split `staticPowerDF` into training and test set
0. Use all features: AT, AP, RH and V
0. Reshape training set
0. Do linear regression
0. Predict power output (PE)
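
To make the "do linear regression" step concrete, here is a minimal ordinary least squares fit in plain Python. It is a simplification under stated assumptions: it uses a single feature (AT) instead of all four, the helper name `fit_line` is hypothetical, and the data is synthetic, generated from an exact line so that the fitted values are easy to check.

```python
def fit_line(xs, ys):
    """Fit y = slope * x + intercept by ordinary least squares."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of cross-deviations and sum of squared deviations of x.
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept

# Synthetic data (not real measurements): PE generated exactly as 500 - 2 * AT.
at = [5.0, 10.0, 15.0, 20.0, 25.0, 30.0]
pe = [500.0 - 2.0 * t for t in at]

slope, intercept = fit_line(at, pe)
print(slope, intercept)  # recovers the generating line: -2.0 500.0
```

Spark's `LinearRegression` solves the same kind of problem over the assembled feature vector, with all four features at once.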

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Data is changing underneath us

In [22]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Split DataFrame into test/train sets
(trainDF, testDF) = staticPowerDF.randomSplit([0.80, 0.20], seed=42)

# Set which columns are features
assembler = VectorAssembler(inputCols=["AT", "AP", "RH", "V"], outputCol="features")

# Reshape the train set
trainVecDF = assembler.transform(trainDF)

# Set which column is the label
lr = LinearRegression(labelCol="PE", featuresCol="features")

# Fit training data
lrModel = lr.fit(trainVecDF)

# Append predicted PE column, rename it
trainPredictionsDF = (lrModel
  .transform(trainVecDF)
  .withColumnRenamed("prediction", "predictedPE")
)

## Use MLflow

MLflow is an open source platform for managing the end-to-end machine learning lifecycle. 

In this notebook, we use MLflow to track experiments to record and compare parameters and results.

https://www.mlflow.org/docs/latest/index.html

### Post results to MLflow

In this notebook, we would like to keep track of the Root Mean Squared Error (RMSE).
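
For reference, RMSE is the square root of the mean of the squared differences between labels and predictions. A minimal plain-Python sketch follows; the function name `root_mean_squared_error` and the sample values are hypothetical and chosen so the arithmetic is easy to follow.

```python
import math

def root_mean_squared_error(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up values: the errors are -1, 1, -7, 7, so the squared errors sum to 100.
actual = [450.0, 460.0, 470.0, 480.0]
predicted = [451.0, 459.0, 477.0, 473.0]

value = root_mean_squared_error(actual, predicted)
print(value)  # sqrt(100 / 4) = 5.0
```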

This line does the work of posting the RMSE to MLflow:

`mlflow.log_metric("RMSE", rmse)`

If you rerun the cell below multiple times, you will see new runs posted to MLflow, each with a different RMSE!

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import os

os.environ["MLFLOW_AUTODETECT_EXPERIMENT_ID"] = 'true'

with mlflow.start_run(run_name="Delta 1") as run:
  
  # Evaluate our result
  eval = RegressionEvaluator(labelCol="PE", predictionCol="predictedPE", metricName="rmse")
  rmse = eval.evaluate(trainPredictionsDF)

  mlflow.log_metric("RMSE", rmse)
  
  experimentID = run.info.experiment_id
  

displayHTML("""<div>RMSE: {}</div>
               <div>&nbsp;</div>
               <div>Click "Runs" in the upper-right hand corner of the screen to view the results.</div>""".format(rmse))

### Question to Ponder:

Why is `RMSE` changing under our feet, even though we are working with "static" DataFrames?

You might want to discuss this in class.

## Introducing Delta Time Travel

Delta time travel allows you to query an older snapshot of a Delta table.
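
One way to picture this is a toy, append-only commit log in plain Python: a snapshot "as of" a timestamp contains only the rows from commits made at or before that timestamp. This is an illustrative model, not Delta's actual implementation; the name `snapshot_as_of`, the timestamps, and the row values are all hypothetical.

```python
from datetime import datetime

# Toy commit log: (commit timestamp, rows appended by that commit).
commits = [
    (datetime(2019, 1, 1, 12, 0, 0), [1, 2]),
    (datetime(2019, 1, 1, 12, 0, 3), [3]),
    (datetime(2019, 1, 1, 12, 0, 6), [4, 5, 6]),
]

def snapshot_as_of(log, timestamp):
    """Collect the rows from every commit made at or before the given timestamp."""
    rows = []
    for committed_at, appended in log:
        if committed_at <= timestamp:
            rows.extend(appended)
    return rows

older = snapshot_as_of(commits, datetime(2019, 1, 1, 12, 0, 4))
latest = snapshot_as_of(commits, datetime(2019, 1, 1, 12, 0, 6))
print(older)   # [1, 2, 3]
print(latest)  # [1, 2, 3, 4, 5, 6]
```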

At the beginning of this lesson, we timestamped our stream with:

`.option("timestampAsOf", now)` 

Where `now` is the current timestamp. 

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> It must be a string that can be cast to a timestamp.

Let's wind back to a version of our table from several hours ago and fit our model to that version.

Maybe some pattern we were looking at became apparent for the first time a few hours ago.

https://docs.databricks.com/delta/delta-batch.html#deltatimetravel

This query shows the timestamps of the Delta writes as they were happening.

In [29]:
display(spark.sql("SELECT timestamp FROM (DESCRIBE HISTORY {}) ORDER BY timestamp".format(powerTable)))

Let's rewind back to almost the beginning (where we had just a handful of rows), let's say the 5th write.

Maybe we started noticing a pattern at this point.
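
Picking "the 5th write" amounts to sorting the history timestamps, taking the first five, and keeping the last of those. The plain-Python sketch below mirrors that logic with hypothetical timestamp strings and a hypothetical helper `nth_write`. Note that when fewer than five writes exist, this approach returns the most recent one instead of failing.

```python
def nth_write(timestamps, n):
    """Return the n-th earliest timestamp, or the latest one if fewer than n exist."""
    ordered = sorted(timestamps)
    return ordered[:n][-1]

# Hypothetical, unordered write timestamps, for illustration only.
history = ["12:00:09", "12:00:00", "12:00:03", "12:00:12", "12:00:06", "12:00:15"]

fifth = nth_write(history, 5)
print(fifth)                      # 12:00:12
print(nth_write(history[:3], 5))  # only three writes exist, so the latest: 12:00:09
```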

In [31]:
# Pick out 5th write
oldTimestamp = spark.sql("SELECT timestamp FROM (DESCRIBE HISTORY {}) ORDER BY timestamp".format(powerTable)).take(5)[-1].timestamp

# Re-build the DataFrame as it was in the 5th write
rewoundDF = spark.sql("SELECT * FROM {} TIMESTAMP AS OF '{}'".format(powerTable, oldTimestamp))

We had this many (few) rows back then.

In [33]:
rewoundDF.count()

In [34]:
display(rewoundDF)

### Train Model Based on Data from a Few Hours Ago

* Use `rewoundDF`
* Write to MLflow

Notice the only change from what we did earlier is the use of `rewoundDF`

In [36]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Split the rewound DataFrame into test/train sets
(trainDF, testDF) = rewoundDF.randomSplit([0.80, 0.20], seed=42)

# Set which columns are features
assembler = VectorAssembler(inputCols=["AT", "AP", "RH", "V"], outputCol="features")

# Reshape the train set
trainVecDF = assembler.transform(trainDF)

# Set which column is the label
lr = LinearRegression(labelCol="PE", featuresCol="features")

# Fit training data
lrModel = lr.fit(trainVecDF)

# Append predicted PE column, rename it
trainPredictionsDF = (lrModel
  .transform(trainVecDF)
  .withColumnRenamed("prediction", "predictedPE")
)

In [37]:
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow

with mlflow.start_run(run_name="Delta 2") as run:
  
  # Evaluate our result
  eval = RegressionEvaluator(labelCol="PE", predictionCol="predictedPE", metricName="rmse")
  rmse = eval.evaluate(trainPredictionsDF)

  # Log the results with MLflow
  mlflow.log_metric("RMSE", rmse)
  
# Display some results below
displayHTML("""<div>RMSE: {}</div>
               <div>&nbsp;</div>
               <div>Click "Runs" in the upper-right hand corner of the screen to view the results.</div>""".format(rmse))

### Evaluate Using Test Set

0. Reshape data via `assembler.transform()`
0. Apply linear regression model 
0. Record metrics in MLflow

In [39]:
from pyspark.ml import Pipeline

# We will use the new spark.ml pipeline API. If you have worked with scikit-learn this will be very familiar.
lrPipeline = Pipeline(stages=[assembler, lr])

# Pipelines are themselves Estimators -- so to use them we call fit:
lrModel = lrPipeline.fit(trainDF)

In [40]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics
import mlflow

with mlflow.start_run(run_name="Delta 3") as run:
  justPredictionAndLabelDF = lrModel.transform(testDF).select("prediction", "PE")
  metrics = RegressionMetrics(justPredictionAndLabelDF.rdd.map(lambda r: (r.prediction, r.PE)))
  rmse = metrics.rootMeanSquaredError

  # Log the results with MLflow
  mlflow.log_metric("RMSE", rmse)
  
# Display some results below
displayHTML("""<div>RMSE: {}</div>
               <div>&nbsp;</div>
               <div>Click "Runs" in the upper-right hand corner of the screen to view the results.</div>""".format(rmse))

### Final Model

The stats from the test set are pretty good so we've done a decent job coming up with the model.

In [42]:
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
lrPipeline = Pipeline()

weights = lrModel.stages[1].coefficients
intercept = lrModel.stages[1].intercept

In [43]:
# {:+} prints each coefficient with its actual sign instead of assuming one
print("The equation that describes the relationship between AT, AP, RH, V and PE is:\nPE = {} {:+} * AT {:+} * AP {:+} * RH {:+} * V"
      .format(intercept, weights[0], weights[1], weights[2], weights[3]))

We are pretty happy with the model we developed.

Let's save the model.

In [45]:
fileName = experimentPath + "/model"
lrModel.write().overwrite().save(fileName)

## Make Real-Time Predictions Using the Data from the Stream

Let's apply the model we saved to the rest of the streaming data!

In [47]:
from pyspark.ml import PipelineModel
lrPredModel = PipelineModel.load(fileName)

Time to make some predictions!!

In [49]:
stream = (lrPredModel
          .transform(initialDF)
          .withColumnRenamed("prediction", "PredictedPE"))

display(stream.select("AT", "AP", "V", "RH", "PE", "PredictedPE"))

## Clean Up

Stop all remaining streams.

In [52]:
for s in spark.streams.active:
  s.stop()

Drop the table we are using.

In [54]:
spark.sql("DROP TABLE IF EXISTS {}".format(powerTable))

Delete directories we were writing to.

In [56]:
dbutils.fs.rm(writePath, True)
dbutils.fs.rm(checkpointPath, True)

In [57]:
%run "./Includes/Classroom-Cleanup-08"


&copy; 2019 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>