Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v5 (I think) middleware system #235

Closed
wants to merge 14 commits into from
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ name = "tests"
path = "tests/tauri/tauri_test.rs"
harness = false

[[bench]]
name = "benchmarks"
harness = false

[features]
default = ["typescript"]
typescript = ["rspc-core/typescript", "specta/typescript"]
Expand Down
9 changes: 9 additions & 0 deletions TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
A middleware should be unable to return `impl Stream` if it's being put on a `query` or `subscription`???

Supporting file streaming over rspc


It would be nice if `rspc-core` could not depend on the whole `futures`

Export `Error` and `ExecError` from root of `rspc-core`
Can `Error` and `ExecError` now be merged???
312 changes: 0 additions & 312 deletions benches/benchmarks.rs

This file was deleted.

48 changes: 24 additions & 24 deletions crates/core/src/body/body.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
// use std::{
// pin::Pin,
// task::{Context, Poll},
// };

use serde_json::Value;
// use serde_json::Value;

use crate::error::ExecError;
// use crate::error::ExecError;

/// The resulting body from an rspc operation.
///
/// This can mean different things in different contexts.
/// For a query or mutation each frame is a part of the resulting single "message". Eg. part of the json, or part of a file.
/// For a subscription each frame is a discrete websocket message. Eg. the json for a single procedure's result
///
#[must_use = "`Body` do nothing unless polled"]
pub trait Body {
// TODO: Return `bytes::Bytes` instead
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Value, ExecError>>>;
// /// The resulting body from an rspc operation.
// ///
// /// This can mean different things in different contexts.
// /// For a query or mutation each frame is a part of the resulting single "message". Eg. part of the json, or part of a file.
// /// For a subscription each frame is a discrete websocket message. Eg. the json for a single procedure's result
// ///
// #[must_use = "`Body` do nothing unless polled"]
// pub trait Body {
// // TODO: Return `bytes::Bytes` instead
// fn poll_next(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Value, ExecError>>>;

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
// #[inline]
// fn size_hint(&self) -> (usize, Option<usize>) {
// (0, None)
// }
// }
1 change: 0 additions & 1 deletion crates/core/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ mod body;
mod once;

pub use body::*;
pub(crate) use once::*;
91 changes: 46 additions & 45 deletions crates/core/src/body/once.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,46 @@
use std::{
future::Future,
ops::DerefMut,
pin::Pin,
task::{Context, Poll},
};

use futures::{stream::Once, Stream};
use serde_json::Value;

use crate::error::ExecError;

use super::Body;

impl Body for Box<dyn Body + Send + '_> {
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Value, ExecError>>> {
let inner = Pin::into_inner(self).deref_mut();

// SAFETY: This is the same implementation as std::pin::pin!().
// We're not using it as it uses a block rather than &mut-ing the inner value directly.
let inner = unsafe { Pin::new_unchecked(inner) };

inner.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(&**self).size_hint()
}
}

