Skip to content

Commit

Permalink
Added hooks to the SFN provider (#7834)
Browse files Browse the repository at this point in the history
  • Loading branch information
giograno committed Mar 14, 2023
1 parent d47f509 commit fd25513
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
23 changes: 23 additions & 0 deletions localstack/services/stepfunctions/provider.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging
import os
import threading

from localstack import config
Expand All @@ -16,12 +18,16 @@
from localstack.services.plugins import ServiceLifecycleHook
from localstack.services.stepfunctions.stepfunctions_starter import (
start_stepfunctions,
stop_stepfunctions,
wait_for_stepfunctions,
)
from localstack.state import AssetDirectory, StateVisitor

# lock to avoid concurrency issues when creating state machines in parallel (required for StepFunctions-Local)
CREATION_LOCK = threading.RLock()

LOG = logging.getLogger(__name__)


class StepFunctionsProvider(StepfunctionsApi, ServiceLifecycleHook):
def __init__(self):
Expand All @@ -31,10 +37,27 @@ def get_forward_url(self) -> str:
"""Return the URL of the backend StepFunctions server to forward requests to"""
return f"http://{LOCALHOST}:{config.LOCAL_PORT_STEPFUNCTIONS}"

def accept_state_visitor(self, visitor: StateVisitor):
visitor.visit(AssetDirectory(os.path.join(config.dirs.data, self.service)))

def on_before_start(self):
start_stepfunctions()
wait_for_stepfunctions()

def on_before_state_reset(self):
stop_stepfunctions()

def on_before_state_load(self):
stop_stepfunctions()

def on_after_state_reset(self):
start_stepfunctions()
wait_for_stepfunctions()

def on_after_state_load(self):
start_stepfunctions()
wait_for_stepfunctions()

def create_state_machine(
self, context: RequestContext, request: CreateStateMachineInput
) -> CreateStateMachineOutput:
Expand Down
25 changes: 21 additions & 4 deletions localstack/services/stepfunctions/stepfunctions_starter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
from typing import Optional
import subprocess

from localstack import config
from localstack.aws.accounts import get_aws_account_id
from localstack.services.infra import do_run, log_startup_message
from localstack.services.stepfunctions.packages import stepfunctions_local_package
from localstack.utils.aws import aws_stack
from localstack.utils.common import wait_for_port_open
from localstack.utils.net import wait_for_port_closed
from localstack.utils.run import ShellCommandThread, wait_for_process_to_be_killed
from localstack.utils.sync import retry

LOG = logging.getLogger(__name__)
Expand All @@ -15,7 +17,7 @@
MAX_HEAP_SIZE = "256m"

# todo: will be replaced with plugin mechanism
PROCESS_THREAD = None
PROCESS_THREAD: ShellCommandThread | subprocess.Popen | None = None


# TODO: pass env more explicitly
Expand Down Expand Up @@ -66,7 +68,7 @@ def get_command(backend_port):
return cmd


def start_stepfunctions(asynchronous=True, persistence_path: Optional[str] = None):
def start_stepfunctions(asynchronous: bool = True, persistence_path: str | None = None):
# TODO: introduce Server abstraction for StepFunctions process
global PROCESS_THREAD
backend_port = config.LOCAL_PORT_STEPFUNCTIONS
Expand All @@ -91,7 +93,22 @@ def wait_for_stepfunctions():
retry(check_stepfunctions, sleep=0.5, retries=15)


def check_stepfunctions(expect_shutdown=False, print_error=False):
def stop_stepfunctions():
if PROCESS_THREAD or not PROCESS_THREAD.process:
return
LOG.debug("Restarting StepFunctions process ...")

pid = PROCESS_THREAD.process.pid
PROCESS_THREAD.stop()
wait_for_port_closed(config.LOCAL_PORT_STEPFUNCTIONS, sleep_time=0.5, retries=15)
try:
# TODO: currently failing in CI (potentially due to a defunct process) - need to investigate!
wait_for_process_to_be_killed(pid, sleep=0.3, retries=10)
except Exception as e:
LOG.warning("StepFunctions process not properly terminated: %s", e)


def check_stepfunctions(expect_shutdown: bool = False, print_error: bool = False) -> None:
out = None
try:
wait_for_port_open(config.LOCAL_PORT_STEPFUNCTIONS, sleep_time=2)
Expand Down

0 comments on commit fd25513

Please sign in to comment.