Merge pull request #110 from splitgraph/bugfix/object-creation
Bugfix/object creation
mildbyte committed Jul 18, 2019
2 parents aa3de5b + ea9bf91 commit 6641363
Showing 10 changed files with 147 additions and 26 deletions.
11 changes: 10 additions & 1 deletion splitgraph/commandline/__init__.py
@@ -8,7 +8,15 @@

from splitgraph.commandline.example import example
from splitgraph.commandline.image_creation import checkout_c, commit_c, tag_c, import_c
from splitgraph.commandline.image_info import log_c, diff_c, show_c, sql_c, status_c, object_c
from splitgraph.commandline.image_info import (
log_c,
diff_c,
show_c,
sql_c,
status_c,
object_c,
objects_c,
)
from splitgraph.commandline.ingestion import csv
from splitgraph.commandline.misc import rm_c, init_c, cleanup_c, config_c, prune_c, dump_c
from splitgraph.commandline.mount import mount_c
@@ -54,6 +62,7 @@ def cli():
cli.add_command(log_c)
cli.add_command(diff_c)
cli.add_command(object_c)
cli.add_command(objects_c)
cli.add_command(show_c)
cli.add_command(sql_c)
cli.add_command(status_c)
19 changes: 19 additions & 0 deletions splitgraph/commandline/image_info.py
@@ -234,6 +234,25 @@ def object_c(object_id):
print("Location: " + original_location)


@click.command(name="objects")
@click.option(
"--local", is_flag=True, help="Show only objects that are physically present on this engine"
)
def objects_c(local):
"""
List objects known to this engine.
"""

om = ObjectManager(get_engine())

if local:
objects = om.get_downloaded_objects()
else:
objects = om.get_all_objects()

print("\n".join(sorted(objects)))


@click.command(name="sql")
@click.argument("sql")
@click.option("-s", "--schema", help="Run SQL against this schema.")
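The new "sgr objects" command can be exercised the same way the test suite below does, through click's CliRunner (a minimal sketch, assuming a configured local engine with some objects registered):

from click.testing import CliRunner

from splitgraph.commandline.image_info import objects_c

runner = CliRunner()

# All objects known to this engine's metadata store
result = runner.invoke(objects_c)
print(result.output)

# Only objects that have physically been downloaded to this engine
result = runner.invoke(objects_c, ["--local"])
print(result.output)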
2 changes: 1 addition & 1 deletion splitgraph/core/_common.py
@@ -63,7 +63,7 @@ def manage_audit_triggers(engine, object_engine=None):
if head is not None
for t in set(object_engine.get_all_tables(r.to_schema())) & set(head.get_tables())
]
tracked_tables = engine.get_tracked_tables()
tracked_tables = object_engine.get_tracked_tables()

to_untrack = [t for t in tracked_tables if t not in repos_tables]
to_track = [t for t in repos_tables if t not in tracked_tables]
16 changes: 8 additions & 8 deletions splitgraph/core/fragment_manager.py
@@ -351,11 +351,15 @@ def _store_changesets(self, table, changesets, parents, schema):
table.table_name,
)
self.object_engine.delete_table(SPLITGRAPH_META_SCHEMA, tmp_object_id)
continue

# There are some cases where an object can already exist in the object engine (in the cache)
# but has been deleted from the metadata engine: when it is recreated, we would otherwise
# skip registering it again. Hence, we still want to proceed with trying to register it
# no matter what.

# Same here: if we are being called as part of a commit and an object
# already exists, we'll roll back everything that the caller has done
# (e.g. registering the new image).
# (e.g. registering the new image) if we don't have a savepoint.
with self.metadata_engine.savepoint("object_register"):
try:
self._register_object(
@@ -368,13 +372,11 @@ def _store_changesets(self, table, changesets, parents, schema):
)
except UniqueViolation:
logging.info(
"Reusing object %s for table %s/%s",
"Object %s for table %s/%s already exists, continuing...",
object_id,
table.repository,
table.table_name,
)
self.object_engine.delete_table(SPLITGRAPH_META_SCHEMA, tmp_object_id)
continue

return object_ids

@@ -643,7 +645,6 @@ def _insert_and_register_fragment(source_schema, source_table, limit=None, offse
offset,
)
self.object_engine.delete_table(SPLITGRAPH_META_SCHEMA, tmp_object_id)
return object_id

with self.metadata_engine.savepoint("object_register"):
try:
@@ -657,14 +658,13 @@ def _insert_and_register_fragment(source_schema, source_table, limit=None, offse
except UniqueViolation:
# Someone registered this object (perhaps a concurrent pull) already.
logging.info(
"Reusing object %s for table %s/%s limit %r offset %d",
"Object %s for table %s/%s limit %r offset %d already exists, continuing...",
object_id,
source_schema,
source_table,
limit,
offset,
)
self.object_engine.delete_table(SPLITGRAPH_META_SCHEMA, object_id)

return object_id

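The comments above capture the key design choice: object registration is wrapped in a metadata-engine savepoint so that a UniqueViolation (the object having been registered already, for example by a concurrent pull) only undoes the registration attempt itself, not the caller's earlier work in the same transaction. A rough standalone sketch of that pattern in plain psycopg2 (hypothetical connection string and a simplified column list; the real splitgraph_meta.objects table has more columns):

import psycopg2
from psycopg2 import errors

conn = psycopg2.connect("dbname=splitgraph")  # assumed DSN
with conn.cursor() as cur:
    # ... earlier statements in the same transaction, e.g. registering the new image ...
    cur.execute("SAVEPOINT object_register")
    try:
        # Simplified: register the object's metadata row
        cur.execute(
            "INSERT INTO splitgraph_meta.objects (object_id) VALUES (%s)",
            ("o" + "f" * 62,),  # hypothetical object ID
        )
        cur.execute("RELEASE SAVEPOINT object_register")
    except errors.UniqueViolation:
        # Undo only the work done since the savepoint; the caller's earlier
        # statements in this transaction survive.
        cur.execute("ROLLBACK TO SAVEPOINT object_register")
conn.commit()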
2 changes: 1 addition & 1 deletion splitgraph/core/object_manager.py
@@ -264,7 +264,7 @@ def make_objects_external(self, objects, handler, handler_params):
now = dt.now()
self.object_engine.run_sql_batch(
insert("object_cache_status", ("object_id", "ready", "refcount", "last_used"))
+ SQL("ON CONFLICT DO NOTHING"),
+ SQL("ON CONFLICT (object_id) DO UPDATE SET ready = 'f'"),
[(object_id, False, 1, now) for object_id in new_objects],
)

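The upsert above replaces the previous ON CONFLICT DO NOTHING: if a cache-status row for the object already exists, its ready flag is now reset to false instead of being left untouched. Roughly the statement this composes to per object (a sketch only; the exact SQL emitted by the insert() helper may differ):

from datetime import datetime as dt

QUERY = (
    "INSERT INTO splitgraph_meta.object_cache_status (object_id, ready, refcount, last_used) "
    "VALUES (%s, %s, %s, %s) "
    "ON CONFLICT (object_id) DO UPDATE SET ready = 'f'"
)
params = [("o" + "a" * 62, False, 1, dt.now())]  # hypothetical object ID
# object_engine.run_sql_batch(QUERY, params), as in the code above, then upserts the row,
# forcing ready back to false if the object already had a cache-status entry.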
28 changes: 28 additions & 0 deletions splitgraph/hooks/s3.py
@@ -22,6 +22,34 @@
# just exist on a hard drive somewhere.


def delete_objects(client, object_ids):
"""
Delete objects stored in Minio
:param client: Minio client
:param object_ids: List of Splitgraph object IDs to delete
"""

# Expand the list of objects into actual files we store in Minio
all_object_ids = [o + suffix for o in object_ids for suffix in ("", ".schema", ".footer")]
list(client.remove_objects(S3_ACCESS_KEY, all_object_ids))


def list_objects(client):
"""
List objects stored in Minio
:param client: Minio client
:return: List of Splitgraph object IDs
"""

return [
o.object_name
for o in client.list_objects(bucket_name=S3_ACCESS_KEY)
if not o.object_name.endswith(".footer") and not o.object_name.endswith(".schema")
]


class S3ExternalObjectHandler(ExternalObjectHandler):
"""Uploads/downloads the objects to/from S3/S3-compatible host using the Minio client.
The parameters for this handler (overriding the .sgconfig) are:
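A minimal usage sketch for the two new helpers, mirroring how the test suite builds its Minio client. It assumes all four S3_* connection constants live in splitgraph.hooks.s3, as the unqualified use of S3_ACCESS_KEY above suggests; adjust the import if they are defined elsewhere:

from minio import Minio

from splitgraph.hooks.s3 import (
    S3_ACCESS_KEY,
    S3_HOST,
    S3_PORT,
    S3_SECRET_KEY,
    delete_objects,
    list_objects,
)

client = Minio(
    "%s:%s" % (S3_HOST, S3_PORT),
    access_key=S3_ACCESS_KEY,
    secret_key=S3_SECRET_KEY,
    secure=False,
)

print(list_objects(client))  # Splitgraph object IDs currently in the bucket
# Removes an object's main file plus its .schema and .footer companions
delete_objects(client, ["o" + "a" * 62])  # hypothetical object ID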
63 changes: 62 additions & 1 deletion test/splitgraph/commands/test_commit_diff.py
@@ -6,9 +6,11 @@
import pytest
from psycopg2.sql import SQL, Identifier

from splitgraph import SPLITGRAPH_META_SCHEMA, ResultShape, select
from splitgraph import SPLITGRAPH_META_SCHEMA, ResultShape, select, Repository
from splitgraph.core.fragment_manager import Digest
from splitgraph.core.metadata_manager import OBJECT_COLS
from splitgraph.hooks.s3 import delete_objects, list_objects
from test.splitgraph.commands.test_layered_querying import _prepare_fully_remote_repo
from test.splitgraph.conftest import OUTPUT, PG_DATA, SMALL_OBJECT_SIZE


@@ -845,3 +847,62 @@ def test_diff_schema_change(pg_repo_local):
(True, (1, "pineapple")),
(True, (2, "mustard")),
]


def test_multiengine_object_gets_recreated(local_engine_empty, pg_repo_remote, clean_minio):
# Use the remote engine as a metadata store and the local engine as an object store -- make sure
# the object gets re-registered on the remote engine if it gets deleted from it but not the
# cache.
_prepare_fully_remote_repo(local_engine_empty, pg_repo_remote)
pg_repo_local = Repository.from_template(pg_repo_remote, object_engine=local_engine_empty)

# Checkout to force objects to download
pg_repo_local.images["latest"].checkout()
pg_repo_local.uncheckout()
parent = pg_repo_local.images[pg_repo_local.images["latest"].parent_id]

# Delete an image from the metadata engine and delete the corresponding object
# from meta and Minio
objects = pg_repo_local.images["latest"].get_table("fruits").objects
assert objects == ["o75dd055ad2465eb1c3f4e03c6f772c48d87029ef6f141fd4cf3d198e5b247f"]
object_to_delete = objects[0]

old_object_parent = pg_repo_local.objects.get_object_meta([object_to_delete])[
object_to_delete
].parent_id
assert old_object_parent == "occfcd55402d9ca3d3d7fa18dd56227d56df4151888a9518c9103b3bac0ee8c"
assert parent.get_table("fruits").objects == [old_object_parent]

pg_repo_local.images.delete([pg_repo_local.images["latest"].image_hash])
pg_repo_local.engine.run_sql(
"DELETE FROM splitgraph_meta.object_locations WHERE object_id = %s", (object_to_delete,)
)
pg_repo_local.engine.run_sql(
"DELETE FROM splitgraph_meta.objects WHERE object_id = %s", (object_to_delete,)
)
delete_objects(clean_minio, [object_to_delete])

# Check the parent image out and unknowingly recreate the image we deleted, with the same object.
# _prepare_fully_remote_repo has default values for columns 'number' and 'timestamp' that
# don't get set after a checkout
parent.checkout()
pg_repo_local.run_sql("INSERT INTO fruits VALUES (4, 'kumquat', 1, '2019-01-01T12:00:00')")
assert pg_repo_local.head.get_table("fruits").objects == [old_object_parent]
new_image = pg_repo_local.commit()

# Make sure the object was reregistered
assert len(new_image.get_table("fruits").objects) == 1
new_object = new_image.get_table("fruits").objects[0]
assert new_object == object_to_delete
assert (
pg_repo_local.objects.get_object_meta([new_object])[new_object].parent_id
== old_object_parent
)

assert object_to_delete in pg_repo_local.objects.get_all_objects()
assert object_to_delete in pg_repo_local.objects.get_downloaded_objects()

# Force a reupload and make sure the object exists in Minio.
assert object_to_delete not in list_objects(clean_minio)
pg_repo_local.objects.make_objects_external([object_to_delete], handler="S3", handler_params={})
assert object_to_delete in list_objects(clean_minio)
17 changes: 7 additions & 10 deletions test/splitgraph/conftest.py
@@ -270,6 +270,9 @@ def unprivileged_pg_repo(pg_repo_remote, unprivileged_remote_engine):


SPLITFILE_ROOT = os.path.join(os.path.dirname(__file__), "../resources/")
MINIO = Minio(
"%s:%s" % (S3_HOST, S3_PORT), access_key=S3_ACCESS_KEY, secret_key=S3_SECRET_KEY, secure=False
)


def load_splitfile(name):
@@ -278,23 +281,17 @@ def load_splitfile(name):


def _cleanup_minio():
client = Minio(
"%s:%s" % (S3_HOST, S3_PORT),
access_key=S3_ACCESS_KEY,
secret_key=S3_SECRET_KEY,
secure=False,
)
if client.bucket_exists(S3_ACCESS_KEY):
objects = [o.object_name for o in client.list_objects(bucket_name=S3_ACCESS_KEY)]
if MINIO.bucket_exists(S3_ACCESS_KEY):
objects = [o.object_name for o in MINIO.list_objects(bucket_name=S3_ACCESS_KEY)]
# remove_objects is an iterator, so we force evaluate it
list(client.remove_objects(bucket_name=S3_ACCESS_KEY, objects_iter=objects))
list(MINIO.remove_objects(bucket_name=S3_ACCESS_KEY, objects_iter=objects))


@pytest.fixture
def clean_minio():
# Make sure to delete extra objects in the remote Minio bucket
_cleanup_minio()
yield
yield MINIO
# Comment this out if tests fail and you want to see what the hell went on in the bucket.
_cleanup_minio()

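Since clean_minio now yields the module-level MINIO client, tests that need to inspect the bucket directly (like test_multiengine_object_gets_recreated above) can take the fixture as an argument and use it as a ready-made client. A tiny, hypothetical sketch of that pattern (with S3_ACCESS_KEY imported the same way conftest.py imports it):

def test_bucket_starts_empty(clean_minio):
    # clean_minio wipes the bucket before the test and yields the shared Minio client
    assert list(clean_minio.list_objects(bucket_name=S3_ACCESS_KEY)) == []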
13 changes: 10 additions & 3 deletions test/splitgraph/test_commandline.py
@@ -9,7 +9,7 @@
from splitgraph.commandline import *
from splitgraph.commandline._common import ImageType
from splitgraph.commandline.example import generate_c, alter_c, splitfile_c
from splitgraph.commandline.image_info import object_c
from splitgraph.commandline.image_info import object_c, objects_c
from splitgraph.config import PG_PWD, PG_USER
from splitgraph.core._common import parse_connection_string, serialize_connection_string, insert
from splitgraph.core.engine import repository_exists, init_engine
@@ -59,13 +59,12 @@ def test_conn_string_serialization():
)


def test_commandline_basics(pg_repo_local, mg_repo_local):
def test_commandline_basics(pg_repo_local):
runner = CliRunner()

# sgr status
result = runner.invoke(status_c, [])
assert pg_repo_local.to_schema() in result.output
assert mg_repo_local.to_schema() in result.output
old_head = pg_repo_local.head
assert old_head.image_hash in result.output

@@ -281,6 +280,14 @@ def test_object_info(local_engine_empty):
assert result.exit_code == 0
assert "Location: created locally" in result.output

result = runner.invoke(objects_c)
assert result.exit_code == 0
assert result.output == "base_1\npatch_1\npatch_2\n"

result = runner.invoke(objects_c, ["--local"])
assert result.exit_code == 0
assert result.output == "base_1\npatch_2\n"


def test_upstream_management(pg_repo_local):
runner = CliRunner()
2 changes: 1 addition & 1 deletion test/splitgraph/test_drawing.py
@@ -3,7 +3,7 @@
from test.splitgraph.conftest import OUTPUT, load_splitfile


def test_drawing(pg_repo_local, mg_repo_local):
def test_drawing(pg_repo_local):
# Doesn't really check anything, mostly used to make sure the tree drawing code doesn't throw.
execute_commands(load_splitfile("import_local.splitfile"), output=OUTPUT)
