Merge pull request #906 from koordinates/byoda-import
Adds an experimental byod-point-cloud-import command
olsen232 committed Sep 20, 2023
2 parents f974601 + 2195933 commit bfd3644
Showing 16 changed files with 488 additions and 12 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
@@ -7,6 +7,8 @@ env:
PY_VER: "3.10"
CMAKE_VERSION: "~3.25.0"
FORCE_COLOR: "YES"
KART_S3_TEST_DATA_POINT_CLOUDS: "s3://kart-bring-your-own-data-poc/auckland-small-laz1.2/*.laz"
AWS_NO_SIGN_REQUEST: "1"

jobs:
#
1 change: 1 addition & 0 deletions kart.spec
@@ -157,6 +157,7 @@ pyi_analysis = Analysis(
# TODO: improve this somehow
*collect_submodules('kart'),
*collect_submodules('kart.annotations'),
*collect_submodules('kart.byod'),
*collect_submodules('kart.lfs_commands'),
*collect_submodules('kart.point_cloud'),
*collect_submodules('kart.sqlalchemy'),
Empty file added kart/byod/__init__.py
Empty file.
84 changes: 84 additions & 0 deletions kart/byod/importer.py
@@ -0,0 +1,84 @@
import logging

import click

from kart.fast_import import (
write_blob_to_stream,
)
from kart.lfs_util import dict_to_pointer_file_bytes
from kart.progress_util import progress_bar
from kart.s3_util import expand_s3_glob

L = logging.getLogger(__name__)


class ByodTileImporter:
"""
Subclassable logic for importing the metadata from tile-based datasets,
while leaving the data in-place on existing hosted storage.
"""

EXTRACT_TILE_METADATA_STEP = "Fetching tile metadata"

def sanity_check_sources(self, sources):
for source in sources:
if not source.startswith("s3://"):
raise click.UsageError(f"SOURCE {source} should be an S3 url")

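        # Expand any wildcard sources in place, iterating over a copy since we modify the list: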
for source in list(sources):
if "*" in source:
sources.remove(source)
sources += expand_s3_glob(source)

def extract_tile_metadata(self, tile_location):
raise NotImplementedError()

def get_conversion_func(self, source_metadata):
return None

def import_tiles_to_stream(self, stream, sources):
progress = progress_bar(
total=len(sources), unit="tile", desc="Writing tile metadata"
)
with progress as p:
for source in sources:
tilename = self.DATASET_CLASS.tilename_from_path(source)
rel_blob_path = self.DATASET_CLASS.tilename_to_blob_path(
tilename, relative=True
)
blob_path = f"{self.dataset_inner_path}/{rel_blob_path}"

# Check if tile has already been imported previously:
if self.existing_dataset is not None:
existing_summary = self.existing_dataset.get_tile_summary(
tilename, missing_ok=True
)
if existing_summary:
source_oid = self.source_to_hash_and_size[source][0]
if self.existing_tile_matches_source(
source_oid, existing_summary
):
# This tile has already been imported before. Reuse it rather than re-importing it.
# Re-importing it could cause it to be re-converted, which is a waste of time,
# and it may not convert the same the second time, which is then a waste of space
# and shows up as a pointless diff.
write_blob_to_stream(
stream,
blob_path,
(self.existing_dataset.inner_tree / rel_blob_path).data,
)
self.include_existing_metadata = True
continue

# Tile hasn't been imported previously.
pointer_data = dict_to_pointer_file_bytes(
self.source_to_metadata[source]["tile"]
)
write_blob_to_stream(stream, blob_path, pointer_data)

p.update(1)

self.source_to_imported_metadata = self.source_to_metadata

def prompt_for_convert_to_cloud_optimized(self):
return False
132 changes: 132 additions & 0 deletions kart/byod/point_cloud_import.py
@@ -0,0 +1,132 @@
import logging

import click

from kart.byod.importer import ByodTileImporter
from kart.cli_util import StringFromFile, MutexOption, KartCommand
from kart.point_cloud.import_ import PointCloudImporter
from kart.point_cloud.metadata_util import extract_pc_tile_metadata
from kart.s3_util import get_hash_and_size_of_s3_object, fetch_from_s3


L = logging.getLogger(__name__)


@click.command("byod-point-cloud-import", hidden=True, cls=KartCommand)
@click.pass_context
@click.option(
"--message",
"-m",
type=StringFromFile(encoding="utf-8"),
help="Commit message. By default this is auto-generated.",
)
@click.option(
"--checkout/--no-checkout",
"do_checkout",
is_flag=True,
default=True,
help="Whether to create a working copy once the import is finished, if no working copy exists yet.",
)
@click.option(
"--replace-existing",
is_flag=True,
cls=MutexOption,
exclusive_with=["--delete", "--update-existing"],
help="Replace existing dataset at the same path.",
)
@click.option(
"--update-existing",
is_flag=True,
cls=MutexOption,
exclusive_with=["--replace-existing"],
help=(
"Update existing dataset at the same path. "
"Tiles will be replaced by source tiles with the same name. "
"Tiles in the existing dataset which are not present in SOURCES will remain untouched."
),
)
@click.option(
"--delete",
type=StringFromFile(encoding="utf-8"),
cls=MutexOption,
exclusive_with=["--replace-existing"],
multiple=True,
help=("Deletes the given tile. Can be used multiple times."),
)
@click.option(
"--amend",
default=False,
is_flag=True,
help="Amend the previous commit instead of adding a new commit",
)
@click.option(
"--allow-empty",
is_flag=True,
default=False,
help=(
"Usually recording a commit that has the exact same tree as its sole "
"parent commit is a mistake, and the command prevents you from making "
"such a commit. This option bypasses the safety"
),
)
@click.option(
"--num-workers",
"--num-processes",
type=click.INT,
help="How many import workers to run in parallel. Defaults to the number of available CPU cores.",
default=None,
hidden=True,
)
@click.option("--dataset-path", "--dataset", help="The dataset's path once imported")
@click.argument(
"sources",
nargs=-1,
metavar="SOURCE [SOURCES...]",
)
def byod_point_cloud_import(
ctx,
message,
do_checkout,
replace_existing,
update_existing,
delete,
amend,
allow_empty,
num_workers,
dataset_path,
sources,
):
"""
    Experimental. Import a dataset of point-cloud tiles from S3. Doesn't fetch the tiles; instead, it stores each tile's original location.
SOURCES should be one or more LAZ or LAS files (or wildcards that match multiple LAZ or LAS files).
"""
repo = ctx.obj.repo

ByodPointCloudImporter(repo, ctx).import_tiles(
convert_to_cloud_optimized=False,
dataset_path=dataset_path,
message=message,
do_checkout=do_checkout,
replace_existing=replace_existing,
update_existing=update_existing,
delete=delete,
amend=amend,
allow_empty=allow_empty,
sources=list(sources),
num_workers=num_workers,
)


class ByodPointCloudImporter(ByodTileImporter, PointCloudImporter):
def extract_tile_metadata(self, tile_location):
oid_and_size = get_hash_and_size_of_s3_object(tile_location)
# TODO - download only certain ranges of the file, and extract metadata from those.
tmp_downloaded_tile = fetch_from_s3(tile_location)
result = extract_pc_tile_metadata(
tmp_downloaded_tile, oid_and_size=oid_and_size
)
tmp_downloaded_tile.unlink()
# TODO - format still not definite, we might not put the whole URL in here.
result["tile"]["url"] = tile_location
return result
1 change: 1 addition & 0 deletions kart/cli.py
@@ -54,6 +54,7 @@
"point_cloud.import_": {"point-cloud-import"},
"install": {"install"},
"add_dataset": {"add-dataset"},
"byod.point_cloud_import": {"byod-point-cloud-import"},
}

# These commands aren't valid Python symbols, even when we change dash to underscore.
7 changes: 5 additions & 2 deletions kart/point_cloud/metadata_util.py
@@ -141,7 +141,7 @@ def get_native_extent(info):
)


-def extract_pc_tile_metadata(pc_tile_path):
+def extract_pc_tile_metadata(pc_tile_path, oid_and_size=None):
"""
Use pdal to get any and all point-cloud metadata we can make use of in Kart.
This includes metadata that must be dataset-homogenous and would be stored in the dataset's /meta/ folder,
@@ -192,7 +192,10 @@ def extract_pc_tile_metadata(pc_tile_path):
}

schema_json = pdal_schema_to_kart_schema(metadata["filters.info"]["schema"])
-    oid, size = get_hash_and_size_of_file(pc_tile_path)
+    if oid_and_size:
+        oid, size = oid_and_size
+    else:
+        oid, size = get_hash_and_size_of_file(pc_tile_path)

# Keep tile info keys in alphabetical order, except oid and size should be last.
tile_info = {
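For context, a brief hedged sketch of the two call patterns this new parameter allows (the file name, hash and size below are invented placeholders):

# As before: the tile is hashed locally to get its oid and size.
metadata = extract_pc_tile_metadata("auckland_0_0.laz")

# New: a caller that already knows the hash and size (e.g. from an S3 HEAD request, as the
# BYOD importer does) passes them in, and the local hashing step is skipped.
known_oid = "0" * 64      # placeholder SHA256 hex digest
known_size = 2_506_301    # placeholder size in bytes
metadata = extract_pc_tile_metadata("auckland_0_0.laz", oid_and_size=(known_oid, known_size))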
106 changes: 106 additions & 0 deletions kart/s3_util.py
@@ -0,0 +1,106 @@
from base64 import standard_b64decode
import functools
import os
from pathlib import Path
import tempfile
from urllib.parse import urlparse

import boto3
import click

from kart.exceptions import NotFound, NO_IMPORT_SOURCE

# Utility functions for dealing with S3 - not yet launched.


@functools.lru_cache(maxsize=1)
def get_s3_config():
# TODO - add an option --s3-region to commands where it would be useful.
return None


@functools.lru_cache(maxsize=1)
def get_s3_client():
client = boto3.client("s3", config=get_s3_config())
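    # If AWS_NO_SIGN_REQUEST is set, disable request signing so public buckets can be read without credentials.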
if "AWS_NO_SIGN_REQUEST" in os.environ:
client._request_signer.sign = lambda *args, **kwargs: None
return client


@functools.lru_cache(maxsize=1)
def get_s3_resource():
resource = boto3.resource("s3", config=get_s3_config())
if "AWS_NO_SIGN_REQUEST" in os.environ:
resource.meta.client._request_signer.sign = lambda *args, **kwargs: None
return resource


@functools.lru_cache()
def get_bucket(name):
return get_s3_resource().Bucket(name)


def fetch_from_s3(s3_url, output_path=None):
"""
Downloads the object at s3_url to output_path.
    If output_path is not set, a temporary file is created using tempfile.mkstemp().
"""
# TODO: handle failure.
parsed = urlparse(s3_url)
bucket = get_bucket(parsed.netloc)
if output_path is None:
fd, path = tempfile.mkstemp()
# If we keep it open, boto3 won't be able to write to it (on Windows):
os.close(fd)
output_path = Path(path)
bucket.download_file(parsed.path.lstrip("/"), str(output_path.resolve()))
return output_path


def expand_s3_glob(source_spec):
"""
    Given an S3 URL containing a single '*' wildcard, uses prefix and suffix matching to find all S3 objects that match.
Subdirectories (or the S3 equivalent - S3 is not exactly a directory hierarchy) are not matched -
that is, s3://bucket/path/*.txt matches s3://bucket/path/example.txt but not s3://bucket/path/subpath/example.txt
"""
# TODO: handle any kind of failure, sanity check to make sure we don't match a million objects.
if "*" not in source_spec:
return [source_spec]

parsed = urlparse(source_spec)
prefix, suffix = parsed.path.split("*", maxsplit=1)
if "*" in suffix:
raise click.UsageError(
f"Two wildcards '*' found in {source_spec} - only one wildcard is supported"
)
prefix = prefix.lstrip("/")
prefix_len = len(prefix)

bucket = get_bucket(parsed.netloc)
matches = bucket.objects.filter(Prefix=prefix)
result = []
for match in matches:
assert match.key.startswith(prefix)
match_suffix = match.key[prefix_len:]
if match_suffix.endswith(suffix) and "/" not in match_suffix:
result.append(f"s3://{match.bucket_name}/{match.key}")

if not result:
raise NotFound(
f"No S3 objects found at {source_spec}", exit_code=NO_IMPORT_SOURCE
)
return result
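# For illustration, a hedged usage sketch of expand_s3_glob (bucket and keys are invented):
#     expand_s3_glob("s3://example-bucket/tiles/*.laz")
# splits the path at the wildcard into prefix "tiles/" and suffix ".laz", then keeps only keys
# that end with the suffix and contain no further "/" - so "tiles/auckland_0_0.laz" is returned
# (as a full s3:// URL) while "tiles/sub/auckland_1_1.laz" and "tiles/readme.txt" are skipped.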


def get_hash_and_size_of_s3_object(s3_url):
"""Returns the (SHA256-hash-in-Base64, filesize) of an S3 object."""
parsed = urlparse(s3_url)
bucket = parsed.netloc
key = parsed.path.lstrip("/")
response = get_s3_client().head_object(
Bucket=bucket, Key=key, ChecksumMode="ENABLED"
)
# TODO - handle failure (eg missing SHA256 checksum), which is extremely likely.
sha256 = standard_b64decode(response["ChecksumSHA256"]).hex()
size = response["ContentLength"]
return sha256, size
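Finally, a minimal hedged sketch of these helpers used together, in the way the BYOD importer above uses them (the bucket and key are invented, and the HEAD call only yields a hash if the object was uploaded with a SHA256 checksum):

url = "s3://example-bucket/tiles/auckland_0_0.laz"  # hypothetical public object
oid, size = get_hash_and_size_of_s3_object(url)     # HEAD request only - nothing is downloaded
local_copy = fetch_from_s3(url)                     # temp file created via tempfile.mkstemp()
try:
    ...                                             # read whatever metadata is needed from local_copy
finally:
    local_copy.unlink()                             # the tile itself stays on S3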
