Skip to content

Commit

Permalink
removed AmqpCloudEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
minghuaw committed Aug 18, 2022
1 parent 2593f81 commit 1214976
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 77 deletions.
41 changes: 23 additions & 18 deletions src/binding/fe2o3_amqp/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use crate::{

use super::{
constants::{prefixed, DATACONTENTTYPE},
AmqpCloudEvent, ATTRIBUTE_PREFIX,
ATTRIBUTE_PREFIX, AmqpMessage,
};

impl BinaryDeserializer for AmqpCloudEvent {
impl BinaryDeserializer for AmqpMessage {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
mut self,
mut serializer: V,
Expand All @@ -27,6 +27,8 @@ impl BinaryDeserializer for AmqpCloudEvent {
let spec_version = {
let value = self
.application_properties
.as_mut()
.ok_or(Error::WrongEncoding { })?
.remove(prefixed::SPECVERSION)
.ok_or(Error::WrongEncoding {})
.map(|val| match val {
Expand All @@ -38,23 +40,24 @@ impl BinaryDeserializer for AmqpCloudEvent {
serializer = serializer.set_spec_version(spec_version.clone())?;

// datacontenttype
serializer = match self.content_type {
Some(Symbol(content_type)) => serializer
serializer = match self.properties.map(|p| p.content_type) {
Some(Some(Symbol(content_type))) => serializer
.set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?,
None => serializer,
_ => serializer,
};

// remaining attributes
let attributes = spec_version.attribute_names();

for (key, value) in self.application_properties.0.into_iter() {
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
if attributes.contains(&key) {
let value = MessageAttributeValue::try_from((key, value))?;
serializer = serializer.set_attribute(key, value)?;
} else {
let value = MessageAttributeValue::try_from(value)?;
serializer = serializer.set_extension(key, value)?;
if let Some(application_properties) = self.application_properties {
for (key, value) in application_properties.0.into_iter() {
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
if attributes.contains(&key) {
let value = MessageAttributeValue::try_from((key, value))?;
serializer = serializer.set_attribute(key, value)?;
} else {
let value = MessageAttributeValue::try_from(value)?;
serializer = serializer.set_extension(key, value)?;
}
}
}
}
Expand All @@ -70,7 +73,7 @@ impl BinaryDeserializer for AmqpCloudEvent {
}
}

impl StructuredDeserializer for AmqpCloudEvent {
impl StructuredDeserializer for AmqpMessage {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
self,
serializer: V,
Expand All @@ -86,12 +89,14 @@ impl StructuredDeserializer for AmqpCloudEvent {
}
}

impl MessageDeserializer for AmqpCloudEvent {
impl MessageDeserializer for AmqpMessage {
fn encoding(&self) -> Encoding {
match self
.content_type
.properties
.as_ref()
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
.map(|p| p.content_type.as_ref()
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
).flatten()
{
Some(true) => Encoding::STRUCTURED,
Some(false) => Encoding::BINARY,
Expand Down
47 changes: 5 additions & 42 deletions src/binding/fe2o3_amqp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
//! Implements AMQP 1.0 binding for CloudEvents

use std::collections::HashMap;
use std::convert::TryFrom;

use chrono::{Utc, TimeZone};
use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties};
use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value};
use fe2o3_amqp_lib::types::messaging::{Body, Message};
use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Timestamp, Value};

use crate::event::{AttributeValue, ExtensionValue};
use crate::event::{AttributeValue};
use crate::message::{Error, MessageAttributeValue};
use crate::Event;

use self::constants::{
prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE,
Expand All @@ -27,43 +25,8 @@ mod constants;
/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of
/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For
/// convenience, this type alias chose `Value` as the value of the generic parameter
pub type AmqpMessage = Message<Value>;

pub type AmqpBody = Body<Value>;

pub type Extensions = HashMap<String, ExtensionValue>;

/// The receiver of the event can distinguish between the two modes by inspecting the content-type
/// message property field. If the value is prefixed with the CloudEvents media type
/// application/cloudevents, indicating the use of a known event format, the receiver uses
/// structured mode, otherwise it defaults to binary mode.
pub struct AmqpCloudEvent {
content_type: Option<Symbol>,
application_properties: ApplicationProperties,
body: AmqpBody,
}

impl AmqpCloudEvent {
pub fn from_event(event: Event) -> Result<Self, Error> {
todo!()
}
}

impl From<AmqpCloudEvent> for AmqpMessage {
fn from(event: AmqpCloudEvent) -> Self {
let mut properties = Properties::default();
properties.content_type = event.content_type;
Message {
header: None,
delivery_annotations: None,
message_annotations: None,
properties: Some(properties),
application_properties: Some(event.application_properties),
body: event.body,
footer: None,
}
}
}
type AmqpMessage = Message<Value>;
type AmqpBody = Body<Value>;

impl<'a> From<AttributeValue<'a>> for SimpleValue {
fn from(value: AttributeValue) -> Self {
Expand Down
56 changes: 39 additions & 17 deletions src/binding/fe2o3_amqp/serializer.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,51 @@
use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary};
use fe2o3_amqp_types::messaging::{Data as AmqpData};
use fe2o3_amqp_types::messaging::{Data as AmqpData, Properties, ApplicationProperties};
use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol};

use crate::binding::header_prefix;
use crate::message::StructuredSerializer;
use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion};
use crate::{
event::SpecVersion,
message::{BinarySerializer, Error, MessageAttributeValue},
};

use super::constants::DATACONTENTTYPE;
use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody};
use super::{AmqpBody, AmqpMessage, ATTRIBUTE_PREFIX};

impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
impl BinarySerializer<AmqpMessage> for AmqpMessage {
fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result<Self> {
let key = String::from("cloudEvents:specversion");
let value = String::from(spec_version.as_str());
self.application_properties.insert(key, SimpleValue::from(value));
self.application_properties
.get_or_insert(ApplicationProperties::default())
.insert(key, SimpleValue::from(value));
Ok(self)
}

fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
fn set_attribute(
mut self,
name: &str,
value: MessageAttributeValue,
) -> crate::message::Result<Self> {
// For the binary mode, the AMQP content-type property field value maps directly to the
// CloudEvents datacontenttype attribute.
//
//
// All CloudEvents attributes with exception of datacontenttype MUST be individually mapped
// to and from the AMQP application-properties section.
if name == DATACONTENTTYPE {
self.content_type = match value {
self.properties
.get_or_insert(Properties::default())
.content_type = match value {
MessageAttributeValue::String(s) => Some(Symbol::from(s)),
_ => return Err(Error::WrongEncoding { })
_ => return Err(Error::WrongEncoding {}),
}
} else {
// CloudEvent attributes are prefixed with "cloudEvents:" for use in the
// application-properties section
let key = format!("{}:{}", ATTRIBUTE_PREFIX, name);
let key = header_prefix(ATTRIBUTE_PREFIX, name);
let value = SimpleValue::from(value);
self.application_properties.insert(key, value);
self.application_properties
.get_or_insert(ApplicationProperties::default())
.insert(key, value);
}

Ok(self)
Expand All @@ -43,10 +57,16 @@ impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
// systems that also process the message. Extension specifications that do this SHOULD specify
// how receivers are to interpret messages if the copied values differ from the cloud-event
// serialized values.
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result<Self> {
let key = format!("{}:{}", ATTRIBUTE_PREFIX, name);
fn set_extension(
mut self,
name: &str,
value: MessageAttributeValue,
) -> crate::message::Result<Self> {
let key = header_prefix(ATTRIBUTE_PREFIX, name);
let value = SimpleValue::from(value);
self.application_properties.insert(key, value);
self.application_properties
.get_or_insert(ApplicationProperties::default())
.insert(key, value);
Ok(self)
}

Expand All @@ -61,9 +81,11 @@ impl BinarySerializer<AmqpCloudEvent> for AmqpCloudEvent {
}
}

impl StructuredSerializer<AmqpCloudEvent> for AmqpCloudEvent {
impl StructuredSerializer<AmqpMessage> for AmqpMessage {
fn set_structured_event(mut self, bytes: Vec<u8>) -> crate::message::Result<Self> {
self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8"));
self.properties
.get_or_insert(Properties::default())
.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8"));
self.body = AmqpBody::Data(AmqpData(Binary::from(bytes)));
Ok(self)
}
Expand Down

0 comments on commit 1214976

Please sign in to comment.