Skip to content

Commit

Permalink
Pin all the way to hospital
Browse files Browse the repository at this point in the history
  • Loading branch information
oscartbeaumont committed Dec 27, 2023
1 parent 9fef14d commit 80ca60c
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 28 deletions.
2 changes: 2 additions & 0 deletions crates/core2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ erased-serde = "0.4.1"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
serde_json = "1" # TODO: Remove this
futures = "0.3"
12 changes: 8 additions & 4 deletions crates/core2/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, collections::HashMap, future::Future, pin::Pin};
use std::{borrow::Cow, collections::HashMap, future::Future, pin::Pin, sync::Arc};

use crate::{serializer::Serializer, Format, Task};

Expand All @@ -8,7 +8,7 @@ pub struct RequestContext<'a> {
pub result: Serializer<'a>,
}

pub type Procedure = Box<dyn Fn(RequestContext) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>>;
pub type Procedure = Arc<dyn Fn(RequestContext) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>>;

#[derive(Default)]
pub struct Executor {
Expand Down Expand Up @@ -36,7 +36,11 @@ impl Executor {
self.procedures.len()
}

pub async fn execute<F: Format>(name: &str) -> Task<F> {
todo!();
pub fn execute<F: Format>(&self, name: &str, format: Arc<F>) -> Task<F> {
let procedure = match self.procedures.get(name) {
Some(proc) => proc,
None => todo!(), // TODO: return Task::new(Procedure::not_found(name), format),
};
Task::new(procedure.clone(), format)
}
}
6 changes: 4 additions & 2 deletions crates/core2/src/format.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::pin::Pin;

pub trait Format {
type Result; // TODO: Should we keep result???
type Serializer: TODOSerializer;

fn serializer(&self) -> Self::Serializer;

// fn into_result(ser: Self::Serializer) -> Self::Result;
fn into_result(ser: &mut Self::Serializer) -> Option<Self::Result>;
}

// TODO: Rename
pub trait TODOSerializer: Send {
fn serialize_str(&mut self, s: &str);
fn serialize_str(self: Pin<&mut Self>, s: &str);

// TODO: Finish this
}
12 changes: 11 additions & 1 deletion crates/core2/src/serializer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use std::{marker::PhantomData, pin::Pin};

use crate::TODOSerializer;

pub struct Serializer<'a> {
serializer: &'a mut dyn TODOSerializer,
serializer: Pin<&'a mut dyn TODOSerializer>,
phantom: PhantomData<&'a ()>,
}

