diff --git a/.changeset/silly-wombats-cheer.md b/.changeset/silly-wombats-cheer.md new file mode 100644 index 000000000..5c5aee7ee --- /dev/null +++ b/.changeset/silly-wombats-cheer.md @@ -0,0 +1,8 @@ +--- +'@lagon/cli': patch +'@lagon/runtime': patch +'@lagon/serverless': patch +'@lagon/wpt-runner': patch +--- + +Improve performance by always using Hyper types diff --git a/.github/renovate.json b/.github/renovate.json index 86371abe6..30facd152 100644 --- a/.github/renovate.json +++ b/.github/renovate.json @@ -14,7 +14,7 @@ "pin", "digest" ], - "automerge": true + "automerge": false }, { "matchDepTypes": [ @@ -38,4 +38,4 @@ ], "automergeStrategy": "squash", "automergeType": "branch" -} \ No newline at end of file +} diff --git a/Cargo.lock b/Cargo.lock index a240a65c9..8a401bbd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,9 +146,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "async-compression" -version = "0.3.14" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345fd392ab01f746c717b1357165b76f0b67a60192007b234058c9045fdcf695" +checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11" dependencies = [ "flate2", "futures-core", @@ -165,7 +165,7 @@ checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.10", + "syn 2.0.16", ] [[package]] @@ -176,7 +176,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.10", + "syn 2.0.16", ] [[package]] @@ -304,7 +304,7 @@ version = "0.59.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cexpr", "clang-sys", "lazy_static", @@ -323,6 +323,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6776fc96284a0bb647b615056fc496d1fe1644a7ab01829818a6d91cae888b84" + [[package]] name = "bitvec" version = "1.0.1" @@ -556,7 +562,7 @@ checksum = "914c8c79fb560f238ef6429439a30023c862f7a28e688c58f7203f12b29970bd" dependencies = [ "anstream", "anstyle", - "bitflags", + "bitflags 1.3.2", "clap_lex", "strsim", ] @@ -570,7 +576,7 @@ dependencies = [ "heck 0.4.0", "proc-macro2", "quote", - "syn 2.0.10", + "syn 2.0.16", ] [[package]] @@ -832,6 +838,41 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35" +[[package]] +name = "darling" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0558d22a7b463ed0241e993f76f09f30b126687447751a8638587b864e4b3944" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab8bfa2e259f8ee1ce5e97824a3c55ec4404a0d772ca7fa96bf19f0752a046eb" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.16", +] + +[[package]] +name = "darling_macro" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29a358ff9f12ec09c3e61fef9b5a9902623a695a46a917b07f269bff1445611a" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.16", +] + [[package]] name = "dashmap" version = "5.4.0" @@ -1270,7 +1311,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.10", + "syn 2.0.16", ] [[package]] @@ -1518,7 +1559,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -1551,6 +1592,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -1590,7 +1637,7 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" dependencies = [ - "bitflags", + "bitflags 1.3.2", "inotify-sys", "libc", ] @@ -1715,7 +1762,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" dependencies = [ - "bitflags", + "bitflags 1.3.2", "libc", ] @@ -1754,6 +1801,7 @@ version = "0.1.0" dependencies = [ "flume", "httptest", + "hyper", "lagon-runtime-http", "lagon-runtime-isolate", "log", @@ -1832,6 +1880,7 @@ name = "lagon-runtime-v8-utils" version = "0.1.0" dependencies = [ "anyhow", + "hyper", "v8", ] @@ -1908,6 +1957,7 @@ version = "0.1.0" dependencies = [ "console", "flume", + "hyper", "lagon-runtime", "lagon-runtime-http", "lagon-runtime-isolate", @@ -2005,9 +2055,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.140" +version = "0.2.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" +checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" [[package]] name = "libloading" @@ -2071,11 +2121,11 @@ dependencies = [ [[package]] name = "lru" -version = "0.8.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6e8aaa3f231bb4bd57b84b2d5dc3ae7f350265df8aa96492e0bc394a1571909" +checksum = "03f1160296536f10c833a82dca22267d5486734230d47bf00bf435885814ba1e" dependencies = [ - "hashbrown 0.12.3", + "hashbrown 0.13.2", ] [[package]] @@ -2190,7 +2240,7 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.10", + "syn 2.0.16", ] [[package]] @@ -2252,9 +2302,9 @@ dependencies = [ [[package]] name = "mysql" -version = "23.0.1" +version = "24.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f11339ca5c251941805d51362a07823605a80586ced92914ab7de84fba813f" +checksum = "cfe2babc5f5b354eab9c0a0e40da3e69c4d77421c8b9b6ee03f97acc75bd7955" dependencies = [ "bufstream", "bytes", @@ -2271,21 +2321,39 @@ dependencies = [ "percent-encoding", "serde", "serde_json", - "socket2", + "socket2 0.5.3", "twox-hash", "url", ] +[[package]] +name = "mysql-common-derive" +version = "0.30.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f" +dependencies = [ + "darling", + "heck 0.4.0", + "num-bigint", + "proc-macro-crate 1.2.1", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.16", + "termcolor", + "thiserror", +] + [[package]] name = "mysql_common" -version = "0.29.2" +version = "0.30.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9006c95034ccf7b903d955f210469119f6c3477fc9c9e7a7845ce38a3e665c2a" +checksum = "73b8fb568c9537cf4f1ad39e2542aa74a66bf89883e550df2cb30a8f0c0f0355" dependencies = [ - "base64 0.13.0", + "base64 0.21.0", "bigdecimal", "bindgen", - "bitflags", + "bitflags 2.3.1", "bitvec", "byteorder", "bytes", @@ -2296,6 +2364,7 @@ dependencies = [ "frunk", "lazy_static", "lexical", + "mysql-common-derive", "num-bigint", "num-traits", "rand", @@ -2367,11 +2436,11 @@ dependencies = [ [[package]] name = "notify" -version = "5.1.0" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ea850aa68a06e48fdb069c0ec44d0d64c8dbffa49bf3b6f7f0a901fdea1ba9" +checksum = "4d9ba6c734de18ca27c8cef5cd7058aa4ac9f63596131e4c7e41e579319032a2" dependencies = [ - "bitflags", + "bitflags 1.3.2", "crossbeam-channel", "filetime", "fsevent-sys", @@ -2380,7 +2449,7 @@ dependencies = [ "libc", "mio", "walkdir", - "windows-sys 0.42.0", + "windows-sys 0.45.0", ] [[package]] @@ -2526,7 +2595,7 @@ version = "0.10.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "618febf65336490dfcf20b73f885f5651a0c89c64c2d4a8c3662585a70bf5bd0" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "foreign-types", "libc", @@ -2652,11 +2721,12 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "pem" -version = "1.1.1" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8" +checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a" dependencies = [ - "base64 0.13.0", + "base64 0.21.0", + "serde", ] [[package]] @@ -2792,6 +2862,30 @@ dependencies = [ "toml", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.98", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.20+deprecated" @@ -2800,9 +2894,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.53" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba466839c78239c09faf015484e5cc04860f88242cff4d03eb038f04b4699b73" +checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8" dependencies = [ "unicode-ident", ] @@ -2904,7 +2998,7 @@ version = "10.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -2944,7 +3038,7 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -3001,9 +3095,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.17" +version = "0.11.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13293b639a097af28fc8a90f22add145a9c954e49d77da06263d58cf44d5fb91" +checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ "async-compression", "base64 0.21.0", @@ -3185,7 +3279,7 @@ version = "0.37.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e78cc525325c06b4a7ff02db283472f3c042b7ff0c391f96c6d5ac6f4f91b75" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", @@ -3287,7 +3381,7 @@ version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bc1bb97804af6631813c55739f771071e0f2ed33ee20b68c86ec505d906356c" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -3400,7 +3494,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.10", + "syn 2.0.16", ] [[package]] @@ -3514,6 +3608,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "spin" version = "0.5.2" @@ -3589,9 +3693,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.10" +version = "2.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aad1363ed6d37b84299588d62d3a7d95b5a5c2d9aad5c85609fda12afaa1f40" +checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" dependencies = [ "proc-macro2", "quote", @@ -3618,6 +3722,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.34" @@ -3697,7 +3810,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.4.9", "tokio-macros", "windows-sys 0.42.0", ] @@ -3916,9 +4029,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.3.2" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dad5567ad0cf5b760e5665964bec1b47dfd077ba8a2544b513f3556d3d239a2" +checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" dependencies = [ "getrandom", "rand", @@ -3926,11 +4039,11 @@ dependencies = [ [[package]] name = "v8" -version = "0.71.1" +version = "0.71.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a2ece81e9f3d573376d5301b0d1c1c0ffcb63d57e6164ddf1bc844b4c8a23b" +checksum = "1a4bbfd886a9c2f87170438c0cdb6b1ddbfe80412ab591c83d24c7e48e487313" dependencies = [ - "bitflags", + "bitflags 1.3.2", "fslock", "once_cell", "which", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 487e844e2..daa78c921 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -22,7 +22,7 @@ pathdiff = "0.2.1" hyper-tls = { version = "0.5.0", features = ["vendored"] } flume = "0.10.14" chrono = "0.4.24" -notify = "5.1.0" +notify = "6.0.0" envfile = "0.2.1" anyhow = "1.0.71" urlencoding = "2.1.2" diff --git a/crates/cli/src/commands/dev.rs b/crates/cli/src/commands/dev.rs index bbc91a387..4fb79b067 100644 --- a/crates/cli/src/commands/dev.rs +++ b/crates/cli/src/commands/dev.rs @@ -5,9 +5,9 @@ use dialoguer::console::style; use envfile::EnvFile; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request as HyperRequest, Response as HyperResponse, Server}; +use hyper::{Body, Request, Response, Server}; use lagon_runtime::{options::RuntimeOptions, Runtime}; -use lagon_runtime_http::{Request, Response, RunResult, X_FORWARDED_FOR, X_LAGON_REGION}; +use lagon_runtime_http::{RunResult, X_FORWARDED_FOR, X_LAGON_REGION}; use lagon_runtime_isolate::{options::IsolateOptions, Isolate}; use lagon_runtime_isolate::{IsolateEvent, IsolateRequest}; use lagon_runtime_utils::assets::{find_asset, handle_asset}; @@ -56,12 +56,12 @@ fn parse_environment_variables( // except that we don't have multiple deployments and such multiple // threads to manage, and we don't manager logs and metrics. async fn handle_request( - req: HyperRequest, + req: Request, public_dir: Option, ip: String, assets: Arc>, isolate_tx: flume::Sender, -) -> Result> { +) -> Result> { let url = req.uri().path(); let (tx, rx) = flume::unbounded(); @@ -86,10 +86,7 @@ async fn handle_request( tx.send_async(run_result).await.unwrap_or(()); } else if is_favicon { tx.send_async(RunResult::Response( - Response { - status: 404, - ..Default::default() - }, + Response::builder().status(404).body(Body::empty())?, None, )) .await @@ -102,27 +99,21 @@ async fn handle_request( url ); - match Request::from_hyper(req).await { - Ok(mut request) => { - request.set_header(X_FORWARDED_FOR.to_string(), ip); - request.set_header(X_LAGON_REGION.to_string(), LOCAL_REGION.to_string()); - - isolate_tx - .send_async(IsolateEvent::Request(IsolateRequest { - request, - sender: tx, - })) - .await - .unwrap_or(()); - } - Err(error) => { - println!("Error while parsing request: {error}"); + let (mut parts, body) = req.into_parts(); + let body = hyper::body::to_bytes(body).await?; - tx.send_async(RunResult::Error("Error while parsing request".into())) - .await - .unwrap_or(()); - } - }; + parts.headers.insert(X_FORWARDED_FOR, ip.parse()?); + parts.headers.insert(X_LAGON_REGION, LOCAL_REGION.parse()?); + + let request = (parts, body); + + isolate_tx + .send_async(IsolateEvent::Request(IsolateRequest { + request, + sender: tx, + })) + .await + .unwrap_or(()); } handle_response(rx, |event| async move { @@ -141,7 +132,7 @@ async fn handle_request( ); } ResponseEvent::LimitsReached(result) => { - if result == RunResult::Timeout { + if result.is_timeout() { println!("{} Function execution timed out", style("✕").red()); } else { println!( diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 1aee9e900..2b4bbf50e 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -v8 = "0.71.1" +v8 = "0.71.2" [dev-dependencies] tokio = { version = "1", features = ["rt", "time", "macros"] } @@ -14,6 +14,7 @@ lagon-runtime-http = { path = "../runtime_http" } lagon-runtime-isolate = { path = "../runtime_isolate" } log = { version = "0.4.17", features = ["std", "kv_unstable", "kv_unstable_serde"] } serial_test = "2.0.0" +hyper = { version = "0.14.26", features = [] } [features] default = [] diff --git a/crates/runtime/tests/allow_codegen.rs b/crates/runtime/tests/allow_codegen.rs index f34e55765..aa8c01fcb 100644 --- a/crates/runtime/tests/allow_codegen.rs +++ b/crates/runtime/tests/allow_codegen.rs @@ -1,4 +1,4 @@ -use lagon_runtime_http::{Request, Response}; +use hyper::{header::CONTENT_TYPE, Request, Response}; use lagon_runtime_isolate::options::IsolateOptions; mod utils; @@ -15,10 +15,14 @@ return new Response(result) )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("2") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("2".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -33,8 +37,12 @@ async fn allow_function() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("2") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("2".into()) + .unwrap(), + ) + .await; } diff --git a/crates/runtime/tests/async_context.rs b/crates/runtime/tests/async_context.rs index a710f56c1..ffd768fa9 100644 --- a/crates/runtime/tests/async_context.rs +++ b/crates/runtime/tests/async_context.rs @@ -1,4 +1,4 @@ -use lagon_runtime_http::{Request, Response}; +use hyper::{header::CONTENT_TYPE, Request, Response}; use lagon_runtime_isolate::options::IsolateOptions; mod utils; @@ -22,10 +22,14 @@ export function handler() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -47,10 +51,7 @@ export function handler() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::default() - ); + utils::assert_response(&receiver, Response::default()).await; } #[tokio::test] @@ -73,10 +74,7 @@ export function handler() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::default() - ); + utils::assert_response(&receiver, Response::default()).await; } #[tokio::test] @@ -113,10 +111,7 @@ export function handler() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::default() - ); + utils::assert_response(&receiver, Response::default()).await; } #[tokio::test] @@ -165,10 +160,7 @@ export function handler() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::default() - ); + utils::assert_response(&receiver, Response::default()).await; } #[tokio::test] @@ -199,17 +191,25 @@ export async function handler() { ); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("2") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("2".into()) + .unwrap(), + ) + .await; send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("4") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("4".into()) + .unwrap(), + ) + .await; assert_eq!( logs_receiver.recv_async().await.unwrap(), diff --git a/crates/runtime/tests/crypto.rs b/crates/runtime/tests/crypto.rs index 5b8fbaf93..41a0573ab 100644 --- a/crates/runtime/tests/crypto.rs +++ b/crates/runtime/tests/crypto.rs @@ -1,7 +1,7 @@ -use std::time::Duration; - -use lagon_runtime_http::{Request, Response, RunResult}; +use hyper::{header::CONTENT_TYPE, Request, Response}; +use lagon_runtime_http::RunResult; use lagon_runtime_isolate::options::IsolateOptions; +use std::time::Duration; mod utils; @@ -18,10 +18,14 @@ async fn crypto_random_uuid() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("string 36 false") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("string 36 false".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -37,10 +41,14 @@ async fn crypto_get_random_values() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true 3 3 false false") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true 3 3 false false".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -56,10 +64,14 @@ async fn crypto_get_random_values_update_in_place() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("3 false") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("3 false".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -74,13 +86,14 @@ async fn crypto_get_random_values_throw_not_typedarray() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), + utils::assert_run_result( + &receiver, RunResult::Error( "Uncaught TypeError: Parameter 1 is not of type 'TypedArray'\n at handler (2:27)" - .to_string() - ) - ); + .to_string(), + ), + ) + .await; } #[tokio::test] @@ -102,10 +115,14 @@ async fn crypto_key_value() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("object 6") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("object 6".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -134,10 +151,14 @@ async fn crypto_unique_key_value() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("false") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("false".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -163,10 +184,14 @@ async fn crypto_sign() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true 32") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true 32".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -196,10 +221,14 @@ async fn crypto_verify() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -215,10 +244,16 @@ async fn crypto_digest_sha1() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("20 183,226,62,194,154,242,43,11,78,65,218,49,232,104,213,114,38,18,28,132") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body( + "20 183,226,62,194,154,242,43,11,78,65,218,49,232,104,213,114,38,18,28,132".into(), + ) + .unwrap(), + ) + .await; } #[tokio::test] @@ -234,10 +269,16 @@ async fn crypto_digest_string() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("32 9,202,126,78,170,110,138,233,199,210,97,22,113,41,24,72,131,100,77,7,223,186,124,191,188,76,138,46,8,54,13,91"), - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body( + "32 9,202,126,78,170,110,138,233,199,210,97,22,113,41,24,72,131,100,77,7,223,186,124,191,188,76,138,46,8,54,13,91".into(), + ) + .unwrap(), + ) + .await; } #[tokio::test] @@ -253,10 +294,16 @@ async fn crypto_digest_object() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("32 9,202,126,78,170,110,138,233,199,210,97,22,113,41,24,72,131,100,77,7,223,186,124,191,188,76,138,46,8,54,13,91"), - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body( + "32 9,202,126,78,170,110,138,233,199,210,97,22,113,41,24,72,131,100,77,7,223,186,124,191,188,76,138,46,8,54,13,91".into(), + ) + .unwrap(), + ) + .await; } #[tokio::test] @@ -286,10 +333,14 @@ async fn crypto_encrypt_aes_gcm() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true 28") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true 28".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -325,10 +376,14 @@ async fn crypto_decrypt_aes_gcm() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("hello, world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("hello, world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -358,10 +413,14 @@ async fn crypto_encrypt_aes_cbc() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true 16") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true 16".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -397,10 +456,14 @@ async fn crypto_decrypt_aes_cbc() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("hello, world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("hello, world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -430,10 +493,14 @@ async fn crypto_encrypt_aes_ctr() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true 12") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true 12".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -469,10 +536,14 @@ async fn crypto_decrypt_aes_ctr() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("hello, world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("hello, world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -505,10 +576,14 @@ async fn crypto_hkdf_derive_bits() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("16") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("16".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -541,10 +616,14 @@ async fn crypto_pbkdf2_derive_bits() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("16") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("16".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -597,10 +676,14 @@ async fn crypto_ecdh_derive_bits() { ); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("256 384") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("256 384".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -639,10 +722,14 @@ async fn crypto_derive_key() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true true true true true") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true true true true true".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -679,10 +766,14 @@ async fn crypto_ecdsa_sign_verify() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -724,10 +815,14 @@ async fn crypto_rsa_pss_sign_verify() { ); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -769,8 +864,12 @@ async fn crypto_rsa_ssa_sign_verify() { ); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("true") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("true".into()) + .unwrap(), + ) + .await; } diff --git a/crates/runtime/tests/disallow_codegen.rs b/crates/runtime/tests/disallow_codegen.rs index d52f15c33..fe350676e 100644 --- a/crates/runtime/tests/disallow_codegen.rs +++ b/crates/runtime/tests/disallow_codegen.rs @@ -1,4 +1,5 @@ -use lagon_runtime_http::{Request, RunResult}; +use hyper::Request; +use lagon_runtime_http::RunResult; use lagon_runtime_isolate::options::IsolateOptions; mod utils; @@ -15,9 +16,11 @@ async fn disallow_eval() { )); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Error( + utils::assert_run_result( + &receiver, + RunResult::Error( "Uncaught EvalError: Code generation from strings disallowed for this context\n at handler (2:20)".into() - )); + )).await; } #[tokio::test] @@ -32,7 +35,9 @@ async fn disallow_function() { )); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Error( + utils::assert_run_result( + &receiver, + RunResult::Error( "Uncaught EvalError: Code generation from strings disallowed for this context\n at handler (2:20)".into() - )); + )).await; } diff --git a/crates/runtime/tests/errors.rs b/crates/runtime/tests/errors.rs index 79b01bf2e..75f06e408 100644 --- a/crates/runtime/tests/errors.rs +++ b/crates/runtime/tests/errors.rs @@ -1,4 +1,5 @@ -use lagon_runtime_http::{Request, RunResult}; +use hyper::Request; +use lagon_runtime_http::RunResult; use lagon_runtime_isolate::options::IsolateOptions; use std::time::Duration; @@ -11,12 +12,13 @@ async fn no_handler() { utils::create_isolate(IsolateOptions::new("console.log('Hello')".into())); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), + utils::assert_run_result( + &receiver, RunResult::Error( - "Uncaught Error: Handler function is not defined or is not a function".into() - ) - ); + "Uncaught Error: Handler function is not defined or is not a function".into(), + ), + ) + .await; } #[tokio::test] @@ -26,12 +28,13 @@ async fn handler_not_function() { utils::create_isolate(IsolateOptions::new("export const handler = 'Hello'".into())); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), + utils::assert_run_result( + &receiver, RunResult::Error( - "Uncaught Error: Handler function is not defined or is not a function".into() - ) - ); + "Uncaught Error: Handler function is not defined or is not a function".into(), + ), + ) + .await; } #[tokio::test] @@ -45,10 +48,11 @@ async fn handler_reject() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught Error: Rejected\n at handler (2:11)".into()) - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught Error: Rejected\n at handler (2:11)".into()), + ) + .await; } #[tokio::test] @@ -62,10 +66,11 @@ async fn compilation_error() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught SyntaxError: Unexpected identifier 'syntax'".into()) - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught SyntaxError: Unexpected identifier 'syntax'".into()), + ) + .await; } #[tokio::test] @@ -81,13 +86,14 @@ export function handler() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), + utils::assert_run_result( + &receiver, RunResult::Error( "Uncaught Error: Can't import modules, everything should be bundled in a single file" - .into() - ) - ); + .into(), + ), + ) + .await; } #[tokio::test] @@ -102,7 +108,7 @@ async fn execution_tick_timeout_reached() { )); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Timeout); + utils::assert_run_result(&receiver, RunResult::Timeout).await; } #[tokio::test] @@ -117,7 +123,7 @@ export function handler() { )); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Timeout); + utils::assert_run_result(&receiver, RunResult::Timeout).await; } #[tokio::test] @@ -132,7 +138,7 @@ async fn total_timeout_reached() { )); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Timeout); + utils::assert_run_result(&receiver, RunResult::Timeout).await; } #[tokio::test] @@ -160,7 +166,7 @@ async fn memory_reached() { ); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::MemoryLimit); + utils::assert_run_result(&receiver, RunResult::MemoryLimit).await; } #[tokio::test] @@ -182,5 +188,8 @@ export function handler() { )); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Error("Uncaught TypeError: a is not a function\n at test (2:12)\n at first (6:12)\n at handler (10:25)".into())); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught TypeError: a is not a function\n at test (2:12)\n at first (6:12)\n at handler (10:25)".into()) + ).await; } diff --git a/crates/runtime/tests/fetch.rs b/crates/runtime/tests/fetch.rs index d8871597f..2e0154dd5 100644 --- a/crates/runtime/tests/fetch.rs +++ b/crates/runtime/tests/fetch.rs @@ -1,5 +1,6 @@ use httptest::{matchers::*, responders::*, Expectation, Server}; -use lagon_runtime_http::{Request, Response, RunResult}; +use hyper::{header::CONTENT_TYPE, Request, Response}; +use lagon_runtime_http::RunResult; use lagon_runtime_isolate::options::IsolateOptions; mod utils; @@ -22,10 +23,14 @@ async fn basic_fetch() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello, World") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello, World".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -49,10 +54,14 @@ async fn request_method() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello, World") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello, World".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -60,7 +69,7 @@ async fn request_method_fallback() { utils::setup(); let server = Server::run(); server.expect( - Expectation::matching(request::method_path("GET", "/")) + Expectation::matching(request::method_path("UNKNOWN", "/")) .respond_with(status_code(200).body("Hello, World")), ); let url = server.url("/"); @@ -76,10 +85,14 @@ async fn request_method_fallback() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello, World") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello, World".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -101,17 +114,21 @@ async fn request_headers() { headers: {{ 'x-token': 'hello' }} - }}).then(res => res.text()); + }}).then(res => res.text()); return new Response(body); }}" ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello, World") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello, World".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -140,10 +157,14 @@ async fn request_headers_class() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello, World") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello, World".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -171,10 +192,14 @@ async fn request_body() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello, World") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello, World".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -204,10 +229,14 @@ async fn response_headers() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("content-length: 0 content-type: text/plain;charset=UTF-8 x-token: hello") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("content-length: 0 content-type: text/plain;charset=UTF-8 x-token: hello".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -234,10 +263,14 @@ async fn response_status() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Moved: 200") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Moved: 200".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -260,10 +293,14 @@ async fn response_json() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from(r#"object {"hello":"world"}"#) - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body(r#"object {"hello":"world"}"#.into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -286,13 +323,7 @@ async fn response_array_buffer() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response { - body: "Hello, World".into(), - ..Default::default() - } - ); + utils::assert_response(&receiver, Response::new("Hello, World".into())).await; } #[tokio::test] @@ -309,10 +340,11 @@ async fn throw_invalid_url() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught Error: client requires absolute-form URIs".into()) - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught Error: client requires absolute-form URIs".into()), + ) + .await; } #[tokio::test] @@ -333,10 +365,11 @@ async fn throw_invalid_header() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught Error: failed to parse header value".into()) - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught Error: failed to parse header value".into()), + ) + .await; } #[tokio::test] @@ -366,10 +399,14 @@ async fn abort_signal() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Aborted") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Aborted".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -390,10 +427,14 @@ async fn redirect() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("200") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("200".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -418,10 +459,14 @@ async fn redirect_relative_url() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("200") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("200".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -441,10 +486,11 @@ async fn redirect_without_location_header() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught Error: Got a redirect without Location header".into()) - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught Error: Got a redirect without Location header".into()), + ) + .await; } #[tokio::test] @@ -481,10 +527,11 @@ async fn redirect_loop() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught Error: Too many redirects".into()) - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught Error: Too many redirects".into()), + ) + .await; } #[tokio::test] @@ -514,23 +561,29 @@ export async function handler() {{ ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught Error: fetch() can only be called 20 times per requests".into()) - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught Error: fetch() can only be called 20 times per requests".into()), + ) + .await; // Test if we can still call fetch in subsequent requests send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Error("Uncaught Error: fetch() can only be called 20 times per requests".into()) - ); - - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("ok") - ); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught Error: fetch() can only be called 20 times per requests".into()), + ) + .await; + + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("ok".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -545,8 +598,12 @@ async fn fetch_https() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("200") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("200".into()) + .unwrap(), + ) + .await; } diff --git a/crates/runtime/tests/promises.rs b/crates/runtime/tests/promises.rs index 5a997c60c..0d6898d6e 100644 --- a/crates/runtime/tests/promises.rs +++ b/crates/runtime/tests/promises.rs @@ -1,5 +1,5 @@ use httptest::{matchers::*, responders::*, Expectation, Server}; -use lagon_runtime_http::{Request, Response}; +use hyper::{header::CONTENT_TYPE, Request, Response}; use lagon_runtime_isolate::options::IsolateOptions; mod utils; @@ -15,10 +15,14 @@ async fn execute_async_handler() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Async handler") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Async handler".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -39,8 +43,12 @@ async fn execute_promise() { ))); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello, World") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello, World".into()) + .unwrap(), + ) + .await; } diff --git a/crates/runtime/tests/runtime.rs b/crates/runtime/tests/runtime.rs index 6b2fb32cd..4670cb078 100644 --- a/crates/runtime/tests/runtime.rs +++ b/crates/runtime/tests/runtime.rs @@ -1,5 +1,8 @@ -use httptest::bytes::Bytes; -use lagon_runtime_http::{Method, Request, Response, RunResult, StreamResult}; +use hyper::{ + header::{CONTENT_TYPE, HOST}, + Body, Method, Request, Response, +}; +use lagon_runtime_http::{RunResult, StreamResult}; use lagon_runtime_isolate::options::IsolateOptions; mod utils; @@ -15,10 +18,14 @@ async fn execute_function() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -34,10 +41,14 @@ export { hello as handler }" )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -51,17 +62,25 @@ async fn execute_function_twice() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -82,10 +101,14 @@ async fn environment_variables() { ); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -97,31 +120,28 @@ async fn get_body_streaming() { }" .into(), )); - send(Request { - body: Bytes::from("Hello world"), - headers: Some(vec![( - "content-type".into(), - vec!["text/plain;charset=UTF-8".into()], - )]), - method: Method::GET, - url: "".into(), - }); + send( + Request::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ); - assert_eq!( - receiver.recv_async().await.unwrap(), + utils::assert_run_result( + &receiver, RunResult::Stream(StreamResult::Data(vec![ - 72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100 - ])) - ); + 72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, + ])), + ) + .await; + assert!(receiver.recv_async().await.unwrap().as_stream_done()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - headers: None, - body: Bytes::from("[object ReadableStream]"), - status: 200, - })) - ); + + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start(Response::builder())), + ) + .await; } #[tokio::test] @@ -133,20 +153,21 @@ async fn get_body() { }" .into(), )); - send(Request { - body: Bytes::from("Hello world"), - headers: Some(vec![( - "content-type".into(), - vec!["text/plain;charset=UTF-8".into()], - )]), - method: Method::GET, - url: "".into(), - }); - - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") + send( + Request::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), ); + + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -158,20 +179,23 @@ async fn get_input() { }" .into(), )); - send(Request { - body: Bytes::new(), - headers: Some(vec![( - "content-type".into(), - vec!["text/plain;charset=UTF-8".into()], - )]), - method: Method::GET, - url: "https://hello.world".into(), - }); - - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("https://hello.world") + send( + Request::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .header(HOST, "hello.world") + .uri("/hello") + .body("Hello world".into()) + .unwrap(), ); + + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("https://hello.world/hello".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -183,20 +207,22 @@ async fn get_method() { }" .into(), )); - send(Request { - body: Bytes::new(), - headers: Some(vec![( - "content-type".into(), - vec!["text/plain;charset=UTF-8".into()], - )]), - method: Method::POST, - url: "".into(), - }); - - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("POST") + send( + Request::builder() + .method(Method::POST) + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body(Body::empty()) + .unwrap(), ); + + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("POST".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -208,17 +234,21 @@ async fn get_headers() { }" .into(), )); - send(Request { - body: Bytes::new(), - headers: Some(vec![("x-auth".into(), vec!["token".into()])]), - method: Method::POST, - url: "".into(), - }); - - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("token") + send( + Request::builder() + .header("x-auth", "token") + .body(Body::empty()) + .unwrap(), ); + + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("token".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -237,17 +267,15 @@ async fn return_headers() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response { - body: "Hello world".into(), - headers: Some(vec![ - ("content-type".into(), vec!["text/html".into()]), - ("x-test".into(), vec!["test".into()]) - ]), - status: 200, - } - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/html") + .header("x-test", "test") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -266,17 +294,15 @@ async fn return_headers_from_headers_api() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response { - body: "Hello world".into(), - headers: Some(vec![ - ("content-type".into(), vec!["text/html".into()]), - ("x-test".into(), vec!["test".into()]) - ]), - status: 200, - } - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/html") + .header("x-test", "test") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -292,17 +318,15 @@ async fn return_status() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response { - body: "Moved permanently".into(), - headers: Some(vec![( - "content-type".into(), - vec!["text/plain;charset=UTF-8".into()], - )]), - status: 302, - } - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .status(302) + .body("Moved permanently".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -318,13 +342,11 @@ async fn return_uint8array() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response { - body: "Hello world".into(), - ..Default::default() - } - ); + utils::assert_response( + &receiver, + Response::builder().body("Hello world".into()).unwrap(), + ) + .await; } #[tokio::test] @@ -344,10 +366,14 @@ async fn console_log() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -361,10 +387,14 @@ async fn atob() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -378,8 +408,12 @@ async fn btoa() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("SGVsbG8=") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("SGVsbG8=".into()) + .unwrap(), + ) + .await; } diff --git a/crates/runtime/tests/streams.rs b/crates/runtime/tests/streams.rs index 5420dae63..0f817bc64 100644 --- a/crates/runtime/tests/streams.rs +++ b/crates/runtime/tests/streams.rs @@ -1,5 +1,5 @@ -use httptest::bytes::Bytes; -use lagon_runtime_http::{Request, Response, RunResult, StreamResult}; +use hyper::{Request, Response}; +use lagon_runtime_http::{RunResult, StreamResult}; use lagon_runtime_isolate::options::IsolateOptions; mod utils; @@ -22,21 +22,19 @@ async fn sync_streaming() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Data(vec![65, 66, 67])) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Data(vec![65, 66, 67])), + ) + .await; assert!(receiver.recv_async().await.unwrap().as_stream_done()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - headers: None, - body: Bytes::from("[object ReadableStream]"), - status: 200, - })) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start(Response::builder())), + ) + .await; } #[tokio::test] @@ -64,21 +62,16 @@ async fn queue_multiple() { send(Request::default()); for _ in 0..3 { - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Data(vec![65])) - ); + utils::assert_run_result(&receiver, RunResult::Stream(StreamResult::Data(vec![65]))).await; } assert!(receiver.recv_async().await.unwrap().as_stream_done()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - headers: None, - body: Bytes::from("[object ReadableStream]"), - status: 200, - })) - ); + + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start(Response::builder())), + ) + .await; } #[tokio::test] @@ -105,21 +98,21 @@ async fn custom_response() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Data(vec![65, 66, 67])) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Data(vec![65, 66, 67])), + ) + .await; assert!(receiver.recv_async().await.unwrap().as_stream_done()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - body: Bytes::from("[object ReadableStream]"), - status: 201, - headers: Some(vec![("x-lagon".into(), vec!["test".into()])]), - })) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start( + Response::builder().status(201).header("x-lagon", "test"), + )), + ) + .await; } #[tokio::test] @@ -143,28 +136,27 @@ async fn start_and_pull() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), + utils::assert_run_result( + &receiver, RunResult::Stream(StreamResult::Data(vec![ - 76, 111, 97, 100, 105, 110, 103, 46, 46, 46 - ])) - ); + 76, 111, 97, 100, 105, 110, 103, 46, 46, 46, + ])), + ) + .await; - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Data(vec![72, 101, 108, 108, 111])) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Data(vec![72, 101, 108, 108, 111])), + ) + .await; assert!(receiver.recv_async().await.unwrap().as_stream_done()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - headers: None, - body: Bytes::from("[object ReadableStream]"), - status: 200, - })) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start(Response::builder())), + ) + .await; } #[tokio::test] @@ -194,26 +186,25 @@ async fn response_before_write() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), + utils::assert_run_result( + &receiver, RunResult::Stream(StreamResult::Data(vec![ - 76, 111, 97, 100, 105, 110, 103, 46, 46, 46 - ])) - ); - - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - headers: None, - body: Bytes::from("[object ReadableStream]"), - status: 200, - })) - ); - - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Data(vec![72, 101, 108, 108, 111])) - ); + 76, 111, 97, 100, 105, 110, 103, 46, 46, 46, + ])), + ) + .await; + + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start(Response::builder())), + ) + .await; + + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Data(vec![72, 101, 108, 108, 111])), + ) + .await; assert!(receiver.recv_async().await.unwrap().as_stream_done()); } @@ -231,16 +222,13 @@ async fn timeout_infinite_streaming() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - headers: None, - body: Bytes::from("[object ReadableStream]"), - status: 200, - })) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start(Response::builder())), + ) + .await; - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Timeout); + utils::assert_run_result(&receiver, RunResult::Timeout).await; } #[tokio::test] @@ -262,7 +250,10 @@ async fn promise_reject_callback() { )); send(Request::default()); - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Error("Uncaught ReferenceError: doesNotExists is not defined\n at trigger (5:9)\n at handler (8:5)".to_owned())); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught ReferenceError: doesNotExists is not defined\n at trigger (5:9)\n at handler (8:5)".to_owned() + )).await; } #[tokio::test] @@ -294,14 +285,14 @@ async fn promise_reject_callback_after_response() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap(), - RunResult::Stream(StreamResult::Start(Response { - headers: None, - body: Bytes::from("[object ReadableStream]"), - status: 200, - })) - ); + utils::assert_run_result( + &receiver, + RunResult::Stream(StreamResult::Start(Response::builder())), + ) + .await; - assert_eq!(receiver.recv_async().await.unwrap(), RunResult::Error("Uncaught ReferenceError: doesNotExists is not defined\n at 12:17\n at stream (11:19)".to_owned())); + utils::assert_run_result( + &receiver, + RunResult::Error("Uncaught ReferenceError: doesNotExists is not defined\n at 12:17\n at stream (11:19)".to_owned() + )).await; } diff --git a/crates/runtime/tests/timers.rs b/crates/runtime/tests/timers.rs index f88ccd14b..91bcc02fa 100644 --- a/crates/runtime/tests/timers.rs +++ b/crates/runtime/tests/timers.rs @@ -1,4 +1,4 @@ -use lagon_runtime_http::{Request, Response}; +use hyper::{header::CONTENT_TYPE, Request, Response}; use lagon_runtime_isolate::options::IsolateOptions; use serial_test::serial; @@ -20,10 +20,14 @@ async fn set_timeout() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("test") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("test".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -52,10 +56,14 @@ async fn set_timeout_not_blocking_response() { logs_receiver.recv_async().await.unwrap(), ("log".into(), "before".into(), None) ); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello!") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello!".into()) + .unwrap(), + ) + .await; assert_eq!( logs_receiver.recv_async().await.unwrap(), ("log".into(), "after".into(), None) @@ -83,10 +91,14 @@ async fn set_timeout_clear() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("second") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("second".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -109,10 +121,14 @@ async fn set_timeout_clear_correct() { )); send(Request::default()); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("first") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("first".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -162,10 +178,14 @@ async fn set_interval() { ("log".into(), "res".into(), None) ); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -199,10 +219,14 @@ async fn queue_microtask() { ("log".into(), "microtask".into(), None) ); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } #[tokio::test] @@ -258,8 +282,12 @@ async fn timers_order() { logs_receiver.recv_async().await.unwrap(), ("log".into(), "main 2".into(), None) ); - assert_eq!( - receiver.recv_async().await.unwrap().as_response(), - Response::from("Hello world") - ); + utils::assert_response( + &receiver, + Response::builder() + .header(CONTENT_TYPE, "text/plain;charset=UTF-8") + .body("Hello world".into()) + .unwrap(), + ) + .await; } diff --git a/crates/runtime/tests/utils/mod.rs b/crates/runtime/tests/utils/mod.rs index eed0b4e9e..dcd19b4fe 100644 --- a/crates/runtime/tests/utils/mod.rs +++ b/crates/runtime/tests/utils/mod.rs @@ -1,5 +1,6 @@ +use hyper::{http::Request, Body, Response}; use lagon_runtime::{options::RuntimeOptions, Runtime}; -use lagon_runtime_http::{Request, RunResult}; +use lagon_runtime_http::{RunResult, StreamResult}; use lagon_runtime_isolate::{options::IsolateOptions, Isolate, IsolateEvent, IsolateRequest}; use std::sync::Once; use tokio::runtime::Handle; @@ -22,7 +23,7 @@ pub fn setup_allow_codegen() { }); } -type SendRequest = Box; +type SendRequest = Box)>; #[allow(dead_code)] pub fn create_isolate(options: IsolateOptions) -> (SendRequest, flume::Receiver) { @@ -41,13 +42,19 @@ pub fn create_isolate(options: IsolateOptions) -> (SendRequest, flume::Receiver< }) }); - let send_isolate_event = Box::new(move |request: Request| { - request_tx - .send(IsolateEvent::Request(IsolateRequest { - request, - sender: sender.clone(), - })) - .unwrap(); + let send_isolate_event = Box::new(move |req: Request| { + let request_tx = request_tx.clone(); + let sender = sender.clone(); + + tokio::spawn(async move { + let (parts, body) = req.into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + let request = (parts, body); + + request_tx + .send(IsolateEvent::Request(IsolateRequest { request, sender })) + .unwrap(); + }); }); (send_isolate_event, receiver) @@ -69,14 +76,101 @@ pub fn create_isolate_without_snapshot( }) }); - let send_isolate_event = Box::new(move |request: Request| { - request_tx - .send(IsolateEvent::Request(IsolateRequest { - request, - sender: sender.clone(), - })) - .unwrap(); + let send_isolate_event = Box::new(move |req: Request| { + let request_tx = request_tx.clone(); + let sender = sender.clone(); + + tokio::spawn(async move { + let (parts, body) = req.into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + let request = (parts, body); + + request_tx + .send(IsolateEvent::Request(IsolateRequest { request, sender })) + .unwrap(); + }); }); (send_isolate_event, receiver) } + +#[allow(dead_code)] +pub async fn assert_run_result(receiver: &flume::Receiver, run_result: RunResult) { + let result = receiver.recv_async().await.unwrap(); + + match run_result { + RunResult::Response(response, _) => { + assert_response_inner(response, result.as_response()).await; + } + RunResult::Error(error) => { + assert_eq!(error, result.as_error()); + } + RunResult::MemoryLimit => { + assert!( + result.is_memory_limit(), + "Expected MemoryLimit, got {:?}", + result + ); + } + RunResult::Timeout => { + assert!(result.is_timeout(), "Expected Timeout, got {:?}", result); + } + RunResult::Stream(stream_result) => match stream_result { + StreamResult::Done(_) => { + assert!( + matches!(result, RunResult::Stream(StreamResult::Done(_))), + "Expected StreamResult::Done, got {:?}", + result + ); + } + StreamResult::Data(data) => { + assert!( + matches!(result, RunResult::Stream(StreamResult::Data(_))), + "Expected StreamResult::Data, got {:?}", + result + ); + + let result_data = match result { + RunResult::Stream(StreamResult::Data(data)) => data, + _ => unreachable!(), + }; + + assert_eq!(data, result_data); + } + StreamResult::Start(response) => { + assert!( + matches!(result, RunResult::Stream(StreamResult::Start(_))), + "Expected StreamResult::Start, got {:?}", + result, + ); + + let result_response = match result { + RunResult::Stream(StreamResult::Start(response)) => response, + _ => unreachable!(), + }; + + let response = response.body(Body::empty()).unwrap(); + let result_response = result_response.body(Body::empty()).unwrap(); + + assert_response_inner(response, result_response).await; + } + }, + } +} + +async fn assert_response_inner(first: Response, second: Response) { + assert_eq!(first.status(), second.status(), "Status mismatch"); + assert_eq!(first.headers(), second.headers(), "Headers mismatch"); + + let body1 = hyper::body::to_bytes(first.into_body()).await.unwrap(); + let body2 = hyper::body::to_bytes(second.into_body()).await.unwrap(); + + assert_eq!(body1, body2, "Body mismatch"); +} + +#[allow(dead_code)] +pub async fn assert_response(receiver: &flume::Receiver, response: Response) { + let result = receiver.recv_async().await.unwrap().as_response(); + + assert_response_inner(result, response).await; +} diff --git a/crates/runtime_crypto/Cargo.toml b/crates/runtime_crypto/Cargo.toml index 7962b4c29..781f959f1 100644 --- a/crates/runtime_crypto/Cargo.toml +++ b/crates/runtime_crypto/Cargo.toml @@ -4,10 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] -v8 = "0.71.1" +v8 = "0.71.2" anyhow = "1.0.71" rand = "0.8.5" -uuid = { version = "1.3.2", features = ["v4", "fast-rng"] } +uuid = { version = "1.3.3", features = ["v4", "fast-rng"] } lagon-runtime-v8-utils = { path = "../runtime_v8_utils" } # Crypto hmac = "0.12.1" @@ -22,4 +22,4 @@ num-traits = "0.2.15" rsa = { version = "=0.9.2", default-features = false, features = ["std", "sha2"] } p256 = { version = "0.13.2", features = ["ecdh"] } p384 = "0.13.0" -ctr = "0.9.1" +ctr = "0.9.2" diff --git a/crates/runtime_http/Cargo.toml b/crates/runtime_http/Cargo.toml index 829795813..64d8adf41 100644 --- a/crates/runtime_http/Cargo.toml +++ b/crates/runtime_http/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -v8 = "0.71.1" +v8 = "0.71.2" hyper = { version = "0.14.26", features = ["client", "http1", "http2", "tcp"] } anyhow = "1.0.71" lagon-runtime-v8-utils = { path = "../runtime_v8_utils" } diff --git a/crates/runtime_http/src/headers.rs b/crates/runtime_http/src/headers.rs index 722ac4b07..ede58b961 100644 --- a/crates/runtime_http/src/headers.rs +++ b/crates/runtime_http/src/headers.rs @@ -1,5 +1,3 @@ -pub type Headers = Vec<(String, Vec)>; - pub const X_FORWARDED_FOR: &str = "x-forwarded-for"; pub const X_REAL_IP: &str = "x-real-ip"; diff --git a/crates/runtime_http/src/lib.rs b/crates/runtime_http/src/lib.rs index 72ebcf5f5..11f5d9ff2 100644 --- a/crates/runtime_http/src/lib.rs +++ b/crates/runtime_http/src/lib.rs @@ -1,41 +1,28 @@ -use anyhow::Result; +use hyper::{http::response::Builder, Body, Response}; use std::time::Duration; mod headers; -mod method; mod request; mod response; pub use headers::*; -pub use method::*; pub use request::*; pub use response::*; -pub trait IntoV8 { - fn into_v8<'a>(self, scope: &mut v8::HandleScope<'a>) -> v8::Local<'a, v8::Object>; -} - -pub trait FromV8: Sized { - fn from_v8<'a>( - scope: &mut v8::HandleScope<'a>, - object: v8::Local<'a, v8::Value>, - ) -> Result; -} - -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum StreamResult { - Start(Response), + Start(Builder), Data(Vec), // Stream responses always have a duration // since they are always from the isolate Done(Duration), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub enum RunResult { // Isolate responses have a duration (cpu time) // Assets responses don't - Response(Response, Option), + Response(Response, Option), Stream(StreamResult), Timeout, MemoryLimit, @@ -43,6 +30,14 @@ pub enum RunResult { } impl RunResult { + pub fn is_timeout(&self) -> bool { + matches!(self, RunResult::Timeout) + } + + pub fn is_memory_limit(&self) -> bool { + matches!(self, RunResult::MemoryLimit) + } + pub fn as_error(self) -> String { if let RunResult::Error(error) = self { return error; @@ -51,7 +46,7 @@ impl RunResult { panic!("RunResult is not an Error: {:?}", self); } - pub fn as_response(self) -> Response { + pub fn as_response(self) -> Response { if let RunResult::Response(response, _) = self { return response; } diff --git a/crates/runtime_http/src/method.rs b/crates/runtime_http/src/method.rs deleted file mode 100644 index f68c82075..000000000 --- a/crates/runtime_http/src/method.rs +++ /dev/null @@ -1,54 +0,0 @@ -use hyper::Method as HyperMethod; - -#[derive(Debug, Copy, Clone)] -pub enum Method { - GET, - POST, - PUT, - PATCH, - DELETE, - HEAD, - OPTIONS, -} - -impl From for &str { - fn from(method: Method) -> Self { - match method { - Method::GET => "GET", - Method::POST => "POST", - Method::PUT => "PUT", - Method::PATCH => "PATCH", - Method::DELETE => "DELETE", - Method::HEAD => "HEAD", - Method::OPTIONS => "OPTIONS", - } - } -} - -impl From<&str> for Method { - fn from(method: &str) -> Self { - match method { - "POST" => Method::POST, - "PUT" => Method::PUT, - "PATCH" => Method::PATCH, - "DELETE" => Method::DELETE, - "HEAD" => Method::HEAD, - "OPTIONS" => Method::OPTIONS, - _ => Method::GET, - } - } -} - -impl From<&HyperMethod> for Method { - fn from(method: &HyperMethod) -> Self { - match *method { - HyperMethod::POST => Method::POST, - HyperMethod::PUT => Method::PUT, - HyperMethod::PATCH => Method::PATCH, - HyperMethod::DELETE => Method::DELETE, - HyperMethod::HEAD => Method::HEAD, - HyperMethod::OPTIONS => Method::OPTIONS, - _ => Method::GET, - } - } -} diff --git a/crates/runtime_http/src/request.rs b/crates/runtime_http/src/request.rs index 4f80a4d12..91cb32d85 100644 --- a/crates/runtime_http/src/request.rs +++ b/crates/runtime_http/src/request.rs @@ -1,204 +1,96 @@ -use super::{FromV8, IntoV8, Method}; -use crate::{Headers, X_LAGON_ID}; use anyhow::{anyhow, Result}; -use hyper::{ - body::{self, Bytes}, - http, Body, Request as HyperRequest, -}; +use hyper::{body::Bytes, http::request::Parts, Body, Method, Request}; use lagon_runtime_v8_utils::{ extract_v8_headers_object, extract_v8_string, v8_headers_object, v8_string, }; -#[derive(Debug)] -pub struct Request { - pub headers: Option, - pub method: Method, - pub body: Bytes, - pub url: String, -} - -impl Default for Request { - fn default() -> Self { - Request { - headers: None, - method: Method::GET, - body: Bytes::new(), - url: "".into(), - } - } -} - -// NOTE: -// We can safely use unwrap here because set only return Just(true) or Empty(), so if it should never fail -impl IntoV8 for Request { - fn into_v8<'a>(self, scope: &mut v8::HandleScope<'a>) -> v8::Local<'a, v8::Object> { - let mut len = if self.headers.is_some() { 3 } else { 2 }; - let body_exists = !self.body.is_empty(); +pub fn request_to_v8<'a>( + request: (Parts, Bytes), + scope: &mut v8::HandleScope<'a>, +) -> v8::Local<'a, v8::Object> { + let body_empty = request.1.is_empty(); + let len = if body_empty { 3 } else { 4 }; - if body_exists { - len += 1; - } + let mut names = Vec::with_capacity(len); + let mut values = Vec::with_capacity(len); - let mut names = Vec::with_capacity(len); - let mut values = Vec::with_capacity(len); + let host = request + .0 + .headers + .get("host") + .map_or("", |host| host.to_str().unwrap_or("")); + let uri = request.0.uri.to_string(); + let uri = uri.as_str(); - names.push(v8_string(scope, "i").into()); - values.push(v8_string(scope, &self.url).into()); + let mut url = String::with_capacity(8 + host.len() + uri.len()); + url.push_str("https://"); + url.push_str(host); + url.push_str(uri); - names.push(v8_string(scope, "m").into()); - values.push(v8_string(scope, self.method.into()).into()); + let method = request.0.method.as_str(); - if body_exists { - names.push(v8_string(scope, "b").into()); - values.push( - v8_string(scope, unsafe { std::str::from_utf8_unchecked(&self.body) }).into(), - ); - } + names.push(v8_string(scope, "i").into()); + values.push(v8_string(scope, &url).into()); - if let Some(headers) = self.headers { - names.push(v8_string(scope, "h").into()); - values.push(v8_headers_object(scope, headers).into()); - } + names.push(v8_string(scope, "m").into()); + values.push(v8_string(scope, method).into()); - let null = v8::null(scope); - v8::Object::with_prototype_and_properties(scope, null.into(), &names, &values) + if !body_empty { + names.push(v8_string(scope, "b").into()); + values.push(v8_string(scope, unsafe { std::str::from_utf8_unchecked(&request.1) }).into()); } -} -impl FromV8 for Request { - fn from_v8<'a>( - scope: &mut v8::HandleScope<'a>, - request: v8::Local<'a, v8::Value>, - ) -> Result { - let request = match request.to_object(scope) { - Some(request) => request, - None => return Err(anyhow!("Request is not an object")), - }; - - let mut body = Bytes::new(); - let body_key = v8_string(scope, "b"); - - if let Some(body_value) = request.get(scope, body_key.into()) { - if !body_value.is_null_or_undefined() { - body = Bytes::from(extract_v8_string(body_value, scope)?); - } - } - - let mut headers = None; - let headers_key = v8_string(scope, "h"); + names.push(v8_string(scope, "h").into()); + values.push(v8_headers_object(scope, request.0.headers).into()); - if let Some(headers_value) = request.get(scope, headers_key.into()) { - if !headers_value.is_null_or_undefined() { - headers = extract_v8_headers_object(headers_value, scope)?; - } - } + let null = v8::null(scope); + v8::Object::with_prototype_and_properties(scope, null.into(), &names, &values) +} - let mut method = Method::GET; - let method_key = v8_string(scope, "m"); +pub fn request_from_v8<'a>( + scope: &mut v8::HandleScope<'a>, + request: v8::Local<'a, v8::Value>, +) -> Result> { + let request = match request.to_object(scope) { + Some(request) => request, + None => return Err(anyhow!("Request is not an object")), + }; - if let Some(method_value) = request.get(scope, method_key.into()) { - method = Method::from(extract_v8_string(method_value, scope)?.as_str()); - } + let mut request_builder = Request::builder(); - let url; - let url_key = v8_string(scope, "u"); + let mut body = Body::empty(); + let body_key = v8_string(scope, "b"); - if let Some(url_value) = request.get(scope, url_key.into()) { - url = extract_v8_string(url_value, scope)?; - } else { - return Err(anyhow!("Could not find url")); + if let Some(body_value) = request.get(scope, body_key.into()) { + if !body_value.is_null_or_undefined() { + body = Body::from(extract_v8_string(body_value, scope)?); } - - Ok(Self { - headers, - method, - body, - url, - }) } -} -impl TryFrom<&Request> for http::request::Builder { - type Error = anyhow::Error; + let headers_key = v8_string(scope, "h"); - fn try_from(request: &Request) -> Result { - let mut builder = HyperRequest::builder() - .uri(&request.url) - .method::<&str>(request.method.into()); - - if let Some(headers) = &request.headers { - for (key, value) in headers { - for value in value { - builder = builder.header(key, value); - } - } + if let Some(headers_value) = request.get(scope, headers_key.into()) { + if !headers_value.is_null_or_undefined() { + let headers_map = request_builder.headers_mut().unwrap(); + extract_v8_headers_object(headers_map, headers_value, scope)?; } - - Ok(builder) } -} -impl Request { - // TODO: Return the full request length - pub fn len(&self) -> usize { - self.body.len() - } - - pub fn is_empty(&self) -> bool { - self.body.is_empty() - } + let mut method = Method::GET; + let method_key = v8_string(scope, "m"); - pub async fn from_hyper(request: HyperRequest) -> Result { - Self::from_hyper_with_capacity(request, 0).await + if let Some(method_value) = request.get(scope, method_key.into()) { + method = Method::from_bytes(extract_v8_string(method_value, scope)?.as_bytes())?; } - pub async fn from_hyper_with_capacity( - request: HyperRequest, - capacity: usize, - ) -> Result { - let host = request - .headers() - .get("host") - .map_or_else(String::new, |host| { - host.to_str() - .map_or_else(|_| String::new(), |value| value.to_string()) - }); - - let mut headers = Vec::with_capacity(request.headers().keys_len() + capacity); - - for key in request.headers().keys() { - if key != X_LAGON_ID { - // We guess that most of the time there will be only one header value - let mut values = Vec::with_capacity(1); - - for value in request.headers().get_all(key) { - values.push(value.to_str()?.to_string()); - } - - headers.push((key.to_string(), values)); - } - } + let url; + let url_key = v8_string(scope, "u"); - let method = Method::from(request.method()); - let url = format!("http://{}{}", host, request.uri().to_string().as_str()); - - let body = body::to_bytes(request.into_body()).await?; - - Ok(Request { - headers: if !headers.is_empty() { - Some(headers) - } else { - None - }, - method, - body, - url, - }) + if let Some(url_value) = request.get(scope, url_key.into()) { + url = extract_v8_string(url_value, scope)?; + } else { + return Err(anyhow!("Could not find url")); } - pub fn set_header(&mut self, key: String, value: String) { - if let Some(ref mut headers) = self.headers { - headers.push((key, vec![value])); - } - } + Ok(request_builder.method(method).uri(url).body(body)?) } diff --git a/crates/runtime_http/src/response.rs b/crates/runtime_http/src/response.rs index 5fc6892ea..536a878fb 100644 --- a/crates/runtime_http/src/response.rs +++ b/crates/runtime_http/src/response.rs @@ -1,181 +1,82 @@ -use crate::{FromV8, Headers, IntoV8}; use anyhow::{anyhow, Result}; -use hyper::{ - body::{self, Bytes}, - http, Body, Response as HyperResponse, -}; +use hyper::{body::Bytes, http::response::Parts, Body, Response}; use lagon_runtime_v8_utils::{ extract_v8_headers_object, extract_v8_integer, extract_v8_string, v8_headers_object, v8_integer, v8_string, }; -static READABLE_STREAM_STR: &[u8] = b"[object ReadableStream]"; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Response { - pub headers: Option, - pub body: Bytes, - pub status: u16, -} - -impl Default for Response { - fn default() -> Self { - Response { - headers: None, - body: Bytes::new(), - status: 200, - } - } -} +pub fn response_to_v8<'a>( + response: (Parts, Bytes), + scope: &mut v8::HandleScope<'a>, +) -> v8::Local<'a, v8::Object> { + let len = 3; + let mut names = Vec::with_capacity(len); + let mut values = Vec::with_capacity(len); -impl From<&str> for Response { - fn from(body: &str) -> Self { - Response { - headers: Some(vec![( - "content-type".into(), - vec!["text/plain;charset=UTF-8".into()], - )]), - body: Bytes::from(body.to_string()), - status: 200, - } - } -} + names.push(v8_string(scope, "b").into()); + values.push(v8_string(scope, unsafe { std::str::from_utf8_unchecked(&response.1) }).into()); -// NOTE: -// We can safely use unwrap here because set only return Just(true) or Empty(), so if it should never fail -impl IntoV8 for Response { - fn into_v8<'a>(self, scope: &mut v8::HandleScope<'a>) -> v8::Local<'a, v8::Object> { - let len = if self.headers.is_some() { 3 } else { 2 }; + names.push(v8_string(scope, "s").into()); + values.push(v8_integer(scope, response.0.status.as_u16().into()).into()); - let mut names = Vec::with_capacity(len); - let mut values = Vec::with_capacity(len); + names.push(v8_string(scope, "h").into()); + values.push(v8_headers_object(scope, response.0.headers).into()); - names.push(v8_string(scope, "b").into()); - values.push(v8_string(scope, unsafe { std::str::from_utf8_unchecked(&self.body) }).into()); + let null = v8::null(scope).into(); + v8::Object::with_prototype_and_properties(scope, null, &names, &values) +} - names.push(v8_string(scope, "s").into()); - values.push(v8_integer(scope, self.status.into()).into()); +pub fn response_from_v8<'a>( + scope: &mut v8::HandleScope<'a>, + response: v8::Local<'a, v8::Value>, +) -> Result<(Response, bool)> { + let response = match response.to_object(scope) { + Some(response) => response, + None => return Err(anyhow!("Response is not an object")), + }; - if let Some(headers) = self.headers { - names.push(v8_string(scope, "h").into()); - values.push(v8_headers_object(scope, headers).into()); - } + let mut response_builder = Response::builder(); - let null = v8::null(scope).into(); - v8::Object::with_prototype_and_properties(scope, null, &names, &values) - } -} + let headers_key = v8_string(scope, "h"); -impl FromV8 for Response { - fn from_v8<'a>( - scope: &mut v8::HandleScope<'a>, - response: v8::Local<'a, v8::Value>, - ) -> Result { - let response = match response.to_object(scope) { - Some(response) => response, - None => return Err(anyhow!("Response is not an object")), - }; - - let body; - let body_key = v8_string(scope, "b"); - - if let Some(body_value) = response.get(scope, body_key.into()) { - body = extract_v8_string(body_value, scope)?; - } else { - return Err(anyhow!("Could not find body")); - } + if let Some(headers_object) = response.get(scope, headers_key.into()) { + if let Some(headers_object) = headers_object.to_object(scope) { + if let Some(headers_value) = headers_object.get(scope, headers_key.into()) { + if !headers_value.is_null_or_undefined() { + let headers_map = response_builder.headers_mut().unwrap(); - let mut headers = None; - let headers_key = v8_string(scope, "h"); - - if let Some(headers_object) = response.get(scope, headers_key.into()) { - if let Some(headers_object) = headers_object.to_object(scope) { - if let Some(headers_value) = headers_object.get(scope, headers_key.into()) { - if !headers_value.is_null_or_undefined() { - headers = extract_v8_headers_object(headers_value, scope)?; - } - } else { - return Err(anyhow!("Could not find headers object")); + extract_v8_headers_object(headers_map, headers_value, scope)?; } } else { return Err(anyhow!("Could not find headers object")); } - } - - let status; - let status_key = v8_string(scope, "s"); - - if let Some(status_value) = response.get(scope, status_key.into()) { - status = extract_v8_integer(status_value, scope)? as u16; } else { - return Err(anyhow!("Could not find status")); + return Err(anyhow!("Could not find headers object")); } - - Ok(Self { - headers, - body: Bytes::from(body), - status, - }) } -} -impl TryFrom<&Response> for http::response::Builder { - type Error = anyhow::Error; + let status_key = v8_string(scope, "s"); - fn try_from(response: &Response) -> Result { - let mut builder = HyperResponse::builder().status(response.status); + match response.get(scope, status_key.into()) { + Some(status_value) => { + let status = extract_v8_integer(status_value, scope)? as u16; - if let Some(headers) = &response.headers { - for (key, value) in headers { - for value in value { - builder = builder.header(key, value); - } - } + response_builder = response_builder.status(status); } + None => return Err(anyhow!("Could not find status")), + }; - Ok(builder) - } -} - -impl Response { - // TODO: Return the full response length - pub fn len(&self) -> usize { - self.body.len() - } + let body_key = v8_string(scope, "b"); - pub fn is_empty(&self) -> bool { - self.body.is_empty() - } + return match response.get(scope, body_key.into()) { + Some(body_value) => match body_value.is_null_or_undefined() { + true => Ok((response_builder.body(Body::empty())?, true)), + false => { + let body = extract_v8_string(body_value, scope)?; - pub fn is_streamed(&self) -> bool { - self.body == READABLE_STREAM_STR - } - - pub async fn from_hyper(response: HyperResponse) -> Result { - let mut headers = Vec::with_capacity(response.headers().keys_len()); - - for key in response.headers().keys() { - // We guess that most of the time there will be only one header value - let mut values = Vec::with_capacity(1); - - for value in response.headers().get_all(key) { - values.push(value.to_str()?.to_string()); + Ok((response_builder.body(body.into())?, false)) } - - headers.push((key.to_string(), values)); - } - - let status = response.status().as_u16(); - let body = body::to_bytes(response.into_body()).await?; - - Ok(Response { - status, - headers: if !headers.is_empty() { - Some(headers) - } else { - None - }, - body, - }) - } + }, + None => Err(anyhow!("Could not find body")), + }; } diff --git a/crates/runtime_isolate/Cargo.toml b/crates/runtime_isolate/Cargo.toml index 58a77fd31..3891d7f7b 100644 --- a/crates/runtime_isolate/Cargo.toml +++ b/crates/runtime_isolate/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -v8 = "0.71.1" +v8 = "0.71.2" tokio = { version = "1", features = ["rt-multi-thread"] } futures = "0.3.28" hyper = { version = "0.14.26", features = ["client"] } diff --git a/crates/runtime_isolate/src/bindings/fetch.rs b/crates/runtime_isolate/src/bindings/fetch.rs index 182c2b42f..ef56acf95 100644 --- a/crates/runtime_isolate/src/bindings/fetch.rs +++ b/crates/runtime_isolate/src/bindings/fetch.rs @@ -1,12 +1,8 @@ use anyhow::{anyhow, Result}; use async_recursion::async_recursion; -use hyper::{ - client::HttpConnector, - http::{request::Builder, Uri}, - Body, Client, Response as HyperResponse, -}; +use hyper::{client::HttpConnector, header::LOCATION, http::Uri, Body, Client, Request, Response}; use hyper_tls::HttpsConnector; -use lagon_runtime_http::{FromV8, Request, Response}; +use lagon_runtime_http::request_from_v8; use once_cell::sync::Lazy; use crate::{bindings::PromiseResult, Isolate}; @@ -16,7 +12,7 @@ use super::BindingResult; static CLIENT: Lazy>> = Lazy::new(|| Client::builder().build::<_, Body>(HttpsConnector::new())); -type Arg = Request; +type Arg = Request; pub fn fetch_init(scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments) -> Result { let id = scope @@ -45,31 +41,53 @@ pub fn fetch_init(scope: &mut v8::HandleScope, args: v8::FunctionCallbackArgumen None => return Err(anyhow!("Invalid request")), }; - Request::from_v8(scope, request.into()) + request_from_v8(scope, request.into()) +} + +async fn clone_response(request: Request) -> Result<(Request, Request)> { + let uri = request.uri().clone(); + let method = request.method().clone(); + let headers = request.headers().clone(); + let body_bytes = hyper::body::to_bytes(request.into_body()).await?; + + let mut request_a = Request::builder().uri(uri.clone()).method(method.clone()); + let request_a_headers = request_a.headers_mut().unwrap(); + + let mut request_b = Request::builder().uri(uri).method(method); + let request_b_headers = request_b.headers_mut().unwrap(); + + for (key, value) in headers.iter() { + request_a_headers.append(key, value.clone()); + request_b_headers.append(key, value.clone()); + } + + let request_a = request_a.body(Body::from(body_bytes.clone()))?; + let request_b = request_b.body(Body::from(body_bytes))?; + + Ok((request_a, request_b)) } #[async_recursion] async fn make_request( - request: &Request, + mut request: Request, url: Option, mut count: u8, -) -> Result> { +) -> Result> { if count >= 5 { return Err(anyhow!("Too many redirects")); } - let mut hyper_request = Builder::try_from(request)?; - if let Some(url) = url { - hyper_request = hyper_request.uri(url); + *request.uri_mut() = url.parse()?; } - let hyper_request = hyper_request.body(Body::from(request.body.clone()))?; - let uri = hyper_request.uri().clone(); - let response = CLIENT.request(hyper_request).await?; + let uri = request.uri().clone(); + + let (request_a, request_b) = clone_response(request).await?; + let response = CLIENT.request(request_a).await?; if response.status().is_redirection() { - let mut redirect_url = match response.headers().get("location") { + let mut redirect_url = match response.headers().get(LOCATION) { Some(location) => location.to_str()?.to_string(), None => return Err(anyhow!("Got a redirect without Location header")), }; @@ -83,14 +101,14 @@ async fn make_request( } count += 1; - return make_request(request, Some(redirect_url), count).await; + return make_request(request_b, Some(redirect_url), count).await; } Ok(response) } pub async fn fetch_binding(id: usize, arg: Arg) -> BindingResult { - let hyper_response = match make_request(&arg, None, 0).await { + let hyper_response = match make_request(arg, None, 0).await { Ok(hyper_response) => hyper_response, Err(error) => { return BindingResult { @@ -100,10 +118,20 @@ pub async fn fetch_binding(id: usize, arg: Arg) -> BindingResult { } }; - let result = match Response::from_hyper(hyper_response).await { - Ok(response) => PromiseResult::Response(response), - Err(error) => PromiseResult::Error(error.to_string()), - }; + let (parts, body) = hyper_response.into_parts(); + + match hyper::body::to_bytes(body).await { + Ok(body) => { + let response = (parts, body); - BindingResult { id, result } + BindingResult { + id, + result: PromiseResult::Response(response), + } + } + Err(error) => BindingResult { + id, + result: PromiseResult::Error(error.to_string()), + }, + } } diff --git a/crates/runtime_isolate/src/bindings/mod.rs b/crates/runtime_isolate/src/bindings/mod.rs index c0566dcdd..f6f951f9c 100644 --- a/crates/runtime_isolate/src/bindings/mod.rs +++ b/crates/runtime_isolate/src/bindings/mod.rs @@ -5,7 +5,8 @@ use crypto::{ verify_binding, verify_init, }; use fetch::{fetch_binding, fetch_init}; -use lagon_runtime_http::{IntoV8, Response}; +use hyper::{body::Bytes, http::response::Parts}; +use lagon_runtime_http::response_to_v8; use lagon_runtime_v8_utils::{v8_boolean, v8_string, v8_uint8array}; use pull_stream::pull_stream_binding; use queue_microtask::queue_microtask_binding; @@ -31,7 +32,7 @@ pub struct BindingResult { } pub enum PromiseResult { - Response(Response), + Response((Parts, Bytes)), ArrayBuffer(Vec), Boolean(bool), Error(String), @@ -41,7 +42,7 @@ pub enum PromiseResult { impl PromiseResult { pub fn into_value<'a>(self, scope: &mut v8::HandleScope<'a>) -> v8::Local<'a, v8::Value> { match self { - PromiseResult::Response(response) => response.into_v8(scope).into(), + PromiseResult::Response(response) => response_to_v8(response, scope).into(), PromiseResult::ArrayBuffer(bytes) => v8_uint8array(scope, bytes).into(), PromiseResult::Boolean(boolean) => v8_boolean(scope, boolean).into(), PromiseResult::Error(error) => v8_string(scope, &error).into(), diff --git a/crates/runtime_isolate/src/lib.rs b/crates/runtime_isolate/src/lib.rs index 14c07cbc1..73d9d516c 100644 --- a/crates/runtime_isolate/src/lib.rs +++ b/crates/runtime_isolate/src/lib.rs @@ -1,5 +1,9 @@ use futures::{future::poll_fn, stream::FuturesUnordered, Future, StreamExt}; -use lagon_runtime_http::{FromV8, IntoV8, Request, Response, RunResult, StreamResult}; +use hyper::{ + body::Bytes, + http::{request::Parts, response::Builder}, +}; +use lagon_runtime_http::{request_to_v8, response_from_v8, RunResult, StreamResult}; use lagon_runtime_v8_utils::v8_string; use linked_hash_map::LinkedHashMap; use std::{ @@ -33,7 +37,7 @@ pub struct RequestContext { } pub struct IsolateRequest { - pub request: Request, + pub request: (Parts, Bytes), pub sender: flume::Sender, } @@ -418,7 +422,7 @@ impl Isolate { let global = global.open(try_catch); let global = global.global(try_catch); - let request = request.into_v8(try_catch); + let request = request_to_v8(request, try_catch); let id = v8::Integer::new(try_catch, requests_count as i32); try_catch.set_continuation_preserved_embedder_data(id.into()); @@ -549,8 +553,8 @@ impl Isolate { fn poll_event_loop(&mut self, cx: &mut Context) -> Poll<()> { if let Some(compilation_error) = &self.compilation_error { if let Ok(IsolateEvent::Request(IsolateRequest { sender, .. })) = self.rx.try_recv() { - let termination_result = match self.termination_result.read().unwrap().as_ref() { - Some(termination_result) => termination_result.clone(), + let termination_result = match self.termination_result.write().unwrap().take() { + Some(termination_result) => termination_result, None => RunResult::Error(compilation_error.to_string()), }; @@ -592,12 +596,9 @@ impl Isolate { let mut state = state.borrow_mut(); self.poll_stream(&state); - if let Some(termination_result) = self.termination_result.read().unwrap().as_ref() { - for handler_result in state.handler_results.values() { - handler_result - .sender - .send(termination_result.clone()) - .unwrap_or(()); + if let Some(termination_result) = self.termination_result.write().unwrap().take() { + if let Some(handler_result) = state.handler_results.values().next() { + handler_result.sender.send(termination_result).unwrap_or(()); } return Poll::Ready(()); @@ -654,24 +655,34 @@ impl Isolate { match promise.state() { v8::PromiseState::Fulfilled => { let response = promise.result(try_catch); - let run_result = match Response::from_v8(try_catch, response) { - Ok(response) => { - RunResult::Response(response, Some(handler_result.start_time.elapsed())) - } - Err(error) => RunResult::Error(error.to_string()), + let (run_result, is_streaming) = match response_from_v8(try_catch, response) { + Ok((response, is_streaming)) => ( + RunResult::Response( + response, + Some(handler_result.start_time.elapsed()), + ), + is_streaming, + ), + Err(error) => (RunResult::Error(error.to_string()), false), }; - if let RunResult::Response(ref response, _) = run_result { - if response.is_streamed() { - handler_result - .sender - .send(RunResult::Stream(StreamResult::Start(response.clone()))) - .unwrap_or(()); - - *handler_result.stream_response_sent.borrow_mut() = true; + if is_streaming { + let response = run_result.as_response(); + let mut response_builder = Builder::new().status(response.status()); + let headers = response_builder.headers_mut().unwrap(); - return true; + for (key, value) in response.headers().iter() { + headers.append(key, value.into()); } + + handler_result + .sender + .send(RunResult::Stream(StreamResult::Start(response_builder))) + .unwrap_or(()); + + *handler_result.stream_response_sent.borrow_mut() = true; + + return true; } handler_result.sender.send(run_result).unwrap_or(()); diff --git a/crates/runtime_utils/src/assets.rs b/crates/runtime_utils/src/assets.rs index 6d5abef60..2e8e6b734 100644 --- a/crates/runtime_utils/src/assets.rs +++ b/crates/runtime_utils/src/assets.rs @@ -1,6 +1,5 @@ use anyhow::Result; -use hyper::body::Bytes; -use lagon_runtime_http::Response; +use hyper::{body::Bytes, header::CONTENT_TYPE, Body, Response}; use std::{ collections::HashSet, fs, @@ -19,7 +18,7 @@ pub fn find_asset<'a>(url: &'a str, assets: &'a HashSet) -> Option<&'a S }) } -pub fn handle_asset(root: PathBuf, asset: &String) -> Result { +pub fn handle_asset(root: PathBuf, asset: &String) -> Result> { let path = root.join(asset); let body = fs::read(path)?; @@ -39,13 +38,9 @@ pub fn handle_asset(root: PathBuf, asset: &String) -> Result { }, ); - let headers = vec![("content-type".into(), vec![content_type.into()])]; - - Ok(Response { - status: 200, - headers: Some(headers), - body: Bytes::from(body), - }) + Ok(Response::builder() + .header(CONTENT_TYPE, content_type) + .body(Body::from(Bytes::from(body)))?) } #[cfg(test)] diff --git a/crates/runtime_utils/src/response.rs b/crates/runtime_utils/src/response.rs index e634b29cd..676f5c5a5 100644 --- a/crates/runtime_utils/src/response.rs +++ b/crates/runtime_utils/src/response.rs @@ -1,6 +1,6 @@ use anyhow::Result; use flume::Receiver; -use hyper::{body::Bytes, http::response::Builder, Body, Response as HyperResponse}; +use hyper::{body::Bytes, Body, Response}; use lagon_runtime_http::{RunResult, StreamResult}; use std::future::Future; @@ -22,7 +22,7 @@ pub enum ResponseEvent { pub async fn handle_response( rx: Receiver, on_event: impl Fn(ResponseEvent) -> F + Send + Sync + 'static, -) -> Result> +) -> Result> where F: Future> + Send, { @@ -33,12 +33,12 @@ where let (stream_tx, stream_rx) = flume::unbounded::>(); let body = Body::wrap_stream(stream_rx.into_stream()); - let (response_tx, response_rx) = flume::bounded(1); + let (response_builder_tx, response_builder_rx) = flume::bounded(1); let mut total_bytes = 0; match stream_result { StreamResult::Start(response) => { - response_tx.send_async(response).await.unwrap_or(()); + response_builder_tx.send_async(response).await.unwrap_or(()); } StreamResult::Data(bytes) => { total_bytes += bytes.len(); @@ -58,7 +58,7 @@ where while let Ok(result) = rx.recv_async().await { match result { RunResult::Stream(StreamResult::Start(response)) => { - response_tx.send_async(response).await.unwrap_or(()); + response_builder_tx.send_async(response).await.unwrap_or(()); } RunResult::Stream(StreamResult::Data(bytes)) => { total_bytes += bytes.len(); @@ -87,41 +87,37 @@ where } }); - let response = response_rx.recv_async().await?; - let hyper_response = Builder::try_from(&response)?.body(body)?; + let response_builder = response_builder_rx.recv_async().await?; + let response = response_builder.body(body)?; - Ok(hyper_response) + Ok(response) } RunResult::Response(response, elapsed) => { - let event = - ResponseEvent::Bytes(response.len(), elapsed.map(|duration| duration.as_micros())); + let event = ResponseEvent::Bytes(0, elapsed.map(|duration| duration.as_micros())); on_event(event).await?; - Ok(Builder::try_from(&response)?.body(response.body.into())?) + Ok(response) } RunResult::Timeout | RunResult::MemoryLimit => { let event = ResponseEvent::LimitsReached(result); on_event(event).await?; - Ok(HyperResponse::builder().status(502).body(PAGE_502.into())?) + Ok(Response::builder().status(502).body(PAGE_502.into())?) } RunResult::Error(_) => { let event = ResponseEvent::Error(result); on_event(event).await?; - Ok(HyperResponse::builder().status(500).body(PAGE_500.into())?) + Ok(Response::builder().status(500).body(PAGE_500.into())?) } } } #[cfg(test)] mod tests { - use std::time::Duration; - - use hyper::body::to_bytes; - use lagon_runtime_http::Response; - use super::*; + use hyper::{body::to_bytes, Response}; + use std::time::Duration; #[tokio::test] async fn sequential() { @@ -137,9 +133,12 @@ mod tests { ); }); - tx.send_async(RunResult::Response(Response::from("Hello World"), None)) - .await - .unwrap(); + tx.send_async(RunResult::Response( + Response::new("Hello World".into()), + None, + )) + .await + .unwrap(); handle.await.unwrap(); } @@ -158,7 +157,7 @@ mod tests { ); }); - tx.send_async(RunResult::Stream(StreamResult::Start(Response::from("")))) + tx.send_async(RunResult::Stream(StreamResult::Start(Response::builder()))) .await .unwrap(); @@ -199,7 +198,7 @@ mod tests { .await .unwrap(); - tx.send_async(RunResult::Stream(StreamResult::Start(Response::from("")))) + tx.send_async(RunResult::Stream(StreamResult::Start(Response::builder()))) .await .unwrap(); diff --git a/crates/runtime_v8_utils/Cargo.toml b/crates/runtime_v8_utils/Cargo.toml index 0b331099b..8d08a4efd 100644 --- a/crates/runtime_v8_utils/Cargo.toml +++ b/crates/runtime_v8_utils/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] -v8 = "0.71.1" +v8 = "0.71.2" anyhow = "1.0.71" +hyper = { version = "0.14.26" } diff --git a/crates/runtime_v8_utils/src/lib.rs b/crates/runtime_v8_utils/src/lib.rs index 834977d8f..03e2f4529 100644 --- a/crates/runtime_v8_utils/src/lib.rs +++ b/crates/runtime_v8_utils/src/lib.rs @@ -1,4 +1,7 @@ use anyhow::{anyhow, Result}; +use hyper::{header::HeaderName, http::HeaderValue, HeaderMap}; + +const X_LAGON_ID: &str = "x-lagon-id"; pub fn extract_v8_string( value: v8::Local, @@ -20,9 +23,10 @@ pub fn extract_v8_integer(value: v8::Local, scope: &mut v8::HandleSco } pub fn extract_v8_headers_object( + header_map: &mut HeaderMap, value: v8::Local, scope: &mut v8::HandleScope, -) -> Result)>>> { +) -> Result<()> { if !value.is_map() { return Err(anyhow!("Value is not of type 'Map'")); } @@ -32,7 +36,6 @@ pub fn extract_v8_headers_object( if map.size() > 0 { let headers_keys = map.as_array(scope); let length = headers_keys.length(); - let mut headers = Vec::with_capacity((length / 2) as usize); for mut index in 0..length { if index % 2 != 0 { @@ -45,39 +48,27 @@ pub fn extract_v8_headers_object( index += 1; - let values = headers_keys - .get_index(scope, index) - .map_or_else(Vec::new, |value| { - let mut result = Vec::new(); - - if value.is_array() { - let values = unsafe { v8::Local::::cast(value) }; + for value in headers_keys.get_index(scope, index).into_iter() { + if value.is_array() { + let values = unsafe { v8::Local::::cast(value) }; - for i in 0..values.length() { - let value = values - .get_index(scope, i) - .map_or_else(String::new, |value| { - value.to_rust_string_lossy(scope) - }); + for i in 0..values.length() { + let value = values + .get_index(scope, i) + .map_or_else(String::new, |value| value.to_rust_string_lossy(scope)); - result.push(value); - } - } else { - let value = value.to_rust_string_lossy(scope); - - result.push(value); + header_map.append(HeaderName::from_bytes(key.as_bytes())?, value.parse()?); } + } else { + let value = value.to_rust_string_lossy(scope); - result - }); - - headers.push((key, values)); + header_map.append(HeaderName::from_bytes(key.as_bytes())?, value.parse()?); + } + } } - - return Ok(Some(headers)); } - Ok(None) + Ok(()) } pub fn extract_v8_uint8array(value: v8::Local) -> Result> { @@ -118,26 +109,28 @@ pub fn v8_uint8array<'a>( pub fn v8_headers_object<'a>( scope: &mut v8::HandleScope<'a>, - value: Vec<(String, Vec)>, + value: HeaderMap, ) -> v8::Local<'a, v8::Object> { let len = value.len(); let mut names = Vec::with_capacity(len); let mut values = Vec::with_capacity(len); - for (key, headers) in value.iter() { - let key = v8_string(scope, key); - - let mut elements = Vec::with_capacity(headers.len()); + for key in value.keys() { + if key != X_LAGON_ID { + // We guess that most of the time there will be only one header value + let mut elements = Vec::with_capacity(1); - for header in headers.iter() { - elements.push(v8_string(scope, header).into()) - } + for value in value.get_all(key) { + elements.push(v8_string(scope, value.to_str().unwrap()).into()) + } - let array = v8::Array::new_with_elements(scope, &elements); + let key = v8_string(scope, key.as_str()); + names.push(key.into()); - names.push(key.into()); - values.push(array.into()); + let array = v8::Array::new_with_elements(scope, &elements); + values.push(array.into()); + } } let null = v8::null(scope).into(); diff --git a/crates/serverless/Cargo.toml b/crates/serverless/Cargo.toml index d311b16b1..fa563b76e 100644 --- a/crates/serverless/Cargo.toml +++ b/crates/serverless/Cargo.toml @@ -15,7 +15,7 @@ lagon-serverless-logger = { path = "../serverless_logger" } lagon-serverless-downloader = { path = "../serverless_downloader" } lagon-serverless-pubsub = { path = "../serverless_pubsub" } flume = "0.10.14" -mysql = "23.0.1" +mysql = "24.0.0" dotenv = "0.15.0" serde_json = "1.0" metrics = "0.21.0" @@ -36,7 +36,7 @@ lagon-runtime-isolate = { path = "../runtime_isolate" } flume = "0.10.14" [dev-dependencies] -reqwest = "0.11.17" +reqwest = "0.11.18" serial_test = "2.0.0" clickhouse = { version = "0.11.4", features = ["test-util"] } diff --git a/crates/serverless/src/deployments/cache.rs b/crates/serverless/src/deployments/cache.rs index 1d66ea305..95e8b22e2 100644 --- a/crates/serverless/src/deployments/cache.rs +++ b/crates/serverless/src/deployments/cache.rs @@ -1,55 +1,53 @@ -use std::{ - env, - sync::Arc, - time::{Duration, Instant}, -}; +use super::{pubsub::clear_deployment_cache, Deployments}; +use crate::{serverless::Workers, REGION}; +use clickhouse::{Client, Row}; +use serde::Deserialize; +use std::{collections::HashSet, env, sync::Arc, time::Duration}; -use super::pubsub::clear_deployment_cache; -use crate::serverless::Workers; -use dashmap::DashMap; +const CACHE_TASK_INTERVAL: Duration = Duration::from_secs(5); -const CACHE_TASK_INTERVAL: Duration = Duration::from_secs(1); +#[derive(Debug, Row, Deserialize)] +struct MyRow { + count: usize, +} -pub fn run_cache_clear_task(last_requests: Arc>, workers: Workers) { +pub fn run_cache_clear_task(client: &Client, deployments: Deployments, workers: Workers) { let isolates_cache_seconds = Duration::from_secs( env::var("LAGON_ISOLATES_CACHE_SECONDS") .expect("LAGON_ISOLATES_CACHE_SECONDS is not set") .parse() .expect("LAGON_ISOLATES_CACHE_SECONDS is not a valid number"), ); + let client = Arc::new(client.clone()); tokio::spawn(async move { - let mut deployments_to_clear = Vec::new(); - loop { tokio::time::sleep(CACHE_TASK_INTERVAL).await; - let now = Instant::now(); - - for last_request in last_requests.iter() { - let (deployment_id, last_request) = last_request.pair(); - - if now.duration_since(*last_request) > isolates_cache_seconds { - deployments_to_clear.push(deployment_id.clone()); + let deployments_id = deployments + .iter() + .map(|deployment| deployment.id.clone()) + .collect::>(); + + for deployment_id in deployments_id { + let query = client + .query("SELECT count(*) as count FROM serverless.requests WHERE timestamp >= subtractSeconds(now(), ?) AND region = ? AND deployment_id = ?") + .bind(isolates_cache_seconds.as_secs()) + .bind(REGION.clone()) + .bind(deployment_id.clone()) + .fetch_one::().await; + + if let Ok(row) = query { + if row.count == 0 { + clear_deployment_cache( + deployment_id, + Arc::clone(&workers), + String::from("expiration"), + ) + .await; + } } } - - if deployments_to_clear.is_empty() { - continue; - } - - for deployment_id in &deployments_to_clear { - last_requests.remove(deployment_id); - - clear_deployment_cache( - deployment_id.clone(), - Arc::clone(&workers), - String::from("expiration"), - ) - .await; - } - - deployments_to_clear.clear(); } }); } diff --git a/crates/serverless/src/serverless.rs b/crates/serverless/src/serverless.rs index f65cc7f1e..224ac08d8 100644 --- a/crates/serverless/src/serverless.rs +++ b/crates/serverless/src/serverless.rs @@ -6,16 +6,15 @@ use crate::{ use anyhow::Result; use clickhouse::{inserter::Inserter, Client}; use dashmap::DashMap; +use futures::lock::Mutex; use hyper::{ header::HOST, http::response::Builder, server::conn::AddrStream, service::{make_service_fn, service_fn}, - Body, Request as HyperRequest, Response as HyperResponse, Server, -}; -use lagon_runtime_http::{ - Request, Response, RunResult, X_FORWARDED_FOR, X_LAGON_ID, X_LAGON_REGION, X_REAL_IP, + Body, Request, Response, Server, }; +use lagon_runtime_http::{RunResult, X_FORWARDED_FOR, X_LAGON_ID, X_LAGON_REGION, X_REAL_IP}; use lagon_runtime_isolate::{ options::{IsolateOptions, Metadata}, Isolate, IsolateEvent, IsolateRequest, @@ -36,9 +35,9 @@ use std::{ net::SocketAddr, path::Path, sync::Arc, - time::{Duration, Instant, UNIX_EPOCH}, + time::{Duration, UNIX_EPOCH}, }; -use tokio::{runtime::Handle, sync::Mutex}; +use tokio::{runtime::Handle, sync::Mutex as TokioMutex}; pub type Workers = Arc>>; @@ -97,14 +96,13 @@ async fn handle_error( } async fn handle_request( - req: HyperRequest, + req: Request, ip: String, deployments: Deployments, - last_requests: Arc>, workers: Workers, inserters: Arc, Inserter)>>, log_sender: flume::Sender<(String, String, Metadata)>, -) -> Result> { +) -> Result> { let request_id = match req.headers().get(X_LAGON_ID) { Some(x_lagon_id) => x_lagon_id.to_str().unwrap_or("").to_string(), None => String::new(), @@ -135,7 +133,7 @@ async fn handle_request( ); warn!(req = as_debug!(req), ip = ip, hostname = hostname, request = request_id; "No deployment found for hostname"); - return Ok(HyperResponse::builder().status(404).body(PAGE_404.into())?); + return Ok(Response::builder().status(404).body(PAGE_404.into())?); } }; @@ -148,7 +146,7 @@ async fn handle_request( ); warn!(req = as_debug!(req), ip = ip, hostname = hostname, request = request_id; "Cron deployment cannot be called directly"); - return Ok(HyperResponse::builder().status(403).body(PAGE_403.into())?); + return Ok(Response::builder().status(403).body(PAGE_403.into())?); } let function_id = deployment.function_id.clone(); @@ -184,114 +182,102 @@ async fn handle_request( } else if is_favicon { sender .send_async(RunResult::Response( - Response { - status: 404, - ..Default::default() - }, + Response::builder().status(404).body(Body::empty())?, None, )) .await .unwrap_or(()); } else { - last_requests.insert(deployment_id.clone(), Instant::now()); - // Try to Extract the X-Real-Ip header or fallback to remote addr IP let ip = req.headers().get(X_REAL_IP).map_or(ip.clone(), |header| { header.clone().to_str().map_or(ip, |ip| ip.to_string()) }); - match Request::from_hyper_with_capacity(req, 2).await { - Ok(mut request) => { - bytes_in = request.len() as u32; - - request.set_header(X_FORWARDED_FOR.to_string(), ip.to_string()); - request.set_header(X_LAGON_REGION.to_string(), REGION.to_string()); - - let isolate_workers = Arc::clone(&workers); - let isolate_sender = workers.entry(deployment_id.clone()).or_insert_with(|| { - let handle = Handle::current(); - let (sender, receiver) = flume::unbounded(); - let labels = labels.clone(); - - std::thread::Builder::new().name(String::from("isolate-") + deployment.id.as_str()).spawn(move || { - handle.block_on(async move { - increment_gauge!("lagon_isolates", 1.0, &labels); - info!(deployment = deployment.id, function = deployment.function_id, request = request_id_handle; "Creating new isolate"); - - let code = deployment.get_code().unwrap_or_else(|error| { - error!(deployment = deployment.id, request = request_id_handle; "Error while getting deployment code: {}", error); - - "".into() - }); - let options = IsolateOptions::new(code) - .environment_variables(deployment.environment_variables.clone()) - .memory(deployment.memory) - .tick_timeout(Duration::from_millis(deployment.tick_timeout as u64)) - .total_timeout(Duration::from_millis( - deployment.total_timeout as u64, - )) - .metadata(Some(( - deployment.id.clone(), - deployment.function_id.clone(), - ))) - .on_drop_callback(Box::new(|metadata| { - if let Some(metadata) = metadata.as_ref().as_ref() { - let labels = [ - ("deployment", metadata.0.clone()), - ("function", metadata.1.clone()), - ("region", REGION.clone()), - ]; - - decrement_gauge!("lagon_isolates", 1.0, &labels); - info!(deployment = metadata.0, function = metadata.1; "Dropping isolate"); - } - })) - .on_statistics_callback(Box::new(|metadata, statistics| { - if let Some(metadata) = metadata.as_ref().as_ref() { - let labels = [ - ("deployment", metadata.0.clone()), - ("function", metadata.1.clone()), - ("region", REGION.clone()), - ]; - - histogram!( - "lagon_isolate_memory_usage", - statistics as f64, - &labels - ); - } - })) - .log_sender(log_sender) - .snapshot_blob(SNAPSHOT_BLOB); - - let mut isolate = Isolate::new(options, receiver); - isolate.evaluate(); - isolate.run_event_loop().await; - - // When the event loop is completed, that means a) the isolate was terminate due to limits - // or b) the isolate was dropped because of cache expiration. In the first case, the isolate - // isn't removed from the workers map - isolate_workers.remove(&deployment.id); - }); - }).unwrap(); - - sender + let (mut parts, body) = req.into_parts(); + let body = hyper::body::to_bytes(body).await?; + + bytes_in = body.len() as u32; + + parts.headers.insert(X_FORWARDED_FOR, ip.parse()?); + parts.headers.insert(X_LAGON_REGION, REGION.parse()?); + + let request = (parts, body); + + let isolate_workers = Arc::clone(&workers); + let isolate_sender = workers.entry(deployment_id.clone()).or_insert_with(|| { + let handle = Handle::current(); + let (sender, receiver) = flume::unbounded(); + let labels = labels.clone(); + + std::thread::Builder::new().name(String::from("isolate-") + deployment.id.as_str()).spawn(move || { + handle.block_on(async move { + increment_gauge!("lagon_isolates", 1.0, &labels); + info!(deployment = deployment.id, function = deployment.function_id, request = request_id_handle; "Creating new isolate"); + + let code = deployment.get_code().unwrap_or_else(|error| { + error!(deployment = deployment.id, request = request_id_handle; "Error while getting deployment code: {}", error); + + "".into() + }); + let options = IsolateOptions::new(code) + .environment_variables(deployment.environment_variables.clone()) + .memory(deployment.memory) + .tick_timeout(Duration::from_millis(deployment.tick_timeout as u64)) + .total_timeout(Duration::from_millis( + deployment.total_timeout as u64, + )) + .metadata(Some(( + deployment.id.clone(), + deployment.function_id.clone(), + ))) + .on_drop_callback(Box::new(|metadata| { + if let Some(metadata) = metadata.as_ref().as_ref() { + let labels = [ + ("deployment", metadata.0.clone()), + ("function", metadata.1.clone()), + ("region", REGION.clone()), + ]; + + decrement_gauge!("lagon_isolates", 1.0, &labels); + info!(deployment = metadata.0, function = metadata.1; "Dropping isolate"); + } + })) + .on_statistics_callback(Box::new(|metadata, statistics| { + if let Some(metadata) = metadata.as_ref().as_ref() { + let labels = [ + ("deployment", metadata.0.clone()), + ("function", metadata.1.clone()), + ("region", REGION.clone()), + ]; + + histogram!( + "lagon_isolate_memory_usage", + statistics as f64, + &labels + ); + } + })) + .log_sender(log_sender) + .snapshot_blob(SNAPSHOT_BLOB); + + let mut isolate = Isolate::new(options, receiver); + isolate.evaluate(); + isolate.run_event_loop().await; + + // When the event loop is completed, that means a) the isolate was terminate due to limits + // or b) the isolate was dropped because of cache expiration. In the first case, the isolate + // isn't removed from the workers map + isolate_workers.remove(&deployment.id); }); + }).unwrap(); - isolate_sender - .send_async(IsolateEvent::Request(IsolateRequest { request, sender })) - .await - .unwrap_or(()); - } - Err(error) => { - error!(deployment = &deployment.id, request = request_id_handle; "Error while parsing request: {}", error); + sender + }); - sender - .send_async(RunResult::Error("Error while parsing request".into())) - .await - .unwrap_or(()); - } - } + isolate_sender + .send_async(IsolateEvent::Request(IsolateRequest { request, sender })) + .await + .unwrap_or(()); } handle_response(receiver, move |event| { @@ -304,6 +290,8 @@ async fn handle_request( async move { match event { ResponseEvent::Bytes(bytes, cpu_time_micros) => { + let timestamp = UNIX_EPOCH.elapsed().unwrap().as_secs() as u32; + inserters .lock() .await @@ -315,7 +303,7 @@ async fn handle_request( bytes_in, bytes_out: bytes as u32, cpu_time_micros, - timestamp: UNIX_EPOCH.elapsed().unwrap().as_secs() as u32, + timestamp, }) .await .unwrap_or(()); @@ -375,10 +363,8 @@ where D: Downloader + Send + Sync + 'static, P: PubSubListener + Unpin + 'static, { - let last_requests = Arc::new(DashMap::new()); - let workers = Arc::new(DashMap::new()); - let pubsub = Arc::new(Mutex::new(pubsub)); + let pubsub = Arc::new(TokioMutex::new(pubsub)); listen_pub_sub( Arc::clone(&downloader), @@ -387,7 +373,7 @@ where // Arc::clone(&cronjob), pubsub, ); - run_cache_clear_task(Arc::clone(&last_requests), Arc::clone(&workers)); + run_cache_clear_task(&client, Arc::clone(&deployments), Arc::clone(&workers)); let insertion_interval = Duration::from_secs(1); let inserters = Arc::new(Mutex::new(( @@ -447,7 +433,6 @@ where let server = Server::bind(&addr).serve(make_service_fn(move |conn: &AddrStream| { let deployments = Arc::clone(&deployments); - let last_requests = Arc::clone(&last_requests); let workers = Arc::clone(&workers); let inserters = Arc::clone(&inserters); let log_sender = log_sender.clone(); @@ -461,7 +446,6 @@ where req, ip.clone(), Arc::clone(&deployments), - Arc::clone(&last_requests), Arc::clone(&workers), Arc::clone(&inserters), log_sender.clone(), diff --git a/crates/serverless/tests/requests.rs b/crates/serverless/tests/requests.rs index 03ebaf3c9..64d78e8d4 100644 --- a/crates/serverless/tests/requests.rs +++ b/crates/serverless/tests/requests.rs @@ -90,17 +90,17 @@ async fn returns_correct_path() -> Result<()> { let response = reqwest::get("http://127.0.0.1:4000").await?; assert_eq!(response.status(), 200); - assert_eq!(response.text().await?, "http://127.0.0.1:4000/"); + assert_eq!(response.text().await?, "https://127.0.0.1:4000/"); let response = reqwest::get("http://127.0.0.1:4000/test").await?; assert_eq!(response.status(), 200); - assert_eq!(response.text().await?, "http://127.0.0.1:4000/test"); + assert_eq!(response.text().await?, "https://127.0.0.1:4000/test"); let response = reqwest::get("http://127.0.0.1:4000/test?hello=world").await?; assert_eq!(response.status(), 200); assert_eq!( response.text().await?, - "http://127.0.0.1:4000/test?hello=world" + "https://127.0.0.1:4000/test?hello=world" ); Ok(()) diff --git a/crates/wpt-runner/Cargo.toml b/crates/wpt-runner/Cargo.toml index db7fc7205..ab1dfd17e 100644 --- a/crates/wpt-runner/Cargo.toml +++ b/crates/wpt-runner/Cargo.toml @@ -11,3 +11,4 @@ lagon-runtime-isolate = { path = "../runtime_isolate" } flume = "0.10.14" console = "0.15.5" once_cell = "1.17.1" +hyper = { version = "0.14.26", features = ["server"] } diff --git a/crates/wpt-runner/current-results.md b/crates/wpt-runner/current-results.md index 17ce97a3c..c19db91b4 100644 --- a/crates/wpt-runner/current-results.md +++ b/crates/wpt-runner/current-results.md @@ -53,12 +53,45 @@ TEST DONE 1 XMLHttpRequest with value %1E TEST DONE 1 XMLHttpRequest with value %1F TEST DONE 1 XMLHttpRequest with value %20 TEST DONE 1 fetch() with value %00 +TEST DONE 1 fetch() with value %01 +TEST DONE 1 fetch() with value %02 +TEST DONE 1 fetch() with value %03 +TEST DONE 1 fetch() with value %04 +TEST DONE 1 fetch() with value %05 +TEST DONE 1 fetch() with value %06 +TEST DONE 1 fetch() with value %07 +TEST DONE 1 fetch() with value %08 +TEST DONE 1 fetch() with value %09 +TEST DONE 1 fetch() with value %0A +TEST DONE 1 fetch() with value %0D +TEST DONE 1 fetch() with value %0E +TEST DONE 1 fetch() with value %0F +TEST DONE 1 fetch() with value %10 +TEST DONE 1 fetch() with value %11 +TEST DONE 1 fetch() with value %12 +TEST DONE 1 fetch() with value %13 +TEST DONE 1 fetch() with value %14 +TEST DONE 1 fetch() with value %15 +TEST DONE 1 fetch() with value %16 +TEST DONE 1 fetch() with value %17 +TEST DONE 1 fetch() with value %18 +TEST DONE 1 fetch() with value %19 +TEST DONE 1 fetch() with value %1A +TEST DONE 1 fetch() with value %1B +TEST DONE 1 fetch() with value %1C +TEST DONE 1 fetch() with value %1D +TEST DONE 1 fetch() with value %1E +TEST DONE 1 fetch() with value %1F +TEST DONE 1 fetch() with value %20 Running ../../tools/wpt/fetch/api/headers/header-values.any.js TEST DONE 1 XMLHttpRequest with value x%00x needs to throw TEST DONE 1 XMLHttpRequest with value x%0Ax needs to throw TEST DONE 1 XMLHttpRequest with value x%0Dx needs to throw TEST DONE 1 XMLHttpRequest with all valid values TEST DONE 1 fetch() with value x%00x needs to throw +TEST DONE 1 fetch() with value x%0Ax needs to throw +TEST DONE 1 fetch() with value x%0Dx needs to throw +TEST DONE 1 fetch() with all valid values Running ../../tools/wpt/fetch/api/headers/headers-basic.any.js TEST DONE 0 Create headers from no parameter TEST DONE 0 Create headers from undefined parameter @@ -412,6 +445,7 @@ TEST DONE 0 Request does not expose internalpriority attribute TEST DONE 0 Request does not expose blocking attribute Running ../../tools/wpt/fetch/api/response/json.any.js TEST DONE 1 Ensure the correct JSON parser is used +TEST DONE 1 Ensure UTF-16 results in an error Skipping ../../tools/wpt/fetch/api/response/response-cancel-stream.any.js Running ../../tools/wpt/fetch/api/response/response-clone.any.js TEST DONE 0 Check Response's clone with default values, without body @@ -420,6 +454,20 @@ TEST DONE 1 Check orginal response's body after cloning TEST DONE 1 Check cloned response's body TEST DONE 1 Cannot clone a disturbed response TEST DONE 1 Cloned responses should provide the same data +TEST DONE 1 Cancelling stream should not affect cloned one +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Int8Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Int16Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Int32Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (ArrayBufferchunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Uint8Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Uint8ClampedArraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Uint16Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Uint32Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (BigInt64Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (BigUint64Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Float32Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (Float64Arraychunk) +TEST DONE 1 Check response clone use structureClone for teed ReadableStreams (DataViewchunk) Running ../../tools/wpt/fetch/api/response/response-consume-empty.any.js TEST DONE 1 Consume response's body as text TEST DONE 1 Consume response's body as blob @@ -1700,5 +1748,5 @@ Running ../../tools/wpt/urlpattern/urlpattern-compare.https.any.js Running ../../tools/wpt/urlpattern/urlpattern.any.js Running ../../tools/wpt/urlpattern/urlpattern.https.any.js -1576 tests, 464 passed, 1103 failed - -> 29% conformance +1620 tests, 464 passed, 1151 failed + -> 28% conformance diff --git a/crates/wpt-runner/src/main.rs b/crates/wpt-runner/src/main.rs index e9ab620f0..4e026726d 100644 --- a/crates/wpt-runner/src/main.rs +++ b/crates/wpt-runner/src/main.rs @@ -1,6 +1,7 @@ use console::style; +use hyper::{http::Request, Body}; use lagon_runtime::{options::RuntimeOptions, Runtime}; -use lagon_runtime_http::{Request, RunResult}; +use lagon_runtime_http::RunResult; use lagon_runtime_isolate::{options::IsolateOptions, Isolate, IsolateEvent, IsolateRequest}; use once_cell::sync::Lazy; use std::{ @@ -139,9 +140,12 @@ export function handler() {{ }); let (request_tx, request_rx) = flume::unbounded(); + let (parts, body) = Request::new(Body::empty()).into_parts(); + let body = hyper::body::to_bytes(body).await.unwrap(); + let request = (parts, body); tx.send_async(IsolateEvent::Request(IsolateRequest { - request: Request::default(), + request, sender: request_tx, })) .await diff --git a/docker/Dockerfile b/docker/Dockerfile index 542fca355..11131de8c 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -5,7 +5,7 @@ FROM chef AS planner COPY . . RUN cargo chef prepare --recipe-path recipe.json -FROM node:20.1.0-bullseye-slim as js-runtime +FROM node:20.2.0-bullseye-slim as js-runtime WORKDIR /app COPY ./packages/js-runtime/src/ ./src COPY ./packages/js-runtime/package.json ./ diff --git a/packages/js-runtime/src/index.ts b/packages/js-runtime/src/index.ts index 71007b5f7..c61259f54 100644 --- a/packages/js-runtime/src/index.ts +++ b/packages/js-runtime/src/index.ts @@ -123,7 +123,7 @@ declare global { b: RequestInit['body']; }, ) => Promise<{ - b: string; + b?: string; h: ResponseInit['headers']; s: ResponseInit['status']; }>; @@ -153,7 +153,6 @@ globalThis.masterHandler = async (id, handler, request) => { }); const response = await handler(handlerRequest); - let body: string; if (response.isStream) { const responseBody = response.body; @@ -162,7 +161,6 @@ globalThis.masterHandler = async (id, handler, request) => { throw new Error('Got a stream without a body'); } - body = responseBody.toString(); const reader = responseBody.getReader(); const read = () => { @@ -181,13 +179,16 @@ globalThis.masterHandler = async (id, handler, request) => { }; read(); + + return { + h: response.headers, + s: response.status, + }; } else { - body = await response.text(); + return { + b: await response.text(), + h: response.headers, + s: response.status, + }; } - - return { - b: body, - h: response.headers, - s: response.status, - }; };