feature req: DataOutputQueue with maxSize=0 --> received data only sent to callbacks #366

Closed
diablodale opened this issue Feb 6, 2022 · 3 comments

Comments

@diablodale
Contributor

diablodale commented Feb 6, 2022

I request the DataOutputQueue support maxSize=0 so that no data is retained in the internal std::queue yet the callbacks continue to be called with the data.

I think the change is trivial and isolated to LockingQueue.hpp. I can take on this work as it already has use potential in my app.

Setup

  • all

Repro

By code review: LockingQueue::push() and tryWaitAndPush() don't behave appropriately when maxSize=0.
If maxSize=0, they would actually behave as if it were 1.

    bool push(T const& data) {
        {
            std::unique_lock<std::mutex> lock(guard);
            if(!blocking) {
                // if non blocking, remove as many oldest elements as necessary, so next one will fit
                // necessary if maxSize was changed
                while(queue.size() >= maxSize) {
                    queue.pop();
                }
            } else {
                signalPop.wait(lock, [this]() { return queue.size() < maxSize || destructed; });
                if(destructed) return false;
            }
            queue.push(data);
        }
        signalPush.notify_all();
        return true;
    }

A one-line check, if(maxSize == 0) return true, would correct the behavior and support this feature request. I think true is the appropriate return value since this is not an error condition and the function has, in effect, placed the data into the queue. It just so happens that a maxSize=0 queue is a singularity with size 0... or like /dev/null.

This also requires removing the one-line check here:

if(sz == 0) throw std::invalid_argument("Queue size can't be 0!");
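The proposal above can be sketched roughly as follows. This is a minimal, non-blocking-only stand-in, not the depthai implementation: the class name `ZeroCapableQueue`, the `size()` helper, and `demoZeroSize()` are hypothetical names introduced for illustration. Because `queue.size()` is unsigned, a condition like `queue.size() >= maxSize` can never become false when maxSize is 0, so the sketch places the early return ahead of the trim loop.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical stand-in for LockingQueue (non-blocking path only).
template <typename T>
class ZeroCapableQueue {
   public:
    // Unlike the setMaxSize() check quoted above, 0 is accepted here.
    explicit ZeroCapableQueue(std::size_t maxSize) : maxSize(maxSize) {}

    bool push(T const& data) {
        std::lock_guard<std::mutex> lock(guard);
        // Proposed behavior: a zero-sized queue accepts and discards the data.
        if(maxSize == 0) return true;
        // Drop the oldest elements so that the new one fits.
        while(queue.size() >= maxSize) {
            queue.pop();
        }
        queue.push(data);
        return true;
    }

    bool tryPop(T& out) {
        std::lock_guard<std::mutex> lock(guard);
        if(queue.empty()) return false;
        out = queue.front();
        queue.pop();
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(guard);
        return queue.size();
    }

   private:
    const std::size_t maxSize;
    mutable std::mutex guard;
    std::queue<T> queue;
};

// Usage: push() reports success, yet nothing is retained.
inline void demoZeroSize() {
    ZeroCapableQueue<int> sink(0);
    const bool accepted = sink.push(42);
    assert(accepted);
    assert(sink.size() == 0);
    (void)accepted;
}
```

A blocking variant would need the same early return before its wait, since the predicate `queue.size() < maxSize` can never be satisfied when maxSize is 0.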

@diablodale
Contributor Author

While updating the code, those same two push functions have an inconsistency and threading issue.

  • blocking: attempts to check destructed after the wait, but destruct() takes no lock(guard), so this check is not thread safe. Another thread could set destructed=true via destruct() a nanosecond after the if check and negate the attempted logic. This is a race condition.
  • non-blocking: never does the check, which makes the two paths inconsistent and likely marginally less thread-safe.

Then for both cases, it is possible for one thread to be "on the path" to destruct the class instance while another thread is calling LockingQueue::push(). This happens in the dai::DataInputQueue and dai::DataOutputQueue classes. Their "reading/writing" thread could be calling e.g. LQ::push() or LQ::tryPop() at the same time the main app thread is on the path to destruct a DataQueue. This is a very common scenario.

In the two dai::DataQueue classes, I think there is sufficient locking to prevent crashes. 👍

However, there is not 👎 sufficient locking anywhere to prevent, e.g., DataOutputQueue::readingThread from adding frames of data to the queue at the same time as the main app thread has called DataOutputQueue::close() or the destructor (which calls close()). In those scenarios the main app thread eventually calls destruct(). But like I wrote above, the nanosecond timing could be such that the readingThread adds a frame of data to the Queue even though destructed=true.

Adding those frames to the Queue won't crash. Instead, it is useless work. 😵 The 6 lines of LockingQueue.hpp which have if(destructed) return false; are all useless tests since destructed doesn't share the lock(guard) and nanosecond race conditions that I describe above will continue to occur. This is probably a common occurrence in today's depthai-code. Doesn't crash...just useless work.

I recommend removing all 6 of those if tests. It removes the useless tests and makes the blocking/non-blocking align.
I don't recommend fixing the race condition so the if tests become valid. Doesn't seem worth the effort to me.
Note: the testing of destructed in the signal::waitxxx() predicates is good and should remain.
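A rough sketch of what this recommendation might look like for a blocking push: destructed is tested only inside the wait predicate, with no separate if test afterwards. The names `BlockingQueueSketch` and `demoUnblockOnDestruct()` are hypothetical. One deliberate difference from the code discussed here, and an assumption of the sketch: `destruct()` takes the guard before setting the flag, purely so the demo cannot miss its wake-up.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical blocking queue; not the depthai LockingQueue.
template <typename T>
class BlockingQueueSketch {
   public:
    explicit BlockingQueueSketch(std::size_t maxSize) : maxSize(maxSize) {}

    // Assumption of this sketch: the flag is set under the guard so that a
    // thread about to wait in push() cannot miss the notification.
    void destruct() {
        {
            std::lock_guard<std::mutex> lock(guard);
            destructed = true;
        }
        signalPop.notify_all();
    }

    bool push(T const& data) {
        std::unique_lock<std::mutex> lock(guard);
        // The predicate keeps its destructed test, so destruct() unblocks waiters.
        signalPop.wait(lock, [this]() { return queue.size() < maxSize || destructed; });
        // No separate if(destructed) test: the element is stored regardless,
        // which is the harmless "useless work" described above.
        queue.push(data);
        return true;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(guard);
        return queue.size();
    }

   private:
    const std::size_t maxSize;
    bool destructed = false;
    mutable std::mutex guard;
    std::condition_variable signalPop;
    std::queue<T> queue;
};

// Usage: a producer waiting on a full queue is released by destruct().
inline bool demoUnblockOnDestruct() {
    BlockingQueueSketch<int> q(1);
    bool ok = q.push(1);  // fills the queue
    bool second = false;
    std::thread producer([&q, &second] { second = q.push(2); });  // waits for room or destruct()
    q.destruct();
    producer.join();
    ok = ok && second;
    assert(ok);
    return ok && q.size() == 2;
}
```

In this sketch the second element lands in the queue even though the queue has been marked destructed; nothing crashes, the work is simply wasted.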

diablodale changed the title from "feature request: support DataOutputQueue with maxSize=0 --> therefore received data is only sent to callbacks" to "feature req: DataOutputQueue with maxSize=0 --> received data only sent to callbacks" on Feb 7, 2022
@themarpe
Collaborator

I recommend removing all 6 of those if tests. It removes the useless tests and makes the blocking/non-blocking align.
I don't recommend fixing the race condition so the if tests become valid. Doesn't seem worth the effort to me.
Note: the testing of destructed in the signal::waitxxx() predicates is good and should remain.

I agree with the proposed.

Also +1 on the maxSize=0 behavior.

One alternative to this was to have one callback overtake the queuing mechanism - by having a setCallback (instead of addCallback) it wouldn't push to queue but only call the callback and hand over the message. Might be a tad less error prone compared to the current capability of adding many callbacks but having to be aware of also popping messages from the queue if not set as non-blocking.

Thoughts on above compared with or inline with maxSize = 0?

@themarpe
Collaborator

Edit on above - addCallback is also being used internally, so good on only going with maxSize=0 approach instead:)
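How the maxSize=0 approach could sit alongside several callbacks might be sketched as below. This is a single-threaded illustration only: `OutputQueueSketch`, `deliver()`, and `pending()` are hypothetical names, the message type is a plain string, and the order in which callbacks and the queue are served is an assumption rather than the actual DataOutputQueue behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of an output queue with callbacks.
class OutputQueueSketch {
   public:
    using Callback = std::function<void(const std::string&)>;

    explicit OutputQueueSketch(std::size_t maxSize) : maxSize(maxSize) {}

    // Several callbacks may be registered, as with addCallback in the thread.
    void addCallback(Callback cb) {
        callbacks.push_back(std::move(cb));
    }

    // Called once per received message.
    void deliver(const std::string& msg) {
        // Every registered callback sees the message.
        for(const auto& cb : callbacks) {
            cb(msg);
        }
        // With maxSize == 0 nothing is retained, so nobody has to pop.
        if(maxSize == 0) return;
        // Otherwise keep at most maxSize messages, dropping the oldest.
        while(stored.size() >= maxSize) {
            stored.pop_front();
        }
        stored.push_back(msg);
    }

    std::size_t pending() const {
        return stored.size();
    }

   private:
    const std::size_t maxSize;
    std::vector<Callback> callbacks;
    std::deque<std::string> stored;
};

// Usage: two callbacks, two messages, nothing left in the queue.
inline int demoCallbacksOnly() {
    OutputQueueSketch q(0);
    int calls = 0;
    q.addCallback([&calls](const std::string&) { ++calls; });
    q.addCallback([&calls](const std::string&) { ++calls; });
    q.deliver("frame 1");
    q.deliver("frame 2");
    assert(q.pending() == 0);
    return calls;
}
```

Compared with a single setCallback that replaces queuing, this keeps the existing multi-callback interface and only changes what the queue retains.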
