Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 49 additions & 10 deletions crates/base/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ async fn test_file_upload_real_multipart_bytes() {
test_oak_file_upload(
Cow::Borrowed("./test_cases/main"),
(9.98 * MB as f32) as usize, // < 10MB (in binary)
None,
|resp| async {
let res = resp.unwrap();

Expand All @@ -731,16 +732,21 @@ async fn test_file_upload_real_multipart_bytes() {
#[tokio::test]
#[serial]
async fn test_file_upload_size_exceed() {
test_oak_file_upload(Cow::Borrowed("./test_cases/main"), 10 * MB, |resp| async {
let res = resp.unwrap();
test_oak_file_upload(
Cow::Borrowed("./test_cases/main"),
10 * MB,
None,
|resp| async {
let res = resp.unwrap();

assert_eq!(res.status().as_u16(), 500);
assert_eq!(res.status().as_u16(), 500);

let res = res.text().await;
let res = res.text().await;

assert!(res.is_ok());
assert_eq!(res.unwrap(), "Error!");
})
assert!(res.is_ok());
assert_eq!(res.unwrap(), "Error!");
},
)
.await;
}

Expand Down Expand Up @@ -1166,6 +1172,7 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() {
test_oak_file_upload(
Cow::Borrowed("./test_cases/main_small_cpu_time"),
48 * MB,
None,
|resp| async {
let res = resp.unwrap();

Expand All @@ -1183,8 +1190,40 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() {
.await;
}

async fn test_oak_file_upload<F, R>(main_service: Cow<'static, str>, bytes: usize, resp_callback: F)
where
#[tokio::test]
#[serial]
async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit_2() {
test_oak_file_upload(
Cow::Borrowed("./test_cases/main_small_cpu_time"),
1024 * 64,
Some("image/png"),
|resp| async {
let res = resp.unwrap();

assert_eq!(res.status().as_u16(), 500);

let res = res.json::<ErrorResponsePayload>().await;

assert!(res.is_ok());

let msg = res.unwrap().msg;

assert!(!msg.starts_with("TypeError: request body receiver not connected"));
assert_eq!(
msg,
"WorkerRequestCancelled: request has been cancelled by supervisor"
);
},
)
.await;
}

async fn test_oak_file_upload<F, R>(
main_service: Cow<'static, str>,
bytes: usize,
mime: Option<&str>,
resp_callback: F,
) where
F: FnOnce(Result<Response, reqwest::Error>) -> R,
R: Future<Output = ()>,
{
Expand All @@ -1199,7 +1238,7 @@ where
"meow",
Part::bytes(vec![0u8; bytes])
.file_name("meow.bin")
.mime_str("application/octet-stream")
.mime_str(mime.unwrap_or("application/octet-stream"))
.unwrap(),
),
)
Expand Down
90 changes: 40 additions & 50 deletions crates/sb_workers/user_workers.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { primordials, core } from "ext:core/mod.js";
import { readableStreamForRid, writableStreamForRid } from 'ext:deno_web/06_streams.js';
import { getSupabaseTag } from 'ext:sb_core_main_js/js/http.js';
import { readableStreamForRid, writableStreamForRid } from "ext:deno_web/06_streams.js";
import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js";

const ops = core.ops;
const {
InterruptedPrototype,
} = core;
const {
TypeError,
ObjectPrototypeIsPrototypeOf,
StringPrototypeIncludes,
} = primordials;

const { TypeError } = primordials;

const {
op_user_worker_fetch_send,
op_user_worker_create,
Expand All @@ -33,11 +28,11 @@ class UserWorker {
this.key = key;
}

async fetch(req, opts = {}) {
const tag = getSupabaseTag(req);
async fetch(request, options = {}) {
const tag = getSupabaseTag(request);

const { method, url, headers, body, bodyUsed } = req;
const { signal } = opts;
const { method, url, headers, body, bodyUsed } = request;
const { signal } = options;

signal?.throwIfAborted();

Expand All @@ -60,62 +55,56 @@ class UserWorker {
);

// stream the request body
let reqBodyPromise = null;
let requestBodyPromise = null;

if (hasBody) {
let writableStream = writableStreamForRid(requestBodyRid);
reqBodyPromise = body.pipeTo(writableStream, { signal });
requestBodyPromise = body.pipeTo(writableStream, { signal });
}

const resPromise = op_user_worker_fetch_send(
const responsePromise = op_user_worker_fetch_send(
this.key,
requestRid,
requestBodyRid,
tag.streamRid,
tag.watcherRid
);

let [sent, res] = await Promise.allSettled([reqBodyPromise, resPromise]);

if (sent.status === "rejected") {
if (res.status === "fulfilled") {
res = res.value;
} else {
if (
ObjectPrototypeIsPrototypeOf(InterruptedPrototype, sent.reason) ||
StringPrototypeIncludes(sent.reason.message, "operation canceled")
) {
throw res.reason;
} else {
throw sent.reason;
}
}
} else if (res.status === "rejected") {
throw res.reason;
} else {
res = res.value;
const [requestBodyPromiseResult, responsePromiseResult] = await Promise.allSettled([
requestBodyPromise,
responsePromise
]);

if (requestBodyPromiseResult.status === "rejected") {
// console.warn(requestBodyPromiseResult.reason);
}

if (responsePromiseResult.status === "rejected") {
throw responsePromiseResult.reason;
}

const result = responsePromiseResult.value;
const response = {
headers: res.headers,
status: res.status,
statusText: res.statusText,
headers: result.headers,
status: result.status,
statusText: result.statusText,
body: null,
};

// TODO: add a test
if (nullBodyStatus(res.status) || redirectStatus(res.status)) {
core.close(res.bodyRid);
if (nullBodyStatus(result.status) || redirectStatus(result.status)) {
core.tryClose(result.bodyRid);
} else {
if (req.method === 'HEAD' || req.method === 'CONNECT') {
response.body = null;
core.close(res.bodyRid);
if (request.method === "HEAD" || request.method === "CONNECT") {
core.tryClose(result.bodyRid);
} else {
const bodyStream = readableStreamForRid(res.bodyRid);
const stream = readableStreamForRid(result.bodyRid);

signal?.addEventListener('abort', () => {
core.tryClose(res.bodyRid);
signal?.addEventListener("abort", () => {
core.tryClose(result.bodyRid);
});
response.body = bodyStream;

response.body = stream;
}
}

Expand Down Expand Up @@ -148,8 +137,8 @@ class UserWorker {

const { servicePath, maybeEszip } = readyOptions;

if (!maybeEszip && (!servicePath || servicePath === '')) {
throw new TypeError('service path must be defined');
if (!maybeEszip && (!servicePath || servicePath === "")) {
throw new TypeError("service path must be defined");
}

const key = await op_user_worker_create(readyOptions);
Expand All @@ -159,4 +148,5 @@ class UserWorker {
}

const SUPABASE_USER_WORKERS = UserWorker;

export { SUPABASE_USER_WORKERS };