Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding process tracing #165

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5763c27
Added process tracing to Kino
akoutmos Jul 2, 2022
2330772
Update lib/kino/process.ex
akoutmos Jul 2, 2022
a82ebbc
Addressing PR feedback
akoutmos Jul 2, 2022
6304d23
Merge branch 'add_process_tracing_visualization' of github.com:akoutm…
akoutmos Jul 2, 2022
a114f04
Update lib/kino/process.ex
akoutmos Jul 2, 2022
7134659
Fixing error in tracer
akoutmos Jul 2, 2022
740bfe4
Merge branch 'add_process_tracing_visualization' of github.com:akoutm…
akoutmos Jul 2, 2022
d3cf354
Adding try do as not to leak tracer processes
akoutmos Jul 2, 2022
e7e815b
Update lib/kino/process.ex
akoutmos Jul 2, 2022
7c18680
Update lib/kino/process.ex
akoutmos Jul 2, 2022
19927c5
Addressing PR feedback
akoutmos Jul 2, 2022
357f216
Merge branch 'add_process_tracing_visualization' of github.com:akoutm…
akoutmos Jul 2, 2022
ad6b14f
Update lib/kino/process.ex
akoutmos Jul 2, 2022
37c56cb
Update lib/kino/process.ex
akoutmos Jul 2, 2022
53891d3
Fixing formatting
akoutmos Jul 2, 2022
cfef80b
Reworking exception handling
akoutmos Jul 2, 2022
4297985
Update lib/kino/process.ex
akoutmos Jul 2, 2022
b0804ca
Update lib/kino/process.ex
akoutmos Jul 2, 2022
52b9520
Addressing PR feedback
akoutmos Jul 2, 2022
6fdb020
Fixing issue when processes are stopped inside trace but started outs…
akoutmos Jul 2, 2022
9f341c7
Minor docs change
akoutmos Jul 2, 2022
0ac5146
Calling PID show up as self() in seq diagram
akoutmos Jul 3, 2022
5952a8d
Update lib/kino/process.ex
akoutmos Jul 4, 2022
7762e75
Update lib/kino/process.ex
akoutmos Jul 4, 2022
f8de392
Update lib/kino/process/tracer.ex
akoutmos Jul 4, 2022
2b55111
Update lib/kino/process/tracer.ex
akoutmos Jul 4, 2022
4ac71be
Switching to a map
akoutmos Jul 4, 2022
9a5b965
Added render_* functions
akoutmos Jul 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
212 changes: 210 additions & 2 deletions lib/kino/process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ defmodule Kino.Process do
"""

alias Kino.Markdown
alias Kino.Process.Tracer

@type supervisor :: pid() | atom()
@type trace_target :: :all | pid() | [pid()]

@doc """
Generates a visualization of an application tree.
Expand Down Expand Up @@ -62,7 +64,7 @@ defmodule Kino.Process do
{:dictionary, dictionary} = Process.info(root_supervisor, :dictionary)
[ancestor] = dictionary[:"$ancestors"]

