Skip to content

Commit

Permalink
Merge pull request #668 from splitgraph/feature/writeable-lq-splitfiles
Browse files Browse the repository at this point in the history
Add support for writeable LQ to Splitfiles
  • Loading branch information
mildbyte committed Apr 26, 2022
2 parents 36fd830 + 3665bc9 commit dbb693e
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 37 deletions.
4 changes: 3 additions & 1 deletion splitgraph/commandline/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,9 @@ def upload_c(remote, file_format, repository, files):

wait_for_load(client, repository.namespace, repository.repository, task_id)

web_url = _construct_repo_url(gql_endpoint=client.endpoint, full_repo=repository) + "/-/tables"
web_url = (
_construct_repo_url(gql_endpoint=client.endpoint, full_repo=repository) + "/latest/-/tables"
)
click.echo()
click.echo(
"Success. See the repository at " + Color.BLUE + web_url + Color.END + " or query it with:"
Expand Down
22 changes: 18 additions & 4 deletions splitgraph/commandline/splitfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
@click.option(
"-o", "--output-repository", help="Repository to store the result in.", type=RepositoryType()
)
def build_c(splitfile, args, output_repository):
@click.option(
"-l",
"--layered-querying",
help="Use writeable layered querying when checking images out. Experimental.",
is_flag=True,
)
def build_c(splitfile, args, output_repository, layered_querying):
"""
Build Splitgraph images.
Expand Down Expand Up @@ -54,7 +60,9 @@ def build_c(splitfile, args, output_repository):
file_name = os.path.splitext(os.path.basename(splitfile.name))[0]
output_repository = Repository.from_schema(file_name)

execute_commands(splitfile.read(), args, output=output_repository)
execute_commands(
splitfile.read(), args, output=output_repository, use_writeable_lq=layered_querying
)


@click.command(name="provenance")
Expand Down Expand Up @@ -205,7 +213,13 @@ def dependents_c(image_spec, source_on, dependents_on):
help="Images to substitute into the reconstructed Splitfile, of the form"
" [NAMESPACE/]REPOSITORY[:HASH_OR_TAG]. Default tag is 'latest'.",
)
def rebuild_c(image_spec, update, against):
@click.option(
"-l",
"--layered-querying",
help="Use writeable layered querying when checking images out. Experimental.",
is_flag=True,
)
def rebuild_c(image_spec, update, against, layered_querying):
"""
Rebuild images against different dependencies.
Expand Down Expand Up @@ -244,4 +258,4 @@ def rebuild_c(image_spec, update, against):

from splitgraph.splitfile.execution import rebuild_image

rebuild_image(image, new_images)
rebuild_image(image, new_images, use_writeable_lq=layered_querying)
32 changes: 22 additions & 10 deletions splitgraph/splitfile/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ def _combine_hashes(hashes: List[str]) -> str:
return sha256("".join(hashes).encode("ascii")).hexdigest()


def _checkout_or_calculate_layer(output: Repository, image_hash: str, calc_func: Callable) -> None:
def _checkout_or_calculate_layer(
output: Repository, image_hash: str, calc_func: Callable, use_writeable_lq: bool = False
) -> None:
# Future optimization here: don't actually check the layer out if it exists -- only do it at Splitfile execution
# end or when a command needs it.

# Have we already calculated this hash?
try:
output.images.by_hash(image_hash).checkout()
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
logging.info(" ---> Using cache")
except ImageNotFoundError:
try:
Expand All @@ -64,6 +66,7 @@ def execute_commands(
params: Optional[Dict[str, str]] = None,
output: Optional[Repository] = None,
output_base: str = "0" * 32,
use_writeable_lq: bool = False,
) -> None:
"""
Executes a series of Splitfile commands.
Expand All @@ -74,6 +77,7 @@ def execute_commands(
:param output: Output repository to execute the Splitfile against.
:param output_base: If not None, a revision that gets checked out for all Splitfile actions to be committed
on top of it.
:param use_writeable_lq: Use writeable LQ to execute commands
"""
if params is None:
params = {}
Expand Down Expand Up @@ -152,16 +156,18 @@ def _initialize_output(output):
raise


def checkout_if_changed(repository: Repository, image_hash: str) -> None:
def checkout_if_changed(
repository: Repository, image_hash: str, use_writeable_lq: bool = False
) -> None:
if (
repository.head is None
or (repository.head.image_hash != image_hash)
or repository.has_pending_changes()
):
repository.images.by_hash(image_hash).checkout()
repository.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
else:
logging.info(
"Skipping checkout of %s as %s has it checked out " "and there have been no changes",
"Skipping checkout of %s as %s has it checked out and there have been no changes",
image_hash,
repository,
)
Expand Down Expand Up @@ -379,7 +385,9 @@ def _calc():
source_mountpoint.delete()


def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:
def _execute_custom(
node: Node, output: Repository, use_writeable_lq: bool = False
) -> ProvenanceLine:
assert output.head is not None
command, args = parse_custom_command(node)

Expand Down Expand Up @@ -410,7 +418,7 @@ def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:
if command_hash is not None:
image_hash = _combine_hashes([output_head, command_hash])
try:
output.images.by_hash(image_hash).checkout()
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
logging.info(" ---> Using cache")
return {"type": "CUSTOM"}
except ImageNotFoundError:
Expand All @@ -425,14 +433,16 @@ def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:

# Check just in case if the new hash produced by the command already exists.
try:
output.images.by_hash(image_hash).checkout()
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
except ImageNotFoundError:
# Full command as a commit comment
output.commit(image_hash, comment=node.text)
return {"type": "CUSTOM"}


def rebuild_image(image: Image, source_replacement: Dict[Repository, str]) -> None:
def rebuild_image(
image: Image, source_replacement: Dict[Repository, str], use_writeable_lq: bool = False
) -> None:
"""
Recreates the Splitfile used to create a given image and reruns it, replacing its dependencies with a different
set of versions.
Expand All @@ -444,4 +454,6 @@ def rebuild_image(image: Image, source_replacement: Dict[Repository, str]) -> No
ignore_irreproducible=False, source_replacement=source_replacement
)
# Params are supposed to be stored in the commands already (baked in) -- what if there's sensitive data there?
execute_commands("\n".join(splitfile_commands), output=image.repository)
execute_commands(
"\n".join(splitfile_commands), output=image.repository, use_writeable_lq=use_writeable_lq
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ Uploading the files...
(SUCCESS) Waiting for task ID ingest_task


Success. See the repository at http://www.example.com/someuser/somerepo_1/-/tables or query it with:
Success. See the repository at http://www.example.com/someuser/somerepo_1/latest/-/tables or query it with:
sgr cloud sql 'SELECT * FROM "someuser/somerepo_1"."base_df"'
30 changes: 23 additions & 7 deletions test/splitgraph/commandline/test_splitfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest import mock
from unittest.mock import call

import pytest
from click.testing import CliRunner

from splitgraph.commandline import build_c, dependents_c, provenance_c, rebuild_c
Expand All @@ -17,11 +18,17 @@ def test_splitfile_default():
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "latest"],
)
assert ec.mock_calls == [
call(mock.ANY, {"TAG": "latest"}, output=Repository("", "import_remote_multiple"))
call(
mock.ANY,
{"TAG": "latest"},
output=Repository("", "import_remote_multiple"),
use_writeable_lq=False,
)
]


def test_splitfile(local_engine_empty, pg_repo_remote):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile(local_engine_empty, pg_repo_remote, use_writeable_lq):
runner = CliRunner()

result = runner.invoke(
Expand All @@ -33,7 +40,8 @@ def test_splitfile(local_engine_empty, pg_repo_remote):
"latest",
"-o",
"output",
],
]
+ (["-l"] if use_writeable_lq else []),
)
assert result.exit_code == 0
assert OUTPUT.run_sql("SELECT id, fruit, vegetable FROM join_table") == [
Expand Down Expand Up @@ -78,17 +86,22 @@ def test_splitfile(local_engine_empty, pg_repo_remote):
assert "%s:%s" % (OUTPUT, OUTPUT.head.image_hash) in result.output


def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag, use_writeable_lq):
runner = CliRunner()

result = runner.invoke(
build_c,
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "v1", "-o", "output"],
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "v1", "-o", "output"]
+ (["-l"] if use_writeable_lq else []),
)
assert result.exit_code == 0

# Rerun the output:latest against v2 of the test/pg_mount
result = runner.invoke(rebuild_c, ["output:latest", "--against", "test/pg_mount:v2"])
result = runner.invoke(
rebuild_c,
["output:latest", "--against", "test/pg_mount:v2"] + (["-l"] if use_writeable_lq else []),
)
output_v2 = OUTPUT.head
assert result.exit_code == 0
v2 = pg_repo_remote_multitag.images["v2"]
Expand All @@ -98,7 +111,10 @@ def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag):
# In this case, this should all resolve to the same version of test/pg_mount (v2) and not produce
# any extra commits.
curr_commits = OUTPUT.images()
result = runner.invoke(rebuild_c, ["output:latest", "-u"])
result = runner.invoke(
rebuild_c,
["output:latest", "-u"] + (["-l"] if use_writeable_lq else []),
)
assert result.exit_code == 0
assert output_v2 == OUTPUT.head
assert OUTPUT.images() == curr_commits
60 changes: 46 additions & 14 deletions test/splitgraph/splitfile/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ def test_local_import_splitfile(pg_repo_local):
assert not OUTPUT.engine.table_exists(OUTPUT.to_schema(), "fruits")


def test_advanced_splitfile(pg_repo_local):
execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_advanced_splitfile(pg_repo_local, use_writeable_lq):
execute_commands(
load_splitfile("import_local_multiple_with_queries.splitfile"),
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)

assert OUTPUT.engine.table_exists(OUTPUT.to_schema(), "my_fruits")
assert OUTPUT.engine.table_exists(OUTPUT.to_schema(), "vegetables")
Expand All @@ -128,13 +133,22 @@ def test_advanced_splitfile(pg_repo_local):
assert OUTPUT.run_sql("SELECT * FROM my_fruits") == [(2, "orange")]


def test_splitfile_cached(pg_repo_local):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_cached(pg_repo_local, use_writeable_lq):
# Check that no new commits/snaps are created if we rerun the same splitfile
execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
execute_commands(
load_splitfile("import_local_multiple_with_queries.splitfile"),
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)
images = OUTPUT.images()
assert len(images) == 4

execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
execute_commands(
load_splitfile("import_local_multiple_with_queries.splitfile"),
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)
new_images = OUTPUT.images()
assert new_images == images

Expand Down Expand Up @@ -205,8 +219,14 @@ def test_import_updating_splitfile_with_uploading(


@pytest.mark.mounting
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_end_to_end_with_uploading(
local_engine_empty, remote_engine, pg_repo_remote_multitag, mg_repo_remote, clean_minio
local_engine_empty,
remote_engine,
pg_repo_remote_multitag,
mg_repo_remote,
clean_minio,
use_writeable_lq,
):
# An end-to-end test:
# * Create a derived dataset from some tables imported from the remote engine
Expand All @@ -217,7 +237,10 @@ def test_splitfile_end_to_end_with_uploading(

# Do the same setting up first and run the splitfile against the remote data.
execute_commands(
load_splitfile("import_remote_multiple.splitfile"), params={"TAG": "v1"}, output=OUTPUT
load_splitfile("import_remote_multiple.splitfile"),
params={"TAG": "v1"},
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)

remote_output = Repository(OUTPUT.namespace, OUTPUT.repository, remote_engine)
Expand All @@ -230,22 +253,31 @@ def test_splitfile_end_to_end_with_uploading(
OUTPUT.objects.cleanup()

stage_2 = R("output_stage_2")
execute_commands(load_splitfile("import_from_preuploaded_remote.splitfile"), output=stage_2)
execute_commands(
load_splitfile("import_from_preuploaded_remote.splitfile"),
output=stage_2,
use_writeable_lq=use_writeable_lq,
)

assert stage_2.run_sql("SELECT id, name, fruit, vegetable FROM diet") == [
(2, "James", "orange", "carrot")
]


@pytest.mark.mounting
def test_splitfile_schema_changes(pg_repo_local, mg_repo_local):
execute_commands(load_splitfile("schema_changes.splitfile"), output=OUTPUT)
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_schema_changes(pg_repo_local, mg_repo_local, use_writeable_lq):
execute_commands(
load_splitfile("schema_changes.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
)
old_output_head = OUTPUT.head

# Then, alter the dataset and rerun the splitfile.
pg_repo_local.run_sql("INSERT INTO fruits VALUES (12, 'mayonnaise')")
pg_repo_local.commit()
execute_commands(load_splitfile("schema_changes.splitfile"), output=OUTPUT)
execute_commands(
load_splitfile("schema_changes.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
)
new_output_head = OUTPUT.head

old_output_head.checkout()
Expand Down Expand Up @@ -438,16 +470,16 @@ def test_splitfile_with_external_sql(readonly_pg_repo):


@pytest.mark.registry
def test_splitfile_inline_sql(readonly_pg_repo, pg_repo_local):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_inline_sql(readonly_pg_repo, pg_repo_local, use_writeable_lq):
# Test SQL commands accessing repos directly -- join a remote repo with
# some local data.

prepare_lq_repo(pg_repo_local, commit_after_every=False, include_pk=True)
pg_repo_local.head.tag("v2")

execute_commands(
load_splitfile("inline_sql.splitfile"),
output=OUTPUT,
load_splitfile("inline_sql.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
)

new_head = OUTPUT.head
Expand Down

0 comments on commit dbb693e

Please sign in to comment.