Skip to content

Commit

Permalink
Fixed command abort handling
Browse files Browse the repository at this point in the history
* Added abort support to `Cog.Command.GenCommand`
* Fixed `Cog.Pipeline.ExecutionStage`'s handling of command aborts
* Added `operable:abort-when` w/tests
* Catch error when updating pipeline state to `finished`
  • Loading branch information
Kevin Smith committed Feb 2, 2017
1 parent 1177bb8 commit 4e48734
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 35 deletions.
12 changes: 12 additions & 0 deletions lib/cog/command/gen_command.ex
Expand Up @@ -203,6 +203,10 @@ defmodule Cog.Command.GenCommand do
new_state = %{state | cb_state: cb_state}
send_error_reply(error_message, reply_to, new_state)
{:noreply, new_state}
{:abort, reply_to, message, cb_state} ->
new_state = %{state | cb_state: cb_state}
send_abort_reply(message, reply_to, new_state)
{:noreply, new_state}
{:noreply, cb_state} ->
new_state = %{state | cb_state: cb_state}
{:noreply, new_state}
Expand All @@ -217,6 +221,14 @@ defmodule Cog.Command.GenCommand do

########################################################################

defp send_abort_reply(message, reply_to, state) do
resp = %Cog.Messages.CommandResponse{status: "abort",
status_message: message}
Carrier.Messaging.Connection.publish(state.mq_conn, resp, routed_by: reply_to)
end

########################################################################

defp send_error_reply(error_message, reply_to, state) when is_binary(error_message) do
resp = %Cog.Messages.CommandResponse{status: "error",
status_message: error_message}
Expand Down
31 changes: 31 additions & 0 deletions lib/cog/commands/abort_when.ex
@@ -0,0 +1,31 @@
defmodule Cog.Commands.AbortWhen do
use Cog.Command.GenCommand.Base,
bundle: Cog.Util.Misc.embedded_bundle,
name: "abort-when"

@description "Aborts pipeline when argument evaluates to truthy"

@arguments "value"

@default_message "Pipeline aborted"

option "message", short: "m", type: "string", required: false,
description: "Message sent when pipeline is aborted"

# Allow any user to run
rule "when command is #{Cog.Util.Misc.embedded_bundle}:abort-when allow"

def handle_message(req, state) do
case eval_args(req.args) do
true ->
{:abort, req.reply_to, Map.get(req.options, "message", @default_message), state}
false ->
{:reply, req.reply_to, req.cog_env, state}
end
end

defp eval_args([n|_]) when n > 0, do: true
defp eval_args([true|_]), do: true
defp eval_args(_), do: false

end
7 changes: 6 additions & 1 deletion lib/cog/pipeline.ex
Expand Up @@ -123,7 +123,12 @@ defmodule Cog.Pipeline do
# Close bus connection and exit after last stage exits
# This avoids closing the connection while one of the sinks is still using it
def handle_info({:DOWN, _, _, stage, _}, %__MODULE__{status: :done, stages: [stage]}=state) do
HistoryRepo.update_state(state.request.id, "finished")
try do
HistoryRepo.update_state(state.request.id, "finished")
rescue
e in Postgrex.Error ->
Logger.error("Failed to update state for pipeline #{state.request.id}: #{Exception.message(e)}")
end
Logger.debug("Pipeline #{state.request.id} terminated")
Connection.disconnect(state.conn)
{:stop, :normal, %{state | stages: []}}
Expand Down
65 changes: 49 additions & 16 deletions lib/cog/pipeline/error_sink.ex
Expand Up @@ -4,7 +4,7 @@ defmodule Cog.Pipeline.ErrorSink do
alias Cog.Chat.Adapter, as: ChatAdapter
alias Cog.Events.PipelineEvent
alias Cog.Pipeline
alias Cog.Pipeline.{Destination, DoneSignal, Errors}
alias Cog.Pipeline.{Destination, AbortSignal, DoneSignal, Errors}
alias Cog.Template.Evaluator

