From 1214976b3cbb1fcec8dc72066cc3da56bb192733 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 00:42:48 -0700 Subject: [PATCH] removed AmqpCloudEvent --- src/binding/fe2o3_amqp/deserializer.rs | 41 ++++++++++--------- src/binding/fe2o3_amqp/mod.rs | 47 +++------------------ src/binding/fe2o3_amqp/serializer.rs | 56 ++++++++++++++++++-------- 3 files changed, 67 insertions(+), 77 deletions(-) diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 708f0ba7..195dcbaa 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -13,10 +13,10 @@ use crate::{ use super::{ constants::{prefixed, DATACONTENTTYPE}, - AmqpCloudEvent, ATTRIBUTE_PREFIX, + ATTRIBUTE_PREFIX, AmqpMessage, }; -impl BinaryDeserializer for AmqpCloudEvent { +impl BinaryDeserializer for AmqpMessage { fn deserialize_binary>( mut self, mut serializer: V, @@ -27,6 +27,8 @@ impl BinaryDeserializer for AmqpCloudEvent { let spec_version = { let value = self .application_properties + .as_mut() + .ok_or(Error::WrongEncoding { })? .remove(prefixed::SPECVERSION) .ok_or(Error::WrongEncoding {}) .map(|val| match val { @@ -38,23 +40,24 @@ impl BinaryDeserializer for AmqpCloudEvent { serializer = serializer.set_spec_version(spec_version.clone())?; // datacontenttype - serializer = match self.content_type { - Some(Symbol(content_type)) => serializer + serializer = match self.properties.map(|p| p.content_type) { + Some(Some(Symbol(content_type))) => serializer .set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?, - None => serializer, + _ => serializer, }; // remaining attributes let attributes = spec_version.attribute_names(); - - for (key, value) in self.application_properties.0.into_iter() { - if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { - if attributes.contains(&key) { - let value = MessageAttributeValue::try_from((key, value))?; - serializer = serializer.set_attribute(key, value)?; - } else { - let value = MessageAttributeValue::try_from(value)?; - serializer = serializer.set_extension(key, value)?; + if let Some(application_properties) = self.application_properties { + for (key, value) in application_properties.0.into_iter() { + if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { + if attributes.contains(&key) { + let value = MessageAttributeValue::try_from((key, value))?; + serializer = serializer.set_attribute(key, value)?; + } else { + let value = MessageAttributeValue::try_from(value)?; + serializer = serializer.set_extension(key, value)?; + } } } } @@ -70,7 +73,7 @@ impl BinaryDeserializer for AmqpCloudEvent { } } -impl StructuredDeserializer for AmqpCloudEvent { +impl StructuredDeserializer for AmqpMessage { fn deserialize_structured>( self, serializer: V, @@ -86,12 +89,14 @@ impl StructuredDeserializer for AmqpCloudEvent { } } -impl MessageDeserializer for AmqpCloudEvent { +impl MessageDeserializer for AmqpMessage { fn encoding(&self) -> Encoding { match self - .content_type + .properties .as_ref() - .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) + .map(|p| p.content_type.as_ref() + .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) + ).flatten() { Some(true) => Encoding::STRUCTURED, Some(false) => Encoding::BINARY, diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 8e3b483e..5a6332fa 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,15 +1,13 @@ //! Implements AMQP 1.0 binding for CloudEvents -use std::collections::HashMap; use std::convert::TryFrom; use chrono::{Utc, TimeZone}; -use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties}; -use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; +use fe2o3_amqp_lib::types::messaging::{Body, Message}; +use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Timestamp, Value}; -use crate::event::{AttributeValue, ExtensionValue}; +use crate::event::{AttributeValue}; use crate::message::{Error, MessageAttributeValue}; -use crate::Event; use self::constants::{ prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE, @@ -27,43 +25,8 @@ mod constants; /// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of /// no importance because all CloudEvents are using the `Body::Data` as the body section type. For /// convenience, this type alias chose `Value` as the value of the generic parameter -pub type AmqpMessage = Message; - -pub type AmqpBody = Body; - -pub type Extensions = HashMap; - -/// The receiver of the event can distinguish between the two modes by inspecting the content-type -/// message property field. If the value is prefixed with the CloudEvents media type -/// application/cloudevents, indicating the use of a known event format, the receiver uses -/// structured mode, otherwise it defaults to binary mode. -pub struct AmqpCloudEvent { - content_type: Option, - application_properties: ApplicationProperties, - body: AmqpBody, -} - -impl AmqpCloudEvent { - pub fn from_event(event: Event) -> Result { - todo!() - } -} - -impl From for AmqpMessage { - fn from(event: AmqpCloudEvent) -> Self { - let mut properties = Properties::default(); - properties.content_type = event.content_type; - Message { - header: None, - delivery_annotations: None, - message_annotations: None, - properties: Some(properties), - application_properties: Some(event.application_properties), - body: event.body, - footer: None, - } - } -} +type AmqpMessage = Message; +type AmqpBody = Body; impl<'a> From> for SimpleValue { fn from(value: AttributeValue) -> Self { diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index 365c5d16..6fa23829 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -1,37 +1,51 @@ -use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary}; -use fe2o3_amqp_types::messaging::{Data as AmqpData}; +use fe2o3_amqp_types::messaging::{Data as AmqpData, Properties, ApplicationProperties}; +use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol}; +use crate::binding::header_prefix; use crate::message::StructuredSerializer; -use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion}; +use crate::{ + event::SpecVersion, + message::{BinarySerializer, Error, MessageAttributeValue}, +}; use super::constants::DATACONTENTTYPE; -use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody}; +use super::{AmqpBody, AmqpMessage, ATTRIBUTE_PREFIX}; -impl BinarySerializer for AmqpCloudEvent { +impl BinarySerializer for AmqpMessage { fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { let key = String::from("cloudEvents:specversion"); let value = String::from(spec_version.as_str()); - self.application_properties.insert(key, SimpleValue::from(value)); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, SimpleValue::from(value)); Ok(self) } - fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { + fn set_attribute( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { // For the binary mode, the AMQP content-type property field value maps directly to the // CloudEvents datacontenttype attribute. - // + // // All CloudEvents attributes with exception of datacontenttype MUST be individually mapped // to and from the AMQP application-properties section. if name == DATACONTENTTYPE { - self.content_type = match value { + self.properties + .get_or_insert(Properties::default()) + .content_type = match value { MessageAttributeValue::String(s) => Some(Symbol::from(s)), - _ => return Err(Error::WrongEncoding { }) + _ => return Err(Error::WrongEncoding {}), } } else { // CloudEvent attributes are prefixed with "cloudEvents:" for use in the // application-properties section - let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties.insert(key, value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); } Ok(self) @@ -43,10 +57,16 @@ impl BinarySerializer for AmqpCloudEvent { // systems that also process the message. Extension specifications that do this SHOULD specify // how receivers are to interpret messages if the copied values differ from the cloud-event // serialized values. - fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { - let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + fn set_extension( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { + let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties.insert(key, value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); Ok(self) } @@ -61,9 +81,11 @@ impl BinarySerializer for AmqpCloudEvent { } } -impl StructuredSerializer for AmqpCloudEvent { +impl StructuredSerializer for AmqpMessage { fn set_structured_event(mut self, bytes: Vec) -> crate::message::Result { - self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); + self.properties + .get_or_insert(Properties::default()) + .content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); self.body = AmqpBody::Data(AmqpData(Binary::from(bytes))); Ok(self) }