diff --git a/poetry.lock b/poetry.lock index d3cbfe1e..5bfae682 100644 --- a/poetry.lock +++ b/poetry.lock @@ -128,7 +128,7 @@ pycparser = "*" [[package]] name = "cfgv" -version = "3.2.0" +version = "3.3.0" description = "Validate configuration and produce human readable error messages." category = "dev" optional = false @@ -177,6 +177,9 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" +[package.dependencies] +toml = {version = "*", optional = true, markers = "extra == \"toml\""} + [package.extras] toml = ["toml"] @@ -293,7 +296,7 @@ docs = ["sphinx"] [[package]] name = "httpretty" -version = "1.0.5" +version = "1.1.1" description = "HTTP client mock for Python" category = "dev" optional = false @@ -344,7 +347,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes [[package]] name = "importlib-resources" -version = "5.1.2" +version = "5.1.3" description = "Read resources from Python packages" category = "dev" optional = false @@ -355,7 +358,7 @@ zipp = {version = ">=0.4", markers = "python_version < \"3.8\""} [package.extras] docs = ["sphinx", "jaraco.packaging (>=8.2)", "rst.linker (>=1.9)"] -testing = ["pytest (>=4.6)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pytest-cov", "pytest-enabler", "pytest-black (>=0.3.7)", "pytest-mypy"] +testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-black (>=0.3.7)", "pytest-mypy"] [[package]] name = "inflection" @@ -561,7 +564,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" [[package]] name = "pefile" -version = "2019.4.18" +version = "2021.5.13" description = "Python PE parsing module" category = "dev" optional = false @@ -639,7 +642,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "pydantic" -version = "1.8.1" +version = "1.8.2" description = "Data validation and settings management using python 3.6 type hinting" category = "main" optional = false @@ -752,14 +755,14 @@ testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xm [[package]] name = "pytest-cov" -version = "2.11.1" +version = "2.12.0" description = "Pytest plugin for measuring coverage." category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" [package.dependencies] -coverage = ">=5.2.1" +coverage = {version = ">=5.2.1", extras = ["toml"]} pytest = ">=4.6" [package.extras] @@ -1121,14 +1124,11 @@ testing = ["coverage (>=4)", "coverage-enable-subprocess (>=1)", "flaky (>=3)", [[package]] name = "websocket-client" -version = "0.59.0" +version = "1.0.0" description = "WebSocket client for Python with low level API options" category = "main" optional = false -python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" - -[package.dependencies] -six = "*" +python-versions = ">=3.6" [[package]] name = "wrapt" @@ -1241,8 +1241,8 @@ cffi = [ {file = "cffi-1.14.5.tar.gz", hash = "sha256:fd78e5fee591709f32ef6edb9a015b4aa1a5022598e36227500c8f4e02328d9c"}, ] cfgv = [ - {file = "cfgv-3.2.0-py2.py3-none-any.whl", hash = "sha256:32e43d604bbe7896fe7c248a9c2276447dbef840feb28fe20494f62af110211d"}, - {file = "cfgv-3.2.0.tar.gz", hash = "sha256:cf22deb93d4bcf92f345a5c3cd39d3d41d6340adc60c78bbbd6588c384fda6a1"}, + {file = "cfgv-3.3.0-py2.py3-none-any.whl", hash = "sha256:b449c9c6118fe8cca7fa5e00b9ec60ba08145d281d52164230a69211c5d597a1"}, + {file = "cfgv-3.3.0.tar.gz", hash = "sha256:9e600479b3b99e8af981ecdfc80a0296104ee610cab48a5ae4ffd0b668650eb1"}, ] chardet = [ {file = "chardet-4.0.0-py2.py3-none-any.whl", hash = "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"}, @@ -1410,7 +1410,7 @@ greenlet = [ {file = "greenlet-1.1.0.tar.gz", hash = "sha256:c87df8ae3f01ffb4483c796fe1b15232ce2b219f0b18126948616224d3f658ee"}, ] httpretty = [ - {file = "httpretty-1.0.5.tar.gz", hash = "sha256:e53c927c4d3d781a0761727f1edfad64abef94e828718e12b672a678a8b3e0b5"}, + {file = "httpretty-1.1.1.tar.gz", hash = "sha256:05223ede0a720564eeed95fbea00000c9569a9863e3661ad513a0c03e6e6eba1"}, ] identify = [ {file = "identify-2.2.4-py2.py3-none-any.whl", hash = "sha256:ad9f3fa0c2316618dc4d840f627d474ab6de106392a4f00221820200f490f5a8"}, @@ -1429,8 +1429,8 @@ importlib-metadata = [ {file = "importlib_metadata-4.0.1.tar.gz", hash = "sha256:8c501196e49fb9df5df43833bdb1e4328f64847763ec8a50703148b73784d581"}, ] importlib-resources = [ - {file = "importlib_resources-5.1.2-py3-none-any.whl", hash = "sha256:ebab3efe74d83b04d6bf5cd9a17f0c5c93e60fb60f30c90f56265fce4682a469"}, - {file = "importlib_resources-5.1.2.tar.gz", hash = "sha256:642586fc4740bd1cad7690f836b3321309402b20b332529f25617ff18e8e1370"}, + {file = "importlib_resources-5.1.3-py3-none-any.whl", hash = "sha256:3b9c774e0e7e8d9c069eb2fe6aee7e9ae71759a381dec02eb45249fba7f38713"}, + {file = "importlib_resources-5.1.3.tar.gz", hash = "sha256:0786b216556e53b34156263ab654406e543a8b0d9b1381019e25a36a09263c36"}, ] inflection = [ {file = "inflection-0.5.1-py2.py3-none-any.whl", hash = "sha256:f38b2b640938a4f35ade69ac3d053042959b62a0f1076a5bbaa1b9526605a8a2"}, @@ -1652,7 +1652,7 @@ pathspec = [ {file = "pathspec-0.8.1.tar.gz", hash = "sha256:86379d6b86d75816baba717e64b1a3a3469deb93bb76d613c9ce79edc5cb68fd"}, ] pefile = [ - {file = "pefile-2019.4.18.tar.gz", hash = "sha256:a5d6e8305c6b210849b47a6174ddf9c452b2888340b8177874b862ba6c207645"}, + {file = "pefile-2021.5.13.tar.gz", hash = "sha256:2aae0c135d4d37e81ff120e825af18b5e4884a97b9290aee811afd6317618f52"}, ] pglast = [ {file = "pglast-1.17-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:c409df8bf3c2f099c057ed7accecb3dd2765b94ffe21da4c51527bc66928a65b"}, @@ -1727,28 +1727,28 @@ pycparser = [ {file = "pycparser-2.20.tar.gz", hash = "sha256:2d475327684562c3a96cc71adf7dc8c4f0565175cf86b6d7a404ff4c771f15f0"}, ] pydantic = [ - {file = "pydantic-1.8.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:0c40162796fc8d0aa744875b60e4dc36834db9f2a25dbf9ba9664b1915a23850"}, - {file = "pydantic-1.8.1-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:fff29fe54ec419338c522b908154a2efabeee4f483e48990f87e189661f31ce3"}, - {file = "pydantic-1.8.1-cp36-cp36m-manylinux2014_i686.whl", hash = "sha256:fbfb608febde1afd4743c6822c19060a8dbdd3eb30f98e36061ba4973308059e"}, - {file = "pydantic-1.8.1-cp36-cp36m-manylinux2014_x86_64.whl", hash = "sha256:eb8ccf12295113ce0de38f80b25f736d62f0a8d87c6b88aca645f168f9c78771"}, - {file = "pydantic-1.8.1-cp36-cp36m-win_amd64.whl", hash = "sha256:20d42f1be7c7acc352b3d09b0cf505a9fab9deb93125061b376fbe1f06a5459f"}, - {file = "pydantic-1.8.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:dde4ca368e82791de97c2ec019681ffb437728090c0ff0c3852708cf923e0c7d"}, - {file = "pydantic-1.8.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:3bbd023c981cbe26e6e21c8d2ce78485f85c2e77f7bab5ec15b7d2a1f491918f"}, - {file = "pydantic-1.8.1-cp37-cp37m-manylinux2014_i686.whl", hash = "sha256:830ef1a148012b640186bf4d9789a206c56071ff38f2460a32ae67ca21880eb8"}, - {file = "pydantic-1.8.1-cp37-cp37m-manylinux2014_x86_64.whl", hash = "sha256:fb77f7a7e111db1832ae3f8f44203691e15b1fa7e5a1cb9691d4e2659aee41c4"}, - {file = "pydantic-1.8.1-cp37-cp37m-win_amd64.whl", hash = "sha256:3bcb9d7e1f9849a6bdbd027aabb3a06414abd6068cb3b21c49427956cce5038a"}, - {file = "pydantic-1.8.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:2287ebff0018eec3cc69b1d09d4b7cebf277726fa1bd96b45806283c1d808683"}, - {file = "pydantic-1.8.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:4bbc47cf7925c86a345d03b07086696ed916c7663cb76aa409edaa54546e53e2"}, - {file = "pydantic-1.8.1-cp38-cp38-manylinux2014_i686.whl", hash = "sha256:6388ef4ef1435364c8cc9a8192238aed030595e873d8462447ccef2e17387125"}, - {file = "pydantic-1.8.1-cp38-cp38-manylinux2014_x86_64.whl", hash = "sha256:dd4888b300769ecec194ca8f2699415f5f7760365ddbe243d4fd6581485fa5f0"}, - {file = "pydantic-1.8.1-cp38-cp38-win_amd64.whl", hash = "sha256:8fbb677e4e89c8ab3d450df7b1d9caed23f254072e8597c33279460eeae59b99"}, - {file = "pydantic-1.8.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2f2736d9a996b976cfdfe52455ad27462308c9d3d0ae21a2aa8b4cd1a78f47b9"}, - {file = "pydantic-1.8.1-cp39-cp39-manylinux1_i686.whl", hash = "sha256:3114d74329873af0a0e8004627f5389f3bb27f956b965ddd3e355fe984a1789c"}, - {file = "pydantic-1.8.1-cp39-cp39-manylinux2014_i686.whl", hash = "sha256:258576f2d997ee4573469633592e8b99aa13bda182fcc28e875f866016c8e07e"}, - {file = "pydantic-1.8.1-cp39-cp39-manylinux2014_x86_64.whl", hash = "sha256:c17a0b35c854049e67c68b48d55e026c84f35593c66d69b278b8b49e2484346f"}, - {file = "pydantic-1.8.1-cp39-cp39-win_amd64.whl", hash = "sha256:e8bc082afef97c5fd3903d05c6f7bb3a6af9fc18631b4cc9fedeb4720efb0c58"}, - {file = "pydantic-1.8.1-py3-none-any.whl", hash = "sha256:e3f8790c47ac42549dc8b045a67b0ca371c7f66e73040d0197ce6172b385e520"}, - {file = "pydantic-1.8.1.tar.gz", hash = "sha256:26cf3cb2e68ec6c0cfcb6293e69fb3450c5fd1ace87f46b64f678b0d29eac4c3"}, + {file = "pydantic-1.8.2-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:05ddfd37c1720c392f4e0d43c484217b7521558302e7069ce8d318438d297739"}, + {file = "pydantic-1.8.2-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:a7c6002203fe2c5a1b5cbb141bb85060cbff88c2d78eccbc72d97eb7022c43e4"}, + {file = "pydantic-1.8.2-cp36-cp36m-manylinux2014_i686.whl", hash = "sha256:589eb6cd6361e8ac341db97602eb7f354551482368a37f4fd086c0733548308e"}, + {file = "pydantic-1.8.2-cp36-cp36m-manylinux2014_x86_64.whl", hash = "sha256:10e5622224245941efc193ad1d159887872776df7a8fd592ed746aa25d071840"}, + {file = "pydantic-1.8.2-cp36-cp36m-win_amd64.whl", hash = "sha256:99a9fc39470010c45c161a1dc584997f1feb13f689ecf645f59bb4ba623e586b"}, + {file = "pydantic-1.8.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:a83db7205f60c6a86f2c44a61791d993dff4b73135df1973ecd9eed5ea0bda20"}, + {file = "pydantic-1.8.2-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:41b542c0b3c42dc17da70554bc6f38cbc30d7066d2c2815a94499b5684582ecb"}, + {file = "pydantic-1.8.2-cp37-cp37m-manylinux2014_i686.whl", hash = "sha256:ea5cb40a3b23b3265f6325727ddfc45141b08ed665458be8c6285e7b85bd73a1"}, + {file = "pydantic-1.8.2-cp37-cp37m-manylinux2014_x86_64.whl", hash = "sha256:18b5ea242dd3e62dbf89b2b0ec9ba6c7b5abaf6af85b95a97b00279f65845a23"}, + {file = "pydantic-1.8.2-cp37-cp37m-win_amd64.whl", hash = "sha256:234a6c19f1c14e25e362cb05c68afb7f183eb931dd3cd4605eafff055ebbf287"}, + {file = "pydantic-1.8.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:021ea0e4133e8c824775a0cfe098677acf6fa5a3cbf9206a376eed3fc09302cd"}, + {file = "pydantic-1.8.2-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e710876437bc07bd414ff453ac8ec63d219e7690128d925c6e82889d674bb505"}, + {file = "pydantic-1.8.2-cp38-cp38-manylinux2014_i686.whl", hash = "sha256:ac8eed4ca3bd3aadc58a13c2aa93cd8a884bcf21cb019f8cfecaae3b6ce3746e"}, + {file = "pydantic-1.8.2-cp38-cp38-manylinux2014_x86_64.whl", hash = "sha256:4a03cbbe743e9c7247ceae6f0d8898f7a64bb65800a45cbdc52d65e370570820"}, + {file = "pydantic-1.8.2-cp38-cp38-win_amd64.whl", hash = "sha256:8621559dcf5afacf0069ed194278f35c255dc1a1385c28b32dd6c110fd6531b3"}, + {file = "pydantic-1.8.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:8b223557f9510cf0bfd8b01316bf6dd281cf41826607eada99662f5e4963f316"}, + {file = "pydantic-1.8.2-cp39-cp39-manylinux1_i686.whl", hash = "sha256:244ad78eeb388a43b0c927e74d3af78008e944074b7d0f4f696ddd5b2af43c62"}, + {file = "pydantic-1.8.2-cp39-cp39-manylinux2014_i686.whl", hash = "sha256:05ef5246a7ffd2ce12a619cbb29f3307b7c4509307b1b49f456657b43529dc6f"}, + {file = "pydantic-1.8.2-cp39-cp39-manylinux2014_x86_64.whl", hash = "sha256:54cd5121383f4a461ff7644c7ca20c0419d58052db70d8791eacbbe31528916b"}, + {file = "pydantic-1.8.2-cp39-cp39-win_amd64.whl", hash = "sha256:4be75bebf676a5f0f87937c6ddb061fa39cbea067240d98e298508c1bda6f3f3"}, + {file = "pydantic-1.8.2-py3-none-any.whl", hash = "sha256:fec866a0b59f372b7e776f2d7308511784dace622e0992a0b59ea3ccee0ae833"}, + {file = "pydantic-1.8.2.tar.gz", hash = "sha256:26464e57ccaafe72b7ad156fdaa4e9b9ef051f69e175dbbb463283000c05ab7b"}, ] pyfakefs = [ {file = "pyfakefs-4.4.0-py3-none-any.whl", hash = "sha256:1ac3b2845dabe69af56c20691b9347914581195ccdde352535fb7d4ff0055c19"}, @@ -1781,8 +1781,8 @@ pytest = [ {file = "pytest-6.2.4.tar.gz", hash = "sha256:50bcad0a0b9c5a72c8e4e7c9855a3ad496ca6a881a3641b4260605450772c54b"}, ] pytest-cov = [ - {file = "pytest-cov-2.11.1.tar.gz", hash = "sha256:359952d9d39b9f822d9d29324483e7ba04a3a17dd7d05aa6beb7ea01e359e5f7"}, - {file = "pytest_cov-2.11.1-py2.py3-none-any.whl", hash = "sha256:bdb9fdb0b85a7cc825269a4c56b48ccaa5c7e365054b6038772c32ddcdc969da"}, + {file = "pytest-cov-2.12.0.tar.gz", hash = "sha256:8535764137fecce504a49c2b742288e3d34bc09eed298ad65963616cc98fd45e"}, + {file = "pytest_cov-2.12.0-py2.py3-none-any.whl", hash = "sha256:95d4933dcbbacfa377bb60b29801daa30d90c33981ab2a79e9ab4452c165066e"}, ] pytest-env = [ {file = "pytest-env-0.6.2.tar.gz", hash = "sha256:7e94956aef7f2764f3c147d216ce066bf6c42948bb9e293169b1b1c880a580c2"}, @@ -2029,8 +2029,8 @@ virtualenv = [ {file = "virtualenv-20.4.6.tar.gz", hash = "sha256:72cf267afc04bf9c86ec932329b7e94db6a0331ae9847576daaa7ca3c86b29a4"}, ] websocket-client = [ - {file = "websocket-client-0.59.0.tar.gz", hash = "sha256:d376bd60eace9d437ab6d7ee16f4ab4e821c9dae591e1b783c58ebd8aaf80c5c"}, - {file = "websocket_client-0.59.0-py2.py3-none-any.whl", hash = "sha256:2e50d26ca593f70aba7b13a489435ef88b8fc3b5c5643c1ce8808ff9b40f0b32"}, + {file = "websocket-client-1.0.0.tar.gz", hash = "sha256:5051b38a2f4c27fbd7ca077ebb23ec6965a626ded5a95637f36be1b35b6c4f81"}, + {file = "websocket_client-1.0.0-py2.py3-none-any.whl", hash = "sha256:57f876f1af4731cacb806cf54d02f5fbf75dee796053b9a5b94fd7c1d9621db9"}, ] wrapt = [ {file = "wrapt-1.12.1.tar.gz", hash = "sha256:b62ffa81fb85f4332a4f609cab4ac40709470da05643a082ec1eb88e6d9b97d7"}, diff --git a/splitgraph/core/fragment_manager.py b/splitgraph/core/fragment_manager.py index e319ee9e..53308530 100644 --- a/splitgraph/core/fragment_manager.py +++ b/splitgraph/core/fragment_manager.py @@ -65,7 +65,22 @@ def _log_commit_progress(table_size, no_chunks): return table_size > 500000 or no_chunks > 100 -def get_chunk_groups(chunks: List[Tuple[str, Any, Any]],) -> List[List[Tuple[str, Any, Any]]]: +def _key(c): + # Hack to work around Nones sometimes being there in comparables. We make a fake + # key, prepending a boolean (is the item None?) to every item to get a consistent + # ordering that includes Nones. + if isinstance(c, tuple): + return tuple((ci is None, ci) for ci in c) + return c is None, c + + +def _pk_overlap(pk_1: Tuple, pk_2: Tuple) -> bool: + return bool(_key(pk_1[0]) <= _key(pk_2[1]) and _key(pk_1[1]) >= _key(pk_2[0])) + + +def get_chunk_groups( + chunks: List[Tuple[str, Any, Any]], +) -> List[List[Tuple[str, Any, Any]]]: """ Takes a list of chunks and their boundaries and combines them into independent groups such that chunks from no two groups @@ -101,14 +116,6 @@ def get_chunk_groups(chunks: List[Tuple[str, Any, Any]],) -> List[List[Tuple[str current_group_start = None current_group_end = None - def _key(c): - # Hack to work around Nones sometimes being there in comparables. We make a fake - # key, prepending a boolean (is the item None?) to every item to get a consistent - # ordering that includes Nones. - if isinstance(c, tuple): - return tuple((ci is None, ci) for ci in c) - return c is None, c - for original_id, chunk_id, start, end in sorted( chunks, key=lambda c: _key(c[2]) # type:ignore ): @@ -121,7 +128,7 @@ def _key(c): assert current_group_start assert current_group_end # See if the chunk overlaps with the current chunk group - if _key(start) <= _key(current_group_end) and _key(end) >= _key(current_group_start): + if _pk_overlap((start, end), (current_group_start, current_group_end)): current_group.append((original_id, chunk_id, start, end)) current_group_start = min(current_group_start, start, key=_key) current_group_end = max(current_group_end, end, key=_key) @@ -508,12 +515,18 @@ def _store_changeset( upserted = [pk for pk, data in sub_changeset.items() if data[0]] deleted = [pk for pk, data in sub_changeset.items() if not data[0]] self.object_engine.store_fragment( - upserted, deleted, "pg_temp", tmp_object_id, schema, table, table_schema, + upserted, + deleted, + "pg_temp", + tmp_object_id, + schema, + table, + table_schema, ) return tmp_object_id def calculate_fragment_insertion_hash_stats( - self, schema: str, table: str, table_schema: TableSchema = None + self, schema: str, table: str, table_schema: Optional[TableSchema] = None ) -> Tuple[Digest, int]: """ Calculate the homomorphic hash of just the rows that a given fragment inserts @@ -1060,8 +1073,44 @@ def filter_fragments(self, object_ids: List[str], table: "Table", quals: Any) -> len(range_filter_result), ) + objects_to_scan = self._add_overlapping_objects(table, object_ids, bloom_filter_result) + + if len(objects_to_scan) > len(bloom_filter_result): + logging.info( + "Will need to scan through extra %d overlapping fragment(s)", + len(objects_to_scan) - len(bloom_filter_result), + ) + # Preserve original object order. - return [r for r in object_ids if r in bloom_filter_result] + return [r for r in object_ids if r in objects_to_scan] + + def _add_overlapping_objects( + self, table: "Table", all_objects: List[str], filtered_objects: List[str] + ) -> Set[str]: + # Expand the list of objects by also adding the objects that might overwrite these. In some + # cases, we don't keep track of the rows that an object deletes in the index, since that + # adds an implicit dependency on those previous objects. + + table_pk = get_change_key(table.table_schema) + object_pks = self.get_min_max_pks(all_objects, table_pk) + + # Go through all objects and see if they 1) come after any of our chosen objects and 2) + # overlap those objects' PKs (if they come after them) + original_order = {object_id: i for i, object_id in enumerate(all_objects)} + object_pk_dict = { + object_id: object_pk for object_id, object_pk in zip(all_objects, object_pks) + } + objects_to_scan = set(filtered_objects) + for overlap_candidate in all_objects: + if overlap_candidate in objects_to_scan: + continue + for our_object in filtered_objects: + if original_order[our_object] < original_order[overlap_candidate] and _pk_overlap( + object_pk_dict[our_object], object_pk_dict[overlap_candidate] + ): + objects_to_scan.add(overlap_candidate) + break + return objects_to_scan def delete_objects(self, objects: Union[Set[str], List[str]]) -> None: """ diff --git a/splitgraph/ingestion/singer/_utils.py b/splitgraph/ingestion/singer/_utils.py index 065b5e25..6f3f5c0c 100644 --- a/splitgraph/ingestion/singer/_utils.py +++ b/splitgraph/ingestion/singer/_utils.py @@ -45,14 +45,18 @@ def _migrate_schema(engine, table_schema, table_name, table_schema_spec, new_sch if c not in new_cols: engine.run_sql( SQL("ALTER TABLE {}.{} DROP COLUMN {}").format( - Identifier(table_schema), Identifier(table_name), Identifier(c), + Identifier(table_schema), + Identifier(table_name), + Identifier(c), ) ) for c in new_cols: if c not in old_cols: engine.run_sql( SQL("ALTER TABLE {}.{} ADD COLUMN {} %s" % validate_type(new_cols[c])).format( - Identifier(table_schema), Identifier(table_name), Identifier(c), + Identifier(table_schema), + Identifier(table_name), + Identifier(c), ) ) elif new_cols[c] != old_cols[c]: @@ -60,15 +64,15 @@ def _migrate_schema(engine, table_schema, table_name, table_schema_spec, new_sch SQL( "ALTER TABLE {}.{} ALTER COLUMN {} TYPE %s" % validate_type(new_cols[c]) ).format( - Identifier(table_schema), Identifier(table_name), Identifier(c), + Identifier(table_schema), + Identifier(table_name), + Identifier(c), ) ) def _make_changeset( engine: PostgresEngine, - old_schema: str, - old_table: str, schema: str, table: str, schema_spec: TableSchema, @@ -78,34 +82,34 @@ def _make_changeset( to the object manager (store as a Splitgraph diff).""" # PK -> (upserted / deleted, old row, new row) - # As a memory-saving hack, we only record the values of the old row (read from the - # current table) -- this is because object creation routines read the inserted rows - # from the staging table anyway. + # We don't find out the old row here. This is because it requires a JOIN on the current + # Splitgraph table, so if we're adding e.g. 100k rows to a 1M row table, it's going to cause big + # performance issues. Instead, we pretend that all rows + # have been inserted (apart from the ones that have been deleted by having the magic + # _sdc_deleted_at column). + + # We also don't care about finding out the new row here, as the storage routine queries + # the table directly to get those values. + + # The tradeoff is that now, when querying the table, we need to include not only fragments + # whose index matches the query, but also all fragments that might overwrite those fragments + # (through PK range overlap). Since we don't record old row values in this changeset's index, + # we can no longer find if a fragment deletes some row by inspecting the index -- we need to + # use PK ranges to find out overlapping fragments. + change_key = [c for c, _ in get_change_key(schema_spec)] # Query: - # SELECT (new, pk, columns) AS pk, + # SELECT (col_1, col_2, ...) AS pk, # (custom upsert condition), - # (row_to_json(old non-pk cols)) AS old_row - # FROM new_table n LEFT OUTER JOIN old_table o ON [o.pk = n.pk] - # WHERE old row != new row + # {} AS old_row + # FROM new_table n query = ( SQL("SELECT ") + SQL(",").join(SQL("n.") + Identifier(c) for c in change_key) + SQL(",") - + SQL(upsert_condition + " AS upserted, ") - # If PK doesn't exist in the new table, old_row is null, else output it - + SQL("CASE WHEN ") - + SQL(" AND ").join(SQL("o.{0} IS NULL").format(Identifier(c)) for c in change_key) - + SQL(" THEN '{}'::json ELSE json_build_object(") - + SQL(",").join( - SQL("%s, o.") + Identifier(c.name) for c in schema_spec if c.name not in change_key - ) - + SQL(") END AS old_row FROM {}.{} n LEFT OUTER JOIN {}.{} o ON ").format( - Identifier(schema), Identifier(table), Identifier(old_schema), Identifier(old_table), + + SQL(upsert_condition + " AS upserted FROM {}.{} n").format( + Identifier(schema), Identifier(table) ) - + SQL(" AND ").join(SQL("o.{0} = n.{0}").format(Identifier(c)) for c in change_key) - + SQL("WHERE o.* IS DISTINCT FROM n.*") ).as_string(engine.connection) - args = [c.name for c in schema_spec if c.name not in change_key] - result = engine.run_sql(query, args) - return {tuple(row[:-2]): (row[-2], row[-1], {}) for row in result} + result = engine.run_sql(query) + return {tuple(row[:-1]): (row[-1], {}, {}) for row in result} diff --git a/splitgraph/ingestion/singer/data_source.py b/splitgraph/ingestion/singer/data_source.py index 8e86f4c6..7be62865 100644 --- a/splitgraph/ingestion/singer/data_source.py +++ b/splitgraph/ingestion/singer/data_source.py @@ -150,8 +150,9 @@ def sync( logging.warning("Data source partially failed. Keeping the image anyway", exc_info=e) failure = e - new_state = output_stream.getvalue() - logging.info("New state: %s", new_state) + states = output_stream.getvalue() + latest_state = states.splitlines()[-1] + logging.info("State stream: %s", states) # Add a table to the new image with the new state repository.object_engine.create_table( @@ -161,11 +162,12 @@ def sync( temporary=True, ) # NB: new_state here is a JSON-serialized string, so we don't wrap it into psycopg2.Json() + logging.info("Writing state: %s", latest_state) repository.object_engine.run_sql( SQL("INSERT INTO pg_temp.{} (timestamp, state) VALUES(now(), %s)").format( Identifier(INGESTION_STATE_TABLE) ), - (new_state,), + (latest_state,), ) object_id = repository.objects.create_base_fragment( diff --git a/splitgraph/ingestion/singer/db_sync.py b/splitgraph/ingestion/singer/db_sync.py index 3bc69a79..4dded737 100644 --- a/splitgraph/ingestion/singer/db_sync.py +++ b/splitgraph/ingestion/singer/db_sync.py @@ -190,8 +190,6 @@ def _merge_existing_table(self, old_table, temp_table): # Find PKs that have been upserted and deleted (make fake changeset) changeset = _make_changeset( self.image.object_engine, - s, - old_table.table_name, "pg_temp", temp_table, old_table.table_schema, @@ -221,7 +219,14 @@ def _merge_existing_table(self, old_table, temp_table): # the object to the table if it already exists) self.image.repository.objects.register_tables( self.image.repository, - [(self.image.image_hash, old_table.table_name, old_table.table_schema, object_ids,)], + [ + ( + self.image.image_hash, + old_table.table_name, + old_table.table_schema, + object_ids, + ) + ], ) def delete_rows(self, stream): diff --git a/test/splitgraph/commandline/test_cloud_metadata.py b/test/splitgraph/commandline/test_cloud_metadata.py index 21f9606f..c96ea283 100644 --- a/test/splitgraph/commandline/test_cloud_metadata.py +++ b/test/splitgraph/commandline/test_cloud_metadata.py @@ -233,13 +233,20 @@ def test_commandline_dump(): ) assert result.exit_code == 0 - assert list(os.walk(tmpdir)) == [ - (tmpdir, ["readmes"], ["repositories.yml"]), - ( - os.path.join(tmpdir, "readmes"), - [], - ["otheruser-somerepo_2.fe37.md", "someuser-somerepo_1.b7f3.md"], - ), + contents = list(os.walk(tmpdir)) + # Check the dump root + assert contents[0] == (tmpdir, ["readmes"], ["repositories.yml"]) + + # Check the readmes subdirectory: no directories + assert contents[1][:2] == ( + os.path.join(tmpdir, "readmes"), + [], + ) + + # ... and two files + assert sorted(contents[1][2]) == [ + "otheruser-somerepo_2.fe37.md", + "someuser-somerepo_1.b7f3.md", ] _somerepo_1_dump = { diff --git a/test/splitgraph/commands/test_layered_querying.py b/test/splitgraph/commands/test_layered_querying.py index 45ed6c78..d8cf3f43 100644 --- a/test/splitgraph/commands/test_layered_querying.py +++ b/test/splitgraph/commands/test_layered_querying.py @@ -158,9 +158,9 @@ def test_direct_table_lq_query_plan_cache(self, lq_test_repo): assert fo.call_count == 1 query_plan = table.get_query_plan(quals=quals, columns=["name", "timestamp"]) - assert query_plan.estimated_rows == 2 + assert query_plan.estimated_rows == 1 assert len(query_plan.required_objects) == 4 - assert len(query_plan.filtered_objects) == 2 + assert len(query_plan.filtered_objects) == 3 def test_layered_querying_against_single_fragment(pg_repo_local): @@ -288,7 +288,7 @@ def _prepare_fully_remote_repo(local_engine_empty, pg_repo_remote_registry): "test_case", [ # Each test case is a: query, expected result, mask of which objects were downloaded - # Test single PK qual + # Test single PK qual -- hits the last object with that PK ( "SELECT * FROM fruits WHERE fruit_id = 4", [(4, "kumquat", 1, _DT)], @@ -300,25 +300,30 @@ def _prepare_fully_remote_repo(local_engine_empty, pg_repo_remote_registry): [(3, "mayonnaise", 1, _DT), (4, "kumquat", 1, _DT)], (False, True, False, False, True), ), - # Test the upsert fetches the original fragment as well as one that overwrites it + # Test the upsert fetches the original fragment as well as one that overwrites it. + # Because we load the original fragment, we also have to check the one that DELETEs + # a row from it (since its PK overlaps with it) ( "SELECT * FROM fruits WHERE fruit_id = 2", [(2, "guitar", 1, _DT)], - (True, False, False, True, False), + (True, False, True, True, False), ), # Test NULLs don't break anything (even though we still look at all objects) ("SELECT * FROM fruits WHERE name IS NULL", [], (True, True, True, True, True)), # Same but also add a filter on the string column to exclude 'guitar'. # Make sure the chunk that updates 'orange' into 'guitar' is still fetched # since it overwrites the old value (even though the updated value doesn't match the qual any more) + # Also, the DELETE pk=1 fragment is also loaded (since it overlaps with the first fragment + # that we're also loading) ( "SELECT * FROM fruits WHERE fruit_id = 2 AND name > 'guitar'", [], - (True, False, False, True, False), + (True, False, True, True, False), ), # Similar here: the chunk that deletes 'apple' is supposed to have 'apple' included in its index # and fetched as well. - ("SELECT * FROM fruits WHERE name = 'apple'", [], (True, False, True, False, False)), + # In this case, we also load the UPDATE pk=2 fragment (overlaps with the first). + ("SELECT * FROM fruits WHERE name = 'apple'", [], (True, False, True, True, False)), ], ) @pytest.mark.registry @@ -580,81 +585,50 @@ def test_disjoint_table_lq_two_singletons_one_overwritten(pg_repo_local): ] # This time we had to apply the fragments in the final group (since there were two of them) - apply_fragments.assert_called_once_with( - [ - ( - "splitgraph_meta", - "oaa6d009e485bfa91aec4ab6b0ed1ebcd67055f6a3420d29f26446b034f41cc", - ), - ( - "splitgraph_meta", - "o15a420721b04e9749761b5368628cb15593cb8cfdcc547107b98eddda5031d", - ), - ], - SPLITGRAPH_META_SCHEMA, - mock.ANY, - extra_qual_args=("3",), - extra_quals=mock.ANY, - schema_spec=mock.ANY, - ) - - # Two calls to _generate_select_queries -- one to directly query the pk=3 chunk... - assert _gsc.call_args_list == [ - mock.call( - pg_repo_local.engine, - b'"splitgraph_meta".' - b'"of0fb43e477311f82aa30055be303ff00599dfe155d737def0d00f06e07228b"', - ["fruit_id", "name"], - mock.ANY, - ("3",), - ), - # ...and one to query the applied fragments in the second group. - mock.call(pg_repo_local.engine, mock.ANY, ["fruit_id", "name"], mock.ANY, ("3",),), - ] - - # Check the temporary table has been deleted since we've exhausted the query - args, _ = apply_fragments.call_args_list[0] - tmp_table = args[2] - assert not pg_repo_local.engine.table_exists(SPLITGRAPH_META_SCHEMA, tmp_table) - - # Now query PKs 3 and 4. Even though the chunk containing PKs 4 and 5 was updated - # (by changing PK 5), the qual filter should drop the update, as it's not pertinent - # to the query. Hence, we should end up not needing fragment application. - with mock.patch.object( - PostgresEngine, "apply_fragments", wraps=pg_repo_local.engine.apply_fragments - ) as apply_fragments: - with mock.patch( - "splitgraph.core.table._generate_select_query", side_effect=_generate_select_query - ) as _gsc: - assert list( - fruits.query( - columns=["fruit_id", "name"], - quals=[[("fruit_id", "=", "3"), ("fruit_id", "=", "4")]], - ) - ) == [{"fruit_id": 3, "name": "mayonnaise"}, {"fruit_id": 4, "name": "fruit_4"}] + _assert_fragments_applied(_gsc, apply_fragments, pg_repo_local) - # No fragment application - assert apply_fragments.call_count == 0 - # Single call to _generate_select_queries directly selecting rows from the two chunks - assert _gsc.mock_calls == [ - call( - mock.ANY, - b'"splitgraph_meta".' - b'"of0fb43e477311f82aa30055be303ff00599dfe155d737def0d00f06e07228b"', - ["fruit_id", "name"], - mock.ANY, - ("3", "4"), - ), - call( - mock.ANY, - b'"splitgraph_meta".' - b'"oaa6d009e485bfa91aec4ab6b0ed1ebcd67055f6a3420d29f26446b034f41cc"', - ["fruit_id", "name"], - mock.ANY, - ("3", "4"), - ), - ] +def _assert_fragments_applied(_gsc, apply_fragments, pg_repo_local): + apply_fragments.assert_called_once_with( + [ + ( + "splitgraph_meta", + "oaa6d009e485bfa91aec4ab6b0ed1ebcd67055f6a3420d29f26446b034f41cc", + ), + ( + "splitgraph_meta", + "o15a420721b04e9749761b5368628cb15593cb8cfdcc547107b98eddda5031d", + ), + ], + SPLITGRAPH_META_SCHEMA, + mock.ANY, + extra_qual_args=("3",), + extra_quals=mock.ANY, + schema_spec=mock.ANY, + ) + # Two calls to _generate_select_queries -- one to directly query the pk=3 chunk... + assert _gsc.call_args_list == [ + mock.call( + pg_repo_local.engine, + b'"splitgraph_meta".' + b'"of0fb43e477311f82aa30055be303ff00599dfe155d737def0d00f06e07228b"', + ["fruit_id", "name"], + mock.ANY, + ("3",), + ), + # ...and one to query the applied fragments in the second group. + mock.call( + pg_repo_local.engine, + mock.ANY, + ["fruit_id", "name"], + mock.ANY, + ("3",), + ), + ] + # Check the temporary table has been deleted since we've exhausted the query + args, _ = apply_fragments.call_args_list[0] + tmp_table = args[2] + assert not pg_repo_local.engine.table_exists(SPLITGRAPH_META_SCHEMA, tmp_table) def test_disjoint_table_lq_two_singletons_one_overwritten_indirect(pg_repo_local): diff --git a/test/splitgraph/ingestion/test_singer.py b/test/splitgraph/ingestion/test_singer.py index 7af7bb8a..046f3586 100644 --- a/test/splitgraph/ingestion/test_singer.py +++ b/test/splitgraph/ingestion/test_singer.py @@ -187,15 +187,18 @@ def test_singer_ingestion_update(local_engine_empty): # Extra DIFF at the end assert image.get_table("stargazers").objects == [ "od68e932ebc99c1a337363c1b92056dcf7fc7c6c45494bc42e1e1ec4e0c88ac", - "oc61804b31dcae8294a6b780efe41601eaeb7a1d0b7cd7bdfea4843db214df0", + "o7a04cc1f13d00eea692bede58b57b07c4272e07458ac8b405971b1f5f49679", ] assert repo.run_sql( "SELECT sg_ud_flag, user_id, starred_at " - "FROM splitgraph_meta.oc61804b31dcae8294a6b780efe41601eaeb7a1d0b7cd7bdfea4843db214df0 " + "FROM splitgraph_meta.o7a04cc1f13d00eea692bede58b57b07c4272e07458ac8b405971b1f5f49679 " "ORDER BY user_id" ) == [ (True, Decimal("100004"), datetime(2020, 10, 11, 21, 9, 30)), + # Even though this row is the same as in the previous fragment, we keep it here + # as we don't compare its value with the previous fragment. + (True, Decimal("100005"), datetime(2019, 4, 18, 2, 40, 47)), (True, Decimal("100006"), datetime(2019, 6, 6, 20, 53)), ] @@ -586,5 +589,8 @@ def test_singer_tap_mysql_sync(local_engine_empty): assert len(repo.images()) == 1 image = repo.images["latest"] assert image.get_table("mushrooms").objects == [ - "o69e4529709af65f37f2e2f3a8290340ae7ad9ada6bca9c393a09572f12cbb3" + "o69e4529709af65f37f2e2f3a8290340ae7ad9ada6bca9c393a09572f12cbb3", + # TODO: this object has the pk=2 row from the previous one repeated, a tap-mysql bug + # but we don't conflate these with Singer now. + "od487f26d32a347ae4cc81a7442ef5a28615f70a9fff426991ab0d9d14bf7aa", ] diff --git a/test/splitgraph/ingestion/test_socrata.py b/test/splitgraph/ingestion/test_socrata.py index e4920995..d6403cb8 100644 --- a/test/splitgraph/ingestion/test_socrata.py +++ b/test/splitgraph/ingestion/test_socrata.py @@ -358,8 +358,7 @@ def test_socrata_column_deduplication(): @pytest.mark.parametrize( "domain,dataset_id", [ - # Had issues with 403s if the :id column was requested explicitly - ("data.healthcare.gov", "7h6f-vws8"), + ("data.cityofchicago.org", "x2n5-8w5q"), # Popular for hire vehicles dataset ("data.cityofnewyork.us", "8wbx-tsch"), ],