From 78335a651ba0af83696fcd1e552d0a48aacdade2 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 4 Apr 2023 16:20:08 +0200 Subject: [PATCH 1/5] tls support improvements, added trust to hostname and ca certificate --- examples/tls/producer.rs | 96 ++++++++++++++++++++++++++++++++++++++++ src/client/mod.rs | 18 ++++++-- src/client/options.rs | 7 +-- src/environment.rs | 73 ++++++++++++++++++------------ 4 files changed, 158 insertions(+), 36 deletions(-) create mode 100644 examples/tls/producer.rs diff --git a/examples/tls/producer.rs b/examples/tls/producer.rs new file mode 100644 index 00000000..d5784137 --- /dev/null +++ b/examples/tls/producer.rs @@ -0,0 +1,96 @@ + +use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration}; +use tracing::info; +use tracing_subscriber::FmtSubscriber; +use tokio_native_tls::native_tls::Certificate; +use std::fs::File; +use std::io::BufReader; +use std::path::Path; +use openssl::pkcs12::Pkcs12; +use openssl::pkey::{PKey, Private}; +use openssl::x509::X509; + +const BATCH_SIZE: usize = 100; + +#[tokio::main] +async fn main() -> Result<(), Box> { + + let stream_name = String::from("mixing"); + let subscriber = FmtSubscriber::builder().finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + + let cert = include_bytes!("/Users/dpalaia/projects/rabbitmq-stream-go-client/compose/tls/tls-gen/basic/result/ca_certificate.pem"); + + let tlsConfiguration = TlsConfiguration::builder() + .trust_hostname(true) + //.trust_certificate(false) + .add_root_certificate(Certificate::from_pem(cert).unwrap()) + .build(); + + + let environment = Environment::builder() + .host("localhost") + .port(5551) + .tls(tlsConfiguration) + .build() + .await?; + + start_publisher( + environment.clone(), + &stream_name, + ).await; + + Ok(()) + +} + +async fn start_publisher( + env: Environment, + // opts: &Opts, + stream: &String, +) -> Result<(), Box> { + + info!("im inside start_publisher"); + let _ = env.stream_creator().create(&stream).await; + + let producer = env + .producer() + .batch_size(BATCH_SIZE) + .build(&stream) + .await?; + + let is_batch_send = true; + tokio::task::spawn(async move { + info!( + "Starting producer with batch size {} and batch send {}", + BATCH_SIZE, is_batch_send + ); + info!("Sending {} simple messages", BATCH_SIZE); + batch_send_simple(&producer).await; + + + }).await?; + info!("end im inside start_publisher"); + Ok(()) +} + +async fn batch_send_simple(producer: &Producer) { + let mut msg = Vec::with_capacity(BATCH_SIZE); + for i in 0..BATCH_SIZE { + msg.push(Message::builder().body(format!("rust message{}", i)).build()); + } + + producer + .batch_send(msg, move |_| async move {}) + .await + .unwrap(); + +} + + +#[derive(Debug)] +enum CertLoadError { + TlsError(tokio_native_tls::native_tls::Error), + Io(String, std::io::Error), +} \ No newline at end of file diff --git a/src/client/mod.rs b/src/client/mod.rs index 9507d4ce..0adce327 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -406,10 +406,20 @@ impl Client { let stream = if broker.tls.enabled() { let stream = TcpStream::connect((broker.host.as_str(), broker.port)).await?; - let mut tls_builder = tokio_native_tls::native_tls::TlsConnector::builder(); - tls_builder - .danger_accept_invalid_certs(true) - .danger_accept_invalid_hostnames(true); + let mut tls_builder: tokio_native_tls::native_tls::TlsConnectorBuilder = tokio_native_tls::native_tls::TlsConnector::builder(); + + + if broker.tls.trust_hostname_enabled() { + tls_builder.danger_accept_invalid_hostnames(true); + } + if broker.tls.trust_certificate_enabled() { + tls_builder.danger_accept_invalid_certs(true); + } else { + if let Some(cert)=broker.tls.get_root_certificate() { + print!("Hello, World!"); + tls_builder.add_root_certificate(cert.clone()); + } + } let conn = tokio_native_tls::TlsConnector::from(tls_builder.build()?); diff --git a/src/client/options.rs b/src/client/options.rs index 70e44812..e4bc64cd 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -42,8 +42,9 @@ impl Default for ClientOptions { collector: Arc::new(NopMetricsCollector {}), tls: TlsConfiguration { enabled: false, - hostname_verification: false, - trust_everything: false, + trust_hostname: false, + trust_certificate: false, + certificate: None, }, } } @@ -51,7 +52,7 @@ impl Default for ClientOptions { impl ClientOptions { pub fn get_tls(&self) -> TlsConfiguration { - self.tls + self.tls.clone() } pub fn enable_tls(&mut self) { diff --git a/src/environment.rs b/src/environment.rs index 3e140a7b..1b60ce0e 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -13,8 +13,11 @@ use crate::{ stream_creator::StreamCreator, RabbitMQStreamResult, }; + +use tokio_native_tls::native_tls::Certificate; + /// Main access point to a node -#[derive(Clone)] +#[derive(Clone)] pub struct Environment { pub(crate) options: EnvironmentOptions, } @@ -108,18 +111,10 @@ impl EnvironmentBuilder { } pub fn tls(mut self, tls_configuration: TlsConfiguration) -> EnvironmentBuilder { + self.0 .client_options - .tls - .trust_everything(tls_configuration.trust_everything_enabled()); - self.0 - .client_options - .tls - .hostname_verification_enable(tls_configuration.hostname_verification_enabled()); - self.0 - .client_options - .tls - .enable(tls_configuration.enabled()); + .tls = tls_configuration; self } @@ -142,27 +137,26 @@ pub struct EnvironmentOptions { } /** Helper for tls configuration */ -#[derive(Clone, Copy)] +#[derive(Clone)] pub struct TlsConfiguration { pub(crate) enabled: bool, - pub(crate) hostname_verification: bool, - pub(crate) trust_everything: bool, + pub(crate) trust_hostname: bool, + pub(crate) trust_certificate: bool, + pub(crate) certificate: Option, } impl Default for TlsConfiguration { fn default() -> TlsConfiguration { TlsConfiguration { enabled: true, - trust_everything: false, - hostname_verification: true, + trust_certificate: false, + trust_hostname: false, + certificate: None, } } } impl TlsConfiguration { - pub fn trust_everything(&mut self, trust_everything: bool) { - self.trust_everything = trust_everything - } pub fn enable(&mut self, enabled: bool) { self.enabled = enabled @@ -172,24 +166,37 @@ impl TlsConfiguration { self.enabled } - pub fn hostname_verification_enable(&mut self, hostname_verification: bool) { - self.hostname_verification = hostname_verification + pub fn get_root_certificate(&self) -> Option<&Certificate> { + self.certificate.as_ref() + } + + pub fn add_root_certificate(&mut self, certificate: Certificate) { + self.certificate = Some(certificate) + } + + pub fn trust_hostname(&mut self, trust_hostname: bool) { + self.trust_hostname = trust_hostname } - pub fn hostname_verification_enabled(&self) -> bool { - self.hostname_verification + pub fn trust_hostname_enabled(&self) -> bool { + self.trust_hostname } - pub fn trust_everything_enabled(&self) -> bool { - self.trust_everything + pub fn trust_certificate(&mut self, trust_certificate: bool) { + self.trust_certificate = trust_certificate } + + pub fn trust_certificate_enabled(&self) -> bool { + self.trust_certificate + } + } pub struct TlsConfigurationBuilder(TlsConfiguration); impl TlsConfigurationBuilder { - pub fn trust_everything(mut self, trust_everything: bool) -> TlsConfigurationBuilder { - self.0.trust_everything = trust_everything; + pub fn trust_certificate(mut self, trust_certificate: bool) -> TlsConfigurationBuilder { + self.0.trust_certificate = trust_certificate; self } @@ -198,11 +205,19 @@ impl TlsConfigurationBuilder { self } - pub fn hostname_verification_enable( + pub fn trust_hostname( mut self, hostname_verification: bool, ) -> TlsConfigurationBuilder { - self.0.hostname_verification = hostname_verification; + self.0.trust_hostname = hostname_verification; + self + } + + pub fn add_root_certificate( + mut self, + certificate: Certificate, + ) -> TlsConfigurationBuilder { + self.0.certificate = Some(certificate); self } From 124615bec016bfe6514ae7b6edab71f522915bdc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 26 Jul 2023 16:05:34 +0200 Subject: [PATCH 2/5] wip new TLS Signed-off-by: Gabriele Santomaggio --- Cargo.toml | 17 ++-- examples/{tls/producer.rs => tls_producer.rs} | 74 ++++++++------- src/client/mod.rs | 94 ++++++++++--------- src/client/options.rs | 2 +- src/environment.rs | 47 ++++------ src/error.rs | 2 +- 6 files changed, 119 insertions(+), 117 deletions(-) rename examples/{tls/producer.rs => tls_producer.rs} (59%) diff --git a/Cargo.toml b/Cargo.toml index 12316825..ff51d76f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.2.0" authors = ["wolf4ood ", "korsmakolnikov ", "gsantomaggio "] edition = "2018" license = "Apache-2.0 OR MPL-2.0" -description= "A Rust client for RabbitMQ Stream" +description = "A Rust client for RabbitMQ Stream" repository = "https://github.com/rabbitmq/rabbitmq-stream-rust-client" keywords = ["AMQP", "IoT", "messaging", "streams"] categories = ["network-programming"] @@ -14,17 +14,18 @@ readme = "README.md" [workspace] members = [ - ".", - "protocol", - "benchmark" + ".", + "protocol", + "benchmark" ] [dependencies] -tokio-native-tls = "0.3.0" -rabbitmq-stream-protocol = { version = "0.2" , path = "protocol" } +tokio-rustls = "0.24.1" +rustls-pemfile = "1.0.3" +rabbitmq-stream-protocol = { version = "0.2", path = "protocol" } tokio = { version = "1.12.0", features = ["full"] } -tokio-util = { version = "0.7.3", features = ["codec"] } +tokio-util = { version = "0.7.3", features = ["codec"] } bytes = "1.0.0" pin-project = { version = "1.0.0" } tokio-stream = "0.1.11" @@ -38,5 +39,5 @@ dashmap = "5.3.4" [dev-dependencies] tracing-subscriber = "0.3.1" -fake = { version = "2.4", features=['derive']} +fake = { version = "2.4", features = ['derive'] } chrono = "0.4.19" diff --git a/examples/tls/producer.rs b/examples/tls_producer.rs similarity index 59% rename from examples/tls/producer.rs rename to examples/tls_producer.rs index d5784137..8d31ed94 100644 --- a/examples/tls/producer.rs +++ b/examples/tls_producer.rs @@ -1,64 +1,67 @@ - -use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration}; +use tokio_native_tls::native_tls::Certificate; use tracing::info; +use tracing_subscriber::fmt::time; use tracing_subscriber::FmtSubscriber; -use tokio_native_tls::native_tls::Certificate; -use std::fs::File; -use std::io::BufReader; -use std::path::Path; -use openssl::pkcs12::Pkcs12; -use openssl::pkey::{PKey, Private}; -use openssl::x509::X509; + +use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration}; const BATCH_SIZE: usize = 100; #[tokio::main] async fn main() -> Result<(), Box> { - let stream_name = String::from("mixing"); let subscriber = FmtSubscriber::builder().finish(); tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); - let cert = include_bytes!("/Users/dpalaia/projects/rabbitmq-stream-go-client/compose/tls/tls-gen/basic/result/ca_certificate.pem"); + let cert = + include_bytes!("/Users/gas/sw/rabbitmq_server-3.11.11/sbin/certs/ca_certificate.pem"); - let tlsConfiguration = TlsConfiguration::builder() - .trust_hostname(true) + let tls_configuration = TlsConfiguration::builder() + .trust_hostname(false) //.trust_certificate(false) .add_root_certificate(Certificate::from_pem(cert).unwrap()) .build(); - let environment = Environment::builder() .host("localhost") + .username("guest") + .password("guest") .port(5551) - .tls(tlsConfiguration) + .tls(tls_configuration) .build() .await?; + println!("environment = {:?}", 1); + environment.stream_creator().create(&stream_name).await?; + println!("environment = {:?}", 2); + let producer = environment + .producer() + .batch_size(BATCH_SIZE) + .build(&stream_name) + .await?; + println!("environment = {:?}", 3); + // println!("producer = {:?}", producer); - start_publisher( - environment.clone(), - &stream_name, - ).await; + batch_send_simple(&producer).await; - Ok(()) + println!("environment = {:?}", 4); + // start_publisher(environment.clone(), &stream_name).await.expect("TODO: panic message"); + + Ok(()) } async fn start_publisher( env: Environment, - // opts: &Opts, + // opts: &Opts, stream: &String, ) -> Result<(), Box> { - info!("im inside start_publisher"); - let _ = env.stream_creator().create(&stream).await; + let r = env.stream_creator().create(&stream).await; - let producer = env - .producer() - .batch_size(BATCH_SIZE) - .build(&stream) - .await?; + println!("stream_creator = {:?}", r); + + let producer = env.producer().batch_size(BATCH_SIZE).build(&stream).await?; let is_batch_send = true; tokio::task::spawn(async move { @@ -68,9 +71,8 @@ async fn start_publisher( ); info!("Sending {} simple messages", BATCH_SIZE); batch_send_simple(&producer).await; - - - }).await?; + }) + .await?; info!("end im inside start_publisher"); Ok(()) } @@ -78,7 +80,11 @@ async fn start_publisher( async fn batch_send_simple(producer: &Producer) { let mut msg = Vec::with_capacity(BATCH_SIZE); for i in 0..BATCH_SIZE { - msg.push(Message::builder().body(format!("rust message{}", i)).build()); + msg.push( + Message::builder() + .body(format!("rust message{}", i)) + .build(), + ); } producer @@ -86,11 +92,11 @@ async fn batch_send_simple(producer: &Producer) { .await .unwrap(); + println!("batch_send_simple = {:?}", 1); } - #[derive(Debug)] enum CertLoadError { TlsError(tokio_native_tls::native_tls::Error), Io(String, std::io::Error), -} \ No newline at end of file +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 0adce327..61c1ac3f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,16 +1,30 @@ -mod channel; -mod codec; -mod dispatcher; -mod handler; -mod message; -mod metadata; -mod metrics; -mod options; -use crate::{error::ClientError, RabbitMQStreamResult}; +use std::convert::TryFrom; +use std::{ + collections::HashMap, + io, + pin::Pin, + sync::{atomic::AtomicU64, Arc}, + task::{Context, Poll}, + time::{Duration, Instant}, +}; +use std::{future::Future, sync::atomic::Ordering}; + use futures::{ stream::{SplitSink, SplitStream}, Stream, StreamExt, TryFutureExt, }; +use pin_project::pin_project; +use rustls::ServerName; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::ReadBuf; +use tokio::{net::TcpStream, sync::Notify}; +use tokio::{sync::RwLock, task::JoinHandle}; +use tokio_rustls::client::TlsStream; +use tokio_rustls::rustls::ClientConfig; +use tokio_rustls::{rustls, TlsConnector}; +use tokio_util::codec::Framed; +use tracing::trace; pub use message::ClientMessage; pub use metadata::{Broker, StreamMetadata}; @@ -42,8 +56,8 @@ use rabbitmq_stream_protocol::{ types::PublishedMessage, FromResponse, Request, Response, ResponseCode, ResponseKind, }; -use tokio_native_tls::TlsStream; -use tracing::trace; + +use crate::{error::ClientError, RabbitMQStreamResult}; pub use self::handler::{MessageHandler, MessageResult}; use self::{ @@ -53,22 +67,14 @@ use self::{ message::BaseMessage, }; -use pin_project::pin_project; -use std::{ - collections::HashMap, - io, - pin::Pin, - sync::{atomic::AtomicU64, Arc}, - task::{Context, Poll}, - time::{Duration, Instant}, -}; -use std::{future::Future, sync::atomic::Ordering}; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::io::ReadBuf; -use tokio::{net::TcpStream, sync::Notify}; -use tokio::{sync::RwLock, task::JoinHandle}; -use tokio_util::codec::Framed; +mod channel; +mod codec; +mod dispatcher; +mod handler; +mod message; +mod metadata; +mod metrics; +mod options; #[cfg_attr(docsrs, doc(cfg(feature = "tokio-stream")))] #[pin_project(project = StreamProj)] @@ -166,6 +172,7 @@ impl MessageHandler for Client { Ok(()) } } + /// Raw API for taking to RabbitMQ stream /// /// For high level APIs check [`crate::Environment`] @@ -405,27 +412,28 @@ impl Client { > { let stream = if broker.tls.enabled() { let stream = TcpStream::connect((broker.host.as_str(), broker.port)).await?; + let cert_bytes = include_bytes!( + "/Users/gas/sw/rabbitmq_server-3.11.11/sbin/certs/ca_certificate.pem" + ); - let mut tls_builder: tokio_native_tls::native_tls::TlsConnectorBuilder = tokio_native_tls::native_tls::TlsConnector::builder(); + let root_cert_store = rustls_pemfile::certs(&mut cert_bytes.as_ref()).unwrap(); + let mut roots = rustls::RootCertStore::empty(); - - if broker.tls.trust_hostname_enabled() { - tls_builder.danger_accept_invalid_hostnames(true); - } - if broker.tls.trust_certificate_enabled() { - tls_builder.danger_accept_invalid_certs(true); - } else { - if let Some(cert)=broker.tls.get_root_certificate() { - print!("Hello, World!"); - tls_builder.add_root_certificate(cert.clone()); - } - } + root_cert_store + .iter() + .for_each(|cert| roots.add(&rustls::Certificate(cert.to_vec())).unwrap()); - let conn = tokio_native_tls::TlsConnector::from(tls_builder.build()?); + let config = ClientConfig::builder() + .with_safe_defaults() + // .with_client_auth_cert(client_certs, client_keys.into_iter().next().unwrap()) + .with_root_certificates(roots) + .with_no_client_auth(); - let stream = conn.connect(broker.host.as_str(), stream).await?; + let connector = TlsConnector::from(Arc::new(config)); + let domain = ServerName::try_from(broker.host.as_str()).unwrap(); + let conn = connector.connect(domain, stream).await?; - GenericTcpStream::SecureTcp(stream) + GenericTcpStream::SecureTcp(conn) } else { let stream = TcpStream::connect((broker.host.as_str(), broker.port)).await?; GenericTcpStream::Tcp(stream) diff --git a/src/client/options.rs b/src/client/options.rs index e4bc64cd..36e38746 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -44,7 +44,7 @@ impl Default for ClientOptions { enabled: false, trust_hostname: false, trust_certificate: false, - certificate: None, + // certificate: None, }, } } diff --git a/src/environment.rs b/src/environment.rs index 1b60ce0e..79b136ef 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -14,10 +14,8 @@ use crate::{ RabbitMQStreamResult, }; -use tokio_native_tls::native_tls::Certificate; - /// Main access point to a node -#[derive(Clone)] +#[derive(Clone)] pub struct Environment { pub(crate) options: EnvironmentOptions, } @@ -111,10 +109,7 @@ impl EnvironmentBuilder { } pub fn tls(mut self, tls_configuration: TlsConfiguration) -> EnvironmentBuilder { - - self.0 - .client_options - .tls = tls_configuration; + self.0.client_options.tls = tls_configuration; self } @@ -142,7 +137,7 @@ pub struct TlsConfiguration { pub(crate) enabled: bool, pub(crate) trust_hostname: bool, pub(crate) trust_certificate: bool, - pub(crate) certificate: Option, + // pub(crate) certificate: Option, } impl Default for TlsConfiguration { @@ -151,13 +146,12 @@ impl Default for TlsConfiguration { enabled: true, trust_certificate: false, trust_hostname: false, - certificate: None, + // certificate: None, } } } impl TlsConfiguration { - pub fn enable(&mut self, enabled: bool) { self.enabled = enabled } @@ -166,13 +160,13 @@ impl TlsConfiguration { self.enabled } - pub fn get_root_certificate(&self) -> Option<&Certificate> { - self.certificate.as_ref() - } - - pub fn add_root_certificate(&mut self, certificate: Certificate) { - self.certificate = Some(certificate) - } + // pub fn get_root_certificate(&self) -> Option<&Certificate> { + // self.certificate.as_ref() + // } + // + // pub fn add_root_certificate(&mut self, certificate: Certificate) { + // self.certificate = Some(certificate) + // } pub fn trust_hostname(&mut self, trust_hostname: bool) { self.trust_hostname = trust_hostname @@ -182,14 +176,13 @@ impl TlsConfiguration { self.trust_hostname } - pub fn trust_certificate(&mut self, trust_certificate: bool) { + pub fn trust_certificate(&mut self, trust_certificate: bool) { self.trust_certificate = trust_certificate } pub fn trust_certificate_enabled(&self) -> bool { self.trust_certificate } - } pub struct TlsConfigurationBuilder(TlsConfiguration); @@ -205,21 +198,15 @@ impl TlsConfigurationBuilder { self } - pub fn trust_hostname( - mut self, - hostname_verification: bool, - ) -> TlsConfigurationBuilder { + pub fn trust_hostname(mut self, hostname_verification: bool) -> TlsConfigurationBuilder { self.0.trust_hostname = hostname_verification; self } - pub fn add_root_certificate( - mut self, - certificate: Certificate, - ) -> TlsConfigurationBuilder { - self.0.certificate = Some(certificate); - self - } + // pub fn add_root_certificate(mut self, certificate: Certificate) -> TlsConfigurationBuilder { + // self.0.certificate = Some(certificate); + // self + // } pub fn build(self) -> TlsConfiguration { self.0 diff --git a/src/error.rs b/src/error.rs index 56a8d6b5..9c65b241 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,7 +18,7 @@ pub enum ClientError { #[error("Client already closed")] AlreadyClosed, #[error(transparent)] - Tls(#[from] tokio_native_tls::native_tls::Error), + Tls(#[from] tokio_rustls::rustls::Error), #[error("Request error: {0:?}")] RequestError(ResponseCode), } From 4c2266a5fa7851ec8f929c954bd999dcbb157753 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Thu, 27 Jul 2023 15:35:26 +0200 Subject: [PATCH 3/5] updating tsl library and interface to tokio::tokio-rustls --- Cargo.toml | 2 +- examples/tls_producer.rs | 44 +++-------------------- src/client/mod.rs | 13 +++---- src/client/options.rs | 4 +-- src/environment.rs | 50 ++++++--------------------- tests/integration/environment_test.rs | 4 ++- 6 files changed, 28 insertions(+), 89 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ff51d76f..c22e7565 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ members = [ tokio-rustls = "0.24.1" rustls-pemfile = "1.0.3" rabbitmq-stream-protocol = { version = "0.2", path = "protocol" } -tokio = { version = "1.12.0", features = ["full"] } +tokio = { version = "1.29.1", features = ["full"] } tokio-util = { version = "0.7.3", features = ["codec"] } bytes = "1.0.0" pin-project = { version = "1.0.0" } diff --git a/examples/tls_producer.rs b/examples/tls_producer.rs index 8d31ed94..1034add3 100644 --- a/examples/tls_producer.rs +++ b/examples/tls_producer.rs @@ -1,10 +1,7 @@ -use tokio_native_tls::native_tls::Certificate; +use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration}; use tracing::info; -use tracing_subscriber::fmt::time; use tracing_subscriber::FmtSubscriber; -use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration}; - const BATCH_SIZE: usize = 100; #[tokio::main] @@ -14,39 +11,18 @@ async fn main() -> Result<(), Box> { tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); - let cert = - include_bytes!("/Users/gas/sw/rabbitmq_server-3.11.11/sbin/certs/ca_certificate.pem"); - - let tls_configuration = TlsConfiguration::builder() - .trust_hostname(false) - //.trust_certificate(false) - .add_root_certificate(Certificate::from_pem(cert).unwrap()) + let tls_configuration: TlsConfiguration = TlsConfiguration::builder() + .add_root_certificate(String::from("/path/to/your/certificate-ca.pem")) .build(); let environment = Environment::builder() .host("localhost") - .username("guest") - .password("guest") .port(5551) .tls(tls_configuration) .build() .await?; - println!("environment = {:?}", 1); - environment.stream_creator().create(&stream_name).await?; - println!("environment = {:?}", 2); - let producer = environment - .producer() - .batch_size(BATCH_SIZE) - .build(&stream_name) - .await?; - println!("environment = {:?}", 3); - // println!("producer = {:?}", producer); - - batch_send_simple(&producer).await; - println!("environment = {:?}", 4); - - // start_publisher(environment.clone(), &stream_name).await.expect("TODO: panic message"); + start_publisher(environment.clone(), &stream_name).await; Ok(()) } @@ -57,9 +33,7 @@ async fn start_publisher( stream: &String, ) -> Result<(), Box> { info!("im inside start_publisher"); - let r = env.stream_creator().create(&stream).await; - - println!("stream_creator = {:?}", r); + let _ = env.stream_creator().create(&stream).await; let producer = env.producer().batch_size(BATCH_SIZE).build(&stream).await?; @@ -91,12 +65,4 @@ async fn batch_send_simple(producer: &Producer) { .batch_send(msg, move |_| async move {}) .await .unwrap(); - - println!("batch_send_simple = {:?}", 1); -} - -#[derive(Debug)] -enum CertLoadError { - TlsError(tokio_native_tls::native_tls::Error), - Io(String, std::io::Error), } diff --git a/src/client/mod.rs b/src/client/mod.rs index 61c1ac3f..31660472 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -412,12 +412,14 @@ impl Client { > { let stream = if broker.tls.enabled() { let stream = TcpStream::connect((broker.host.as_str(), broker.port)).await?; - let cert_bytes = include_bytes!( - "/Users/gas/sw/rabbitmq_server-3.11.11/sbin/certs/ca_certificate.pem" - ); - - let root_cert_store = rustls_pemfile::certs(&mut cert_bytes.as_ref()).unwrap(); let mut roots = rustls::RootCertStore::empty(); + let cert = broker.tls.get_root_certificates(); + /*let cert_bytes = include_bytes!( + cert + );*/ + let cert_bytes = std::fs::read(cert); + + let root_cert_store = rustls_pemfile::certs(&mut cert_bytes.unwrap().as_ref()).unwrap(); root_cert_store .iter() @@ -425,7 +427,6 @@ impl Client { let config = ClientConfig::builder() .with_safe_defaults() - // .with_client_auth_cert(client_certs, client_keys.into_iter().next().unwrap()) .with_root_certificates(roots) .with_no_client_auth(); diff --git a/src/client/options.rs b/src/client/options.rs index 36e38746..6fa92faf 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -42,9 +42,7 @@ impl Default for ClientOptions { collector: Arc::new(NopMetricsCollector {}), tls: TlsConfiguration { enabled: false, - trust_hostname: false, - trust_certificate: false, - // certificate: None, + certificate_path: String::from(""), }, } } diff --git a/src/environment.rs b/src/environment.rs index 79b136ef..557d88a0 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -135,18 +135,14 @@ pub struct EnvironmentOptions { #[derive(Clone)] pub struct TlsConfiguration { pub(crate) enabled: bool, - pub(crate) trust_hostname: bool, - pub(crate) trust_certificate: bool, - // pub(crate) certificate: Option, + pub(crate) certificate_path: String, } impl Default for TlsConfiguration { fn default() -> TlsConfiguration { TlsConfiguration { enabled: true, - trust_certificate: false, - trust_hostname: false, - // certificate: None, + certificate_path: String::from(""), } } } @@ -160,61 +156,37 @@ impl TlsConfiguration { self.enabled } - // pub fn get_root_certificate(&self) -> Option<&Certificate> { - // self.certificate.as_ref() - // } - // - // pub fn add_root_certificate(&mut self, certificate: Certificate) { - // self.certificate = Some(certificate) - // } - - pub fn trust_hostname(&mut self, trust_hostname: bool) { - self.trust_hostname = trust_hostname - } - - pub fn trust_hostname_enabled(&self) -> bool { - self.trust_hostname + pub fn get_root_certificates(&self) -> String { + self.certificate_path.clone() } - - pub fn trust_certificate(&mut self, trust_certificate: bool) { - self.trust_certificate = trust_certificate - } - - pub fn trust_certificate_enabled(&self) -> bool { - self.trust_certificate + // + pub fn add_root_certificate(&mut self, certificate_path: String) { + self.certificate_path = certificate_path } } pub struct TlsConfigurationBuilder(TlsConfiguration); impl TlsConfigurationBuilder { - pub fn trust_certificate(mut self, trust_certificate: bool) -> TlsConfigurationBuilder { - self.0.trust_certificate = trust_certificate; - self - } - pub fn enable(mut self, enable: bool) -> TlsConfigurationBuilder { self.0.enabled = enable; self } - pub fn trust_hostname(mut self, hostname_verification: bool) -> TlsConfigurationBuilder { - self.0.trust_hostname = hostname_verification; + pub fn add_root_certificate(mut self, certificate_path: String) -> TlsConfigurationBuilder { + self.0.certificate_path = certificate_path; self } - // pub fn add_root_certificate(mut self, certificate: Certificate) -> TlsConfigurationBuilder { - // self.0.certificate = Some(certificate); - // self - // } - pub fn build(self) -> TlsConfiguration { self.0 } } + impl TlsConfiguration { pub fn builder() -> TlsConfigurationBuilder { TlsConfigurationBuilder(TlsConfiguration::default()) } } + diff --git a/tests/integration/environment_test.rs b/tests/integration/environment_test.rs index 78014bf1..4d0e0e20 100644 --- a/tests/integration/environment_test.rs +++ b/tests/integration/environment_test.rs @@ -134,6 +134,7 @@ async fn environment_create_streams_with_parameters() { assert_eq!(delete_response.is_ok(), true); } +/* #[tokio::test(flavor = "multi_thread")] async fn environment_fail_tls_connection() { // in this test we try to connect to a server that does not support tls @@ -144,8 +145,9 @@ async fn environment_fail_tls_connection() { .tls(TlsConfiguration::default()) .build() .await; + assert!(matches!( env, Err(rabbitmq_stream_client::error::ClientError::Tls { .. }) )); -} +}*/ From 8909703023d40db0430f9a2b61a7a48d5e3fc55d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Aug 2023 11:58:57 +0200 Subject: [PATCH 4/5] cosmetics Signed-off-by: Gabriele Santomaggio --- examples/tls_producer.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/examples/tls_producer.rs b/examples/tls_producer.rs index 1034add3..d98a754e 100644 --- a/examples/tls_producer.rs +++ b/examples/tls_producer.rs @@ -1,12 +1,13 @@ -use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration}; use tracing::info; use tracing_subscriber::FmtSubscriber; +use rabbitmq_stream_client::{Environment, NoDedup, Producer, TlsConfiguration, types::Message}; + const BATCH_SIZE: usize = 100; #[tokio::main] async fn main() -> Result<(), Box> { - let stream_name = String::from("mixing"); + let stream_name = String::from("tls_test_stream"); let subscriber = FmtSubscriber::builder().finish(); tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); @@ -22,17 +23,15 @@ async fn main() -> Result<(), Box> { .build() .await?; - start_publisher(environment.clone(), &stream_name).await; + start_publisher(environment.clone(), &stream_name).await.expect("error in publisher"); Ok(()) } async fn start_publisher( env: Environment, - // opts: &Opts, stream: &String, ) -> Result<(), Box> { - info!("im inside start_publisher"); let _ = env.stream_creator().create(&stream).await; let producer = env.producer().batch_size(BATCH_SIZE).build(&stream).await?; @@ -46,8 +45,7 @@ async fn start_publisher( info!("Sending {} simple messages", BATCH_SIZE); batch_send_simple(&producer).await; }) - .await?; - info!("end im inside start_publisher"); + .await?; Ok(()) } From 31a621331b43cd5371bf281a963bfbfb8033df62 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 1 Aug 2023 12:04:16 +0200 Subject: [PATCH 5/5] fixing test parsing --- examples/tls_producer.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/tls_producer.rs b/examples/tls_producer.rs index d98a754e..82d644a3 100644 --- a/examples/tls_producer.rs +++ b/examples/tls_producer.rs @@ -1,7 +1,7 @@ use tracing::info; use tracing_subscriber::FmtSubscriber; -use rabbitmq_stream_client::{Environment, NoDedup, Producer, TlsConfiguration, types::Message}; +use rabbitmq_stream_client::{types::Message, Environment, NoDedup, Producer, TlsConfiguration}; const BATCH_SIZE: usize = 100; @@ -23,7 +23,9 @@ async fn main() -> Result<(), Box> { .build() .await?; - start_publisher(environment.clone(), &stream_name).await.expect("error in publisher"); + start_publisher(environment.clone(), &stream_name) + .await + .expect("error in publisher"); Ok(()) } @@ -45,7 +47,7 @@ async fn start_publisher( info!("Sending {} simple messages", BATCH_SIZE); batch_send_simple(&producer).await; }) - .await?; + .await?; Ok(()) }