diff --git a/Cargo.lock b/Cargo.lock index 6021984a0c..9d87ec8bd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2335,6 +2335,7 @@ dependencies = [ "eyre", "flate2", "fluvio", + "fluvio-auth", "fluvio-channel", "fluvio-cli-common", "fluvio-cluster", @@ -2559,7 +2560,7 @@ dependencies = [ [[package]] name = "fluvio-controlplane-metadata" -version = "0.28.1" +version = "0.28.2" dependencies = [ "anyhow", "async-trait", @@ -2807,6 +2808,7 @@ dependencies = [ "cfg-if", "clap", "event-listener 3.1.0", + "fluvio", "fluvio-auth", "fluvio-controlplane", "fluvio-controlplane-metadata", @@ -2961,6 +2963,7 @@ dependencies = [ "event-listener 3.1.0", "flate2", "fluvio", + "fluvio-auth", "fluvio-compression", "fluvio-controlplane", "fluvio-controlplane-metadata", diff --git a/crates/fluvio-sc/src/services/auth/basic.rs b/crates/fluvio-auth/src/basic.rs similarity index 95% rename from crates/fluvio-sc/src/services/auth/basic.rs rename to crates/fluvio-auth/src/basic.rs index d1c7293d12..a43ee51316 100644 --- a/crates/fluvio-sc/src/services/auth/basic.rs +++ b/crates/fluvio-auth/src/basic.rs @@ -4,9 +4,9 @@ use tracing::instrument; use async_trait::async_trait; pub use policy::BasicRbacPolicy; -use fluvio_auth::{AuthContext, Authorization, TypeAction, InstanceAction, AuthError}; +use crate::{AuthContext, Authorization, TypeAction, InstanceAction, AuthError}; use fluvio_controlplane_metadata::extended::ObjectType; -use fluvio_auth::x509::X509Identity; +use crate::x509::X509Identity; #[derive(Debug, Clone)] pub struct BasicAuthorization { @@ -84,8 +84,8 @@ mod policy { use tracing::debug; use serde::{Serialize, Deserialize}; - use fluvio_auth::{AuthError, TypeAction, InstanceAction}; - use fluvio_auth::x509::X509Identity; + use crate::{AuthError, TypeAction, InstanceAction}; + use crate::x509::X509Identity; use super::ObjectType; @@ -105,6 +105,7 @@ mod policy { match action { TypeAction::Create => Action::Create, TypeAction::Read => Action::Read, + TypeAction::Update => Action::Update, } } } @@ -147,6 +148,7 @@ mod policy { // let (action,object,_instance) = request; // For each scope provided in the identity, // check if there is a match; + // let is_allowed = identity.scopes().iter().any(|scope| { self.0 .get(scope) @@ -179,6 +181,7 @@ mod policy { root_policy.insert(ObjectType::Partition, vec![Action::All]); root_policy.insert(ObjectType::TableFormat, vec![Action::All]); root_policy.insert(ObjectType::Mirror, vec![Action::All]); + root_policy.insert(ObjectType::RemoteToHomeConnection, vec![Action::All]); let mut policy = HashMap::new(); @@ -197,7 +200,7 @@ mod test { use std::convert::TryFrom; use std::collections::HashMap; - use fluvio_auth::x509::X509Identity; + use crate::x509::X509Identity; use super::policy::*; use super::ObjectType; diff --git a/crates/fluvio-auth/src/lib.rs b/crates/fluvio-auth/src/lib.rs index 816fec5996..f9040838aa 100644 --- a/crates/fluvio-auth/src/lib.rs +++ b/crates/fluvio-auth/src/lib.rs @@ -1,6 +1,9 @@ mod policy; mod error; +pub mod basic; +pub mod root; + pub mod x509; pub use policy::*; diff --git a/crates/fluvio-auth/src/policy.rs b/crates/fluvio-auth/src/policy.rs index 5270ddb521..eb3b758c51 100644 --- a/crates/fluvio-auth/src/policy.rs +++ b/crates/fluvio-auth/src/policy.rs @@ -12,6 +12,7 @@ use super::AuthError; pub enum TypeAction { Create, Read, + Update, } pub enum InstanceAction { @@ -19,7 +20,7 @@ pub enum InstanceAction { } #[async_trait] -pub trait AuthContext: Debug { +pub trait AuthContext: Debug + Send + Sync + 'static { /// check if any allow type specific action can be allowed async fn allow_type_action( &self, diff --git a/crates/fluvio-auth/src/root.rs b/crates/fluvio-auth/src/root.rs new file mode 100644 index 0000000000..9f72754555 --- /dev/null +++ b/crates/fluvio-auth/src/root.rs @@ -0,0 +1,154 @@ +use std::fmt::Debug; + +use async_trait::async_trait; + +use crate::{AuthContext, Authorization, TypeAction, InstanceAction, AuthError}; +use fluvio_socket::FluvioSocket; +use fluvio_controlplane_metadata::extended::ObjectType; + +/// Authorization that allows anything +/// Used for personal development +#[derive(Debug, Clone, Default)] +pub struct RootAuthorization {} + +#[async_trait] +impl Authorization for RootAuthorization { + type Context = RootAuthContext; + + async fn create_auth_context( + &self, + _socket: &mut FluvioSocket, + ) -> Result { + Ok(RootAuthContext {}) + } +} + +impl RootAuthorization { + pub fn new() -> Self { + Self {} + } +} + +#[derive(Debug)] +pub struct RootAuthContext {} + +#[async_trait] +impl AuthContext for RootAuthContext { + async fn allow_type_action( + &self, + _ty: ObjectType, + _action: TypeAction, + ) -> Result { + Ok(true) + } + + /// check if specific instance of spec can be deleted + async fn allow_instance_action( + &self, + _ty: ObjectType, + _action: InstanceAction, + _key: &str, + ) -> Result { + Ok(true) + } +} + +/// Authorization that allows only read only ops +#[derive(Debug, Clone, Default)] +pub struct ReadOnlyAuthorization {} + +#[async_trait] +impl Authorization for ReadOnlyAuthorization { + type Context = ReadOnlyAuthContext; + + async fn create_auth_context( + &self, + _socket: &mut FluvioSocket, + ) -> Result { + Ok(ReadOnlyAuthContext {}) + } +} + +impl ReadOnlyAuthorization { + pub fn new() -> Self { + Self {} + } +} + +#[derive(Debug)] +pub struct ReadOnlyAuthContext {} + +#[async_trait] +impl AuthContext for ReadOnlyAuthContext { + async fn allow_type_action( + &self, + ty: ObjectType, + action: TypeAction, + ) -> Result { + Ok(matches!(action, TypeAction::Read) || matches!(ty, ObjectType::CustomSpu)) + } + + /// check if specific instance of spec can be deleted + async fn allow_instance_action( + &self, + _ty: ObjectType, + _action: InstanceAction, + _key: &str, + ) -> Result { + Ok(true) + } +} + +#[cfg(test)] +mod test { + use crate::AuthContext; + + use super::{ObjectType, ReadOnlyAuthContext, RootAuthContext, TypeAction}; + + /// test read only context + /// read only context allows read on everything + /// and create on spu + #[fluvio_future::test] + async fn test_read_only_context() { + let auth_context = ReadOnlyAuthContext {}; + assert!(auth_context + .allow_type_action(ObjectType::Spu, TypeAction::Read) + .await + .unwrap()); + assert!(!auth_context + .allow_type_action(ObjectType::Spu, TypeAction::Create) + .await + .unwrap()); + assert!(auth_context + .allow_type_action(ObjectType::Topic, TypeAction::Read) + .await + .unwrap()); + assert!(!auth_context + .allow_type_action(ObjectType::Topic, TypeAction::Create) + .await + .unwrap()); + } + + /// test root context + /// root context allows everything + #[fluvio_future::test] + async fn test_root_context() { + let auth_context = RootAuthContext {}; + assert!(auth_context + .allow_type_action(ObjectType::Spu, TypeAction::Read) + .await + .unwrap()); + assert!(auth_context + .allow_type_action(ObjectType::Spu, TypeAction::Create) + .await + .unwrap()); + assert!(auth_context + .allow_type_action(ObjectType::Topic, TypeAction::Read) + .await + .unwrap()); + assert!(auth_context + .allow_type_action(ObjectType::Topic, TypeAction::Create) + .await + .unwrap()); + } +} diff --git a/crates/fluvio-auth/src/x509/authenticator.rs b/crates/fluvio-auth/src/x509/authenticator.rs index 77501c523c..7b5f6284a0 100644 --- a/crates/fluvio-auth/src/x509/authenticator.rs +++ b/crates/fluvio-auth/src/x509/authenticator.rs @@ -36,15 +36,15 @@ impl ScopeBindings { #[derive(Debug)] pub struct X509Authenticator { - scope_bindings: ScopeBindings, + scope_bindings: Option, } impl X509Authenticator { - pub fn new(scope_binding_file_path: &Path) -> Self { - Self { - scope_bindings: ScopeBindings::load(scope_binding_file_path) - .expect("unable to create ScopeBindings"), - } + pub fn new(scope_binding_file_path: Option<&Path>) -> Self { + let scope_bindings = scope_binding_file_path.map(|scope_binding_file_path| { + ScopeBindings::load(scope_binding_file_path).expect("unable to create ScopeBindings") + }); + Self { scope_bindings } } async fn send_authorization_request( @@ -79,13 +79,9 @@ impl X509Authenticator { fn principal_from_tls_stream(tls_stream: &DefaultServerTlsStream) -> Result { trace!("tls_stream {:?}", tls_stream); - let peer_certificate = tls_stream.peer_certificate(); - - trace!("peer_certificate {:?}", peer_certificate); - let client_certificate = tls_stream.peer_certificate().ok_or(IoErrorKind::NotFound)?; - trace!("client_certificate {:?}", tls_stream); + trace!("client_certificate {:?}", client_certificate); let principal = Self::principal_from_raw_certificate( &client_certificate @@ -96,7 +92,7 @@ impl X509Authenticator { Ok(principal) } - fn principal_from_raw_certificate(certificate_bytes: &[u8]) -> Result { + pub fn principal_from_raw_certificate(certificate_bytes: &[u8]) -> Result { parse_x509_certificate(certificate_bytes) .map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) .and_then(|(_, parsed_cert)| Self::common_name_from_parsed_certificate(parsed_cert)) @@ -131,7 +127,13 @@ impl Authenticator for X509Authenticator { target_tcp_stream: &TcpStream, ) -> Result { let principal = Self::principal_from_tls_stream(incoming_tls_stream)?; - let scopes = self.scope_bindings.get_scopes(&principal); + + let scopes = if let Some(scope_bindings) = &self.scope_bindings { + scope_bindings.get_scopes(&principal) + } else { + Vec::new() + }; + let authorization_request = AuthRequest::new(principal, scopes); let success = Self::send_authorization_request(target_tcp_stream, authorization_request).await?; diff --git a/crates/fluvio-auth/src/x509/identity.rs b/crates/fluvio-auth/src/x509/identity.rs index 46be341b91..b92079a172 100644 --- a/crates/fluvio-auth/src/x509/identity.rs +++ b/crates/fluvio-auth/src/x509/identity.rs @@ -37,11 +37,12 @@ impl X509Identity { principal: req_msg.request.principal, }, }, - Err(_e) => { + Err(err) => { + tracing::error!(%err, "error decoding auth request"); return Err(std::io::Error::new( std::io::ErrorKind::Interrupted, "connection closed", - )) + )); } } } else { diff --git a/crates/fluvio-cli/Cargo.toml b/crates/fluvio-cli/Cargo.toml index f469356f11..47fb72954d 100644 --- a/crates/fluvio-cli/Cargo.toml +++ b/crates/fluvio-cli/Cargo.toml @@ -84,6 +84,7 @@ k8-types = { workspace = true , features = ["core"] } fluvio-cluster = { path = "../fluvio-cluster", default-features = false, features = ["cli"], optional = true } fluvio = { workspace = true } +fluvio-auth = { workspace = true } fluvio-socket = { workspace = true } fluvio-command = { workspace = true } fluvio-package-index = { workspace = true } diff --git a/crates/fluvio-cli/src/client/remote/export.rs b/crates/fluvio-cli/src/client/remote/export.rs index d9a3a2acf7..06eada1290 100644 --- a/crates/fluvio-cli/src/client/remote/export.rs +++ b/crates/fluvio-cli/src/client/remote/export.rs @@ -3,13 +3,11 @@ use anyhow::{Context, Result}; use clap::Parser; use fluvio_extension_common::{target::ClusterTarget, Terminal}; use fluvio_sc_schema::{ - mirror::{Home, MirrorSpec, MirrorType}, + mirror::{ClientTls, Home, MirrorSpec, MirrorType}, remote_file::RemoteMetadataExport, }; use anyhow::anyhow; -use super::get_admin; - #[derive(Debug, Parser)] pub struct ExportOpt { /// id of the remote cluster to export @@ -20,9 +18,15 @@ pub struct ExportOpt { /// override endpoint of the home cluster #[arg(long, short = 'e')] public_endpoint: Option, - // id of the home cluster to share - #[arg(name = "c")] + /// id of the home cluster to share + #[arg(long)] home_id: Option, + /// remote tls certificate + #[arg(long)] + cert: Option, + /// remote tls key + #[arg(long)] + key: Option, } impl ExportOpt { @@ -31,14 +35,15 @@ impl ExportOpt { out: Arc, cluster_target: ClusterTarget, ) -> Result<()> { + let fluvio_config = cluster_target.load()?; let public_endpoint = if let Some(public_endpoint) = self.public_endpoint { - public_endpoint + public_endpoint.clone() } else { - let fluvio_config = cluster_target.clone().load()?; - fluvio_config.endpoint + fluvio_config.endpoint.clone() }; + let flv = fluvio::Fluvio::connect_with_config(&fluvio_config).await?; + let admin = flv.admin().await; - let admin = get_admin(cluster_target).await?; let all_remotes = admin.all::().await?; let _remote = all_remotes .iter() @@ -50,10 +55,17 @@ impl ExportOpt { let home_id = self.home_id.clone().unwrap_or_else(|| "home".to_owned()); + let tls = get_tls_config( + fluvio_config.clone(), + self.cert.clone(), + self.key.clone(), + self.remote_id.clone(), + )?; let home_metadata = Home { id: home_id, remote_id: self.remote_id, public_endpoint, + tls, }; let metadata = RemoteMetadataExport::new(home_metadata); @@ -68,3 +80,79 @@ impl ExportOpt { Ok(()) } } + +#[cfg(unix)] +fn get_tls_config( + fluvio_config: fluvio::config::FluvioConfig, + cert_path: Option, + key_path: Option, + remote_id: String, +) -> Result> { + use fluvio::config::{TlsConfig, TlsPolicy}; + use fluvio_future::native_tls::{CertBuilder, X509PemBuilder}; + match &fluvio_config.tls { + TlsPolicy::Verified(config) => { + let (remote_cert, remote_key, cert_path) = match (cert_path.clone(), key_path) { + (Some(cert), Some(key)) => ( + std::fs::read_to_string(cert.clone())?, + std::fs::read_to_string(key)?, + cert, + ), + _ => { + return Err(anyhow!( + "remote cert and key are required for a cluster using TLS" + )); + } + }; + + let cert_build = X509PemBuilder::from_path(cert_path) + .map_err(|err| anyhow!("error building cert: {}", err))?; + + let cert = cert_build + .build() + .map_err(|err| anyhow!("error building cert: {}", err))?; + + let cert_der = cert + .to_der() + .map_err(|err| anyhow!("error converting cert to der: {}", err))?; + + let principal = fluvio_auth::x509::X509Authenticator::principal_from_raw_certificate(&cert_der).expect( + "error getting principal from certificate. This should never happen as the certificate is valid", + ); + + if principal != remote_id { + return Err(anyhow!( + "remote_id: \"{}\" does not match the CN in the certificate: \"{}\"", + remote_id, + principal + )); + } + + match config { + TlsConfig::Inline(config) => Ok(Some(ClientTls { + domain: config.domain.clone(), + ca_cert: config.ca_cert.clone(), + client_cert: remote_cert, + client_key: remote_key, + })), + TlsConfig::Files(file_config) => Ok(Some(ClientTls { + domain: file_config.domain.clone(), + ca_cert: std::fs::read_to_string(&file_config.ca_cert)?, + client_cert: remote_cert, + client_key: remote_key, + })), + } + } + _ => Ok(None), + } +} + +#[cfg(not(unix))] +fn get_tls_config( + _fluvio_config: fluvio::config::FluvioConfig, + _cert_path: Option, + _key_path: Option, + _remote_id: String, +) -> Result> { + Ok(None) +} diff --git a/crates/fluvio-controlplane-metadata/Cargo.toml b/crates/fluvio-controlplane-metadata/Cargo.toml index 760fdfdcbc..63a0ce3212 100644 --- a/crates/fluvio-controlplane-metadata/Cargo.toml +++ b/crates/fluvio-controlplane-metadata/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-controlplane-metadata" edition = "2021" -version = "0.28.1" +version = "0.28.2" authors = ["Fluvio Contributors "] description = "Metadata definition for Fluvio control plane" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-controlplane-metadata/src/lib.rs b/crates/fluvio-controlplane-metadata/src/lib.rs index b6fd323cfa..d5bbc46733 100644 --- a/crates/fluvio-controlplane-metadata/src/lib.rs +++ b/crates/fluvio-controlplane-metadata/src/lib.rs @@ -34,6 +34,7 @@ pub mod extended { TableFormat, DerivedStream, Mirror, + RemoteToHomeConnection, } pub trait SpecExt: Spec { diff --git a/crates/fluvio-controlplane-metadata/src/mirror/spec.rs b/crates/fluvio-controlplane-metadata/src/mirror/spec.rs index f00eff27cb..6c5e095dfb 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/spec.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/spec.rs @@ -10,7 +10,6 @@ use fluvio_protocol::{Encoder, Decoder}; )] pub struct MirrorSpec { pub mirror_type: MirrorType, - // TODO: we should add auth } impl MirrorSpec { @@ -75,4 +74,26 @@ pub struct Home { pub id: String, pub remote_id: String, pub public_endpoint: String, + //TODO: maybe we should move it to a secret or just store the paths of the files + #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))] + pub tls: Option, +} + +#[derive(Clone, PartialEq, Eq, Default, Encoder, Decoder)] +#[cfg_attr( + feature = "use_serde", + derive(serde::Serialize, serde::Deserialize), + serde(rename_all = "camelCase") +)] +pub struct ClientTls { + pub domain: String, + pub ca_cert: String, + pub client_cert: String, + pub client_key: String, +} + +impl std::fmt::Debug for ClientTls { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ClientTls: {{ domain: {} }}", self.domain) + } } diff --git a/crates/fluvio-controlplane-metadata/src/mirror/status.rs b/crates/fluvio-controlplane-metadata/src/mirror/status.rs index 2a9e55e1dc..7f3871ff9d 100644 --- a/crates/fluvio-controlplane-metadata/src/mirror/status.rs +++ b/crates/fluvio-controlplane-metadata/src/mirror/status.rs @@ -1,4 +1,3 @@ -use std::time::Duration; use fluvio_protocol::{Encoder, Decoder}; #[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)] @@ -63,7 +62,7 @@ pub struct ConnectionStat { impl MirrorStatus { #[cfg(feature = "use_serde")] - pub fn last_seen(&self, since: Duration) -> String { + pub fn last_seen(&self, since: std::time::Duration) -> String { use humantime_serde::re::humantime; let since_sec = since.as_secs(); @@ -87,9 +86,9 @@ impl std::fmt::Display for MirrorStatus { (MirrorPairStatus::Disabled, ConnectionStatus::Online) => "Disabled", (MirrorPairStatus::Waiting, ConnectionStatus::Online) => "Waiting", (MirrorPairStatus::Succesful, ConnectionStatus::Offline) => "Offline", - (MirrorPairStatus::Failed, ConnectionStatus::Offline) => "Failed", - (MirrorPairStatus::Disabled, ConnectionStatus::Offline) => "Disabled", - (MirrorPairStatus::Waiting, ConnectionStatus::Offline) => "Waiting", + (MirrorPairStatus::Failed, ConnectionStatus::Offline) => "Offline", + (MirrorPairStatus::Disabled, ConnectionStatus::Offline) => "Offline", + (MirrorPairStatus::Waiting, ConnectionStatus::Offline) => "Offline", }; write!(f, "{}", status) } @@ -120,6 +119,7 @@ impl std::fmt::Display for ConnectionStatus { #[cfg(test)] mod test { use super::*; + use std::time::Duration; #[test] fn test_last_seen() { diff --git a/crates/fluvio-sc/Cargo.toml b/crates/fluvio-sc/Cargo.toml index c2a8fa003d..aa68b7e969 100644 --- a/crates/fluvio-sc/Cargo.toml +++ b/crates/fluvio-sc/Cargo.toml @@ -47,6 +47,7 @@ tracing = { workspace = true } # Fluvio dependencies +fluvio = { workspace = true } fluvio-auth = { workspace = true } fluvio-future = { workspace = true, features = [ "subscriber", diff --git a/crates/fluvio-sc/src/cli.rs b/crates/fluvio-sc/src/cli.rs index 480a2717fa..5857b215f1 100644 --- a/crates/fluvio-sc/src/cli.rs +++ b/crates/fluvio-sc/src/cli.rs @@ -13,9 +13,9 @@ use std::process; use std::io::Error as IoError; use std::io::ErrorKind; use std::path::PathBuf; -use std::convert::TryFrom; use clap::Args; +use fluvio_auth::basic::BasicRbacPolicy; use tracing::info; use tracing::debug; use clap::Parser; @@ -25,7 +25,6 @@ use fluvio_types::defaults::TLS_SERVER_SECRET_NAME; use fluvio_future::openssl::TlsAcceptor; use fluvio_future::openssl::SslVerifyMode; -use crate::services::auth::basic::BasicRbacPolicy; use crate::config::ScConfig; type Config = (ScConfig, Option); @@ -131,6 +130,8 @@ impl ScOpt { config.white_list = self.white_list.into_iter().collect(); config.read_only_metadata = self.run_mode.read_only.is_some(); + config.tls = self.tls.tls; + // Set Configuration Authorization Policy let policy = match self.auth_policy { diff --git a/crates/fluvio-sc/src/config/sc_config.rs b/crates/fluvio-sc/src/config/sc_config.rs index 554207541d..209edc7bb1 100644 --- a/crates/fluvio-sc/src/config/sc_config.rs +++ b/crates/fluvio-sc/src/config/sc_config.rs @@ -29,6 +29,7 @@ pub struct ScConfig { pub namespace: String, pub x509_auth_scopes: Option, pub white_list: HashSet, + pub tls: bool, } impl ::std::default::Default for ScConfig { @@ -40,6 +41,7 @@ impl ::std::default::Default for ScConfig { namespace: DEFAULT_NAMESPACE.to_owned(), x509_auth_scopes: None, white_list: HashSet::new(), + tls: false, } } } diff --git a/crates/fluvio-sc/src/controllers/mirroring/controller.rs b/crates/fluvio-sc/src/controllers/mirroring/controller.rs index 1ef176d555..eb16a7172e 100644 --- a/crates/fluvio-sc/src/controllers/mirroring/controller.rs +++ b/crates/fluvio-sc/src/controllers/mirroring/controller.rs @@ -1,13 +1,14 @@ use std::time::{Duration, SystemTime}; +use fluvio::config::TlsPolicy; use tracing::{debug, error, info, instrument}; use anyhow::{anyhow, Result}; use fluvio_socket::{ClientConfig, MultiplexerSocket, StreamSocket}; use futures_util::StreamExt; -use fluvio_future::{task::spawn, timer::sleep}; +use fluvio_future::{net::DomainConnector, task::spawn, timer::sleep}; use fluvio_sc_schema::{ core::MetadataItem, - mirror::{ConnectionStatus, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType}, + mirror::{ConnectionStatus, Home, MirrorPairStatus, MirrorSpec, MirrorStatus, MirrorType}, mirroring::ObjectMirroringRequest, topic::{MirrorConfig, RemoteMirrorConfig, ReplicaSpec, SpuMirrorConfig, TopicSpec}, TryEncodableFrom, @@ -72,9 +73,36 @@ impl RemoteMirrorController { if let Some(home) = home_mirrors.first() { //send to home cluster the connect request - //TODO: handle TLS + let tlspolicy = option_tlspolicy(home); + + // handling tls + let home_config = if let Some(tlspolicy) = &tlspolicy { + match DomainConnector::try_from(tlspolicy.clone()) { + Ok(connector) => { + ClientConfig::new(home.public_endpoint.clone(), connector, false) + } + Err(err) => { + error!( + "error establishing tls with leader at: <{}> err: {}", + home.public_endpoint.clone(), + err + ); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_millis(); + let status = MirrorStatus::new( + MirrorPairStatus::Failed, + ConnectionStatus::Online, + now as u64, + ); + self.mirrors.update_status(home.id.clone(), status).await?; + return Err(err.into()); + } + } + } else { + ClientConfig::with_addr(home.public_endpoint.clone()) + }; - let home_config = ClientConfig::with_addr(home.public_endpoint.clone()); let versioned_socket = home_config.connect().await?; let (socket, config, versions) = versioned_socket.split(); info!("connecting to home: {}", home.public_endpoint); @@ -195,3 +223,23 @@ impl RemoteMirrorController { } } } + +fn option_tlspolicy(home: &Home) -> Option { + use fluvio::config::{TlsCerts, TlsConfig}; + + let ct = match &home.tls { + Some(ct) => ct, + _ => { + return None; + } + }; + + let certs = TlsCerts { + domain: ct.domain.clone(), + key: ct.client_key.clone(), + cert: ct.client_cert.clone(), + ca_cert: ct.ca_cert.clone(), + }; + let tlscfg = TlsConfig::Inline(certs); + Some(TlsPolicy::from(tlscfg)) +} diff --git a/crates/fluvio-sc/src/init.rs b/crates/fluvio-sc/src/init.rs index 956cba2785..01220fb21d 100644 --- a/crates/fluvio-sc/src/init.rs +++ b/crates/fluvio-sc/src/init.rs @@ -6,6 +6,7 @@ //! use std::sync::Arc; +use fluvio_auth::basic::BasicRbacPolicy; use fluvio_sc_schema::mirror::MirrorSpec; use fluvio_stream_dispatcher::metadata::{SharedClient, MetadataClient}; use fluvio_stream_model::core::MetadataItem; @@ -19,7 +20,6 @@ use crate::controllers::topics::controller::{TopicController, SystemTopicControl use crate::config::ScConfig; use crate::services::start_internal_server; use crate::dispatcher::dispatcher::MetadataDispatcher; -use crate::services::auth::basic::BasicRbacPolicy; pub async fn start_main_loop( sc_config_policy: (ScConfig, Option), @@ -122,14 +122,15 @@ where mod pub_server { use std::sync::Arc; + use fluvio_auth::basic::{BasicAuthorization, BasicRbacPolicy}; + use fluvio_auth::root::{ReadOnlyAuthorization, RootAuthorization}; use tracing::info; + use crate::services::auth::ScAuthGlobalContext; use crate::services::start_public_server; use crate::core::SharedContext; use fluvio_controlplane_metadata::core::MetadataItem; - use crate::services::auth::{AuthGlobalContext, RootAuthorization, ReadOnlyAuthorization}; - use crate::services::auth::basic::{BasicAuthorization, BasicRbacPolicy}; pub fn start(ctx: SharedContext, auth_policy_option: Option) where @@ -138,20 +139,20 @@ where { if let Some(policy) = auth_policy_option { info!("using basic authorization"); - start_public_server(AuthGlobalContext::new( + start_public_server(ScAuthGlobalContext::new( ctx, Arc::new(BasicAuthorization::new(policy)), )); } else if ctx.config().read_only_metadata { info!("using read-only authorization"); - start_public_server(AuthGlobalContext::new( + start_public_server(ScAuthGlobalContext::new( ctx, Arc::new(ReadOnlyAuthorization::new()), )); } else { info!("using root authorization"); - start_public_server(AuthGlobalContext::new( + start_public_server(ScAuthGlobalContext::new( ctx, Arc::new(RootAuthorization::new()), )); diff --git a/crates/fluvio-sc/src/services/auth/mod.rs b/crates/fluvio-sc/src/services/auth/mod.rs index 1936136dd7..14cb7b6898 100644 --- a/crates/fluvio-sc/src/services/auth/mod.rs +++ b/crates/fluvio-sc/src/services/auth/mod.rs @@ -1,5 +1,3 @@ -pub mod basic; - pub use common::*; mod common { @@ -7,11 +5,6 @@ mod common { use std::sync::Arc; use std::fmt::Debug; - use async_trait::async_trait; - - use fluvio_auth::{AuthContext, Authorization, TypeAction, InstanceAction, AuthError}; - use fluvio_socket::FluvioSocket; - use fluvio_controlplane_metadata::extended::ObjectType; use fluvio_stream_model::core::MetadataItem; use crate::core::SharedContext; @@ -19,12 +12,12 @@ mod common { /// SC global context with authorization /// auth is trait object which contains global auth auth policy #[derive(Clone, Debug)] - pub struct AuthGlobalContext { + pub struct ScAuthGlobalContext { pub global_ctx: SharedContext, pub auth: Arc, } - impl AuthGlobalContext + impl ScAuthGlobalContext where C: MetadataItem, { @@ -33,62 +26,15 @@ mod common { } } - /// Authorization that allows anything - /// Used for personal development - #[derive(Debug, Clone)] - pub struct RootAuthorization {} - - #[async_trait] - impl Authorization for RootAuthorization { - type Context = RootAuthContext; - - async fn create_auth_context( - &self, - _socket: &mut FluvioSocket, - ) -> Result { - Ok(RootAuthContext {}) - } - } - - impl RootAuthorization { - pub fn new() -> Self { - Self {} - } - } - - #[derive(Debug)] - pub struct RootAuthContext {} - - #[async_trait] - impl AuthContext for RootAuthContext { - async fn allow_type_action( - &self, - _ty: ObjectType, - _action: TypeAction, - ) -> Result { - Ok(true) - } - - /// check if specific instance of spec can be deleted - async fn allow_instance_action( - &self, - _ty: ObjectType, - _action: InstanceAction, - _key: &str, - ) -> Result { - Ok(true) - } - } - /// Auth Service Context, this hold individual context that is enough enforce auth /// for this service context #[derive(Debug, Clone)] - pub struct AuthServiceContext { + pub struct ScAuthServiceContext { pub global_ctx: SharedContext, pub auth: AC, } - impl AuthServiceContext + impl ScAuthServiceContext where C: MetadataItem, { @@ -96,104 +42,4 @@ mod common { Self { global_ctx, auth } } } - - /// Authorization that allows only read only ops - #[derive(Debug, Clone)] - pub struct ReadOnlyAuthorization {} - - #[async_trait] - impl Authorization for ReadOnlyAuthorization { - type Context = ReadOnlyAuthContext; - - async fn create_auth_context( - &self, - _socket: &mut FluvioSocket, - ) -> Result { - Ok(ReadOnlyAuthContext {}) - } - } - - impl ReadOnlyAuthorization { - pub fn new() -> Self { - Self {} - } - } - - #[derive(Debug)] - pub struct ReadOnlyAuthContext {} - - #[async_trait] - impl AuthContext for ReadOnlyAuthContext { - async fn allow_type_action( - &self, - ty: ObjectType, - action: TypeAction, - ) -> Result { - Ok(matches!(action, TypeAction::Read) || matches!(ty, ObjectType::CustomSpu)) - } - - /// check if specific instance of spec can be deleted - async fn allow_instance_action( - &self, - _ty: ObjectType, - _action: InstanceAction, - _key: &str, - ) -> Result { - Ok(true) - } - } - - #[cfg(test)] - mod test { - use fluvio_auth::AuthContext; - - use super::{ObjectType, ReadOnlyAuthContext, RootAuthContext, TypeAction}; - - /// test read only context - /// read only context allows read on everything - /// and create on spu - #[fluvio_future::test] - async fn test_read_only_context() { - let auth_context = ReadOnlyAuthContext {}; - assert!(auth_context - .allow_type_action(ObjectType::Spu, TypeAction::Read) - .await - .unwrap()); - assert!(!auth_context - .allow_type_action(ObjectType::Spu, TypeAction::Create) - .await - .unwrap()); - assert!(auth_context - .allow_type_action(ObjectType::Topic, TypeAction::Read) - .await - .unwrap()); - assert!(!auth_context - .allow_type_action(ObjectType::Topic, TypeAction::Create) - .await - .unwrap()); - } - - /// test root context - /// root context allows everything - #[fluvio_future::test] - async fn test_root_context() { - let auth_context = RootAuthContext {}; - assert!(auth_context - .allow_type_action(ObjectType::Spu, TypeAction::Read) - .await - .unwrap()); - assert!(auth_context - .allow_type_action(ObjectType::Spu, TypeAction::Create) - .await - .unwrap()); - assert!(auth_context - .allow_type_action(ObjectType::Topic, TypeAction::Read) - .await - .unwrap()); - assert!(auth_context - .allow_type_action(ObjectType::Topic, TypeAction::Create) - .await - .unwrap()); - } - } } diff --git a/crates/fluvio-sc/src/services/private_api/private_server.rs b/crates/fluvio-sc/src/services/private_api/private_server.rs index 0f70612776..00ca9b22e5 100644 --- a/crates/fluvio-sc/src/services/private_api/private_server.rs +++ b/crates/fluvio-sc/src/services/private_api/private_server.rs @@ -505,7 +505,7 @@ async fn send_mirror_changes( UpdateMirrorRequest::with_changes(epoch, changes) }; - debug!(?request, "sending mirror to spu"); + debug!("sending mirror to spu"); let mut message = RequestMessage::new_request(request); message.get_mut_header().set_client_id("sc"); diff --git a/crates/fluvio-sc/src/services/public_api/create.rs b/crates/fluvio-sc/src/services/public_api/create.rs index ceb736f721..8845c9ab90 100644 --- a/crates/fluvio-sc/src/services/public_api/create.rs +++ b/crates/fluvio-sc/src/services/public_api/create.rs @@ -14,13 +14,13 @@ use fluvio_sc_schema::{Status, TryEncodableFrom}; use fluvio_sc_schema::objects::{ObjectApiCreateRequest, CreateRequest}; use fluvio_auth::AuthContext; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for create topic request #[instrument(skip(request, auth_context))] pub async fn handle_create_request( request: Box>, - auth_context: &AuthServiceContext, + auth_context: &ScAuthServiceContext, ) -> Result> { let (header, req) = request.get_header_request(); @@ -66,13 +66,13 @@ mod create_handler { use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; - use crate::services::auth::AuthServiceContext; + use crate::services::auth::ScAuthServiceContext; #[instrument(skip(create, spec, auth_ctx, object_ctx, error_code))] pub async fn process( create: CommonCreateRequest, spec: S, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, object_ctx: &StoreContext, error_code: F, ) -> Result diff --git a/crates/fluvio-sc/src/services/public_api/delete.rs b/crates/fluvio-sc/src/services/public_api/delete.rs index 10b26d5792..fb51670e74 100644 --- a/crates/fluvio-sc/src/services/public_api/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/delete.rs @@ -21,13 +21,13 @@ use fluvio_sc_schema::{Status, TryEncodableFrom}; use fluvio_sc_schema::objects::{ObjectApiDeleteRequest, DeleteRequest}; use fluvio_auth::AuthContext; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for delete topic request #[instrument(skip(request, auth_ctx))] pub async fn handle_delete_request( request: RequestMessage, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { let (header, del_req) = request.get_header_request(); @@ -75,13 +75,13 @@ mod delete_handler { use fluvio_auth::{AuthContext, InstanceAction}; use fluvio_controlplane_metadata::{core::Spec, extended::SpecExt}; - use crate::services::auth::AuthServiceContext; + use crate::services::auth::ScAuthServiceContext; /// Handler for object delete #[instrument(skip(auth_ctx, object_ctx, error_code, not_found_code))] pub async fn process( name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, object_ctx: &StoreContext, error_code: F, not_found_code: G, diff --git a/crates/fluvio-sc/src/services/public_api/list.rs b/crates/fluvio-sc/src/services/public_api/list.rs index 6fa318a78d..d6069f9db6 100644 --- a/crates/fluvio-sc/src/services/public_api/list.rs +++ b/crates/fluvio-sc/src/services/public_api/list.rs @@ -18,13 +18,13 @@ use fluvio_sc_schema::{ }; use fluvio_auth::AuthContext; -use crate::services::{auth::AuthServiceContext, public_api::mirror::handle_list_mirror}; +use crate::services::{auth::ScAuthServiceContext, public_api::mirror::handle_list_mirror}; use super::smartmodule::fetch_smart_modules; #[instrument(skip(request, auth_ctx))] pub async fn handle_list_request( request: RequestMessage, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { let (header, req) = request.get_header_request(); debug!("list header: {:#?}, request: {:#?}", header, req); @@ -107,12 +107,12 @@ mod fetch { use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_controlplane_metadata::store::KeyFilter; - use crate::services::auth::AuthServiceContext; + use crate::services::auth::ScAuthServiceContext; #[instrument(skip(filters, auth_ctx))] pub async fn handle_fetch_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, object_ctx: &StoreContext, ) -> Result, Error> where diff --git a/crates/fluvio-sc/src/services/public_api/mirror/list.rs b/crates/fluvio-sc/src/services/public_api/mirror/list.rs index de18078916..4d5c9dc287 100644 --- a/crates/fluvio-sc/src/services/public_api/mirror/list.rs +++ b/crates/fluvio-sc/src/services/public_api/mirror/list.rs @@ -7,11 +7,11 @@ use fluvio_sc_schema::{ use anyhow::Result; use tracing::{debug, info, trace}; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; pub async fn handle_list_mirror( _filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { info!("mirror-cluster list"); let mirror_list: Vec> = auth_ctx diff --git a/crates/fluvio-sc/src/services/public_api/mirror/register.rs b/crates/fluvio-sc/src/services/public_api/mirror/register.rs index 493ddd83f8..e2cb4ba968 100644 --- a/crates/fluvio-sc/src/services/public_api/mirror/register.rs +++ b/crates/fluvio-sc/src/services/public_api/mirror/register.rs @@ -9,11 +9,11 @@ use fluvio_sc_schema::{ use anyhow::Result; use tracing::info; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; pub async fn handle_register_mirror( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { let (create, spec) = req.clone().parts(); let name = create.name; diff --git a/crates/fluvio-sc/src/services/public_api/mirror/unregister.rs b/crates/fluvio-sc/src/services/public_api/mirror/unregister.rs index 405a7f396b..401448727c 100644 --- a/crates/fluvio-sc/src/services/public_api/mirror/unregister.rs +++ b/crates/fluvio-sc/src/services/public_api/mirror/unregister.rs @@ -4,11 +4,11 @@ use fluvio_sc_schema::{core::MetadataItem, Status}; use anyhow::Result; use tracing::info; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; pub async fn handle_unregister_mirror( key: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { let name = key; info!(name = name, "unregister mirror cluster"); diff --git a/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs b/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs index 8696dbf1d1..39c0a73891 100644 --- a/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs +++ b/crates/fluvio-sc/src/services/public_api/mirroring/connect.rs @@ -2,51 +2,54 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; -use fluvio_controlplane_metadata::mirroring::{ - MirrorConnect, MirroringSpecWrapper, MirroringStatusResponse, +use tracing::{debug, error, info, instrument, trace, warn}; +use anyhow::{Result, anyhow}; + +use fluvio_auth::{AuthContext, TypeAction}; +use fluvio_controlplane_metadata::{ + extended::ObjectType, + mirroring::{MirrorConnect, MirroringSpecWrapper, MirroringStatusResponse}, }; use fluvio_future::{task::spawn, timer::sleep}; use fluvio_protocol::api::{RequestHeader, ResponseMessage}; use fluvio_sc_schema::{ core::MetadataItem, - mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus}, + mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus, MirrorType}, spu::SpuSpec, store::ChangeListener, topic::{MirrorConfig, ReplicaSpec, TopicSpec}, }; use fluvio_socket::ExclusiveFlvSink; use fluvio_types::event::StickyEvent; -use tracing::{debug, error, info, instrument, trace}; -use anyhow::{Result, anyhow}; -use crate::core::Context; +use crate::services::auth::ScAuthServiceContext; // This is the entry point for handling mirroring requests // Home clusters will receive requests from remote clusters -pub struct RemoteFetchingFromHomeController { +pub struct RemoteFetchingFromHomeController { req: MirrorConnect, response_sink: ExclusiveFlvSink, end_event: Arc, - ctx: Arc>, header: RequestHeader, + auth_ctx: Arc>, } const MIRRORING_CONTROLLER_INTERVAL: u64 = 5; -impl RemoteFetchingFromHomeController { +impl RemoteFetchingFromHomeController { pub fn start( req: MirrorConnect, response_sink: ExclusiveFlvSink, end_event: Arc, - ctx: Arc>, header: RequestHeader, + auth_ctx: Arc>, ) { let controller = Self { req: req.clone(), response_sink, end_event, - ctx, header, + auth_ctx, }; spawn(controller.dispatch_loop()); @@ -55,13 +58,43 @@ impl RemoteFetchingFromHomeController { #[instrument(skip(self), name = "RemoteFetchingFromHomeControllerLoop")] async fn dispatch_loop(mut self) { use tokio::select; + + // authorization check + if let Ok(authorized) = self + .auth_ctx + .auth + .allow_type_action(ObjectType::RemoteToHomeConnection, TypeAction::Update) + .await + { + if !authorized { + warn!("identity mismatch for remote_id: {}", self.req.remote_id); + return; + } + } + + let ctx = self.auth_ctx.global_ctx.clone(); + + // check if remote cluster exists + let mirrors = ctx.mirrors().store().value(&self.req.remote_id).await; + let remote = mirrors + .iter() + .find(|mirror| match &mirror.spec.mirror_type { + MirrorType::Remote(r) => r.id == self.req.remote_id, + _ => false, + }); + + if remote.is_none() { + warn!("remote cluster not found: {}", self.req.remote_id); + return; + } + info!( name = self.req.remote_id, "received mirroring connect request" ); - let mut topics_listener = self.ctx.topics().change_listener(); - let mut spus_listerner = self.ctx.spus().change_listener(); + let mut topics_listener = ctx.topics().change_listener(); + let mut spus_listerner = ctx.spus().change_listener(); loop { if self @@ -106,9 +139,11 @@ impl RemoteFetchingFromHomeController { return Ok(()); } - let spus = self.ctx.spus().store().clone_values().await; + let ctx = self.auth_ctx.global_ctx.clone(); + + let spus = ctx.spus().store().clone_values().await; - let topics = self.ctx.topics().store().clone_values().await; + let topics = ctx.topics().store().clone_values().await; let mirror_topics = topics .into_iter() .filter_map(|topic| match topic.spec.replicas() { @@ -150,7 +185,7 @@ impl RemoteFetchingFromHomeController { }) .collect::>(); - match self.ctx.mirrors().store().value(&self.req.remote_id).await { + match ctx.mirrors().store().value(&self.req.remote_id).await { Some(remote) => { let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -173,8 +208,7 @@ impl RemoteFetchingFromHomeController { now as u64, ); - self.ctx - .mirrors() + ctx.mirrors() .update_status(remote.key.clone(), status) .await?; error!( @@ -192,8 +226,7 @@ impl RemoteFetchingFromHomeController { ConnectionStatus::Online, now as u64, ); - self.ctx - .mirrors() + ctx.mirrors() .update_status(remote.key.clone(), status) .await?; diff --git a/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs b/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs index 4a9ca95698..8a4235f7d3 100644 --- a/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs +++ b/crates/fluvio-sc/src/services/public_api/mirroring/mod.rs @@ -5,14 +5,14 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use fluvio_controlplane_metadata::mirroring::{MirroringRemoteClusterRequest, MirrorConnect}; use fluvio_socket::ExclusiveFlvSink; -use tracing::{info, instrument}; +use tracing::{info, instrument, warn}; use fluvio_auth::AuthContext; use fluvio_protocol::api::RequestMessage; use fluvio_stream_model::core::MetadataItem; use fluvio_sc_schema::mirroring::ObjectMirroringRequest; use fluvio_sc_schema::TryEncodableFrom; use fluvio_types::event::StickyEvent; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; use crate::services::public_api::mirroring::connect::RemoteFetchingFromHomeController; pub enum MirrorRequests { @@ -22,14 +22,13 @@ pub enum MirrorRequests { #[instrument(skip(request, auth_ctx, sink, end_event))] pub fn handle_mirroring_request( request: RequestMessage, - auth_ctx: &AuthServiceContext, + auth_ctx: Arc>, sink: ExclusiveFlvSink, end_event: Arc, ) -> Result<()> { info!("remote cluster register request {:?}", request); let (header, req) = request.get_header_request(); - let ctx = auth_ctx.global_ctx.clone(); let Ok(req) = try_convert_to_reqs(req) else { return Err(anyhow!("unable to decode request")); @@ -37,7 +36,7 @@ pub fn handle_mirroring_request( match req { MirrorRequests::Connect(req) => { - RemoteFetchingFromHomeController::start(req, sink, end_event, ctx, header); + RemoteFetchingFromHomeController::start(req, sink, end_event, header, auth_ctx); } }; diff --git a/crates/fluvio-sc/src/services/public_api/mod.rs b/crates/fluvio-sc/src/services/public_api/mod.rs index 49278dbc02..0fe60500fa 100644 --- a/crates/fluvio-sc/src/services/public_api/mod.rs +++ b/crates/fluvio-sc/src/services/public_api/mod.rs @@ -26,17 +26,17 @@ mod server { use fluvio_service::FluvioApiServer; use fluvio_auth::Authorization; - use crate::services::auth::AuthGlobalContext; + use crate::services::auth::ScAuthGlobalContext; use super::public_server::PublicService; /// create public server - pub fn start_public_server(ctx: AuthGlobalContext) + pub fn start_public_server(ctx: ScAuthGlobalContext) where A: Authorization + Sync + Send + Debug + 'static, C: MetadataItem + 'static, C::UId: Send + Sync, - AuthGlobalContext: Clone + Debug, + ScAuthGlobalContext: Clone + Debug, ::Context: Send + Sync, { let addr = ctx.global_ctx.config().public_endpoint.clone(); diff --git a/crates/fluvio-sc/src/services/public_api/partition/mod.rs b/crates/fluvio-sc/src/services/public_api/partition/mod.rs index 0da0e8b442..ea4af4a893 100644 --- a/crates/fluvio-sc/src/services/public_api/partition/mod.rs +++ b/crates/fluvio-sc/src/services/public_api/partition/mod.rs @@ -9,13 +9,13 @@ use fluvio_sc_schema::partition::PartitionSpec; use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; #[instrument(skip(_filters, auth_ctx))] pub async fn handle_fetch_request( _filters: ListFilters, system: bool, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { debug!("fetching custom spu list"); diff --git a/crates/fluvio-sc/src/services/public_api/public_server.rs b/crates/fluvio-sc/src/services/public_api/public_server.rs index 4a99efa885..c7fee7b3cd 100644 --- a/crates/fluvio-sc/src/services/public_api/public_server.rs +++ b/crates/fluvio-sc/src/services/public_api/public_server.rs @@ -27,7 +27,7 @@ use fluvio_service::FluvioService; use fluvio_sc_schema::AdminPublicApiKey; use fluvio_sc_schema::AdminPublicDecodedRequest; -use crate::services::auth::{AuthGlobalContext, AuthServiceContext}; +use crate::services::auth::{ScAuthGlobalContext, ScAuthServiceContext}; #[derive(Debug)] pub struct PublicService { @@ -48,7 +48,7 @@ where C: MetadataItem + 'static, ::Context: Send + Sync, { - type Context = AuthGlobalContext; + type Context = ScAuthGlobalContext; type Request = AdminPublicDecodedRequest; #[instrument(skip(self, ctx))] @@ -68,7 +68,7 @@ where })?; debug!(?auth_context); - let service_context = Arc::new(AuthServiceContext::new( + let service_context = Arc::new(ScAuthServiceContext::new( ctx.global_ctx.clone(), auth_context, )); @@ -110,7 +110,7 @@ where "list handler" ), AdminPublicDecodedRequest::MirroringRequest(request) => - super::mirroring::handle_mirroring_request(request, &service_context, shared_sink.clone(), end_event.clone())?, + super::mirroring::handle_mirroring_request(request, service_context.clone(), shared_sink.clone(), end_event.clone())?, AdminPublicDecodedRequest::WatchRequest(request) => super::watch::handle_watch_request( request, diff --git a/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs b/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs index cab1c6e892..0c87d8de3a 100644 --- a/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs +++ b/crates/fluvio-sc/src/services/public_api/smartmodule/create.rs @@ -16,13 +16,13 @@ use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; use crate::core::Context; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for smartmodule request #[instrument(skip(req, auth_ctx))] pub async fn handle_create_smartmodule_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { let (create, spec) = req.parts(); let name = create.name; diff --git a/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs b/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs index f0158a8477..4916b754c4 100644 --- a/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/smartmodule/delete.rs @@ -9,13 +9,13 @@ use fluvio_auth::{AuthContext, InstanceAction}; use fluvio_controlplane_metadata::smartmodule::{SmartModuleSpec, SmartModulePackageKey}; use fluvio_controlplane_metadata::extended::SpecExt; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for delete smartmodule request #[instrument(skip(name, auth_ctx))] pub async fn handle_delete_smartmodule( name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs b/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs index 398f028a38..200dd28ffa 100644 --- a/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs +++ b/crates/fluvio-sc/src/services/public_api/smartmodule/list.rs @@ -90,6 +90,7 @@ mod test { use std::sync::Arc; + use fluvio_auth::root::RootAuthContext; use fluvio_stream_dispatcher::store::StoreContext; use fluvio_stream_model::fixture::TestMeta; use fluvio_stream_model::store::{MetadataStoreObject, LocalStore}; @@ -97,8 +98,6 @@ mod test { SmartModuleSpec, SmartModuleMetadata, SmartModulePackage, FluvioSemVersion, }; - use crate::services::auth::RootAuthContext; - use super::fetch_smart_modules; type TestSmartModuleStore = LocalStore; diff --git a/crates/fluvio-sc/src/services/public_api/spg/create.rs b/crates/fluvio-sc/src/services/public_api/spg/create.rs index 04344f59b7..b17bd4cca9 100644 --- a/crates/fluvio-sc/src/services/public_api/spg/create.rs +++ b/crates/fluvio-sc/src/services/public_api/spg/create.rs @@ -19,7 +19,7 @@ use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; use crate::core::Context; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; const DEFAULT_SPG_CREATE_TIMEOUT: u32 = 120 * 1000; // 2 minutes @@ -27,7 +27,7 @@ const DEFAULT_SPG_CREATE_TIMEOUT: u32 = 120 * 1000; // 2 minutes #[instrument(skip(req, auth_ctx))] pub async fn handle_create_spu_group_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { let (create, spg) = req.parts(); let name = create.name; diff --git a/crates/fluvio-sc/src/services/public_api/spg/delete.rs b/crates/fluvio-sc/src/services/public_api/spg/delete.rs index dbd5320c85..43b795a5e0 100644 --- a/crates/fluvio-sc/src/services/public_api/spg/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/spg/delete.rs @@ -8,13 +8,13 @@ use fluvio_auth::{AuthContext, InstanceAction}; use fluvio_controlplane_metadata::spg::SpuGroupSpec; use fluvio_controlplane_metadata::extended::SpecExt; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for delete spu group request #[instrument(skip(name, auth_ctx))] pub async fn handle_delete_spu_group( name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio-sc/src/services/public_api/spg/fetch.rs b/crates/fluvio-sc/src/services/public_api/spg/fetch.rs index 2d01efbdd6..f23e8e3175 100644 --- a/crates/fluvio-sc/src/services/public_api/spg/fetch.rs +++ b/crates/fluvio-sc/src/services/public_api/spg/fetch.rs @@ -10,12 +10,12 @@ use fluvio_auth::{AuthContext, TypeAction}; use fluvio_controlplane_metadata::store::KeyFilter; use fluvio_controlplane_metadata::extended::SpecExt; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; #[instrument(skip(filters, auth_ctx))] pub async fn handle_fetch_spu_groups_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { debug!("fetching spu groups"); diff --git a/crates/fluvio-sc/src/services/public_api/spu/fetch.rs b/crates/fluvio-sc/src/services/public_api/spu/fetch.rs index ee009a9ee6..ce587a2d4b 100644 --- a/crates/fluvio-sc/src/services/public_api/spu/fetch.rs +++ b/crates/fluvio-sc/src/services/public_api/spu/fetch.rs @@ -9,12 +9,12 @@ use fluvio_auth::{AuthContext, TypeAction}; use fluvio_controlplane_metadata::store::KeyFilter; use fluvio_controlplane_metadata::extended::SpecExt; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; #[instrument(skip(filters, auth_ctx))] pub async fn handle_fetch_custom_spu_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { debug!("fetching custom spu list"); @@ -62,7 +62,7 @@ pub async fn handle_fetch_custom_spu_request( #[instrument(skip(filters, auth_ctx))] pub async fn handle_fetch_spus_request( filters: ListFilters, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { debug!("fetching spu list"); diff --git a/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs b/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs index 807939dcf4..0751e1b6e9 100644 --- a/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs +++ b/crates/fluvio-sc/src/services/public_api/spu/register_custom_spus_req.rs @@ -16,7 +16,7 @@ use fluvio_auth::{AuthContext, TypeAction}; use fluvio_controlplane_metadata::extended::SpecExt; use crate::core::SharedContext; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; use crate::stores::spu::SpuLocalStorePolicy; pub struct RegisterCustomSpu { @@ -30,7 +30,7 @@ impl RegisterCustomSpu { #[instrument(skip(req, auth_ctx))] pub async fn handle_register_custom_spu_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Status { let (create, spec) = req.parts(); let name = create.name; diff --git a/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs b/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs index a336d4533e..3c60929f65 100644 --- a/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs +++ b/crates/fluvio-sc/src/services/public_api/spu/unregister_custom_spus_req.rs @@ -16,13 +16,13 @@ use fluvio_controlplane_metadata::extended::SpecExt; use crate::dispatcher::core::MetadataItem; use crate::stores::spu::{SpuAdminMd, SpuLocalStorePolicy}; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for delete custom spu request #[instrument(skip(key, auth_ctx))] pub async fn handle_un_register_custom_spu_request( key: CustomSpuKey, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { let spu_name = key.to_string(); @@ -90,7 +90,7 @@ pub async fn handle_un_register_custom_spu_request( - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, spu: SpuAdminMd, ) -> Status { let spu_name = spu.key_owned(); diff --git a/crates/fluvio-sc/src/services/public_api/tableformat/create.rs b/crates/fluvio-sc/src/services/public_api/tableformat/create.rs index 0e1478020b..cf0a436041 100644 --- a/crates/fluvio-sc/src/services/public_api/tableformat/create.rs +++ b/crates/fluvio-sc/src/services/public_api/tableformat/create.rs @@ -16,13 +16,13 @@ use fluvio_controlplane_metadata::extended::SpecExt; use fluvio_auth::{AuthContext, TypeAction}; use crate::core::Context; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for tableformat request #[instrument(skip(req, auth_ctx))] pub async fn handle_create_tableformat_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { let (create, spec) = req.parts(); let name = create.name; diff --git a/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs b/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs index b8648eccbe..1c66a0798e 100644 --- a/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/tableformat/delete.rs @@ -8,13 +8,13 @@ use fluvio_auth::{AuthContext, InstanceAction}; use fluvio_controlplane_metadata::tableformat::TableFormatSpec; use fluvio_controlplane_metadata::extended::SpecExt; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for delete tableformat request #[instrument(skip(name, auth_ctx))] pub async fn handle_delete_tableformat( name: String, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { use fluvio_protocol::link::ErrorCode; diff --git a/crates/fluvio-sc/src/services/public_api/topic/create.rs b/crates/fluvio-sc/src/services/public_api/topic/create.rs index 9f534d1384..6d3abd77fa 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/create.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/create.rs @@ -28,13 +28,13 @@ use crate::controllers::topics::policy::{ validate_computed_topic_parameters, validate_mirror_topic_parameter, }; use crate::core::Context; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for create topic request #[instrument(skip(req, auth_ctx))] pub(crate) async fn handle_create_topics_request( req: CreateRequest, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { let (create, topic) = req.parts(); let name = create.name; @@ -192,7 +192,7 @@ async fn validate_topic_request( /// create new topic and wait until all partitions are fully provisioned /// if any partitions are not provisioned in time, this will generate error async fn process_topic_request( - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, name: String, topic_spec: TopicSpec, ) -> Status { diff --git a/crates/fluvio-sc/src/services/public_api/topic/delete.rs b/crates/fluvio-sc/src/services/public_api/topic/delete.rs index 2e699c28f7..235fe1d7e9 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/delete.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/delete.rs @@ -14,14 +14,14 @@ use fluvio_controlplane_metadata::topic::TopicSpec; use fluvio_auth::{AuthContext, InstanceAction}; use fluvio_controlplane_metadata::extended::SpecExt; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; /// Handler for delete topic request #[instrument(skip(topic_name, auth_ctx))] pub async fn handle_delete_topic( topic_name: String, force: bool, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result { info!(%topic_name, "Deleting topic"); diff --git a/crates/fluvio-sc/src/services/public_api/topic/fetch.rs b/crates/fluvio-sc/src/services/public_api/topic/fetch.rs index 09c0d20038..c96b4700ab 100644 --- a/crates/fluvio-sc/src/services/public_api/topic/fetch.rs +++ b/crates/fluvio-sc/src/services/public_api/topic/fetch.rs @@ -8,13 +8,13 @@ use fluvio_sc_schema::topic::TopicSpec; use fluvio_auth::{AuthContext, TypeAction}; use fluvio_controlplane_metadata::extended::SpecExt; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; #[instrument(skip(filters, auth_ctx))] pub async fn handle_fetch_topics_request( filters: ListFilters, system: bool, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, ) -> Result> { debug!("retrieving topic list: {:#?}", filters); diff --git a/crates/fluvio-sc/src/services/public_api/watch.rs b/crates/fluvio-sc/src/services/public_api/watch.rs index 1c105a1455..df5bc5289a 100644 --- a/crates/fluvio-sc/src/services/public_api/watch.rs +++ b/crates/fluvio-sc/src/services/public_api/watch.rs @@ -21,7 +21,7 @@ use fluvio_controlplane_metadata::topic::TopicSpec; use fluvio_controlplane_metadata::smartmodule::SmartModuleSpec; use fluvio_controlplane_metadata::tableformat::TableFormatSpec; -use crate::services::auth::AuthServiceContext; +use crate::services::auth::ScAuthServiceContext; use crate::stores::StoreContext; use fluvio_controlplane_metadata::spg::SpuGroupSpec; @@ -29,7 +29,7 @@ use fluvio_controlplane_metadata::spg::SpuGroupSpec; #[instrument(skip(request, auth_ctx, sink, end_event))] pub fn handle_watch_request( request: RequestMessage, - auth_ctx: &AuthServiceContext, + auth_ctx: &ScAuthServiceContext, sink: ExclusiveFlvSink, end_event: Arc, ) -> Result<()> { diff --git a/crates/fluvio-sc/src/start.rs b/crates/fluvio-sc/src/start.rs index 5d9191c417..fe86804e56 100644 --- a/crates/fluvio-sc/src/start.rs +++ b/crates/fluvio-sc/src/start.rs @@ -5,6 +5,7 @@ use std::{ }; use anyhow::Result; +use fluvio_auth::basic::BasicRbacPolicy; use tracing::info; use fluvio_future::{task::run_block_on, timer::sleep}; @@ -14,7 +15,6 @@ use k8_client::{K8Client, K8Config, memory::MemoryClient}; use crate::{ cli::{ScOpt, TlsConfig, RunMode}, - services::auth::basic::BasicRbacPolicy, config::ScConfig, config::DEFAULT_NAMESPACE, }; @@ -148,9 +148,8 @@ mod proxy { pub use fluvio_future::openssl::TlsAcceptor; use fluvio_auth::x509::X509Authenticator; - use flv_tls_proxy::{ - start as proxy_start, start_with_authenticator as proxy_start_with_authenticator, - }; + use flv_tls_proxy::start_with_authenticator as proxy_start_with_authenticator; + use flv_tls_proxy::start as proxy_start; use crate::{config::ScConfig, cli::TlsConfig}; @@ -168,14 +167,17 @@ mod proxy { let target = config.public_endpoint; info!("starting TLS proxy: {}", proxy_addr); - let result = if let Some(x509_auth_scopes) = config.x509_auth_scopes { - let authenticator = Box::new(X509Authenticator::new(&x509_auth_scopes)); + let proxy_result = if let Some(x509_auth_scopes) = config.x509_auth_scopes { + let authenticator = Box::new(X509Authenticator::new(Some(&x509_auth_scopes))); + proxy_start_with_authenticator(&proxy_addr, tls_acceptor, target, authenticator).await + } else if config.tls { + let authenticator = Box::new(X509Authenticator::new(None)); proxy_start_with_authenticator(&proxy_addr, tls_acceptor, target, authenticator).await } else { proxy_start(&proxy_addr, tls_acceptor, target).await }; - if let Err(err) = result { + if let Err(err) = proxy_result { print_cli_err!(err); process::exit(-1); } diff --git a/crates/fluvio-sc/test-data/auth_config/policy.json b/crates/fluvio-sc/test-data/auth_config/policy.json index d1c71e0282..cfc8b4733c 100644 --- a/crates/fluvio-sc/test-data/auth_config/policy.json +++ b/crates/fluvio-sc/test-data/auth_config/policy.json @@ -23,6 +23,9 @@ ], "Mirror": [ "All" + ], + "RemoteToHomeConnection": [ + "All" ] }, "Root": { @@ -52,6 +55,9 @@ ], "Mirror": [ "All" + ], + "RemoteToHomeConnection": [ + "All" ] } } diff --git a/crates/fluvio-sc/test-data/auth_config/scopes.json b/crates/fluvio-sc/test-data/auth_config/scopes.json index e2bf7126eb..ccca735331 100644 --- a/crates/fluvio-sc/test-data/auth_config/scopes.json +++ b/crates/fluvio-sc/test-data/auth_config/scopes.json @@ -1,4 +1,4 @@ { "root": ["Root"], - "user1": ["Default"] -} \ No newline at end of file + "user1": ["Default"], +} diff --git a/crates/fluvio-spu/Cargo.toml b/crates/fluvio-spu/Cargo.toml index 3fcfe57035..b7bbdbc824 100644 --- a/crates/fluvio-spu/Cargo.toml +++ b/crates/fluvio-spu/Cargo.toml @@ -50,6 +50,7 @@ mimalloc = { workspace = true } # Fluvio dependencies fluvio = { workspace = true } +fluvio-auth = { workspace = true } fluvio-types = { workspace = true, features = ["events"] } fluvio-storage = { workspace = true, features = ["iterators"] } fluvio-compression = { workspace = true } diff --git a/crates/fluvio-spu/src/config/cli.rs b/crates/fluvio-spu/src/config/cli.rs index 78bfc73b7d..ef880e5fb9 100644 --- a/crates/fluvio-spu/src/config/cli.rs +++ b/crates/fluvio-spu/src/config/cli.rs @@ -8,6 +8,7 @@ use std::io::Error as IoError; use std::process; use std::io::ErrorKind; +use fluvio_future::openssl::SslVerifyMode; use tracing::debug; use tracing::info; use clap::Parser; @@ -145,6 +146,8 @@ impl SpuOpt { config.peer_max_bytes = self.peer_max_bytes; + config.tls = self.tls.tls; + if let Some(smart_engine_max_memory) = self.smart_engine_max_memory { info!( "overriding smart engine max memory: {}", @@ -178,6 +181,7 @@ impl SpuOpt { .ok_or_else(|| IoError::new(ErrorKind::NotFound, "missing ca cert"))?; TlsAcceptor::builder() .map_err(|err| err.into_io_error())? + .with_ssl_verify_mode(SslVerifyMode::PEER) .with_ca_from_pem_file(ca_path) .map_err(|err| err.into_io_error())? } else { diff --git a/crates/fluvio-spu/src/config/spu_config.rs b/crates/fluvio-spu/src/config/spu_config.rs index e15f47161b..31208f8e97 100644 --- a/crates/fluvio-spu/src/config/spu_config.rs +++ b/crates/fluvio-spu/src/config/spu_config.rs @@ -111,6 +111,8 @@ pub struct SpuConfig { pub peer_max_bytes: u32, pub smart_engine: SmartEngineConfig, + + pub tls: bool, } impl Default for SpuConfig { @@ -126,6 +128,7 @@ impl Default for SpuConfig { log: Log::default(), peer_max_bytes: fluvio_storage::FileReplica::PREFER_MAX_LEN, smart_engine: SmartEngineConfig::default(), + tls: false, } } } diff --git a/crates/fluvio-spu/src/control_plane/dispatcher.rs b/crates/fluvio-spu/src/control_plane/dispatcher.rs index 70d5de850e..3f8eec0bfa 100644 --- a/crates/fluvio-spu/src/control_plane/dispatcher.rs +++ b/crates/fluvio-spu/src/control_plane/dispatcher.rs @@ -376,7 +376,7 @@ impl ScDispatcher { ) -> anyhow::Result<()> { let (_, request) = req_msg.get_header_request(); - debug!( message = ?request,"starting remote cluster update"); + debug!("starting remote cluster update"); let actions = if !request.all.is_empty() { debug!( diff --git a/crates/fluvio-spu/src/mirroring/home/connection.rs b/crates/fluvio-spu/src/mirroring/home/connection.rs index a0f65aa2b4..9225b0f41c 100644 --- a/crates/fluvio-spu/src/mirroring/home/connection.rs +++ b/crates/fluvio-spu/src/mirroring/home/connection.rs @@ -6,17 +6,21 @@ use tokio::select; use tracing::{debug, error, instrument, warn}; use anyhow::Result; +use fluvio_auth::{AuthContext, TypeAction}; +use fluvio_controlplane_metadata::mirror::MirrorType; +use fluvio_controlplane_metadata::extended::ObjectType; use fluvio_future::timer::sleep; use fluvio_protocol::api::RequestMessage; use fluvio_spu_schema::server::mirror::StartMirrorRequest; use futures_util::StreamExt; -use fluvio_socket::{FluvioStream, ExclusiveFlvSink}; +use fluvio_socket::{ExclusiveFlvSink, FluvioStream}; use crate::core::DefaultSharedGlobalContext; use crate::mirroring::remote::api_key::MirrorRemoteApiEnum; use crate::mirroring::remote::remote_api::RemoteMirrorRequest; use crate::mirroring::remote::sync::DefaultPartitionSyncRequest; use crate::replication::leader::SharedFileLeaderState; +use crate::services::auth::SpuAuthServiceContext; use super::update_offsets::UpdateHomeOffsetRequest; @@ -59,18 +63,51 @@ impl fmt::Debug for MirrorHomeHandler { impl MirrorHomeHandler { /// start handling mirror request sync from remote /// it is called from public service handler - pub(crate) async fn respond( - ctx: DefaultSharedGlobalContext, + pub(crate) async fn respond( req_msg: RequestMessage, + auth_ctx: &SpuAuthServiceContext, sink: ExclusiveFlvSink, - stream: &mut FluvioStream, + stream: FluvioStream, ) { + // authorization check + if let Ok(authorized) = auth_ctx + .auth + .allow_type_action(ObjectType::RemoteToHomeConnection, TypeAction::Update) + .await + { + if !authorized { + warn!( + "identity mismatch for remote_id: {}", + req_msg.request.remote_cluster_id + ); + return; + } + } + + // check if remote cluster exists + let mirrors = auth_ctx.global_ctx.mirrors_localstore().all_values(); + let remote = mirrors + .iter() + .find(|mirror| match &mirror.spec.mirror_type { + MirrorType::Remote(r) => r.id == req_msg.request.remote_cluster_id, + _ => false, + }); + + if remote.is_none() { + warn!( + "remote cluster not found: {}", + req_msg.request.remote_cluster_id + ); + return; + } + debug!("handling mirror request: {:#?}", req_msg); let remote_replica = req_msg.request.remote_replica; let remote_cluster_id = req_msg.request.remote_cluster_id; let _access_key = req_msg.request.access_key; - if let Some(leader) = ctx + if let Some(leader) = auth_ctx + .global_ctx .leaders_state() .find_mirror_home_leader(&remote_cluster_id, &remote_replica) .await @@ -79,21 +116,19 @@ impl MirrorHomeHandler { // map to actual home let metrics = Arc::new(MirrorRequestMetrics::new()); - // TODO: perform authorization let handler: MirrorHomeHandler = Self { metrics: metrics.clone(), leader, - ctx, + ctx: auth_ctx.global_ctx.clone(), }; if let Err(err) = handler.inner_respond(sink, stream).await { error!("error handling mirror request: {:#?}", err); } } else { - // TODO: handle no home partition warn!( remote_replica, - remote_cluster_id, "no leader replica found for this" + remote_cluster_id, "no leader replica found for this mirror request" ); } } @@ -102,7 +137,7 @@ impl MirrorHomeHandler { async fn inner_respond( self, mut sink: ExclusiveFlvSink, - stream: &mut FluvioStream, + mut stream: FluvioStream, ) -> Result<()> { // first send let mut api_stream = stream.api_stream::(); diff --git a/crates/fluvio-spu/src/mirroring/remote/controller.rs b/crates/fluvio-spu/src/mirroring/remote/controller.rs index 9e5e734755..cab049ce26 100644 --- a/crates/fluvio-spu/src/mirroring/remote/controller.rs +++ b/crates/fluvio-spu/src/mirroring/remote/controller.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use fluvio::config::TlsPolicy; use futures_util::StreamExt; use tokio::select; use tracing::{debug, error, warn, instrument}; @@ -23,7 +24,7 @@ use fluvio_storage::{ReplicaStorage, FileReplica}; use fluvio_socket::{FluvioSocket, FluvioSink}; use fluvio_spu_schema::{Isolation, server::mirror::StartMirrorRequest}; -use fluvio_future::{task::spawn, timer::sleep}; +use fluvio_future::{net::DomainConnector, task::spawn, timer::sleep}; use fluvio_protocol::{record::Offset, api::RequestMessage}; use fluvio_types::event::offsets::OffsetChangeListener; @@ -211,7 +212,7 @@ where (home_socket, tls): (FluvioSocket, bool), backoff: &mut ExponentialBackoff, ) -> Result<()> { - // debug!(home = self.home, "start syncing mirror"); + debug!(home_id = home.id, "start syncing mirror"); let (mut home_sink, mut home_stream) = home_socket.split(); @@ -220,7 +221,6 @@ where home_sink.disable_zerocopy(); } - // let mut home_api_stream = home_stream.api_stream::(); self.send_initial_request(home, &mut home_sink).await?; @@ -352,7 +352,7 @@ where } /// look up home cluster from local store - /// this may retur None if remote cluster is send by SC by time controller is started + /// this may return None if remote cluster is send by SC by time controller is started fn find_home_cluster(&self) -> Option { let read = self.mirror_store.read(); let mirror = read.get(&self.remote_config.home_cluster).cloned(); @@ -424,13 +424,14 @@ where } /// create socket to home, this will always succeed - #[instrument] + #[instrument(skip(self, home))] async fn create_socket_to_home( &self, backoff: &mut ExponentialBackoff, - _home: &Home, + home: &Home, ) -> (FluvioSocket, bool) { - //TODO: implement tls + let tlspolicy = option_tlspolicy(home); + loop { self.state.metrics.increase_conn_count(); @@ -441,12 +442,28 @@ where "trying connect to home", ); - let res = FluvioSocket::connect(endpoint).await; + let res = if let Some(tlspolicy) = &tlspolicy { + match DomainConnector::try_from(tlspolicy.clone()) { + Ok(connector) => { + FluvioSocket::connect_with_connector(endpoint, &(*connector)).await + } + Err(err) => { + error!( + "error establishing tls with leader at: <{}> err: {}", + endpoint, err + ); + self.backoff_and_wait(backoff).await; + continue; + } + } + } else { + FluvioSocket::connect(endpoint).await + }; match res { Ok(socket) => { debug!("connected"); - return (socket, false); + return (socket, tlspolicy.is_some()); } Err(err) => { @@ -473,3 +490,23 @@ fn create_backoff() -> ExponentialBackoff { .build() .unwrap() } + +fn option_tlspolicy(home: &Home) -> Option { + use fluvio::config::{TlsCerts, TlsConfig}; + + let ct = match &home.tls { + Some(ct) => ct, + _ => { + return None; + } + }; + + let certs = TlsCerts { + domain: ct.domain.clone(), + key: ct.client_key.clone(), + cert: ct.client_cert.clone(), + ca_cert: ct.ca_cert.clone(), + }; + let tlscfg = TlsConfig::Inline(certs); + Some(TlsPolicy::from(tlscfg)) +} diff --git a/crates/fluvio-spu/src/mirroring/test/fixture.rs b/crates/fluvio-spu/src/mirroring/test/fixture.rs index a3dca30136..8c3c602e58 100644 --- a/crates/fluvio-spu/src/mirroring/test/fixture.rs +++ b/crates/fluvio-spu/src/mirroring/test/fixture.rs @@ -200,6 +200,7 @@ impl ReplicaConfig { id: self.home_cluster.clone(), remote_id: self.home_cluster.clone(), public_endpoint: self.home_port.clone(), + tls: None, }), }, }]); diff --git a/crates/fluvio-spu/src/mirroring/test/integration.rs b/crates/fluvio-spu/src/mirroring/test/integration.rs index 2fcf7a928a..22db973c7d 100644 --- a/crates/fluvio-spu/src/mirroring/test/integration.rs +++ b/crates/fluvio-spu/src/mirroring/test/integration.rs @@ -1,12 +1,13 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; +use fluvio_auth::root::RootAuthorization; use tracing::debug; use fluvio_controlplane_metadata::partition::{RemotePartitionConfig, HomePartitionConfig}; use fluvio_future::timer::sleep; use fluvio_protocol::{fixture::create_raw_recordset, record::ReplicaKey}; -use crate::services::public::create_public_server; +use crate::services::{auth::SpuAuthGlobalContext, public::create_public_server}; use super::fixture::{ReplicaConfig, local_port}; @@ -64,7 +65,10 @@ async fn test_mirroring_new_records() { // start home server debug!("starting home server"); - let _remote_end = create_public_server(home_port.clone(), home_gctx.clone()).run(); + + let auth_global_ctx = + SpuAuthGlobalContext::new(home_gctx.clone(), Arc::new(RootAuthorization::new())); + let _remote_end = create_public_server(home_port.to_owned(), auth_global_ctx.clone()).run(); // sleep 1 seconds debug!("waiting for home public server to up"); diff --git a/crates/fluvio-spu/src/services/auth/mod.rs b/crates/fluvio-spu/src/services/auth/mod.rs new file mode 100644 index 0000000000..1cf6476800 --- /dev/null +++ b/crates/fluvio-spu/src/services/auth/mod.rs @@ -0,0 +1,37 @@ +pub use common::*; + +mod common { + + use std::sync::Arc; + use std::fmt::Debug; + + use crate::core::DefaultSharedGlobalContext; + + /// SPU global context with authorization + /// auth is trait object which contains global auth auth policy + #[derive(Clone, Debug)] + pub struct SpuAuthGlobalContext { + pub global_ctx: DefaultSharedGlobalContext, + pub auth: Arc, + } + + impl SpuAuthGlobalContext { + pub fn new(global_ctx: DefaultSharedGlobalContext, auth: Arc) -> Self { + Self { global_ctx, auth } + } + } + + /// Auth Service Context, this hold individual context that is enough enforce auth + /// for this service context + #[derive(Debug, Clone)] + pub struct SpuAuthServiceContext { + pub global_ctx: DefaultSharedGlobalContext, + pub auth: AC, + } + + impl SpuAuthServiceContext { + pub fn new(global_ctx: DefaultSharedGlobalContext, auth: AC) -> Self { + Self { global_ctx, auth } + } + } +} diff --git a/crates/fluvio-spu/src/services/mod.rs b/crates/fluvio-spu/src/services/mod.rs index 70642db4d4..fbda948ebf 100644 --- a/crates/fluvio-spu/src/services/mod.rs +++ b/crates/fluvio-spu/src/services/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod public; +pub mod auth; pub mod internal; pub use self::internal::create_internal_server; diff --git a/crates/fluvio-spu/src/services/public/mod.rs b/crates/fluvio-spu/src/services/public/mod.rs index b0f3f1ea81..ae3b1c1609 100644 --- a/crates/fluvio-spu/src/services/public/mod.rs +++ b/crates/fluvio-spu/src/services/public/mod.rs @@ -12,6 +12,7 @@ mod conn_context; use std::sync::Arc; use async_trait::async_trait; +use fluvio_auth::Authorization; use fluvio_protocol::api::Request; use fluvio_protocol::api::RequestMessage; use fluvio_protocol::link::ErrorCode; @@ -29,6 +30,8 @@ use fluvio_types::event::StickyEvent; use crate::core::DefaultSharedGlobalContext; use crate::mirroring::home::connection::MirrorHomeHandler; +use crate::services::auth::SpuAuthGlobalContext; +use crate::services::auth::SpuAuthServiceContext; use crate::services::public::consumer_handler::handle_delete_consumer_offset_request; use crate::services::public::consumer_handler::handle_fetch_consumer_offsets_request; use crate::services::public::consumer_handler::handle_update_consumer_offset_request; @@ -39,54 +42,79 @@ use self::offset_request::handle_offset_request; use self::offset_update::handle_offset_update; use self::stream_fetch::{StreamFetchHandler, publishers::StreamPublishers}; use self::conn_context::ConnectionContext; +use std::fmt::Debug; -pub(crate) type SpuPublicServer = - FluvioApiServer; +pub(crate) type SpuPublicServer = + FluvioApiServer, PublicService>; -pub fn create_public_server(addr: String, ctx: DefaultSharedGlobalContext) -> SpuPublicServer { +pub fn create_public_server( + addr: String, + auth_ctx: SpuAuthGlobalContext, +) -> SpuPublicServer +where + A: Authorization + Sync + Send + Debug + 'static, + SpuAuthGlobalContext: Clone + Debug, + ::Context: Send + Sync, +{ info!( - spu_id = ctx.local_spu_id(), + spu_id = auth_ctx.global_ctx.local_spu_id(), %addr, "Starting SPU public service:", ); - FluvioApiServer::new(addr, ctx, PublicService::new()) + FluvioApiServer::new(addr, auth_ctx, PublicService::::new()) } #[derive(Debug)] -pub struct PublicService { - _0: (), // Prevent construction +pub struct PublicService { + data: std::marker::PhantomData, } -impl PublicService { +impl PublicService { pub fn new() -> Self { - PublicService { _0: () } + PublicService { + data: std::marker::PhantomData, + } } } #[async_trait] -impl FluvioService for PublicService { +impl FluvioService for PublicService +where + A: Authorization + Send + Sync, + ::Context: Send + Sync, +{ type Request = SpuServerRequest; - type Context = DefaultSharedGlobalContext; + type Context = SpuAuthGlobalContext; #[instrument(skip(self, context))] async fn respond( self: Arc, - context: DefaultSharedGlobalContext, - socket: FluvioSocket, + context: Self::Context, + mut socket: FluvioSocket, _connection: ConnectInfo, ) -> Result<()> { - let (sink, mut stream) = socket.split(); - + let auth_context = context + .auth + .create_auth_context(&mut socket) + .await + .map_err(|err| { + let io_error: std::io::Error = err.into(); + io_error + })?; + let service_context = SpuAuthServiceContext::new(context.global_ctx.clone(), auth_context); let mut mirror_request: Option> = None; let shutdown = StickyEvent::shared(); - + let (sink, mut stream) = socket.split(); let mut shared_sink = sink.as_shared(); + { let api_stream = stream.api_stream::(); let mut event_stream = api_stream.take_until(shutdown.listen_pinned()); let mut conn_ctx = ConnectionContext::new(); + let context = &context.global_ctx; + loop { let event = event_stream.next().await; match event { @@ -188,7 +216,7 @@ impl FluvioService for PublicService { } if let Some(request) = mirror_request { - MirrorHomeHandler::respond(context, request, shared_sink, &mut stream).await; + MirrorHomeHandler::respond(request, &service_context, shared_sink, stream).await; } shutdown.notify(); diff --git a/crates/fluvio-spu/src/services/public/tests/produce.rs b/crates/fluvio-spu/src/services/public/tests/produce.rs index 4189461a45..224f6c0a3d 100644 --- a/crates/fluvio-spu/src/services/public/tests/produce.rs +++ b/crates/fluvio-spu/src/services/public/tests/produce.rs @@ -1,6 +1,7 @@ -use std::{env::temp_dir, time::Duration}; +use std::{env::temp_dir, sync::Arc, time::Duration}; use fluvio::{SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind}; +use fluvio_auth::root::RootAuthorization; use fluvio_controlplane::replica::Replica; use fluvio_smartmodule::{Record, dataplane::smartmodule::Lookback}; use fluvio_storage::{FileReplica, iterators::FileBatchIterator}; @@ -27,13 +28,17 @@ use flv_util::fixture::ensure_clean_dir; use crate::{ config::SpuConfig, core::GlobalContext, - services::public::{ - create_public_server, - tests::{ - create_filter_records, vec_to_raw_batch, load_wasm_module, create_filter_raw_records, + replication::leader::LeaderReplicaState, + services::{ + auth::SpuAuthGlobalContext, + public::{ + create_public_server, + tests::{ + create_filter_raw_records, create_filter_records, load_wasm_module, + vec_to_raw_batch, + }, }, }, - replication::leader::LeaderReplicaState, }; #[fluvio_future::test(ignore)] @@ -46,8 +51,9 @@ async fn test_produce_basic() { let mut spu_config = SpuConfig::default(); spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -160,7 +166,9 @@ async fn test_produce_invalid_compression() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -234,7 +242,10 @@ async fn test_produce_request_timed_out() { let (leader_ctx, _) = config.leader_replica().await; - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -296,7 +307,10 @@ async fn test_produce_not_waiting_replication() { let (leader_ctx, _) = config.leader_replica().await; let public_addr = config.leader_public_addr(); - let server_end_event = create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -355,8 +369,11 @@ async fn test_produce_waiting_replication() { let (leader_ctx, leader_replica) = config.leader_replica().await; let public_addr = config.leader_public_addr(); + let auth_global_ctx = + SpuAuthGlobalContext::new(leader_ctx.clone(), Arc::new(RootAuthorization::new())); let public_server_end_event = - create_public_server(public_addr.clone(), leader_ctx.clone()).run(); + create_public_server(public_addr.to_owned(), auth_global_ctx.clone()).run(); + let private_server_end_event = create_internal_server(config.leader_addr(), leader_ctx.clone()).run(); @@ -425,7 +442,9 @@ async fn test_produce_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -537,7 +556,9 @@ async fn test_produce_basic_with_smartmodule_with_lookback() { smartmodule.params.set_lookback(Some(Lookback::last(1))); let mut smartmodules = vec![smartmodule]; - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -848,7 +869,9 @@ async fn test_produce_with_deduplication() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1038,7 +1061,9 @@ async fn test_produce_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1124,7 +1149,9 @@ async fn test_dedup_init_smart_engine_memory_overfow() { let ctx = GlobalContext::new_shared_context(spu_config); load_wasm_module(&ctx, FLUVIO_WASM_DEDUPLICATION_FILTER); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; diff --git a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs index 311d985fba..c0f7cdfab8 100644 --- a/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs +++ b/crates/fluvio-spu/src/services/public/tests/stream_fetch.rs @@ -2,6 +2,7 @@ use std::{env::temp_dir, path::PathBuf, time::Duration}; use std::sync::Arc; use chrono::{Utc, Days}; +use fluvio_auth::root::RootAuthorization; use fluvio_controlplane::replica::Replica; use fluvio_controlplane::spu_api::update_smartmodule::SmartModule; use fluvio_smartmodule::dataplane::smartmodule::Lookback; @@ -34,6 +35,7 @@ use fluvio_spu_schema::{ fetch::DefaultFetchRequest, }; use fluvio_spu_schema::server::stream_fetch::DefaultStreamFetchRequest; +use crate::services::auth::SpuAuthGlobalContext; use crate::services::public::tests::{vec_to_batch, create_filter_raw_records}; use crate::{ core::GlobalContext, @@ -58,7 +60,9 @@ async fn test_stream_fetch_basic() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -394,7 +398,9 @@ async fn test_stream_fetch_filter( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -585,7 +591,9 @@ async fn test_stream_fetch_filter_individual( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -711,7 +719,9 @@ async fn test_stream_filter_error_fetch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -846,7 +856,9 @@ async fn test_stream_filter_max( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -992,7 +1004,9 @@ async fn test_stream_fetch_map( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1147,7 +1161,9 @@ async fn test_stream_fetch_map_chain( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1307,7 +1323,9 @@ async fn test_stream_fetch_map_error( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1445,7 +1463,9 @@ async fn test_stream_aggregate_fetch_single_batch( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1591,7 +1611,9 @@ async fn test_stream_aggregate_fetch_multiple_batch( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1729,7 +1751,9 @@ async fn test_stream_fetch_and_new_request( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1818,7 +1842,9 @@ async fn test_stream_fetch_array_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -1940,7 +1966,9 @@ async fn test_stream_fetch_filter_map( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2065,7 +2093,9 @@ async fn test_stream_fetch_filter_with_params( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2257,7 +2287,9 @@ async fn test_stream_fetch_invalid_smartmodule( let port = portpicker::pick_unused_port().expect("No free ports left"); let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2316,7 +2348,9 @@ async fn test_stream_metrics() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2494,7 +2528,9 @@ async fn stream_fetch_filter_lookback( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -2655,7 +2691,9 @@ async fn stream_fetch_filter_lookback_age( let addr = format!("127.0.0.1:{port}"); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; @@ -3037,7 +3075,9 @@ async fn test_stream_fetch_sends_topic_delete_error_on_topic_delete() { spu_config.log.base_dir = test_path; let ctx = GlobalContext::new_shared_context(spu_config); - let server_end_event = create_public_server(addr.to_owned(), ctx.clone()).run(); + let auth_global_ctx = + SpuAuthGlobalContext::new(ctx.clone(), Arc::new(RootAuthorization::new())); + let server_end_event = create_public_server(addr.to_owned(), auth_global_ctx.clone()).run(); // wait for stream controller async to start sleep(Duration::from_millis(100)).await; diff --git a/crates/fluvio-spu/src/start.rs b/crates/fluvio-spu/src/start.rs index 3b6e9dd470..6f4a4a9e93 100644 --- a/crates/fluvio-spu/src/start.rs +++ b/crates/fluvio-spu/src/start.rs @@ -1,9 +1,12 @@ +use std::sync::Arc; + +use fluvio_auth::root::RootAuthorization; use fluvio_storage::FileReplica; use crate::config::{SpuConfig, SpuOpt}; +use crate::services::auth::SpuAuthGlobalContext; use crate::services::create_internal_server; -use crate::services::internal::InternalApiServer; -use crate::services::public::{SpuPublicServer, create_public_server}; +use crate::services::public::create_public_server; use crate::core::DefaultSharedGlobalContext; use crate::core::GlobalContext; use crate::control_plane::ScDispatcher; @@ -41,10 +44,7 @@ pub fn main_loop(opt: SpuOpt) { info!(uptime = sys.uptime(), "Uptime in secs"); run_block_on(async move { - let (ctx, internal_server, public_server) = create_services(spu_config.clone(), true, true); - - let _public_shutdown = internal_server.unwrap().run(); - let _private_shutdown = public_server.unwrap().run(); + let ctx = create_services(spu_config.clone(), true, true); init_monitoring(ctx); @@ -66,51 +66,61 @@ pub fn create_services( local_spu: SpuConfig, internal: bool, public: bool, -) -> ( - DefaultSharedGlobalContext, - Option, - Option, -) { +) -> DefaultSharedGlobalContext { let ctx = FileReplicaContext::new_shared_context(local_spu); let public_ep_addr = ctx.config().public_socket_addr().to_owned(); let private_ep_addr = ctx.config().private_socket_addr().to_owned(); - let public_server = if public { - Some(create_public_server(public_ep_addr, ctx.clone())) - } else { - None + if public { + //TODO: spu authorization + //if ctx.config().tls { + // let authorization = Arc::new(RemoteAuthorization::new()); + // let auth_global_ctx = SpuAuthGlobalContext::new(ctx.clone(), authorization); + // let pub_server = create_public_server(public_ep_addr, auth_global_ctx); + // pub_server.run(); + //} else { + // + //} + let authorization = Arc::new(RootAuthorization::new()); + let auth_global_ctx = SpuAuthGlobalContext::new(ctx.clone(), authorization); + let pub_server = create_public_server(public_ep_addr, auth_global_ctx); + pub_server.run(); }; - let internal_server = if internal { - Some(create_internal_server(private_ep_addr, ctx.clone())) - } else { - None + if internal { + let priv_server = create_internal_server(private_ep_addr, ctx.clone()); + priv_server.run(); }; let sc_dispatcher = ScDispatcher::new(ctx.clone()); sc_dispatcher.run(); - (ctx, internal_server, public_server) + ctx } mod proxy { use std::process; + use fluvio_auth::x509::X509Authenticator; use tracing::info; use flv_util::print_cli_err; use fluvio_future::openssl::TlsAcceptor; use crate::config::SpuConfig; - use flv_tls_proxy::start as proxy_start; + use flv_tls_proxy::start_with_authenticator as proxy_start_with_authenticator; pub async fn start_proxy(config: SpuConfig, acceptor: (TlsAcceptor, String)) { let (tls_acceptor, proxy_addr) = acceptor; let target = config.public_endpoint; info!("starting TLS proxy: {}", proxy_addr); - if let Err(err) = proxy_start(&proxy_addr, tls_acceptor, target).await { + let authenticator = Box::new(X509Authenticator::new(None)); + let proxy_result = + proxy_start_with_authenticator(&proxy_addr, tls_acceptor, target, authenticator).await; + + if let Err(err) = proxy_result { print_cli_err!(err); process::exit(-1); } else { diff --git a/k8-util/helm/fluvio-sys/Chart.yaml b/k8-util/helm/fluvio-sys/Chart.yaml index f0e4dd55d3..459f1d546c 100644 --- a/k8-util/helm/fluvio-sys/Chart.yaml +++ b/k8-util/helm/fluvio-sys/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v2 name: fluvio-sys description: A Helm chart for Fluvio type: application -version: 0.9.16 +version: 0.9.17 diff --git a/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml b/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml index 5a258d1cd9..dde553394b 100644 --- a/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml +++ b/k8-util/helm/fluvio-sys/templates/crd_mirror.yaml @@ -46,14 +46,17 @@ spec: type: string publicEndpoint: type: string - keyPair: - type: object - required: ["privateKey", "publicKey"] - properties: - publicKey: - type: string - privateKey: - type: string + tls: + type: object + properties: + domain: + type: string + caCert: + type: string + clientCert: + type: string + clientKey: + type: string additionalPrinterColumns: - name: Mirror Type type: string