Skip to content

Commit

Permalink
Support proxy requests on generic app URL and await session execution
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatanklosko committed May 24, 2024
1 parent 428d9ff commit 5dd0972
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 25 deletions.
47 changes: 35 additions & 12 deletions lib/livebook/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ defmodule Livebook.Session do
GenServer.call(pid, {:register_client, client_pid, user}, @timeout)
end

@doc """
Resets the auto shutdown timer, if ticking.
When the session has connected clients, nothing changes.
"""
@spec reset_auto_shutdown(pid()) :: :ok
def reset_auto_shutdown(pid) do
GenServer.cast(pid, :reset_auto_shutdown)
end

@doc """
Returns data of the given session.
"""
Expand Down Expand Up @@ -959,25 +969,31 @@ defmodule Livebook.Session do
defp schedule_auto_shutdown(state) do
client_count = map_size(state.data.clients_map)

case {client_count, state.auto_shutdown_timer_ref} do
{0, nil} when state.auto_shutdown_ms != nil ->
cond do
client_count == 0 and state.auto_shutdown_timer_ref == nil and state.auto_shutdown_ms != nil ->
timer_ref = Process.send_after(self(), :close, state.auto_shutdown_ms)
%{state | auto_shutdown_timer_ref: timer_ref}

{client_count, timer_ref} when client_count > 0 and timer_ref != nil ->
if Process.cancel_timer(timer_ref) == false do
receive do
:close -> :ok
end
end

%{state | auto_shutdown_timer_ref: nil}
client_count > 0 ->
cancel_auto_shutdown_timer(state)

_ ->
true ->
state
end
end

defp cancel_auto_shutdown_timer(%{auto_shutdown_timer_ref: nil} = state), do: state

defp cancel_auto_shutdown_timer(state) do
if Process.cancel_timer(state.auto_shutdown_timer_ref) == false do
receive do
:close -> :ok
end
end

%{state | auto_shutdown_timer_ref: nil}
end

@impl true
def handle_continue(:app_init, state) do
cell_ids = Data.cell_ids_for_full_evaluation(state.data, [])
Expand Down Expand Up @@ -1091,6 +1107,13 @@ defmodule Livebook.Session do
end

@impl true
def handle_cast(:reset_auto_shutdown, state) do
{:noreply,
state
|> cancel_auto_shutdown_timer()
|> schedule_auto_shutdown()}
end

def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do
client_id = client_id(state, client_pid)
operation = {:set_notebook_attributes, client_id, attrs}
Expand Down Expand Up @@ -2466,7 +2489,7 @@ defmodule Livebook.Session do
status = state.data.app_data.status
send(state.app_pid, {:app_status_changed, state.session_id, status})

notify_update(state)
state
end

defp handle_action(state, :app_recover) do
Expand Down
7 changes: 7 additions & 0 deletions lib/livebook_web/errors.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule LivebookWeb.NotFoundError do
defexception [:message, plug_status: 404]
end

defmodule LivebookWeb.BadRequestError do
defexception [:message, plug_status: 400]
end
3 changes: 0 additions & 3 deletions lib/livebook_web/errors/not_found_error.ex

This file was deleted.

71 changes: 61 additions & 10 deletions lib/livebook_web/plugs/proxy_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,43 @@ defmodule LivebookWeb.ProxyPlug do
@impl true
def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do
session = fetch_session!(id)
Livebook.Session.reset_auto_shutdown(session.pid)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"])
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
call_proxy_handler(proxy_handler_spec, conn)
end

def call(%{path_info: ["apps", slug, id, "proxy" | path_info]} = conn, _opts) do
app = fetch_app!(slug)

unless Enum.any?(app.sessions, &(&1.id == id)) do
raise NotFoundError, "could not find an app session matching #{inspect(id)}"
raise NotFoundError, "could not find an app session with id #{inspect(id)}"
end

session = fetch_session!(id)
Livebook.Session.reset_auto_shutdown(session.pid)
await_app_session_ready(app, session.id)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, id, "proxy"])
call_proxy_handler(proxy_handler_spec, conn)
end

def call(%{path_info: ["apps", slug, "proxy" | path_info]} = conn, _opts) do
app = fetch_app!(slug)

