Skip to content

Commit

Permalink
Implement basic RocksDB storage for state, based on the rocksdict library
Browse files Browse the repository at this point in the history
- Add RocksDBStorage to access the db partition
- Add TransactionStore to atomically flush the updates
  • Loading branch information
daniil-quix committed Oct 5, 2023
1 parent 22523ae commit c62a9a2
Show file tree
Hide file tree
Showing 13 changed files with 947 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/StreamingDataFrames/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
confluent-kafka>=2.2,<2.3
requests>=2.28
typing_extensions>=4.8
rocksdict>=0.3, <0.4
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .stores import *
from .exceptions import *
from .options import *
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from streamingdataframes.exceptions import QuixException

__all__ = ("StateSerializationError", "StateTransactionError", "NestedPrefixError")


class StateSerializationError(QuixException):
    """Raised when a value or key cannot be serialized or deserialized."""

    ...


class StateTransactionError(QuixException):
    # NOTE(review): raising sites are not in this chunk — presumably raised for
    # failures within a state store transaction; confirm against TransactionStore.
    """Raised for errors occurring during a state store transaction."""

    ...


class NestedPrefixError(QuixException):
    # NOTE(review): raising sites are not in this chunk — presumably raised when
    # a key prefix is applied while another prefix is already active; confirm.
    """Raised on an invalid attempt to nest key prefixes."""

    ...
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import dataclasses
from typing import Optional

import rocksdict

from .types import RocksDBOptionsProto

__all__ = ("RocksDBOptions",)


@dataclasses.dataclass(frozen=True)
class RocksDBOptions(RocksDBOptionsProto):
    """
    Common RocksDB database options.
    Please see `rocksdict.Options` for a complete description of each option.
    To provide extra options that are not presented in this class, feel free
    to override it and specify the additional values.
    """

    # Sizes are in bytes.
    write_buffer_size: int = 64 * 1024 * 1024  # 64MB
    target_file_size_base: int = 64 * 1024 * 1024  # 64MB
    max_write_buffer_number: int = 3
    block_cache_size: int = 128 * 1024 * 1024  # 128MB
    enable_pipelined_write: bool = False
    wal_dir: Optional[str] = None
    db_log_dir: Optional[str] = None

    def to_options(self) -> rocksdict.Options:
        """
        Convert parameters to `rocksdict.Options`
        :return: instance of `rocksdict.Options`
        """
        # raw_mode=True: keys/values are handled as raw bytes by rocksdict.
        options = rocksdict.Options(raw_mode=True)
        options.create_if_missing(True)

        # Block-based table factory carries the shared block cache.
        table_opts = rocksdict.BlockBasedOptions()
        table_opts.set_block_cache(rocksdict.Cache(self.block_cache_size))
        options.set_block_based_table_factory(table_opts)

        options.set_write_buffer_size(self.write_buffer_size)
        options.set_target_file_size_base(self.target_file_size_base)
        options.set_max_write_buffer_number(self.max_write_buffer_number)
        options.set_enable_pipelined_write(self.enable_pipelined_write)

        # Optional directory overrides for WAL files and info logs.
        if self.wal_dir is not None:
            options.set_wal_dir(self.wal_dir)
        if self.db_log_dir is not None:
            options.set_db_log_dir(self.db_log_dir)
        return options
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json
from typing import Any, Optional

from .exceptions import StateSerializationError
from .types import DumpsFunc, LoadsFunc

__all__ = (
"serialize",
"deserialize",
"serialize_key",
)


def _default_dumps(value: Any) -> bytes:
return json.dumps(value, separators=(",", ":")).encode()


def _default_loads(value: bytes) -> Any:
return json.loads(value)


def serialize(value: Any, dumps: Optional[DumpsFunc] = None) -> bytes:
    """
    Serialize *value* to bytes using *dumps* (compact JSON when not provided).

    :param value: object to serialize
    :param dumps: optional callable turning an object into bytes
    :return: serialized bytes
    :raises StateSerializationError: if the serializer raises
    """
    serializer = dumps or _default_dumps
    try:
        return serializer(value)
    except Exception as exc:
        raise StateSerializationError(f'Failed to serialize value: "{value}"') from exc


def deserialize(value: bytes, loads: Optional[LoadsFunc] = None) -> Any:
    """
    Deserialize bytes *value* using *loads* (JSON when not provided).

    :param value: serialized bytes
    :param loads: optional callable turning bytes back into an object
    :return: the deserialized object
    :raises StateSerializationError: if the deserializer raises
    """
    deserializer = loads or _default_loads
    try:
        return deserializer(value)
    except Exception as exc:
        raise StateSerializationError(
            f'Failed to deserialize value: "{value}"'
        ) from exc


def serialize_key(
    key: Any, prefix: bytes = b"", dumps: Optional[DumpsFunc] = None
) -> bytes:
    """
    Serialize *key* and prepend *prefix* bytes.

    :param key: object to serialize
    :param prefix: bytes prepended to the serialized key
    :param dumps: optional serializer; `serialize` falls back to the
        default JSON serializer when it is None
    :return: prefix + serialized key bytes
    :raises StateSerializationError: if the serializer raises
    """
    return prefix + serialize(key, dumps=dumps)
Loading

0 comments on commit c62a9a2

Please sign in to comment.