Skip to content

Commit

Permalink
added circuit breaker for flows
Browse files Browse the repository at this point in the history
  • Loading branch information
tmbo committed Sep 29, 2023
1 parent 5247210 commit ebc254e
Show file tree
Hide file tree
Showing 15 changed files with 322 additions and 73 deletions.
5 changes: 1 addition & 4 deletions rasa/core/actions/flow_trigger_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
from rasa.core.channels import OutputChannel
from rasa.shared.constants import FLOW_PREFIX

from rasa.shared.core.constants import (
DIALOGUE_STACK_SLOT,
)
from rasa.shared.core.domain import Domain
from rasa.shared.core.events import (
ActiveLoop,
Expand Down Expand Up @@ -70,7 +67,7 @@ async def run(
]

events: List[Event] = [
SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())
stack.persist_as_event(),
] + slot_set_events
if tracker.active_loop_name:
events.append(ActiveLoop(None))
Expand Down
107 changes: 70 additions & 37 deletions rasa/core/policies/flow_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from rasa.shared.core.constants import (
ACTION_LISTEN_NAME,
ACTION_SEND_TEXT_NAME,
DIALOGUE_STACK_SLOT,
)
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.flows.flow import (
Expand All @@ -61,7 +60,7 @@
StaticFlowLink,
)
from rasa.core.featurizers.tracker_featurizers import TrackerFeaturizer
from rasa.core.policies.policy import Policy, PolicyPrediction, SupportedData
from rasa.core.policies.policy import Policy, PolicyPrediction
from rasa.engine.graph import ExecutionContext
from rasa.engine.recipes.default_recipe import DefaultV1Recipe
from rasa.engine.storage.resource import Resource
Expand All @@ -73,15 +72,39 @@
)
import structlog

from rasa.shared.exceptions import RasaException

structlogger = structlog.get_logger()

MAX_NUMBER_OF_STEPS = 250


class FlowException(Exception):
class FlowException(RasaException):
"""Exception that is raised when there is a problem with a flow."""

pass


class FlowCircuitBreakerTrippedException(FlowException):
"""Exception that is raised when there is a problem with a flow."""

def __init__(
self, dialogue_stack: DialogueStack, number_of_steps_taken: int
) -> None:
"""Creates a `FlowCircuitBreakerTrippedException`.
Args:
dialogue_stack: The dialogue stack.
number_of_steps_taken: The number of steps that were taken.
"""
super().__init__(
f"Flow circuit breaker tripped after {number_of_steps_taken} steps. "
"There appears to be an infinite loop in the flows."
)
self.dialogue_stack = dialogue_stack
self.number_of_steps_taken = number_of_steps_taken


@DefaultV1Recipe.register(
DefaultV1Recipe.ComponentType.POLICY_WITHOUT_END_TO_END_SUPPORT, is_trainable=False
)
Expand All @@ -94,7 +117,15 @@ class FlowPolicy(Policy):

@staticmethod
def does_support_stack_frame(frame: DialogueStackFrame) -> bool:
"""Checks if the policy supports the given stack frame."""
"""Checks if the policy supports the topmost frame on the dialogue stack.
If `False` is returned, the policy will abstain from making a prediction.
Args:
frame: The frame to check.
Returns:
`True` if the policy supports the frame, `False` otherwise."""
return isinstance(frame, BaseFlowStackFrame)

@staticmethod
Expand All @@ -106,18 +137,6 @@ def get_default_config() -> Dict[Text, Any]:
POLICY_MAX_HISTORY: None,
}

@staticmethod
def supported_data() -> SupportedData:
"""The type of data supported by this policy.
By default, this is only ML-based training data. If policies support rule data,
or both ML-based data and rule data, they need to override this method.
Returns:
The data type supported by this policy (ML-based training data).
"""
return SupportedData.ML_DATA

