Skip to content

Commit

Permalink
Add Object Store implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Deren <dominik.deren@live.com>
  • Loading branch information
domderen committed Oct 9, 2022
1 parent 5a698b9 commit eef93c5
Show file tree
Hide file tree
Showing 6 changed files with 1,103 additions and 109 deletions.
550 changes: 443 additions & 107 deletions Pipfile.lock

Large diffs are not rendered by default.

103 changes: 103 additions & 0 deletions nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,106 @@ def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result['ttl'] = self._to_nanoseconds(self.ttl)
return result


@dataclass
class StreamPurgeRequest(Base):
"""
StreamPurgeRequest is optional request information to the purge API.
"""
# Purge up to but not including sequence.
seq: Optional[int] = None
# Subject to match against messages for the purge command.
filter: Optional[str] = None
# Number of messages to keep.
keep: Optional[int] = None


@dataclass
class ObjectStoreConfig(Base):
"""
ObjectStoreConfig is the configurigation of an ObjectStore.
"""
bucket: str
description: Optional[str] = None
ttl: Optional[float] = None
max_bytes: Optional[int] = None
storage: Optional[StorageType] = None
replicas: int = 1
placement: Optional[Placement] = None

def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result['ttl'] = self._to_nanoseconds(self.ttl)
return result


@dataclass
class ObjectLink(Base):
"""
ObjectLink is used to embed links to other buckets and objects.
"""
# Bucket is the name of the other object store.
bucket: str
# Name can be used to link to a single object.
# If empty means this is a link to the whole store, like a directory.
name: Optional[str] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
return super().from_response(resp)


@dataclass
class ObjectMetaOptions(Base):
link: Optional[ObjectLink] = None
max_chunk_size: Optional[int] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, 'link', ObjectLink)
return super().from_response(resp)


@dataclass
class ObjectMeta(Base):
"""
ObjectMeta is high level information about an object.
"""
name: str
description: Optional[str] = None
headers: Optional[dict] = None
# Optional options.
options: Optional[ObjectMetaOptions] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, 'options', ObjectMetaOptions)
return super().from_response(resp)


@dataclass
class ObjectInfo(Base):
"""
ObjectInfo is meta plus instance information.
"""
name: str
bucket: str
nuid: str
size: int
mtime: str
chunks: int
digest: Optional[str] = None
deleted: Optional[bool] = None
description: Optional[str] = None
headers: Optional[dict] = None
# Optional options.
options: Optional[ObjectMetaOptions] = None

def is_link(self) -> bool:
return self.options is not None and self.options.link is not None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, 'options', ObjectMetaOptions)
return super().from_response(resp)
87 changes: 86 additions & 1 deletion nats/js/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
from nats.aio.msg import Msg
from nats.aio.subscription import Subscription
from nats.js import api
from nats.js.errors import BadBucketError, BucketNotFoundError, NotFoundError
from nats.js.errors import BadBucketError, BucketNotFoundError, InvalidBucketNameError, NotFoundError
from nats.js.kv import KeyValue
from nats.js.manager import JetStreamManager
from nats.js.object_store import (
VALID_BUCKET_RE, OBJ_ALL_CHUNKS_PRE_TEMPLATE, OBJ_ALL_META_PRE_TEMPLATE,
OBJ_STREAM_TEMPLATE, ObjectStore
)

if TYPE_CHECKING:
from nats import NATS
Expand Down Expand Up @@ -991,6 +995,9 @@ async def _fetch_n(
######################

async def key_value(self, bucket: str) -> KeyValue:
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = KV_STREAM_TEMPLATE.format(bucket=bucket)
try:
si = await self.stream_info(stream)
Expand Down Expand Up @@ -1019,6 +1026,9 @@ async def create_key_value(
config = api.KeyValueConfig(bucket=params["bucket"])
config = config.evolve(**params)

if VALID_BUCKET_RE.match(config.bucket) is None:
raise InvalidBucketNameError

duplicate_window = 2 * 60 # 2 minutes
if config.ttl and config.ttl < duplicate_window:
duplicate_window = config.ttl
Expand Down Expand Up @@ -1061,5 +1071,80 @@ async def delete_key_value(self, bucket: str) -> bool:
delete_key_value deletes a JetStream KeyValue store by destroying
the associated stream.
"""
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = KV_STREAM_TEMPLATE.format(bucket=bucket)
return await self.delete_stream(stream)

#######################
# #
# ObjectStore Context #
# #
#######################

async def object_store(self, bucket: str) -> ObjectStore:
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = OBJ_STREAM_TEMPLATE.format(bucket=bucket)
try:
await self.stream_info(stream)
except NotFoundError:
raise BucketNotFoundError

return ObjectStore(
name=bucket,
stream=stream,
js=self,
)

async def create_object_store(
self,
config: Optional[api.ObjectStoreConfig] = None,
**params,
) -> ObjectStore:
"""
create_object_store takes an api.ObjectStoreConfig and creates a OBJ in JetStream.
"""
if config is None:
config = api.ObjectStoreConfig(bucket=params["bucket"])
config = config.evolve(**params)

if VALID_BUCKET_RE.match(config.bucket) is None:
raise InvalidBucketNameError

name = config.bucket
chunks = OBJ_ALL_CHUNKS_PRE_TEMPLATE.format(bucket=name)
meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=name)

stream = api.StreamConfig(
name=OBJ_STREAM_TEMPLATE.format(bucket=config.bucket),
description=config.description,
subjects=[chunks, meta],
max_age=config.ttl,
max_bytes=config.max_bytes,
storage=config.storage,
num_replicas=config.replicas,
placement=config.placement,
discard=api.DiscardPolicy.NEW,
allow_rollup_hdrs=True,
)
await self.add_stream(stream)

assert stream.name is not None
return ObjectStore(
name=config.bucket,
stream=stream.name,
js=self,
)

async def delete_object_store(self, bucket: str) -> bool:
"""
delete_object_store will delete the underlying stream for the named object.
"""
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = OBJ_STREAM_TEMPLATE.format(bucket=bucket)
return await self.delete_stream(stream)
35 changes: 35 additions & 0 deletions nats/js/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,38 @@ class KeyHistoryTooLargeError(KeyValueError):

def __str__(self) -> str:
return "nats: history limited to a max of 64"


class InvalidBucketNameError(Error):
"""
Raised when trying to create a KV or OBJ bucket with invalid name.
"""
pass


class InvalidObjectNameError(Error):
"""
Raised when trying to put an object in Object Store with invalid key.
"""
pass


class BadObjectMetaError(Error):
"""
Raised when trying to read corrupted metadata from Object Store.
"""
pass


class LinkIsABucketError(Error):
"""
Raised when trying to get object from Object Store that is a bucket.
"""
pass


class DigestMismatchError(Error):
"""
Raised when getting an object from Object Store that has a different digest than expected.
"""
pass
Loading

0 comments on commit eef93c5

Please sign in to comment.