/
loop.c
166 lines (127 loc) · 3.39 KB
/
loop.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#include "uwsgi.h"
extern struct uwsgi_server uwsgi;
/*
	return the request bound to the calling thread.
	the pointer is read from the thread-specific slot keyed by uwsgi.tur_key
	(the slot is filled by uwsgi_setup_thread_req).
*/
struct wsgi_request *threaded_current_wsgi_req() {
	struct wsgi_request *current = pthread_getspecific(uwsgi.tur_key);
	return current;
}
/*
	return the request pointer held in the global uwsgi.wsgi_req field.
	unlike threaded_current_wsgi_req no thread-specific lookup is involved.
*/
struct wsgi_request *simple_current_wsgi_req() {
	struct wsgi_request *current = uwsgi.wsgi_req;
	return current;
}
/*
	register a loop engine under the given name.

	name: identifier of the engine (the pointer itself is stored, the string
	      is not copied)
	func: entry point of the engine

	if an engine with the same name is already present in uwsgi.loops the
	call does nothing, otherwise a new node is appended at the tail of the list.
*/
void uwsgi_register_loop(char *name, void (*func) (void)) {
	// walk the list through the address of each link: when the walk ends,
	// 'link' points either at uwsgi.loops (empty list) or at the 'next'
	// field of the last node, so a single assignment covers both cases
	struct uwsgi_loop **link = &uwsgi.loops;

	for (; *link; link = &(*link)->next) {
		// already registered, nothing to do
		if (strcmp(name, (*link)->name) == 0) {
			return;
		}
	}

	// the 'next' field of the new node is never assigned here: it keeps
	// whatever value uwsgi_calloc left in it
	struct uwsgi_loop *engine = uwsgi_calloc(sizeof(struct uwsgi_loop));
	engine->name = name;
	engine->loop = func;
	*link = engine;
}
/*
	look up a registered loop engine by name.

	returns the 'loop' entry point of the first node of uwsgi.loops whose
	name matches (compared with strcmp), or NULL when no node matches.
*/
void *uwsgi_get_loop(char *name) {
	struct uwsgi_loop *engine;

	for (engine = uwsgi.loops; engine != NULL; engine = engine->next) {
		if (strcmp(name, engine->name) == 0) {
			return engine->loop;
		}
	}

	return NULL;
}
/*
	this is the default (simple) loop engine.

	it hands simple_loop_run to uwsgi_loop_cores_run, which runs it once
	for each core of the worker.
	simple_loop_run monitors the sockets and the signal descriptors
	and manages the requests arriving on them.
*/
void simple_loop() {
	void *(*core_main) (void *) = simple_loop_run;
	uwsgi_loop_cores_run(core_main);
}
/*
	run func on every core of the current worker.

	cores from 1 to uwsgi.threads-1 get a new pthread each (created with
	uwsgi.threads_attr, its id is stored in the thread_id field of the core).
	core 0 is served by the calling thread itself, so this function returns
	only when that direct call returns.

	func receives its core id packed into the void * argument.
	the result of pthread_create is not checked.
*/
void uwsgi_loop_cores_run(void *(*func) (void *)) {
	int core = 1;

	while (core < uwsgi.threads) {
		// the core id travels by value inside the pointer argument
		pthread_create(&uwsgi.workers[uwsgi.mywid].cores[core].thread_id, &uwsgi.threads_attr, func, (void *) (long) core);
		core++;
	}

	// first core: no new thread, run in place
	long main_core = 0;
	func((void *) main_core);
}
/*
	prepare the calling thread for serving requests on the given core.

	core_id:  index of the core served by this thread
	wsgi_req: request structure to bind to this thread

	for every core: enable asynchronous cancellation and store wsgi_req in
	the thread-specific slot keyed by uwsgi.tur_key.

	only for core_id > 0: block all signals (SIGSEGV is left unblocked when
	UWSGI_DEBUG is defined), call the proto_thread_fixup hook of each socket
	in uwsgi.sockets that defines one, then call the init_thread hook of
	each of the 256 entries of uwsgi.p that defines one.
*/
void uwsgi_setup_thread_req(long core_id, struct wsgi_request *wsgi_req) {
	// the previous cancel state/type is of no interest, it is only
	// collected because the pthread calls take an output argument
	int previous;
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &previous);
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &previous);

	pthread_setspecific(uwsgi.tur_key, (void *) wsgi_req);

	// core 0 needs nothing else
	if (core_id <= 0) {
		return;
	}

	// block all signals on new threads
	sigset_t blocked;
	sigfillset(&blocked);
