-
Notifications
You must be signed in to change notification settings - Fork 526
/
Copy pathfc_queue.h
235 lines (182 loc) · 6.11 KB
/
fc_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
/*
* Copyright (c) 2020 YuQing <384681@qq.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the Lesser GNU General Public License, version 3
* or later ("LGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//fc_queue.h
#ifndef _FC_QUEUE_H
#define _FC_QUEUE_H
#include "common_define.h"
#include "fast_mblock.h"
/*
 * A bare head/tail pair with no lock or condition variable.
 * It is the type of the `qinfo` / `chain` arguments taken by the
 * fc_queue_push_queue_to_*(), fc_queue_pop_to_queue_ex(),
 * fc_queue_alloc_chain() and fc_queue_free_chain() declarations in this
 * header.
 * NOTE(review): presumably describes a detached chain of elements linked
 * the same way as the queue's own elements -- confirm against fc_queue.c.
 */
struct fc_queue_info
{
/* first element */
void *head;
/* last element */
void *tail;
};
/*
 * The queue object: element pointers plus the lock/condition pair that
 * the inline helpers in this header use to guard and signal it.
 */
struct fc_queue
{
/* first element; the inline helpers treat NULL as "queue is empty" */
void *head;
/* last element */
void *tail;
/* lcp.lock is locked around reads of head; lcp.cond is what gets signalled */
pthread_lock_cond_pair_t lcp;
/* byte offset, from the start of an element, of that element's `next`
   pointer (consumed by FC_QUEUE_NEXT_PTR) */
int next_ptr_offset;
};
/*
 * Access the `next` link stored inside an element.
 *
 * Expands to an lvalue of type `void *` located `queue->next_ptr_offset`
 * bytes past the start of `data`, so it can be both read and assigned.
 *
 * Both arguments and the whole expansion are parenthesized so that
 * expression arguments (e.g. `node + 1`) and surrounding operators
 * bind as the caller expects.
 */
#define FC_QUEUE_NEXT_PTR(queue, data) \
    (*((void **)(((char *)(data)) + (queue)->next_ptr_offset)))
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Set up / tear down a queue. Both are implemented outside this header.
 *
 * next_ptr_offset: byte offset of the `next` pointer inside each element
 *                  (the value FC_QUEUE_NEXT_PTR reads from the queue).
 * NOTE(review): fc_queue_init presumably returns 0 on success and an
 * errno-style code otherwise -- confirm against the implementation.
 */
int fc_queue_init(struct fc_queue *queue, const int next_ptr_offset);
void fc_queue_destroy(struct fc_queue *queue);
/*
 * Signal the queue's condition variable once, waking at most one
 * thread that is waiting on it. The queue contents are not touched.
 */
static inline void fc_queue_terminate(struct fc_queue *queue)
{
    pthread_cond_signal(&(queue->lcp.cond));
}
/*
 * Signal the queue's condition variable `count` times in a row.
 * A zero or negative `count` sends no signal at all.
 */
static inline void fc_queue_terminate_all(
        struct fc_queue *queue, const int count)
{
    int remaining;

    for (remaining = count; remaining > 0; --remaining) {
        pthread_cond_signal(&queue->lcp.cond);
    }
}
/*
 * Aliases: "notify" is the same operation as "terminate" -- both just
 * signal the queue's condition variable (once, or `count` times).
 */
#define fc_queue_notify(queue) fc_queue_terminate(queue)
#define fc_queue_notify_all(queue, count) \
fc_queue_terminate_all(queue, count)
/*
 * Push primitives that leave the wake-up to the caller: the result is
 * reported through *notify, and the inline wrappers in this header
 * signal queue->lcp.cond when it is true.
 * NOTE(review): the conditions under which *notify is set, and the
 * meaning of the "with_check" return code, are defined in the
 * implementation, which is not visible here -- confirm there.
 */
void fc_queue_push_ex(struct fc_queue *queue, void *data, bool *notify);
int fc_queue_push_with_check_ex(struct fc_queue *queue,
void *data, bool *notify);
/*
 * Push one element and wake a waiter when the push asks for it.
 *
 * The insertion itself is done by fc_queue_push_ex(); if it sets the
 * notify flag, the queue's condition variable is signalled once.
 *
 * queue: the queue to push onto
 * data:  the element to enqueue
 */
static inline void fc_queue_push(struct fc_queue *queue, void *data)
{
    bool notify;

    /* fc_queue_push_ex() takes a bool *, so pass the flag's address */
    fc_queue_push_ex(queue, data, &notify);
    if (notify) {
        pthread_cond_signal(&(queue->lcp.cond));
    }
}
/*
 * Checked push: forwards to fc_queue_push_with_check_ex() and signals
 * the queue's condition variable once when that call sets the notify
 * flag.
 *
 * queue: the queue to push onto
 * data:  the element to enqueue
 * return: whatever fc_queue_push_with_check_ex() returned
 */
