Skip to content
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ dependencies = [
"pydantic>=2.0.0",
"py_trees>=2.2,<3.0",
"pyyaml>=6.0.3",
"slack-sdk>=3.38.0",
"slack-bolt>=1.27.0",
]

[project.optional-dependencies]
Expand Down
75 changes: 72 additions & 3 deletions src/redis_release/bht/behaviours.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ def initialise(self) -> None:
return
self.workflow.inputs["release_tag"] = self.release_meta.tag
ref = self.package_meta.ref if self.package_meta.ref is not None else "main"
self.workflow.ephemeral.trigger_attempted = True
if self.log_once(
"workflow_trigger_start", self.workflow.ephemeral.log_once_flags
):
Expand Down Expand Up @@ -366,21 +365,40 @@ def update(self) -> Status:
return self.log_exception_and_return_failure(e)


class UpdateWorkflowStatus(ReleaseAction):
class UpdateWorkflowStatusUntilCompletion(ReleaseAction):
def __init__(
    self,
    name: str,
    workflow: Workflow,
    github_client: GitHubClientAsync,
    package_meta: PackageMeta,
    log_prefix: str = "",
    timeout_seconds: int = 0,
    cutoff: int = 0,
    poll_interval: int = 3,
) -> None:
    """Configure a poller that follows a workflow run until it concludes.

    Args:
        name: Behaviour name, forwarded to the parent constructor.
        workflow: Workflow state object that is polled and updated.
        github_client: GitHub client stored on the instance for status
            requests.
        package_meta: Package metadata; its ``repo`` is used when the
            status request is issued.
        log_prefix: Logging prefix, forwarded to the parent constructor.
        timeout_seconds: Time budget in seconds checked by ``update``;
            ``0`` means no limit.
        cutoff: Maximum number of completed status requests checked by
            ``update``; ``0`` means no limit.
        poll_interval: Seconds to sleep between two status requests.
    """
    # Collaborators used when issuing and recording status requests.
    self.workflow = workflow
    self.github_client = github_client
    self.package_meta = package_meta

    # Limits; note that ``poll_interval`` is stored as ``interval``.
    self.interval = poll_interval
    self.cutoff = cutoff
    self.timeout_seconds = timeout_seconds

    # Per-run bookkeeping; ``initialise`` resets all of these.
    self.is_sleeping: bool = False
    self.tick_count: int = 0
    self.start_time: Optional[float] = None

    # Parent constructor runs last, as in the original ordering.
    super().__init__(name=name, log_prefix=log_prefix)

def initialise(self) -> None:
    """Reset the polling state and start the first status request.

    Logs the configured limits, records the event-loop time as the start
    of the timeout window, clears the tick counter, sleep flag and
    feedback message, and then schedules a status request.
    """
    self.logger.debug(
        f"Initialise: timeout: {self.timeout_seconds}, cutoff: {self.cutoff}, interval: {self.interval}"
    )

    # The timeout check in ``update`` measures elapsed time against this
    # same event-loop clock.
    loop = asyncio.get_event_loop()
    self.start_time = loop.time()

    # Fresh bookkeeping for this run.
    self.feedback_message = ""
    self.tick_count = 0
    self.is_sleeping = False

    self._initialise_status_task()

def _initialise_status_task(self) -> None:
if self.workflow.run_id is None:
self.logger.error(
"[red]Workflow run_id is None - cannot check completion[/red]"
Expand All @@ -398,6 +416,11 @@ def initialise(self) -> None:
self.package_meta.repo, self.workflow.run_id
)
)
self.is_sleeping = False

def _initialise_sleep_task(self) -> None:
    """Replace the current task with a sleep of ``self.interval`` seconds.

    Sets ``is_sleeping`` so that ``update`` knows the finished task was a
    pause rather than a status request.
    """
    pause = asyncio.sleep(self.interval)
    self.task = asyncio.create_task(pause)
    self.is_sleeping = True

def update(self) -> Status:
try:
Expand All @@ -406,7 +429,15 @@ def update(self) -> Status:
if not self.task.done():
return Status.RUNNING

# If we just finished sleeping, switch back to status request
if self.is_sleeping:
self._initialise_status_task()
return Status.RUNNING

