Skip to content

Commit

Permalink
fix #208 (#363)
Browse files Browse the repository at this point in the history
* fix #208

* added code to group entities by servicepath when multiple notifications arrive at once (#208)
* add tests and integration tests (so we have also test with orion triggering such case)
* fix notification tests to rely only on api calls (remove call to translator)
* changed error code to 400 when due to invalid request
* upgraded python to 3.8.5
* fix integration test to use new docker image with entrypoint (#362)

* minor fixes

* remove build (we can assume a build is available)
* remove volumes at the end of the test
* increase timesleep for one test
* improve code

* minor fixes

* fix timeing for notifications
* fix error message check

* add back docker build
  • Loading branch information
chicco785 committed Sep 22, 2020
1 parent cafca9c commit 23e4b85
Show file tree
Hide file tree
Showing 13 changed files with 590 additions and 240 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
language: python
python:
- 3.8.3
- 3.8.5

services:
- docker
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.8.3-alpine3.12 as base
FROM python:3.8.5-alpine3.12 as base
FROM base as builder
RUN apk --no-cache --update-cache add gcc python3 python3-dev py-pip build-base wget
RUN ln -s /usr/include/locale.h /usr/include/xlocale.h
Expand Down
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ rethinkdb = "==2.3"
[dev-packages]

[requires]
python_version = "3.8.3"
python_version = "3.8.5"
181 changes: 101 additions & 80 deletions Pipfile.lock

Large diffs are not rendered by default.

89 changes: 84 additions & 5 deletions src/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import pytest
import requests


QL_HOST = os.environ.get('QL_HOST', 'quantumleap')
QL_PORT = 8668
QL_URL = "http://{}:{}/v2".format(QL_HOST, QL_PORT)
Expand Down Expand Up @@ -41,20 +40,34 @@ def clean_mongo():
do_clean_mongo()


def headers(service, service_path, content_type=True):
h = {}
if content_type:
h['Content-Type'] = 'application/json'
if service:
h['Fiware-Service'] = service
if service_path:
h['Fiware-ServicePath'] = service_path

return h


# TODO we have fully fledged client library, why not using that?
class OrionClient(object):

def __init__(self, host, port):
self.url = 'http://{}:{}'.format(host, port)

def subscribe(self, subscription):
def subscribe(self, subscription, service=None, service_path=None):
r = requests.post('{}/v2/subscriptions'.format(self.url),
data=json.dumps(subscription),
headers={'Content-Type': 'application/json'})
headers=headers(service, service_path))
return r

def insert(self, entity):
def insert(self, entity, service=None, service_path=None):
r = requests.post('{}/v2/entities'.format(self.url),
data=json.dumps(entity),
headers={'Content-Type': 'application/json'})
headers=headers(service, service_path))
return r