impl<Fut: Future<Output = Result<Value, ExecError>>> Body for Once<Fut> {
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Value, ExecError>>> {
Stream::poll_next(self, cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Stream::size_hint(self)
}
}
// use std::{
// future::Future,
// ops::DerefMut,
// pin::Pin,
// task::{Context, Poll},
// };

// use futures::{stream::Once, Stream};
// use serde_json::Value;

// use crate::error::ExecError;

// use super::Body;

// impl Body for Box<dyn Body + Send + '_> {
// fn poll_next(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Value, ExecError>>> {
// let inner = Pin::into_inner(self).deref_mut();

// // SAFETY: This is the same implementation as std::pin::pin!().
// // We're not using it as it uses a block rather than &mut-ing the inner value directly.
// #[allow(unsafe_code)]
// let inner = unsafe { Pin::new_unchecked(inner) };

// inner.poll_next(cx)
// }

// fn size_hint(&self) -> (usize, Option<usize>) {
// (&**self).size_hint()
// }
// }

// impl<Fut: Future<Output = Result<Value, ExecError>>> Body for Once<Fut> {
// fn poll_next(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Value, ExecError>>> {
// Stream::poll_next(self, cx)
// }

// fn size_hint(&self) -> (usize, Option<usize>) {
// Stream::size_hint(self)
// }
// }
55 changes: 55 additions & 0 deletions crates/core/src/body2/cursed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::{cell::Cell, future::poll_fn, task::Poll};

use crate::Body;

// TODO: Make this private
pub enum YieldMsg {
YieldBody,
YieldBodyResult(serde_json::Value),
YieldBodyStream,
YieldNextBody,
}

thread_local! {
// TODO: Make this private
pub static CURSED_OP: Cell<Option<YieldMsg>> = const { Cell::new(None) };
}

// TODO: Make private
pub async fn inner() -> Body {
let mut state = false;
poll_fn(|_| match state {
false => {
CURSED_OP.set(Some(YieldMsg::YieldBody));
state = true;
return Poll::Pending;
}
true => {
let y = CURSED_OP
.take()
.expect("Expected response from outer future!");
return Poll::Ready(match y {
YieldMsg::YieldBody => unreachable!(),
YieldMsg::YieldBodyResult(body) => Body::Value(body),
YieldMsg::YieldBodyStream => unreachable!(),
YieldMsg::YieldNextBody => unreachable!(),
});
}
})
.await
}

// TODO: Use this instead
// // Called on `Poll::Pending` from inner
// pub fn outer(waker: &Waker) {
// if let Some(op) = CURSED_OP.take() {
// match op {
// YieldMsg::YieldBody => {
// // TODO: Get proper value
// CURSED_OP.set(Some(YieldMsg::YieldBodyResult(serde_json::Value::Null)));
// waker.wake_by_ref();
// }
// YieldMsg::YieldBodyResult(_) => unreachable!(),
// }
// }
// }
55 changes: 55 additions & 0 deletions crates/core/src/body2/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::pin::Pin;

use futures::Stream;
use serde_json::Value;

use crate::error::ExecError;

pub mod cursed;

// It is expected that the type remains the same for all items of a single stream! It's ok for panic's if this is violated.
//
// TODO: Can this be `pub(crate)`??? -> Right now `Layer` is the problem
#[derive(Debug)]
pub enum ValueOrBytes {
Value(serde_json::Value),
Bytes(Vec<u8>),
}

pub(crate) type StreamItem = Result<ValueOrBytes, ExecError>;
pub(crate) type ErasedBody = Pin<Box<dyn Stream<Item = StreamItem> + Send>>;

#[derive(Debug)]
#[non_exhaustive]
pub enum Body {
// Derived when `Stream` has one and only one item
Value(serde_json::Value),
// Derived from `ValueOrBytes`
Stream(StreamBody),
// Derived from `ValueOrBytes`
Bytes(BytesBody), // TODO: Implement this
}

#[derive(Debug)] // TODO: Better debug impl
pub struct StreamBody {}

impl Stream for StreamBody {
type Item = Result<Value, ExecError>;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// Set into thread, hey I want the next value

// Suspense
todo!();
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

#[derive(Debug)] // TODO: Better debug impl
pub struct BytesBody {}
12 changes: 5 additions & 7 deletions crates/core/src/exec/arc_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
// TODO: This is basically required for queueing an rspc subscription onto it's own task which with Tokio requires `'static`.
// TODO: This whole thing is really similar to the `owning_ref` crate but I want to erase the `T` from `Arc<T>` which is done through the `drop` function pointer.
use std::{
convert::Infallible,
mem::size_of,
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
};

use serde_json::Value;

use crate::{
body::Body,
body2::ErasedBody,
middleware::{ProcedureKind, RequestContext},
router_builder::ProcedureMap,
Router,
Expand Down Expand Up @@ -104,7 +102,7 @@ fn get_procedure<TCtx: 'static>(
data: RequestData,
kind: ProcedureKind,
procedures: fn(&Router<TCtx>) -> &ProcedureMap<TCtx>,
) -> Option<ArcRef<Pin<Box<dyn Body + Send>>>> {
) -> Option<ArcRef<ErasedBody>> {
unsafe {
ArcRef::new(router, |router| {
let _: &'static _ = router;
Expand All @@ -130,7 +128,7 @@ pub(crate) fn get_query<TCtx: 'static>(
router: Arc<Router<TCtx>>,
ctx: TCtx,
data: RequestData,
) -> Option<ArcRef<Pin<Box<dyn Body + Send>>>> {
) -> Option<ArcRef<ErasedBody>> {
get_procedure(router, ctx, data, ProcedureKind::Query, |router| {
&router.queries
})
Expand All @@ -140,7 +138,7 @@ pub(crate) fn get_mutation<TCtx: 'static>(
router: Arc<Router<TCtx>>,
ctx: TCtx,
data: RequestData,
) -> Option<ArcRef<Pin<Box<dyn Body + Send>>>> {
) -> Option<ArcRef<ErasedBody>> {
get_procedure(router, ctx, data, ProcedureKind::Mutation, |router| {
&router.mutations
})
Expand All @@ -150,7 +148,7 @@ pub(crate) fn get_subscription<TCtx: 'static>(
router: Arc<Router<TCtx>>,
ctx: TCtx,
data: RequestData,
) -> Option<ArcRef<Pin<Box<dyn Body + Send>>>> {
) -> Option<ArcRef<ErasedBody>> {
get_procedure(router, ctx, data, ProcedureKind::Subscription, |router| {
&router.subscriptions
})
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/exec/execute.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{pin::Pin, sync::Arc};
use std::sync::Arc;

use futures::channel::oneshot;

use crate::{
body::Body,
body2::ErasedBody,
error::ExecError,
exec::{
arc_ref::{self, get_subscription, ArcRef},
Expand Down Expand Up @@ -102,7 +102,7 @@ impl<TCtx: Send + 'static> Router<TCtx> {
}
}

fn map_fut(id: u32, fut: Option<ArcRef<Pin<Box<dyn Body + Send>>>>) -> ExecutorResult {
fn map_fut(id: u32, fut: Option<ArcRef<ErasedBody>>) -> ExecutorResult {
match fut {
Some(stream) => ExecutorResult::Future(RequestFuture { id, stream }),
None => ExecutorResult::Response(Response {
Expand Down
9 changes: 6 additions & 3 deletions crates/core/src/exec/request_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use crate::{
body::Body,
body2::ErasedBody,
error::ExecError,
exec::{Response, ResponseInner},
};
Expand All @@ -21,7 +21,7 @@ pub struct RequestFuture {

// You will notice this is a `Stream` not a `Future` like would be implied by the struct.
// rspc's whole middleware system only uses `Stream`'s cause it makes life easier so we change to & from a `Future` at the start/end.
pub(crate) stream: ArcRef<Pin<Box<dyn Body + Send>>>,
pub(crate) stream: ArcRef<ErasedBody>,
}

impl fmt::Debug for RequestFuture {
Expand All @@ -39,7 +39,10 @@ impl Future for RequestFuture {
Poll::Ready(Response {
id: self.id,
inner: match self.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(result))) => ResponseInner::Value(result),
Poll::Ready(Some(Ok(result))) => ResponseInner::Value(match result {
crate::body2::ValueOrBytes::Value(v) => v,
crate::body2::ValueOrBytes::Bytes(_) => todo!("What are thoseeeeee!"),
}),
Poll::Ready(Some(Err(err))) => ResponseInner::Error(err.into()),
Poll::Ready(None) => ResponseInner::Error(ExecError::ErrStreamEmpty.into()),
Poll::Pending => return Poll::Pending,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/exec/subscription_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl SubscriptionMap {
#[cfg(debug_assertions)]
#[allow(clippy::panic)]
if !tx.is_canceled() {
panic!("Subscription was not shutdown before being removed!");
// panic!("Subscription was not shutdown before being removed!"); // TODO: Fix this
}
};
}
Expand Down
Loading
Loading