Skip to content

Commit

Permalink
Add process tracing (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
akoutmos committed Jul 4, 2022
1 parent 81addbd commit e5b3261
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 2 deletions.
289 changes: 287 additions & 2 deletions lib/kino/process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ defmodule Kino.Process do
"""

alias Kino.Markdown
alias Kino.Process.Tracer

@type supervisor :: pid() | atom()
@type trace_target :: :all | pid() | [pid()]

@doc """
Generates a visualization of an application tree.
Expand Down Expand Up @@ -62,7 +64,7 @@ defmodule Kino.Process do
{:dictionary, dictionary} = Process.info(root_supervisor, :dictionary)
[ancestor] = dictionary[:"$ancestors"]

Kino.Markdown.new("""
Markdown.new("""
```mermaid
graph #{direction};
application_master(#{inspect(master)}):::supervisor ---> supervisor_ancestor;
Expand Down Expand Up @@ -126,7 +128,7 @@ defmodule Kino.Process do
direction = direction_from_opts(opts)
edges = traverse_supervisor(supervisor)

Kino.Markdown.new("""
Markdown.new("""
```mermaid
graph #{direction};
#{edges}
Expand All @@ -137,6 +139,289 @@ defmodule Kino.Process do
""")
end

@doc """
Renders a visual of the provided application tree.
This function renders an application tree much like `app_tree/2` with the difference
being that this function can be called anywhere within the Livebook code block
whereas `app_tree/2` must have its result be the last thing returned from the code
block in order to render the visual. It supports the same options as `app_tree/2` as
it delegates to that function to generate the visual.
"""
@spec render_app_tree(atom(), keyword()) :: Kino.nothing()
def render_app_tree(application, opts \\ []) do
application
|> app_tree(opts)
|> Kino.render()

Kino.nothing()
end

@doc """
Renders a sequence diagram of process messages and returns the function result.
This function renders a sequence diagram much like `seq_trace/2` with the difference
being that this function can be called anywhere within the Livebook code block
whereas `seq_trace/2` must have its result be the last thing returned from the code
block in order to render the visual. In addition, this function returns the result
from the provided trace function.
"""
@spec render_seq_trace(trace_target(), (() -> any())) :: any()
def render_seq_trace(trace_target \\ :all, trace_function) do
{func_result, sequence_diagram} = seq_trace(trace_target, trace_function)
Kino.render(sequence_diagram)
func_result
end

@doc """
Renders a visual of the provided supervision tree.
This function renders a supervision tree much like `sup_tree/2` with the difference
being that this function can be called anywhere within the Livebook code block
whereas `sup_tree/2` must have its result be the last thing returned from the code
block in order to render the visual. It supports the same options as `sup_tree/2` as
it delegates to that function to generate the visual.
"""
@spec render_sup_tree(supervisor(), keyword()) :: Kino.nothing()
def render_sup_tree(supervisor, opts \\ []) do
supervisor
|> sup_tree(opts)
|> Kino.render()

Kino.nothing()
end

@doc """
Generate a sequence diagram of process messages starting from `self()`.
The provided function is executed and traced, with all the events sent to and
received by the trace target processes rendered in a sequence diagram. The trace
target argument can either be a single PID, a list of PIDs, or the atom `:all`
depending on what messages you would like to retain in your trace.
## Examples
To generate a trace of all the messages occurring during the execution of the
provided function, you can do the following:
Kino.Process.seq_trace(fn ->
{:ok, agent_pid} = Agent.start_link(fn -> [] end)
Process.monitor(agent_pid)
1..2
|> Task.async_stream(
fn value ->
Agent.get(agent_pid, fn value -> value end)
100 * value
end,
max_concurrency: 3
)
|> Stream.run()
Agent.stop(agent_pid)
end)
If you are only interested in messages being sent to or received by certain PIDs,
you can filter the sequence diagram by specifying the PIDs that you are interested
in:
{:ok, agent_pid} = Agent.start_link(fn -> [] end)
Process.monitor(agent_pid)
Kino.Process.seq_trace(agent_pid, fn ->
1..2
|> Task.async_stream(
fn value ->
Agent.get(agent_pid, fn value -> value end)
100 * value
end,
max_concurrency: 3
)
|> Stream.run()
Agent.stop(agent_pid)
end)
"""
@spec seq_trace(trace_target(), (() -> any())) :: {any(), Markdown.t()}
def seq_trace(trace_target \\ :all, trace_function)

def seq_trace(pid, trace_function) when is_pid(pid) do
seq_trace([pid], trace_function)
end

def seq_trace(trace_pids, trace_function) when is_list(trace_pids) or trace_pids == :all do
# Set up the process message tracer and the Erlang seq_trace_module
calling_pid = self()
{:ok, tracer_pid} = Tracer.start_link()
:seq_trace.set_token(:send, true)
:seq_trace.set_token(:receive, true)
:seq_trace.set_token(:monotonic_timestamp, true)
previous_tracer = :seq_trace.set_system_tracer(tracer_pid)

# Run the user supplied function and capture the events if no errors were encountered
{raw_trace_events, func_result} =
try do
func_result =
try do
# Run the user provided function
trace_function.()
after
# Reset all of the seq_trace options
:seq_trace.set_system_tracer(previous_tracer)
:seq_trace.reset_trace()
end

{Tracer.get_trace_events(tracer_pid), func_result}
after
# The Tracer GenServer is no longer needed, shut it down
GenServer.stop(tracer_pid)
end

# Get all of the events from the Tracer GenServer
trace_events =
raw_trace_events
|> Enum.filter(fn
# Skip :spawn_reply messages
%{message: {:spawn_reply, _, _, _}} ->
false

# Skip loopback messages
%{from: pid, to: pid} ->
false

# Filter out messages based on the trace target
%{from: from_pid, to: to_pid} ->
trace_pids == :all or from_pid in trace_pids or to_pid in trace_pids

# Reject the rest
_ ->
false
end)
|> Enum.sort_by(fn %{timestamp: timestamp} ->
timestamp
end)

# Get all the participating actors in the trace along with their sequence diagram IDs
{participants_lookup, _idx} =
Enum.reduce(trace_events, {%{}, 0}, fn %{from: from, to: to}, acc ->
acc
|> maybe_add_participant(from)
|> maybe_add_participant(to)
end)

# Generate the Mermaid formatted list of participants
participants =
Enum.map_join(participants_lookup, "\n", fn {pid, idx} ->
if pid == calling_pid do
"participant #{idx} AS self();"
else
generate_participant_entry(pid, idx)
end
end)

# Generate the mermaid formatted list of message events
{formatted_messages, _} =
trace_events
|> Enum.reduce({[], MapSet.new()}, fn %{from: from, to: to, message: message},
{events, started_processes} ->
events = [normalize_message(from, to, message, participants_lookup) | events]

from_idx = Map.get(participants_lookup, from, :not_found)
to_idx = Map.get(participants_lookup, to, :not_found)

cond do
activate?(to_idx, message) ->
{["activate #{to_idx}" | events], MapSet.put(started_processes, to_idx)}

deactivate?(from_idx, message) and MapSet.member?(started_processes, from_idx) ->
{["deactivate #{from_idx}" | events], MapSet.delete(started_processes, from_idx)}

true ->
{events, started_processes}
end
end)

messages =
formatted_messages
|> Enum.reverse()
|> Enum.join("\n")

sequence_diagram =
Markdown.new("""
```mermaid
sequenceDiagram
#{participants}
#{messages}
```
""")

{func_result, sequence_diagram}
end

defp generate_participant_entry(pid, idx) do
case Process.info(pid, :registered_name) do
{:registered_name, name} when is_atom(name) ->
"participant #{idx} AS #{inspect(name)};"

_ ->
"participant #{idx} AS #35;PID#{:erlang.pid_to_list(pid)};"
end
end

defp maybe_add_participant({participants, idx}, pid) when is_pid(pid) do
if Map.has_key?(participants, pid) do
{participants, idx}
else
{Map.put(participants, pid, idx), idx + 1}
end
end

defp maybe_add_participant(acc, _) do
acc
end

defp activate?(idx, {:spawn_request, _, _, _, _, _, _, _}) when idx != :not_found, do: true
defp activate?(_idx, _), do: false

defp deactivate?(idx, {:EXIT, _, _}) when idx != :not_found, do: true
defp deactivate?(_idx, _), do: false

defp normalize_message(from, to, message, participants_lookup)
when is_map_key(participants_lookup, from) and is_map_key(participants_lookup, to) do
formatted_message = label_from_message(message)
from_idx = participants_lookup[from]
to_idx = participants_lookup[to]

"#{from_idx}->>#{to_idx}: #{formatted_message}"
end

defp normalize_message(_, _, _, _), do: ""

defp label_from_message(message) do
case message do
{:EXIT, _, reason} -> "EXIT: #{label_from_reason(reason)}"
{:spawn_request, _, _, _, _, _, _, _} -> "SPAWN"
{:DOWN, _, :process, _, reason} -> "DOWN: #{label_from_reason(reason)}"
{:"$gen_call", _ref, value} -> "CALL: #{label_from_value(value)}"
{:"$gen_cast", value} -> "CAST: #{label_from_value(value)}"
value -> "INFO: #{label_from_value(value)}"
end
end

defp label_from_reason(:normal), do: "normal"
defp label_from_reason(:shutdown), do: "shutdown"
defp label_from_reason({:shutdown, _}), do: "shutdown"
defp label_from_reason(_), do: "abnormal"

defp label_from_value(tuple)
when is_tuple(tuple) and is_atom(elem(tuple, 0)),
do: elem(tuple, 0)

defp label_from_value(atom) when is_atom(atom), do: atom
defp label_from_value(ref) when is_reference(ref), do: inspect(ref)
defp label_from_value(tuple) when is_tuple(tuple), do: "tuple"
defp label_from_value(_), do: "term"

defp direction_from_opts(opts) do
opts
|> Keyword.get(:direction, :top_down)
Expand Down
40 changes: 40 additions & 0 deletions lib/kino/process/tracer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Kino.Process.Tracer do
@moduledoc false

use GenServer

def start_link() do
GenServer.start_link(__MODULE__, nil)
end

def get_trace_events(tracer) do
GenServer.call(tracer, :get_trace_events)
end

@impl true
def init(_) do
{:ok, []}
end

@impl true
def handle_call(:get_trace_events, _from, trace_events) do
{:reply, trace_events, trace_events}
end

@impl true
def handle_info({:seq_trace, _, {:send, _, from, to, message}, timestamp}, trace_events) do
new_event = %{
type: :send,
timestamp: timestamp,
from: from,
to: to,
message: message
}

{:noreply, [new_event | trace_events]}
end

def handle_info(_ignored_event, trace_events) do
{:noreply, trace_events}
end
end

0 comments on commit e5b3261

Please sign in to comment.