Skip to content

Commit

Permalink
fixed IOCP complex sending
Browse files Browse the repository at this point in the history
- strange race conditions were observed on LUTI continuous integration servers
- we made a huge refactoring so that delayed/huge writes will be done in the main server thread, not in the thread pool
  • Loading branch information
Arnaud Bouchez committed Feb 29, 2024
1 parent d552238 commit 84d8b58
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 70 deletions.
2 changes: 1 addition & 1 deletion src/mormot.commit.inc
@@ -1 +1 @@
'2.2.7183'
'2.2.7184'
131 changes: 62 additions & 69 deletions src/net/mormot.net.async.pas
Expand Up @@ -576,7 +576,11 @@ TAsyncConnections = class(TNotifiedThread)
fLastOperationIdleSeconds: cardinal;
fKeepConnectionInstanceMS: cardinal;
fLastOperationMS: Int64; // as set by ProcessIdleTix()
{$ifndef USE_WINIOCP}
{$ifdef USE_WINIOCP}
// in IOCP mode, Execute does wieSend (and wieAccept for TAsyncServer)
fIocp: TWinIocp; // wieAccept/wieSend events in their its own IOCP queue
fIocpAccept: PWinIocpSubscription;
{$else}
fThreadReadPoll: TAsyncConnectionsThread;
fThreadPollingWakeupSafe: TLightLock;
fThreadPollingWakeupLoad: integer;
Expand Down Expand Up @@ -756,7 +760,7 @@ TAsyncServer = class(TAsyncConnections)
fMaxConnections: integer;
fAccepted: Int64;
fExecuteState: THttpServerExecuteState;
fExecuteAcceptOnly: boolean; // writes in another thread (THttpAsyncServer)
fExecuteAcceptOnly: boolean; // wr in other thread (POSIX THttpAsyncServer)
fExecuteMessage: RawUtf8;
fSockPort: RawUtf8;
fBanned: THttpAcceptBan; // for hsoBan40xIP
Expand Down Expand Up @@ -820,7 +824,6 @@ TAsyncServer = class(TAsyncConnections)
// of each connected client
TAsyncClient = class(TAsyncConnections)
protected
fEvent: TSynEvent;
procedure Execute; override;
public
/// start the TCP client connections, connecting to the supplied IP server
Expand All @@ -830,8 +833,6 @@ TAsyncClient = class(TAsyncConnections)
aConnectionClass: TAsyncConnectionClass; const ProcessName: RawUtf8;
aLog: TSynLogClass; aOptions: TAsyncConnectionsOptions;
aThreadPoolCount: integer = 1); reintroduce; virtual;
/// overriden for proper Execute closing
procedure Shutdown; override;
published
/// server IP address
property Server: RawUtf8
Expand Down Expand Up @@ -1771,9 +1772,10 @@ procedure TPollAsyncSockets.ProcessWrite(
{$ifdef USE_WINIOCP}
if ((connection.fSecure = nil) or // ensure TLS won't actually block
(neWrite in connection.Socket.WaitFor(0, [neWrite]))) and
connection.WaitLock({writer=}true, {timeout=}0) then
connection.WaitLock({writer=}true, {timeout=}20) then
// allow to wait a little since we are in a single W thread
{$else}
if connection.WaitLock({writer=}true, {timeout=}0) then // no need to wait
if connection.TryLock({writer=}true) then // no need to wait
{$endif USE_WINIOCP}
try
buflen := connection.fWr.Len;
Expand Down Expand Up @@ -1841,9 +1843,9 @@ procedure TPollAsyncSockets.ProcessWrite(
if fDebugLog <> nil then
DoLog('ProcessWrite: WaitLock failed % -> will retry later',
[pointer(connection)]);
SleepHiRes(0); // avoid switch threads for nothing
{$ifdef USE_WINIOCP}
if not connection.IocpPrepareNextWrite(fIocp) then
CloseConnection(connection, 'ProcessWrite waitlock');
fIocp.Enqueue(connection.fIocp, wieSend, sent); // not PrepareNextWrite
{$endif USE_WINIOCP}
end;
finally
Expand Down Expand Up @@ -2094,10 +2096,8 @@ procedure TAsyncConnectionsThread.Execute;
fOwner.fClients.ProcessRead(self, notif);
end;
wieSend:
begin
SetRes(notif, sub^.Tag, [pseWrite]);
fOwner.fClients.ProcessWrite(notif, bytes);
end;
// writes are done in the single (and main) fOwner.Execute thread
fOwner.fIocp.Enqueue(sub, e, bytes);
end;
end;
{$else}
Expand Down Expand Up @@ -2247,6 +2247,8 @@ constructor TAsyncConnections.Create(const OnStart, OnStop: TOnNotifyThread;
{$ifdef USE_WINIOCP}
for i := 0 to aThreadPoolCount - 1 do
fThreads[i] := TAsyncConnectionsThread.Create(self, atpReadPending, i);
fIocp := TWinIocp.Create({processing=}1);
fIocp.OnLog := fClients.fIocp.OnLog;
{$else}
fClientsEpoll := fClients.fRead.PollClass.FollowEpoll;
if aThreadPoolCount = 1 then
Expand Down Expand Up @@ -2363,6 +2365,10 @@ procedure TAsyncConnections.Shutdown;
i, n: PtrInt;
endtix: Int64;
begin
{$ifdef USE_WINIOCP}
fIocp.Unsubscribe(fIocpAccept);
fIocp.Terminate;
{$endif USE_WINIOCP}
Terminate;
// terminate the main clients asynchronous logic
if fClients <> nil then
Expand Down Expand Up @@ -2420,6 +2426,9 @@ destructor TAsyncConnections.Destroy;
begin
Shutdown;
inherited Destroy;
{$ifdef USE_WINIOCP}
FreeAndNil(fIocp);
{$endif USE_WINIOCP}
if not (acoNoConnectionTrack in fOptions) then
begin
if fConnectionCount <> 0 then
Expand Down Expand Up @@ -3246,36 +3255,30 @@ procedure TAsyncServer.SetExecuteState(State: THttpServerExecuteState);
procedure TAsyncServer.Execute;
var
{$ifdef USE_WINIOCP}
// in IOCP mode, this thread does wieAccept and wieSend
iocp: TWinIocp; // wieAccept/wieSend events in their its own IOCP queue
sub: PWinIocpSubscription;
s: TNetSocket;
e: TWinIocpEvent;
bytes: cardinal;
{$ifdef IOCP_ACCEPT_PREALLOCATE_SOCKETS}
sockets: TNetSocketDynArray;
socketsalloc: PtrInt;
s: TNetSocket;
{$endif IOCP_ACCEPT_PREALLOCATE_SOCKETS}
{$else}
// in select/poll/epoll mode, this thread may do accept or accept+write
async: boolean;
{$endif USE_WINIOCP}
res: TNetResult;
notif: TPollSocketResult;
client: TNetSocket;
connection: TAsyncConnection;
res: TNetResult;
start: Int64;
bytes: cardinal;
len: integer;
sin: TNetAddr;
begin
// Accept() incoming connections
// and Send() output packets in the background if fExecuteAcceptOnly=false
SetCurrentThreadName('A:%', [fProcessName]);
SetCurrentThreadName('AW:%', [fProcessName]);
NotifyThreadStart(self);
{$ifdef USE_WINIOCP}
iocp := TWinIocp.Create({processing=}1);
try
{$endif USE_WINIOCP}
try
// create and bind fServer to the expected TCP port
SetExecuteState(esBinding);
Expand All @@ -3285,8 +3288,8 @@ procedure TAsyncServer.Execute;
raise EAsyncConnections.CreateUtf8('%.Execute: bind failed', [self]);
SetExecuteState(esRunning);
{$ifdef USE_WINIOCP}
sub := iocp.Subscribe(fServer.Sock, 0);
if not iocp.PrepareNext(sub, wieAccept) then
fIocpAccept := fIocp.Subscribe(fServer.Sock, 0);
if not fIocp.PrepareNext(fIocpAccept, wieAccept) then
RaiseLastError('TAsyncServer.Execute: acceptex', EWinIocp);
{$ifdef IOCP_ACCEPT_PREALLOCATE_SOCKETS}
sockets := NewRawSockets(fServer.SocketFamily, nlTcp, 10000);
Expand All @@ -3307,44 +3310,43 @@ procedure TAsyncServer.Execute;
begin
PQWord(@notif)^ := 0; // direct blocking accept() by default
{$ifdef USE_WINIOCP}
sub := iocp.GetNext(INFINITE, e, bytes);
sub := fIocp.GetNext(INFINITE, e, bytes);
if sub = nil then
break;
break; // terminated
res := nrFatalError;
case e of
wieAccept:
if iocp.GetNextAccept(sub, client, sin) then
if fIocp.GetNextAccept(sub, client, sin) then
begin
if acoEnableTls in fOptions then
res := nrOk
else
res := client.MakeAsync;
if res = nrOk then
{$ifdef IOCP_ACCEPT_PREALLOCATE_SOCKETS}
begin
s := nil; // allocate in PrepareNext()
{$ifdef IOCP_ACCEPT_PREALLOCATE_SOCKETS}
if socketsalloc <= high(sockets) then
begin
s := sockets[socketsalloc]; // provide one pre-allocated socket
inc(socketsalloc);
end;
if not iocp.PrepareNext(sub, wieAccept, nil, 0, s) then
{$endif IOCP_ACCEPT_PREALLOCATE_SOCKETS}
if not fIocp.PrepareNext(sub, wieAccept, nil, 0, s) then
res := nrFatalError;
end;
{$else}
if not iocp.PrepareNext(sub, wieAccept) then
res := nrFatalError;
{$endif IOCP_ACCEPT_PREALLOCATE_SOCKETS}
end;
wieSend:
begin
// redirected from TAsyncConnectionsThread.Execute
SetRes(notif, sub^.Tag, [pseWrite]);
res := nrOk;
end;
end;
if e = wieAccept then
begin
{$else}
bytes := 0;
if async and
not fClients.fWrite.GetOne(1000, 'AW', notif) then
continue;
Expand Down Expand Up @@ -3440,10 +3442,10 @@ procedure TAsyncServer.Execute;
client.ShutdownAndClose({rdwr=}false);
end
else
// this was a pseWrite notification -> try to send pending data
// this was a pseWrite (wieSend) notification -> try send pending data
// here connection = TObject(notif.tag)
// - never executed if fExecuteAcceptOnly=true (THttpAsyncServer)
fClients.ProcessWrite(notif, 0);
// - never executed if fExecuteAcceptOnly=true (POSIX THttpAsyncServer)
fClients.ProcessWrite(notif, bytes);
end;
except
on E: Exception do
Expand All @@ -3455,11 +3457,6 @@ procedure TAsyncServer.Execute;
[E.ClassType, fProcessName], self);
end;
end;
{$ifdef USE_WINIOCP}
finally
iocp.Free;
end;
{$endif USE_WINIOCP}
DoLog(sllInfo, 'Execute: done AW %', [fProcessName], self);
SetExecuteState(esFinished);
end;
Expand All @@ -3482,41 +3479,37 @@ constructor TAsyncClient.Create(const aServer, aPort: RawUtf8;
aLog, aOptions, aThreadPoolCount);
end;

procedure TAsyncClient.Shutdown;
begin
{$ifdef USE_WINIOCP}
if fEvent <> nil then
fEvent.SetEvent;
{$endif USE_WINIOCP}
inherited Shutdown;
end;

procedure TAsyncClient.Execute;
{$ifndef USE_WINIOCP}
var
notif: TPollSocketResult;
{$endif USE_WINIOCP}
bytes: cardinal;
{$ifdef USE_WINIOCP}
e: TWinIocpEvent;
sub: PWinIocpSubscription;
{$endif USE_WINIOCP}
begin
SetCurrentThreadName('C:% %', [fProcessName, self]);
SetCurrentThreadName('W:% %', [fProcessName, self]);
NotifyThreadStart(self);
try
if fThreadClients.Count > 0 then
while InterlockedDecrement(fThreadClients.Count) >= 0 do
// will first connect some clients in this main thread
ThreadClientsConnect;
{$ifdef USE_WINIOCP}
if not Terminated then
begin
fEvent := TSynEvent.Create;
DoLog(sllDebug, 'Execute: wait % C', [fProcessName], self);
fEvent.WaitForEver; // triggered from Shutdown
FreeAndNil(fEvent);
end;
{$else}
while not Terminated do
begin
{$ifdef USE_WINIOCP}
sub := fIocp.GetNext(INFINITE, e, bytes);
if sub = nil then
break; // terminated
if e <> wieSend then
continue;
SetRes(notif, sub^.Tag, [pseWrite]);
{$else}
bytes := 0;
if fClients.fWrite.GetOne(1000, 'W', notif) then
fClients.ProcessWrite(notif, 0);
{$endif USE_WINIOCP}
{$endif USE_WINIOCP}
fClients.ProcessWrite(notif, bytes);
end;
DoLog(sllInfo, 'Execute: done % C', [fProcessName], self);
except
on E: Exception do
Expand Down Expand Up @@ -4076,7 +4069,7 @@ procedure THttpAsyncConnection.DoAfterResponse;

procedure THttpAsyncConnections.Execute;
begin
fExecuteAcceptOnly := true; // THttpAsyncServer.Execute will do the writes
fExecuteAcceptOnly := true; // THttpAsyncServer.Execute will do POSIX writes
inherited Execute;
end;

Expand Down Expand Up @@ -4267,14 +4260,14 @@ procedure THttpAsyncServer.Execute;
tix, lasttix: cardinal;
msidle: integer;
begin
// Send() output packets in the background
SetCurrentThreadName('W:%', [fAsync.fProcessName]);
// call ProcessIdleTix - and POSIX Send() output packets in the background
SetCurrentThreadName('M:%', [fAsync.fProcessName]);
NotifyThreadStart(self);
WaitStarted(10); // wait for fAsync.Execute to bind and start
if fAsync <> nil then
try
fSock := fAsync.fServer;
fAsync.DoLog(sllTrace, 'Execute: main W loop', [], self);
fAsync.DoLog(sllTrace, 'Execute: main loop', [], self);
IdleEverySecond; // initialize idle process (e.g. fHttpDateNowUtc)
tix := mormot.core.os.GetTickCount64 shr 16; // delay=500 after 1 min idle
lasttix := tix;
Expand Down

0 comments on commit 84d8b58

Please sign in to comment.