Skip to content

Commit

Permalink
removed internal list of unsubscribed tags for async sockets
Browse files Browse the repository at this point in the history
- this list intend was an attempt to stabilize HTTP/1.0 process, but it was in fact not a good idea at all :(
- it triggered random GPF and memory leaks on highly-concurrent HTTP/1.0 connections, because TAsyncConnection pointers were reused by the MM whereas the pending results tags were still not flushed, so those new connections were in fact ignored... and eventually leaked...
- to be fair, it was not a very realistic use case, but "wrk -c 16384" was able to reproduce it, if you set the server keep alive parameter to 1 second, or with Apache Bench in HTTP/1.0 mode
- resulting code seems now pretty stable from HTTP/1.1 or HTTP/1.0 requests
- as a side benefit, it also enhances performance, because we don't have to make an O(n) over this list any more
  • Loading branch information
Arnaud Bouchez committed Jul 25, 2022
1 parent 5b3fd06 commit 00c3a15
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/mormot.commit.inc
@@ -1 +1 @@
'2.0.3762'
'2.0.3763'
2 changes: 1 addition & 1 deletion src/net/mormot.net.async.pas
Expand Up @@ -1076,7 +1076,7 @@ function TPollAsyncSockets.Start(connection: TPollAsyncConnection): boolean;
end;

const
_SUB: array[boolean] of AnsiChar = '+-';
_SUB: array[boolean] of AnsiChar = '-+';

function TPollAsyncSockets.Stop(connection: TPollAsyncConnection): boolean;
var
Expand Down
27 changes: 8 additions & 19 deletions src/net/mormot.net.sock.pas
Expand Up @@ -558,7 +558,7 @@ procedure InitNetTlsContext(var TLS: TNetTlsContext; Server: boolean = false;
/// set of events monitored by TPollSocketAbstract
TPollSocketEvents = set of TPollSocketEvent;

/// some opaque value (which may be a pointer) associated with a polling event
/// some opaque value (typically a pointer) associated with a polling event
TPollSocketTag = type PtrInt;
PPollSocketTag = ^TPollSocketTag;
TPollSocketTagDynArray = TPtrUIntDynArray;
Expand Down Expand Up @@ -685,14 +685,12 @@ TPollSockets = class(TPollAbstract)
fPendingSafe: TLightLock;
fPollIndex: integer;
fGettingOne: integer;
fLastUnsubscribeTagCount: integer;
fTerminated: boolean;
fEpollGettingOne: boolean;
fUnsubscribeShouldShutdownSocket: boolean;
fPollClass: TPollSocketClass;
fOnLog: TSynLogProc;
fOnGetOneIdle: TOnPollSocketsIdle;
fLastUnsubscribeTag: TPollSocketTagDynArray; // protected by fPendingSafe
// used for select/poll (FollowEpoll=false) with multiple thread-unsafe fPoll[]
fSubscription: TPollSocketsSubscription;
fSubscriptionSafe: TLightLock; // dedicated not to block Accept()
Expand Down Expand Up @@ -2457,7 +2455,6 @@ procedure TPollSockets.Unsubscribe(socket: TNetSocket; tag: TPollSocketTag);
if fnd <> nil then
byte(fnd^.events) := 0; // GetOnePending() will ignore it
end;
AddPtrUInt(fLastUnsubscribeTag, fLastUnsubscribeTagCount, tag);
finally
fPendingSafe.UnLock;
end;
Expand Down Expand Up @@ -2487,7 +2484,7 @@ procedure TPollSockets.Unsubscribe(socket: TNetSocket; tag: TPollSocketTag);

function TPollSockets.IsValidPending(tag: TPollSocketTag): boolean;
begin
result := true; // overriden e.g. in TPollAsyncReadSockets
result := tag <> 0; // overriden e.g. in TPollAsyncReadSockets
end;

function TPollSockets.GetSubscribeCount: integer;
Expand All @@ -2512,8 +2509,6 @@ function TPollSockets.GetOnePending(out notif: TPollSocketResult;
const call: RawUtf8): boolean;
var
n, ndx: PtrInt;
label
ok;
begin
result := false;
if fTerminated or
Expand All @@ -2528,31 +2523,25 @@ function TPollSockets.GetOnePending(out notif: TPollSocketResult;
n := fPending.Count;
ndx := fPendingIndex;
if ndx < n then
begin
repeat
// retrieve next notified event
notif := fPending.Events[ndx];
// move forward
inc(ndx);
if (byte(notif.events) <> 0) and // Unsubscribe() may have reset to 0
IsValidPending(notif.tag) and // e.g. TPollAsyncReadSockets
((fLastUnsubscribeTagCount = 0) or
not PtrUIntScanExists(pointer(fLastUnsubscribeTag),
fLastUnsubscribeTagCount, notif.tag)) then
if (byte(notif.events) <> 0) and // Unsubscribe() may have reset to 0
IsValidPending(notif.tag) then // e.g. TPollAsyncReadSockets
begin
// there is a non-void event to return
result := true;
fPendingIndex := ndx; // continue with next event
// quick exit with one notified event
if ndx = n then
break; // reset fPending list
goto ok;
break;
end;
until ndx >= n;
if ndx >= n then
begin
fPending.Count := 0; // reuse shared fPending.Events[] memory
fPendingIndex := 0;
fLastUnsubscribeTagCount := 0;
ok: end;
end;
{$ifdef HASFASTTRYFINALLY}
finally
{$endif HASFASTTRYFINALLY}
Expand Down

0 comments on commit 00c3a15

Please sign in to comment.