Skip to content

Commit

Permalink
Document request proxy and shift boundaries (#2617)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko committed May 24, 2024
1 parent ef25a2e commit 428d9ff
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 50 deletions.
17 changes: 17 additions & 0 deletions lib/livebook/proxy/adapter.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
defmodule Livebook.Proxy.Adapter do
@moduledoc false

# Implements a Plug adapter for handling `conn` within the runtime.
#
# All actions are forwarded to the parent process (`Livebook.Proxy.Server`),
# which operates within Livebook itself.

@behaviour Plug.Conn.Adapter

@impl true
def send_resp({pid, ref}, status, headers, body) do
send(pid, {:send_resp, self(), ref, status, headers, body})

Expand All @@ -11,6 +18,7 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def get_peer_data({pid, ref}) do
send(pid, {:get_peer_data, self(), ref})

Expand All @@ -20,6 +28,7 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def get_http_protocol({pid, ref}) do
send(pid, {:get_http_protocol, self(), ref})

Expand All @@ -29,6 +38,7 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def read_req_body({pid, ref}, opts) do
send(pid, {:read_req_body, self(), ref, opts})

Expand All @@ -40,6 +50,7 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def send_chunked({pid, ref}, status, headers) do
send(pid, {:send_chunked, self(), ref, status, headers})

Expand All @@ -49,6 +60,7 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def chunk({pid, ref}, chunk) do
send(pid, {:chunk, self(), ref, chunk})

Expand All @@ -59,6 +71,7 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def inform({pid, ref}, status, headers) do
send(pid, {:inform, self(), ref, status, headers})

Expand All @@ -68,6 +81,7 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def send_file({pid, ref}, status, headers, path, offset, length) do
%File.Stat{type: :regular, size: size} = File.stat!(path)

Expand All @@ -90,7 +104,10 @@ defmodule Livebook.Proxy.Adapter do
end
end

@impl true
def upgrade(_payload, _protocol, _opts), do: {:error, :not_supported}

@impl true
def push(_payload, _path, _headers), do: {:error, :not_supported}

defp exit_fun(fun, arity, reason) do
Expand Down
57 changes: 47 additions & 10 deletions lib/livebook/proxy/handler.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,58 @@
defmodule Livebook.Proxy.Handler do
@moduledoc false

# Handles request `conn` with the configured function.
#
# The handler forwards all actual communication to the parent
# `Livebook.Proxy.Server` via `Livebook.Proxy.Adapter`.
#
# A handler process is started on demand under a task supervisor.
# To avoid bottlenecks, we use a partition supervisor, so that we
# have a group of task supervisors ready.

@name __MODULE__

@doc """
Returns a child spec to setup the handler supervision tree.
Expects the `:listen` option to be provided, and be a function with
the signature `Plug.Conn.t() -> Plug.Conn.t()`.
"""
@spec child_spec(keyword()) :: Supervisor.child_spec()
def child_spec(opts) do
name = Keyword.fetch!(opts, :name)
listen = Keyword.fetch!(opts, :listen)
:persistent_term.put({__MODULE__, name}, listen)
PartitionSupervisor.child_spec(child_spec: Task.Supervisor, name: name)
:persistent_term.put({__MODULE__, :listen}, listen)
PartitionSupervisor.child_spec(child_spec: Task.Supervisor, name: @name)
end

def serve(parent, name, data) when is_pid(parent) and is_atom(name) do
Process.link(parent)
ref = Process.monitor(parent)
conn = struct!(Plug.Conn, %{data | adapter: {Livebook.Proxy.Adapter, {parent, ref}}})
:persistent_term.get({__MODULE__, name}).(conn)
@doc """
Handles request with the configured listener function.
Restores `%Plug.Conn{}` from the given attributes and delegates
all response handling back to the parent `Livebook.Proxy.Server`.
"""
@spec serve(pid(), map()) :: Plug.Conn.t()
def serve(parent_pid, %{} = conn_attrs) when is_pid(parent_pid) do
Process.link(parent_pid)
ref = Process.monitor(parent_pid)

conn =
struct!(Plug.Conn, %{conn_attrs | adapter: {Livebook.Proxy.Adapter, {parent_pid, ref}}})

listen = :persistent_term.get({__MODULE__, :listen})
listen.(conn)
end

def get_pid(name, key) do
GenServer.whereis({:via, PartitionSupervisor, {name, key}})
@doc """
Returns a pid of task supervisor to start the handler under.
In case no supervisor is running, returns `nil`.
"""
@spec get_supervisor_pid() :: pid() | nil
def get_supervisor_pid() do
if Process.whereis(@name) do
key = :rand.uniform()
GenServer.whereis({:via, PartitionSupervisor, {@name, key}})
end
end
end
38 changes: 31 additions & 7 deletions lib/livebook/proxy/server.ex
Original file line number Diff line number Diff line change
@@ -1,15 +1,39 @@
defmodule Livebook.Proxy.Server do
@moduledoc false

# The entrypoint for delegating `conn` handling to a runtime.
#
# The `Livebook.Proxy` modules are an implementation detail of the
# runtime. `Livebook.Proxy.Server` lives on the Livebook-side and
# it delegates request handling to `Livebook.Proxy.Handler`, which
# lives in the runtime node. The handler uses a custom plug adapter
# that dispatches `%Plug.Conn{}` operations as messages back to the
# server.
#
# Note that the server is not itself a new process, it is whoever
# calls `serve/2`.

import Plug.Conn

def serve(pid, name, %Plug.Conn{} = conn) when is_pid(pid) and is_atom(name) do
args = [self(), name, build_client_conn(conn)]
{:ok, spawn_pid} = Task.Supervisor.start_child(pid, Livebook.Proxy.Handler, :serve, args)
monitor_ref = Process.monitor(spawn_pid)
@doc """
Handles a request by delegating to a new handler process in the
runtime.
This function blocks until the request handler is done and it returns
the final `conn`.
"""
@spec serve(pid(), Plug.Conn.t()) :: Plug.Conn.t()
def serve(supervisor_pid, %Plug.Conn{} = conn) when is_pid(supervisor_pid) do
args = [self(), build_handler_conn(conn)]

{:ok, handler_pid} =
Task.Supervisor.start_child(supervisor_pid, Livebook.Proxy.Handler, :serve, args)

monitor_ref = Process.monitor(handler_pid)
loop(monitor_ref, conn)
end

defp build_client_conn(conn) do
defp build_handler_conn(conn) do
%{
adapter: nil,
host: conn.host,
Expand Down Expand Up @@ -71,8 +95,8 @@ defmodule Livebook.Proxy.Server do
send(pid, {ref, :ok})
loop(monitor_ref, conn)

{:DOWN, ^monitor_ref, :process, _pid, reason} ->
{conn, reason}
{:DOWN, ^monitor_ref, :process, _pid, _reason} ->
conn
end
end
end
15 changes: 11 additions & 4 deletions lib/livebook/runtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ defprotocol Livebook.Runtime do
source: atom()
}

@type proxy_handler_spec :: {module :: module(), function :: atom(), args :: list()}

@doc """
Returns relevant information about the runtime.
Expand Down Expand Up @@ -1103,10 +1105,15 @@ defprotocol Livebook.Runtime do
def unregister_clients(runtime, client_ids)

@doc """
Fetches the running Proxy Handler's pid from runtime.
Fetches information about a proxy request handler, if available.
When the handler is available, this function returns MFA. In order
to handle a connection, the caller should invoke the MFA, appending
`conn` to the argument list, where `conn` is a `%Plug.Conn{}` struct
for the specific request.
TODO: document the communication here.
Once done, the handler MFA should return the final `conn`.
"""
@spec fetch_proxy_handler(t(), pid()) :: {:ok, pid()} | {:error, :not_found}
def fetch_proxy_handler(runtime, client_pid)
@spec fetch_proxy_handler_spec(t()) :: {:ok, proxy_handler_spec()} | {:error, :not_found}
def fetch_proxy_handler_spec(runtime)
end
4 changes: 2 additions & 2 deletions lib/livebook/runtime/attached.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end

def fetch_proxy_handler(runtime, client_pid) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid)
def fetch_proxy_handler_spec(runtime) do
RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid)
end
end
4 changes: 2 additions & 2 deletions lib/livebook/runtime/elixir_standalone.ex
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end

def fetch_proxy_handler(runtime, client_pid) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid)
def fetch_proxy_handler_spec(runtime) do
RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid)
end
end
4 changes: 2 additions & 2 deletions lib/livebook/runtime/embedded.ex
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
RuntimeServer.unregister_clients(runtime.server_pid, client_ids)
end

def fetch_proxy_handler(runtime, client_pid) do
RuntimeServer.fetch_proxy_handler(runtime.server_pid, client_pid)
def fetch_proxy_handler_spec(runtime) do
RuntimeServer.fetch_proxy_handler_spec(runtime.server_pid)
end

defp config() do
Expand Down
17 changes: 10 additions & 7 deletions lib/livebook/runtime/erl_dist/runtime_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,14 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
end

@doc """
Fetches the running Proxy Handler's pid from runtime.
Fetches information about a proxy request handler, if available.
"""
@spec fetch_proxy_handler(pid(), pid()) :: {:ok, pid()} | {:error, :not_found}
def fetch_proxy_handler(pid, client_pid) do
GenServer.call(pid, {:fetch_proxy_handler, client_pid})
@spec fetch_proxy_handler_spec(pid()) ::
{:ok, {module(), atom(), list()}} | {:error, :not_found}
def fetch_proxy_handler_spec(pid) do
with {:ok, supervisor_pid} <- GenServer.call(pid, :fetch_proxy_handler_supervisor) do
{:ok, {Livebook.Proxy.Server, :serve, [supervisor_pid]}}
end
end

@doc """
Expand Down Expand Up @@ -752,9 +755,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:reply, has_dependencies?, state}
end

