Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding local deltacat storage module #175

Merged
merged 4 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deltacat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
init,
)
from deltacat.catalog.model.table_definition import TableDefinition
from deltacat.compute.compactor import SortKey, SortOrder
from deltacat.storage import (
DistributedDataset,
LifecycleState,
Expand All @@ -37,6 +36,8 @@
LocalTable,
Namespace,
SchemaConsistencyType,
SortKey,
SortOrder,
)
from deltacat.types.media import ContentEncoding, ContentType, TableType
from deltacat.types.tables import TableWriteMode
Expand Down
2 changes: 1 addition & 1 deletion deltacat/catalog/delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from deltacat.catalog.model.catalog import Catalog, all_catalogs
from deltacat.catalog.model.table_definition import TableDefinition
from deltacat.compute.compactor.model.sort_key import SortKey
from deltacat.storage.model.sort_key import SortKey
from deltacat.storage.model.list_result import ListResult
from deltacat.storage.model.namespace import Namespace
from deltacat.storage.model.types import (
Expand Down
2 changes: 1 addition & 1 deletion deltacat/catalog/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pyarrow as pa

from deltacat.catalog.model.table_definition import TableDefinition
from deltacat.compute.compactor.model.sort_key import SortKey
from deltacat.storage.model.sort_key import SortKey
from deltacat.storage.model.list_result import ListResult
from deltacat.storage.model.namespace import Namespace
from deltacat.storage.model.types import (
Expand Down
3 changes: 0 additions & 3 deletions deltacat/compute/compactor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
RoundCompletionInfo,
HighWatermark,
)
from deltacat.compute.compactor.model.sort_key import SortKey, SortOrder

__all__ = [
"DeltaAnnotated",
Expand All @@ -27,6 +26,4 @@
"PyArrowWriteResult",
"RoundCompletionInfo",
"HighWatermark",
"SortKey",
"SortOrder",
]
5 changes: 3 additions & 2 deletions deltacat/compute/compactor/compaction_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from deltacat.compute.compactor import (
PyArrowWriteResult,
RoundCompletionInfo,
SortKey,
)
from deltacat.storage.model.sort_key import SortKey
from deltacat.compute.compactor.model.dedupe_result import DedupeResult
from deltacat.compute.compactor.model.hash_bucket_result import HashBucketResult
from deltacat.io.object_store import IObjectStore
Expand Down Expand Up @@ -50,6 +50,7 @@
from deltacat.compute.compactor.model.compaction_session_audit_info import (
CompactionSessionAuditInfo,
)
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys
from deltacat.utils.resources import get_current_node_peak_memory_usage_in_bytes


Expand Down Expand Up @@ -83,7 +84,7 @@ def check_preconditions(
assert (
new_hash_bucket_count >= 1
), "New hash bucket count must be a positive value"
return SortKey.validate_sort_keys(
return validate_sort_keys(
source_partition_locator,
sort_keys,
deltacat_storage,
Expand Down
2 changes: 1 addition & 1 deletion deltacat/compute/compactor/model/delta_file_envelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class DeltaFileEnvelope(dict):
@staticmethod
def of(
stream_position: int,
file_index: int,
delta_type: DeltaType,
table: LocalTable,
file_index: int = None,
is_src_delta: np.bool_ = True,
file_record_count: Optional[int] = None,
) -> DeltaFileEnvelope:
Expand Down
2 changes: 1 addition & 1 deletion deltacat/compute/compactor/model/primary_key_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any, Dict, List
from uuid import uuid4

from deltacat.compute.compactor.model.sort_key import SortKey
from deltacat.storage.model.sort_key import SortKey
from deltacat.storage import Locator, PartitionLocator
from deltacat.utils.common import sha1_hexdigest

Expand Down
98 changes: 0 additions & 98 deletions deltacat/compute/compactor/model/sort_key.py

This file was deleted.

5 changes: 3 additions & 2 deletions deltacat/compute/compactor/repartition_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import itertools
from deltacat.compute.compactor import (
RoundCompletionInfo,
SortKey,
)
from deltacat.storage.model.sort_key import SortKey
from deltacat.types.media import ContentType
from deltacat.compute.compactor import DeltaAnnotated
from deltacat.utils.ray_utils.concurrency import (
Expand All @@ -31,6 +31,7 @@
interface as unimplemented_deltacat_storage,
)
from deltacat.utils.metrics import MetricsConfig
from deltacat.compute.compactor.utils.sort_key import validate_sort_keys

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))

Expand Down Expand Up @@ -157,7 +158,7 @@ def repartition(
new_compacted_partition_locator,
compacted_delta.stream_position,
)
bit_width_of_sort_keys = SortKey.validate_sort_keys(
bit_width_of_sort_keys = validate_sort_keys(
source_partition_locator,
sort_keys,
deltacat_storage,
Expand Down
3 changes: 1 addition & 2 deletions deltacat/compute/compactor/steps/dedupe.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@

from deltacat import logs
from deltacat.compute.compactor import (
SortKey,
SortOrder,
DeltaFileEnvelope,
DeltaFileLocator,
)
from deltacat.storage.model.sort_key import SortKey, SortOrder
from deltacat.compute.compactor.model.dedupe_result import DedupeResult
from deltacat.compute.compactor.utils import system_columns as sc
from deltacat.utils.ray_utils.runtime import (
Expand Down
2 changes: 1 addition & 1 deletion deltacat/compute/compactor/steps/hash_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from deltacat.compute.compactor import (
DeltaAnnotated,
DeltaFileEnvelope,
SortKey,
RoundCompletionInfo,
)
from deltacat.storage.model.sort_key import SortKey
from deltacat.compute.compactor.model.delta_file_envelope import DeltaFileEnvelopeGroups
from deltacat.compute.compactor.model.hash_bucket_result import HashBucketResult
from deltacat.compute.compactor.utils import system_columns as sc
Expand Down
16 changes: 1 addition & 15 deletions deltacat/compute/compactor/utils/primary_key_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,12 @@
from ray.types import ObjectRef

from deltacat import logs
from deltacat.aws import s3u
from deltacat.compute.compactor import (
PrimaryKeyIndexVersionLocator,
)
from deltacat.compute.compactor.utils import system_columns as sc
from deltacat.io.object_store import IObjectStore

logger = logs.configure_deltacat_logger(logging.getLogger(__name__))


def delete_primary_key_index_version(
    s3_bucket: str, pki_version_locator: PrimaryKeyIndexVersionLocator
) -> None:
    """
    Removes the files of one primary key index version from a bucket.

    Everything found under the locator's
    `primary_key_index_version_root_path` prefix in `s3_bucket` is handed to
    `s3u.delete_files_by_prefix`. An info-level log line is written before
    and after the deletion.
    """
    logger.info(f"Deleting primary key index: {pki_version_locator}")
    # Resolve the prefix once so the delete call reads as a single line.
    root_path = pki_version_locator.primary_key_index_version_root_path
    s3u.delete_files_by_prefix(s3_bucket, root_path)
    logger.info(f"Primary key index deleted: {pki_version_locator}")
# TODO: Deprecate this module in favor of compactor_v2


def group_record_indices_by_hash_bucket(
Expand Down
52 changes: 52 additions & 0 deletions deltacat/compute/compactor/utils/sort_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pyarrow as pa
from typing import List
from deltacat.storage import PartitionLocator, SortKey

MAX_SORT_KEYS_BIT_WIDTH = 256


def validate_sort_keys(
    source_partition_locator: PartitionLocator,
    sort_keys: List[SortKey],
    deltacat_storage,
) -> int:
    """
    Validates the input sort keys to ensure that they are unique, are all
    fixed-width data types, and that the sum of bit widths across sort key
    data types is less-than-or-equal-to MAX_SORT_KEYS_BIT_WIDTH.

    Args:
        source_partition_locator: Locator whose `stream_locator` supplies the
            namespace, table name and table version used to fetch the schema.
        sort_keys: Sort keys to validate. May be empty or None.
        deltacat_storage: Storage implementation exposing
            `get_table_version_schema(namespace, table_name, table_version)`.

    Returns:
        The sum of bit widths across all sort keys. This is 0 when no sort
        keys are given, and MAX_SORT_KEYS_BIT_WIDTH when the table version
        schema is not a PyArrow schema (so individual widths can't be read).

    Raises:
        AssertionError: If two sort keys share the same key name.
        ValueError: If a sort key's PyArrow type has no fixed bit width, or
            if the total bit width exceeds MAX_SORT_KEYS_BIT_WIDTH.
    """
    if not sort_keys:
        return 0

    sort_key_names = [key.key_name for key in sort_keys]
    # Kept as an assert (not a raise) so callers that expect AssertionError
    # for duplicate names keep working.
    assert len(sort_key_names) == len(
        set(sort_key_names)
    ), f"Sort key names must be unique: {sort_key_names}"

    stream_locator = source_partition_locator.stream_locator
    table_version_schema = deltacat_storage.get_table_version_schema(
        stream_locator.namespace,
        stream_locator.table_name,
        stream_locator.table_version,
    )
    if not isinstance(table_version_schema, pa.Schema):
        # Without a PyArrow schema the individual widths are unknown; report
        # the maximum supported width.
        return MAX_SORT_KEYS_BIT_WIDTH

    total_sort_keys_bit_width = 0
    for sort_key_name in sort_key_names:
        # Look the field up on the schema *instance*. Calling the unbound
        # `pa.Schema.field(name)` never consults the table's schema and fails.
        pa_field: pa.Field = table_version_schema.field(sort_key_name)
        pa_type: pa.DataType = pa_field.type
        # Only the bit_width access is guarded, so the over-limit error below
        # is not caught and re-reported as a "not fixed-size" error.
        try:
            bit_width = pa_type.bit_width
        except ValueError as e:
            raise ValueError(
                f"Unable to get bit width of sort key: {pa_field}. "
                f"Please ensure that all sort keys are fixed-size "
                f"PyArrow data types."
            ) from e
        total_sort_keys_bit_width += bit_width
        if total_sort_keys_bit_width > MAX_SORT_KEYS_BIT_WIDTH:
            raise ValueError(
                f"Total length of sort keys "
                f"({total_sort_keys_bit_width}) is greater "
                f"than the max supported bit width for all "
                f"sort keys ({MAX_SORT_KEYS_BIT_WIDTH})"
            )
    return total_sort_keys_bit_width
3 changes: 3 additions & 0 deletions deltacat/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
LocalTable,
SchemaConsistencyType,
)
from deltacat.storage.model.sort_key import SortKey, SortOrder

__all__ = [
"CommitState",
Expand Down Expand Up @@ -50,4 +51,6 @@
"TableLocator",
"TableVersion",
"TableVersionLocator",
"SortKey",
"SortOrder",
]
4 changes: 3 additions & 1 deletion deltacat/storage/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pyarrow as pa

from deltacat import SortKey
from deltacat.storage import (
Delta,
DeltaLocator,
Expand All @@ -21,6 +20,7 @@
StreamLocator,
Table,
TableVersion,
SortKey,
)
from deltacat.types.media import ContentType, StorageType, TableType
from deltacat.utils.common import ReadKwargsProvider
Expand Down Expand Up @@ -295,6 +295,8 @@ def update_table(
description: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
new_table_name: Optional[str] = None,
*args,
**kwargs
) -> None:
"""
Update table metadata describing the table versions it contains. By default,
Expand Down
Loading
Loading