Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent OTA progress events from blocking system thread #2741

Merged
merged 6 commits into from Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 31 additions & 0 deletions hal/src/newhal/static_recursive_mutex.h
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Particle Industries, Inc. All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/

#pragma once

class StaticRecursiveMutex {
public:
StaticRecursiveMutex() = default;

bool lock(unsigned timeout = 0) {
return true;
}

bool unlock() {
return true;
}
};
63 changes: 15 additions & 48 deletions system/inc/active_object.h
Expand Up @@ -29,7 +29,6 @@
#include <future>
#include <cstring>

#include "channel.h"
#include "concurrent_hal.h"
#include "hal_platform.h"

Expand Down Expand Up @@ -284,7 +283,7 @@ class ActiveObjectBase

// todo - concurrent queue should be a strategy so it's pluggable without requiring inheritance
virtual bool take(Item& item)=0;
virtual bool put(Item& item)=0;
virtual bool put(Item& item, bool dontBlock = false)=0;

/**
* Static thread entrypoint to run this active object loop.
Expand Down Expand Up @@ -316,16 +315,19 @@ class ActiveObjectBase
return started;
}

template<typename R> void invoke_async(const std::function<R(void)>& work)
template<typename R> bool invoke_async(const std::function<R(void)>& work, bool dontBlock = false)
{
auto task = new AsyncTask<R>(work);
if (task)
{
Item message = task;
if (!put(message))
delete task;
if (!task) {
return false;
}
}
Item message = task;
if (!put(message, dontBlock)) {
delete task;
return false;
}
return true;
}

template<typename R> SystemPromise<R>* invoke_future(const std::function<R(void)>& work)
{
Expand All @@ -344,41 +346,6 @@ class ActiveObjectBase

};


template <size_t queue_size=50>
class ActiveObjectChannel : public ActiveObjectBase
{
cpp::channel<Item, queue_size> _channel;

protected:

virtual bool take(Item& item) override
{
return cpp::select().recv_only(_channel, item).try_once();
}

virtual bool put(const Item& item) override
{
_channel.send(item);
return true;
}


public:

ActiveObjectChannel(ActiveObjectConfiguration& config) : ActiveObjectBase(config) {}

/**
* Start the asynchronous processing for this active object.
*/
void start()
{
_channel = cpp::channel<Item*, queue_size>();
start_thread();
}

};

class ActiveObjectQueue : public ActiveObjectBase
{
protected:
Expand All @@ -390,9 +357,9 @@ class ActiveObjectQueue : public ActiveObjectBase
return !os_queue_take(queue, &result, configuration.take_wait, nullptr);
}

virtual bool put(Item& item)
virtual bool put(Item& item, bool dontBlock)
{
return !os_queue_put(queue, &item, configuration.put_wait, nullptr);
return !os_queue_put(queue, &item, dontBlock ? 0 : configuration.put_wait, nullptr);
}

void createQueue()
Expand Down Expand Up @@ -458,9 +425,9 @@ class ActiveObjectThreadQueue : public ActiveObjectQueue
return r;
}

virtual bool put(Item& item) override
virtual bool put(Item& item, bool dontBlock) override
{
bool r = ActiveObjectQueue::put(item);
bool r = ActiveObjectQueue::put(item, dontBlock);
if (r && _thread != OS_THREAD_INVALID_HANDLE) {
os_thread_notify(_thread, nullptr);
}
Expand Down