Skip to content

Commit

Permalink
fixed race condition of async ProcessWrite() in a background thread
Browse files Browse the repository at this point in the history
- when the client is making a lot of requests on the loopback (i.e. only regression tests, not production)
- should not appear with normal network latency and regular workload
- observed via LUTI and by increasing RepeatTextArray() tests to successive calls
- now RepeatTextArray() tests with 50000 successive calls pass with no problem
  • Loading branch information
Arnaud Bouchez committed Mar 1, 2024
1 parent 84d8b58 commit ea798e9
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/mormot.commit.inc
@@ -1 +1 @@
'2.2.7184'
'2.2.7185'
43 changes: 38 additions & 5 deletions src/net/mormot.net.async.pas
Expand Up @@ -119,8 +119,8 @@ TPollAsyncConnection = class(TSynPersistent)
fReadThread: TSynThread;
// how many bytes have been transmitted via Send() and Recv() methods
fBytesRecv, fBytesSend: Int64;
// opaque Windows IOCP instances returned by TWinIocp.Subscribe()
{$ifdef USE_WINIOCP}
// opaque Windows IOCP instances returned by TWinIocp.Subscribe()
fIocp: PWinIocpSubscription; // a single IOCP queue for wieRecv+wieSend
function IocpPrepareNextWrite(queue: TWinIocp): boolean;
{$endif USE_WINIOCP}
Expand All @@ -133,6 +133,9 @@ TPollAsyncConnection = class(TSynPersistent)
// - overriding this method is cheaper than the plain Destroy destructor
// - default implementation does nothing
procedure BeforeDestroy; virtual;
/// called just before ProcessRead/OnRead are done
// - is overriden e.g. in THttpAsyncConnection to wait for background Write
procedure BeforeProcessRead; virtual;
/// this method is called when the some input data is pending on the socket
// - should extract frames or requests from Connection.rd, and handle them
// - this is where the input should be parsed and extracted according to
Expand Down Expand Up @@ -315,7 +318,8 @@ TPollAsyncSockets = class
/// one or several threads should execute this method
// - thread-safe handle of any notified incoming packet
// - return true if something has been read or closed, false to retry later
function ProcessRead(Sender: TSynThread; const notif: TPollSocketResult): boolean;
function ProcessRead(Sender: TSynThread;
const notif: TPollSocketResult): boolean;
/// one thread should execute this method with the proper pseWrite notif
// - thread-safe handle of any outgoing packets
// - sent is the number of bytes already sent from connection.fWr buffer,
Expand Down Expand Up @@ -760,7 +764,7 @@ TAsyncServer = class(TAsyncConnections)
fMaxConnections: integer;
fAccepted: Int64;
fExecuteState: THttpServerExecuteState;
fExecuteAcceptOnly: boolean; // wr in other thread (POSIX THttpAsyncServer)
fExecuteAcceptOnly: boolean; // W in other thread (POSIX THttpAsyncServer)
fExecuteMessage: RawUtf8;
fSockPort: RawUtf8;
fBanned: THttpAcceptBan; // for hsoBan40xIP
Expand Down Expand Up @@ -891,6 +895,8 @@ THttpAsyncConnection = class(TAsyncConnection)
procedure AfterCreate; override;
procedure BeforeDestroy; override;
procedure HttpInit;
// overriden to wait for background Write to finish
procedure BeforeProcessRead; override;
// redirect to fHttp.ProcessRead()
function OnRead: TPollAsyncSocketOnReadWrite; override;
// DoRequest gathered all output in fWR buffer to be sent at once
Expand Down Expand Up @@ -1018,6 +1024,10 @@ procedure TPollAsyncConnection.BeforeDestroy;
begin
end;

procedure TPollAsyncConnection.BeforeProcessRead;
begin
end;

function TPollAsyncConnection.AfterWrite: TPollAsyncSocketOnReadWrite;
begin
result := soContinue;
Expand Down Expand Up @@ -2092,6 +2102,8 @@ procedure TAsyncConnectionsThread.Execute;
case e of
wieRecv:
begin
if sub^.Tag <> 0 then // not needed outside IOCP
TPollAsyncConnection(sub^.Tag).BeforeProcessRead;
SetRes(notif, sub^.Tag, [pseRead]);
fOwner.fClients.ProcessRead(self, notif);
end;
Expand Down Expand Up @@ -3602,9 +3614,28 @@ function THttpAsyncConnection.FlushPipelinedWrite: TPollAsyncSocketOnReadWrite;
fWR.Reset;
end;

procedure THttpAsyncConnection.BeforeProcessRead;
var
endtix: Int64;
begin
// ensure any previous request is actually finished (possible with IOCP only)
if fHttp.State <> hrsSendBody then
exit;
// possible race condition of ProcessWrite() in a background thread
// - when the client is making a lot of requests on the loopback (i.e. only
// tests, not production) - should not appear with normal network latency
endtix := GetTickCount64 + 50; // on Windows, Sleep() may return too quick
repeat
fOwner.DoLog(sllWarning, 'OnRead(%): wait for background W', [Socket], self);
SleepHiRes(5); // may wait any time < 16ms
until (fHttp.State <> hrsSendBody) or
(GetTickCount64 > endtix);
end;

function THttpAsyncConnection.OnRead: TPollAsyncSocketOnReadWrite;
var
st: TProcessParseLine;
previous: THttpRequestState;
begin
if (fOwner.fLog <> nil) and
(acoVerboseLog in fOwner.Options) and
Expand All @@ -3617,6 +3648,7 @@ function THttpAsyncConnection.OnRead: TPollAsyncSocketOnReadWrite;
fHttp.State := hrsErrorShutdownInProgress
else
begin
previous := fHttp.State;
// use the HTTP state machine to asynchronously parse fRd input
result := soContinue;
st.P := fRd.Buffer;
Expand All @@ -3643,8 +3675,8 @@ function THttpAsyncConnection.OnRead: TPollAsyncSocketOnReadWrite;
result := DoRequest;
else
begin
fOwner.DoLog(sllWarning, 'OnRead: close connection after %',
[HTTP_STATE[fHttp.State]], self);
fOwner.DoLog(sllWarning, 'OnRead: close connection after % (before=%)',
[HTTP_STATE[fHttp.State], HTTP_STATE[previous]], self);
DoReject(HTTP_BADREQUEST);
result := soClose;
end;
Expand All @@ -3661,6 +3693,7 @@ function THttpAsyncConnection.OnRead: TPollAsyncSocketOnReadWrite;
else if (result <> soContinue) or
(fHttp.State in [hrsGetCommand, hrsWaitAsyncProcessing, hrsUpgraded]) then
break; // rejected, authenticated, async or upgraded
previous := fHttp.State;
end;
if fPipelinedWrite then
if FlushPipelinedWrite <> soContinue then
Expand Down

0 comments on commit ea798e9

Please sign in to comment.