static inline int fc_queue_push_with_check(struct fc_queue *queue, void *data)
{
    int result;
    /* Initialized so the flag has a defined value even if the callee
       returns without writing it; its failure path is not visible from
       this header. */
    bool notify = false;

    /* the callee takes a bool *, so pass the flag's address */
    result = fc_queue_push_with_check_ex(queue, data, &notify);
    if (notify) {
        pthread_cond_signal(&(queue->lcp.cond));
    }
    return result;
}
/*
 * Push one element without signalling anyone: the notify flag produced
 * by fc_queue_push_ex() is deliberately discarded.
 *
 * queue: the queue to push onto
 * data:  the element to enqueue
 */
static inline void fc_queue_push_silence(struct fc_queue *queue, void *data)
{
    bool notify;

    /* the flag is required by the callee's signature but ignored here */
    fc_queue_push_ex(queue, data, &notify);
}
/*
 * Push the chain described by `qinfo` onto the queue (head side, per
 * the function name); *notify tells the caller whether to signal.
 * NOTE(review): implemented outside this header -- exact splice and
 * notify semantics should be confirmed there.
 */
void fc_queue_push_queue_to_head_ex(struct fc_queue *queue,
struct fc_queue_info *qinfo, bool *notify);
/*
 * Push a whole chain via fc_queue_push_queue_to_head_ex() and signal
 * the queue's condition variable once if that call sets the notify flag.
 *
 * queue: the destination queue
 * qinfo: head/tail of the chain to push
 */
static inline void fc_queue_push_queue_to_head(struct fc_queue *queue,
        struct fc_queue_info *qinfo)
{
    bool notify;

    /* the callee takes a bool *, so pass the flag's address */
    fc_queue_push_queue_to_head_ex(queue, qinfo, &notify);
    if (notify) {
        pthread_cond_signal(&(queue->lcp.cond));
    }
}
/*
 * Push a whole chain via fc_queue_push_queue_to_head_ex() without
 * signalling: the notify flag it produces is deliberately discarded.
 *
 * queue: the destination queue
 * qinfo: head/tail of the chain to push
 */
static inline void fc_queue_push_queue_to_head_silence(
        struct fc_queue *queue, struct fc_queue_info *qinfo)
{
    bool notify;

    /* the flag is required by the callee's signature but ignored here */
    fc_queue_push_queue_to_head_ex(queue, qinfo, &notify);
}
/*
 * Push the chain described by `qinfo` onto the queue (tail side, per
 * the function name); *notify tells the caller whether to signal.
 * NOTE(review): implemented outside this header -- exact splice and
 * notify semantics should be confirmed there.
 */
void fc_queue_push_queue_to_tail_ex(struct fc_queue *queue,
struct fc_queue_info *qinfo, bool *notify);
/*
 * Push a whole chain via fc_queue_push_queue_to_tail_ex() and signal
 * the queue's condition variable once if that call sets the notify flag.
 *
 * queue: the destination queue
 * qinfo: head/tail of the chain to push
 */
static inline void fc_queue_push_queue_to_tail(struct fc_queue *queue,
        struct fc_queue_info *qinfo)
{
    bool notify;

    /* the callee takes a bool *, so pass the flag's address */
    fc_queue_push_queue_to_tail_ex(queue, qinfo, &notify);
    if (notify) {
        pthread_cond_signal(&(queue->lcp.cond));
    }
}
/*
 * Push a whole chain via fc_queue_push_queue_to_tail_ex() without
 * signalling: the notify flag it produces is deliberately discarded.
 *
 * queue: the destination queue
 * qinfo: head/tail of the chain to push
 */
static inline void fc_queue_push_queue_to_tail_silence(
        struct fc_queue *queue, struct fc_queue_info *qinfo)
{
    bool notify;

    /* the flag is required by the callee's signature but ignored here */
    fc_queue_push_queue_to_tail_ex(queue, qinfo, &notify);
}
/*
 * Pop from the queue. fc_queue_pop() passes blocked=true and
 * fc_queue_try_pop() passes blocked=false.
 * NOTE(review): presumably blocked=true waits on the condition variable
 * until an element is available and blocked=false returns NULL at once
 * when the queue is empty -- confirm against the implementation.
 */
void *fc_queue_pop_ex(struct fc_queue *queue, const bool blocked);
#define fc_queue_pop(queue) fc_queue_pop_ex(queue, true)
#define fc_queue_try_pop(queue) fc_queue_pop_ex(queue, false)
/*
 * "Pop all" counterpart of fc_queue_pop_ex(): fc_queue_pop_all() passes
 * blocked=true, fc_queue_try_pop_all() passes blocked=false.
 * NOTE(review): presumably detaches the entire chain and returns its
 * head, leaving the queue empty -- confirm against the implementation.
 */
void *fc_queue_pop_all_ex(struct fc_queue *queue, const bool blocked);
#define fc_queue_pop_all(queue) fc_queue_pop_all_ex(queue, true)
#define fc_queue_try_pop_all(queue) fc_queue_pop_all_ex(queue, false)
/*
 * Pop into a caller-supplied head/tail pair instead of returning a
 * pointer. fc_queue_pop_to_queue() passes blocked=true and
 * fc_queue_try_pop_to_queue() passes blocked=false.
 * NOTE(review): presumably moves the whole content of `queue` into
 * `qinfo` -- confirm against the implementation.
 */
