Skip to content

Commit

Permalink
Example 5
Browse files Browse the repository at this point in the history
  • Loading branch information
sonsoleslp committed Oct 4, 2018
1 parent 0909b6c commit e6d61b4
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 23 deletions.
107 changes: 99 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mvn install:install-file -Dfile=$(PATH_DOWNLOAD)/orion.flink.connector-1.0.jar -

where `PATH_DOWNLOAD` is the path where you downloaded the JAR.

## Example 1 : Receive simulated notifications
## Example 1 : Receiving simulated notifications

The first example makes use of the `OrionSource` in order to receive notifications from the Orion Context Broker. For simplicity, in this example the notifications are simulated with a curl command.
Specifically, the example receives a notification every second that a node changed its temperature, and calculates the minimum temperature in a given interval.
Expand Down Expand Up @@ -422,16 +422,107 @@ This aggregator function takes as an input one `Temp_Node` at a time, accumulate
In order to associate it to our Flink DataStream we do it through the `aggregate` function.
```
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
new Temp_Node( entity.id, temp)
})
.keyBy("id")
.timeWindow(Time.seconds(5), Time.seconds(2))
.aggregate(new AverageAggregate)
```

## Example 5: Structured values for attributes

So far, the examples provided have dealt with simple attributes in the shape of integers or strings. Some use cases require more complex attributes, such as objects (https://fiware-orion.readthedocs.io/en/master/user/structured_attribute_valued/index.html).
This connector parses this sort of values as scala Maps[String,Any], which eases the process of iterating through their properties.

Example 5 provides an example in which the data received is a list of bus schedules and their prices. These prices are constantly changing and Flink is used in order to calculate the minimum prices within a time window.

The simulated Orion notification is as follows (available at `files/example5/curl_Notification.sh`):
```
while true
do
bus1=$(shuf -i 10-53 -n 1)
bus2=$(shuf -i 10-44 -n 1)
curl -v -s -S X POST http://localhost:9001 \
--header 'Content-Type: application/json; charset=utf-8' \
--header 'Accept: application/json' \
--header 'User-Agent: orion/0.10.0' \
--header "Fiware-Service: demo" \
--header "Fiware-ServicePath: /test" \
-d '{
"data": [
{
"id": "R1",
"type": "Node",
"information": {
"type": "object",
"value": {
"buses":[
{
"name": "BusCompany1",
"schedule": {
"morning": [7,9,11],
"afternoon": [13,15,17,19],
"night" : [23,1,5]
},
"price": '$bus1'
},
{
"name": "BusCompany2",
"schedule": {
"morning": [8,10,12],
"afternoon": [16,20],
"night" : [23]
},
"price": '$bus2'
}
]
},
"metadata": {}
}
}
],
"subscriptionId": "57458eb60962ef754e7c0998"
}'
sleep 1
done
```

The code for Example 5 is similar to the previous examples. The only difference is that it is necessary to manually parse every item of the object attribute in order to make use of it.

```
object Example5{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSource(9001))
// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
new Temp_Node( entity.id, temp)
entity.attrs("information").value.asInstanceOf[Map[String, Any]]
})
.keyBy("id")
.map(list => list("buses").asInstanceOf[List[Map[String,Any]]])
.flatMap(bus => bus )
.map(bus =>
new Bus(bus("name").asInstanceOf[String], bus("price").asInstanceOf[ scala.math.BigInt].intValue()))
.keyBy("name")
.timeWindow(Time.seconds(5), Time.seconds(2))
.aggregate(new AverageAggregate)
```
.min("price")
## Example 5: Structured values for attributes
// print the results with a single thread, rather than in parallel
processedDataStream.print().setParallelism(1)
env.execute("Socket Window NgsiEvent")
}
case class Bus(name: String, price: Int)
}
// TODO
```
97 changes: 96 additions & 1 deletion doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ curl localhost:1026/v2/entities/Room1
```


## Other operations
## Example 4: Other operations

