Skip to content

Commit

Permalink
Example 5
Browse files Browse the repository at this point in the history
  • Loading branch information
sonsoleslp committed Oct 1, 2018
1 parent 26c7093 commit 0909b6c
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 72 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,9 @@ curl localhost:1026/v2/entities/Room1
```


## Other operations
## Example 4: Other operations

The examples provided focus on how to get the connector up and running but do not give much importance to the actual operations performed on the data received. In fact, the only operation done is calculating the minimum temperature on a time window, which is already available with Flink.
The previous examples focus on how to get the connector up and running but do not give much importance to the actual operations performed on the data received. In fact, the only operation done is calculating the minimum temperature on a time window, which is already available with Flink.
Nevertheless, Flink allows to perform custom operations such as calculating the average. For this, we need to define an `AggregateFunction` that performs this operation.

```
Expand Down Expand Up @@ -431,3 +431,7 @@ val processedDataStream = eventStream
.timeWindow(Time.seconds(5), Time.seconds(2))
.aggregate(new AverageAggregate)
```

## Example 5: Structured values for attributes

// TODO
29 changes: 29 additions & 0 deletions files/example4/curl_Notification.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
while true
do
temp=$(shuf -i 18-53 -n 1)
number=$(shuf -i 1-3113 -n 1)

curl -v -s -S X POST http://localhost:9001 \
--header 'Content-Type: application/json; charset=utf-8' \
--header 'Accept: application/json' \
--header 'User-Agent: orion/0.10.0' \
--header "Fiware-Service: demo" \
--header "Fiware-ServicePath: /test" \
-d '{
"data": [
{
"id": "R1","type": "Node",
"co": {"type": "Float","value": 0,"metadata": {}},
"co2": {"type": "Float","value": 0,"metadata": {}},
"humidity": {"type": "Float","value": 40,"metadata": {}},
"pressure": {"type": "Float","value": '$number',"metadata": {}},
"temperature": {"type": "Float","value": '$temp',"metadata": {}},
"wind_speed": {"type": "Float","value": 1.06,"metadata": {}}
}
],
"subscriptionId": "57458eb60962ef754e7c0998"
}'


sleep 1
done
50 changes: 50 additions & 0 deletions files/example5/curl_Notification.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
while true
do
temp=$(shuf -i 18-53 -n 1)
number=$(shuf -i 1-3113 -n 1)

curl -v -s -S X POST http://localhost:9001 \
--header 'Content-Type: application/json; charset=utf-8' \
--header 'Accept: application/json' \
--header 'User-Agent: orion/0.10.0' \
--header "Fiware-Service: demo" \
--header "Fiware-ServicePath: /test" \
-d '{
"data": [
{
"id": "R1",
"type": "Node",
"information": {
"type": "object",
"value": {
"buses":[
{
"name": "BusCompany1",
"schedule": {
"morning": [7,9,11],
"afternoon": [13,15,17,19],
"night" : [23,1,5]
},
"price": 19
},
{
"name": "BusCompany2",
"schedule": {
"morning": [8,10,12],
"afternoon": [16,20],
"night" : [23]
},
"price": 14
}
]
},
"metadata": {}
}
}
],
"subscriptionId": "57458eb60962ef754e7c0998"
}'


sleep 1
done
65 changes: 0 additions & 65 deletions orion.flink.connector.examples.iml

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package org.fiware.cosmos.orion.flink.connector.examples.example1
package org.fiware.cosmos.orion.flink.connector.examples.example4

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{NgsiEvent, OrionSource}
import org.fiware.cosmos.orion.flink.connector.OrionSource

/**
* Example1 Orion Connector
* Example4 Orion Connector
* @author @sonsoleslp
*/
object Example1_avg{
object Example4{

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.fiware.cosmos.orion.flink.connector.examples.example5

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.OrionSource

/**
* Example5 Orion Connector
* @author @sonsoleslp
*/
object Example5{

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create Orion Source. Receive notifications on port 9001
val eventStream = env.addSource(new OrionSource(9001))

// Process event stream
val processedDataStream = eventStream
.flatMap(event => event.entities)
.map(entity => {
entity.attrs("information").value.asInstanceOf[Map[String, Any]]
})
.map(list => list("buses").asInstanceOf[List[Map[String,Any]]])
.flatMap(bus => bus )
.map(bus => new Bus(bus("name").asInstanceOf[String],
bus("schedule").asInstanceOf[ Map[String, List[ scala.math.BigInt]]],
bus("price").asInstanceOf[ scala.math.BigInt]))
.keyBy("name")
.timeWindow(Time.seconds(5), Time.seconds(2))
.min("price")

// print the results with a single thread, rather than in parallel

processedDataStream.print().setParallelism(1)

env.execute("Socket Window NgsiEvent")
}
case class Bus(name: String, schedule: Map[String, List[ scala.math.BigInt]], price: scala.math.BigInt)

}

0 comments on commit 0909b6c

Please sign in to comment.