Skip to content

Commit

Permalink
picoev, picohttparser: reimplement in V (#18506)
Browse files Browse the repository at this point in the history
  • Loading branch information
Casper64 committed Jul 12, 2023
1 parent 045adb6 commit a43064a
Show file tree
Hide file tree
Showing 16 changed files with 1,651 additions and 258 deletions.
2 changes: 0 additions & 2 deletions cmd/tools/check_os_api_parity.v
Expand Up @@ -24,8 +24,6 @@ const (
'crypto.rand',
'os.bare',
'os2',
'picohttpparser',
'picoev',
'szip',
'v.eval',
]
Expand Down
5 changes: 2 additions & 3 deletions cmd/tools/modules/testing/common.v
Expand Up @@ -625,9 +625,8 @@ pub fn prepare_test_session(zargs string, folder string, oskipped []string, main
continue
}
$if windows {
// skip pico and process/command examples on windows
if fnormalised.ends_with('examples/pico/pico.v')
|| fnormalised.ends_with('examples/process/command.v') {
// skip process/command examples on windows
if fnormalised.ends_with('examples/process/command.v') {
continue
}
}
Expand Down
17 changes: 11 additions & 6 deletions examples/pico/pico.v
Expand Up @@ -3,7 +3,7 @@ import picoev
import picohttpparser

const (
port = 8088
port = 8089
)

struct Message {
Expand All @@ -24,21 +24,25 @@ fn hello_response() string {
}

fn callback(data voidptr, req picohttpparser.Request, mut res picohttpparser.Response) {
if picohttpparser.cmpn(req.method, 'GET ', 4) {
if picohttpparser.cmp(req.path, '/t') {
if req.method == 'GET' {
if req.path == '/t' {
res.http_ok()
res.header_server()
res.header_date()
res.plain()
res.body(hello_response())
} else if picohttpparser.cmp(req.path, '/j') {
} else if req.path == '/j' {
res.http_ok()
res.header_server()
res.header_date()
res.json()
res.body(json_response())
} else {
res.http_404()
res.http_ok()
res.header_server()
res.header_date()
res.html()
res.body('Hello Picoev!\n')
}
} else {
res.http_405()
Expand All @@ -48,5 +52,6 @@ fn callback(data voidptr, req picohttpparser.Request, mut res picohttpparser.Res

fn main() {
println('Starting webserver on http://127.0.0.1:${port}/ ...')
picoev.new(port: port, cb: &callback).serve()
mut server := picoev.new(port: port, cb: callback)
server.serve()
}
2 changes: 1 addition & 1 deletion vlib/picoev/README.md
@@ -1,4 +1,4 @@
## Description:

`picoev` is a thin wrapper over [picoev](https://github.com/kazuho/picoev),
`picoev` is a V implementation of [picoev](https://github.com/kazuho/picoev),
which in turn is "A tiny, lightning fast event loop for network applications".
95 changes: 95 additions & 0 deletions vlib/picoev/loop_default.c.v
@@ -0,0 +1,95 @@
module picoev

$if windows {
#include <winsock2.h>
#include <ws2tcpip.h>
} $else {
#include <sys/select.h>
}

pub struct SelectLoop {
mut:
id int
now i64
}

type LoopType = SelectLoop

// create_select_loop creates a `SelectLoop` struct with `id`
pub fn create_select_loop(id int) !&SelectLoop {
return &SelectLoop{
id: id
}
}

[direct_array_access]
fn (mut pv Picoev) update_events(fd int, events int) int {
// check if fd is in range
assert fd < max_fds

pv.file_descriptors[fd].events = u32(events & picoev_readwrite)
return 0
}

[direct_array_access]
fn (mut pv Picoev) poll_once(max_wait int) int {
readfds, writefds, errorfds := C.fd_set{}, C.fd_set{}, C.fd_set{}

// setup
C.FD_ZERO(&readfds)
C.FD_ZERO(&writefds)
C.FD_ZERO(&errorfds)

mut maxfd := 0

// find the maximum socket for `select` and add sockets to the fd_sets
for target in pv.file_descriptors {
if target.loop_id == pv.loop.id {
if target.events & picoev_read != 0 {
C.FD_SET(target.fd, &readfds)
if maxfd < target.fd {
maxfd = target.fd
}
}
if target.events & picoev_write != 0 {
C.FD_SET(target.fd, &writefds)
if maxfd < target.fd {
maxfd = target.fd
}
}
}
}

// select and handle sockets if any
tv := C.timeval{
tv_sec: u64(max_wait)
tv_usec: 0
}
r := C.@select(maxfd + 1, &readfds, &writefds, &errorfds, &tv)
if r == -1 {
// timeout
return -1
} else if r > 0 {
for target in pv.file_descriptors {
if target.loop_id == pv.loop.id {
// vfmt off
read_events := (
(if C.FD_ISSET(target.fd, &readfds) { picoev_read } else { 0 })
|
(if C.FD_ISSET(target.fd, &writefds) { picoev_write } else { 0 })
)
// vfmt on
if read_events != 0 {
$if trace_fd ? {
eprintln('do callback ${target.fd}')
}

// do callback!
unsafe { target.cb(target.fd, read_events, &pv) }
}
}
}
}

return 0
}
203 changes: 203 additions & 0 deletions vlib/picoev/loop_freebsd.c.v
@@ -0,0 +1,203 @@
module picoev

#include <errno.h>
#include <sys/types.h>
#include <sys/event.h>

fn C.kevent(int, changelist voidptr, nchanges int, eventlist voidptr, nevents int, timout &C.timespec) int
fn C.kqueue() int
fn C.EV_SET(kev voidptr, ident int, filter i16, flags u16, fflags u32, data voidptr, udata voidptr)

pub struct C.kevent {
pub mut:
ident int
// uintptr_t
filter i16
flags u16
fflags u32
data voidptr
// intptr_t
udata voidptr
}

[heap]
pub struct KqueueLoop {
mut:
id int
now i64
kq_id int
// -1 if not changed
changed_fds int
events [1024]C.kevent
changelist [256]C.kevent
}

type LoopType = KqueueLoop

// create_kqueue_loop creates a new kernel event queue with loop_id=`id`
pub fn create_kqueue_loop(id int) !&KqueueLoop {
mut loop := &KqueueLoop{
id: id
}

loop.kq_id = C.kqueue()
if loop.kq_id == -1 {
return error('could not create kqueue loop!')
}
loop.changed_fds = -1
return loop
}

// ev_set sets a new `kevent` with file descriptor `index`
[inline]
pub fn (mut pv Picoev) ev_set(index int, operation int, events int) {
// vfmt off
filter := i16(
(if events & picoev_read != 0 { C.EVFILT_READ } else { 0 })
|
(if events & picoev_write != 0 { C.EVFILT_WRITE } else { 0 })
)
// vfmt on
C.EV_SET(&pv.loop.changelist[index], pv.loop.changed_fds, filter, operation, 0, 0,
0)
}

// backend_build uses the lower 8 bits to store the old events and the higher 8
// bits to store the next file descriptor in `Target.backend`
[inline]
fn backend_build(next_fd int, events u32) int {
return int((u32(next_fd) << 8) | (events & 0xff))
}

// get the lower 8 bits
[inline]
fn backend_get_old_events(backend int) int {
return backend & 0xff
}

// get the higher 8 bits
[inline]
fn backend_get_next_fd(backend int) int {
return backend >> 8
}

// apply pending processes all changes for the file descriptors and updates `loop.changelist`
// if `aplly_all` is `true` the changes are immediately applied
fn (mut pv Picoev) apply_pending_changes(apply_all bool) int {
mut total, mut nevents := 0, 0

for pv.loop.changed_fds != -1 {
mut target := pv.file_descriptors[pv.loop.changed_fds]
old_events := backend_get_old_events(target.backend)
if target.events != old_events {
// events have been changed
if old_events != 0 {
pv.ev_set(total, C.EV_DISABLE, old_events)
total++
}
if target.events != 0 {
pv.ev_set(total, C.EV_ADD | C.EV_ENABLE, int(target.events))
total++
}
// Apply the changes if the total changes exceed the changelist size
if total + 1 >= pv.loop.changelist.len {
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, C.NULL,
0, C.NULL)
assert nevents == 0
total = 0
}
}

pv.loop.changed_fds = backend_get_next_fd(target.backend)
target.backend = -1
}

if apply_all && total != 0 {
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, C.NULL, 0, C.NULL)
assert nevents == 0
total = 0
}

return total
}

[direct_array_access]
fn (mut pv Picoev) update_events(fd int, events int) int {
// check if fd is in range
assert fd < max_fds

mut target := pv.file_descriptors[fd]

// initialize if adding the fd
if events & picoev_add != 0 {
target.backend = -1
}

// return if nothing to do
if (events == picoev_del && target.backend == -1)
|| (events != picoev_del && events & picoev_readwrite == target.events) {
return 0
}

// add to changed list if not yet being done
if target.backend == -1 {
target.backend = backend_build(pv.loop.changed_fds, target.events)
pv.loop.changed_fds = fd
}

// update events
target.events = u32(events & picoev_readwrite)
// apply immediately if is a DELETE
if events & picoev_del != 0 {
pv.apply_pending_changes(true)
}

return 0
}

[direct_array_access]
fn (mut pv Picoev) poll_once(max_wait int) int {
ts := C.timespec{
tv_sec: max_wait
tv_nsec: 0
}

mut total, mut nevents := 0, 0
// apply changes later when the callback is called.
total = pv.apply_pending_changes(false)

nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, &pv.loop.events, pv.loop.events.len,
&ts)
if nevents == -1 {
// the errors we can only rescue
assert C.errno == C.EACCES || C.errno == C.EFAULT || C.errno == C.EINTR
return -1
}

for i := 0; i < nevents; i++ {
event := pv.loop.events[i]
target := pv.file_descriptors[event.ident]

// changelist errors are fatal
assert event.flags & C.EV_ERROR == 0

if pv.loop.id == target.loop_id && event.filter & (C.EVFILT_READ | C.EVFILT_WRITE) != 0 {
read_events := match int(event.filter) {
C.EVFILT_READ {
picoev_read
}
C.EVFILT_WRITE {
picoev_write
}
else {
0
}
}

// do callback!
unsafe { target.cb(target.fd, read_events, &pv) }
}
}

return 0
}

0 comments on commit a43064a

Please sign in to comment.