Skip to content

Commit

Permalink
Merge pull request #388 from telefonicaid/task/adapt_release_0.8.0
Browse files Browse the repository at this point in the history
task/adapt_release_0.8.0
  • Loading branch information
Fermín Galán Márquez committed May 8, 2015
2 parents d84ece5 + 9e10f9a commit 0723bb8
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 609 deletions.
234 changes: 5 additions & 229 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
* [HDFS persistence](#section3.4.1)
* [CKAN persistence](#section3.4.2)
* [MySQL persistence](#section3.4.3)
* [MongoDB persistence](#section3.4.4)
* [STH persistence](#section3.4.5)
* [Installing Cygnus](#section4)
* [RPM install (recommended)](#section4.1)
* [Installing from sources (advanced)](#section4.2)
Expand Down Expand Up @@ -40,8 +38,6 @@ Current stable release is able to persist Orion context data in:
* [HDFS](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html), the [Hadoop](http://hadoop.apache.org/) distributed file system.
* [MySQL](https://www.mysql.com/), the well-know relational database manager.
* [CKAN](http://ckan.org/), an Open Data platform.
* [MongoDB](https://www.mongodb.org/), the NoSQL document-oriented database.
* [STH](https://github.com/telefonicaid/IoT-STH), a Short-Term Historic database built on top of MongoDB.

Cygnus is a (conceptual) derivative work of the deprecated [ngsi2cosmos](https://github.com/telefonicaid/fiware-livedemoapp/tree/master/package/ngsi2cosmos).

Expand All @@ -63,17 +59,15 @@ There exists a wide collection of already developed sources, channels and sinks.
* `OrionHDFSSink`. A custom sink that persists Orion content data in a [HDFS](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html) deployment under a HDFS path structure. There already exists a native Flume HDFS sink persisting each event in a new file, but this is not suitable for Cygnus. Check for specific details [here](doc/design/OrionHDFSSink.md).
* `OrionCKANSink`. A custom sink that persists Orion context data in [CKAN](http://ckan.org/) server instances under a organization, package, resource and Datastore structure. Check for specific details [here](doc/design/OrionCKANSink.md).
* `OrionMySQLSink`. A custom sink for persisting Orion context data in [MySQL](https://www.mysql.com/) server instances under a database and table structure. Check for specific details [here](doc/design/OrionMySQLSink.md).
* `OrionMongoSink`. A custom sink for persisting Orion context data in [MongoDB](https://www.mongodb.org/) server instances under a database and collection structure. Check for specific details [here](doc/design/OrionMongoSink.md).
* `OrionSTHSink`. A custom sink for persisting Orion context data in [STH](https://github.com/telefonicaid/IoT-STH) server instances under a database and aggregated-like collection structure. Check for specific details [here](doc/design/OrionSTHSink.md).
* `DestinationExtractorInterceptor`. A custom Flume interceptor in charge of modifying the default behaviour of Cygnus when deciding the destination (HDFS file, MySQL table, CKAN resource or MongoDB collection) for the context data.
* `DestinationExtractorInterceptor`. A custom Flume interceptor in charge of modifying the default behaviour of Cygnus when deciding the destination (HDFS file, MySQL table or CKAN resource) for the context data.

All these new components (`OrionRestHandler`, `OrionHDFSSink`, etc) are combined with other native ones included in Flume itself (e.g. `HTTPSource` or `MemoryChannel`), with the purpose of implementing the following basic data flow:

1. On behalf of Cygnus, subscribe to certain NGSI-like source (typically Orion Context Broker) for certain context information.
2. Receive from the NGSI-like source notifications about new updated context data; this notification will be handled by the native `HttpSource` together with the custom `OrionRestHandler`.
3. Translate the notification into the Flume event format (metadata headers + data body), and put them into the different sink channels, typically of type `MemoryChannel`.
4. In the meantime, some interceptors such as the native `Timestamp` one or the custom `DestinationExtractorInterceptor` may modify the event before it is put in the channel or channels.
5. For each enabled custom sink (`OrionHDFSSink`, `OrionCKANSink`, `OrionMySQLSink`, `OrionMongoSink`), get the Flume events from the sink channels and persist the data in the appropriate format.
5. For each enabled custom sink (`OrionHDFSSink`, `OrionCKANSink`, `OrionMySQLSink`), get the Flume events from the sink channels and persist the data in the appropriate format.

More complex architectures and data flows can be checked in the [architecture](doc/design/architecture.md) document.

Expand Down Expand Up @@ -443,168 +437,6 @@ NOTE: `mysql` is the MySQL CLI for querying the data.

[Top](#top)

####<a name="section3.4.4"></a>Mongo persistence

MongoDB organizes the data in databases that contain collections of Json documents. Such organization is exploited by [`OrionMongoSink`](doc/desing/OrionMongoSink.md) each time a Flume event is taken from its channel.

Assuming `mongo_username=myuser` as configuration parameter, the data within the body will be persisted as:

$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
4wheels
4wheels_car1_car
4wheels_car1_car_speed
system.indexes
> db.4wheels.find()
{ "_id" : ObjectId("5534d143fa701f0be751db82"), "recvTime" : ISODate("2015-04-20T12:13:22.41Z"), "entityId" : "car1", "entityType" : "car", "attrName" : "speed", "attrType" : "float", "attrValue" : "112.9" }
> db.4wheels_car1_car.find()
{ "_id" : ObjectId("5534d143fa701f0be751db82"), "recvTime" : ISODate("2015-04-20T12:13:22.41Z"), "attrName" : "speed", "attrType" : "float", "attrValue" : "112.9" }
> db.4wheels_car1_car_speed.find()
{ "_id" : ObjectId("5534d143fa701f0be751db82"), "recvTime" : ISODate("2015-04-20T12:13:22.41Z"), "attrType" : "float", "attrValue" : "112.9" }

NOTE: the results for the three different data models (<i>collection-per-service-path</i>, <i>collection-per-service</i> and <i>collection-per-attribute</i>) are shown respectively; and no database prefix nor collection prefix was used (see [Cygnus configuration](#section6) for more details).

NOTE: `mongo` is the MongoDB CLI for querying the data.

[Top](#top)

####<a name="section3.4.5"></a>STH persistence

###Mapping Flume events to MongoDB data structures
MongoDB organizes the data in databases that contain collections of Json documents. Such organization is exploited by [`OrionSTHSink`](doc/desing/OrionSTHSink.md) each time a Flume event is taken from its channel.

Assuming `mongo_username=myuser` and `data_model=collection-per-service-path` as configuration parameters, then `OrionSTHSink` will persist the data within the body as:

$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
4wheels.aggr
4wheels_car1_car.aggr
4wheels_car1_car_speed.aggr
system.indexes
> db.4wheels.aggr.find()
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "speed", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "hour", "range" : "day", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 12, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 23, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "sepeed", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "month", "range" : "year", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 1, "sum" : 0, "sum2" : 0, "min" : 0, "max" : 0 },
...,
{ "offset" : 3, "samples" : 0, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 11, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "speed", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "second", "range" : "minute", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 22, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "speed", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "minute", "range" : "hour", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 13, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "speed", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "day", "range" : "month", "attrType" : "float" },
"points" : [
{ "offset" : 1, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 20, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 31, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "hour", "range" : "day", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 12, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 23, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "month", "range" : "year", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 1, "sum" : 0, "sum2" : 0, "min" : 0, "max" : 0 },
...,
{ "offset" : 3, "samples" : 0, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 11, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "second", "range" : "minute", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 22, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "minute", "range" : "hour", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 13, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "entityId" : "car1", "entityType" : "car", "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:13:22.41Z"), "resolution" : "day", "range" : "month", "attrType" : "float" },
"points" : [
{ "offset" : 1, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 20, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 31, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}

NOTE: `mongo` is the MongoDB CLI for querying the data.

[Top](#top)

##<a name="section4"></a>Installing Cygnus
###<a name="section4.1"></a>RPM install (recommended)
Simply configure the FIWARE repository if not yet configured and use your applications manager in order to install the latest version of Cygnus (CentOS/RedHat example):
Expand Down Expand Up @@ -679,13 +511,13 @@ The file `agent_<id>.conf` can be instantiated from a template given in the Cygn
# sink of the same type and sharing the channel in order to improve the performance (this is like having
# multi-threading).
cygnusagent.sources = http-source
cygnusagent.sinks = hdfs-sink mysql-sink ckan-sink mongo-sink sth-sink
cygnusagent.channels = hdfs-channel mysql-channel ckan-channel mongo-channel sth-channel
cygnusagent.sinks = hdfs-sink mysql-sink ckan-sink
cygnusagent.channels = hdfs-channel mysql-channel ckan-channel

#=============================================
# source configuration
# channel name where to write the notification events
cygnusagent.sources.http-source.channels = hdfs-channel mysql-channel ckan-channel mongo-channel
cygnusagent.sources.http-source.channels = hdfs-channel mysql-channel ckan-channel
# source class, must not be changed
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
Expand Down Expand Up @@ -780,44 +612,6 @@ cygnusagent.sinks.mysql-sink.mysql_password = xxxxxxxxxxxx
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.mysql-sink.attr_persistence = column

# ============================================
# OrionMongoSink configuration
# sink class, must not be changed
cygnusagent.sinks.mongo-sink.type = com.telefonica.iot.cygnus.sinks.OrionMongoSink
# channel name from where to read notification events
cygnusagent.sinks.mongo-sink.channel = mongo-channel
# FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run
cygnusagent.sinks.mongo-sink.mongo_hosts = x1.y1.z1.w1:port1,x2.y2.z2.w2:port2,...
# a valid user in the MongoDB server (or empty if authentication is not enabled in MongoDB)
cygnusagent.sinks.mongo-sink.mongo_username = mongo_username
# password for the user above (or empty if authentication is not enabled in MongoDB)
cygnusagent.sinks.mongo-sink.mongo_password = xxxxxxxx
# data model (collection-per-service-path, collection-per-entity, collection-per-attribute)
cygnusagent.sinks.mongo-sink.data_model = collection-per-entity
# prefix for the MongoDB databases
cygnusagent.sinks.mongo-sink.db_prefix = sth_
# prefix for the MongoDB collections
cygnusagent.sinks.mongo-sink.collection_prefix = sth_

# ============================================
# OrionSTHSink configuration
# sink class, must not be changed
cygnusagent.sinks.sth-sink.type = com.telefonica.iot.cygnus.sinks.OrionSTHSink
# channel name from where to read notification events
cygnusagent.sinks.sth-sink.channel = sth-channel
# FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run
cygnusagent.sinks.sth-sink.mongo_hosts = x1.y1.z1.w1:port1,x2.y2.z2.w2:port2,...
# a valid user in the MongoDB server (or empty if authentication is not enabled in MongoDB)
cygnusagent.sinks.sth-sink.mongo_username = mongo_username
# password for the user above (or empty if authentication is not enabled in MongoDB)
cygnusagent.sinks.sth-sink.mongo_password = xxxxxxxx
# data model (collection-per-service-path, collection-per-entity, collection-per-attribute)
cygnusagent.sinks.sth-sink.data_model = collection-per-entity
# prefix for the MongoDB databases
cygnusagent.sinks.sth-sink.db_prefix = sth_
# prefix for the MongoDB collections
cygnusagent.sinks.sth-sink.collection_prefix = sth_

#=============================================
# hdfs-channel configuration
# channel type (must not be changed)
Expand All @@ -844,24 +638,6 @@ cygnusagent.channels.mysql-channel.type = memory
cygnusagent.channels.mysql-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.mysql-channel.transactionCapacity = 100

#=============================================
# mongo-channel configuration
# channel type (must not be changed)
cygnusagent.channels.mongo-channel.type = memory
# capacity of the channel
cygnusagent.channels.mongo-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.mongo-channel.transactionCapacity = 100

#=============================================
# sth-channel configuration
# channel type (must not be changed)
cygnusagent.channels.sth-channel.type = memory
# capacity of the channel
cygnusagent.channels.sth-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.sth-channel.transactionCapacity = 100
```

[Top](#top)
Expand Down

0 comments on commit 0723bb8

Please sign in to comment.