def handle_call({:fetch_proxy_handler, client_pid}, _from, state) do
if pid = Livebook.Proxy.Handler.get_pid(Kino.Proxy, client_pid) do
{:reply, {:ok, pid}, state}
def handle_call(:fetch_proxy_handler_supervisor, _from, state) do
if supervisor_pid = Livebook.Proxy.Handler.get_supervisor_pid() do
{:reply, {:ok, supervisor_pid}, state}
else
{:reply, {:error, :not_found}, state}
end
Expand Down
2 changes: 1 addition & 1 deletion lib/livebook/runtime/evaluator/io_proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
end

defp io_request({:livebook_get_proxy_handler_child_spec, fun}, state) do
result = {Livebook.Proxy.Handler, name: Kino.Proxy, listen: fun}
result = {Livebook.Proxy.Handler, listen: fun}
{result, state}
end

Expand Down
13 changes: 7 additions & 6 deletions lib/livebook/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -625,11 +625,12 @@ defmodule Livebook.Session do
end

@doc """
Fetches the running Proxy Handler's pid from runtime.
Fetches information about a proxy request handler, if available.
"""
@spec fetch_proxy_handler(pid(), pid()) :: {:ok, pid()} | {:error, :not_found | :disconnected}
def fetch_proxy_handler(pid, client_pid) do
GenServer.call(pid, {:fetch_proxy_handler, client_pid})
@spec fetch_proxy_handler_spec(pid()) ::
{:ok, Runtime.proxy_handler_spec()} | {:error, :not_found | :disconnected}
def fetch_proxy_handler_spec(pid) do
GenServer.call(pid, :fetch_proxy_handler_spec)
end

