Skip to content

Commit

Permalink
chore: fmt and clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed May 17, 2024
1 parent 86224dd commit 1b66907
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 59 deletions.
14 changes: 5 additions & 9 deletions crates/fluvio-auth/src/x509/authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,12 @@ impl X509Authenticator {

info!("client_certificate {:?}", client_certificate);

let principal = Self::principal_from_raw_certificate(
&client_certificate
.to_der()
.map_err(|err| {
info!("DEBUG_3_MAP_ERR client_certificate to_der error {:?}", err);
let principal =
Self::principal_from_raw_certificate(&client_certificate.to_der().map_err(|err| {
info!("DEBUG_3_MAP_ERR client_certificate to_der error {:?}", err);

err.into_io_error()
})?,
)?;
err.into_io_error()
})?)?;

Ok(principal)
}
Expand Down Expand Up @@ -134,7 +131,6 @@ impl Authenticator for X509Authenticator {
incoming_tls_stream: &DefaultServerTlsStream,
target_tcp_stream: &TcpStream,
) -> Result<bool, IoError> {

info!("DEBUG_1");

let principal = Self::principal_from_tls_stream(incoming_tls_stream)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-auth/src/x509/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl X509Identity {
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"connection closed",
))
));
}
}
} else {
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-controlplane-metadata/src/mirror/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub struct Home {
pub id: String,
pub remote_id: String,
pub public_endpoint: String,
//#[serde(skip_serializing_if = "Option::is_none")]
#[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
pub tls: Option<ClientTls>,
}
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-controlplane-metadata/src/mirror/status.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::time::Duration;
use fluvio_protocol::{Encoder, Decoder};

#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -63,7 +62,7 @@ pub struct ConnectionStat {

impl MirrorStatus {
#[cfg(feature = "use_serde")]
pub fn last_seen(&self, since: Duration) -> String {
pub fn last_seen(&self, since: std::time::Duration) -> String {
use humantime_serde::re::humantime;

let since_sec = since.as_secs();
Expand Down Expand Up @@ -120,6 +119,7 @@ impl std::fmt::Display for ConnectionStatus {
#[cfg(test)]
mod test {
use super::*;
use std::time::Duration;

#[test]
fn test_last_seen() {
Expand Down
1 change: 0 additions & 1 deletion crates/fluvio-sc/src/services/public_api/public_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ where
mut socket: FluvioSocket,
_connection: ConnectInfo,
) -> Result<()> {

info!("public_server DEBUG_1");

let auth_context = ctx
Expand Down
4 changes: 3 additions & 1 deletion crates/fluvio-socket/src/versioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ impl ClientConfig {
pub fn with_prefix_sni_domain(&self, prefix: &str) -> Self {
//let new_domain = format!("{}.{}", prefix, self.connector.domain());
//debug!(sni_domain = %new_domain);
let connector = self.connector.new_domain(self.connector.domain().to_string());
let connector = self
.connector
.new_domain(self.connector.domain().to_string());
Self {
addr: self.addr.clone(),
client_id: self.client_id.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/mirroring/home/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::mirroring::remote::api_key::MirrorRemoteApiEnum;
use crate::mirroring::remote::remote_api::RemoteMirrorRequest;
use crate::mirroring::remote::sync::DefaultPartitionSyncRequest;
use crate::replication::leader::SharedFileLeaderState;
use crate::services::auth::{SpuAuthGlobalContext, SpuAuthServiceContext};
use crate::services::auth::SpuAuthServiceContext;

use super::update_offsets::UpdateHomeOffsetRequest;

Expand Down
1 change: 0 additions & 1 deletion crates/fluvio-spu/src/mirroring/remote/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ where

let (mut home_sink, mut home_stream) = home_socket.split();


if tls {
debug!("tls enabled, disabling zero copy sink");
home_sink.disable_zerocopy();
Expand Down
12 changes: 9 additions & 3 deletions crates/fluvio-spu/src/mirroring/test/integration.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use tracing::debug;

use fluvio_controlplane_metadata::partition::{RemotePartitionConfig, HomePartitionConfig};
use fluvio_future::timer::sleep;
use fluvio_protocol::{fixture::create_raw_recordset, record::ReplicaKey};

use crate::services::public::create_public_server;
use crate::services::{
auth::{root::SpuRootAuthorization, SpuAuthGlobalContext},
public::create_public_server,
};

use super::fixture::{ReplicaConfig, local_port};

Expand Down Expand Up @@ -64,7 +67,10 @@ async fn test_mirroring_new_records() {

// start home server
debug!("starting home server");
let _remote_end = create_public_server(home_port.clone(), home_gctx.clone()).run();

let auth_global_ctx =
SpuAuthGlobalContext::new(home_gctx.clone(), Arc::new(SpuRootAuthorization::new()));
let _remote_end = create_public_server(home_port.to_owned(), auth_global_ctx.clone()).run();

// sleep 1 seconds
debug!("waiting for home public server to up");
Expand Down
1 change: 0 additions & 1 deletion crates/fluvio-spu/src/services/auth/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ impl Authorization for SpuRemoteAuthorization {
&self,
socket: &mut FluvioSocket,
) -> Result<Self::Context, AuthError> {

info!("remote_cert DEBUG_1");

let identity = X509Identity::create_from_connection(socket)
Expand Down
9 changes: 6 additions & 3 deletions crates/fluvio-spu/src/services/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@ use std::fmt::Debug;
pub(crate) type SpuPublicServer<A> =
FluvioApiServer<SpuServerRequest, SpuServerApiKey, SpuAuthGlobalContext<A>, PublicService<A>>;

pub fn create_public_server<A>(addr: String, ctx: SpuAuthGlobalContext<A>) -> SpuPublicServer<A>
pub fn create_public_server<A>(
addr: String,
auth_ctx: SpuAuthGlobalContext<A>,
) -> SpuPublicServer<A>
where
A: Authorization + Sync + Send + Debug + 'static,
SpuAuthGlobalContext<A>: Clone + Debug,
<A as Authorization>::Context: Send + Sync,
{
info!(
spu_id = ctx.global_ctx.local_spu_id(),
spu_id = auth_ctx.global_ctx.local_spu_id(),
%addr,
"Starting SPU public service:",
);

FluvioApiServer::new(addr, ctx, PublicService::<A>::new())
FluvioApiServer::new(addr, auth_ctx, PublicService::<A>::new())
}

#[derive(Debug)]
Expand Down
60 changes: 43 additions & 17 deletions crates/fluvio-spu/src/services/public/tests/produce.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{env::temp_dir, time::Duration};
use std::{env::temp_dir, sync::Arc, time::Duration};

use fluvio::{SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind};
use fluvio_controlplane::replica::Replica;
Expand Down Expand Up @@ -27,13 +27,17 @@ use flv_util::fixture::ensure_clean_dir;
use crate::{
config::SpuConfig,
core::GlobalContext,
services::public::{
create_public_server,
tests::{
create_filter_records, vec_to_raw_batch, load_wasm_module, create_filter_raw_records,
replication::leader::LeaderReplicaState,
services::{
auth::{root::SpuRootAuthorization, SpuAuthGlobalContext},
public::{
create_public_server,
tests::{
create_filter_raw_records, create_filter_records, load_wasm_module,
vec_to_raw_batch,
},
},
},
replication::leader::LeaderReplicaState,
};

#[fluvio_future::test(ignore)]
Expand All @@ -46,8 +50,9 @@ async fn test_produce_basic() {
let mut spu_config = SpuConfig::default();
spu_config.log.base_dir = test_path;
let ctx = GlobalContext::new_shared_context(spu_config);

let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -160,7 +165,9 @@ async fn test_produce_invalid_compression() {
spu_config.log.base_dir = test_path;
let ctx = GlobalContext::new_shared_context(spu_config);

let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -234,7 +241,10 @@ async fn test_produce_request_timed_out() {

let (leader_ctx, _) = config.leader_replica().await;

let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event =
create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -296,7 +306,10 @@ async fn test_produce_not_waiting_replication() {
let (leader_ctx, _) = config.leader_replica().await;
let public_addr = config.leader_public_addr();

let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event =
create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -355,8 +368,11 @@ async fn test_produce_waiting_replication() {
let (leader_ctx, leader_replica) = config.leader_replica().await;
let public_addr = config.leader_public_addr();

let auth_global_ctx =
SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let public_server_end_event =
create_public_server(public_addr.clone(), leader_ctx.clone()).run();
create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run();

let private_server_end_event =
create_internal_server(config.leader_addr(), leader_ctx.clone()).run();

Expand Down Expand Up @@ -425,7 +441,9 @@ async fn test_produce_metrics() {
spu_config.log.base_dir = test_path;
let ctx = GlobalContext::new_shared_context(spu_config);

let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -537,7 +555,9 @@ async fn test_produce_basic_with_smartmodule_with_lookback() {
smartmodule.params.set_lookback(Some(Lookback::last(1)));
let mut smartmodules = vec![smartmodule];

let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -848,7 +868,9 @@ async fn test_produce_with_deduplication() {
let ctx = GlobalContext::new_shared_context(spu_config);
load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER);

let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -1038,7 +1060,9 @@ async fn test_produce_smart_engine_memory_overfow() {
let ctx = GlobalContext::new_shared_context(spu_config);
load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER);

let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -1124,7 +1148,9 @@ async fn test_dedup_init_smart_engine_memory_overfow() {
let ctx = GlobalContext::new_shared_context(spu_config);
load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER);

let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run();
let auth_global_ctx =
SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new()));
let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run();

// wait for stream controller async to start
sleep(Duration::from_millis(100)).await;
Expand Down

0 comments on commit 1b66907

Please sign in to comment.