diff --git a/.travis.yml b/.travis.yml index 9b04b238..c62d16cf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,7 @@ install: before_script: - pipenv install + - sudo service postgresql stop script: - source setup_dev_env.sh diff --git a/Pipfile b/Pipfile index 917cf43f..1c51ea3d 100644 --- a/Pipfile +++ b/Pipfile @@ -24,6 +24,7 @@ pyyaml = ">=4.2" redis = "~=2.10" requests = ">=2.20" rethinkdb = "==2.3" +pickle-mixin = "==1.0.2" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index 6e230c1a..579ac704 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "0ca00bc8605102d1ac3eb8b3607fd53ee0626abe0c5bd3e5413eaba5e52d2db6" + "sha256": "e7f8ad8cd55a53504c0e2755ef48f24361297d307013522c866cdfef4a2b6517" }, "pipfile-spec": 6, "requires": { @@ -176,9 +176,9 @@ }, "geomet": { "hashes": [ + "sha256:87ae0fc42e532b9e98969c0bbf895a5e0b2bb4f6f775cf51a74e6482f1f35c2b", "sha256:91d754f7c298cbfcabd3befdb69c641c27fe75e808b27aa55028605761d17e95", - "sha256:a41a1e336b381416d6cbed7f1745c848e91defaa4d4c1bdc1312732e46ffad2b", - "sha256:87ae0fc42e532b9e98969c0bbf895a5e0b2bb4f6f775cf51a74e6482f1f35c2b" + "sha256:a41a1e336b381416d6cbed7f1745c848e91defaa4d4c1bdc1312732e46ffad2b" ], "index": "pypi", "version": "==0.2.1.post1" @@ -301,6 +301,13 @@ "index": "pypi", "version": "==1.16.5" }, + "pickle-mixin": { + "hashes": [ + "sha256:d4983cd09939f6e402c69ba4f4200f2e6ccdfd39c020f6d1091290d78f17625c" + ], + "index": "pypi", + "version": "==1.0.2" + }, "pluggy": { "hashes": [ "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0", @@ -319,60 +326,60 @@ }, "pymongo": { "hashes": [ - "sha256:ce208f80f398522e49d9db789065c8ad2cd37b21bd6b23d30053474b7416af11", - "sha256:d9de8427a5601799784eb0e7fa1b031aa64086ce04de29df775a8ca37eedac41", - "sha256:475a34a0745c456ceffaec4ce86b7e0983478f1b6140890dff7b161e7bcd895b", + "sha256:03dc64a9aa7a5d405aea5c56db95835f6a2fa31b3502c5af1760e0e99210be30", + "sha256:05fcc6f9c60e6efe5219fbb5a30258adb3d3e5cbd317068f3d73c09727f2abb6", + "sha256:076a7f2f7c251635cf6116ac8e45eefac77758ee5a77ab7bd2f63999e957613b", "sha256:137e6fa718c7eff270dbd2fc4b90d94b1a69c9e9eb3f3de9e850a7fd33c822dc", - "sha256:7307024b18266b302f4265da84bb1effb5d18999ef35b30d17592959568d5c0a", - "sha256:4797c0080f41eba90404335e5ded3aa66731d303293a675ff097ce4ea3025bb9", "sha256:1f865b1d1c191d785106f54df9abdc7d2f45a946b45fd1ea0a641b4f982a2a77", - "sha256:4437300eb3a5e9cc1a73b07d22c77302f872f339caca97e9bf8cf45eca8fa0d2", - "sha256:d64c98277ea80e4484f1332ab107e8dfd173a7dcf1bdbf10a9cccc97aaab145f", - "sha256:c4869141e20769b65d2d72686e7a7eb141ce9f3168106bed3e7dcced54eb2422", + "sha256:213c445fe7e654621c6309e874627c35354b46ef3ee807f5a1927dc4b30e1a67", + "sha256:25e617daf47d8dfd4e152c880cd0741cbdb48e51f54b8de9ddbfe74ecd87dd16", "sha256:3d9bb1ba935a90ec4809a8031efd988bdb13cdba05d9e9a3e9bf151bf759ecde", - "sha256:d38b35f6eef4237b1d0d8e845fc1546dad85c55eba447e28c211da8c7ef9697c", - "sha256:e6a15cf8f887d9f578dd49c6fb3a99d53e1d922fdd67a245a67488d77bf56eb2", - "sha256:7a4a6f5b818988a3917ec4baa91d1143242bdfece8d38305020463955961266a", + "sha256:40696a9a53faa7d85aaa6fd7bef1cae08f7882640bad08c350fb59dee7ad069b", + "sha256:421aa1b92c291c429668bd8d8d8ec2bd00f183483a756928e3afbf2b6f941f00", + "sha256:4437300eb3a5e9cc1a73b07d22c77302f872f339caca97e9bf8cf45eca8fa0d2", + "sha256:455f4deb00158d5ec8b1d3092df6abb681b225774ab8a59b3510293b4c8530e3", + "sha256:475a34a0745c456ceffaec4ce86b7e0983478f1b6140890dff7b161e7bcd895b", + 
"sha256:4797c0080f41eba90404335e5ded3aa66731d303293a675ff097ce4ea3025bb9", + "sha256:4ae23fbbe9eadf61279a26eba866bbf161a6f7e2ffad14a42cf20e9cb8e94166", + "sha256:4b32744901ee9990aa8cd488ec85634f443526def1e5190a407dc107148249d7", + "sha256:50127b13b38e8e586d5e97d342689405edbd74ad0bd891d97ee126a8c7b6e45f", + "sha256:50531caa7b4be1c4ed5e2d5793a4e51cc9bd62a919a6fd3299ef7c902e206eab", + "sha256:63a5387e496a98170ffe638b435c0832c0f2011a6f4ff7a2880f17669fff8c03", "sha256:68220b81850de8e966d4667d5c325a96c6ac0d6adb3d18935d6e3d325d441f48", - "sha256:e8d188ee39bd0ffe76603da887706e4e7b471f613625899ddf1e27867dc6a0d3", - "sha256:05fcc6f9c60e6efe5219fbb5a30258adb3d3e5cbd317068f3d73c09727f2abb6", - "sha256:076a7f2f7c251635cf6116ac8e45eefac77758ee5a77ab7bd2f63999e957613b", "sha256:689142dc0c150e9cb7c012d84cac2c346d40beb891323afb6caf18ec4caafae0", - "sha256:e8c446882cbb3774cd78c738c9f58220606b702b7c1655f1423357dc51674054", - "sha256:8ea13d0348b4c96b437d944d7068d59ed4a6c98aaa6c40d8537a2981313f1c66", + "sha256:6a15e2bee5c4188369a87ed6f02de804651152634a46cca91966a11c8abd2550", "sha256:7122ffe597b531fb065d3314e704a6fe152b81820ca5f38543e70ffcc95ecfd4", - "sha256:f6efca006a81e1197b925a7d7b16b8f61980697bb6746587aad8842865233218", - "sha256:63a5387e496a98170ffe638b435c0832c0f2011a6f4ff7a2880f17669fff8c03", - "sha256:50127b13b38e8e586d5e97d342689405edbd74ad0bd891d97ee126a8c7b6e45f", - "sha256:d0565481dc196986c484a7fb13214fc6402201f7fb55c65fd215b3324962fe6c", + "sha256:7307024b18266b302f4265da84bb1effb5d18999ef35b30d17592959568d5c0a", + "sha256:7a4a6f5b818988a3917ec4baa91d1143242bdfece8d38305020463955961266a", "sha256:83c5a3ecd96a9f3f11cfe6dfcbcec7323265340eb24cc996acaecea129865a3a", - "sha256:b7c522292407fa04d8195032493aac937e253ad9ae524aab43b9d9d242571f03", "sha256:890b0f1e18dbd898aeb0ab9eae1ab159c6bcbe87f0abb065b0044581d8614062", - "sha256:4b32744901ee9990aa8cd488ec85634f443526def1e5190a407dc107148249d7", + "sha256:8deda1f7b4c03242f2a8037706d9584e703f3d8c74d6d9cac5833db36fe16c42", + "sha256:8ea13d0348b4c96b437d944d7068d59ed4a6c98aaa6c40d8537a2981313f1c66", + "sha256:91e96bf85b7c07c827d339a386e8a3cf2e90ef098c42595227f729922d0851df", "sha256:96782ebb3c9e91e174c333208b272ea144ed2a684413afb1038e3b3342230d72", - "sha256:c0d660a186e36c526366edf8a64391874fe53cf8b7039224137aee0163c046df", "sha256:9755c726aa6788f076114dfdc03b92b03ff8860316cca00902cce88bcdb5fedd", - "sha256:ef76535776c0708a85258f6dc51d36a2df12633c735f6d197ed7dfcaa7449b99", - "sha256:a2787319dc69854acdfd6452e6a8ba8f929aeb20843c7f090e04159fc18e6245", - "sha256:9fc17fdac8f1973850d42e51e8ba6149d93b1993ed6768a24f352f926dd3d587", - "sha256:9ee0eef254e340cc11c379f797af3977992a7f2c176f1a658740c94bf677e13c", - "sha256:03dc64a9aa7a5d405aea5c56db95835f6a2fa31b3502c5af1760e0e99210be30", - "sha256:91e96bf85b7c07c827d339a386e8a3cf2e90ef098c42595227f729922d0851df", - "sha256:25e617daf47d8dfd4e152c880cd0741cbdb48e51f54b8de9ddbfe74ecd87dd16", - "sha256:455f4deb00158d5ec8b1d3092df6abb681b225774ab8a59b3510293b4c8530e3", - "sha256:4ae23fbbe9eadf61279a26eba866bbf161a6f7e2ffad14a42cf20e9cb8e94166", - "sha256:cccf1e7806f12300e3a3b48f219e111000c2538483e85c869c35c1ae591e6ce9", "sha256:9dbab90c348c512e03f146e93a5e2610acec76df391043ecd46b6b775d5397e6", + "sha256:9ee0eef254e340cc11c379f797af3977992a7f2c176f1a658740c94bf677e13c", + "sha256:9fc17fdac8f1973850d42e51e8ba6149d93b1993ed6768a24f352f926dd3d587", + "sha256:a2787319dc69854acdfd6452e6a8ba8f929aeb20843c7f090e04159fc18e6245", + "sha256:b7c522292407fa04d8195032493aac937e253ad9ae524aab43b9d9d242571f03", 
"sha256:bd312794f51e37dcf77f013d40650fe4fbb211dd55ef2863839c37480bd44369", + "sha256:c0d660a186e36c526366edf8a64391874fe53cf8b7039224137aee0163c046df", + "sha256:c4869141e20769b65d2d72686e7a7eb141ce9f3168106bed3e7dcced54eb2422", "sha256:cc4057f692ac35bbe82a0a908d42ce3a281c9e913290fac37d7fa3bd01307dfb", - "sha256:50531caa7b4be1c4ed5e2d5793a4e51cc9bd62a919a6fd3299ef7c902e206eab", - "sha256:421aa1b92c291c429668bd8d8d8ec2bd00f183483a756928e3afbf2b6f941f00", - "sha256:8deda1f7b4c03242f2a8037706d9584e703f3d8c74d6d9cac5833db36fe16c42", - "sha256:d226e0d4b9192d95079a9a29c04dd81816b1ce8903b8c174a39224fe978547cb", - "sha256:40696a9a53faa7d85aaa6fd7bef1cae08f7882640bad08c350fb59dee7ad069b", - "sha256:6a15e2bee5c4188369a87ed6f02de804651152634a46cca91966a11c8abd2550", + "sha256:cccf1e7806f12300e3a3b48f219e111000c2538483e85c869c35c1ae591e6ce9", + "sha256:ce208f80f398522e49d9db789065c8ad2cd37b21bd6b23d30053474b7416af11", + "sha256:d0565481dc196986c484a7fb13214fc6402201f7fb55c65fd215b3324962fe6c", "sha256:d1b3366329c45a474b3bbc9b9c95d4c686e03f35da7fd12bc144626d1f2a7c04", - "sha256:213c445fe7e654621c6309e874627c35354b46ef3ee807f5a1927dc4b30e1a67" + "sha256:d226e0d4b9192d95079a9a29c04dd81816b1ce8903b8c174a39224fe978547cb", + "sha256:d38b35f6eef4237b1d0d8e845fc1546dad85c55eba447e28c211da8c7ef9697c", + "sha256:d64c98277ea80e4484f1332ab107e8dfd173a7dcf1bdbf10a9cccc97aaab145f", + "sha256:d9de8427a5601799784eb0e7fa1b031aa64086ce04de29df775a8ca37eedac41", + "sha256:e6a15cf8f887d9f578dd49c6fb3a99d53e1d922fdd67a245a67488d77bf56eb2", + "sha256:e8c446882cbb3774cd78c738c9f58220606b702b7c1655f1423357dc51674054", + "sha256:e8d188ee39bd0ffe76603da887706e4e7b471f613625899ddf1e27867dc6a0d3", + "sha256:ef76535776c0708a85258f6dc51d36a2df12633c735f6d197ed7dfcaa7449b99", + "sha256:f6efca006a81e1197b925a7d7b16b8f61980697bb6746587aad8842865233218" ], "index": "pypi", "version": "==3.11.0" diff --git a/README.md b/README.md index ac337f3a..26d3cfad 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,12 @@ [![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/4394/badge)](https://bestpractices.coreinfrastructure.org/projects/4394) QuantumLeap is the first implementation of [an API](https://app.swaggerhub.com/apis/smartsdk/ngsi-tsdb) -that supports the storage of [FIWARE NGSIv2](http://docs.orioncontextbroker.apiary.io/#) +that supports the storage of [FIWARE NGSIv2](https://fiware.github.io/specifications/ngsiv2/stable/) data into a [time series database](https://en.wikipedia.org/wiki/Time_series_database). +It currently also experimentally supports the injection of +[NGSI-LD](https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.01.01_60/gs_CIM009v010101p.pdf) in +a backward compatible way with NGSI-v2 API. I.e. you can retrieve NGSI-LD stored data via NGSI v2 +API and retrieve data will be describe following NGSI v2 format. Want to know more? Refer to the [docs](https://quantumleap.readthedocs.io/en/latest/) or checkout the Extra Resources below. @@ -42,9 +46,12 @@ QuantumLeap supports both Crate DB and Timescale as time-series DB backends but please bear in mind that at the moment we only support the following versions: -* Crate backend: Crate DB version `3.3.*` (will be deprecated from QL `0.8` version) and `4.*` +* Crate backend: Crate DB version `3.3.*` (will be deprecated from QL `0.9` version) and `4.*` * Timescale backend: Postgres version `10.*` or `11.*` + Timescale extension `1.3.*` + Postgis extension `2.5.*`. + +PR #373 introduced basic support for NGSI-LD. 
In short, this means that using
+the current endpoint you can store NGSI-LD payloads, with a few caveats (see #398).

 ## Usage

@@ -78,4 +85,4 @@ consistency.

 QuantumLeap is licensed under the [MIT](LICENSE) License

-© 2017-2019 Martel Innovate
+© 2017-2020 Martel Innovate
diff --git a/docker/docker-compose-dev.yml b/docker/docker-compose-dev.yml
index d1459d1f..32ea206f 100644
--- a/docker/docker-compose-dev.yml
+++ b/docker/docker-compose-dev.yml
@@ -48,6 +48,14 @@ services:
     volumes:
       - redisdata:/data

+  redis-commander:
+    image: rediscommander/redis-commander:latest
+    restart: always
+    environment:
+      - REDIS_HOSTS=local:redis:6379:1
+    ports:
+      - "8081:8081"
+
   timescale:
     image: timescale/timescaledb-postgis:${TIMESCALE_VERSION}
     ports:
@@ -58,6 +66,18 @@
     environment:
       - POSTGRES_PASSWORD=*

+  pgadmin:
+    image: dpage/pgadmin4:4.26
+    restart: always
+    environment:
+      - PGADMIN_CONFIG_MASTER_PASSWORD_REQUIRED=False
+      - PGADMIN_CONFIG_SERVER_MODE=False
+      - PGADMIN_DEFAULT_EMAIL=admin@admin.org
+      - PGADMIN_DEFAULT_PASSWORD=admin
+      - PGADMIN_LISTEN_PORT=80
+    ports:
+      - "8080:80"
+
   quantumleap-db-setup:
     build: ../timescale-container/
     image: quantumleap-db-setup
diff --git a/docs/manuals/admin/benchmarks.md b/docs/manuals/admin/benchmarks.md
new file mode 100644
index 00000000..3e38c766
--- /dev/null
+++ b/docs/manuals/admin/benchmarks.md
@@ -0,0 +1,53 @@
+# Basic benchmarks
+
+QuantumLeap relies heavily on CrateDB (or Timescale) to query and store data
+model configuration. This translates into a high number of backend queries
+during inserts, even though the data model of a given entity type rarely
+changes from one insert to the next.
+
+Our tests show that caching this metadata reduces the number of queries and
+thus increases throughput. Earlier tests used an in-memory cache, which is not
+ideal in a concurrent environment, so we developed experimental support for
+Redis-based caching.
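+
+As a rough sketch of how a benchmark run is switched between the cache
+variants compared below: the cache is enabled purely through environment
+variables (see the configuration manual), so the same QuantumLeap image can
+be exercised with and without Redis caching. The snippet assumes the `cache`
+package added by this PR is on the Python path; host and TTL values are
+illustrative.
+
+```python
+import os
+from cache.factory import get_cache, is_cache_available
+
+# Point QL at the Redis container from docker-compose-dev.yml.
+# Leave REDIS_HOST unset to benchmark the "without any cache" variant.
+os.environ['REDIS_HOST'] = 'redis'
+os.environ['REDIS_PORT'] = '6379'
+os.environ['DEFAULT_CACHE_TTL'] = '60'   # metadata cache TTL in seconds
+
+print(is_cache_available())              # True when REDIS_HOST is set
+cache = get_cache()                      # a QueryCache instance, or None
+```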
+
+To measure basic insertion performance we developed a simple load script based
+on [k6](https://k6.io/) that you can find
+[here](https://github.com/smartsdk/ngsi-timeseries-api/blob/master/src/tests/run_load_tests.sh).
+
+Example:
+```
+docker run -i --rm loadimpact/k6 run --vus 30 --duration 60s - < notify-load-test.js
+
+  checks.....................: 100.00% ✓ 6000 ✗ 0
+  data_received..............: 1.6 MB 27 kB/s
+  data_sent..................: 1.6 MB 27 kB/s
+  http_req_blocked...........: avg=302.26µs min=1.62µs med=6.21µs max=1.05s p(90)=10.21µs p(95)=18µs
+  http_req_connecting........: avg=262.78µs min=0s med=0s max=1.05s p(90)=0s p(95)=0s
+  http_req_duration..........: avg=208.12ms min=16.92ms med=155.56ms max=1.06s p(90)=409.38ms p(95)=514.53ms
+  http_req_receiving.........: avg=1.2ms min=-8.817564ms med=158.96µs max=133.5ms p(90)=2.69ms p(95)=5.99ms
+  http_req_sending...........: avg=89.36µs min=12.59µs med=38.08µs max=20.85ms p(90)=111.97µs p(95)=218.72µs
+  http_req_tls_handshaking...: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
+  http_req_waiting...........: avg=206.82ms min=16.78ms med=153.98ms max=1.06s p(90)=406.54ms p(95)=513.29ms
+  http_reqs..................: 6000 99.814509/s
+  iteration_duration.........: avg=30.02s min=30.02s med=30.02s max=30.03s p(90)=30.02s p(95)=30.02s
+  iterations.................: 60 0.998145/s
+  vus........................: 30 min=30 max=30
+  vus_max....................: 30 min=30 max=30
+```
+
+Tests show that:
+* caching metadata improves insert throughput, but Redis does not have the
+  same impact as the in-memory cache;
+* the number of metadata queries is not the only factor affecting insert
+  performance: because of the nature of HTTP requests, up to QL v0.7.6 we
+  created a new DB connection for each insert, and opening a connection takes
+  time. From v0.8 we reuse existing connections, which roughly doubled
+  throughput compared to Redis caching alone.
+
+System used for testing:
+a 2016 Mac with a 3.1 GHz i7 (i.e. 4 cores) and 16 GB of RAM.
+
+* Baseline (v0.7.6): 44 req/s - 600 ms avg response time
+  (crate queries peak: select 111 q/sec, insert 54 q/sec)
+* Redis caching: 60 req/s - (no data collected on response time)
+* In-memory caching: 95 req/s - (no data collected on response time)
+* Redis caching + connection re-use: 100 req/s - 200 ms avg response time
+  (crate queries peak: select 7 q/sec, insert 142 q/sec)
+* Connection re-use (without any cache): 55 req/s - 400 ms avg response time
+  (crate queries peak: select 177 q/sec, insert 86 q/sec)
+ 
\ No newline at end of file
diff --git a/docs/manuals/admin/configuration.md b/docs/manuals/admin/configuration.md
index 306f9bc1..f1ff49f9 100644
--- a/docs/manuals/admin/configuration.md
+++ b/docs/manuals/admin/configuration.md
@@ -19,8 +19,11 @@ To configure QuantumLeap you can use the following environment variables:
 | `REDIS_HOST`        | Redis Host |
 | `REDIS_PORT`        | Redis Port |
 | `USE_GEOCODING`     | `True` or `False` enable or disable geocoding |
+| `DEFAULT_CACHE_TTL` | Time to live of the metadata cache, default: 60 (seconds) |
 | `QL_CONFIG`         | Pathname for tenant configuration |
-| `LOGLEVEL`          | define the log level for all services (`DEBUG`, `INFO`, `WARNING` , `ERROR`) |
+| `QL_DEFAULT_DB`     | Default backend: `timescale` or `crate` |
+| `USE_FLASK`         | `True` or `False` to use the Flask dev server (development only) or Gunicorn. Defaults to `False` |
+| `LOGLEVEL`          | Define the log level for all services (`DEBUG`, `INFO`, `WARNING`, `ERROR`) |

 **NOTE**
 * `DEFAULT_LIMIT`. 
This variable specifies the upper limit L of rows a query
diff --git a/docs/manuals/admin/index.md b/docs/manuals/admin/index.md
index 536541d8..b5d36f5e 100644
--- a/docs/manuals/admin/index.md
+++ b/docs/manuals/admin/index.md
@@ -105,7 +105,7 @@ you can leverage the Helm Charts in [this repository](https://smartsdk-recipes.r
 In particular you will need to deploy:

 * [CrateDB](https://github.com/orchestracities/charts/tree/master/charts/crate)
-* [Optional] Timescale - for which you can refer to [Patroni Helm Chart](https://github.com/helm/charts/tree/master/incubator/patroni).
+* [Optional/Alternative] Timescale - for which you can refer to the [Patroni Helm Chart](https://github.com/helm/charts/tree/master/incubator/patroni).
 * [QuantumLeap](https://github.com/orchestracities/charts/tree/master/charts/quantumleap)

 ## FIWARE Releases Compatibility
diff --git a/docs/manuals/index.md b/docs/manuals/index.md
index a0bcfc4a..28eea6ae 100644
--- a/docs/manuals/index.md
+++ b/docs/manuals/index.md
@@ -7,7 +7,8 @@
 ## Overview

 QuantumLeap is a REST service for storing, querying and retrieving
-[NGSI v2][ngsi-spec] spatial-temporal data. QuantumLeap converts
+[NGSI v2][ngsi-spec] and [NGSI-LD][ngsi-ld-spec] (*experimental support*)
+spatial-temporal data. QuantumLeap converts
 NGSI semi-structured data into tabular format and stores it in a
 [time-series database][tsdb], associating each database record with
 a time index and, if present in the NGSI data, a location on Earth.
@@ -29,8 +30,53 @@ to transparently support multiple database back ends. In fact, presently
 QuantumLeap supports both [CrateDB][crate] and [Timescale][timescale]
 as back end databases.

-#### Relation to STH Comet
-Although QuantumLeap and FiWare [STH Comet][comet] share similar
+### NGSI-LD support
+PR [#373](https://github.com/smartsdk/ngsi-timeseries-api/pulls/373)
+introduced basic [NGSI-LD][ngsi-ld-spec] support on top of the v2 API.
+In short, this means that using the current endpoint QL can
+store NGSI-LD payloads, with a few caveats (see
+[#398](https://github.com/smartsdk/ngsi-timeseries-api/issues/398)):
+* temporal attributes are not currently supported
+  ([#395](https://github.com/smartsdk/ngsi-timeseries-api/issues/395));
+  what matters here is that these attributes are
+  used to create the time index of the series
+* other attributes may be added in the future (probably not a priority,
+  so this may not be tackled any time soon,
+  [#396](https://github.com/smartsdk/ngsi-timeseries-api/issues/396))
+* the context is currently not stored
+* query endpoints return NGSI v2 data types
+
+NGSI-LD temporal queries seem to have a semantics implying that only
+numeric values are tracked in time series. This was never the case for
+QL, which tracks any attribute over time (not only numeric ones), since
+non-numeric attributes may change as well.
+
+NGSI-LD semantics also seem to track the values of single attributes
+over time. To let you retrieve the full entity value at a given point
+in time, QL stores the whole entity in a single table (this avoids the
+need for JOINs, which are notoriously time consuming, but on the other
+hand produces sparser data). In doing so, we create a single time index
+for the entity, because while different dateTime attributes can be
+defined and hence queried, only one can be used to index the time
+series in all modern time-series DBs (to achieve performance).
+This implies that we need a policy to compute that time index (either a
+custom one referring to an attribute of the entity, or the "latest" time
+metadata linked to the entity or to an attribute).
+The issue is that if the notification payload sent to QL includes all
+attributes, including ones that were not updated, QL will "timestamp" all
+values (old ones included) with that time index.
+
+This means that the ability to track the value of an attribute at a
+specific point in time depends on the actual notification.
+
+In short, given that we aim to ensure both forward compatibility
+(data stored as NGSI v2 can be queried in the future as NGSI-LD)
+and backward compatibility (data stored as NGSI-LD can be queried as
+NGSI v2), our implementation of the NGSI-LD temporal API may not be
+100% compliant with the specs.
+
+### Relation to STH Comet
+Although QuantumLeap and FIWARE [STH Comet][comet] share similar
 goals, Comet doesn't support multiple database back ends (only
 MongoDB is available) and doesn't support NGSI v2 either. While
 Comet per se is a fine piece of software, some of the needs and
@@ -128,8 +174,15 @@ to its shared-nothing architecture which lends itself well to manage
 a containerised CrateDB database cluster, e.g. using Kubernetes.
 Moreover, CrateDB uses [SQL][crate-doc.sql] to query data, with
 built-in extensions for temporal and [geographical queries][crate-doc.geo].
-Also of note, Grafana ships with a [plugin][grafana.pg] that can
-be used to visualise time series stored in CrateDB.
+CrateDB also exposes a PostgreSQL-compatible interface, which simplifies
+integration. For example, you can leverage Grafana's
+[PostgreSQL plugin][grafana.pg] to visualise time series stored in CrateDB.
+
+QuantumLeap stores NGSI entities in CrateDB using the `notify` endpoint.
+
+     -------------------------             ---------------
+    |         CrateDB         |  <-----  |  QuantumLeap  |-----O notify
+     -------------------------             ---------------

 ### Timescale back end
 [Timescale][timescale] is another time series databases that can be
@@ -138,8 +191,9 @@ Indeed, QuantumLeap provides full support for storing NGSI entities
 in Timescale, including geographical features (encoded as GeoJSON
 or NGSI Simple Location Format), structured types and arrays.

-QuantumLeap stores NGSI entities in Timescale using the existing
-`notify` endpoint. The Timescale back end is made up of [PostgreSQL][postgres]
+QuantumLeap stores NGSI entities in Timescale using the
+`notify` endpoint (as for CrateDB).
+The Timescale back end is made up of [PostgreSQL][postgres]
 with both Timescale and [PostGIS][postgis] extensions enabled:

      -------------------------
@@ -166,13 +220,48 @@ contains quite a number of examples of how NGSI data are stored in Timescale.

 #### Note: querying & retrieving data
-At the moment, QuantumLeap does **not** implement any querying or
-retrieving of data through the QuantumLeap REST API as is available
-for the Crate back end. This means that for now the only way to access
-your data is to query the Timescale DB directly. However, data querying
-and retrieval through the REST API is planned for the upcoming
-QuantumLeap major release.
+At the moment, QuantumLeap implements experimental querying of data
+through the QuantumLeap REST API.
+This means that, while the REST API on top of CrateDB has been tested
+in production, this is not yet the case for Timescale.
+
+## Cache Back End
+
+To reduce queries to databases or to geocoding APIs, QuantumLeap
+leverages a cache. The only cache back end supported so far
+is Redis.
+Caching support for database queries is *experimental*.
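+
+As an illustration (not part of the manual yet), here is a minimal usage
+sketch of the Redis-backed `QueryCache` this PR adds under `src/cache/`;
+the host, port and cached values below are made up:
+
+```python
+from cache.querycache import QueryCache
+
+# Redis db 1 is used for the query cache; responses come back as raw bytes.
+cache = QueryCache('redis', 6379, default_ttl=60)
+
+# e.g. cache the CrateDB version for one hour, keyed per tenant
+cache.put('some_tenant', 'crate', '4.1.4', ex=3600)
+print(cache.get('some_tenant', 'crate'))   # b'4.1.4'
+cache.delete('some_tenant', 'crate')
+```
+
+The picture below shows where the cache sits in the architecture: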
+
+     --------------------             ---------------
+    |         DB         |  ------  |  QuantumLeap  |-----O notify
+     --------------------             ---------------
+                                            |
+                                            |
+                                     ---------------
+                                    |     Redis     |
+                                     ---------------
+
+As of today, the query cache stores:
+* The CrateDB version. Different versions of CrateDB support different SQL
+  dialects, so on each request we need to know which version of CrateDB
+  we are talking to. By caching this information, each thread asks for it
+  only once. Of course it could be passed in as a variable, but then live
+  updates would require QL downtime. Currently you can move from one Crate
+  version to another with almost zero downtime (except for the time Crate
+  itself is not in a ready state); you only need to clear the key `crate`
+  from the Redis cache. The TTL in this case is 1 hour, i.e. after one hour
+  the version is checked again against CrateDB.
+* The metadata table. The metadata table stores the mapping between the
+  original NGSI attributes (including their type) and the DB column names.
+  This information is required to perform consistent data injection and to
+  restore the correct NGSI type of the attributes returned by queries. Given
+  the concurrency introduced by multi-threading and HA deployments, this
+  cache has a shorter default TTL (60 sec). The cache is also re-set every
+  time the metadata table changes (e.g. when an incoming payload includes a
+  new entity type or a new attribute for an existing entity type).
+  **Metadata** for a specific entity type are removed only when that entity
+  type is dropped, not when all its values are removed.

 ## Further Readings

@@ -180,7 +269,7 @@ QuantumLeap major release.
   QuantumLeap.
 * The [User Manual][ql-man.user] delves into how to use it and connect it to
   other complementary services.
-* [FiWare Time Series][ql-tut]: a complete, step-by-step, hands-on tutorial
+* [FIWARE Time Series][ql-tut]: a complete, step-by-step, hands-on tutorial
   to learn how to set up and use QuantumLeap.
 * The [SmartSDK guided tour][smartsdk.tour] has a section about using
   QuantumLeap in a FiWare cloud.
@@ -208,6 +297,8 @@ QuantumLeap major release.
     "InfluxDB Documentation"
 [ngsi-spec]: https://fiware.github.io/specifications/ngsiv2/stable/
     "FIWARE-NGSI v2 Specification"
+[ngsi-ld-spec]: https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.01.01_60/gs_CIM009v010101p.pdf
+    "ETSI NGSI-LD API Specification"
 [orion]: https://fiware-orion.readthedocs.io
     "Orion Context Broker Home"
 [osm]: https://www.openstreetmap.org
@@ -227,7 +318,7 @@ QuantumLeap major release.
 [ql-spec]: https://app.swaggerhub.com/apis/smartsdk/ngsi-tsdb
     "NGSI-TSDB Specification"
 [ql-tut]: https://fiware-tutorials.readthedocs.io/en/latest/time-series-data/
-    "FiWare Tutorials - Time Series Data"
+    "FIWARE Tutorials - Time Series Data"
 [rethink]: https://www.rethinkdb.com/
     "RethinkDB Home"
 [smartsdk.tour]: http://guided-tour-smartsdk.readthedocs.io/en/latest/
diff --git a/docs/manuals/user/index.md b/docs/manuals/user/index.md
index 1de1e263..1a2d36c9 100644
--- a/docs/manuals/user/index.md
+++ b/docs/manuals/user/index.md
@@ -141,29 +141,65 @@
 You need to make sure your NGSI entities are using the valid NGSI types
 for the attributes, which are documented in the *Specification* section of the
 [NGSI API](http://telefonicaid.github.io/fiware-orion/api/v2/latest/). See the
 first column of the translation table below, and mind its capitalisation.
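+For reference, here is a minimal sketch of a notified NGSI v2 entity using
+types from the tables below (attribute names and values are illustrative,
+modelled on the test fixtures elsewhere in this PR):
+
+```python
+entity = {
+    'id': 'Room:1',
+    'type': 'Room',
+    'temperature': {'type': 'Number', 'value': 23.5},
+    'name': {'type': 'Text', 'value': 'upper floor'},
+    'open': {'type': 'Boolean', 'value': False},
+    'checkedAt': {'type': 'DateTime', 'value': '2020-10-01T12:00:00Z'},
+    'location': {
+        'type': 'geo:json',
+        # GeoJSON order: [longitude, latitude]
+        'value': {'type': 'Point', 'coordinates': [8.75, 51.71]}
+    },
+}
+```
+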
-The table below shows which attribute types will be translated to which -[CrateDB Data Types](https://crate.io/docs/crate/reference/sql/data_types.html). +The tables below show which attribute types will be translated to which +[CrateDB](https://crate.io/docs/crate/reference/sql/data_types.html) +or [TimescaleDB](https://www.postgresql.org/docs/current/datatype.html) data types. -**CrateDB Translation Table** +**CrateDB (v4.x) Translation Table** | NGSI Type | CrateDB Type | Observation | | ------------------ |:-----------------------:| :-----------| |Array | [array(string)](https://crate.io/docs/crate/reference/sql/data_types.html#array) | [Issue 36: Support arrays of other types](https://github.com/smartsdk/ngsi-timeseries-api/issues/36) | |Boolean | [boolean](https://crate.io/docs/crate/reference/sql/data_types.html#boolean) | - | -|DateTime | [timestamp](https://crate.io/docs/crate/reference/sql/data_types.html#timestamp) | 'ISO8601' can be used as equivalent of 'DateTime'. | -|Integer | [long](https://crate.io/docs/crate/reference/sql/data_types.html#numeric-types) | - | +|DateTime | [timestampz](https://crate.io/docs/crate/reference/en/4.3/general/ddl/data-types.html#timestamp-with-time-zone) | 'ISO8601' can be used as equivalent of 'DateTime'. | +|Integer | [bigint](https://crate.io/docs/crate/reference/sql/data_types.html#numeric-types) | - | |[geo:point](http://docs.orioncontextbroker.apiary.io/#introduction/specification/geospatial-properties-of-entities) | [geo_point](https://crate.io/docs/crate/reference/sql/data_types.html#geo-point) | **Attention!** NGSI uses "lat, long" order whereas CrateDB stores points in [long, lat] order.| |[geo:json](http://docs.orioncontextbroker.apiary.io/#introduction/specification/geospatial-properties-of-entities) | [geo_shape](https://crate.io/docs/crate/reference/sql/data_types.html#geo-shape) | - | -|Number | [float](https://crate.io/docs/crate/reference/sql/data_types.html#numeric-types) |-| -|Text | [string](https://crate.io/docs/crate/reference/sql/data_types.html#string) | This is the default type if the provided NGSI Type is unsupported or wrong. | +|Number | [real](https://crate.io/docs/crate/reference/sql/data_types.html#numeric-types) |-| +|Text | [text](https://crate.io/docs/crate/reference/sql/data_types.html#data-type-text) | This is the default type if the provided NGSI Type is unsupported or wrong. | |StructuredValue | [object](https://crate.io/docs/crate/reference/sql/data_types.html#object) |-| +**TimescaleDB (v12.x) Translation Table** + +| NGSI Type | TimescaleDB Type | Observation | +| ------------------ |:-----------------------:| :-----------| +|Array | [jsonb](https://www.postgresql.org/docs/current/datatype-json.html) | | +|Boolean | [boolean](https://www.postgresql.org/docs/current/datatype-boolean.html) | - | +|DateTime | [timestamp WITH TIME ZONE](https://www.postgresql.org/docs/current/datatype-datetime.html) | 'ISO8601' can be used as equivalent of 'DateTime'. 
| +|Integer | [bigint](https://www.postgresql.org/docs/current/datatype-numeric.html#DATATYPE-INT) | - | +|[geo:point](http://docs.orioncontextbroker.apiary.io/#introduction/specification/geospatial-properties-of-entities) | [geometry](https://postgis.net/docs/geometry.html) | **Attention!** NGSI uses "lat, long" order whereas PostGIS/WGS84 stores points in [long, lat] order.| +|[geo:json](http://docs.orioncontextbroker.apiary.io/#introduction/specification/geospatial-properties-of-entities) | [geometry](https://postgis.net/docs/geometry.html) | - | +|Number | [float](https://www.postgresql.org/docs/current/datatype-numeric.html#DATATYPE-FLOAT) |-| +|Text | [text](https://www.postgresql.org/docs/current/datatype-character.html) | This is the default type if the provided NGSI Type is unsupported or wrong. | +|StructuredValue | [jsonb](https://www.postgresql.org/docs/current/datatype-json.html) |-| + + + If the type of any of the received attributes is not present in the column -*NGSI Type* of the previous table, the value of such attribute will be treated -internally as a string. So, if you use `Float` for your attribute type (not -valid), your attribute will be stored as a `string`. +*NGSI Type* of the previous table, the *NGSI Type* (and hence the SQL type) +will be derived from the value. Using the following logic: + +``` + if a_type not in NGSI + type = Text + if a_value is a list: + type = Array + elif a_value is not None and a_value is an Object: + if a_type is 'Property' and a_value['@type'] is 'DateTime': + type = DateTime + else: + type = StructuredValue + elif a_value is int: + type = Integer + elif a_value is float: + type = Number + elif a_value is bool: + type = Boolean + elif a_value is an ISO DateTime: + type = DateTime +``` -### Time Index +### [Time Index](#timeindex) A fundamental element in the time-series database is the **time index**. You may be wondering... where is it stored? QuantumLeap will persist the @@ -270,7 +306,8 @@ Note there are a lot of possibilities, but not all of them are fully implemented yet. If you want to, you can interact directly with the database. For more details -refer to the [CrateDB](../admin/crate.md) section of the docs. What you need to +refer to the [CrateDB](../admin/crate.md) or to the [Timescale](../admin/timescale.md) +section of the docs. What you need to know in this case is that QuantumLeap will create one table per each entity type. Table names are formed with a prefix (et) plus the lowercase version of the entity type. I.e, if your entity type is *AirQualityObserved*, the @@ -310,8 +347,10 @@ headers will have to be used in order to retrieve such data. In case you are interacting directly with the database, you need to know that QuantumLeap will use the `FIWARE-Service` as the -[database schema](https://crate.io/docs/crate/reference/en/latest/general/ddl/create-table.html?highlight=scheme#schemas) -for crate, with a specific prefix. This way, if you insert an entity of type +database schema for +[crate](https://crate.io/docs/crate/reference/en/latest/general/ddl/create-table.html?highlight=scheme#schemas) +or [timescale](https://www.postgresql.org/docs/current/ddl-schemas.html), +with a specific prefix. This way, if you insert an entity of type `Room` using the `Fiware-Service: magic` header, you should expect to find your table as `mtmagic.etroom`. 
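+
+As a sketch of that naming rule (the helper name is made up; the `mt` schema
+prefix and `et` table prefix are as described above):
+
+```python
+def target_table(fiware_service: str, entity_type: str) -> str:
+    # "Room" notified with the "Fiware-Service: magic" header
+    # ends up in the mtmagic.etroom table.
+    return 'mt' + fiware_service.lower() + '.et' + entity_type.lower()
+
+assert target_table('magic', 'Room') == 'mtmagic.etroom'
+```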
This information is also useful for example if you
are configuring the Grafana datasource, as explained in the
diff --git a/docs/manuals/user/troubleshooting.md b/docs/manuals/user/troubleshooting.md
index 993580de..662f00b0 100644
--- a/docs/manuals/user/troubleshooting.md
+++ b/docs/manuals/user/troubleshooting.md
@@ -46,6 +46,15 @@ Make sure you have enough data points in your database and that your selection
 of time slice (on the top-right corner of grafana) is actually covering
 a time range in which you have data.

+#### 3D Coordinates are not working when using CrateDB as backend
+
+If you spot an error such as:
+```
+crate.client.exceptions.ProgrammingError: SQLActionException[ColumnValidationException: Validation failed for location: Cannot cast {"coordinates"=[51.716783624, 8.752131611, 23], "type"='Point'} to type geo_shape]
+```
+this is due to the fact that CrateDB does not support 3D coordinates,
+as documented in the [admin documentation](../admin/crate.md).
+
 ## Bug reporting

 Bugs should be reported in the form of
diff --git a/mkdocs.yml b/mkdocs.yml
index a205e21d..d154af3f 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -22,3 +22,4 @@ nav:
     - 'Timescale': 'admin/timescale.md'
     - 'Grafana': 'admin/grafana.md'
     - 'Data-Migration': 'admin/dataMigration.md'
+    - 'Benchmarks': 'admin/benchmarks.md'
diff --git a/src/app.py b/src/app.py
index 9266f430..fefa24c4 100644
--- a/src/app.py
+++ b/src/app.py
@@ -1,6 +1,45 @@
 import server.wsgi as flask
 import server.grunner as gunicorn
-from utils.cfgreader import EnvReader, BoolVar
+from flask import has_request_context, request
+from utils.cfgreader import EnvReader, BoolVar, StrVar
+import logging
+from flask.logging import default_handler
+
+
+class RequestFormatter(logging.Formatter):
+    def format(self, record):
+        if has_request_context():
+            record.corr = request.headers.get('fiware_correlator', None)
+            record.remote_addr = request.remote_addr
+            record.srv = request.headers.get('fiware-service', None)
+            if record.srv:
+                record.subserv = request.headers.get('fiware-servicepath', None)
+            else:
+                record.subserv = None
+            if request.json and request.json.get('data'):
+                record.payload = request.json['data']
+            else:
+                record.payload = None
+        else:
+            record.corr = None
+            record.remote_addr = None
+            record.srv = None
+            record.subserv = None
+            record.payload = None
+
+        return super().format(record)
+
+
+formatter = RequestFormatter(
+    'time=%(asctime)s.%(msecs)03d | '
+    'level=%(levelname)s | corr=%(corr)s | from=%(remote_addr)s | '
+    'srv=%(srv)s | subserv=%(subserv)s | op=%(funcName)s | comp=%(name)s | '
+    'msg=%(message)s | payload=%(payload)s | '
+    'thread=%(thread)d | process=%(process)d',
+    datefmt='%Y-%m-%d %I:%M:%S'
+)
+
+default_handler.setFormatter(formatter)


 def use_flask() -> bool:
@@ -8,7 +47,16 @@
     return EnvReader().safe_read(env_var)


+def setup():
+    r = EnvReader(log=logging.getLogger().debug)
+    level = r.read(StrVar('LOGLEVEL', 'INFO')).upper()
+    logger = logging.getLogger()
+    logger.setLevel(level)
+    logger.addHandler(default_handler)
+
+
 if __name__ == '__main__':
+    setup()
     if use_flask():  # dev mode, run the WSGI app in Flask dev server
         flask.run()
     else:  # prod mode, run the WSGI app in Gunicorn
diff --git a/src/cache/__init__.py b/src/cache/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/cache/factory.py b/src/cache/factory.py
new file mode 100644
index 00000000..66b543c8
--- /dev/null
+++ b/src/cache/factory.py
@@ -0,0 +1,61 @@
+import logging
+from typing import Union
+
+from utils.cfgreader import EnvReader, BoolVar, IntVar, StrVar, MaybeString
+
+from .querycache import QueryCache
+
+MaybeCache = Union[QueryCache, None]
+
+
+class CacheEnvReader:
+    """
+    Helper class to encapsulate the reading of the cache env vars.
+    """
+
+    def __init__(self):
+        self.env = EnvReader(log=logging.getLogger(__name__).debug)
+
+    def redis_host(self) -> MaybeString:
+        return self.env.read(StrVar('REDIS_HOST', None))
+
+    def redis_port(self) -> int:
+        return self.env.read(IntVar('REDIS_PORT', 6379))
+
+    def default_ttl(self) -> int:
+        return self.env.read(IntVar('DEFAULT_CACHE_TTL', 60))
+
+
+def log():
+    return logging.getLogger(__name__)
+
+
+def is_cache_available() -> bool:
+    """
+    Can we use cache? Yes if the Redis host env var is set. No otherwise.
+
+    :return: True or False depending on whether or not we're supposed to
+        use the cache.
+    """
+    env = CacheEnvReader()
+    if env.redis_host():
+        return True
+    return False
+
+
+def get_cache() -> MaybeCache:
+    """
+    Build the query cache client.
+
+    :return: `None` if `is_cache_available` returns false, a client
+        object otherwise.
+    """
+    env = CacheEnvReader()
+    if is_cache_available():
+        log().info("Cache env variables set, building a cache.")
+
+        return QueryCache(env.redis_host(), env.redis_port(),
+                          env.default_ttl())
+
+    log().info("Cache env variables indicate cache should not be used.")
+    return None
diff --git a/src/cache/querycache.py b/src/cache/querycache.py
new file mode 100644
index 00000000..e28f42d1
--- /dev/null
+++ b/src/cache/querycache.py
@@ -0,0 +1,33 @@
+from datetime import datetime
+from cache import rediscache
+
+
+class QueryCache(rediscache.RedisCache):
+
+    default_ttl = 60
+
+    @staticmethod
+    def xstr(s):
+        if s is None:
+            return ''
+        return str(s)
+
+    def __init__(self, redis_host, redis_port, default_ttl=60):
+        self.default_ttl = default_ttl
+        super(QueryCache, self).__init__(redis_host, redis_port, 1, False)
+
+    def get(self, tenant_name, key):
+        return self.redis.hget(':' + self.xstr(tenant_name), key)
+
+    def exists(self, tenant_name, key):
+        return self.redis.hexists(':' + self.xstr(tenant_name), key)
+
+    def put(self, tenant_name, key, value, ex=None):
+        self.redis.hset(':' + self.xstr(tenant_name), key, value)
+        if ex:
+            # unfortunately redis does not support expiration of single keys
+            # inside a hash, so we set the expiration on the whole hash
+            self.redis.expire(':' + self.xstr(tenant_name), ex)
+
+    def delete(self, tenant_name, key):
+        self.redis.hdel(':' + self.xstr(tenant_name), key)
diff --git a/src/cache/rediscache.py b/src/cache/rediscache.py
new file mode 100644
index 00000000..0c5df2ce
--- /dev/null
+++ b/src/cache/rediscache.py
@@ -0,0 +1,51 @@
+from datetime import datetime
+
+
+class RedisCache(object):
+
+    def __init__(self, redis_host, redis_port, redis_db, decode_responses=True):
+        import redis
+        self.redis = redis.StrictRedis(
+            host=redis_host,
+            port=redis_port,
+            db=redis_db,
+            decode_responses=decode_responses
+        )
+
+    def get(self, key):
+        return self.redis.get(key)
+
+    def put(self, key, value, ex=None):
+        self.redis.set(key, value, ex=ex)
+
+    def expire(self, key, ex=0):
+        self.redis.expire(key, ex)
+
+    def delete(self, key):
+        self.redis.delete(key)
+
+    def flushall(self):
+        self.redis.flushall()
+
+    def get_health(self):
+        """
+        :return: dictionary with redis service health. ::see:: reporter.health.
+ """ + import redis + + res = {} + res['time'] = datetime.now().isoformat() + try: + r = self.redis.ping() + except (redis.exceptions.ConnectionError, + redis.exceptions.TimeoutError, + redis.exceptions.RedisError) as e: + res['status'] = 'fail' + res['output'] = "{}".format(e) + else: + if r: + res['status'] = 'pass' + else: + res['status'] = 'warn' + res['output'] = "Redis is not playing ping pong :/" + return res diff --git a/src/cache/tests/__init__.py b/src/cache/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/cache/tests/test_querycache.py b/src/cache/tests/test_querycache.py new file mode 100644 index 00000000..08c52da7 --- /dev/null +++ b/src/cache/tests/test_querycache.py @@ -0,0 +1,57 @@ +import json +from time import sleep + +from conftest import REDIS_HOST, REDIS_PORT +from cache.querycache import QueryCache +import pytest + + +def test_insert(): + cache = QueryCache(REDIS_HOST, REDIS_PORT) + + table_name = "test table" + key1 = "insert/entityType/type" + key2 = "/v2/entities/type" + + value1 = 'string' + value2 = { + 'dic': 2 + } + + cache.put(table_name, key1, value1) + cache.put(table_name, key2, value2) + + assert cache.get(table_name, key1) == str(value1) + assert cache.get(table_name, key2) == str(value2) + + cache.flushall() + + +def test_expire(): + cache = QueryCache(REDIS_HOST, REDIS_PORT) + + table_name = "test table" + key = "insert/entityType/type" + value = 'string' + + cache.put(table_name, key, value, 1) + assert cache.get(table_name, key) == str(value) + sleep(5) + assert cache.get(table_name, key) is None + + cache.flushall() + + +def test_delete(): + cache = QueryCache(REDIS_HOST, REDIS_PORT) + + table_name = "test table" + key = "insert/entityType/type" + value = 'string' + + cache.put(table_name, key, value) + assert cache.get(table_name, key) == str(value) + cache.delete(table_name, key) + assert cache.get(table_name, key) is None + + cache.flushall() diff --git a/src/conftest.py b/src/conftest.py index 2a46792e..e0f2eb49 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -70,6 +70,18 @@ def insert(self, entity, service=None, service_path=None): headers=headers(service, service_path)) return r + def delete(self, entity_id, service=None, service_path=None): + r = requests.delete('{}/v2/entities/{}'.format(self.url, entity_id), + headers=headers(service, service_path)) + return r + + def delete_subscription(self, subscription_id, service=None, + service_path=None): + r = requests.delete( + '{}/v2/subscriptions/{}'.format(self.url, subscription_id), + headers=headers(service, service_path)) + return r + @pytest.fixture def orion_client(): @@ -79,7 +91,8 @@ def orion_client(): def do_clean_crate(): from crate import client - conn = client.connect(["{}:{}".format(CRATE_HOST, CRATE_PORT)], error_trace=True) + conn = client.connect(["{}:{}".format(CRATE_HOST, CRATE_PORT)], + error_trace=True) cursor = conn.cursor() try: @@ -143,6 +156,30 @@ def delete_entities(self, entity_type=None, fiware_service=None, pass return r + def entity_types(self, fiware_service=None, **kwargs): + r = CrateTranslator.query_entity_types(self, entity_type=None, + fiware_service=fiware_service, + **kwargs) + try: + self._refresh(r, fiware_service=fiware_service) + except exceptions.ProgrammingError: + pass + return r + + def clean(self, fiware_service=None, **kwargs): + types = CrateTranslator.query_entity_types(self, + fiware_service=fiware_service, + **kwargs) + if types: + for t in types: + CrateTranslator.drop_table(self, t, + 
fiware_service=fiware_service, + **kwargs) + try: + self._refresh(types, fiware_service=fiware_service) + except exceptions.ProgrammingError: + pass + with Translator(host=CRATE_HOST, port=CRATE_PORT) as trans: yield trans @@ -169,8 +206,9 @@ def entity(): @pytest.fixture def sameEntityWithDifferentAttrs(): """ - Two updates for the same entity with different attributes. - The first update has temperature and pressure but the second update has only temperature. + Two updates for the same entity with different attributes. The first + update has temperature and pressure but the second update has only + temperature. """ entities = [ { @@ -218,8 +256,9 @@ def sameEntityWithDifferentAttrs(): @pytest.fixture def diffEntityWithDifferentAttrs(): """ - Two updates for the same entity with different attributes. - The first update has temperature and pressure but the second update has only temperature. + Two updates for the same entity with different attributes. The first + update has temperature and pressure but the second update has only + temperature. """ entities = [ { diff --git a/src/geocoding/geocache.py b/src/geocoding/geocache.py index 9215f2b0..b3efa31b 100644 --- a/src/geocoding/geocache.py +++ b/src/geocoding/geocache.py @@ -1,45 +1,11 @@ from datetime import datetime +from cache import rediscache -class GeoCodingCache(object): +class GeoCodingCache(rediscache.RedisCache): def __init__(self, redis_host, redis_port): - import redis - self.redis = redis.StrictRedis( - host=redis_host, - port=redis_port, - db=0, - decode_responses=True - ) - - def get(self, key): - return self.redis.get(key) - - def put(self, key, value): - self.redis.set(key, value) - - def get_health(self): - """ - :return: dictionary with redis service health. ::see:: reporter.health. 
- """ - import redis - - res = {} - res['time'] = datetime.now().isoformat() - try: - r = self.redis.ping() - except (redis.exceptions.ConnectionError, - redis.exceptions.TimeoutError, - redis.exceptions.RedisError) as e: - res['status'] = 'fail' - res['output'] = "{}".format(e) - else: - if r: - res['status'] = 'pass' - else: - res['status'] = 'warn' - res['output'] = "Redis is not playing ping pong :/" - return res + super(GeoCodingCache, self).__init__(redis_host, redis_port, 0) def temp_geo_cache(host, port): diff --git a/src/geocoding/location.py b/src/geocoding/location.py index c38c9cd3..bc404282 100644 --- a/src/geocoding/location.py +++ b/src/geocoding/location.py @@ -9,6 +9,7 @@ _TYPE_ATTR_NAME = 'type' _VALUE_ATTR_NAME = 'value' _GEOJSON_TYPE = 'geo:json' +_GEOJSON_LD_TYPE = 'GeoProperty' class LocationAttribute: @@ -24,7 +25,8 @@ def geometry_value(self) -> Optional[str]: return self._location.get(_VALUE_ATTR_NAME, None) def is_geojson(self): - return self.geometry_type() == _GEOJSON_TYPE + return self.geometry_type() == _GEOJSON_TYPE or \ + self.geometry_type() == _GEOJSON_LD_TYPE def _compute_geojson_centroid(self): lon_lat = geojson_centroid(self.geometry_value()) diff --git a/src/geocoding/tests/docker-compose.yml b/src/geocoding/tests/docker-compose.yml index 3c81b4cd..70515b5a 100644 --- a/src/geocoding/tests/docker-compose.yml +++ b/src/geocoding/tests/docker-compose.yml @@ -5,14 +5,9 @@ services: image: redis:${REDIS_VERSION} ports: - "6379:6379" - volumes: - - redisdata:/data networks: - geocodingtests -volumes: - redisdata: - networks: geocodingtests: driver: bridge diff --git a/src/reporter/delete.py b/src/reporter/delete.py index 03d40c0b..82f7a206 100644 --- a/src/reporter/delete.py +++ b/src/reporter/delete.py @@ -1,6 +1,7 @@ from exceptions.exceptions import AmbiguousNGSIIdError from .httputil import fiware_s, fiware_sp from translators.factory import translator_for +import logging def delete_entity(entity_id, type_=None, from_date=None, to_date=None): @@ -18,6 +19,7 @@ def delete_entity(entity_id, type_=None, from_date=None, to_date=None): "description": str(e) }, 409 + logging.getLogger(__name__).info("deleted {} entities".format(deleted)) if deleted == 0: r = { "error": "Not Found", @@ -34,6 +36,7 @@ def delete_entities(entity_type, from_date=None, to_date=None, with translator_for(fiware_s()) as trans: if drop_table: trans.drop_table(etype=entity_type, fiware_service=fiware_s()) + logging.getLogger(__name__).info("dropped entity_type {}".format(entity_type)) return 'entity table dropped', 204 deleted = trans.delete_entities(etype=entity_type, @@ -41,6 +44,8 @@ def delete_entities(entity_type, from_date=None, to_date=None, to_date=to_date, fiware_service=fiware_s(), fiware_servicepath=fiware_sp(),) + + logging.getLogger(__name__).info("deleted {} entities of type {}".format(deleted, entity_type)) if deleted == 0: r = { "error": "Not Found", diff --git a/src/reporter/health.py b/src/reporter/health.py index bf36c26f..343070ff 100644 --- a/src/reporter/health.py +++ b/src/reporter/health.py @@ -2,6 +2,7 @@ from geocoding import geocoding from geocoding.factory import get_geo_cache, is_geo_coding_available +from cache.factory import get_cache, is_cache_available # TODO having now multiple backends, health check needs update @@ -16,14 +17,14 @@ def check_crate(): return crate_health -def check_geocache(): +def check_cache(): """ - Geocache is relevant only when geocoding usage is enabled. + Cache check. 
""" - if not is_geo_coding_available(): + if not is_cache_available(): return {'status': 'pass'} - cache = get_geo_cache() + cache = get_cache() return cache.get_health() @@ -68,7 +69,7 @@ def get_health(with_geocoder=False): res.setdefault('details', {})['crateDB'] = 'cannot reach crate' # Check geocache (critical) - health = check_geocache() + health = check_cache() if health['status'] != 'pass': res.setdefault('details', {})['redis'] = health if health['status'] == 'fail' or res['status'] == 'pass': diff --git a/src/reporter/query_1T1E1A.py b/src/reporter/query_1T1E1A.py index 492bafc8..2a16552b 100644 --- a/src/reporter/query_1T1E1A.py +++ b/src/reporter/query_1T1E1A.py @@ -54,6 +54,8 @@ def query_1T1E1A(attr_name, # In Path fiware_servicepath=fiware_sp, geo_query=geo_query) except NGSIUsageError as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) @@ -62,7 +64,7 @@ def query_1T1E1A(attr_name, # In Path except Exception as e: # Temp workaround to debug test_not_found msg = "Something went wrong with QL. Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 500 if entities: @@ -78,12 +80,14 @@ def query_1T1E1A(attr_name, # In Path 'index': index, 'values': matched_attr['values'] if matched_attr else [] } + logging.getLogger(__name__).info("Query processed successfully") return res r = { "error": "Not Found", "description": "No records were found for such query." } + logging.getLogger(__name__).info("No value found for query") return r, 404 diff --git a/src/reporter/query_1T1ENA.py b/src/reporter/query_1T1ENA.py index 42a2339f..fd6b5375 100644 --- a/src/reporter/query_1T1ENA.py +++ b/src/reporter/query_1T1ENA.py @@ -56,12 +56,16 @@ def query_1T1ENA(entity_id, # In Path fiware_servicepath=fiware_sp, geo_query=geo_query) except NGSIUsageError as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) }, 400 except InvalidParameterValue as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) @@ -70,7 +74,7 @@ def query_1T1ENA(entity_id, # In Path except Exception as e: # Temp workaround to debug test_not_found msg = "Something went wrong with QL. Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 500 if entities: @@ -93,12 +97,14 @@ def query_1T1ENA(entity_id, # In Path 'index': index, 'attributes': attributes } + logging.getLogger(__name__).info("Query processed successfully") return res r = { "error": "Not Found", "description": "No records were found for such query." 
} + logging.getLogger(__name__).info("No value found for query") return r, 404 diff --git a/src/reporter/query_1TNE1A.py b/src/reporter/query_1TNE1A.py index 609b2aac..1be95098 100644 --- a/src/reporter/query_1TNE1A.py +++ b/src/reporter/query_1TNE1A.py @@ -62,12 +62,16 @@ def query_1TNE1A(attr_name, # In Path fiware_servicepath=fiware_sp, geo_query=geo_query) except NGSIUsageError as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) }, 400 except InvalidParameterValue as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) @@ -76,7 +80,7 @@ def query_1TNE1A(attr_name, # In Path except Exception as e: # Temp workaround to debug test_not_found msg = "Something went wrong with QL. Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 500 if entities: @@ -88,12 +92,14 @@ def query_1TNE1A(attr_name, # In Path aggr_period, from_date, to_date,) + logging.getLogger(__name__).info("Query processed successfully") return res r = { "error": "Not Found", "description": "No records were found for such query." } + logging.getLogger(__name__).info("No value found for query") return r, 404 diff --git a/src/reporter/query_1TNENA.py b/src/reporter/query_1TNENA.py index 9dfe3615..3dc8f93f 100644 --- a/src/reporter/query_1TNENA.py +++ b/src/reporter/query_1TNENA.py @@ -67,12 +67,16 @@ def query_1TNENA(entity_type=None, # In Path fiware_servicepath=fiware_sp, geo_query=geo_query) except NGSIUsageError as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) }, 400 except InvalidParameterValue as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) @@ -80,7 +84,7 @@ def query_1TNENA(entity_type=None, # In Path except Exception as e: msg = "Something went wrong with QL. Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 500 if entities: @@ -92,12 +96,14 @@ def query_1TNENA(entity_type=None, # In Path aggr_period, from_date, to_date,) + logging.getLogger(__name__).info("Query processed successfully") return res r = { "error": "Not Found", "description": "No records were found for such query." 
} + logging.getLogger(__name__).info("No value found for query") return r, 404 diff --git a/src/reporter/query_NTNE.py b/src/reporter/query_NTNE.py index 47c31b95..98c08428 100644 --- a/src/reporter/query_NTNE.py +++ b/src/reporter/query_NTNE.py @@ -1,4 +1,4 @@ -from exceptions.exceptions import AmbiguousNGSIIdError, InvalidParameterValue +from exceptions.exceptions import AmbiguousNGSIIdError, InvalidParameterValue, NGSIUsageError from flask import request from reporter.reporter import _validate_query_params from translators.factory import translator_for @@ -30,10 +30,12 @@ def query_NTNE(limit=10000, fiware_servicepath=fiware_sp) except NGSIUsageError as e: msg = "Bad Request Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 400 except InvalidParameterValue as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) @@ -41,19 +43,20 @@ def query_NTNE(limit=10000, except Exception as e: msg = "Internal server Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 500 if entities: res = [] for entity in entities: res.append(entity) - + logging.getLogger(__name__).info("Query processed successfully") return res r = { "error": "Not Found", "description": "No records were found for such query." } + logging.getLogger(__name__).info("No value found for query") return r, 404 diff --git a/src/reporter/query_NTNE1A.py b/src/reporter/query_NTNE1A.py index 72adafac..082462c8 100644 --- a/src/reporter/query_NTNE1A.py +++ b/src/reporter/query_NTNE1A.py @@ -63,10 +63,12 @@ def query_NTNE1A(attr_name, # In Path geo_query=geo_query) except NGSIUsageError as e: msg = "Bad Request Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 400 except InvalidParameterValue as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) @@ -74,7 +76,7 @@ def query_NTNE1A(attr_name, # In Path except Exception as e: msg = "Internal server Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 500 attributes = [] entries = [] @@ -123,11 +125,13 @@ def query_NTNE1A(attr_name, # In Path 'attrName': attr_name, 'types': entity_type } + logging.getLogger(__name__).info("Query processed successfully") return res r = { "error": "Not Found", "description": "No records were found for such query." 
} + logging.getLogger(__name__).info("No value found for query") return r, 404 diff --git a/src/reporter/query_NTNENA.py b/src/reporter/query_NTNENA.py index f432e251..17fd7b9a 100644 --- a/src/reporter/query_NTNENA.py +++ b/src/reporter/query_NTNENA.py @@ -66,10 +66,12 @@ def query_NTNENA(id_=None, # In Query geo_query=geo_query) except NGSIUsageError as e: msg = "Bad Request Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 400 except InvalidParameterValue as e: + msg = "Bad Request Error: {}".format(e) + logging.getLogger(__name__).error(msg, exc_info=True) return { "error": "{}".format(type(e)), "description": str(e) @@ -77,7 +79,7 @@ def query_NTNENA(id_=None, # In Query except Exception as e: msg = "Something went wrong with QL. Error: {}".format(e) - logging.getLogger().error(msg, exc_info=True) + logging.getLogger(__name__).error(msg, exc_info=True) return msg, 500 attributes = [] @@ -139,11 +141,13 @@ def query_NTNENA(id_=None, # In Query res = { 'attrs': attrs_values } + logging.getLogger(__name__).info("Query processed successfully") return res r = { "error": "Not Found", "description": "No records were found for such query." } + logging.getLogger(__name__).info("No value found for query") return r, 404 diff --git a/src/reporter/reporter.py b/src/reporter/reporter.py index 40014739..ec4aefd0 100644 --- a/src/reporter/reporter.py +++ b/src/reporter/reporter.py @@ -27,7 +27,7 @@ I.e, QL must be told where orion is. """ -from flask import request +from flask import has_request_context, request from geocoding import geocoding from geocoding.factory import get_geo_cache, is_geo_coding_available from requests import RequestException @@ -40,22 +40,13 @@ from reporter.subscription_builder import build_subscription from reporter.timex import select_time_index_value_as_iso, \ TIME_INDEX_HEADER_NAME -from geocoding.location import normalize_location -from utils.cfgreader import EnvReader, StrVar +from geocoding.location import normalize_location, LOCATION_ATTR_NAME from exceptions.exceptions import AmbiguousNGSIIdError, UnsupportedOption, \ NGSIUsageError, InvalidParameterValue, InvalidHeaderValue - def log(): - r = EnvReader(log=logging.getLogger(__name__).info) - level = r.read(StrVar('LOGLEVEL', 'INFO')).upper() - - logging.basicConfig(level=level, - format='%(asctime)s.%(msecs)03d ' - '%(levelname)s:%(name)s:%(message)s ' - 'Thread ID: [%(thread)d] Process ID: [%(process)d]', - datefmt='%Y-%m-%d %I:%M:%S') - return logging.getLogger(__name__) + logger = logging.getLogger(__name__) + return logger def is_text(attr_type): @@ -103,19 +94,20 @@ def _validate_payload(payload): # (i.e, the changed value) attrs = list(iter_entity_attrs(payload)) if len(attrs) == 0: - log().warning("Received notification containing an entity update without attributes " + - "other than 'type' and 'id'") + log().warning("Received notification containing an entity update " + "without attributes other than 'type' and 'id'") # Attributes should have a value and the modification time for attr in attrs: if not has_value(payload, attr): payload[attr].update({'value': None}) log().warning( - 'An entity update is missing value for attribute {}'.format(attr)) + 'An entity update is missing value ' + 'for attribute {}'.format(attr)) def _filter_empty_entities(payload): - log().debug('Received payload: {}'.format(payload)) + log().debug('Received payload') attrs = list(iter_entity_attrs(payload)) empty = False attrs.remove('time_index') 
@@ -160,17 +152,23 @@ def notify(): # Validate entity update error = _validate_payload(entity) if error: - # TODO in this way we return error for even if only one entity is wrong + # TODO in this way we return error for even if only one entity + # is wrong return error, 400 # Add TIME_INDEX attribute custom_index = request.headers.get(TIME_INDEX_HEADER_NAME, None) entity[TIME_INDEX_NAME] = \ select_time_index_value_as_iso(custom_index, entity) # Add GEO-DATE if enabled - add_geodata(entity) + if not entity.get(LOCATION_ATTR_NAME, None): + add_geodata(entity) # Always normalize location if there's one normalize_location(entity) + # Get FIWARE CORRELATOR - if any + fiware_c = request.headers.get('fiware_correlator', None) + # Get Remote address + remote_addr = request.remote_addr # Define FIWARE tenant fiware_s = request.headers.get('fiware-service', None) # It seems orion always sends a 'Fiware-Servicepath' header with value '/' @@ -196,18 +194,15 @@ def notify(): with translator_for(fiware_s) as trans: trans.insert(payload, fiware_s, fiware_sp) except Exception as e: - msg = "Notification not processed or not updated for payload: %s. " \ - "%s" % (payload, str(e)) - log().error(msg) + msg = "Notification not processed or not updated: {}" + log().error(msg.format(e), exc_info=True) error_code = 500 if e.__class__ == InvalidHeaderValue or \ e.__class__ == InvalidParameterValue or \ e.__class__ == NGSIUsageError: error_code = 400 return msg, error_code - msg = "Notification successfully processed for : 'tenant' %s, " \ - "'fiwareServicePath' %s, " \ - "'entity_id' %s" % (fiware_s, fiware_sp, entity_id) + msg = "Notification successfully processed" log().info(msg) return msg @@ -218,6 +213,7 @@ def add_geodata(entity): geocoding.add_location(entity, cache=cache) +# TODO check that following methods can be removed def query_1TNENA(): r = { "error": "Not Implemented", diff --git a/src/reporter/tests/test_incomplete_entities.py b/src/reporter/tests/test_incomplete_entities.py index 402e3782..aa3795da 100644 --- a/src/reporter/tests/test_incomplete_entities.py +++ b/src/reporter/tests/test_incomplete_entities.py @@ -1,11 +1,12 @@ import requests import time from conftest import QL_URL -from .utils import send_notifications +from .utils import send_notifications, delete_entity_type + +service = '' def notify(entity): - service = '' notification_data = [{'data': [entity]}] send_notifications(service, notification_data) @@ -75,6 +76,7 @@ def test_can_add_new_attribute(): assert len(attr_values_map) == 2 assert attr_values_map['a1'] == [a1_value, a1_value] assert attr_values_map['a2'] == [None, a2_value] + delete_entity_type(service, 't1') def test_can_add_new_attribute_even_without_specifying_old_ones(): @@ -104,6 +106,42 @@ def test_can_add_new_attribute_even_without_specifying_old_ones(): assert len(attr_values_map) == 2 assert attr_values_map['a1'] == [a1_value, None] assert attr_values_map['a2'] == [None, a2_value] + delete_entity_type(service, 'u1') + +def test_can_add_2_new_attribute_even_without_specifying_old_ones(): + a1_value = 123.0 + entity_1 = { + 'id': 'u1:1', + 'type': 'u1', + 'a1': { + 'type': 'Number', + 'value': a1_value + } + } + notify(entity_1) + + a2_value = 'new attribute initial value' + a3_value = True + entity_2 = { + 'id': 'u1:1', + 'type': 'u1', + 'a2': { + 'type': 'Text', + 'value': a2_value + }, + 'a3': { + 'type': 'Boolean', + 'value': a3_value + } + } + notify(entity_2) + + attr_values_map = get_all_stored_attributes(entity_1['id']) + assert len(attr_values_map) == 3 + 
assert attr_values_map['a1'] == [a1_value, None] + assert attr_values_map['a2'] == [None, a2_value] + assert attr_values_map['a3'] == [None, a3_value] + delete_entity_type(service, 'u1') def test_store_missing_text_value_as_null(): @@ -123,6 +161,7 @@ def test_store_missing_text_value_as_null(): attr_values_map = get_all_stored_attributes(entity['id']) assert len(attr_values_map) == 2 assert attr_values_map['x'] == [None] + delete_entity_type(service, 't2') def test_store_missing_text_value_as_null_then_as_empty(): @@ -145,6 +184,7 @@ def test_store_missing_text_value_as_null_then_as_empty(): attr_values_map = get_all_stored_attributes(entity['id']) assert len(attr_values_map) == 2 assert attr_values_map['x'] == [None, ''] + delete_entity_type(service, 't3') def test_store_null_text_value_as_null(): @@ -165,6 +205,7 @@ def test_store_null_text_value_as_null(): attr_values_map = get_all_stored_attributes(entity['id']) assert len(attr_values_map) == 2 assert attr_values_map['x'] == [None] + delete_entity_type(service, 't4') def test_store_null_numeric_value_as_null(): @@ -185,6 +226,7 @@ def test_store_null_numeric_value_as_null(): attr_values_map = get_all_stored_attributes(entity['id']) assert len(attr_values_map) == 2 assert attr_values_map['x'] == [None] + delete_entity_type(service, 't5') def test_store_empty_numeric_value_as_null(): @@ -205,5 +247,4 @@ def test_store_empty_numeric_value_as_null(): attr_values_map = get_all_stored_attributes(entity['id']) assert len(attr_values_map) == 2 assert attr_values_map['x'] == [None] - - + delete_entity_type(service, 't6') diff --git a/src/reporter/tests/test_integration.py b/src/reporter/tests/test_integration.py index 57017af1..9099be3d 100644 --- a/src/reporter/tests/test_integration.py +++ b/src/reporter/tests/test_integration.py @@ -1,9 +1,10 @@ -from conftest import QL_URL, ORION_URL, entity, clean_mongo, clean_crate +from conftest import QL_URL, ORION_URL, entity, clean_mongo import json import time import requests +from .utils import delete_entity_type -def test_integration(entity, clean_mongo, clean_crate): +def test_integration(entity, clean_mongo): # Subscribe QL to Orion params = { 'orionUrl': ORION_URL, @@ -55,9 +56,10 @@ def test_integration(entity, clean_mongo, clean_crate): assert set(pressures).issubset(set([720.0, 721.0, 722.0, 723.0])) temperatures = data['attributes'][1]['values'] assert set(temperatures).issubset(set([24.2, 25.2, 26.2, 27.2])) + delete_entity_type(None, entity['type']) -def test_integration_custom_index(entity, clean_mongo, clean_crate): +def test_integration_custom_index(entity, clean_mongo): # Subscribe QL to Orion params = { 'orionUrl': ORION_URL, @@ -110,3 +112,4 @@ def test_integration_custom_index(entity, clean_mongo, clean_crate): # Note some notifications may have been lost assert data['attributes'][0]['values'] == data['index'] assert len(data['index']) > 1 + delete_entity_type(None, entity['type']) diff --git a/src/reporter/tests/test_notify.py b/src/reporter/tests/test_notify.py index 049bcbbf..3a1679ef 100644 --- a/src/reporter/tests/test_notify.py +++ b/src/reporter/tests/test_notify.py @@ -45,6 +45,7 @@ def headers(service=None, service_path=None, content_type=True): return h + @pytest.mark.parametrize("service", services) def test_invalid_no_body(service): r = requests.post('{}'.format(notify_url), @@ -221,7 +222,6 @@ def test_air_quality_observed(air_quality_observed, orion_client): do_integration(entity, subscription, orion_client) - def 
test_integration_multiple_entities(diffEntityWithDifferentAttrs, orion_client): """ Test Reporter using input directly from an Orion notification and output @@ -541,7 +541,7 @@ def test_no_value_no_type_for_attributes(service, notification): 'type': 'Room', 'odour': {'type': 'Text', 'value': 'Good', 'metadata': {}}, 'temperature': {'metadata': {}}, - 'pressure': {'type': 'Number', 'value': '26', 'metadata': {}}, + 'pressure': {'type': 'Number', 'value': 26, 'metadata': {}}, } url = '{}'.format(notify_url) get_url = "{}/entities/Room1/attrs/temperature/value".format(QL_URL) @@ -563,7 +563,7 @@ def test_no_value_no_type_for_attributes(service, notification): notification['data'][0] = { 'id': 'Room1', 'type': 'Room', - 'temperature': {'type': 'Number', 'value': '25', 'metadata': {}} + 'temperature': {'type': 'Number', 'value': 25, 'metadata': {}} } url = '{}'.format(notify_url) get_url = "{}/entities/Room1/attrs/temperature/value".format(QL_URL) @@ -574,7 +574,7 @@ def test_no_value_no_type_for_attributes(service, notification): time.sleep(SLEEP_TIME) res_get = requests.get(url_new, headers=query_header(service)) assert res_get.status_code == 200 - assert res_get.json()['values'][1] == '25' + assert res_get.json()['values'][1] == 25 delete_entity_type(service, notification['data'][0]['type']) @@ -585,8 +585,8 @@ def test_with_value_no_type_for_attributes(service, notification): 'id': 'Kitchen1', 'type': 'Kitchen', 'odour': {'type': 'Text', 'value': 'Good', 'metadata': {}}, - 'temperature': {'value': '25', 'metadata': {}}, - 'pressure': {'type': 'Number', 'value': '26', 'metadata': {}}, + 'temperature': {'value': 25, 'metadata': {}}, + 'pressure': {'type': 'Number', 'value': 26, 'metadata': {}}, } url = '{}'.format(notify_url) get_url = "{}/entities/Kitchen1/attrs/temperature/value".format(QL_URL) @@ -597,7 +597,7 @@ def test_with_value_no_type_for_attributes(service, notification): time.sleep(SLEEP_TIME) res_get = requests.get(url_new, headers=query_header(service)) assert res_get.status_code == 200 - assert res_get.json()['values'][0] == '25' + assert res_get.json()['values'][0] == 25 delete_entity_type(service, notification['data'][0]['type']) @@ -609,7 +609,7 @@ def test_no_value_with_type_for_attributes(service, notification): 'type': 'Hall', 'odour': {'type': 'Text', 'value': 'Good', 'metadata': {}}, 'temperature': {'type': 'Number', 'metadata': {}}, - 'pressure': {'type': 'Number', 'value': '26', 'metadata': {}}, + 'pressure': {'type': 'Number', 'value': 26, 'metadata': {}}, } url = '{}'.format(notify_url) get_url = "{}/entities/Hall1/attrs/temperature/value".format(QL_URL) @@ -621,4 +621,112 @@ def test_no_value_with_type_for_attributes(service, notification): res_get = requests.get(url_new, headers=query_header(service)) assert res_get.status_code == 200 assert res_get.json()['values'][0] == None + delete_entity_type(service, notification['data'][0]['type']) + + +@pytest.mark.parametrize("service", services) +def test_issue_382(service, notification): + # entity with one Null value and no type + notification['data'][0] = { + "id": "urn:ngsi-ld:Test:0002", + "type": "Test", + "errorNumber": { + "type": "Integer", + "value": 2 + }, + "refVacuumPump": { + "type": "Relationship", + "value": "urn:ngsi-ld:VacuumPump:FlexEdgePump" + }, + "refOutgoingPallet": { + "type": "Array", + "value": [ + "urn:ngsi-ld:Pallet:0003", + "urn:ngsi-ld:Pallet:0004" + ] + } + } + url = '{}'.format(notify_url) + get_url = "{}/entities/urn:ngsi-ld:Test:0002/attrs/errorNumber/value".format(QL_URL) + url_new = 
'{}'.format(get_url) + r = requests.post(url, data=json.dumps(notification), headers=notify_header(service)) + assert r.status_code == 200 + # Give time for notification to be processed + time.sleep(SLEEP_TIME) + res_get = requests.get(url_new, headers=query_header(service)) + assert res_get.status_code == 200 + assert res_get.json()['values'][0] == 2 + delete_entity_type(service, notification['data'][0]['type']) + +@pytest.mark.parametrize("service", services) +def test_json_ld(service, notification): + # entity with one Null value and no type + notification['data'][0] = { + "id": "urn:ngsi-ld:Streetlight:streetlight:guadalajara:4567", + "type": "Streetlight", + "location": { + "type": "GeoProperty", + "value": { + "type": "Point", + "coordinates": [-3.164485591715449, 40.62785133667262] + } + }, + "areaServed": { + "type": "Property", + "value": "Roundabouts city entrance" + }, + "status": { + "type": "Property", + "value": "ok" + }, + "refStreetlightGroup": { + "type": "Relationship", + "object": "urn:ngsi-ld:StreetlightGroup:streetlightgroup:G345" + }, + "refStreetlightModel": { + "type": "Relationship", + "object": "urn:ngsi-ld:StreetlightModel:streetlightmodel:STEEL_Tubular_10m" + }, + "circuit": { + "type": "Property", + "value": "C-456-A467" + }, + "lanternHeight": { + "type": "Property", + "value": 10 + }, + "locationCategory": { + "type": "Property", + "value": "centralIsland" + }, + "powerState": { + "type": "Property", + "value": "off" + }, + "controllingMethod": { + "type": "Property", + "value": "individual" + }, + "dateLastLampChange": { + "type": "Property", + "value": { + "@type": "DateTime", + "@value": "2016-07-08T08:02:21.753Z" + } + }, + "@context": [ + "https://schema.lab.fiware.org/ld/context", + "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld" + ] + } + url = '{}'.format(notify_url) + get_url = "{}/entities/urn:ngsi-ld:Streetlight:streetlight:guadalajara:4567/attrs/lanternHeight/value".format(QL_URL) + url_new = '{}'.format(get_url) + r = requests.post(url, data=json.dumps(notification), headers=notify_header(service)) + assert r.status_code == 200 + # Give time for notification to be processed + time.sleep(SLEEP_TIME) + res_get = requests.get(url_new, headers=query_header(service)) + assert res_get.status_code == 200 + assert res_get.json()['values'][0] == 10 delete_entity_type(service, notification['data'][0]['type']) \ No newline at end of file diff --git a/src/reporter/tests/test_time_format.py b/src/reporter/tests/test_time_format.py index 0394911c..0e27ec3b 100644 --- a/src/reporter/tests/test_time_format.py +++ b/src/reporter/tests/test_time_format.py @@ -1,7 +1,7 @@ from conftest import crate_translator as translator from reporter.tests.test_1T1E1A import query_url as query_1T1E1A, \ assert_1T1E1A_response -from reporter.tests.utils import get_notification, send_notifications +from reporter.tests.utils import get_notification, send_notifications, delete_entity_type import requests import time @@ -29,6 +29,7 @@ def check_time_index(input_index, expected_index=None): 'values': [0, 1, 2] } assert_1T1E1A_response(obtained, expected) + delete_entity_type('', 'Room') def test_index_iso(translator): diff --git a/src/reporter/tests/utils.py b/src/reporter/tests/utils.py index 02faa1ce..fa734c18 100644 --- a/src/reporter/tests/utils.py +++ b/src/reporter/tests/utils.py @@ -44,9 +44,10 @@ def get_notification(et, ei, attr_value, mod_value): def send_notifications(service, notifications): assert isinstance(notifications, list) + h = {'Content-Type': 
'application/json'} + if service: + h['Fiware-Service'] = service for n in notifications: - h = {'Content-Type': 'application/json', - 'Fiware-Service': service} r = requests.post(notify_url(), data=json.dumps(n), headers=h) assert r.ok diff --git a/src/server/gconfig.py b/src/server/gconfig.py index 3b55429b..c7d2a0cd 100644 --- a/src/server/gconfig.py +++ b/src/server/gconfig.py @@ -59,6 +59,8 @@ # We did some initial quick & dirty benchmarking to get these results. # We'll likely have to measure better and also understand better the # way the various Gunicorn worker types actually work. (Pun intended.) +# IMPORTANT: current implementation of ConnectionManager is not thread safe, +# keep this value to 1. threads = 1 @@ -66,7 +68,7 @@ # Logging config section. # -loglevel = 'debug' +loglevel = 'error' # TODO: other settings. diff --git a/src/server/wsgi.py b/src/server/wsgi.py index 07fd3c41..fb2a4f92 100644 --- a/src/server/wsgi.py +++ b/src/server/wsgi.py @@ -1,5 +1,5 @@ from connexion import FlaskApp - +import logging import server @@ -16,6 +16,9 @@ def new_wrapper() -> FlaskApp: """ wrapper = FlaskApp(__name__, specification_dir=SPEC_DIR) + log = logging.getLogger('werkzeug') + log.setLevel(logging.ERROR) + wrapper.add_api(SPEC, arguments={'title': 'QuantumLeap V2 API'}, pythonic_params=True, diff --git a/src/tests/docker-compose.yml b/src/tests/docker-compose.yml index c41f178d..7f72dab3 100644 --- a/src/tests/docker-compose.yml +++ b/src/tests/docker-compose.yml @@ -32,10 +32,10 @@ services: - crate environment: - CRATE_HOST=${CRATE_HOST:-crate} - - USE_GEOCODING=True + - USE_GEOCODING=False - REDIS_HOST=redis - REDIS_PORT=6379 - - LOGLEVEL=ERROR + - LOGLEVEL=INFO crate: image: crate:${CRATE_VERSION:-4.1.4} diff --git a/src/tests/notify-load-test.js b/src/tests/notify-load-test.js index 3cae34c4..c3315a7a 100644 --- a/src/tests/notify-load-test.js +++ b/src/tests/notify-load-test.js @@ -12,8 +12,8 @@ export default function() { "id": "Room:1", "type": "Room", "temperature": { - "value": 23, - "type": "Float" + "value": 23.3, + "type": "Number" }, "pressure": { "value": 720, diff --git a/src/translators/config.py b/src/translators/config.py index d41df340..29684c9b 100644 --- a/src/translators/config.py +++ b/src/translators/config.py @@ -15,7 +15,7 @@ class SQLTranslatorConfig: def __init__(self, env: dict = os.environ): self.store = EnvReader(var_store=env, - log=logging.getLogger(__name__).info) + log=logging.getLogger(__name__).debug) def default_limit(self) -> int: fallback_limit = 10000 diff --git a/src/translators/crate.py b/src/translators/crate.py index 2f1d32d5..c55f8461 100644 --- a/src/translators/crate.py +++ b/src/translators/crate.py @@ -7,18 +7,17 @@ from geocoding.slf.querytypes import SlfQuery from translators import sql_translator from translators.sql_translator import NGSI_ISO8601, NGSI_DATETIME, \ - NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, TIME_INDEX, \ - METADATA_TABLE_NAME, FIWARE_SERVICEPATH + NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \ + NGSI_LD_GEOMETRY, TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH import logging from .crate_geo_query import from_ngsi_query from utils.cfgreader import EnvReader, StrVar, IntVar - +from utils.connection_manager import ConnectionManager # CRATE TYPES # https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html CRATE_ARRAY_STR = 'array(string)' - # Translation NGSI_TO_SQL = { "Array": CRATE_ARRAY_STR, # TODO #36: Support numeric arrays @@ -30,6 +29,7 @@ 
NGSI_DATETIME: 'timestamp', "Integer": 'long', NGSI_GEOJSON: 'geo_shape', + NGSI_LD_GEOMETRY: 'geo_shape', NGSI_GEOPOINT: 'geo_point', "Number": 'float', NGSI_TEXT: 'string', @@ -37,26 +37,33 @@ TIME_INDEX: 'timestamp' } - -CRATE_TO_NGSI = dict((v, k) for (k,v) in NGSI_TO_SQL.items()) +CRATE_TO_NGSI = dict((v, k) for (k, v) in NGSI_TO_SQL.items()) CRATE_TO_NGSI['string_array'] = 'Array' class CrateTranslator(sql_translator.SQLTranslator): - - NGSI_TO_SQL = NGSI_TO_SQL - def __init__(self, host, port=4200, db_name="ngsi-tsdb"): super(CrateTranslator, self).__init__(host, port, db_name) self.logger = logging.getLogger(__name__) - + self.ccm = None + self.connection = None + self.cursor = None def setup(self): url = "{}:{}".format(self.host, self.port) - self.conn = client.connect([url], error_trace=True) - self.cursor = self.conn.cursor() + self.ccm = ConnectionManager() + self.connection = self.ccm.get_connection('crate') + if self.connection is None: + try: + self.connection = client.connect([url], error_trace=True) + self.ccm.set_connection('crate', self.connection) + except Exception as e: + self.logger.warning(str(e), exc_info=True) + raise e + + self.cursor = self.connection.cursor() # TODO this reduce queries to crate, # but only within a single API call to QUANTUMLEAP # we need to think if we want to cache this information @@ -76,16 +83,19 @@ def setup(self): NGSI_TO_SQL["Number"] = 'real' NGSI_TO_SQL[NGSI_TEXT] = 'text' - def dispose(self): + super(CrateTranslator, self).dispose() self.cursor.close() - self.conn.close() + def sql_error_handler(self, exception): + return def get_db_version(self): - self.cursor.execute("select version['number'] from sys.nodes") - return self.cursor.fetchall()[0][0] - + stmt = "select version['number'] from sys.nodes" + res = self._execute_query_via_cache("crate", + "dbversion", + stmt, None, 6000) + return res[0][0] def get_health(self): """ @@ -120,8 +130,7 @@ def get_health(self): return health - - def _preprocess_values(self, e, table, col_names, fiware_servicepath): + def _preprocess_values(self, e, original_attrs, col_names, fiware_servicepath): values = [] for cn in col_names: if cn == 'entity_type': @@ -135,24 +144,45 @@ def _preprocess_values(self, e, table, col_names, fiware_servicepath): else: # Normal attributes try: - if 'type' in e[cn] and e[cn]['type'] == 'geo:point': - lat, lon = e[cn]['value'].split(',') - values.append([float(lon), float(lat)]) - else: - values.append(e[cn]['value']) + attr = original_attrs[cn][0] + value = e[attr].get('value', None) + if 'type' in e[attr] and e[attr]['type'] == 'geo:point': + lat, lon = e[attr]['value'].split(',') + value = [float(lon), float(lat)] + elif 'type' in e[attr] and e[attr]['type'] == 'Property' \ + and 'value' in e[attr] \ + and isinstance(e[attr]['value'], dict) \ + and '@type' in e[attr]['value'] \ + and e[attr]['value']['@type'] == 'DateTime': + value = e[attr]['value']['@value'] + elif 'type' in e[attr] and e[attr]['type'] == 'Relationship': + value = e[attr].get('value', None) or \ + e[attr].get('object', None) + + values.append(value) except KeyError: - # this entity update does not have a value for the column so use None which will be inserted as NULL to the db. - values.append( None ) + # this entity update does not have a value for the column + # so use None which will be inserted as NULL to the db. 
+ values.append(None) return values - def _prepare_data_table(self, table_name, table, fiware_service): + def _create_data_table(self, table_name, table, fiware_service): columns = ', '.join('"{}" {}'.format(cn.lower(), ct) for cn, ct in table.items()) stmt = "create table if not exists {} ({}) with " \ - "(number_of_replicas = '2-all', column_policy = 'dynamic')".format(table_name, columns) + "(number_of_replicas = '2-all', " \ + "column_policy = 'strict')".format(table_name, columns) self.cursor.execute(stmt) - def _should_insert_original_entities(self, insert_error: Exception) -> bool: + def _update_data_table(self, table_name, new_columns, fiware_service): + #crate allows to add only one column for alter command! + for cn in new_columns: + alt_cols = 'add column "{}" {}'.format(cn.lower(), new_columns[cn]) + stmt = "alter table {} {};".format(table_name, alt_cols) + self.cursor.execute(stmt) + + def _should_insert_original_entities(self, + insert_error: Exception) -> bool: return isinstance(insert_error, exceptions.ProgrammingError) def _create_metadata_table(self): @@ -162,18 +192,20 @@ def _create_metadata_table(self): op = stmt.format(METADATA_TABLE_NAME) self.cursor.execute(op) - def _store_medatata(self, table_name, persisted_metadata): + def _store_metadata(self, table_name, persisted_metadata): major = int(self.db_version.split('.')[0]) if (major <= 3): stmt = "insert into {} (table_name, entity_attrs) values (?,?) " \ - "on duplicate key update entity_attrs = values(entity_attrs)" + "on duplicate key " \ + "update entity_attrs = values(entity_attrs)" else: stmt = "insert into {} (table_name, entity_attrs) values (?,?) " \ - "on conflict(table_name) DO UPDATE SET entity_attrs = excluded.entity_attrs" + "on conflict(table_name) " \ + "DO UPDATE SET entity_attrs = excluded.entity_attrs" stmt = stmt.format(METADATA_TABLE_NAME) self.cursor.execute(stmt, (table_name, persisted_metadata)) - def _compute_type(self, attr_t, attr): + def _compute_db_specific_type(self, attr_t, attr): """ Github issue 44: Disable indexing for long string """ @@ -236,7 +268,7 @@ def _db_value_to_ngsi(self, db_value: Any, ngsi_type: str) -> Any: @contextmanager def CrateTranslatorInstance(): - r = EnvReader(log=logging.getLogger(__name__).info) + r = EnvReader(log=logging.getLogger(__name__).debug) db_host = r.read(StrVar('CRATE_HOST', 'crate')) db_port = r.read(IntVar('CRATE_PORT', 4200)) db_name = "ngsi-tsdb" diff --git a/src/translators/sql_translator.py b/src/translators/sql_translator.py index ba14b114..98ee68ea 100644 --- a/src/translators/sql_translator.py +++ b/src/translators/sql_translator.py @@ -12,12 +12,18 @@ import dateutil.parser from typing import Any, List, Optional, Sequence from uuid import uuid4 +import pickle +import types + +from cache.factory import get_cache, is_cache_available +from utils.connection_manager import Borg # NGSI TYPES # Based on Orion output because official docs don't say much about these :( NGSI_DATETIME = 'DateTime' NGSI_ID = 'id' NGSI_GEOJSON = 'geo:json' +NGSI_LD_GEOMETRY = 'GeoProperty' NGSI_GEOPOINT = 'geo:point' NGSI_ISO8601 = 'ISO8601' NGSI_STRUCTURED_VALUE = 'StructuredValue' @@ -50,6 +56,7 @@ "Integer": 'bigint', # NOT all databases supports geometry NGSI_GEOJSON: 'text', + NGSI_LD_GEOMETRY: 'text', SlfPoint.ngsi_type(): 'text', SlfLine.ngsi_type(): 'text', SlfPolygon.ngsi_type(): 'text', @@ -102,10 +109,35 @@ def entity_type(entity: dict) -> Optional[str]: # many calls on each insert! 
Perhaps we should come up with a more efficient # design or at least consider stored procs. +# Recent changes reduced number of queries via caching. +# Regarding SQLAlchemy, investigations showed it does not support +# geographic types for Crate. + class SQLTranslator(base_translator.BaseTranslator): NGSI_TO_SQL = NGSI_TO_SQL config = SQLTranslatorConfig() + start_time = None + + def __init__(self, host, port, db_name): + super(SQLTranslator, self).__init__(host, port, db_name) + qcm = QueryCacheManager() + self.cache = qcm.get_query_cache() + self.default_ttl = None + if self.cache: + self.default_ttl = self.cache.default_ttl + self.start_time = datetime.now() + + def dispose(self): + dt = datetime.now() - self.start_time + time_difference = ( + dt.days * 24 * 60 * 60 + dt.seconds) * 1000 + dt.microseconds / 1000.0 + self.logger.debug("Translation completed | time={} msec".format( + str(time_difference))) + + def sql_error_handler(self, exception): + raise NotImplementedError + def _refresh(self, entity_types, fiware_service=None): """ Used for testing purposes only! @@ -117,7 +149,10 @@ def _refresh(self, entity_types, fiware_service=None): table_names.append(METADATA_TABLE_NAME) self.cursor.execute("refresh table {}".format(','.join(table_names))) - def _prepare_data_table(self, table_name, table, fiware_service): + def _create_data_table(self, table_name, table, fiware_service): + raise NotImplementedError + + def _update_data_table(self, table_name, new_columns, fiware_service): raise NotImplementedError def _create_metadata_table(self): @@ -256,63 +291,62 @@ def _insert_entities_of_type(self, } for e in entities: + entity_id = e.get('id') for attr in iter_entity_attrs(e): if attr == self.TIME_INDEX_NAME: continue - if isinstance(e[attr], dict) and 'type' in e[attr]: + if isinstance(e[attr], dict) and 'type' in e[attr] \ + and e[attr]['type'] != 'Property': attr_t = e[attr]['type'] + elif isinstance(e[attr], dict) and 'type' in e[attr] \ + and e[attr]['type'] == 'Property' \ + and 'value' in e[attr] \ + and isinstance(e[attr]['value'], dict) \ + and '@type' in e[attr]['value'] \ + and e[attr]['value']['@type'] == 'DateTime': + attr_t = NGSI_DATETIME + elif isinstance(e[attr], dict) and 'value' in e[attr]: + value = e[attr]['value'] + if isinstance(value, list): + attr_t = NGSI_STRUCTURED_VALUE + elif value is not None and isinstance(value, dict): + attr_t = NGSI_STRUCTURED_VALUE + elif isinstance(value, int): + attr_t = 'Integer' + elif isinstance(value, float): + attr_t = 'Number' + elif isinstance(value, bool): + attr_t = 'Boolean' + elif self._is_iso_date(value): + attr_t = NGSI_DATETIME + else: + attr_t = NGSI_TEXT else: - # Won't guess the type if user did't specify the type. - # TODO Guess Type! - attr_t = NGSI_TEXT + attr_t = None col = self._ea2cn(attr) original_attrs[col] = (attr, attr_t) - entity_id = e.get('id') - - if attr_t not in self.NGSI_TO_SQL: - # if attribute is complex assume it as an NGSI StructuredValue - # TODO we should support type name different from NGSI types - # but mapping to NGSI types - if self._attr_is_structured(e[attr]): - table[col] = self.NGSI_TO_SQL[NGSI_STRUCTURED_VALUE] - else: - # TODO fallback type should be defined by actual JSON type - supported_types = ', '.join(self.NGSI_TO_SQL.keys()) - msg = ("'{}' is not a supported NGSI type" - " for Attribute: '{}' " - " and id : '{}'. " - "Please use any of the following: {}. 
" - "Falling back to {}.") - self.logger.warning(msg.format( - attr_t, attr, entity_id, supported_types, - NGSI_TEXT)) - - table[col] = self.NGSI_TO_SQL[NGSI_TEXT] - - else: - # Github issue 44: Disable indexing for long string - sql_type = self._compute_type(attr_t, e[attr]) - - # Github issue 24: StructuredValue == object or array - is_list = isinstance(e[attr].get('value', None), list) - if attr_t == NGSI_STRUCTURED_VALUE and is_list: - sql_type = self.NGSI_TO_SQL['Array'] - table[attr] = sql_type + table[col] = self._compute_type(entity_id, attr_t, e[attr]) # Create/Update metadata table for this type table_name = self._et2tn(entity_type, fiware_service) - self._update_metadata_table(table_name, original_attrs) + modified = self._update_metadata_table(table_name, original_attrs) # Sort out data table. - self._prepare_data_table(table_name, table, fiware_service) + if modified and modified == original_attrs.keys(): + self._create_data_table(table_name, table, fiware_service) + elif modified: + new_columns = {} + for k in modified: + new_columns[k] = table[k] + self._update_data_table(table_name, new_columns, fiware_service) # Gather attribute values col_names = sorted(table.keys()) entries = [] # raw values in same order as column names for e in entities: - values = self._preprocess_values(e, table, col_names, + values = self._preprocess_values(e, original_attrs, col_names, fiware_servicepath) entries.append(values) @@ -327,8 +361,15 @@ def _insert_entity_rows(self, table_name: str, col_names: List[str], stmt = f"insert into {table_name} ({col_list}) values ({placeholders})" try: + start_time = datetime.now() self.cursor.executemany(stmt, rows) + dt = datetime.now() - start_time + time_difference = ( + dt.days * 24 * 60 * 60 + dt.seconds) * 1000 + dt.microseconds / 1000.0 + self.logger.debug("Query completed | time={} msec".format( + str(time_difference))) except Exception as e: + self.sql_error_handler(e) if not self._should_insert_original_entities(e): raise @@ -410,7 +451,8 @@ def is_text(attr_type): # factor this logic out and keep it in just one place! return attr_type == NGSI_TEXT or attr_type not in NGSI_TO_SQL - def _preprocess_values(self, e, table, col_names, fiware_servicepath): + def _preprocess_values(self, e, original_attrs, col_names, + fiware_servicepath): raise NotImplementedError def _update_metadata_table(self, table_name, metadata): @@ -432,24 +474,50 @@ def _update_metadata_table(self, table_name, metadata): The dict mapping the matedata of each column. See original_attrs. """ - self._create_metadata_table() + if not self._is_query_in_cache("quantumleap", METADATA_TABLE_NAME): + self._create_metadata_table() + self._cache("quantumleap", + METADATA_TABLE_NAME, + None, + self.default_ttl) # Bring translation table! - stmt = "select entity_attrs from {} where table_name = ?" 
- self.cursor.execute(stmt.format(METADATA_TABLE_NAME), [table_name]) + stmt = "select entity_attrs from {} where table_name = ?".format( + METADATA_TABLE_NAME) # By design, one entry per table_name - res = self.cursor.fetchall() - persisted_metadata = res[0][0] if res else {} - - if metadata.keys() - persisted_metadata.keys(): - persisted_metadata.update(metadata) - self._store_medatata(table_name, persisted_metadata) + try: + res = self._execute_query_via_cache("quantumleap", + table_name, + stmt, + [table_name], + self.default_ttl) + persisted_metadata = res[0][0] if res else {} + except Exception as e: + self.sql_error_handler(e) + # Metadata table still not created + logging.debug(str(e), exc_info=True) + # Attempt to re-create metadata table + self._create_metadata_table() + persisted_metadata = {} + + diff = metadata.keys() - persisted_metadata.keys() + if diff: + # we update using the difference to "not" corrupt the metadata + # by previous insert + update = dict((k, metadata[k]) for k in diff if k in metadata) + persisted_metadata.update(update) + self._store_metadata(table_name, persisted_metadata) + self._cache("quantumleap", + table_name, + [[persisted_metadata]], + self.default_ttl) + return diff # TODO: concurrency. # This implementation paves # the way to lost updates... - def _store_medatata(self, table_name, persisted_metadata): + def _store_metadata(self, table_name, persisted_metadata): raise NotImplementedError def _get_et_table_names(self, fiware_service=None): @@ -457,20 +525,26 @@ def _get_et_table_names(self, fiware_service=None): Return the names of all the tables representing entity types. :return: list(unicode) """ - op = "select distinct table_name from {} ".format(METADATA_TABLE_NAME) + stmt = "select distinct table_name from {}".format(METADATA_TABLE_NAME) + key = None if fiware_service: - where = "where table_name ~* '\"{}{}\"[.].*'" - op += where.format(TENANT_PREFIX, fiware_service.lower()) - + key = fiware_service.lower() + where = " where table_name ~* '\"{}{}\"[.].*'" + stmt += where.format(TENANT_PREFIX, key) + else: + where = " where table_name !~* '\"{}{}\"[.].*'" + stmt += where.format(TENANT_PREFIX, '.*') try: - self.cursor.execute(op) - rows = self.cursor.fetchall() - return [r[0] for r in rows] + table_names = self._execute_query_via_cache(key, + "tableNames", + stmt, + [], + self.default_ttl) except Exception as e: - # Metadata table still not created - msg = "Could not retrieve METADATA_TABLE. Empty database maybe?. {}" - logging.debug(msg.format(e)) + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) return [] + return [r[0] for r in table_names] def _get_select_clause(self, attr_names, aggr_method, aggr_period): if not attr_names: @@ -555,6 +629,13 @@ def _parse_date(self, date): except Exception as e: raise InvalidParameterValue(date, "**fromDate** or **toDate**") + def _is_iso_date(self, date): + try: + dateutil.parser.isoparse(date.strip('\"')).isoformat() + return True + except Exception as e: + return False + def _parse_limit(sefl, limit): if (not (limit is None or isinstance(limit, int))): raise InvalidParameterValue(limit, "limit") @@ -811,12 +892,13 @@ def query(self, # TODO due to this except in case of sql errors, # all goes fine, and users gets 404 as result # Reason 1: fiware_service_path column in legacy dbs. 
- logging.error("{}".format(e)) + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) entities = [] else: res = self.cursor.fetchall() col_names = self._column_names_from_query_meta( - self.cursor.description) + self.cursor.description) entities = self._format_response(res, col_names, tn, @@ -867,43 +949,53 @@ def query_ids(self, len_tn = 0 result = [] stmt = '' - for tn in sorted(table_names): - len_tn += 1 - if len_tn != len(table_names): - stmt += "select entity_id, entity_type, max(time_index) as time_index " \ - "from {tn} {where_clause} " \ - "group by entity_id, entity_type " \ - "union all ".format( - tn=tn, - where_clause=where_clause - ) - else: - stmt += "select entity_id, entity_type, max(time_index) as time_index " \ - "from {tn} {where_clause} " \ - "group by entity_id, entity_type ".format( - tn=tn, - where_clause=where_clause - ) - - # TODO ORDER BY time_index asc is removed for the time being till we have a solution for https://github.com/crate/crate/issues/9854 - op = stmt + "limit {limit} offset {offset}".format( - offset=offset, - limit=limit - ) + if len(table_names) > 0: + for tn in sorted(table_names): + len_tn += 1 + if len_tn != len(table_names): + stmt += "select " \ + "entity_id, " \ + "entity_type, " \ + "max(time_index) as time_index " \ + "from {tn} {where_clause} " \ + "group by entity_id, entity_type " \ + "union all ".format( + tn=tn, + where_clause=where_clause + ) + else: + stmt += "select " \ + "entity_id, " \ + "entity_type, " \ + "max(time_index) as time_index " \ + "from {tn} {where_clause} " \ + "group by entity_id, entity_type ".format( + tn=tn, + where_clause=where_clause + ) + + # TODO ORDER BY time_index asc is removed for the time being + # till we have a solution for + # https://github.com/crate/crate/issues/9854 + op = stmt + "limit {limit} offset {offset}".format( + offset=offset, + limit=limit + ) - try: - self.cursor.execute(op) - except Exception as e: - logging.debug("{}".format(e)) - entities = [] - else: - res = self.cursor.fetchall() - col_names = ['entity_id', 'entity_type', 'time_index'] - entities = self._format_response(res, - col_names, - tn, - None) - result.extend(entities) + try: + self.cursor.execute(op) + except Exception as e: + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) + entities = [] + else: + res = self.cursor.fetchall() + col_names = ['entity_id', 'entity_type', 'time_index'] + entities = self._format_response(res, + col_names, + tn, + None) + result.extend(entities) return result def _format_response(self, resultset, col_names, table_name, last_n): @@ -954,8 +1046,24 @@ def _format_response(self, resultset, col_names, table_name, last_n): """ stmt = "select entity_attrs from {} " \ "where table_name = ?".format(METADATA_TABLE_NAME) - self.cursor.execute(stmt, [table_name]) - res = self.cursor.fetchall() + try: + # TODO we tested using cache here, but with current "delete" + # approach this causes issues scenario triggering the issue is: + # a entity is create, delete is used to delete all values what + # happens is that the table is empty, but metadata are still + # there, so caching the query with res = + # self._execute_query_via_cache(table_name, "metadata", stmt, + # [table_name], self.default_ttl) actually create an entry in + # the cache table_name, "metadata" in a following query call ( + # below ttl) the same cache can be called despite there is no + # data. 
a possible solution is to create a cache based on query + # parameters that would cache all the results + self.cursor.execute(stmt, [table_name]) + res = self.cursor.fetchall() + except Exception as e: + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) + res = {} if len(res) == 0: # See GH #173 @@ -1044,9 +1152,15 @@ def delete_entities(self, etype, eid=None, from_date=None, to_date=None, op = "delete from {} {}".format(table_name, where_clause) try: self.cursor.execute(op) + key = None + if fiware_service: + key = fiware_service.lower() + self._remove_from_cache("quantumleap", table_name) + self._remove_from_cache(key, "tableNames") return self.cursor.rowcount except Exception as e: - logging.error("{}".format(e)) + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) return 0 def drop_table(self, etype, fiware_service=None): @@ -1055,14 +1169,21 @@ def drop_table(self, etype, fiware_service=None): try: self.cursor.execute(op) except Exception as e: - logging.error("{}".format(e)) + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) # Delete entry from metadata table op = "delete from {} where table_name = ?".format(METADATA_TABLE_NAME) try: self.cursor.execute(op, [table_name]) + self._remove_from_cache("quantumleap", table_name) + key = None + if fiware_service: + key = fiware_service.lower() + self._remove_from_cache(key, "tableNames") except Exception as e: - logging.error("{}".format(e)) + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) if self.cursor.rowcount == 0 and table_name.startswith('"'): # See GH #173 @@ -1070,7 +1191,55 @@ def drop_table(self, etype, fiware_service=None): try: self.cursor.execute(op, [old_tn]) except Exception as e: - logging.error("{}".format(e)) + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) + + def query_entity_types(self, fiware_service=None, fiware_servicepath=None): + """ + Find the types of for a given fiware_service and fiware_servicepath. + :return: list of strings. + """ + # Filter using tenant information + if fiware_service is None: + wc = "where table_name NOT like '\"{}%.%'".format(TENANT_PREFIX) + else: + # Old is prior QL 0.6.0. GH #173 + old_prefix = '{}{}'.format(TENANT_PREFIX, fiware_service.lower()) + prefix = self._et2tn("FooType", fiware_service).split('.')[0] + wc = "where table_name like '{}.%' " \ + "or table_name like '{}.%'".format(old_prefix, prefix) + + stmt = "select distinct(table_name) from {} {}".format( + METADATA_TABLE_NAME, + wc + ) + + try: + self.cursor.execute(stmt) + table_names = self.cursor.fetchall() + except Exception as e: + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) + return None + + else: + matching_types = [] + + all_types = [tn[0] for tn in table_names] + + for et in all_types: + stmt = "select distinct(entity_type) from {}".format(et) + if fiware_servicepath == '/': + stmt = stmt + " WHERE {} ~* '/.*'" \ + .format(FIWARE_SERVICEPATH) + elif fiware_servicepath: + stmt = stmt + " WHERE {} ~* '{}($|/.*)'" \ + .format(FIWARE_SERVICEPATH, fiware_servicepath) + self.cursor.execute(stmt) + types = [t[0] for t in self.cursor.fetchall()] + matching_types.extend(types) + + return matching_types def _get_entity_type(self, entity_id, fiware_service): """ @@ -1082,6 +1251,7 @@ def _get_entity_type(self, entity_id, fiware_service): with such entity_id. 
""" # Filter using tenant information + key = None if fiware_service is None: wc = "where table_name NOT like '\"{}%.%'".format(TENANT_PREFIX) else: @@ -1090,20 +1260,23 @@ def _get_entity_type(self, entity_id, fiware_service): prefix = self._et2tn("FooType", fiware_service).split('.')[0] wc = "where table_name like '{}.%' " \ "or table_name like '{}.%'".format(old_prefix, prefix) + key = fiware_service.lower() stmt = "select distinct(table_name) from {} {}".format( METADATA_TABLE_NAME, wc ) + try: self.cursor.execute(stmt) - + entity_types = self.cursor.fetchall() except Exception as e: - logging.error("{}".format(e)) + self.sql_error_handler(e) + self.logger.error(str(e), exc_info=True) return None else: - all_types = [et[0] for et in self.cursor.fetchall()] + all_types = [et[0] for et in entity_types] matching_types = [] for et in all_types: @@ -1115,5 +1288,112 @@ def _get_entity_type(self, entity_id, fiware_service): return ','.join(matching_types) - def _compute_type(self, attr_t, attr): + def _compute_type(self, entity_id, attr_t, attr): + + if attr_t not in self.NGSI_TO_SQL: + # if attribute is complex assume it as an NGSI StructuredValue + # TODO we should support type name different from NGSI types + # but mapping to NGSI types + if self._attr_is_structured(attr): + return self.NGSI_TO_SQL[NGSI_STRUCTURED_VALUE] + else: + value = attr.get('value', None) or attr.get('object', None) + sql_type = self.NGSI_TO_SQL[NGSI_TEXT] + if isinstance(value, list): + sql_type = self.NGSI_TO_SQL['Array'] + elif value is not None and isinstance(value, dict): + if attr_t == 'Property' \ + and '@type' in value \ + and value['@type'] == 'DateTime': + sql_type = self.NGSI_TO_SQL[NGSI_DATETIME] + else: + sql_type = self.NGSI_TO_SQL[NGSI_STRUCTURED_VALUE] + elif isinstance(value, int): + sql_type = self.NGSI_TO_SQL['Integer'] + elif isinstance(value, float): + sql_type = self.NGSI_TO_SQL['Number'] + elif isinstance(value, bool): + sql_type = self.NGSI_TO_SQL['Boolean'] + elif self._is_iso_date(value): + sql_type = self.NGSI_TO_SQL[NGSI_DATETIME] + + supported_types = ', '.join(self.NGSI_TO_SQL.keys()) + msg = ("'{}' is not a supported NGSI type" + " for Attribute: '{}' " + " and id : '{}'. " + "Please use any of the following: {}. 
" + "Falling back to {}.") + self.logger.warning(msg.format( + attr_t, attr, entity_id, supported_types, + sql_type)) + + return sql_type + + else: + # Github issue 44: Disable indexing for long string + sql_type = self._compute_db_specific_type(attr_t, attr) + + # Github issue 24: StructuredValue == object or array + is_list = isinstance(attr.get('value', None), list) + if attr_t == NGSI_STRUCTURED_VALUE and is_list: + sql_type = self.NGSI_TO_SQL['Array'] + return sql_type + + def _compute_db_specific_type(self, attr_t, attr): raise NotImplementedError + + def _execute_query_via_cache(self, tenant_name, key, stmt, parameters=None, + ex=None): + if self.cache: + try: + value = self.cache.get(tenant_name, key) + if value: + res = pickle.loads(value) + return res + except Exception as e: + self.logger.warning(str(e), exc_info=True) + + self.cursor.execute(stmt, parameters) + res = self.cursor.fetchall() + if res: + self._cache(tenant_name, key, res, ex) + return res + + def _is_query_in_cache(self, tenant_name, key): + if self.cache: + try: + return self.cache.exists(tenant_name, key) + except Exception as e: + self.logger.warning(str(e), exc_info=True) + return False + + def _cache(self, tenant_name, key, value=None, ex=None): + if self.cache: + try: + if value: + value = pickle.dumps(value) + self.cache.put(tenant_name, key, value, ex) + except Exception as e: + self.logger.warning(str(e), exc_info=True) + + def _remove_from_cache(self, tenant_name, key): + if self.cache: + try: + self.cache.delete(tenant_name, key) + except Exception as e: + self.logger.warning(str(e), exc_info=True) + + +class QueryCacheManager(Borg): + cache = None + + def __init__(self): + super(QueryCacheManager, self).__init__() + if is_cache_available() and self.cache is None: + try: + self.cache = get_cache() + except Exception as e: + self.logger.warning(str(e), exc_info=True) + + def get_query_cache(self): + return self.cache diff --git a/src/translators/tests/docker-compose.yml b/src/translators/tests/docker-compose.yml index 49b52159..2ce93a12 100644 --- a/src/translators/tests/docker-compose.yml +++ b/src/translators/tests/docker-compose.yml @@ -40,7 +40,7 @@ services: timescale: image: timescale/timescaledb-postgis:${TIMESCALE_VERSION} ports: - - "54320:5432" + - "5432:5432" # Don't expose container port 5432 with the same number outside of the # swarm. In the Travis test env, there's already a PG instance running # on port 5432! @@ -62,6 +62,11 @@ services: - PG_HOST=timescale - PG_PASS=* + redis: + image: redis + ports: + - "6379:6379" + networks: translatorstests: driver: bridge diff --git a/src/translators/tests/run_tests.sh b/src/translators/tests/run_tests.sh index cf9ae98c..d3936382 100644 --- a/src/translators/tests/run_tests.sh +++ b/src/translators/tests/run_tests.sh @@ -19,15 +19,12 @@ sleep 20 cd ../../../ -# Set Postgres port to same value as in docker-compose.yml -export POSTGRES_PORT='54320' # Set test QL config file export QL_CONFIG='src/translators/tests/ql-config.yml' pytest src/translators/ --cov-report= --cov-config=.coveragerc --cov=src/ r=$? 
-unset POSTGRES_PORT unset QL_CONFIG cd - diff --git a/src/translators/tests/test_crate.py b/src/translators/tests/test_crate.py index 124564cb..32cd2d9f 100644 --- a/src/translators/tests/test_crate.py +++ b/src/translators/tests/test_crate.py @@ -5,6 +5,9 @@ from utils.common import * from datetime import datetime, timezone +from src.utils.common import create_random_entities + + def test_db_version(translator): version = translator.get_db_version() major = int(version.split('.')[0]) @@ -15,6 +18,7 @@ def test_insert(translator): entities = create_random_entities(1, 2, 3, use_time=True, use_geo=True) result = translator.insert(entities) assert result.rowcount > 0 + translator.clean() def test_insert_entity(translator, entity): @@ -29,6 +33,7 @@ def test_insert_entity(translator, entity): assert len(loaded_entities) == 1 check_notifications_record([entity], loaded_entities) + translator.clean() def test_insert_same_entity_with_different_attrs( translator, sameEntityWithDifferentAttrs ): """ @@ -45,6 +50,7 @@ def test_insert_same_entity_with_different_attrs( translator, sameEntityWithDiff assert len(loaded_entities) == 1 check_notifications_record( sameEntityWithDifferentAttrs, loaded_entities) + translator.clean() def test_insert_multiple_types(translator): args = { @@ -62,6 +68,7 @@ def test_insert_multiple_types(translator): entities = create_random_entities(**args) result = translator.insert(entities) assert result.rowcount > 0 + translator.clean() def test_query_all_before_insert(translator): @@ -80,6 +87,7 @@ def test_query_all_before_insert(translator): fiware_service="openiot", fiware_servicepath="/") assert len(loaded_entities) == 0 + translator.clean() def test_query_all(translator): @@ -104,6 +112,7 @@ def test_query_all(translator): notifications = [e for e in entities if e['id'] == i] records = [e for e in loaded_entities if e['id'] == i] check_notifications_record(notifications, records) + translator.clean() def test_limit_0(translator): @@ -116,6 +125,7 @@ def test_limit_0(translator): loaded_entities = translator.query(limit=0) assert loaded_entities == [] + translator.clean() def test_limit_overrides_lastN(translator): @@ -125,6 +135,7 @@ def test_limit_overrides_lastN(translator): loaded_entities = translator.query(last_n=5, limit=3) assert len(loaded_entities[0]['index']) == 3 + translator.clean() def test_lastN_ordering(translator): @@ -136,6 +147,7 @@ def test_lastN_ordering(translator): index = loaded_entities[0]['index'] assert len(index) == 3 assert index[-1] > index[0] + translator.clean() def test_attrs_by_entity_id(translator): @@ -165,6 +177,7 @@ def test_attrs_by_entity_id(translator): # nonexistent id should return no data loaded_entities = translator.query(entity_id='some_nonexistent_id') assert len(loaded_entities) == 0 + translator.clean() def test_attrs_by_id_ambiguity(translator): @@ -184,6 +197,7 @@ def test_attrs_by_id_ambiguity(translator): # NOT OK otherwise with pytest.raises(AmbiguousNGSIIdError): translator.query(entity_id='repeated_id') + translator.clean() WITHIN_EAST_HEMISPHERE = "within(attr_geo, " \ @@ -226,6 +240,7 @@ def test_query_per_attribute(translator, attr_name, clause, tester): "Not expected from an " \ "uniform random distribution" assert all(map(tester, entities)) + translator.clean() def test_unsupported_ngsi_type(translator): @@ -241,6 +256,7 @@ def test_unsupported_ngsi_type(translator): translator.insert([e]) entities = translator.query() check_notifications_record([e], entities) + translator.clean() def 
test_accept_unknown_ngsi_type(translator):
     """
@@ -265,6 +281,7 @@ def test_accept_unknown_ngsi_type(translator):
     translator.insert([e])
     entities = translator.query()
     check_notifications_record([e], entities)
+    translator.clean()

 def test_accept_special_chars(translator):
     """
@@ -289,6 +306,7 @@ def test_accept_special_chars(translator):
     translator.insert([e])
     entities = translator.query()
     check_notifications_record([e], entities)
+    translator.clean()

 def test_missing_type_defaults_to_string(translator):
     e = {
@@ -306,6 +324,7 @@ def test_missing_type_defaults_to_string(translator):
     # Response will include the type
     e["foo"]["type"] = NGSI_TEXT
     check_notifications_record([e], entities)
+    translator.clean()


 def test_capitals(translator):
@@ -344,6 +363,7 @@ def test_capitals(translator):
     # Note that old entity gets None for the new attribute
     assert entities[1]['id'] == e1['id']
     assert entities[1]['NewAttr']['values'] == [None]
+    translator.clean()


 @pytest.mark.filterwarnings("ignore")
@@ -362,6 +382,7 @@ def test_no_time_index(translator):
     records = translator.query()
     assert len(records) == 1
     assert len(records[0]['index']) == 1
+    translator.clean()


 def test_long_json(translator):
@@ -380,6 +401,7 @@ def test_long_json(translator):
     r = translator.query()
     assert len(r) == 1
     check_notifications_record([big_entity], r)
+    translator.clean()


 def test_geo_point(translator):
@@ -407,6 +429,7 @@ def test_geo_point(translator):

     # Check entity is retrieved as it was inserted
     check_notifications_record([entity], entities)
+    translator.clean()


 def test_geo_point_null_values(translator):
@@ -431,7 +454,7 @@ def test_geo_point_null_values(translator):
         TIME_INDEX_NAME: datetime.now(timezone.utc).isoformat(timespec='milliseconds'),
         'temperature': {
             'type': 'Number',
-            'value': "19"
+            'value': 19
         }
     }
     translator.insert([entity_new])
@@ -445,7 +468,8 @@ def test_geo_point_null_values(translator):
     res = translator.cursor.fetchall()
     assert len(res) == 2
     assert res[0] == [19.6389474, -98.9109537, None]
-    assert res[1] == [None, None, '19']
+    assert res[1] == [None, None, 19]
+    translator.clean()


 def test_structured_value_to_array(translator):
@@ -474,6 +498,7 @@ def test_structured_value_to_array(translator):

     r = translator.query()
     check_notifications_record([entity], r)
+    translator.clean()


 def test_ISO8601(translator):
@@ -494,6 +519,7 @@ def test_ISO8601(translator):
     loaded = translator.query()
     assert len(loaded) > 0
     check_notifications_record([e], loaded)
+    translator.clean()


 ################################################################################
@@ -508,6 +534,7 @@ def test_air_quality_observed(translator, air_quality_observed):
     translator.insert([air_quality_observed])
     loaded = translator.query()
     check_notifications_record([air_quality_observed], loaded)
+    translator.clean()


 def test_traffic_flow_observed(translator, traffic_flow_observed):
@@ -518,3 +545,4 @@ def test_traffic_flow_observed(translator, traffic_flow_observed):
     translator.insert([traffic_flow_observed])
     loaded = translator.query()
     check_notifications_record([traffic_flow_observed], loaded)
+    translator.clean()
diff --git a/src/translators/tests/test_crate_aggregation.py b/src/translators/tests/test_crate_aggregation.py
index e5b2f5d5..fd9cf0b1 100644
--- a/src/translators/tests/test_crate_aggregation.py
+++ b/src/translators/tests/test_crate_aggregation.py
@@ -47,3 +47,4 @@ def test_aggr_per_second(translator):
             'values': [5, 15, 25, 32],
         }
     }
+    translator.clean()
\ No newline at end of file
diff --git a/src/translators/tests/test_crate_delete.py b/src/translators/tests/test_crate_delete.py
index 33b748bd..9175af48 100644
--- a/src/translators/tests/test_crate_delete.py
+++ b/src/translators/tests/test_crate_delete.py
@@ -28,6 +28,7 @@ def test_delete_entity_defaults(translator):
     survivors = translator.query(entity_type=deleted_type,
                                  entity_id=deleted_id)
     assert len(survivors) == 0
+    translator.clean()


 def test_delete_entity_customs(translator):
@@ -63,6 +64,7 @@ def test_delete_entity_customs(translator):
     assert unaffected['id'] != deleted_id
     assert unaffected['type'] == deleted_type
     assert len(unaffected['index']) == 10
+    translator.clean()


 def test_delete_entity_with_tenancy(translator):
@@ -93,6 +95,7 @@ def test_delete_entity_with_tenancy(translator):
                                fiware_service=fs,
                                fiware_servicepath=fsp)
     assert res == 5
+    translator.clean(fs)


 def test_delete_entities_defaults(translator):
@@ -108,6 +111,7 @@ def test_delete_entities_defaults(translator):
     remaining = translator.query()
     assert len(remaining) == (3-1) * 2
     assert all([r['type'] != type_to_delete for r in remaining])
+    translator.clean()


 def test_delete_entities_customs(translator):
@@ -128,6 +132,7 @@ def test_delete_entities_customs(translator):

     remaining = translator.query()
     assert sum([len(r['index']) for r in remaining]) == ((4 * 4) - 3)
+    translator.clean()


 def test_delete_entities_with_tenancy(translator):
@@ -151,3 +156,4 @@ def test_delete_entities_with_tenancy(translator):
                                fiware_service=fs,
                                fiware_servicepath=fsp)
     assert res == 10
+    translator.clean(fs)
diff --git a/src/translators/tests/test_crate_multientities.py b/src/translators/tests/test_crate_multientities.py
index 7da119ab..d604771c 100644
--- a/src/translators/tests/test_crate_multientities.py
+++ b/src/translators/tests/test_crate_multientities.py
@@ -29,6 +29,7 @@ def test_query_multiple_ids(translator):
     assert loaded_entities[0]['type'] == '0'
     assert loaded_entities[1]['id'] == '0-2'
     assert loaded_entities[1]['type'] == '0'
+    translator.clean()


 def test_query_multiple_ids_bak(translator):
@@ -42,6 +43,7 @@ def test_query_multiple_ids_bak(translator):
     records = translator.query(entity_type='0', entity_ids=['0-1'])
     assert len(records) == 1
     assert records[0]['id'] == '0-1'
+    translator.clean()


 def test_query_multiple_ids_with_invalids(translator):
@@ -59,3 +61,4 @@ def test_query_multiple_ids_with_invalids(translator):
     loaded_entities = translator.query(entity_type='0',
                                        entity_ids=['0-1', 'nonexistent'])
     assert len(loaded_entities) == 1
+    translator.clean()
diff --git a/src/translators/tests/test_crate_multitenancy.py b/src/translators/tests/test_crate_multitenancy.py
index 3cf365a7..3d3ad5bb 100644
--- a/src/translators/tests/test_crate_multitenancy.py
+++ b/src/translators/tests/test_crate_multitenancy.py
@@ -43,6 +43,7 @@ def test_fiware_tenant(translator):
     # Query WITH tenant -> Result
     entities = translator.query(fiware_service=fs, fiware_servicepath=fsp)
     assert len(entities) == 1
+    translator.clean(fs)


 def test_fiware_tenant_services(translator):
@@ -63,6 +64,8 @@ def test_fiware_tenant_services(translator):
     entities = translator.query(fiware_service="B", fiware_servicepath="/")
     assert len(entities) == 1
     assert entities[0]['id'] == "Y"
+    translator.clean("A")
+    translator.clean("B")


 def test_fiware_tenant_servicepath(translator):
@@ -95,6 +98,7 @@ def insert_with_tenant(e, path):
     entities = translator.query(fiware_service="EU",
                                 fiware_servicepath="/eu/greece/athens")
     assert len(entities) == 1
+    translator.clean("EU")


 def test_fiware_empty_tenant_is_no_tenant(translator):
@@ -115,6 +119,7 @@ def test_fiware_empty_tenant_is_no_tenant(translator):
     # Query with EMPTY tenant -> get results
     entities = translator.query()
     assert len(entities) == 2
+    translator.clean()


 def test_fiware_tenant_reserved_word(translator):
@@ -125,3 +130,4 @@ def test_fiware_tenant_reserved_word(translator):

     entities = translator.query(fiware_service=fs, fiware_servicepath=fsp)
     assert len(entities) == 1
+    translator.clean(fs)
diff --git a/src/translators/tests/test_crate_replication.py b/src/translators/tests/test_crate_replication.py
index c5f3bc75..3138697d 100644
--- a/src/translators/tests/test_crate_replication.py
+++ b/src/translators/tests/test_crate_replication.py
@@ -27,3 +27,4 @@ def test_default_replication(translator):
     translator.cursor.execute(op.format(METADATA_TABLE_NAME))
     res = translator.cursor.fetchall()
     assert res[0] == ['2-all']
+    translator.clean()
diff --git a/src/translators/timescale.py b/src/translators/timescale.py
index 425c91a0..1e8851ce 100644
--- a/src/translators/timescale.py
+++ b/src/translators/timescale.py
@@ -6,15 +6,17 @@
 from translators import sql_translator
 from translators.sql_translator import NGSI_ISO8601, NGSI_DATETIME, \
-    NGSI_GEOJSON, NGSI_TEXT, NGSI_STRUCTURED_VALUE, TIME_INDEX, \
-    METADATA_TABLE_NAME, FIWARE_SERVICEPATH, TENANT_PREFIX
+    NGSI_LD_GEOMETRY, NGSI_GEOJSON, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \
+    TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH, TENANT_PREFIX
 from translators.timescale_geo_query import from_ngsi_query
+import logging
 import geocoding.geojson.wktcodec
 from geocoding.slf.geotypes import *
 import geocoding.slf.jsoncodec
 from geocoding.slf.querytypes import SlfQuery
 import geocoding.slf.wktcodec
 from utils.cfgreader import *
+from utils.connection_manager import ConnectionManager


 # POSTGRES TYPES
 PG_JSON_ARRAY = 'jsonb'
@@ -27,6 +29,7 @@
     NGSI_DATETIME: 'timestamp WITH TIME ZONE',
     "Integer": 'bigint',
     NGSI_GEOJSON: 'geometry',
+    NGSI_LD_GEOMETRY: 'geometry',
     SlfPoint.ngsi_type(): 'geometry',
     SlfLine.ngsi_type(): 'geometry',
     SlfPolygon.ngsi_type(): 'geometry',
@@ -34,7 +37,7 @@
     "Number": 'float',
     NGSI_TEXT: 'text',
     NGSI_STRUCTURED_VALUE: 'jsonb',
-# hyper-table requires a non-null time index
+    # hyper-table requires a non-null time index
     TIME_INDEX: 'timestamp WITH TIME ZONE NOT NULL'
 }

@@ -52,7 +55,7 @@ def __init__(self, host='0.0.0.0', port=5432, use_ssl=False,
         self.db_pass = db_pass

     def read_env(self, env: dict = os.environ):
-        r = EnvReader(env, log=logging.getLogger(__name__).info)
+        r = EnvReader(env, log=logging.getLogger(__name__).debug)
         self.host = r.read(StrVar('POSTGRES_HOST', self.host))
         self.port = r.read(IntVar('POSTGRES_PORT', self.port))
         self.use_ssl = r.read(BoolVar('POSTGRES_USE_SSL', self.use_ssl))
@@ -63,7 +66,6 @@ def read_env(self, env: dict = os.environ):


 class PostgresTranslator(sql_translator.SQLTranslator):
-
     NGSI_TO_SQL = NGSI_TO_SQL

     def __init__(self, conn_data=PostgresConnectionData()):
@@ -73,30 +75,48 @@ def __init__(self, conn_data=PostgresConnectionData()):
         self.db_user = conn_data.db_user
         self.db_pass = conn_data.db_pass
         self.ssl = {} if conn_data.use_ssl else None
-        self.conn = None
+        self.ccm = None
+        self.connection = None
         self.cursor = None
+        self.logger = logging.getLogger(__name__)

     def setup(self):
-        pg8000.paramstyle = "qmark"
-        self.conn = pg8000.connect(host=self.host, port=self.port, ssl_context=self.ssl,
-                                   database=self.db_name,
-                                   user=self.db_user, password=self.db_pass)
-        self.conn.autocommit = True
-        self.cursor = self.conn.cursor()
+        self.ccm = ConnectionManager()
+        self.connection = self.ccm.get_connection('timescale')
+        if self.connection is None:
+            try:
+                pg8000.paramstyle = "qmark"
+                self.connection = pg8000.connect(host=self.host, port=self.port,
+                                                 ssl_context=self.ssl,
+                                                 database=self.db_name,
+                                                 user=self.db_user,
+                                                 password=self.db_pass)
+                self.connection.autocommit = True
+                self.ccm.set_connection('timescale', self.connection)
+            except Exception as e:
+                self.logger.warning(str(e), exc_info=True)
+                raise e
+
+        self.cursor = self.connection.cursor()

     def dispose(self):
+        super(PostgresTranslator, self).dispose()
         self.cursor.close()
-        self.conn.close()
+
+    def sql_error_handler(self, exception):
+        if exception.__class__ == BrokenPipeError:
+            self.ccm.reset_connection('timescale')
+            self.setup()

     @staticmethod
     def _svc_to_schema_name(fiware_service):
         if fiware_service:
             return '"{}{}"'.format(TENANT_PREFIX, fiware_service.lower())

-    def _compute_type(self, attr_t, attr):
+    def _compute_db_specific_type(self, attr_t, attr):
         return NGSI_TO_SQL[attr_t]

-    def _prepare_data_table(self, table_name, table, fiware_service):
+    def _create_data_table(self, table_name, table, fiware_service):
         schema = self._svc_to_schema_name(fiware_service)
         if schema:
             stmt = "create schema if not exists {}".format(schema)
@@ -120,11 +140,19 @@ def _prepare_data_table(self, table_name, table, fiware_service):
         self.cursor.execute(stmt)

         ix_name = '"ix_{}_eid_and_tx"'.format(table_name.replace('"', ''))
-        stmt = f"create index if not exists {ix_name} " +\
+        stmt = f"create index if not exists {ix_name} " + \
               f"on {table_name} (entity_id, {self.TIME_INDEX_NAME} desc)"
         self.cursor.execute(stmt)

-    def _preprocess_values(self, e, table, col_names, fiware_servicepath):
+    def _update_data_table(self, table_name, new_columns, fiware_service):
+
+        alt_cols = ', '.join('add column if not exists "{}" {}'
+                             .format(cn.lower(), ct)
+                             for cn, ct in new_columns.items())
+        stmt = "alter table {} {};".format(table_name, alt_cols)
+        self.cursor.execute(stmt)
+
+    def _preprocess_values(self, e, original_attrs, col_names, fiware_servicepath):
         values = []
         for cn in col_names:
             if cn == 'entity_type':
@@ -138,11 +166,13 @@ def _preprocess_values(self, e, table, col_names, fiware_servicepath):

             else:  # Normal attributes
                 try:
-                    mapped_type = table[cn]
-                    ngsi_value = e[cn]['value']
+                    attr = original_attrs[cn][0]
+                    attr_t = original_attrs[cn][1]
+                    ngsi_value = e[attr]['value']
+                    mapped_type = self._compute_type(e['id'], attr_t, e[attr])

-                    if SlfGeometry.is_ngsi_slf_attr(e[cn]):
-                        ast = SlfGeometry.build_from_ngsi_dict(e[cn])
+                    if SlfGeometry.is_ngsi_slf_attr(e[attr]):
+                        ast = SlfGeometry.build_from_ngsi_dict(e[attr])
                         mapped_value = geocoding.slf.wktcodec.encode_as_wkt(
                             ast, srid=4326)
                     elif mapped_type == NGSI_TO_SQL[NGSI_GEOJSON]:
@@ -155,6 +185,16 @@ def _preprocess_values(self, e, table, col_names, fiware_servicepath):
                         mapped_value = str(ngsi_value)
                     elif mapped_type == PG_JSON_ARRAY:
                         mapped_value = pg8000.PGJsonb(ngsi_value)
+                    elif 'type' in e[attr] and e[attr]['type'] == 'Property' \
+                            and 'value' in e[attr] \
+                            and isinstance(e[attr]['value'], dict) \
+                            and '@type' in e[attr]['value'] \
+                            and e[attr]['value']['@type'] == 'DateTime':
+                        mapped_value = e[attr]['value']['@value']
+                    elif 'type' in e[attr] and e[attr][
+                            'type'] == 'Relationship':
+                        mapped_value = e[attr].get('value', None) or \
+                            e[attr].get('object', None)
                     else:
                         mapped_value = ngsi_value

@@ -186,6 +226,7 @@ def _db_value_to_ngsi(self, db_value: Any, ngsi_type: str) -> Any:
             return geocoding.geojson.wktcodec.decode_wkb_hexstr(db_value)
         return db_value

+
     # NOTE. Implicit conversions.
     # 1. JSON. NGSI struct values and arrays get inserted as `jsonb`. When
     # reading `jsonb` values back from the DB, pg8000 automatically converts
@@ -197,7 +238,8 @@ def _db_value_to_ngsi(self, db_value: Any, ngsi_type: str) -> Any:
     def _to_db_ngsi_structured_value(data: dict) -> pg8000.PGJsonb:
         return pg8000.PGJsonb(data)

-    def _should_insert_original_entities(self, insert_error: Exception) -> bool:
+    def _should_insert_original_entities(self,
+                                         insert_error: Exception) -> bool:
         return isinstance(insert_error, pg8000.ProgrammingError)

     def _create_metadata_table(self):
@@ -206,7 +248,7 @@ def _create_metadata_table(self):
         op = stmt.format(METADATA_TABLE_NAME)
         self.cursor.execute(op)

-    def _store_medatata(self, table_name, persisted_metadata):
+    def _store_metadata(self, table_name, persisted_metadata):
         stmt = "insert into {} (table_name, entity_attrs) values (?, ?) " \
                "on conflict (table_name) " \
                "do update set entity_attrs = ?"
diff --git a/src/utils/common.py b/src/utils/common.py
index 8faf159c..e36b4b14 100644
--- a/src/utils/common.py
+++ b/src/utils/common.py
@@ -133,7 +133,7 @@ def create_random_entities(num_types=1,

 def iter_entity_attrs(entity):
     for attr in entity:
-        if attr not in ['type', 'id']:
+        if attr not in ['type', 'id', '@context']:
             yield attr


diff --git a/src/utils/connection_manager.py b/src/utils/connection_manager.py
new file mode 100644
index 00000000..7c79ff87
--- /dev/null
+++ b/src/utils/connection_manager.py
@@ -0,0 +1,25 @@
+class Borg:
+    _shared_state = {}
+
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ConnectionManager(Borg):
+
+    connection = {}
+
+    def __init__(self):
+        Borg.__init__(self)
+
+    def set_connection(self, db, connection):
+        self.connection[db] = connection
+
+    def get_connection(self, db):
+        try:
+            return self.connection[db]
+        except KeyError as e:
+            return None
+
+    def reset_connection(self, db):
+        self.connection[db] = None
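
A quick sanity check of the Borg-style ConnectionManager added in src/utils/connection_manager.py: every instance shares the same `connection` state, which is what lets each PostgresTranslator.setup() call reuse one pg8000 connection per process and lets sql_error_handler() force a reconnect after a BrokenPipeError. A minimal sketch, assuming the repo's src/ directory is on the Python path so `utils.connection_manager` imports as in the diff:

    from utils.connection_manager import ConnectionManager

    cm_a = ConnectionManager()
    cm_b = ConnectionManager()

    # Stand-in object for a real pg8000 connection.
    cm_a.set_connection('timescale', object())

    # Shared Borg state: a connection cached by one instance is visible to all.
    assert cm_b.get_connection('timescale') is cm_a.get_connection('timescale')

    # What sql_error_handler() does after a BrokenPipeError: drop the cached
    # connection so the next setup() call reconnects from scratch.
    cm_b.reset_connection('timescale')
    assert cm_a.get_connection('timescale') is None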
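
The two new branches in _preprocess_values target NGSI-LD style attributes. A small illustration of the attribute shapes involved and the values that end up in the database column (the payloads below are made up for illustration, not taken from the test suite):

    # NGSI-LD DateTime Property: the inner '@value' string is what gets stored.
    date_prop = {
        'type': 'Property',
        'value': {'@type': 'DateTime', '@value': '2020-03-01T10:00:00Z'}
    }
    mapped_date = date_prop['value']['@value']

    # NGSI-LD Relationship: 'value' wins if present, otherwise 'object'.
    relationship = {'type': 'Relationship', 'object': 'urn:ngsi-ld:Building:B1'}
    mapped_rel = relationship.get('value', None) or relationship.get('object', None)

    assert mapped_date == '2020-03-01T10:00:00Z'
    assert mapped_rel == 'urn:ngsi-ld:Building:B1'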
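
On the test side, every test now ends with an explicit translator.clean() (or translator.clean(fs) for tenant-scoped data). An alternative worth considering is a wrapper fixture that guarantees the clean-up even when an assertion fails mid-test; the `clean_translator` name below is hypothetical and the sketch assumes the existing `translator` fixture:

    import pytest

    @pytest.fixture
    def clean_translator(translator):
        # Hand the usual translator to the test, then always clean up,
        # even if the test body raised before reaching its last line.
        yield translator
        translator.clean()

A test would then take `clean_translator` instead of `translator` and drop its trailing clean() call.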