6
6
7
7
struct ACL_FIBER_SEM {
8
8
int num ;
9
+ int buf ;
9
10
unsigned flags ;
10
11
RING waiting ;
11
12
unsigned long tid ;
@@ -17,11 +18,18 @@ ACL_FIBER_SEM *acl_fiber_sem_create(int num)
17
18
}
18
19
19
20
ACL_FIBER_SEM * acl_fiber_sem_create2 (int num , unsigned flags )
21
+ {
22
+ int buf = (flags & ACL_FIBER_SEM_F_ASYNC ) ? 50000000 : 0 ;
23
+ return acl_fiber_sem_create3 (num , buf , flags );
24
+ }
25
+
26
+ ACL_FIBER_SEM * acl_fiber_sem_create3 (int num , int buf , unsigned flags )
20
27
{
21
28
ACL_FIBER_SEM * sem = (ACL_FIBER_SEM * ) mem_malloc (sizeof (ACL_FIBER_SEM ));
22
29
23
30
sem -> tid = 0 ;
24
31
sem -> num = num ;
32
+ sem -> buf = buf ;
25
33
sem -> flags = flags ;
26
34
ring_init (& sem -> waiting );
27
35
return sem ;
@@ -58,21 +66,14 @@ int acl_fiber_sem_waiters_num(ACL_FIBER_SEM *sem)
58
66
return ring_size (& sem -> waiting );
59
67
}
60
68
61
- int acl_fiber_sem_wait (ACL_FIBER_SEM * sem )
69
+ int acl_fiber_sem_timed_wait (ACL_FIBER_SEM * sem , int milliseconds )
62
70
{
63
71
ACL_FIBER * curr ;
64
72
EVENT * ev ;
65
73
66
74
if (sem -> tid == 0 ) {
67
75
sem -> tid = thread_self ();
68
76
}
69
- #if 0
70
- else if (sem -> tid != (unsigned long ) thread_self ()) {
71
- msg_error ("%s(%d): current tid=%lu, sem tid=%lu" ,
72
- __FUNCTION__ , __LINE__ , thread_self (), sem -> tid );
73
- return -1 ;
74
- }
75
- #endif
76
77
77
78
if (sem -> num > 0 ) {
78
79
sem -> num -- ;
@@ -84,35 +85,55 @@ int acl_fiber_sem_wait(ACL_FIBER_SEM *sem)
84
85
return -1 ;
85
86
}
86
87
87
- // Sanity check befor suspending.
88
+ if (milliseconds == 0 ) {
89
+ acl_fiber_set_errno (curr , FIBER_EAGAIN );
90
+ acl_fiber_set_error (FIBER_EAGAIN );
91
+ return -1 ;
92
+ }
93
+
94
+ // Sanity check before suspending.
88
95
if (acl_fiber_canceled (curr )) {
89
96
acl_fiber_set_error (curr -> errnum );
90
- //msg_info("%s(%d): fiber-%d be killed",
91
- // __FUNCTION__, __LINE__, acl_fiber_id(curr));
92
97
return -1 ;
93
98
}
94
99
95
- ring_prepend (& sem -> waiting , & curr -> me );
100
+ ring_prepend (& sem -> waiting , & curr -> me2 );
96
101
97
102
curr -> wstatus |= FIBER_WAIT_SEM ;
98
103
104
+ if (milliseconds > 0 ) {
105
+ fiber_timer_add (curr , (size_t ) milliseconds );
106
+ }
107
+
108
+ // Make sure to start wakeup_timers in fiber_io.c by the following:
109
+ // fiber_io_event -> fiber_io_check -> fiber_io_loop -> wakeup_timers.
99
110
ev = fiber_io_event ();
100
111
WAITER_INC (ev ); // Just for avoiding fiber_io_loop to exit
101
112
acl_fiber_switch ();
102
113
WAITER_DEC (ev );
103
114
115
+ if (milliseconds > 0 ) {
116
+ fiber_timer_del (curr );
117
+ }
118
+
104
119
curr -> wstatus &= ~FIBER_WAIT_SEM ;
105
120
106
121
/* If switch to me because other killed me, I should detach myself;
107
- * else if because other unlock, I'll be detached twice which is
108
- * hamless because RIGN can deal with it.
122
+ * else if because other unlock, I'll be detached twice which is
123
+ * harmless because RING can deal with it.
109
124
*/
110
125
ring_detach (& curr -> me );
126
+ ring_detach (& curr -> me2 );
111
127
112
128
if (acl_fiber_canceled (curr )) {
113
129
acl_fiber_set_error (curr -> errnum );
114
- //msg_info("%s(%d): fiber-%d be killed",
115
- // __FUNCTION__, __LINE__, acl_fiber_id(curr));
130
+ return -1 ;
131
+ } else if (curr -> flag & FIBER_F_TIMER ) {
132
+ // Clear FIBER_F_TIMER flag been set in wakeup_timers.
133
+ curr -> flag &= ~FIBER_F_TIMER ;
134
+
135
+ acl_fiber_set_errno (curr , FIBER_EAGAIN );
136
+ acl_fiber_set_error (FIBER_EAGAIN );
116
137
return -1 ;
117
138
}
118
139
@@ -122,18 +143,16 @@ int acl_fiber_sem_wait(ACL_FIBER_SEM *sem)
122
143
return sem -> num ;
123
144
}
124
145
146
+ int acl_fiber_sem_wait (ACL_FIBER_SEM * sem )
147
+ {
148
+ return acl_fiber_sem_timed_wait (sem , -1 );
149
+ }
150
+
125
151
int acl_fiber_sem_trywait (ACL_FIBER_SEM * sem )
126
152
{
127
153
if (sem -> tid == 0 ) {
128
154
sem -> tid = thread_self ();
129
155
}
130
- #if 0
131
- else if (sem -> tid != thread_self ()) {
132
- msg_error ("%s(%d): current tid=%lu, sem tid=%lu" ,
133
- __FUNCTION__ , __LINE__ , thread_self (), sem -> tid );
134
- return -1 ;
135
- }
136
- #endif
137
156
138
157
if (sem -> num > 0 ) {
139
158
sem -> num -- ;
@@ -144,7 +163,7 @@ int acl_fiber_sem_trywait(ACL_FIBER_SEM *sem)
144
163
}
145
164
146
165
#define RING_TO_FIBER (r ) \
147
- ((ACL_FIBER *) ((char *) (r) - offsetof(ACL_FIBER, me )))
166
+ ((ACL_FIBER *) ((char *) (r) - offsetof(ACL_FIBER, me2 )))
148
167
149
168
#define FIRST_FIBER (head ) \
150
169
(ring_succ(head) != (head) ? RING_TO_FIBER(ring_succ(head)) : 0)
@@ -168,16 +187,19 @@ int acl_fiber_sem_post(ACL_FIBER_SEM *sem)
168
187
sem -> num ++ ;
169
188
170
189
if ((ready = FIRST_FIBER (& sem -> waiting )) == NULL ) {
190
+ if (sem -> num >= sem -> buf ) {
191
+ acl_fiber_yield ();
192
+ }
171
193
return sem -> num ;
172
194
}
173
195
174
- ring_detach (& ready -> me );
196
+ ring_detach (& ready -> me2 );
175
197
FIBER_READY (ready );
176
198
177
199
/* Help the fiber to be wakeup to decrease the sem number. */
178
200
num = sem -> num -- ;
179
201
180
- if (!( sem -> flags & ACL_FIBER_SEM_F_ASYNC ) ) {
202
+ if (num >= sem -> buf ) {
181
203
acl_fiber_yield ();
182
204
}
183
205
0 commit comments