Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support proxy requests on generic app URL and await session execution #2618

Merged
merged 2 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions lib/livebook/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ defmodule Livebook.Session do
GenServer.call(pid, {:register_client, client_pid, user}, @timeout)
end

@doc """
Resets the auto shutdown timer, if ticking.

When the session has connected clients, nothing changes.
"""
@spec reset_auto_shutdown(pid()) :: :ok
def reset_auto_shutdown(pid) do
GenServer.cast(pid, :reset_auto_shutdown)
end

@doc """
Returns data of the given session.
"""
Expand Down Expand Up @@ -959,25 +969,31 @@ defmodule Livebook.Session do
defp schedule_auto_shutdown(state) do
client_count = map_size(state.data.clients_map)

case {client_count, state.auto_shutdown_timer_ref} do
{0, nil} when state.auto_shutdown_ms != nil ->
cond do
client_count == 0 and state.auto_shutdown_timer_ref == nil and state.auto_shutdown_ms != nil ->
timer_ref = Process.send_after(self(), :close, state.auto_shutdown_ms)
%{state | auto_shutdown_timer_ref: timer_ref}

{client_count, timer_ref} when client_count > 0 and timer_ref != nil ->
if Process.cancel_timer(timer_ref) == false do
receive do
:close -> :ok
end
end

%{state | auto_shutdown_timer_ref: nil}
client_count > 0 ->
cancel_auto_shutdown_timer(state)

_ ->
true ->
state
end
end

defp cancel_auto_shutdown_timer(%{auto_shutdown_timer_ref: nil} = state), do: state

defp cancel_auto_shutdown_timer(state) do
if Process.cancel_timer(state.auto_shutdown_timer_ref) == false do
receive do
:close -> :ok
end
end

%{state | auto_shutdown_timer_ref: nil}
end

@impl true
def handle_continue(:app_init, state) do
cell_ids = Data.cell_ids_for_full_evaluation(state.data, [])
Expand Down Expand Up @@ -1091,6 +1107,13 @@ defmodule Livebook.Session do
end

@impl true
def handle_cast(:reset_auto_shutdown, state) do
{:noreply,
state
|> cancel_auto_shutdown_timer()
|> schedule_auto_shutdown()}
end

def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do
client_id = client_id(state, client_pid)
operation = {:set_notebook_attributes, client_id, attrs}
Expand Down Expand Up @@ -2466,7 +2489,7 @@ defmodule Livebook.Session do
status = state.data.app_data.status
send(state.app_pid, {:app_status_changed, state.session_id, status})

notify_update(state)
state
end

defp handle_action(state, :app_recover) do
Expand Down
7 changes: 7 additions & 0 deletions lib/livebook_web/errors.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule LivebookWeb.NotFoundError do
defexception [:message, plug_status: 404]
end

defmodule LivebookWeb.BadRequestError do
defexception [:message, plug_status: 400]
end
3 changes: 0 additions & 3 deletions lib/livebook_web/errors/not_found_error.ex

This file was deleted.

73 changes: 63 additions & 10 deletions lib/livebook_web/plugs/proxy_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,43 @@ defmodule LivebookWeb.ProxyPlug do
@impl true
def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do
session = fetch_session!(id)
Livebook.Session.reset_auto_shutdown(session.pid)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"])
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
call_proxy_handler(proxy_handler_spec, conn)
end

def call(%{path_info: ["apps", slug, id, "proxy" | path_info]} = conn, _opts) do
app = fetch_app!(slug)

unless Enum.any?(app.sessions, &(&1.id == id)) do
raise NotFoundError, "could not find an app session matching #{inspect(id)}"
raise NotFoundError, "could not find an app session with id #{inspect(id)}"
end

session = fetch_session!(id)
Livebook.Session.reset_auto_shutdown(session.pid)
await_app_session_ready(app, session.id)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, id, "proxy"])
call_proxy_handler(proxy_handler_spec, conn)
end

def call(%{path_info: ["apps", slug, "proxy" | path_info]} = conn, _opts) do
app = fetch_app!(slug)

if app.multi_session do
raise LivebookWeb.BadRequestError,
"the requested app is multi-session. In order to send requests to this app," <>
" you need to start a session and use its specific URL"
end