@moduledoc ~s"""
Expand Down Expand Up @@ -66,10 +66,12 @@ defmodule Cog.Pipeline.ErrorSink do
end

def handle_events(events, _from, state) do
events = Enum.filter(events, &DoneSignal.error?/1)
events = Enum.filter(events, &want_signal?/1)
state = state
|> Map.update(:all_events, events, &(&1 ++ events))
|> process_errors
|> process_aborts
Pipeline.teardown(state.pipeline)
{:noreply, [], state}
end

Expand All @@ -87,34 +89,65 @@ defmodule Cog.Pipeline.ErrorSink do
Logger.debug("Error sink for pipeline #{state.request.id} shutting down")
end

defp want_signal?(%DoneSignal{}=signal), do: DoneSignal.error?(signal)
defp want_signal?(%AbortSignal{}), do: true
defp want_signal?(_), do: false

defp process_aborts(%__MODULE__{all_events: []}=state), do: state
defp process_aborts(state) do
aborts = Enum.filter(state.all_events, &AbortSignal.abort?/1)
send_to_owner(:aborts, aborts, state)
if state.policy in [:adapter, :adapter_owner] do
dests = Destination.here(state.request)
Enum.each(aborts, &send_abort_to_adapter(&1, dests, state))
end
%{state | all_events: state.all_events -- aborts}
end

defp process_errors(%__MODULE__{all_events: []}=state), do: state
defp process_errors(state) do
send_to_owner(state)
state = if state.policy in [:adapter, :adapter_owner] do
errors = Enum.filter(state.all_events, &DoneSignal.error?/1)
send_to_owner(:errors, errors, state)
if state.policy in [:adapter, :adapter_owner] do
dests = Destination.here(state.request)
Enum.each(state.all_events, &(send_to_adapter(&1, dests, state)))
%{state | all_events: []}
else
state
Enum.each(errors, &(send_error_to_adapter(&1, dests, state)))
end
Pipeline.teardown(state.pipeline)
state
%{state | all_events: state.all_events -- errors}
end

defp send_to_adapter(%DoneSignal{}=signal, dests, state) do
Enum.each(dests, &(send_to_adapter(&1, signal, state)))
defp send_abort_to_adapter(%AbortSignal{}=signal, dests, state) do
Enum.each(dests, &send_abort_to_adapter(&1, signal, state))
end
defp send_to_adapter({type, targets}, signal, state) do
defp send_abort_to_adapter({type, targets}, signal, state) do
context = prepare_abort_context(signal, state)
output = output_for(type, signal, context)
Enum.each(targets, &ChatAdapter.send(state.conn, &1.provider, &1.room, output))
end

defp prepare_abort_context(signal, state) do
%{"cog_env" => signal.cog_env,
"cog_env_text" => Poison.encode!(signal.cog_env, pretty: true),
"invocation" => "#{signal.invocation}",
"pipeline_id" => state.request.id,
"message" => signal.message}
end

defp send_error_to_adapter(%DoneSignal{}=signal, dests, state) do
Enum.each(dests, &(send_error_to_adapter(&1, signal, state)))
end
defp send_error_to_adapter({type, targets}, signal, state) do
context = prepare_error_context(signal, state)
failure_event(signal.error, context["error_message"], state)
output = output_for(type, signal, context)
Enum.each(targets, &ChatAdapter.send(state.conn, &1.provider, &1.room, output))
end

defp send_to_owner(%__MODULE__{all_events: events, policy: policy, owner: owner}=state) when policy in [:owner, :adapter_owner] do
Process.send(owner, {:pipeline, state.request.id, {:error, events}}, [])
defp send_to_owner(type, events, %__MODULE__{policy: policy, owner: owner}=state) when policy in [:owner, :adapter_owner] do
if Enum.count(events) > 0 do
Process.send(owner, {:pipeline, state.request.id, {type, events}}, [])
end
end
defp send_to_owner(_), do: :ok
defp send_to_owner(_, _, _), do: :ok

defp prepare_error_context(signal, state) do
case signal.error do
Expand Down
40 changes: 23 additions & 17 deletions lib/cog/pipeline/execution_stage.ex
Expand Up @@ -6,8 +6,7 @@ defmodule Cog.Pipeline.ExecutionStage do
alias Cog.Events.PipelineEvent
alias Cog.Pipeline.{Binder, OptionParser, PermissionEnforcer}
alias Cog.Messages.{Command, CommandResponse}
alias Cog.Pipeline.DataSignal
alias Cog.Pipeline.DoneSignal
alias Cog.Pipeline.{AbortSignal, DataSignal, DoneSignal}
alias Cog.Pipeline.RelaySelector
alias Cog.Repository.PipelineHistory, as: HistoryRepo
alias Piper.Command.Ast.BadValueError
Expand Down Expand Up @@ -160,10 +159,17 @@ defmodule Cog.Pipeline.ExecutionStage do
defp process_events(%DoneSignal{}=done, _, state, accum) do
{accum ++ [done], state}
end
defp process_events(%AbortSignal{}=abort, _, state, accum) do
{accum ++ [abort], state}
end
defp process_events(%DataSignal{}=current, [next|events], state, accum) do
{current, state} = add_position(current, next, state)
{out_events, state} = invoke_command(current, state)
process_events(next, events, state, accum ++ Enum.reverse(out_events))
case invoke_command(current, state) do
{:cont, out_events, new_state} ->
process_events(next, events, new_state, accum ++ Enum.reverse(out_events))
{:halt, out_events, new_state} ->
{accum ++ Enum.reverse(out_events), new_state}
end
end

defp add_position(signal, %DoneSignal{}, %__MODULE__{stream_position: :init}=state) do
Expand Down Expand Up @@ -199,23 +205,23 @@ defmodule Cog.Pipeline.ExecutionStage do
receive do
{:publish, ^topic, message} ->
HistoryRepo.update_state(state.request_id, "running")
process_response(CommandResponse.decode!(message), state)
process_response(CommandResponse.decode!(message), signal, state)
{:pipeline_complete, ^pipeline} ->
Process.exit(self(), :normal)
after timeout ->
HistoryRepo.update_state(state.request_id, "running")
{[%DoneSignal{error: {:error, :timeout}, invocation: text, template: "error"}], state}
{:halt, [%DoneSignal{error: {:error, :timeout}, invocation: text, template: "error"}], state}
end
error ->
{[%DoneSignal{error: error}], state}
{:halt, [%DoneSignal{error: error}], state}
end
{:error, :denied, rule, text} ->
{[%DoneSignal{error: {:error, :denied, rule}, invocation: text, template: "error"}], state}
{:halt, [%DoneSignal{error: {:error, :denied, rule}, invocation: text, template: "error"}], state}
error ->
{[%DoneSignal{error: error, invocation: "#{state.invocation}", template: "error"}], state}
{:halt, [%DoneSignal{error: error, invocation: "#{state.invocation}", template: "error"}], state}
end
error ->
{[%DoneSignal{error: error}], state}
{:halt, [%DoneSignal{error: error}], state}
end
end

Expand Down Expand Up @@ -245,25 +251,25 @@ defmodule Cog.Pipeline.ExecutionStage do
Probe.notify(event)
end

defp process_response(response, state) do
defp process_response(response, signal, state) do
bundle_version_id = state.invocation.meta.bundle_version_id
case response.status do
"ok" ->
if response.body == nil do
{[], state}
{:cont, [], state}
else
if is_list(response.body) do
{Enum.reduce_while(response.body, [],
{:cont, Enum.reduce_while(response.body, [],
&(expand_output(bundle_version_id, response.template, &1, &2))), state}
else
{[DataSignal.wrap(response.body, bundle_version_id, response.template)], state}
{:cont, [DataSignal.wrap(response.body, bundle_version_id, response.template)], state}
end
end
"abort" ->
signals = [DataSignal.wrap(response.body, bundle_version_id, response.template), %DoneSignal{}]
{signals, state}
signals = [%DoneSignal{}, AbortSignal.wrap(state.invocation, signal.data, response.status_message)]
{:halt, signals, state}
"error" ->
{[%DoneSignal{error: {:error, response.status_message || :unknown_error}, template: response.template}], state}
{:halt, [%DoneSignal{error: {:error, response.status_message || :unknown_error}, template: response.template}], state}
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/cog/pipeline/output_sink.ex
Expand Up @@ -99,6 +99,7 @@ defmodule Cog.Pipeline.OutputSink do
defp want_signal?(%DoneSignal{}=done) do
DoneSignal.error?(done) == false
end
defp want_signal?(_), do: false

def process_output(%__MODULE__{all_events: []}=state, _) do
state
Expand Down
16 changes: 15 additions & 1 deletion lib/cog/pipeline/signals.ex
Expand Up @@ -10,7 +10,6 @@ defmodule Cog.Pipeline.DoneSignal do

end


defmodule Cog.Pipeline.DataSignal do
defstruct [invocation: nil,
template: nil,
Expand All @@ -28,3 +27,18 @@ defmodule Cog.Pipeline.DataSignal do
end

end

defmodule Cog.Pipeline.AbortSignal do
defstruct [message: nil,
cog_env: nil,
invocation: nil,
template: "abort"]

def wrap(invocation, cog_env, message) do
%__MODULE__{invocation: invocation, cog_env: cog_env, message: message}
end

def abort?(%__MODULE__{}), do: true
def abort?(_), do: false

end
7 changes: 7 additions & 0 deletions priv/templates/common/abort.greenbar
@@ -0,0 +1,7 @@
~attachment title=$message color="yellow"~

**Command:** `~$invocation~`
**Calling Environment:** `~$cog_env_text~`
**Pipeline Id:** ~$pipeline_id~

~end~
34 changes: 34 additions & 0 deletions test/integration/abort_test.exs
@@ -0,0 +1,34 @@
defmodule Integration.AbortTest do

use Cog.AdapterCase, provider: "test"

@moduletag integration: :general
@moduletag :command
@moduletag :abort

setup do
user = user("vanstee", first_name: "Patrick", last_name: "Van Stee")
|> with_chat_handle_for("test")

{:ok, %{user: user}}
end

test "abort-when aborts a pipeline", %{user: user} do
response = send_message(user, "@bot: operable:seed '[{\"foo\": 1}]' | operable:abort-when $foo")
assert String.starts_with?(response, "Command: operable:abort-when $foo\nCalling Environment: {\n \"foo\": 1\n}\n")
[response] = send_message(user, "@bot: operable:seed '[{\"foo\": 0}]' | operable:abort-when $foo")
assert response == %{foo: 0}
end

test "abort-when uses custom abort message", %{user: user} do
response = send_message(user, "@bot: operable:seed '[{\"foo\": 1}]' | operable:abort-when -m \"PIPELINE ABORT\" $foo")
assert String.starts_with?(response, "Command: operable:abort-when -m \"PIPELINE ABORT\" $foo\nCalling Environment: {\n \"foo\": 1\n}\n")
end

test "abort-when works in middle of pipeline", %{user: user} do
json = Poison.encode!([%{foo: 0}, %{foo: 0}, %{foo: 1}, %{foo: 0}])
response = send_message(user, "@bot: operable:seed '#{json}' | operable:abort-when $foo | operable:echo foo is $foo")
assert String.starts_with?(response, "Command: operable:abort-when $foo\nCalling Environment: {\n \"foo\": 1\n}\n")
end

end
2 changes: 2 additions & 0 deletions test/support/snoop.ex
Expand Up @@ -145,6 +145,8 @@ defmodule Cog.Snoop do
do: text
defp render_directive(%{"name" => "fixed_width_block", "text" => text}),
do: text
defp render_directive(%{"name" => "bold", "text" => text}),
do: text
# We can also get plain text (e.g., from `echo` output)
defp render_directive(%{"name" => "text", "text" => text}),
do: text
Expand Down

0 comments on commit 4e48734

Please sign in to comment.