Skip to content

Commit 1bf1abd

Browse files
committed
fasthttp: Linux/epoll support
1 parent 1b09723 commit 1bf1abd

File tree

1 file changed

+284
-0
lines changed

1 file changed

+284
-0
lines changed

vlib/fasthttp/fasthttp_linux.v

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
module fasthttp
2+
3+
#include <sys/epoll.h>
4+
5+
// Epoll constants
6+
const epoll_ctl_add = 1
7+
const epoll_ctl_del = 2
8+
const epoll_ctl_mod = 3
9+
const epoll_in = 1
10+
const epoll_out = 4
11+
const epoll_err = 8
12+
const epoll_hup = 16
13+
const epoll_rdhup = 8192
14+
15+
union C.epoll_data {
16+
mut:
17+
ptr voidptr
18+
fd int
19+
u32 u32
20+
u64 u64
21+
}
22+
23+
struct C.epoll_event {
24+
mut:
25+
events u32
26+
data C.epoll_data
27+
}
28+
29+
fn C.epoll_create1(flags int) int
30+
fn C.epoll_ctl(epfd int, op int, fd int, event &C.epoll_event) int
31+
fn C.epoll_wait(epfd int, events &C.epoll_event, maxevents int, timeout int) int
32+
33+
// Helper to wrap epoll_ctl for cleaner code
34+
fn control_epoll(epfd int, op int, fd int, events u32, data voidptr) {
35+
mut ev := C.epoll_event{
36+
events: events
37+
}
38+
ev.data.ptr = data
39+
C.epoll_ctl(epfd, op, fd, &ev)
40+
}
41+
42+
// process_dones handles connections that have been processed by a worker thread.
43+
fn (mut s Server) process_dones() {
44+
C.pthread_mutex_lock(&s.worker_data.done_mutex)
45+
mut local_head := s.worker_data.done_head
46+
s.worker_data.done_head = unsafe { nil }
47+
s.worker_data.done_tail = unsafe { nil }
48+
C.pthread_mutex_unlock(&s.worker_data.done_mutex)
49+
50+
for local_head != unsafe { nil } {
51+
d := local_head
52+
local_head = d.next
53+
mut c := d.c
54+
c.write_buf = d.resp
55+
c.write_len = d.len
56+
c.write_pos = 0
57+
58+
// Try to write immediately
59+
write_ptr := unsafe { &u8(c.write_buf) + c.write_pos }
60+
written := C.write(c.fd, write_ptr, c.write_len - c.write_pos)
61+
if written > 0 {
62+
c.write_pos += int(written)
63+
} else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK {
64+
s.close_conn(c)
65+
unsafe { C.free(d) }
66+
continue
67+
}
68+
69+
if c.write_pos < c.write_len {
70+
// Not all data sent, add WRITE event
71+
// Note: The connection was removed from epoll before sending to worker, so we ADD here.
72+
control_epoll(s.poll_fd, epoll_ctl_add, c.fd, u32(epoll_out | epoll_rdhup), c)
73+
} else {
74+
// Response sent, re-enable reading for keep-alive
75+
C.free(c.write_buf)
76+
c.write_buf = unsafe { nil }
77+
78+
// Note: The connection was removed from epoll before sending to worker, so we ADD here.
79+
control_epoll(s.poll_fd, epoll_ctl_add, c.fd, u32(epoll_in | epoll_rdhup), c)
80+
c.read_len = 0
81+
}
82+
unsafe { C.free(d) }
83+
}
84+
}
85+
86+
// run starts the server and enters the main event loop (Epoll version).
87+
pub fn (mut s Server) run() ! {
88+
// Create server socket
89+
s.socket_fd = C.socket(.ip, .tcp, 0)
90+
if s.socket_fd < 0 {
91+
C.perror(c'socket')
92+
return error('socket creation failed')
93+
}
94+
95+
opt := 1
96+
C.setsockopt(s.socket_fd, C.SOL_SOCKET, C.SO_REUSEADDR, &opt, sizeof(int))
97+
98+
mut addr := C.sockaddr_in{}
99+
C.memset(&addr, 0, sizeof(addr))
100+
addr.sin_family = u16(C.AF_INET)
101+
addr.sin_port = u16(C.htons(u16(s.port)))
102+
103+
if C.bind(s.socket_fd, voidptr(&addr), sizeof(addr)) < 0 {
104+
C.perror(c'bind')
105+
return error('socket bind failed')
106+
}
107+
if C.listen(s.socket_fd, backlog) < 0 {
108+
C.perror(c'listen')
109+
return error('socket listen failed')
110+
}
111+
C.fcntl(s.socket_fd, C.F_SETFL, C.O_NONBLOCK)
112+
113+
// Create epoll instance
114+
s.poll_fd = C.epoll_create1(0)
115+
if s.poll_fd < 0 {
116+
C.perror(c'epoll_create1')
117+
return error('epoll creation failed')
118+
}
119+
120+
// Add listener socket to epoll
121+
// We pass the fd as the pointer value to identify it later.
122+
control_epoll(s.poll_fd, epoll_ctl_add, s.socket_fd, u32(epoll_in), voidptr(isize(s.socket_fd)))
123+
124+
// Initialize worker data
125+
C.pthread_mutex_init(&s.worker_data.task_mutex, unsafe { nil })
126+
C.pthread_cond_init(&s.worker_data.task_cond, unsafe { nil })
127+
C.pthread_mutex_init(&s.worker_data.done_mutex, unsafe { nil })
128+
129+
// Create wake pipe
130+
if C.pipe(&s.worker_data.wake_pipe[0]) < 0 {
131+
C.perror(c'pipe')
132+
return error('pipe creation failed')
133+
}
134+
C.fcntl(s.worker_data.wake_pipe[0], C.F_SETFL, C.O_NONBLOCK)
135+
C.fcntl(s.worker_data.wake_pipe[1], C.F_SETFL, C.O_NONBLOCK)
136+
137+
// Add wake pipe to epoll
138+
control_epoll(s.poll_fd, epoll_ctl_add, s.worker_data.wake_pipe[0], u32(epoll_in), voidptr(isize(s.worker_data.wake_pipe[0])))
139+
140+
// Create worker threads
141+
for i := 0; i < num_threads; i++ {
142+
C.pthread_create(&s.threads[i], unsafe { nil }, worker_func, s)
143+
}
144+
145+
println('Server listening on port ${s.port}')
146+
147+
// Event loop
148+
events := [64]C.epoll_event{}
149+
for {
150+
nev := C.epoll_wait(s.poll_fd, &events[0], 64, -1)
151+
if nev < 0 {
152+
C.perror(c'epoll_wait')
153+
break
154+
}
155+
156+
for i := 0; i < nev; i++ {
157+
event := events[i]
158+
ptr_val := isize(event.data.ptr)
159+
160+
// 1. Check for Listener Socket
161+
if ptr_val == s.socket_fd {
162+
client_fd := C.accept(s.socket_fd, unsafe { nil }, unsafe { nil })
163+
if client_fd < 0 {
164+
continue
165+
}
166+
mut new_c := unsafe { &Conn(C.malloc(sizeof(Conn))) }
167+
C.memset(new_c, 0, sizeof(Conn))
168+
new_c.fd = client_fd
169+
C.fcntl(new_c.fd, C.F_SETFL, C.O_NONBLOCK)
170+
171+
control_epoll(s.poll_fd, epoll_ctl_add, new_c.fd, u32(epoll_in | epoll_rdhup), new_c)
172+
continue
173+
}
174+
175+
// 2. Check for Wake Pipe (Worker finished a task)
176+
if ptr_val == s.worker_data.wake_pipe[0] {
177+
buf := [1024]u8{}
178+
for C.read(s.worker_data.wake_pipe[0], &buf[0], sizeof(buf)) > 0 {}
179+
s.process_dones()
180+
continue
181+
}
182+
183+
// 3. Client Connection
184+
mut c := unsafe { &Conn(event.data.ptr) }
185+
186+
// Handle Errors or HUP
187+
if (event.events & u32(epoll_err | epoll_hup | epoll_rdhup)) != 0 {
188+
s.close_conn(c)
189+
continue
190+
}
191+
192+
// Handle Read
193+
if (event.events & u32(epoll_in)) != 0 {
194+
n := C.read(c.fd, &c.read_buf[c.read_len], buf_size - c.read_len)
195+
if n <= 0 {
196+
if n < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK {
197+
s.close_conn(c)
198+
} else if n == 0 {
199+
s.close_conn(c)
200+
}
201+
continue
202+
}
203+
c.read_len += int(n)
204+
205+
header_end := C.memmem(&c.read_buf[0], c.read_len, c'\r\n\r\n', 4)
206+
if header_end == unsafe { nil } {
207+
if c.read_len >= buf_size {
208+
s.close_conn(c)
209+
}
210+
continue
211+
}
212+
213+
if C.memcmp(&c.read_buf[0], c'GET ', 4) != 0 {
214+
s.close_conn(c)
215+
continue
216+
}
217+
path_start := &c.read_buf[4]
218+
path_end := C.strchr(path_start, ` `)
219+
if path_end == unsafe { nil } {
220+
s.close_conn(c)
221+
continue
222+
}
223+
path_len := unsafe { path_end - path_start }
224+
225+
req := HttpRequest{
226+
buffer: c.read_buf[..c.read_len]
227+
method: Slice{
228+
buf: &c.read_buf[0]
229+
len: 3
230+
}
231+
path: Slice{
232+
buf: path_start
233+
len: path_len
234+
}
235+
client_conn_fd: c.fd
236+
}
237+
238+
c.read_len = 0
239+
240+
// Offload to worker thread
241+
// Remove from epoll so we don't trigger while worker is busy
242+
control_epoll(s.poll_fd, epoll_ctl_del, c.fd, 0, unsafe { nil })
243+
244+
mut t := unsafe { &Task(C.malloc(sizeof(Task))) }
245+
t.c = c
246+
t.req = req
247+
t.next = unsafe { nil }
248+
249+
C.pthread_mutex_lock(&s.worker_data.task_mutex)
250+
if s.worker_data.task_tail != unsafe { nil } {
251+
s.worker_data.task_tail.next = t
252+
} else {
253+
s.worker_data.task_head = t
254+
}
255+
s.worker_data.task_tail = t
256+
C.pthread_cond_signal(&s.worker_data.task_cond)
257+
C.pthread_mutex_unlock(&s.worker_data.task_mutex)
258+
259+
} else if (event.events & u32(epoll_out)) != 0 { // Handle Write
260+
write_ptr := unsafe { &u8(c.write_buf) + c.write_pos }
261+
written := C.write(c.fd, write_ptr, c.write_len - c.write_pos)
262+
if written > 0 {
263+
c.write_pos += int(written)
264+
} else if written < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK {
265+
s.close_conn(c)
266+
continue
267+
}
268+
269+
if c.write_pos >= c.write_len {
270+
C.free(c.write_buf)
271+
c.write_buf = unsafe { nil }
272+
// Done writing, modify epoll to stop listening for OUT and start listening for IN
273+
control_epoll(s.poll_fd, epoll_ctl_mod, c.fd, u32(epoll_in | epoll_rdhup), c)
274+
c.read_len = 0
275+
}
276+
}
277+
}
278+
}
279+
280+
C.close(s.socket_fd)
281+
C.close(s.poll_fd)
282+
C.close(s.worker_data.wake_pipe[0])
283+
C.close(s.worker_data.wake_pipe[1])
284+
}

0 commit comments

Comments
 (0)