Skip to content

Commit

Permalink
Merge pull request #454 from mtconnect/453_448_mqtt_asset_handling_pr…
Browse files Browse the repository at this point in the history
…etty_printing

453 448 mqtt asset handling pretty printing
  • Loading branch information
wsobel authored May 13, 2024
2 parents 80209f7 + 15564b4 commit 7e59720
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 39 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set(AGENT_VERSION_MAJOR 2)
set(AGENT_VERSION_MINOR 3)
set(AGENT_VERSION_PATCH 0)
set(AGENT_VERSION_BUILD 8)
set(AGENT_VERSION_BUILD 9)
set(AGENT_VERSION_RC "")

# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class MTConnectAgentConan(ConanFile):
name = "mtconnect_agent"
version = "2.2"
version = "2.3"
url = "https://github.com/mtconnect/cppagent.git"
license = "Apache License 2.0"
settings = "os", "compiler", "arch", "build_type"
Expand Down
4 changes: 2 additions & 2 deletions src/mtconnect/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ namespace mtconnect {
{
if (item.expired())
continue;

auto di = item.lock();
if (di->hasInitialValue())
{
m_loopback->receive(di, *di->getInitialValue());
}
}
}

std::lock_guard<buffer::CircularBuffer> lock(m_circularBuffer);
if (m_circularBuffer.addToBuffer(observation) != 0)
{
Expand Down
2 changes: 1 addition & 1 deletion src/mtconnect/device_model/data_item/data_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ namespace mtconnect {
}
}
}

if (const auto &init = maybeGet<string>("InitialValue"); init)
{
m_initialValue = *init;
Expand Down
4 changes: 2 additions & 2 deletions src/mtconnect/device_model/data_item/data_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ namespace mtconnect {
/// @brief get the topic name leaf node for this data item
/// @return the topic name
const auto &getTopicName() const { return m_topicName; }

/// @brief get the initial value if one is set
/// @return optional initial value
const auto &getInitialValue() const { return m_initialValue; }

Category getCategory() const { return m_category; }
Representation getRepresentation() const { return m_representation; }
SpecialClass getSpecialClass() const { return m_specialClass; }
Expand Down
8 changes: 4 additions & 4 deletions src/mtconnect/mqtt/mqtt_client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ namespace mtconnect {
m_connected = false;
if (m_handler && m_handler->m_disconnected)
m_handler->m_disconnected(shared_from_this());
m_handler->m_disconnected(shared_from_this());
m_handler->m_disconnected(shared_from_this());
if (m_running)
{
reconnect();
Expand Down Expand Up @@ -419,7 +419,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttTcpClient>(shared_from_this());
}

/// @brief Get the Mqtt TCP Client
/// @return pointer to the Mqtt TCP Client
auto &getClient()
Expand Down Expand Up @@ -501,7 +501,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttTlsWSClient>(shared_from_this());
}

/// @brief Get the Mqtt TLS WebSocket Client
/// @return pointer to the Mqtt TLS WebSocket Client
auto &getClient()
Expand Down Expand Up @@ -540,7 +540,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttWSClient>(shared_from_this());
}

/// @brief Get the Mqtt TLS WebSocket Client
/// @return pointer to the Mqtt TLS WebSocket Client
auto &getClient()
Expand Down
22 changes: 12 additions & 10 deletions src/mtconnect/sink/mqtt_sink/mqtt2_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ namespace mtconnect {
}
}

auto seq = m_sinkContract->getCircularBuffer().getSequence();
auto seq = publishCurrent(boost::system::error_code {});
for (auto &dev : m_sinkContract->getDevices())
{
FilterSet filterSet = filterForDevice(dev);
Expand All @@ -197,10 +197,8 @@ namespace mtconnect {
sampler->observe(seq, [this](const std::string &id) {
return m_sinkContract->getDataItemById(id).get();
});
sampler->handlerCompleted();
publishSample(sampler);
}

publishCurrent(boost::system::error_code {});
}

/// @brief publish sample when observations arrive.
Expand Down Expand Up @@ -244,18 +242,20 @@ namespace mtconnect {
return end;
}

