Adds support for basic s3 url redirects #952

Draft
wants to merge 1 commit into base: master
45 changes: 35 additions & 10 deletions kart/lfs_commands/__init__.py
@@ -15,6 +15,7 @@
get_hash_from_pointer_file,
get_local_path_from_lfs_oid,
)
from kart.lfs_commands.url_redirector import UrlRedirector
from kart.object_builder import ObjectBuilder
from kart.rev_list_objects import rev_list_tile_pointer_files
from kart.repo import KartRepoState
@@ -276,29 +277,32 @@ def fetch_lfs_blobs_for_commits(
repo.spatial_filter if do_spatial_filter else SpatialFilter.MATCH_ALL
)

pointer_file_oids = set()
dataset_to_pointer_file_oids = {}
for commit in commits:
for dataset in repo.datasets(
commit, filter_dataset_type=ALL_TILE_DATASET_TYPES
):
pointer_file_oids.update(
blob.hex
for blob in dataset.tile_pointer_blobs(spatial_filter=spatial_filter)
pointer_file_oids = dataset_to_pointer_file_oids.setdefault(
dataset.path, set()
)
for blob in dataset.tile_pointer_blobs(spatial_filter=spatial_filter):
pointer_file_oids.add(blob.hex)

fetch_lfs_blobs_for_pointer_files(
repo, pointer_file_oids, dry_run=dry_run, quiet=quiet
repo, dataset_to_pointer_file_oids, dry_run=dry_run, quiet=quiet
)


def fetch_lfs_blobs_for_pointer_files(
repo, pointer_files, *, remote_name=None, dry_run=False, quiet=False
repo, dataset_to_pointer_file_oids, *, remote_name=None, dry_run=False, quiet=False
):
"""
Given a list of pointer files (or OIDs of pointer files themselves - not the OIDs they point to)
fetch all the tiles that those pointer files point to that are not already present in the local cache.
Given a dict in the format {dataset-path: set(pointer-file-oid-1, pointer-file-oid-2, ...)},
where dataset-path is the path to a dataset and each pointer-file-oid is the OID of a pointer file
(not the LFS OID that the pointer file points to) present in that dataset,
fetches all the tiles that those pointer files point to that are not already present in the local cache.
"""
if not pointer_files:
if not dataset_to_pointer_file_oids:
return

if not remote_name:
@@ -312,7 +316,12 @@ def fetch_lfs_blobs_for_pointer_files(
urls_sizes = {}
non_urls_sizes = {}

for pointer_file in pointer_files:
pointer_files_to_datasets = _invert_pointer_file_oid_dict(
dataset_to_pointer_file_oids
)
url_redirector = UrlRedirector(repo)

for pointer_file, datasets in pointer_files_to_datasets.items():
if isinstance(pointer_file, str):
pointer_blob = repo[pointer_file]
elif getattr(pointer_file, "type", None) == pygit2.GIT_OBJ_BLOB:
@@ -322,6 +331,8 @@

pointer_dict = pointer_file_bytes_to_dict(pointer_blob)
url = pointer_dict.get("url")
url = url_redirector.apply_redirect(url, datasets)

lfs_oid = get_hash_from_pointer_file(pointer_dict)
pointer_file_oid = pointer_blob.hex
lfs_path = get_local_path_from_lfs_oid(repo, lfs_oid)
@@ -369,6 +380,20 @@ def fetch_lfs_blobs_for_pointer_files(
_do_fetch_from_remote(repo, non_urls, remote_name, quiet=quiet)


def _invert_pointer_file_oid_dict(dataset_to_pointer_file_oids):
result = {}
for dataset, pointer_file_oids in dataset_to_pointer_file_oids.items():
assert isinstance(dataset, str)
for pointer_file_oid in pointer_file_oids:
existing = result.setdefault(pointer_file_oid, dataset)
if dataset != existing:
if isinstance(existing, str):
result[pointer_file_oid] = {existing, dataset}
elif isinstance(existing, set):
existing.add(dataset)
return result


def _do_fetch_from_urls(repo, urls_and_lfs_oids, quiet=False):
non_s3_url = next(
(url for (url, lfs_oid) in urls_and_lfs_oids if not url.startswith("s3://")),
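The new per-dataset grouping, and the inversion done by _invert_pointer_file_oid_dict, are easiest to see on a small example. The sketch below uses hypothetical dataset paths and pointer-file OIDs; the helper simply mirrors the logic added in this diff.

# Hypothetical input in the new {dataset-path: {pointer-file-oid, ...}} shape.
dataset_to_pointer_file_oids = {
    "auckland": {"8d2362d8", "f129df99"},
    "wellington": {"8d2362d8"},  # the same pointer file can occur in more than one dataset
}

def invert_pointer_file_oid_dict(dataset_to_pointer_file_oids):
    # Mirrors _invert_pointer_file_oid_dict: map each pointer-file OID back to its
    # dataset path, promoting the value to a set when the OID is shared.
    result = {}
    for dataset, pointer_file_oids in dataset_to_pointer_file_oids.items():
        for pointer_file_oid in pointer_file_oids:
            existing = result.setdefault(pointer_file_oid, dataset)
            if existing != dataset:
                if isinstance(existing, str):
                    result[pointer_file_oid] = {existing, dataset}
                else:
                    existing.add(dataset)
    return result

inverted = invert_pointer_file_oid_dict(dataset_to_pointer_file_oids)
assert inverted["f129df99"] == "auckland"                    # unique to one dataset: kept as a string
assert inverted["8d2362d8"] == {"auckland", "wellington"}    # shared: promoted to a set of dataset paths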
86 changes: 86 additions & 0 deletions kart/lfs_commands/url_redirector.py
@@ -0,0 +1,86 @@
from collections.abc import Iterable

from kart.tile import ALL_TILE_DATASET_TYPES


class UrlRedirector:
"""
Loads a set of redirect rules that apply to linked-datasets from a given commit.

Suppose, for example, a user migrates all their data from one S3 region to another, for whatever reason.
And suppose the bucket in the new region has a new name, since bucket names are globally unique.
(It may be possible to migrate the bucket name, but for the purpose of this example, the new bucket has a new name).
That will break a linked-dataset where the URLs embedded in each tile point to the original bucket.

The workaround: each linked-dataset has a meta-item called "linked-storage.json", which may contain a mapping
called "urlRedirects". If these redirect rules are updated appropriately, then URLs that point to the old bucket
will be treated as if they point to the new bucket, without needing to update the URL in every single tile
individually and retroactively.

Here is an example urlRedirects mapping that contains 3 rules:
{
"s3://old/and/broken/": "s3://new/and/shiny/",
"s3://old/path/to/tile.laz": "s3://new/path/to/same/tile.laz",
"s3://old/", "s3://new/"
}

This would be applied to a URL as follows - each rule is attempted in turn.
If a rule applies, the url is updated, and subsequent rules are attempted against the updated url.
Eventually the url - which may have been updated by zero, one, or many rules - is returned.

- The first rule ends with a '/' so it does prefix matching:
If the url starts with "s3://old/and/broken/", this prefix will be replaced with "s3://new/and/shiny/"
- The second rule does not end with a '/' so it does exact matching:
If the url is now exactly "s3://old/path/to/tile.laz", it will be set to "s3://new/path/to/same/tile.laz"
- The third rule ends with a '/' so it does prefix matching:
If the url now starts with "s3://old/", this prefix will be replaced with "s3://new/"

Currently url redirect rules are only loaded from the HEAD commit - this is subject to change.
"""

def __init__(self, repo, commit=None):
# TODO - improve redirect-commit finding logic - probably do some of the following:
# - find the tip of the default branch
# - find the local tip of the branch that the remote HEAD was pointing to when we last fetched
# - find a branch specified somehow in the config as the url-redirect branch

self.commit = commit if commit is not None else repo.head_commit

self.dataset_to_redirects = {}

if not self.commit:
return

for dataset in repo.datasets(
self.commit, filter_dataset_type=ALL_TILE_DATASET_TYPES
):
linked_storage = dataset.get_meta_item("linked-storage.json")
redirects = linked_storage.get("urlRedirects") if linked_storage else None
if redirects:
self.dataset_to_redirects[dataset.path] = redirects

def apply_redirect(self, url, dataset):
# It could be the case that a single LFS object is in more than one dataset.
# In this case, we just try to find any set of redirect rules that applies to the object.
if isinstance(dataset, Iterable) and not isinstance(dataset, str):
for d in dataset:
result = self.apply_redirect(url, d)
if result != url:
return result
return url

if not isinstance(dataset, str):
dataset = dataset.path
redirects = self.dataset_to_redirects.get(dataset)
if not redirects:
return url

for from_, to_ in redirects.items():
if from_.endswith("/"):
if url.startswith(from_):
url = to_ + url[len(from_) :]
else:
if url == from_:
url = to_

return url
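The matching behaviour described in the class docstring can be exercised on its own. The sketch below re-implements just the rule loop from apply_redirect; the rules and URLs are made-up examples, not values from any real repository.

def apply_rules(url, redirects):
    # Mirrors the rule loop in UrlRedirector.apply_redirect: rules are tried in order;
    # a key ending in "/" does prefix replacement, any other key must match the URL exactly.
    for from_, to_ in redirects.items():
        if from_.endswith("/"):
            if url.startswith(from_):
                url = to_ + url[len(from_):]
        elif url == from_:
            url = to_
    return url

redirects = {
    "s3://old/and/broken/": "s3://new/and/shiny/",
    "s3://old/path/to/tile.laz": "s3://new/path/to/same/tile.laz",
    "s3://old/": "s3://new/",
}

assert apply_rules("s3://old/and/broken/a.laz", redirects) == "s3://new/and/shiny/a.laz"
assert apply_rules("s3://old/path/to/tile.laz", redirects) == "s3://new/path/to/same/tile.laz"
assert apply_rules("s3://elsewhere/a.laz", redirects) == "s3://elsewhere/a.laz"  # no rule applies, URL unchanged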
26 changes: 14 additions & 12 deletions kart/workdir.py
@@ -303,19 +303,20 @@ def _do_reset_datasets(
track_changes_as_dirty=False,
quiet=False,
):
pointer_files_to_fetch = set()
dataset_to_pointer_oids_to_fetch = {}
workdir_diff_cache = self.workdir_diff_cache()
update_diffs = {}

# First pass - make sure the LFS blobs are present in the local LFS cache:
# - For the datasets that will be inserted (written from scratch):
for ds_path in ds_inserts:
pointer_files_to_fetch.update(
blob.hex
for blob in target_datasets[ds_path].tile_pointer_blobs(
self.repo.spatial_filter
)
pointer_file_oids = dataset_to_pointer_oids_to_fetch.setdefault(
ds_path, set()
)
for blob in target_datasets[ds_path].tile_pointer_blobs(
self.repo.spatial_filter
):
pointer_file_oids.add(blob.hex)

# - For the datasets that will be updated:
for ds_path in ds_updates:
@@ -326,17 +327,18 @@
workdir_diff_cache,
repo_key_filter[ds_path],
)
pointer_files_to_fetch.update(
blob.hex
for blob in self._list_new_pointer_blobs_for_diff(
update_diffs[ds_path], target_datasets[ds_path]
)
pointer_file_oids = dataset_to_pointer_oids_to_fetch.setdefault(
ds_path, set()
)
for blob in self._list_new_pointer_blobs_for_diff(
update_diffs[ds_path], target_datasets[ds_path]
):
pointer_file_oids.add(blob.hex)

# We fetch the LFS tiles immediately before writing them to the working copy -
# unlike ODB objects that are already fetched.
fetch_lfs_blobs_for_pointer_files(
self.repo, pointer_files_to_fetch, quiet=quiet
self.repo, dataset_to_pointer_oids_to_fetch, quiet=quiet
)

# Second pass - actually update the working copy:
Binary file added tests/data/linked-dataset.tgz
66 changes: 66 additions & 0 deletions tests/linked_storage/test_url_redirects.py
@@ -0,0 +1,66 @@
import json

from kart.repo import KartRepo


def test_s3_url_redirects(
data_archive,
cli_runner,
s3_test_data_point_cloud,
check_lfs_hashes,
check_tile_is_reflinked,
):
with data_archive("linked-dataset") as repo_path:
r = cli_runner.invoke(["lfs+", "fetch", "HEAD", "--dry-run"])
assert r.exit_code == 0, r.stderr
assert r.stdout.splitlines()[:8] == [
"Running fetch with --dry-run:",
" Found 16 LFS blobs (373KiB) to fetch from specific URLs",
"",
"LFS blob OID: (Pointer file OID):",
"03e3d4dc6fc8e75c65ffdb39b630ffe26e4b95982b9765c919e34fb940e66fc0 (8d2362d8f14ea34aaebdede6602dcca0bcdd8297) → s3://example-bucket/example-path/auckland_3_2.laz",
"06bd15fbb6616cf63a4a410c5ba4666dab76177a58cb99c3fa2afb46c9dd6379 (f129df999b5aea453ace9d4fcd1496dcebf97fe1) → s3://example-bucket/example-path/auckland_1_3.laz",
"09701813661e369395d088a9a44f1201200155e652a8b6e291e71904f45e32a6 (553775bcbaa9c067e8ad611270d53d4f37ac37da) → s3://example-bucket/example-path/auckland_3_0.laz",
"111579edfe022ebfd3388cc47d911c16c72c7ebd84c32a7a0c1dab6ed9ec896a (76cff04b9c7ffb01bb99ac42a6e94612fdea605f) → s3://example-bucket/example-path/auckland_0_2.laz",
]

s3_test_data_point_cloud_prefix = s3_test_data_point_cloud.split("*")[0]

linked_storage_json = {
"urlRedirects": {
"s3://example-bucket/example-path/": s3_test_data_point_cloud_prefix
}
}
r = cli_runner.invoke(
[
"meta",
"set",
"auckland",
f"linked-storage.json={json.dumps(linked_storage_json)}",
]
)
assert r.exit_code == 0, r.stderr

r = cli_runner.invoke(["lfs+", "fetch", "HEAD", "--dry-run"])
assert r.exit_code == 0, r.stderr
assert r.stdout.splitlines()[:8] == [
"Running fetch with --dry-run:",
" Found 16 LFS blobs (373KiB) to fetch from specific URLs",
"",
"LFS blob OID: (Pointer file OID):",
f"03e3d4dc6fc8e75c65ffdb39b630ffe26e4b95982b9765c919e34fb940e66fc0 (8d2362d8f14ea34aaebdede6602dcca0bcdd8297) → {s3_test_data_point_cloud_prefix}auckland_3_2.laz",
f"06bd15fbb6616cf63a4a410c5ba4666dab76177a58cb99c3fa2afb46c9dd6379 (f129df999b5aea453ace9d4fcd1496dcebf97fe1) → {s3_test_data_point_cloud_prefix}auckland_1_3.laz",
f"09701813661e369395d088a9a44f1201200155e652a8b6e291e71904f45e32a6 (553775bcbaa9c067e8ad611270d53d4f37ac37da) → {s3_test_data_point_cloud_prefix}auckland_3_0.laz",
f"111579edfe022ebfd3388cc47d911c16c72c7ebd84c32a7a0c1dab6ed9ec896a (76cff04b9c7ffb01bb99ac42a6e94612fdea605f) → {s3_test_data_point_cloud_prefix}auckland_0_2.laz",
]

r = cli_runner.invoke(["checkout", "--dataset=auckland"])
assert r.exit_code == 0, r.stderr

repo = KartRepo(repo_path)
check_lfs_hashes(repo, expected_file_count=16)
for x in range(4):
for y in range(4):
check_tile_is_reflinked(
repo_path / "auckland" / f"auckland_{x}_{y}.laz", repo
)
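As a usage note, the redirect set up in this test is a single meta-item write; outside the test harness the same change can be made through the kart CLI. The bucket names below are placeholders (the test substitutes the real S3 test-data prefix), and the command is the equivalent of the cli_runner invocation above.

import json
import subprocess

# Hypothetical redirect: tiles whose embedded URLs point at the old bucket should
# be fetched from the new one instead (prefix rule, note the trailing "/").
linked_storage_json = {
    "urlRedirects": {
        "s3://example-bucket/example-path/": "s3://new-bucket/new-path/",
    }
}

subprocess.run(
    ["kart", "meta", "set", "auckland", f"linked-storage.json={json.dumps(linked_storage_json)}"],
    check=True,
)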