The examples provided focus on how to get the connector up and running but do not give much importance to the actual operations performed on the data received. In fact, the only operation done is calculating the minimum temperature on a time window, which is already available with Flink.
Nevertheless, Flink allows to perform custom operations such as calculating the average. For this, we need to define an `AggregateFunction` that performs this operation.
Expand Down Expand Up @@ -435,3 +435,98 @@ val processedDataStream = eventStream
.timeWindow(Time.seconds(5), Time.seconds(2))
.aggregate(new AverageAggregate)
```

## Example 5: Structured values for attributes

So far, the examples provided have dealt with simple attributes in the shape of integers or strings. Some use cases require more complex attributes, such as objects (https://fiware-orion.readthedocs.io/en/master/user/structured_attribute_valued/index.html).
This connector parses this sort of values as scala Maps[String,Any], which eases the process of iterating through their properties.

Example 5 provides an example in which the data received is a list of bus schedules and their prices. These prices are constantly changing and Flink is used in order to calculate the minimum prices within a time window.

The simulated Orion notification is as follows (available at `files/example5/curl_Notification.sh`):
```
while true
do
bus1=$(shuf -i 10-53 -n 1)
bus2=$(shuf -i 10-44 -n 1)
curl -v -s -S X POST http://localhost:9001 \
--header 'Content-Type: application/json; charset=utf-8' \
--header 'Accept: application/json' \
--header 'User-Agent: orion/0.10.0' \
--header "Fiware-Service: demo" \
--header "Fiware-ServicePath: /test" \
-d '{
"data": [
{
"id": "R1",
"type": "Node",
"information": {
"type": "object",
"value": {
"buses":[
{
"name": "BusCompany1",
"schedule": {
"morning": [7,9,11],
"afternoon": [13,15,17,19],
"night" : [23,1,5]
},
"price": '$bus1'
},
{
"name": "BusCompany2",
"schedule": {
"morning": [8,10,12],
"afternoon": [16,20],
"night" : [23]
},
"price": '$bus2'
}
]
},
"metadata": {}
}
}
],
"subscriptionId": "57458eb60962ef754e7c0998"
}'
sleep 1
done
```

The code for Example 5 is similar to the previous examples. The only difference is that it is necessary to manually parse every item of the object attribute in order to make use of it.

```
object Example5{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSource(9001))
// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
entity.attrs("information").value.asInstanceOf[Map[String, Any]]
})
.map(list => list("buses").asInstanceOf[List[Map[String,Any]]])
.flatMap(bus => bus )
.map(bus =>
new Bus(bus("name").asInstanceOf[String], bus("price").asInstanceOf[ scala.math.BigInt].intValue()))
.keyBy("name")
.timeWindow(Time.seconds(5), Time.seconds(2))
.min("price")
// print the results with a single thread, rather than in parallel
processedDataStream.print().setParallelism(1)
env.execute("Socket Window NgsiEvent")
}
case class Bus(name: String, price: Int)
}
```
8 changes: 4 additions & 4 deletions files/example5/curl_Notification.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
while true
do
temp=$(shuf -i 18-53 -n 1)
number=$(shuf -i 1-3113 -n 1)
bus1=$(shuf -i 10-53 -n 1)
bus2=$(shuf -i 10-44 -n 1)

curl -v -s -S X POST http://localhost:9001 \
--header 'Content-Type: application/json; charset=utf-8' \
Expand All @@ -25,7 +25,7 @@ do
"afternoon": [13,15,17,19],
"night" : [23,1,5]
},
"price": 19
"price": '$bus1'
},
{
"name": "BusCompany2",
Expand All @@ -34,7 +34,7 @@ do
"afternoon": [16,20],
"night" : [23]
},
"price": 14
"price": '$bus2'
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,25 @@ object Example5{
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSource(9001))

// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
entity.attrs("information").value.asInstanceOf[Map[String, Any]]
})
.map(list => list("buses").asInstanceOf[List[Map[String,Any]]])
.flatMap(bus => bus )
.map(bus => new Bus(bus("name").asInstanceOf[String],
bus("schedule").asInstanceOf[ Map[String, List[ scala.math.BigInt]]],
bus("price").asInstanceOf[ scala.math.BigInt]))
.keyBy("name")
.timeWindow(Time.seconds(5), Time.seconds(2))
.min("price")
.flatMap(bus => bus )
.map(bus =>
new Bus(bus("name").asInstanceOf[String], bus("price").asInstanceOf[ scala.math.BigInt].intValue()))
.keyBy("name")
.timeWindow(Time.seconds(5), Time.seconds(2))
.min("price")

// print the results with a single thread, rather than in parallel

processedDataStream.print().setParallelism(1)

env.execute("Socket Window NgsiEvent")
}
case class Bus(name: String, schedule: Map[String, List[ scala.math.BigInt]], price: scala.math.BigInt)

case class Bus(name: String, price: Int)
}

0 comments on commit e6d61b4

Please sign in to comment.