From bcf7202c97a5d57d14429dfacc8fac764b75f7a6 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 14 May 2021 13:15:41 +0100 Subject: [PATCH 1/7] When figuring out which objects to scan through, also add objects whose PKs overlap the PKs of the objects we are going to scan through and that come after them. This is to simplify image creation so that, when making an object, we don't need to care about the old rows it overwrites (and hence don't need to calculate their hash or include it into the object or even find out what they are when we don't have access to the full current table). It will allow us to actually be able to treat objects independently, recalculate their hashes and the index, make it much easier to write to Splitgraph images without checkout etc. The main reason for this is to speed up data ingestion from Singer where we add up chunks of 100k rows and currently have to run a giant JOIN against the current table to figure out which rows we updated. The tradeoff is that sometimes we'll scan through more objects (in the rare case where we're fetching some rows that aren't overwritten by a latter object but the object they come from partially overlaps with an older object). anyway). --- splitgraph/core/fragment_manager.py | 75 ++++++++-- .../commands/test_layered_querying.py | 134 +++++++----------- 2 files changed, 116 insertions(+), 93 deletions(-) diff --git a/splitgraph/core/fragment_manager.py b/splitgraph/core/fragment_manager.py index e319ee9e..53308530 100644 --- a/splitgraph/core/fragment_manager.py +++ b/splitgraph/core/fragment_manager.py @@ -65,7 +65,22 @@ def _log_commit_progress(table_size, no_chunks): return table_size > 500000 or no_chunks > 100 -def get_chunk_groups(chunks: List[Tuple[str, Any, Any]],) -> List[List[Tuple[str, Any, Any]]]: +def _key(c): + # Hack to work around Nones sometimes being there in comparables. We make a fake + # key, prepending a boolean (is the item None?) to every item to get a consistent + # ordering that includes Nones. + if isinstance(c, tuple): + return tuple((ci is None, ci) for ci in c) + return c is None, c + + +def _pk_overlap(pk_1: Tuple, pk_2: Tuple) -> bool: + return bool(_key(pk_1[0]) <= _key(pk_2[1]) and _key(pk_1[1]) >= _key(pk_2[0])) + + +def get_chunk_groups( + chunks: List[Tuple[str, Any, Any]], +) -> List[List[Tuple[str, Any, Any]]]: """ Takes a list of chunks and their boundaries and combines them into independent groups such that chunks from no two groups @@ -101,14 +116,6 @@ def get_chunk_groups(chunks: List[Tuple[str, Any, Any]],) -> List[List[Tuple[str current_group_start = None current_group_end = None - def _key(c): - # Hack to work around Nones sometimes being there in comparables. We make a fake - # key, prepending a boolean (is the item None?) to every item to get a consistent - # ordering that includes Nones. - if isinstance(c, tuple): - return tuple((ci is None, ci) for ci in c) - return c is None, c - for original_id, chunk_id, start, end in sorted( chunks, key=lambda c: _key(c[2]) # type:ignore ): @@ -121,7 +128,7 @@ def _key(c): assert current_group_start assert current_group_end # See if the chunk overlaps with the current chunk group - if _key(start) <= _key(current_group_end) and _key(end) >= _key(current_group_start): + if _pk_overlap((start, end), (current_group_start, current_group_end)): current_group.append((original_id, chunk_id, start, end)) current_group_start = min(current_group_start, start, key=_key) current_group_end = max(current_group_end, end, key=_key) @@ -508,12 +515,18 @@ def _store_changeset( upserted = [pk for pk, data in sub_changeset.items() if data[0]] deleted = [pk for pk, data in sub_changeset.items() if not data[0]] self.object_engine.store_fragment( - upserted, deleted, "pg_temp", tmp_object_id, schema, table, table_schema, + upserted, + deleted, + "pg_temp", + tmp_object_id, + schema, + table, + table_schema, ) return tmp_object_id def calculate_fragment_insertion_hash_stats( - self, schema: str, table: str, table_schema: TableSchema = None + self, schema: str, table: str, table_schema: Optional[TableSchema] = None ) -> Tuple[Digest, int]: """ Calculate the homomorphic hash of just the rows that a given fragment inserts @@ -1060,8 +1073,44 @@ def filter_fragments(self, object_ids: List[str], table: "Table", quals: Any) -> len(range_filter_result), ) + objects_to_scan = self._add_overlapping_objects(table, object_ids, bloom_filter_result) + + if len(objects_to_scan) > len(bloom_filter_result): + logging.info( + "Will need to scan through extra %d overlapping fragment(s)", + len(objects_to_scan) - len(bloom_filter_result), + ) + # Preserve original object order. - return [r for r in object_ids if r in bloom_filter_result] + return [r for r in object_ids if r in objects_to_scan] + + def _add_overlapping_objects( + self, table: "Table", all_objects: List[str], filtered_objects: List[str] + ) -> Set[str]: + # Expand the list of objects by also adding the objects that might overwrite these. In some + # cases, we don't keep track of the rows that an object deletes in the index, since that + # adds an implicit dependency on those previous objects. + + table_pk = get_change_key(table.table_schema) + object_pks = self.get_min_max_pks(all_objects, table_pk) + + # Go through all objects and see if they 1) come after any of our chosen objects and 2) + # overlap those objects' PKs (if they come after them) + original_order = {object_id: i for i, object_id in enumerate(all_objects)} + object_pk_dict = { + object_id: object_pk for object_id, object_pk in zip(all_objects, object_pks) + } + objects_to_scan = set(filtered_objects) + for overlap_candidate in all_objects: + if overlap_candidate in objects_to_scan: + continue + for our_object in filtered_objects: + if original_order[our_object] < original_order[overlap_candidate] and _pk_overlap( + object_pk_dict[our_object], object_pk_dict[overlap_candidate] + ): + objects_to_scan.add(overlap_candidate) + break + return objects_to_scan def delete_objects(self, objects: Union[Set[str], List[str]]) -> None: """ diff --git a/test/splitgraph/commands/test_layered_querying.py b/test/splitgraph/commands/test_layered_querying.py index 45ed6c78..d8cf3f43 100644 --- a/test/splitgraph/commands/test_layered_querying.py +++ b/test/splitgraph/commands/test_layered_querying.py @@ -158,9 +158,9 @@ def test_direct_table_lq_query_plan_cache(self, lq_test_repo): assert fo.call_count == 1 query_plan = table.get_query_plan(quals=quals, columns=["name", "timestamp"]) - assert query_plan.estimated_rows == 2 + assert query_plan.estimated_rows == 1 assert len(query_plan.required_objects) == 4 - assert len(query_plan.filtered_objects) == 2 + assert len(query_plan.filtered_objects) == 3 def test_layered_querying_against_single_fragment(pg_repo_local): @@ -288,7 +288,7 @@ def _prepare_fully_remote_repo(local_engine_empty, pg_repo_remote_registry): "test_case", [ # Each test case is a: query, expected result, mask of which objects were downloaded - # Test single PK qual + # Test single PK qual -- hits the last object with that PK ( "SELECT * FROM fruits WHERE fruit_id = 4", [(4, "kumquat", 1, _DT)], @@ -300,25 +300,30 @@ def _prepare_fully_remote_repo(local_engine_empty, pg_repo_remote_registry): [(3, "mayonnaise", 1, _DT), (4, "kumquat", 1, _DT)], (False, True, False, False, True), ), - # Test the upsert fetches the original fragment as well as one that overwrites it + # Test the upsert fetches the original fragment as well as one that overwrites it. + # Because we load the original fragment, we also have to check the one that DELETEs + # a row from it (since its PK overlaps with it) ( "SELECT * FROM fruits WHERE fruit_id = 2", [(2, "guitar", 1, _DT)], - (True, False, False, True, False), + (True, False, True, True, False), ), # Test NULLs don't break anything (even though we still look at all objects) ("SELECT * FROM fruits WHERE name IS NULL", [], (True, True, True, True, True)), # Same but also add a filter on the string column to exclude 'guitar'. # Make sure the chunk that updates 'orange' into 'guitar' is still fetched # since it overwrites the old value (even though the updated value doesn't match the qual any more) + # Also, the DELETE pk=1 fragment is also loaded (since it overlaps with the first fragment + # that we're also loading) ( "SELECT * FROM fruits WHERE fruit_id = 2 AND name > 'guitar'", [], - (True, False, False, True, False), + (True, False, True, True, False), ), # Similar here: the chunk that deletes 'apple' is supposed to have 'apple' included in its index # and fetched as well. - ("SELECT * FROM fruits WHERE name = 'apple'", [], (True, False, True, False, False)), + # In this case, we also load the UPDATE pk=2 fragment (overlaps with the first). + ("SELECT * FROM fruits WHERE name = 'apple'", [], (True, False, True, True, False)), ], ) @pytest.mark.registry @@ -580,81 +585,50 @@ def test_disjoint_table_lq_two_singletons_one_overwritten(pg_repo_local): ] # This time we had to apply the fragments in the final group (since there were two of them) - apply_fragments.assert_called_once_with( - [ - ( - "splitgraph_meta", - "oaa6d009e485bfa91aec4ab6b0ed1ebcd67055f6a3420d29f26446b034f41cc", - ), - ( - "splitgraph_meta", - "o15a420721b04e9749761b5368628cb15593cb8cfdcc547107b98eddda5031d", - ), - ], - SPLITGRAPH_META_SCHEMA, - mock.ANY, - extra_qual_args=("3",), - extra_quals=mock.ANY, - schema_spec=mock.ANY, - ) - - # Two calls to _generate_select_queries -- one to directly query the pk=3 chunk... - assert _gsc.call_args_list == [ - mock.call( - pg_repo_local.engine, - b'"splitgraph_meta".' - b'"of0fb43e477311f82aa30055be303ff00599dfe155d737def0d00f06e07228b"', - ["fruit_id", "name"], - mock.ANY, - ("3",), - ), - # ...and one to query the applied fragments in the second group. - mock.call(pg_repo_local.engine, mock.ANY, ["fruit_id", "name"], mock.ANY, ("3",),), - ] - - # Check the temporary table has been deleted since we've exhausted the query - args, _ = apply_fragments.call_args_list[0] - tmp_table = args[2] - assert not pg_repo_local.engine.table_exists(SPLITGRAPH_META_SCHEMA, tmp_table) - - # Now query PKs 3 and 4. Even though the chunk containing PKs 4 and 5 was updated - # (by changing PK 5), the qual filter should drop the update, as it's not pertinent - # to the query. Hence, we should end up not needing fragment application. - with mock.patch.object( - PostgresEngine, "apply_fragments", wraps=pg_repo_local.engine.apply_fragments - ) as apply_fragments: - with mock.patch( - "splitgraph.core.table._generate_select_query", side_effect=_generate_select_query - ) as _gsc: - assert list( - fruits.query( - columns=["fruit_id", "name"], - quals=[[("fruit_id", "=", "3"), ("fruit_id", "=", "4")]], - ) - ) == [{"fruit_id": 3, "name": "mayonnaise"}, {"fruit_id": 4, "name": "fruit_4"}] + _assert_fragments_applied(_gsc, apply_fragments, pg_repo_local) - # No fragment application - assert apply_fragments.call_count == 0 - # Single call to _generate_select_queries directly selecting rows from the two chunks - assert _gsc.mock_calls == [ - call( - mock.ANY, - b'"splitgraph_meta".' - b'"of0fb43e477311f82aa30055be303ff00599dfe155d737def0d00f06e07228b"', - ["fruit_id", "name"], - mock.ANY, - ("3", "4"), - ), - call( - mock.ANY, - b'"splitgraph_meta".' - b'"oaa6d009e485bfa91aec4ab6b0ed1ebcd67055f6a3420d29f26446b034f41cc"', - ["fruit_id", "name"], - mock.ANY, - ("3", "4"), - ), - ] +def _assert_fragments_applied(_gsc, apply_fragments, pg_repo_local): + apply_fragments.assert_called_once_with( + [ + ( + "splitgraph_meta", + "oaa6d009e485bfa91aec4ab6b0ed1ebcd67055f6a3420d29f26446b034f41cc", + ), + ( + "splitgraph_meta", + "o15a420721b04e9749761b5368628cb15593cb8cfdcc547107b98eddda5031d", + ), + ], + SPLITGRAPH_META_SCHEMA, + mock.ANY, + extra_qual_args=("3",), + extra_quals=mock.ANY, + schema_spec=mock.ANY, + ) + # Two calls to _generate_select_queries -- one to directly query the pk=3 chunk... + assert _gsc.call_args_list == [ + mock.call( + pg_repo_local.engine, + b'"splitgraph_meta".' + b'"of0fb43e477311f82aa30055be303ff00599dfe155d737def0d00f06e07228b"', + ["fruit_id", "name"], + mock.ANY, + ("3",), + ), + # ...and one to query the applied fragments in the second group. + mock.call( + pg_repo_local.engine, + mock.ANY, + ["fruit_id", "name"], + mock.ANY, + ("3",), + ), + ] + # Check the temporary table has been deleted since we've exhausted the query + args, _ = apply_fragments.call_args_list[0] + tmp_table = args[2] + assert not pg_repo_local.engine.table_exists(SPLITGRAPH_META_SCHEMA, tmp_table) def test_disjoint_table_lq_two_singletons_one_overwritten_indirect(pg_repo_local): From 8523379de058c31708d943002a8bbb14a1d84301 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 14 May 2021 12:48:21 +0100 Subject: [PATCH 2/7] Simplify and speed up the Singer fragment loading by not scanning the full current table when appending a new fragment to generate a changeset (to find out the old values of the rows we deleted). --- splitgraph/ingestion/singer/_utils.py | 58 +++++++++++++----------- splitgraph/ingestion/singer/db_sync.py | 11 +++-- test/splitgraph/ingestion/test_singer.py | 7 ++- 3 files changed, 44 insertions(+), 32 deletions(-) diff --git a/splitgraph/ingestion/singer/_utils.py b/splitgraph/ingestion/singer/_utils.py index 065b5e25..6f3f5c0c 100644 --- a/splitgraph/ingestion/singer/_utils.py +++ b/splitgraph/ingestion/singer/_utils.py @@ -45,14 +45,18 @@ def _migrate_schema(engine, table_schema, table_name, table_schema_spec, new_sch if c not in new_cols: engine.run_sql( SQL("ALTER TABLE {}.{} DROP COLUMN {}").format( - Identifier(table_schema), Identifier(table_name), Identifier(c), + Identifier(table_schema), + Identifier(table_name), + Identifier(c), ) ) for c in new_cols: if c not in old_cols: engine.run_sql( SQL("ALTER TABLE {}.{} ADD COLUMN {} %s" % validate_type(new_cols[c])).format( - Identifier(table_schema), Identifier(table_name), Identifier(c), + Identifier(table_schema), + Identifier(table_name), + Identifier(c), ) ) elif new_cols[c] != old_cols[c]: @@ -60,15 +64,15 @@ def _migrate_schema(engine, table_schema, table_name, table_schema_spec, new_sch SQL( "ALTER TABLE {}.{} ALTER COLUMN {} TYPE %s" % validate_type(new_cols[c]) ).format( - Identifier(table_schema), Identifier(table_name), Identifier(c), + Identifier(table_schema), + Identifier(table_name), + Identifier(c), ) ) def _make_changeset( engine: PostgresEngine, - old_schema: str, - old_table: str, schema: str, table: str, schema_spec: TableSchema, @@ -78,34 +82,34 @@ def _make_changeset( to the object manager (store as a Splitgraph diff).""" # PK -> (upserted / deleted, old row, new row) - # As a memory-saving hack, we only record the values of the old row (read from the - # current table) -- this is because object creation routines read the inserted rows - # from the staging table anyway. + # We don't find out the old row here. This is because it requires a JOIN on the current + # Splitgraph table, so if we're adding e.g. 100k rows to a 1M row table, it's going to cause big + # performance issues. Instead, we pretend that all rows + # have been inserted (apart from the ones that have been deleted by having the magic + # _sdc_deleted_at column). + + # We also don't care about finding out the new row here, as the storage routine queries + # the table directly to get those values. + + # The tradeoff is that now, when querying the table, we need to include not only fragments + # whose index matches the query, but also all fragments that might overwrite those fragments + # (through PK range overlap). Since we don't record old row values in this changeset's index, + # we can no longer find if a fragment deletes some row by inspecting the index -- we need to + # use PK ranges to find out overlapping fragments. + change_key = [c for c, _ in get_change_key(schema_spec)] # Query: - # SELECT (new, pk, columns) AS pk, + # SELECT (col_1, col_2, ...) AS pk, # (custom upsert condition), - # (row_to_json(old non-pk cols)) AS old_row - # FROM new_table n LEFT OUTER JOIN old_table o ON [o.pk = n.pk] - # WHERE old row != new row + # {} AS old_row + # FROM new_table n query = ( SQL("SELECT ") + SQL(",").join(SQL("n.") + Identifier(c) for c in change_key) + SQL(",") - + SQL(upsert_condition + " AS upserted, ") - # If PK doesn't exist in the new table, old_row is null, else output it - + SQL("CASE WHEN ") - + SQL(" AND ").join(SQL("o.{0} IS NULL").format(Identifier(c)) for c in change_key) - + SQL(" THEN '{}'::json ELSE json_build_object(") - + SQL(",").join( - SQL("%s, o.") + Identifier(c.name) for c in schema_spec if c.name not in change_key - ) - + SQL(") END AS old_row FROM {}.{} n LEFT OUTER JOIN {}.{} o ON ").format( - Identifier(schema), Identifier(table), Identifier(old_schema), Identifier(old_table), + + SQL(upsert_condition + " AS upserted FROM {}.{} n").format( + Identifier(schema), Identifier(table) ) - + SQL(" AND ").join(SQL("o.{0} = n.{0}").format(Identifier(c)) for c in change_key) - + SQL("WHERE o.* IS DISTINCT FROM n.*") ).as_string(engine.connection) - args = [c.name for c in schema_spec if c.name not in change_key] - result = engine.run_sql(query, args) - return {tuple(row[:-2]): (row[-2], row[-1], {}) for row in result} + result = engine.run_sql(query) + return {tuple(row[:-1]): (row[-1], {}, {}) for row in result} diff --git a/splitgraph/ingestion/singer/db_sync.py b/splitgraph/ingestion/singer/db_sync.py index 3bc69a79..4dded737 100644 --- a/splitgraph/ingestion/singer/db_sync.py +++ b/splitgraph/ingestion/singer/db_sync.py @@ -190,8 +190,6 @@ def _merge_existing_table(self, old_table, temp_table): # Find PKs that have been upserted and deleted (make fake changeset) changeset = _make_changeset( self.image.object_engine, - s, - old_table.table_name, "pg_temp", temp_table, old_table.table_schema, @@ -221,7 +219,14 @@ def _merge_existing_table(self, old_table, temp_table): # the object to the table if it already exists) self.image.repository.objects.register_tables( self.image.repository, - [(self.image.image_hash, old_table.table_name, old_table.table_schema, object_ids,)], + [ + ( + self.image.image_hash, + old_table.table_name, + old_table.table_schema, + object_ids, + ) + ], ) def delete_rows(self, stream): diff --git a/test/splitgraph/ingestion/test_singer.py b/test/splitgraph/ingestion/test_singer.py index 7af7bb8a..76772f53 100644 --- a/test/splitgraph/ingestion/test_singer.py +++ b/test/splitgraph/ingestion/test_singer.py @@ -187,15 +187,18 @@ def test_singer_ingestion_update(local_engine_empty): # Extra DIFF at the end assert image.get_table("stargazers").objects == [ "od68e932ebc99c1a337363c1b92056dcf7fc7c6c45494bc42e1e1ec4e0c88ac", - "oc61804b31dcae8294a6b780efe41601eaeb7a1d0b7cd7bdfea4843db214df0", + "o7a04cc1f13d00eea692bede58b57b07c4272e07458ac8b405971b1f5f49679", ] assert repo.run_sql( "SELECT sg_ud_flag, user_id, starred_at " - "FROM splitgraph_meta.oc61804b31dcae8294a6b780efe41601eaeb7a1d0b7cd7bdfea4843db214df0 " + "FROM splitgraph_meta.o7a04cc1f13d00eea692bede58b57b07c4272e07458ac8b405971b1f5f49679 " "ORDER BY user_id" ) == [ (True, Decimal("100004"), datetime(2020, 10, 11, 21, 9, 30)), + # Even though this row is the same as in the previous fragment, we keep it here + # as we don't compare its value with the previous fragment. + (True, Decimal("100005"), datetime(2019, 4, 18, 2, 40, 47)), (True, Decimal("100006"), datetime(2019, 6, 6, 20, 53)), ] From 9a135283a6b8d2b602a88e567fd0273c1e4904ff Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 14 May 2021 13:00:31 +0100 Subject: [PATCH 3/7] Change a Socrata smoke test dataset (data.healthcare.gov is currently down) --- test/splitgraph/ingestion/test_socrata.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/splitgraph/ingestion/test_socrata.py b/test/splitgraph/ingestion/test_socrata.py index e4920995..d6403cb8 100644 --- a/test/splitgraph/ingestion/test_socrata.py +++ b/test/splitgraph/ingestion/test_socrata.py @@ -358,8 +358,7 @@ def test_socrata_column_deduplication(): @pytest.mark.parametrize( "domain,dataset_id", [ - # Had issues with 403s if the :id column was requested explicitly - ("data.healthcare.gov", "7h6f-vws8"), + ("data.cityofchicago.org", "x2n5-8w5q"), # Popular for hire vehicles dataset ("data.cityofnewyork.us", "8wbx-tsch"), ], From 31a9eba5428f713e77276292373e64693bc91315 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 14 May 2021 13:40:03 +0100 Subject: [PATCH 4/7] Make the `sgr cloud dump` test deterministic. --- .../commandline/test_cloud_metadata.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/test/splitgraph/commandline/test_cloud_metadata.py b/test/splitgraph/commandline/test_cloud_metadata.py index 21f9606f..c96ea283 100644 --- a/test/splitgraph/commandline/test_cloud_metadata.py +++ b/test/splitgraph/commandline/test_cloud_metadata.py @@ -233,13 +233,20 @@ def test_commandline_dump(): ) assert result.exit_code == 0 - assert list(os.walk(tmpdir)) == [ - (tmpdir, ["readmes"], ["repositories.yml"]), - ( - os.path.join(tmpdir, "readmes"), - [], - ["otheruser-somerepo_2.fe37.md", "someuser-somerepo_1.b7f3.md"], - ), + contents = list(os.walk(tmpdir)) + # Check the dump root + assert contents[0] == (tmpdir, ["readmes"], ["repositories.yml"]) + + # Check the readmes subdirectory: no directories + assert contents[1][:2] == ( + os.path.join(tmpdir, "readmes"), + [], + ) + + # ... and two files + assert sorted(contents[1][2]) == [ + "otheruser-somerepo_2.fe37.md", + "someuser-somerepo_1.b7f3.md", ] _somerepo_1_dump = { From 5d6b2095b86ec43234ca8466f21458519a445203 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 14 May 2021 20:10:10 +0100 Subject: [PATCH 5/7] Singer taps output a sequence of intermediary bookmark states -- only write out the latest one when ingestion ends. --- splitgraph/ingestion/singer/data_source.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/splitgraph/ingestion/singer/data_source.py b/splitgraph/ingestion/singer/data_source.py index 8e86f4c6..7be62865 100644 --- a/splitgraph/ingestion/singer/data_source.py +++ b/splitgraph/ingestion/singer/data_source.py @@ -150,8 +150,9 @@ def sync( logging.warning("Data source partially failed. Keeping the image anyway", exc_info=e) failure = e - new_state = output_stream.getvalue() - logging.info("New state: %s", new_state) + states = output_stream.getvalue() + latest_state = states.splitlines()[-1] + logging.info("State stream: %s", states) # Add a table to the new image with the new state repository.object_engine.create_table( @@ -161,11 +162,12 @@ def sync( temporary=True, ) # NB: new_state here is a JSON-serialized string, so we don't wrap it into psycopg2.Json() + logging.info("Writing state: %s", latest_state) repository.object_engine.run_sql( SQL("INSERT INTO pg_temp.{} (timestamp, state) VALUES(now(), %s)").format( Identifier(INGESTION_STATE_TABLE) ), - (new_state,), + (latest_state,), ) object_id = repository.objects.create_base_fragment( From 96bd43ad2fbb8aec8758d5a2203c37b46be5557b Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Fri, 14 May 2021 20:11:22 +0100 Subject: [PATCH 6/7] Kind of fix the Singer tap-mysql test (not clear who should be doing the right thing here) --- test/splitgraph/ingestion/test_singer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/splitgraph/ingestion/test_singer.py b/test/splitgraph/ingestion/test_singer.py index 76772f53..046f3586 100644 --- a/test/splitgraph/ingestion/test_singer.py +++ b/test/splitgraph/ingestion/test_singer.py @@ -589,5 +589,8 @@ def test_singer_tap_mysql_sync(local_engine_empty): assert len(repo.images()) == 1 image = repo.images["latest"] assert image.get_table("mushrooms").objects == [ - "o69e4529709af65f37f2e2f3a8290340ae7ad9ada6bca9c393a09572f12cbb3" + "o69e4529709af65f37f2e2f3a8290340ae7ad9ada6bca9c393a09572f12cbb3", + # TODO: this object has the pk=2 row from the previous one repeated, a tap-mysql bug + # but we don't conflate these with Singer now. + "od487f26d32a347ae4cc81a7442ef5a28615f70a9fff426991ab0d9d14bf7aa", ] From 230d22b054c68a6e0ba6d616a2a4a81505c8a02f Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Tue, 18 May 2021 10:26:39 +0100 Subject: [PATCH 7/7] Bump dependencies. --- poetry.lock | 90 ++++++++++++++++++++++++++--------------------------- 1 file changed, 45 insertions(+), 45 deletions(-) diff --git a/poetry.lock b/poetry.lock index d3cbfe1e..5bfae682 100644 --- a/poetry.lock +++ b/poetry.lock @@ -128,7 +128,7 @@ pycparser = "*" [[package]] name = "cfgv" -version = "3.2.0" +version = "3.3.0" description = "Validate configuration and produce human readable error messages." category = "dev" optional = false @@ -177,6 +177,9 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" +[package.dependencies] +toml = {version = "*", optional = true, markers = "extra == \"toml\""} + [package.extras] toml = ["toml"] @@ -293,7 +296,7 @@ docs = ["sphinx"] [[package]] name = "httpretty" -version = "1.0.5" +version = "1.1.1" description = "HTTP client mock for Python" category = "dev" optional = false @@ -344,7 +347,7 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes [[package]] name = "importlib-resources" -version = "5.1.2" +version = "5.1.3" description = "Read resources from Python packages" category = "dev" optional = false @@ -355,7 +358,7 @@ zipp = {version = ">=0.4", markers = "python_version < \"3.8\""} [package.extras] docs = ["sphinx", "jaraco.packaging (>=8.2)", "rst.linker (>=1.9)"] -testing = ["pytest (>=4.6)", "pytest-checkdocs (>=1.2.3)", "pytest-flake8", "pytest-cov", "pytest-enabler", "pytest-black (>=0.3.7)", "pytest-mypy"] +testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-black (>=0.3.7)", "pytest-mypy"] [[package]] name = "inflection" @@ -561,7 +564,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" [[package]] name = "pefile" -version = "2019.4.18" +version = "2021.5.13" description = "Python PE parsing module" category = "dev" optional = false @@ -639,7 +642,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "pydantic" -version = "1.8.1" +version = "1.8.2" description = "Data validation and settings management using python 3.6 type hinting" category = "main" optional = false @@ -752,14 +755,14 @@ testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xm [[package]] name = "pytest-cov" -version = "2.11.1" +version = "2.12.0" description = "Pytest plugin for measuring coverage." category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" [package.dependencies] -coverage = ">=5.2.1" +coverage = {version = ">=5.2.1", extras = ["toml"]} pytest = ">=4.6" [package.extras] @@ -1121,14 +1124,11 @@ testing = ["coverage (>=4)", "coverage-enable-subprocess (>=1)", "flaky (>=3)", [[package]] name = "websocket-client" -version = "0.59.0" +version = "1.0.0" description = "WebSocket client for Python with low level API options" category = "main" optional = false -python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" - -[package.dependencies] -six = "*" +python-versions = ">=3.6" [[package]] name = "wrapt" @@ -1241,8 +1241,8 @@ cffi = [ {file = "cffi-1.14.5.tar.gz", hash = "sha256:fd78e5fee591709f32ef6edb9a015b4aa1a5022598e36227500c8f4e02328d9c"}, ] cfgv = [ - {file = "cfgv-3.2.0-py2.py3-none-any.whl", hash = "sha256:32e43d604bbe7896fe7c248a9c2276447dbef840feb28fe20494f62af110211d"}, - {file = "cfgv-3.2.0.tar.gz", hash = "sha256:cf22deb93d4bcf92f345a5c3cd39d3d41d6340adc60c78bbbd6588c384fda6a1"}, + {file = "cfgv-3.3.0-py2.py3-none-any.whl", hash = "sha256:b449c9c6118fe8cca7fa5e00b9ec60ba08145d281d52164230a69211c5d597a1"}, + {file = "cfgv-3.3.0.tar.gz", hash = "sha256:9e600479b3b99e8af981ecdfc80a0296104ee610cab48a5ae4ffd0b668650eb1"}, ] chardet = [ {file = "chardet-4.0.0-py2.py3-none-any.whl", hash = "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"}, @@ -1410,7 +1410,7 @@ greenlet = [ {file = "greenlet-1.1.0.tar.gz", hash = "sha256:c87df8ae3f01ffb4483c796fe1b15232ce2b219f0b18126948616224d3f658ee"}, ] httpretty = [ - {file = "httpretty-1.0.5.tar.gz", hash = "sha256:e53c927c4d3d781a0761727f1edfad64abef94e828718e12b672a678a8b3e0b5"}, + {file = "httpretty-1.1.1.tar.gz", hash = "sha256:05223ede0a720564eeed95fbea00000c9569a9863e3661ad513a0c03e6e6eba1"}, ] identify = [ {file = "identify-2.2.4-py2.py3-none-any.whl", hash = "sha256:ad9f3fa0c2316618dc4d840f627d474ab6de106392a4f00221820200f490f5a8"}, @@ -1429,8 +1429,8 @@ importlib-metadata = [ {file = "importlib_metadata-4.0.1.tar.gz", hash = "sha256:8c501196e49fb9df5df43833bdb1e4328f64847763ec8a50703148b73784d581"}, ] importlib-resources = [ - {file = "importlib_resources-5.1.2-py3-none-any.whl", hash = "sha256:ebab3efe74d83b04d6bf5cd9a17f0c5c93e60fb60f30c90f56265fce4682a469"}, - {file = "importlib_resources-5.1.2.tar.gz", hash = "sha256:642586fc4740bd1cad7690f836b3321309402b20b332529f25617ff18e8e1370"}, + {file = "importlib_resources-5.1.3-py3-none-any.whl", hash = "sha256:3b9c774e0e7e8d9c069eb2fe6aee7e9ae71759a381dec02eb45249fba7f38713"}, + {file = "importlib_resources-5.1.3.tar.gz", hash = "sha256:0786b216556e53b34156263ab654406e543a8b0d9b1381019e25a36a09263c36"}, ] inflection = [ {file = "inflection-0.5.1-py2.py3-none-any.whl", hash = "sha256:f38b2b640938a4f35ade69ac3d053042959b62a0f1076a5bbaa1b9526605a8a2"}, @@ -1652,7 +1652,7 @@ pathspec = [ {file = "pathspec-0.8.1.tar.gz", hash = "sha256:86379d6b86d75816baba717e64b1a3a3469deb93bb76d613c9ce79edc5cb68fd"}, ] pefile = [ - {file = "pefile-2019.4.18.tar.gz", hash = "sha256:a5d6e8305c6b210849b47a6174ddf9c452b2888340b8177874b862ba6c207645"}, + {file = "pefile-2021.5.13.tar.gz", hash = "sha256:2aae0c135d4d37e81ff120e825af18b5e4884a97b9290aee811afd6317618f52"}, ] pglast = [ {file = "pglast-1.17-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:c409df8bf3c2f099c057ed7accecb3dd2765b94ffe21da4c51527bc66928a65b"}, @@ -1727,28 +1727,28 @@ pycparser = [ {file = "pycparser-2.20.tar.gz", hash = "sha256:2d475327684562c3a96cc71adf7dc8c4f0565175cf86b6d7a404ff4c771f15f0"}, ] pydantic = [ - {file = "pydantic-1.8.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:0c40162796fc8d0aa744875b60e4dc36834db9f2a25dbf9ba9664b1915a23850"}, - {file = "pydantic-1.8.1-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:fff29fe54ec419338c522b908154a2efabeee4f483e48990f87e189661f31ce3"}, - {file = "pydantic-1.8.1-cp36-cp36m-manylinux2014_i686.whl", hash = "sha256:fbfb608febde1afd4743c6822c19060a8dbdd3eb30f98e36061ba4973308059e"}, - {file = "pydantic-1.8.1-cp36-cp36m-manylinux2014_x86_64.whl", hash = "sha256:eb8ccf12295113ce0de38f80b25f736d62f0a8d87c6b88aca645f168f9c78771"}, - {file = "pydantic-1.8.1-cp36-cp36m-win_amd64.whl", hash = "sha256:20d42f1be7c7acc352b3d09b0cf505a9fab9deb93125061b376fbe1f06a5459f"}, - {file = "pydantic-1.8.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:dde4ca368e82791de97c2ec019681ffb437728090c0ff0c3852708cf923e0c7d"}, - {file = "pydantic-1.8.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:3bbd023c981cbe26e6e21c8d2ce78485f85c2e77f7bab5ec15b7d2a1f491918f"}, - {file = "pydantic-1.8.1-cp37-cp37m-manylinux2014_i686.whl", hash = "sha256:830ef1a148012b640186bf4d9789a206c56071ff38f2460a32ae67ca21880eb8"}, - {file = "pydantic-1.8.1-cp37-cp37m-manylinux2014_x86_64.whl", hash = "sha256:fb77f7a7e111db1832ae3f8f44203691e15b1fa7e5a1cb9691d4e2659aee41c4"}, - {file = "pydantic-1.8.1-cp37-cp37m-win_amd64.whl", hash = "sha256:3bcb9d7e1f9849a6bdbd027aabb3a06414abd6068cb3b21c49427956cce5038a"}, - {file = "pydantic-1.8.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:2287ebff0018eec3cc69b1d09d4b7cebf277726fa1bd96b45806283c1d808683"}, - {file = "pydantic-1.8.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:4bbc47cf7925c86a345d03b07086696ed916c7663cb76aa409edaa54546e53e2"}, - {file = "pydantic-1.8.1-cp38-cp38-manylinux2014_i686.whl", hash = "sha256:6388ef4ef1435364c8cc9a8192238aed030595e873d8462447ccef2e17387125"}, - {file = "pydantic-1.8.1-cp38-cp38-manylinux2014_x86_64.whl", hash = "sha256:dd4888b300769ecec194ca8f2699415f5f7760365ddbe243d4fd6581485fa5f0"}, - {file = "pydantic-1.8.1-cp38-cp38-win_amd64.whl", hash = "sha256:8fbb677e4e89c8ab3d450df7b1d9caed23f254072e8597c33279460eeae59b99"}, - {file = "pydantic-1.8.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2f2736d9a996b976cfdfe52455ad27462308c9d3d0ae21a2aa8b4cd1a78f47b9"}, - {file = "pydantic-1.8.1-cp39-cp39-manylinux1_i686.whl", hash = "sha256:3114d74329873af0a0e8004627f5389f3bb27f956b965ddd3e355fe984a1789c"}, - {file = "pydantic-1.8.1-cp39-cp39-manylinux2014_i686.whl", hash = "sha256:258576f2d997ee4573469633592e8b99aa13bda182fcc28e875f866016c8e07e"}, - {file = "pydantic-1.8.1-cp39-cp39-manylinux2014_x86_64.whl", hash = "sha256:c17a0b35c854049e67c68b48d55e026c84f35593c66d69b278b8b49e2484346f"}, - {file = "pydantic-1.8.1-cp39-cp39-win_amd64.whl", hash = "sha256:e8bc082afef97c5fd3903d05c6f7bb3a6af9fc18631b4cc9fedeb4720efb0c58"}, - {file = "pydantic-1.8.1-py3-none-any.whl", hash = "sha256:e3f8790c47ac42549dc8b045a67b0ca371c7f66e73040d0197ce6172b385e520"}, - {file = "pydantic-1.8.1.tar.gz", hash = "sha256:26cf3cb2e68ec6c0cfcb6293e69fb3450c5fd1ace87f46b64f678b0d29eac4c3"}, + {file = "pydantic-1.8.2-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:05ddfd37c1720c392f4e0d43c484217b7521558302e7069ce8d318438d297739"}, + {file = "pydantic-1.8.2-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:a7c6002203fe2c5a1b5cbb141bb85060cbff88c2d78eccbc72d97eb7022c43e4"}, + {file = "pydantic-1.8.2-cp36-cp36m-manylinux2014_i686.whl", hash = "sha256:589eb6cd6361e8ac341db97602eb7f354551482368a37f4fd086c0733548308e"}, + {file = "pydantic-1.8.2-cp36-cp36m-manylinux2014_x86_64.whl", hash = "sha256:10e5622224245941efc193ad1d159887872776df7a8fd592ed746aa25d071840"}, + {file = "pydantic-1.8.2-cp36-cp36m-win_amd64.whl", hash = "sha256:99a9fc39470010c45c161a1dc584997f1feb13f689ecf645f59bb4ba623e586b"}, + {file = "pydantic-1.8.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:a83db7205f60c6a86f2c44a61791d993dff4b73135df1973ecd9eed5ea0bda20"}, + {file = "pydantic-1.8.2-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:41b542c0b3c42dc17da70554bc6f38cbc30d7066d2c2815a94499b5684582ecb"}, + {file = "pydantic-1.8.2-cp37-cp37m-manylinux2014_i686.whl", hash = "sha256:ea5cb40a3b23b3265f6325727ddfc45141b08ed665458be8c6285e7b85bd73a1"}, + {file = "pydantic-1.8.2-cp37-cp37m-manylinux2014_x86_64.whl", hash = "sha256:18b5ea242dd3e62dbf89b2b0ec9ba6c7b5abaf6af85b95a97b00279f65845a23"}, + {file = "pydantic-1.8.2-cp37-cp37m-win_amd64.whl", hash = "sha256:234a6c19f1c14e25e362cb05c68afb7f183eb931dd3cd4605eafff055ebbf287"}, + {file = "pydantic-1.8.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:021ea0e4133e8c824775a0cfe098677acf6fa5a3cbf9206a376eed3fc09302cd"}, + {file = "pydantic-1.8.2-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e710876437bc07bd414ff453ac8ec63d219e7690128d925c6e82889d674bb505"}, + {file = "pydantic-1.8.2-cp38-cp38-manylinux2014_i686.whl", hash = "sha256:ac8eed4ca3bd3aadc58a13c2aa93cd8a884bcf21cb019f8cfecaae3b6ce3746e"}, + {file = "pydantic-1.8.2-cp38-cp38-manylinux2014_x86_64.whl", hash = "sha256:4a03cbbe743e9c7247ceae6f0d8898f7a64bb65800a45cbdc52d65e370570820"}, + {file = "pydantic-1.8.2-cp38-cp38-win_amd64.whl", hash = "sha256:8621559dcf5afacf0069ed194278f35c255dc1a1385c28b32dd6c110fd6531b3"}, + {file = "pydantic-1.8.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:8b223557f9510cf0bfd8b01316bf6dd281cf41826607eada99662f5e4963f316"}, + {file = "pydantic-1.8.2-cp39-cp39-manylinux1_i686.whl", hash = "sha256:244ad78eeb388a43b0c927e74d3af78008e944074b7d0f4f696ddd5b2af43c62"}, + {file = "pydantic-1.8.2-cp39-cp39-manylinux2014_i686.whl", hash = "sha256:05ef5246a7ffd2ce12a619cbb29f3307b7c4509307b1b49f456657b43529dc6f"}, + {file = "pydantic-1.8.2-cp39-cp39-manylinux2014_x86_64.whl", hash = "sha256:54cd5121383f4a461ff7644c7ca20c0419d58052db70d8791eacbbe31528916b"}, + {file = "pydantic-1.8.2-cp39-cp39-win_amd64.whl", hash = "sha256:4be75bebf676a5f0f87937c6ddb061fa39cbea067240d98e298508c1bda6f3f3"}, + {file = "pydantic-1.8.2-py3-none-any.whl", hash = "sha256:fec866a0b59f372b7e776f2d7308511784dace622e0992a0b59ea3ccee0ae833"}, + {file = "pydantic-1.8.2.tar.gz", hash = "sha256:26464e57ccaafe72b7ad156fdaa4e9b9ef051f69e175dbbb463283000c05ab7b"}, ] pyfakefs = [ {file = "pyfakefs-4.4.0-py3-none-any.whl", hash = "sha256:1ac3b2845dabe69af56c20691b9347914581195ccdde352535fb7d4ff0055c19"}, @@ -1781,8 +1781,8 @@ pytest = [ {file = "pytest-6.2.4.tar.gz", hash = "sha256:50bcad0a0b9c5a72c8e4e7c9855a3ad496ca6a881a3641b4260605450772c54b"}, ] pytest-cov = [ - {file = "pytest-cov-2.11.1.tar.gz", hash = "sha256:359952d9d39b9f822d9d29324483e7ba04a3a17dd7d05aa6beb7ea01e359e5f7"}, - {file = "pytest_cov-2.11.1-py2.py3-none-any.whl", hash = "sha256:bdb9fdb0b85a7cc825269a4c56b48ccaa5c7e365054b6038772c32ddcdc969da"}, + {file = "pytest-cov-2.12.0.tar.gz", hash = "sha256:8535764137fecce504a49c2b742288e3d34bc09eed298ad65963616cc98fd45e"}, + {file = "pytest_cov-2.12.0-py2.py3-none-any.whl", hash = "sha256:95d4933dcbbacfa377bb60b29801daa30d90c33981ab2a79e9ab4452c165066e"}, ] pytest-env = [ {file = "pytest-env-0.6.2.tar.gz", hash = "sha256:7e94956aef7f2764f3c147d216ce066bf6c42948bb9e293169b1b1c880a580c2"}, @@ -2029,8 +2029,8 @@ virtualenv = [ {file = "virtualenv-20.4.6.tar.gz", hash = "sha256:72cf267afc04bf9c86ec932329b7e94db6a0331ae9847576daaa7ca3c86b29a4"}, ] websocket-client = [ - {file = "websocket-client-0.59.0.tar.gz", hash = "sha256:d376bd60eace9d437ab6d7ee16f4ab4e821c9dae591e1b783c58ebd8aaf80c5c"}, - {file = "websocket_client-0.59.0-py2.py3-none-any.whl", hash = "sha256:2e50d26ca593f70aba7b13a489435ef88b8fc3b5c5643c1ce8808ff9b40f0b32"}, + {file = "websocket-client-1.0.0.tar.gz", hash = "sha256:5051b38a2f4c27fbd7ca077ebb23ec6965a626ded5a95637f36be1b35b6c4f81"}, + {file = "websocket_client-1.0.0-py2.py3-none-any.whl", hash = "sha256:57f876f1af4731cacb806cf54d02f5fbf75dee796053b9a5b94fd7c1d9621db9"}, ] wrapt = [ {file = "wrapt-1.12.1.tar.gz", hash = "sha256:b62ffa81fb85f4332a4f609cab4ac40709470da05643a082ec1eb88e6d9b97d7"},