Commit ef7ce6e

Merge 4e292e8 into 85d0559

c0c0n3 committed Jul 8, 2020
2 parents 85d0559 + 4e292e8

Showing 12 changed files with 350 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docker/docker-compose-dev.yml
@@ -49,7 +49,7 @@ services:
       - redisdata:/data
 
   timescale:
-    image: timescale/timescaledb-postgis:latest-pg10
+    image: timescale/timescaledb-postgis:1.7.1-pg12
     ports:
       - "5432:5432"
       # Don't expose container port 5432 with the same number outside of the
5 changes: 3 additions & 2 deletions src/translators/crate.py
@@ -7,7 +7,6 @@
     NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, TIME_INDEX, \
     METADATA_TABLE_NAME, FIWARE_SERVICEPATH
 import logging
-import os
 from .crate_geo_query import from_ngsi_query
 from utils.cfgreader import EnvReader, StrVar, IntVar
 
@@ -136,6 +135,9 @@ def _prepare_data_table(self, table_name, table, fiware_service):
                "(number_of_replicas = '2-all', column_policy = 'dynamic')".format(table_name, columns)
         self.cursor.execute(stmt)
 
+    def _should_insert_original_entities(self, insert_error: Exception) -> bool:
+        return isinstance(insert_error, exceptions.ProgrammingError)
+
     def _create_metadata_table(self):
         stmt = "create table if not exists {} " \
                "(table_name string primary key, entity_attrs object) " \
@@ -154,7 +156,6 @@ def _store_medatata(self, table_name, persisted_metadata):
         stmt = stmt.format(METADATA_TABLE_NAME)
         self.cursor.execute(stmt, (table_name, persisted_metadata))
 
-
     def _compute_type(self, attr_t, attr):
         """
         Github issue 44: Disable indexing for long string
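The override above is the Crate-specific half of a fallback protocol this commit adds to sql_translator.py (next section): after a batch insert fails, the base translator passes the exception to _should_insert_original_entities, and a True answer makes it retry by saving the raw entity JSON. A minimal sketch of that contract, with made-up names standing in for a concrete backend:

# Sketch only: DemoTypeError and DemoTranslator are illustrative stand-ins,
# not part of this commit. The real Crate translator answers True for
# crate.client.exceptions.ProgrammingError, as the diff above shows.

class DemoTypeError(Exception):
    """Stand-in for a backend error signalling a schema/type mismatch."""

class DemoTranslator:
    def _should_insert_original_entities(self, insert_error: Exception) -> bool:
        # Recoverable: the batch clashed with the table schema, so the
        # original entities can still be saved as JSON text. Anything
        # else (connection loss, bad SQL, ...) should propagate.
        return isinstance(insert_error, DemoTypeError)

t = DemoTranslator()
assert t._should_insert_original_entities(DemoTypeError()) is True
assert t._should_insert_original_entities(RuntimeError()) is False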
94 changes: 86 additions & 8 deletions src/translators/sql_translator.py
@@ -5,10 +5,14 @@
 from translators import base_translator
 from utils.cfgreader import EnvReader, IntVar
 from utils.common import iter_entity_attrs
+from utils.jsondict import safe_get_value
+from utils.maybe import maybe_map
 import logging
 from geocoding.slf import SlfQuery
 import dateutil.parser
 import os
+import json
+from typing import List, Optional
 
 # NGSI TYPES
 # Based on Orion output because official docs don't say much about these :(
@@ -30,6 +34,13 @@
 TIME_INDEX = 'timeindex'
 VALID_AGGR_METHODS = ['count', 'sum', 'avg', 'min', 'max']
 VALID_AGGR_PERIODS = ['year', 'month', 'day', 'hour', 'minute', 'second']
+# The name of the column where we store the original JSON entity received
+# in the notification when its corresponding DB row can't be inserted.
+ORIGINAL_ENTITY_COL = '__original_ngsi_entity__'
+# The name of the entity ID and type columns.
+# TODO: replace each occurrence of these strings with the below constants.
+ENTITY_ID_COL = 'entity_id'
+ENTITY_TYPE_COL = 'entity_type'
 
 # Default Translation
 NGSI_TO_SQL = {
@@ -52,6 +63,39 @@
 }


+def current_timex() -> str:
+    """
+    :return: QuantumLeap time index value for the current point in time.
+    """
+    return datetime.utcnow().isoformat(timespec='milliseconds')
+
+
+# TODO: use below getters everywhere rather than entity id and type strings!
+
+def entity_id(entity: dict) -> Optional[str]:
+    """
+    Safely get the NGSI ID of the given entity.
+    The ID, if present, is expected to be a string, so we convert it if it
+    isn't.
+    :param entity: the entity.
+    :return: the ID string if there's an ID, `None` otherwise.
+    """
+    return maybe_map(str, safe_get_value(entity, NGSI_ID))
+
+
+def entity_type(entity: dict) -> Optional[str]:
+    """
+    Safely get the NGSI type of the given entity.
+    The type, if present, is expected to be a string, so we convert it if it
+    isn't.
+    :param entity: the entity.
+    :return: the type string if there's a type, `None` otherwise.
+    """
+    return maybe_map(str, safe_get_value(entity, NGSI_TYPE))
+
+
 # TODO: Refactor
 # I suggest we refactor both this and the Crate translator using something
 # like SQLAlchemy if we want to keep the same approach of doing everything
@@ -143,6 +187,8 @@ def _insert_entities_of_type(self,
                                  fiware_service=None,
                                  fiware_servicepath=None):
         # All entities must be of the same type and have a time index.
+        # Also, an entity can't have an attribute with the same name
+        # as that specified by ORIGINAL_ENTITY_COL.
         for e in entities:
             if e[NGSI_TYPE] != entity_type:
                 msg = "Entity {} is not of type {}."
@@ -153,16 +199,21 @@
                 msg = "Translating entity without TIME_INDEX. " \
                       "It should have been inserted by the 'Reporter'. {}"
                 warnings.warn(msg.format(e))
-                now_iso = datetime.utcnow().isoformat(timespec='milliseconds')
-                e[self.TIME_INDEX_NAME] = now_iso
+                e[self.TIME_INDEX_NAME] = current_timex()
+
+            if ORIGINAL_ENTITY_COL in e:
+                raise ValueError(
+                    f"Entity {e[NGSI_ID]} has a reserved attribute name: "
+                    f"'{ORIGINAL_ENTITY_COL}'")
 
         # Define column types
         # {column_name -> crate_column_type}
         table = {
             'entity_id': self.NGSI_TO_SQL['Text'],
             'entity_type': self.NGSI_TO_SQL['Text'],
             self.TIME_INDEX_NAME: self.NGSI_TO_SQL[TIME_INDEX],
-            FIWARE_SERVICEPATH: self.NGSI_TO_SQL['Text']
+            FIWARE_SERVICEPATH: self.NGSI_TO_SQL['Text'],
+            ORIGINAL_ENTITY_COL: self.NGSI_TO_SQL[NGSI_TEXT]
         }
 
         # Preserve original attr names and types
@@ -230,13 +281,40 @@ def _insert_entities_of_type(self,
             entries.append(values)
 
         # Insert entities data
-        p1 = table_name
-        p2 = ', '.join(['"{}"'.format(c.lower()) for c in col_names])
-        p3 = ','.join(['?'] * len(col_names))
-        stmt = "insert into {} ({}) values ({})".format(p1, p2, p3)
-        self.cursor.executemany(stmt, entries)
+        self._insert_entity_rows(table_name, col_names, entries, entities)
         return self.cursor
 
+    def _insert_entity_rows(self, table_name: str, col_names: List[str],
+                            rows: List[List], entities: List[dict]):
+        col_list = ', '.join(['"{}"'.format(c.lower()) for c in col_names])
+        placeholders = ','.join(['?'] * len(col_names))
+        stmt = f"insert into {table_name} ({col_list}) values ({placeholders})"
+        try:
+            self.cursor.executemany(stmt, rows)
+        except Exception as e:
+            if not self._should_insert_original_entities(e):
+                raise
+
+            self.logger.exception(
+                'Failed to insert entities because of the error below; ' +
+                'translator will try saving original JSON instead in ' +
+                f"{table_name}.{ORIGINAL_ENTITY_COL}"
+            )
+            self._insert_original_entities(table_name, entities)
+
+    def _should_insert_original_entities(self,
+                                         insert_error: Exception) -> bool:
+        raise NotImplementedError
+
+    def _insert_original_entities(self, table_name: str,
+                                  entities: List[dict]):
+        cols = f"{ENTITY_ID_COL}, {ENTITY_TYPE_COL}, {self.TIME_INDEX_NAME}" + \
+               f", {ORIGINAL_ENTITY_COL}"
+        stmt = f"insert into {table_name} ({cols}) values (?, ?, ?, ?)"
+        tix = current_timex()
+        rows = [[entity_id(e), entity_type(e), tix, json.dumps(e)]
+                for e in entities]
+
+        self.cursor.executemany(stmt, rows)
 
     def _attr_is_structured(self, a):
         if a['value'] is not None and isinstance(a['value'], dict):
             self.logger.debug("attribute {} has 'value' attribute of type dict"
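A quick usage sketch of the new module-level helpers (the entity below and the printed timestamp are made up; the import path follows this repo's src/ layout):

from translators.sql_translator import current_timex, entity_id, entity_type

e = {'id': 42, 'type': 'device'}   # note the non-string ID
assert entity_id(e) == '42'        # coerced to str, per the docstring
assert entity_type({}) is None     # missing key -> None, no KeyError
print(current_timex())             # e.g. 2020-07-08T14:03:52.123

On the fallback path, _insert_original_entities writes one row per entity with only the ID, type, time index and JSON dump populated; every attribute column stays NULL, which is exactly what the new test assertions below check.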
2 changes: 1 addition & 1 deletion src/translators/tests/docker-compose.yml
@@ -38,7 +38,7 @@ services:
       - translatorstests
 
   timescale:
-    image: timescale/timescaledb-postgis:latest-pg10
+    image: timescale/timescaledb-postgis:1.7.1-pg12
     ports:
       - "54320:5432"
       # Don't expose container port 5432 with the same number outside of the
151 changes: 151 additions & 0 deletions src/translators/tests/original_data_scenarios.py
@@ -0,0 +1,151 @@
import json
import pytest
import random
from time import sleep
from typing import Any, Callable, Generator, List

from translators.base_translator import TIME_INDEX_NAME
from translators.sql_translator import SQLTranslator, current_timex
from translators.sql_translator import ORIGINAL_ENTITY_COL, ENTITY_ID_COL, \
    TYPE_PREFIX, TENANT_PREFIX
from utils.jsondict import maybe_value


ENTITY_TYPE = 'device'
TranslatorFactory = Callable[[], Generator[SQLTranslator, Any, None]]

#
# NOTE. Each test scenario gets a (sort of) unique tenant so that we won't
# have to clean up the DB after each test, which would slow down the whole
# test suite.
#


def gen_tenant_id() -> str:
    tid = random.randint(1, 2**32)
    return f"tenant{tid}"


def gen_entity(entity_id: int, attr_type: str, attr_value) -> dict:
    return {
        'id': f"eid:{entity_id}",
        'type': ENTITY_TYPE,
        TIME_INDEX_NAME: current_timex(),
        'a_number': {
            'type': 'Number',
            'value': 50.12
        },
        'an_attr': {
            'type': attr_type,
            'value': attr_value
        }
    }


def assert_inserted_entity(actual_row, original_entity):
    assert actual_row['a_number'] == \
        maybe_value(original_entity, 'a_number', 'value')
    assert actual_row['an_attr'] == \
        maybe_value(original_entity, 'an_attr', 'value')
    assert actual_row[ORIGINAL_ENTITY_COL] is None


def assert_failed_entity(actual_row, original_entity):
    assert actual_row['a_number'] is None
    assert actual_row['an_attr'] is None
    assert actual_row[ORIGINAL_ENTITY_COL] is not None

    saved_entity = json.loads(actual_row[ORIGINAL_ENTITY_COL])
    assert original_entity == saved_entity


def full_table_name(tenant: str) -> str:
    return f'"{TENANT_PREFIX}{tenant}"."{TYPE_PREFIX}{ENTITY_TYPE}"'


class OriginalDataScenarios:

    def __init__(self, translator: TranslatorFactory, cursor,
                 delay_query_by: int = 0):
        self.translator = translator
        self.cursor = cursor
        self.delay_query_by = delay_query_by

    def insert_entities(self, tenant: str, entities: List[dict]):
        with self.translator() as t:
            t.insert(entities, fiware_service=tenant)

    @staticmethod
    def col_name(column_description: List) -> str:
        name = column_description[0]
        if isinstance(name, bytes):
            name = name.decode('utf-8')
        return name

    def fetch_rows(self, tenant: str) -> List[dict]:
        table = full_table_name(tenant)
        stmt = f"select * from {table} order by {ENTITY_ID_COL}"

        if self.delay_query_by > 0:
            sleep(self.delay_query_by)
        self.cursor.execute(stmt)
        rows = self.cursor.fetchall()

        keys = [self.col_name(k) for k in self.cursor.description]
        return [dict(zip(keys, row)) for row in rows]

    def run_changed_attr_type_scenario(self):
        tenant = gen_tenant_id()
        good_entity = gen_entity(1, 'Number', 123)
        bad_entity = gen_entity(2, 'Text', 'shud of been a nbr!')

        self.insert_entities(tenant, [good_entity])
        self.insert_entities(tenant, [bad_entity])

        rs = self.fetch_rows(tenant)

        assert len(rs) == 2
        assert_inserted_entity(rs[0], good_entity)
        assert_failed_entity(rs[1], bad_entity)

    def run_inconsistent_attr_type_in_batch_scenario(self):
        tenant = gen_tenant_id()
        good_entity = gen_entity(1, 'Text', 'wada wada')
        bad_entity = gen_entity(2, 'DateTime', current_timex())

        self.insert_entities(tenant, [good_entity, bad_entity])

        rs = self.fetch_rows(tenant)

        assert len(rs) == 2
        assert_failed_entity(rs[0], good_entity)
        assert_failed_entity(rs[1], bad_entity)

    def run_data_loss_scenario(self):
        tenant = gen_tenant_id()
        good_entity = gen_entity(1, 'Number', 1)
        bad_entity = gen_entity(1, 'Number', 2)
        bad_entity[ORIGINAL_ENTITY_COL] = 'kaboom!'

        self.insert_entities(tenant, [good_entity])
        with pytest.raises(ValueError):
            self.insert_entities(tenant, [bad_entity])

        rs = self.fetch_rows(tenant)

        assert len(rs) == 1
        assert_inserted_entity(rs[0], good_entity)

    def run_success_scenario(self):
        tenant = gen_tenant_id()
        e1, e2, e3 = [gen_entity(k + 1, 'Number', k + 1) for k in range(3)]

        self.insert_entities(tenant, [e1])
        self.insert_entities(tenant, [e2, e3])

        rs = self.fetch_rows(tenant)

        assert len(rs) == 3
        assert_inserted_entity(rs[0], e1)
        assert_inserted_entity(rs[1], e2)
        assert_inserted_entity(rs[2], e3)
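For reference, full_table_name above builds the fully qualified, quoted table name out of the prefixes exported by sql_translator. A hedged example, assuming the usual QuantumLeap values TENANT_PREFIX = 'mt' and TYPE_PREFIX = 'et':

from translators.tests.original_data_scenarios import full_table_name

# With the assumed prefix values this prints: "mttenant42"."etdevice"
print(full_table_name('tenant42'))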
38 changes: 38 additions & 0 deletions src/translators/tests/test_crate_original_data.py
@@ -0,0 +1,38 @@
from crate import client

from translators.crate import CrateTranslatorInstance
from utils.cfgreader import *

from .original_data_scenarios import *


@pytest.fixture(scope='module')
def with_crate():
    r = EnvReader(log=logging.getLogger(__name__).info)
    host = r.read(StrVar('CRATE_HOST', 'crate'))
    port = r.read(IntVar('CRATE_PORT', 4200))

    conn = client.connect([f"{host}:{port}"], error_trace=True)
    cursor = conn.cursor()

    yield OriginalDataScenarios(CrateTranslatorInstance, cursor,
                                delay_query_by=1)

    cursor.close()
    conn.close()


def test_changed_attr_type_scenario(with_crate):
    with_crate.run_changed_attr_type_scenario()


def test_inconsistent_attr_type_in_batch_scenario(with_crate):
    with_crate.run_inconsistent_attr_type_in_batch_scenario()


def test_data_loss_scenario(with_crate):
    with_crate.run_data_loss_scenario()


def test_success_scenario(with_crate):
    with_crate.run_success_scenario()
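To run these tests locally you need a reachable CrateDB; a sketch, assuming the compose file in this directory publishes Crate's HTTP port 4200 on the host and that pytest runs from src/:

docker-compose -f translators/tests/docker-compose.yml up -d
CRATE_HOST=localhost pytest translators/tests/test_crate_original_data.py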