Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update c8y-mapper to consume te measurements #2169

Merged

Conversation

didier-wenzek
Copy link
Contributor

@didier-wenzek didier-wenzek commented Aug 18, 2023

The main point of this PR is to implement a concrete use-case of the EntityStore and be in a better position to evaluate and improve this new API.

Proposed changes

Update the c8y-mapper to translate the measurements received on the te/+/+/+/+/m/+ topics.

  • Move the EntityStore from the mapper to its converter
  • Subscribe to te/+/+/+/+/m/+ and no more on tedge/measurements/#
  • Combine local device registration with cloud registration
  • Fix the behavior of device registration when the message triggering the registration is ill-formed
  • List suggestions of improvement for EntityStore: Improve EntityStore API #2179

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

#2082

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s)
  • I ran cargo fmt as mentioned in CODING_GUIDELINES
  • I used cargo clippy as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@@ -61,6 +61,16 @@ impl EntityTopic {
_ => None,
}
}

pub fn is_measurement(topic: &mqtt_channel::Topic) -> bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not happy with this function.

  • It's signature is too specific. It would be better to get the channel kind of the topic.
  • It's implementation seems a bit complex compared to the task. The current channel type is not appropriate.

Comment on lines -167 to -169
if r#type.is_empty() {
return Err(ChannelError::TooShort);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be removed.

te/device/main///m/ is a valid topic name which channel is of measurements with an empty type.

@@ -97,13 +98,27 @@ impl EntityStore {
self.get(mqtt_id)
}

/// Returns the entity attached to a topic, if any
pub fn get_entity_from_topic(&self, topic: &Topic) -> Option<&EntityMetadata> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The starting point for a caller is always a topic.

/// Returns the MQTT identifier of the main device.
///
/// The main device is an entity with `@type: "device"`.
pub fn main_device(&self) -> EntityIdRef {
self.main_device.as_str()
}

/// Returns the name of main device.
pub fn main_device_name(&self) -> &str {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method that returns the main device identifier is a bit useless for a user of the EntityStore, because returning an identifier and not an EntityMetadata.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent behind that was for the method to return the key that you can use in get:

entity_store.get(entity_store.main_device())

But probably 99% this is what we want, so main_device could be changed to just return EntityMetadata. One problem is that EntityMetadata doesn't contain entity MQTT identifier, but that can perhaps be added later in a way that doesn't just duplicate the string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But probably 99% this is what we want, so main_device could be changed to just return EntityMetadata.

Yes, that would be better. Especially, because entity_store.get(entity_store.main_device()) returns an Option despite there is always some main device.

One problem is that EntityMetadata doesn't contain entity MQTT identifier, but that can perhaps be added later in a way that doesn't just duplicate the string

Yes, this will also be really convenient.

Self {
converter,
messages,
mqtt_publisher,
timer_sender,
entity_store: EntityStore::with_main_device(main_device).unwrap(),
}
}

async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method has been moved into the Converter.

This will have to be updated after #2164

Comment on lines +34 to 39
pub fn from_thin_edge_json(
input: &str,
child_id: &str,
entity: &EntityMetadata,
) -> Result<String, CumulocityJsonError> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file highlights how the new EntityStore API can simplify and make more robust the mappers.

"",
r#"{"foo":25}"#,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A proper measurements is required otherwise the registration message is rejected by the converter.

Comment on lines 278 to 282
let device_register_payload =
format!("{{ \"@type\":\"{device_type}\", \"@id\":\"{device_name}\"}}");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared to the original version https://github.com/thin-edge/thin-edge.io/pull/2169/files#diff-1d1eb076015e0658044f7b03b0b805fce4f0253f4048587797fc5dd42019f908L149

  • the device type is now correct for the main device.
  • the device name is compatible with the current version of thin-edge (to avoid breaking the tests)

Comment on lines -250 to -254
add_external_device_registration_message(
child_id,
&mut self.children,
&mut mqtt_messages,
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been moved to the try_convert() method https://github.com/thin-edge/thin-edge.io/pull/2169/files#diff-1d1eb076015e0658044f7b03b0b805fce4f0253f4048587797fc5dd42019f908L149.

Comment on lines 307 to 251
fn try_convert_measurement(
&mut self,
input: &Message,
) -> Result<Vec<Message>, ConversionError> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My goal is to reach a point where this method is no more processing a raw MQTT message, but a Measurement message that has been fully decoded by the tedge-api and gives:

  • the topic identifier of the source
  • the measurement type
  • the measurement value

@github-actions
Copy link
Contributor

github-actions bot commented Aug 18, 2023

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass %
257 0 5 257 100

@didier-wenzek didier-wenzek changed the title Update c8y-mapper to consume te measurements Update c8y-mapper to consume te measurements Aug 18, 2023
@didier-wenzek didier-wenzek force-pushed the feat/mapper-consume-te-measurements branch from d4cc6dd to 915d6d1 Compare August 18, 2023 13:29
}