void Mqtt2Service::publishCurrent(boost::system::error_code ec)
SequenceNumber_t Mqtt2Service::publishCurrent(boost::system::error_code ec)
{
SequenceNumber_t firstSeq, seq = 0;

if (ec)
{
LOG(warning) << "Mqtt2Service::publishCurrent: " << ec.message();
return;
return 0;
}

if (!m_client->isRunning() || !m_client->isConnected())
{
LOG(warning) << "Mqtt2Service::publishCurrent: client stopped";
return;
return 0;
}

for (auto &device : m_sinkContract->getDevices())
Expand All @@ -264,7 +264,6 @@ namespace mtconnect {
LOG(debug) << "Publishing current for: " << topic;

ObservationList observations;
SequenceNumber_t firstSeq, seq;
auto filterSet = filterForDevice(device);

{
Expand All @@ -288,6 +287,8 @@ namespace mtconnect {
m_currentTimer.expires_after(m_currentInterval);
m_currentTimer.async_wait(boost::asio::bind_executor(
m_strand, boost::bind(&Mqtt2Service::publishCurrent, this, _1)));

return seq;
}

bool Mqtt2Service::publish(observation::ObservationPtr &observation)
Expand Down Expand Up @@ -325,8 +326,9 @@ namespace mtconnect {

LOG(debug) << "Publishing Asset to topic: " << topic;

auto doc = m_jsonPrinter->print(asset);

asset::AssetList list {asset};
auto doc = m_printer->printAssets(
m_instanceId, uint32_t(m_sinkContract->getAssetStorage()->getMaxAssets()), 1, list);
stringstream buffer;
buffer << doc;

Expand Down
2 changes: 1 addition & 1 deletion src/mtconnect/sink/mqtt_sink/mqtt2_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ namespace mtconnect {
void pubishInitialContent();

/// @brief Publish a current using `CurrentInterval` option.
void publishCurrent(boost::system::error_code ec);
SequenceNumber_t publishCurrent(boost::system::error_code ec);

/// @brief publish sample when observations arrive.
SequenceNumber_t publishSample(std::shared_ptr<observation::AsyncObserver> sampler);
Expand Down
11 changes: 7 additions & 4 deletions src/mtconnect/sink/rest_sink/rest_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,29 +523,32 @@ namespace mtconnect {
auto removed = *request->parameter<bool>("removed");
auto count = *request->parameter<int32_t>("count");
auto printer = printerForAccepts(request->m_accepts);
auto pretty = *request->parameter<bool>("pretty");

respond(session, assetRequest(printer, count, removed, request->parameter<string>("type"),
request->parameter<string>("device")));
request->parameter<string>("device"), pretty));
return true;
};

auto idHandler = [&](SessionPtr session, RequestPtr request) -> bool {
auto asset = request->parameter<string>("assetIds");
auto pretty = *request->parameter<bool>("pretty");
if (asset)
{
auto printer = m_sinkContract->getPrinter(acceptFormat(request->m_accepts));

list<string> ids;
stringstream str(*asset);
string id;
while (getline(str, id, ';'))
ids.emplace_back(id);
respond(session, assetIdsRequest(printer, ids));
respond(session, assetIdsRequest(printer, ids, pretty));
}
else
{
auto printer = printerForAccepts(request->m_accepts);
auto error = printError(printer, "INVALID_REQUEST", "No asset given");
auto pretty = *request->parameter<bool>("pretty");
auto error = printError(printer, "INVALID_REQUEST", "No asset given", pretty);
respond(session, make_unique<Response>(rest_sink::status::bad_request, error,
printer->mimeType()));
}
Expand Down
3 changes: 1 addition & 2 deletions test_package/agent_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3089,12 +3089,11 @@ TEST_F(AgentTest, should_initialize_observaton_to_initial_value_when_available)
PARSE_XML_RESPONSE("/current");
ASSERT_XML_PATH_EQUAL(doc, "//m:DeviceStream//m:PartCount", "UNAVAILABLE");
}

m_agentTestHelper->m_adapter->processData("2024-01-22T20:00:00Z|avail|AVAILABLE");

{
PARSE_XML_RESPONSE("/current");
ASSERT_XML_PATH_EQUAL(doc, "//m:DeviceStream//m:PartCount", "0");
}

}
30 changes: 19 additions & 11 deletions test_package/mqtt_sink_2_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,23 @@ TEST_F(MqttSink2Test, mqtt_sink_should_publish_Sample)

auto handler = make_unique<ClientHandler>();
bool gotSample = false;
handler->m_receive = [&gotSample](std::shared_ptr<MqttClient> client, const std::string &topic,
bool first = true;
handler->m_receive = [&gotSample, &first](std::shared_ptr<MqttClient> client, const std::string &topic,
const std::string &payload) {
EXPECT_EQ("MTConnect/Sample/000", topic);

auto jdoc = json::parse(payload);
auto streams = jdoc.at("/MTConnectStreams/Streams/0/DeviceStream"_json_pointer);
EXPECT_EQ(string("LinuxCNC"), streams.at("/name"_json_pointer).get<string>());

gotSample = true;
if (first)
{
first = false;
}
else
{
EXPECT_EQ("MTConnect/Sample/000", topic);

auto jdoc = json::parse(payload);
auto streams = jdoc.at("/MTConnectStreams/Streams/0/DeviceStream"_json_pointer);
EXPECT_EQ(string("LinuxCNC"), streams.at("/name"_json_pointer).get<string>());

gotSample = true;
}
};

createClient(options, std::move(handler));
Expand All @@ -267,9 +275,9 @@ TEST_F(MqttSink2Test, mqtt_sink_should_publish_Sample)

auto service = m_agentTestHelper->getMqtt2Service();

ASSERT_TRUE(waitFor(60s, [&service]() { return service->isConnected(); }));
ASSERT_FALSE(gotSample);

ASSERT_TRUE(waitFor(60s, [&first]() { return !first; }));
ASSERT_FALSE(first);
m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204");
ASSERT_TRUE(waitFor(10s, [&gotSample]() { return gotSample; }));
}
Expand Down

0 comments on commit 7e59720

Please sign in to comment.