#ifdef UWSGI_DEBUG
	sigdelset(&blocked, SIGSEGV);
#endif
	pthread_sigmask(SIG_BLOCK, &blocked, NULL);

	// run per-thread socket hook
	struct uwsgi_socket *sock;
	for (sock = uwsgi.sockets; sock; sock = sock->next) {
		if (!sock->proto_thread_fixup) {
			continue;
		}
		sock->proto_thread_fixup(sock, core_id);
	}

	// run the per-thread hook of every uwsgi.p entry
	// NOTE(review): uwsgi.p is presumably the plugin table and 256 its fixed size - confirm
	int slot;
	for (slot = 0; slot < 256; slot++) {
		if (uwsgi.p[slot]->init_thread) {
			uwsgi.p[slot]->init_thread(core_id);
		}
	}
}
/*
	adapter for callers holding the core id as an int.
	the id is widened to long, packed into a void * and passed to
	simple_loop_run; the value returned by simple_loop_run is discarded.
*/
void simple_loop_run_int(int core_id) {
	simple_loop_run((void *) (long) core_id);
}
/*
	request loop of a single core.

	arg1: the core id, passed by value inside the pointer

	the function builds an event queue with the sockets of the core (plus
	the two signal sockets when uwsgi.signal_socket is set), then keeps
	accepting and serving requests until the manage_next_request flag of
	the current worker becomes zero.

	once the loop is over, if the worker is flagged 'destroy' and
	uwsgi.workers[0].pid is positive, that process is notified
	(see the end of the function for the platform differences).

	always returns NULL.
*/
void *simple_loop_run(void *arg1) {
	long core = (long) arg1;
	struct wsgi_request *req = &uwsgi.workers[uwsgi.mywid].cores[core].req;

	// per-thread setup is required only when more than one thread is running
	if (uwsgi.threads > 1) {
		uwsgi_setup_thread_req(core, req);
	}

	// initialize the main event queue to monitor sockets
	int queue = event_queue_init();
	uwsgi_add_sockets_to_queue(queue, core);

	if (uwsgi.signal_socket > -1) {
		event_queue_add_fd_read(queue, uwsgi.signal_socket);
		event_queue_add_fd_read(queue, uwsgi.my_signal_socket);
	}

	// ok we are ready, let's start managing requests and signals
	while (uwsgi.workers[uwsgi.mywid].manage_next_request) {
		wsgi_req_setup(req, core, NULL);

		// non-zero from accept: go straight to the next iteration
		if (wsgi_req_accept(queue, req)) {
			continue;
		}

		// non-zero from recv: the request is destroyed instead of closed
		if (wsgi_req_recv(queue, req)) {
			uwsgi_destroy_request(req);
		}
		else {
			uwsgi_close_request(req);
		}
	}

	// end of the loop: nothing to notify unless the worker is marked for
	// destruction and workers[0] has a valid pid
	if (!uwsgi.workers[uwsgi.mywid].destroy || uwsgi.workers[0].pid <= 0) {
		return NULL;
	}

#ifdef __APPLE__
	kill(uwsgi.workers[0].pid, SIGTERM);
#else
	if (!uwsgi.propagate_touch) {
		gracefully_kill(0);
	}
	else {
		kill(uwsgi.workers[0].pid, SIGHUP);
	}
#endif

	return NULL;
}