diff --git a/Pipfile b/Pipfile
index af782d7d..70cf6935 100644
--- a/Pipfile
+++ b/Pipfile
@@ -4,6 +4,7 @@ verify_ssl = true
 name = "pypi"
 
 [packages]
+bitmath = "~=1.3"
 certifi = "==2018.10.15"
 "connexion[swagger-ui]" = "~=2.2"
 crate = "~=0.22"
@@ -16,6 +17,7 @@ pg8000 = "==1.16.5"
 pymongo = "~=3.4"
 python-dateutil = ">=2.7"
 pyyaml = ">=4.2"
+objsize = "~=0.3"
 redis = "~=2.10"
 requests = ">=2.20"
 pickle-mixin = "==1.0.2"
diff --git a/Pipfile.lock b/Pipfile.lock
index 9c884162..135e3d4c 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
 {
     "_meta": {
         "hash": {
-            "sha256": "b9979553e45eb01a13e611b0c00d03e2c0f3a9058015802949be3f593ecae0a3"
+            "sha256": "ba9609f3f05e0a1deb116714a67c6c6ba7643f1c38ec4e9f5c11676ab8c4b656"
         },
         "pipfile-spec": 6,
         "requires": {
@@ -21,9 +21,15 @@
                 "sha256:31b2eced602aa8423c2aea9c76a724617ed67cf9513173fd3a4f03e3a929c7e6",
                 "sha256:832aa3cde19744e49938b91fea06d69ecb9e649c93ba974535d08ad92164f700"
             ],
-            "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
             "version": "==20.3.0"
         },
+        "bitmath": {
+            "hashes": [
+                "sha256:293325f01e65defe966853111df11d39215eb705a967cb115851da8c4cfa3eb8"
+            ],
+            "index": "pypi",
+            "version": "==1.3.3.1"
+        },
         "certifi": {
             "hashes": [
                 "sha256:339dc09518b07e2fa7eda5450740925974815557727d6bd35d319c1524a04a4c",
@@ -37,7 +43,6 @@
                 "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa",
                 "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"
             ],
-            "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
             "version": "==4.0.0"
         },
         "click": {
@@ -45,7 +50,6 @@
                 "sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a",
                 "sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc"
             ],
-            "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
             "version": "==7.1.2"
         },
         "clickclick": {
@@ -64,7 +68,6 @@
                 "sha256:5439e9659a89c4380d93a07acfbf3380d70be4130574de8881e5f0dfec7ad0e2"
             ],
             "index": "pypi",
-            "markers": "python_version >= '3.6'",
             "version": "==2.7.0"
         },
         "crate": {
@@ -94,7 +97,6 @@
             "hashes": [
                 "sha256:b1bead90b70cf6ec3f0710ae53a525360fa360d306a86583adc6bf83a4db537d"
             ],
-            "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
             "version": "==0.18.2"
         },
         "geocoder": {
@@ -116,8 +118,7 @@
         "geomet": {
             "hashes": [
                 "sha256:a3e55f5f7a5da116f05874acae93f612e2b5bdaa0724d008dfe13e68cb63f9b1",
-                "sha256:cb52411978ee01ff104ab48f108d7333b14423ae7a15a65fee25b7d29bda2e1b",
-                "sha256:eb47ed9e22f3e4fa88333f5540c65aa0b6ec33ef6e9e1f2217b9b6df26e47461"
+                "sha256:cb52411978ee01ff104ab48f108d7333b14423ae7a15a65fee25b7d29bda2e1b"
             ],
             "index": "pypi",
             "version": "==0.3.0"
@@ -135,7 +136,6 @@
                 "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6",
                 "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0"
             ],
-            "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
             "version": "==2.10"
         },
         "inflection": {
@@ -143,7 +143,6 @@
                 "sha256:1a29730d366e996aaacffb2f1f1cb9593dc38e2ddd30c91250c6dde09ea9b417",
                 "sha256:f38b2b640938a4f35ade69ac3d053042959b62a0f1076a5bbaa1b9526605a8a2"
             ],
-            "markers": "python_version >= '3.5'",
             "version": "==0.5.1"
         },
         "isodate": {
@@ -158,7 +157,6 @@
                 "sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19",
                 "sha256:b12271b2047cb23eeb98c8b5622e2e5c5e9abd9784a153e9d8ef9cb4dd09d749"
             ],
-            "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
             "version": "==1.1.0"
         },
"jinja2": { @@ -166,7 +164,6 @@ "sha256:03e47ad063331dd6a3f04a43eddca8a966a26ba0c5b7207a9a9e4e08f1b29419", "sha256:a6d58433de0ae800347cab1fa3043cebbabe8baa9d29e668f1c768cb87a333c6" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==2.11.3" }, "jsonschema": { @@ -231,16 +228,21 @@ "sha256:e8313f01ba26fbbe36c7be1966a7b7424942f670f38e666995b88d012765b9be", "sha256:feb7b34d6325451ef96bc0e36e1a6c0c1c64bc1fbec4b854f4529e51887b1621" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.1.1" }, + "objsize": { + "hashes": [ + "sha256:6b950e0d78e7d93918fbd711126d740291a61ff4e53fc47cce799dce7554b1ad" + ], + "index": "pypi", + "version": "==0.3.3" + }, "openapi-schema-validator": { "hashes": [ "sha256:60d3401cc1579eed0c1f2bec1a669a964a9974bc6a940261bf7c8aa9edb7d409", "sha256:66fa34b40a40b1ff927e1db8bf5a20dc84fd0933fc284ec0dc87d7ae889a0874", "sha256:782c7be7fd75ab172fb47b4504a63833001f7d5de0a06a08998c146e5f923f0a" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==0.1.4" }, "openapi-spec-validator": { @@ -249,7 +251,6 @@ "sha256:53ba3d884e98ff2062d5ada025aa590541dcd665b8f81067dc82dd61c0923759", "sha256:e11df7c559339027bd04f2399bc82474983129a6a7a6a0421eaa95e2c844d686" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==0.3.0" }, "pg8000": { @@ -307,7 +308,6 @@ "sha256:7c77801620e5e75fb9c7abae235d3cc45d212a67efa98f4972eef63e736a8daa", "sha256:7cd42c66d49ffb68dea065e1c8a4323e7ceab386e660fee9863d4fa227302ba9", "sha256:7d2ae2f7c50adec20fde46a73465de31a6a6fbb4903240f8b7304549752ca7a1", - "sha256:7edff02e44dd0badd749d7342e40705a398d98c5d8f7570f57cff9568c2351fa", "sha256:87981008d565f647142869d99915cc4760b7725858da3d39ecb2a606e23f36fd", "sha256:92e2376ce3ca0e3e443b3c5c2bb5d584c7e59221edfb0035313c6306049ba55a", "sha256:950710f7370613a6bfa2ccd842b488c5b8072e83fb6b7d45d99110bf44651d06", @@ -341,7 +341,6 @@ "hashes": [ "sha256:2e636185d9eb976a18a8a8e96efce62f2905fea90041958d8cc2a189756ebf3e" ], - "markers": "python_version >= '3.5'", "version": "==0.17.3" }, "python-dateutil": { @@ -415,7 +414,6 @@ "sha256:74815c25aad1fe0b5fb994e96c3de63e8695164358a80138352aaadfa4760350", "sha256:d6865ed1d135ddb124a619d7cd3a5b505f69a7c92e248024dd7e48bc77752af5" ], - "markers": "python_version >= '3.5'", "version": "==1.2.0" }, "six": { @@ -423,7 +421,6 @@ "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259", "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.15.0" }, "swagger-ui-bundle": { @@ -438,7 +435,6 @@ "sha256:1b465e494e3e0d8939b50680403e3aedaa2bc434b7d5af64dfd3c958d7f5ae80", "sha256:de3eedaad74a2683334e282005cd8d7f22f4d55fa690a2a1020a416cb0a47e73" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", "version": "==1.26.3" }, "werkzeug": { @@ -446,7 +442,6 @@ "sha256:2de2a5db0baeae7b2d2664949077c2ac63fbd16d98da0ff71837f7d1dea3fd43", "sha256:6c80b1e5ad3665290ea39320b91e1be1e0d5f60652b964a3070216de83d2e47c" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==1.0.1" } }, @@ -499,7 +494,6 @@ "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f", "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3" 
], - "markers": "python_full_version >= '3.5.3'", "version": "==3.0.1" }, "atomicwrites": { @@ -507,7 +501,6 @@ "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197", "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.4.0" }, "attrs": { @@ -515,7 +508,6 @@ "sha256:31b2eced602aa8423c2aea9c76a724617ed67cf9513173fd3a4f03e3a929c7e6", "sha256:832aa3cde19744e49938b91fea06d69ecb9e649c93ba974535d08ad92164f700" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==20.3.0" }, "backoff": { @@ -539,7 +531,6 @@ "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa", "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==4.0.0" }, "click": { @@ -547,7 +538,6 @@ "sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a", "sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==7.1.2" }, "coverage": { @@ -605,7 +595,6 @@ "sha256:f0b278ce10936db1a37e6954e15a3730bea96a0997c26d7fee88e6c396c2086d", "sha256:f11642dddbb0253cc8853254301b51390ba0081750a8ac03f20ea8103f0c56b6" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", "version": "==5.5" }, "coveralls": { @@ -642,7 +631,6 @@ "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6", "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==2.10" }, "itsdangerous": { @@ -650,7 +638,6 @@ "sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19", "sha256:b12271b2047cb23eeb98c8b5622e2e5c5e9abd9784a153e9d8ef9cb4dd09d749" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.1.0" }, "jinja2": { @@ -658,7 +645,6 @@ "sha256:03e47ad063331dd6a3f04a43eddca8a966a26ba0c5b7207a9a9e4e08f1b29419", "sha256:a6d58433de0ae800347cab1fa3043cebbabe8baa9d29e668f1c768cb87a333c6" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==2.11.3" }, "kiwisolver": { @@ -696,7 +682,6 @@ "sha256:f8d6f8db88049a699817fd9178782867bf22283e3813064302ac59f61d95be05", "sha256:fd34fbbfbc40628200730bc1febe30631347103fc8d3d4fa012c21ab9c11eca9" ], - "markers": "python_version >= '3.6'", "version": "==1.3.1" }, "lovely-pytest-docker": { @@ -761,7 +746,6 @@ "sha256:e8313f01ba26fbbe36c7be1966a7b7424942f670f38e666995b88d012765b9be", "sha256:feb7b34d6325451ef96bc0e36e1a6c0c1c64bc1fbec4b854f4529e51887b1621" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.1.1" }, "matplotlib": { @@ -800,7 +784,6 @@ "sha256:5652a9ac72209ed7df8d9c15daf4e1aa0e3d2ccd3c87f8265a0673cd9cbc9ced", "sha256:c5d6da9ca3ff65220c3bfd2a8db06d698f05d4d2b9be57e1deb2be5a45019713" ], - "markers": "python_version >= '3.5'", "version": "==8.7.0" }, "multidict": { @@ -843,7 +826,6 @@ "sha256:f21756997ad8ef815d8ef3d34edd98804ab5ea337feedcd62fb52d22bf531281", "sha256:fc13a9524bc18b6fb6e0dbec3533ba0496bbed167c56d0aabefd965584557d80" ], - "markers": "python_version >= '3.6'", "version": "==5.1.0" 
}, "numpy": { @@ -873,7 +855,6 @@ "sha256:c91ec9569facd4757ade0888371eced2ecf49e7982ce5634cc2cf4e7331a4b14", "sha256:ecb5b74c702358cdc21268ff4c37f7466357871f53a30e6f84c686952bef16a9" ], - "markers": "python_version >= '3.7'", "version": "==1.20.1" }, "pandas": { @@ -934,7 +915,6 @@ "sha256:f36c3ff63d6fc509ce599a2f5b0d0732189eed653420e7294c039d342c6e204a", "sha256:f91b50ad88048d795c0ad004abbe1390aa1882073b1dca10bfd55d0b8cf18ec5" ], - "markers": "python_version >= '3.6'", "version": "==8.1.2" }, "pluggy": { @@ -942,7 +922,6 @@ "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0", "sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==0.13.1" }, "py": { @@ -950,7 +929,6 @@ "sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3", "sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.10.0" }, "pyparsing": { @@ -958,7 +936,6 @@ "sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1", "sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b" ], - "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==2.4.7" }, "pytest": { @@ -1021,7 +998,6 @@ "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259", "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", "version": "==1.15.0" }, "typing-extensions": { @@ -1037,7 +1013,6 @@ "sha256:1b465e494e3e0d8939b50680403e3aedaa2bc434b7d5af64dfd3c958d7f5ae80", "sha256:de3eedaad74a2683334e282005cd8d7f22f4d55fa690a2a1020a416cb0a47e73" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", "version": "==1.26.3" }, "werkzeug": { @@ -1045,7 +1020,6 @@ "sha256:2de2a5db0baeae7b2d2664949077c2ac63fbd16d98da0ff71837f7d1dea3fd43", "sha256:6c80b1e5ad3665290ea39320b91e1be1e0d5f60652b964a3070216de83d2e47c" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==1.0.1" }, "yarl": { @@ -1088,7 +1062,6 @@ "sha256:f0b059678fd549c66b89bed03efcabb009075bd131c248ecdf087bdb6faba24a", "sha256:fcbb48a93e8699eae920f8d92f7160c03567b421bc17362a9ffbbd706a816f71" ], - "markers": "python_version >= '3.6'", "version": "==1.6.3" } } diff --git a/docs/manuals/admin/configuration.md b/docs/manuals/admin/configuration.md index 7eea89a3..b383e276 100644 --- a/docs/manuals/admin/configuration.md +++ b/docs/manuals/admin/configuration.md @@ -10,6 +10,7 @@ To configure QuantumLeap you can use the following environment variables: | `CRATE_PORT` | CrateDB Port | | `DEFAULT_LIMIT` | Max number of rows a query can retrieve | | `KEEP_RAW_ENTITY` | Whether to store original entity data | +| `INSERT_MAX_SIZE` | Maximum amount of data a SQL (bulk) insert should take | | `POSTGRES_HOST` | PostgreSQL Host | | `POSTGRES_PORT` | PostgreSQL Port | | `POSTGRES_DB_NAME` | PostgreSQL default db | @@ -51,6 +52,20 @@ To configure QuantumLeap you can use the following environment variables: will be interpreted as true: 'true', 'yes', '1', 't', 'y'. Anything else counts for false, which is also the default value if the variable is not set. +- `INSERT_MAX_SIZE`. 
diff --git a/docs/manuals/admin/configuration.md b/docs/manuals/admin/configuration.md
index 7eea89a3..b383e276 100644
--- a/docs/manuals/admin/configuration.md
+++ b/docs/manuals/admin/configuration.md
@@ -10,6 +10,7 @@ To configure QuantumLeap you can use the following environment variables:
 | `CRATE_PORT`      | CrateDB Port |
 | `DEFAULT_LIMIT`   | Max number of rows a query can retrieve |
 | `KEEP_RAW_ENTITY` | Whether to store original entity data |
+| `INSERT_MAX_SIZE` | Maximum amount of data a SQL bulk insert may carry |
 | `POSTGRES_HOST`   | PostgreSQL Host |
 | `POSTGRES_PORT`   | PostgreSQL Port |
 | `POSTGRES_DB_NAME` | PostgreSQL default db |
@@ -51,6 +52,20 @@ will be interpreted as true: 'true', 'yes', '1', 't', 'y'. Anything
 else counts for false, which is also the default value if the variable
 is not set.
 
+- `INSERT_MAX_SIZE`. If set, this variable limits the amount of data that
+  can be packed in a single SQL bulk insert to the specified value `M`. If
+  the size of the data to be inserted exceeds `M`, the data is split into
+  smaller batches, each having a size no greater than `M`, and each batch
+  is inserted separately, i.e. a separate SQL bulk insert statement is
+  issued for each batch. Limiting the amount of data inserted at once is
+  useful with backends, like Crate, that abort insert operations when the
+  data size exceeds an internal threshold. This variable is read anew on
+  each API call to the notify endpoint, so it can be set dynamically and
+  will affect every subsequent insert operation. Accepted values are sizes
+  in bytes (B) or `2^10` multiples (KiB, MiB, GiB), e.g. `10 B`, `1.2 KiB`,
+  `0.9 GiB`. If this variable is not set (or the set value isn't valid),
+  SQL inserts are processed normally without splitting data into batches.
+
 ## Database selection per different tenant
 
 QuantumLeap can use different time series databases to persist and
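Since the setting is re-read per call, it can be flipped at runtime. A hedged sketch of the behaviour the docs describe, using `to_insert_batches` from the new module below (the row values are made up):

```python
# Illustration only: INSERT_MAX_SIZE is re-read on every call, so changing
# it affects the next insert without restarting QuantumLeap.
import os
from translators.insert_splitter import to_insert_batches

rows = [('Room:1', 23.5), ('Room:2', 24.1), ('Room:3', 22.9)]

os.environ['INSERT_MAX_SIZE'] = '64 B'    # split into batches of <= 64 bytes
batches = to_insert_batches(rows)         # e.g. [[row1], [row2], [row3]]

os.environ['INSERT_MAX_SIZE'] = ''        # unset or invalid value ...
assert to_insert_batches(rows) == [rows]  # ... means one batch with all rows
```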
+ """ + config_max_cost = configured_insert_max_size_in_bytes() + if config_max_cost is None: + return [rows] + splitter = IterCostSplitter(cost_fn=compute_row_size, + batch_max_cost=config_max_cost) + return splitter.list_batches(rows) diff --git a/src/translators/sql_translator.py b/src/translators/sql_translator.py index 4eb390c9..eb50f35b 100644 --- a/src/translators/sql_translator.py +++ b/src/translators/sql_translator.py @@ -13,9 +13,9 @@ from typing import Any, List, Optional, Sequence from uuid import uuid4 import pickle -import types from cache.factory import get_cache, is_cache_available +from translators.insert_splitter import to_insert_batches from utils.connection_manager import Borg # NGSI TYPES @@ -363,7 +363,10 @@ def _insert_entity_rows(self, table_name: str, col_names: List[str], stmt = f"insert into {table_name} ({col_list}) values ({placeholders})" try: start_time = datetime.now() - self.cursor.executemany(stmt, rows) + + for batch in to_insert_batches(rows): + self.cursor.executemany(stmt, batch) + dt = datetime.now() - start_time time_difference = (dt.days * 24 * 60 * 60 + dt.seconds) \ * 1000 + dt.microseconds / 1000.0 diff --git a/src/translators/tests/test_insert_batches.py b/src/translators/tests/test_insert_batches.py new file mode 100644 index 00000000..afd88566 --- /dev/null +++ b/src/translators/tests/test_insert_batches.py @@ -0,0 +1,123 @@ +from itertools import takewhile +import os +import pytest +import sys + +from translators.base_translator import TIME_INDEX_NAME +from translators.insert_splitter import INSERT_MAX_SIZE_VAR +from translators.tests.original_data_scenarios import full_table_name, \ + gen_tenant_id, gen_entity, OriginalDataScenarios +from translators.tests.test_original_data import translators, \ + with_crate, with_timescale +# NOTE. ^ your IDE is likely to tell you this is dead code, but it isn't +# actually, we need to bring those two fixtures into scope to use them +# with the lazy_fixture calls in 'translators'. + + +def set_insert_max_size(number_of_bytes: int): + os.environ[INSERT_MAX_SIZE_VAR] = f"{number_of_bytes}B" + + +def clear_insert_max_size(): + os.environ[INSERT_MAX_SIZE_VAR] = '' + + +class DataGen: + + def __init__(self, insert_max_size: int, min_batches: int): + self.insert_max_size = insert_max_size + self.min_batches = min_batches + self.unique_tenant_id = gen_tenant_id() + + @staticmethod + def _compute_insert_vector_size_lower_bound(entity: dict) -> int: + vs = entity['id'], entity['type'], entity[TIME_INDEX_NAME], \ + entity['a_number']['value'], entity['an_attr']['value'] + sz = [sys.getsizeof(v) for v in vs] + return sum(sz) + # NOTE. lower bound since it doesn't include e.g. fiware service. + + def _next_entity(self) -> (dict, int): + eid = 0 + size = 0 + while True: + eid += 1 + e = gen_entity(entity_id=eid, attr_type='Number', attr_value=1) + size += self._compute_insert_vector_size_lower_bound(e) + yield e, size + + def generate_insert_payload(self) -> [dict]: + """ + Generate enough data that when the SQL translator is configured with + the given insert_max_size value, it'll have to split the payload in + at least min_batches. + + :return: the entities to insert. + """ + sz = self.insert_max_size * self.min_batches + ts = takewhile(lambda t: t[1] <= sz, self._next_entity()) + return [t[0] for t in ts] +# NOTE. Actual number of batches >= min_batches. +# In fact, say each entity row vector is actually 10 bytes, but our computed +# lower bound is 5. 
diff --git a/src/translators/tests/test_insert_batches.py b/src/translators/tests/test_insert_batches.py
new file mode 100644
index 00000000..afd88566
--- /dev/null
+++ b/src/translators/tests/test_insert_batches.py
@@ -0,0 +1,123 @@
+from itertools import takewhile
+import os
+import pytest
+import sys
+
+from translators.base_translator import TIME_INDEX_NAME
+from translators.insert_splitter import INSERT_MAX_SIZE_VAR
+from translators.tests.original_data_scenarios import full_table_name, \
+    gen_tenant_id, gen_entity, OriginalDataScenarios
+from translators.tests.test_original_data import translators, \
+    with_crate, with_timescale
+# NOTE. ^ your IDE is likely to tell you the above is dead code, but it
+# isn't: we need to bring those two fixtures into scope to use them with
+# the lazy_fixture calls in 'translators'.
+
+
+def set_insert_max_size(number_of_bytes: int):
+    os.environ[INSERT_MAX_SIZE_VAR] = f"{number_of_bytes}B"
+
+
+def clear_insert_max_size():
+    os.environ[INSERT_MAX_SIZE_VAR] = ''
+
+
+class DataGen:
+
+    def __init__(self, insert_max_size: int, min_batches: int):
+        self.insert_max_size = insert_max_size
+        self.min_batches = min_batches
+        self.unique_tenant_id = gen_tenant_id()
+
+    @staticmethod
+    def _compute_insert_vector_size_lower_bound(entity: dict) -> int:
+        vs = entity['id'], entity['type'], entity[TIME_INDEX_NAME], \
+            entity['a_number']['value'], entity['an_attr']['value']
+        sz = [sys.getsizeof(v) for v in vs]
+        return sum(sz)
+        # NOTE. Lower bound, since it doesn't include e.g. the FIWARE
+        # service.
+
+    def _next_entity(self) -> (dict, int):
+        eid = 0
+        size = 0
+        while True:
+            eid += 1
+            e = gen_entity(entity_id=eid, attr_type='Number', attr_value=1)
+            size += self._compute_insert_vector_size_lower_bound(e)
+            yield e, size
+
+    def generate_insert_payload(self) -> [dict]:
+        """
+        Generate enough data that, when the SQL translator is configured
+        with the given insert_max_size value, it'll have to split the
+        payload into at least min_batches.
+
+        :return: the entities to insert.
+        """
+        sz = self.insert_max_size * self.min_batches
+        ts = takewhile(lambda t: t[1] <= sz, self._next_entity())
+        return [t[0] for t in ts]
+# NOTE. The actual number of batches is >= min_batches. In fact, say each
+# entity row vector is actually 10 bytes, but our computed lower bound is 5.
+# Then with insert_max_size=10 and min_batches=3, the generated payload will
+# have 6 entities in it, for a total of 60 bytes, which the translator
+# should then split into 6 batches.
+
+
+class TestDriver:
+
+    def __init__(self, translator: OriginalDataScenarios,
+                 test_data: DataGen):
+        self.translator = translator
+        self.data = test_data
+
+    def _do_insert(self, entities: [dict]):
+        try:
+            tid = self.data.unique_tenant_id
+            self.translator.insert_entities(tid, entities)
+        finally:
+            clear_insert_max_size()
+
+    def _assert_row_count(self, expected: int):
+        table = full_table_name(self.data.unique_tenant_id)
+        stmt = f"select count(*) as count from {table}"
+        r = self.translator.query(stmt)
+        assert r[0]['count'] == expected
+
+    def run(self, with_batches: bool):
+        if with_batches:
+            set_insert_max_size(self.data.insert_max_size)
+
+        entities = self.data.generate_insert_payload()
+        self._do_insert(entities)
+        self._assert_row_count(len(entities))
+
+
+@pytest.mark.parametrize('translator', translators,
+                         ids=['timescale', 'crate'])
+def test_insert_all_entities_in_one_go(translator):
+    test_data = DataGen(insert_max_size=1024, min_batches=2)
+    driver = TestDriver(translator, test_data)
+    driver.run(with_batches=False)
+
+
+@pytest.mark.parametrize('translator', translators,
+                         ids=['timescale', 'crate'])
+@pytest.mark.parametrize('min_batches', [2, 3, 4])
+def test_insert_entities_in_batches(translator, min_batches):
+    test_data = DataGen(insert_max_size=1024, min_batches=min_batches)
+    driver = TestDriver(translator, test_data)
+    driver.run(with_batches=True)
+
+
+# NOTE. Couldn't reproduce #445.
+# You can try this, but the exception I get is, weirdly enough, a connection
+# exception. Python will crunch data in memory for about 30 mins, then the
+# translator mysteriously fails w/ a connection exception, even though Crate
+# is up and running...
+#
+# def test_huge_crate_insert(with_crate):
+#     test_data = DataGen(insert_max_size=2*1024*1024, min_batches=1024)
+#     # ^ should produce at least 2GiB worth of entities!!
+#     driver = TestDriver(with_crate, test_data)
+#     driver.run(with_batches=True)
diff --git a/src/utils/cfgreader.py b/src/utils/cfgreader.py
index fa31f325..68c6b9bd 100644
--- a/src/utils/cfgreader.py
+++ b/src/utils/cfgreader.py
@@ -3,6 +3,8 @@
 variables, YAML files, etc.
 """
 
+import bitmath
+from bitmath import Bitmath
 import logging
 import os
 from typing import Union
@@ -79,6 +81,18 @@ def _do_read(self, rep: str) -> bool:
         return rep.strip().lower() in ('true', 'yes', '1', 't', 'y')
 
 
+class BitSizeVar(EVar):
+    """
+    An env value parsed as a digital information size, e.g. a file size
+    in gigabytes, a memory size in megabytes, a word size in bits, etc.
+    This class is just a wrapper around the ``bitmath`` lib; see there
+    for usage and examples.
+    """
+
+    def _do_read(self, rep: str) -> Bitmath:
+        return bitmath.parse_string(rep)
+
+
 class EnvReader:
     """
     Reads environment variables.
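For what it's worth, a sketch of how `BitSizeVar` composes with the existing `EnvReader`: this mirrors the code in `translators/insert_splitter.py` above, where `safe_read` falls back to the default (here `None`) when the variable is missing or unparsable.

```python
import logging
import os

from utils.cfgreader import BitSizeVar, EnvReader

os.environ['INSERT_MAX_SIZE'] = '0.5 MiB'

reader = EnvReader(log=logging.getLogger(__name__).debug)
parsed = reader.safe_read(BitSizeVar('INSERT_MAX_SIZE', None))

print(int(parsed.to_Byte()))  # 524288
```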
diff --git a/src/utils/itersplit.py b/src/utils/itersplit.py
new file mode 100644
index 00000000..4d80c1f1
--- /dev/null
+++ b/src/utils/itersplit.py
@@ -0,0 +1,131 @@
+"""
+This module provides utilities to split iterables into batches.
+"""
+
+from itertools import chain
+from typing import Any, Callable, Iterable
+
+
+CostFn = Callable[[Any], int]
+"""
+A function to assign a "cost" to an item. Typically the cost is a
+non-negative integer.
+"""
+
+
+class IterCostSplitter:
+    """
+    Split a stream into batches so the cumulative cost of each batch is
+    within a set cost goal.
+
+    Given an input stream ``s`` and a cost function ``c``, produce a
+    sequence of streams ``b`` such that joining the ``b`` streams
+    yields ``s`` and, for each ``b`` stream of length ``> 1``, mapping
+    ``c`` to each element and summing the costs yields a value ``≤ M``,
+    where ``M`` is a set cost goal. In symbols:
+
+    1. s = b[0] + b[1] + ...
+    2. b[k] = [x1, x2, ...] ⟹ M ≥ c(x1) + c(x2) + ...
+
+    Notice it can happen that, to make batches satisfying (1) and (2),
+    some ``b[k]`` contains just one element ``x`` with ``c(x) > M``:
+    since (2) only applies to batches of length ``> 1``, that doesn't
+    violate (1) and (2).
+
+    Examples:
+
+    >>> splitter = IterCostSplitter(cost_fn=lambda x: x, batch_max_cost=5)
+    >>> splitter.list_batches([1, 7, 2, 3, 8, 5, 1, 2, 1])
+    [[1], [7], [2, 3], [8], [5], [1, 2, 1]]
+    """
+
+# NOTE. Algebra of programming.
+# For the mathematically inclined soul out there, the Python implementation
+# below is based on this maths spec of sorts, using FP-like syntax for lists
+#
+#     ϕ [] = []
+#     ϕ [x] = [[x]]
+#     ϕ [x, y, ...] = [ x:t, u, ...]     if c(x) + Σ c(t[i]) ≤ M
+#     ϕ [x, y, ...] = [ [x], t, u, ...]  otherwise
+#
+#     where [t, u, ...] = ϕ [y, ...]
+#
+# Or if you can read Haskell:
+#
+#     ϕ :: [Int] -> [[Int]]
+#     ϕ [] = []
+#     ϕ [x] = [[x]]
+#     ϕ (x:xs)
+#         | c x + s y ≤ m = (x:y) : ys
+#         | otherwise = [x] : y : ys
+#         where
+#             (y:ys) = ϕ xs
+#             c = ...your cost function
+#             m = ...your cost goal
+#             s = sum . map c
+#
+# Why is the Python implementation so damn complicated then?! The mind
+# boggles.
+
+    def __init__(self, cost_fn: CostFn, batch_max_cost: int):
+        """
+        Create a new instance.
+
+        :param cost_fn: the function to assign a cost to each stream
+            element.
+        :param batch_max_cost: the cost goal. It determines how the
+            input stream gets split into batches.
+        """
+        self._cost_of = cost_fn
+        self._max_cost = batch_max_cost
+        self._iter = None
+        self._keep_iterating = True
+
+    def _put_back(self, item: Any):
+        self._iter = chain([item], self._iter)
+
+    def _is_empty(self) -> bool:
+        try:
+            x = next(self._iter)
+            self._put_back(x)
+            return False
+        except StopIteration:
+            return True
+
+    def _next_batch(self) -> Iterable[Any]:
+        cost_so_far = 0
+        batch_size = 0
+
+        for x in self._iter:
+            next_cost = self._cost_of(x)
+
+            if batch_size == 0 or cost_so_far + next_cost <= self._max_cost:
+                batch_size += 1
+                cost_so_far += next_cost
+                yield x
+            else:
+                self._put_back(x)
+                return
+
+        self._keep_iterating = False
+
+    def iter_batches(self, xs: Iterable[Any]) -> Iterable[Iterable[Any]]:
+        """
+        Split ``xs`` into batches so the cumulative cost of each batch
+        is no greater than the ``batch_max_cost`` given to this class'
+        constructor.
+
+        :param xs: the stream to split.
+        :return: a stream of streams (batches) with the two properties
+            documented in this class' description.
+        """
+        self._iter = iter(xs)
+        if self._is_empty():
+            return self._iter
+
+        self._keep_iterating = True
+        while self._keep_iterating:
+            yield self._next_batch()
+
+    def list_batches(self, xs: Iterable[Any]) -> [[Any]]:
+        """
+        Same as ``iter_batches`` but force the streams into a list, i.e.
+        consume all iterators to produce a list of lists.
+        """
+        return [list(ys) for ys in self.iter_batches(xs)]
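`IterCostSplitter` isn't tied to SQL rows: any iterable plus a cost function will do. A quick usage sketch, packing words into chunks of at most 12 characters:

```python
from utils.itersplit import IterCostSplitter

splitter = IterCostSplitter(cost_fn=len, batch_max_cost=12)
words = ['sensor', 'reading', 'batch', 'insert', 'ok']

print(splitter.list_batches(words))
# [['sensor'], ['reading', 'batch'], ['insert', 'ok']]
```

Note the greedy strategy: a batch closes as soon as the next element would push it over the goal, which is exactly the behaviour the tests in `test_itercostsplitter.py` below pin down.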
+ """ + return [list(ys) for ys in self.iter_batches(xs)] diff --git a/src/utils/tests/test_cfgreader.py b/src/utils/tests/test_cfgreader.py index 91fc9360..a36ae350 100644 --- a/src/utils/tests/test_cfgreader.py +++ b/src/utils/tests/test_cfgreader.py @@ -1,3 +1,4 @@ +from bitmath import MiB import pytest from utils.cfgreader import * @@ -27,6 +28,22 @@ def test_str_var_default(value): assert var.read(value) == def_val +@pytest.mark.parametrize('value, expected', [ + ('1 B', 1), ('1B', 1), ('1 KiB', 1024) +]) +def test_bit_size_var(value, expected): + var = BitSizeVar('V', None) + parsed = var.read(value) + assert int(parsed.to_Byte()) == expected + + +@pytest.mark.parametrize('value', unset_env_values) +def test_bit_size_var_default(value): + def_val = MiB(2) + var = BitSizeVar('V', def_val) + assert var.read(value) == def_val + + @pytest.mark.parametrize('value, expected', [ (' 5', 5), ('\t5 ', 5), ('5\n', 5), (' 5 ', 5), (' 5432', 5432), ('\t5432 ', 5432), ('5432\n', 5432), (' 5432 ', 5432) diff --git a/src/utils/tests/test_itercostsplitter.py b/src/utils/tests/test_itercostsplitter.py new file mode 100644 index 00000000..b6ebe606 --- /dev/null +++ b/src/utils/tests/test_itercostsplitter.py @@ -0,0 +1,82 @@ +import pytest +from typing import Optional + +from utils.itersplit import IterCostSplitter + + +def new_splitter(batch_max_cost: int) -> IterCostSplitter: + def cost(x: Optional[int]) -> int: + return 0 if x is None else x + + return IterCostSplitter(cost_fn=cost, batch_max_cost=batch_max_cost) + + +def test_error_on_none(): + target = new_splitter(batch_max_cost=1) + with pytest.raises(TypeError): + target.list_batches(None) + + +@pytest.mark.parametrize('mx', [-1, 0, 1, 2, 3]) +@pytest.mark.parametrize('empty', [[], iter([])]) +def test_empty_yields_empty(mx, empty): + target = new_splitter(batch_max_cost=mx) + + actual = target.list_batches(empty) + assert len(actual) == 0 + + xs = target.iter_batches(empty) + actual = [x for x in xs] + assert len(actual) == 0 + + +@pytest.mark.parametrize('mx', [-1, 0, 1]) +def test_too_small_cost_yields_singletons(mx): + target = new_splitter(batch_max_cost=mx) + + actual = target.list_batches([2, 3, 4]) + assert actual == [[2], [3], [4]] + + +@pytest.mark.parametrize('in_stream', + [[None, 1, None, 2], iter([None, 1, None, 2])]) +def test_keep_any_input_none(in_stream): + target = new_splitter(batch_max_cost=2) + + actual = target.list_batches(in_stream) + assert actual == [[None, 1, None], [2]] + + +def test_produce_all_batches_even_when_skipping_iterators(): + target = new_splitter(batch_max_cost=1) + xs = target.iter_batches([1, 2, 3]) # ~~> [[1], [2], [3]] + + actual = [] + for k in range(3): + next(xs) # throwing away an iterator ... + x = list(next(xs)) # should affect overall iteration + actual.append(x) + + assert actual == [[1], [2], [3]] + + +def test_can_iter_multiple_times(): + target = new_splitter(batch_max_cost=5) + xs = [2, 3, 4] + expected = [[2, 3], [4]] + + actual = [] + for k in target.iter_batches(iter(xs)): + ks = list(k) + actual.append(ks) + assert actual == expected + + actual = target.list_batches(xs) + assert actual == expected + + +def test_typical_example(): + target = new_splitter(batch_max_cost=5) + xs = [1, 7, 2, 3, 8, 5, 1, 2, 1] + actual = target.list_batches(xs) + assert actual == [[1], [7], [2, 3], [8], [5], [1, 2, 1]]