Example 5
sonsoleslp committed Oct 4, 2018
1 parent e6d61b4 commit 10aa067
Showing 1 changed file with 23 additions and 23 deletions: doc/index.md

### Simulating a notification
In order to simulate the notifications coming from the Context Broker you can run the following script (available at `files/example1/curl_Notification.sh`):

```bash
while true
do
temp=$(shuf -i 18-53 -n 1) # pick a random temperature between 18 and 53
  # ... (the rest of the loop builds the notification body and sends it with curl)
done
```

### Receiving data and performing operations
This is the code of the example, which is explained step by step below:
```scala
package org.fiware.cosmos.orion.flink.connector.examples.example1

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
Expand Down Expand Up @@ -97,7 +97,7 @@ object Example1{

After importing the necessary dependencies, the first step is creating the source and adding it to the environment.

```scala
val eventStream = env.addSource(new OrionSource(9001))
```

You can check the details of this object in the connector docs.
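For context, this line assumes the usual Flink boilerplate around it. A minimal sketch of that setup (the `OrionSource` import path and the job name are assumptions, not necessarily the exact Example1 code):

```scala
import org.apache.flink.streaming.api.scala._
import org.fiware.cosmos.orion.flink.connector.OrionSource

object Example1Sketch {
  def main(args: Array[String]): Unit = {
    // Obtain the streaming environment and attach the Orion source listening on port 9001.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val eventStream = env.addSource(new OrionSource(9001))

    // ... the transformations described below go here ...
    eventStream.print()

    // Nothing runs until the job is executed.
    env.execute("Example1Sketch")
  }
}
```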

In the example, the first step of the processing is flat-mapping the entities. This operation is performed in order to put together the entity objects of several NGSI events.

```scala
val processedDataStream = eventStream
.flatMap(event => event.entities) // one notification may contain several entities; flatten them into a single stream
```

Once you have all the entities, you can iterate over them (with `map`) and extract the desired attributes; in this case, the temperature.
```scala
// ...
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
Temp_Node(entity.id, temp)
})
```

In each iteration you create a custom object with the properties you need: the entity id and the temperature. For this purpose, you can define a case class like so:
```scala
case class Temp_Node(id: String, temperature: Float)
```

Now you can group the created objects by entity id and perform operations on them:
```scala
// ...
.keyBy("id") // group the Temp_Node stream by its id field
```

You can provide a custom processing window, like so:
```scala
// ...
.timeWindow(Time.seconds(5), Time.seconds(2)) // 5-second window that slides every 2 seconds
```
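For reference, passing a single size argument instead yields a tumbling (non-overlapping) window. A small sketch using the same Flink API:

```scala
// ...
.timeWindow(Time.seconds(5)) // tumbling: each element falls into exactly one 5-second window
```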

And then specify the operation to perform in said time interval:
```scala
// ...
.min("temperature") // keep the minimum temperature seen in each window
```
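Other built-in window aggregations fit in the same spot; for instance, a brief sketch with standard Flink operators:

```scala
// ...
.max("temperature") // maximum temperature per window instead of the minimum
// or an explicit reduction:
// .reduce((a, b) => if (a.temperature < b.temperature) a else b)
```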

After the processing, you can print the results on the console:
```scala
processedDataStream.print().setParallelism(1) // print to the console using a single task
```


Once you have the Context Broker and the rest of the machines up and running, you need to create some entities and subscribe to them in order to get a notification whenever their values change.
First, let's create a room entity (you can find the script under `files/example2/curl_CreateNewEntity.sh`):
```bash
curl localhost:1026/v2/entities -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"id": "Room1",
  ...
}
EOF
```

Now you can subscribe to any changes in the attributes you are interested in. Again, you can find this script under `files/example2/curl_SubscribeToEntityNotifications.sh`. Do not forget to change `$MY_IP` to your machine's IP address (it must be accessible from the Docker container):
```bash
curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"description": "A subscription to get info about Room1",
  ...
}
EOF
```

You might want to check that you have created it correctly by running:
```bash
curl localhost:1026/v2/subscriptions
```


### Triggering notifications
Now you may start performing changes in the entity's attributes. For that, you can use the following script (`files/example2/curl_ChangeAttributes.sh`):
```bash
while true
do
timestamp=$(shuf -i 1-100000000 -n 1)
  # ... (the rest of the loop sends the updated attribute values to the Context Broker with curl)
done
```
### Receiving data, performing operations and writing back to the Context Broker
Let's take a look at the Example2 code now:

```scala
package org.fiware.cosmos.orion.flink.connector.examples.example2

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
  // ... (the rest of the Example2 listing)
```

After calculating the minimum temperature, the output data needs to be adapted to…
* **HTTP Method**: The HTTP method used for sending the update. It can be: `HTTPMethod.POST`, `HTTPMethod.PUT` or `HTTPMethod.PATCH`.

In the example, an **`OrionSinkObject`** is built from the `Temp_Node` object converted to JSON, so the specified data type is JSON. The URL is formed from the hostname of the Docker container where the Context Broker runs and the id of the specific entity we are receiving data from. The HTTP POST method is used to send the message to the Context Broker.
```scala
// ...
.map(tempNode => {
val url = URL_CB + tempNode.id + "/attrs" // endpoint for updating this entity's attributes
OrionSinkObject(tempNode.toString, url, CONTENT_TYPE, METHOD)
})
```
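The snippet above relies on several names defined outside this excerpt: `URL_CB`, `CONTENT_TYPE`, `METHOD` and the JSON produced by `Temp_Node.toString`. A rough sketch of what they might look like (the import path, hostname, attribute name and JSON layout are assumptions, not the repository's exact code):

```scala
import org.fiware.cosmos.orion.flink.connector.{ContentType, HTTPMethod}

object Example2Settings {
  // Hypothetical Context Broker base URL ("orion" is an assumed Docker hostname).
  final val URL_CB       = "http://orion:1026/v2/entities/"
  final val CONTENT_TYPE = ContentType.JSON // the message body is JSON
  final val METHOD       = HTTPMethod.POST  // HTTP method, as noted in the parameter list above
}

// Hypothetical Temp_Node whose toString renders the NGSI attribute-update payload.
case class Temp_Node(id: String, temperature: Float) {
  override def toString: String =
    s"""{ "temperature_min": { "value": $temperature, "type": "Float" } }"""
}
```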
Finally, we send the processed DataStream through the `OrionSink`:
```scala
OrionSink.addSink( processedDataStream ) // attach the Orion sink: each OrionSinkObject is sent to its URL
```
If you run the example, you will see that the calculated minimum temperature is displayed in the console.
### Subscribing to notifications
First, we need to change the notification URL of our subscription to point to our Flink node like so (`files/example3/curl_SubscribeToEntityNotifications.sh`):

```bash
curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
"description": "A subscription to get info about Room1",
  ...
}
EOF
```

Building the project produces a JAR file under `target/orion.flink.connector.examples-1.0.jar`.

Let's submit the Example 3 code to the Flink cluster we have deployed. In order to do this, open the Flink GUI in the browser ([http://localhost:8081](http://localhost:8081)) and select the **Submit new Job** section in the left menu.
Click the **Add New** button and upload the JAR. Once uploaded, select it from the **Uploaded JARs** list and specify the class to execute:
```scala
org.fiware.cosmos.orion.flink.connector.examples.example3.Example3
```

The examples provided focus on how to get the connector up and running but do not pay much attention to the actual operations performed on the data received. In fact, the only operation performed is calculating the minimum temperature over a time window, which Flink already provides out of the box.
Nevertheless, Flink allows you to perform custom operations such as calculating the average. For this, we need to define an `AggregateFunction` that performs this operation.

```scala
class AverageAggregate extends AggregateFunction[Temp_Node, (Float,Float), Float] {
override def createAccumulator() = (0L, 0L) // accumulator: (running sum of temperatures, count of values)

  // ... (add, getResult and merge are omitted in this excerpt)
}
```
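The listing is cut short here. Under the assumption that the accumulator is a `(sum, count)` pair, a complete version could look roughly like this (a sketch, not the repository's exact code):

```scala
import org.apache.flink.api.common.functions.AggregateFunction

class AverageAggregate extends AggregateFunction[Temp_Node, (Float, Float), Float] {
  // Start from an empty (sum, count) accumulator.
  override def createAccumulator(): (Float, Float) = (0f, 0f)

  // Add one temperature reading to the running sum and increment the count.
  override def add(value: Temp_Node, acc: (Float, Float)): (Float, Float) =
    (acc._1 + value.temperature, acc._2 + 1)

  // The result is the average temperature seen in the window.
  override def getResult(acc: (Float, Float)): Float =
    if (acc._2 == 0) 0f else acc._1 / acc._2

  // Combine two partial accumulators.
  override def merge(a: (Float, Float), b: (Float, Float)): (Float, Float) =
    (a._1 + b._1, a._2 + b._2)
}
```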

This aggregator function takes one `Temp_Node` at a time as input, accumulates a tuple containing the sum and the count of the values, and outputs the average of the temperatures in the given timespan.
We associate it with our Flink DataStream through the `aggregate` function:
```scala
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
  // ... (the rest of the pipeline is sketched below)
```
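Assuming the same keying and windowing as in Example 1, attaching the aggregate could look like this (a sketch, not the repository's exact code):

```scala
// ...
.keyBy("id")
.timeWindow(Time.seconds(5), Time.seconds(2))
.aggregate(new AverageAggregate) // emits the average temperature per window
```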
Example 5 provides an example in which the data received is a list of bus schedules and their prices. These prices are constantly changing, and Flink is used to calculate the minimum prices within a time window.

The simulated Orion notification is as follows (available at `files/example5/curl_Notification.sh`):
```bash
while true
do
bus1=$(shuf -i 10-53 -n 1) # random price for the first bus line
  # ... (the rest of the loop builds the notification with the bus prices and sends it with curl)
done
```

The code for Example 5 is similar to that of the previous examples. The only difference is that every item of the object attribute needs to be parsed manually in order to make use of it.

```scala
object Example5{

def main(args: Array[String]): Unit = {
  // ... (the rest of main parses the bus prices and computes the minimum prices per time window)
  }
}
```
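As a rough illustration of that manual parsing (assuming the connector exposes object-valued attributes as a Scala `Map[String, Any]`; the attribute name `prices` and the `BusPrice` case class are made up for the sketch, not the repository's code):

```scala
// Assumed shape of the data: each entity has an object-valued "prices" attribute
// with one entry per bus line.
case class BusPrice(line: String, price: Float)

val busPrices = eventStream
  .flatMap(event => event.entities)
  .flatMap(entity => {
    // Manually parse each item of the object attribute.
    val priceMap = entity.attrs("prices").value.asInstanceOf[Map[String, Any]]
    priceMap.map { case (line, value) =>
      BusPrice(line, value.asInstanceOf[Number].floatValue())
    }
  })
```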