let mut messages = match &message.topic {
topic if EntityTopic::is_measurement(topic) => self.try_convert_measurement(message),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also be done like this without adding a new method. It's a mouthful, but to be honest we're inside a match arm and we have to do all of this in a single expression, usually we would have different statements for creating the EntityTopic and looking at the type of channel it has.

Suggested change
topic if EntityTopic::is_measurement(topic) => self.try_convert_measurement(message),
topic if EntityTopic::from_str(&topic.name).map_or(false, |t| {
t.channel().map(|c| c.category) == Some(ChannelCategory::Measurement)
}) =>
{
self.try_convert_measurement(message)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I would prefer to have a fully different match expression.

Instead of having methods working on raw topics and messages, my goal is to do all the parsing inside the tedge-api crate with some function taking an MqttMessage and returning an error or a plain struct with the message source (e.g. device/main//) the channel (e.g. Measurement { r#type }) and the decoded message (e.g the measurement). The converter main method would be then a breeze focused on the translation into the format expected by the cloud and no more a mix of parsing / validation / translation.

crates/core/tedge_api/src/entity_store.rs Show resolved Hide resolved
Comment on lines 249 to 331
pub fn main_device(external_id: String) -> Self {
Self {
external_id,
r#type: EntityType::MainDevice,
parent: None,
other: serde_json::json!({}),
}
}

/// Creates a entity metadata for a child device.
pub fn child_device(external_id: String) -> Self {
Self {
external_id,
r#type: EntityType::ChildDevice,
parent: Some("device/main//".to_string()),
other: serde_json::json!({}),
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this functions are only used in tests. Would you say that we'll need to construct EntityMetadata in normal code as well? My intent was for only EntityStore to be able to construct it, when parsing a registration message, but this can change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this functions are only used in tests

Indeed, currently only used for testing. Mostly because creating an EntityRegistrationMessage where a bit more painful.

Would you say that we'll need to construct EntityMetadata in normal code as well? My intent was for only EntityStore to be able to construct it, when parsing a registration message, but this can change.

Not sure to see the benefits of EntityRegistrationMessage.

/// Returns the MQTT identifier of the main device.
///
/// The main device is an entity with `@type: "device"`.
pub fn main_device(&self) -> EntityIdRef {
self.main_device.as_str()
}

/// Returns the name of main device.
pub fn main_device_name(&self) -> &str {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent behind that was for the method to return the key that you can use in get:

entity_store.get(entity_store.main_device())

But probably 99% this is what we want, so main_device could be changed to just return EntityMetadata. One problem is that EntityMetadata doesn't contain entity MQTT identifier, but that can perhaps be added later in a way that doesn't just duplicate the string

@@ -12,7 +12,7 @@ use tedge_config::TEdgeConfig;

const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd";
const COLLECTD_INPUT_TOPICS: &str = "collectd/#";
const COLLECTD_OUTPUT_TOPIC: &str = "tedge/measurements";
const COLLECTD_OUTPUT_TOPIC: &str = "te/device/main///m/ ";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded MQTT root

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, it would be good to get this topic from the entity store.

entity_store.main_device().measurement_topic()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be done in a follow-up task: #2179

@@ -796,6 +888,15 @@ impl Converter for CumulocityConverter {
}
}

/// Check if a message is an entity registration message.
fn is_entity_register_message(message: &Message) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be moved to entity_store.rs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to entity_store.rs

@@ -885,6 +986,13 @@ fn add_external_device_registration_message(
false
}

fn external_device_registration_message(entity: &EntityMetadata) -> Option<Message> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be moved to entity_store.rs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, looking closer, this method is c8y related. The point is to register the device on the cloud.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function that should be moved to entity_store.rs is auto_register_entity().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function that should be moved to entity_store.rs is auto_register_entity().

Done

@codecov
Copy link

codecov bot commented Aug 21, 2023

Codecov Report

Merging #2169 (96f7f89) into main (75127c6) will decrease coverage by 0.1%.
The diff coverage is 88.1%.

❗ Current head 96f7f89 differs from pull request most recent head 98f1735. Consider uploading reports for the commit 98f1735 to get more accurate results

Additional details and impacted files
Files Changed Coverage Δ
crates/core/tedge_api/src/topic.rs 94.5% <ø> (-0.8%) ⬇️
crates/core/tedge_mapper/src/collectd/mapper.rs 0.0% <ø> (ø)
crates/extensions/c8y_mapper_ext/src/actor.rs 70.6% <ø> (+2.4%) ⬆️
crates/extensions/c8y_mapper_ext/src/converter.rs 76.7% <79.1%> (-2.4%) ⬇️
crates/core/tedge_api/src/entity_store.rs 91.6% <92.1%> (+2.7%) ⬆️
crates/extensions/c8y_mapper_ext/src/json.rs 98.0% <94.7%> (-0.4%) ⬇️
.../tedge_config/src/tedge_config_cli/tedge_config.rs 87.2% <100.0%> (ø)
crates/core/tedge_api/src/entity.rs 84.2% <100.0%> (+2.0%) ⬆️
crates/extensions/c8y_mapper_ext/src/config.rs 62.5% <100.0%> (-0.5%) ⬇️
crates/extensions/c8y_mapper_ext/src/serializer.rs 81.5% <100.0%> (+0.7%) ⬆️
... and 1 more

... and 7 files with indirect coverage changes

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
@didier-wenzek didier-wenzek force-pushed the feat/mapper-consume-te-measurements branch from 96f7f89 to 98f1735 Compare August 22, 2023 15:52
@didier-wenzek didier-wenzek temporarily deployed to Test Pull Request August 22, 2023 16:02 — with GitHub Actions Inactive
@didier-wenzek didier-wenzek merged commit ea68aac into thin-edge:main Aug 22, 2023
15 of 16 checks passed
@didier-wenzek didier-wenzek deleted the feat/mapper-consume-te-measurements branch August 22, 2023 16:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants