-
Notifications
You must be signed in to change notification settings - Fork 69
/
threadSafeQueue.c
137 lines (122 loc) · 3.39 KB
/
threadSafeQueue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include "pharovm/threadSafeQueue/threadSafeQueue.h"
#include "pharovm/semaphores/platformSemaphore.h"
#include <stdio.h>
#include <stdlib.h>
/**
* A queue implemented as a linked list with a link to the first and last nodes
* A mutex serves to mark a critical region and synchronize changes to the queue
* A semaphore serves to indicate if there are elements to take from the queue and block if there are none
**/
struct __TSQueue {
struct __TSQueueNode *first;
struct __TSQueueNode *last;
Semaphore *mutex;
Semaphore *semaphore;
};
/**
* A node in the queue
**/
typedef struct __TSQueueNode {
void *element;
struct __TSQueueNode *next;
} TSQueueNode;
/**
* Create a new queue in heap
* Returns a pointer to the newly created queue
**/
TSQueue *threadsafe_queue_new(Semaphore *semaphore) {
Semaphore* mutex;
mutex = platform_semaphore_new(1);
if (mutex == NULL) {
perror("mutex initialization error in make_queue");
return 0;
}
TSQueue *queue = (TSQueue *) malloc(sizeof(TSQueue));
queue->mutex = mutex;
queue->semaphore = semaphore;
queue->first = NULL;
queue->last = NULL;
return queue;
}
/**
* Free a queue in heap and all its nodes
* Does not free the elements pointed by the nodes, as they are owned by the user
* Fails if pointer is invalid
**/
void threadsafe_queue_free(TSQueue *queue) {
Semaphore *mutex = queue->mutex;
platform_semaphore_wait(mutex);
TSQueueNode *node = queue->first;
TSQueueNode *next_node = node;
while (node) {
next_node = node->next;
free(node);
node = next_node;
}
free(queue);
platform_semaphore_signal(mutex);
//TODO: shouldn't we free the mutex here? if so, we should check if the mutex is alive after every wait
}
/**
* Return the number of elements in the queue
**/
int threadsafe_queue_size(TSQueue *queue) {
int size = 0;
TSQueueNode *node;
platform_semaphore_wait(queue->mutex);
node = queue->first;
while(node){
size++;
node = node->next;
}
platform_semaphore_signal(queue->mutex);
return size;
}
/**
* Put an element at the end of in the thread safe queue
* Only one process may modify the queue at a single point in time
* Allocates a new node and puts the element into it
**/
void threadsafe_queue_put(TSQueue *queue, void *element) {
TSQueueNode *node = (TSQueueNode *) malloc(sizeof(TSQueueNode));
node->element = element;
node->next = NULL;
platform_semaphore_wait(queue->mutex);
if (!queue->first) {
queue->first = node;
queue->last = node;
} else {
queue->last->next = node;
queue->last = node;
}
platform_semaphore_signal(queue->mutex);
queue->semaphore->signal(queue->semaphore);
}
/**
* Take the first element from the thread safe queue
* Blocks the calling thread if there are none elements
*
* Only one process may modify the queue at a single point in time
* Frees the node and returns the element stored in it
**/
void *threadsafe_queue_take(TSQueue *queue) {
//Block until the queue has elements
if (queue->semaphore->wait(queue->semaphore) != 0){
perror("Failed semaphore wait on thread safe queue");
return NULL;
}
TSQueueNode *node = queue->first;
if(node == NULL)
return NULL;
void *element = node->element;
platform_semaphore_wait(queue->mutex);
if (queue->first == queue->last) {
queue->first = NULL;
queue->last = NULL;
} else {
queue->first = node->next;
}
platform_semaphore_signal(queue->mutex);
free(node);
return element;
}