Skip to content

Commit

Permalink
refactor(runtime,serverless,cli): parallel requests to the same isola…
Browse files Browse the repository at this point in the history
…te (#676)

* refactor(runtime,serverless,cli): parallel requests

* refactor: clean unused code

* refactor(cli,wpt-runner): update to new runtime

* fix: memory limit termination

* fix: remove thread_ids & add observability

* feat: timeout

* feat: terminate isolates

* feat: disable cronjob

* chore: clean comments

* fix: pub sub cache

* fix: per-request context

* fix: clean

* feat: add statistics

* chore: add changeset

* fix: missing import in release mode

* feat: optimize
  • Loading branch information
QuiiBz committed Mar 20, 2023
1 parent 499bb53 commit 54e37e3
Show file tree
Hide file tree
Showing 37 changed files with 1,827 additions and 1,779 deletions.
7 changes: 7 additions & 0 deletions .changeset/new-cats-kiss.md
@@ -0,0 +1,7 @@
---
'@lagon/cli': patch
'@lagon/runtime': patch
'@lagon/serverless': patch
---

Allow parallel requests to the same isolate
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/cli/Cargo.toml
Expand Up @@ -15,7 +15,6 @@ colored = "2.0.0"
dirs = "4.0.0"
webbrowser = "0.8.7"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] }
tokio-util = { version = "0.7.7", features = ["rt"] }
hyper = { version = "0.14", features = ["client", "server", "http1", "runtime", "stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
99 changes: 54 additions & 45 deletions crates/cli/src/commands/dev.rs
Expand Up @@ -8,6 +8,7 @@ use hyper::{Body, Request as HyperRequest, Response as HyperResponse, Server};
use lagon_runtime::{options::RuntimeOptions, Runtime};
use lagon_runtime_http::{Request, Response, RunResult, X_FORWARDED_FOR, X_LAGON_REGION};
use lagon_runtime_isolate::{options::IsolateOptions, Isolate};
use lagon_runtime_isolate::{IsolateEvent, IsolateRequest};
use lagon_runtime_utils::assets::{find_asset, handle_asset};
use lagon_runtime_utils::response::{handle_response, ResponseEvent, FAVICON_URL};
use log::{
Expand All @@ -21,8 +22,8 @@ use std::convert::Infallible;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio_util::task::LocalPoolHandle;

use crate::utils::{bundle_function, error, info, input, success, warn, Assets, FunctionConfig};

Expand Down Expand Up @@ -79,10 +80,8 @@ async fn handle_request(
req: HyperRequest<Body>,
public_dir: Option<PathBuf>,
ip: String,
content: Arc<Mutex<(Vec<u8>, Assets)>>,
environment_variables: HashMap<String, String>,
isolate_lock: Arc<Mutex<Option<Isolate>>>,
pool: LocalPoolHandle,
assets: Arc<Mutex<Assets>>,
isolate_tx: flume::Sender<IsolateEvent>,
) -> Result<HyperResponse<Body>> {
let url = req.uri().path();

Expand All @@ -94,7 +93,7 @@ async fn handle_request(
);

let (tx, rx) = flume::unbounded();
let (index, assets) = content.lock().await.to_owned();
let assets = assets.lock().await.to_owned();

let is_favicon = url == FAVICON_URL;

Expand All @@ -120,29 +119,13 @@ async fn handle_request(
request.set_header(X_FORWARDED_FOR.to_string(), ip);
request.set_header(X_LAGON_REGION.to_string(), LOCAL_REGION.to_string());

pool.spawn_pinned_by_idx(
move || async move {
if isolate_lock.lock().await.is_none() {
let isolate = Isolate::new(
IsolateOptions::new(
String::from_utf8(index).expect("Code is not UTF-8"),
)
.timeout(Duration::from_secs(1))
.startup_timeout(Duration::from_secs(2))
.metadata(Some((String::from(""), String::from(""))))
.environment_variables(environment_variables),
);

*isolate_lock.lock().await = Some(isolate);
}

let mut isolate = isolate_lock.lock().await;
let isolate = isolate.as_mut().unwrap();

isolate.run(request, tx).await;
},
0,
);
isolate_tx
.send_async(IsolateEvent::Request(IsolateRequest {
request,
sender: tx,
}))
.await
.unwrap_or(());
}
Err(error) => {
println!("Error while parsing request: {error}");
Expand Down Expand Up @@ -228,7 +211,8 @@ pub async fn dev(

let (index, assets) = bundle_function(&function_config, &root)?;

let content = Arc::new(Mutex::new((index, assets)));
let server_index = index.clone();
let assets = Arc::new(Mutex::new(assets));

let runtime =
Runtime::new(RuntimeOptions::default().allow_code_generation(allow_code_generation));
Expand All @@ -243,18 +227,45 @@ pub async fn dev(
.assets
.as_ref()
.map(|assets| root.join(assets));
let server_content = Arc::clone(&content);
let environment_variables = parse_environment_variables(&root, env)?;
let isolate_lock = Arc::new(Mutex::new(None));
let server_isolate_lock = Arc::clone(&isolate_lock);
let pool = LocalPoolHandle::new(1);

let (tx, rx) = flume::unbounded();
let (index_tx, index_rx) = flume::unbounded();
let handle = Handle::current();

std::thread::spawn(move || {
handle.block_on(async move {
let mut index = server_index;

loop {
let mut isolate = Isolate::new(
IsolateOptions::new(
String::from_utf8(index.clone()).expect("Code is not UTF-8"),
)
.timeout(Duration::from_secs(1))
.startup_timeout(Duration::from_secs(2))
.metadata(Some((String::from(""), String::from(""))))
.environment_variables(environment_variables.clone()),
rx.clone(),
);

isolate.evaluate();

tokio::select! {
_ = isolate.run_event_loop() => {},
new_index = index_rx.recv_async() => {
index = new_index.unwrap();
}
}
}
});
});

let server_assets = Arc::clone(&assets);
let server = Server::bind(&addr).serve(make_service_fn(move |conn: &AddrStream| {
let public_dir = server_public_dir.clone();
let content = Arc::clone(&server_content);
let environment_variables = environment_variables.clone();
let isolate_lock = Arc::clone(&server_isolate_lock);
let pool = pool.clone();
let assets = Arc::clone(&server_assets);
let tx = tx.clone();

let addr = conn.remote_addr();
let ip = addr.ip().to_string();
Expand All @@ -265,10 +276,8 @@ pub async fn dev(
req,
public_dir.clone(),
ip.clone(),
Arc::clone(&content),
environment_variables.clone(),
Arc::clone(&isolate_lock),
pool.clone(),
Arc::clone(&assets),
tx.clone(),
)
}))
}
Expand Down Expand Up @@ -298,10 +307,10 @@ pub async fn dev(
print!("\x1B[2J\x1B[1;1H");
println!("{}", info("Found change, updating..."));

let (index, assets) = bundle_function(&function_config, &root)?;
let (new_index, new_assets) = bundle_function(&function_config, &root)?;

*content.lock().await = (index, assets);
*isolate_lock.lock().await = None;
*assets.lock().await = new_assets;
index_tx.send_async(new_index).await.unwrap();
}
}

Expand Down
50 changes: 27 additions & 23 deletions crates/runtime/tests/allow_codegen.rs
@@ -1,42 +1,46 @@
use lagon_runtime_http::{Request, Response, RunResult};
use lagon_runtime_isolate::{options::IsolateOptions, Isolate};
use lagon_runtime_isolate::options::IsolateOptions;

mod utils;

#[tokio::test]
async fn allow_eval() {
utils::setup_allow_codegen();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
const result = eval('1 + 1')
return new Response(result)
let (mut isolate, send, receiver) =
utils::create_isolate_without_snapshot(IsolateOptions::new(
"export function handler() {
const result = eval('1 + 1')
return new Response(result)
}"
.into(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;
.into(),
));
send(Request::default());

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Response(Response::from("2"))
);
tokio::select! {
_ = isolate.run_event_loop() => {}
result = receiver.recv_async() => {
assert_eq!(result.unwrap(), RunResult::Response(Response::from("2")));
}
}
}

#[tokio::test]
async fn allow_function() {
utils::setup_allow_codegen();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
let (mut isolate, send, receiver) =
utils::create_isolate_without_snapshot(IsolateOptions::new(
"export function handler() {
const result = new Function('return 1 + 1')
return new Response(result())
}"
.into(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;
.into(),
));
send(Request::default());

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Response(Response::from("2"))
);
tokio::select! {
_ = isolate.run_event_loop() => {}
result = receiver.recv_async() => {
assert_eq!(result.unwrap(), RunResult::Response(Response::from("2")));
}
}
}

4 comments on commit 54e37e3

@vercel
Copy link

@vercel vercel bot commented on 54e37e3 Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

storybook – ./packages/ui

storybook-lagon.vercel.app
storybook-git-main-lagon.vercel.app
storybook-swart-eight.vercel.app
ui.lagon.app

@vercel
Copy link

@vercel vercel bot commented on 54e37e3 Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

docs – ./packages/docs

docs-lagon.vercel.app
lagon-docs.vercel.app
docs-git-main-lagon.vercel.app
docs.lagon.app

@vercel
Copy link

@vercel vercel bot commented on 54e37e3 Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

dashboard – ./packages/dashboard

dashboard-lagon.vercel.app
dashboard-git-main-lagon.vercel.app
dash.lagon.app

@vercel
Copy link

@vercel vercel bot commented on 54e37e3 Mar 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

www – ./www

www-lagon.vercel.app
lagon.app
www-git-main-lagon.vercel.app
lagon.dev

Please sign in to comment.