Expand Down Expand Up @@ -133,6 +146,7 @@ def delete_entities(self, entity_type=None, fiware_service=None,
with Translator(host=CRATE_HOST, port=CRATE_PORT) as trans:
yield trans


@pytest.fixture
def entity():
entity = {
Expand All @@ -151,6 +165,7 @@ def entity():
}
return entity


@pytest.fixture
def sameEntityWithDifferentAttrs():
"""
Expand Down Expand Up @@ -199,6 +214,70 @@ def sameEntityWithDifferentAttrs():
]
return entities


@pytest.fixture
def diffEntityWithDifferentAttrs():
"""
Two updates for the same entity with different attributes.
The first update has temperature and pressure but the second update has only temperature.
"""
entities = [
{
'id': 'Room1',
'type': 'Room',
'temperature': {
'value': 24.2,
'type': 'Number',
'metadata': {
'dateModified': {
'type': 'DateTime',
'value': '2019-05-09T15:28:30.000Z'
}
}
},
'pressure': {
'value': 720,
'type': 'Number',
'metadata': {
'dateModified': {
'type': 'DateTime',
'value': '2019-05-09T15:28:30.000Z'
}
}
}
},
{
'id': 'Room2',
'type': 'Room',
'temperature': {
'value': 25.2,
'type': 'Number',
'metadata': {
'dateModified': {
'type': 'DateTime',
'value': '2019-05-09T15:28:30.000Z'
}
}
}
},
{
'id': 'Room3',
'type': 'Room',
'temperature': {
'value': 25.2,
'type': 'Number',
'metadata': {
'dateModified': {
'type': 'DateTime',
'value': '2019-05-09T15:28:30.000Z'
}
}
}
}
]
return entities


@pytest.fixture
def air_quality_observed():
"""
Expand Down
12 changes: 12 additions & 0 deletions src/exceptions/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ class QLError(Exception):
"""
Error raised in QuantumLeap usage.
"""
def __init__(self, message="Quantum Leap Error"):
self.message = message
super().__init__(self.message)


class UnsupportedOption(QLError):
Expand Down Expand Up @@ -36,3 +39,12 @@ class InvalidParameterValue(QLError):
def __init__(self, par_value='', par_name=''):
msg = "The parameter value '{}' for parameter {} is not valid."
QLError.__init__(self, msg.format(par_value, par_name))


class InvalidHeaderValue(QLError):
"""
Passed parameter value is not valid.
"""
def __init__(self, header_value='', header_name='', message=''):
msg = "The header value '{}' for header {} is not valid. {}"
QLError.__init__(self, msg.format(header_value, header_name, message))
43 changes: 28 additions & 15 deletions src/reporter/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
interest and make QL actually perform the corresponding subscription to orion.
I.e, QL must be told where orion is.
"""

from flask import request
from geocoding import geocoding
from geocoding.factory import get_geo_cache, is_geo_coding_available
Expand All @@ -41,13 +42,18 @@
TIME_INDEX_HEADER_NAME
from geocoding.location import normalize_location
from utils.cfgreader import EnvReader, StrVar
from exceptions.exceptions import AmbiguousNGSIIdError, UnsupportedOption, \
NGSIUsageError, InvalidParameterValue, InvalidHeaderValue


def log():
r = EnvReader(log=logging.getLogger(__name__).info)
level = r.read(StrVar('LOGLEVEL', 'INFO')).upper()

logging.basicConfig(level=level, format='%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s', datefmt='%Y-%m-%d %I:%M:%S')
logging.basicConfig(level=level,
format='%(asctime)s.%(msecs)03d '
'%(levelname)s:%(name)s:%(message)s',
datefmt='%Y-%m-%d %I:%M:%S')
return logging.getLogger(__name__)


Expand Down Expand Up @@ -105,24 +111,24 @@ def _validate_payload(payload):
payload[attr].update({'value': None})
log().warning(
'An entity update is missing value for attribute {}'.format(attr))


def _filter_empty_entities(payload):
log().debug('Received payload: {}'.format(payload))
attrs = list(iter_entity_attrs(payload))
Flag = False
empty = False
attrs.remove('time_index')
for j in attrs:
value = payload[j]['value']
if isinstance(value, int) and value is not None:
Flag = True
empty = True
elif value:
Flag = True
if Flag:
empty = True
if empty:
return payload
else:
return None


def _filter_no_type_no_value_entities(payload):
attrs = list(iter_entity_attrs(payload))
Expand All @@ -138,7 +144,6 @@ def _filter_no_type_no_value_entities(payload):


def notify():

if request.json is None:
return 'Discarding notification due to lack of request body. ' \
'Lost in a redirect maybe?', 400
Expand All @@ -148,12 +153,13 @@ def notify():
'content.', 400

payload = request.json['data']

# preprocess and validate each entity update
for entity in payload:
# Validate entity update
error = _validate_payload(entity)
if error:
# TODO in this way we return error for even if only one entity is wrong
return error, 400
# Add TIME_INDEX attribute
custom_index = request.headers.get(TIME_INDEX_HEADER_NAME, None)
Expand Down Expand Up @@ -188,11 +194,19 @@ def notify():
try:
with translator_for(fiware_s) as trans:
trans.insert(payload, fiware_s, fiware_sp)
except:
msg = "Notification not processed or not updated for payload: %s" % (payload)
except Exception as e:
msg = "Notification not processed or not updated for payload: %s. " \
"%s" % (payload, str(e))
log().error(msg)
return msg, 500
msg = "Notification successfully processed for : 'tenant' %s, 'fiwareServicePath' %s, 'entity_id' %s" % (fiware_s, fiware_sp, entity_id)
error_code = 500
if e.__class__ == InvalidHeaderValue or \
e.__class__ == InvalidParameterValue or \
e.__class__ == NGSIUsageError:
error_code = 400
return msg, error_code
msg = "Notification successfully processed for : 'tenant' %s, " \
"'fiwareServicePath' %s, " \
"'entity_id' %s" % (fiware_s, fiware_sp, entity_id)
log().info(msg)
return msg

Expand Down Expand Up @@ -277,8 +291,7 @@ def subscribe(orion_url,
if r is None or not r.ok:
msg = {
"error": "Bad Request",
"description": "Orion is not reachable by QuantumLeap at {}"
.format(orion_url)
"description": "Orion is not reachable at {}".format(orion_url)
}
return msg, 400

Expand Down

0 comments on commit 23e4b85

Please sign in to comment.