Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 107 additions & 85 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,73 +824,84 @@ impl<M, R, S> Client<M, R, S> {
}

/// Performs a request for which the server returns a oneshot receiver.
pub async fn rpc<Req, Res>(&self, msg: Req) -> Result<Res, Error>
pub fn rpc<Req, Res>(
&self,
msg: Req,
) -> impl Future<Output = Result<Res, Error>> + Send + 'static
where
S: Service,
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

R could have the bound RpcMessage, and M could have the bound LocalRpcMessage to DRY the bounds. Not sure if this is worth it. I guess it depends a bit on if we need to take away some of the bounds for WASM. @matheus23 WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I have a really hard time reading this type signature.

In general, if you're wondering if it's okay to have a T: Send bound in Wasm, then you simply need to answer this question: "Will T contain/refer to a JsValue?"
Such a value could be a JsPromise if it just made a call to sth async (this happens a lot when you have async APIs and is what makes rust futures in async non-Send in practice a lot), or it could be because you're storing a js Error value in there, which is the other common thing.

I'd very hesitantly say you're good in this case to just have a Send bound. But again, I have no idea what's going on in this type signature, it's quite complex.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will T contain/refer to a JsValue?

So if there is a possibility that T contains a JsValue, it must not be Send? I think it is very unlikely that there will be a JsValue in these things.

R: From<Req> + Serialize + 'static,
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = NoReceiver>,
R: From<Req> + Serialize + Send + Sync + 'static,
Req: Channels<S, Tx = channel::oneshot::Sender<Res>, Rx = NoReceiver> + Send + 'static,
Res: RpcMessage,
{
let recv: channel::oneshot::Receiver<Res> = match self.request().await? {
Request::Local(request) => {
let (tx, rx) = channel::oneshot::channel();
request.send((msg, tx)).await?;
rx
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (_tx, rx) = request.write(msg).await?;
rx.into()
}
};
let res = recv.await?;
Ok(res)
let request = self.request();
async move {
let recv: channel::oneshot::Receiver<Res> = match request.await? {
Request::Local(request) => {
let (tx, rx) = channel::oneshot::channel();
request.send((msg, tx)).await?;
rx
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (_tx, rx) = request.write(msg).await?;
rx.into()
}
};
let res = recv.await?;
Ok(res)
}
}

/// Performs a request for which the server returns a spsc receiver.
pub async fn server_streaming<Req, Res>(
pub fn server_streaming<Req, Res>(
&self,
msg: Req,
local_response_cap: usize,
) -> Result<channel::spsc::Receiver<Res>, Error>
) -> impl Future<Output = Result<channel::spsc::Receiver<Res>, Error>> + Send + 'static
where
S: Service,
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
R: From<Req> + Serialize + 'static,
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = NoReceiver>,
R: From<Req> + Serialize + Send + Sync + 'static,
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = NoReceiver> + Send + 'static,
Res: RpcMessage,
{
let recv: channel::spsc::Receiver<Res> = match self.request().await? {
Request::Local(request) => {
let (tx, rx) = channel::spsc::channel(local_response_cap);
request.send((msg, tx)).await?;
rx
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (_tx, rx) = request.write(msg).await?;
rx.into()
}
};
Ok(recv)
let request = self.request();
async move {
let recv: channel::spsc::Receiver<Res> = match request.await? {
Request::Local(request) => {
let (tx, rx) = channel::spsc::channel(local_response_cap);
request.send((msg, tx)).await?;
rx
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (_tx, rx) = request.write(msg).await?;
rx.into()
}
};
Ok(recv)
}
}

/// Performs a request for which the client can send updates.
pub async fn client_streaming<Req, Update, Res>(
pub fn client_streaming<Req, Update, Res>(
&self,
msg: Req,
local_update_cap: usize,
) -> Result<
(
channel::spsc::Sender<Update>,
channel::oneshot::Receiver<Res>,
),
Error,
) -> impl Future<
Output = Result<
(
channel::spsc::Sender<Update>,
channel::oneshot::Receiver<Res>,
),
Error,
>,
>
where
S: Service,
Expand All @@ -900,59 +911,70 @@ impl<M, R, S> Client<M, R, S> {
Update: RpcMessage,
Res: RpcMessage,
{
let (update_tx, res_rx): (
channel::spsc::Sender<Update>,
channel::oneshot::Receiver<Res>,
) = match self.request().await? {
Request::Local(request) => {
let (req_tx, req_rx) = channel::spsc::channel(local_update_cap);
let (res_tx, res_rx) = channel::oneshot::channel();
request.send((msg, res_tx, req_rx)).await?;
(req_tx, res_rx)
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (tx, rx) = request.write(msg).await?;
(tx.into(), rx.into())
}
};
Ok((update_tx, res_rx))
let request = self.request();
async move {
let (update_tx, res_rx): (
channel::spsc::Sender<Update>,
channel::oneshot::Receiver<Res>,
) = match request.await? {
Request::Local(request) => {
let (req_tx, req_rx) = channel::spsc::channel(local_update_cap);
let (res_tx, res_rx) = channel::oneshot::channel();
request.send((msg, res_tx, req_rx)).await?;
(req_tx, res_rx)
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (tx, rx) = request.write(msg).await?;
(tx.into(), rx.into())
}
};
Ok((update_tx, res_rx))
}
}

/// Performs a request for which the client can send updates, and the server returns a spsc receiver.
pub async fn bidi_streaming<Req, Update, Res>(
pub fn bidi_streaming<Req, Update, Res>(
&self,
msg: Req,
local_update_cap: usize,
local_response_cap: usize,
) -> Result<(channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>), Error>
) -> impl Future<
Output = Result<(channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>), Error>,
> + Send
+ 'static
where
S: Service,
M: From<WithChannels<Req, S>> + Send + Sync + Unpin + 'static,
R: From<Req> + Serialize + 'static,
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = channel::spsc::Receiver<Update>>,
R: From<Req> + Serialize + Send + 'static,
Req: Channels<S, Tx = channel::spsc::Sender<Res>, Rx = channel::spsc::Receiver<Update>>
+ Send
+ 'static,
Update: RpcMessage,
Res: RpcMessage,
{
let (update_tx, res_rx): (channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>) =
match self.request().await? {
Request::Local(request) => {
let (update_tx, update_rx) = channel::spsc::channel(local_update_cap);
let (res_tx, res_rx) = channel::spsc::channel(local_response_cap);
request.send((msg, res_tx, update_rx)).await?;
(update_tx, res_rx)
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (tx, rx) = request.write(msg).await?;
(tx.into(), rx.into())
}
};
Ok((update_tx, res_rx))
let request = self.request();
async move {
let (update_tx, res_rx): (channel::spsc::Sender<Update>, channel::spsc::Receiver<Res>) =
match request.await? {
Request::Local(request) => {
let (update_tx, update_rx) = channel::spsc::channel(local_update_cap);
let (res_tx, res_rx) = channel::spsc::channel(local_response_cap);
request.send((msg, res_tx, update_rx)).await?;
(update_tx, res_rx)
}
#[cfg(not(feature = "rpc"))]
Request::Remote(_request) => unreachable!(),
#[cfg(feature = "rpc")]
Request::Remote(request) => {
let (tx, rx) = request.write(msg).await?;
(tx.into(), rx.into())
}
};
Ok((update_tx, res_rx))
}
}
}

Expand Down
Loading