From 9e8e714e4327f71533fc7a88d5e86c198efba4f8 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 31 Aug 2023 12:48:52 -0700 Subject: [PATCH 1/5] changed DEFAULT_SCOPE to DEFAULT_RESOURCE --- sdk/messaging_eventhubs/Cargo.toml | 4 ++-- .../src/amqp/amqp_client.rs | 4 ++-- .../src/amqp/cbs_token_provider.rs | 6 +++--- .../event_hub_token_credential.rs | 20 +++++++++++++++---- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/sdk/messaging_eventhubs/Cargo.toml b/sdk/messaging_eventhubs/Cargo.toml index 6c9271bf52..0f7d124bbf 100644 --- a/sdk/messaging_eventhubs/Cargo.toml +++ b/sdk/messaging_eventhubs/Cargo.toml @@ -16,7 +16,7 @@ keywords = ["sdk", "azure", "eventhubs", "amqp", "cloud"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -time = { version = "<=0.3.23", features = ["macros"] } +time = { version = "0.3", features = ["macros"] } url = "2" uuid = { version = "1", features = ["v4"] } const_format = "0.2" @@ -35,7 +35,7 @@ pin-project-lite = "0.2.9" serde = "1" # Azure dependencies -azure_core = { path = "../core", version = "0.13" } +azure_core = { path = "../core" } # AMQP dependencies fe2o3-amqp = { version = "0.8.22" } diff --git a/sdk/messaging_eventhubs/src/amqp/amqp_client.rs b/sdk/messaging_eventhubs/src/amqp/amqp_client.rs index dc1f1a19e8..37de3399b6 100644 --- a/sdk/messaging_eventhubs/src/amqp/amqp_client.rs +++ b/sdk/messaging_eventhubs/src/amqp/amqp_client.rs @@ -155,7 +155,7 @@ impl TransportClient for AmqpClient { let access_token = self .connection_scope .credential - .get_token_using_default_scope() + .get_token_using_default_resource() .await?; let token_value = access_token.token.secret(); loop { @@ -195,7 +195,7 @@ impl TransportClient for AmqpClient { let access_token = self .connection_scope .credential - .get_token_using_default_scope() + .get_token_using_default_resource() .await?; let token_value = access_token.token.secret(); loop { diff --git a/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs b/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs index 073b193dcb..462d862ece 100644 --- a/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs +++ b/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs @@ -64,7 +64,7 @@ impl<'a> Future for CbsTokenFut<'a> { let entity_type = self.provider.token_type.entity_type().to_string(); // TODO: reduce clone/to_string let result = match &mut self.provider.token_type { TokenType::SharedAccessToken { credential } => { - let fut = credential.get_token_using_default_scope(); + let fut = credential.get_token_using_default_resource(); pin_mut!(fut); ready!(fut.poll(cx)) } @@ -80,7 +80,7 @@ impl<'a> Future for CbsTokenFut<'a> { azure_core::error::Error::new(azure_core::error::ErrorKind::Credential, e) })?; if is_nearing_expiration(cached, expiration_buffer) { - let fut = credential.get_token_using_default_scope(); + let fut = credential.get_token_using_default_resource(); pin_mut!(fut); let token = ready!(fut.poll(cx))?; *cached = token; @@ -95,7 +95,7 @@ impl<'a> Future for CbsTokenFut<'a> { })?; // GetTokenUsingDefaultScopeAsync - let fut = credential.get_token_using_default_scope(); + let fut = credential.get_token_using_default_resource(); pin_mut!(fut); let token = ready!(fut.poll(cx))?; *cached_token = Some(token.clone()); diff --git a/sdk/messaging_eventhubs/src/authorization/event_hub_token_credential.rs b/sdk/messaging_eventhubs/src/authorization/event_hub_token_credential.rs index 3f9e62d58c..beb9c52cb7 100644 --- a/sdk/messaging_eventhubs/src/authorization/event_hub_token_credential.rs +++ b/sdk/messaging_eventhubs/src/authorization/event_hub_token_credential.rs @@ -65,7 +65,10 @@ impl EventHubTokenCredential { } impl EventHubTokenCredential { - pub(crate) const DEFAULT_SCOPE: &str = "https://eventhubs.azure.net/.default"; + // pub(crate) const DEFAULT_SCOPE: &str = "https://eventhubs.azure.net/.default"; + + // `azure_identity` appends "/.default" to the resource internally. + pub(crate) const DEFAULT_RESOURCE: &str = "https://eventhubs.azure.net/"; /// Gets a `TokenResponse` for the specified resource pub(crate) async fn get_token(&self, resource: &str) -> azure_core::Result { @@ -77,8 +80,12 @@ impl EventHubTokenCredential { } } - pub(crate) async fn get_token_using_default_scope(&self) -> azure_core::Result { - self.get_token(Self::DEFAULT_SCOPE).await + // pub(crate) async fn get_token_using_default_scope(&self) -> azure_core::Result { + // self.get_token(Self::DEFAULT_SCOPE).await + // } + + pub(crate) async fn get_token_using_default_resource(&self) -> azure_core::Result { + self.get_token(Self::DEFAULT_RESOURCE).await } } @@ -133,7 +140,12 @@ cfg_not_wasm32! { use azure_identity::DefaultAzureCredential; let default_credential = DefaultAzureCredential::default(); - let _event_hub_token_credential = EventHubTokenCredential::from(default_credential); + let event_hub_token_credential = EventHubTokenCredential::from(default_credential); + let token = event_hub_token_credential + .get_token_using_default_resource() + .await + .unwrap(); + assert!(!token.token.secret().is_empty()) } } } From 5f018e3990be382d05f0c758899dbc3423a689de Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 31 Aug 2023 13:53:54 -0700 Subject: [PATCH 2/5] use box::pin for cbs provider future --- .../examples/consumer_read_from_position.rs | 4 +- .../src/amqp/amqp_cbs_link.rs | 7 + .../src/amqp/amqp_connection_scope.rs | 6 + .../src/amqp/cbs_token_provider.rs | 154 ++++-------------- .../src/amqp/token_type.rs | 5 - .../src/event_hubs_connection.rs | 1 + .../src/producer/event_hub_producer_client.rs | 1 + .../tests/event_hub_connection_live_tests.rs | 23 ++- .../event_hub_producer_client_live_tests.rs | 28 +++- 9 files changed, 101 insertions(+), 128 deletions(-) diff --git a/sdk/messaging_eventhubs/examples/consumer_read_from_position.rs b/sdk/messaging_eventhubs/examples/consumer_read_from_position.rs index ef8c80ece5..36e415b7dd 100644 --- a/sdk/messaging_eventhubs/examples/consumer_read_from_position.rs +++ b/sdk/messaging_eventhubs/examples/consumer_read_from_position.rs @@ -3,10 +3,10 @@ use std::time::Duration; -use futures_util::StreamExt; -use messaging_eventhubs::consumer::{ +use azeventhubs::consumer::{ EventHubConsumerClient, EventHubConsumerClientOptions, EventPosition, ReadEventOptions, }; +use futures_util::StreamExt; #[tokio::main] async fn main() -> Result<(), Box> { diff --git a/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs b/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs index 8a325ca232..ee505e013d 100644 --- a/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs +++ b/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs @@ -82,6 +82,9 @@ impl AmqpCbsLinkHandle { .send(command) .await .map_err(|_| AmqpCbsEventLoopStopped {})?; + + println!("request_refreshable_authorization: waiting for result"); + result.await.map_err(|_| AmqpCbsEventLoopStopped {}) } @@ -170,12 +173,16 @@ impl AmqpCbsLink { resource: impl AsRef, required_claims: impl IntoIterator>, ) -> Result, CbsAuthError> { + println!("request_authorization_using_cbs"); + let resource = resource.as_ref(); let token = self .cbs_token_provider .get_token_async(endpoint, resource, required_claims) .await?; + println!("request_authorization_using_cbs: got token"); + // find the smallest timeout let expires_at_utc = token.expires_at_utc().clone().map(OffsetDateTime::from); diff --git a/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs b/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs index 1fe1592af0..f305b3cf14 100644 --- a/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs +++ b/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs @@ -267,6 +267,8 @@ impl AmqpConnectionScope { ) -> Result, OpenProducerError> { use std::borrow::Cow; + println!("open_producer_link"); + let path: Cow = match &partition_id { None => Cow::Borrowed(&self.event_hub_name), Some(partition_id) if partition_id.is_empty() => Cow::Borrowed(&self.event_hub_name), @@ -315,6 +317,8 @@ impl AmqpConnectionScope { return Err(OpenProducerError::ConnectionScopeDisposed); } + println!("create_sending_session_and_link"); + // Perform the initial authorization for the link. let auth_claims = vec![event_hub_claim::SEND.to_string()]; let resource = endpoint.to_string(); @@ -326,6 +330,8 @@ impl AmqpConnectionScope { ) .await?; + println!("requested refreshable authorization using cbs"); + // Create and open the AMQP session associated with the link. let mut session_handle = Session::begin(&mut self.connection.handle).await?; diff --git a/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs b/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs index 462d862ece..4386ce777a 100644 --- a/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs +++ b/sdk/messaging_eventhubs/src/amqp/cbs_token_provider.rs @@ -1,10 +1,8 @@ use azure_core::auth::TokenResponse; use fe2o3_amqp_cbs::{token::CbsToken, AsyncCbsTokenProvider}; use fe2o3_amqp_types::primitives::Timestamp; -use futures_util::{pin_mut, ready}; -use std::{future::Future, sync::Arc, task::Poll}; +use std::{future::Future, sync::Arc, pin::Pin}; use time::Duration as TimeSpan; -use tokio::sync::Semaphore; use crate::authorization::event_hub_token_credential::EventHubTokenCredential; @@ -31,9 +29,6 @@ impl CbsTokenProvider { } else { TokenType::JsonWebToken { credential, - // Tokens are only cached for JWT-based credentials; no need - // to instantiate the semaphore if no caching is taking place. - semaphore: Semaphore::new(1), cached_token: None, } }; @@ -49,74 +44,8 @@ fn is_nearing_expiration(token: &TokenResponse, token_expiration_buffer: TimeSpa token.expires_on - token_expiration_buffer <= crate::util::time::now_utc() } -pub struct CbsTokenFut<'a> { - provider: &'a mut CbsTokenProvider, -} - -impl<'a> Future for CbsTokenFut<'a> { - type Output = Result, azure_core::error::Error>; - - fn poll( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll { - let expiration_buffer = self.provider.token_expiration_buffer; - let entity_type = self.provider.token_type.entity_type().to_string(); // TODO: reduce clone/to_string - let result = match &mut self.provider.token_type { - TokenType::SharedAccessToken { credential } => { - let fut = credential.get_token_using_default_resource(); - pin_mut!(fut); - ready!(fut.poll(cx)) - } - TokenType::JsonWebToken { - credential, - semaphore, - cached_token, - } => match cached_token { - Some(cached) => { - let fut = semaphore.acquire(); - pin_mut!(fut); - let _permit = ready!(fut.poll(cx)).map_err(|e| { - azure_core::error::Error::new(azure_core::error::ErrorKind::Credential, e) - })?; - if is_nearing_expiration(cached, expiration_buffer) { - let fut = credential.get_token_using_default_resource(); - pin_mut!(fut); - let token = ready!(fut.poll(cx))?; - *cached = token; - } - Ok(cached.clone()) - } - None => { - let fut = semaphore.acquire(); - pin_mut!(fut); - let _permit = ready!(fut.poll(cx)).map_err(|e| { - azure_core::error::Error::new(azure_core::error::ErrorKind::Credential, e) - })?; - - // GetTokenUsingDefaultScopeAsync - let fut = credential.get_token_using_default_resource(); - pin_mut!(fut); - let token = ready!(fut.poll(cx))?; - *cached_token = Some(token.clone()); - Ok(token) - } - }, - }; - - match result { - Ok(token) => Poll::Ready(Ok(CbsToken::new( - token.token.secret().to_owned(), - entity_type, - Some(Timestamp::from(token.expires_on)), - ))), - Err(err) => Poll::Ready(Err(err)), - } - } -} - impl AsyncCbsTokenProvider for CbsTokenProvider { - type Fut<'a> = CbsTokenFut<'a>; + type Fut<'a> = Pin, azure_core::error::Error>> + Send + 'a>>; type Error = azure_core::error::Error; fn get_token_async( @@ -125,7 +54,39 @@ impl AsyncCbsTokenProvider for CbsTokenProvider { _resource_id: impl AsRef, _claims: impl IntoIterator>, ) -> Self::Fut<'_> { - CbsTokenFut { provider: self } + // CbsTokenFut { provider: self } + Box::pin(async { + let expiration_buffer = self.token_expiration_buffer; + let entity_type = self.token_type.entity_type().to_string(); + + let result = match &mut self.token_type { + TokenType::SharedAccessToken { credential } => { + let token = credential.get_token_using_default_resource().await?; + Ok(token) + }, + TokenType::JsonWebToken { credential, cached_token } => { + match cached_token { + Some(cached) => { + if is_nearing_expiration(cached, expiration_buffer) { + let token = credential.get_token_using_default_resource().await?; + *cached = token.clone(); + } + Ok(cached.clone()) + }, + None => { + let token = credential.get_token_using_default_resource().await?; + *cached_token = Some(token.clone()); + Ok(token) + } + } + } + }; + result.map(|token| CbsToken::new( + token.token.secret().to_owned(), + entity_type, + Some(Timestamp::from(token.expires_on)), + )) + }) } } @@ -266,50 +227,5 @@ mod tests { second_token.expires_at_utc().clone() ); } - - // // TODO: This cannot be mock tested right now - // #[tokio::test] - // async fn get_token_does_not_cache_shared_access_credential() { - // // var value = "TOkEn!"; - // // var signature = new SharedAccessSignature("hub-name", "keyName", "key", value, DateTimeOffset.UtcNow.AddHours(4)); - // let signature = SharedAccessSignature::try_from_parts( - // "sb-name", - // "keyName", - // "key", - // Some(std::time::Duration::from_secs(4 * 60 * 60)), - // ).unwrap(); - // } - - // // TODO: This requires dispatching token provider into tasks, so a mutex is required - // #[tokio::test] - // async fn get_token_synchronizes_multiple_refresh_attempts_for_jwt_tokens() { - // let token_value = "ValuE_oF_tHE_tokEn"; - // let buffer = TimeSpan::minutes(5); - // let expires_on: OffsetDateTime = - // crate::util::time::now_utc() - buffer + TimeSpan::seconds(-10); - // let mut mock_credential = MockTokenCredential::new(); - - // let mut seq = Sequence::new(); - // mock_credential - // .expect_get_token() - // .times(1) - // .in_sequence(&mut seq) - // .returning(move |_resource| { - // Ok(TokenResponse { - // token: AccessToken::new(token_value), - // expires_on: expires_on, - // }) - // }); - // mock_credential - // .expect_get_token() - // .times(1) - // .in_sequence(&mut seq) - // .returning(move |_resource| { - // Ok(TokenResponse { - // token: AccessToken::new(token_value), - // expires_on: crate::util::time::now_utc() + TimeSpan::days(1), - // }) - // }); - // } } } diff --git a/sdk/messaging_eventhubs/src/amqp/token_type.rs b/sdk/messaging_eventhubs/src/amqp/token_type.rs index 214faf8ab5..8e2aa52b4b 100644 --- a/sdk/messaging_eventhubs/src/amqp/token_type.rs +++ b/sdk/messaging_eventhubs/src/amqp/token_type.rs @@ -1,6 +1,5 @@ use azure_core::auth::TokenResponse; use std::sync::Arc; -use tokio::sync::Semaphore; use crate::{ authorization::event_hub_token_credential::EventHubTokenCredential, @@ -17,10 +16,6 @@ pub(crate) enum TokenType { JsonWebToken { credential: Arc, - /// Tokens are only cached for JWT-based credentials; no need - /// to instantiate the semaphore if no caching is taking place. - semaphore: Semaphore, - /// The JWT-based token that is currently cached for authorization. cached_token: Option, }, diff --git a/sdk/messaging_eventhubs/src/event_hubs_connection.rs b/sdk/messaging_eventhubs/src/event_hubs_connection.rs index 4bef2f8f0c..043a0a2f88 100644 --- a/sdk/messaging_eventhubs/src/event_hubs_connection.rs +++ b/sdk/messaging_eventhubs/src/event_hubs_connection.rs @@ -274,6 +274,7 @@ where RP: EventHubsRetryPolicy + Send, C::OpenProducerError: Into, { + println!("create_transport_producer"); match &mut self.inner { Sharable::Owned(c) => c .create_producer( diff --git a/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs b/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs index 8778ce4c42..94594127b3 100644 --- a/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs +++ b/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs @@ -329,6 +329,7 @@ where retry_policy, ) .await?; + println!("created producer"); self.producer_pool .insert(partition_id.to_string(), producer); } diff --git a/sdk/messaging_eventhubs/tests/event_hub_connection_live_tests.rs b/sdk/messaging_eventhubs/tests/event_hub_connection_live_tests.rs index 5262914446..2b2956ca89 100644 --- a/sdk/messaging_eventhubs/tests/event_hub_connection_live_tests.rs +++ b/sdk/messaging_eventhubs/tests/event_hub_connection_live_tests.rs @@ -56,7 +56,7 @@ cfg_not_wasm32! { #[tokio::test] async fn connection_can_connect_with_named_key_credential() { common::setup_dotenv(); - use messaging_eventhubs::authorization::{ + use azeventhubs::authorization::{ SharedAccessCredential, AzureNamedKeyCredential, build_connection_signature_authorization_resource, }; @@ -79,4 +79,25 @@ cfg_not_wasm32! { ).await.unwrap(); connection.close().await.unwrap(); } + + #[tokio::test] + async fn connection_can_connect_with_azure_identity_credential() { + common::setup_dotenv(); + + use azure_identity::DefaultAzureCredential; + + let namespace = std::env::var("EVENT_HUBS_NAMESPACE").unwrap(); + let fqn = format!("{}.servicebus.windows.net", namespace); + let event_hub_name = std::env::var("EVENT_HUB_NAME").unwrap(); + + let options = EventHubConnectionOptions::default(); + let credential = DefaultAzureCredential::default(); + + let connection = EventHubConnection::from_namespace_and_credential( + fqn, + event_hub_name, + credential, + options, + ).await.unwrap(); + } } diff --git a/sdk/messaging_eventhubs/tests/event_hub_producer_client_live_tests.rs b/sdk/messaging_eventhubs/tests/event_hub_producer_client_live_tests.rs index b339c9bd03..c24bf56bba 100644 --- a/sdk/messaging_eventhubs/tests/event_hub_producer_client_live_tests.rs +++ b/sdk/messaging_eventhubs/tests/event_hub_producer_client_live_tests.rs @@ -28,7 +28,7 @@ cfg_not_wasm32! { #[tokio::test] async fn producer_client_can_connect_using_named_key_credential() { - use messaging_eventhubs::authorization::{AzureNamedKeyCredential}; + use azeventhubs::authorization::{AzureNamedKeyCredential}; common::setup_dotenv(); @@ -54,6 +54,32 @@ cfg_not_wasm32! { producer_client.close().await.unwrap(); } + #[tokio::test] + async fn producer_client_can_connect_using_azure_identity_credential() { + use azure_identity::DefaultAzureCredential; + + common::setup_dotenv(); + + let namespace = std::env::var("EVENT_HUBS_NAMESPACE").unwrap(); + let fqn = format!("{}.servicebus.windows.net", namespace); + let event_hub_name = std::env::var("EVENT_HUB_NAME").unwrap(); + let options = EventHubProducerClientOptions::default(); + let default_credential = DefaultAzureCredential::default(); + + let mut producer_client = EventHubProducerClient::from_namespace_and_credential( + fqn, + event_hub_name, + default_credential, + options, + ).await.unwrap(); + + let event = "test connect using azure identity"; + let options = SendEventOptions::new().with_partition_id("0"); + producer_client.send_event(event, options).await.unwrap(); + + producer_client.close().await.unwrap(); + } + #[tokio::test] async fn close_producer_client_does_not_close_shared_connection() { common::setup_dotenv(); From e8dc5b7f084252b4c6322a5570901e5e34b80c5f Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 31 Aug 2023 14:16:46 -0700 Subject: [PATCH 3/5] added example for azure_identity --- .../examples/azure_identity_credential.rs | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 sdk/messaging_eventhubs/examples/azure_identity_credential.rs diff --git a/sdk/messaging_eventhubs/examples/azure_identity_credential.rs b/sdk/messaging_eventhubs/examples/azure_identity_credential.rs new file mode 100644 index 0000000000..17347841b1 --- /dev/null +++ b/sdk/messaging_eventhubs/examples/azure_identity_credential.rs @@ -0,0 +1,29 @@ +use azeventhubs::producer::{ + EventHubProducerClient, EventHubProducerClientOptions, SendEventOptions, +}; +use azure_identity::DefaultAzureCredential; + +#[tokio::main] +async fn main() -> Result<(), Box> { + + let namespace = std::env::var("EVENT_HUBS_NAMESPACE")?; + let fqn = format!("{}.servicebus.windows.net", namespace); + let event_hub_name = std::env::var("EVENT_HUB_NAME")?; + let options = EventHubProducerClientOptions::default(); + let default_credential = DefaultAzureCredential::default(); + + let mut producer_client = EventHubProducerClient::from_namespace_and_credential( + fqn, + event_hub_name, + default_credential, + options, + ).await?; + + let event = "test connect using azure identity"; + let options = SendEventOptions::new().with_partition_id("0"); + producer_client.send_event(event, options).await?; + + producer_client.close().await?; + + Ok(()) +} From de596df36189b49a90ad1e036c06842c0e764c49 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 31 Aug 2023 14:21:42 -0700 Subject: [PATCH 4/5] renamed example --- ...y_credential.rs => producer_with_azure_identity_credential.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sdk/messaging_eventhubs/examples/{azure_identity_credential.rs => producer_with_azure_identity_credential.rs} (100%) diff --git a/sdk/messaging_eventhubs/examples/azure_identity_credential.rs b/sdk/messaging_eventhubs/examples/producer_with_azure_identity_credential.rs similarity index 100% rename from sdk/messaging_eventhubs/examples/azure_identity_credential.rs rename to sdk/messaging_eventhubs/examples/producer_with_azure_identity_credential.rs From 3960298ec5a42d5c47d0f5ac8a10b2ac7e689c99 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 31 Aug 2023 14:41:51 -0700 Subject: [PATCH 5/5] removed debug info --- .../examples/producer_with_azure_identity_credential.rs | 4 ++-- sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs | 6 ------ sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs | 6 ------ sdk/messaging_eventhubs/src/amqp/amqp_phantom_message.rs | 7 ++----- sdk/messaging_eventhubs/src/event_hubs_connection.rs | 1 - .../src/producer/event_hub_producer_client.rs | 1 - 6 files changed, 4 insertions(+), 21 deletions(-) diff --git a/sdk/messaging_eventhubs/examples/producer_with_azure_identity_credential.rs b/sdk/messaging_eventhubs/examples/producer_with_azure_identity_credential.rs index 17347841b1..de479f4e12 100644 --- a/sdk/messaging_eventhubs/examples/producer_with_azure_identity_credential.rs +++ b/sdk/messaging_eventhubs/examples/producer_with_azure_identity_credential.rs @@ -5,7 +5,6 @@ use azure_identity::DefaultAzureCredential; #[tokio::main] async fn main() -> Result<(), Box> { - let namespace = std::env::var("EVENT_HUBS_NAMESPACE")?; let fqn = format!("{}.servicebus.windows.net", namespace); let event_hub_name = std::env::var("EVENT_HUB_NAME")?; @@ -17,7 +16,8 @@ async fn main() -> Result<(), Box> { event_hub_name, default_credential, options, - ).await?; + ) + .await?; let event = "test connect using azure identity"; let options = SendEventOptions::new().with_partition_id("0"); diff --git a/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs b/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs index ee505e013d..fc6b74eaa5 100644 --- a/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs +++ b/sdk/messaging_eventhubs/src/amqp/amqp_cbs_link.rs @@ -83,8 +83,6 @@ impl AmqpCbsLinkHandle { .await .map_err(|_| AmqpCbsEventLoopStopped {})?; - println!("request_refreshable_authorization: waiting for result"); - result.await.map_err(|_| AmqpCbsEventLoopStopped {}) } @@ -173,16 +171,12 @@ impl AmqpCbsLink { resource: impl AsRef, required_claims: impl IntoIterator>, ) -> Result, CbsAuthError> { - println!("request_authorization_using_cbs"); - let resource = resource.as_ref(); let token = self .cbs_token_provider .get_token_async(endpoint, resource, required_claims) .await?; - println!("request_authorization_using_cbs: got token"); - // find the smallest timeout let expires_at_utc = token.expires_at_utc().clone().map(OffsetDateTime::from); diff --git a/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs b/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs index f305b3cf14..1fe1592af0 100644 --- a/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs +++ b/sdk/messaging_eventhubs/src/amqp/amqp_connection_scope.rs @@ -267,8 +267,6 @@ impl AmqpConnectionScope { ) -> Result, OpenProducerError> { use std::borrow::Cow; - println!("open_producer_link"); - let path: Cow = match &partition_id { None => Cow::Borrowed(&self.event_hub_name), Some(partition_id) if partition_id.is_empty() => Cow::Borrowed(&self.event_hub_name), @@ -317,8 +315,6 @@ impl AmqpConnectionScope { return Err(OpenProducerError::ConnectionScopeDisposed); } - println!("create_sending_session_and_link"); - // Perform the initial authorization for the link. let auth_claims = vec![event_hub_claim::SEND.to_string()]; let resource = endpoint.to_string(); @@ -330,8 +326,6 @@ impl AmqpConnectionScope { ) .await?; - println!("requested refreshable authorization using cbs"); - // Create and open the AMQP session associated with the link. let mut session_handle = Session::begin(&mut self.connection.handle).await?; diff --git a/sdk/messaging_eventhubs/src/amqp/amqp_phantom_message.rs b/sdk/messaging_eventhubs/src/amqp/amqp_phantom_message.rs index 1674507b30..fef8de03f7 100644 --- a/sdk/messaging_eventhubs/src/amqp/amqp_phantom_message.rs +++ b/sdk/messaging_eventhubs/src/amqp/amqp_phantom_message.rs @@ -321,17 +321,14 @@ mod tests { let message_iter = std::iter::once(event.amqp_message); let batch = build_amqp_batch_from_messages(message_iter.clone(), None).unwrap(); - let serialized_value = serialized_value_of_sendable(batch.sendable); - println!("serialized_value: {:?}", serialized_value); + let _serialized_value = serialized_value_of_sendable(batch.sendable); let batch = build_amqp_batch_from_messages(message_iter.clone(), None).unwrap(); - let serialized_bytes = serialized_bytes_of_sendable(batch.sendable); - println!("serialized_bytes: {:?}", serialized_bytes); + let _serialized_bytes = serialized_bytes_of_sendable(batch.sendable); let batch = build_amqp_batch_from_messages(message_iter, None).unwrap(); let (phantom_size, ssize) = phantom_size_and_serialized_size_of_sendable_envelope(batch.sendable); - println!("serialized_size: {}", ssize); assert_eq!(phantom_size, ssize) } diff --git a/sdk/messaging_eventhubs/src/event_hubs_connection.rs b/sdk/messaging_eventhubs/src/event_hubs_connection.rs index 043a0a2f88..4bef2f8f0c 100644 --- a/sdk/messaging_eventhubs/src/event_hubs_connection.rs +++ b/sdk/messaging_eventhubs/src/event_hubs_connection.rs @@ -274,7 +274,6 @@ where RP: EventHubsRetryPolicy + Send, C::OpenProducerError: Into, { - println!("create_transport_producer"); match &mut self.inner { Sharable::Owned(c) => c .create_producer( diff --git a/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs b/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs index 94594127b3..8778ce4c42 100644 --- a/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs +++ b/sdk/messaging_eventhubs/src/producer/event_hub_producer_client.rs @@ -329,7 +329,6 @@ where retry_policy, ) .await?; - println!("created producer"); self.producer_pool .insert(partition_id.to_string(), producer); }