[WIP]: Initial ObjectStore client implementation #331

Merged
merged 23 commits on Jun 1, 2023
Changes from all commits
Commits (23)
cad4e7c
Fix issue reconnecting against LBs
wallyqs Nov 11, 2020
7b068da
Bump version
wallyqs Nov 11, 2020
70aa87b
Add test for condition of bad server behind LB
wallyqs Nov 12, 2020
9f69047
Merge pull request #189 from nats-io/reconnect-with-lbs
wallyqs Nov 12, 2020
7217f3c
Create dependencies.md
gcolliso Dec 22, 2020
045f7bf
Merge pull request #191 from nats-io/add_dependencies_doc
wallyqs Dec 22, 2020
4c48f90
support set pending size to client connect options
blackstorm Feb 25, 2021
50034c9
Merge pull request #196 from blackstorm/lee/connect-pending-size-option
wallyqs Feb 25, 2021
84afa7f
Bump py from 1.8.0 to 1.10.0
dependabot[bot] Apr 20, 2021
83b5b6a
yapf: format fixes
wallyqs Jul 12, 2021
f011db7
Merge branch 'dependabot/pip/py-1.10.0'
wallyqs Jul 13, 2021
3f35065
Update __main__.py
tdelora Apr 19, 2022
03070fc
Merge pull request #294 from tdelora/patch-1
wallyqs Apr 19, 2022
eef93c5
Add Object Store implementation
domderen Oct 9, 2022
7816cf6
Merge remote-tracking branch 'origin/master' into object_store
domderen Dec 7, 2022
4bfd334
Removing not wanted change.
domderen Dec 7, 2022
c6c07ff
Updating lock file.
domderen Dec 7, 2022
3a2c798
Fixing object store item name parser.
domderen Dec 7, 2022
d308a51
Merge branch 'main' into object_store
domderen Dec 7, 2022
e84d195
Fixing code formatting.
domderen Dec 7, 2022
2bd99de
Merge branch 'object_store' of https://github.com/domderen/nats.py in…
domderen Dec 7, 2022
166bc0a
Removing test that was added during incorrect merge.
domderen Dec 7, 2022
cb4bf45
Fixing CI issue.
domderen Dec 9, 2022
619 changes: 503 additions & 116 deletions Pipfile.lock

Large diffs are not rendered by default.

105 changes: 104 additions & 1 deletion nats/js/api.py
@@ -16,7 +16,7 @@

from dataclasses import dataclass, fields, replace
from enum import Enum
from typing import Any, TypeVar
from typing import Any, Dict, Optional, TypeVar

_NANOSECOND = 10**9

@@ -574,3 +574,106 @@ def as_dict(self) -> dict[str, object]:
result = super().as_dict()
result['ttl'] = self._to_nanoseconds(self.ttl)
return result


@dataclass
class StreamPurgeRequest(Base):
"""
StreamPurgeRequest is optional request information to the purge API.
"""
# Purge up to but not including sequence.
seq: Optional[int] = None
# Subject to match against messages for the purge command.
filter: Optional[str] = None
# Number of messages to keep.
keep: Optional[int] = None


@dataclass
class ObjectStoreConfig(Base):
"""
ObjectStoreConfig is the configuration of an ObjectStore.
"""
bucket: str
description: Optional[str] = None
ttl: Optional[float] = None
max_bytes: Optional[int] = None
storage: Optional[StorageType] = None
replicas: int = 1
placement: Optional[Placement] = None

def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result['ttl'] = self._to_nanoseconds(self.ttl)
return result


@dataclass
class ObjectLink(Base):
"""
ObjectLink is used to embed links to other buckets and objects.
"""
# Bucket is the name of the other object store.
bucket: str
# Name can be used to link to a single object.
# If empty, this is a link to the whole store, like a directory.
name: Optional[str] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
return super().from_response(resp)


@dataclass
class ObjectMetaOptions(Base):
link: Optional[ObjectLink] = None
max_chunk_size: Optional[int] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, 'link', ObjectLink)
return super().from_response(resp)


@dataclass
class ObjectMeta(Base):
"""
ObjectMeta is high level information about an object.
"""
name: str
description: Optional[str] = None
headers: Optional[dict] = None
# Optional options.
options: Optional[ObjectMetaOptions] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, 'options', ObjectMetaOptions)
return super().from_response(resp)


@dataclass
class ObjectInfo(Base):
"""
ObjectInfo is meta plus instance information.
"""
name: str
bucket: str
nuid: str
size: int
mtime: str
chunks: int
digest: Optional[str] = None
deleted: Optional[bool] = None
description: Optional[str] = None
headers: Optional[dict] = None
# Optional options.
options: Optional[ObjectMetaOptions] = None

def is_link(self) -> bool:
return self.options is not None and self.options.link is not None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, 'options', ObjectMetaOptions)
return super().from_response(resp)
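
The dataclasses above follow the same Base conventions used by the rest of nats/js/api.py: as_dict converts the TTL from seconds to nanoseconds, and from_response converts nested dicts into their dataclass types. A minimal sketch of how they might be exercised, assuming this branch is installed and the Base helpers (_convert, from_response, _to_nanoseconds) behave as elsewhere in the module; the response dict is hand-written for illustration, not real server output:

```python
from nats.js import api

# ObjectStoreConfig.as_dict() converts the ttl (given in seconds) to
# nanoseconds, matching the other JetStream config types.
config = api.ObjectStoreConfig(bucket="configs", description="app configs", ttl=3600)
print(config.as_dict()["ttl"])  # expected: 3600000000000

# ObjectInfo.from_response() converts the nested "options" dict into
# ObjectMetaOptions, and its "link" into ObjectLink.
resp = {
    "name": "a.txt",
    "bucket": "configs",
    "nuid": "abc123",
    "size": 12,
    "mtime": "2022-12-07T00:00:00Z",
    "chunks": 1,
    "options": {"link": {"bucket": "other"}},
}
info = api.ObjectInfo.from_response(resp)
print(info.is_link())  # expected: True, since options.link is set
```
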
90 changes: 88 additions & 2 deletions nats/js/client.py
@@ -18,16 +18,20 @@
import json
import time
from email.parser import BytesParser
from typing import TYPE_CHECKING, Awaitable, Callable
from typing import TYPE_CHECKING, Awaitable, Callable, Optional

import nats.errors
import nats.js.errors
from nats.aio.msg import Msg
from nats.aio.subscription import Subscription
from nats.js import api
from nats.js.errors import BadBucketError, BucketNotFoundError, NotFoundError
from nats.js.errors import BadBucketError, BucketNotFoundError, InvalidBucketNameError, NotFoundError
from nats.js.kv import KeyValue
from nats.js.manager import JetStreamManager
from nats.js.object_store import (
VALID_BUCKET_RE, OBJ_ALL_CHUNKS_PRE_TEMPLATE, OBJ_ALL_META_PRE_TEMPLATE,
OBJ_STREAM_TEMPLATE, ObjectStore
)

if TYPE_CHECKING:
from nats import NATS
@@ -988,6 +992,9 @@ async def _fetch_n(
######################

async def key_value(self, bucket: str) -> KeyValue:
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = KV_STREAM_TEMPLATE.format(bucket=bucket)
try:
si = await self.stream_info(stream)
@@ -1016,7 +1023,11 @@ async def create_key_value(
config = api.KeyValueConfig(bucket=params["bucket"])
config = config.evolve(**params)

if VALID_BUCKET_RE.match(config.bucket) is None:
raise InvalidBucketNameError

duplicate_window: float = 2 * 60 # 2 minutes

if config.ttl and config.ttl < duplicate_window:
duplicate_window = config.ttl

@@ -1058,5 +1069,80 @@ async def delete_key_value(self, bucket: str) -> bool:
delete_key_value deletes a JetStream KeyValue store by destroying
the associated stream.
"""
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = KV_STREAM_TEMPLATE.format(bucket=bucket)
return await self.delete_stream(stream)

#######################
# #
# ObjectStore Context #
# #
#######################

async def object_store(self, bucket: str) -> ObjectStore:
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = OBJ_STREAM_TEMPLATE.format(bucket=bucket)
try:
await self.stream_info(stream)
except NotFoundError:
raise BucketNotFoundError

return ObjectStore(
name=bucket,
stream=stream,
js=self,
)

async def create_object_store(
self,
config: Optional[api.ObjectStoreConfig] = None,
**params,
) -> ObjectStore:
"""
create_object_store takes an api.ObjectStoreConfig and creates an ObjectStore in JetStream.
"""
if config is None:
config = api.ObjectStoreConfig(bucket=params["bucket"])
config = config.evolve(**params)

if VALID_BUCKET_RE.match(config.bucket) is None:
raise InvalidBucketNameError

name = config.bucket
chunks = OBJ_ALL_CHUNKS_PRE_TEMPLATE.format(bucket=name)
meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=name)

stream = api.StreamConfig(
name=OBJ_STREAM_TEMPLATE.format(bucket=config.bucket),
description=config.description,
subjects=[chunks, meta],
max_age=config.ttl,
max_bytes=config.max_bytes,
storage=config.storage,
num_replicas=config.replicas,
placement=config.placement,
discard=api.DiscardPolicy.NEW,
allow_rollup_hdrs=True,
)
await self.add_stream(stream)

assert stream.name is not None
return ObjectStore(
name=config.bucket,
stream=stream.name,
js=self,
)

async def delete_object_store(self, bucket: str) -> bool:
"""
delete_object_store will delete the underlying stream for the named object store.
"""
if VALID_BUCKET_RE.match(bucket) is None:
raise InvalidBucketNameError

stream = OBJ_STREAM_TEMPLATE.format(bucket=bucket)
return await self.delete_stream(stream)
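
Taken together, object_store, create_object_store, and delete_object_store mirror the existing KeyValue context: bucket names are validated against VALID_BUCKET_RE, and each bucket is backed by an OBJ_<bucket> stream with chunk and metadata subjects. A minimal usage sketch, assuming a local nats-server with JetStream enabled and that VALID_BUCKET_RE rejects names containing subject separators such as "/"; the put/get API itself lives in nats/js/object_store.py, which is not part of this excerpt:

```python
import asyncio

import nats
from nats.js.errors import InvalidBucketNameError


async def main():
    nc = await nats.connect("nats://127.0.0.1:4222")
    js = nc.jetstream()

    # Creates the backing OBJ_<bucket> stream with chunk and meta subjects.
    obs = await js.create_object_store(bucket="assets", description="demo bucket")

    # Bind to an existing store; raises BucketNotFoundError if the stream is missing.
    obs = await js.object_store("assets")

    # Bucket names are validated before any request is sent.
    try:
        await js.object_store("not/valid")
    except InvalidBucketNameError:
        pass

    # Deleting the store destroys the underlying stream.
    await js.delete_object_store("assets")
    await nc.close()


asyncio.run(main())
```
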
35 changes: 35 additions & 0 deletions nats/js/errors.py
@@ -227,3 +227,38 @@ class KeyHistoryTooLargeError(KeyValueError):

def __str__(self) -> str:
return "nats: history limited to a max of 64"


class InvalidBucketNameError(Error):
"""
Raised when trying to create a KV or Object Store bucket with an invalid name.
"""
pass


class InvalidObjectNameError(Error):
"""
Raised when trying to put an object into the Object Store with an invalid name.
"""
pass


class BadObjectMetaError(Error):
"""
Raised when trying to read corrupted metadata from the Object Store.
"""
pass


class LinkIsABucketError(Error):
"""
Raised when trying to get an object from the Object Store that is actually a link to a whole bucket.
"""
pass


class DigestMismatchError(Error):
"""
Raised when an object retrieved from the Object Store has a different digest than expected.
"""
pass
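
All of the new exceptions subclass the existing nats.js.errors.Error, so callers can catch them individually or fall back to the base class. A small illustrative sketch, assuming this branch is installed; check_digest is a hypothetical helper, not part of this PR:

```python
from nats.js import errors


def check_digest(expected: str, actual: str) -> None:
    # DigestMismatchError signals that the received chunks hash to a
    # different value than the one recorded in the object's metadata.
    if expected != actual:
        raise errors.DigestMismatchError


try:
    check_digest("SHA-256=abc", "SHA-256=def")
except errors.DigestMismatchError:
    print("object digest did not match its metadata")
except errors.Error:
    # Any other JetStream-level error (bad metadata, invalid names, ...).
    pass
```
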