diff --git a/Cargo.lock b/Cargo.lock index 6021984a0c..9d87ec8bd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2335,6 +2335,7 @@ dependencies = [ "eyre", "flate2", "fluvio", + "fluvio-auth", "fluvio-channel", "fluvio-cli-common", "fluvio-cluster", @@ -2559,7 +2560,7 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.28.1" +version = "0.28.2" dependencies = [ "anyhow", "async-trait", @@ -2807,6 +2808,7 @@ dependencies = [ "cfg-if", "clap", "event-listener 3.1.0", + "fluvio", "fluvio-auth", "fluvio-controlplane", "fluvio-controlplane-metadata", @@ -2961,6 +2963,7 @@ dependencies = [ "event-listener 3.1.0", "flate2", "fluvio", + "fluvio-auth", "fluvio-compression", "fluvio-controlplane", "fluvio-controlplane-metadata", diff --git a/crates/fluvio-auth/src/policy.rs b/crates/fluvio-auth/src/policy.rs index 5270ddb521..45d5f67810 100644 --- a/crates/fluvio-auth/src/policy.rs +++ b/crates/fluvio-auth/src/policy.rs @@ -34,6 +34,9 @@ pub trait AuthContext: Debug { action: InstanceAction, key: &str, ) -> Result; + + /// check if remote id is allowed + fn allow_remote_id(&self, id: &str) -> bool; } #[async_trait] diff --git a/crates/fluvio-auth/src/x509/authenticator.rs b/crates/fluvio-auth/src/x509/authenticator.rs index 77501c523c..7b5f6284a0 100644 --- a/crates/fluvio-auth/src/x509/authenticator.rs +++ b/crates/fluvio-auth/src/x509/authenticator.rs @@ -36,15 +36,15 @@ impl ScopeBindings { #[derive(Debug)] pub struct X509Authenticator { - scope_bindings: ScopeBindings, + scope_bindings: Option, } impl X509Authenticator { - pub fn new(scope_binding_file_path: &Path) -> Self { - Self { - scope_bindings: ScopeBindings::load(scope_binding_file_path) - .expect("unable to create ScopeBindings"), - } + pub fn new(scope_binding_file_path: Option<&Path>) -> Self { + let scope_bindings = scope_binding_file_path.map(|scope_binding_file_path| { + ScopeBindings::load(scope_binding_file_path).expect("unable to create ScopeBindings") + }); + Self { scope_bindings } } async fn send_authorization_request( @@ -79,13 +79,9 @@ impl X509Authenticator { fn principal_from_tls_stream(tls_stream: &DefaultServerTlsStream) -> Result { trace!("tls_stream {:?}", tls_stream); - let peer_certificate = tls_stream.peer_certificate(); - - trace!("peer_certificate {:?}", peer_certificate); - let client_certificate = tls_stream.peer_certificate().ok_or(IoErrorKind::NotFound)?; - trace!("client_certificate {:?}", tls_stream); + trace!("client_certificate {:?}", client_certificate); let principal = Self::principal_from_raw_certificate( &client_certificate @@ -96,7 +92,7 @@ impl X509Authenticator { Ok(principal) } - fn principal_from_raw_certificate(certificate_bytes: &[u8]) -> Result { + pub fn principal_from_raw_certificate(certificate_bytes: &[u8]) -> Result { parse_x509_certificate(certificate_bytes) .map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) .and_then(|(_, parsed_cert)| Self::common_name_from_parsed_certificate(parsed_cert)) @@ -131,7 +127,13 @@ impl Authenticator for X509Authenticator { target_tcp_stream: &TcpStream, ) -> Result { let principal = Self::principal_from_tls_stream(incoming_tls_stream)?; - let scopes = self.scope_bindings.get_scopes(&principal); + + let scopes = if let Some(scope_bindings) = &self.scope_bindings { + scope_bindings.get_scopes(&principal) + } else { + Vec::new() + }; + let authorization_request = AuthRequest::new(principal, scopes); let success = Self::send_authorization_request(target_tcp_stream, authorization_request).await?; diff --git a/crates/fluvio-auth/src/x509/identity.rs b/crates/fluvio-auth/src/x509/identity.rs index 46be341b91..b92079a172 100644 --- a/crates/fluvio-auth/src/x509/identity.rs +++ b/crates/fluvio-auth/src/x509/identity.rs @@ -37,11 +37,12 @@ impl X509Identity { principal: req_msg.request.principal, }, }, - Err(_e) => { + Err(err) => { + tracing::error!(%err, "error decoding auth request"); return Err(std::io::Error::new( std::io::ErrorKind::Interrupted, "connection closed", - )) + )); } } } else { diff --git a/crates/fluvio-cli/Cargo.toml b/crates/fluvio-cli/Cargo.toml index f469356f11..47fb72954d 100644 --- a/crates/fluvio-cli/Cargo.toml +++ b/crates/fluvio-cli/Cargo.toml @@ -84,6 +84,7 @@ k8-types = { workspace = true , features = ["core"] } fluvio-cluster = { path = "../fluvio-cluster", default-features = false, features = ["cli"], optional = true } fluvio = { workspace = true } +fluvio-auth = { workspace = true } fluvio-socket = { workspace = true } fluvio-command = { workspace = true } fluvio-package-index = { workspace = true } diff --git a/crates/fluvio-cli/src/client/remote/export.rs b/crates/fluvio-cli/src/client/remote/export.rs index d9a3a2acf7..3357095b8c 100644 --- a/crates/fluvio-cli/src/client/remote/export.rs +++ b/crates/fluvio-cli/src/client/remote/export.rs @@ -1,15 +1,15 @@ use std::sync::Arc; use anyhow::{Context, Result}; use clap::Parser; +use fluvio::config::{TlsConfig, TlsPolicy}; use fluvio_extension_common::{target::ClusterTarget, Terminal}; +use fluvio_future::native_tls::{CertBuilder, X509PemBuilder}; use fluvio_sc_schema::{ - mirror::{Home, MirrorSpec, MirrorType}, + mirror::{ClientTls, Home, MirrorSpec, MirrorType}, remote_file::RemoteMetadataExport, }; use anyhow::anyhow; -use super::get_admin; - #[derive(Debug, Parser)] pub struct ExportOpt { /// id of the remote cluster to export @@ -20,9 +20,15 @@ pub struct ExportOpt { /// override endpoint of the home cluster #[arg(long, short = 'e')] public_endpoint: Option, - // id of the home cluster to share - #[arg(name = "c")] + /// id of the home cluster to share + #[arg(long)] home_id: Option, + /// remote tls certificate + #[arg(long)] + cert: Option, + /// remote tls key + #[arg(long)] + key: Option, } impl ExportOpt { @@ -31,14 +37,15 @@ impl ExportOpt { out: Arc, cluster_target: ClusterTarget, ) -> Result<()> { + let fluvio_config = cluster_target.load()?; let public_endpoint = if let Some(public_endpoint) = self.public_endpoint { - public_endpoint + public_endpoint.clone() } else { - let fluvio_config = cluster_target.clone().load()?; - fluvio_config.endpoint + fluvio_config.endpoint.clone() }; + let flv = fluvio::Fluvio::connect_with_config(&fluvio_config).await?; + let admin = flv.admin().await; - let admin = get_admin(cluster_target).await?; let all_remotes = admin.all::().await?; let _remote = all_remotes .iter() @@ -50,10 +57,17 @@ impl ExportOpt { let home_id = self.home_id.clone().unwrap_or_else(|| "home".to_owned()); + let tls = get_tls_config( + fluvio_config.clone(), + self.cert.clone(), + self.key.clone(), + self.remote_id.clone(), + )?; let home_metadata = Home { id: home_id, remote_id: self.remote_id, public_endpoint, + tls, }; let metadata = RemoteMetadataExport::new(home_metadata); @@ -68,3 +82,77 @@ impl ExportOpt { Ok(()) } } + +#[cfg(unix)] +fn get_tls_config( + fluvio_config: fluvio::config::FluvioConfig, + cert_path: Option, + key_path: Option, + remote_id: String, +) -> Result> { + match &fluvio_config.tls { + TlsPolicy::Verified(config) => { + let (remote_cert, remote_key, cert_path) = match (cert_path.clone(), key_path) { + (Some(cert), Some(key)) => ( + std::fs::read_to_string(cert.clone())?, + std::fs::read_to_string(key)?, + cert, + ), + _ => { + return Err(anyhow!( + "remote cert and key are required for a cluster using TLS" + )); + } + }; + + let cert_build = X509PemBuilder::from_path(cert_path) + .map_err(|err| anyhow!("error building cert: {}", err))?; + + let cert = cert_build + .build() + .map_err(|err| anyhow!("error building cert: {}", err))?; + + let cert_der = cert + .to_der() + .map_err(|err| anyhow!("error converting cert to der: {}", err))?; + + let principal = fluvio_auth::x509::X509Authenticator::principal_from_raw_certificate(&cert_der).expect( + "error getting principal from certificate. This should never happen as the certificate is valid", + ); + + if principal != remote_id { + return Err(anyhow!( + "remote_id: \"{}\" does not match the CN in the certificate: \"{}\"", + remote_id, + principal + )); + } + + match config { + TlsConfig::Inline(config) => Ok(Some(ClientTls { + domain: config.domain.clone(), + ca_cert: config.ca_cert.clone(), + client_cert: remote_cert, + client_key: remote_key, + })), + TlsConfig::Files(file_config) => Ok(Some(ClientTls { + domain: file_config.domain.clone(), + ca_cert: std::fs::read_to_string(&file_config.ca_cert)?, + client_cert: remote_cert, + client_key: remote_key, + })), + } + } + _ => Ok(None), + } +} + +#[cfg(not(unix))] +fn get_tls_config( + _fluvio_config: fluvio::config::FluvioConfig, + _cert_path: Option, + _key_path: Option, + _remote_id: String, +) -> Result> { + Ok(None) +} diff --git a/crates/fluvio-controlplane-metadata/Cargo.toml b/crates/fluvio-controlplane-metadata/Cargo.toml index 760fdfdcbc..63a0ce3212 100644 --- a/crates/fluvio-controlplane-metadata/Cargo.toml +++ b/crates/fluvio-controlplane-metadata/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-controlplane-metadata" edition = "2021" -version = "0.28.1" +version = "0.28.2" authors = ["Fluvio Contributors "] description = "Metadata definition for Fluvio control plane" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-controlplane-metadata/src/mirror/spec.rs b/crates/fluvio-controlplane-metadata/src/mirror/spec.rs index f00eff27cb..9dc9f3b8e4 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/spec.rs @@ -10,7 +10,6 @@ use fluvio_protocol::{Encoder, Decoder}; )] pub struct MirrorSpec { pub mirror_type: MirrorType, - // TODO: we should add auth } impl MirrorSpec { @@ -75,4 +74,25 @@ pub struct Home { pub id: String, pub remote_id: String, pub public_endpoint: String, + #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))] + pub tls: Option, +} + +#[derive(Clone, PartialEq, Eq, Default, Encoder, Decoder)] +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "camelCase") +)] +pub struct ClientTls { + pub domain: String, + pub ca_cert: String, + pub client_cert: String, + pub client_key: String, +} + +impl std::fmt::Debug for ClientTls { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ClientTls: {{ domain: {} }}", self.domain) + } } diff --git a/crates/fluvio-controlplane-metadata/src/mirror/status.rs b/crates/fluvio-controlplane-metadata/src/mirror/status.rs index 2a9e55e1dc..7f3871ff9d 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/status.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/status.rs @@ -1,4 +1,3 @@ -use std::time::Duration; use fluvio_protocol::{Encoder, Decoder}; #[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)] @@ -63,7 +62,7 @@ pub struct ConnectionStat { impl MirrorStatus { #[cfg(feature = "use_serde")] - pub fn last_seen(&self, since: Duration) -> String { + pub fn last_seen(&self, since: std::time::Duration) -> String { use humantime_serde::re::humantime; let since_sec = since.as_secs(); @@ -87,9 +86,9 @@ impl std::fmt::Display for MirrorStatus { (MirrorPairStatus::Disabled, ConnectionStatus::Online) => "Disabled", (MirrorPairStatus::Waiting, ConnectionStatus::Online) => "Waiting", (MirrorPairStatus::Succesful, ConnectionStatus::Offline) => "Offline", - (MirrorPairStatus::Failed, ConnectionStatus::Offline) => "Failed", - (MirrorPairStatus::Disabled, ConnectionStatus::Offline) => "Disabled", - (MirrorPairStatus::Waiting, ConnectionStatus::Offline) => "Waiting", + (MirrorPairStatus::Failed, ConnectionStatus::Offline) => "Offline", + (MirrorPairStatus::Disabled, ConnectionStatus::Offline) => "Offline", + (MirrorPairStatus::Waiting, ConnectionStatus::Offline) => "Offline", }; write!(f, "{}", status) } @@ -120,6 +119,7 @@ impl std::fmt::Display for ConnectionStatus { #[cfg(test)] mod test { use super::*; + use std::time::Duration; #[test] fn test_last_seen() { diff --git a/crates/fluvio-sc/Cargo.toml b/crates/fluvio-sc/Cargo.toml index c2a8fa003d..aa68b7e969 100644 --- a/crates/fluvio-sc/Cargo.toml +++ b/crates/fluvio-sc/Cargo.toml @@ -47,6 +47,7 @@ tracing = { workspace = true } # Fluvio dependencies +fluvio = { workspace = true } fluvio-auth = { workspace = true } fluvio-future = { workspace = true, features = [ "subscriber", diff --git a/crates/fluvio-sc/src/cli.rs b/crates/fluvio-sc/src/cli.rs index 480a2717fa..97a4babc9c 100644 --- a/crates/fluvio-sc/src/cli.rs +++ b/crates/fluvio-sc/src/cli.rs @@ -131,6 +131,8 @@ impl ScOpt { config.white_list = self.white_list.into_iter().collect(); config.read_only_metadata = self.run_mode.read_only.is_some(); + config.tls = self.tls.tls; + // Set Configuration Authorization Policy let policy = match self.auth_policy { diff --git a/crates/fluvio-sc/src/config/sc_config.rs b/crates/fluvio-sc/src/config/sc_config.rs index 554207541d..209edc7bb1 100644 --- a/crates/fluvio-sc/src/config/sc_config.rs +++ b/crates/fluvio-sc/src/config/sc_config.rs @@ -29,6 +29,7 @@ pub struct ScConfig { pub namespace: String, pub x509_auth_scopes: Option, pub white_list: HashSet, + pub tls: bool, } impl ::std::default::Default for ScConfig { @@ -40,6 +41,7 @@ impl ::std::default::Default for ScConfig { namespace: DEFAULT_NAMESPACE.to_owned(), x509_auth_scopes: None, white_list: HashSet::new(), + tls: false, } } } diff --git a/crates/fluvio-sc/src/controllers/mirroring/controller.rs b/crates/fluvio-sc/src/controllers/mirroring/controller.rs index 1ef176d555..eb16a7172e 100644 --- a/crates/fluvio-sc/src/controllers/mirroring/controller.rs +++ b/crates/fluvio-sc/src/controllers/mirroring/controller.rs @@ -1,13 +1,14 @@ use std::time::{Duration, SystemTime}; +use fluvio::config::TlsPolicy; use tracing::{debug, error, info, instrument}; use anyhow::{anyhow, Result}; use fluvio_socket::{ClientConfig, MultiplexerSocket, StreamSocket}; use futures_util::StreamExt; -use fluvio_future::{task::spawn, timer::sleep}; +use fluvio_future::{net::DomainConnector, task::spawn, timer::sleep}; use fluvio_sc_schema::{ core::MetadataItem, - mirror::{ConnectionStatus, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType}, + mirror::{ConnectionStatus, Home, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType}, mirroring::ObjectMirroringRequest, topic::{MirrorConfig, RemoteMirrorConfig, ReplicaSpec, SpuMirrorConfig, TopicSpec}, TryEncodableFrom, @@ -72,9 +73,36 @@ impl RemoteMirrorController { if let Some(home) = home_mirrors.first() { //send to home cluster the connect request - //TODO: handle TLS + let tlspolicy = option_tlspolicy(home); + + // handling tls + let home_config = if let Some(tlspolicy) = &tlspolicy { + match DomainConnector::try_from(tlspolicy.clone()) { + Ok(connector) => { + ClientConfig::new(home.public_endpoint.clone(), connector, false) + } + Err(err) => { + error!( + "error establishing tls with leader at: <{}> err: {}", + home.public_endpoint.clone(), + err + ); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_millis(); + let status = MirrorStatus::new( + MirrorPairStatus::Failed, + ConnectionStatus::Online, + now as u64, + ); + self.mirrors.update_status(home.id.clone(), status).await?; + return Err(err.into()); + } + } + } else { + ClientConfig::with_addr(home.public_endpoint.clone()) + }; - let home_config = ClientConfig::with_addr(home.public_endpoint.clone()); let versioned_socket = home_config.connect().await?; let (socket, config, versions) = versioned_socket.split(); info!("connecting to home: {}", home.public_endpoint); @@ -195,3 +223,23 @@ impl RemoteMirrorController { } } } + +fn option_tlspolicy(home: &Home) -> Option { + use fluvio::config::{TlsCerts, TlsConfig}; + + let ct = match &home.tls { + Some(ct) => ct, + _ => { + return None; + } + }; + + let certs = TlsCerts { + domain: ct.domain.clone(), + key: ct.client_key.clone(), + cert: ct.client_cert.clone(), + ca_cert: ct.ca_cert.clone(), + }; + let tlscfg = TlsConfig::Inline(certs); + Some(TlsPolicy::from(tlscfg)) +} diff --git a/crates/fluvio-sc/src/init.rs b/crates/fluvio-sc/src/init.rs index 956cba2785..e30cd394da 100644 --- a/crates/fluvio-sc/src/init.rs +++ b/crates/fluvio-sc/src/init.rs @@ -124,6 +124,7 @@ where use std::sync::Arc; use tracing::info; + use crate::services::auth::remote::RemoteAuthorization; use crate::services::start_public_server; use crate::core::SharedContext; @@ -142,6 +143,12 @@ where ctx, Arc::new(BasicAuthorization::new(policy)), )); + } else if ctx.config().tls { + info!("using spu remote authorization"); + start_public_server(AuthGlobalContext::new( + ctx, + Arc::new(RemoteAuthorization::new()), + )); } else if ctx.config().read_only_metadata { info!("using read-only authorization"); diff --git a/crates/fluvio-sc/src/services/auth/basic.rs b/crates/fluvio-sc/src/services/auth/basic.rs index d1c7293d12..ea4609a7e8 100644 --- a/crates/fluvio-sc/src/services/auth/basic.rs +++ b/crates/fluvio-sc/src/services/auth/basic.rs @@ -70,6 +70,11 @@ impl AuthContext for BasicAuthContext { ) -> Result { Ok(true) } + + // check if remote id is allowed + fn allow_remote_id(&self, id: &str) -> bool { + self.identity.principal == id + } } /// basic policy module diff --git a/crates/fluvio-sc/src/services/auth/mod.rs b/crates/fluvio-sc/src/services/auth/mod.rs index 1936136dd7..b5c81052d0 100644 --- a/crates/fluvio-sc/src/services/auth/mod.rs +++ b/crates/fluvio-sc/src/services/auth/mod.rs @@ -1,4 +1,5 @@ pub mod basic; +pub mod remote; pub use common::*; @@ -78,6 +79,10 @@ mod common { ) -> Result { Ok(true) } + + fn allow_remote_id(&self, _id: &str) -> bool { + true + } } /// Auth Service Context, this hold individual context that is enough enforce auth @@ -141,6 +146,10 @@ mod common { ) -> Result { Ok(true) } + + fn allow_remote_id(&self, _id: &str) -> bool { + true + } } #[cfg(test)] diff --git a/crates/fluvio-sc/src/services/auth/remote.rs b/crates/fluvio-sc/src/services/auth/remote.rs new file mode 100644 index 0000000000..e615699e6e --- /dev/null +++ b/crates/fluvio-sc/src/services/auth/remote.rs @@ -0,0 +1,65 @@ +use tracing::instrument; +use async_trait::async_trait; + +use fluvio_auth::{AuthContext, Authorization, TypeAction, InstanceAction, AuthError}; +use fluvio_controlplane_metadata::extended::ObjectType; +use fluvio_auth::x509::X509Identity; + +#[derive(Debug, Clone)] +pub struct RemoteAuthorization {} + +impl RemoteAuthorization { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl Authorization for RemoteAuthorization { + type Context = RemoteAuthContext; + + #[instrument(level = "trace", skip(self, socket))] + async fn create_auth_context( + &self, + socket: &mut fluvio_socket::FluvioSocket, + ) -> Result { + let identity = X509Identity::create_from_connection(socket) + .await + .map_err(|err| { + tracing::error!(%err, "failed to create x509 identity"); + err + })?; + Ok(RemoteAuthContext { identity }) + } +} + +#[derive(Debug)] +pub struct RemoteAuthContext { + identity: X509Identity, +} + +#[async_trait] +impl AuthContext for RemoteAuthContext { + async fn allow_type_action( + &self, + _ty: ObjectType, + _action: TypeAction, + ) -> Result { + Ok(true) + } + + /// check if specific instance of spec can be deleted + async fn allow_instance_action( + &self, + _ty: ObjectType, + _action: InstanceAction, + _key: &str, + ) -> Result { + Ok(true) + } + + // check if remote id is allowed + fn allow_remote_id(&self, id: &str) -> bool { + self.identity.principal == id + } +} diff --git a/crates/fluvio-sc/src/services/private_api/private_server.rs b/crates/fluvio-sc/src/services/private_api/private_server.rs index 0f70612776..00ca9b22e5 100644 --- a/crates/fluvio-sc/src/services/private_api/private_server.rs +++ b/crates/fluvio-sc/src/services/private_api/private_server.rs @@ -505,7 +505,7 @@ async fn send_mirror_changes( UpdateMirrorRequest::with_changes(epoch, changes) }; - debug!(?request, "sending mirror to spu"); + debug!("sending mirror to spu"); let mut message = RequestMessage::new_request(request); message.get_mut_header().set_client_id("sc"); diff --git a/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs b/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs index 8696dbf1d1..ea365544ac 100644 --- a/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs +++ b/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs @@ -9,14 +9,14 @@ use fluvio_future::{task::spawn, timer::sleep}; use fluvio_protocol::api::{RequestHeader, ResponseMessage}; use fluvio_sc_schema::{ core::MetadataItem, - mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus}, + mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus, MirrorType}, spu::SpuSpec, store::ChangeListener, topic::{MirrorConfig, ReplicaSpec, TopicSpec}, }; use fluvio_socket::ExclusiveFlvSink; use fluvio_types::event::StickyEvent; -use tracing::{debug, error, info, instrument, trace}; +use tracing::{debug, error, info, instrument, trace, warn}; use anyhow::{Result, anyhow}; use crate::core::Context; @@ -55,6 +55,21 @@ impl RemoteFetchingFromHomeController { #[instrument(skip(self), name = "RemoteFetchingFromHomeControllerLoop")] async fn dispatch_loop(mut self) { use tokio::select; + + // check if remote cluster exists + let mirrors = self.ctx.mirrors().store().value(&self.req.remote_id).await; + let remote = mirrors + .iter() + .find(|mirror| match &mirror.spec.mirror_type { + MirrorType::Remote(r) => r.id == self.req.remote_id, + _ => false, + }); + + if remote.is_none() { + warn!("remote cluster not found: {}", self.req.remote_id); + return; + } + info!( name = self.req.remote_id, "received mirroring connect request" diff --git a/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs b/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs index 4a9ca95698..e92ba40bcf 100644 --- a/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs +++ b/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use fluvio_controlplane_metadata::mirroring::{MirroringRemoteClusterRequest, MirrorConnect}; use fluvio_socket::ExclusiveFlvSink; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; use fluvio_auth::AuthContext; use fluvio_protocol::api::RequestMessage; use fluvio_stream_model::core::MetadataItem; @@ -29,7 +29,6 @@ pub fn handle_mirroring_request( info!("remote cluster register request {:?}", request); let (header, req) = request.get_header_request(); - let ctx = auth_ctx.global_ctx.clone(); let Ok(req) = try_convert_to_reqs(req) else { return Err(anyhow!("unable to decode request")); @@ -37,6 +36,14 @@ pub fn handle_mirroring_request( match req { MirrorRequests::Connect(req) => { + // authorization check + if !auth_ctx.auth.allow_remote_id(&req.remote_id) { + warn!("identity mismatch for remote_id: {}", req.remote_id); + return Err(anyhow!("identity mismatch")); + } + + let ctx = auth_ctx.global_ctx.clone(); + RemoteFetchingFromHomeController::start(req, sink, end_event, ctx, header); } }; diff --git a/crates/fluvio-sc/src/start.rs b/crates/fluvio-sc/src/start.rs index 5d9191c417..47527e9abc 100644 --- a/crates/fluvio-sc/src/start.rs +++ b/crates/fluvio-sc/src/start.rs @@ -148,9 +148,8 @@ mod proxy { pub use fluvio_future::openssl::TlsAcceptor; use fluvio_auth::x509::X509Authenticator; - use flv_tls_proxy::{ - start as proxy_start, start_with_authenticator as proxy_start_with_authenticator, - }; + use flv_tls_proxy::start_with_authenticator as proxy_start_with_authenticator; + use flv_tls_proxy::start as proxy_start; use crate::{config::ScConfig, cli::TlsConfig}; @@ -168,14 +167,17 @@ mod proxy { let target = config.public_endpoint; info!("starting TLS proxy: {}", proxy_addr); - let result = if let Some(x509_auth_scopes) = config.x509_auth_scopes { - let authenticator = Box::new(X509Authenticator::new(&x509_auth_scopes)); + let proxy_result = if let Some(x509_auth_scopes) = config.x509_auth_scopes { + let authenticator = Box::new(X509Authenticator::new(Some(&x509_auth_scopes))); + proxy_start_with_authenticator(&proxy_addr, tls_acceptor, target, authenticator).await + } else if config.tls { + let authenticator = Box::new(X509Authenticator::new(None)); proxy_start_with_authenticator(&proxy_addr, tls_acceptor, target, authenticator).await } else { proxy_start(&proxy_addr, tls_acceptor, target).await }; - if let Err(err) = result { + if let Err(err) = proxy_result { print_cli_err!(err); process::exit(-1); } diff --git a/crates/fluvio-spu/Cargo.toml b/crates/fluvio-spu/Cargo.toml index 3fcfe57035..b7bbdbc824 100644 --- a/crates/fluvio-spu/Cargo.toml +++ b/crates/fluvio-spu/Cargo.toml @@ -50,6 +50,7 @@ mimalloc = { workspace = true } # Fluvio dependencies fluvio = { workspace = true } +fluvio-auth = { workspace = true } fluvio-types = { workspace = true, features = ["events"] } fluvio-storage = { workspace = true, features = ["iterators"] } fluvio-compression = { workspace = true } diff --git a/crates/fluvio-spu/src/config/cli.rs b/crates/fluvio-spu/src/config/cli.rs index 78bfc73b7d..ef880e5fb9 100644 --- a/crates/fluvio-spu/src/config/cli.rs +++ b/crates/fluvio-spu/src/config/cli.rs @@ -8,6 +8,7 @@ use std::io::Error as IoError; use std::process; use std::io::ErrorKind; +use fluvio_future::openssl::SslVerifyMode; use tracing::debug; use tracing::info; use clap::Parser; @@ -145,6 +146,8 @@ impl SpuOpt { config.peer_max_bytes = self.peer_max_bytes; + config.tls = self.tls.tls; + if let Some(smart_engine_max_memory) = self.smart_engine_max_memory { info!( "overriding smart engine max memory: {}", @@ -178,6 +181,7 @@ impl SpuOpt { .ok_or_else(|| IoError::new(ErrorKind::NotFound, "missing ca cert"))?; TlsAcceptor::builder() .map_err(|err| err.into_io_error())? + .with_ssl_verify_mode(SslVerifyMode::PEER) .with_ca_from_pem_file(ca_path) .map_err(|err| err.into_io_error())? } else { diff --git a/crates/fluvio-spu/src/config/spu_config.rs b/crates/fluvio-spu/src/config/spu_config.rs index e15f47161b..31208f8e97 100644 --- a/crates/fluvio-spu/src/config/spu_config.rs +++ b/crates/fluvio-spu/src/config/spu_config.rs @@ -111,6 +111,8 @@ pub struct SpuConfig { pub peer_max_bytes: u32, pub smart_engine: SmartEngineConfig, + + pub tls: bool, } impl Default for SpuConfig { @@ -126,6 +128,7 @@ impl Default for SpuConfig { log: Log::default(), peer_max_bytes: fluvio_storage::FileReplica::PREFER_MAX_LEN, smart_engine: SmartEngineConfig::default(), + tls: false, } } } diff --git a/crates/fluvio-spu/src/control_plane/dispatcher.rs b/crates/fluvio-spu/src/control_plane/dispatcher.rs index 70d5de850e..3f8eec0bfa 100644 --- a/crates/fluvio-spu/src/control_plane/dispatcher.rs +++ b/crates/fluvio-spu/src/control_plane/dispatcher.rs @@ -376,7 +376,7 @@ impl ScDispatcher { ) -> anyhow::Result<()> { let (_, request) = req_msg.get_header_request(); - debug!( message = ?request,"starting remote cluster update"); + debug!("starting remote cluster update"); let actions = if !request.all.is_empty() { debug!( diff --git a/crates/fluvio-spu/src/mirroring/home/connection.rs b/crates/fluvio-spu/src/mirroring/home/connection.rs index a0f65aa2b4..1141bb3386 100644 --- a/crates/fluvio-spu/src/mirroring/home/connection.rs +++ b/crates/fluvio-spu/src/mirroring/home/connection.rs @@ -2,6 +2,8 @@ use std::time::Duration; use std::{fmt, sync::Arc}; use std::sync::atomic::AtomicU64; +use fluvio_auth::AuthContext; +use fluvio_controlplane_metadata::mirror::MirrorType; use tokio::select; use tracing::{debug, error, instrument, warn}; use anyhow::Result; @@ -10,13 +12,14 @@ use fluvio_future::timer::sleep; use fluvio_protocol::api::RequestMessage; use fluvio_spu_schema::server::mirror::StartMirrorRequest; use futures_util::StreamExt; -use fluvio_socket::{FluvioStream, ExclusiveFlvSink}; +use fluvio_socket::{ExclusiveFlvSink, FluvioStream}; use crate::core::DefaultSharedGlobalContext; use crate::mirroring::remote::api_key::MirrorRemoteApiEnum; use crate::mirroring::remote::remote_api::RemoteMirrorRequest; use crate::mirroring::remote::sync::DefaultPartitionSyncRequest; use crate::replication::leader::SharedFileLeaderState; +use crate::services::auth::SpuAuthServiceContext; use super::update_offsets::UpdateHomeOffsetRequest; @@ -59,18 +62,48 @@ impl fmt::Debug for MirrorHomeHandler { impl MirrorHomeHandler { /// start handling mirror request sync from remote /// it is called from public service handler - pub(crate) async fn respond( - ctx: DefaultSharedGlobalContext, + pub(crate) async fn respond( req_msg: RequestMessage, + auth_ctx: &SpuAuthServiceContext, sink: ExclusiveFlvSink, - stream: &mut FluvioStream, + stream: FluvioStream, ) { + // authorization check + if !auth_ctx + .auth + .allow_remote_id(&req_msg.request.remote_cluster_id) + { + warn!( + "identity mismatch for remote_id: {}", + req_msg.request.remote_cluster_id + ); + return; + } + + // check if remote cluster exists + let mirrors = auth_ctx.global_ctx.mirrors_localstore().all_values(); + let remote = mirrors + .iter() + .find(|mirror| match &mirror.spec.mirror_type { + MirrorType::Remote(r) => r.id == req_msg.request.remote_cluster_id, + _ => false, + }); + + if remote.is_none() { + warn!( + "remote cluster not found: {}", + req_msg.request.remote_cluster_id + ); + return; + } + debug!("handling mirror request: {:#?}", req_msg); let remote_replica = req_msg.request.remote_replica; let remote_cluster_id = req_msg.request.remote_cluster_id; let _access_key = req_msg.request.access_key; - if let Some(leader) = ctx + if let Some(leader) = auth_ctx + .global_ctx .leaders_state() .find_mirror_home_leader(&remote_cluster_id, &remote_replica) .await @@ -79,21 +112,19 @@ impl MirrorHomeHandler { // map to actual home let metrics = Arc::new(MirrorRequestMetrics::new()); - // TODO: perform authorization let handler: MirrorHomeHandler = Self { metrics: metrics.clone(), leader, - ctx, + ctx: auth_ctx.global_ctx.clone(), }; if let Err(err) = handler.inner_respond(sink, stream).await { error!("error handling mirror request: {:#?}", err); } } else { - // TODO: handle no home partition warn!( remote_replica, - remote_cluster_id, "no leader replica found for this" + remote_cluster_id, "no leader replica found for this mirror request" ); } } @@ -102,7 +133,7 @@ impl MirrorHomeHandler { async fn inner_respond( self, mut sink: ExclusiveFlvSink, - stream: &mut FluvioStream, + mut stream: FluvioStream, ) -> Result<()> { // first send let mut api_stream = stream.api_stream::(); diff --git a/crates/fluvio-spu/src/mirroring/remote/controller.rs b/crates/fluvio-spu/src/mirroring/remote/controller.rs index 9e5e734755..cab049ce26 100644 --- a/crates/fluvio-spu/src/mirroring/remote/controller.rs +++ b/crates/fluvio-spu/src/mirroring/remote/controller.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use fluvio::config::TlsPolicy; use futures_util::StreamExt; use tokio::select; use tracing::{debug, error, warn, instrument}; @@ -23,7 +24,7 @@ use fluvio_storage::{ReplicaStorage, FileReplica}; use fluvio_socket::{FluvioSocket, FluvioSink}; use fluvio_spu_schema::{Isolation, server::mirror::StartMirrorRequest}; -use fluvio_future::{task::spawn, timer::sleep}; +use fluvio_future::{net::DomainConnector, task::spawn, timer::sleep}; use fluvio_protocol::{record::Offset, api::RequestMessage}; use fluvio_types::event::offsets::OffsetChangeListener; @@ -211,7 +212,7 @@ where (home_socket, tls): (FluvioSocket, bool), backoff: &mut ExponentialBackoff, ) -> Result<()> { - // debug!(home = self.home, "start syncing mirror"); + debug!(home_id = home.id, "start syncing mirror"); let (mut home_sink, mut home_stream) = home_socket.split(); @@ -220,7 +221,6 @@ where home_sink.disable_zerocopy(); } - // let mut home_api_stream = home_stream.api_stream::(); self.send_initial_request(home, &mut home_sink).await?; @@ -352,7 +352,7 @@ where } /// look up home cluster from local store - /// this may retur None if remote cluster is send by SC by time controller is started + /// this may return None if remote cluster is send by SC by time controller is started fn find_home_cluster(&self) -> Option { let read = self.mirror_store.read(); let mirror = read.get(&self.remote_config.home_cluster).cloned(); @@ -424,13 +424,14 @@ where } /// create socket to home, this will always succeed - #[instrument] + #[instrument(skip(self, home))] async fn create_socket_to_home( &self, backoff: &mut ExponentialBackoff, - _home: &Home, + home: &Home, ) -> (FluvioSocket, bool) { - //TODO: implement tls + let tlspolicy = option_tlspolicy(home); + loop { self.state.metrics.increase_conn_count(); @@ -441,12 +442,28 @@ where "trying connect to home", ); - let res = FluvioSocket::connect(endpoint).await; + let res = if let Some(tlspolicy) = &tlspolicy { + match DomainConnector::try_from(tlspolicy.clone()) { + Ok(connector) => { + FluvioSocket::connect_with_connector(endpoint, &(*connector)).await + } + Err(err) => { + error!( + "error establishing tls with leader at: <{}> err: {}", + endpoint, err + ); + self.backoff_and_wait(backoff).await; + continue; + } + } + } else { + FluvioSocket::connect(endpoint).await + }; match res { Ok(socket) => { debug!("connected"); - return (socket, false); + return (socket, tlspolicy.is_some()); } Err(err) => { @@ -473,3 +490,23 @@ fn create_backoff() -> ExponentialBackoff { .build() .unwrap() } + +fn option_tlspolicy(home: &Home) -> Option { + use fluvio::config::{TlsCerts, TlsConfig}; + + let ct = match &home.tls { + Some(ct) => ct, + _ => { + return None; + } + }; + + let certs = TlsCerts { + domain: ct.domain.clone(), + key: ct.client_key.clone(), + cert: ct.client_cert.clone(), + ca_cert: ct.ca_cert.clone(), + }; + let tlscfg = TlsConfig::Inline(certs); + Some(TlsPolicy::from(tlscfg)) +} diff --git a/crates/fluvio-spu/src/mirroring/test/fixture.rs b/crates/fluvio-spu/src/mirroring/test/fixture.rs index a3dca30136..8c3c602e58 100644 --- a/crates/fluvio-spu/src/mirroring/test/fixture.rs +++ b/crates/fluvio-spu/src/mirroring/test/fixture.rs @@ -200,6 +200,7 @@ impl ReplicaConfig { id: self.home_cluster.clone(), remote_id: self.home_cluster.clone(), public_endpoint: self.home_port.clone(), + tls: None, }), }, }]); diff --git a/crates/fluvio-spu/src/mirroring/test/integration.rs b/crates/fluvio-spu/src/mirroring/test/integration.rs index 2fcf7a928a..6f92c268f0 100644 --- a/crates/fluvio-spu/src/mirroring/test/integration.rs +++ b/crates/fluvio-spu/src/mirroring/test/integration.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tracing::debug; @@ -6,7 +6,10 @@ use fluvio_controlplane_metadata::partition::{RemotePartitionConfig, HomePartiti use fluvio_future::timer::sleep; use fluvio_protocol::{fixture::create_raw_recordset, record::ReplicaKey}; -use crate::services::public::create_public_server; +use crate::services::{ + auth::{root::SpuRootAuthorization, SpuAuthGlobalContext}, + public::create_public_server, +}; use super::fixture::{ReplicaConfig, local_port}; @@ -64,7 +67,10 @@ async fn test_mirroring_new_records() { // start home server debug!("starting home server"); - let _remote_end = create_public_server(home_port.clone(), home_gctx.clone()).run(); + + let auth_global_ctx = + SpuAuthGlobalContext::new(home_gctx.clone(), Arc::new(SpuRootAuthorization::new())); + let _remote_end = create_public_server(home_port.to_owned(), auth_global_ctx.clone()).run(); // sleep 1 seconds debug!("waiting for home public server to up"); diff --git a/crates/fluvio-spu/src/services/auth/mod.rs b/crates/fluvio-spu/src/services/auth/mod.rs new file mode 100644 index 0000000000..c1d06273a6 --- /dev/null +++ b/crates/fluvio-spu/src/services/auth/mod.rs @@ -0,0 +1,40 @@ +pub mod remote; +pub mod root; + +pub use common::*; + +mod common { + + use std::sync::Arc; + use std::fmt::Debug; + + use crate::core::DefaultSharedGlobalContext; + + /// SPU global context with authorization + /// auth is trait object which contains global auth auth policy + #[derive(Clone, Debug)] + pub struct SpuAuthGlobalContext { + pub global_ctx: DefaultSharedGlobalContext, + pub auth: Arc, + } + + impl SpuAuthGlobalContext { + pub fn new(global_ctx: DefaultSharedGlobalContext, auth: Arc) -> Self { + Self { global_ctx, auth } + } + } + + /// Auth Service Context, this hold individual context that is enough enforce auth + /// for this service context + #[derive(Debug, Clone)] + pub struct SpuAuthServiceContext { + pub global_ctx: DefaultSharedGlobalContext, + pub auth: AC, + } + + impl SpuAuthServiceContext { + pub fn new(global_ctx: DefaultSharedGlobalContext, auth: AC) -> Self { + Self { global_ctx, auth } + } + } +} diff --git a/crates/fluvio-spu/src/services/auth/remote.rs b/crates/fluvio-spu/src/services/auth/remote.rs new file mode 100644 index 0000000000..ddac563623 --- /dev/null +++ b/crates/fluvio-spu/src/services/auth/remote.rs @@ -0,0 +1,101 @@ +use async_trait::async_trait; +use fluvio_auth::{ + x509::X509Identity, AuthContext, AuthError, Authorization, InstanceAction, TypeAction, +}; +use fluvio_controlplane_metadata::extended::ObjectType; +use fluvio_socket::FluvioSocket; +use tracing::{info, instrument}; + +/// Authorization that allows by remote Id +#[derive(Debug, Clone)] +pub struct SpuRemoteAuthorization {} + +#[async_trait] +impl Authorization for SpuRemoteAuthorization { + type Context = SpuRemoteAuthContext; + + #[instrument(level = "trace", skip(self, socket))] + async fn create_auth_context( + &self, + socket: &mut FluvioSocket, + ) -> Result { + info!("remote_cert DEBUG_1"); + + let identity = X509Identity::create_from_connection(socket) + .await + .map_err(|err| { + tracing::error!(%err, "failed to create x509 identity"); + err + })?; + + info!("remote_cert DEBUG_2"); + Ok(SpuRemoteAuthContext { identity }) + } +} + +impl SpuRemoteAuthorization { + pub fn new() -> Self { + Self {} + } +} + +#[derive(Debug)] +pub struct SpuRemoteAuthContext { + identity: X509Identity, +} + +#[async_trait] +impl AuthContext for SpuRemoteAuthContext { + async fn allow_type_action( + &self, + _ty: ObjectType, + _action: TypeAction, + ) -> Result { + Ok(true) + } + + /// check if specific instance of spec can be deleted + async fn allow_instance_action( + &self, + _ty: ObjectType, + _action: InstanceAction, + _key: &str, + ) -> Result { + Ok(true) + } + + fn allow_remote_id(&self, id: &str) -> bool { + self.identity.principal == id + } +} + +#[cfg(test)] +mod test { + use fluvio_auth::AuthContext; + + use super::{ObjectType, SpuRemoteAuthContext, TypeAction}; + + /// test remote context + #[fluvio_future::test] + async fn test_remote_context() { + let auth_context = SpuRemoteAuthContext { + identity: Default::default(), + }; + assert!(auth_context + .allow_type_action(ObjectType::Spu, TypeAction::Read) + .await + .unwrap()); + assert!(auth_context + .allow_type_action(ObjectType::Spu, TypeAction::Create) + .await + .unwrap()); + assert!(auth_context + .allow_type_action(ObjectType::Topic, TypeAction::Read) + .await + .unwrap()); + assert!(auth_context + .allow_type_action(ObjectType::Topic, TypeAction::Create) + .await + .unwrap()); + } +} diff --git a/crates/fluvio-spu/src/services/auth/root.rs b/crates/fluvio-spu/src/services/auth/root.rs new file mode 100644 index 0000000000..f1b1c6d544 --- /dev/null +++ b/crates/fluvio-spu/src/services/auth/root.rs @@ -0,0 +1,56 @@ +use async_trait::async_trait; +use fluvio_auth::{AuthContext, AuthError, Authorization, InstanceAction, TypeAction}; +use fluvio_controlplane_metadata::extended::ObjectType; +use fluvio_socket::FluvioSocket; +use tracing::instrument; + +/// Authorization that allows by remote Id +#[derive(Debug, Clone)] +pub struct SpuRootAuthorization {} + +#[async_trait] +impl Authorization for SpuRootAuthorization { + type Context = SpuRootAuthContext; + + #[instrument(level = "trace", skip(self, _socket))] + async fn create_auth_context( + &self, + _socket: &mut FluvioSocket, + ) -> Result { + Ok(SpuRootAuthContext {}) + } +} + +impl SpuRootAuthorization { + pub fn new() -> Self { + Self {} + } +} + +#[derive(Debug)] +pub struct SpuRootAuthContext {} + +#[async_trait] +impl AuthContext for SpuRootAuthContext { + async fn allow_type_action( + &self, + _ty: ObjectType, + _action: TypeAction, + ) -> Result { + Ok(true) + } + + /// check if specific instance of spec can be deleted + async fn allow_instance_action( + &self, + _ty: ObjectType, + _action: InstanceAction, + _key: &str, + ) -> Result { + Ok(true) + } + + fn allow_remote_id(&self, _id: &str) -> bool { + true + } +} diff --git a/crates/fluvio-spu/src/services/mod.rs b/crates/fluvio-spu/src/services/mod.rs index 70642db4d4..fbda948ebf 100644 --- a/crates/fluvio-spu/src/services/mod.rs +++ b/crates/fluvio-spu/src/services/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod public; +pub mod auth; pub mod internal; pub use self::internal::create_internal_server; diff --git a/crates/fluvio-spu/src/services/public/mod.rs b/crates/fluvio-spu/src/services/public/mod.rs index b0f3f1ea81..ae3b1c1609 100644 --- a/crates/fluvio-spu/src/services/public/mod.rs +++ b/crates/fluvio-spu/src/services/public/mod.rs @@ -12,6 +12,7 @@ mod conn_context; use std::sync::Arc; use async_trait::async_trait; +use fluvio_auth::Authorization; use fluvio_protocol::api::Request; use fluvio_protocol::api::RequestMessage; use fluvio_protocol::link::ErrorCode; @@ -29,6 +30,8 @@ use fluvio_types::event::StickyEvent; use crate::core::DefaultSharedGlobalContext; use crate::mirroring::home::connection::MirrorHomeHandler; +use crate::services::auth::SpuAuthGlobalContext; +use crate::services::auth::SpuAuthServiceContext; use crate::services::public::consumer_handler::handle_delete_consumer_offset_request; use crate::services::public::consumer_handler::handle_fetch_consumer_offsets_request; use crate::services::public::consumer_handler::handle_update_consumer_offset_request; @@ -39,54 +42,79 @@ use self::offset_request::handle_offset_request; use self::offset_update::handle_offset_update; use self::stream_fetch::{StreamFetchHandler, publishers::StreamPublishers}; use self::conn_context::ConnectionContext; +use std::fmt::Debug; -pub(crate) type SpuPublicServer = - FluvioApiServer; +pub(crate) type SpuPublicServer = + FluvioApiServer, PublicService>; -pub fn create_public_server(addr: String, ctx: DefaultSharedGlobalContext) -> SpuPublicServer { +pub fn create_public_server( + addr: String, + auth_ctx: SpuAuthGlobalContext, +) -> SpuPublicServer +where + A: Authorization + Sync + Send + Debug + 'static, + SpuAuthGlobalContext: Clone + Debug, + ::Context: Send + Sync, +{ info!( - spu_id = ctx.local_spu_id(), + spu_id = auth_ctx.global_ctx.local_spu_id(), %addr, "Starting SPU public service:", ); - FluvioApiServer::new(addr, ctx, PublicService::new()) + FluvioApiServer::new(addr, auth_ctx, PublicService::::new()) } #[derive(Debug)] -pub struct PublicService { - _0: (), // Prevent construction +pub struct PublicService { + data: std::marker::PhantomData, } -impl PublicService { +impl PublicService { pub fn new() -> Self { - PublicService { _0: () } + PublicService { + data: std::marker::PhantomData, + } } } #[async_trait] -impl FluvioService for PublicService { +impl FluvioService for PublicService +where + A: Authorization + Send + Sync, + ::Context: Send + Sync, +{ type Request = SpuServerRequest; - type Context = DefaultSharedGlobalContext; + type Context = SpuAuthGlobalContext; #[instrument(skip(self, context))] async fn respond( self: Arc, - context: DefaultSharedGlobalContext, - socket: FluvioSocket, + context: Self::Context, + mut socket: FluvioSocket, _connection: ConnectInfo, ) -> Result<()> { - let (sink, mut stream) = socket.split(); - + let auth_context = context + .auth + .create_auth_context(&mut socket) + .await + .map_err(|err| { + let io_error: std::io::Error = err.into(); + io_error + })?; + let service_context = SpuAuthServiceContext::new(context.global_ctx.clone(), auth_context); let mut mirror_request: Option> = None; let shutdown = StickyEvent::shared(); - + let (sink, mut stream) = socket.split(); let mut shared_sink = sink.as_shared(); + { let api_stream = stream.api_stream::(); let mut event_stream = api_stream.take_until(shutdown.listen_pinned()); let mut conn_ctx = ConnectionContext::new(); + let context = &context.global_ctx; + loop { let event = event_stream.next().await; match event { @@ -188,7 +216,7 @@ impl FluvioService for PublicService { } if let Some(request) = mirror_request { - MirrorHomeHandler::respond(context, request, shared_sink, &mut stream).await; + MirrorHomeHandler::respond(request, &service_context, shared_sink, stream).await; } shutdown.notify(); diff --git a/crates/fluvio-spu/src/services/public/tests/produce.rs b/crates/fluvio-spu/src/services/public/tests/produce.rs index 4189461a45..b50b2a9fba 100644 --- a/crates/fluvio-spu/src/services/public/tests/produce.rs +++ b/crates/fluvio-spu/src/services/public/tests/produce.rs @@ -1,4 +1,4 @@ -use std::{env::temp_dir, time::Duration}; +use std::{env::temp_dir, sync::Arc, time::Duration}; use fluvio::{SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind}; use fluvio_controlplane::replica::Replica; @@ -27,13 +27,17 @@ use flv_util::fixture::ensure_clean_dir; use crate::{ config::SpuConfig, core::GlobalContext, - services::public::{ - create_public_server, - tests::{ - create_filter_records, vec_to_raw_batch, load_wasm_module, create_filter_raw_records, + replication::leader::LeaderReplicaState, + services::{ + auth::{root::SpuRootAuthorization, SpuAuthGlobalContext}, + public::{ + create_public_server, + tests::{ + create_filter_raw_records, create_filter_records, load_wasm_module, + vec_to_raw_batch, + }, }, }, - replication::leader::LeaderReplicaState, }; #[fluvio_future::test(ignore)] @@ -46,8 +50,9 @@ async fn test_produce_basic() { let mut spu_config = SpuConfig::default(); spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -160,7 +165,9 @@ async fn test_produce_invalid_compression() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -234,7 +241,10 @@ async fn test_produce_request_timed_out() { let (leader_ctx, _) = config.leader_replica().await; - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -296,7 +306,10 @@ async fn test_produce_not_waiting_replication() { let (leader_ctx, _) = config.leader_replica().await; let public_addr = config.leader_public_addr(); - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -355,8 +368,11 @@ async fn test_produce_waiting_replication() { let (leader_ctx, leader_replica) = config.leader_replica().await; let public_addr = config.leader_public_addr(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(SpuRootAuthorization::new())); let public_server_end_event = - create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); + let private_server_end_event = create_internal_server(config.leader_addr(), leader_ctx.clone()).run(); @@ -425,7 +441,9 @@ async fn test_produce_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -537,7 +555,9 @@ async fn test_produce_basic_with_smartmodule_with_lookback() { smartmodule.params.set_lookback(Some(Lookback::last(1))); let mut smartmodules = vec![smartmodule]; - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -848,7 +868,9 @@ async fn test_produce_with_deduplication() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1038,7 +1060,9 @@ async fn test_produce_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1124,7 +1148,9 @@ async fn test_dedup_init_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; diff --git a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs index 311d985fba..560c5968e0 100644 --- a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs @@ -34,6 +34,8 @@ use fluvio_spu_schema::{ fetch::DefaultFetchRequest, }; use fluvio_spu_schema::server::stream_fetch::DefaultStreamFetchRequest; +use crate::services::auth::root::SpuRootAuthorization; +use crate::services::auth::SpuAuthGlobalContext; use crate::services::public::tests::{vec_to_batch, create_filter_raw_records}; use crate::{ core::GlobalContext, @@ -58,7 +60,9 @@ async fn test_stream_fetch_basic() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -394,7 +398,9 @@ async fn test_stream_fetch_filter( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -585,7 +591,9 @@ async fn test_stream_fetch_filter_individual( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -711,7 +719,9 @@ async fn test_stream_filter_error_fetch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -846,7 +856,9 @@ async fn test_stream_filter_max( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -992,7 +1004,9 @@ async fn test_stream_fetch_map( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1147,7 +1161,9 @@ async fn test_stream_fetch_map_chain( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1307,7 +1323,9 @@ async fn test_stream_fetch_map_error( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1445,7 +1463,9 @@ async fn test_stream_aggregate_fetch_single_batch( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1591,7 +1611,9 @@ async fn test_stream_aggregate_fetch_multiple_batch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1729,7 +1751,9 @@ async fn test_stream_fetch_and_new_request( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1818,7 +1842,9 @@ async fn test_stream_fetch_array_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1940,7 +1966,9 @@ async fn test_stream_fetch_filter_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2065,7 +2093,9 @@ async fn test_stream_fetch_filter_with_params( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2257,7 +2287,9 @@ async fn test_stream_fetch_invalid_smartmodule( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2316,7 +2348,9 @@ async fn test_stream_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2494,7 +2528,9 @@ async fn stream_fetch_filter_lookback( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2655,7 +2691,9 @@ async fn stream_fetch_filter_lookback_age( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -3037,7 +3075,9 @@ async fn test_stream_fetch_sends_topic_delete_error_on_topic_delete() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(SpuRootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; diff --git a/crates/fluvio-spu/src/start.rs b/crates/fluvio-spu/src/start.rs index 3b6e9dd470..ab3bd0bce0 100644 --- a/crates/fluvio-spu/src/start.rs +++ b/crates/fluvio-spu/src/start.rs @@ -1,9 +1,13 @@ +use std::sync::Arc; + use fluvio_storage::FileReplica; use crate::config::{SpuConfig, SpuOpt}; +use crate::services::auth::remote::SpuRemoteAuthorization; +use crate::services::auth::root::SpuRootAuthorization; +use crate::services::auth::SpuAuthGlobalContext; use crate::services::create_internal_server; -use crate::services::internal::InternalApiServer; -use crate::services::public::{SpuPublicServer, create_public_server}; +use crate::services::public::create_public_server; use crate::core::DefaultSharedGlobalContext; use crate::core::GlobalContext; use crate::control_plane::ScDispatcher; @@ -41,10 +45,7 @@ pub fn main_loop(opt: SpuOpt) { info!(uptime = sys.uptime(), "Uptime in secs"); run_block_on(async move { - let (ctx, internal_server, public_server) = create_services(spu_config.clone(), true, true); - - let _public_shutdown = internal_server.unwrap().run(); - let _private_shutdown = public_server.unwrap().run(); + let ctx = create_services(spu_config.clone(), true, true); init_monitoring(ctx); @@ -66,51 +67,59 @@ pub fn create_services( local_spu: SpuConfig, internal: bool, public: bool, -) -> ( - DefaultSharedGlobalContext, - Option, - Option, -) { +) -> DefaultSharedGlobalContext { let ctx = FileReplicaContext::new_shared_context(local_spu); let public_ep_addr = ctx.config().public_socket_addr().to_owned(); let private_ep_addr = ctx.config().private_socket_addr().to_owned(); - let public_server = if public { - Some(create_public_server(public_ep_addr, ctx.clone())) - } else { - None + if public { + if ctx.config().tls { + let authorization = Arc::new(SpuRemoteAuthorization::new()); + let auth_global_ctx = SpuAuthGlobalContext::new(ctx.clone(), authorization); + let pub_server = create_public_server(public_ep_addr, auth_global_ctx); + pub_server.run(); + } else { + let authorization = Arc::new(SpuRootAuthorization::new()); + let auth_global_ctx = SpuAuthGlobalContext::new(ctx.clone(), authorization); + let pub_server = create_public_server(public_ep_addr, auth_global_ctx); + pub_server.run(); + } }; - let internal_server = if internal { - Some(create_internal_server(private_ep_addr, ctx.clone())) - } else { - None + if internal { + let priv_server = create_internal_server(private_ep_addr, ctx.clone()); + priv_server.run(); }; let sc_dispatcher = ScDispatcher::new(ctx.clone()); sc_dispatcher.run(); - (ctx, internal_server, public_server) + ctx } mod proxy { use std::process; + use fluvio_auth::x509::X509Authenticator; use tracing::info; use flv_util::print_cli_err; use fluvio_future::openssl::TlsAcceptor; use crate::config::SpuConfig; - use flv_tls_proxy::start as proxy_start; + use flv_tls_proxy::start_with_authenticator as proxy_start_with_authenticator; pub async fn start_proxy(config: SpuConfig, acceptor: (TlsAcceptor, String)) { let (tls_acceptor, proxy_addr) = acceptor; let target = config.public_endpoint; info!("starting TLS proxy: {}", proxy_addr); - if let Err(err) = proxy_start(&proxy_addr, tls_acceptor, target).await { + let authenticator = Box::new(X509Authenticator::new(None)); + let proxy_result = + proxy_start_with_authenticator(&proxy_addr, tls_acceptor, target, authenticator).await; + + if let Err(err) = proxy_result { print_cli_err!(err); process::exit(-1); } else { diff --git a/k8-util/helm/fluvio-sys/Chart.yaml b/k8-util/helm/fluvio-sys/Chart.yaml index f0e4dd55d3..459f1d546c 100644 --- a/k8-util/helm/fluvio-sys/Chart.yaml +++ b/k8-util/helm/fluvio-sys/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v2 name: fluvio-sys description: A Helm chart for Fluvio type: application -version: 0.9.16 +version: 0.9.17 diff --git a/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml b/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml index 5a258d1cd9..dde553394b 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml @@ -46,14 +46,17 @@ spec: type: string publicEndpoint: type: string - keyPair: - type: object - required: ["privateKey", "publicKey"] - properties: - publicKey: - type: string - privateKey: - type: string + tls: + type: object + properties: + domain: + type: string + caCert: + type: string + clientCert: + type: string + clientKey: + type: string additionalPrinterColumns: - name: Mirror Type type: string