
Define and trigger a service topology

In FogFlow, a service topology is defined as a graph of several operators. Each operator in the service topology is annotated with its inputs and outputs, which indicate its dependencies on the other tasks in the same topology. Unlike fog functions, a service topology is triggered on demand by a customized "intent" object.

The following sections use a simple example to explain how developers can define and test a service topology.

Use case on anomaly detection

This use case study is for retail stores to detect abnormal energy consumption in real-time. As illustrated in the following picture, a retail company has a large number of shops distributed in different locations. For each shop, a Raspberry Pi device (edge node) is deployed to monitor the power consumption from all PowerPanels in the shop. Once an abnormal power usage is detected on the edge, the alarm mechanism in the shop is triggered to inform the shop owner. Moreover, the detected event is reported to the cloud for information aggregation. The aggregated information is then presented to the system operator via a dashboard service. In addition, the system operator can dynamically update the rule for anomaly detection.

figures/retails.png
  • Anomaly Detector: this operator detects anomaly events based on the data collected from the power panels in a retail store. Its granularity is based on shopID, meaning that a dedicated task instance is created and configured for each shop. It has two types of inputs:

    • detection rules, which are provided and updated by the system operator. The input stream type of the detection rules is marked as broadcast, meaning that the rules are needed by all task instances of this operator.
    • sensor data from the power panels
  • Counter: this operator counts the total number of anomaly events for all shops in each city.

    Therefore, its task granularity is by city. Its input stream type is the output stream type of the previous operator (Anomaly Detector).

There are two types of result consumers:

  1. a dashboard service in the cloud, which subscribes to the final aggregation results generated by the counter operator for the global scope;
  2. the alarm in each shop, which subscribes to the anomaly events generated by the Anomaly Detector task on the local edge node in the retail store.
figures/retail-flow.png

Implement the operator functions required by your service topology

Before you can define the designed service topology, all operators used in it must be available in the FogFlow system, provided either by you or by another provider. For this specific use case, we need to implement two operators: anomaly_detector and counter. Please refer to the examples provided in our code repository.
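
To make the division of work between the two operators concrete, here is a minimal sketch of their core logic as plain functions. It is not the operator API from the code repository: the function names, the shape of a reading (`shopID`, `city`, `usage`), and the rule with a single `threshold` field are all assumptions made for illustration.

```javascript
// Hypothetical data shapes (assumptions, not FogFlow's actual schema):
//   reading: { shopID: string, city: string, usage: number }
//   rule:    { threshold: number }

// Core idea of the anomaly detector: compare one power reading
// against the current rule and emit an anomaly event if it is exceeded.
function detectAnomaly(reading, rule) {
  if (reading.usage > rule.threshold) {
    return {
      type: 'Anomaly',
      shopID: reading.shopID,
      city: reading.city,
      usage: reading.usage
    };
  }
  return null; // normal reading, nothing to report
}

// Core idea of the counter: aggregate anomaly events per city.
function countByCity(anomalies) {
  const counts = {};
  for (const event of anomalies) {
    counts[event.city] = (counts[event.city] || 0) + 1;
  }
  return counts;
}
```

In a real deployment the detector would run once per shop on the edge node and the counter once per city, with FogFlow routing the detector's output to the counter.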

Specify a service topology

Assuming that the operators used in your service topology have been implemented and registered, there are two ways to specify your service topology.

Using the FogFlow Topology Editor

The first way is to use the FogFlow editor to specify a service topology.

figures/retail-topology-1.png

As seen in the picture, the following important information must be provided.

  1. define topology profile, including
    • topology name: the unique name of your topology
    • service description: some text to describe what this service is about
  2. draw the graph of data processing flows within the service topology

    Right-click anywhere on the design board to open a pop-up menu, from which you can choose a task, an input stream, or a shuffle element to define your data processing flows according to your design.

  3. define the profile for each element in the data flow

    As shown in the above picture, you can specify the profile of each element in the data processing flow by clicking its configuration button.

    The following information is required to specify a task profile.

    • name: the name of the task
    • operator: the name of the operator that implements the data processing logic of this task; please register your operator beforehand so that it appears in the list
    • entity type of output streams: the entity type of the produced output stream.

    The following information is required to specify an EntityStream Profile.

    • SelectedType: defines which entity type the task takes as its input stream

    • SelectedAttributes: defines which attribute (or attributes) of the selected entity type are considered for changing the state of a task.

    • Groupby: determines how many instances of this task should be created on the fly; currently the following cases are supported

      • if only one instance is to be created for this task, please use "groupby" = "all"
      • if you need to create one instance for each entity ID of the input streams, please use "groupby" = "entityID"
      • if you need to create one instance for each unique value of some specific context metadata, please use the name of this registered context metadata
    • Scoped: tells whether the entity data are location-specific. True indicates that location-specific data are recorded in the entity; False is used for broadcast data, for example a rule or threshold that holds for all locations rather than for a specific one.

    A Shuffle element serves as a connector between two tasks: the output of one task is the input of the Shuffle element, which forwards it to another task (or tasks) as input.
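
The effect of the Groupby setting can be sketched as a simple grouping of input entities, where each resulting group corresponds to one task instance. This is only an illustration of the three cases listed above, not FogFlow's scheduling code: the function names and the entity shape (`id` plus a `metadata` map) are assumptions, and the comparison is made case-insensitive here because the JSON examples in this document write the values as "ALL" and "EntityID".

```javascript
// Hypothetical entity shape: { id: string, metadata: { [name]: value } }

// Returns the key that decides which task instance an entity belongs to,
// or null if the entity lacks the metadata used for grouping.
function groupKey(entity, groupby) {
  const mode = String(groupby).toLowerCase();
  if (mode === 'all') {
    return 'all'; // every entity goes to the single instance
  }
  if (mode === 'entityid') {
    return entity.id; // one instance per entity ID
  }
  // otherwise, groupby is treated as the name of a context metadata
  const value = (entity.metadata || {})[groupby];
  return value === undefined ? null : String(value);
}

// Groups entity IDs by key; one map entry stands for one task instance.
function planTaskInstances(entities, groupby) {
  const instances = new Map();
  for (const entity of entities) {
    const key = groupKey(entity, groupby);
    if (key === null) continue; // skip entities without the metadata
    if (!instances.has(key)) instances.set(key, []);
    instances.get(key).push(entity.id);
  }
  return instances;
}
```

For example, three power panels spread over two shops would yield one group with "all", three groups with "entityID", and two groups when grouping by a `shopID` metadata.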

Using an NGSI update to create it

Another way is to register a service topology by sending a constructed NGSI update message to the IoT Broker deployed in the cloud.

Here are the curl and JavaScript code examples to register the service topology shown in the above image. Refer to the anomaly detection service topology described above to understand this code.

Note

In the JavaScript code example, we use a JavaScript-based library to interact with the FogFlow IoT Broker. You can find the library in the GitHub code repository (designer/public/lib/ngsi). You must include ngsiclient.js in your web page.

Note

The curl example assumes that the cloud IoT Broker is running on localhost on port 8070.

.. tabs::

   .. group-tab:: curl

        .. code-block:: console

                curl -iX POST \
                        'http://localhost:8070/ngsi10/updateContext' \
                        -H 'Content-Type: application/json' \
                        -d '
                        {
                                "contextElements": [
                                {
                                        "entityId":{
                                                "id":"Topology.anomaly-detection",
                                                "type":"Topology"
                                        },
                                        "attributes":[
                                        {
                                                "name":"status",
                                                "type":"string",
                                                "value":"enabled"
                                        },
                                        {
                                                "name":"designboard",
                                                "type":"object",
                                                "value":{
                                                        "blocks":[
                                                        {
                                                                "id":1,
                                                                "module":null,
                                                                "type":"Task",
                                                                "values":{
                                                                        "name":"Counting",
                                                                        "operator":"counter",
                                                                        "outputs":[
                                                                                "Stat"
                                                                        ]
                                                                },
                                                                "x":202,
                                                                "y":-146
                                                        },
                                                        {
                                                                "id":2,
                                                                "module":null,
                                                                "type":"Task",
                                                                "values":{
                                                                        "name":"Detector",
                                                                        "operator":"anomaly",
                                                                        "outputs":[
                                                                                "Anomaly"
                                                                        ]
                                                                },
                                                                "x":-194,
                                                                "y":-134
                                                        },
                                                        {
                                                                "id":3,
                                                                "module":null,
                                                                "type":"Shuffle",
                                                                "values":{
                                                                        "groupby":"ALL",
                                                                        "selectedattributes":[
                                                                                "all"
                                                                        ]
                                                                },
                                                                "x":4,
                                                                "y":-18
                                                        },
                                                        {
                                                                "id":4,
                                                                "module":null,
                                                                "type":"EntityStream",
                                                                "values":{
                                                                        "groupby":"EntityID",
                                                                        "scoped":true,
                                                                        "selectedattributes":[
                                                                                "all"
                                                                        ],
                                                                        "selectedtype":"PowerPanel"
                                                                },
                                                                "x":-447,
                                                                "y":-179
                                                        },
                                                        {
                                                                "id":5,
                                                                "module":null,
                                                                "type":"EntityStream",
                                                                "values":{
                                                                        "groupby":"ALL",
                                                                        "scoped":false,
                                                                        "selectedattributes":[
                                                                                "all"
                                                                        ],
                                                                        "selectedtype":"Rule"
                                                                },
                                                                "x":-438,
                                                                "y":-5
                                                        }
                                                        ],
                                                        "edges":[
                                                        {
                                                                "block1":3,
                                                                "block2":1,
                                                                "connector1":[
                                                                        "stream",
                                                                        "output"
                                                                ],
                                                                "connector2":[
                                                                        "streams",
                                                                        "input"
                                                                ],
                                                                "id":2
                                                        },
                                                        {
                                                                "block1":2,
                                                                "block2":3,
                                                                "connector1":[
                                                                        "outputs",
                                                                        "output",
                                                                        0
                                                                ],
                                                                "connector2":[
                                                                        "in",
                                                                        "input"
                                                                ],
                                                                "id":3
                                                        },
                                                        {
                                                                "block1":4,
                                                                "block2":2,
                                                                "connector1":[
                                                                        "stream",
                                                                        "output"
                                                                ],
                                                                "connector2":[
                                                                        "streams",
                                                                        "input"
                                                                ],
                                                                "id":4
                                                        },
                                                        {
                                                                "block1":5,
                                                                "block2":2,
                                                                "connector1":[
                                                                        "stream",
                                                                        "output"
                                                                ],
                                                                "connector2":[
                                                                        "streams",
                                                                        "input"
                                                                ],
                                                                "id":5
                                                        }
                                                        ]
                                                }
                                        },
                                        {
                                                "name":"template",
                                                "type":"object",
                                                "value":{
                                                        "description":"detect anomaly events in shops",
                                                        "name":"anomaly-detection",
                                                        "tasks":[
                                                        {
                                                                "input_streams":[
                                                                {
                                                                        "groupby":"ALL",
                                                                        "scoped":true,
                                                                        "selected_attributes":[

                                                                        ],
                                                                        "selected_type":"Anomaly"
                                                                }
                                                                ],
                                                                "name":"Counting",
                                                                "operator":"counter",
                                                                "output_streams":[
                                                                {
                                                                        "entity_type":"Stat"
                                                                }
                                                                ]
                                                        },
                                                        {
                                                                "input_streams":[
                                                                {
                                                                        "groupby":"EntityID",
                                                                        "scoped":true,
                                                                        "selected_attributes":[

                                                                        ],
                                                                        "selected_type":"PowerPanel"
                                                                },
                                                                {
                                                                        "groupby":"ALL",
                                                                        "scoped":false,
                                                                        "selected_attributes":[

                                                                        ],
                                                                        "selected_type":"Rule"
                                                                }
                                                                ],
                                                                "name":"Detector",
                                                                "operator":"anomaly",
                                                                "output_streams":[
                                                                {
                                                                        "entity_type":"Anomaly"
                                                                }
                                                                ]
                                                        }
                                                        ]
                                                }
                                        }
                                        ],
                                        "domainMetadata":[
                                        {
                                                "name":"location",
                                                "type":"global",
                                                "value":"global"
                                        }
                                        ]
                                }
                        ],
                        "updateAction": "UPDATE"
                }'


   .. code-tab:: javascript

                // the JSON object that represents the structure of your service topology;
                // when using the FogFlow topology editor, this is generated by the editor
                var topology = {
                        "name":"template",
                        "type":"object",
                        "value":{
                                "description":"detect anomaly events in shops",
                                "name":"anomaly-detection",
                                "tasks":[
                                {
                                        "input_streams":[
                                        {
                                                "groupby":"ALL",
                                                "scoped":true,
                                                "selected_attributes":[

                                                ],
                                                "selected_type":"Anomaly"
                                        }
                                        ],
                                        "name":"Counting",
                                        "operator":"counter",
                                        "output_streams":[
                                        {
                                                "entity_type":"Stat"
                                        }
                                        ]
                                },
                                {
                                        "input_streams":[
                                        {
                                                "groupby":"EntityID",
                                                "scoped":true,
                                                "selected_attributes":[

                                                ],
                                                "selected_type":"PowerPanel"
                                        },
                                        {
                                                "groupby":"ALL",
                                                "scoped":false,
                                                "selected_attributes":[

                                                ],
                                                "selected_type":"Rule"
                                        }
                                        ],
                                        "name":"Detector",
                                        "operator":"anomaly",
                                        "output_streams":[
                                        {
                                                "entity_type":"Anomaly"
                                        }
                                        ]
                                }
                                ]
                        }
                }

                var design = {
                        "name":"designboard",
                        "type":"object",
                        "value":{
                                "blocks":[
                                {
                                        "id":1,
                                        "module":null,
                                        "type":"Task",
                                        "values":{
                                                "name":"Counting",
                                                "operator":"counter",
                                                "outputs":[
                                                        "Stat"
                                                ]
                                        },
                                        "x":202,
                                        "y":-146
                                },
                                {
                                        "id":2,
                                        "module":null,
                                        "type":"Task",
                                        "values":{
                                                "name":"Detector",
                                                "operator":"anomaly",
                                                "outputs":[
                                                        "Anomaly"
                                                ]
                                        },
                                        "x":-194,
                                        "y":-134
                                },
                                {
                                        "id":3,
                                        "module":null,
                                        "type":"Shuffle",
                                        "values":{
                                                "groupby":"ALL",
                                                "selectedattributes":[
                                                        "all"
                                                ]
                                        },
                                        "x":4,
                                        "y":-18
                                },
                                {
                                        "id":4,
                                        "module":null,
                                        "type":"EntityStream",
                                        "values":{
                                                "groupby":"EntityID",
                                                "scoped":true,
                                                "selectedattributes":[
                                                        "all"
                                                ],
                                                "selectedtype":"PowerPanel"
                                        },
                                        "x":-447,
                                        "y":-179
                                },
                                {
                                        "id":5,
                                        "module":null,
                                        "type":"EntityStream",
                                        "values":{
                                                "groupby":"ALL",
                                                "scoped":false,
                                                "selectedattributes":[
                                                        "all"
                                                ],
                                                "selectedtype":"Rule"
                                        },
                                        "x":-438,
                                        "y":-5
                                }
                                ],
                                "edges":[
                                {
                                        "block1":3,
                                        "block2":1,
                                        "connector1":[
                                                "stream",
                                                "output"
                                        ],
                                        "connector2":[
                                                "streams",
                                                "input"
                                        ],
                                        "id":2
                                },
                                {
                                        "block1":2,
                                        "block2":3,
                                        "connector1":[
                                                "outputs",
                                                "output",
                                                0
                                        ],
                                        "connector2":[
                                                "in",
                                                "input"
                                        ],
                                        "id":3
                                },
                                {
                                        "block1":4,
                                        "block2":2,
                                        "connector1":[
                                                "stream",
                                                "output"
                                        ],
                                        "connector2":[
                                                "streams",
                                                "input"
                                        ],
                                        "id":4
                                },
                                {
                                        "block1":5,
                                        "block2":2,
                                        "connector1":[
                                                "stream",
                                                "output"
                                        ],
                                        "connector2":[
                                                "streams",
                                                "input"
                                        ],
                                        "id":5
                                }
                                ]
                        }
                }

                //submit it to FogFlow via NGSI Update
                var topologyCtxObj = {};

                topologyCtxObj.entityId = {
                        id : 'Topology.' + topology.value.name,
                        type: 'Topology',
                        isPattern: false
                };

                topologyCtxObj.attributes = {};
                topologyCtxObj.attributes.status = {type: 'string', value: 'enabled'};
                topologyCtxObj.attributes.designboard = design;
                topologyCtxObj.attributes.template = topology;

                // assume config.brokerURL is the URL of the cloud IoT Broker
                var client = new NGSI10Client(config.brokerURL);

                // send NGSI10 update
                client.updateContext(topologyCtxObj).then( function(data) {
                        console.log(data);
                }).catch( function(error) {
                        console.log('failed to submit the topology', error);
                });


Trigger the service topology by sending an Intent

Once developers have submitted a service topology and its implemented operators, the data processing logic can be triggered in two steps:

  • Send a high-level intent object, which breaks the service topology into separate tasks.
  • Provide input streams to the tasks of that service topology.

The intent object is sent from the FogFlow dashboard and has the following properties:

  • Topology: specifies which topology the intent object is meant for.
  • Priority: defines the priority level of all tasks in the topology; edge nodes use it to decide how resources are assigned to the tasks.
  • Resource Usage: defines how the topology may use resources on edge nodes. In exclusive mode the topology does not share resources with tasks from other topologies; in inclusive mode it does.
  • Objective: the optimization goal for assigning tasks to workers: maximum throughput, minimum latency, or minimum cost. This feature is not fully supported yet, so it can be set to "None" for now.
  • Geoscope: the geographical area from which input streams are selected. Either the global scope or a custom geoscope can be set.
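As a minimal sketch, the properties listed above could be gathered into a plain JavaScript object before being submitted. The helper name `buildIntent`, the field names (`topology`, `priority`, `qos`, `geoscope`), the allowed objective strings, and the topology name are all assumptions made for illustration, not a confirmed FogFlow schema; the dashboard form remains the documented way to send an intent.

```javascript
// Hypothetical helper: field names and allowed values are illustrative
// assumptions, not a confirmed FogFlow schema.
const OBJECTIVES = ['None', 'Max Throughput', 'Min Latency', 'Min Cost'];

function buildIntent(options) {
    if (!options.topology) {
        throw new Error('an intent must name its topology');
    }
    const objective = options.objective || 'None';
    if (!OBJECTIVES.includes(objective)) {
        throw new Error('unknown objective: ' + objective);
    }
    return {
        // which topology the intent is meant for
        topology: options.topology,
        // priority level plus exclusive/inclusive resource usage
        priority: {
            level: options.priorityLevel === undefined ? 0 : options.priorityLevel,
            exclusive: Boolean(options.exclusive)
        },
        // optimization objective; "None" while the feature is not fully supported
        qos: objective,
        // defaults to the global geoscope when no custom area is given
        geoscope: options.geoscope || { scopeType: 'global', scopeValue: 'global' }
    };
}

// 'anomaly-detection' is a placeholder topology name
const intent = buildIntent({
    topology: 'anomaly-detection',
    priorityLevel: 50,
    exclusive: false
});
console.log(JSON.stringify(intent, null, 2));
```

Leaving the objective and geoscope unset yields "None" and the global scope, which matches the defaults suggested in the property list.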

The FogFlow topology master now waits for input streams for the tasks contained in the service topology. As soon as context data that fall within the scope of the intent object are received, tasks are launched on the nearest workers.
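To make the idea of "nearest worker" concrete, the sketch below picks the worker closest to an entity's location using great-circle (haversine) distance. This is only an illustration of the concept: the actual selection logic inside FogFlow is not described here, and the worker list, worker IDs, and function names are hypothetical.

```javascript
const EARTH_RADIUS_KM = 6371;

function toRadians(degrees) {
    return degrees * Math.PI / 180;
}

// Great-circle distance in kilometres between two {latitude, longitude} points
function haversineKm(a, b) {
    const dLat = toRadians(b.latitude - a.latitude);
    const dLon = toRadians(b.longitude - a.longitude);
    const h = Math.sin(dLat / 2) ** 2 +
        Math.cos(toRadians(a.latitude)) * Math.cos(toRadians(b.latitude)) *
        Math.sin(dLon / 2) ** 2;
    return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(h));
}

// Returns the worker with the smallest distance to the location, or null
function nearestWorker(location, workers) {
    let best = null;
    let bestDistance = Infinity;
    for (const worker of workers) {
        const distance = haversineKm(location, worker.location);
        if (distance < bestDistance) {
            best = worker;
            bestDistance = distance;
        }
    }
    return best;
}

// Hypothetical workers; the entity location matches the PowerPanel example
const workers = [
    { id: 'worker-tokyo', location: { latitude: 35.7, longitude: 139.7 } },
    { id: 'worker-osaka', location: { latitude: 34.7, longitude: 135.5 } }
];
const chosen = nearestWorker({ latitude: 35.7, longitude: 138 }, workers);
console.log(chosen.id);
```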

The following curl examples send input streams for the anomaly detection use case, which requires both PowerPanel and Rule data.

Note

Users can also use simulated PowerPanel devices to send PowerPanel data.

Note

The curl examples assume that the cloud IoT Broker is running on localhost at port 8070.

curl -iX POST \
  'http://localhost:8070/ngsi10/updateContext' \
-H 'Content-Type: application/json' \
-d '
{
        "contextElements": [
        {
           "entityId":{
              "id":"Device.PowerPanel.01",
              "type":"PowerPanel"
           },
           "attributes":[
              {
                 "name":"usage",
                 "type":"integer",
                 "value":4
              },
              {
                 "name":"shop",
                 "type":"string",
                 "value":"01"
              },
              {
                 "name":"iconURL",
                 "type":"string",
                 "value":"/img/shop.png"
              }
           ],
           "domainMetadata":[
              {
                 "name":"location",
                 "type":"point",
                 "value":{
                    "latitude":35.7,
                    "longitude":138
                 }
              },
              {
                 "name":"shop",
                 "type":"string",
                 "value":"01"
              }
           ]
        } ],
        "updateAction": "UPDATE"
}'

curl -iX POST \
  'http://localhost:8070/ngsi10/updateContext' \
-H 'Content-Type: application/json' \
-d '
{
        "contextElements": [
        {
           "entityId":{
              "id":"Stream.Rule.01",
              "type":"Rule"
           },
           "attributes":[
              {
                 "name":"threshold",
                 "type":"integer",
                 "value":30
              }
           ]
        }],
        "updateAction": "UPDATE"
}'
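The same updates can also be sent from JavaScript. The sketch below assumes Node.js 18 or later (for the global `fetch`) and the same broker address as the curl examples; `buildUpdate` and `sendUpdate` are hypothetical helper names, and the payload simply mirrors the Rule request above.

```javascript
// Builds an NGSI10 updateContext payload shaped like the curl examples above
function buildUpdate(id, type, attributes, metadata) {
    const element = {
        entityId: { id: id, type: type },
        attributes: attributes
    };
    // domainMetadata is optional (the Rule update carries none)
    if (metadata && metadata.length > 0) {
        element.domainMetadata = metadata;
    }
    return { contextElements: [element], updateAction: 'UPDATE' };
}

// Posts the payload to the broker; brokerURL is assumed to be e.g. http://localhost:8070
async function sendUpdate(brokerURL, payload) {
    const response = await fetch(brokerURL + '/ngsi10/updateContext', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload)
    });
    if (!response.ok) {
        throw new Error('updateContext failed with HTTP ' + response.status);
    }
    // return the raw body; no assumption is made about its format
    return response.text();
}

const rulePayload = buildUpdate('Stream.Rule.01', 'Rule', [
    { name: 'threshold', type: 'integer', value: 30 }
]);
console.log(JSON.stringify(rulePayload, null, 2));

// Uncomment to send against a running broker:
// sendUpdate('http://localhost:8070', rulePayload).then(console.log).catch(console.error);
```

Keeping payload construction separate from the HTTP call makes it possible to inspect or log the request body before anything is sent to the broker.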