# We just finished a status request
result = self.task.result()
self.tick_count += 1

if self.log_once(
"workflow_status_current", self.workflow.ephemeral.log_once_flags
):
Expand All @@ -426,10 +457,48 @@ def update(self) -> Status:
self.feedback_message = (
f" {self.workflow.status}, {self.workflow.conclusion}"
)
return Status.SUCCESS

if self.workflow.conclusion is not None:
if self.workflow.conclusion == WorkflowConclusion.SUCCESS:
return Status.SUCCESS
self.feedback_message = f"Workflow failed"
return Status.FAILURE

# Check cutoff (0 means no limit)
if self.cutoff > 0 and self.tick_count >= self.cutoff:
self.logger.debug(f"Cutoff reached: {self.tick_count} ticks")
self.feedback_message = f"Cutoff reached: {self.tick_count}"
return Status.FAILURE

# Check timeout (0 means no limit)
if self.timeout_seconds > 0 and self.start_time is not None:
elapsed = asyncio.get_event_loop().time() - self.start_time
self.feedback_message = (
f"{self.feedback_message}, elapsed: {elapsed:.1f}s"
)
if elapsed >= self.timeout_seconds:
self.logger.debug(f"Timeout reached: {elapsed:.1f}s")
self.feedback_message = (
f"Timed out: {elapsed:.1f}s of {self.timeout_seconds}s"
)
self.workflow.ephemeral.wait_for_completion_timed_out = True
return Status.FAILURE

# Switch to sleep task
self._initialise_sleep_task()
return Status.RUNNING

except Exception as e:
return self.log_exception_and_return_failure(e)

def terminate(self, new_status: Status) -> None:
    """Cancel the in-flight task, if any, when the behaviour is terminated.

    Args:
        new_status: Status the behaviour is moving to; only used in the
            debug log message.
    """
    pending = self.task
    # Nothing to cancel when no task exists or it has already finished.
    if pending is None or pending.done():
        return

    pending.cancel()
    self.logger.debug(
        f"Cancelled task during terminate with status: {new_status}"
    )


class Sleep(LoggingAction):

Expand Down
100 changes: 36 additions & 64 deletions src/redis_release/bht/composites.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
Sleep,
)
from .behaviours import TriggerWorkflow as TriggerWorkflow
from .behaviours import UpdateWorkflowStatus
from .decorators import FlagGuard
from .behaviours import UpdateWorkflowStatusUntilCompletion
from .decorators import ConditionGuard, FlagGuard, StatusFlagGuard
from .state import Package, PackageMeta, ReleaseMeta, Workflow


Expand Down Expand Up @@ -110,7 +110,7 @@ def tick(self) -> Iterator[Behaviour]:
yield self


class FindWorkflowByUUID(FlagGuard):
class FindWorkflowByUUID(StatusFlagGuard):
max_retries: int = 3
poll_interval: int = 5

Expand Down Expand Up @@ -145,12 +145,12 @@ def __init__(
None if name == "" else name,
identify_loop,
workflow.ephemeral,
"identify_failed",
"identify_workflow",
log_prefix=log_prefix,
)


class WaitForWorkflowCompletion(FlagGuard):
class WaitForWorkflowCompletion(StatusFlagGuard):
poll_interval: int
timeout_seconds: int

