From 6c8f316518e58e598febb3d2f59fe3ff86f14e3c Mon Sep 17 00:00:00 2001 From: David Hotham Date: Fri, 29 Jul 2016 12:34:45 +0100 Subject: [PATCH 1/2] Scheduling rework: - divertReadLoop just puts packets on the queue - divertClockLoop takes packets off the queue and processes them --- src/divert.c | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/divert.c b/src/divert.c index 207a4fa..b9845d7 100644 --- a/src/divert.c +++ b/src/divert.c @@ -10,11 +10,11 @@ // FIXME does this need to be larger then the time to process the list? #define CLOCK_WAITMS 40 #define QUEUE_LEN 2 << 10 -#define QUEUE_TIME 2 << 9 +#define QUEUE_TIME 2 << 9 static HANDLE divertHandle; static volatile short stopLooping; -static HANDLE loopThread, clockThread, mutex; +static HANDLE loopThread, clockThread, mutex, event; static DWORD divertReadLoop(LPVOID arg); static DWORD divertClockLoop(LPVOID arg); @@ -113,6 +113,11 @@ int divertStart(const char *filter, char buf[]) { sprintf(buf, "Failed to create mutex (%lu)", GetLastError()); return FALSE; } + event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (event == NULL) { + sprintf(buf, "Failed to create event (%lu)", GetLastError()); + return FALSE; + } loopThread = CreateThread(NULL, 1, (LPTHREAD_START_ROUTINE)divertReadLoop, NULL, 0, NULL); if (loopThread == NULL) { @@ -237,15 +242,15 @@ static void divertConsumeStep() { // periodically try to consume packets to keep the network responsive and not blocked by recv static DWORD divertClockLoop(LPVOID arg) { - DWORD startTick, stepTick, waitResult; + DWORD waitResult; int ix; UNREFERENCED_PARAMETER(arg); for(;;) { - // use acquire as wait for yielding thread - startTick = GetTickCount(); - waitResult = WaitForSingleObject(mutex, CLOCK_WAITMS); + // Wait to be woken up, either by a packet becoming available to process or by timeout. + WaitForSingleObject(event, CLOCK_WAITMS); + waitResult = WaitForSingleObject(mutex, INFINITE); switch(waitResult) { case WAIT_OBJECT_0: /***************** enter critical region ************************/ @@ -256,16 +261,10 @@ static DWORD divertClockLoop(LPVOID arg) { LOG("Fatal: Failed to release mutex (%lu)", GetLastError()); ABORT(); } - // if didn't spent enough time, we sleep on it - stepTick = GetTickCount() - startTick; - if (stepTick < CLOCK_WAITMS) { - Sleep(CLOCK_WAITMS - stepTick); - } break; case WAIT_TIMEOUT: - // read loop is processing, so we can skip this run + // shouldn't happen on an INFINITE wait LOG("!!! Skipping one run"); - Sleep(CLOCK_WAITMS); break; case WAIT_ABANDONED: LOG("Acquired abandoned mutex"); @@ -296,7 +295,7 @@ static DWORD divertClockLoop(LPVOID arg) { Module *module = modules[ix]; if (*(module->enabledFlag)) { module->closeDown(head, tail); - } + } } LOG("Send all packets upon closing"); lastSendCount = sendAllListPackets(); @@ -329,8 +328,6 @@ static DWORD divertReadLoop(LPVOID arg) { UNREFERENCED_PARAMETER(arg); for(;;) { - // each step must fully consume the list - assert(isListEmpty()); // FIXME has failed this assert before. don't know why if (!WinDivertRecv(divertHandle, packetBuf, MAX_PACKETSIZE, &addrBuf, &readLen)) { DWORD lastError = GetLastError(); if (lastError == ERROR_INVALID_HANDLE || lastError == ERROR_OPERATION_ABORTED) { @@ -343,10 +340,10 @@ static DWORD divertReadLoop(LPVOID arg) { } if (readLen > MAX_PACKETSIZE) { // don't know how this can happen - LOG("Internal Error: DivertRecv truncated recv packet."); + LOG("Internal Error: DivertRecv truncated recv packet."); } - //dumpPacket(packetBuf, readLen, &addrBuf); + //dumpPacket(packetBuf, readLen, &addrBuf); waitResult = WaitForSingleObject(mutex, INFINITE); switch(waitResult) { @@ -363,7 +360,9 @@ static DWORD divertReadLoop(LPVOID arg) { // create node and put it into the list pnode = createNode(packetBuf, readLen, &addrBuf); appendNode(pnode); - divertConsumeStep(); + + // notify that this has happened + SetEvent(event); /***************** leave critical region ************************/ if (!ReleaseMutex(mutex)) { LOG("Fatal: Failed to release mutex (%lu)", GetLastError()); From f33eda768a0cf01aadcf865c8fa763029d6e4fb6 Mon Sep 17 00:00:00 2001 From: David Hotham Date: Fri, 29 Jul 2016 12:36:12 +0100 Subject: [PATCH 2/2] Allow modules to indicate a maximum delay In particular, 'lag' now indicates that it wants to be rescheduled in time to send lagged packets --- src/cap.c | 9 ++++++--- src/common.h | 6 +++--- src/divert.c | 11 +++++++++-- src/drop.c | 5 ++++- src/duplicate.c | 6 +++++- src/lag.c | 4 +++- src/ood.c | 7 +++++-- src/reset.c | 8 ++++++-- src/tamper.c | 8 ++++++-- src/throttle.c | 9 ++++++--- 10 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/cap.c b/src/cap.c index 9c861b6..0d69f9f 100644 --- a/src/cap.c +++ b/src/cap.c @@ -88,7 +88,7 @@ static void capCloseDown(PacketNode *head, PacketNode *tail) { endTimePeriod(); } -static short capProcess(PacketNode *head, PacketNode *tail) { +static short capProcess(PacketNode *head, PacketNode *tail, short *delay) { short capped = FALSE; PacketNode *pac, *pacTmp, *oldLast; DWORD curTick = timeGetTime(); @@ -113,7 +113,7 @@ static short capProcess(PacketNode *head, PacketNode *tail) { if (totalBytes > bytesCapped) { break; - } + } } // process live packets @@ -130,7 +130,7 @@ static short capProcess(PacketNode *head, PacketNode *tail) { if (totalBytes > bytesCapped) { int capCnt = 0; capped = TRUE; - // buffer from pac to head + // buffer from pac to head while (bufSize < KEEP_AT_MOST && pac != head) { pacTmp = pac->prev; insertAfter(popNode(pac), bufHead); @@ -155,6 +155,9 @@ static short capProcess(PacketNode *head, PacketNode *tail) { } } + // We don't mind when we're next scheduled. + *delay = 1000; + return capped; } diff --git a/src/common.h b/src/common.h index 4163e05..a199411 100644 --- a/src/common.h +++ b/src/common.h @@ -116,12 +116,12 @@ typedef struct { Ihandle* (*setupUIFunc)(); // return hbox as controls group void (*startUp)(); // called when starting up the module void (*closeDown)(PacketNode *head, PacketNode *tail); // called when starting up the module - short (*process)(PacketNode *head, PacketNode *tail); + short (*process)(PacketNode *head, PacketNode *tail, short *delay); /* * Flags used during program excution. Need to be re initialized on each run */ short lastEnabled; // if it is enabled on last run - short processTriggered; // whether this module has been triggered in last step + short processTriggered; // whether this module has been triggered in last step Ihandle *iconHandle; // store the icon to be updated } Module; @@ -135,7 +135,7 @@ extern Module resetModule; extern Module capModule; extern Module* modules[MODULE_CNT]; // all modules in a list -// status for sending packets, +// status for sending packets, #define SEND_STATUS_NONE 0 #define SEND_STATUS_SEND 1 #define SEND_STATUS_FAIL -1 diff --git a/src/divert.c b/src/divert.c index b9845d7..ed32a84 100644 --- a/src/divert.c +++ b/src/divert.c @@ -12,6 +12,8 @@ #define QUEUE_LEN 2 << 10 #define QUEUE_TIME 2 << 9 +static short loopDelay = CLOCK_WAITMS; + static HANDLE divertHandle; static volatile short stopLooping; static HANDLE loopThread, clockThread, mutex, event; @@ -213,6 +215,7 @@ static void divertConsumeStep() { DWORD startTick = GetTickCount(), dt; #endif int ix, cnt; + short delay; // use lastEnabled to keep track of module starting up and closing down for (ix = 0; ix < MODULE_CNT; ++ix) { Module *module = modules[ix]; @@ -221,7 +224,8 @@ static void divertConsumeStep() { module->startUp(); module->lastEnabled = 1; } - if (module->process(head, tail)) { + if (module->process(head, tail, &delay)) { + loopDelay = min(loopDelay, delay); InterlockedIncrement16(&(module->processTriggered)); } } else { @@ -249,7 +253,10 @@ static DWORD divertClockLoop(LPVOID arg) { for(;;) { // Wait to be woken up, either by a packet becoming available to process or by timeout. - WaitForSingleObject(event, CLOCK_WAITMS); + WaitForSingleObject(event, loopDelay); + loopDelay = CLOCK_WAITMS; + + // Acquire mutex to perform processing. waitResult = WaitForSingleObject(mutex, INFINITE); switch(waitResult) { case WAIT_OBJECT_0: diff --git a/src/drop.c b/src/drop.c index be0e93b..00097f6 100644 --- a/src/drop.c +++ b/src/drop.c @@ -53,7 +53,7 @@ static void dropCloseDown(PacketNode *head, PacketNode *tail) { LOG("drop disabled"); } -static short dropProcess(PacketNode *head, PacketNode* tail) { +static short dropProcess(PacketNode *head, PacketNode* tail, short *delay) { int dropped = 0; while (head->next != tail) { PacketNode *pac = head->next; @@ -69,6 +69,9 @@ static short dropProcess(PacketNode *head, PacketNode* tail) { } } + // We don't mind when we're next scheduled. + *delay = 1000; + return dropped > 0; } diff --git a/src/duplicate.c b/src/duplicate.c index 3627a13..66c04b2 100644 --- a/src/duplicate.c +++ b/src/duplicate.c @@ -65,7 +65,7 @@ static void dupCloseDown(PacketNode *head, PacketNode *tail) { LOG("dup disabled"); } -static short dupProcess(PacketNode *head, PacketNode *tail) { +static short dupProcess(PacketNode *head, PacketNode *tail, short *delay) { short duped = FALSE; PacketNode *pac = head->next; while (pac != tail) { @@ -81,6 +81,10 @@ static short dupProcess(PacketNode *head, PacketNode *tail) { } pac = pac->next; } + + // We don't mind when we're next scheduled. + *delay = 1000; + return duped; } diff --git a/src/lag.c b/src/lag.c index 0746c61..92cbc1e 100644 --- a/src/lag.c +++ b/src/lag.c @@ -83,7 +83,7 @@ static void lagCloseDown(PacketNode *head, PacketNode *tail) { endTimePeriod(); } -static short lagProcess(PacketNode *head, PacketNode *tail) { +static short lagProcess(PacketNode *head, PacketNode *tail, short *delay) { DWORD currentTime = timeGetTime(); PacketNode *pac = tail->prev; // pick up all packets and fill in the current time @@ -98,6 +98,7 @@ static short lagProcess(PacketNode *head, PacketNode *tail) { } // try sending overdue packets from buffer tail + *delay = 1000; while (!isBufEmpty()) { PacketNode *pac = bufTail->prev; if (currentTime > pac->timestamp + lagTime) { @@ -106,6 +107,7 @@ static short lagProcess(PacketNode *head, PacketNode *tail) { LOG("Send lagged packets."); } else { LOG("Sent some lagged packets, still have %d in buf", bufSize); + *delay = pac->timestamp + lagTime - currentTime; break; } } diff --git a/src/ood.c b/src/ood.c index 51a092a..75a8ccd 100644 --- a/src/ood.c +++ b/src/ood.c @@ -4,7 +4,7 @@ #define NAME "ood" // keep a picked packet at most for KEEP_TURNS_MAX steps, or if there's no following // one, it will just be sent -#define KEEP_TURNS_MAX 10 +#define KEEP_TURNS_MAX 10 static Ihandle *inboundCheckbox, *outboundCheckbox, *chanceInput; @@ -100,7 +100,7 @@ static void swapNode(PacketNode *a, PacketNode *b) { } } -static short oodProcess(PacketNode *head, PacketNode *tail) { +static short oodProcess(PacketNode *head, PacketNode *tail, short *delay) { if (oodPacket != NULL) { if (!isListEmpty() || --giveUpCnt == 0) { LOG("Ooo sent direction %s, is giveup %s", BOUND_TEXT(oodPacket->addr.Direction), giveUpCnt ? "NO" : "YES"); @@ -136,6 +136,9 @@ static short oodProcess(PacketNode *head, PacketNode *tail) { } } + // We don't mind when we're next scheduled. + *delay = 1000; + return FALSE; } diff --git a/src/reset.c b/src/reset.c index 76cad84..c2e7770 100644 --- a/src/reset.c +++ b/src/reset.c @@ -72,7 +72,7 @@ static void resetCloseDown(PacketNode *head, PacketNode *tail) { InterlockedExchange16(&setNextCount, 0); } -static short resetProcess(PacketNode *head, PacketNode *tail) { +static short resetProcess(PacketNode *head, PacketNode *tail, short *delay) { short reset = FALSE; PacketNode *pac = head->next; while (pac != tail) { @@ -104,9 +104,13 @@ static short resetProcess(PacketNode *head, PacketNode *tail) { } } } - + pac = pac->next; } + + // We don't mind when we're next scheduled. + *delay = 1000; + return reset; } diff --git a/src/tamper.c b/src/tamper.c index f630703..bc6d2f7 100644 --- a/src/tamper.c +++ b/src/tamper.c @@ -83,7 +83,7 @@ static INLINE_FUNCTION void tamper_buf(char *buf, UINT len) { } } -static short tamperProcess(PacketNode *head, PacketNode *tail) { +static short tamperProcess(PacketNode *head, PacketNode *tail, short *delay) { short tampered = FALSE; PacketNode *pac = head->next; while (pac != tail) { @@ -92,7 +92,7 @@ static short tamperProcess(PacketNode *head, PacketNode *tail) { char *data = NULL; UINT dataLen = 0; if (WinDivertHelperParsePacket(pac->packet, pac->packetLen, NULL, NULL, NULL, - NULL, NULL, NULL, (PVOID*)&data, &dataLen) + NULL, NULL, NULL, (PVOID*)&data, &dataLen) && data != NULL && dataLen != 0) { // try to tamper the central part of the packet, // since common packets put their checksum at head or tail @@ -117,6 +117,10 @@ static short tamperProcess(PacketNode *head, PacketNode *tail) { } pac = pac->next; } + + // We don't mind when we're next scheduled. + *delay = 1000; + return tampered; } diff --git a/src/throttle.c b/src/throttle.c index b7534d0..59dd24e 100644 --- a/src/throttle.c +++ b/src/throttle.c @@ -13,10 +13,10 @@ static Ihandle *inboundCheckbox, *outboundCheckbox, *chanceInput, *frameInput, * static volatile short throttleEnabled = 0, throttleInbound = 1, throttleOutbound = 1, chance = 1000, // [0-10000] - // time frame in ms, when a throttle start the packets within the time + // time frame in ms, when a throttle start the packets within the time // will be queued and sent altogether when time is over throttleFrame = TIME_DEFAULT, - dropThrottled = 0; + dropThrottled = 0; static PacketNode throttleHeadNode = {0}, throttleTailNode = {0}; static PacketNode *bufHead = &throttleHeadNode, *bufTail = &throttleTailNode; @@ -113,7 +113,7 @@ static void throttleCloseDown(PacketNode *head, PacketNode *tail) { endTimePeriod(); } -static short throttleProcess(PacketNode *head, PacketNode *tail) { +static short throttleProcess(PacketNode *head, PacketNode *tail, short *delay) { short throttled = FALSE; UNREFERENCED_PARAMETER(head); if (!throttleStartTick) { @@ -152,6 +152,9 @@ static short throttleProcess(PacketNode *head, PacketNode *tail) { } } + // We don't mind when we're next scheduled. + *delay = 1000; + return throttled; }