Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

selectors: Plumb in kqueue user events instead of pipes for custom events #19587

Open
wants to merge 1 commit into
base: devel
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
136 changes: 100 additions & 36 deletions lib/pure/ioselects/ioselectors_kqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,31 @@ else:
sock: cint
Selector*[T] = ref SelectorImpl[T]

type
SelectEventImpl = object
rfd: cint
wfd: cint

SelectEvent* = ptr SelectEventImpl
# SelectEvent is declared as `ptr` to be placed in `shared memory`,
# so you can share one SelectEvent handle between threads.
when defined(kqueueUserEvent):
when hasThreadSupport:
type
SelectEventImpl = object
uid: int
fdi: int
kq: cint
SelectEvent* = ptr SelectEventImpl
# SelectEvent is declared as `ptr` to be placed in `shared memory`,
# so you can share one SelectEvent handle between threads.
else:
type
SelectEventImpl = object
uid: int
fdi: int
kq: cint
SelectEvent* = ref SelectEventImpl
else:
type
SelectEventImpl = object
rfd: cint
wfd: cint
SelectEvent* = ptr SelectEventImpl
# SelectEvent is declared as `ptr` to be placed in `shared memory`,
# so you can share one SelectEvent handle between threads.

proc getUnique[T](s: Selector[T]): int {.inline.} =
# we create duplicated handles to get unique indexes for our `fds` array.
Expand Down Expand Up @@ -131,27 +148,55 @@ proc close*[T](s: Selector[T]) =
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())

proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) != 0:
raiseIOSelectorsError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rfd = fds[0]
result.wfd = fds[1]

proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())

proc close*(ev: SelectEvent) =
let res1 = posix.close(ev.rfd)
let res2 = posix.close(ev.wfd)
deallocShared(cast[pointer](ev))
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())
when defined(kqueueUserEvent):
import std/atomics
var userEventsCount: Atomic[int]

proc newSelectEvent*(): SelectEvent =
new(result)
result.uid = 1000 + userEventsCount.fetchAdd(1)
result.kq = -1

proc trigger*(ev: SelectEvent) =
if ev.kq < 0:
raiseIOSelectorsError("event not registered with a select event")
# echo "trigger SelectEvent: ", repr(ev)
var ktrigger = KEvent(ident: ev.fdi.uint,
filter: EVFILT_USER,
flags: EV_ADD,
fflags: NOTE_TRIGGER,
data: 0, udata: nil)
# trigger USER event
if kevent(ev.kq, addr(ktrigger), 1, nil, 0, nil) == -1:
raiseIOSelectorsError(osLastError())

proc close*(ev: SelectEvent) =
ev.fdi = -1
ev.kq = -1
when hasThreadSupport:
deallocShared(cast[pointer](ev))
else:
proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
if posix.pipe(fds) != 0:
raiseIOSelectorsError(osLastError())
setNonBlocking(fds[0])
setNonBlocking(fds[1])
result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
result.rfd = fds[0]
result.wfd = fds[1]

proc trigger*(ev: SelectEvent) =
var data: uint64 = 1
if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64):
raiseIOSelectorsError(osLastError())

proc close*(ev: SelectEvent) =
let res1 = posix.close(ev.rfd)
let res2 = posix.close(ev.wfd)
deallocShared(cast[pointer](ev))
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())

template checkFd(s, f) =
if f >= s.maxFD:
Expand Down Expand Up @@ -327,11 +372,19 @@ proc registerProcess*[T](s: Selector[T], pid: int,
result = fdi

proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
let fdi = ev.rfd.int
doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
setKey(s, fdi, {Event.User}, 0, data)

modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
when defined(kqueueUserEvent):
let fdi = getUnique(s)
s.checkFd(fdi)
doAssert(s.fds[fdi].ident == InvalidIdent)
ev.fdi = fdi
ev.kq = s.kqFD
s.setKey(fdi, {Event.User}, ev.uid, data)
modifyKQueue(s, fdi.uint, EVFILT_USER, EV_ADD, 0, 0, nil)
else:
let fdi = ev.rfd.int
doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
setKey(s, fdi, {Event.User}, 0, data)
modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)

when not declared(CACHE_EVENTS):
flushKQueue(s)
Expand Down Expand Up @@ -420,15 +473,23 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
flushKQueue(s)
dec(s.count)
elif Event.User in pkey.events:
modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
const evtFilt =
when defined(kqueueUserEvent):
EVFILT_USER
else:
EVFILT_READ
modifyKQueue(s, uint(fdi), evtFilt, EV_DELETE, 0, 0, nil)
when not declared(CACHE_EVENTS):
flushKQueue(s)
dec(s.count)

clearKey(pkey)

proc unregister*[T](s: Selector[T], ev: SelectEvent) =
let fdi = int(ev.rfd)
when defined(kqueueUserEvent):
let fdi = int(ev.fdi)
else:
let fdi = int(ev.rfd)
s.checkFd(fdi)
var pkey = addr(s.fds[fdi])
doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
Expand Down Expand Up @@ -534,6 +595,9 @@ proc selectInto*[T](s: Selector[T], timeout: int,
# we are marking key with `Finished` event, to avoid double decrease.
pkey.events.incl(Event.Finished)
rkey.events.incl(Event.Timer)
of EVFILT_USER:
pkey = addr(s.fds[int(kevent.ident)])
rkey.events.incl(Event.User)
of EVFILT_VNODE:
pkey = addr(s.fds[int(kevent.ident)])
rkey.events.incl(Event.Vnode)
Expand Down