Skip to content

Commit

Permalink
cloudevents#9 Encoders for MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
sbcd90 committed Nov 9, 2020
1 parent 89a6702 commit 171c2cc
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 79 deletions.
2 changes: 1 addition & 1 deletion cloudevents-sdk-mqtt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "CloudEvents official Rust SDK - Mqtt integration"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
cloudevents-sdk = { version = "0.2.0", path = ".." }
cloudevents-sdk = { version = "0.3.0", path = ".." }
lazy_static = "1.4.0"
paho-mqtt = { path = "../../paho.mqtt.rust" }
chrono = { version = "^0.4", features = ["serde"] }
Expand Down
4 changes: 2 additions & 2 deletions cloudevents-sdk-mqtt/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'st
(s, attribute_name_to_header!(s))
}
})
.collect()
.collect()
}

lazy_static! {
Expand All @@ -32,4 +32,4 @@ pub enum MqttVersion {
V3_1,
V3_1_1,
V5,
}
}
4 changes: 2 additions & 2 deletions cloudevents-sdk-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
//! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\
#[macro_use]
mod headers;
mod mqtt_producer_record;
mod mqtt_consumer_record;
mod mqtt_producer_record;

pub use mqtt_consumer_record::record_to_event;
pub use mqtt_consumer_record::ConsumerMessageDeserializer;
pub use mqtt_consumer_record::MessageExt;

pub use headers::MqttVersion;
pub use mqtt_producer_record::MessageBuilderExt;
pub use mqtt_producer_record::MessageRecord;
pub use headers::MqttVersion;
40 changes: 27 additions & 13 deletions cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::headers;
use cloudevents::event::SpecVersion;
use cloudevents::message::{Result, BinarySerializer, BinaryDeserializer, MessageAttributeValue,
MessageDeserializer, Encoding, StructuredSerializer, StructuredDeserializer};
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use paho_mqtt::{Message, PropertyCode};
use std::collections::HashMap;
Expand Down Expand Up @@ -37,7 +39,7 @@ impl ConsumerMessageDeserializer {
impl BinaryDeserializer for ConsumerMessageDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {})
return Err(message::Error::WrongEncoding {});
}

let spec_version = SpecVersion::try_from(
Expand Down Expand Up @@ -124,9 +126,15 @@ impl MessageDeserializer for ConsumerMessageDeserializer {

pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result<Event> {
match version {
headers::MqttVersion::V5 => BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?),
headers::MqttVersion::V3_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?),
headers::MqttVersion::V3_1_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?),
headers::MqttVersion::V5 => {
BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
headers::MqttVersion::V3_1 => {
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
headers::MqttVersion::V3_1_1 => {
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
}
}

Expand All @@ -145,11 +153,12 @@ mod tests {
use super::*;
use crate::mqtt_producer_record::MessageRecord;

use crate::MessageBuilderExt;
use chrono::Utc;
use cloudevents::event::Data;
use cloudevents::{EventBuilder, EventBuilderV10};
use crate::MessageBuilderExt;
use paho_mqtt::MessageBuilder;
use serde_json::json;
use cloudevents::event::Data;

#[test]
fn test_binary_record() {
Expand All @@ -160,8 +169,10 @@ mod tests {
.ty("example.test")
.time(time)
.source("http://localhost")
.data("application/json",
Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()))
.data(
"application/json",
Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()),
)
.extension("someint", "10")
.build()
.unwrap();
Expand All @@ -178,7 +189,7 @@ mod tests {
.unwrap(),
headers::MqttVersion::V5,
)
.unwrap();
.unwrap();

let msg = MessageBuilder::new()
.topic("test")
Expand Down Expand Up @@ -220,6 +231,9 @@ mod tests {
.qos(1)
.finalize();

assert_eq!(msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), expected)
assert_eq!(
msg.to_event(headers::MqttVersion::V3_1_1).unwrap(),
expected
)
}
}
}
128 changes: 67 additions & 61 deletions cloudevents-sdk-mqtt/src/mqtt_producer_record.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::headers;
use paho_mqtt::{Properties, Property, PropertyCode, MessageBuilder};
use cloudevents::message::{BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result,
StructuredDeserializer, StructuredSerializer, Error};
use cloudevents::Event;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result,
StructuredDeserializer, StructuredSerializer,
};
use cloudevents::Event;
use paho_mqtt::{MessageBuilder, Properties, Property, PropertyCode};
use std::option::Option::Some;