session_id = Livebook.App.get_session_id(app.pid)
{:ok, session} = Livebook.Sessions.fetch_session(session_id)
Livebook.Session.reset_auto_shutdown(session.pid)
await_app_session_ready(app, session.id)
proxy_handler_spec = fetch_proxy_handler_spec!(session)
conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"])
conn = call_proxy_handler(proxy_handler_spec, conn)
halt(conn)
call_proxy_handler(proxy_handler_spec, conn)
end

def call(conn, _opts) do
Expand All @@ -38,21 +57,27 @@ defmodule LivebookWeb.ProxyPlug do
defp fetch_app!(slug) do
case Livebook.Apps.fetch_app(slug) do
{:ok, app} -> app
:error -> raise NotFoundError, "could not find an app matching #{inspect(slug)}"
:error -> raise NotFoundError, "could not find an app with slug #{inspect(slug)}"
end
end

defp fetch_session!(id) do
case Livebook.Sessions.fetch_session(id) do
{:ok, session} -> session
{:error, _} -> raise NotFoundError, "could not find a session matching #{id}"
{:error, _} -> raise NotFoundError, "could not find a session with id #{id}"
end
end

defp fetch_proxy_handler_spec!(session) do
case Livebook.Session.fetch_proxy_handler_spec(session.pid) do
{:ok, pid} -> pid
{:error, _} -> raise NotFoundError, "could not find a kino proxy running"
{:ok, pid} ->
pid

{:error, _} ->
raise NotFoundError,
"the session does not listen to proxied requests." <>
" See the Kino.Proxy documentation to learn about defining" <>
" custom request handlers"
end
end

Expand All @@ -62,6 +87,34 @@ defmodule LivebookWeb.ProxyPlug do

defp call_proxy_handler(proxy_handler_spec, conn) do
{module, function, args} = proxy_handler_spec
apply(module, function, args ++ [conn])
conn = apply(module, function, args ++ [conn])
halt(conn)
end

defp await_app_session_ready(app, session_id) do
unless session_ready?(app, session_id) do
Livebook.App.subscribe(app.slug)
# We fetch the app again, in case it had changed before we subscribed
app = Livebook.App.get_by_pid(app.pid)
await_session_execution_loop(app, session_id)
Livebook.App.unsubscribe(app.slug)
end
end

defp await_session_execution_loop(%{slug: slug} = app, session_id) do
unless session_ready?(app, session_id) do
receive do
{:app_updated, %{slug: ^slug} = app} ->
await_session_execution_loop(app, session_id)
end
end
end

defp session_ready?(app, session_id) do
if session = Enum.find(app.sessions, &(&1.id == session_id)) do
session.app_status.execution != :executing
else
false
end
end
end
52 changes: 52 additions & 0 deletions test/livebook/proxy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,58 @@ defmodule Livebook.ProxyTest do
assert text_response(put(conn, url), 200) == "used PUT method"
assert text_response(patch(conn, url), 200) == "used PATCH method"
assert text_response(delete(conn, url), 200) == "used DELETE method"

# Generic path also works for single-session apps
url = "/apps/#{slug}/proxy/"

assert text_response(get(conn, url), 200) == "used GET method"
end

test "waits for the session to be executed before attempting the request", %{conn: conn} do
slug = Livebook.Utils.random_short_id()

app_settings = %{
Notebook.AppSettings.new()
| slug: slug,
access_type: :public,
auto_shutdown_ms: 5_000
}

notebook = %{proxy_notebook() | app_settings: app_settings}

Livebook.Apps.subscribe()
pid = deploy_notebook_sync(notebook)

assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}}

# The app is configured with auto shutdown, so the session will
# start only once requested. We should wait until it executes
# and then proxy the request as usual
url = "/apps/#{slug}/proxy/"

assert text_response(get(conn, url), 200) == "used GET method"
end

test "returns error when requesting generic path for multi-session app", %{conn: conn} do
slug = Livebook.Utils.random_short_id()

app_settings = %{
Notebook.AppSettings.new()
| slug: slug,
access_type: :public,
multi_session: true
}

notebook = %{proxy_notebook() | app_settings: app_settings}

Livebook.Apps.subscribe()
pid = deploy_notebook_sync(notebook)

assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}}

assert_error_sent 400, fn ->
get(conn, "/apps/#{slug}/proxy/foo/bar")
end
end
end

Expand Down
Loading