diff --git a/crates/fluvio-auth/src/x509/authenticator.rs b/crates/fluvio-auth/src/x509/authenticator.rs index cf8cc6d9ed..5d7b72f491 100644 --- a/crates/fluvio-auth/src/x509/authenticator.rs +++ b/crates/fluvio-auth/src/x509/authenticator.rs @@ -87,15 +87,12 @@ impl X509Authenticator { info!("client_certificate {:?}", client_certificate); - let principal = Self::principal_from_raw_certificate( - &client_certificate - .to_der() - .map_err(|err| { - info!("DEBUG_3_MAP_ERR client_certificate to_der error {:?}", err); + let principal = + Self::principal_from_raw_certificate(&client_certificate.to_der().map_err(|err| { + info!("DEBUG_3_MAP_ERR client_certificate to_der error {:?}", err); - err.into_io_error() - })?, - )?; + err.into_io_error() + })?)?; Ok(principal) } @@ -134,7 +131,6 @@ impl Authenticator for X509Authenticator { incoming_tls_stream: &DefaultServerTlsStream, target_tcp_stream: &TcpStream, ) -> Result { - info!("DEBUG_1"); let principal = Self::principal_from_tls_stream(incoming_tls_stream)?; diff --git a/crates/fluvio-auth/src/x509/identity.rs b/crates/fluvio-auth/src/x509/identity.rs index d046ac173c..b92079a172 100644 --- a/crates/fluvio-auth/src/x509/identity.rs +++ b/crates/fluvio-auth/src/x509/identity.rs @@ -42,7 +42,7 @@ impl X509Identity { return Err(std::io::Error::new( std::io::ErrorKind::Interrupted, "connection closed", - )) + )); } } } else { diff --git a/crates/fluvio-controlplane-metadata/src/mirror/spec.rs b/crates/fluvio-controlplane-metadata/src/mirror/spec.rs index 78a4115da5..f9cf4d91fa 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/spec.rs @@ -74,6 +74,7 @@ pub struct Home { pub id: String, pub remote_id: String, pub public_endpoint: String, + //#[serde(skip_serializing_if = "Option::is_none")] #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))] pub tls: Option, } diff --git a/crates/fluvio-controlplane-metadata/src/mirror/status.rs b/crates/fluvio-controlplane-metadata/src/mirror/status.rs index 655db0e268..7f3871ff9d 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/status.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/status.rs @@ -1,4 +1,3 @@ -use std::time::Duration; use fluvio_protocol::{Encoder, Decoder}; #[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)] @@ -63,7 +62,7 @@ pub struct ConnectionStat { impl MirrorStatus { #[cfg(feature = "use_serde")] - pub fn last_seen(&self, since: Duration) -> String { + pub fn last_seen(&self, since: std::time::Duration) -> String { use humantime_serde::re::humantime; let since_sec = since.as_secs(); @@ -120,6 +119,7 @@ impl std::fmt::Display for ConnectionStatus { #[cfg(test)] mod test { use super::*; + use std::time::Duration; #[test] fn test_last_seen() { diff --git a/crates/fluvio-sc/src/services/public_api/public_server.rs b/crates/fluvio-sc/src/services/public_api/public_server.rs index e256a0513d..f149933c86 100644 --- a/crates/fluvio-sc/src/services/public_api/public_server.rs +++ b/crates/fluvio-sc/src/services/public_api/public_server.rs @@ -59,7 +59,6 @@ where mut socket: FluvioSocket, _connection: ConnectInfo, ) -> Result<()> { - info!("public_server DEBUG_1"); let auth_context = ctx diff --git a/crates/fluvio-socket/src/versioned.rs b/crates/fluvio-socket/src/versioned.rs index bf3ab78931..6993477cde 100644 --- a/crates/fluvio-socket/src/versioned.rs +++ b/crates/fluvio-socket/src/versioned.rs @@ -163,7 +163,9 @@ impl ClientConfig { pub fn with_prefix_sni_domain(&self, prefix: &str) -> Self { //let new_domain = format!("{}.{}", prefix, self.connector.domain()); //debug!(sni_domain = %new_domain); - let connector = self.connector.new_domain(self.connector.domain().to_string()); + let connector = self + .connector + .new_domain(self.connector.domain().to_string()); Self { addr: self.addr.clone(), client_id: self.client_id.clone(), diff --git a/crates/fluvio-spu/src/mirroring/home/connection.rs b/crates/fluvio-spu/src/mirroring/home/connection.rs index 8195d50c79..dc699b58cc 100644 --- a/crates/fluvio-spu/src/mirroring/home/connection.rs +++ b/crates/fluvio-spu/src/mirroring/home/connection.rs @@ -18,7 +18,7 @@ use crate::mirroring::remote::api_key::MirrorRemoteApiEnum; use crate::mirroring::remote::remote_api::RemoteMirrorRequest; use crate::mirroring::remote::sync::DefaultPartitionSyncRequest; use crate::replication::leader::SharedFileLeaderState; -use crate::services::auth::{SpuAuthGlobalContext, SpuAuthServiceContext}; +use crate::services::auth::SpuAuthServiceContext; use super::update_offsets::UpdateHomeOffsetRequest; diff --git a/crates/fluvio-spu/src/mirroring/remote/controller.rs b/crates/fluvio-spu/src/mirroring/remote/controller.rs index 4131562a7e..cab049ce26 100644 --- a/crates/fluvio-spu/src/mirroring/remote/controller.rs +++ b/crates/fluvio-spu/src/mirroring/remote/controller.rs @@ -216,7 +216,6 @@ where let (mut home_sink, mut home_stream) = home_socket.split(); - if tls { debug!("tls enabled, disabling zero copy sink"); home_sink.disable_zerocopy(); diff --git a/crates/fluvio-spu/src/mirroring/test/integration.rs b/crates/fluvio-spu/src/mirroring/test/integration.rs index 2fcf7a928a..6f92c268f0 100644 --- a/crates/fluvio-spu/src/mirroring/test/integration.rs +++ b/crates/fluvio-spu/src/mirroring/test/integration.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tracing::debug; @@ -6,7 +6,10 @@ use fluvio_controlplane_metadata::partition::{RemotePartitionConfig, HomePartiti use fluvio_future::timer::sleep; use fluvio_protocol::{fixture::create_raw_recordset, record::ReplicaKey}; -use crate::services::public::create_public_server; +use crate::services::{ + auth::{root::SpuRootAuthorization, SpuAuthGlobalContext}, + public::create_public_server, +}; use super::fixture::{ReplicaConfig, local_port}; @@ -64,7 +67,10 @@ async fn test_mirroring_new_records() { // start home server debug!("starting home server"); - let _remote_end = create_public_server(home_port.clone(), home_gctx.clone()).run(); + + let auth_global_ctx = + SpuAuthGlobalContext::new(home_gctx.clone(), Arc::new(SpuRootAuthorization::new())); + let _remote_end = create_public_server(home_port.to_owned(), auth_global_ctx.clone()).run(); // sleep 1 seconds debug!("waiting for home public server to up"); diff --git a/crates/fluvio-spu/src/services/auth/remote.rs b/crates/fluvio-spu/src/services/auth/remote.rs index ac4fd6030a..ddac563623 100644 --- a/crates/fluvio-spu/src/services/auth/remote.rs +++ b/crates/fluvio-spu/src/services/auth/remote.rs @@ -19,7 +19,6 @@ impl Authorization for SpuRemoteAuthorization { &self, socket: &mut FluvioSocket, ) -> Result { - info!("remote_cert DEBUG_1"); let identity = X509Identity::create_from_connection(socket) diff --git a/crates/fluvio-spu/src/services/public/mod.rs b/crates/fluvio-spu/src/services/public/mod.rs index 56aeae5df5..ae3b1c1609 100644 --- a/crates/fluvio-spu/src/services/public/mod.rs +++ b/crates/fluvio-spu/src/services/public/mod.rs @@ -47,19 +47,22 @@ use std::fmt::Debug; pub(crate) type SpuPublicServer = FluvioApiServer, PublicService>; -pub fn create_public_server(addr: String, ctx: SpuAuthGlobalContext) -> SpuPublicServer +pub fn create_public_server( + addr: String, + auth_ctx: SpuAuthGlobalContext, +) -> SpuPublicServer where A: Authorization + Sync + Send + Debug + 'static, SpuAuthGlobalContext: Clone + Debug, ::Context: Send + Sync, { info!( - spu_id = ctx.global_ctx.local_spu_id(), + spu_id = auth_ctx.global_ctx.local_spu_id(), %addr, "Starting SPU public service:", ); - FluvioApiServer::new(addr, ctx, PublicService::::new()) + FluvioApiServer::new(addr, auth_ctx, PublicService::::new()) } #[derive(Debug)] diff --git a/crates/fluvio-spu/src/services/public/tests/produce.rs b/crates/fluvio-spu/src/services/public/tests/produce.rs index 4189461a45..b50b2a9fba 100644 --- a/crates/fluvio-spu/src/services/public/tests/produce.rs +++ b/crates/fluvio-spu/src/services/public/tests/produce.rs @@ -1,4 +1,4 @@ -use std::{env::temp_dir, time::Duration}; +use std::{env::temp_dir, sync::Arc, time::Duration}; use fluvio::{SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind}; use fluvio_controlplane::replica::Replica; @@ -27,13 +27,17 @@ use flv_util::fixture::ensure_clean_dir; use crate::{ config::SpuConfig, core::GlobalContext, - services::public::{ - create_public_server, - tests::{ - create_filter_records, vec_to_raw_batch, load_wasm_module, create_filter_raw_records, + replication::leader::LeaderReplicaState, + services::{ + auth::{root::SpuRootAuthorization, SpuAuthGlobalContext}, + public::{ + create_public_server, + tests::{ + create_filter_raw_records, create_filter_records, load_wasm_module, + vec_to_raw_batch, + }, }, }, - replication::leader::LeaderReplicaState, }; #[fluvio_future::test(ignore)] @@ -46,8 +50,9 @@ async fn test_produce_basic() { let mut spu_config = SpuConfig::default(); spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -160,7 +165,9 @@ async fn test_produce_invalid_compression() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -234,7 +241,10 @@ async fn test_produce_request_timed_out() { let (leader_ctx, _) = config.leader_replica().await; - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -296,7 +306,10 @@ async fn test_produce_not_waiting_replication() { let (leader_ctx, _) = config.leader_replica().await; let public_addr = config.leader_public_addr(); - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -355,8 +368,11 @@ async fn test_produce_waiting_replication() { let (leader_ctx, leader_replica) = config.leader_replica().await; let public_addr = config.leader_public_addr(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new())); let public_server_end_event = - create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); + let private_server_end_event = create_internal_server(config.leader_addr(), leader_ctx.clone()).run(); @@ -425,7 +441,9 @@ async fn test_produce_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -537,7 +555,9 @@ async fn test_produce_basic_with_smartmodule_with_lookback() { smartmodule.params.set_lookback(Some(Lookback::last(1))); let mut smartmodules = vec![smartmodule]; - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -848,7 +868,9 @@ async fn test_produce_with_deduplication() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1038,7 +1060,9 @@ async fn test_produce_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1124,7 +1148,9 @@ async fn test_dedup_init_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; diff --git a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs index 311d985fba..560c5968e0 100644 --- a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs @@ -34,6 +34,8 @@ use fluvio_spu_schema::{ fetch::DefaultFetchRequest, }; use fluvio_spu_schema::server::stream_fetch::DefaultStreamFetchRequest; +use crate::services::auth::root::SpuRootAuthorization; +use crate::services::auth::SpuAuthGlobalContext; use crate::services::public::tests::{vec_to_batch, create_filter_raw_records}; use crate::{ core::GlobalContext, @@ -58,7 +60,9 @@ async fn test_stream_fetch_basic() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -394,7 +398,9 @@ async fn test_stream_fetch_filter( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -585,7 +591,9 @@ async fn test_stream_fetch_filter_individual( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -711,7 +719,9 @@ async fn test_stream_filter_error_fetch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -846,7 +856,9 @@ async fn test_stream_filter_max( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -992,7 +1004,9 @@ async fn test_stream_fetch_map( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1147,7 +1161,9 @@ async fn test_stream_fetch_map_chain( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1307,7 +1323,9 @@ async fn test_stream_fetch_map_error( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1445,7 +1463,9 @@ async fn test_stream_aggregate_fetch_single_batch( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1591,7 +1611,9 @@ async fn test_stream_aggregate_fetch_multiple_batch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1729,7 +1751,9 @@ async fn test_stream_fetch_and_new_request( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1818,7 +1842,9 @@ async fn test_stream_fetch_array_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1940,7 +1966,9 @@ async fn test_stream_fetch_filter_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2065,7 +2093,9 @@ async fn test_stream_fetch_filter_with_params( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2257,7 +2287,9 @@ async fn test_stream_fetch_invalid_smartmodule( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2316,7 +2348,9 @@ async fn test_stream_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2494,7 +2528,9 @@ async fn stream_fetch_filter_lookback( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2655,7 +2691,9 @@ async fn stream_fetch_filter_lookback_age( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -3037,7 +3075,9 @@ async fn test_stream_fetch_sends_topic_delete_error_on_topic_delete() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await;