if app.multi_session do
raise LivebookWeb.BadRequestError,
"the requested app is multi-session. In order to send requests to this app," <>
" you need to start a session and use its specific URL"
end

session_id = Livebook.App.get_session_id(app.pid)
{:ok, session} = Livebook.Sessions.fetch_session(session_id)
Livebook.Session.reset_auto_shutdown(session.pid)
await_app_session_ready(app, session.id)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"])
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
call_proxy_handler(proxy_handler_spec, conn)
end

def call(conn, _opts) do
Expand All @@ -38,21 +57,27 @@ defmodule LivebookWeb.ProxyPlug do
defp fetch_app!(slug) do
case Livebook.Apps.fetch_app(slug) do
{:ok, app} -> app
:error -> raise NotFoundError, "could not find an app matching #{inspect(slug)}"
:error -> raise NotFoundError, "could not find an app with slug #{inspect(slug)}"
end
end

defp fetch_session!(id) do
case Livebook.Sessions.fetch_session(id) do
{:ok, session} -> session
{:error, _} -> raise NotFoundError, "could not find a session matching #{id}"
{:error, _} -> raise NotFoundError, "could not find a session with id #{id}"
end
end

defp fetch_proxy_handler_spec!(session) do
case Livebook.Session.fetch_proxy_handler_spec(session.pid) do
{:ok, pid} -> pid
{:error, _} -> raise NotFoundError, "could not find a kino proxy running"
{:ok, pid} ->
pid

{:error, _} ->
raise NotFoundError,
"the session does not listen to proxied requests." <>
" See the Kino.Proxy documentation to learn about defining" <>
" custom request handlers"
end
end

Expand All @@ -62,6 +87,32 @@ defmodule LivebookWeb.ProxyPlug do

defp call_proxy_handler(proxy_handler_spec, conn) do
{module, function, args} = proxy_handler_spec
apply(module, function, args ++ [conn])
conn = apply(module, function, args ++ [conn])
halt(conn)
end

defp await_app_session_ready(app, session_id) do
unless session_ready?(app, session_id) do
Livebook.App.subscribe(app.slug)
await_session_execution_loop(app, session_id)
Livebook.App.unsubscribe(app.slug)
end
end

defp await_session_execution_loop(%{slug: slug}, session_id) do
receive do
{:app_updated, %{slug: ^slug} = app} ->
unless session_ready?(app, session_id) do
await_session_execution_loop(app, session_id)
end
end
end

defp session_ready?(app, session_id) do
if session = Enum.find(app.sessions, &(&1.id == session_id)) do
session.app_status.execution != :executing
else
false
end
end
end
52 changes: 52 additions & 0 deletions test/livebook/proxy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,58 @@ defmodule Livebook.ProxyTest do
assert text_response(put(conn, url), 200) == "used PUT method"
assert text_response(patch(conn, url), 200) == "used PATCH method"
assert text_response(delete(conn, url), 200) == "used DELETE method"

# Generic path also works for single-session apps
url = "/apps/#{slug}/proxy/"

assert text_response(get(conn, url), 200) == "used GET method"
end

test "waits for the session to be executed before attempting the request", %{conn: conn} do
slug = Livebook.Utils.random_short_id()

app_settings = %{
Notebook.AppSettings.new()
| slug: slug,
access_type: :public,
auto_shutdown_ms: 5_000
}

notebook = %{proxy_notebook() | app_settings: app_settings}

Livebook.Apps.subscribe()
pid = deploy_notebook_sync(notebook)

assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}}

# The app is configured with auto shutdown, so the session will
# start only once requested. We should wait until it executes
# and then proxy the request as usual
url = "/apps/#{slug}/proxy/"

assert text_response(get(conn, url), 200) == "used GET method"
end

test "returns error when requesting generic path for multi-session app", %{conn: conn} do
slug = Livebook.Utils.random_short_id()

app_settings = %{
Notebook.AppSettings.new()
| slug: slug,
access_type: :public,
multi_session: true
}

notebook = %{proxy_notebook() | app_settings: app_settings}

Livebook.Apps.subscribe()
pid = deploy_notebook_sync(notebook)

assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}}

assert_error_sent 400, fn ->
get(conn, "/apps/#{slug}/proxy/foo/bar")
end
end
end

Expand Down

0 comments on commit 5dd0972

Please sign in to comment.