-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Best practices to get a timeout? #26
Comments
Hi! Delayed messages are used for those things: struct no_image_timeout final : public so_5::signal_t {};
...
so_subscribe(channel).event([](cv::Mat image) {...});
so_subscribe_self().event([](mhood_t<no_image_timeout>) { cv::destroyAllWindows(); });
...
// Start timer.
so_5::send_delayed<no_image_timeout>(&this, 200ms); Please note also that states can have timeouts and there is no need to use delayed messages for controlling time spent in a particular state: class images_reciver : public so_5::agent_t {
state_t st_wait_next_image{this};
state_t st_no_images{this};
...
void so_define_agent() override {
st_wait_next_image
.event(channel, [](cv::Mat image) {...})
.time_limit(200ms, st_no_images);
st_no_images
.on_enter([]{ cv::destroyAllWindows(); });
this >= st_wait_next_image; // Internal timer will be fired automatically.
}
}; |
Hi, thanks for your reply. I think the solution based on delayed messages needs some extra handling: anytime an image is received, we send a delayed message. When the delayed message handler is entered, we check if last time we got an image was more than 200ms ago. On the other hand, the solution with states is very close and it would work only if the internal timer was reset every time the image handler was entered. In other words: void so_define_agent() override
{
st_wait_next_image
.event(m_source, [](cv::Mat image) { imshow(image); }) // to work, time_limit should be reset here
.time_limit(200ms, st_no_images);
st_no_images
.on_enter([] { cv::destroyAllWindows(); })
.time_limit(1ms, st_wait_next_image); // this has been added to resume listening in case the stream gets resumed
this >>= st_wait_next_image;
} I tried this one and it seems to work: void so_define_agent() override
{
st_wait_next_image
.event(m_source, [this](cv::Mat image) {
imshow(image);
st_wait_next_image.time_limit(200ms, st_no_images);
});
st_no_images
.on_enter([this] {
cv::destroyAllWindows();
st_wait_next_image.drop_time_limit(); // don't know why this is needed
})
.time_limit(1ms, st_wait_next_image);
this >>= st_wait_next_image;
} I don't understand why a call to Also, I don't know the impact on performance when calling
So I have supposed this is intended. Also, I cannot judge if the solution overall is correct or I can expect some subtle edge cases. What's your feeling? Another idea I had was based on message chains. Pseudo-code: void so_evt_start() override
{
bool closed = false;
while (!closed)
{
auto res = so_5::receive(from(m_chain)
.handle_all()
.empty_timeout(200ms), [this](cv::Mat image) {
imshow(image);
});
closed = res.status() == so_5::mchain_props::extraction_status_t::chain_closed;
// if here, either the chain has been closed or the timeout has been fired: just destroy the windows
cv::destroyAllWindows();
}
} However, I would be happier with message boxes. Please let me know your thoughts! |
Hi! st_wait_next_image.drop_time_limit(); // don't know why this is needed I'm surprised why this call is necessary. Let me investigate this case. |
Hi @eao197 , I think I got it. When the second handler is executed, it moves the agent state to st_no_images
.on_enter([this]
{
...
this->st_wait_next_image.drop_time_limit();
})
.time_limit(1ms, st_wait_next_image); // move back to listening However, I think the agent is still configured to spend no more than 200ms in that state: st_wait_next_image
.event(m_source, [this](cv::Mat image) {
....
st_wait_next_image.time_limit(200ms, st_no_images); // still affecting the state?
}); Since in my experiments I was simulating a streaming pause, after moving back to Is this a possible explanation? |
If your test code looked like: st_wait_next_image
.event(m_source, [this](cv::Mat image) {
...
std::this_thread::sleep_for(500ms);
st_wait_next_image.time_limit(200ms, st_no_images);
}) Then the following actions happened under the hood:
So there should be a place for several calls to I'll continue the investigation. |
I still don't understand why a separate call to But let me give some answers to your questions. Historically, the simplest and obvious way to handle such scenarios was the usage of delayed messages but sent via class image_receiver : public so_5::agent_t {
struct no_images final : public so_5::signal_t {};
so_5::timer_id_t m_no_images_timer;
...
void so_define_agent() override {
so_subscribe(m_source).event([this](cv::Mat image) {
m_no_images_timer = so_5::send_periodic<no_images>(*this, 200ms, 0ms); // 0ms as period makes the message delayed.
... // Image processing.
}
so_subscribe_self().event([this](mhood_t<no_images>) {...});
}
void so_evt_start() override {
m_no_images_timer = so_5::send_periodic<no_images>(*this, 200ms, 0ms); // 0ms as period makes the message delayed.
}
}; This approach is the most efficient because no helper objects are created. But this approach has a hidden underwater rock. Let's assume there is a Let's assume that the processing of that message takes longer than 200ms. The timer sends an instance of So now we have The agent finishes the processing of the current message, extracts the next But So when the agent completes the current processing it receives old And that isn't what we want in most cases. |
There are several tricks for avoiding that problem. The first one is used in the current implementation of It's based on the usage of an additional mbox. A new mbox is created for every new call of auto tmp_mbox = so_environment().create_mbox();
so_subscribe(tmp_mbox).event([this](mhood_t<no_images>) {...});
auto timer_id = so_5::send_periodic<no_images>(tmp_mbox, 200ms, 0ms); where The timer cancelation is now more complex: // Subscription must be destroyed.
so_drop_subscription<no_images>(tmp_mbox, current_state);
// Timer should be cancelled too.
timer_id.reset(); In that case, even if This approach is reliable but it requires the allocation of some additional data: temporary mbox, an object for holding the temporary mbox, and timer_id, the subscription. Usually, it isn't a problem, but it is better to know about that price. |
The second trick is implemented in so5extra's revocable_timer submodule. It uses a special envelope with an atomic boolean flag inside. When a delayed message is revoked this flag is set. Then it is checked just before the invocation of the event handler. If the flag is set (it means that the message is revoked) then the message is ignored. Such a trick allows revoking timers even if the timer has already fired the message and that message is waiting in the receiver's queue. This approach requires just one additional allocation: for an envelope with a boolean flag inside. So it's more efficient than the usage of an additional mbox. |
Could you show how you did such a simulation? |
Thanks for the details and for explaining other ways to do this. The solution with the extra call to Simulating the delay is simply a sleep between sending some images to the message box. Images are pushed from the main thread for simplicity, whereas the receiver agent has a dedicated one. So |
I wrote a small test program and can't reproduce that issue with additional call to #include <so_5/all.hpp>
using namespace std::chrono_literals;
class a_test_t final : public so_5::agent_t
{
state_t st_wait_next_image{ this, "wait_next_image" };
state_t st_no_images{ this, "no_images" };
public:
struct next_image final : public so_5::signal_t {};
a_test_t( context_t ctx ) : so_5::agent_t{ std::move(ctx) } {}
void so_define_agent() override
{
st_wait_next_image
.on_enter( []() { std::cout << "entering st_wait_next_image" << std::endl; } )
.event(
[this](mhood_t<next_image>) {
std::cout << "on next_image" << std::endl;
st_wait_next_image.time_limit( 200ms, st_no_images );
} )
.time_limit( 200ms, st_no_images );
st_no_images
.on_enter( []() { std::cout << "no images." << std::endl; } )
.time_limit( 1ms, st_wait_next_image );
this >>= st_wait_next_image;
}
};
int main()
{
so_5::mbox_t test_mbox;
so_5::wrapped_env_t sobj{
[&]( so_5::environment_t & env ) {
auto test_agent = env.make_agent< a_test_t >();
test_mbox = test_agent->so_direct_mbox();
env.register_agent_as_coop( std::move(test_agent) );
},
[]( so_5::environment_params_t & params ) {
params.message_delivery_tracer( so_5::msg_tracing::std_cerr_tracer() );
}
};
auto pause_then_send = [test_mbox]( auto pause ) {
std::cout << "=" << pause.count() << "ms" << std::endl;
std::this_thread::sleep_for( pause );
so_5::send< a_test_t::next_image >( test_mbox );
};
const auto pauses = {
100ms, 150ms,
200ms, 250ms,
300ms, 350ms,
400ms, 450ms,
500ms, 550ms,
600ms, 650ms,
700ms, 750ms
};
for(;;)
{
for( const auto p : pauses )
pause_then_send( p );
}
return 0;
} Can you run your code with message-delivery-tracing turned on? It will be very interesting to see messages like those:
|
Another simple way to handle delayed messages safely is the following: class image_receiver : public so_5::agent_t {
struct no_image_timeout {
unsigned long long m_counter;
};
unsigned long long m_timeout_counter{}; // Will be incremented on every send.
// Just a helper method.
void prolonge_timeout() {
so_5::send_delayed<no_image_timeout>(*this, ++m_timeout_counter);
}
...
void so_define_agent() override {
so_subscribe(m_source).event([this](cv::Mat image) {
... // Actual handling.
prolonge_timeout();
});
so_subscribe_self().event([this](no_image_timeout cmd) {
if(m_timeout_counter != cmd.m_counter)
// This is outdated message. Ignore it.
return;
... // Handling the absence of image.
});
}
void so_evt_start() override {
prolonge_timeout();
...
}
}; Also if you decided to use the approach with st_wait_next_image
.event(...)
.event(...)
.time_limit(...)
...;
st_no_images
.on_enter(...)
.time_limit(...)
.transfer_to_state<cv::Mat>(st_wait_next_image); |
Hi, #include <so_5/all.hpp>
using namespace std;
class FakeImageProducer
{
public:
explicit FakeImageProducer(so_5::mbox_t dst)
: m_destination{ std::move(dst) }
{
}
~FakeImageProducer()
{
if (m_thread.joinable())
m_thread.join();
}
void Play()
{
m_pause = false;
m_thread = std::thread([&] {
while (!m_pause)
{
std::this_thread::sleep_for(60ms);
so_5::send<int>(m_destination, m_counter++);
}
});
}
void Pause()
{
m_pause = true;
}
void StopAndWait()
{
m_pause = true;
m_thread.join();
}
private:
std::thread m_thread;
so_5::mbox_t m_destination;
std::atomic<bool> m_pause;
int m_counter = 0;
};
class FakeImageReceiver : public so_5::agent_t
{
public:
FakeImageReceiver(context_t ctx)
: so_5::agent_t(ctx), m_source(ctx.env().create_mbox("main.images"))
{}
state_t st_wait_next_image{ this };
state_t st_no_images{ this };
void so_define_agent() override
{
st_wait_next_image
.event(m_source, [this](int image) {
std::cout << "got an image " << image << "\n";
this->st_wait_next_image.time_limit(200ms, this->st_no_images);
})
.time_limit(200ms, st_no_images);
st_no_images
.on_enter([this] {
std::cout << "no images arrived by 200ms...\n";
this->st_wait_next_image.drop_time_limit(); //// !!!!!! THIS ONE MAKES
THE DIFFERENCE
})
.time_limit(1ms, st_wait_next_image);
this >>= st_wait_next_image; // Internal timer will be fired automatically.
}
private:
so_5::mbox_t m_source;
};
int main()
{
so_5::wrapped_env_t sobj{ [](so_5::environment_t& env) {
},
[](so_5::environment_params_t& params) {
params.message_delivery_tracer(so_5::msg_tracing::std_cout_tracer());
} };
auto& env = sobj.environment();
auto coop = sobj.environment().make_coop(so_5::disp::active_obj::make_dispatcher(sobj.environment()).binder());
auto fakeReceiver = coop->make_agent<FakeImageReceiver>();
FakeImageProducer producer{ env.create_mbox("main.images") };
env.register_coop(move(coop));
producer.Play();
std::this_thread::sleep_for(std::chrono::seconds(1));
producer.StopAndWait();
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "resuming\n";
producer.Play();
std::this_thread::sleep_for(std::chrono::seconds(2));
producer.StopAndWait();
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "ending...\n";
} Attaching the tracing files: Please let me know your thoughts. I'll have a look at the other solutions you proposed shortly! |
Ok, it seems I got the point. Without an extra call to
That is why you see several prints:
It's just a ping-pong of switching states from But when you add an extra call to That is why there are no new changes from
The agent will now wait for an incoming message forever. And you resume ping-pong between states only after receiving an incoming message (because you restore the time limit after that). |
Thanks for the investigation. Do you think this is safe/efficient or you see any major drawbacks? Consider that this is just a troubleshooting utility, not intended for running in production. You mentioned that I should add a handler for a new image in st_no_images
.on_enter([this] {
std::cout << "no images arrived by 200ms...\n";
this->st_wait_next_image.drop_time_limit();
})
.event([](cv::Mat image) {
imshow(image); // in case I get an image at this point, just draw it
})
.time_limit(1ms, st_wait_next_image); |
Not exactly. It seems that you can't call st_no_images
.on_enter([this] { ... /* Destroy windows */ })
.event([this](cv::Mat image) {
... // Create new windows.
imshow(image);
// Because an image received it's time to switch to st_wait_next_image.
this >>= st_wait_next_images;
}); // NOTE: there is no need to limit the time of waiting the first image anymore. But there is a special marker st_wait_next_image
.on_enter([this] { ... /* Create necessary windows */ })
.event([this](cv::Mat image) {
imshow(image);
})
.time_limit(200ms, st_no_images);
st_no_images
.on_enter([this] { ... /* Destroy windows */ })
.transfer_to_state<cv::Mat>(st_wait_next_image); In that scenario, you will wait a new image no more than 200ms in |
It seems that the state-based approach is straightforward and understandable enough. I don't think that it'll have a significant performance penalty (this can be measured if needed). At least one drawback should be taken into account: on_enter/on_exit handlers can't throw, they should catch all exceptions. So it's hard to propagate an error from the on_enter/on_exit handlers. If this is not an issue then the state-base approach looks good. |
Great! The limitations are fine for me so I go with this implementation. |
Hi everyone,
my adventures with SObjectizer continue and I am very happy with it.
I have a question that I suppose is related to agent states, but I need your expert advise.
Suppose I have an agent that subscribes to a mailbox of images (e.g. OpenCV Mat) and shows them in real-time with OpenCV's imshow (it's just for troubleshooting and not intended for production):
The problem with this code is that not receiving images for a some time causes the window to hang (the behavior is intended because of
cv::waitKey
). In another implementation (not based on SObjectizer) I have a way to get into another lambda when the channel does not get data for a specified amount of time.Imagine something like this:
What is the best way to get something like this in SObjectizer?
Many thanks!
Marco
The text was updated successfully, but these errors were encountered: