Skip to content

Commit

Permalink
feat: add filter mechanism (#147)
Browse files Browse the repository at this point in the history
* feat: add filter mechanism

- add tower dependency

* use snake case
  • Loading branch information
onewe committed Apr 25, 2023
1 parent a6ea8a3 commit 8612490
Show file tree
Hide file tree
Showing 6 changed files with 528 additions and 38 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ reqwest = { version = "0.11", default-features = false, features = [], optional
async-trait = {version = "0.1", optional = true}
async-stream = "0.3.5"
tonic = "0.9.1"
tower = "0.4.13"
tower = {version = "0.4.13", features = ["filter", "log"]}
http = "0.2.9"
pin-project = "1.0.12"
futures-util = "0.3.28"
Expand Down
2 changes: 1 addition & 1 deletion src/common/cache/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl<V> Store<V> for DiskStore
where
V: de::DeserializeOwned,
{
fn name(&self) -> Cow<str> {
fn name(&self) -> Cow<'_, str> {
Cow::from("disk store")
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ where
debug!("{} sync to {} quit!", id, store.name());
}

pub(crate) fn get(&self, key: &String) -> Option<CacheRef<V>> {
pub(crate) fn get(&self, key: &String) -> Option<CacheRef<'_, V>> {
let value = self.inner.get(key);
value.map(|dash_map_ref| CacheRef { dash_map_ref })
}

pub(crate) fn get_mut(&self, key: &String) -> Option<CacheRefMut<V>> {
pub(crate) fn get_mut(&self, key: &String) -> Option<CacheRefMut<'_, V>> {
let value = self.inner.get_mut(key);
value.map(|dash_map_ref_mut| CacheRefMut {
dash_map_ref_mut,
Expand Down Expand Up @@ -369,7 +369,7 @@ enum ChangeEvent {

#[async_trait]
pub(crate) trait Store<V>: Send {
fn name(&self) -> Cow<str>;
fn name(&self) -> Cow<'_, str>;

fn load(&mut self) -> HashMap<String, V>;

Expand Down
51 changes: 50 additions & 1 deletion src/common/remote/grpc/nacos_grpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::task::JoinHandle;
use tower::layer::util::Stack;

use crate::api::error::Error;
use crate::common::executor;
use crate::common::remote::grpc::message::{request::NacosClientAbilities, GrpcMessageData};
use crate::common::remote::grpc::message::{
GrpcMessage, GrpcMessageBuilder, GrpcRequestMessage, GrpcResponseMessage,
};
use crate::common::remote::grpc::nacos_grpc_service::DynamicUnaryCallLayerWrapper;

use super::handlers::client_detection_request_handler::ClientDetectionRequestHandler;
use super::message::request::ClientDetectionRequest;
use super::nacos_grpc_connection::{NacosGrpcConnection, SendRequest};
use super::nacos_grpc_service::{
DynamicBiStreamingCallLayer, DynamicBiStreamingCallLayerWrapper, DynamicUnaryCallLayer,
};
use super::server_list_service::PollingServerListService;
use super::tonic::TonicBuilder;
use super::{config::GrpcConfiguration, nacos_grpc_service::ServerRequestHandler};
Expand Down Expand Up @@ -61,6 +66,8 @@ pub(crate) struct NacosGrpcClientBuilder {
server_list: Vec<String>,
connected_listener: Option<ConnectedListener>,
disconnected_listener: Option<DisconnectedListener>,
unary_call_layer: Option<DynamicUnaryCallLayer>,
bi_call_layer: Option<DynamicBiStreamingCallLayer>,
}

impl NacosGrpcClientBuilder {
Expand All @@ -76,6 +83,8 @@ impl NacosGrpcClientBuilder {
server_list,
connected_listener: None,
disconnected_listener: None,
unary_call_layer: None,
bi_call_layer: None,
}
}

Expand Down Expand Up @@ -273,6 +282,38 @@ impl NacosGrpcClientBuilder {
Self { ..self }
}

pub(crate) fn unary_call_layer(self, layer: DynamicUnaryCallLayer) -> Self {
let stack = if let Some(unary_call_layer) = self.unary_call_layer {
Arc::new(Stack::new(
DynamicUnaryCallLayerWrapper(layer),
DynamicUnaryCallLayerWrapper(unary_call_layer),
))
} else {
layer
};

Self {
unary_call_layer: Some(stack),
..self
}
}

pub(crate) fn bi_call_layer(self, layer: DynamicBiStreamingCallLayer) -> Self {
let stack = if let Some(bi_call_layer) = self.bi_call_layer {
Arc::new(Stack::new(
DynamicBiStreamingCallLayerWrapper(layer),
DynamicBiStreamingCallLayerWrapper(bi_call_layer),
))
} else {
layer
};

Self {
bi_call_layer: Some(stack),
..self
}
}

pub(crate) fn build(mut self) -> NacosGrpcClient {
self.server_request_handler_map.insert(
ClientDetectionRequest::identity().to_string(),
Expand All @@ -283,7 +324,15 @@ impl NacosGrpcClientBuilder {
let join_handler: JoinHandle<Arc<dyn SendRequest + Send + Sync + 'static>> =
executor::spawn(async move {
let server_list = PollingServerListService::new(self.server_list);
let tonic_builder = TonicBuilder::new(self.grpc_config, server_list);
let mut tonic_builder = TonicBuilder::new(self.grpc_config, server_list);
if let Some(layer) = self.unary_call_layer {
tonic_builder = tonic_builder.unary_call_layer(layer);
}

if let Some(layer) = self.bi_call_layer {
tonic_builder = tonic_builder.bi_call_layer(layer);
}

let mut connection = NacosGrpcConnection::new(
tonic_builder,
self.server_request_handler_map,
Expand Down

0 comments on commit 8612490

Please sign in to comment.