Commit

Adds comments and readme file
mdymczyk committed Feb 4, 2017
1 parent 6faa001 commit 71f676b
Showing 4 changed files with 78 additions and 14 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -1 +1,3 @@
# iot-pipeline

Source code used to build the autoencoder IoT prediction model, together with a predictor class that uses that model, for the Big Data Analytics Tokyo 2017 conference.
40 changes: 35 additions & 5 deletions model-builder/iot_autoenc.R
@@ -1,6 +1,19 @@
library(h2o)
# Starts a local H2O node
h2o.init()

#
# Function which "windows" the input matrix i.e. for matrix:
# 1 2 3
# 4 5 6
# 7 8 9
# 10 11 12
#
# And window 3 the result would be:
# 1 2 3 4 5 6 7 8 9
# 4 5 6 7 8 9 10 11 12
#
# It appends the following window-1 rows to each row, creating a matrix of size (rows-window+1) x (columns*window)
ngram <- function(inp, window){
rows <- dim(inp)[1]
cols <- dim(inp)[2]
@@ -21,45 +34,62 @@ ngram <- function(inp, window){
return(res)
}
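# A rough sketch of the windowing described above (illustrative only, not necessarily
# the committed ngram implementation; ngram_sketch is a hypothetical name):
ngram_sketch <- function(inp, window) {
  rows <- dim(inp)[1]
  cols <- dim(inp)[2]
  res <- matrix(0, nrow = rows - window + 1, ncol = cols * window)
  for (i in 1:(rows - window + 1)) {
    # Flatten rows i .. i+window-1 row-by-row into one long row
    res[i, ] <- as.vector(t(inp[i:(i + window - 1), ]))
  }
  return(res)
}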

# Read training data into memory
iotRaw <- read.csv("resources/normal_20170202_2229.csv")

# Select training columns
iot <- as.matrix(iotRaw[,c("LinAccX..g.","LinAccY..g.","LinAccZ..g.")])

# Set the training window size and apply the ngram windowing
window <- 200
iot <- ngram(iot, window)

# Send the data to H2O
iot.hex <- as.h2o(iot)

# Run the deeplearning model in autoencoder mode
neurons <- 50
iot.dl = h2o.deeplearning(model_id = "iot_dl", x = 1:(ncol(iot)), training_frame = iot.hex, autoencoder = TRUE, hidden = c(neurons), epochs = 100,
l1 = 1e-5, l2 = 1e-5, max_w2 = 10, activation = "TanhWithDropout", initial_weight_distribution = "UniformAdaptive", adaptive_rate = TRUE)

# Compute the reconstruction error (anomaly score) for the training data
iot_error <- h2o.anomaly(iot.dl, iot.hex)
avg_iot_error <- sum(iot_error)/nrow(iot_error)
print(avg_iot_error)

# Set the prediction threshold to 2*sd of the training error; in practice it should be tuned empirically on live data
threshold <- sd(iot_error)*2
print(nrow(iot_error[iot_error > threshold])/nrow(iot.hex))
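# The printed fraction is the share of normal training windows that already exceed the threshold,
# i.e. the false-positive rate implied by choosing 2*sd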

# If required, check the model on anomaly data
#anomalyRaw <- read.csv("resources/pre-fail_20170202_2234.csv")
#anomaly <- as.matrix(anomalyRaw[,c("LinAccX..g.","LinAccY..g.","LinAccZ..g.")])
#anomaly <- ngram(anomaly, window)
#anomaly.hex <- as.h2o(anomaly)
#anomaly_error <- h2o.anomaly(iot.dl, anomaly.hex)
#print(nrow(anomaly_error[anomaly_error > threshold])/nrow(anomaly.hex))

# Check the model on verification data
verifyRaw <- read.csv("resources/verify_20170202_2243.csv")
verify <- as.matrix(verifyRaw[,c("LinAccX..g.","LinAccY..g.","LinAccZ..g.")])
verify <- ngram(verify, window)
verify.hex <- as.h2o(verify)
verify_error <- h2o.anomaly(iot.dl, verify.hex)
print(nrow(verify_error[verify_error > threshold])/nrow(verify.hex))

# Exports the H2O model as a Java class
exportPojo <- function() {
h2o.download_pojo(iot.dl, path="/Users/mateusz/Dev/code/github/iot-pipeline/predictions/src/main/java/")
unlink("/Users/mateusz/Dev/code/github/iot-pipeline/predictions/src/main/java/h2o-genmodel.jar")
cat("threshold=",toString(threshold),file="/Users/mateusz/Dev/code/github/iot-pipeline/predictions/src/main/resources/dl.properties",sep="",append=F)
h2o.download_pojo(iot.dl, path="../predictions/src/main/java/")
# The download also includes the h2o-genmodel.jar utility, which we don't need for autoencoders
unlink("../predictions/src/main/java/h2o-genmodel.jar")
# Write the threshold to a properties file
cat("threshold=",toString(threshold),file="../predictions/src/main/resources/dl.properties",sep="",append=F)
}
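# Note: run exportPojo() after training to regenerate the POJO (class iot_dl) and the
# dl.properties threshold that the Scala Predictor presumably loads at startup.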

errors <- which(as.matrix(verify_error) > threshold, arr.ind=T)[,1]
vals <- rep(list(1),length(errors))

# Plot the results of our predictions on the verification data
par(mfrow=c(3,1))
plot(verify[-c(errors),1], col="chartreuse4", xlab="Time", ylab="LinAccX")
points(x=errors,y=verify[errors,1], col="red")
plot(verify[-c(errors),2], col="chartreuse4", xlab="Time", ylab="LinAccY")
2 changes: 1 addition & 1 deletion predictions/build.sbt
@@ -1,4 +1,4 @@
name := "MyProject"
name := "iot-predictions"
version := "1.0"
scalaVersion := "2.11.8"

48 changes: 40 additions & 8 deletions predictions/src/main/scala/Predictor.scala
@@ -1,3 +1,11 @@
/**
* Usage:
*
* 1) Build from the predictions folder with `sbt assembly`
* 2) Run (on the server with MapR CLI set up) with
* java -jar iot-predictions-assembly-1.0.jar [threshold=0.003|failureRate=0.005|features=test1,test2,test3|timer=100|predictionCacheSize=1000]
*/
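// Example invocation overriding two of the defaults listed above (illustrative values only):
//   java -jar iot-predictions-assembly-1.0.jar threshold=0.003 failureRate=0.005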

import java.util.Properties

import hex.genmodel.GenModel
@@ -11,20 +19,31 @@ import scala.io.Source

object Predictor {

// H2O generated POJO model name
val modelClassName = "iot_dl"

// Topics to read our sensor data from and write our predictions to
val sensorTopic = "/streams/sensor:sensor1"
val predictionTopic = "/streams/sensor:sensor-state-test"
// Kafka producer
val producer: KafkaProducer[String, Int] = makeProducer()

// Current state of the machine
var state = 0

// Prediction configurations
// Prediction window size, has to be the same as the window size used for model training
var window: Int = 200
// How many past predictions we keep
var predictionCacheSize: Int = window*5
var failureRate: Double = 1.0/(5.0*predictionCacheSize.toDouble)
// What fraction of cached predictions must be failures before we treat it as an actual failure.
var failureRate: Double = 5.0/predictionCacheSize.toDouble
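// e.g. with the defaults above: predictionCacheSize = 5*200 = 1000 and failureRate = 5/1000 = 0.005,
// so once the cache is full, 5 or more failure labels among the last 1000 predictions set the state to failed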

// Settable
// Feature names
var headers = Array("LinAccX..g.","LinAccY..g.","LinAccZ..g.")
// Number of features per observation
var features: Int = headers.length
// How often we send status updates to the system
var timer: Double = window * 10

var threshold: Double = {
@@ -35,6 +54,7 @@ object Predictor {
t
}

// Create a Kafka producer
private def makeProducer() = {
val props = new Properties()

@@ -45,6 +65,7 @@ }
}

def main(args: Array[String]): Unit = {
// Parse command line config
for(arg <- args) {
val kv = arg.split("=")
if(kv(0) == "threshold") {
@@ -67,7 +88,7 @@ }
}
}

// Sender
// Sender thread that pushes data to the Kafka queue
new Thread() {
override def run(): Unit = {
while(true) {
@@ -77,6 +98,7 @@ }
}
}.start()

// Kafka consumer reading sensor data from the queue
val consumer = new MapRStreamsConsumerFacade(sensorTopic)
try {
consumer.prepareSetup()
@@ -105,11 +127,15 @@ object Predictor {
import scala.collection.JavaConversions._

def poll(consumer: MapRStreamsConsumerFacade): Unit = {
val fullWindow = features * window
val inputRB: RingBuffer[Double] = new RingBuffer(fullWindow)
val fullWindowFeatures = features * window

// Data used for predictions
val inputRB: RingBuffer[Double] = new RingBuffer(fullWindowFeatures)

// Model generated by H2O
val rawModel: GenModel = Class.forName(modelClassName).newInstance().asInstanceOf[GenModel]

// Previous predictions
val rb: RingBuffer[Int] = new RingBuffer(predictionCacheSize)
while(true) {
val commitMap = new mutable.LinkedHashMap[TopicPartition, OffsetAndMetadata]()
@@ -125,16 +151,21 @@ object Predictor {
inputRB.+=(i)
}

if(inputRB.length == fullWindow) {
val preds = Array.fill[Double](fullWindow){0}
// We can only start predicting once we have accumulated a full window of readings
if(inputRB.length == fullWindowFeatures) {
val preds = Array.fill[Double](fullWindowFeatures){0}
// Making a prediction
val pred = rawModel.score0(inputRB.toArray, preds)

val rmse = inputRB.zip(pred).map { case (i, p) => (p - i) * (p - i) }.sum / (fullWindow).toDouble
// Calculating the mean squared error of our prediction
val rmse = inputRB.zip(pred).map { case (i, p) => (p - i) * (p - i) }.sum / (fullWindowFeatures).toDouble

// If the reconstruction error exceeds the threshold we classify the window as a failure (1), otherwise normal (0)
val label = if (rmse > threshold) 1 else 0

rb.+=(label)

// If at least failureRate of the predictions in our cache are failures, set the state to failed
if ((rb.sum.toDouble / rb.length.toDouble) >= failureRate) {
state = 1
} else {
@@ -145,6 +176,7 @@ }
}

if (commitMap.nonEmpty) {
// Commit the offsets of the processed records back to Kafka
consumer.commit(commitMap.toMap[TopicPartition, OffsetAndMetadata])
}
}
