Skip to content

Commit

Permalink
Merge pull request #2156 from particle-iot/feature/concurrent-queue-p…
Browse files Browse the repository at this point in the history
…eek-isr

Expose os_queue_peek() function, make sure that queues and semaphores can be accessed from ISR
  • Loading branch information
avtolstoy committed Jul 20, 2020
2 parents 208cab6 + 9630aa8 commit a252428
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 280 deletions.
9 changes: 9 additions & 0 deletions hal/inc/concurrent_hal.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,15 @@ int os_queue_put(os_queue_t queue, const void* item, system_tick_t delay, void*
* @return
*/
int os_queue_take(os_queue_t queue, void* item, system_tick_t delay, void* reserved);

/**
* Return 0 on success.
* @param queue
* @param item
* @param delay
* @return
*/
int os_queue_peek(os_queue_t queue, void* item, system_tick_t delay, void* reserved);
int os_queue_destroy(os_queue_t queue, void* reserved);

int os_mutex_create(os_mutex_t* mutex);
Expand Down
1 change: 1 addition & 0 deletions hal/inc/hal_dynalib_concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ DYNALIB_FN(31, hal_concurrent, os_semaphore_destroy, int(os_semaphore_t))
DYNALIB_FN(32, hal_concurrent, os_semaphore_take, int(os_semaphore_t, system_tick_t, bool))
DYNALIB_FN(33, hal_concurrent, os_semaphore_give, int(os_semaphore_t, bool))
DYNALIB_FN(34, hal_concurrent, os_scheduler_get_state, os_scheduler_state_t(void*))
DYNALIB_FN(35, hal_concurrent, os_queue_peek, int(os_queue_t, void* item, system_tick_t, void*))
#endif // PLATFORM_THREADING

DYNALIB_END(hal_concurrent)
Expand Down
166 changes: 29 additions & 137 deletions hal/src/nRF52840/concurrent_hal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,107 +214,6 @@ int os_thread_notify(os_thread_t thread, void* reserved)
}
}

class ThreadQueue
{
QueueHandle_t queue;

public:

ThreadQueue(UBaseType_t max_size) {
queue = xQueueCreate(max_size, sizeof(TaskHandle_t));
}

~ThreadQueue() {
vQueueDelete(queue);
}


void enqueue(TicksType_t ticks=portMAX_DELAY) {
TaskHandle_t current = xTaskGetCurrentTaskHandle();
xQueueSend(queue, &current, ticks);
}

void sleep() {
enqueue();
vTaskSuspend(NULL);
}

bool wake() {
TaskHandle_t next;
if (xQueueReceive(queue, &next, 0)==pdTRUE) {
vTaskResume(next);
return true;
}
return false;
}

void wakeAll()
{
while (wake()) {}
}
};


class Lock
{
SemaphoreHandle_t semaphore;

public:

Lock() {
semaphore = xSemaphoreCreateMutex();
}

void acquire() {
while (xSemaphoreTake(semaphore, portMAX_DELAY)==pdFALSE);
}

void release() {
xSemaphoreGive(semaphore);
}

~Lock() {
vSemaphoreDelete(semaphore);
}
};


class ConditionVariable
{
ThreadQueue queue;

typedef std::unique_lock<std::mutex> lock_t;
public:
ConditionVariable(UBaseType_t max_size) : queue(max_size) {}

void wait(lock_t* lock)
{
taskENTER_CRITICAL();
lock->unlock();
queue.enqueue();
taskEXIT_CRITICAL();
vTaskSuspend(NULL);
lock->lock();
}

void signal()
{
taskENTER_CRITICAL();
queue.wake();
taskEXIT_CRITICAL();
}

void broadcast()
{
taskENTER_CRITICAL();
queue.wakeAll();
taskEXIT_CRITICAL();
}
};

static_assert(sizeof(__gthread_cond_t)==sizeof(ConditionVariable), "__gthread_cond_t must be the same size as ConditionVariable");


