Skip to content

Commit

Permalink
Wire refactor (part 1)
Browse files Browse the repository at this point in the history
Remote instance support has been dropped for the time being
  • Loading branch information
rcelyte committed Apr 1, 2023
1 parent d1b55b6 commit 8f9ef33
Show file tree
Hide file tree
Showing 16 changed files with 376 additions and 667 deletions.
2 changes: 1 addition & 1 deletion src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ bool config_load(struct Config *out, const char *path) {
uprintf("Failed to write default config to %s: %s\n", path, strerror(errno));
goto fail;
}
char publicIP[] = "[\"127.0.0.1\", \"[::]\"]"; // TODO: resolve actual public IP
char publicIP[] = "\"127.0.0.1\""; // TODO: resolve actual public IP
uprintf("Writing default config to %s\n", path);
fprintf(def,
"{" NEWLINE
Expand Down
121 changes: 73 additions & 48 deletions src/instance/instance.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "instance.h"
#include "common.h"
#include "../wire.h"
#include "../counter.h"
#include <mbedtls/error.h>
#include <stdio.h>
Expand Down Expand Up @@ -112,7 +113,8 @@ struct Room {

struct InstanceContext {
struct NetContext net;
union WireLink *master;
struct WireContext wire;
struct WireLink *master;
struct Counter64 roomMask;
struct Room *rooms[64][4];
};
Expand Down Expand Up @@ -1552,7 +1554,9 @@ static void room_disconnect(struct InstanceContext *ctx, struct Room **room, str
return;

room_free(ctx, room);
wire_send(&ctx->net, ctx->master, &(struct WireMessage){
if(ctx->master == NULL)
return;
WireLink_send(ctx->master, &(const struct WireMessage){
.cookie = 0,
.type = WireMessageType_WireRoomCloseNotify,
.roomCloseNotify.room = (uint32_t)indexof(*ctx->rooms, room),
Expand Down Expand Up @@ -1639,27 +1643,33 @@ static inline void handle_packet(struct InstanceContext *ctx, struct Room **room
} while(handle_Merged(&data, end, &header, &length, session->net.version));
}

static const char *instance_masterAddress = NULL;
static struct {
bool isRemote;
union {
struct WireContext *local;
const char *remote;
};
} instance_master = {0};
static void *instance_handler(struct InstanceContext *ctx) {
net_lock(&ctx->net);
if(*instance_masterAddress) {
ctx->master = wire_connect_remote(&ctx->net, instance_masterAddress);
} else if(ctx->master) {
struct NetContext *localMaster = WireLink_cast_local(ctx->master);
ctx->master = localMaster ? wire_connect_local(&ctx->net, localMaster) : NULL;
{
const struct WireMessage connectMessage = {
.cookie = 0,
.type = WireMessageType_WireSetAttribs,
.setAttribs = {
.capacity = sizeof(ctx->rooms) / sizeof(**ctx->rooms),
.discover = true,
},
};
if(instance_master.isRemote)
ctx->master = WireContext_connect(&ctx->wire, instance_master.remote, &connectMessage);
else if(instance_master.local != NULL)
ctx->master = WireContext_attach(&ctx->wire, instance_master.local, &connectMessage);
}
if(!ctx->master) {
uprintf("wire_connect() failed\n");
goto fail;
if(ctx->master == NULL) {
uprintf("WireContext_connect() failed\n");
goto unlock;
}
wire_send(&ctx->net, ctx->master, &(struct WireMessage){
.cookie = 0,
.type = WireMessageType_WireSetAttribs,
.setAttribs = {
.capacity = sizeof(ctx->rooms) / sizeof(**ctx->rooms),
.discover = true,
},
});
uprintf("Started\n");
uint8_t pkt[1536];
memset(pkt, 0, sizeof(pkt));
Expand All @@ -1668,9 +1678,9 @@ static void *instance_handler(struct InstanceContext *ctx) {
struct InstanceSession *session;
while((len = net_recv(&ctx->net, pkt, (struct NetSession**)&session, (void**)&room)))
handle_packet(ctx, room, session, pkt, &pkt[len]);
wire_disconnect(&ctx->net, ctx->master);
WireLink_free(ctx->master);
ctx->master = NULL;
fail:
unlock:
net_unlock(&ctx->net);
return 0;
}
Expand Down Expand Up @@ -1890,7 +1900,7 @@ static struct WireSessionAllocResp room_resolve_session(struct InstanceContext *
return resp;
}

static void instance_room_spawn(struct InstanceContext *ctx, union WireLink *link, uint32_t cookie, const struct WireRoomSpawn *req) {
static void instance_room_spawn(struct InstanceContext *ctx, struct WireLink *link, uint32_t cookie, const struct WireRoomSpawn *req) {
struct WireMessage r_alloc = {
.type = WireMessageType_WireRoomSpawnResp,
.cookie = cookie,
Expand All @@ -1904,10 +1914,10 @@ static void instance_room_spawn(struct InstanceContext *ctx, union WireLink *lin
} else {
uprintf("room_open() failed\n");
}
wire_send(&ctx->net, link, &r_alloc);
WireLink_send(link, &r_alloc);
}

static void instance_room_join(struct InstanceContext *ctx, union WireLink *link, uint32_t cookie, const struct WireRoomJoin *req) {
static void instance_room_join(struct InstanceContext *ctx, struct WireLink *link, uint32_t cookie, const struct WireRoomJoin *req) {
struct WireMessage r_alloc = {
.type = WireMessageType_WireRoomJoinResp,
.cookie = cookie,
Expand All @@ -1919,31 +1929,39 @@ static void instance_room_join(struct InstanceContext *ctx, union WireLink *link
} else {
r_alloc.roomJoinResp.base = room_resolve_session(ctx, &req->base);
}
wire_send(&ctx->net, link, &r_alloc);
WireLink_send(link, &r_alloc);
}

static void instance_onWireMessage(struct InstanceContext *ctx, union WireLink *link, const struct WireMessage *message) {
static void instance_onWireMessage(struct WireContext *wire, struct WireLink *link, const struct WireMessage *message) {
struct InstanceContext *const ctx = (struct InstanceContext*)wire->userptr;
net_lock(&ctx->net);
if(link != ctx->master)
return;
goto unlock;
if(message == NULL) {
ctx->master = NULL;
return;
goto unlock;
}
switch(message->type) {
case WireMessageType_WireRoomSpawn: instance_room_spawn(ctx, link, message->cookie, &message->roomSpawn); break;
case WireMessageType_WireRoomJoin: instance_room_join(ctx, link, message->cookie, &message->roomJoin); break;
default: uprintf("UNHANDLED WIRE MESSAGE [%s]\n", reflect(WireMessageType, message->type));
}
unlock:
net_unlock(&ctx->net);
}

static uint32_t threads_len = 0;
static pthread_t *threads = NULL;
bool instance_init(const char *domainIPv4, const char *domain, const char *remoteMaster, struct NetContext *localMaster, const char *mapPoolFile, uint32_t count) {
bool instance_init(const char *domainIPv4, const char *domain, const char *remoteMaster, struct WireContext *localMaster, const char *mapPoolFile, uint32_t count) {
if(mapPoolFile && *mapPoolFile)
mapPool_init(mapPoolFile);
instance_domainIPv4 = domainIPv4;
instance_domain = domain;
instance_masterAddress = remoteMaster;
instance_master.isRemote = (*remoteMaster != 0);
if(instance_master.isRemote)
instance_master.remote = remoteMaster;
else
instance_master.local = localMaster;
threads_len = 0;
contexts = malloc(count * sizeof(*contexts));
threads = malloc(count * sizeof(*threads));
Expand All @@ -1953,21 +1971,27 @@ bool instance_init(const char *domainIPv4, const char *domain, const char *remot
}
for(; threads_len < count; ++threads_len) {
struct InstanceContext *ctx = &contexts[threads_len];
if(net_init(&ctx->net, (uint16_t)(5000 + threads_len), true, 16)) {
if(net_init(&ctx->net, (uint16_t)(5000 + threads_len), true)) {
uprintf("net_init() failed\n");
return true;
}
if(WireContext_init(&ctx->wire, &ctx->net, 0)) {
net_cleanup(&ctx->net);
uprintf("WireContext_init() failed\n");
return true;
}
ctx->net.userptr = &contexts[threads_len];
ctx->net.onResolve = (struct NetSession *(*)(void*, struct SS, const uint8_t*, uint32_t, uint8_t*, uint32_t*, void**))instance_onResolve;
ctx->net.onResend = (uint32_t (*)(void*, uint32_t))instance_onResend;
ctx->net.onWireMessage = (void (*)(void*, union WireLink*, const struct WireMessage*))instance_onWireMessage;
ctx->wire.onMessage = instance_onWireMessage;
ctx->roomMask = COUNTER64_CLEAR;
ctx->master = (union WireLink*)localMaster;
ctx->master = NULL;
memset(ctx->rooms, 0, sizeof(ctx->rooms));

if(pthread_create(&threads[threads_len], NULL, (void *(*)(void*))instance_handler, ctx))
threads[threads_len] = 0;
if(!threads[threads_len]) {
WireContext_cleanup(&ctx->wire);
net_cleanup(&ctx->net);
uprintf("Instance thread creation failed\n");
return true;
Expand All @@ -1978,23 +2002,24 @@ bool instance_init(const char *domainIPv4, const char *domain, const char *remot

void instance_cleanup() {
for(uint32_t i = 0; i < threads_len; ++i) {
if(threads[i]) {
struct InstanceContext *ctx = &contexts[i];
net_stop(&ctx->net);
uprintf("Stopping #%u\n", i);
pthread_join(threads[i], NULL);
threads[i] = 0;
FOR_ALL_ROOMS(ctx, room) {
FOR_SOME_PLAYERS(id, (*room)->playerSort,) {
instance_channels_reset(&(*room)->players[id].channels);
NetSession_free(&(*room)->players[id].net);
}
room_free(ctx, room);
if(!threads[i])
continue;
struct InstanceContext *ctx = &contexts[i];
net_stop(&ctx->net);
uprintf("Stopping #%u\n", i);
pthread_join(threads[i], NULL);
threads[i] = 0;
FOR_ALL_ROOMS(ctx, room) {
FOR_SOME_PLAYERS(id, (*room)->playerSort,) {
instance_channels_reset(&(*room)->players[id].channels);
NetSession_free(&(*room)->players[id].net);
}
ctx->roomMask = COUNTER64_CLEAR; // should be redundant, but just to be safe
memset(ctx->rooms, 0, sizeof(ctx->rooms));
net_cleanup(&ctx->net);
room_free(ctx, room);
}
ctx->roomMask = COUNTER64_CLEAR; // should be redundant, but just to be safe
memset(ctx->rooms, 0, sizeof(ctx->rooms));
WireContext_cleanup(&ctx->wire);
net_cleanup(&ctx->net);
}
free(instance_mapPool);
free(threads);
Expand Down
3 changes: 2 additions & 1 deletion src/instance/instance.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include "../net.h"
#include "../wire.h"

bool instance_init(const char *domainIPv4, const char *domain, const char *remoteMaster, struct NetContext *localMaster, const char *mapPoolFile, uint32_t count);
bool instance_init(const char *domainIPv4, const char *domain, const char *remoteMaster, struct WireContext *localMaster, const char *mapPoolFile, uint32_t count);
void instance_cleanup(void);
7 changes: 4 additions & 3 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ int main(int argc, const char *argv[]) {
memset(&cfg, 0, sizeof(cfg));
if(config_load(&cfg, config_path)) // TODO: live config reloading
goto fail0;
wire_set_key(cfg.wireKey, cfg.wireKey_len);
wire_init(cfg.wireKey, cfg.wireKey_len);
if(cfg.statusPort) {
status_internal_init();
if(mbedtls_pk_get_type(&cfg.statusKey) != MBEDTLS_PK_NONE) {
Expand All @@ -48,10 +48,10 @@ int main(int argc, const char *argv[]) {
goto fail1;
}
}
struct NetContext *localMaster = NULL;
struct WireContext *localMaster = NULL;
if(cfg.masterPort) {
localMaster = master_init(cfg.certs, cfg.keys, cfg.masterPort);
if(!localMaster)
if(localMaster == NULL)
goto fail3;
}
if(instance_init(cfg.instanceAddress[0], cfg.instanceAddress[1], cfg.instanceParent, localMaster, cfg.instanceMapPool, cfg.instanceCount))
Expand Down Expand Up @@ -83,6 +83,7 @@ int main(int argc, const char *argv[]) {
else
status_cleanup();
}
wire_cleanup();
fail0:
config_free(&cfg);
return 0;
Expand Down
Loading

0 comments on commit 8f9ef33

Please sign in to comment.