Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduling rework #35

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/cap.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ static void capCloseDown(PacketNode *head, PacketNode *tail) {
endTimePeriod();
}

static short capProcess(PacketNode *head, PacketNode *tail) {
static short capProcess(PacketNode *head, PacketNode *tail, short *delay) {
short capped = FALSE;
PacketNode *pac, *pacTmp, *oldLast;
DWORD curTick = timeGetTime();
Expand All @@ -113,7 +113,7 @@ static short capProcess(PacketNode *head, PacketNode *tail) {

if (totalBytes > bytesCapped) {
break;
}
}
}

// process live packets
Expand All @@ -130,7 +130,7 @@ static short capProcess(PacketNode *head, PacketNode *tail) {
if (totalBytes > bytesCapped) {
int capCnt = 0;
capped = TRUE;
// buffer from pac to head
// buffer from pac to head
while (bufSize < KEEP_AT_MOST && pac != head) {
pacTmp = pac->prev;
insertAfter(popNode(pac), bufHead);
Expand All @@ -155,6 +155,9 @@ static short capProcess(PacketNode *head, PacketNode *tail) {
}
}

// We don't mind when we're next scheduled.
*delay = 1000;

return capped;
}

Expand Down
6 changes: 3 additions & 3 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ typedef struct {
Ihandle* (*setupUIFunc)(); // return hbox as controls group
void (*startUp)(); // called when starting up the module
void (*closeDown)(PacketNode *head, PacketNode *tail); // called when starting up the module
short (*process)(PacketNode *head, PacketNode *tail);
short (*process)(PacketNode *head, PacketNode *tail, short *delay);
/*
* Flags used during program excution. Need to be re initialized on each run
*/
short lastEnabled; // if it is enabled on last run
short processTriggered; // whether this module has been triggered in last step
short processTriggered; // whether this module has been triggered in last step
Ihandle *iconHandle; // store the icon to be updated
} Module;

Expand All @@ -135,7 +135,7 @@ extern Module resetModule;
extern Module capModule;
extern Module* modules[MODULE_CNT]; // all modules in a list

// status for sending packets,
// status for sending packets,
#define SEND_STATUS_NONE 0
#define SEND_STATUS_SEND 1
#define SEND_STATUS_FAIL -1
Expand Down
46 changes: 26 additions & 20 deletions src/divert.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
// FIXME does this need to be larger then the time to process the list?
#define CLOCK_WAITMS 40
#define QUEUE_LEN 2 << 10
#define QUEUE_TIME 2 << 9
#define QUEUE_TIME 2 << 9

static short loopDelay = CLOCK_WAITMS;

static HANDLE divertHandle;
static volatile short stopLooping;
static HANDLE loopThread, clockThread, mutex;
static HANDLE loopThread, clockThread, mutex, event;

static DWORD divertReadLoop(LPVOID arg);
static DWORD divertClockLoop(LPVOID arg);
Expand Down Expand Up @@ -113,6 +115,11 @@ int divertStart(const char *filter, char buf[]) {
sprintf(buf, "Failed to create mutex (%lu)", GetLastError());
return FALSE;
}
event = CreateEvent(NULL, FALSE, FALSE, NULL);
if (event == NULL) {
sprintf(buf, "Failed to create event (%lu)", GetLastError());
return FALSE;
}

loopThread = CreateThread(NULL, 1, (LPTHREAD_START_ROUTINE)divertReadLoop, NULL, 0, NULL);
if (loopThread == NULL) {
Expand Down Expand Up @@ -208,6 +215,7 @@ static void divertConsumeStep() {
DWORD startTick = GetTickCount(), dt;
#endif
int ix, cnt;
short delay;
// use lastEnabled to keep track of module starting up and closing down
for (ix = 0; ix < MODULE_CNT; ++ix) {
Module *module = modules[ix];
Expand All @@ -216,7 +224,8 @@ static void divertConsumeStep() {
module->startUp();
module->lastEnabled = 1;
}
if (module->process(head, tail)) {
if (module->process(head, tail, &delay)) {
loopDelay = min(loopDelay, delay);
InterlockedIncrement16(&(module->processTriggered));
}
} else {
Expand All @@ -237,15 +246,18 @@ static void divertConsumeStep() {

// periodically try to consume packets to keep the network responsive and not blocked by recv
static DWORD divertClockLoop(LPVOID arg) {
DWORD startTick, stepTick, waitResult;
DWORD waitResult;
int ix;

UNREFERENCED_PARAMETER(arg);

for(;;) {
// use acquire as wait for yielding thread
startTick = GetTickCount();
waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS);
// Wait to be woken up, either by a packet becoming available to process or by timeout.
WaitForSingleObject(event, loopDelay);
loopDelay = CLOCK_WAITMS;

// Acquire mutex to perform processing.
waitResult = WaitForSingleObject(mutex, INFINITE);
switch(waitResult) {
case WAIT_OBJECT_0:
/***************** enter critical region ************************/
Expand All @@ -256,16 +268,10 @@ static DWORD divertClockLoop(LPVOID arg) {
LOG("Fatal: Failed to release mutex (%lu)", GetLastError());
ABORT();
}
// if didn't spent enough time, we sleep on it
stepTick = GetTickCount() - startTick;
if (stepTick < CLOCK_WAITMS) {
Sleep(CLOCK_WAITMS - stepTick);
}
break;
case WAIT_TIMEOUT:
// read loop is processing, so we can skip this run
// shouldn't happen on an INFINITE wait
LOG("!!! Skipping one run");
Sleep(CLOCK_WAITMS);
break;
case WAIT_ABANDONED:
LOG("Acquired abandoned mutex");
Expand Down Expand Up @@ -296,7 +302,7 @@ static DWORD divertClockLoop(LPVOID arg) {
Module *module = modules[ix];
if (*(module->enabledFlag)) {
module->closeDown(head, tail);
}
}
}
LOG("Send all packets upon closing");
lastSendCount = sendAllListPackets();
Expand Down Expand Up @@ -329,8 +335,6 @@ static DWORD divertReadLoop(LPVOID arg) {
UNREFERENCED_PARAMETER(arg);

for(;;) {
// each step must fully consume the list
assert(isListEmpty()); // FIXME has failed this assert before. don't know why
if (!WinDivertRecv(divertHandle, packetBuf, MAX_PACKETSIZE, &addrBuf, &readLen)) {
DWORD lastError = GetLastError();
if (lastError == ERROR_INVALID_HANDLE || lastError == ERROR_OPERATION_ABORTED) {
Expand All @@ -343,10 +347,10 @@ static DWORD divertReadLoop(LPVOID arg) {
}
if (readLen > MAX_PACKETSIZE) {
// don't know how this can happen
LOG("Internal Error: DivertRecv truncated recv packet.");
LOG("Internal Error: DivertRecv truncated recv packet.");
}

//dumpPacket(packetBuf, readLen, &addrBuf);
//dumpPacket(packetBuf, readLen, &addrBuf);

waitResult = WaitForSingleObject(mutex, INFINITE);
switch(waitResult) {
Expand All @@ -363,7 +367,9 @@ static DWORD divertReadLoop(LPVOID arg) {
// create node and put it into the list
pnode = createNode(packetBuf, readLen, &addrBuf);
appendNode(pnode);
divertConsumeStep();

// notify that this has happened
SetEvent(event);
/***************** leave critical region ************************/
if (!ReleaseMutex(mutex)) {
LOG("Fatal: Failed to release mutex (%lu)", GetLastError());
Expand Down
5 changes: 4 additions & 1 deletion src/drop.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static void dropCloseDown(PacketNode *head, PacketNode *tail) {
LOG("drop disabled");
}

static short dropProcess(PacketNode *head, PacketNode* tail) {
static short dropProcess(PacketNode *head, PacketNode* tail, short *delay) {
int dropped = 0;
while (head->next != tail) {
PacketNode *pac = head->next;
Expand All @@ -69,6 +69,9 @@ static short dropProcess(PacketNode *head, PacketNode* tail) {
}
}

// We don't mind when we're next scheduled.
*delay = 1000;

return dropped > 0;
}

Expand Down
6 changes: 5 additions & 1 deletion src/duplicate.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static void dupCloseDown(PacketNode *head, PacketNode *tail) {
LOG("dup disabled");
}

static short dupProcess(PacketNode *head, PacketNode *tail) {
static short dupProcess(PacketNode *head, PacketNode *tail, short *delay) {
short duped = FALSE;
PacketNode *pac = head->next;
while (pac != tail) {
Expand All @@ -81,6 +81,10 @@ static short dupProcess(PacketNode *head, PacketNode *tail) {
}
pac = pac->next;
}

// We don't mind when we're next scheduled.
*delay = 1000;

return duped;
}

Expand Down
4 changes: 3 additions & 1 deletion src/lag.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static void lagCloseDown(PacketNode *head, PacketNode *tail) {
endTimePeriod();
}

static short lagProcess(PacketNode *head, PacketNode *tail) {
static short lagProcess(PacketNode *head, PacketNode *tail, short *delay) {
DWORD currentTime = timeGetTime();
PacketNode *pac = tail->prev;
// pick up all packets and fill in the current time
Expand All @@ -98,6 +98,7 @@ static short lagProcess(PacketNode *head, PacketNode *tail) {
}

// try sending overdue packets from buffer tail
*delay = 1000;
while (!isBufEmpty()) {
PacketNode *pac = bufTail->prev;
if (currentTime > pac->timestamp + lagTime) {
Expand All @@ -106,6 +107,7 @@ static short lagProcess(PacketNode *head, PacketNode *tail) {
LOG("Send lagged packets.");
} else {
LOG("Sent some lagged packets, still have %d in buf", bufSize);
*delay = pac->timestamp + lagTime - currentTime;
break;
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/ood.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#define NAME "ood"
// keep a picked packet at most for KEEP_TURNS_MAX steps, or if there's no following
// one, it will just be sent
#define KEEP_TURNS_MAX 10
#define KEEP_TURNS_MAX 10

static Ihandle *inboundCheckbox, *outboundCheckbox, *chanceInput;

Expand Down Expand Up @@ -100,7 +100,7 @@ static void swapNode(PacketNode *a, PacketNode *b) {
}
}

static short oodProcess(PacketNode *head, PacketNode *tail) {
static short oodProcess(PacketNode *head, PacketNode *tail, short *delay) {
if (oodPacket != NULL) {
if (!isListEmpty() || --giveUpCnt == 0) {
LOG("Ooo sent direction %s, is giveup %s", BOUND_TEXT(oodPacket->addr.Direction), giveUpCnt ? "NO" : "YES");
Expand Down Expand Up @@ -136,6 +136,9 @@ static short oodProcess(PacketNode *head, PacketNode *tail) {
}
}

// We don't mind when we're next scheduled.
*delay = 1000;

return FALSE;
}

Expand Down
8 changes: 6 additions & 2 deletions src/reset.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static void resetCloseDown(PacketNode *head, PacketNode *tail) {
InterlockedExchange16(&setNextCount, 0);
}

static short resetProcess(PacketNode *head, PacketNode *tail) {
static short resetProcess(PacketNode *head, PacketNode *tail, short *delay) {
short reset = FALSE;
PacketNode *pac = head->next;
while (pac != tail) {
Expand Down Expand Up @@ -104,9 +104,13 @@ static short resetProcess(PacketNode *head, PacketNode *tail) {
}
}
}

pac = pac->next;
}

// We don't mind when we're next scheduled.
*delay = 1000;

return reset;
}

Expand Down
8 changes: 6 additions & 2 deletions src/tamper.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static INLINE_FUNCTION void tamper_buf(char *buf, UINT len) {
}
}

static short tamperProcess(PacketNode *head, PacketNode *tail) {
static short tamperProcess(PacketNode *head, PacketNode *tail, short *delay) {
short tampered = FALSE;
PacketNode *pac = head->next;
while (pac != tail) {
Expand All @@ -92,7 +92,7 @@ static short tamperProcess(PacketNode *head, PacketNode *tail) {
char *data = NULL;
UINT dataLen = 0;
if (WinDivertHelperParsePacket(pac->packet, pac->packetLen, NULL, NULL, NULL,
NULL, NULL, NULL, (PVOID*)&data, &dataLen)
NULL, NULL, NULL, (PVOID*)&data, &dataLen)
&& data != NULL && dataLen != 0) {
// try to tamper the central part of the packet,
// since common packets put their checksum at head or tail
Expand All @@ -117,6 +117,10 @@ static short tamperProcess(PacketNode *head, PacketNode *tail) {
}
pac = pac->next;
}

// We don't mind when we're next scheduled.
*delay = 1000;

return tampered;
}

Expand Down
9 changes: 6 additions & 3 deletions src/throttle.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ static Ihandle *inboundCheckbox, *outboundCheckbox, *chanceInput, *frameInput, *
static volatile short throttleEnabled = 0,
throttleInbound = 1, throttleOutbound = 1,
chance = 1000, // [0-10000]
// time frame in ms, when a throttle start the packets within the time
// time frame in ms, when a throttle start the packets within the time
// will be queued and sent altogether when time is over
throttleFrame = TIME_DEFAULT,
dropThrottled = 0;
dropThrottled = 0;

static PacketNode throttleHeadNode = {0}, throttleTailNode = {0};
static PacketNode *bufHead = &throttleHeadNode, *bufTail = &throttleTailNode;
Expand Down Expand Up @@ -113,7 +113,7 @@ static void throttleCloseDown(PacketNode *head, PacketNode *tail) {
endTimePeriod();
}

static short throttleProcess(PacketNode *head, PacketNode *tail) {
static short throttleProcess(PacketNode *head, PacketNode *tail, short *delay) {
short throttled = FALSE;
UNREFERENCED_PARAMETER(head);
if (!throttleStartTick) {
Expand Down Expand Up @@ -152,6 +152,9 @@ static short throttleProcess(PacketNode *head, PacketNode *tail) {
}
}

// We don't mind when we're next scheduled.
*delay = 1000;

return throttled;
}

Expand Down