Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce BackendStateLifecycle abstraction #6114

Merged
merged 6 commits into from May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 12 additions & 24 deletions localstack/services/dynamodb/server.py
Expand Up @@ -8,6 +8,7 @@
from localstack.services.install import DDB_AGENT_JAR_PATH
from localstack.utils.aws import aws_stack
from localstack.utils.common import TMP_THREADS, ShellCommandThread, get_free_tcp_port, mkdir
from localstack.utils.files import rm_rf
from localstack.utils.run import FuncThread
from localstack.utils.serving import Server
from localstack.utils.sync import retry
Expand Down Expand Up @@ -95,28 +96,27 @@ def _log_listener(self, line, **_kwargs):
LOG.info(line.rstrip())


def create_dynamodb_server(port=None) -> DynamodbServer:
def create_dynamodb_server(
port=None, db_path: Optional[str] = None, clean_db_path: bool = False
) -> DynamodbServer:
"""
Creates a dynamodb server from the LocalStack configuration.
"""
port = port or get_free_tcp_port()
ddb_data_dir = f"{config.dirs.data}/dynamodb" if config.dirs.data else None
return do_create_dynamodb_server(port, ddb_data_dir)


def do_create_dynamodb_server(port: int, ddb_data_dir: Optional[str]) -> DynamodbServer:
server = DynamodbServer(port)
if ddb_data_dir:
mkdir(ddb_data_dir)
absolute_path = os.path.abspath(ddb_data_dir)
db_path = f"{config.dirs.data}/dynamodb" if not db_path and config.dirs.data else db_path
if db_path:
if clean_db_path:
rm_rf(db_path)
mkdir(db_path)
absolute_path = os.path.abspath(db_path)
server.db_path = absolute_path

server.heap_size = config.DYNAMODB_HEAP_SIZE
server.share_db = is_env_true("DYNAMODB_SHARE_DB")
server.optimize_db_before_startup = is_env_true("DYNAMODB_OPTIMIZE_DB_BEFORE_STARTUP")
server.delay_transient_statuses = is_env_true("DYNAMODB_DELAY_TRANSIENT_STATUSES")
server.cors = os.getenv("DYNAMODB_CORS", None)

return server


Expand All @@ -142,10 +142,10 @@ def check_dynamodb(expect_shutdown=False, print_error=False):
assert isinstance(out["TableNames"], list)


def start_dynamodb(port=None, asynchronous=True, update_listener=None):
def start_dynamodb(port=None, db_path=None, clean_db_path=False):
global _server
if not _server:
_server = create_dynamodb_server()
_server = create_dynamodb_server(port, db_path, clean_db_path)

_server.start()

Expand All @@ -154,15 +154,3 @@ def start_dynamodb(port=None, asynchronous=True, update_listener=None):

def get_server():
return _server


def restart_dynamodb():
global _server
if _server:
_server.shutdown()
_server.join(timeout=10)
_server = None

LOG.debug("Restarting DynamoDB process ...")
start_dynamodb()
wait_for_dynamodb()
27 changes: 21 additions & 6 deletions localstack/services/plugins.py
Expand Up @@ -133,11 +133,26 @@ def on_exception(self):
pass


class StateLifecycle:
def retrieve_state(self):
pass
class BackendStateLifecycle(abc.ABC):
"""
Interface that supports the retrieval, injection and restore of the backend for services.
"""

@abc.abstractmethod
def retrieve_state(self, **kwargs):
"""Retrieves the backend of a service"""

def inject_state(self, state):
@abc.abstractmethod
def inject_state(self, **kwargs):
"""Injects a backend for a service"""

@abc.abstractmethod
def reset_state(self):
"""Resets a backend for a service"""

@abc.abstractmethod
def on_after_reset(self):
"""Performed after the reset of a service"""
pass


Expand All @@ -151,7 +166,7 @@ def __init__(
active=False,
stop=None,
lifecycle_hook: ServiceLifecycleHook = None,
state_lifecycle: StateLifecycle = None,
backend_state_lifecycle: BackendStateLifecycle = None,
):
self.plugin_name = name
self.start_function = start
Expand All @@ -160,7 +175,7 @@ def __init__(
self.default_active = active
self.stop_function = stop
self.lifecycle_hook = lifecycle_hook or ServiceLifecycleHook()
self.state_lifecycle = state_lifecycle
self.backend_state_lifecycle = backend_state_lifecycle
call_safe(self.lifecycle_hook.on_after_init)

def start(self, asynchronous):
Expand Down