Skip to content
Permalink
Browse files
Improved worker API
  • Loading branch information
pp3345 committed Jun 21, 2013
1 parent 73426ac commit 9223e247366965c3bd210e65eebb5704e3bfab1c
Showing with 216 additions and 38 deletions.
  1. +120 −29 Pancake.c
  2. +4 −8 Pancake.h
  3. +64 −0 PancakeWorkers.c
  4. +27 −0 PancakeWorkers.h
  5. +1 −1 autogen.sh
149 Pancake.c
@@ -2,19 +2,26 @@
#include "Pancake.h"
#include "PancakeLogger.h"
#include "PancakeConfiguration.h"
#include "PancakeWorkers.h"

PancakeWorker PancakeCurrentWorker;
PancakeWorker *PancakeCurrentWorker;
PancakeWorker **PancakeWorkerRegistry;
PancakeMainConfigurationStructure PancakeMainConfiguration;
UByte PancakeDoShutdown = 0;

/* Forward declaration */
/* Forward declarations */
UByte PancakeConfigurationServerArchitecture(UByte step, config_setting_t *setting, PancakeConfigurationScope **scope);
static void PancakeSignalHandler(Int32 type, siginfo_t *info, void *context);

/* Here we go. */
Int32 main(Int32 argc, Byte **argv) {
PancakeConfigurationGroup *group;
UInt32 i = 0;
UByte haveDeferred = 0;
PancakeModule *module;
PancakeWorker worker;
struct sigaction signalAction;
sigset_t signalSet;

// Initialize segfault handling
#if defined(HAVE_SIGACTION) && defined(HAVE_PANCAKE_SIGSEGV)
@@ -35,8 +42,11 @@ Int32 main(Int32 argc, Byte **argv) {
#endif

// Initialize global variables
PancakeCurrentWorker.name.value = "Master";
PancakeCurrentWorker.name.length = sizeof("Master") - 1;
worker.name.value = "Master";
worker.name.length = sizeof("Master") - 1;
worker.pid = getpid();
worker.isMaster = 1;
PancakeCurrentWorker = &worker;

// Tell the user we are loading
PancakeLogger(PANCAKE_LOGGER_SYSTEM,
@@ -133,43 +143,85 @@ Int32 main(Int32 argc, Byte **argv) {
exit(3);
}

// Initialize signal handling
sigemptyset(&signalSet);
sigaddset(&signalSet, SIGCHLD);
sigaddset(&signalSet, SIGINT);
sigaddset(&signalSet, SIGTERM);
signalAction.sa_sigaction = PancakeSignalHandler;
signalAction.sa_mask = signalSet;
signalAction.sa_flags = SA_SIGINFO;

sigaction(SIGCHLD, &signalAction, NULL);
sigaction(SIGINT, &signalAction, NULL);
sigaction(SIGTERM, &signalAction, NULL);

// Run workers
if(PancakeMainConfiguration.workers > 0) {
// Multithreaded mode
UInt16 i;

// Allocate worker registry
PancakeWorkerRegistry = PancakeAllocate(PancakeMainConfiguration.workers * sizeof(PancakeWorker*));

PancakeDebug {
PancakeLoggerFormat(PANCAKE_LOGGER_SYSTEM, 0, "Multithreaded mode enabled with %i workers", PancakeMainConfiguration.workers);
}

// Run workers
for(i = 1; i <= PancakeMainConfiguration.workers; i++) {
pid_t pid = fork();

if(pid == -1) {
PancakeLoggerFormat(PANCAKE_LOGGER_ERROR, 0, "Can't fork: %s", strerror(errno));
exit(3);
} else if(pid) {
// Master
} else {
// Worker
PancakeCurrentWorker.name.value = PancakeAllocate(sizeof("Worker #65535"));
PancakeCurrentWorker.name.length = sprintf(PancakeCurrentWorker.name.value, "Worker #%i", i);

PancakeDebug {
PancakeLoggerFormat(PANCAKE_LOGGER_SYSTEM, 0, "PID: %i", getpid());
}

// Run server
PancakeMainConfiguration.serverArchitecture->runServer();

PancakeFree(PancakeCurrentWorker.name.value);
goto shutdown;
PancakeWorker *worker = PancakeAllocate(sizeof(PancakeWorker));

worker->name.value = PancakeAllocate(sizeof("Worker #65535"));
worker->name.length = sprintf(worker->name.value, "Worker #%i", i);
worker->run = PancakeMainConfiguration.serverArchitecture->runServer;
worker->isMaster = 0;

switch(PancakeRunWorker(worker)) {
case 0:
// Fork failed
PancakeLoggerFormat(PANCAKE_LOGGER_ERROR, 0, "Unable to run worker");
exit(1);
case 1: {
// Child started successfully
UInt16 i2;

// Add worker to registry
PancakeWorkerRegistry[i - 1] = worker;

// Sleep forever after all workers are started
if(i == PancakeMainConfiguration.workers) {
do {
sleep(3600);
} while(!PancakeDoShutdown);

// Destroy worker registry
for(i2 = 0; i2 < PancakeMainConfiguration.workers; i2++) {
PancakeWorker *worker = PancakeWorkerRegistry[i2];

PancakeFree(worker->name.value);
PancakeFree(worker);
}

PancakeFree(PancakeWorkerRegistry);
}
} break;
case 2: {
// Child process shutdown
UInt16 i2;

// Destroy allocated workers
for(i2 = 0; i2 < i - 1; i2++) {
PancakeWorker *worker = PancakeWorkerRegistry[i2];

PancakeFree(worker->name.value);
PancakeFree(worker);
}

PancakeFree(PancakeWorkerRegistry);
} goto shutdown;
}
}

while(1) {
sleep(1);
}
} else {
// Singlethreaded mode

@@ -180,15 +232,54 @@ Int32 main(Int32 argc, Byte **argv) {

shutdown:

// Shutdown
if(PancakeCurrentWorker->isMaster) {
PancakeLoggerFormat(PANCAKE_LOGGER_SYSTEM, 0, "Stopping...");
}

// Unload configuration and free memory
PancakeConfigurationUnload();
PancakeConfigurationDestroy();

// Unload server architectures
PancakeNetworkUnload();

// Free worker
if(!PancakeCurrentWorker->isMaster) {
PancakeFree(PancakeCurrentWorker->name.value);
PancakeFree(PancakeCurrentWorker);
}

// Show memory leaks
PancakeDumpHeap();

return 0;
}

static void PancakeSignalHandler(Int32 type, siginfo_t *info, void *context) {
switch(type) {
case SIGINT:
case SIGTERM:
PancakeDoShutdown = 1;

if(PancakeCurrentWorker->isMaster) {
UInt16 i;

for(i = 0; i < PancakeMainConfiguration.workers; i++) {
PancakeWorker *worker = PancakeWorkerRegistry[i];

write(worker->masterSocket, PANCAKE_WORKER_GRACEFUL_SHUTDOWN, sizeof(PANCAKE_WORKER_GRACEFUL_SHUTDOWN));
}
}
break;
case SIGCHLD:
if(PancakeDoShutdown) {
return;
}

PancakeDoShutdown = 1;

PancakeLoggerFormat(PANCAKE_LOGGER_ERROR, 0, "Worker crashed");
break;
}
}
@@ -94,11 +94,8 @@ typedef struct _String {
typedef UByte (*PancakeModuleInitializeFunction)();
typedef void (*PancakeWorkerEntryFunction)();

typedef struct _PancakeWorker {
String name;
PancakeWorkerEntryFunction run;
Int32 pid;
} PancakeWorker;
/* Forward decalarations */
typedef struct _PancakeServerArchitecture PancakeServerArchitecture;

typedef struct _PancakeModule {
UByte *name;
@@ -109,8 +106,6 @@ typedef struct _PancakeModule {
UByte intialized;
} PancakeModule;

#include "PancakeNetwork.h"

typedef struct _PancakeMainConfigurationStructure {
/* Logging */
FILE *systemLog;
@@ -127,10 +122,11 @@ typedef struct _PancakeMainConfigurationStructure {
PancakeServerArchitecture *serverArchitecture;
} PancakeMainConfigurationStructure;

extern PancakeWorker PancakeCurrentWorker;
extern PancakeModule *PancakeModules[];
extern PancakeMainConfigurationStructure PancakeMainConfiguration;

extern UByte PancakeDoShutdown;

/* Pancake version constant */
#define PANCAKE_VERSION "2.0"
#define PANCAKE_COPYRIGHT "2012 - 2013 Yussuf Khalil"
@@ -0,0 +1,64 @@

#include "PancakeWorkers.h"
#include "PancakeLogger.h"

/* Forward declaration */
static void PancakeInternalCommunicationEvent(PancakeSocket *sock);

PANCAKE_API UByte PancakeRunWorker(PancakeWorker *worker) {
pid_t pid;
Int32 sockets[2];

// Create sockets for internal communication
if(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) == -1) {
PancakeLoggerFormat(PANCAKE_LOGGER_ERROR, 0, "Can't create sockets for internal communication: %s", strerror(errno));
return 0;
}

worker->masterSocket = sockets[0];
worker->workerSocket.fd = sockets[1];
worker->workerSocket.flags = 0;
worker->workerSocket.onRead = PancakeInternalCommunicationEvent;

// Fork child from master
pid = fork();

if(pid == -1) {
PancakeLoggerFormat(PANCAKE_LOGGER_ERROR, 0, "Can't fork: %s", strerror(errno));
return 0;
} else if(pid) {
/* Master */

worker->pid = pid;
return 1;
} else {
/* Child */

worker->pid = getpid();
PancakeCurrentWorker = worker;

// Register communication socket
PancakeNetworkAddReadSocket(&worker->workerSocket);

PancakeDebug {
PancakeLoggerFormat(PANCAKE_LOGGER_SYSTEM, 0, "PID: %i", worker->pid);
}

worker->run();

return 2;
}
}

static void PancakeInternalCommunicationEvent(PancakeSocket *sock) {
UByte instruction;

read(sock->fd, &instruction, 1);

switch(instruction) {
default:
case PANCAKE_WORKER_GRACEFUL_SHUTDOWN_INT:
PancakeDoShutdown = 1;
break;
}
}
@@ -0,0 +1,27 @@

#ifndef _PANCAKE_WORKERS_H
#define _PANCAKE_WORKERS_H

#include "Pancake.h"
#include "PancakeNetwork.h"

#define PANCAKE_WORKER_GRACEFUL_SHUTDOWN "\1"
#define PANCAKE_WORKER_GRACEFUL_SHUTDOWN_INT '\1'

typedef struct _PancakeWorker {
String name;
PancakeWorkerEntryFunction run;
Int32 pid;

Int32 masterSocket;
PancakeSocket workerSocket;

UByte isMaster;
} PancakeWorker;

PANCAKE_API UByte PancakeRunWorker(PancakeWorker *worker);

extern PancakeWorker *PancakeCurrentWorker;
extern PancakeWorker **PancakeWorkerRegistry;

#endif
@@ -1,6 +1,6 @@
#!/bin/bash

PANCAKE_SOURCES="Pancake.c PancakeLogger.c PancakeDateTime.c PancakeDebug.c PancakeConfiguration.c PancakeNetwork.c PancakeModules.c"
PANCAKE_SOURCES="Pancake.c PancakeLogger.c PancakeDateTime.c PancakeDebug.c PancakeConfiguration.c PancakeNetwork.c PancakeModules.c PancakeWorkers.c"

for dir in */;
do

0 comments on commit 9223e24

Please sign in to comment.