Kino.Markdown.new("""
Markdown.new("""
```mermaid
graph #{direction};
application_master(#{inspect(master)}):::supervisor ---> supervisor_ancestor;
Expand Down Expand Up @@ -126,7 +128,7 @@ defmodule Kino.Process do
direction = direction_from_opts(opts)
edges = traverse_supervisor(supervisor)

Kino.Markdown.new("""
Markdown.new("""
```mermaid
graph #{direction};
#{edges}
Expand All @@ -137,6 +139,212 @@ defmodule Kino.Process do
""")
end

@doc """
Generate a sequence diagram of process messages.
akoutmos marked this conversation as resolved.
Show resolved Hide resolved

The provided function is executed and traced, with all the events sent to and
received by the trace target processes rendered in a sequence diagram. The trace
target argument can either be a single PID, a list of PIDs, or the atom `:all`
depending on what messages you would like to retain in your trace.

## Examples

To generate a trace of all the messages occurring during the execution of the
provided function, you can do the following:

Kino.Process.seq_trace(:all, fn ->
{:ok, agent_pid} = Agent.start_link(fn -> [] end)
Process.monitor(agent_pid)

1..2
|> Task.async_stream(fn thing ->
Agent.get(agent_pid, fn thing -> thing end)
100 * thing
end,
max_concurrency: 3)
|> Stream.run()

Agent.stop(agent_pid)
end)

If you are only interested in messages being sent to or received by certain PIDs,
you can filter the sequence diagram by specifying the PIDs that you are interested
in:

{:ok, agent_pid} = Agent.start_link(fn -> [] end)
Process.monitor(agent_pid)

Kino.Process.seq_trace(agent_pid, fn ->
1..2
|> Task.async_stream(fn thing ->
Agent.get(agent_pid, fn thing -> thing end)
100 * thing
end,
max_concurrency: 3)
|> Stream.run()

Agent.stop(agent_pid)
end)
"""
@spec seq_trace(trace_target(), (() -> any())) :: Markdown.t()
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
def seq_trace(pid, trace_function) when is_pid(pid) do
seq_trace([pid], trace_function)
end

def seq_trace(trace_pids, trace_function) do
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
# Set up the process message tracer and the Erlang seq_trace_module
{:ok, tracer_pid} = Tracer.start_link(nil)
:seq_trace.set_token(:send, true)
:seq_trace.set_token(:receive, true)
:seq_trace.set_token(:monotonic_timestamp, true)
previous_tracer = :seq_trace.set_system_tracer(tracer_pid)

# Run the user supplied function and capture the events if
# no errors were encountered
trace_events =
try do
# Run the user provided function
trace_function.()

# Extract all of the events from the system tracer
Tracer.get_trace_events(tracer_pid)
after
# The Tracer GenServer is no longer needed, shut it down
GenServer.stop(tracer_pid)

# Reset all of the tracing options
:seq_trace.set_system_tracer(previous_tracer)
:seq_trace.reset_trace()
end

# Get all of the events from the Tracer GenServer
trace_events =
trace_events
|> Enum.filter(fn
# Skip :spawn_reply messages
{_, _, _, _, {:spawn_reply, _, _, _}} ->
false

# Skip loopback messages
{_, _, pid, pid, _message} ->
false

# Filter out messages based on the trace target
{_, _, from_pid, to_pid, _message} ->
trace_pids == :all or from_pid in trace_pids or to_pid in trace_pids

# Reject the rest
_ ->
false
end)
|> Enum.sort_by(fn {_type, timestamp, _from, _to, _message} ->
timestamp
end)

# Get all the participating actors in the trace along with their sequence diagram IDs
{participants_lookup, _idx} =
trace_events
|> Enum.reduce({%{}, 0}, fn {_type, _timestamp, from, to, _message}, acc ->
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
acc
|> maybe_add_participant(from)
|> maybe_add_participant(to)
end)

# Generate the Mermaid formatted list of participants
participants =
participants_lookup
|> Enum.map_join("\n", fn {pid, idx} ->
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
case Process.info(pid, :registered_name) do
nil ->
"participant #{idx} AS #35;PID#{:erlang.pid_to_list(pid)};"

{:registered_name, []} ->
"participant #{idx} AS #35;PID#{:erlang.pid_to_list(pid)};"

{:registered_name, name} ->
"participant #{idx} AS #{inspect(name)};"
end
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
end)

# Generate the mermaid formatted list of message events
messages =
trace_events
|> Enum.map_join("\n", fn {type, _timestamp, from, to, message} ->
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
normalize_message(type, from, to, message, participants_lookup)
end)

Markdown.new("""
```mermaid
sequenceDiagram
#{participants}
#{messages}
```
""")
end

defp maybe_add_participant({participants, idx}, pid) when is_pid(pid) do
if Map.has_key?(participants, pid) do
{participants, idx}
else
{Map.put(participants, pid, idx), idx + 1}
end
end

defp maybe_add_participant(acc, _) do
acc
end

defp maybe_activate(idx, {:spawn_request, _, _, _, _, _, _, _}), do: "\nactivate #{idx}"
defp maybe_activate(_, _), do: ""

defp maybe_deactivate(idx, {:EXIT, _, _}), do: "\ndeactivate #{idx}"
defp maybe_deactivate(_, _), do: ""

defp normalize_message(type, from, to, message, participants_lookup)
when is_map_key(participants_lookup, from) and is_map_key(participants_lookup, to) do
from_idx = participants_lookup[from]
to_idx = participants_lookup[to]

formatted_message = label_from_message(message)

case type do
:send ->
"#{from_idx}->>#{to_idx}: #{formatted_message}" <>
maybe_activate(to_idx, message) <>
maybe_deactivate(from_idx, message)

:receive ->
"#{to_idx}-->>#{from_idx}: #{formatted_message}"
end
end

defp normalize_message(_, _, _, _, _), do: ""

defp label_from_message(message) do
case message do
{:EXIT, _, reason} -> "EXIT: #{label_from_reason(reason)}"
{:spawn_request, _, _, _, _, _, _, _} -> "SPAWN"
{:DOWN, _, :process, _, reason} -> "DOWN: #{label_from_reason(reason)}"
{:"$gen_call", _ref, value} -> "CALL: #{label_from_value(value)}"
{:"$gen_cast", value} -> "CAST: #{label_from_value(value)}"
value -> "INFO: #{label_from_value(value)}"
end
end

defp label_from_reason(:normal), do: "normal"
defp label_from_reason(:shutdown), do: "shutdown"
defp label_from_reason({:shutdown, _}), do: "shutdown"
defp label_from_reason(_), do: "abnormal"

defp label_from_value(tuple)
when is_tuple(tuple) and is_atom(elem(tuple, 0)),
do: elem(tuple, 0)

defp label_from_value(atom) when is_atom(atom), do: atom
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
defp label_from_value(ref) when is_reference(ref), do: inspect(ref)
defp label_from_value(tuple) when is_tuple(tuple), do: "tuple"
defp label_from_value(_), do: "term"

defp direction_from_opts(opts) do
opts
|> Keyword.get(:direction, :top_down)
Expand Down
38 changes: 38 additions & 0 deletions lib/kino/process/tracer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Kino.Process.Tracer do
@moduledoc false

use GenServer

def start_link(_) do
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
GenServer.start_link(__MODULE__, nil)
end

# ---- Client API ----

@doc false
def get_trace_events(tracer) do
GenServer.call(tracer, :get_trace_events)
end

# ---- Callbacks ----
akoutmos marked this conversation as resolved.
Show resolved Hide resolved

@impl true
def init(_) do
{:ok, []}
end

@impl true
def handle_call(:get_trace_events, _from, trace_events) do
{:reply, trace_events, trace_events}
end

@impl true
def handle_info({:seq_trace, _, {:send, _, from, to, message}, timestamp}, trace_events) do
new_event = {:send, timestamp, from, to, message}
akoutmos marked this conversation as resolved.
Show resolved Hide resolved
{:noreply, [new_event | trace_events]}
end

def handle_info(_ignored_event, trace_events) do
{:noreply, trace_events}
end
end