pub struct MessageRecord {
Expand All @@ -22,68 +24,76 @@ impl MessageRecord {

pub fn from_event(event: Event, version: headers::MqttVersion) -> Result<Self> {
match version {
headers::MqttVersion::V5 => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()),
headers::MqttVersion::V3_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()),
headers::MqttVersion::V3_1_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()),
headers::MqttVersion::V5 => {
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
}
headers::MqttVersion::V3_1 => {
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
}
headers::MqttVersion::V3_1_1 => {
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
}
}
}
}

impl BinarySerializer<MessageRecord> for MessageRecord {
fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
match Property::new_string_pair(PropertyCode::UserProperty, headers::SPEC_VERSION_HEADER,
spec_version.as_str()) {
Ok(property) => {
match self.headers.push(property) {
Err(e) => Err(Error::Other {
source: Box::new(e)
}),
_ => Ok(self)
}
match Property::new_string_pair(
PropertyCode::UserProperty,
headers::SPEC_VERSION_HEADER,
spec_version.as_str(),
) {
Ok(property) => match self.headers.push(property) {
Err(e) => Err(Error::Other {
source: Box::new(e),
}),
_ => Ok(self),
},
_ => Err(Error::UnrecognizedAttributeName {
name: headers::SPEC_VERSION_HEADER.to_string()
})
name: headers::SPEC_VERSION_HEADER.to_string(),
}),
}
}

fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
match Property::new_string_pair(PropertyCode::UserProperty, &headers::ATTRIBUTES_TO_MQTT_HEADERS
.get(name)
.ok_or(cloudevents::message::Error::UnrecognizedAttributeName {
name: String::from(name),
})?
.clone()[..],
&value.to_string()[..]) {
Ok(property) => {
match self.headers.push(property) {
Err(e) => Err(Error::Other {
source: Box::new(e)
}),
_ => Ok(self)
}
match Property::new_string_pair(
PropertyCode::UserProperty,
&headers::ATTRIBUTES_TO_MQTT_HEADERS
.get(name)
.ok_or(cloudevents::message::Error::UnrecognizedAttributeName {
name: String::from(name),
})?
.clone()[..],
&value.to_string()[..],
) {
Ok(property) => match self.headers.push(property) {
Err(e) => Err(Error::Other {
source: Box::new(e),
}),
_ => Ok(self),
},
_ => Err(Error::UnrecognizedAttributeName {
name: headers::SPEC_VERSION_HEADER.to_string()
})
name: headers::SPEC_VERSION_HEADER.to_string(),
}),
}
}

fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
match Property::new_string_pair(PropertyCode::UserProperty,
&attribute_name_to_header!(name)[..],
&value.to_string()[..]) {
Ok(property) => {
match self.headers.push(property) {
Err(e) => Err(Error::Other {
source: Box::new(e)
}),
_ => Ok(self)
}
match Property::new_string_pair(
PropertyCode::UserProperty,
&attribute_name_to_header!(name)[..],
&value.to_string()[..],
) {
Ok(property) => match self.headers.push(property) {
Err(e) => Err(Error::Other {
source: Box::new(e),
}),
_ => Ok(self),
},
_ => Err(Error::UnrecognizedAttributeName {
name: headers::SPEC_VERSION_HEADER.to_string()
})
name: headers::SPEC_VERSION_HEADER.to_string(),
}),
}
}

Expand All @@ -100,14 +110,15 @@ impl BinarySerializer<MessageRecord> for MessageRecord {

impl StructuredSerializer<MessageRecord> for MessageRecord {
fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<MessageRecord> {
match Property::new_string_pair(PropertyCode::UserProperty,
headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER) {
Ok(property) => {
match self.headers.push(property) {
_ => ()
}
match Property::new_string_pair(
PropertyCode::UserProperty,
headers::CONTENT_TYPE,
headers::CLOUDEVENTS_JSON_HEADER,
) {
Ok(property) => match self.headers.push(property) {
_ => (),
},
_ => ()
_ => (),
}
self.payload = Some(bytes);

Expand All @@ -116,16 +127,11 @@ impl StructuredSerializer<MessageRecord> for MessageRecord {
}

pub trait MessageBuilderExt {
fn message_record(
self,
message_record: & MessageRecord,
) -> MessageBuilder;
fn message_record(self, message_record: &MessageRecord) -> MessageBuilder;
}

impl MessageBuilderExt for MessageBuilder {
fn message_record(mut self,
message_record: & MessageRecord
) -> MessageBuilder {
fn message_record(mut self, message_record: &MessageRecord) -> MessageBuilder {
self = self.properties(message_record.headers.clone());

if let Some(s) = message_record.payload.as_ref() {
Expand All @@ -134,4 +140,4 @@ impl MessageBuilderExt for MessageBuilder {

self
}
}
}

0 comments on commit 171c2cc

Please sign in to comment.