diff --git a/pyproject.toml b/pyproject.toml index bcd5fad..2133c42 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ dependencies = [ "pydantic>=2.0.0", "py_trees>=2.2,<3.0", "pyyaml>=6.0.3", + "slack-sdk>=3.38.0", + "slack-bolt>=1.27.0", ] [project.optional-dependencies] diff --git a/src/redis_release/bht/behaviours.py b/src/redis_release/bht/behaviours.py index 62c1e92..738e624 100644 --- a/src/redis_release/bht/behaviours.py +++ b/src/redis_release/bht/behaviours.py @@ -264,7 +264,6 @@ def initialise(self) -> None: return self.workflow.inputs["release_tag"] = self.release_meta.tag ref = self.package_meta.ref if self.package_meta.ref is not None else "main" - self.workflow.ephemeral.trigger_attempted = True if self.log_once( "workflow_trigger_start", self.workflow.ephemeral.log_once_flags ): @@ -366,7 +365,7 @@ def update(self) -> Status: return self.log_exception_and_return_failure(e) -class UpdateWorkflowStatus(ReleaseAction): +class UpdateWorkflowStatusUntilCompletion(ReleaseAction): def __init__( self, name: str, @@ -374,13 +373,32 @@ def __init__( github_client: GitHubClientAsync, package_meta: PackageMeta, log_prefix: str = "", + timeout_seconds: int = 0, + cutoff: int = 0, + poll_interval: int = 3, ) -> None: self.github_client = github_client self.workflow = workflow self.package_meta = package_meta + self.timeout_seconds = timeout_seconds + self.cutoff = cutoff + self.interval = poll_interval + self.start_time: Optional[float] = None + self.tick_count: int = 0 + self.is_sleeping: bool = False super().__init__(name=name, log_prefix=log_prefix) def initialise(self) -> None: + self.logger.debug( + f"Initialise: timeout: {self.timeout_seconds}, cutoff: {self.cutoff}, interval: {self.interval}" + ) + self.start_time = asyncio.get_event_loop().time() + self.is_sleeping = False + self.tick_count = 0 + self.feedback_message = "" + self._initialise_status_task() + + def _initialise_status_task(self) -> None: if self.workflow.run_id is None: 
self.logger.error( "[red]Workflow run_id is None - cannot check completion[/red]" @@ -398,6 +416,11 @@ def initialise(self) -> None: self.package_meta.repo, self.workflow.run_id ) ) + self.is_sleeping = False + + def _initialise_sleep_task(self) -> None: + self.task = asyncio.create_task(asyncio.sleep(self.interval)) + self.is_sleeping = True def update(self) -> Status: try: @@ -406,7 +429,15 @@ def update(self) -> Status: if not self.task.done(): return Status.RUNNING + # If we just finished sleeping, switch back to status request + if self.is_sleeping: + self._initialise_status_task() + return Status.RUNNING + + # We just finished a status request result = self.task.result() + self.tick_count += 1 + if self.log_once( "workflow_status_current", self.workflow.ephemeral.log_once_flags ): @@ -426,10 +457,48 @@ def update(self) -> Status: self.feedback_message = ( f" {self.workflow.status}, {self.workflow.conclusion}" ) - return Status.SUCCESS + + if self.workflow.conclusion is not None: + if self.workflow.conclusion == WorkflowConclusion.SUCCESS: + return Status.SUCCESS + self.feedback_message = f"Workflow failed" + return Status.FAILURE + + # Check cutoff (0 means no limit) + if self.cutoff > 0 and self.tick_count >= self.cutoff: + self.logger.debug(f"Cutoff reached: {self.tick_count} ticks") + self.feedback_message = f"Cutoff reached: {self.tick_count}" + return Status.FAILURE + + # Check timeout (0 means no limit) + if self.timeout_seconds > 0 and self.start_time is not None: + elapsed = asyncio.get_event_loop().time() - self.start_time + self.feedback_message = ( + f"{self.feedback_message}, elapsed: {elapsed:.1f}s" + ) + if elapsed >= self.timeout_seconds: + self.logger.debug(f"Timeout reached: {elapsed:.1f}s") + self.feedback_message = ( + f"Timed out: {elapsed:.1f}s of {self.timeout_seconds}s" + ) + self.workflow.ephemeral.wait_for_completion_timed_out = True + return Status.FAILURE + + # Switch to sleep task + self._initialise_sleep_task() + return 
Status.RUNNING + except Exception as e: return self.log_exception_and_return_failure(e) + def terminate(self, new_status: Status) -> None: + """Cancel the current task if it's running.""" + if self.task is not None and not self.task.done(): + self.task.cancel() + self.logger.debug( + f"Cancelled task during terminate with status: {new_status}" + ) + class Sleep(LoggingAction): diff --git a/src/redis_release/bht/composites.py b/src/redis_release/bht/composites.py index b715a70..0a43a8b 100644 --- a/src/redis_release/bht/composites.py +++ b/src/redis_release/bht/composites.py @@ -30,8 +30,8 @@ Sleep, ) from .behaviours import TriggerWorkflow as TriggerWorkflow -from .behaviours import UpdateWorkflowStatus -from .decorators import FlagGuard +from .behaviours import UpdateWorkflowStatusUntilCompletion +from .decorators import ConditionGuard, FlagGuard, StatusFlagGuard from .state import Package, PackageMeta, ReleaseMeta, Workflow @@ -110,7 +110,7 @@ def tick(self) -> Iterator[Behaviour]: yield self -class FindWorkflowByUUID(FlagGuard): +class FindWorkflowByUUID(StatusFlagGuard): max_retries: int = 3 poll_interval: int = 5 @@ -145,12 +145,12 @@ def __init__( None if name == "" else name, identify_loop, workflow.ephemeral, - "identify_failed", + "identify_workflow", log_prefix=log_prefix, ) -class WaitForWorkflowCompletion(FlagGuard): +class WaitForWorkflowCompletion(StatusFlagGuard): poll_interval: int timeout_seconds: int @@ -166,36 +166,26 @@ def __init__( self.poll_interval = poll_interval self.timeout_seconds = workflow.timeout_minutes * 60 - update_workflow_status = UpdateWorkflowStatus( - "Update Workflow Status", + update_workflow_status = UpdateWorkflowStatusUntilCompletion( + "Update status until completion", workflow, github_client, package_meta, log_prefix=log_prefix, + timeout_seconds=self.timeout_seconds, + poll_interval=self.poll_interval, ) - update_workflow_status_with_pause = Sequence( - "Update Workflow Status with Pause", - memory=True, - children=[ - 
Sleep("Sleep", self.poll_interval, log_prefix=log_prefix), - update_workflow_status, - ], - ) - super().__init__( None, - Timeout( - f"Timeout {workflow.timeout_minutes}m", - Repeat("Repeat", update_workflow_status_with_pause, -1), - self.timeout_seconds, - ), + update_workflow_status, workflow.ephemeral, - "timed_out", + "wait_for_completion", + "wait_for_completion_message", log_prefix=log_prefix, ) -class TriggerWorkflowGuarded(FlagGuard): +class TriggerWorkflowGuarded(StatusFlagGuard): def __init__( self, name: str, @@ -217,12 +207,12 @@ def __init__( None if name == "" else name, trigger_workflow, workflow.ephemeral, - "trigger_failed", + "trigger_workflow", log_prefix=log_prefix, ) -class IdentifyTargetRefGuarded(FlagGuard): +class IdentifyTargetRefGuarded(StatusFlagGuard): def __init__( self, name: str, @@ -241,12 +231,12 @@ def __init__( log_prefix=log_prefix, ), package_meta.ephemeral, - "identify_ref_failed", + "identify_ref", log_prefix=log_prefix, ) -class DownloadArtifactsListGuarded(FlagGuard): +class DownloadArtifactsListGuarded(StatusFlagGuard): def __init__( self, name: str, @@ -265,12 +255,12 @@ def __init__( log_prefix=log_prefix, ), workflow.ephemeral, - "artifacts_download_failed", + "download_artifacts", log_prefix=log_prefix, ) -class ExtractArtifactResultGuarded(FlagGuard): +class ExtractArtifactResultGuarded(StatusFlagGuard): def __init__( self, name: str, @@ -291,7 +281,7 @@ def __init__( log_prefix=log_prefix, ), workflow.ephemeral, - "extract_result_failed", + "extract_artifact_result", log_prefix=log_prefix, ) @@ -326,7 +316,7 @@ def __init__( ) -class RestartPackageGuarded(FlagGuard): +class RestartPackageGuarded(ConditionGuard): """ Reset package if we didn't trigger the workflow in current run This is intended to be used for build workflow since if build has failed @@ -353,29 +343,20 @@ def __init__( reset_package_state_running = SuccessIsRunning( "Success is Running", reset_package_state ) - reset_package_state_guarded = FlagGuard( 
- None if name == "" else name, - reset_package_state_running, - package.meta.ephemeral, - "identify_ref_failed", - flag_value=True, - raise_on=[], - guard_status=Status.FAILURE, - log_prefix=log_prefix, - ) + super().__init__( - None if name == "" else name, - reset_package_state_guarded, - workflow.ephemeral, - "trigger_attempted", - flag_value=True, - raise_on=[], + name, + # Don't restart if we already triggered the workflow or if ref is not set or workflow has timed out + condition=lambda: workflow.ephemeral.trigger_workflow is not None + or package.meta.ref is None + or workflow.ephemeral.wait_for_completion_timed_out is True, + child=reset_package_state_running, guard_status=Status.FAILURE, log_prefix=log_prefix, ) -class RestartWorkflowGuarded(FlagGuard): +class RestartWorkflowGuarded(ConditionGuard): """ Reset workflow if we didn't trigger the workflow in current run and if there was no identify target ref error @@ -401,23 +382,14 @@ def __init__( reset_workflow_state_running = SuccessIsRunning( "Success is Running", reset_workflow_state ) - reset_workflow_state_guarded = FlagGuard( - None if name == "" else name, - reset_workflow_state_running, - package_meta.ephemeral, - "identify_ref_failed", - flag_value=True, - raise_on=[], - guard_status=Status.FAILURE, - log_prefix=log_prefix, - ) + super().__init__( - None if name == "" else name, - reset_workflow_state_guarded, - workflow.ephemeral, - "trigger_attempted", - flag_value=True, - raise_on=[], + name, + # Don't restart if we already triggered the workflow or if ref is not set or workflow has timed out + condition=lambda: workflow.ephemeral.trigger_workflow is not None + or package_meta.ref is None + or workflow.ephemeral.wait_for_completion_timed_out is True, + child=reset_workflow_state_running, guard_status=Status.FAILURE, log_prefix=log_prefix, ) diff --git a/src/redis_release/bht/decorators.py b/src/redis_release/bht/decorators.py index 42a7abb..6bcc0d9 100644 --- 
a/src/redis_release/bht/decorators.py +++ b/src/redis_release/bht/decorators.py @@ -1,5 +1,5 @@ import logging -from typing import Iterator, List, Optional +from typing import Callable, Iterator, List, Optional from py_trees.decorators import Decorator, behaviour, common from pydantic import BaseModel @@ -21,9 +21,59 @@ def __init__( ) +class ConditionGuard(DecoratorWithLogging): + """ + A decorator that guards behaviour execution based on a condition function. + + If the condition function returns True, the guard returns guard_status + and does not execute the decorated behaviour. + """ + + def __init__( + self, + name: str, + child: behaviour.Behaviour, + condition: Callable[[], bool], + guard_status: common.Status, + log_prefix: str = "", + ): + self.condition = condition + self.guard_status = guard_status + super(ConditionGuard, self).__init__( + name=name, child=child, log_prefix=log_prefix + ) + + def update(self) -> common.Status: + if self.condition(): + self.logger.debug( + f"Condition met, returning guard status: {self.guard_status}" + ) + return self.guard_status + self.logger.debug( + f"Condition not met, returning child status: {self.decorated.status}" + ) + return self.decorated.status + + def tick(self) -> Iterator[behaviour.Behaviour]: + """ + Tick the child or bounce back with guard status if condition is met. + + Yields: + a reference to itself or a behaviour in it's child subtree + """ + if self.condition(): + # ignore the child, condition is met + for node in behaviour.Behaviour.tick(self): + yield node + else: + # tick the child, condition not met + for node in Decorator.tick(self): + yield node + + class FlagGuard(DecoratorWithLogging): """ - A decorator that guards behaviour execution based on a flag value. + A decorator that guards behaviour execution based on a boolean flag value. If the flag in the container matches the expected flag_value, the guard returns guard_status immediately without executing the decorated behaviour. 
@@ -116,3 +166,103 @@ def terminate(self, new_status: common.Status) -> None: ) else: self.logger.debug(f"Terminating with status {new_status}, no flag change") + + +class StatusFlagGuard(DecoratorWithLogging): + """ + A decorator that guards behaviour execution based on a status flag value. + + In contrast to FlagGuard, flag may have 4 values: None, SUCCESS, FAILURE, RUNNING + + If the flag in the container matches the guard_status, the guard + returns guard_status immediately without executing the decorated behaviour. + + On any child status update, the flag is set to the child's status value. + + Args: + name: the decorator name + child: the child behaviour or subtree + container: the BaseModel instance containing the flag + flag: the name of the flag field in the container (can hold common.Status or None) + message_field: optional name of the field in the container that holds additional message + guard_status: the status that prevents execution (FAILURE or SUCCESS, default: FAILURE) + """ + + def __init__( + self, + name: Optional[str], + child: behaviour.Behaviour, + container: BaseModel, + flag: str, + message_field: Optional[str] = None, + guard_status: common.Status = common.Status.FAILURE, + log_prefix: str = "", + ): + if guard_status not in (common.Status.FAILURE, common.Status.SUCCESS): + raise ValueError( + f"guard_status must be FAILURE or SUCCESS, got {guard_status}" + ) + + if not hasattr(container, flag): + raise ValueError( + f"Field '{flag}' does not exist on {container.__class__.__name__}" + ) + + if message_field is not None and not hasattr(container, message_field): + raise ValueError( + f"Field '{message_field}' does not exist on {container.__class__.__name__}" + ) + + current_value = getattr(container, flag) + if current_value is not None and not isinstance(current_value, common.Status): + raise TypeError( + f"Field '{flag}' must be either common.Status or None, got {type(current_value)}" + ) + + self.container = container + self.flag = flag 
+ self.message_field = message_field + self.guard_status = guard_status + if name is None: + status_text = ( + "failed" if guard_status == common.Status.FAILURE else "succeeded" + ) + name = f"Unless {flag} {status_text}" + super(StatusFlagGuard, self).__init__( + name=name, child=child, log_prefix=log_prefix + ) + + def _is_guard_active(self) -> bool: + current_flag_value = getattr(self.container, self.flag, None) + return current_flag_value == self.guard_status + + def update(self) -> common.Status: + current_flag_value = getattr(self.container, self.flag, None) + if current_flag_value == self.guard_status: + self.logger.debug(f"Returning guard status: {self.guard_status}") + return self.guard_status + + child_status = self.decorated.status + # Update flag with child's current status + setattr(self.container, self.flag, child_status) + if self.message_field is not None: + setattr(self.container, self.message_field, self.decorated.feedback_message) + self.logger.debug(f"Updated {self.flag} to {child_status}") + self.feedback_message = f"{self.flag} set to {child_status}" + return child_status + + def tick(self) -> Iterator[behaviour.Behaviour]: + """ + Tick the child or bounce back with the original status if guard is active. 
+ + Yields: + a reference to itself or a behaviour in it's child subtree + """ + if self._is_guard_active(): + # ignore the child + for node in behaviour.Behaviour.tick(self): + yield node + else: + # tick the child + for node in Decorator.tick(self): + yield node diff --git a/src/redis_release/bht/state.py b/src/redis_release/bht/state.py index 380302b..5345749 100644 --- a/src/redis_release/bht/state.py +++ b/src/redis_release/bht/state.py @@ -2,11 +2,11 @@ import logging from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, Optional, Union +from py_trees import common +from py_trees.common import Status from pydantic import BaseModel, Field -from rich.console import Console -from rich.table import Table from redis_release.models import ( PackageType, @@ -20,17 +20,68 @@ logger = logging.getLogger(__name__) +SUPPORTED_STATE_VERSION = 2 + class WorkflowEphemeral(BaseModel): - """Ephemeral workflow state that is not persisted.""" + """Ephemeral workflow state. Reset on each run. + + The main purpose of ephemeral fields is to prevent retry loops and to allow extensive status reporting. + + Each workflow step has a pair of fields indicating the step status: + One ephemeral field is set when the step is attempted. It may have four states: + - `None` (default): Step has not been attempted + - `common.Status.RUNNING`: Step is currently running + - `common.Status.FAILURE`: Step has been attempted and failed + - `common.Status.SUCCESS`: Step has been attempted and succeeded + + Ephemeral fields are reset on each run. Their values are persisted but only until + next run is started. + So they indicate either current (if run is in progress) or last run state. + + The other field indicates the step result, it may either have some value or be empty. + + For example for trigger step we have `trigger_workflow` ephemeral + and `triggered_at` result fields. 
+ + Optional message field may be used to provide additional information about the step. + For example wait_for_completion_message may contain information about timeout. + + Given combination of ephemeral and result fields we can determine step status. + Each step may be in one of the following states: + Not started + Failed + Succeeded or OK + Incorrect (this shouldn't happen) + + The following decision table shows how step status is determined for trigger step. + In general this is applicable to all steps. + + trigger_workflow -> | None (default) | Running | Failure | Success | + triggered_at: | | | | | + None | Not started | In progress | Failed | Incorrect | + Has value | OK | Incorrect | Incorrect | OK | + + The result field (triggered_at in this case) should not be set while step is + running, if step was not started or if it's failed. + And it should be set if trigger_workflow is successful. + It may be set if trigger_workflow is None, which is the case when release + process was restarted and all ephemeral fields are reset, but the particular + step was successful in previous run. - trigger_failed: bool = False - trigger_attempted: bool = False - identify_failed: bool = False - timed_out: bool = False - artifacts_download_failed: bool = False - extract_result_failed: bool = False - log_once_flags: Dict[str, bool] = Field(default_factory=dict) + Correct values are not enforced; it's up to the implementation to correctly + set the fields.
+ """ + + identify_workflow: Optional[common.Status] = None + trigger_workflow: Optional[common.Status] = None + wait_for_completion: Optional[common.Status] = None + wait_for_completion_message: Optional[str] = None + wait_for_completion_timed_out: Optional[bool] = False + download_artifacts: Optional[common.Status] = None + extract_artifact_result: Optional[common.Status] = None + + log_once_flags: Dict[str, bool] = Field(default_factory=dict, exclude=True) class Workflow(BaseModel): @@ -47,17 +98,19 @@ class Workflow(BaseModel): conclusion: Optional[WorkflowConclusion] = None artifacts: Optional[Dict[str, Any]] = None result: Optional[Dict[str, Any]] = None - ephemeral: WorkflowEphemeral = Field( - default_factory=WorkflowEphemeral, exclude=True - ) + ephemeral: WorkflowEphemeral = Field(default_factory=WorkflowEphemeral) class PackageMetaEphemeral(BaseModel): - """Ephemeral package metadata that is not persisted.""" + """Ephemeral package metadata. Reset on each run. + + See WorkflowEphemeral for more details. + """ force_rebuild: bool = False identify_ref_failed: bool = False - log_once_flags: Dict[str, bool] = Field(default_factory=dict) + identify_ref: Optional[common.Status] = None + log_once_flags: Dict[str, bool] = Field(default_factory=dict, exclude=True) class PackageMeta(BaseModel): @@ -67,9 +120,7 @@ class PackageMeta(BaseModel): repo: str = "" ref: Optional[str] = None publish_internal_release: bool = False - ephemeral: PackageMetaEphemeral = Field( - default_factory=PackageMetaEphemeral, exclude=True - ) + ephemeral: PackageMetaEphemeral = Field(default_factory=PackageMetaEphemeral) class Package(BaseModel): @@ -81,9 +132,12 @@ class Package(BaseModel): class ReleaseMetaEphemeral(BaseModel): - """Ephemeral release metadata that is not persisted.""" + """Ephemeral release metadata. Reset on each run. - log_once_flags: Dict[str, bool] = Field(default_factory=dict) + See WorkflowEphemeral for more details. 
+ """ + + log_once_flags: Dict[str, bool] = Field(default_factory=dict, exclude=True) class ReleaseMeta(BaseModel): @@ -91,14 +145,13 @@ class ReleaseMeta(BaseModel): tag: Optional[str] = None release_type: Optional[ReleaseType] = None - ephemeral: ReleaseMetaEphemeral = Field( - default_factory=ReleaseMetaEphemeral, exclude=True - ) + ephemeral: ReleaseMetaEphemeral = Field(default_factory=ReleaseMetaEphemeral) class ReleaseState(BaseModel): """Release state adapted for behavior tree usage.""" + version: int = 2 meta: ReleaseMeta = Field(default_factory=ReleaseMeta) packages: Dict[str, Package] = Field(default_factory=dict) @@ -178,6 +231,11 @@ def from_json(cls, data: Union[str, Dict, Path]) -> "ReleaseState": else: json_data = data + if json_data.get("version") != SUPPORTED_STATE_VERSION: + raise ValueError( + f"Unsupported state version: {json_data.get('version')}, " + f"expected: {SUPPORTED_STATE_VERSION}" + ) return cls(**json_data) @@ -205,181 +263,3 @@ def reset_model_to_defaults(target: BaseModel, default: BaseModel) -> None: else: # Simple value, copy directly setattr(target, field_name, default_value) - - -def print_state_table(state: ReleaseState, console: Optional[Console] = None) -> None: - """Print table showing the release state. 
- - Args: - state: The ReleaseState to display - console: Optional Rich Console instance (creates new one if not provided) - """ - if console is None: - console = Console() - - # Create table with title - table = Table( - title=f"[bold cyan]Release State: {state.meta.tag or 'N/A'}[/bold cyan]", - show_header=True, - header_style="bold magenta", - border_style="bright_blue", - title_style="bold cyan", - ) - - # Add columns - table.add_column("Package", style="cyan", no_wrap=True, width=20) - table.add_column("Build", justify="center", width=15) - table.add_column("Publish", justify="center", width=15) - table.add_column("Details", style="yellow", width=40) - - # Process each package - for package_name, package in sorted(state.packages.items()): - # Determine build status - build_status = _get_workflow_status_display(package.build) - - # Determine publish status - publish_status = _get_workflow_status_display(package.publish) - - # Collect details from workflows - details = _collect_details(package) - - # Add row to table - table.add_row( - package_name, - build_status, - publish_status, - details, - ) - - # Print the table - console.print() - console.print(table) - console.print() - - -def _get_workflow_status_display(workflow: Workflow) -> str: - """Get a rich-formatted status display for a workflow. - - Args: - workflow: The workflow to check - - Returns: - Rich-formatted status string - """ - # Check result field - if we have result, we succeeded - if workflow.result is not None: - return "[bold green]✓ Success[/bold green]" - - # Check if workflow was triggered - if workflow.triggered_at is None: - return "[dim]− Not Started[/dim]" - - # Workflow was triggered but no result - it failed - return "[bold red]✗ Failed[/bold red]" - - -def _collect_workflow_details(workflow: Workflow, prefix: str) -> List[str]: - """Collect details from a workflow using bottom-up approach. - - Shows successes until the first failure, then stops. 
- Bottom-up means: trigger → identify → timeout → conclusion → artifacts → result - - Args: - workflow: The workflow to check - prefix: Prefix for detail messages (e.g., "Build" or "Publish") - - Returns: - List of detail strings - """ - details: List[str] = [] - - # Stage 1: Trigger (earliest/bottom) - if workflow.ephemeral.trigger_failed or workflow.triggered_at is None: - details.append(f"[red]✗ Trigger {prefix} workflow failed[/red]") - return details - else: - details.append(f"[green]✓ {prefix} workflow triggered[/green]") - - # Stage 2: Identify - if workflow.ephemeral.identify_failed or workflow.run_id is None: - details.append(f"[red]✗ {prefix} workflow not found[/red]") - return details - else: - details.append(f"[green]✓ {prefix} workflow found[/green]") - - # Stage 3: Timeout (only ephemeral) - if workflow.ephemeral.timed_out: - details.append(f"[yellow]⏱ {prefix} timed out[/yellow]") - return details - - # Stage 4: Workflow conclusion - if workflow.conclusion == WorkflowConclusion.FAILURE: - details.append(f"[red]✗ {prefix} workflow failed[/red]") - return details - - # Stage 5: Artifacts download - if workflow.ephemeral.artifacts_download_failed or workflow.artifacts is None: - details.append(f"[red]✗ {prefix} artifacts download failed[/red]") - return details - else: - details.append(f"[green]✓ {prefix} artifacts downloaded[/green]") - - # Stage 6: Result extraction (latest/top) - if workflow.result is None or workflow.ephemeral.extract_result_failed: - details.append(f"[red]✗ {prefix} failed to extract result[/red]") - return details - else: - details.append(f"[green]✓ {prefix} result extracted[/green]") - - # Check for other workflow states - if workflow.status == WorkflowStatus.IN_PROGRESS: - details.append(f"[blue]⟳ {prefix} in progress[/blue]") - elif workflow.status == WorkflowStatus.QUEUED: - details.append(f"[cyan]⋯ {prefix} queued[/cyan]") - elif workflow.status == WorkflowStatus.PENDING: - details.append(f"[dim]○ {prefix} pending[/dim]") - - 
return details - - -def _collect_package_details(package: Package) -> List[str]: - """Collect details from package metadata. - - Args: - package: The package to check - - Returns: - List of detail strings (may be empty) - """ - details: List[str] = [] - - if package.meta.ephemeral.identify_ref_failed: - details.append("[red]✗ Identify target ref to run workflow failed[/red]") - elif package.meta.ref is not None: - details.append(f"[green]✓ Target Ref identified: {package.meta.ref}[/green]") - - return details - - -def _collect_details(package: Package) -> str: - """Collect and format all details from package and workflows. - - Args: - package: The package to check - - Returns: - Formatted string of details - """ - details: List[str] = [] - - # Collect package-level details - details.extend(_collect_package_details(package)) - - # Collect build workflow details - details.extend(_collect_workflow_details(package.build, "Build")) - - # Only collect publish details if build succeeded (has result) - if package.build.result is not None: - details.extend(_collect_workflow_details(package.publish, "Publish")) - - return "\n".join(details) diff --git a/src/redis_release/bht/tree.py b/src/redis_release/bht/tree.py index d404cd5..2d3b768 100644 --- a/src/redis_release/bht/tree.py +++ b/src/redis_release/bht/tree.py @@ -21,7 +21,9 @@ from ..config import Config from ..github_client_async import GitHubClientAsync from ..models import ReleaseArgs +from ..state_display import print_state_table from ..state_manager import S3StateStorage, StateManager, StateStorage +from ..state_slack import SlackStatePrinter, init_slack_printer from .backchain import latch_chains from .behaviours import NeedToPublishRelease from .composites import ( @@ -43,14 +45,7 @@ create_workflow_completion_ppa, create_workflow_success_ppa, ) -from .state import ( - Package, - PackageMeta, - ReleaseMeta, - ReleaseState, - Workflow, - print_state_table, -) +from .state import Package, PackageMeta, ReleaseMeta, 
ReleaseState, Workflow logger = logging.getLogger(__name__) @@ -130,6 +125,24 @@ def initialize_tree_and_state( tree.add_post_tick_handler(lambda _: state_syncer.sync()) tree.add_post_tick_handler(log_tree_state_with_markup) + # Initialize Slack printer if Slack args are provided + slack_printer: Optional[SlackStatePrinter] = None + if args.slack_token or args.slack_channel_id: + try: + slack_printer = init_slack_printer( + args.slack_token, args.slack_channel_id + ) + # Capture the non-None printer in the closure + printer = slack_printer + + def slack_tick_handler(_: BehaviourTree) -> None: + printer.update_message(state_syncer.state) + + tree.add_post_tick_handler(slack_tick_handler) + except ValueError as e: + logger.error(f"Failed to initialize Slack printer: {e}") + slack_printer = None + try: yield (tree, state_syncer) finally: @@ -248,7 +261,7 @@ def create_build_workflow_tree_branch( assert isinstance(build_workflow, Selector) reset_package_state = RestartPackageGuarded( - "", + "BuildRestartCondition", package, package.build, default_package, @@ -296,7 +309,7 @@ def create_publish_workflow_tree_branch( ), ) reset_publish_workflow_state = RestartWorkflowGuarded( - "", + "PublishRestartCondition", publish_workflow, package_meta, default_publish_workflow, diff --git a/src/redis_release/cli.py b/src/redis_release/cli.py index 5cdfe8d..71bb1f5 100644 --- a/src/redis_release/cli.py +++ b/src/redis_release/cli.py @@ -2,19 +2,19 @@ import asyncio import logging -import os from typing import List, Optional import typer from py_trees.display import render_dot_tree, unicode_tree -from redis_release.bht.state import print_state_table from redis_release.models import ReleaseType +from redis_release.state_display import print_state_table from redis_release.state_manager import ( InMemoryStateStorage, S3StateStorage, StateManager, ) +from redis_release.state_slack import init_slack_printer from .bht.tree import TreeInspector, async_tick_tock, initialize_tree_and_state 
from .config import load_config @@ -112,6 +112,16 @@ def release( "--override-state-name", help="Custom state name to use instead of release tag, to be able to make test runs without affecting production state", ), + slack_token: Optional[str] = typer.Option( + None, + "--slack-token", + help="Slack bot token (if not provided, uses SLACK_BOT_TOKEN env var)", + ), + slack_channel_id: Optional[str] = typer.Option( + None, + "--slack-channel-id", + help="Slack channel ID to post status updates to", + ), ) -> None: """Run release using behaviour tree implementation.""" setup_logging() @@ -125,6 +135,8 @@ def release( only_packages=only_packages or [], force_release_type=force_release_type, override_state_name=override_state_name, + slack_token=slack_token, + slack_channel_id=slack_channel_id, ) # Use context manager version with automatic lock management @@ -138,9 +150,20 @@ def status( config_file: Optional[str] = typer.Option( None, "--config", "-c", help="Path to config file (default: config.yaml)" ), + slack: bool = typer.Option(False, "--slack", help="Post status to Slack"), + slack_channel_id: Optional[str] = typer.Option( + None, + "--slack-channel-id", + help="Slack channel ID to post to (required if --slack is used)", + ), + slack_token: Optional[str] = typer.Option( + None, + "--slack-token", + help="Slack bot token (if not provided, uses SLACK_BOT_TOKEN env var)", + ), ) -> None: - """Run release using behaviour tree implementation.""" - setup_logging(logging.INFO) + """Display release status in console and optionally post to Slack.""" + setup_logging() config_path = config_file or "config.yaml" config = load_config(config_path) @@ -156,8 +179,67 @@ def status( args=args, read_only=True, ) as state_syncer: + # Always print to console print_state_table(state_syncer.state) + # Post to Slack if requested + if slack: + printer = init_slack_printer(slack_token, slack_channel_id) + printer.update_message(state_syncer.state) + + +@app.command() +def slack_bot( + 
config_file: Optional[str] = typer.Option( + None, "--config", "-c", help="Path to config file (default: config.yaml)" + ), + slack_bot_token: Optional[str] = typer.Option( + None, + "--slack-bot-token", + help="Slack bot token (xoxb-...). If not provided, uses SLACK_BOT_TOKEN env var", + ), + slack_app_token: Optional[str] = typer.Option( + None, + "--slack-app-token", + help="Slack app token (xapp-...). If not provided, uses SLACK_APP_TOKEN env var", + ), + reply_in_thread: bool = typer.Option( + True, + "--reply-in-thread/--no-reply-in-thread", + help="Reply in thread instead of main channel", + ), + broadcast_to_channel: bool = typer.Option( + False, + "--broadcast/--no-broadcast", + help="When replying in thread, also show in main channel", + ), +) -> None: + """Run Slack bot that listens for status requests. + + The bot listens for mentions containing 'status' and a version tag (e.g., '8.4-m01'), + and responds by posting the release status to the channel. + + By default, replies are posted in threads to keep channels clean. Use --no-reply-in-thread + to post directly in the channel. Use --broadcast to show thread replies in the main channel. + + Requires Socket Mode to be enabled in your Slack app configuration. 
+ """ + from redis_release.slack_bot import run_bot + + setup_logging() + config_path = config_file or "config.yaml" + + logger.info("Starting Slack bot...") + asyncio.run( + run_bot( + config_path, + slack_bot_token, + slack_app_token, + reply_in_thread, + broadcast_to_channel, + ) + ) + if __name__ == "__main__": app() diff --git a/src/redis_release/models.py b/src/redis_release/models.py index f96cb5f..8be2ad5 100644 --- a/src/redis_release/models.py +++ b/src/redis_release/models.py @@ -161,3 +161,5 @@ class ReleaseArgs(BaseModel): only_packages: List[str] = Field(default_factory=list) force_release_type: Optional[ReleaseType] = None override_state_name: Optional[str] = None + slack_token: Optional[str] = None + slack_channel_id: Optional[str] = None diff --git a/src/redis_release/slack_bot.py b/src/redis_release/slack_bot.py new file mode 100644 index 0000000..3a77b94 --- /dev/null +++ b/src/redis_release/slack_bot.py @@ -0,0 +1,287 @@ +"""Async Slack bot that listens for status requests and posts release status.""" + +import asyncio +import logging +import os +import re +from typing import Any, Dict, Optional + +from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler +from slack_bolt.async_app import AsyncApp +from slack_bolt.context.say.async_say import AsyncSay + +from redis_release.config import Config, load_config +from redis_release.models import ReleaseArgs +from redis_release.state_manager import S3StateStorage, StateManager +from redis_release.state_slack import SlackStatePrinter + +logger = logging.getLogger(__name__) + +# Regex pattern to match version tags like 8.4-m01, 7.2.5, 8.0-rc1, etc. 
+VERSION_TAG_PATTERN = re.compile(r"\b(\d+\.\d+(?:\.\d+)?(?:-[a-zA-Z0-9]+)?)\b") + + +class ReleaseStatusBot: + """Async Slack bot that responds to status requests for releases.""" + + def __init__( + self, + config: Config, + slack_bot_token: Optional[str] = None, + slack_app_token: Optional[str] = None, + reply_in_thread: bool = True, + broadcast_to_channel: bool = False, + ): + """Initialize the bot. + + Args: + config: Release configuration + slack_bot_token: Slack bot token (xoxb-...). If None, uses SLACK_BOT_TOKEN env var + slack_app_token: Slack app token (xapp-...). If None, uses SLACK_APP_TOKEN env var + reply_in_thread: If True, reply in thread. If False, reply in main channel + broadcast_to_channel: If True and reply_in_thread is True, also show in main channel + """ + self.config = config + self.reply_in_thread = reply_in_thread + self.broadcast_to_channel = broadcast_to_channel + + # Get tokens from args or environment + bot_token = slack_bot_token or os.environ.get("SLACK_BOT_TOKEN") + app_token = slack_app_token or os.environ.get("SLACK_APP_TOKEN") + + if not bot_token: + raise ValueError( + "Slack bot token not provided. Use slack_bot_token argument or set SLACK_BOT_TOKEN environment variable" + ) + if not app_token: + raise ValueError( + "Slack app token not provided. 
Use slack_app_token argument or set SLACK_APP_TOKEN environment variable" + ) + + # Store validated tokens (guaranteed to be non-None) + self.bot_token: str = bot_token + self.app_token: str = app_token + + # Initialize async Slack app + self.app = AsyncApp(token=self.bot_token) + + # Register event handlers + self._register_handlers() + + def _register_handlers(self) -> None: + """Register Slack event handlers.""" + + @self.app.event("app_mention") + async def handle_app_mention( # pyright: ignore[reportUnusedFunction] + event: Dict[str, Any], say: AsyncSay, logger: logging.Logger + ) -> None: + """Handle app mentions and check for status requests.""" + try: + text = event.get("text", "").lower() + channel = event.get("channel") + user = event.get("user") + ts = event.get("ts") + thread_ts = event.get( + "thread_ts", ts + ) # Use thread_ts if in thread, else use message ts + + # Validate required fields + if not channel or not user or not thread_ts: + logger.error( + f"Missing required fields in event: channel={channel}, user={user}, thread_ts={thread_ts}" + ) + return + + logger.info( + f"Received mention from user {user} in channel {channel}: {text}" + ) + + # Check if message contains "status" + if "status" not in text: + logger.debug("Message doesn't contain 'status', ignoring") + return + + # Extract version tag from message + tag = self._extract_version_tag(event.get("text", "")) + + if not tag: + # Reply in thread if configured + if self.reply_in_thread: + await self.app.client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text=f"<@{user}> I couldn't find a version tag in your message. " + "Please mention me with 'status' and a version tag like `8.4-m01` or `7.2.5`.", + ) + else: + await say( + f"<@{user}> I couldn't find a version tag in your message. " + "Please mention me with 'status' and a version tag like `8.4-m01` or `7.2.5`." 
+ ) + return + + logger.info(f"Processing status request for tag: {tag}") + + # Post status for the tag + await self._post_status(tag, channel, user, thread_ts) + + except Exception as e: + logger.error(f"Error handling app mention: {e}", exc_info=True) + # Reply in thread if configured + channel = event.get("channel") + if self.reply_in_thread and channel: + await self.app.client.chat_postMessage( + channel=channel, + thread_ts=event.get("thread_ts", event.get("ts", "")), + text=f"Sorry, I encountered an error: {str(e)}", + ) + else: + await say(f"Sorry, I encountered an error: {str(e)}") + + def _extract_version_tag(self, text: str) -> Optional[str]: + """Extract version tag from message text. + + Args: + text: Message text + + Returns: + Version tag if found, None otherwise + """ + match = VERSION_TAG_PATTERN.search(text) + if match: + return match.group(1) + return None + + async def _post_status( + self, tag: str, channel: str, user: str, thread_ts: str + ) -> None: + """Load and post release status for a tag. + + Args: + tag: Release tag + channel: Slack channel ID + user: User ID who requested the status + thread_ts: Thread timestamp to reply in + """ + try: + # Create release args + args = ReleaseArgs( + release_tag=tag, + force_rebuild=[], + ) + + # Load state from S3 + storage = S3StateStorage() + + # Use StateManager in read-only mode + with StateManager( + storage=storage, + config=self.config, + args=args, + read_only=True, + ) as state_syncer: + state = state_syncer.state + + # Check if state exists (has any data beyond defaults) + if not state.packages: + if self.reply_in_thread: + await self.app.client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text=f"<@{user}> No release state found for tag `{tag}`. " + "This release may not have been started yet.", + ) + else: + await self.app.client.chat_postMessage( + channel=channel, + text=f"<@{user}> No release state found for tag `{tag}`. 
" + "This release may not have been started yet.", + ) + return + + # Get status blocks from SlackStatePrinter + printer = SlackStatePrinter(self.bot_token, channel) + blocks = printer._make_blocks(state) + text = f"Release {state.meta.tag or 'N/A'} — Status" + + if self.reply_in_thread: + await self.app.client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text=text, + blocks=blocks, + reply_broadcast=self.broadcast_to_channel, + ) + else: + await self.app.client.chat_postMessage( + channel=channel, + text=text, + blocks=blocks, + ) + + logger.info( + f"Posted status for tag {tag} to channel {channel}" + + (f" in thread {thread_ts}" if self.reply_in_thread else "") + ) + + except Exception as e: + logger.error(f"Error posting status for tag {tag}: {e}", exc_info=True) + if self.reply_in_thread: + await self.app.client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text=f"<@{user}> Failed to load status for tag `{tag}`: {str(e)}", + ) + else: + await self.app.client.chat_postMessage( + channel=channel, + text=f"<@{user}> Failed to load status for tag `{tag}`: {str(e)}", + ) + + async def start(self) -> None: + """Start the bot using Socket Mode.""" + logger.info("Starting Slack bot in Socket Mode...") + handler = AsyncSocketModeHandler(self.app, self.app_token) + await handler.start_async() + + +async def run_bot( + config_path: str = "config.yaml", + slack_bot_token: Optional[str] = None, + slack_app_token: Optional[str] = None, + reply_in_thread: bool = True, + broadcast_to_channel: bool = False, +) -> None: + """Run the Slack bot. + + Args: + config_path: Path to config file + slack_bot_token: Slack bot token (xoxb-...). If None, uses SLACK_BOT_TOKEN env var + slack_app_token: Slack app token (xapp-...). If None, uses SLACK_APP_TOKEN env var + reply_in_thread: If True, reply in thread. 
If False, reply in main channel + broadcast_to_channel: If True and reply_in_thread is True, also show in main channel + """ + # Load config + config = load_config(config_path) + + # Create and start bot + bot = ReleaseStatusBot( + config=config, + slack_bot_token=slack_bot_token, + slack_app_token=slack_app_token, + reply_in_thread=reply_in_thread, + broadcast_to_channel=broadcast_to_channel, + ) + + await bot.start() + + +if __name__ == "__main__": + # Setup logging + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + # Run the bot + asyncio.run(run_bot()) diff --git a/src/redis_release/state_display.py b/src/redis_release/state_display.py new file mode 100644 index 0000000..988475c --- /dev/null +++ b/src/redis_release/state_display.py @@ -0,0 +1,275 @@ +"""Console display utilities for release state.""" + +from enum import Enum +from typing import List, Optional, Tuple + +from py_trees import common +from py_trees.common import Status +from rich.console import Console +from rich.table import Table + +from redis_release.models import WorkflowConclusion + +from .bht.state import Package, ReleaseState, Workflow + + +# See WorkflowEphemeral for more details on the flags and steps +class StepStatus(str, Enum): + """Status of a workflow step.""" + + NOT_STARTED = "not_started" + RUNNING = "in_progress" + FAILED = "failed" + SUCCEEDED = "succeeded" + INCORRECT = "incorrect" + + +# Decision table for step status +# See WorkflowEphemeral for more details on the flags +_STEP_STATUS_MAPPING = { + None: {False: StepStatus.NOT_STARTED, True: StepStatus.SUCCEEDED}, + Status.RUNNING: {False: StepStatus.RUNNING}, + Status.FAILURE: {False: StepStatus.FAILED}, + Status.SUCCESS: {True: StepStatus.SUCCEEDED}, +} + + +class DisplayModel: + """Model for computing display status from workflow state.""" + + @staticmethod + def get_step_status( + step_result: bool, step_status_flag: Optional[common.Status] + ) -> 
StepStatus: + """Get step status based on result and ephemeral flag. + + See WorkflowEphemeral for more details on the flags. + + Args: + step_result: Whether the step has a result + step_status_flag: The ephemeral status flag value + + Returns: + The determined step status + """ + if step_status_flag in _STEP_STATUS_MAPPING: + if step_result in _STEP_STATUS_MAPPING[step_status_flag]: + return _STEP_STATUS_MAPPING[step_status_flag][step_result] + return StepStatus.INCORRECT + + @staticmethod + def get_workflow_status( + package: Package, workflow: Workflow + ) -> Tuple[StepStatus, List[Tuple[StepStatus, str, Optional[str]]]]: + """Get workflow status based on ephemeral and result fields. + + Returns tuple of overall status and list of step statuses. + + See WorkflowEphemeral for more details on the flags. + + Args: + package: The package containing the workflow + workflow: The workflow to check + + Returns: + Tuple of (overall_status, list of (step_status, step_name, step_message)) + """ + steps_status: List[Tuple[StepStatus, str, Optional[str]]] = [] + steps = [ + ( + package.meta.ref is not None, + package.meta.ephemeral.identify_ref, + "Identify target ref", + None, + ), + ( + workflow.triggered_at is not None, + workflow.ephemeral.trigger_workflow, + "Trigger workflow", + None, + ), + ( + workflow.run_id is not None, + workflow.ephemeral.identify_workflow, + "Find workflow run", + None, + ), + ( + workflow.conclusion == WorkflowConclusion.SUCCESS, + workflow.ephemeral.wait_for_completion, + "Wait for completion", + workflow.ephemeral.wait_for_completion_message, + ), + ( + workflow.artifacts is not None, + workflow.ephemeral.download_artifacts, + "Download artifacts", + None, + ), + ( + workflow.result is not None, + workflow.ephemeral.extract_artifact_result, + "Get result", + None, + ), + ] + for result, status_flag, name, status_msg in steps: + s = DisplayModel.get_step_status(result, status_flag) + steps_status.append((s, name, status_msg)) + if s != 
StepStatus.SUCCEEDED: + return (s, steps_status) + return (StepStatus.SUCCEEDED, steps_status) + + +class ConsoleStatePrinter: + """Handles printing of release state to console using Rich tables.""" + + def __init__(self, console: Optional[Console] = None): + """Initialize the printer. + + Args: + console: Optional Rich Console instance (creates new one if not provided) + """ + self.console = console or Console() + + def print_state_table(self, state: ReleaseState) -> None: + """Print table showing the release state. + + Args: + state: The ReleaseState to display + """ + # Create table with title + table = Table( + title=f"[bold cyan]Release State: {state.meta.tag or 'N/A'}[/bold cyan]", + show_header=True, + show_lines=True, + header_style="bold magenta", + border_style="bright_blue", + title_style="bold cyan", + ) + + # Add columns + table.add_column("Package", style="cyan", no_wrap=True, min_width=20, width=20) + table.add_column( + "Build", justify="center", no_wrap=True, min_width=20, width=20 + ) + table.add_column( + "Publish", justify="center", no_wrap=True, min_width=20, width=20 + ) + table.add_column("Details", style="yellow", width=100) + + # Process each package + for package_name, package in sorted(state.packages.items()): + # Determine build status + build_status = self.get_workflow_status_display(package, package.build) + + # Determine publish status + publish_status = self.get_workflow_status_display(package, package.publish) + + # Collect details from workflows + details = self.collect_details(package) + + # Add row to table + table.add_row( + package_name, + build_status, + publish_status, + details, + ) + + # Print the table + self.console.print() + self.console.print(table) + self.console.print() + + def get_workflow_status_display(self, package: Package, workflow: Workflow) -> str: + """Get a rich-formatted status display for a workflow. 
+ + Args: + package: The package containing the workflow + workflow: The workflow to check + + Returns: + Rich-formatted status string + """ + workflow_status = DisplayModel.get_workflow_status(package, workflow) + if workflow_status[0] == StepStatus.SUCCEEDED: + return "[bold green]✓ Success[/bold green]" + elif workflow_status[0] == StepStatus.RUNNING: + return "[bold yellow]⏳ In Progress[/bold yellow]" + elif workflow_status[0] == StepStatus.NOT_STARTED: + return "[dim]Not Started[/dim]" + elif workflow_status[0] == StepStatus.INCORRECT: + return "[bold red]✗ Invalid state![/bold red]" + + return "[bold red]✗ Failed[/bold red]" + + def collect_workflow_details( + self, package: Package, workflow: Workflow, prefix: str + ) -> List[str]: + """Collect details from a workflow using bottom-up approach. + + Shows successes until the first failure, then stops. + Bottom-up means: trigger → identify → timeout → conclusion → artifacts → result + + Args: + package: The package containing the workflow + workflow: The workflow to check + prefix: Prefix for detail messages (e.g., "Build" or "Publish") + + Returns: + List of detail strings + """ + details: List[str] = [] + + workflow_status = DisplayModel.get_workflow_status(package, workflow) + if workflow_status[0] == StepStatus.NOT_STARTED: + return details + + details.append(f"{prefix} Workflow") + indent = " " * 2 + + for step_status, step_name, step_message in workflow_status[1]: + if step_status == StepStatus.SUCCEEDED: + details.append(f"{indent}[green]✓ {step_name}[/green]") + elif step_status == StepStatus.RUNNING: + details.append(f"{indent}[yellow]⏳ {step_name}[/yellow]") + elif step_status == StepStatus.NOT_STARTED: + details.append(f"{indent}[dim]Not started: {step_name}[/dim]") + else: + msg = f" ({step_message})" if step_message else "" + details.append(f"{indent}[red]✗ {step_name} failed[/red]{msg}") + break + + return details + + def collect_details(self, package: Package) -> str: + """Collect and format all 
details from package and workflows. + + Args: + package: The package to check + + Returns: + Formatted string of details + """ + details: List[str] = [] + + details.extend(self.collect_workflow_details(package, package.build, "Build")) + details.extend( + self.collect_workflow_details(package, package.publish, "Publish") + ) + + return "\n".join(details) + + +def print_state_table(state: ReleaseState, console: Optional[Console] = None) -> None: + """Print table showing the release state. + + This is a convenience function that creates a ConsoleStatePrinter and prints the state. + + Args: + state: The ReleaseState to display + console: Optional Rich Console instance (creates new one if not provided) + """ + printer = ConsoleStatePrinter(console) + printer.print_state_table(state) diff --git a/src/redis_release/state_manager.py b/src/redis_release/state_manager.py index 8d15dd3..7ac46e0 100644 --- a/src/redis_release/state_manager.py +++ b/src/redis_release/state_manager.py @@ -11,8 +11,9 @@ from botocore.exceptions import ClientError, NoCredentialsError from rich.pretty import pretty_repr -from redis_release.bht.state import ReleaseState, logger, print_state_table +from redis_release.bht.state import ReleaseState, logger from redis_release.config import Config +from redis_release.state_display import print_state_table from .bht.state import ReleaseState from .models import ReleaseArgs @@ -73,11 +74,11 @@ def s3_client(self) -> Optional[boto3.client]: region_name=self.aws_region, ) else: - logger.error("AWS credentials not found") - logger.warning( - "Set AWS_PROFILE or AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables" + # Fall back to default credential chain (includes EC2 instance profile, ECS task role, etc.) 
+ logger.info( + "Using AWS default credential chain (EC2 instance profile, ECS task role, etc.)" ) - raise NoCredentialsError() + self._s3_client = boto3.client("s3", region_name=self.aws_region) # Test connection self._s3_client.head_bucket(Bucket=self.bucket_name) @@ -91,6 +92,11 @@ def s3_client(self) -> Optional[boto3.client]: logger.error(f"S3 error: {e}") raise except NoCredentialsError: + logger.error("AWS credentials not found") + logger.warning( + "Set AWS_PROFILE or AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables, " + "or run on EC2 instance with IAM role" + ) raise except Exception as e: logger.error(f"AWS authentication error: {e}") @@ -247,10 +253,26 @@ def load(self) -> Optional[ReleaseState]: if state_data is None: return None - state = ReleaseState(**state_data) + state = ReleaseState.from_json(state_data) + + # Reset ephemeral fields to defaults if not in read-only mode + if not self.read_only: + self._reset_ephemeral_fields(state) + self.last_dump = state.model_dump_json(indent=2) return state + def _reset_ephemeral_fields(self, state: ReleaseState) -> None: + """Reset ephemeral fields to defaults (except log_once_flags which are always reset).""" + # Reset release meta ephemeral + state.meta.ephemeral = state.meta.ephemeral.__class__() + + # Reset package ephemeral fields + for package in state.packages.values(): + package.meta.ephemeral = package.meta.ephemeral.__class__() + package.build.ephemeral = package.build.ephemeral.__class__() + package.publish.ephemeral = package.publish.ephemeral.__class__() + def sync(self) -> None: """Save state to storage backend if changed since last sync.""" if self.read_only: diff --git a/src/redis_release/state_slack.py b/src/redis_release/state_slack.py new file mode 100644 index 0000000..772b6a0 --- /dev/null +++ b/src/redis_release/state_slack.py @@ -0,0 +1,298 @@ +"""Slack display utilities for release state.""" + +import json +import logging +import os +from datetime import datetime, 
timezone +from typing import Any, Dict, List, Optional, Tuple + +from slack_sdk import WebClient +from slack_sdk.errors import SlackApiError + +from redis_release.state_display import DisplayModel, StepStatus + +from .bht.state import Package, ReleaseState, Workflow + +logger = logging.getLogger(__name__) + + +def get_workflow_link(repo: str, run_id: Optional[int]) -> Optional[str]: + """Generate GitHub workflow URL from repo and run_id. + + Args: + repo: Repository in format "owner/repo" + run_id: GitHub workflow run ID + + Returns: + GitHub workflow URL or None if run_id is not available + """ + if not run_id or not repo: + return None + return f"https://github.com/{repo}/actions/runs/{run_id}" + + +def init_slack_printer( + slack_token: Optional[str], slack_channel_id: Optional[str] +) -> "SlackStatePrinter": + """Initialize SlackStatePrinter with validation. + + Args: + slack_token: Slack bot token (if None, uses SLACK_BOT_TOKEN env var) + slack_channel_id: Slack channel ID to post to + + Returns: + SlackStatePrinter instance + + Raises: + ValueError: If channel_id is not provided or token is not available + """ + if not slack_channel_id: + raise ValueError("Slack channel ID is required") + + # Get token from argument or environment variable + token = slack_token or os.environ.get("SLACK_BOT_TOKEN") + if not token: + raise ValueError( + "Slack token not provided. Use slack_token argument or set SLACK_BOT_TOKEN environment variable" + ) + + return SlackStatePrinter(token, slack_channel_id) + + +class SlackStatePrinter: + """Handles posting and updating release state to Slack channel.""" + + def __init__(self, slack_token: str, slack_channel_id: str): + """Initialize the Slack printer. 
+ + Args: + slack_token: Slack bot token + slack_channel_id: Slack channel ID to post messages to + """ + self.client = WebClient(token=slack_token) + self.channel_id = slack_channel_id + self.message_ts: Optional[str] = None + self.last_blocks_json: Optional[str] = None + self.started_at = datetime.now(timezone.utc) + + def update_message(self, state: ReleaseState) -> bool: + """Post or update Slack message with release state. + + Only updates if the blocks have changed since last update. + + Args: + state: The ReleaseState to display + + Returns: + True if message was posted/updated, False if no change + """ + blocks = self._make_blocks(state) + blocks_json = json.dumps(blocks, sort_keys=True) + + # Check if blocks have changed + if blocks_json == self.last_blocks_json: + logger.debug("Slack message unchanged, skipping update") + return False + + text = f"Release {state.meta.tag or 'N/A'} — Status" + + try: + if self.message_ts is None: + # Post new message + response = self.client.chat_postMessage( + channel=self.channel_id, + text=text, + blocks=blocks, + ) + self.message_ts = response["ts"] + # Update channel_id from response (authoritative) + self.channel_id = response["channel"] + logger.info(f"Posted Slack message ts={self.message_ts}") + else: + # Update existing message + self.client.chat_update( + channel=self.channel_id, + ts=self.message_ts, + text=text, + blocks=blocks, + ) + logger.debug(f"Updated Slack message ts={self.message_ts}") + + self.last_blocks_json = blocks_json + return True + + except SlackApiError as e: + error_msg = getattr(e.response, "get", lambda x: "Unknown error")("error") if hasattr(e, "response") else str(e) # type: ignore + logger.error(f"Slack API error: {error_msg}") + raise + + def _make_blocks(self, state: ReleaseState) -> List[Dict[str, Any]]: + """Create Slack blocks for the release state. 
+ + Args: + state: The ReleaseState to display + + Returns: + List of Slack block dictionaries + """ + blocks: List[Dict[str, Any]] = [] + + # Header + blocks.append( + { + "type": "header", + "text": { + "type": "plain_text", + "text": f"Release {state.meta.tag or 'N/A'} — Status", + }, + } + ) + + # Show started date (when SlackStatePrinter was created) + started_str = self.started_at.strftime("%Y-%m-%d %H:%M:%S %Z") + blocks.append( + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": f"*Started:* {started_str}", + } + ], + } + ) + + # Legend with two columns + blocks.append( + { + "type": "context", + "elements": [ + { + "type": "mrkdwn", + "text": "✅ Success\n❌ Failed", + }, + { + "type": "mrkdwn", + "text": "⏳ In progress\n⚪ Not started", + }, + ], + } + ) + + blocks.append({"type": "divider"}) + + # Process each package + for package_name, package in sorted(state.packages.items()): + # Get workflow statuses + build_status_emoji = self._get_status_emoji(package, package.build) + publish_status_emoji = self._get_status_emoji(package, package.publish) + + # Package section + blocks.append( + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": f"*{package_name}*\n*Build:* {build_status_emoji} | *Publish:* {publish_status_emoji}", + }, + } + ) + + # Workflow details in context + build_details = self._collect_workflow_details_slack(package, package.build) + publish_details = self._collect_workflow_details_slack( + package, package.publish + ) + + if build_details or publish_details: + elements = [] + if build_details: + # Create link for Build Workflow if run_id exists + build_link = get_workflow_link( + package.meta.repo, package.build.run_id + ) + build_title = ( + f"<{build_link}|*Build Workflow*>" + if build_link + else "*Build Workflow*" + ) + elements.append( + {"type": "mrkdwn", "text": f"{build_title}\n{build_details}"} + ) + if publish_details: + # Create link for Publish Workflow if run_id exists + publish_link = 
get_workflow_link( + package.meta.repo, package.publish.run_id + ) + publish_title = ( + f"<{publish_link}|*Publish Workflow*>" + if publish_link + else "*Publish Workflow*" + ) + elements.append( + { + "type": "mrkdwn", + "text": f"{publish_title}\n{publish_details}", + } + ) + blocks.append({"type": "context", "elements": elements}) + + blocks.append({"type": "divider"}) + + return blocks + + def _get_status_emoji(self, package: Package, workflow: Workflow) -> str: + """Get emoji status for a workflow. + + Args: + package: The package containing the workflow + workflow: The workflow to check + + Returns: + Emoji status string + """ + workflow_status = DisplayModel.get_workflow_status(package, workflow) + status = workflow_status[0] + + if status == StepStatus.SUCCEEDED: + return "✅ Success" + elif status == StepStatus.RUNNING: + return "⏳ In progress" + elif status == StepStatus.NOT_STARTED: + return "⚪ Not started" + elif status == StepStatus.INCORRECT: + return "⚠️ Invalid state" + else: # FAILED + return "❌ Failed" + + def _collect_workflow_details_slack( + self, package: Package, workflow: Workflow + ) -> str: + """Collect workflow step details for Slack display. 
+ + Args: + package: The package containing the workflow + workflow: The workflow to check + + Returns: + Formatted string of workflow steps + """ + workflow_status = DisplayModel.get_workflow_status(package, workflow) + if workflow_status[0] == StepStatus.NOT_STARTED: + return "" + + details: List[str] = [] + + for step_status, step_name, step_message in workflow_status[1]: + if step_status == StepStatus.SUCCEEDED: + details.append(f"• ✅ {step_name}") + elif step_status == StepStatus.RUNNING: + details.append(f"• ⏳ {step_name}") + elif step_status == StepStatus.NOT_STARTED: + details.append(f"• ⚪ {step_name}") + else: # FAILED or INCORRECT + msg = f" ({step_message})" if step_message else "" + details.append(f"• ❌ {step_name}{msg}") + break + + return "\n".join(details) diff --git a/src/tests/test_state.py b/src/tests/test_state.py index 9ef0f89..a972c93 100644 --- a/src/tests/test_state.py +++ b/src/tests/test_state.py @@ -290,50 +290,59 @@ def test_ephemeral_field_exists(self) -> None: workflow = Workflow(workflow_file="test.yml") assert hasattr(workflow, "ephemeral") - assert workflow.ephemeral.trigger_failed is False - assert workflow.ephemeral.timed_out is False + assert workflow.ephemeral.trigger_workflow is None + assert workflow.ephemeral.wait_for_completion is None def test_ephemeral_field_can_be_modified(self) -> None: """Test that ephemeral field values can be modified.""" + from py_trees.common import Status + workflow = Workflow(workflow_file="test.yml") - workflow.ephemeral.trigger_failed = True - workflow.ephemeral.timed_out = True + workflow.ephemeral.trigger_workflow = Status.FAILURE + workflow.ephemeral.wait_for_completion = Status.RUNNING - assert workflow.ephemeral.trigger_failed is True - assert workflow.ephemeral.timed_out is True + assert workflow.ephemeral.trigger_workflow == Status.FAILURE + assert workflow.ephemeral.wait_for_completion == Status.RUNNING def test_ephemeral_field_not_serialized_to_json(self) -> None: - """Test that 
ephemeral field is excluded from JSON serialization.""" + """Test that ephemeral field is serialized but log_once_flags are excluded.""" + from py_trees.common import Status + workflow = Workflow(workflow_file="test.yml") - workflow.ephemeral.trigger_failed = True - workflow.ephemeral.timed_out = True + workflow.ephemeral.trigger_workflow = Status.FAILURE + workflow.ephemeral.wait_for_completion = Status.SUCCESS + workflow.ephemeral.log_once_flags["test_flag"] = True # Serialize to JSON json_str = workflow.model_dump_json() json_data = json.loads(json_str) - # Verify ephemeral field is not in JSON - assert "ephemeral" not in json_data - assert "trigger_failed" not in json_data - assert "timed_out" not in json_data + # Verify ephemeral field IS in JSON (except log_once_flags) + assert "ephemeral" in json_data + assert json_data["ephemeral"]["trigger_workflow"] == "FAILURE" + assert json_data["ephemeral"]["wait_for_completion"] == "SUCCESS" + assert "log_once_flags" not in json_data["ephemeral"] # Verify other fields are present assert "workflow_file" in json_data assert json_data["workflow_file"] == "test.yml" def test_ephemeral_field_not_in_model_dump(self) -> None: - """Test that ephemeral field is excluded from model_dump.""" + """Test that ephemeral field is in model_dump but log_once_flags are excluded.""" + from py_trees.common import Status + workflow = Workflow(workflow_file="test.yml") - workflow.ephemeral.trigger_failed = True + workflow.ephemeral.trigger_workflow = Status.SUCCESS + workflow.ephemeral.log_once_flags["test_flag"] = True # Get dict representation data = workflow.model_dump() - # Verify ephemeral field is not in dict - assert "ephemeral" not in data - assert "trigger_failed" not in data - assert "timed_out" not in data + # Verify ephemeral field IS in dict (except log_once_flags) + assert "ephemeral" in data + assert data["ephemeral"]["trigger_workflow"] == Status.SUCCESS + assert "log_once_flags" not in data["ephemeral"] def 
test_ephemeral_field_initialized_on_deserialization(self) -> None: """Test that ephemeral field is initialized when loading from JSON.""" @@ -343,11 +352,13 @@ def test_ephemeral_field_initialized_on_deserialization(self) -> None: # Ephemeral field should be initialized with defaults assert hasattr(workflow, "ephemeral") - assert workflow.ephemeral.trigger_failed is False - assert workflow.ephemeral.timed_out is False + assert workflow.ephemeral.trigger_workflow is None + assert workflow.ephemeral.wait_for_completion is None def test_release_state_ephemeral_not_serialized(self) -> None: - """Test that ephemeral fields are not serialized in ReleaseState.""" + """Test that ephemeral fields are serialized but log_once_flags are excluded.""" + from py_trees.common import Status + config = Config( version=1, packages={ @@ -363,21 +374,26 @@ def test_release_state_ephemeral_not_serialized(self) -> None: state = ReleaseState.from_config(config) # Modify ephemeral fields - state.packages["test-package"].build.ephemeral.trigger_failed = True - state.packages["test-package"].publish.ephemeral.timed_out = True + state.packages["test-package"].build.ephemeral.trigger_workflow = Status.FAILURE + state.packages["test-package"].publish.ephemeral.wait_for_completion = ( + Status.SUCCESS + ) + state.packages["test-package"].build.ephemeral.log_once_flags["test"] = True # Serialize to JSON json_str = state.model_dump_json() json_data = json.loads(json_str) - # Verify ephemeral fields are not in JSON + # Verify ephemeral fields ARE in JSON (except log_once_flags) build_workflow = json_data["packages"]["test-package"]["build"] publish_workflow = json_data["packages"]["test-package"]["publish"] - assert "ephemeral" not in build_workflow - assert "trigger_failed" not in build_workflow - assert "ephemeral" not in publish_workflow - assert "timed_out" not in publish_workflow + assert "ephemeral" in build_workflow + assert build_workflow["ephemeral"]["trigger_workflow"] == "FAILURE" + 
assert "log_once_flags" not in build_workflow["ephemeral"] + assert "ephemeral" in publish_workflow + assert publish_workflow["ephemeral"]["wait_for_completion"] == "SUCCESS" + assert "log_once_flags" not in publish_workflow["ephemeral"] class TestReleaseMeta: @@ -449,7 +465,7 @@ def test_force_rebuild_field_can_be_modified(self) -> None: assert state.packages["test-package"].meta.ephemeral.force_rebuild is True def test_ephemeral_not_serialized(self) -> None: - """Test that ephemeral field is not serialized to JSON.""" + """Test that ephemeral field is serialized but log_once_flags are excluded.""" config = Config( version=1, packages={ @@ -464,12 +480,21 @@ def test_ephemeral_not_serialized(self) -> None: state = ReleaseState.from_config(config) state.packages["test-package"].meta.ephemeral.force_rebuild = True + state.packages["test-package"].meta.ephemeral.log_once_flags["test"] = True json_str = state.model_dump_json() json_data = json.loads(json_str) - assert "ephemeral" not in json_data["packages"]["test-package"]["meta"] - assert "force_rebuild" not in json_data["packages"]["test-package"]["meta"] + # Ephemeral field IS serialized (except log_once_flags) + assert "ephemeral" in json_data["packages"]["test-package"]["meta"] + assert ( + json_data["packages"]["test-package"]["meta"]["ephemeral"]["force_rebuild"] + is True + ) + assert ( + "log_once_flags" + not in json_data["packages"]["test-package"]["meta"]["ephemeral"] + ) class TestStateSyncerWithArgs: diff --git a/src/tests/test_status_flag_guard.py b/src/tests/test_status_flag_guard.py new file mode 100644 index 0000000..83325d7 --- /dev/null +++ b/src/tests/test_status_flag_guard.py @@ -0,0 +1,297 @@ +"""Tests for StatusFlagGuard decorator.""" + +import pytest +from py_trees import common +from py_trees.behaviour import Behaviour +from py_trees.common import Status +from pydantic import BaseModel + +from redis_release.bht.decorators import StatusFlagGuard + + +class StatusFlagContainer(BaseModel): + 
"""Container for holding status flags.""" + + status_flag: Status | None = None + + +class SuccessBehaviour(Behaviour): + """A behaviour that always succeeds.""" + + def update(self) -> Status: + return Status.SUCCESS + + +class FailureBehaviour(Behaviour): + """A behaviour that always fails.""" + + def update(self) -> Status: + return Status.FAILURE + + +class RunningBehaviour(Behaviour): + """A behaviour that always returns RUNNING.""" + + def update(self) -> Status: + return Status.RUNNING + + +class TestStatusFlagGuardInitialization: + """Test StatusFlagGuard initialization.""" + + def test_init_with_valid_container(self) -> None: + """Test initialization with valid container.""" + container = StatusFlagContainer() + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + ) + assert guard.name == "test_guard" + assert guard.flag == "status_flag" + assert guard.guard_status == Status.FAILURE + + def test_init_with_none_name_failure(self) -> None: + """Test initialization with None name generates default name for FAILURE.""" + container = StatusFlagContainer() + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name=None, + child=child, + container=container, + flag="status_flag", + guard_status=common.Status.FAILURE, + ) + assert guard.name == "Unless status_flag failed" + + def test_init_with_none_name_success(self) -> None: + """Test initialization with None name generates default name for SUCCESS.""" + container = StatusFlagContainer() + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name=None, + child=child, + container=container, + flag="status_flag", + guard_status=common.Status.SUCCESS, + ) + assert guard.name == "Unless status_flag succeeded" + + def test_init_with_custom_guard_status(self) -> None: + """Test initialization with custom guard_status.""" + container = StatusFlagContainer() + child = 
SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.SUCCESS, + ) + assert guard.guard_status == Status.SUCCESS + + def test_init_with_nonexistent_field(self) -> None: + """Test initialization fails with nonexistent field.""" + container = StatusFlagContainer() + child = SuccessBehaviour(name="child") + with pytest.raises(ValueError, match="Field 'nonexistent' does not exist"): + StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="nonexistent", + ) + + def test_init_with_invalid_flag_type(self) -> None: + """Test initialization fails when flag has invalid type.""" + + class BadContainer(BaseModel): + status_flag: str = "invalid" + + container = BadContainer() + child = SuccessBehaviour(name="child") + with pytest.raises(TypeError, match="must be either common.Status or None"): + StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + ) + + def test_init_with_invalid_guard_status(self) -> None: + """Test initialization fails with invalid guard_status.""" + container = StatusFlagContainer() + child = SuccessBehaviour(name="child") + with pytest.raises(ValueError, match="guard_status must be FAILURE or SUCCESS"): + StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.RUNNING, + ) + + +class TestStatusFlagGuardGuarding: + """Test StatusFlagGuard guarding behavior.""" + + def test_guard_prevents_execution_when_flag_matches_guard_status(self) -> None: + """Test that guard prevents execution when flag matches guard_status.""" + container = StatusFlagContainer(status_flag=Status.FAILURE) + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.FAILURE, + ) + + # Guard should return FAILURE without executing child + 
status = guard.update() + assert status == Status.FAILURE + + def test_guard_allows_execution_when_flag_is_none(self) -> None: + """Test that guard allows execution when flag is None.""" + container = StatusFlagContainer(status_flag=None) + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.FAILURE, + ) + + # Child should execute and return SUCCESS + guard.decorated.status = Status.SUCCESS + status = guard.update() + assert status == Status.SUCCESS + + def test_guard_allows_execution_when_flag_differs_from_guard_status(self) -> None: + """Test that guard allows execution when flag differs from guard_status.""" + container = StatusFlagContainer(status_flag=Status.SUCCESS) + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.FAILURE, + ) + + # Child should execute and return SUCCESS + guard.decorated.status = Status.SUCCESS + status = guard.update() + assert status == Status.SUCCESS + + +class TestStatusFlagGuardFlagUpdate: + """Test StatusFlagGuard flag update behavior.""" + + def test_flag_updated_on_child_success(self) -> None: + """Test that flag is updated to child's status on success.""" + container = StatusFlagContainer(status_flag=None) + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + ) + + # Simulate child execution - update() should update the flag + guard.decorated.status = Status.SUCCESS + status = guard.update() + assert status == Status.SUCCESS + assert container.status_flag == Status.SUCCESS + + def test_flag_updated_on_child_failure(self) -> None: + """Test that flag is updated to child's status on failure.""" + container = StatusFlagContainer(status_flag=None) + child = FailureBehaviour(name="child") + guard = 
StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + ) + + # Simulate child execution - update() should update the flag + guard.decorated.status = Status.FAILURE + status = guard.update() + assert status == Status.FAILURE + assert container.status_flag == Status.FAILURE + + def test_flag_updated_on_child_running(self) -> None: + """Test that flag is updated to child's status on RUNNING.""" + container = StatusFlagContainer(status_flag=None) + child = RunningBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + ) + + # Simulate child execution - update() should update the flag even for RUNNING + guard.decorated.status = Status.RUNNING + status = guard.update() + assert status == Status.RUNNING + assert container.status_flag == Status.RUNNING + + def test_flag_not_updated_when_guard_active(self) -> None: + """Test that flag is not updated when guard is active.""" + container = StatusFlagContainer(status_flag=Status.FAILURE) + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.FAILURE, + ) + + # Guard is active, flag should not be updated + guard.decorated.status = Status.SUCCESS + status = guard.update() + assert status == Status.FAILURE + assert container.status_flag == Status.FAILURE + + +class TestStatusFlagGuardWithDifferentGuardStatus: + """Test StatusFlagGuard with different guard_status values.""" + + def test_guard_with_failure_status(self) -> None: + """Test guard with FAILURE as guard_status.""" + container = StatusFlagContainer(status_flag=Status.FAILURE) + child = SuccessBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.FAILURE, + ) + + # Guard should return FAILURE + status = guard.update() + assert status == 
Status.FAILURE + + def test_guard_with_success_status(self) -> None: + """Test guard with SUCCESS as guard_status.""" + container = StatusFlagContainer(status_flag=Status.SUCCESS) + child = FailureBehaviour(name="child") + guard = StatusFlagGuard( + name="test_guard", + child=child, + container=container, + flag="status_flag", + guard_status=Status.SUCCESS, + ) + + # Guard should return SUCCESS + status = guard.update() + assert status == Status.SUCCESS diff --git a/uv.lock b/uv.lock index f82013a..c8a1cac 100644 --- a/uv.lock +++ b/uv.lock @@ -2825,6 +2825,8 @@ dependencies = [ { name = "requests", version = "2.32.4", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "requests", version = "2.32.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, { name = "rich" }, + { name = "slack-bolt" }, + { name = "slack-sdk" }, { name = "typer" }, ] @@ -2863,6 +2865,8 @@ requires-dist = [ { name = "pyyaml", specifier = ">=6.0.3" }, { name = "requests", specifier = ">=2.28.0" }, { name = "rich", specifier = ">=13.0.0" }, + { name = "slack-bolt", specifier = ">=1.27.0" }, + { name = "slack-sdk", specifier = ">=3.38.0" }, { name = "typer", extras = ["all"], specifier = ">=0.9.0" }, ] provides-extras = ["dev"] @@ -2968,6 +2972,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, ] +[[package]] +name = "slack-bolt" +version = "1.27.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "slack-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4c/28/50ed0b86e48b48e6ddcc71de93b91c8ac14a55d1249e4bff0586494a2f90/slack_bolt-1.27.0.tar.gz", hash = 
"sha256:3db91d64e277e176a565c574ae82748aa8554f19e41a4fceadca4d65374ce1e0", size = 129101, upload-time = "2025-11-13T20:17:46.878Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/01/a8/1acb355759747ba4da5f45c1a33d641994b9e04b914908c9434f18bd97e8/slack_bolt-1.27.0-py2.py3-none-any.whl", hash = "sha256:c43c94bf34740f2adeb9b55566c83f1e73fed6ba2878bd346cdfd6fd8ad22360", size = 230428, upload-time = "2025-11-13T20:17:45.465Z" }, +] + +[[package]] +name = "slack-sdk" +version = "3.38.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/69/90/8c830172b1847bd3084a2cf39aee9d522e2a55d1c3d4e2b066001e9765ee/slack_sdk-3.38.0.tar.gz", hash = "sha256:73f43ef535929c6034982434aba4d5fd04db3b40f4e0cd14c3abfd56419d181d", size = 240091, upload-time = "2025-11-13T16:05:20.905Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/fc/0563891d3d1f7c503b2daf0b5b9ae9e605ea112272661897c150fd3d69cb/slack_sdk-3.38.0-py2.py3-none-any.whl", hash = "sha256:6c5e908abd68e97373a88437ef2fa3ff7a4c356807bbc41fcd7d6cbbfa2034d6", size = 302796, upload-time = "2025-11-13T16:05:18.856Z" }, +] + [[package]] name = "tomli" version = "2.3.0"