Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 62 additions & 13 deletions splitgraph/core/fragment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,22 @@ def _log_commit_progress(table_size, no_chunks):
return table_size > 500000 or no_chunks > 100


def get_chunk_groups(chunks: List[Tuple[str, Any, Any]],) -> List[List[Tuple[str, Any, Any]]]:
def _key(c):
# Hack to work around Nones sometimes being there in comparables. We make a fake
# key, prepending a boolean (is the item None?) to every item to get a consistent
# ordering that includes Nones.
if isinstance(c, tuple):
return tuple((ci is None, ci) for ci in c)
return c is None, c


def _pk_overlap(pk_1: Tuple, pk_2: Tuple) -> bool:
return bool(_key(pk_1[0]) <= _key(pk_2[1]) and _key(pk_1[1]) >= _key(pk_2[0]))


def get_chunk_groups(
chunks: List[Tuple[str, Any, Any]],
) -> List[List[Tuple[str, Any, Any]]]:
"""
Takes a list of chunks and their boundaries and combines them
into independent groups such that chunks from no two groups
Expand Down Expand Up @@ -101,14 +116,6 @@ def get_chunk_groups(chunks: List[Tuple[str, Any, Any]],) -> List[List[Tuple[str
current_group_start = None
current_group_end = None

def _key(c):
# Hack to work around Nones sometimes being there in comparables. We make a fake
# key, prepending a boolean (is the item None?) to every item to get a consistent
# ordering that includes Nones.
if isinstance(c, tuple):
return tuple((ci is None, ci) for ci in c)
return c is None, c

for original_id, chunk_id, start, end in sorted(
chunks, key=lambda c: _key(c[2]) # type:ignore
):
Expand All @@ -121,7 +128,7 @@ def _key(c):
assert current_group_start
assert current_group_end
# See if the chunk overlaps with the current chunk group
if _key(start) <= _key(current_group_end) and _key(end) >= _key(current_group_start):
if _pk_overlap((start, end), (current_group_start, current_group_end)):
current_group.append((original_id, chunk_id, start, end))
current_group_start = min(current_group_start, start, key=_key)
current_group_end = max(current_group_end, end, key=_key)
Expand Down Expand Up @@ -508,12 +515,18 @@ def _store_changeset(
upserted = [pk for pk, data in sub_changeset.items() if data[0]]
deleted = [pk for pk, data in sub_changeset.items() if not data[0]]
self.object_engine.store_fragment(
upserted, deleted, "pg_temp", tmp_object_id, schema, table, table_schema,
upserted,
deleted,
"pg_temp",
tmp_object_id,
schema,
table,
table_schema,
)
return tmp_object_id

def calculate_fragment_insertion_hash_stats(
self, schema: str, table: str, table_schema: TableSchema = None
self, schema: str, table: str, table_schema: Optional[TableSchema] = None
) -> Tuple[Digest, int]:
"""
Calculate the homomorphic hash of just the rows that a given fragment inserts
Expand Down Expand Up @@ -1060,8 +1073,44 @@ def filter_fragments(self, object_ids: List[str], table: "Table", quals: Any) ->
len(range_filter_result),
)

objects_to_scan = self._add_overlapping_objects(table, object_ids, bloom_filter_result)

if len(objects_to_scan) > len(bloom_filter_result):
logging.info(
"Will need to scan through extra %d overlapping fragment(s)",
len(objects_to_scan) - len(bloom_filter_result),
)

# Preserve original object order.
return [r for r in object_ids if r in bloom_filter_result]
return [r for r in object_ids if r in objects_to_scan]

def _add_overlapping_objects(
self, table: "Table", all_objects: List[str], filtered_objects: List[str]
) -> Set[str]:
# Expand the list of objects by also adding the objects that might overwrite these. In some
# cases, we don't keep track of the rows that an object deletes in the index, since that
# adds an implicit dependency on those previous objects.

table_pk = get_change_key(table.table_schema)
object_pks = self.get_min_max_pks(all_objects, table_pk)

# Go through all objects and see if they 1) come after any of our chosen objects and 2)
# overlap those objects' PKs (if they come after them)
original_order = {object_id: i for i, object_id in enumerate(all_objects)}
object_pk_dict = {
object_id: object_pk for object_id, object_pk in zip(all_objects, object_pks)
}
objects_to_scan = set(filtered_objects)
for overlap_candidate in all_objects:
if overlap_candidate in objects_to_scan:
continue
for our_object in filtered_objects:
if original_order[our_object] < original_order[overlap_candidate] and _pk_overlap(
object_pk_dict[our_object], object_pk_dict[overlap_candidate]
):
objects_to_scan.add(overlap_candidate)
break
return objects_to_scan

def delete_objects(self, objects: Union[Set[str], List[str]]) -> None:
"""
Expand Down
Loading