void fc_queue_pop_to_queue_ex(struct fc_queue *queue,
struct fc_queue_info *qinfo, const bool blocked);
#define fc_queue_pop_to_queue(queue, qinfo) \
fc_queue_pop_to_queue_ex(queue, qinfo, true)
#define fc_queue_try_pop_to_queue(queue, qinfo) \
fc_queue_pop_to_queue_ex(queue, qinfo, false)
/*
 * Report whether the queue currently has no head element.
 *
 * The head pointer is sampled while holding the queue lock; the lock is
 * released before returning, so the answer is a snapshot.
 *
 * return: true when queue->head was NULL, false otherwise
 */
static inline bool fc_queue_empty(struct fc_queue *queue)
{
    bool no_head;

    pthread_mutex_lock(&(queue->lcp.lock));
    no_head = (NULL == queue->head);
    pthread_mutex_unlock(&(queue->lcp.lock));

    return no_head;
}
/*
 * Count the elements in the queue by walking the chain from head,
 * following each element's next link until NULL.
 *
 * The whole walk is done under the queue lock, so it costs one step per
 * element while the lock is held.
 *
 * return: number of elements reached from queue->head
 */
static inline int fc_queue_count(struct fc_queue *queue)
{
    int total = 0;
    void *node;

    pthread_mutex_lock(&(queue->lcp.lock));
    for (node = queue->head; node != NULL;
            node = FC_QUEUE_NEXT_PTR(queue, node))
    {
        total++;
    }
    pthread_mutex_unlock(&(queue->lcp.lock));

    return total;
}
/*
 * Return the current head element without removing it.
 *
 * The head pointer is read under the queue lock, which is released
 * before returning; the result is therefore a snapshot of the head at
 * the time of the call.
 *
 * return: queue->head (NULL when the queue has no head element)
 */
static inline void *fc_queue_peek(struct fc_queue *queue)
{
    void *first;

    pthread_mutex_lock(&(queue->lcp.lock));
    first = queue->head;
    pthread_mutex_unlock(&(queue->lcp.lock));

    return first;
}
/*
 * Pop with a timeout. `timeout` is expressed in `time_unit`; the three
 * convenience macros fix the unit to FC_TIME_UNIT_SECOND,
 * FC_TIME_UNIT_MSECOND and FC_TIME_UNIT_USECOND respectively (constants
 * defined outside this header).
 * NOTE(review): presumably returns NULL when the timeout expires with
 * the queue still empty -- confirm against the implementation.
 */
void *fc_queue_timedpop(struct fc_queue *queue,
const int timeout, const int time_unit);
#define fc_queue_timedpop_sec(queue, timeout) \
fc_queue_timedpop(queue, timeout, FC_TIME_UNIT_SECOND)
#define fc_queue_timedpop_ms(queue, timeout_ms) \
fc_queue_timedpop(queue, timeout_ms, FC_TIME_UNIT_MSECOND)
#define fc_queue_timedpop_us(queue, timeout_us) \
fc_queue_timedpop(queue, timeout_us, FC_TIME_UNIT_USECOND)
/*
 * Peek with a timeout. `timeout` is expressed in `time_unit`; the three
 * convenience macros fix the unit to FC_TIME_UNIT_SECOND,
 * FC_TIME_UNIT_MSECOND and FC_TIME_UNIT_USECOND respectively (constants
 * defined outside this header).
 * NOTE(review): presumably waits up to the timeout for a head element
 * and returns it without removing it -- confirm against the
 * implementation.
 */
void *fc_queue_timedpeek(struct fc_queue *queue,
const int timeout, const int time_unit);
#define fc_queue_timedpeek_sec(queue, timeout) \
fc_queue_timedpeek(queue, timeout, FC_TIME_UNIT_SECOND)
#define fc_queue_timedpeek_ms(queue, timeout_ms) \
fc_queue_timedpeek(queue, timeout_ms, FC_TIME_UNIT_MSECOND)
#define fc_queue_timedpeek_us(queue, timeout_us) \
fc_queue_timedpeek(queue, timeout_us, FC_TIME_UNIT_USECOND)
/*
 * Chain helpers backed by a fast_mblock allocator, plus removal of a
 * specific element. All three are implemented outside this header.
 * NOTE(review): presumably fc_queue_alloc_chain builds a chain of
 * `count` elements from `mblock` into `chain`, fc_queue_free_chain
 * returns the elements of `qinfo` to `mblock`, and fc_queue_remove
 * unlinks `data` from the queue; return-code conventions should be
 * confirmed against the implementation.
 */
int fc_queue_alloc_chain(struct fc_queue *queue, struct fast_mblock_man
*mblock, const int count, struct fc_queue_info *chain);
int fc_queue_free_chain(struct fc_queue *queue, struct fast_mblock_man
*mblock, struct fc_queue_info *qinfo);
int fc_queue_remove(struct fc_queue *queue, void *data);
#ifdef __cplusplus
}
#endif
#endif