@doc """
Expand Down Expand Up @@ -1081,9 +1082,9 @@ defmodule Livebook.Session do
{:noreply, state}
end

def handle_call({:fetch_proxy_handler, client_pid}, _from, state) do
def handle_call(:fetch_proxy_handler_spec, _from, state) do
if Runtime.connected?(state.data.runtime) do
{:reply, Runtime.fetch_proxy_handler(state.data.runtime, client_pid), state}
{:reply, Runtime.fetch_proxy_handler_spec(state.data.runtime), state}
else
{:reply, {:error, :disconnected}, state}
end
Expand Down
20 changes: 12 additions & 8 deletions lib/livebook_web/plugs/proxy_plug.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule LivebookWeb.ProxyPlug do
@behaviour Plug

import Plug.Conn

alias LivebookWeb.NotFoundError
Expand All @@ -10,10 +11,9 @@ defmodule LivebookWeb.ProxyPlug do
@impl true
def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do
session = fetch_session!(id)
pid = fetch_proxy_handler!(session)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"])
{conn, _} = Livebook.Proxy.Server.serve(pid, Kino.Proxy, conn)

conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
end

Expand All @@ -25,10 +25,9 @@ defmodule LivebookWeb.ProxyPlug do
end

session = fetch_session!(id)
pid = fetch_proxy_handler!(session)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"])
{conn, _} = Livebook.Proxy.Server.serve(pid, Kino.Proxy, conn)

conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
end

Expand All @@ -50,8 +49,8 @@ defmodule LivebookWeb.ProxyPlug do
end
end

defp fetch_proxy_handler!(session) do
case Livebook.Session.fetch_proxy_handler(session.pid, self()) do
defp fetch_proxy_handler_spec!(session) do
case Livebook.Session.fetch_proxy_handler_spec(session.pid) do
{:ok, pid} -> pid
{:error, _} -> raise NotFoundError, "could not find a kino proxy running"
end
Expand All @@ -60,4 +59,9 @@ defmodule LivebookWeb.ProxyPlug do
defp prepare_conn(conn, path_info, script_name) do
%{conn | path_info: path_info, script_name: conn.script_name ++ script_name}
end

defp call_proxy_handler(proxy_handler_spec, conn) do
{module, function, args} = proxy_handler_spec
apply(module, function, args ++ [conn])
end
end
2 changes: 1 addition & 1 deletion test/support/noop_runtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ defmodule Livebook.Runtime.NoopRuntime do

def register_clients(_, _), do: :ok
def unregister_clients(_, _), do: :ok
def fetch_proxy_handler(_, _), do: {:error, :not_found}
def fetch_proxy_handler_spec(_), do: {:error, :not_found}

defp trace(runtime, fun, args) do
if runtime.trace_to do
Expand Down

0 comments on commit 428d9ff

Please sign in to comment.