Skip to content

Commit

Permalink
feat: add auth and tls for mirroring connections
Browse files Browse the repository at this point in the history
feat: add mirroring tls

feat: check cn cert name when export mirror

feat: mirror authorization on sc

feat: mirror authorization on spu

wip debug logs

fix: active peer ssl verify mode for spu tls

chore: fmt and clippy
  • Loading branch information
fraidev committed May 19, 2024
1 parent 247cccb commit 4674750
Show file tree
Hide file tree
Showing 39 changed files with 822 additions and 154 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/fluvio-auth/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub trait AuthContext: Debug {
action: InstanceAction,
key: &str,
) -> Result<bool, AuthError>;

/// check if remote id is allowed
fn allow_remote_id(&self, id: &str) -> bool;
}

#[async_trait]
Expand Down
28 changes: 15 additions & 13 deletions crates/fluvio-auth/src/x509/authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ impl ScopeBindings {

#[derive(Debug)]
pub struct X509Authenticator {
scope_bindings: ScopeBindings,
scope_bindings: Option<ScopeBindings>,
}

impl X509Authenticator {
pub fn new(scope_binding_file_path: &Path) -> Self {
Self {
scope_bindings: ScopeBindings::load(scope_binding_file_path)
.expect("unable to create ScopeBindings"),
}
pub fn new(scope_binding_file_path: Option<&Path>) -> Self {
let scope_bindings = scope_binding_file_path.map(|scope_binding_file_path| {
ScopeBindings::load(scope_binding_file_path).expect("unable to create ScopeBindings")
});
Self { scope_bindings }
}

async fn send_authorization_request(
Expand Down Expand Up @@ -79,13 +79,9 @@ impl X509Authenticator {
fn principal_from_tls_stream(tls_stream: &DefaultServerTlsStream) -> Result<String, IoError> {
trace!("tls_stream {:?}", tls_stream);

let peer_certificate = tls_stream.peer_certificate();

trace!("peer_certificate {:?}", peer_certificate);

let client_certificate = tls_stream.peer_certificate().ok_or(IoErrorKind::NotFound)?;

trace!("client_certificate {:?}", tls_stream);
trace!("client_certificate {:?}", client_certificate);

let principal = Self::principal_from_raw_certificate(
&client_certificate
Expand All @@ -96,7 +92,7 @@ impl X509Authenticator {
Ok(principal)
}

fn principal_from_raw_certificate(certificate_bytes: &[u8]) -> Result<String, IoError> {
pub fn principal_from_raw_certificate(certificate_bytes: &[u8]) -> Result<String, IoError> {
parse_x509_certificate(certificate_bytes)
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
.and_then(|(_, parsed_cert)| Self::common_name_from_parsed_certificate(parsed_cert))
Expand Down Expand Up @@ -131,7 +127,13 @@ impl Authenticator for X509Authenticator {
target_tcp_stream: &TcpStream,
) -> Result<bool, IoError> {
let principal = Self::principal_from_tls_stream(incoming_tls_stream)?;
let scopes = self.scope_bindings.get_scopes(&principal);

let scopes = if let Some(scope_bindings) = &self.scope_bindings {
scope_bindings.get_scopes(&principal)
} else {
Vec::new()
};

let authorization_request = AuthRequest::new(principal, scopes);
let success =
Self::send_authorization_request(target_tcp_stream, authorization_request).await?;
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-auth/src/x509/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ impl X509Identity {
principal: req_msg.request.principal,
},
},
Err(_e) => {
Err(err) => {
tracing::error!(%err, "error decoding auth request");
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"connection closed",
))
));
}
}
} else {
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ k8-types = { workspace = true , features = ["core"] }
fluvio-cluster = { path = "../fluvio-cluster", default-features = false, features = ["cli"], optional = true }

fluvio = { workspace = true }
fluvio-auth = { workspace = true }
fluvio-socket = { workspace = true }
fluvio-command = { workspace = true }
fluvio-package-index = { workspace = true }
Expand Down
106 changes: 97 additions & 9 deletions crates/fluvio-cli/src/client/remote/export.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use clap::Parser;
use fluvio::config::{TlsConfig, TlsPolicy};
use fluvio_extension_common::{target::ClusterTarget, Terminal};
use fluvio_future::native_tls::{CertBuilder, X509PemBuilder};
use fluvio_sc_schema::{
mirror::{Home, MirrorSpec, MirrorType},
mirror::{ClientTls, Home, MirrorSpec, MirrorType},
remote_file::RemoteMetadataExport,
};
use anyhow::anyhow;

use super::get_admin;

#[derive(Debug, Parser)]
pub struct ExportOpt {
/// id of the remote cluster to export
Expand All @@ -20,9 +20,15 @@ pub struct ExportOpt {
/// override endpoint of the home cluster
#[arg(long, short = 'e')]
public_endpoint: Option<String>,
// id of the home cluster to share
#[arg(name = "c")]
/// id of the home cluster to share
#[arg(long)]
home_id: Option<String>,
/// remote tls certificate
#[arg(long)]
cert: Option<String>,
/// remote tls key
#[arg(long)]
key: Option<String>,
}

impl ExportOpt {
Expand All @@ -31,14 +37,15 @@ impl ExportOpt {
out: Arc<T>,
cluster_target: ClusterTarget,
) -> Result<()> {
let fluvio_config = cluster_target.load()?;
let public_endpoint = if let Some(public_endpoint) = self.public_endpoint {
public_endpoint
public_endpoint.clone()
} else {
let fluvio_config = cluster_target.clone().load()?;
fluvio_config.endpoint
fluvio_config.endpoint.clone()
};
let flv = fluvio::Fluvio::connect_with_config(&fluvio_config).await?;
let admin = flv.admin().await;

let admin = get_admin(cluster_target).await?;
let all_remotes = admin.all::<MirrorSpec>().await?;
let _remote = all_remotes
.iter()
Expand All @@ -50,10 +57,17 @@ impl ExportOpt {

let home_id = self.home_id.clone().unwrap_or_else(|| "home".to_owned());

let tls = get_tls_config(
fluvio_config.clone(),
self.cert.clone(),
self.key.clone(),
self.remote_id.clone(),
)?;
let home_metadata = Home {
id: home_id,
remote_id: self.remote_id,
public_endpoint,
tls,
};

let metadata = RemoteMetadataExport::new(home_metadata);
Expand All @@ -68,3 +82,77 @@ impl ExportOpt {
Ok(())
}
}

#[cfg(unix)]
fn get_tls_config(
fluvio_config: fluvio::config::FluvioConfig,
cert_path: Option<String>,
key_path: Option<String>,
remote_id: String,
) -> Result<Option<ClientTls>> {
match &fluvio_config.tls {
TlsPolicy::Verified(config) => {
let (remote_cert, remote_key, cert_path) = match (cert_path.clone(), key_path) {
(Some(cert), Some(key)) => (
std::fs::read_to_string(cert.clone())?,
std::fs::read_to_string(key)?,
cert,
),
_ => {
return Err(anyhow!(
"remote cert and key are required for a cluster using TLS"
));
}
};

let cert_build = X509PemBuilder::from_path(cert_path)
.map_err(|err| anyhow!("error building cert: {}", err))?;

let cert = cert_build
.build()
.map_err(|err| anyhow!("error building cert: {}", err))?;

let cert_der = cert
.to_der()
.map_err(|err| anyhow!("error converting cert to der: {}", err))?;

let principal = fluvio_auth::x509::X509Authenticator::principal_from_raw_certificate(&cert_der).expect(
"error getting principal from certificate. This should never happen as the certificate is valid",
);

if principal != remote_id {
return Err(anyhow!(
"remote_id: \"{}\" does not match the CN in the certificate: \"{}\"",
remote_id,
principal
));
}

match config {
TlsConfig::Inline(config) => Ok(Some(ClientTls {
domain: config.domain.clone(),
ca_cert: config.ca_cert.clone(),
client_cert: remote_cert,
client_key: remote_key,
})),
TlsConfig::Files(file_config) => Ok(Some(ClientTls {
domain: file_config.domain.clone(),
ca_cert: std::fs::read_to_string(&file_config.ca_cert)?,
client_cert: remote_cert,
client_key: remote_key,
})),
}
}
_ => Ok(None),
}
}

#[cfg(not(unix))]
fn get_tls_config(
_fluvio_config: fluvio::config::FluvioConfig,
_cert_path: Option<String>,
_key_path: Option<String>,
_remote_id: String,
) -> Result<Option<ClientTls>> {
Ok(None)
}
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.28.1"
version = "0.28.2"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand Down
22 changes: 21 additions & 1 deletion crates/fluvio-controlplane-metadata/src/mirror/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use fluvio_protocol::{Encoder, Decoder};
)]
pub struct MirrorSpec {
pub mirror_type: MirrorType,
// TODO: we should add auth
}

impl MirrorSpec {
Expand Down Expand Up @@ -75,4 +74,25 @@ pub struct Home {
pub id: String,
pub remote_id: String,
pub public_endpoint: String,
#[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
pub tls: Option<ClientTls>,
}

#[derive(Clone, PartialEq, Eq, Default, Encoder, Decoder)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub struct ClientTls {
pub domain: String,
pub ca_cert: String,
pub client_cert: String,
pub client_key: String,
}

impl std::fmt::Debug for ClientTls {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ClientTls: {{ domain: {} }}", self.domain)
}
}
10 changes: 5 additions & 5 deletions crates/fluvio-controlplane-metadata/src/mirror/status.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::time::Duration;
use fluvio_protocol::{Encoder, Decoder};

#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -63,7 +62,7 @@ pub struct ConnectionStat {

impl MirrorStatus {
#[cfg(feature = "use_serde")]
pub fn last_seen(&self, since: Duration) -> String {
pub fn last_seen(&self, since: std::time::Duration) -> String {
use humantime_serde::re::humantime;

let since_sec = since.as_secs();
Expand All @@ -87,9 +86,9 @@ impl std::fmt::Display for MirrorStatus {
(MirrorPairStatus::Disabled, ConnectionStatus::Online) => "Disabled",
(MirrorPairStatus::Waiting, ConnectionStatus::Online) => "Waiting",
(MirrorPairStatus::Succesful, ConnectionStatus::Offline) => "Offline",
(MirrorPairStatus::Failed, ConnectionStatus::Offline) => "Failed",
(MirrorPairStatus::Disabled, ConnectionStatus::Offline) => "Disabled",
(MirrorPairStatus::Waiting, ConnectionStatus::Offline) => "Waiting",
(MirrorPairStatus::Failed, ConnectionStatus::Offline) => "Offline",
(MirrorPairStatus::Disabled, ConnectionStatus::Offline) => "Offline",
(MirrorPairStatus::Waiting, ConnectionStatus::Offline) => "Offline",
};
write!(f, "{}", status)
}
Expand Down Expand Up @@ -120,6 +119,7 @@ impl std::fmt::Display for ConnectionStatus {
#[cfg(test)]
mod test {
use super::*;
use std::time::Duration;

#[test]
fn test_last_seen() {
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-sc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tracing = { workspace = true }


# Fluvio dependencies
fluvio = { workspace = true }
fluvio-auth = { workspace = true }
fluvio-future = { workspace = true, features = [
"subscriber",
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-sc/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ impl ScOpt {
config.white_list = self.white_list.into_iter().collect();
config.read_only_metadata = self.run_mode.read_only.is_some();

config.tls = self.tls.tls;

// Set Configuration Authorization Policy

let policy = match self.auth_policy {
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-sc/src/config/sc_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct ScConfig {
pub namespace: String,
pub x509_auth_scopes: Option<PathBuf>,
pub white_list: HashSet<String>,
pub tls: bool,
}

impl ::std::default::Default for ScConfig {
Expand All @@ -40,6 +41,7 @@ impl ::std::default::Default for ScConfig {
namespace: DEFAULT_NAMESPACE.to_owned(),
x509_auth_scopes: None,
white_list: HashSet::new(),
tls: false,
}
}
}
Expand Down

0 comments on commit 4674750

Please sign in to comment.