Skip to content

Commit

Permalink
Support UDP server.
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim committed Mar 1, 2024
1 parent cd98afc commit fe0b261
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 65 deletions.
340 changes: 290 additions & 50 deletions src/kernel/Communicator.cc

Large diffs are not rendered by default.

38 changes: 24 additions & 14 deletions src/kernel/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@

class CommConnection
{
protected:
public:
virtual ~CommConnection() { }
friend class Communicator;
};

class CommTarget
Expand Down Expand Up @@ -158,7 +157,8 @@ class CommSession
private:
struct timespec begin_time;
int timeout;
int passive;
short passive;
short reliable;

public:
CommSession() { this->passive = 0; }
Expand Down Expand Up @@ -298,16 +298,6 @@ class Communicator

int create_handler_threads(size_t handler_threads);

int nonblock_connect(CommTarget *target);
int nonblock_listen(CommService *service);

struct CommConnEntry *launch_conn(CommSession *session,
CommTarget *target);
struct CommConnEntry *accept_conn(class CommServiceTarget *target,
CommService *service);

void release_conn(struct CommConnEntry *entry);

void shutdown_service(CommService *service);

void shutdown_io_service(IOService *service);
Expand All @@ -319,10 +309,14 @@ class Communicator

int send_message(struct CommConnEntry *entry);

int request_new_conn(CommSession *session, CommTarget *target);

int request_idle_conn(CommSession *session, CommTarget *target);
int reply_idle_conn(CommSession *session, CommTarget *target);

int request_new_conn(CommSession *session, CommTarget *target);
int reply_message_unreliable(struct CommConnEntry *entry);

int reply_unreliable(CommSession *session, CommTarget *target);

void handle_incoming_request(struct poller_result *res);
void handle_incoming_reply(struct poller_result *res);
Expand All @@ -336,6 +330,8 @@ class Communicator
void handle_connect_result(struct poller_result *res);
void handle_listen_result(struct poller_result *res);

void handle_recvfrom_result(struct poller_result *res);

void handle_ssl_accept_result(struct poller_result *res);

void handle_sleep_result(struct poller_result *res);
Expand All @@ -344,6 +340,14 @@ class Communicator

static void handler_thread_routine(void *context);

static int nonblock_connect(CommTarget *target);
static int nonblock_listen(CommService *service, int *reliable);

static struct CommConnEntry *launch_conn(CommSession *session,
CommTarget *target);
static struct CommConnEntry *accept_conn(class CommServiceTarget *target,
CommService *service);

static int first_timeout(CommSession *session);
static int next_timeout(CommSession *session);

Expand All @@ -358,11 +362,17 @@ class Communicator
static poller_message_t *create_request(void *context);
static poller_message_t *create_reply(void *context);

static int recv_request(const void *buf, size_t size,
struct CommConnEntry *entry);

static int partial_written(size_t n, void *context);

static void *accept(const struct sockaddr *addr, socklen_t addrlen,
int sockfd, void *context);

static void *recvfrom(const struct sockaddr *addr, socklen_t addrlen,
const void *buf, size_t size, void *context);

static void callback(struct poller_result *res, void *context);

public:
Expand Down
1 change: 1 addition & 0 deletions src/server/WFDnsServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ using WFDnsServer = WFServer<protocol::DnsRequest,

static constexpr struct WFServerParams DNS_SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_UDP,
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
Expand Down
1 change: 1 addition & 0 deletions src/server/WFHttpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ using WFHttpServer = WFServer<protocol::HttpRequest,

static constexpr struct WFServerParams HTTP_SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_TCP,
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
Expand Down
1 change: 1 addition & 0 deletions src/server/WFMySQLServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class MySQLServer;

static constexpr struct WFServerParams MYSQL_SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_TCP,
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
Expand Down
1 change: 1 addition & 0 deletions src/server/WFRedisServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ using WFRedisServer = WFServer<protocol::RedisRequest,

static constexpr struct WFServerParams REDIS_SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_TCP,
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
Expand Down
27 changes: 26 additions & 1 deletion src/server/WFServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <condition_variable>
#include <openssl/ssl.h>
#include "CommScheduler.h"
#include "EndpointParams.h"
#include "WFConnection.h"
#include "WFGlobal.h"
#include "WFServer.h"
Expand Down Expand Up @@ -121,10 +122,34 @@ int WFServerBase::create_listen_fd()
{
const struct sockaddr *bind_addr;
socklen_t addrlen;
int type, protocol;
int reuse = 1;

switch (this->params.transport_type)
{
case TT_TCP:
case TT_TCP_SSL:
type = SOCK_STREAM;
protocol = 0;
break;
case TT_UDP:
type = SOCK_DGRAM;
protocol = 0;
break;
#ifdef IPPROTO_SCTP
case TT_SCTP:
case TT_SCTP_SSL:
type = SOCK_STREAM;
protocol = IPPROTO_SCTP;
break;
#endif
default:
errno = EPROTONOSUPPORT;
return -1;
}

this->get_addr(&bind_addr, &addrlen);
this->listen_fd = socket(bind_addr->sa_family, SOCK_STREAM, 0);
this->listen_fd = socket(bind_addr->sa_family, type, protocol);
if (this->listen_fd >= 0)
{
setsockopt(this->listen_fd, SOL_SOCKET, SO_REUSEADDR,
Expand Down
3 changes: 3 additions & 0 deletions src/server/WFServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
#include <mutex>
#include <condition_variable>
#include <openssl/ssl.h>
#include "EndpointParams.h"
#include "WFTaskFactory.h"

struct WFServerParams
{
enum TransportType transport_type;
size_t max_connections;
int peer_response_timeout; /* timeout of each read or write operation */
int receive_timeout; /* timeout of receiving the whole message */
Expand All @@ -42,6 +44,7 @@ struct WFServerParams

static constexpr struct WFServerParams SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_TCP,
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
Expand Down

0 comments on commit fe0b261

Please sign in to comment.