IoT Data Analytics Benchmark

iot-analytics-benchmark

Introduction

IoT Analytics Benchmark simulates data analytics running on a stream of sensor data, for example from factory machines being monitored for impending failure conditions.

IoT Analytics Benchmark consists of 3 components:

  • iotgen - generates synthetic training data files using a simple randomized model. Each row of sensor values is preceded by a label, either 1 or 0, indicating whether that set of values would trigger the failure condition.

  • iottrain - uses the pre-labeled training data to train a Spark machine learning library model

  • iotstream - applies that model to a stream of incoming sensor values (generated by a separate program) using Spark Streaming, indicating when the impending failure conditions need attention.

In the real world, the data that iotgen simulates would be collected over time from actual operating conditions, and iottrain would be a batch job run periodically offline. iotstream is a small program that can run on a single edge gateway in a factory.

Currently only the Spark Logistic Regression model is supported, but we plan to add other machine learning models. Input to Spark Streaming can come from a simple network socket connection or from the Kafka and MQTT message buses. All programs are written in Python or Scala.

Installation

  • Install Spark (1.6.2 and 2.3.0 tested here) either on a single node or in a cluster (Spark Standalone and Spark on YARN tested)

  • Spark single node installation: obtain latest version from http://spark.apache.org/downloads.html and unzip

  • Install Python, including numpy, on all nodes (see below for instructions)

  • Install nc on driver node (yum install nc)

  • For purposes of this documentation, a symbolic link to the Spark code on the driver system is assumed. For example: ln -s /root/spark-2.3.0-bin-hadoop2.7 /root/spark

  • Add spark/bin directory to $PATH

  • Set the log level from INFO to ERROR (suggested for cleaner output, especially from iotstream). In spark/conf: cp log4j.properties.template log4j.properties
    Then, in log4j.properties, set log4j.rootCategory=ERROR, console

  • Clone or download and unzip project

Project Files

File                        Use
calc_cutoffs.py             Utility program to create cutoffs for iotgen (see below)
iotgen_lr.py                Spark program to generate Logistic Regression training data - Python version
iotgen_lr.scala             Spark program to generate Logistic Regression training data - Scala version
iotgen_lr_python.py         Python-only iotgen
sensor_data.csv             Sample training data
iottrain_lr.py              Spark program to train Logistic Regression model - Python version
iottrain_lr.scala           Spark program to train Logistic Regression model - Scala version
iotstream_lr.py             Spark Streaming program to apply Logistic Regression model to input sensor values - Python version
iotstream_lr_kafka.py       Spark Streaming program to apply Logistic Regression model to input sensor values read from Kafka - Python version
iotstream_lr_kafka.scala    Spark Streaming program to apply Logistic Regression model to input sensor values read from Kafka - Scala version
iotstream_lr_mqtt.py        Spark Streaming program to apply Logistic Regression model to input sensor values read from MQTT - Python version
iotstream_lr_mqtt.scala     Spark Streaming program to apply Logistic Regression model to input sensor values read from MQTT - Scala version
sim_sensors_lr.py           Program to generate sensor events for iotstream input - Python version
sim_sensors_lr_kafka.py     Program to generate sensor events to Kafka for iotstream input - Python version
sim_sensors_lr_kafka.scala  Program to generate sensor events to Kafka for iotstream input - Scala version
sim_sensors_lr_mqtt.py      Program to generate sensor events to MQTT for iotstream input - Python version
sim_sensors_lr_mqtt.scala   Program to generate sensor events to MQTT for iotstream input - Scala version
sim_sensor_output.txt       Sample sim_sensors output
build.sbt.package           SBT package build file
build.sbt.assembly          SBT assembly build file
README.md                   This file

Program usage (run any program without parameters to display its usage):

spark-submit iotgen_lr.py n_rows n_sensors n_partitions HDFS_or_S3 HDFS_path_or_S3_bucket filename <cutoff>

where:

Parameter               Use
n_rows                  Number of rows in the created training data
n_sensors               Number of sensors in each row of the training data
n_partitions            Number of Spark partitions to use; n_rows should be evenly divisible by this value
HDFS_or_S3              Where to store training data: HDFS, S3, or local (see note below)
HDFS_path_or_S3_bucket  HDFS path or S3 bucket
filename                Training data file name
cutoff                  Optional; controls the percentage of '1' labels. If omitted, approximately 50% of labels are '1' (see below)

spark-submit iottrain_lr.py HDFS_or_S3 HDFS_path_or_S3_bucket filename modelname

where:

Parameter               Use
HDFS_or_S3              Where to obtain training data
HDFS_path_or_S3_bucket  HDFS path or S3 bucket
filename                Training data file name
modelname               Trained model file name, stored in the same location as the training data

spark-submit iotstream_lr.py n_sensors reporting_interval IP_address_of_stream_source Port_of_stream_source HDFS_or_S3 HDFS_path_or_S3_bucket modelname

where:

Parameter                    Use
n_sensors                    Number of sensors in the system
reporting_interval           Spark Streaming batch interval, in seconds
IP_address_of_stream_source  IP address of stream source
Port_of_stream_source        Port of stream source
HDFS_or_S3                   Where to obtain the trained model
HDFS_path_or_S3_bucket       HDFS path or S3 bucket
modelname                    Trained model file name

python sim_sensors_lr.py n_sensors average_sensor_events_per_second total_events

where:

Parameter                         Use
n_sensors                         Number of sensors in the system
average_sensor_events_per_second  Average number of sensor events generated per second
total_events                      Program stops after this number of events has been generated

sim_sensors_lr.py generates sensor events and writes them to standard output. This output is usually piped through nc (ncat) to create a socket on a specified port for iotstream to read from (see examples below).
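The generator's behavior can be approximated in a few lines of Python. This is a sketch, not sim_sensors_lr.py. It assumes that events arrive as a Poisson process (exponentially distributed gaps with mean 1/average_sensor_events_per_second), that sensors are picked uniformly at random and numbered from 0, and that values are uniform in [0, 1). It computes timestamps without sleeping, so it runs instantly rather than in real time. Output lines use the comma-separated event format described below; `simulate_events` is a hypothetical name.

```python
import random
from datetime import datetime, timedelta, timezone

def simulate_events(n_sensors, events_per_second, total_events, start, rng):
    """Yield total_events event lines with exponentially distributed time gaps.

    Assumptions: Poisson arrivals at the average rate, sensors chosen uniformly
    at random and numbered 0..n_sensors-1, values uniform in [0, 1).
    """
    t = start
    for _ in range(total_events):
        t += timedelta(seconds=rng.expovariate(events_per_second))  # mean gap 1/rate
        sensor = rng.randrange(n_sensors)
        # yyyy-mm-ddThh:mm:ss.mmmZ, with microseconds truncated to milliseconds
        stamp = t.strftime("%Y-%m-%dT%H:%M:%S.") + "%03dZ" % (t.microsecond // 1000)
        yield "%s,%d,Sensor %d,%f" % (stamp, sensor, sensor, rng.random())

if __name__ == "__main__":
    # 5 events from 100 sensors at an average of 100 events/second
    for line in simulate_events(100, 100, 5, datetime.now(timezone.utc), random.Random()):
        print(line)
```

A real generator would also sleep until each event's timestamp so that the output rate matches average_sensor_events_per_second.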

The generated sensor events are comma separated lines with the following format:

yyyy-mm-ddThh:mm:ss.mmmZ,Sensor number,Sensor name,Sensor value

where Z indicates the UTC time zone.

For example:

2018-01-13T21:37:30.830Z,77,Sensor 77,0.474536
2018-01-13T21:37:30.923Z,82,Sensor 82,0.184921
2018-01-13T21:37:31.031Z,12,Sensor 12,0.730813
2018-01-13T21:37:31.130Z,89,Sensor 89,0.512667
2018-01-13T21:37:31.224Z,14,Sensor 14,0.636248
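A consumer of this stream has to split each line into its four fields. The Python sketch below shows one way to do that with the standard library; `parse_event` is a hypothetical helper, not code from iotstream_lr.py, which reads the stream through Spark Streaming.

```python
from datetime import datetime, timezone

def parse_event(line):
    """Split one event line into (UTC timestamp, sensor number, sensor name, value)."""
    stamp, number, name, value = line.strip().split(",")
    # %f accepts the 3-digit millisecond field; the trailing Z is matched literally
    ts = datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    return ts, int(number), name, float(value)

ts, sensor, name, value = parse_event("2018-01-13T21:37:30.830Z,77,Sensor 77,0.474536")
print(ts.isoformat(), sensor, name, value)
# 2018-01-13T21:37:30.830000+00:00 77 Sensor 77 0.474536
```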

Note: Single-node Spark installations without HDFS can write and read files to and from local disk by specifying "local" for the HDFS_or_S3 parameter.

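On a multi-node Spark Standalone cluster, use --master local[#cores] when working with local files, so that Spark runs in local mode on a single node and every task sees the same local files.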
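The paths printed in the examples below suggest how the three storage choices map to file locations: local sd sensor_data1k_100 becomes sd/sensor_data1k_100, HDFS sd ... resolves relative to the user's HDFS home directory, and S3 davejaffedata sensor_data4k_30 becomes s3a://davejaffedata/sensor_data4k_30. The Python sketch below captures that mapping; `resolve_path` is a hypothetical helper inferred from the example output, not code from the repository.

```python
def resolve_path(storage, location, filename):
    """Map HDFS_or_S3, HDFS_path_or_S3_bucket and filename to a Spark path (hypothetical)."""
    kind = storage.lower()  # the examples pass both "S3" and "s3"
    if kind == "s3":
        # S3 locations are bucket names, accessed through the s3a connector
        return "s3a://%s/%s" % (location, filename)
    if kind in ("hdfs", "local"):
        # A relative path resolves against the HDFS home directory (HDFS)
        # or the current working directory (local)
        return "%s/%s" % (location, filename)
    raise ValueError("HDFS_or_S3 must be HDFS, S3 or local, got %r" % storage)

print(resolve_path("local", "sd", "sensor_data1k_100"))          # sd/sensor_data1k_100
print(resolve_path("S3", "davejaffedata", "sensor_data4k_30"))   # s3a://davejaffedata/sensor_data4k_30
```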

Examples

Spark with single node, no HDFS, local disk:

spark-submit iotgen_lr.py 1000 100 8 local sd sensor_data1k_100
2018-03-01T17:12:03Z: Creating file sd/sensor_data1k_100 with 1000 rows of 100 sensors, each row preceded by score using cutoff 2525.0, in 8 partitions
2018-03-01T17:12:06Z: Created file sd/sensor_data1k_100 with size 789.1KB

spark-submit iottrain_lr.py local sd sensor_data1k_100 lr100
2018-03-01T17:15:29Z: Training logistic regression model and storing as sd/lr100 using data from sd/sensor_data1k_100
2018-03-01T17:15:37Z: Trained logistic regression model and stored as sd/lr100

In one shell:
       
python sim_sensors_lr.py 100 100 6000 | nc -lk 20000
    
In a 2nd shell on the same or different server:
    
spark-submit iotstream_lr.py 100 1 192.168.1.2 20000 local sd lr100
2018-03-01T17:23:33Z: Analyzing stream of input from host 192.168.1.2 on port 20000 using LR model sd/lr100, with 1.0 second intervals
2018-03-01T17:23:39.408Z: Interval 1: Everything is OK (104 sensor events in interval)
...
2018-03-01T17:23:46.124Z: Interval 8: Attention needed (99 sensor events in interval)
...
2018-03-01T17:24:40Z: 6000 events received in 61.1 seconds (60 intervals), or 98 sensor events/second

Explanation:

  • iotgen_lr.py creates 1000 rows with 100 sensors each, in 8 partitions, with approximately 50% '1' labels, and stores them in local directory sd/sensor_data1k_100
  • iottrain_lr.py trains model sd/lr100 using that data
  • sim_sensors_lr.py generates 100 sensor events per second ranging over 100 sensors, for 60 seconds (6000 events). Its output is piped through nc, which opens a socket on port 20000, listening (-l) for connections and keeping it open (-k)
  • iotstream_lr.py connects to the socket at IP 192.168.1.2 port 20000, batches the inputs in 1-second intervals, combines the inputs for each interval into a 100-element vector of sensor values, and runs that vector through Logistic Regression
    model lr100, printing whether the input is OK or attention is needed. If a particular sensor is not read during the reporting interval, the sensor's current value is used unchanged.
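The per-interval step described above can be sketched in plain Python, without Spark. The assumptions here are not confirmed by the source: sensor numbers are used directly as 0-based vector indices, every sensor starts at 0.0, and when a sensor reports more than once in an interval its last reading wins. `interval_vector` is a hypothetical name.

```python
def interval_vector(current, events):
    """Fold one interval's (sensor, value) events into the running sensor vector.

    `current` holds the latest value per sensor; sensors with no event in this
    interval keep their previous value unchanged.
    """
    updated = list(current)  # copy, so the caller's vector is not modified
    for sensor, value in events:  # events in arrival order
        updated[sensor] = value  # assumption: a later reading overwrites an earlier one
    return updated

state = [0.0] * 5  # assumption: 5 sensors, all starting at 0.0
state = interval_vector(state, [(1, 0.4), (3, 0.9), (1, 0.6)])
print(state)  # [0.0, 0.6, 0.0, 0.9, 0.0]
state = interval_vector(state, [(0, 0.2)])
print(state)  # [0.2, 0.6, 0.0, 0.9, 0.0]
```

Each resulting vector is what would then be passed to the Logistic Regression model for that interval.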

Notes:

  • In local mode, data is created in separate partition files, as in HDFS. To create a single file:
    cat sd/sensor_data1k_100/part* > sensor_data1k_100_merged
    wc -l sensor_data1k_100_merged
    1000 sensor_data1k_100_merged

  • Redirect iotstream_lr.py stderr output to a file to suppress irrelevant error messages: spark-submit iotstream_lr.py 100 1 192.168.1.2 20000 local sd lr100 2> err.out

  • In iotgen_lr.py, n_rows is adjusted upward if it is not evenly divisible by n_partitions
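The note above says only that n_rows is adjusted upward. A natural reading, assumed here and not checked against iotgen_lr.py, is that every partition receives the same number of rows, so the total is rounded up to the next multiple of n_partitions. `adjusted_rows` is a hypothetical helper.

```python
def adjusted_rows(n_rows, n_partitions):
    """Return (total rows, rows per partition), rounding n_rows up to a multiple of n_partitions."""
    rows_per_partition = -(-n_rows // n_partitions)  # integer ceiling division
    return rows_per_partition * n_partitions, rows_per_partition

print(adjusted_rows(1000, 8))        # (1000, 125): already divisible, unchanged
print(adjusted_rows(1000, 3))        # (1002, 334): rounded up
print(adjusted_rows(8000000, 800))   # (8000000, 10000)
```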

Spark 1.6.2 with HDFS:

spark-submit --num-executors 100 --executor-cores 1 --executor-memory 10g --conf spark.yarn.executor.memoryOverhead=3072 iotgen_lr.py 1000000 1000 500 HDFS sd sensor_data1M_1000
   
spark-submit --num-executors=20 --executor-cores=5 --executor-memory=50g iottrain_lr.py HDFS sd sensor_data1M_1000 lr_model1_1000
    
In one shell:   
  python sim_sensors_lr.py 1000 1000 40000 | nc -lk 20000
      
In a 2nd shell on the same or different servers: 
  spark-submit --num-executors=20 --executor-cores=5 --executor-memory=50g iotstream_lr.py 1000 1 192.168.1.1 20000 HDFS sd lr_model1_1000

Explanation:

  • iotgen_lr.py creates 1M rows with 1000 sensors each, in 500 partitions (5 per executor), with approximately 50% '1' labels, and stores them in HDFS as /user/<username>/sd/sensor_data1M_1000
  • iottrain_lr.py trains model lr_model1_1000 using that data and stores in same HDFS directory
  • sim_sensors_lr.py generates 1000 sensor events per second ranging over 1000 sensors, for 40 seconds (40000 events). Its output is piped through nc, which opens a socket on port 20000, listening (-l) for connections and keeping it open (-k)
  • iotstream_lr.py connects to the socket at IP 192.168.1.1 port 20000, batches the inputs in 1-second intervals, combines the inputs for each interval into a 1000-element vector of sensor values, and runs that vector through Logistic Regression
    model lr_model1_1000, printing whether the input is OK or attention is needed. If a particular sensor is not read during the reporting interval, the sensor's current value is used unchanged.

Spark Standalone on Spark 2.3.0:

For S3 add the following lines to spark/conf/spark-defaults.conf on the master:

spark.hadoop.fs.s3a.access.key  <access key>
spark.hadoop.fs.s3a.secret.key  <secret key>

Then download the following two jars and copy to spark/jars on all nodes:

wget http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar

spark-submit --master spark://bd-s01-n2.localdomain:7077 --conf spark.executor.cores=1 --executor-memory 4g iotgen_lr.py 4000 100 80 S3 davejaffedata sensor_data4k_30 2613
2018-03-01T20:50:58Z: Creating file s3a://davejaffedata/sensor_data4k_30 with 4000 rows of 100 sensors, each row preceded by score using cutoff 2613.0, in 80 partitions
2018-03-01T20:54:26Z: Created file s3a://davejaffedata/sensor_data4k_30 with size 3.1MB

spark-submit --master spark://bd-s01-n2.localdomain:7077 --conf spark.executor.cores=2 --executor-memory 20g  iottrain_lr.py S3 davejaffedata sensor_data4k_30 lr_model1_100_30
2018-03-01T20:56:00Z: Training logistic regression model and storing as s3a://davejaffedata/lr_model1_100_30 using data from s3a://davejaffedata/sensor_data4k_30
2018-03-01T20:58:13Z: Trained logistic regression model and stored as s3a://davejaffedata/lr_model1_100_30

In one shell: 
  python sim_sensors_lr.py 100 100 6000 | nc -lk 20000
In a 2nd shell on the same or different servers: 
  spark-submit --master spark://bd-s01-n2.localdomain:7077 --conf spark.executor.cores=4 --executor-memory 40g iotstream_lr.py 100 1 192.168.1.2 20000 S3 davejaffedata lr_model1_100_30
2018-03-01T21:07:05Z: Analyzing stream of input from host 192.168.1.2 on port 20000 using LR model s3a://davejaffedata/lr_model1_100_30, with 1.0 second intervals
2018-03-01T21:07:24.927Z: Interval 1: Everything is OK (103 sensor events in interval)
...
2018-03-01T21:07:35.161Z: Interval 12: Attention needed (99 sensor events in interval)
...
2018-03-01T21:08:12Z: 6000 events received in 48.3 seconds (47 intervals), or 124 sensor events/second

Compiling Scala code

  • Install Scala (2.10.6 and 2.11.8 tested)

  • Edit build.sbt to set the correct versions of Scala and Spark

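  • To create a package whose dependencies are supplied externally (for example through spark.jars.packages, as in the Kafka and MQTT sections below):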

cd scala
sbt package

Creates iotstream_2.11-0.0.1.jar

  • To create an assembly that can be used standalone:

cd scala
cp build.sbt.assembly build.sbt
sbt assembly

Creates iotstream-assembly-0.0.1.jar

Kafka/Scala version

Add to spark-defaults.conf:

spark.jars.packages      org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0

(If spark.jars.packages already lists other packages, for example for S3, append this one to that comma-separated list.)

Obtain latest Kafka build from http://kafka.apache.org

Install Kafka and start it using the included ZooKeeper:

tar -xzvf kafka_2.11-1.0.0.tgz
ln -s kafka_2.11-1.0.0 kafka
cd kafka
bin/zookeeper-server-start.sh config/zookeeper.properties & # Defaults to port 2181
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic perf3
Created topic "perf3".

In directory containing iotstream_2.11-0.0.1.jar, run programs:

spark-submit --master spark://bd-s01-n2.localdomain:7077 --conf spark.cores.max=40 --conf spark.executor.cores=4 --executor-memory 40g --name iotgen_lr --class com.iotstream.iotgen_lr iotstream_2.11-0.0.1.jar 1000 100000 200 S3 davejaffedata sensor_data1K_100K_40 2501427837
2018-02-20T05:45:00.916Z: Creating file s3a://davejaffedata/sensor_data1K_100K_40 with 1000 rows of 100000 sensors, each row preceded by score using cutoff 2501427837.0, in 200 partitions
2018-02-20T05:45:25.018Z: Created file s3a://davejaffedata/sensor_data1K_100K_40 with size 762.9MB

spark-submit --master spark://bd-s01-n2.localdomain:7077 --name iottrain_lr --class com.iotstream.iottrain_lr iotstream_2.11-0.0.1.jar s3 davejaffedata sensor_data1K_100K_40 lr100K_40
2018-02-20T05:47:40.459Z: Training logistic regression model and storing as s3a://davejaffedata/lr100K_40 using data from s3a://davejaffedata/sensor_data1K_100K_40
2018-02-20T05:48:15.295Z: Trained logistic regression model and stored as s3a://davejaffedata/lr100K_40

spark-submit --name iotstream_lr_kafka --class com.iotstream.iotstream_lr_kafka iotstream_2.11-0.0.1.jar 100000 1 localhost:9092 perf3 s3 davejaffedata lr100K_40
2018-02-20T06:04:58.022Z: Analyzing stream of input from kafka topic perf3 with kafka server(s) localhost:9092, using LR model s3a://davejaffedata/lr100K_40, with 1 second intervals
No input  
No input  <start sim_sensors (next command)>
...
2018-02-20T06:05:21.320Z: Interval 0: Everything is OK (20270 sensor events in interval)
2018-02-20T06:05:22.608Z: Interval 1: Everything is OK (96175 sensor events in interval)
2018-02-20T06:05:23.559Z: Interval 2: Everything is OK (104938 sensor events in interval)
...
2018-02-20T06:05:42.575Z: Interval 21: Attention needed (105654 sensor events in interval)
...
2018-02-20T06:06:58.380Z: 10000000 events received in 96.4 seconds (96 intervals), or 103764 sensor events/second

java -cp iotstream_2.11-0.0.1.jar:/root/kafka/libs/* com.iotstream.sim_sensors_lr_kafka 100000 200000 10000000 192.168.1.2:9092 perf3
2018-02-20T06:05:20.177Z: Sending 200000 sensor events per second representing 100000 sensors for a total of 10000000 events to Kafka topic perf3 using Kafka server(s) 192.168.1.2:9092

2018-02-20T06:05:22.797Z: 200000 events sent
2018-02-20T06:05:24.685Z: 400000 events sent
...
2018-02-20T06:06:55.701Z: 10000000 events sent
2018-02-20T06:06:55.705Z: Sent 10000000 events to Kafka topic perf3 in 95.248927069 seconds

To run offline, comment out the spark.jars.packages line in spark-defaults.conf and use iotstream-assembly-0.0.1.jar (available from the author) in place of iotstream_2.11-0.0.1.jar above, with local or HDFS storage.

Note: to use Kafka's log4j configuration file, add a Java system property definition (-D) to the last command, e.g.:

java -cp iotstream-assembly-0.0.1.jar -Dlog4j.configuration=file:/root/kafka/config/tools-log4j.properties com.iotstream.sim_sensors_lr_kafka 100000 200000 10000000 perf3

MQTT version

Add to spark-defaults.conf:

spark.jars.packages      org.apache.bahir:spark-streaming-mqtt_2.11:2.2.0

(If spark.jars.packages already lists other packages, for example for S3, append this one to that comma-separated list.)

Install and start the Mosquitto MQTT broker, and install the Python MQTT client packages:

yum install mosquitto
pip install mqtt
pip install paho-mqtt
mosquitto
1524237101: mosquitto version 1.4.15 (build date 2018-03-17 10:23:28+0000) starting
1524237101: Using default config.
1524237101: Opening ipv4 listen socket on port 1883.
1524237101: Opening ipv6 listen socket on port 1883.

In directory containing iotstream_2.11-0.0.1.jar, run programs:

spark-submit --name iotstream_lr_mqtt --class com.iotstream.iotstream_lr_mqtt iotstream_2.11-0.0.1.jar 1000 1 localhost 1883 t1 local sd lr1K

scala -cp iotstream_2.11-0.0.1.jar:/root/org.eclipse.paho.client.mqttv3-1.2.0.jar com.iotstream.sim_sensors_lr_mqtt 1000 1000 10000 localhost 1883 t1

Miscellaneous Notes

Sensor value random model and cutoff

The sensor value model is a simple one where each value is a uniform random number between 0 and 1, representing scaled actual sensor values. To calculate the label, each value in the sensor vector is weighted by its 1-based index (1 to n_sensors) and the weighted values are summed; the resulting score for the vector is then compared to the cutoff value to determine the label. Scores greater than the cutoff are labeled '1', meaning that the particular sensor value vector is found to cause the failure condition.
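A minimal pure-Python sketch of this labeling rule follows. It uses 1-based weights, which is what makes the average-score formula below come out right. It is not the code of iotgen_lr.py or iotgen_lr_python.py, and it does not show how rows are written to disk; `score` and `make_row` are hypothetical names.

```python
import random

def score(values):
    """Weight each sensor value by its 1-based index and sum."""
    return sum(i * v for i, v in enumerate(values, start=1))

def make_row(n_sensors, cutoff, rng):
    """Return (label, values): label is 1 when the score exceeds the cutoff."""
    values = [rng.random() for _ in range(n_sensors)]  # uniform in [0, 1)
    return (1 if score(values) > cutoff else 0), values

rng = random.Random(42)
n_sensors = 100
cutoff = 0.25 * n_sensors * (n_sensors + 1)  # 2525.0, the default cutoff
labels = [make_row(n_sensors, cutoff, rng)[0] for _ in range(10000)]
print(sum(labels) / len(labels))  # close to 0.5
```

Because each weighted term i * v is symmetric about its mean, the score is symmetric about the average score, which is why the default cutoff yields roughly half '1' labels.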

The average score for this model is 0.25 * n_sensors * (n_sensors + 1) (each value averages 0.5, and the weights 1 through n_sensors sum to n_sensors * (n_sensors + 1) / 2), so the default cutoff is set to this value, resulting in approximately 50% '1' labels. For a smaller percentage of '1' labels (resulting in fewer "Attention needed" warnings in the iotstream output), select a larger cutoff for the given number of sensors from the table below:

Cutoff giving the desired percentage of '1' labels for a given number of sensors (use calc_cutoffs.py to calculate values for this model):

% '1' labels  100 sensors  500 sensors  1000 sensors  10000 sensors  100000 sensors
1             2912         66951        262530        25395100       2512426299
2             2868         66442        261040        25348000       2510579784
10            2741         65015        256970        25215000       2506932260
20            2667         64184        254670        25142500       2504623988
30            2613         63593        253000        25089700       2502877073
40            2567         63090        251570        25043400       2501427837
50            2525         62625        250250        25002500       2500025000

The utility program calc_cutoffs.py can be used to calculate other values for this model.
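calc_cutoffs.py is the tool to use for this. As an independent cross-check, and not necessarily the method calc_cutoffs.py uses, the score is a sum of many independent terms, so a normal approximation works well: the mean is 0.25 * n * (n + 1), and since each uniform value has variance 1/12 and the squared weights sum to n(n + 1)(2n + 1)/6, the variance is n(n + 1)(2n + 1)/72. The sketch below (Python 3.8+ for `statistics.NormalDist`) closely matches the table above; `approx_cutoff` is a hypothetical name.

```python
from statistics import NormalDist

def approx_cutoff(n_sensors, pct_ones):
    """Normal approximation to the cutoff that labels about pct_ones % of rows '1'."""
    n = n_sensors
    mean = 0.25 * n * (n + 1)                # 0.5 * (1 + 2 + ... + n)
    var = n * (n + 1) * (2 * n + 1) / 72.0   # (1^2 + ... + n^2) / 12
    z = NormalDist().inv_cdf(1 - pct_ones / 100.0)  # upper-tail quantile
    return mean + z * var ** 0.5

for pct in (1, 10, 30, 50):
    print(pct, round(approx_cutoff(100, pct)))  # compare with the 100-sensor column
```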

Here's an example of creating a 1TB training dataset with 10,000 sensors and 20% '1' labels:

spark-submit --num-executors 200 --executor-cores 1 --executor-memory 12g --conf spark.yarn.executor.memoryOverhead=3072 iotgen_lr.py 8000000 10000 800 HDFS sd sensor_data8M_10k_20 25142500

The Logistic Regression Model

Since accuracy is not a requirement here, the Logistic Regression model uses all default parameter values, with no attempt at optimization.
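For reference, a binary logistic regression model classifies a feature vector by applying the logistic (sigmoid) function to a weighted sum of the features plus an intercept, then comparing the resulting probability with a threshold; Spark's default threshold is 0.5. The plain-Python sketch below illustrates that decision for one interval's sensor vector. The weights and intercept are made-up values, not a model produced by iottrain_lr.py, and `predict` is a hypothetical name.

```python
import math

def sigmoid(margin):
    """Numerically stable logistic function."""
    if margin >= 0:
        return 1.0 / (1.0 + math.exp(-margin))
    e = math.exp(margin)  # margin < 0, so exp cannot overflow
    return e / (1.0 + e)

def predict(weights, intercept, features, threshold=0.5):
    """Return (label, probability); label 1 corresponds to 'Attention needed'."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    probability = sigmoid(margin)
    return (1 if probability > threshold else 0), probability

# Made-up 3-sensor model for illustration only
weights, intercept = [0.5, 1.0, 1.5], -1.5
print(predict(weights, intercept, [0.2, 0.6, 0.9])[0])  # 1 (margin 0.55)
print(predict(weights, intercept, [0.1, 0.1, 0.1])[0])  # 0 (margin -1.2)
```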

Installing python numpy on Linux:

yum -y install python-pip
pip install --upgrade pip
pip install numpy

Generating training data without Spark

iotgen_lr_python.py is a Python-only version of iotgen_lr.py

Releases & Major Branches

Master

Contributing

The iot-analytics-benchmark project team welcomes contributions from the community. If you wish to contribute code and you have not signed our contributor license agreement (CLA), our bot will update the issue when you open a Pull Request. For any questions about the CLA process, please refer to our FAQ. For more detailed information, refer to CONTRIBUTING.md. For any questions or suggestions, please contact the author, Dave Jaffe, at djaffe@vmware.com.

License

Copyright (c) 2018 VMware, Inc.

This product is licensed to you under the Apache 2.0 license (the "License"). You may not use this product except in compliance with the Apache 2.0 License.

This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file.