Skip to content

Commit

Permalink
epoll implementation for linux
Browse files Browse the repository at this point in the history
  • Loading branch information
trikko committed Apr 29, 2024
1 parent 62b82c7 commit 239a7f8
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 194 deletions.
33 changes: 28 additions & 5 deletions source/serverino/communicator.d
Expand Up @@ -25,6 +25,13 @@ OTHER DEALINGS IN THE SOFTWARE.

module serverino.communicator;

version(use_select) { version=with_select; }
else version(use_epoll) { version=with_epoll; }
else {
version(linux) version=with_epoll;
else version=with_select;
}

import serverino.common;
import serverino.databuffer;
import serverino.daemon : WorkerInfo, now;
Expand Down Expand Up @@ -135,6 +142,14 @@ package class Communicator

if (this.clientSkt !is null)
{
import serverino.daemon : Daemon;

version(with_epoll)
{
import core.sys.linux.epoll : EPOLLIN, EPOLLOUT;
Daemon.epollRemoveSocket(clientSkt);
}

// Remove the communicator from the list of alives
if (prev !is null) prev.next = next;
else alives = next;
Expand Down Expand Up @@ -171,9 +186,16 @@ package class Communicator
alives = this;

s.blocking = false;
}
this.clientSkt = s;

this.clientSkt = s;
version(with_epoll)
{
import serverino.daemon : Daemon;
import core.sys.linux.epoll : EPOLLIN, EPOLLOUT;
Daemon.epollAddSocket(s, EPOLLIN | EPOLLOUT, cast(void*) this);
}
}
else assert(false);
}

// Reset the communicator to the initial state and clear the requests queue
Expand Down Expand Up @@ -245,7 +267,7 @@ package class Communicator
}

// Write the buffered data to the client socket
void write()
void onWriteAvailable()
{
auto maxToSend = bufferSent + DEFAULT_BUFFER_SIZE;
if (maxToSend > sendBuffer.length) maxToSend = sendBuffer.length;
Expand Down Expand Up @@ -333,12 +355,12 @@ package class Communicator
else
{
sendBuffer.append(data);
write();
onWriteAvailable();
}
}

// Read the data from the client socket and parse the incoming data
void read()
void onReadAvailable()
{
import std.string: indexOf;

Expand Down Expand Up @@ -368,6 +390,7 @@ package class Communicator
// Read the data from the client socket if it's not buffered
// Set the requestDatareceived flag to true if the first data is read to check for timeouts
bytesRead = clientSkt.receive(buffer);
lastRecv = now;

if (bytesRead < 0)
{
Expand Down
26 changes: 23 additions & 3 deletions source/serverino/config.d
Expand Up @@ -202,8 +202,8 @@ struct ServerinoConfig
enum LISTEN_IPV4 = (p == ListenerProtocol.IPV4 || p == ListenerProtocol.BOTH);
enum LISTEN_IPV6 = (p == ListenerProtocol.IPV6 || p == ListenerProtocol.BOTH);

static if(LISTEN_IPV4) daemonConfig.listeners ~= Listener(daemonConfig.listeners.length, new InternetAddress(address, port));
static if(LISTEN_IPV6) daemonConfig.listeners ~= Listener(daemonConfig.listeners.length, new Internet6Address(address, port));
static if(LISTEN_IPV4) daemonConfig.listeners ~= new Listener(daemonConfig.listeners.length, new InternetAddress(address, port));
static if(LISTEN_IPV6) daemonConfig.listeners ~= new Listener(daemonConfig.listeners.length, new Internet6Address(address, port));

return this;
}
Expand Down Expand Up @@ -248,8 +248,27 @@ import std.typecons : Typedef;
package alias DaemonConfigPtr = Typedef!(DaemonConfig*);
package alias WorkerConfigPtr = Typedef!(WorkerConfig*);

package struct Listener

package class Listener
{
void onConnectionAvailable()
{
import serverino.communicator : Communicator;
import serverino.daemon : now;

// We have an incoming connection to handle
Communicator communicator;

// First: check if any idling communicator is available
auto dead = Communicator.deads;

if (dead !is null) communicator = dead;
else communicator = new Communicator(config);

communicator.lastRecv = now;
communicator.setClientSocket(socket.accept());
}

@safe:

@disable this();
Expand All @@ -260,6 +279,7 @@ package struct Listener
this.index = index;
}

DaemonConfigPtr config;
Address address;
size_t index;

Expand Down

0 comments on commit 239a7f8

Please sign in to comment.