def __init__(
self,
config: Dict[Text, Any],
Expand Down Expand Up @@ -150,9 +169,6 @@ def train(
A policy must return its resource locator so that potential children nodes
can load the policy from the resource.
"""
# currently, nothing to do here. we have access to the flows during
# prediction. we might want to store the flows in the future
# or do some preprocessing here.
return self.resource

def predict_action_probabilities(
Expand All @@ -178,20 +194,33 @@ def predict_action_probabilities(
The prediction.
"""
if not self.supports_current_stack_frame(tracker):
# if the policy doesn't support the current stack frame, we'll abstain
return self._prediction(self._default_predictions(domain))

flows = flows or FlowsList([])
executor = FlowExecutor.from_tracker(tracker, flows, domain)

# create executor and predict next action
prediction = executor.advance_flows(tracker)
return self._create_prediction_result(
prediction.action_name,
domain,
prediction.score,
prediction.events,
prediction.metadata,
)
try:
prediction = executor.advance_flows(tracker)
return self._create_prediction_result(
prediction.action_name,
domain,
prediction.score,
prediction.events,
prediction.metadata,
)
except FlowCircuitBreakerTrippedException as e:
structlogger.error(
"flow.circuit_breaker",
dialogue_stack=e.dialogue_stack,
number_of_steps_taken=e.number_of_steps_taken,
event_info=(
"The flow circuit breaker tripped. "
"There appears to be an infinite loop in the flows."
),
)
return self._prediction(self._default_predictions(domain))

def _create_prediction_result(
self,
Expand Down Expand Up @@ -425,20 +454,15 @@ def advance_flows(self, tracker: DialogueStateTracker) -> ActionPrediction:
return ActionPrediction(None, 0.0)
else:
previous_stack = DialogueStack.get_persisted_stack(tracker)
prediction = self._select_next_action(tracker)
prediction = self.select_next_action(tracker)
if previous_stack != self.dialogue_stack.as_dict():
# we need to update dialogue stack to persist the state of the executor
if not prediction.events:
prediction.events = []
prediction.events.append(
SlotSet(
DIALOGUE_STACK_SLOT,
self.dialogue_stack.as_dict(),
)
)
prediction.events.append(self.dialogue_stack.persist_as_event())
return prediction

def _select_next_action(
def select_next_action(
self,
tracker: DialogueStateTracker,
) -> ActionPrediction:
Expand All @@ -462,7 +486,16 @@ def _select_next_action(

number_of_initial_events = len(tracker.events)

number_of_steps_taken = 0

while isinstance(step_result, ContinueFlowWithNextStep):

number_of_steps_taken += 1
if number_of_steps_taken > MAX_NUMBER_OF_STEPS:
raise FlowCircuitBreakerTrippedException(
self.dialogue_stack, number_of_steps_taken
)

active_frame = self.dialogue_stack.top()
if not isinstance(active_frame, BaseFlowStackFrame):
# If there is no current flow, we assume that all flows are done
Expand All @@ -485,7 +518,7 @@ def _select_next_action(
self._advance_top_flow_on_stack(current_step.id)

with bound_contextvars(step_id=current_step.id):
step_result = self._run_step(
step_result = self.run_step(
current_flow, current_step, tracker
)
tracker.update_with_events(step_result.events, self.domain)
Expand Down Expand Up @@ -521,7 +554,7 @@ def _reset_scoped_slots(
events.append(SlotSet(step.collect, initial_value))
return events

def _run_step(
def run_step(
self,
flow: Flow,
step: FlowStep,
Expand Down
5 changes: 2 additions & 3 deletions rasa/dialogue_understanding/commands/cancel_flow_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
from rasa.dialogue_understanding.patterns.cancel import CancelPatternFlowStackFrame
from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack
from rasa.dialogue_understanding.stack.frames import UserFlowStackFrame
from rasa.shared.core.constants import DIALOGUE_STACK_SLOT
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import FlowsList
from rasa.shared.core.trackers import DialogueStateTracker
from rasa.dialogue_understanding.stack.utils import top_user_flow_frame
Expand Down Expand Up @@ -103,4 +102,4 @@ def run_command_on_tracker(
canceled_frames=canceled_frames,
)
)
return [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
return [stack.persist_as_event()]
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from rasa.dialogue_understanding.commands import FreeFormAnswerCommand
from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack
from rasa.dialogue_understanding.stack.frames.chit_chat_frame import ChitChatStackFrame
from rasa.shared.core.constants import DIALOGUE_STACK_SLOT
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import FlowsList
from rasa.shared.core.trackers import DialogueStateTracker

Expand Down Expand Up @@ -47,4 +46,4 @@ def run_command_on_tracker(
"""
stack = DialogueStack.from_tracker(tracker)
stack.push(ChitChatStackFrame())
return [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
return [stack.persist_as_event()]
5 changes: 2 additions & 3 deletions rasa/dialogue_understanding/commands/clarify_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from rasa.dialogue_understanding.commands import Command
from rasa.dialogue_understanding.patterns.clarify import ClarifyPatternFlowStackFrame
from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack
from rasa.shared.core.constants import DIALOGUE_STACK_SLOT
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import FlowsList
from rasa.shared.core.trackers import DialogueStateTracker

Expand Down Expand Up @@ -76,4 +75,4 @@ def run_command_on_tracker(
relevant_flows = [all_flows.flow_by_id(opt) for opt in clean_options]
names = [flow.readable_name() for flow in relevant_flows if flow is not None]
stack.push(ClarifyPatternFlowStackFrame(names=names))
return [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
return [stack.persist_as_event()]
5 changes: 2 additions & 3 deletions rasa/dialogue_understanding/commands/correct_slots_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
BaseFlowStackFrame,
UserFlowStackFrame,
)
from rasa.shared.core.constants import DIALOGUE_STACK_SLOT
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import END_STEP, ContinueFlowStep, FlowStep, FlowsList
from rasa.shared.core.trackers import DialogueStateTracker
import rasa.dialogue_understanding.stack.utils as utils
Expand Down Expand Up @@ -284,4 +283,4 @@ def run_command_on_tracker(
self.end_previous_correction(top_flow_frame, stack)

stack.push(correction_frame, index=insertion_index)
return [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
return [stack.persist_as_event()]
5 changes: 2 additions & 3 deletions rasa/dialogue_understanding/commands/error_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
InternalErrorPatternFlowStackFrame,
)
from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack
from rasa.shared.core.constants import DIALOGUE_STACK_SLOT
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import FlowsList
from rasa.shared.core.trackers import DialogueStateTracker

Expand Down Expand Up @@ -54,4 +53,4 @@ def run_command_on_tracker(
dialogue_stack = DialogueStack.from_tracker(tracker)
structlogger.debug("command_executor.error", command=self)
dialogue_stack.push(InternalErrorPatternFlowStackFrame())
return [SlotSet(DIALOGUE_STACK_SLOT, dialogue_stack.as_dict())]
return [dialogue_stack.persist_as_event()]
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from rasa.dialogue_understanding.commands import FreeFormAnswerCommand
from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack
from rasa.dialogue_understanding.stack.frames.search_frame import SearchStackFrame
from rasa.shared.core.constants import DIALOGUE_STACK_SLOT
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import FlowsList
from rasa.shared.core.trackers import DialogueStateTracker

Expand Down Expand Up @@ -47,4 +46,4 @@ def run_command_on_tracker(
"""
dialogue_stack = DialogueStack.from_tracker(tracker)
dialogue_stack.push(SearchStackFrame())
return [SlotSet(DIALOGUE_STACK_SLOT, dialogue_stack.as_dict())]
return [dialogue_stack.persist_as_event()]
5 changes: 2 additions & 3 deletions rasa/dialogue_understanding/commands/start_flow_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
top_user_flow_frame,
user_flows_on_the_stack,
)
from rasa.shared.core.constants import DIALOGUE_STACK_SLOT
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import FlowsList
from rasa.shared.core.trackers import DialogueStateTracker

Expand Down Expand Up @@ -88,4 +87,4 @@ def run_command_on_tracker(
)
structlogger.debug("command_executor.start_flow", command=self)
stack.push(UserFlowStackFrame(flow_id=self.flow, frame_type=frame_type))
return [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
return [stack.persist_as_event()]
6 changes: 3 additions & 3 deletions rasa/dialogue_understanding/patterns/cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
from rasa.core.channels.channel import OutputChannel
from rasa.core.nlg.generator import NaturalLanguageGenerator
from rasa.shared.constants import RASA_DEFAULT_FLOW_PATTERN_PREFIX
from rasa.shared.core.constants import ACTION_CANCEL_FLOW, DIALOGUE_STACK_SLOT
from rasa.shared.core.constants import ACTION_CANCEL_FLOW
from rasa.shared.core.domain import Domain
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.flows.flow import END_STEP, ContinueFlowStep
from rasa.shared.core.trackers import DialogueStateTracker

Expand Down Expand Up @@ -111,4 +111,4 @@ async def run(
frame_id=canceled_frame_id,
)

return [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
return [stack.persist_as_event()]
6 changes: 3 additions & 3 deletions rasa/dialogue_understanding/patterns/clarify.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from rasa.core.channels.channel import OutputChannel
from rasa.core.nlg.generator import NaturalLanguageGenerator
from rasa.shared.constants import RASA_DEFAULT_FLOW_PATTERN_PREFIX
from rasa.shared.core.constants import ACTION_CLARIFY_FLOWS, DIALOGUE_STACK_SLOT
from rasa.shared.core.constants import ACTION_CLARIFY_FLOWS
from rasa.shared.core.domain import Domain
from rasa.shared.core.events import Event, SlotSet
from rasa.shared.core.events import Event
from rasa.shared.core.trackers import DialogueStateTracker


Expand Down Expand Up @@ -98,4 +98,4 @@ async def run(
options_string = self.assemble_options_string(top.names)
top.clarification_options = options_string
# since we modified the stack frame, we need to update the stack
return [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
return [stack.persist_as_event()]
5 changes: 1 addition & 4 deletions rasa/dialogue_understanding/patterns/correction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
)
from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack
from rasa.shared.constants import RASA_DEFAULT_FLOW_PATTERN_PREFIX
from rasa.shared.core.constants import (
DIALOGUE_STACK_SLOT,
)
from rasa.shared.core.flows.flow import (
START_STEP,
)
Expand Down Expand Up @@ -139,7 +136,7 @@ async def run(
ContinueFlowStep.continue_step_for_id(END_STEP)
)

events: List[Event] = [SlotSet(DIALOGUE_STACK_SLOT, stack.as_dict())]
events: List[Event] = [stack.persist_as_event()]

events.extend([SlotSet(k, v) for k, v in top.corrected_slots.items()])

Expand Down

0 comments on commit ebc254e

Please sign in to comment.