Save original data on translation error (#335)
* port dumbed-down implementation from experiments branch.

* pin timescale/pg docker img version to latest known to be working w/ ql.

* pin timescale/pg docker img version to latest known to be working w/ ql.

* upgrade all timescale test images to latest stable version.

* factor out safe reading of env vars.

* move reading of env vars out of translator.

* document env vars.

* implement optional saving of original data.

* store original entities and associated recovery metadata as json.
c0c0n3 committed Jul 20, 2020
1 parent 85d0559 commit 83d070c
Showing 17 changed files with 596 additions and 70 deletions.
2 changes: 1 addition & 1 deletion docker/docker-compose-dev.yml
@@ -49,7 +49,7 @@ services:
- redisdata:/data

timescale:
image: timescale/timescaledb-postgis:latest-pg10
image: timescale/timescaledb-postgis:1.7.1-pg12
ports:
- "5432:5432"
# Don't expose container port 5432 with the same number outside of the
22 changes: 22 additions & 0 deletions docs/manuals/admin/configuration.md
@@ -9,6 +9,7 @@ To configure QuantumLeap you can use the following environment variables:
| `CRATE_HOST` | CrateDB Host |
| `CRATE_PORT` | CrateDB Port |
| `DEFAULT_LIMIT` | Max number of rows a query can retrieve |
| `KEEP_RAW_ENTITY` | Whether to store original entity data |
| `POSTGRES_HOST` | PostgreSQL Host |
| `POSTGRES_PORT` | PostgreSQL Port |
| `POSTGRES_DB_NAME` | PostgreSQL default db |
@@ -21,6 +22,27 @@ To configure QuantumLeap you can use the following environment variables:
| `QL_CONFIG` | Pathname for tenant configuration |
| `LOGLEVEL` | Log level for all services (`DEBUG`, `INFO`, `WARNING`, `ERROR`) |

**NOTE**
* `DEFAULT_LIMIT`. This variable sets the upper limit L on the number of
  rows a query operation is allowed to fetch from the database and return
  to the client. The actual number of rows returned is the lesser of L and
  the client-specified limit, or L if the client didn't specify a limit.
  If the variable is not set, L defaults to 10,000. The variable is read on
  each API call to a query endpoint, so it can be set dynamically and will
  affect every subsequent query operation. The string value you set should
  be convertible to an integer; if it isn't, the default of 10,000 is used
  instead.
* `KEEP_RAW_ENTITY`. If true, each notified entity is stored in its
  entirety as JSON in an additional column of the corresponding entity
  table. (This may make the table need up to 10x more storage.) If false,
  the JSON is only stored, as detailed earlier, when the conversion from
  JSON to tabular format fails; typically this happens when the notified
  entity contains a previously notified attribute whose type is now
  different from what it used to be. The variable is read on each API call
  to the notify endpoint, so it can be set dynamically and will affect
  every subsequent insert operation. Any of the following (case-insensitive)
  values is interpreted as true: 'true', 'yes', '1', 't', 'y'. Anything
  else counts as false, which is also the default if the variable is not
  set. The sketch below illustrates these parsing rules.
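
A minimal sketch of the env var semantics just described, assuming nothing
beyond what the docs above state (the helper names are hypothetical; the
actual parsing lives in `utils/cfgreader.py`, which this commit doesn't show):

```python
import os

# Case-insensitive strings the docs above list as true.
TRUTHY = {'true', 'yes', '1', 't', 'y'}

def read_keep_raw_entity() -> bool:
    # Anything outside TRUTHY, including an unset variable, counts as false.
    return os.environ.get('KEEP_RAW_ENTITY', '').strip().lower() in TRUTHY

def read_default_limit() -> int:
    # Fall back to 10,000 when unset or not convertible to an integer.
    try:
        return int(os.environ['DEFAULT_LIMIT'])
    except (KeyError, ValueError):
        return 10000
```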

## Database selection per different tenant

QuantumLeap can use different time series databases to persist and
27 changes: 27 additions & 0 deletions src/translators/config.py
@@ -0,0 +1,27 @@
import logging
import os

from utils.cfgreader import EnvReader, BoolVar, IntVar


DEFAULT_LIMIT_VAR = 'DEFAULT_LIMIT'
KEEP_RAW_ENTITY_VAR = 'KEEP_RAW_ENTITY'


class SQLTranslatorConfig:
"""
Provide access to SQL Translator config values.
"""

def __init__(self, env: dict = os.environ):
self.store = EnvReader(var_store=env,
log=logging.getLogger(__name__).info)

def default_limit(self) -> int:
fallback_limit = 10000
var = IntVar(DEFAULT_LIMIT_VAR, default_value=fallback_limit)
return self.store.safe_read(var)

def keep_raw_entity(self) -> bool:
var = BoolVar(KEEP_RAW_ENTITY_VAR, False)
return self.store.safe_read(var)
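
For context, a usage sketch of the class above; it assumes `safe_read`
returns the variable's default on a missing or malformed value, consistent
with the behaviour documented in `docs/manuals/admin/configuration.md`:

```python
import os

from translators.config import SQLTranslatorConfig

os.environ['KEEP_RAW_ENTITY'] = 'Yes'         # truthy, case-insensitive
os.environ['DEFAULT_LIMIT'] = 'not-a-number'  # malformed, triggers fallback

cfg = SQLTranslatorConfig()
assert cfg.keep_raw_entity() is True
assert cfg.default_limit() == 10000
```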
5 changes: 3 additions & 2 deletions src/translators/crate.py
@@ -7,7 +7,6 @@
NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, TIME_INDEX, \
METADATA_TABLE_NAME, FIWARE_SERVICEPATH
import logging
import os
from .crate_geo_query import from_ngsi_query
from utils.cfgreader import EnvReader, StrVar, IntVar

@@ -136,6 +135,9 @@ def _prepare_data_table(self, table_name, table, fiware_service):
"(number_of_replicas = '2-all', column_policy = 'dynamic')".format(table_name, columns)
self.cursor.execute(stmt)

def _should_insert_original_entities(self, insert_error: Exception) -> bool:
return isinstance(insert_error, exceptions.ProgrammingError)

def _create_metadata_table(self):
stmt = "create table if not exists {} " \
"(table_name string primary key, entity_attrs object) " \
@@ -154,7 +156,6 @@ def _store_medatata(self, table_name, persisted_metadata):
stmt = stmt.format(METADATA_TABLE_NAME)
self.cursor.execute(stmt, (table_name, persisted_metadata))


def _compute_type(self, attr_t, attr):
"""
Github issue 44: Disable indexing for long string
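
The Crate translator thus opts into the original-entity fallback only for
`ProgrammingError`, which this commit treats as the recoverable case
(typically a schema/type mismatch on insert). Other backends can override
the same hook; below is a hypothetical sketch for a PostgreSQL-backed
translator (`psycopg2` and the class name are illustrative assumptions, not
part of this commit):

```python
import psycopg2

from translators.sql_translator import SQLTranslator

class PgTranslatorSketch(SQLTranslator):
    def _should_insert_original_entities(self, insert_error: Exception) -> bool:
        # Fall back to storing the raw entities only on data/type errors;
        # let every other exception propagate to the caller.
        return isinstance(insert_error, psycopg2.DataError)
```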
165 changes: 136 additions & 29 deletions src/translators/sql_translator.py
@@ -3,12 +3,16 @@
from exceptions.exceptions import AmbiguousNGSIIdError, UnsupportedOption, \
NGSIUsageError, InvalidParameterValue
from translators import base_translator
from utils.cfgreader import EnvReader, IntVar
from translators.config import SQLTranslatorConfig
from utils.common import iter_entity_attrs
from utils.jsondict import safe_get_value
from utils.maybe import maybe_map
import logging
from geocoding.slf import SlfQuery
import dateutil.parser
import os
from typing import Any, List, Optional
from uuid import uuid4


# NGSI TYPES
# Based on Orion output because official docs don't say much about these :(
@@ -30,6 +34,13 @@
TIME_INDEX = 'timeindex'
VALID_AGGR_METHODS = ['count', 'sum', 'avg', 'min', 'max']
VALID_AGGR_PERIODS = ['year', 'month', 'day', 'hour', 'minute', 'second']
# The name of the column where we store the original JSON entity received
# in the notification when its corresponding DB row can't be inserted.
ORIGINAL_ENTITY_COL = '__original_ngsi_entity__'
# The name of the entity ID and type columns.
# TODO: replace each occurrence of these strings with the below constants.
ENTITY_ID_COL = 'entity_id'
ENTITY_TYPE_COL = 'entity_type'

# Default Translation
NGSI_TO_SQL = {
@@ -52,6 +63,39 @@
}


def current_timex() -> str:
"""
:return: QuantumLeap time index value for the current point in time.
"""
return datetime.utcnow().isoformat(timespec='milliseconds')


# TODO: use below getters everywhere rather than entity id and type strings!

def entity_id(entity: dict) -> Optional[str]:
"""
Safely get the NGSI ID of the given entity.
The ID, if present, is expected to be a string, so we convert it if it
isn't.
:param entity: the entity.
:return: the ID string if there's an ID, `None` otherwise.
"""
return maybe_map(str, safe_get_value(entity, NGSI_ID))


def entity_type(entity: dict) -> Optional[str]:
"""
Safely get the NGSI type of the given entity.
The type, if present, is expected to be a string, so we convert it if it
isn't.
:param entity: the entity.
:return: the type string if there's a type, `None` otherwise.
"""
return maybe_map(str, safe_get_value(entity, NGSI_TYPE))


# TODO: Refactor
# I suggest we refactor both this and the Crate translator using something
# like SQLAlchemy if we want to keep the same approach of doing everything
@@ -61,6 +105,7 @@

class SQLTranslator(base_translator.BaseTranslator):
NGSI_TO_SQL = NGSI_TO_SQL
config = SQLTranslatorConfig()

def _refresh(self, entity_types, fiware_service=None):
"""
@@ -143,6 +188,8 @@ def _insert_entities_of_type(self,
fiware_service=None,
fiware_servicepath=None):
# All entities must be of the same type and have a time index
# Also, an entity can't have an attribute with the same name
# as that specified by ORIGINAL_ENTITY_COL.
for e in entities:
if e[NGSI_TYPE] != entity_type:
msg = "Entity {} is not of type {}."
@@ -153,16 +200,21 @@ def _insert_entities_of_type(self,
msg = "Translating entity without TIME_INDEX. " \
"It should have been inserted by the 'Reporter'. {}"
warnings.warn(msg.format(e))
now_iso = datetime.utcnow().isoformat(timespec='milliseconds')
e[self.TIME_INDEX_NAME] = now_iso
e[self.TIME_INDEX_NAME] = current_timex()

if ORIGINAL_ENTITY_COL in e:
raise ValueError(
f"Entity {e[NGSI_ID]} has a reserved attribute name: " +
"'{ORIGINAL_ENTITY_COL_NAME}'")

# Define column types
# {column_name -> crate_column_type}
table = {
'entity_id': self.NGSI_TO_SQL['Text'],
'entity_type': self.NGSI_TO_SQL['Text'],
self.TIME_INDEX_NAME: self.NGSI_TO_SQL[TIME_INDEX],
FIWARE_SERVICEPATH: self.NGSI_TO_SQL['Text']
FIWARE_SERVICEPATH: self.NGSI_TO_SQL['Text'],
ORIGINAL_ENTITY_COL: self.NGSI_TO_SQL[NGSI_STRUCTURED_VALUE]
}

# Preserve original attr names and types
@@ -230,13 +282,82 @@ def _insert_entities_of_type(self,
entries.append(values)

# Insert entities data
p1 = table_name
p2 = ', '.join(['"{}"'.format(c.lower()) for c in col_names])
p3 = ','.join(['?'] * len(col_names))
stmt = "insert into {} ({}) values ({})".format(p1, p2, p3)
self.cursor.executemany(stmt, entries)
self._insert_entity_rows(table_name, col_names, entries, entities)
return self.cursor

def _insert_entity_rows(self, table_name: str, col_names: List[str],
rows: List[List], entities: List[dict]):
col_list, placeholders, rows = \
self._build_insert_params_and_values(col_names, rows, entities)

stmt = f"insert into {table_name} ({col_list}) values ({placeholders})"
try:
self.cursor.executemany(stmt, rows)
except Exception as e:
if not self._should_insert_original_entities(e):
raise

self.logger.exception(
'Failed to insert entities because of below error; ' +
'translator will still try saving original JSON in ' +
f"{table_name}.{ORIGINAL_ENTITY_COL}"
)
self._insert_original_entities_in_failed_batch(
table_name, entities, e)

def _build_insert_params_and_values(
self, col_names: List[str], rows: List[List],
entities: List[dict]) -> (str, str, List[List]):
if self.config.keep_raw_entity():
original_entity_col_index = col_names.index(ORIGINAL_ENTITY_COL)
for i, r in enumerate(rows):
wrapper = self._build_original_data_value(entities[i])
r[original_entity_col_index] = wrapper

col_list = ', '.join(['"{}"'.format(c.lower()) for c in col_names])
placeholders = ','.join(['?'] * len(col_names))
return col_list, placeholders, rows
# NOTE. Brittle code.
# This code, like the rest of the insert workflow implicitly assumes
# 1. col_names[k] <-> rows[k] <-> entities[k]
# 2. original entity column always gets added upfront
# But we never really check anywhere (1) and (2) always hold true,
# so slight changes to the insert workflow could cause nasty bugs...

def _build_original_data_value(self, entity: dict,
insert_error: Exception = None,
failed_batch_id: str = None) -> Any:
value = {
'data': entity
}
if failed_batch_id:
value['failedBatchID'] = failed_batch_id
if insert_error:
value['error'] = repr(insert_error)

return self._to_db_ngsi_structured_value(value)

@staticmethod
def _to_db_ngsi_structured_value(data: dict) -> Any:
return data

def _should_insert_original_entities(self, insert_error: Exception) -> bool:
raise NotImplementedError

def _insert_original_entities_in_failed_batch(
self, table_name: str, entities: List[dict],
insert_error: Exception):
cols = f"{ENTITY_ID_COL}, {ENTITY_TYPE_COL}, {self.TIME_INDEX_NAME}" + \
f", {ORIGINAL_ENTITY_COL}"
stmt = f"insert into {table_name} ({cols}) values (?, ?, ?, ?)"
tix = current_timex()
batch_id = uuid4().hex
rows = [[entity_id(e), entity_type(e), tix,
self._build_original_data_value(e, insert_error, batch_id)]
for e in entities]

self.cursor.executemany(stmt, rows)

def _attr_is_structured(self, a):
if a['value'] is not None and isinstance(a['value'], dict):
self.logger.debug("attribute {} has 'value' attribute of type dict"
@@ -339,23 +460,9 @@ def _get_select_clause(self, attr_names, aggr_method, aggr_period):
select = ','.join(attrs)
return select

@staticmethod
def _get_default_limit(env: dict = os.environ):
r = EnvReader(var_store=env, log=logging.getLogger(__name__).info)
env_var_name = 'DEFAULT_LIMIT'
fallback_limit = 10000
try:
return r.read(IntVar(env_var_name, fallback_limit))
except ValueError:
msg = "Environment variable {} set to non-numeric value; " +\
"using fallback query limit of {}.".\
format(env_var_name, fallback_limit)
logging.getLogger(__name__).warning(msg)
return fallback_limit

def _get_limit(self, limit, last_n):
# https://crate.io/docs/crate/reference/en/latest/general/dql/selects.html#limits
default_limit = self._get_default_limit()
default_limit = self.config.default_limit()

if limit is None or limit > default_limit:
limit = default_limit
@@ -364,11 +471,11 @@ def _get_limit(self, limit, last_n):
last_n = limit

if limit < 1:
raise InvalidParameterValue("{} should be >=1 and <= {"
"}.".format('limit', default_limit))
raise InvalidParameterValue(
f"limit should be >=1 and <= {default_limit}.")
if last_n < 1:
raise InvalidParameterValue("{} should be >=1 and <= {"
"}.".format('last_n', default_limit))
raise InvalidParameterValue(
f"last_n should be >=1 and <= {default_limit}.")
return min(last_n, limit)

def _get_where_clause(self, entity_ids, from_date, to_date, fiware_sp=None,
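
To make the fallback concrete: when a batch insert fails and
`_should_insert_original_entities` returns true, each entity of the failed
batch gets its own row whose `__original_ngsi_entity__` column holds a JSON
value shaped as below. The entity payload and error text are made up for
illustration; the structure follows `_build_original_data_value` above:

```python
# All rows written for the same failed batch share one failedBatchID.
recovery_record = {
    'data': {  # the notified entity, stored verbatim
        'id': 'Room:1',
        'type': 'Room',
        'temperature': {'type': 'Number', 'value': 'high'},  # type clash
    },
    'failedBatchID': '0f8fad5bd9cb469fa165408f38a022db',  # uuid4().hex
    'error': "ProgrammingError(...)",  # repr() of the insert exception
}
```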
2 changes: 1 addition & 1 deletion src/translators/tests/docker-compose.yml
@@ -38,7 +38,7 @@ services:
- translatorstests

timescale:
image: timescale/timescaledb-postgis:latest-pg10
image: timescale/timescaledb-postgis:1.7.1-pg12
ports:
- "54320:5432"
# Don't expose container port 5432 with the same number outside of the