diff --git a/splitgraph/core/_common.py b/splitgraph/core/_common.py
index 05acae05..3ad102d5 100644
--- a/splitgraph/core/_common.py
+++ b/splitgraph/core/_common.py
@@ -63,7 +63,7 @@ def manage_audit_triggers(engine, object_engine=None):
         if head is not None
         for t in set(object_engine.get_all_tables(r.to_schema())) & set(head.get_tables())
     ]
-    tracked_tables = engine.get_tracked_tables()
+    tracked_tables = object_engine.get_tracked_tables()
     to_untrack = [t for t in tracked_tables if t not in repos_tables]
     to_track = [t for t in repos_tables if t not in tracked_tables]
diff --git a/splitgraph/hooks/s3.py b/splitgraph/hooks/s3.py
index 6d2b6016..79af7208 100644
--- a/splitgraph/hooks/s3.py
+++ b/splitgraph/hooks/s3.py
@@ -22,6 +22,34 @@
 # just exist on a hard drive somewhere.
 
 
+def delete_objects(client, object_ids):
+    """
+    Delete objects stored in Minio
+
+    :param client: Minio client
+    :param object_ids: List of Splitgraph object IDs to delete
+    """
+
+    # Expand the list of objects into actual files we store in Minio
+    all_object_ids = [o + suffix for o in object_ids for suffix in ("", ".schema", ".footer")]
+    list(client.remove_objects(S3_ACCESS_KEY, all_object_ids))
+
+
+def list_objects(client):
+    """
+    List objects stored in Minio
+
+    :param client: Minio client
+    :return: List of Splitgraph object IDs
+    """
+
+    return [
+        o.object_name
+        for o in client.list_objects(bucket_name=S3_ACCESS_KEY)
+        if not o.object_name.endswith(".footer") and not o.object_name.endswith(".schema")
+    ]
+
+
 class S3ExternalObjectHandler(ExternalObjectHandler):
     """Uploads/downloads the objects to/from S3/S3-compatible host using the Minio client.
     The parameters for this handler (overriding the .sgconfig) are:
diff --git a/test/splitgraph/commands/test_commit_diff.py b/test/splitgraph/commands/test_commit_diff.py
index 1e83d865..1d1c1f4b 100644
--- a/test/splitgraph/commands/test_commit_diff.py
+++ b/test/splitgraph/commands/test_commit_diff.py
@@ -6,9 +6,11 @@
 import pytest
 from psycopg2.sql import SQL, Identifier
 
-from splitgraph import SPLITGRAPH_META_SCHEMA, ResultShape, select
+from splitgraph import SPLITGRAPH_META_SCHEMA, ResultShape, select, Repository
 from splitgraph.core.fragment_manager import Digest
 from splitgraph.core.metadata_manager import OBJECT_COLS
+from splitgraph.hooks.s3 import delete_objects, list_objects
+from test.splitgraph.commands.test_layered_querying import _prepare_fully_remote_repo
 from test.splitgraph.conftest import OUTPUT, PG_DATA, SMALL_OBJECT_SIZE
 
 
@@ -845,3 +847,62 @@ def test_diff_schema_change(pg_repo_local):
         (True, (1, "pineapple")),
         (True, (2, "mustard")),
     ]
+
+
+def test_multiengine_object_gets_recreated(local_engine_empty, pg_repo_remote, clean_minio):
+    # Use the remote engine as a metadata store and the local engine as an object store -- make sure
+    # the object gets re-registered on the remote engine if it gets deleted from it but not the
+    # cache.
+    _prepare_fully_remote_repo(local_engine_empty, pg_repo_remote)
+    pg_repo_local = Repository.from_template(pg_repo_remote, object_engine=local_engine_empty)
+
+    # Checkout to force objects to download
+    pg_repo_local.images["latest"].checkout()
+    pg_repo_local.uncheckout()
+    parent = pg_repo_local.images[pg_repo_local.images["latest"].parent_id]
+
+    # Delete an image from the metadata engine and delete the corresponding object
+    # from meta and Minio
+    objects = pg_repo_local.images["latest"].get_table("fruits").objects
+    assert objects == ["o75dd055ad2465eb1c3f4e03c6f772c48d87029ef6f141fd4cf3d198e5b247f"]
+    object_to_delete = objects[0]
+
+    old_object_parent = pg_repo_local.objects.get_object_meta([object_to_delete])[
+        object_to_delete
+    ].parent_id
+    assert old_object_parent == "occfcd55402d9ca3d3d7fa18dd56227d56df4151888a9518c9103b3bac0ee8c"
+    assert parent.get_table("fruits").objects == [old_object_parent]
+
+    pg_repo_local.images.delete([pg_repo_local.images["latest"].image_hash])
+    pg_repo_local.engine.run_sql(
+        "DELETE FROM splitgraph_meta.object_locations WHERE object_id = %s", (object_to_delete,)
+    )
+    pg_repo_local.engine.run_sql(
+        "DELETE FROM splitgraph_meta.objects WHERE object_id = %s", (object_to_delete,)
+    )
+    delete_objects(clean_minio, [object_to_delete])
+
+    # Check the parent image out and unknowingly recreate the image we deleted, with the same object.
+    # _prepare_fully_remote_repo has default values for columns 'number' and 'timestamp' that
+    # don't get set after a checkout
+    parent.checkout()
+    pg_repo_local.run_sql("INSERT INTO fruits VALUES (4, 'kumquat', 1, '2019-01-01T12:00:00')")
+    assert pg_repo_local.head.get_table("fruits").objects == [old_object_parent]
+    new_image = pg_repo_local.commit()
+
+    # Make sure the object was reregistered
+    assert len(new_image.get_table("fruits").objects) == 1
+    new_object = new_image.get_table("fruits").objects[0]
+    assert new_object == object_to_delete
+    assert (
+        pg_repo_local.objects.get_object_meta([new_object])[new_object].parent_id
+        == old_object_parent
+    )
+
+    assert object_to_delete in pg_repo_local.objects.get_all_objects()
+    assert object_to_delete in pg_repo_local.objects.get_downloaded_objects()
+
+    # Force a reupload and make sure the object exists in Minio.
+    assert object_to_delete not in list_objects(clean_minio)
+    pg_repo_local.objects.make_objects_external([object_to_delete], handler="S3", handler_params={})
+    assert object_to_delete in list_objects(clean_minio)
diff --git a/test/splitgraph/conftest.py b/test/splitgraph/conftest.py
index 5eb21d07..c17f6756 100644
--- a/test/splitgraph/conftest.py
+++ b/test/splitgraph/conftest.py
@@ -270,6 +270,9 @@ def unprivileged_pg_repo(pg_repo_remote, unprivileged_remote_engine):
 
 SPLITFILE_ROOT = os.path.join(os.path.dirname(__file__), "../resources/")
 
+MINIO = Minio(
+    "%s:%s" % (S3_HOST, S3_PORT), access_key=S3_ACCESS_KEY, secret_key=S3_SECRET_KEY, secure=False
+)
 
 
 def load_splitfile(name):
@@ -278,23 +281,17 @@
 
 
 def _cleanup_minio():
-    client = Minio(
-        "%s:%s" % (S3_HOST, S3_PORT),
-        access_key=S3_ACCESS_KEY,
-        secret_key=S3_SECRET_KEY,
-        secure=False,
-    )
-    if client.bucket_exists(S3_ACCESS_KEY):
-        objects = [o.object_name for o in client.list_objects(bucket_name=S3_ACCESS_KEY)]
+    if MINIO.bucket_exists(S3_ACCESS_KEY):
+        objects = [o.object_name for o in MINIO.list_objects(bucket_name=S3_ACCESS_KEY)]
         # remove_objects is an iterator, so we force evaluate it
-        list(client.remove_objects(bucket_name=S3_ACCESS_KEY, objects_iter=objects))
+        list(MINIO.remove_objects(bucket_name=S3_ACCESS_KEY, objects_iter=objects))
 
 
 @pytest.fixture
 def clean_minio():
     # Make sure to delete extra objects in the remote Minio bucket
     _cleanup_minio()
-    yield
+    yield MINIO
     # Comment this out if tests fail and you want to see what the hell went on in the bucket.
     _cleanup_minio()
 
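
Usage note (not part of the patch): a minimal sketch of driving the two new helpers in splitgraph/hooks/s3.py directly, rather than through the clean_minio fixture, which now yields exactly this kind of client to tests. One assumption is flagged in the comments: that the S3_* settings the module references are importable from it alongside the helpers.

# A minimal sketch, not part of the patch. Assumption: the S3_* constants used
# by splitgraph/hooks/s3.py (the patch shows it referencing S3_ACCESS_KEY at
# module scope) are importable from that module.
from minio import Minio

from splitgraph.hooks.s3 import (
    S3_ACCESS_KEY,
    S3_HOST,
    S3_PORT,
    S3_SECRET_KEY,
    delete_objects,
    list_objects,
)

# Same client configuration as the MINIO constant in test/splitgraph/conftest.py;
# note the test setup uses the access key as the bucket name.
client = Minio(
    "%s:%s" % (S3_HOST, S3_PORT),
    access_key=S3_ACCESS_KEY,
    secret_key=S3_SECRET_KEY,
    secure=False,
)

# list_objects returns bare Splitgraph object IDs, filtering out the .schema and
# .footer companion files stored alongside every object in the bucket.
object_ids = list_objects(client)

# delete_objects expands each ID back into its three files (object, .schema,
# .footer) before removing them, so a plain list of IDs is enough here.
delete_objects(client, object_ids)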