impl<'a> Serializer<'a> {
pub(crate) fn new(serializer: Pin<&'a mut dyn TODOSerializer>) -> Self {
Self {
serializer,
phantom: PhantomData,
}
}

// TODO: Should this be `async` so we can yield and reset the serializer state for multiple values???
pub fn serialize<T: serde::Serialize>(self, value: &T) {
// TODO: Properly hook this up with Serde
Expand Down
125 changes: 114 additions & 11 deletions crates/core2/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,56 @@
use std::{
fmt,
future::Future,
marker::PhantomPinned,
mem::{self, transmute},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use futures_core::{FusedStream, Stream};

use crate::Format;
use crate::{executor::RequestContext, Format, Procedure, Serializer, TODOSerializer};

/// TODO
pub struct Task<F> {
pub struct Task<F: Format> {
/// The unique identifier of the task.
/// Be careful storing these as they may be reused after the task is completed.
pub id: u32,
/// Whether the task requires queuing.
/// If this is `false` you can safely block inline until it's done.
/// For example a streaming query would be `false` while a subscription would be `true`.
pub requires_queuing: bool,
// done: bool, // TODO
// Task must be `'static` so it can be queued onto an async runtime like Tokio.
// However this borrows from the `Arc` in `Executor`.
// TODO: Finish the explanation
stream: Pin<Box<dyn Future<Output = ()> + Send>>,
inner: TaskRepr<F::Serializer>,
format: Arc<F>,
}

impl<F> fmt::Debug for Task<F> {
enum TaskRepr<S> {
Procedure(Procedure),
Future {
serializer: S,
fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
// We have a reference to `serializer` inside `fut` so it would be unsafe to move pull this struct out of it's `Pin`.
phantom: PhantomPinned,
},
Done,
}

impl<F: Format> Task<F> {
pub(crate) fn new(procedure: Procedure, format: Arc<F>) -> Self {
Self {
id: 0, // TODO: Get id from somewhere
requires_queuing: false, // TODO: Make this work
inner: TaskRepr::Procedure(procedure),
format,
}
}
}

impl<F: Format> fmt::Debug for Task<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Task").field("id", &self.id).finish()
}
Expand All @@ -37,19 +60,99 @@ impl<F: Format> Stream for Task<F> {
type Item = F::Result;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let serialize = self.format.serializer();
loop {
match &self.as_mut().inner {
TaskRepr::Procedure(_) => {
let y = TaskRepr::Future {
serializer: self.format.serializer(),
fut: None,
phantom: PhantomPinned,
};

let procedure = {
// TODO: Discuss why this is safe and required (PhantomPinned)
let inner = unsafe { &mut self.as_mut().get_unchecked_mut().inner };

match mem::replace(inner, y) {
TaskRepr::Procedure(procedure) => procedure,
_ => unreachable!(),
}
};

// TODO: Make this unsafe block wayyyy smaller
unsafe {
// TODO: Can we do this with safe code
let (serializer, fut) = match &mut self.as_mut().get_unchecked_mut().inner {
TaskRepr::Future {
serializer, fut, ..
} => (serializer, fut),
_ => unreachable!(),
};

// TODO: Is this safe???
// let y: Pin<&mut dyn TODOSerializer> = Pin::new_unchecked(serializer);

let result: Serializer<'_> =
Serializer::new(Pin::new_unchecked(serializer));

// let result = Serializer::new(Pin::new_unchecked(serializer));
*fut = Some((procedure)(RequestContext {
result: transmute(result), // TODO: Make sure we hardcode the output to ensure it's only faking lifetimes
}));
};
}
TaskRepr::Future { .. } => {
// TODO: Safety
let (fut, serializer) = unsafe {
match &mut self.as_mut().get_unchecked_mut().inner {
TaskRepr::Future {
fut, serializer, ..
} => (
match fut {
Some(fut) => fut,
None => unreachable!(),
},
serializer,
),
_ => unreachable!(),
}
};

let mut done = false; // TODO: Remove `done` flag and use `self`
let mut pending = false;

match fut.as_mut().poll(cx) {
Poll::Ready(()) => done = true,
Poll::Pending => pending = true,
};

let result = match F::into_result(serializer) {
Some(result) => Poll::Ready(Some(result)),
None => {
if done {
Poll::Ready(None)
} else if pending {
Poll::Pending
} else {
continue; // Will skip `if done` check and that's ok
}
}
};

// TODO: How to get `serialize` into the future?
if done {
unsafe { self.as_mut().get_unchecked_mut().inner = TaskRepr::Done };
}

match self.stream.as_mut().poll(cx) {
Poll::Ready(()) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
return result;
}
TaskRepr::Done => return Poll::Ready(None),
}
}
}
}

impl<F: Format> FusedStream for Task<F> {
fn is_terminated(&self) -> bool {
todo!()
matches!(self.inner, TaskRepr::Done)
}
}
69 changes: 59 additions & 10 deletions crates/core2/tests/basic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use rspc_core2::Procedure;
use std::{pin::Pin, sync::Arc};

use futures::StreamExt;
use rspc_core2::{Executor, Format, Procedure, TODOSerializer};
use serde_json::Value;

#[tokio::test]
async fn test_procedure() {
let _p: Procedure = Box::new(|ctx| {
let _p: Procedure = Arc::new(|ctx| {
Box::pin(async move {
// let _input: String = erased_serde::deserialize(ctx.arg.unwrap()).unwrap();

Expand All @@ -11,19 +15,20 @@ async fn test_procedure() {
})
});

let _p: Procedure = Box::new(|ctx| {
let _p: Procedure = Arc::new(|ctx| {
Box::pin(async move {
// let _input: String = erased_serde::deserialize(ctx.arg.unwrap()).unwrap();
// ctx.result.erased_serialize_str("todo");

// Serialize multiple values
for i in 0..5 {
ctx.result.serialize(&i);
}
// TODO
// for i in 0..5 {
// ctx.result.serialize(&i);
// }
})
});

let _p: Procedure = Box::new(|ctx| {
let _p: Procedure = Arc::new(|ctx| {
Box::pin(async move {
// let _input: String = erased_serde::deserialize(ctx.arg.unwrap()).unwrap();

Expand All @@ -34,7 +39,51 @@ async fn test_procedure() {

#[tokio::test]
async fn test_executor() {
// TODO:
// let executor = Executor::default();
// executor.
let mut executor = Executor::default();

executor.insert(
"demo".into(),
Arc::new(|ctx| {
Box::pin(async move {
// let _input: String = erased_serde::deserialize(ctx.arg.unwrap()).unwrap();

// Serialize a single value
ctx.result.serialize(&"todo");
})
}),
);

let executor = Arc::new(executor);
let format = Arc::new(SerdeJsonFormat {});

let result = executor.execute("demo", format).collect::<Vec<_>>().await;
println!("{:?}", result);
panic!("done");
}

// TODO: Assert a task can be queued onto an async runtime like Tokio

pub(crate) struct SerdeJsonFormat {}

impl Format for SerdeJsonFormat {
type Result = serde_json::Value;
type Serializer = SerdeJsonSerializer;

fn serializer(&self) -> Self::Serializer {
SerdeJsonSerializer(None)
}

// TODO: Finish this method
fn into_result(ser: &mut Self::Serializer) -> Option<Self::Result> {
println!("{:?}", ser.0);
ser.0.take()
}
}

pub(crate) struct SerdeJsonSerializer(Option<Value>);

impl TODOSerializer for SerdeJsonSerializer {
fn serialize_str(mut self: Pin<&mut Self>, s: &str) {
self.0 = Some(Value::String(s.into()));
}
}

0 comments on commit 80ca60c

Please sign in to comment.