Skip to content

Commit

Permalink
PipelineCall is static
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jun 23, 2023
1 parent 1fd6311 commit 9d86ecb
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 48 deletions.
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.1] - 2023-06-23

* `PipelineCall` is static

## [0.3.0] - 2023-06-22

* Release v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.3.0"
version = "0.3.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -19,7 +19,7 @@ path = "src/lib.rs"
ntex-codec = "0.6.2"
ntex-bytes = "0.1.19"
ntex-util = "0.3.0"
ntex-service = "1.2.0"
ntex-service = "1.2.1"

bitflags = "1.3"
log = "0.4"
Expand Down
4 changes: 2 additions & 2 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ where
// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item).into_static();
let fut = shared.service.call(item);
spawn(async move {
let result = fut.await;
shared.handle_result(result, &shared.io);
Expand All @@ -276,7 +276,7 @@ where
// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item).into_static();
let fut = shared.service.call(item);
spawn(async move {
let result = fut.await;
shared.handle_result(result, &shared.io);
Expand Down
4 changes: 4 additions & 0 deletions ntex-service/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.2.1] - 2023-06-23

* Make `PipelineCall` static

## [1.2.0] - 2023-06-22

* Rename Container to Pipeline
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "1.2.0"
version = "1.2.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/src/fn_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod tests {

let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone());

let res = pipe.call(()).await;
let res = pipe.service_call(()).await;
assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "pipe");
Expand Down
37 changes: 8 additions & 29 deletions ntex-service/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ impl<S> Pipeline<S> {
/// Call service and create future object that resolves to service result.
///
/// Note, this call does not check service readiness.
pub fn call<R>(&self, req: R) -> PipelineCall<'_, S, R>
pub fn call<R>(&self, req: R) -> PipelineCall<S, R>
where
S: Service<R>,
S: Service<R> + 'static,
R: 'static,
{
let pipeline = self.clone();
let svc_call = pipeline.svc.call(req, ServiceCtx::new(&pipeline.waiters));
Expand Down Expand Up @@ -111,41 +112,19 @@ impl<S> Clone for Pipeline<S> {

pin_project_lite::pin_project! {
#[must_use = "futures do nothing unless polled"]
pub struct PipelineCall<'f, S, R>
pub struct PipelineCall<S, R>
where
S: Service<R>,
S: 'f,
R: 'f,
S: 'static,
R: 'static,
{
#[pin]
fut: S::Future<'f>,
fut: S::Future<'static>,
pipeline: Pipeline<S>,
}
}

impl<'f, S, R> PipelineCall<'f, S, R>
where
S: Service<R> + 'f,
R: 'f,
{
#[inline]
/// Convert future object to static version.
///
/// Returned future is suitable for spawning into a async runtime.
/// Note, this call does not check service readiness.
pub fn into_static(self) -> PipelineCall<'static, S, R> {
let svc_call = self.fut;
let pipeline = self.pipeline;

// SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };
PipelineCall { fut, pipeline }
}
}

impl<'f, S, R> future::Future for PipelineCall<'f, S, R>
impl<S, R> future::Future for PipelineCall<S, R>
where
S: Service<R>,
{
Expand Down
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.1] - 2023-06-23

* `PipelineCall` is static

## [0.7.0] - 2023-06-22

* Release v0.7.0
Expand Down
8 changes: 4 additions & 4 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.7.0"
version = "0.7.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -52,13 +52,13 @@ ntex-codec = "0.6.2"
ntex-connect = "0.3.0"
ntex-http = "0.1.9"
ntex-router = "0.5.1"
ntex-service = "1.2.0"
ntex-service = "1.2.1"
ntex-macros = "0.1.3"
ntex-util = "0.3.0"
ntex-bytes = "0.1.19"
ntex-h2 = "0.3.0"
ntex-h2 = "0.3.1"
ntex-rt = "0.4.9"
ntex-io = "0.3.0"
ntex-io = "0.3.1"
ntex-tls = "0.3.0"
ntex-tokio = { version = "0.3.0", optional = true }
ntex-glommio = { version = "0.3.0", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion ntex/src/http/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ where
) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>> {
Box::pin(async move {
// connect to the host
let fut = self.0.call(ClientConnect {
let pl = self.0.clone();
let fut = pl.service_call(ClientConnect {
uri: head.as_ref().uri.clone(),
addr,
});
Expand Down
16 changes: 8 additions & 8 deletions ntex/src/http/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ pin_project_lite::pin_project! {
where S: 'static, X: 'static
{
None,
Service { #[pin] fut: PipelineCall<'static, S, Request> },
ServiceUpgrade { #[pin] fut: PipelineCall<'static, S, Request> },
Expect { #[pin] fut: PipelineCall<'static, X, Request> },
Filter { fut: PipelineCall<'static, OnRequest, (Request, IoRef)> }
Service { #[pin] fut: PipelineCall<S, Request> },
ServiceUpgrade { #[pin] fut: PipelineCall<S, Request> },
Expect { #[pin] fut: PipelineCall<X, Request> },
Filter { fut: PipelineCall<OnRequest, (Request, IoRef)> }
}
}

Expand Down Expand Up @@ -479,21 +479,21 @@ where
fn service_call(&self, req: Request) -> CallState<S, X> {
// Handle normal requests
CallState::Service {
fut: self.config.service.call(req).into_static(),
fut: self.config.service.call(req),
}
}

fn service_filter(&self, req: Request, f: &Pipeline<OnRequest>) -> CallState<S, X> {
// Handle filter fut
CallState::Filter {
fut: f.call((req, self.io.get_ref())).into_static(),
fut: f.call((req, self.io.get_ref())),
}
}

fn service_expect(&self, req: Request) -> CallState<S, X> {
// Handle normal requests with EXPECT: 100-Continue` header
CallState::Expect {
fut: self.config.expect.call(req).into_static(),
fut: self.config.expect.call(req),
}
}

Expand All @@ -506,7 +506,7 @@ where
)));
// Handle upgrade requests
CallState::ServiceUpgrade {
fut: self.config.service.call(req).into_static(),
fut: self.config.service.call(req),
}
}

Expand Down

0 comments on commit 9d86ecb

Please sign in to comment.