Expand All @@ -166,36 +166,26 @@ def __init__(
self.poll_interval = poll_interval
self.timeout_seconds = workflow.timeout_minutes * 60

update_workflow_status = UpdateWorkflowStatus(
"Update Workflow Status",
update_workflow_status = UpdateWorkflowStatusUntilCompletion(
"Update status until completion",
workflow,
github_client,
package_meta,
log_prefix=log_prefix,
timeout_seconds=self.timeout_seconds,
poll_interval=self.poll_interval,
)
update_workflow_status_with_pause = Sequence(
"Update Workflow Status with Pause",
memory=True,
children=[
Sleep("Sleep", self.poll_interval, log_prefix=log_prefix),
update_workflow_status,
],
)

super().__init__(
None,
Timeout(
f"Timeout {workflow.timeout_minutes}m",
Repeat("Repeat", update_workflow_status_with_pause, -1),
self.timeout_seconds,
),
update_workflow_status,
workflow.ephemeral,
"timed_out",
"wait_for_completion",
"wait_for_completion_message",
log_prefix=log_prefix,
)


class TriggerWorkflowGuarded(FlagGuard):
class TriggerWorkflowGuarded(StatusFlagGuard):
def __init__(
self,
name: str,
Expand All @@ -217,12 +207,12 @@ def __init__(
None if name == "" else name,
trigger_workflow,
workflow.ephemeral,
"trigger_failed",
"trigger_workflow",
log_prefix=log_prefix,
)


class IdentifyTargetRefGuarded(FlagGuard):
class IdentifyTargetRefGuarded(StatusFlagGuard):
def __init__(
self,
name: str,
Expand All @@ -241,12 +231,12 @@ def __init__(
log_prefix=log_prefix,
),
package_meta.ephemeral,
"identify_ref_failed",
"identify_ref",
log_prefix=log_prefix,
)


class DownloadArtifactsListGuarded(FlagGuard):
class DownloadArtifactsListGuarded(StatusFlagGuard):
def __init__(
self,
name: str,
Expand All @@ -265,12 +255,12 @@ def __init__(
log_prefix=log_prefix,
),
workflow.ephemeral,
"artifacts_download_failed",
"download_artifacts",
log_prefix=log_prefix,
)


class ExtractArtifactResultGuarded(FlagGuard):
class ExtractArtifactResultGuarded(StatusFlagGuard):
def __init__(
self,
name: str,
Expand All @@ -291,7 +281,7 @@ def __init__(
log_prefix=log_prefix,
),
workflow.ephemeral,
"extract_result_failed",
"extract_artifact_result",
log_prefix=log_prefix,
)

Expand Down Expand Up @@ -326,7 +316,7 @@ def __init__(
)


class RestartPackageGuarded(FlagGuard):
class RestartPackageGuarded(ConditionGuard):
"""
Reset package if we didn't trigger the workflow in current run
This is intended to be used for build workflow since if build has failed
Expand All @@ -353,29 +343,20 @@ def __init__(
reset_package_state_running = SuccessIsRunning(
"Success is Running", reset_package_state
)
reset_package_state_guarded = FlagGuard(
None if name == "" else name,
reset_package_state_running,
package.meta.ephemeral,
"identify_ref_failed",
flag_value=True,
raise_on=[],
guard_status=Status.FAILURE,
log_prefix=log_prefix,
)

super().__init__(
None if name == "" else name,
reset_package_state_guarded,
workflow.ephemeral,
"trigger_attempted",
flag_value=True,
raise_on=[],
name,
# Don't restart if we already triggered the workflow or if ref is not set or workflow has timed out
condition=lambda: workflow.ephemeral.trigger_workflow is not None
or package.meta.ref is None
or workflow.ephemeral.wait_for_completion_timed_out is True,
child=reset_package_state_running,
guard_status=Status.FAILURE,
log_prefix=log_prefix,
)


class RestartWorkflowGuarded(FlagGuard):
class RestartWorkflowGuarded(ConditionGuard):
"""
Reset workflow if we didn't trigger the workflow in current run and if there was no identify target ref error

Expand All @@ -401,23 +382,14 @@ def __init__(
reset_workflow_state_running = SuccessIsRunning(
"Success is Running", reset_workflow_state
)
reset_workflow_state_guarded = FlagGuard(
None if name == "" else name,
reset_workflow_state_running,
package_meta.ephemeral,
"identify_ref_failed",
flag_value=True,
raise_on=[],
guard_status=Status.FAILURE,
log_prefix=log_prefix,
)

super().__init__(
None if name == "" else name,
reset_workflow_state_guarded,
workflow.ephemeral,
"trigger_attempted",
flag_value=True,
raise_on=[],
name,
# Don't restart if we already triggered the workflow or if ref is not set or workflow has timed out
condition=lambda: workflow.ephemeral.trigger_workflow is not None
or package_meta.ref is None
or workflow.ephemeral.wait_for_completion_timed_out is True,
child=reset_workflow_state_running,
guard_status=Status.FAILURE,
log_prefix=log_prefix,
)
Loading