/
mormot.net.async.pas
3289 lines (3105 loc) · 118 KB
/
mormot.net.async.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/// Asynchronous Network Layer for Event-Driven Clients or Servers
// - this unit is a part of the Open Source Synopse mORMot framework 2,
// licensed under a MPL/GPL/LGPL three license - see LICENSE.md
unit mormot.net.async;
{
*****************************************************************************
Event-Driven Network Classes and Functions
- Low-Level Non-blocking Connections
- Client or Server Asynchronous Process
- THttpAsyncServer Event-Driven HTTP Server
*****************************************************************************
}
interface
{$I ..\mormot.defines.inc}
uses
sysutils,
classes,
mormot.core.base,
mormot.core.os,
mormot.core.data,
mormot.core.text,
mormot.core.unicode,
mormot.core.buffers,
mormot.core.threads,
mormot.core.log,
mormot.core.rtti,
mormot.core.perf,
mormot.core.zip,
mormot.net.sock,
mormot.net.http,
mormot.net.server; // for multi-threaded process
{ ******************** Low-Level Non-blocking Connections }
type
/// 32-bit integer value used to identify an asynchronous connection
// - will start from 1, and increase during the TAsyncConnections live-time
TPollAsyncConnectionHandle = type integer;
/// a dynamic array of TPollAsyncConnectionHandle identifiers
TPollAsyncConnectionHandleDynArray = array of TPollAsyncConnectionHandle;
/// let TPollAsyncSockets.OnRead/AfterWrite shutdown the socket if needed
TPollAsyncSocketOnReadWrite = (
soContinue,
soClose);
/// low-level flags used by the state machine about one TPollAsyncConnection
// - fWasActive is set by TAsyncConnections.IdleEverySecond to purge rd/wr
// unused buffers, to avoid calling GetTickCount64 for every activity
// - fClosed is set by OnClose virtual method
// - fFirstRead is set once TPollAsyncSockets.OnFirstRead is called
// - fSubRead/fSubWrite flags are set when Subscribe() has been called
// - fInList indicates that the connection was Added to the list
TPollAsyncConnectionFlags = set of (
fWasActive,
fClosed,
fFirstRead,
fSubRead,
fSubWrite,
fInList);
/// abstract parent to store information aboout one TPollAsyncSockets connection
TPollAsyncConnection = class(TSynPersistent)
protected
/// the associated TCP connection
// - equals nil after TPollAsyncSockets.Stop
fSocket: TNetSocket;
/// the associated 32-bit sequence number
// - equals 0 after TPollAsyncSockets.Stop
fHandle: TPollAsyncConnectionHandle;
/// atomically incremented during WaitLock()
fWaitCounter: integer;
/// false for a single lock (default), true to separate read/write locks
fLockMax: boolean;
/// low-level flags used by the state machine about this connection
fFlags: TPollAsyncConnectionFlags;
/// the current (reusable) read data buffer of this connection
fRd: TRawByteStringBuffer;
/// the current (reusable) write data buffer of this connection
fWr: TRawByteStringBuffer;
/// TryLock/Unlock R/W thread acquisition
// - uses its own rentrant implementation, faster than TRTLCriticalSection
fRW: array[boolean] of record
Lock: PtrUInt;
ThreadID: TThreadID;
RentrantCount: integer;
end;
/// low-level TLS context
fSecure: INetTls;
/// called when the instance is connected to a poll
// - i.e. at the end of TAsyncConnections.ConnectionNew(), when Handle is set
// - overriding this method is cheaper than the plain Create destructor
// - default implementation does nothing
procedure AfterCreate; virtual;
/// called when the instance is about to be deleted from a poll
// - overriding this method is cheaper than the plain Destroy destructor
// - default implementation does nothing
procedure BeforeDestroy; virtual;
/// this method is called when the some input data is pending on the socket
// - should extract frames or requests from Connection.rd, and handle them
// - this is where the input should be parsed and extracted according to
// the implemented procotol; Connection.rd could be kept as temporary
// buffer during the parsing, and rd.Reset called once processed
// - Sender.Write() could be used for asynchronous answer sending
// - Sender.LogVerbose() allows logging of escaped data
// - could return sorClose to shutdown the socket, e.g. on parsing error
function OnRead: TPollAsyncSocketOnReadWrite; virtual; abstract;
/// this method is called when some data has been written to the socket
// - default implementation will do nothing - see e.g. TRtspConnection
// - you may send data asynchronously using Connection.wr.Append()
function AfterWrite: TPollAsyncSocketOnReadWrite; virtual;
/// this method is called when the sockets is closing
procedure OnClose; virtual;
public
/// finalize the instance
destructor Destroy; override;
/// quick check if this instance seems still active, i.e. its Handle <> 0
function IsDangling: boolean;
{$ifdef HASINLINE} inline; {$endif}
/// quick check if this instance is still open
function IsClosed: boolean;
{$ifdef HASINLINE} inline; {$endif}
/// acquire an exclusive R/W access to this connection
// - returns true if connection has been acquired, setting the wasactive flag
// - returns false if it is used by another thread
function TryLock(writer: boolean): boolean;
/// try to acquire an exclusive R/W access to this connection
// - returns true if connection has been acquired
// - returns false if it is used by another thread, after the timeoutMS period
function WaitLock(writer: boolean; timeoutMS: cardinal): boolean;
/// release exclusive R/W access to this connection
procedure UnLock(writer: boolean);
{$ifdef HASINLINE} inline; {$endif}
/// release all R/W nested locks
// - used when the connection is closed and inactive
procedure UnLockFinal(writer: boolean);
{$ifdef HASINLINE} inline; {$endif}
/// called after TAsyncConnections.LastOperationReleaseMemorySeconds
function ReleaseMemoryOnIdle: PtrInt;
/// send some buffer to the connection, using TLS if possible
function Send(buf: pointer; var len: integer): TNetResult;
/// receive some buffer from the connection, using TLS if possible
function Recv(buf: pointer; var len: integer): TNetResult;
/// read-only access to the socket number associated with this connection
property Socket: TNetSocket
read fSocket;
/// read-only access to the low-level TLS context
property Secure: INetTls
read fSecure;
published
/// read-only access to the handle number associated with this connection
property Handle: TPollAsyncConnectionHandle
read fHandle;
end;
/// possible options for low-level TPollAsyncSockets process
// - as translated from homonymous high-level acoWritePollOnly
// TAsyncConnectionsOptions item
TPollAsyncSocketsOptions = set of (
paoWritePollOnly);
TPollConnectionSockets = class(TPollSockets)
protected
function IsValidPending(tag: TPollSocketTag): boolean; override;
procedure PendingLogDebug(const caller: shortstring);
end;
/// callback prototype for TPollAsyncSockets.OnStart events
// - should return true if Start() should not subscribe for this connection
TOnPollAsyncFunc = function(Sender: TPollAsyncConnection): boolean of object;
/// callback prototype for TPollAsyncSockets.OnStop events
TOnPollAsyncProc = procedure(Sender: TPollAsyncConnection) of object;
{$M+}
/// read/write buffer-oriented process of multiple non-blocking connections
// - to be used e.g. for stream protocols (e.g. WebSockets or IoT communication)
// - assigned sockets will be set in non-blocking mode, so that polling will
// work as expected: you should then never use direclty the socket (e.g. via
// blocking TCrtSocket), but rely on this class for asynchronous process:
// TPollAsyncConnection.OnRead() overriden method will receive all incoming
// data from input buffer, and Write() should be called to add send some data,
// potentially asynchronous with an internal buffer
// - ProcessRead/ProcessWrite methods are to be run for actual communication:
// either you call those methods from multiple threads, or you run them in
// loop from a single thread, then define a TSynThreadPool for running any
// blocking process (e.g. computing requests answers) from OnRead callbacks
TPollAsyncSockets = class
protected
fRead: TPollConnectionSockets;
fWrite: TPollSockets; // separated fWrite (short-term subscribe)
fProcessingRead, fProcessingWrite: integer;
fSendBufferSize: integer; // retrieved at first connection Start()
fReadCount: Int64;
fWriteCount: Int64;
fReadBytes: Int64;
fWriteBytes: Int64;
fDebugLog: TSynLogClass;
fOptions: TPollAsyncSocketsOptions;
fProcessReadCheckPending: boolean;
fReadWaitMs: integer;
fOnStart: TOnPollAsyncFunc;
fOnFirstRead, fOnStop: TOnPollAsyncProc;
function GetCount: integer;
procedure DoLog(const TextFmt: RawUtf8; const TextArgs: array of const);
// pseError: return false to close socket and connection
function OnError(connection: TPollAsyncConnection;
events: TPollSocketEvents): boolean; virtual; abstract;
procedure OnClosed(connection: TPollAsyncConnection); virtual; abstract;
procedure UnlockAndCloseConnection(writer: boolean;
var connection: TPollAsyncConnection; const caller: ShortString);
procedure RegisterConnection(connection: TPollAsyncConnection); virtual; abstract;
function SubscribeConnection(const caller: shortstring;
connection: TPollAsyncConnection; sub: TPollSocketEvent): boolean;
procedure CloseConnection(var connection: TPollAsyncConnection);
public
/// initialize the read/write sockets polling
// - fRead and fWrite TPollSocketsBuffer instances will track pseRead or
// pseWrite events, and maintain input and output data buffers
constructor Create(aOptions: TPollAsyncSocketsOptions); virtual;
/// finalize buffer-oriented sockets polling, and release all used memory
destructor Destroy; override;
/// assign a new connection to the internal reading poll
// - the TSocket handle will be set in non-blocking mode from now on - it
// is not recommended to access it directly any more, but use Write() and
// handle OnRead() callback
// - fRead will poll incoming packets, then call OnRead to handle them,
// or Unsubscribe and delete the socket when pseClosed is notified
// - fWrite will poll for outgoing packets as specified by Write(), then
// send any pending data once the socket is ready
// - any manual call of Start() should ensure the connection is non-blocking
function Start(connection: TPollAsyncConnection): boolean; virtual;
/// remove a connection from the internal poll, and shutdown its socket
// - most of the time, the connection is released by OnClosed when the other
// end shutdown the socket; but you can explicitly call this method when
// the connection (and its socket) is to be shutdown
// - this method won't call OnClosed, since it is initiated by the class
function Stop(connection: TPollAsyncConnection): boolean; virtual;
/// add some data to the asynchronous output buffer of a given connection
// - this method may block if the connection is currently writing from
// another thread (which is not possible from TPollAsyncSockets.Write),
// up to timeout milliseconds
function Write(connection: TPollAsyncConnection;
data: pointer; datalen: integer; timeout: integer = 5000): boolean; virtual;
/// add some data to the asynchronous output buffer of a given connection
function WriteString(connection: TPollAsyncConnection;
const data: RawByteString; timeout: integer = 5000): boolean;
/// one or several threads should execute this method
// - thread-safe handle of any notified incoming packet
// - return true if something has been read or closed, false to retry later
function ProcessRead(const notif: TPollSocketResult): boolean;
/// one thread should execute this method with the proper pseWrite notif
// - thread-safe handle of any outgoing packets
procedure ProcessWrite(const notif: TPollSocketResult);
/// notify internal socket polls to stop their polling loop ASAP
procedure Terminate(waitforMS: integer);
/// low-level access to the polling class used for incoming data
property PollRead: TPollConnectionSockets
read fRead;
/// low-level access to the polling class used for outgoind data
property PollWrite: TPollSockets
write fWrite;
/// some processing options
property Options: TPollAsyncSocketsOptions
read fOptions write fOptions;
/// event called on Start() method success
// - warning: this callback should be very quick because it is blocking
property OnStart: TOnPollAsyncFunc
read fOnStart write fOnStart;
/// event called on first ProcessRead() on a given connection
// - is assigned e.g. to TAsyncServer.OnFirstReadDoTls to setup the TLS
// in one sub-thread of the thread pool
property OnFirstRead: TOnPollAsyncProc
read fOnFirstRead write fOnFirstRead;
/// event called on Stop() method success
// - warning: this callback should be very quick because it is blocking
property OnStop: TOnPollAsyncProc
read fOnStop write fOnStop;
published
/// how many connections are currently managed by this instance
property Count: integer
read GetCount;
/// how many times data has been received by this instance
property ReadCount: Int64
read fReadCount;
/// how many times data has been sent by this instance
property WriteCount: Int64
read fWriteCount;
/// how many data bytes have been received by this instance
property ReadBytes: Int64
read fReadBytes;
/// how many data bytes have been sent by this instance
property WriteBytes: Int64
read fWriteBytes;
// enable WaitFor() during recv() in ProcessRead
// - may enhance responsiveness especially on HTTP/1.0 connections
// - equals 0 ms by default, but could be tuned e.g. to 50 or 100 if needed
// - is set to 50 ms if hsoFavorHttp10 option is set
// - use with care: performance degrades with highly concurrent HTTP/1.1
property ReadWaitMs: integer
read fReadWaitMs write fReadWaitMs;
end;
{$M-}
function ToText(ev: TPollSocketEvent): PShortString; overload;
{ ******************** Client or Server Asynchronous Process }
type
/// exception associated with TAsyncConnection / TAsyncConnections process
EAsyncConnections = class(ESynException);
TAsyncConnections = class;
/// 32-bit type used to store GetTickCount64 div 1000 values
// - as used e.g. by TAsyncConnection.fLastOperation
TAsyncConnectionSec = type cardinal;
/// abstract class to store one TAsyncConnections connection
// - may implement e.g. WebSockets frames, or IoT binary protocol
// - each connection will be identified by a TPollAsyncConnectionHandle integer
// - idea is to minimize the resources used per connection, and allow full
// customization of the process by overriding the OnRead virtual method (and,
// if needed, AfterCreate/AfterWrite/BeforeDestroy/OnLastOperationIdle)
TAsyncConnection = class(TPollAsyncConnection)
protected
fLastOperation: TAsyncConnectionSec;
fOwner: TAsyncConnections;
fRemoteIP: RawUtf8;
fRemoteConnID: THttpServerConnectionID;
// called after TAsyncConnections.LastOperationIdleSeconds of no activity
// - Sender.Write() could be used to send e.g. a hearbeat frame
// - should finish quickly and be non-blocking
// - returns true to log notified events, false if nothing happened
function OnLastOperationIdle(nowsec: TAsyncConnectionSec): boolean; virtual;
public
/// initialize this instance
constructor Create(aOwner: TAsyncConnections;
const aRemoteIP: RawUtf8); reintroduce; virtual;
/// read-only access to the associated connections list
property Owner: TAsyncConnections
read fOwner;
published
/// the associated remote IP4/IP6, as text
property RemoteIP: RawUtf8
read fRemoteIP;
end;
PAsyncConnection = ^TAsyncConnection;
/// meta-class of one TAsyncConnections connection
TAsyncConnectionClass = class of TAsyncConnection;
/// used to store a dynamic array of TAsyncConnection
TAsyncConnectionDynArray = array of TAsyncConnection;
/// handle multiple non-blocking connections using TAsyncConnection instances
TAsyncConnectionsSockets = class(TPollAsyncSockets)
protected
fOwner: TAsyncConnections;
function GetTotal: integer;
{$ifdef HASINLINE} inline; {$endif}
procedure RegisterConnection(connection: TPollAsyncConnection); override;
// just log the error, and close connection if acoOnErrorContinue is not set
function OnError(connection: TPollAsyncConnection;
events: TPollSocketEvents): boolean; override;
// log the closing
procedure OnClosed(connection: TPollAsyncConnection); override;
public
/// add some data to the asynchronous output buffer of a given connection
// - this overriden method will also log the write operation if needed
// - can be executed from an TAsyncConnection.OnRead method
function Write(connection: TPollAsyncConnection;
data: pointer; datalen: integer; timeout: integer = 5000): boolean; override;
published
/// how many connections have been handled by the poll, from the beginning
property Total: integer
read GetTotal;
end;
/// define what TAsyncConnectionsThread.Execute should actually do
TAsyncConnectionsThreadProcess = (
atpReadSingle,
atpReadPoll,
atpReadPending);
TAsyncConnectionsThreadProcesses = set of TAsyncConnectionsThreadProcess;
/// used to implement a thread poll to process TAsyncConnection instances
TAsyncConnectionsThread = class(TSynThread)
protected
fOwner: TAsyncConnections;
fProcess: TAsyncConnectionsThreadProcess;
fWaitForReadPending: boolean;
fExecuteState: THttpServerExecuteState;
fIndex: integer;
fEvent: TEvent;
fName: RawUtf8;
procedure Execute; override;
public
/// initialize the thread
constructor Create(aOwner: TAsyncConnections;
aProcess: TAsyncConnectionsThreadProcess; aIndex: integer); reintroduce;
/// finalize the thread resources
destructor Destroy; override;
published
/// which kind of ProcessRead or ProcessWrite this thread is doing
property Process: TAsyncConnectionsThreadProcess
read fProcess;
/// when used as a thread pool, the number of this thread
property Index: integer
read fIndex;
/// the low-level thread name
property Name: RawUtf8
read fName;
end;
PAsyncConnectionsThread = ^TAsyncConnectionsThread;
/// low-level options for TAsyncConnections processing
// - TAsyncConnectionsSockets.OnError will shutdown the connection on any error,
// unless acoOnErrorContinue is defined
// - acoNoLogRead and acoNoLogWrite could reduce the log verbosity
// - acoVerboseLog will log transmitted frames content, for debugging purposes
// - acoWritePollOnly will be translated into paoWritePollOnly on server
// - acoDebugReadWriteLog would make low-level send/receive logging
// - acoNoConnectionTrack would force to by-pass the internal Connections list
// if it is not needed - not used by now
// - acoEnableTls flag for TLS support, via Windows SChannel or OpenSSL 1.1/3.x
TAsyncConnectionsOptions = set of (
acoOnErrorContinue,
acoNoLogRead,
acoNoLogWrite,
acoVerboseLog,
acoWritePollOnly,
acoDebugReadWriteLog,
acoNoConnectionTrack,
acoEnableTls);
/// implements an abstract thread-pooled high-performance TCP clients or server
// - internal TAsyncConnectionsSockets will handle high-performance process
// of a high number of long-living simultaneous connections
// - will use a TAsyncConnection inherited class to maintain connection state
// - don't use this abstract class but either TAsyncServer or TAsyncClient
// - under Linux/POSIX, check your "ulimit -H -n" value: one socket consumes
// two file descriptors: you may better add the following line to your
// /etc/limits.conf or /etc/security/limits.conf system file:
// $ * hard nofile 65535
TAsyncConnections = class(TNotifiedThread)
protected
fConnectionClass: TAsyncConnectionClass;
fConnection: TAsyncConnectionDynArray; // sorted by TAsyncConnection.Handle
fClients: TAsyncConnectionsSockets;
fThreads: array of TAsyncConnectionsThread;
fThreadReadPoll: TAsyncConnectionsThread;
fConnectionCount: integer; // only subscribed - not just after accept()
fConnectionHigh: integer;
fThreadPoolCount: integer;
fLastConnectionFind: integer;
fLastHandle: integer;
fLog: TSynLogClass;
fConnectionLock: TRWLock; // would block only on connection add/remove
fOptions: TAsyncConnectionsOptions;
fClientsEpoll: boolean; // = PollSocketClass.FollowEpoll
fLastOperationSec: TAsyncConnectionSec;
fLastOperationMS: cardinal; // stored by AddGC in connection.LastOperation
fLastOperationReleaseMemorySeconds: cardinal;
fLastOperationIdleSeconds: cardinal;
fKeepConnectionInstanceMS: cardinal;
fThreadClients: record // used by TAsyncClient
Count, Timeout: integer;
Address, Port: RawUtf8;
end;
fThreadPollingWakeupSafe: TLightLock;
fThreadPollingWakeupIndex: integer;
fGCCount: integer;
fGCSafe: TLightLock;
fGC: array of TAsyncConnection;
function AllThreadsStarted: boolean; virtual;
procedure AddGC(aConnection: TPollAsyncConnection);
procedure DoGC;
function ConnectionCreate(aSocket: TNetSocket; const aRemoteIp: RawUtf8;
out aConnection: TAsyncConnection): boolean; virtual;
function ConnectionNew(aSocket: TNetSocket; aConnection: TAsyncConnection;
aAddAndSubscribe: boolean = true): boolean; virtual;
function ConnectionDelete(
aConnection: TPollAsyncConnection): boolean; overload; virtual;
function LockedConnectionDelete(
aConnection: TAsyncConnection; aIndex: integer): boolean;
procedure ConnectionAdd(conn: TAsyncConnection);
function ThreadPollingWakeup(Events: PtrInt): PtrInt;
procedure DoLog(Level: TSynLogInfo; const TextFmt: RawUtf8;
const TextArgs: array of const; Instance: TObject);
procedure ProcessIdleTix(Sender: TObject; NowTix: Int64); virtual;
function ProcessClientStart(Sender: TPollAsyncConnection): boolean;
procedure IdleEverySecond; virtual;
public
/// initialize the multiple connections
// - don't use this constructor but inherited client/server classes
constructor Create(const OnStart, OnStop: TOnNotifyThread;
aConnectionClass: TAsyncConnectionClass; const ProcessName: RawUtf8;
aLog: TSynLogClass; aOptions: TAsyncConnectionsOptions;
aThreadPoolCount: integer); reintroduce; virtual;
/// shut down the instance, releasing all associated threads and sockets
procedure Shutdown;
/// shut down and finalize the instance, calling Shutdown
destructor Destroy; override;
/// high-level access to a connection instance, from its handle
// - use efficient O(log(n)) binary search
// - could be executed e.g. from a TAsyncConnection.OnRead method
// - raise an exception if acoNoConnectionTrack option was defined
// - returns nil if the handle was not found
// - returns the maching instance, and caller should release the lock as:
// ! try ... finally UnLock(aLock); end;
function ConnectionFindAndLock(aHandle: TPollAsyncConnectionHandle;
aLock: TRWLockContext; aIndex: PInteger = nil): TAsyncConnection;
/// high-level access to a connection instance, from its handle
// - use efficient O(log(n)) binary search
// - this method won't keep the main Lock, but this class will ensure that
// the returned pointer will last for at least 100ms until Free is called
function ConnectionFind(aHandle: TPollAsyncConnectionHandle): TAsyncConnection;
/// low-level access to a connection instance, from its handle
// - use efficient O(log(n)) binary search, since handles are increasing
// - caller should have called Lock before this method is done
function LockedConnectionSearch(aHandle: TPollAsyncConnectionHandle): TAsyncConnection;
/// just a wrapper around fConnectionLock.Lock
// - raise an exception if acoNoConnectionTrack option was defined
procedure Lock(aLock: TRWLockContext);
/// just a wrapper around fConnectionLock.UnLock
// - raise an exception if acoNoConnectionTrack option was defined
// - to be called e.g. after a successfull ConnectionFindAndLock(aLock)
procedure Unlock(aLock: TRWLockContext);
/// remove an handle from the internal list, and close its connection
// - raise an exception if acoNoConnectionTrack option was defined
// - could be executed e.g. from a TAsyncConnection.OnRead method
function ConnectionRemove(aHandle: TPollAsyncConnectionHandle): boolean;
/// call ConnectionRemove unless acoNoConnectionTrack is set
procedure EndConnection(connection: TAsyncConnection);
/// add some data to the asynchronous output buffer of a given connection
// - could be executed e.g. from a TAsyncConnection.OnRead method
function Write(connection: TAsyncConnection; data: pointer; datalen: integer;
timeout: integer = 5000): boolean;
/// add some data to the asynchronous output buffer of a given connection
// - could be executed e.g. from a TAsyncConnection.OnRead method
function WriteString(connection: TAsyncConnection; const data: RawByteString;
timeout: integer = 5000): boolean;
/// low-level method to connect a client to this server
// - is called e.g. from fThreadClients
function ThreadClientsConnect: TAsyncConnection;
/// log some binary data with proper escape
// - can be executed from an TAsyncConnection.OnRead method to track content:
// $ if acoVerboseLog in Sender.Options then Sender.LogVerbose(self,...);
procedure LogVerbose(connection: TPollAsyncConnection; const ident: RawUtf8;
const identargs: array of const; frame: pointer; framelen: integer); overload;
/// log some binary data with proper escape
// - can be executed from an TAsyncConnection.OnRead method to track content:
// $ if acoVerboseLog in Sender.Options then Sender.LogVerbose(...);
procedure LogVerbose(connection: TPollAsyncConnection; const ident: RawUtf8;
const identargs: array of const; const frame: TRawByteStringBuffer); overload;
/// the current monotonic time elapsed, evaluated in seconds
// - IdleEverySecond will set GetTickCount64 div 1000
property LastOperationSec: TAsyncConnectionSec
read fLastOperationSec;
/// allow idle connection to release its internal Connection.rd/wr buffers
// - default is 60 seconds, which is pretty conservative
// - could be tuned in case of high numbers of concurrent connections and
// constrained memory, e.g. with a lower value like 2 seconds
property LastOperationReleaseMemorySeconds: cardinal
read fLastOperationReleaseMemorySeconds write fLastOperationReleaseMemorySeconds;
/// will execute TAsyncConnection.OnLastOperationIdle after an idle period
// - could be used to send heartbeats after read/write inactivity
// - equals 0 (i.e. disabled) by default
property LastOperationIdleSeconds: cardinal
read fLastOperationIdleSeconds write fLastOperationIdleSeconds;
/// how many milliseconds a TAsyncConnection instance is kept alive after closing
// - default is 100 ms before the internal GC call Free on this instance
property KeepConnectionInstanceMS: cardinal
read fKeepConnectionInstanceMS write fKeepConnectionInstanceMS;
/// allow to customize low-level options for processing
property Options: TAsyncConnectionsOptions
read fOptions write fOptions;
/// access to the associated log class
property Log: TSynLogClass
read fLog;
/// low-level unsafe direct access to the connection instances
// - ensure this property is used in a thread-safe manner, i.e. calling
// ConnectionFindAndLock() high-level function, ot via manual
// ! Lock; try ... finally UnLock; end;
property Connection: TAsyncConnectionDynArray
read fConnection;
published
/// how many read threads there are in this thread pool
property ThreadPoolCount: integer
read fThreadPoolCount;
/// current HTTP/1.1 / WebSockets connections count
// - this is the number of long-living connections - may not appear just
// after accept, so never for a HTTP/1.0 short-living request
property ConnectionCount: integer
read fConnectionCount;
/// maximum number of concurrent long-living connections
property ConnectionHigh: integer
read fConnectionHigh;
/// access to the TCP client sockets poll
// - TAsyncConnection.OnRead should rather use Write() and LogVerbose()
// methods of this TAsyncConnections class instead of using Clients
property Clients: TAsyncConnectionsSockets
read fClients;
end;
/// implements a thread-pooled high-performance TCP server
// - will use a TAsyncConnection inherited class to maintain connection
// state for server process
TAsyncServer = class(TAsyncConnections)
protected
fServer: TCrtSocket; // for proper complex binding
fMaxPending: integer;
fMaxConnections: integer;
fAccepted: Int64;
fExecuteState: THttpServerExecuteState;
fExecuteAcceptOnly: boolean; // writes in another thread (THttpAsyncServer)
fExecuteMessage: RawUtf8;
fSockPort: RawUtf8;
procedure OnFirstReadDoTls(Sender: TPollAsyncConnection);
procedure SetExecuteState(State: THttpServerExecuteState); virtual;
procedure Execute; override;
public
/// run the TCP server, listening on a supplied IP port
// - aThreadPoolCount = 1 is fine if the process is almost non-blocking,
// like our mormot.net.rtsphttp relay - but not e.g. for a REST/SOA server
// - with aThreadPoolCount > 1, a thread will do atpReadPoll, and all other
// threads will do atpReadPending for socket reading and processing the data
// - there will always be two other threads, one for Accept() and another
// for asynchronous data writing (i.e. sending to the socket)
// - warning: should call WaitStarted() to let Execute bind and run
// - for TLS support, set acoEnableTls, and once WaitStarted() returned,
// set Server.TLS.CertificateFile/PrivateKeyFile/PrivatePassword properties
// and call Server.DoTlsAfter(cstaBind)
constructor Create(const aPort: RawUtf8;
const OnStart, OnStop: TOnNotifyThread;
aConnectionClass: TAsyncConnectionClass; const ProcessName: RawUtf8;
aLog: TSynLogClass; aOptions: TAsyncConnectionsOptions;
aThreadPoolCount: integer); reintroduce; virtual;
/// to be called just after Create to wait for Execute to Bind
// - will raise an exception on timeout, or if the binding failed
// - needed only for raw protocol implementation: THttpServerGeneric will
// have its own WaitStarted method
procedure WaitStarted(seconds: integer);
/// prepare the server finalization
procedure Shutdown;
/// shut down the server, releasing all associated threads and sockets
destructor Destroy; override;
published
/// access to the TCP server socket
property Server: TCrtSocket
read fServer;
/// how many connections have been accepted since server startup
// - ConnectionCount is the number of long-living connections, this
// counter is the absolute number of successfull accept() calls,
// including short-living (e.g. HTTP/1.0) connections
property Accepted: Int64
read fAccepted;
/// above how many active connections accept() would reject
// - MaxPending applies to the actual thread-pool processing activity,
// whereas MaxConnections tracks the number of connections even in idle state
property MaxConnections: integer
read fMaxConnections write fMaxConnections;
/// above how many fClients.fRead.PendingCount accept() would reject
// - is mapped by the high-level THttpAsyncServer.HttpQueueLength property
// - default is 10000, but could be a lower value e.g. for a load-balancer
// - MaxConnections regulates the absolute number of (idle) connections,
// whereas this property tracks the actual REST/HTTP requests pending for
// the internal thread pool
property MaxPending: integer
read fMaxPending write fMaxPending;
end;
/// implements thread-pooled high-performance TCP multiple clients
// - e.g. to run some load stress tests with optimized resource use
// - will use a TAsyncConnection inherited class to maintain connection state
// of each connected client
TAsyncClient = class(TAsyncConnections)
protected
procedure Execute; override;
public
/// start the TCP client connections, connecting to the supplied IP server
constructor Create(const aServer, aPort: RawUtf8;
aClientsCount, aClientsTimeoutSecs: integer;
const OnStart, OnStop: TOnNotifyThread;
aConnectionClass: TAsyncConnectionClass; const ProcessName: RawUtf8;
aLog: TSynLogClass; aOptions: TAsyncConnectionsOptions;
aThreadPoolCount: integer = 1); reintroduce; virtual;
published
/// server IP address
property Server: RawUtf8
read fThreadClients.Address;
/// server IP port
property Port: RawUtf8
read fThreadClients.Port;
end;
const
/// the TAsyncConnectionsOptions for THttpAsyncServer running on production
// - with low verbosity of the logs - similar to a plain THttpServer
ASYNC_OPTION_PROD = [
acoNoLogRead,
acoNoLogWrite];
/// the TAsyncConnectionsOptions for debugging THttpAsyncServer
// - with high-level receive/send block information
ASYNC_OPTION_DEBUG = [
];
/// the TAsyncConnectionsOptions for fully detailed debug of THttpAsyncServer
// - with all possible - and very verbose - log information
// - could be used to track performance or heisenbug issues
ASYNC_OPTION_VERBOSE = [
acoVerboseLog,
acoDebugReadWriteLog];
{ ******************** THttpAsyncServer Event-Driven HTTP Server }
type
/// exception associated with Event-Driven HTTP Server process
EHttpAsyncConnections = class(EAsyncConnections);
THttpAsyncServer = class;
THttpAsyncConnections = class;
/// handle one HTTP client connection to our non-blocking THttpAsyncServer
THttpAsyncConnection = class(TAsyncConnection)
protected
fHttp: THttpRequestContext;
fServer: THttpAsyncServer;
fKeepAliveSec: TAsyncConnectionSec;
fHeadersSec: TAsyncConnectionSec;
fRespStatus: integer;
fConnectionOpaque: THttpServerConnectionOpaque;
procedure AfterCreate; override;
procedure BeforeDestroy; override;
procedure HttpInit;
// redirect to fHttp.ProcessRead()
function OnRead: TPollAsyncSocketOnReadWrite; override;
// redirect to fHttp.ProcessWrite()
function AfterWrite: TPollAsyncSocketOnReadWrite; override;
// quickly reject incorrect requests (payload/timeout/OnBeforeBody)
function DecodeHeaders: integer; virtual;
function DoHeaders: TPollAsyncSocketOnReadWrite;
function DoRequest: TPollAsyncSocketOnReadWrite;
end;
/// event-driven process of HTTP/WebSockets connections
THttpAsyncConnections = class(TAsyncServer)
protected
fAsyncServer: THttpAsyncServer;
procedure IdleEverySecond; override;
procedure SetExecuteState(State: THttpServerExecuteState); override;
procedure Execute; override;
end;
/// meta-class of THttpAsyncConnections type
THttpAsyncConnectionsClass = class of THttpAsyncConnections;
/// HTTP server using non-blocking sockets
THttpAsyncServer = class(THttpServerSocketGeneric)
protected
fAsync: THttpAsyncConnections;
fHeadersDefaultBufferSize: integer;
fConnectionClass: TAsyncConnectionClass;
fConnectionsClass: THttpAsyncConnectionsClass;
fInterning: PRawUtf8InterningSlot;
fInterningTix: cardinal;
function GetHttpQueueLength: cardinal; override;
procedure SetHttpQueueLength(aValue: cardinal); override;
function GetExecuteState: THttpServerExecuteState; override;
procedure IdleEverySecond; virtual;
// the main thread will Send output packets in the background
procedure Execute; override;
public
/// create an event-driven HTTP Server
constructor Create(const aPort: RawUtf8;
const OnStart, OnStop: TOnNotifyThread; const ProcessName: RawUtf8;
ServerThreadPoolCount: integer = 32; KeepAliveTimeOut: integer = 30000;
ProcessOptions: THttpServerOptions = []); override;
/// finalize the HTTP Server
destructor Destroy; override;
published
/// initial capacity of internal per-connection Headers buffer
// - 2 KB by default is within the mormot.core.fpcx64mm SMALL blocks limit
// so will use up to 3 locks before contention
property HeadersDefaultBufferSize: integer
read fHeadersDefaultBufferSize write fHeadersDefaultBufferSize;
/// direct access to the internal high-performance TCP server
// - you could set e.g. Async.MaxConnections
property Async: THttpAsyncConnections
read fAsync;
end;
implementation
{ ******************** Low-Level Non-blocking Connections }
function ToText(ev: TPollSocketEvent): PShortString;
begin
result := GetEnumName(TypeInfo(TPollSocketEvent), ord(ev));
end;
{ TPollAsyncConnection }
destructor TPollAsyncConnection.Destroy;
begin
// note: our light locks do not need any specific release
try
if not (fClosed in fFlags) then
try
OnClose;
except
end;
BeforeDestroy;
// finalize the instance
fHandle := 0; // to detect any dangling pointer
except
// ignore any exception at this stage
end;
inherited Destroy;
end;
procedure TPollAsyncConnection.AfterCreate;
begin
end;
procedure TPollAsyncConnection.BeforeDestroy;
begin
end;
function TPollAsyncConnection.AfterWrite: TPollAsyncSocketOnReadWrite;
begin
result := soContinue;
end;
function TPollAsyncConnection.IsDangling: boolean;
begin
result := (self = nil) or
(fHandle = 0);
end;
function TPollAsyncConnection.IsClosed: boolean;
begin
result := (self = nil) or
(fHandle = 0) or
(fSocket = nil) or
(fClosed in fFlags);
end;
function TPollAsyncConnection.TryLock(writer: boolean): boolean;
var
tid: TThreadID;
begin
result := false;
if (self = nil) or
(fSocket = nil) then
exit;
tid := GetCurrentThreadId;
with fRW[writer and fLockMax] do
if Lock <> 0 then
if ThreadID = tid then
begin
inc(RentrantCount);
result := true;
end
else
exit
else if LockedExc(Lock, 1, 0) then
begin
include(fFlags, fWasActive);
ThreadID := tid;
RentrantCount := 1;
result := true;
end;
end;
procedure TPollAsyncConnection.UnLock(writer: boolean);
begin
if self <> nil then
with fRW[writer and fLockMax] do
begin
dec(RentrantCount);
if RentrantCount <> 0 then
exit;
Lock := 0;
ThreadID := TThreadID(0);
end;
end;
procedure TPollAsyncConnection.UnLockFinal(writer: boolean);
begin
fRW[writer and fLockMax].Lock:= 0;
end;
procedure TPollAsyncConnection.OnClose;
begin
include(fFlags, fClosed);
end;
function TPollAsyncConnection.ReleaseMemoryOnIdle: PtrInt;
begin
// called now and then to reduce temp memory consumption on Idle connections
result := 0;
if (fRd.Buffer <> nil) and
TryLock({wr=}false) then
begin
inc(result, fRd.Capacity); // returns number of bytes released
fRd.Clear;
UnLock(false);
end;
if (fWr.Buffer <> nil) and
TryLock({fWr=}true) then
begin
inc(result, fWr.Capacity);
fWr.Clear;
UnLock(true);
end;
exclude(fFlags, fWasActive); // TryLock() was with no true activity here
end;
function TPollAsyncConnection.WaitLock(writer: boolean; timeoutMS: cardinal): boolean;
var
endtix: Int64;
ms: integer;
begin
result := (@self <> nil) and
(fSocket <> nil);
if not result then
exit; // socket closed
result := TryLock(writer);
if result or
(timeoutMS = 0) then
// we acquired the Connection for this direction, or we don't want to wait
exit;
// loop to wait for the lock release
LockedInc32(@fWaitCounter);
endtix := GetTickCount64 + timeoutMS; // never wait forever
ms := 0;
repeat
SleepHiRes(ms);
ms := ms xor 1; // 0,1,0,1,0,1...
if IsClosed then
break; // no socket to lock any more
result := TryLock(writer);
if result then
begin
// the lock has been acquired
result := not IsClosed; // check it again
if not result then
UnLock(writer);
break; // acquired or socket closed
end;
until GetTickCount64 >= endtix;
InterlockedDecrement(fWaitCounter);
end;
function TPollAsyncConnection.Send(buf: pointer; var len: integer): TNetResult;
begin
if fSecure <> nil then
result := fSecure.Send(buf, len)
else
result := fSocket.Send(buf, len);
end;
function TPollAsyncConnection.Recv(buf: pointer; var len: integer): TNetResult;
begin
if fSecure <> nil then
result := fSecure.Receive(buf, len)
else
result := fSocket.Recv(buf, len);
end;
{ TPollConnectionSockets }
function TPollConnectionSockets.IsValidPending(tag: TPollSocketTag): boolean;
begin
// same logic than TPollAsyncConnection IsDangling() + TryLock()
result := (tag <> 0) and
// avoid dangling pointer
(TPollAsyncConnection(tag).fHandle <> 0) and