Skip to content

Commit

Permalink
KTOR-5822 Move fd_set usages to interop (#3686)
Browse files Browse the repository at this point in the history
* KTOR-5822 Move fd_set usages to interop
  • Loading branch information
e5l committed Jul 20, 2023
1 parent 6532273 commit 88119ce
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ import platform.darwin.*
import platform.posix.*
import kotlin.Byte

internal actual fun pselectBridge(
descriptor: Int,
readSet: CPointer<fd_set>,
writeSet: CPointer<fd_set>,
errorSet: CPointer<fd_set>
): Int = pselect(descriptor, readSet, writeSet, errorSet, null, null)

internal actual fun inetNtopBridge(
type: Int,
address: CPointer<*>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import kotlinx.cinterop.*
import platform.linux.*
import platform.posix.*

internal actual fun pselectBridge(
descriptor: Int,
readSet: CPointer<fd_set>,
writeSet: CPointer<fd_set>,
errorSet: CPointer<fd_set>
): Int = pselect(descriptor, readSet, writeSet, errorSet, null, null)

internal actual fun inetNtopBridge(
type: Int,
address: CPointer<*>,
Expand Down
32 changes: 26 additions & 6 deletions ktor-network/nix/interop/network.def
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,35 @@ package=io.ktor.network.interop
---
#include <sys/select.h>
#include <arpa/inet.h>
#include <stdlib.h>

static inline void select_fd_add(int descriptor, fd_set* set) {
FD_SET(descriptor, set);
typedef struct selection_set {
fd_set *value;
} selection_set;

selection_set select_create_fd_set() {
fd_set *value = malloc(sizeof(fd_set));
selection_set result;
result.value = value;
return result;
}

static inline void select_fd_add(int descriptor, selection_set set) {
FD_SET(descriptor, set.value);
}

static inline void select_fd_clear(selection_set set) {
FD_ZERO(set.value);
}

static inline int select_fd_isset(int descriptor, selection_set set) {
return FD_ISSET(descriptor, set.value);
}

static inline void select_fd_clear(fd_set *set) {
FD_ZERO(set);
void selector_release_fd_set(selection_set set) {
free(set.value);
}

static inline int select_fd_isset(int descriptor, fd_set* set) {
return FD_ISSET(descriptor, set);
int selector_pselect(int descriptor, selection_set read_set, selection_set write_set, selection_set error_set) {
return pselect(descriptor, read_set.value, write_set.value, error_set.value, NULL, NULL);
}
107 changes: 52 additions & 55 deletions ktor-network/nix/src/io/ktor/network/selector/SelectUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,13 @@ import io.ktor.network.interop.*
import io.ktor.network.util.*
import io.ktor.util.*
import io.ktor.util.collections.*
import io.ktor.utils.io.*
import io.ktor.utils.io.errors.*
import kotlinx.cinterop.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import platform.posix.*
import kotlin.coroutines.*
import kotlin.math.*
import kotlin.native.concurrent.*

internal expect fun pselectBridge(
descriptor: Int,
readSet: CPointer<fd_set>,
writeSet: CPointer<fd_set>,
errorSet: CPointer<fd_set>
): Int

internal expect fun inetNtopBridge(type: Int, address: CPointer<*>, addressOf: CPointer<*>, size: Int)

Expand Down Expand Up @@ -70,52 +61,58 @@ internal class SelectorHelper {
wakeupSignal.signal()
}

private fun selectionLoop(): Unit = memScoped {
val readSet = alloc<fd_set>()
val writeSet = alloc<fd_set>()
val errorSet = alloc<fd_set>()
private fun selectionLoop() {
val readSet = select_create_fd_set()
val writeSet = select_create_fd_set()
val errorSet = select_create_fd_set()

val completed = mutableSetOf<EventInfo>()
val watchSet = mutableSetOf<EventInfo>()
val closeSet = mutableSetOf<Int>()
try {
val completed = mutableSetOf<EventInfo>()
val watchSet = mutableSetOf<EventInfo>()
val closeSet = mutableSetOf<Int>()

while (!interestQueue.isClosed) {
watchSet.add(wakeupSignalEvent)
var maxDescriptor = fillHandlers(watchSet, readSet, writeSet, errorSet)
if (maxDescriptor == 0) continue
while (!interestQueue.isClosed) {
watchSet.add(wakeupSignalEvent)
var maxDescriptor = fillHandlers(watchSet, readSet, writeSet, errorSet)
if (maxDescriptor == 0) continue

maxDescriptor = max(maxDescriptor + 1, wakeupSignalEvent.descriptor + 1)
maxDescriptor = max(maxDescriptor + 1, wakeupSignalEvent.descriptor + 1)

try {
pselectBridge(maxDescriptor + 1, readSet.ptr, writeSet.ptr, errorSet.ptr).check()
} catch (_: PosixException.BadFileDescriptorException) {
// Thrown if the descriptor was closed.
}
try {
selector_pselect(maxDescriptor + 1, readSet, writeSet, errorSet).check()
} catch (_: PosixException.BadFileDescriptorException) {
// Thrown if the descriptor was closed.
}

processSelectedEvents(watchSet, closeSet, completed, readSet, writeSet, errorSet)
}
processSelectedEvents(watchSet, closeSet, completed, readSet, writeSet, errorSet)
}

val exception = CancellationException("Selector closed")
while (!interestQueue.isEmpty) {
interestQueue.removeFirstOrNull()?.fail(exception)
}
val exception = CancellationException("Selector closed")
while (!interestQueue.isEmpty) {
interestQueue.removeFirstOrNull()?.fail(exception)
}

for (item in watchSet) {
item.fail(exception)
for (item in watchSet) {
item.fail(exception)
}
} finally {
selector_release_fd_set(readSet)
selector_release_fd_set(writeSet)
selector_release_fd_set(errorSet)
}
}

private fun fillHandlers(
watchSet: MutableSet<EventInfo>,
readSet: fd_set,
writeSet: fd_set,
errorSet: fd_set
readSet: CValue<selection_set>,
writeSet: CValue<selection_set>,
errorSet: CValue<selection_set>
): Int {
var maxDescriptor = 0

select_fd_clear(readSet.ptr)
select_fd_clear(writeSet.ptr)
select_fd_clear(errorSet.ptr)
select_fd_clear(readSet)
select_fd_clear(writeSet)
select_fd_clear(errorSet)

while (true) {
val event = interestQueue.removeFirstOrNull() ?: break
Expand All @@ -132,26 +129,26 @@ internal class SelectorHelper {

private fun addInterest(
event: EventInfo,
readSet: fd_set,
writeSet: fd_set,
errorSet: fd_set
readSet: CValue<selection_set>,
writeSet: CValue<selection_set>,
errorSet: CValue<selection_set>
) {
val set = descriptorSetByInterestKind(event, readSet, writeSet)

select_fd_add(event.descriptor, set.ptr)
select_fd_add(event.descriptor, errorSet.ptr)
select_fd_add(event.descriptor, set)
select_fd_add(event.descriptor, errorSet)

check(select_fd_isset(event.descriptor, set.ptr) != 0)
check(select_fd_isset(event.descriptor, errorSet.ptr) != 0)
check(select_fd_isset(event.descriptor, set) != 0)
check(select_fd_isset(event.descriptor, errorSet) != 0)
}

private fun processSelectedEvents(
watchSet: MutableSet<EventInfo>,
closeSet: MutableSet<Int>,
completed: MutableSet<EventInfo>,
readSet: fd_set,
writeSet: fd_set,
errorSet: fd_set
readSet: CValue<selection_set>,
writeSet: CValue<selection_set>,
errorSet: CValue<selection_set>
) {
while (true) {
val event = closeQueue.removeFirstOrNull() ?: break
Expand All @@ -166,13 +163,13 @@ internal class SelectorHelper {

val set = descriptorSetByInterestKind(event, readSet, writeSet)

if (select_fd_isset(event.descriptor, errorSet.ptr) != 0) {
if (select_fd_isset(event.descriptor, errorSet) != 0) {
completed.add(event)
event.fail(IOException("Fail to select descriptor ${event.descriptor} for ${event.interest}"))
continue
}

if (select_fd_isset(event.descriptor, set.ptr) == 0) continue
if (select_fd_isset(event.descriptor, set) == 0) continue

if (event.descriptor == wakeupSignal.selectionDescriptor) {
wakeupSignal.check()
Expand All @@ -194,9 +191,9 @@ internal class SelectorHelper {

private fun descriptorSetByInterestKind(
event: EventInfo,
readSet: fd_set,
writeSet: fd_set
): fd_set = when (event.interest) {
readSet: CValue<selection_set>,
writeSet: CValue<selection_set>
): CValue<selection_set> = when (event.interest) {
SelectInterest.READ -> readSet
SelectInterest.WRITE -> writeSet
SelectInterest.ACCEPT -> readSet
Expand Down

0 comments on commit 88119ce

Please sign in to comment.