-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
multiplex_service.rs
118 lines (110 loc) · 3.37 KB
/
multiplex_service.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use axum::{
body::Body,
extract::Request,
http::header::CONTENT_TYPE,
response::{IntoResponse, Response},
};
use futures::{future::BoxFuture, ready};
use std::{
convert::Infallible,
task::{Context, Poll},
};
use tower::Service;
pub struct MultiplexService<A, B> {
rest: A,
rest_ready: bool,
grpc: B,
grpc_ready: bool,
}
impl<A, B> MultiplexService<A, B> {
pub fn new(rest: A, grpc: B) -> Self {
Self {
rest,
rest_ready: false,
grpc,
grpc_ready: false,
}
}
}
impl<A, B> Clone for MultiplexService<A, B>
where
A: Clone,
B: Clone,
{
fn clone(&self) -> Self {
Self {
rest: self.rest.clone(),
grpc: self.grpc.clone(),
// the cloned services probably won't be ready
rest_ready: false,
grpc_ready: false,
}
}
}
impl<A, B> Service<Request<hyper::Body>> for MultiplexService<A, B>
where
A: Service<Request<hyper::Body>, Error = Infallible>,
A::Response: IntoResponse,
A::Future: Send + 'static,
B: Service<Request<hyper::Body>>,
B::Response: IntoResponse,
B::Future: Send + 'static,
{
type Response = Response;
type Error = B::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// drive readiness for each inner service and record which is ready
loop {
match (self.rest_ready, self.grpc_ready) {
(true, true) => {
return Ok(()).into();
}
(false, _) => {
ready!(self.rest.poll_ready(cx)).map_err(|err| match err {})?;
self.rest_ready = true;
}
(_, false) => {
ready!(self.grpc.poll_ready(cx))?;
self.grpc_ready = true;
}
}
}
}
fn call(&mut self, req: Request<hyper::Body>) -> Self::Future {
// require users to call `poll_ready` first, if they don't we're allowed to panic
// as per the `tower::Service` contract
assert!(
self.grpc_ready,
"grpc service not ready. Did you forget to call `poll_ready`?"
);
assert!(
self.rest_ready,
"rest service not ready. Did you forget to call `poll_ready`?"
);
// if we get a grpc request call the grpc service, otherwise call the rest service
// when calling a service it becomes not-ready so we have drive readiness again
if is_grpc_request(&req) {
self.grpc_ready = false;
let future = self.grpc.call(req);
Box::pin(async move {
let res = future.await?;
Ok(res.into_response())
})
} else {
self.rest_ready = false;
let future = self.rest.call(req);
Box::pin(async move {
let res = future.await.map_err(|err| match err {})?;
Ok(res.into_response())
})
}
}
}
fn is_grpc_request<B>(req: &Request<B>) -> bool {
req.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
}