diff --git a/hal/src/newhal/static_recursive_mutex.h b/hal/src/newhal/static_recursive_mutex.h
new file mode 100644
index 0000000000..061650b234
--- /dev/null
+++ b/hal/src/newhal/static_recursive_mutex.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2024 Particle Industries, Inc. All rights reserved.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see .
+ */
+
+#pragma once
+
+class StaticRecursiveMutex {
+public:
+ StaticRecursiveMutex() = default;
+
+ bool lock(unsigned timeout = 0) {
+ return true;
+ }
+
+ bool unlock() {
+ return true;
+ }
+};
diff --git a/system/inc/active_object.h b/system/inc/active_object.h
index af2af4e90f..35b2afb593 100644
--- a/system/inc/active_object.h
+++ b/system/inc/active_object.h
@@ -29,7 +29,6 @@
#include
#include
-#include "channel.h"
#include "concurrent_hal.h"
#include "hal_platform.h"
@@ -284,7 +283,7 @@ class ActiveObjectBase
// todo - concurrent queue should be a strategy so it's pluggable without requiring inheritance
virtual bool take(Item& item)=0;
- virtual bool put(Item& item)=0;
+ virtual bool put(Item& item, bool dontBlock = false)=0;
/**
* Static thread entrypoint to run this active object loop.
@@ -316,16 +315,19 @@ class ActiveObjectBase
return started;
}
- template void invoke_async(const std::function& work)
+ template bool invoke_async(const std::function& work, bool dontBlock = false)
{
auto task = new AsyncTask(work);
- if (task)
- {
- Item message = task;
- if (!put(message))
- delete task;
+ if (!task) {
+ return false;
}
- }
+ Item message = task;
+ if (!put(message, dontBlock)) {
+ delete task;
+ return false;
+ }
+ return true;
+ }
template SystemPromise* invoke_future(const std::function& work)
{
@@ -344,41 +346,6 @@ class ActiveObjectBase
};
-
-template
-class ActiveObjectChannel : public ActiveObjectBase
-{
- cpp::channel- _channel;
-
-protected:
-
- virtual bool take(Item& item) override
- {
- return cpp::select().recv_only(_channel, item).try_once();
- }
-
- virtual bool put(const Item& item) override
- {
- _channel.send(item);
- return true;
- }
-
-
-public:
-
- ActiveObjectChannel(ActiveObjectConfiguration& config) : ActiveObjectBase(config) {}
-
- /**
- * Start the asynchronous processing for this active object.
- */
- void start()
- {
- _channel = cpp::channel
- ();
- start_thread();
- }
-
-};
-
class ActiveObjectQueue : public ActiveObjectBase
{
protected:
@@ -390,9 +357,9 @@ class ActiveObjectQueue : public ActiveObjectBase
return !os_queue_take(queue, &result, configuration.take_wait, nullptr);
}
- virtual bool put(Item& item)
+ virtual bool put(Item& item, bool dontBlock)
{
- return !os_queue_put(queue, &item, configuration.put_wait, nullptr);
+ return !os_queue_put(queue, &item, dontBlock ? 0 : configuration.put_wait, nullptr);
}
void createQueue()
@@ -458,9 +425,9 @@ class ActiveObjectThreadQueue : public ActiveObjectQueue
return r;
}
- virtual bool put(Item& item) override
+ virtual bool put(Item& item, bool dontBlock) override
{
- bool r = ActiveObjectQueue::put(item);
+ bool r = ActiveObjectQueue::put(item, dontBlock);
if (r && _thread != OS_THREAD_INVALID_HANDLE) {
os_thread_notify(_thread, nullptr);
}
diff --git a/system/inc/channel.h b/system/inc/channel.h
deleted file mode 100644
index 6e3146a41d..0000000000
--- a/system/inc/channel.h
+++ /dev/null
@@ -1,852 +0,0 @@
-// Copyright 2014, Alex Horn. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-#ifndef CPP_CHANNEL_H
-#define CPP_CHANNEL_H
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-namespace cpp
-{
-
-namespace internal
-{
-
-#if __cplusplus <= 201103L
-// since C++14 in std, see Herb Sutter's blog
-template
-std::unique_ptr make_unique(Args&& ...args)
-{
- return std::unique_ptr(new T(std::forward(args)...));
-}
-#else
- using std::make_unique;
-#endif
-
-template
-struct _is_exception_safe :
- std::integral_constant::value or
- std::is_nothrow_move_constructible::value>
-{};
-
-// Note that currently handshakes between send/receives inside selects
-// have higher priority compared to sends/receives outside selects.
-
-// TODO: investigate and ideally also discuss other scheduling algorithms
-template
-class _channel
-{
-static_assert(N < std::numeric_limits::max(),
- "N must be strictly less than the largest possible size_t value");
-
-private:
- std::mutex m_mutex;
- std::condition_variable m_send_begin_cv;
- std::condition_variable m_send_end_cv;
- std::condition_variable m_recv_cv;
-
- // FIFO order
- std::deque> m_queue;
-
- bool m_is_send_done;
- bool m_is_try_send_done;
- bool m_is_recv_ready;
- bool m_is_try_send_ready;
- bool m_is_try_recv_ready;
-
- bool is_full() const
- {
- return m_queue.size() > N;
- }
-
- // Is nonblocking receive and nonblocking send ready for handshake?
- bool is_try_ready() const
- {
- return m_is_try_recv_ready && m_is_try_send_ready;
- }
-
- // Block calling thread until queue becomes nonempty. While waiting
- // (i.e. queue is empty), give try_send() a chance to succeed.
- //
- // \pre: calling thread owns lock
- // \post: queue is nonempty and calling thread still owns lock
- void _pre_blocking_recv(std::unique_lock& lock)
- {
- m_is_recv_ready = true;
- m_recv_cv.wait(lock, [this]{ return !m_queue.empty(); });
-
- // TODO: support the case where both ends of a channel are inside a select
- assert(!is_try_ready());
- }
-
- // Pop front of queue and unblock one _send() (if any)
- //
- // \pre: calling thread must own lock and queue is nonempty
- // \post: calling thread doesn't own lock anymore, and protocol with
- // try_send() and try_recv() is fulfilled
- void _post_blocking_recv(std::unique_lock& lock)
- {
- // If queue is full, then there exists either a _send() waiting
- // for m_send_end_cv, or try_send() has just enqueued an element.
- //
- // In general, the converse is false: if there exists a blocking send,
- // then a nonblocking receive might have just dequeued an element,
- // i.e. queue is not full.
- assert(!is_full() || !m_is_send_done || !m_is_try_send_done);
-
- // blocking and nonblocking send can never occur simultaneously
- assert(m_is_try_send_done || m_is_send_done);
-
- m_queue.pop_front();
- assert(!is_full());
-
- // protocol with nonblocking calls
- m_is_try_send_done = true;
- m_is_recv_ready = false;
- m_is_try_recv_ready = false;
- m_is_try_send_ready = false;
-
- // Consider two concurrent _send() calls denoted by s and s'.
- //
- // Suppose s is waiting to enqueue an element (i.e. m_send_begin_cv),
- // whereas s' is waiting for an acknowledgment (i.e. m_send_end_cv)
- // that its previously enqueued element has been dequeued. Since s'
- // is waiting and the flag m_is_send_done is only modified by _send(),
- // m_is_send_done is false. Hence, we notify m_send_end_cv. This
- // causes s' to notify s, thereby allowing s to proceed if possible.
- //
- // Now suppose there is no such s' (say, due to the fact that the
- // queue never became full). Then, m_is_send_done == true. Thus,
- // m_send_begin_cv is notified and s proceeds if possible. Note
- // that if we hadn't notified s this way, then it could deadlock
- // in case that it waited on m_is_try_send_done to become true.
- if (m_is_send_done)
- {
- // unlock before notifying threads; otherwise, the
- // notified thread would unnecessarily block again
- lock.unlock();
-
- // nonblocking, see also note below about notifications
- m_send_begin_cv.notify_one();
- }
- else
- {
- // unlock before notifying threads; otherwise, the
- // notified thread would unnecessarily block again
- lock.unlock();
-
- // we rely on the following semantics of notify_one():
- //
- // if a notification n is issued to s (i.e. s is chosen from among
- // a set of threads waiting on a condition variable associated with
- // mutex m) but another thread t locks m before s wakes up (i.e. s
- // has not owned yet m after n had been issued), then n is retried
- // as soon as t unlocks m. The retries repeat until n arrives at s
- // meaning that s actually owns m and checks its wait guard.
- m_send_end_cv.notify_one();
- }
- }
-
- template
- void _send(U&&);
-
-public:
- // \pre: calling thread must own mutex()
- // \post: calling thread doesn't own mutex() anymore
- template
- bool try_send(std::unique_lock&, U&&);
-
- // \pre: calling thread must own mutex()
- // \post: calling thread doesn't own mutex() anymore
- std::pair> try_recv_ptr(
- std::unique_lock&);
-
- _channel(const _channel&) = delete;
-
- // Propagates exceptions thrown by std::condition_variable constructor
- _channel()
- : m_mutex(),
- m_send_begin_cv(),
- m_send_end_cv(),
- m_recv_cv(),
- m_queue(),
- m_is_send_done(true),
- m_is_try_send_done(true),
- m_is_recv_ready(false),
- m_is_try_send_ready(false),
- m_is_try_recv_ready(false) {}
-
- // channel lock
- std::mutex& mutex()
- {
- return m_mutex;
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void send(const T& t)
- {
- _send(t);
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void send(T&& t)
- {
- _send(std::move(t));
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- T recv();
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void recv(T&);
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- std::unique_ptr recv_ptr();
-};
-
-}
-
-template class ichannel;
-template class ochannel;
-
-/// Go-style concurrency
-
-/// Thread synchronization mechanism as in the Go language.
-/// As in Go, cpp::channel are first-class values.
-///
-/// Unlike Go, however, cpp::channels cannot be nil
-/// not closed. This simplifies the usage of the library.
-///
-/// The template arguments are as follows:
-///
-/// * T -- type of data to be communicated over channel
-/// * N is zero -- synchronous channel
-/// * N is positive -- asynchronous channel with queue size N
-///
-/// Note that cpp::channel::recv() is only supported if T is
-/// exception safe. This is automatically checked at compile time.
-/// If T is not exception safe, use any of the other receive member
-/// functions.
-///
-/// \see http://golang.org/ref/spec#Channel_types
-/// \see http://golang.org/ref/spec#Send_statements
-/// \see http://golang.org/ref/spec#Receive_operator
-/// \see http://golang.org/doc/effective_go.html#chan_of_chan
-template
-class channel
-{
-static_assert(N < std::numeric_limits::max(),
- "N must be strictly less than the largest possible size_t value");
-
-private:
- friend class ichannel;
- friend class ochannel;
-
- std::shared_ptr> m_channel_ptr;
-
-public:
- channel(const channel& other) noexcept
- : m_channel_ptr(other.m_channel_ptr) {}
-
- // Propagates exceptions thrown by std::condition_variable constructor
- channel()
- : m_channel_ptr(std::make_shared>()) {}
-
- channel& operator=(const channel& other) noexcept
- {
- m_channel_ptr = other.m_channel_ptr;
- return *this;
- }
-
- bool operator==(const channel& other) const noexcept
- {
- return m_channel_ptr == other.m_channel_ptr;
- }
-
- bool operator!=(const channel& other) const noexcept
- {
- return m_channel_ptr != other.m_channel_ptr;
- }
-
- bool operator==(const ichannel&) const noexcept;
- bool operator!=(const ichannel&) const noexcept;
-
- bool operator==(const ochannel&) const noexcept;
- bool operator!=(const ochannel&) const noexcept;
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void send(const T& t)
- {
- m_channel_ptr->send(t);
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void send(T&& t)
- {
- m_channel_ptr->send(std::move(t));
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- T recv()
- {
- static_assert(internal::_is_exception_safe::value,
- "Cannot guarantee exception safety, use another recv operator");
-
- return m_channel_ptr->recv();
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- std::unique_ptr recv_ptr()
- {
- return m_channel_ptr->recv_ptr();
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void recv(T& t)
- {
- m_channel_ptr->recv(t);
- }
-};
-
-class select;
-
-/// Can only be used to receive elements of type T
-template
-class ichannel
-{
-private:
- friend class select;
- friend class channel;
- std::shared_ptr> m_channel_ptr;
-
-public:
- ichannel(const channel& other) noexcept
- : m_channel_ptr(other.m_channel_ptr) {}
-
- ichannel(const ichannel& other) noexcept
- : m_channel_ptr(other.m_channel_ptr) {}
-
- ichannel(ichannel&& other) noexcept
- : m_channel_ptr(std::move(other.m_channel_ptr)) {}
-
- ichannel& operator=(const ichannel& other) noexcept
- {
- m_channel_ptr = other.m_channel_ptr;
- return *this;
- }
-
- bool operator==(const ichannel& other) const noexcept
- {
- return m_channel_ptr == other.m_channel_ptr;
- }
-
- bool operator!=(const ichannel& other) const noexcept
- {
- return m_channel_ptr != other.m_channel_ptr;
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- T recv()
- {
- static_assert(internal::_is_exception_safe::value,
- "Cannot guarantee exception safety, use another recv operator");
-
- return m_channel_ptr->recv();
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void recv(T& t)
- {
- m_channel_ptr->recv(t);
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- std::unique_ptr recv_ptr()
- {
- return m_channel_ptr->recv_ptr();
- }
-
-};
-
-/// Can only be used to send elements of type T
-template
-class ochannel
-{
-private:
- friend class select;
- friend class channel;
- std::shared_ptr> m_channel_ptr;
-
-public:
- ochannel(const channel& other) noexcept
- : m_channel_ptr(other.m_channel_ptr) {}
-
- ochannel(const ochannel& other) noexcept
- : m_channel_ptr(other.m_channel_ptr) {}
-
- ochannel(ochannel&& other) noexcept
- : m_channel_ptr(std::move(other.m_channel_ptr)) {}
-
- ochannel& operator=(const ochannel& other) noexcept
- {
- m_channel_ptr = other.m_channel_ptr;
- return *this;
- }
-
- bool operator==(const ochannel& other) const noexcept
- {
- return m_channel_ptr == other.m_channel_ptr;
- }
-
- bool operator!=(const ochannel& other) const noexcept
- {
- return m_channel_ptr != other.m_channel_ptr;
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void send(const T& t)
- {
- m_channel_ptr->send(t);
- }
-
- // Propagates exceptions thrown by std::condition_variable::wait()
- void send(T&& t)
- {
- m_channel_ptr->send(std::move(t));
- }
-};
-
-/// Go's select statement
-
-/// \see http://golang.org/ref/spec#Select_statements
-///
-/// \warning select objects must not be shared between threads
-///
-// TODO: investigate and ideally discuss pseudo-random distribution
-class select
-{
-private:
- template
- class try_send_nullary
- {
- private:
- template::type>
- static bool _run(ochannel& c, U&& u, NullaryFunction f)
- {
- internal::_channel& _c = *c.m_channel_ptr;
- std::unique_lock lock(_c.mutex(), std::defer_lock);
- if (lock.try_lock() && _c.try_send(lock, std::forward(u)))
- {
- assert(!lock.owns_lock());
- f();
- return true;
- }
-
- return false;
- }
-
- public:
- bool operator()(ochannel& c, const T& t, NullaryFunction f)
- {
- return _run(c, t, f);
- }
-
- bool operator()(ochannel& c, T&& t, NullaryFunction f)
- {
- return _run(c, std::move(t), f);
- }
- };
-
- template
- struct try_recv_nullary
- {
- bool operator()(ichannel& c, T& t, NullaryFunction f)
- {
- internal::_channel& _c = *c.m_channel_ptr;
- std::unique_lock lock(_c.mutex(), std::defer_lock);
- if (lock.try_lock())
- {
- std::pair> pair = _c.try_recv_ptr(lock);
- if (pair.first)
- {
- assert(!lock.owns_lock());
- t = *pair.second;
- f();
- return true;
- }
- }
-
- return false;
- }
- };
-
- template
- struct try_recv_unary
- {
- bool operator()(ichannel& c, UnaryFunction f)
- {
- internal::_channel& _c = *c.m_channel_ptr;
- std::unique_lock lock(_c.mutex(), std::defer_lock);
- if (lock.try_lock())
- {
- std::pair> pair = _c.try_recv_ptr(lock);
- if (pair.first)
- {
- assert(!lock.owns_lock());
- f(std::move(*pair.second));
- return true;
- }
- }
-
- return false;
- }
- };
-
- typedef std::function try_function;
- typedef std::vector try_functions;
- try_functions m_try_functions;
-
- std::mt19937 random_gen;
-
-public:
- select()
- : m_try_functions(),
- random_gen(std::time(nullptr)) {}
-
- /* send cases */
-
- template::type>
- select& send_only(channel c, T&& t)
- {
- return send_only(ochannel(c), std::forward(t));
- }
-
- template::type>
- select& send_only(ochannel c, T&& t)
- {
- return send(c, std::forward(t), [](){ /* skip */ });
- }
-
- template::type>
- select& send(channel c, T&& t, NullaryFunction f)
- {
- return send(ochannel(c), std::forward(t), f);
- }
-
- template::type>
- select& send(ochannel c, T&& t, NullaryFunction f)
- {
- m_try_functions.push_back(std::bind(
- try_send_nullary(), c, std::forward(t), f));
- return *this;
- }
-
- /* receive cases */
-
- template
- select& recv_only(channel c, T& t)
- {
- return recv_only(ichannel(c), t);
- }
-
- template
- select& recv_only(ichannel c, T& t)
- {
- return recv(c, t, [](){ /* skip */ });
- }
-
- template
- select& recv(channel c, T& t, NullaryFunction f)
- {
- return recv(ichannel(c), t, f);
- }
-
- template
- select& recv(ichannel c, T& t, NullaryFunction f)
- {
- m_try_functions.push_back(std::bind(
- try_recv_nullary(), c, std::ref(t), f));
- return *this;
- }
-
- template
- select& recv(channel c, UnaryFunction f)
- {
- return recv(ichannel(c), f);
- }
-
- template
- select& recv(ichannel c, UnaryFunction f)
- {
- m_try_functions.push_back(std::bind(
- try_recv_unary(), c, f));
- return *this;
- }
-
- /// Nonblocking like Go's select statement with default case
-
- /// Returns true if and only if exactly one case succeeded
- bool try_once()
- {
- const try_functions::size_type n = m_try_functions.size(), i = random_gen();
- for(try_functions::size_type j = 0; j < n; j++)
- {
- if (m_try_functions.at((i + j) % n)())
- return true;
- }
- return false;
- }
-
- void wait()
- {
- const try_functions::size_type n = m_try_functions.size();
- try_functions::size_type i = random_gen();
- for(;;)
- {
- i = (i + 1) % n;
- if (m_try_functions.at(i)())
- break;
- }
- }
-
- // Propagates any exception thrown by std::this_thread::sleep_for
- template
- void wait(const std::chrono::duration& sleep)
- {
- const try_functions::size_type n = m_try_functions.size();
- try_functions::size_type i = random_gen();
- for(;;)
- {
- i = (i + 1) % n;
- if (m_try_functions.at(i)())
- break;
-
- std::this_thread::sleep_for(sleep);
- }
- }
-};
-
-template
-bool channel::operator==(const ichannel& other) const noexcept
-{
- return m_channel_ptr == other.m_channel_ptr;
-}
-
-template
-bool channel::operator!=(const ichannel& other) const noexcept
-{
- return m_channel_ptr != other.m_channel_ptr;
-}
-
-template
-bool channel::operator==(const ochannel& other) const noexcept
-{
- return m_channel_ptr == other.m_channel_ptr;
-}
-
-template
-bool channel::operator!=(const ochannel& other) const noexcept
-{
- return m_channel_ptr != other.m_channel_ptr;
-}
-
-template
-template
-bool internal::_channel::try_send(
- std::unique_lock& lock, U&& u)
-{
- m_is_try_send_ready = true;
-
- // TODO: support the case where both ends of a channel are inside a select
- assert(!is_try_ready());
-
- if ((!m_is_send_done || !m_is_try_send_done || is_full() ||
- (0 == N - m_queue.size() && !m_is_recv_ready)))
- {
- // TODO: investigate potential LLVM libc++ RAII unlocking issue
- lock.unlock();
- return false;
- }
-
- assert(m_is_send_done);
- assert(m_is_try_send_done);
- assert(!is_full());
-
- // if enqueue should block, there must be a receiver waiting
- m_is_try_send_done = 0 < N - m_queue.size();
- assert(m_is_try_send_done || m_is_recv_ready);
-
- m_queue.emplace_back(std::this_thread::get_id(), std::forward(u));
-
- // Let v be the value enqueued by try_send(). If m_is_try_send_done
- // is false, no other sender (whether blocking or not) can enqueue a
- // value until a receiver has dequeued v, thereby ensuring the channel
- // FIFO order when the queue is filled up by try_send(). Moreover, in
- // that case, since m_is_try_send_done implies m_is_recv_ready, such a
- // receiver is guaranteed to exist, and it will reset m_is_try_send_done
- // to true so that other senders can make progress after v has been
- // dequeued. And by notifying m_recv_cv, other receivers waiting for
- // the queue to become nonempty can make progress as well.
- lock.unlock();
- m_recv_cv.notify_one();
- return true;
-}
-
-template
-std::pair> internal::_channel::try_recv_ptr(
- std::unique_lock& lock)
-{
- m_is_try_recv_ready = true;
-
- if (m_queue.empty())
- return std::make_pair(false, std::unique_ptr(nullptr));
-
- // If queue is full, then there exists either a _send() waiting
- // for m_send_end_cv, or try_send() has just enqueued an element.
- //
- // In general, the converse is false: if there exists a blocking send,
- // then a nonblocking receive might have just dequeued an element,
- // i.e. queue is not full.
- assert(!is_full() || !m_is_send_done || !m_is_try_send_done);
-
- // blocking and nonblocking send can never occur simultaneously
- assert(m_is_try_send_done || m_is_send_done);
-
- std::pair pair(std::move(m_queue.front()));
- assert(!is_full() || std::this_thread::get_id() != pair.first);
-
- // move/copy before pop_front() to ensure strong exception safety
- std::unique_ptr t_ptr(make_unique(std::move(pair.second)));
-
- m_queue.pop_front();
- assert(!is_full());
-
- // protocol with nonblocking calls
- m_is_try_send_done = true;
- m_is_try_recv_ready = false;
- m_is_try_send_ready = false;
-
- // see also explanation in _channel::_post_blocking_recv()
- if (m_is_send_done)
- {
- lock.unlock();
- m_send_begin_cv.notify_one();
- }
- else
- {
- lock.unlock();
- m_send_end_cv.notify_one();
- }
-
- return std::make_pair(true, std::move(t_ptr));
-}
-
-template
-template
-void internal::_channel::_send(U&& u)
-{
- // unlock before notifying threads; otherwise, the
- // notified thread would unnecessarily block again
- {
- // wait (if necessary) until queue is no longer full and any
- // previous _send() has successfully enqueued element
- std::unique_lock lock(m_mutex);
- m_send_begin_cv.wait(lock, [this]{ return m_is_send_done &&
- m_is_try_send_done && !is_full(); });
-
- assert(m_is_send_done);
- assert(m_is_try_send_done);
- assert(!is_full());
-
- // TODO: support the case where both ends of a channel are inside a select
- assert(!is_try_ready());
-
- m_queue.emplace_back(std::this_thread::get_id(), std::forward(u));
- m_is_send_done = false;
- }
-
- // nonblocking
- m_recv_cv.notify_one();
-
- // wait (if necessary) until u has been received by another thread
- {
- std::unique_lock lock(m_mutex);
-
- // It is enough to check !is_full() because m_is_send_done == false.
- // Therefore, no other thread could have caused the queue to fill up
- // during the brief time we didn't own the lock.
- //
- // Performance note: unblocks after at least N successful recv calls
- m_send_end_cv.wait(lock, [this]{ return !is_full(); });
- m_is_send_done = true;
- }
-
- // see also explanation in _channel::recv()
- m_send_begin_cv.notify_one();
-}
-
-template
-T internal::_channel::recv()
-{
- static_assert(internal::_is_exception_safe::value,
- "Cannot guarantee exception safety, use another recv operator");
-
- std::unique_lock lock(m_mutex);
- _pre_blocking_recv(lock);
-
- std::pair pair(std::move(m_queue.front()));
- assert(!is_full() || std::this_thread::get_id() != pair.first);
-
- _post_blocking_recv(lock);
- return std::move(pair.second);
-}
-
-template
-void internal::_channel::recv(T& t)
-{
- std::unique_lock lock(m_mutex);
- _pre_blocking_recv(lock);
-
- std::pair pair(std::move(m_queue.front()));
- assert(!is_full() || std::this_thread::get_id() != pair.first);
-
- // assignment before pop_front() to ensure strong exception safety
- t = std::move(pair.second);
- _post_blocking_recv(lock);
-}
-
-template
-std::unique_ptr internal::_channel::recv_ptr()
-{
- std::unique_lock lock(m_mutex);
- _pre_blocking_recv(lock);
-
- std::pair pair(std::move(m_queue.front()));
- assert(!is_full() || std::this_thread::get_id() != pair.first);
-
- // move/copy before pop_front() to ensure strong exception safety
- std::unique_ptr t_ptr(make_unique(std::move(pair.second)));
- _post_blocking_recv(lock);
- return t_ptr;
-}
-
-}
-
-#endif
\ No newline at end of file
diff --git a/system/inc/system_event.h b/system/inc/system_event.h
index a350bad032..3e6d8eab14 100644
--- a/system/inc/system_event.h
+++ b/system/inc/system_event.h
@@ -113,7 +113,8 @@ enum SystemEventsParam {
* Flags altering the behavior of the `system_notify_event()` function.
*/
enum SystemNotifyEventFlag {
- NOTIFY_SYNCHRONOUSLY = 0x01
+ NOTIFY_SYNCHRONOUSLY = 0x01, ///< Invoke the event handlers directly in the calling thread.
+ NOTIFY_DONT_BLOCK = 0x02 ///< Ignore the event if the event queue of the application thread is full.
};
#define SYSTEM_EVENT_CONTEXT_VERSION (2)
diff --git a/system/inc/system_threading.h b/system/inc/system_threading.h
index c48852bcf5..a5a912ebd0 100644
--- a/system/inc/system_threading.h
+++ b/system/inc/system_threading.h
@@ -86,6 +86,13 @@ os_mutex_recursive_t mutex_usb_serial();
return; \
}
+#define _THREAD_CONTEXT_ASYNC_TRY(thread, fn) \
+ if (thread.isStarted() && !thread.isCurrentThread()) { \
+ auto lambda = [=]() { (fn); }; \
+ thread.invoke_async(particle::FFL(lambda), true /* dontBlock */); \
+ return; \
+ }
+
// execute synchronously on the system thread. Since the parameter lifetime is
// assumed to be bound by the caller, the parameters don't need marshalling
// fn: the function call to perform. This is textually substitued into a lambda, with the
@@ -105,6 +112,7 @@ os_mutex_recursive_t mutex_usb_serial();
#else // !PLATFORM_THREADING
#define _THREAD_CONTEXT_ASYNC(thread, fn)
+#define _THREAD_CONTEXT_ASYNC_TRY(thread, fn)
#define _THREAD_CONTEXT_ASYNC_RESULT(thread, fn, result)
#define SYSTEM_THREAD_CONTEXT_SYNC(fn)
@@ -116,6 +124,7 @@ os_mutex_recursive_t mutex_usb_serial();
#define SYSTEM_THREAD_CONTEXT_ASYNC(fn) _THREAD_CONTEXT_ASYNC(particle::SystemThread, fn)
#define SYSTEM_THREAD_CONTEXT_ASYNC_RESULT(fn, result) _THREAD_CONTEXT_ASYNC_RESULT(particle::SystemThread, fn, result)
#define APPLICATION_THREAD_CONTEXT_ASYNC(fn) _THREAD_CONTEXT_ASYNC(particle::ApplicationThread, fn)
+#define APPLICATION_THREAD_CONTEXT_ASYNC_TRY(fn) _THREAD_CONTEXT_ASYNC_TRY(particle::ApplicationThread, fn)
#define APPLICATION_THREAD_CONTEXT_ASYNC_RESULT(fn, result) _THREAD_CONTEXT_ASYNC_RESULT(particle::ApplicationThread, fn, result)
// Perform an asynchronous function call if not on the system thread,
diff --git a/system/src/firmware_update.cpp b/system/src/firmware_update.cpp
index 8a6be95312..56dedafecc 100644
--- a/system/src/firmware_update.cpp
+++ b/system/src/firmware_update.cpp
@@ -259,7 +259,7 @@ int FirmwareUpdate::saveChunk(const char* chunkData, size_t chunkSize, size_t ch
// Generate a system event
fileDesc_.chunk_address = fileDesc_.file_address + chunkOffset;
fileDesc_.chunk_size = chunkSize;
- system_notify_event(firmware_update, firmware_update_progress, &fileDesc_);
+ system_notify_event(firmware_update, firmware_update_progress, &fileDesc_, nullptr /* fn */, nullptr /* fndata */, NOTIFY_DONT_BLOCK);
lastActiveTime_ = HAL_Timer_Get_Milli_Seconds();
return 0;
}
diff --git a/system/src/main.cpp b/system/src/main.cpp
index bb1f99c3d5..57f5954d14 100644
--- a/system/src/main.cpp
+++ b/system/src/main.cpp
@@ -31,6 +31,8 @@
// STATIC_ASSERT macro clashes with the nRF SDK
#define NO_STATIC_ASSERT
+#include
+
#include "debug.h"
#include "system_event.h"
#include "system_mode.h"
diff --git a/system/src/system_event.cpp b/system/src/system_event.cpp
index a84f5e1338..5dbbbae48f 100644
--- a/system/src/system_event.cpp
+++ b/system/src/system_event.cpp
@@ -17,14 +17,17 @@
******************************************************************************
*/
+#define NO_STATIC_ASSERT
+
#include "system_event.h"
#include "system_threading.h"
#include "interrupts_hal.h"
#include "system_task.h"
+#include "static_recursive_mutex.h"
+#include "scope_guard.h"
#include
#include "spark_wiring_vector.h"
#include "spark_wiring_interrupts.h"
-#include "spark_wiring_thread.h"
#include
namespace {
@@ -75,10 +78,20 @@ struct SystemEventSubscription {
// for now a simple implementation
spark::Vector subscriptions;
#if PLATFORM_THREADING
-RecursiveMutex sSubscriptionsMutex;
+StaticRecursiveMutex sSubscriptionsMutex;
#endif // PLATFORM_THREADING
-void system_notify_event_impl(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata) {
+void system_notify_event_impl(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata, bool isIsr) {
+#if PLATFORM_THREADING
+ if (!isIsr) {
+ sSubscriptionsMutex.lock();
+ }
+ SCOPE_GUARD({
+ if (!isIsr) {
+ sSubscriptionsMutex.unlock();
+ }
+ });
+#endif // PLATFORM_THREADING
for (SystemEventSubscription& subscription : subscriptions) {
subscription.notify(event, data, pointer);
}
@@ -87,13 +100,14 @@ void system_notify_event_impl(system_event_t event, uint32_t data, void* pointer
}
}
-void system_notify_event_async(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata) {
+void system_notify_event_async(system_event_t event, uint32_t data, void* pointer, void (*fn)(void* data), void* fndata, bool dontBlock = false) {
// run event notifications on the application thread
- APPLICATION_THREAD_CONTEXT_ASYNC(system_notify_event_async(event, data, pointer, fn, fndata));
-#if PLATFORM_THREADING
- std::lock_guard lk(sSubscriptionsMutex);
-#endif // PLATFORM_THREADING
- system_notify_event_impl(event, data, pointer, fn, fndata);
+ if (dontBlock) {
+ APPLICATION_THREAD_CONTEXT_ASYNC_TRY(system_notify_event_async(event, data, pointer, fn, fndata, dontBlock));
+ } else {
+ APPLICATION_THREAD_CONTEXT_ASYNC(system_notify_event_async(event, data, pointer, fn, fndata, dontBlock));
+ }
+ system_notify_event_impl(event, data, pointer, fn, fndata, false /* isIsr */);
}
class SystemEventTask : public ISRTaskQueue::Task {
@@ -145,7 +159,7 @@ int system_subscribe_event(system_event_t events, system_event_handler_t* handle
// Modification of subscriptions normally happens from thread context, so for events generated outside ISR
// context, only mutex acquisition is sufficient to keep things thread safe (see system_notify_event_async())
#if PLATFORM_THREADING
- std::lock_guard lk(sSubscriptionsMutex);
+ std::lock_guard lk(sSubscriptionsMutex);
#endif // PLATFORM_THREADING
int r = 0;
ATOMIC_BLOCK() {
@@ -164,7 +178,7 @@ void system_unsubscribe_event(system_event_t events, system_event_handler_t* han
// Modification of subscriptions normally happens from thread context, so for events generated outside ISR
// context, only mutex acquisition is sufficient to keep things thread safe (see system_notify_event_async())
#if PLATFORM_THREADING
- std::lock_guard lk(sSubscriptionsMutex);
+ std::lock_guard lk(sSubscriptionsMutex);
#endif // PLATFORM_THREADING
ATOMIC_BLOCK() {
auto it = std::remove_if(subscriptions.begin(), subscriptions.end(), [events, handler, context](const SystemEventSubscription& sub) {
@@ -195,15 +209,16 @@ void system_notify_event(system_event_t event, uint32_t data, void* pointer, voi
unsigned flags) {
// TODO: Add an API that would allow user applications to control which event handlers can be
// executed synchronously, possibly in the context of an ISR
+ bool isIsr = hal_interrupt_is_isr();
if (flags & NOTIFY_SYNCHRONOUSLY) {
- system_notify_event_impl(event, data, pointer, fn, fndata);
- } else if (hal_interrupt_is_isr()) {
+ system_notify_event_impl(event, data, pointer, fn, fndata, isIsr);
+ } else if (isIsr) {
auto task = systemPoolNew(event, data, pointer, fn, fndata);
if (task) {
SystemISRTaskQueue.enqueue(task);
- };
+ }
} else {
- system_notify_event_async(event, data, pointer, fn, fndata);
+ system_notify_event_async(event, data, pointer, fn, fndata, flags & NOTIFY_DONT_BLOCK);
}
}
diff --git a/user/tests/wiring/no_fixture/thread.cpp b/user/tests/wiring/no_fixture/thread.cpp
index f433e1c611..83ddf7ef84 100644
--- a/user/tests/wiring/no_fixture/thread.cpp
+++ b/user/tests/wiring/no_fixture/thread.cpp
@@ -248,6 +248,24 @@ test(THREAD_08_newlib_reent_impure_ptr_changes_on_context_switch)
assertEqual((uintptr_t)testImpure, (uintptr_t)_impure_ptr);
}
+test(THREAD_09_dont_block_event_queue_option)
+{
+ if (system_thread_get_state(nullptr) != spark::feature::ENABLED) {
+ skip();
+ return;
+ }
+ ActiveObjectBase* app = (ActiveObjectBase*)system_internal(0, nullptr); // Returns application thread instance
+ test_val_fn1 = 0;
+ std::function fn = increment;
+ for (int i = 0; i < 20; ++i) {
+ assertTrue(app->invoke_async(fn, true /* dontBlock */));
+ }
+ assertFalse(app->invoke_async(fn, true));
+ while (Particle.process()) {
+ }
+ assertEqual((int)test_val_fn1, 20);
+}
+
// todo - test for SingleThreadedSection
diff --git a/wiring/inc/spark_wiring_print.h b/wiring/inc/spark_wiring_print.h
index adc2748f14..3af8efcf62 100644
--- a/wiring/inc/spark_wiring_print.h
+++ b/wiring/inc/spark_wiring_print.h
@@ -35,6 +35,7 @@
#include "spark_wiring_printable.h"
#include "spark_wiring_fixed_point.h"
+#include
#include
#include
@@ -65,9 +66,63 @@ class Print
size_t printNumber(unsigned long, uint8_t);
size_t printNumber(unsigned long long, uint8_t);
+
#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT
- size_t printFloat(double, uint8_t);
+
+ static constexpr auto FLOAT_DEFAULT_FRACTIONAL_DIGITS = 2;
+
+ size_t printFloat(double number, uint8_t digits) {
+ size_t n = 0;
+
+ if (std::isnan(number)) {
+ return print("nan");
+ }
+ if (std::isinf(number)) {
+ return print("inf");
+ }
+ if (number > 4294967040.0) {
+ return print ("ovf"); // constant determined empirically
+ }
+ if (number <-4294967040.0) {
+ return print ("ovf"); // constant determined empirically
+ }
+
+ // Handle negative numbers
+ if (number < 0.0) {
+ n += print('-');
+ number = -number;
+ }
+
+ // Round correctly so that print(1.999, 2) prints as "2.00"
+ double rounding = 0.5;
+ for (uint8_t i = 0; i < digits; ++i) {
+ rounding /= 10.0;
+ }
+
+ number += rounding;
+
+ // Extract the integer part of the number and print it
+ unsigned long int_part = (unsigned long)number;
+ double remainder = number - (double)int_part;
+ n += print(int_part);
+
+ // Print the decimal point, but only if there are digits beyond
+ if (digits > 0) {
+ n += print(".");
+ }
+
+ // Extract digits from the remainder one at a time
+ while (digits-- > 0) {
+ remainder *= 10.0;
+ int toPrint = int(remainder);
+ n += print(toPrint);
+ remainder -= toPrint;
+ }
+
+ return n;
+ }
#endif // PARTICLE_WIRING_PRINT_NO_FLOAT
+
size_t printVariant(const particle::Variant& var);
protected:
@@ -92,9 +147,15 @@ class Print
template ::value && (std::is_integral::value || std::is_convertible::value ||
std::is_convertible::value), int> = 0>
size_t print(T, int = DEC);
+
#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT
- size_t print(float, int = 2);
- size_t print(double, int = 2);
+ size_t print(float n, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) {
+ return printFloat((double)n, digits);
+ }
+
+ size_t print(double n, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) {
+ return printFloat(n, digits);
+ }
#endif // PARTICLE_WIRING_PRINT_NO_FLOAT
// Prevent implicit constructors of Variant from affecting overload resolution
@@ -115,9 +176,17 @@ class Print
n += println();
return n;
}
+
#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT
- size_t println(float, int = 2);
- size_t println(double, int = 2);
+ size_t println(float num, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) {
+ return println((double)num, digits);
+ }
+
+ size_t println(double num, int digits = FLOAT_DEFAULT_FRACTIONAL_DIGITS) {
+ size_t n = print(num, digits);
+ n += println();
+ return n;
+ }
#endif // PARTICLE_WIRING_PRINT_NO_FLOAT
template, int> = 0>
diff --git a/wiring/src/spark_wiring_print.cpp b/wiring/src/spark_wiring_print.cpp
index 779db026bf..1944cffac2 100644
--- a/wiring/src/spark_wiring_print.cpp
+++ b/wiring/src/spark_wiring_print.cpp
@@ -124,18 +124,6 @@ size_t Print::print(char c)
return write(c);
}
-#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT
-size_t Print::print(float n, int digits)
-{
- return printFloat((double)n, digits);
-}
-
-size_t Print::print(double n, int digits)
-{
- return printFloat(n, digits);
-}
-#endif // PARTICLE_WIRING_PRINT_NO_FLOAT
-
size_t Print::print(const Printable& x)
{
return x.printTo(*this);
@@ -167,20 +155,6 @@ size_t Print::println(char c)
return n;
}
-#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT
-size_t Print::println(float num, int digits)
-{
- return println((double)num, digits);
-}
-
-size_t Print::println(double num, int digits)
-{
- size_t n = print(num, digits);
- n += println();
- return n;
-}
-#endif // PARTICLE_WIRING_PRINT_NO_FLOAT
-
size_t Print::println(const Printable& x)
{
size_t n = print(x);
@@ -234,53 +208,6 @@ size_t Print::printNumber(unsigned long n, uint8_t base) {
return write(str);
}
-#ifndef PARTICLE_WIRING_PRINT_NO_FLOAT
-size_t Print::printFloat(double number, uint8_t digits)
-{
- size_t n = 0;
-
- if (isnan(number)) return print("nan");
- if (isinf(number)) return print("inf");
- if (number > 4294967040.0) return print ("ovf"); // constant determined empirically
- if (number <-4294967040.0) return print ("ovf"); // constant determined empirically
-
- // Handle negative numbers
- if (number < 0.0)
- {
- n += print('-');
- number = -number;
- }
-
- // Round correctly so that print(1.999, 2) prints as "2.00"
- double rounding = 0.5;
- for (uint8_t i=0; i 0) {
- n += print(".");
- }
-
- // Extract digits from the remainder one at a time
- while (digits-- > 0)
- {
- remainder *= 10.0;
- int toPrint = int(remainder);
- n += print(toPrint);
- remainder -= toPrint;
- }
-
- return n;
-}
-#endif // PARTICLE_WIRING_PRINT_NO_FLOAT
-
size_t Print::printVariant(const Variant& var) {
JSONStreamWriter writer(*this);
writeVariant(var, writer);