bool __gthread_equal(__gthread_t t1, __gthread_t t2)
{
return t1==t2;
Expand All @@ -325,37 +224,6 @@ __gthread_t __gthread_self()
return xTaskGetCurrentTaskHandle();
}

int os_condition_variable_create(condition_variable_t* cond)
{
return (*cond = new ConditionVariable(10))==NULL;
}

void os_condition_variable_destroy(condition_variable_t cond)
{
ConditionVariable* cv = (ConditionVariable*)cond;
cv->~ConditionVariable();
}

void os_condition_variable_wait(condition_variable_t cond, void* v)
{
std::unique_lock<std::mutex>* lock = (std::unique_lock<std::mutex>*)v;
ConditionVariable* cv = (ConditionVariable*)cond;
cv->wait(lock);
}

void os_condition_variable_notify_one(condition_variable_t cond)
{
ConditionVariable* cv = (ConditionVariable*)cond;
cv->signal();
}

void os_condition_variable_notify_all(condition_variable_t cond)
{
ConditionVariable* cv = (ConditionVariable*)cond;
cv->broadcast();
}


int os_queue_create(os_queue_t* queue, size_t item_size, size_t item_count, void*)
{
*queue = xQueueCreate(item_count, item_size);
Expand All @@ -366,19 +234,36 @@ static_assert(portMAX_DELAY==CONCURRENT_WAIT_FOREVER, "expected portMAX_DELAY==C

int os_queue_put(os_queue_t queue, const void* item, system_tick_t delay, void*)
{
if (HAL_IsISR()) {
if (!HAL_IsISR()) {
return xQueueSend(static_cast<QueueHandle_t>(queue), item, delay)!=pdTRUE;
} else {
BaseType_t woken = pdFALSE;
int res = xQueueSendFromISR(static_cast<QueueHandle_t>(queue), item, &woken) != pdTRUE;
portYIELD_FROM_ISR(woken);
return res;
} else {
return xQueueSend(static_cast<QueueHandle_t>(queue), item, delay)!=pdTRUE;
}
}

int os_queue_take(os_queue_t queue, void* item, system_tick_t delay, void*)
{
return xQueueReceive(static_cast<QueueHandle_t>(queue), item, delay)!=pdTRUE;
if (!HAL_IsISR()) {
return xQueueReceive(static_cast<QueueHandle_t>(queue), item, delay)!=pdTRUE;
} else {
BaseType_t woken = pdFALSE;
int res = xQueueReceiveFromISR(static_cast<QueueHandle_t>(queue), item, &woken) != pdTRUE;
portYIELD_FROM_ISR(woken);
return res;
}
}

int os_queue_peek(os_queue_t queue, void* item, system_tick_t delay, void*)
{
if (!HAL_IsISR()) {
return xQueuePeek(static_cast<QueueHandle_t>(queue), item, delay)!=pdTRUE;
} else {
// Delay is ignored
return xQueuePeekFromISR(static_cast<QueueHandle_t>(queue), item)!=pdTRUE;
}
}

int os_queue_destroy(os_queue_t queue, void*)
Expand Down Expand Up @@ -471,7 +356,14 @@ int os_semaphore_destroy(os_semaphore_t semaphore)

int os_semaphore_take(os_semaphore_t semaphore, system_tick_t timeout, bool reserved)
{
return (xSemaphoreTake(static_cast<SemaphoreHandle_t>(semaphore), timeout)!=pdTRUE);
if (!HAL_IsISR()) {
return (xSemaphoreTake(static_cast<SemaphoreHandle_t>(semaphore), timeout)!=pdTRUE);
} else {
BaseType_t woken = pdFALSE;
int res = xSemaphoreTakeFromISR(static_cast<SemaphoreHandle_t>(semaphore), &woken) != pdTRUE;
portYIELD_FROM_ISR(woken);
return res;
}
}

int os_semaphore_give(os_semaphore_t semaphore, bool reserved)
Expand Down
Loading

0 comments on commit a252428

Please sign in to comment.