From 812a6a15b3676c5b7d66fff5be4b1948eeaca46a Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 11 Apr 2025 11:59:16 +0200 Subject: [PATCH 1/2] Add headers to service to service communication --- src/context/request.rs | 25 +++++++++++++++++++++---- src/endpoint/context.rs | 21 ++++++++++++++++++--- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/context/request.rs b/src/context/request.rs index 4478135..c35a417 100644 --- a/src/context/request.rs +++ b/src/context/request.rs @@ -75,6 +75,7 @@ pub struct Request<'a, Req, Res = ()> { ctx: &'a ContextInternal, request_target: RequestTarget, idempotency_key: Option, + headers: Vec<(String, String)>, req: Req, res: PhantomData, } @@ -85,11 +86,17 @@ impl<'a, Req, Res> Request<'a, Req, Res> { ctx, request_target, idempotency_key: None, + headers: vec![], req, res: PhantomData, } } + pub fn header(mut self, key: String, value: String) -> Self { + self.headers.push((key, value)); + self + } + /// Add idempotency key to the request pub fn idempotency_key(mut self, idempotency_key: impl Into) -> Self { self.idempotency_key = Some(idempotency_key.into()); @@ -102,8 +109,12 @@ impl<'a, Req, Res> Request<'a, Req, Res> { Req: Serialize + 'static, Res: Deserialize + 'static, { - self.ctx - .call(self.request_target, self.idempotency_key, self.req) + self.ctx.call( + self.request_target, + self.idempotency_key, + self.headers, + self.req, + ) } /// Send the request to the service, without waiting for the response. @@ -111,8 +122,13 @@ impl<'a, Req, Res> Request<'a, Req, Res> { where Req: Serialize + 'static, { - self.ctx - .send(self.request_target, self.idempotency_key, self.req, None) + self.ctx.send( + self.request_target, + self.idempotency_key, + self.headers, + self.req, + None, + ) } /// Schedule the request to the service, without waiting for the response. @@ -123,6 +139,7 @@ impl<'a, Req, Res> Request<'a, Req, Res> { self.ctx.send( self.request_target, self.idempotency_key, + self.headers, self.req, Some(delay), ) diff --git a/src/endpoint/context.rs b/src/endpoint/context.rs index 0ddb577..e83ebab 100644 --- a/src/endpoint/context.rs +++ b/src/endpoint/context.rs @@ -15,8 +15,8 @@ use futures::future::{BoxFuture, Either, Shared}; use futures::{FutureExt, TryFutureExt}; use pin_project_lite::pin_project; use restate_sdk_shared_core::{ - CoreVM, DoProgressResponse, Error as CoreError, NonEmptyValue, NotificationHandle, RetryPolicy, - RunExitResult, TakeOutputResult, Target, TerminalFailure, Value, VM, + CoreVM, DoProgressResponse, Error as CoreError, Header, NonEmptyValue, NotificationHandle, + RetryPolicy, RunExitResult, TakeOutputResult, Target, TerminalFailure, Value, VM, }; use std::borrow::Cow; use std::collections::HashMap; @@ -371,12 +371,20 @@ impl ContextInternal { &self, request_target: RequestTarget, idempotency_key: Option, + headers: Vec<(String, String)>, req: Req, ) -> impl CallFuture + Send { let mut inner_lock = must_lock!(self.inner); let mut target: Target = request_target.into(); target.idempotency_key = idempotency_key; + target.headers = headers + .into_iter() + .map(|(k, v)| Header { + key: k.into(), + value: v.into(), + }) + .collect(); let call_result = Req::serialize(&req) .map_err(|e| Error::serialization("call", e)) .and_then(|input| inner_lock.vm.sys_call(target, input).map_err(Into::into)); @@ -443,6 +451,7 @@ impl ContextInternal { &self, request_target: RequestTarget, idempotency_key: Option, + headers: Vec<(String, String)>, req: Req, delay: Option, ) -> impl InvocationHandle { @@ -450,7 +459,13 @@ impl ContextInternal { let mut target: Target = request_target.into(); target.idempotency_key = idempotency_key; - + target.headers = headers + .into_iter() + .map(|(k, v)| Header { + key: k.into(), + value: v.into(), + }) + .collect(); let input = match Req::serialize(&req) { Ok(b) => b, Err(e) => { From cf563c9c8e9f558ec67a612cd956011757fc2cfb Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 11 Apr 2025 12:02:44 +0200 Subject: [PATCH 2/2] Fix little issue in docs --- src/context/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/context/mod.rs b/src/context/mod.rs index 2432f09..d9b66fd 100644 --- a/src/context/mod.rs +++ b/src/context/mod.rs @@ -219,8 +219,7 @@ impl<'ctx> WorkflowContext<'ctx> { /// /// ## Scheduling Async Tasks /// -/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls][crate::context::ContextClient#delayed-calls]. -/// +/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after). /// /// ## Durable sleep /// To sleep in a Restate application for ten seconds, do the following: