Insert batches (#450)
* read digital info sizes from env vars.
* implement utils to split iterables into batches.
* split large sql inserts into batches.
* document insert max size env var.

Co-authored-by: c0c0n3 <c0c0n3@users.noreply.github.com>
c0c0n3 committed Mar 9, 2021
1 parent 77802aa commit 41bb25f
Showing 10 changed files with 473 additions and 45 deletions.
2 changes: 2 additions & 0 deletions Pipfile
@@ -4,6 +4,7 @@ verify_ssl = true
name = "pypi"

[packages]
bitmath = "~=1.3"
certifi = "==2018.10.15"
"connexion[swagger-ui]" = "~=2.2"
crate = "~=0.22"
@@ -16,6 +17,7 @@ pg8000 = "==1.16.5"
pymongo = "~=3.4"
python-dateutil = ">=2.7"
pyyaml = ">=4.2"
objsize = "~=0.3"
redis = "~=2.10"
requests = ">=2.20"
pickle-mixin = "==1.0.2"
59 changes: 16 additions & 43 deletions Pipfile.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions docs/manuals/admin/configuration.md
@@ -10,6 +10,7 @@ To configure QuantumLeap you can use the following environment variables:
| `CRATE_PORT` | CrateDB Port |
| `DEFAULT_LIMIT` | Max number of rows a query can retrieve |
| `KEEP_RAW_ENTITY` | Whether to store original entity data |
| `INSERT_MAX_SIZE` | Maximum amount of data a SQL (bulk) insert should take |
| `POSTGRES_HOST` | PostgreSQL Host |
| `POSTGRES_PORT` | PostgreSQL Port |
| `POSTGRES_DB_NAME` | PostgreSQL default db |
@@ -51,6 +52,20 @@ To configure QuantumLeap you can use the following environment variables:
will be interpreted as true: 'true', 'yes', '1', 't', 'y'. Anything else
counts for false, which is also the default value if the variable is not set.

- `INSERT_MAX_SIZE`. If set, this variable limits the amount of data that
can be packed into a single SQL bulk insert to the specified value `M`. If
the size of the data to be inserted exceeds `M`, the data is split into
smaller batches, each having a size no greater than `M`, and each batch
is inserted separately, i.e. a separate SQL bulk insert statement is issued
for each batch. Limiting the amount of data that can be inserted at once
is useful with backends such as Crate that abort insert operations when
the data size exceeds an internal threshold. This variable is read on
each API call to the notify endpoint, so it can be set dynamically and
will affect every subsequent insert operation. Accepted values are sizes
in bytes (B) or `2^10` multiples (KiB, MiB, GiB), e.g. `10 B`, `1.2 KiB`,
`0.9 GiB`. If this variable is not set (or the set value isn't valid),
SQL inserts are processed normally, without splitting data into batches.
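
To get a feel for how these size strings translate into bytes, here is a minimal sketch using the `bitmath` library this change pulls in (illustrative only; `parse_string` and `to_Byte` are the same calls the new `BitSizeVar` and insert-splitter code rely on):

```python
import bitmath

# Map a couple of INSERT_MAX_SIZE-style values to a plain byte count.
for rep in ('1.2 KiB', '0.9 GiB'):
    size = bitmath.parse_string(rep)      # e.g. KiB(1.2)
    print(rep, '->', int(size.to_Byte()), 'bytes')
```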

## Database selection per different tenant

QuantumLeap can use different time series databases to persist and
68 changes: 68 additions & 0 deletions src/translators/insert_splitter.py
@@ -0,0 +1,68 @@
import logging
from objsize import get_deep_size
from typing import Optional, Tuple

from utils.cfgreader import BitSizeVar, EnvReader
from utils.itersplit import IterCostSplitter

INSERT_MAX_SIZE_VAR = 'INSERT_MAX_SIZE'
"""
The name of the environment variable to configure the insert max size.
"""


def _log():
    return logging.getLogger(__name__)


def configured_insert_max_size_in_bytes() -> Optional[int]:
    """
    Read the insert max size env var and return its value in bytes if
    it's set to a parsable value, or ``None`` otherwise. Notice that if
    a value is present but is garbage, we still return ``None``, but we
    also log a warning.
    :return: the max size in bytes if available, ``None`` otherwise.
    """
    env_reader = EnvReader(log=_log().debug)
    parsed = env_reader.safe_read(BitSizeVar(INSERT_MAX_SIZE_VAR, None))
    if parsed:
        return int(parsed.to_Byte())
    return None


def compute_row_size(r: Tuple) -> int:
    """
    Compute the memory size, in bytes, of the given row's components.
    :param r: the row to insert.
    :return: the size in bytes.
    """
    component_sizes = [get_deep_size(k) for k in r]
    return sum(component_sizes)


def to_insert_batches(rows: [Tuple]) -> [[Tuple]]:
    """
    Split the SQL rows to insert into batches so the translator can insert
    each batch separately, i.e. issue a SQL insert statement for each batch
    as opposed to a single insert for the whole input lot. We do this since
    some backends (e.g. Crate) have a cap on how much data you can shovel
    into a single SQL (bulk) insert statement---see #445 about it.
    Split only if the insert max size env var holds a valid value. (If that's
    not the case, return a single batch with all the input rows.)
    Splitting happens as explained in the ``IterCostSplitter`` docs, with
    ``compute_row_size`` as the cost function (so the cost of each input row
    is the number of bytes its components take up in memory) and the value
    of the env var as the maximum batch cost in bytes.
    :param rows: the rows the SQL translator lined up for an insert.
    :return: the insert batches.
    """
    config_max_cost = configured_insert_max_size_in_bytes()
    if config_max_cost is None:
        return [rows]
    splitter = IterCostSplitter(cost_fn=compute_row_size,
                                batch_max_cost=config_max_cost)
    return splitter.list_batches(rows)
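
The `IterCostSplitter` utility itself lives in `src/utils/itersplit.py`, whose diff isn't rendered in this view. As a rough, hypothetical sketch of the splitting behaviour the docstring describes (consecutive rows packed greedily so each batch's total cost stays within the cap), one could picture something along these lines; this is not the actual implementation:

```python
from typing import Any, Callable, List


def split_by_cost(items: List[Any],
                  cost_fn: Callable[[Any], int],
                  batch_max_cost: int) -> List[List[Any]]:
    """Greedily pack consecutive items into batches whose summed cost stays
    within batch_max_cost; a single item costing more than the cap still
    gets a batch of its own.
    """
    batches: List[List[Any]] = []
    current: List[Any] = []
    current_cost = 0
    for item in items:
        cost = cost_fn(item)
        if current and current_cost + cost > batch_max_cost:
            batches.append(current)
            current, current_cost = [], 0
        current.append(item)
        current_cost += cost
    if current:
        batches.append(current)
    return batches


# Toy run: each item's cost is its own value, capped at 10 per batch.
print(split_by_cost([4, 4, 4, 4, 11], cost_fn=lambda x: x, batch_max_cost=10))
# -> [[4, 4], [4, 4], [11]]
```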
7 changes: 5 additions & 2 deletions src/translators/sql_translator.py
@@ -13,9 +13,9 @@
from typing import Any, List, Optional, Sequence
from uuid import uuid4
import pickle
import types

from cache.factory import get_cache, is_cache_available
from translators.insert_splitter import to_insert_batches
from utils.connection_manager import Borg

# NGSI TYPES
@@ -363,7 +363,10 @@ def _insert_entity_rows(self, table_name: str, col_names: List[str],
stmt = f"insert into {table_name} ({col_list}) values ({placeholders})"
try:
start_time = datetime.now()
self.cursor.executemany(stmt, rows)

for batch in to_insert_batches(rows):
self.cursor.executemany(stmt, batch)

dt = datetime.now() - start_time
time_difference = (dt.days * 24 * 60 * 60 + dt.seconds) \
* 1000 + dt.microseconds / 1000.0
123 changes: 123 additions & 0 deletions src/translators/tests/test_insert_batches.py
@@ -0,0 +1,123 @@
from itertools import takewhile
import os
import pytest
import sys

from translators.base_translator import TIME_INDEX_NAME
from translators.insert_splitter import INSERT_MAX_SIZE_VAR
from translators.tests.original_data_scenarios import full_table_name, \
    gen_tenant_id, gen_entity, OriginalDataScenarios
from translators.tests.test_original_data import translators, \
    with_crate, with_timescale
# NOTE. ^ your IDE is likely to tell you this is dead code, but it isn't.
# We actually need to bring those two fixtures into scope to use them
# with the lazy_fixture calls in 'translators'.


def set_insert_max_size(number_of_bytes: int):
    os.environ[INSERT_MAX_SIZE_VAR] = f"{number_of_bytes}B"


def clear_insert_max_size():
    os.environ[INSERT_MAX_SIZE_VAR] = ''


class DataGen:

    def __init__(self, insert_max_size: int, min_batches: int):
        self.insert_max_size = insert_max_size
        self.min_batches = min_batches
        self.unique_tenant_id = gen_tenant_id()

    @staticmethod
    def _compute_insert_vector_size_lower_bound(entity: dict) -> int:
        vs = entity['id'], entity['type'], entity[TIME_INDEX_NAME], \
            entity['a_number']['value'], entity['an_attr']['value']
        sz = [sys.getsizeof(v) for v in vs]
        return sum(sz)
        # NOTE. Lower bound since it doesn't include e.g. the FIWARE service.

    def _next_entity(self) -> (dict, int):
        eid = 0
        size = 0
        while True:
            eid += 1
            e = gen_entity(entity_id=eid, attr_type='Number', attr_value=1)
            size += self._compute_insert_vector_size_lower_bound(e)
            yield e, size

    def generate_insert_payload(self) -> [dict]:
        """
        Generate enough data that, when the SQL translator is configured
        with the given insert_max_size value, it'll have to split the
        payload into at least min_batches batches.
        :return: the entities to insert.
        """
        sz = self.insert_max_size * self.min_batches
        ts = takewhile(lambda t: t[1] <= sz, self._next_entity())
        return [t[0] for t in ts]
        # NOTE. Actual number of batches >= min_batches.
        # In fact, say each entity row vector is actually 10 bytes, but our
        # computed lower bound is 5. Then with insert_max_size=10 and
        # min_batches=3 the payload will have 6 entities in it, for a total
        # actual size of 60 bytes, which the translator should then split
        # into 6 batches.


class TestDriver:

    def __init__(self, translator: OriginalDataScenarios,
                 test_data: DataGen):
        self.translator = translator
        self.data = test_data

    def _do_insert(self, entities: [dict]):
        try:
            tid = self.data.unique_tenant_id
            self.translator.insert_entities(tid, entities)
        finally:
            clear_insert_max_size()

    def _assert_row_count(self, expected: int):
        table = full_table_name(self.data.unique_tenant_id)
        stmt = f"select count(*) as count from {table}"
        r = self.translator.query(stmt)
        assert r[0]['count'] == expected

    def run(self, with_batches: bool):
        if with_batches:
            set_insert_max_size(self.data.insert_max_size)

        entities = self.data.generate_insert_payload()
        self._do_insert(entities)
        self._assert_row_count(len(entities))


@pytest.mark.parametrize('translator', translators,
                         ids=['timescale', 'crate'])
def test_insert_all_entities_in_one_go(translator):
    test_data = DataGen(insert_max_size=1024, min_batches=2)
    driver = TestDriver(translator, test_data)
    driver.run(with_batches=False)


@pytest.mark.parametrize('translator', translators,
                         ids=['timescale', 'crate'])
@pytest.mark.parametrize('min_batches', [2, 3, 4])
def test_insert_entities_in_batches(translator, min_batches):
    test_data = DataGen(insert_max_size=1024, min_batches=min_batches)
    driver = TestDriver(translator, test_data)
    driver.run(with_batches=True)


# NOTE. Couldn't reproduce #445.
# You can try this, but the exception I get is weirdly enough a connection
# exception. Python will crunch data in memory for about 30 mins, then the
# translator mysteriously fails w/ a connection exception, even though Crate
# is up and running...
#
# def test_huge_crate_insert(with_crate):
#     test_data = DataGen(insert_max_size=2*1024*1024, min_batches=1024)
#     # ^ should produce at least 2GiB worth of entities!!
#     driver = TestDriver(with_crate, test_data)
#     driver.run(with_batches=True)
14 changes: 14 additions & 0 deletions src/utils/cfgreader.py
@@ -3,6 +3,8 @@
variables, YAML files, etc.
"""

import bitmath
from bitmath import Bitmath
import logging
import os
from typing import Union
@@ -79,6 +81,18 @@ def _do_read(self, rep: str) -> bool:
        return rep.strip().lower() in ('true', 'yes', '1', 't', 'y')


class BitSizeVar(EVar):
    """
    An env value parsed as a digital information size, e.g. a file size
    in gigabytes, a memory size in megabytes, a word size in bits, etc.
    This class is just a thin wrapper around the ``bitmath`` lib; see
    there for usage and examples.
    """

    def _do_read(self, rep: str) -> Bitmath:
        return bitmath.parse_string(rep)


class EnvReader:
    """
    Reads environment variables.
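
Putting `BitSizeVar` together with `EnvReader.safe_read`, the same combination `insert_splitter.py` uses above, would look roughly like this. It's a sketch; it assumes, as the splitter code does, that `safe_read` falls back to the given default when the variable is unset or unparsable:

```python
import os

from utils.cfgreader import BitSizeVar, EnvReader

os.environ['INSERT_MAX_SIZE'] = '1.2 KiB'

reader = EnvReader(log=print)
parsed = reader.safe_read(BitSizeVar('INSERT_MAX_SIZE', None))
if parsed is not None:
    # The max batch size in bytes, as the insert splitter computes it.
    print(int(parsed.to_Byte()))
```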