Skip to content

Commit

Permalink
- use accessible CPUs (can be limited by taskset) instead of available
Browse files Browse the repository at this point in the history
- added named command line parameters for servers (-s), threads )-t) and CPU pinning (-p)
- added pinning servers to CPUs
- change rawupdates to use `update table set .. from values (), (), ... where id = id`
  • Loading branch information
pavel.mash committed Apr 11, 2023
1 parent 9cf89fc commit 8a02b93
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 78 deletions.
23 changes: 18 additions & 5 deletions ex/techempower-bench/README.md
Expand Up @@ -37,15 +37,28 @@ Those two `.sql` scripts are located in this folder for convenience.

## Command line parameters

Working threads (per server), used CPU cores, and servers count can be specified in command line as such:
Working threads (per server), servers count and CPU pinning can be specified in command line as such:
```
$ raw threads cores servers
$ raw [-s serversCount] [-t threadsPerServer] [-p]
```
Default values are computed at startup depending on the available CPU cores on the running system - trying to leverage the framework potential.
Default values are computed at startup depending on the accessible (can be limited using `taskset`) CPU cores
on the running system - trying to leverage the framework potential.

Note that currently, the `cores` number is ignored.
Example (consider total CPU count > 12):
```shell
# use all CPUs and default parameters
./raw

# limit to first 6 CPUs and use default parameters
taskset -c 0-5 ./raw

Depending on the hardware you have, you may consider using our [x86_64 Memory Manager](https://github.com/synopse/mORMot2/blob/master/src/core/mormot.core.fpcx64mm.pas) if your CPU has less than 8/16 cores, but would rather switch to [the libc Memory Manager](https://github.com/synopse/mORMot2/blob/master/src/core/mormot.core.fpclibcmm.pas) for high-end harware. On the TFB hardware, we enable the libc heap, which has lower performance with a few cores, but scales better when allocating small blocks with a high number of cores.
# limit to first 6 CPUs, launch 6 servers with 4 threads for each without pinning servers to CPUs
taskset -c 0-5 ./raw -s 6 -t 4
```
Depending on the hardware you have, you may consider using our [x86_64 Memory Manager](https://github.com/synopse/mORMot2/blob/master/src/core/mormot.core.fpcx64mm.pas)
if your CPU has less than 8/16 cores, but would rather switch to [the libc Memory Manager](https://github.com/synopse/mORMot2/blob/master/src/core/mormot.core.fpclibcmm.pas)
for high-end harware. On the TFB hardware, we enable the libc heap, which has lower performance with a few cores,
but scales better when allocating small blocks with a high number of cores.

## Some Numbers

Expand Down
184 changes: 117 additions & 67 deletions ex/techempower-bench/raw.pas
Expand Up @@ -21,6 +21,7 @@
{$I mormot.uses.inc} // include mormot.core.fpcx64mm or mormot.core.fpclibcmm
sysutils,
classes,
initc, // sched_getaffinity
mormot.core.base,
mormot.core.os,
mormot.core.rtti,
Expand Down Expand Up @@ -93,15 +94,15 @@ TRawAsyncServer = class(TSynPersistent)
fModel: TOrmModel;
fStore: TRestServerDB;
fTemplate: TSynMustache;
fRawCache: TOrmWorlds;
protected
{$ifdef USE_SQLITE3}
procedure GenerateDB;
{$endif USE_SQLITE3}
// as used by /rawqueries and /rawupdates
function GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): boolean;
public
constructor Create(threadCount: integer; flags: THttpServerOptions); reintroduce;
constructor Create(threadCount: integer; flags: THttpServerOptions;
pin2Core: integer = -1); reintroduce;
destructor Destroy; override;
published
// all service URI are implemented by these published methods using RTTI
Expand All @@ -114,7 +115,6 @@ TRawAsyncServer = class(TSynPersistent)
function updates(ctxt: THttpServerRequest): cardinal;
function rawdb(ctxt: THttpServerRequest): cardinal;
function rawqueries(ctxt: THttpServerRequest): cardinal;
function rawcached(ctxt: THttpServerRequest): cardinal;
function rawfortunes(ctxt: THttpServerRequest): cardinal;
function rawupdates(ctxt: THttpServerRequest): cardinal;
end;
Expand Down Expand Up @@ -164,7 +164,9 @@ function GetQueriesParamValue(ctxt: THttpServerRequest;
{ TRawAsyncServer }

constructor TRawAsyncServer.Create(
threadCount: integer; flags: THttpServerOptions);
threadCount: integer; flags: THttpServerOptions; pin2Core: integer);
var
i: cardinal;
begin
inherited Create;
// setup the DB connection
Expand Down Expand Up @@ -197,10 +199,9 @@ constructor TRawAsyncServer.Create(
{$else}
fStore.Server.CreateMissingTables; // create SQlite3 virtual tables
{$endif USE_SQLITE3}
// pre-fill the ORM and raw caches
// pre-fill the ORM
if fStore.Server.Cache.SetCache(TOrmCachedWorld) then
fStore.Server.Cache.FillFromQuery(TOrmCachedWorld, '', []);
fStore.RetrieveListObjArray(fRawCache, TOrmCachedWorld, 'order by id', []);
// initialize the mustache template for /fortunes
fTemplate := TSynMustache.Parse(FORTUNES_TPL);
// setup the HTTP server
Expand All @@ -217,7 +218,15 @@ constructor TRawAsyncServer.Create(
{$endif WITH_LOGS}
hsoIncludeDateHeader // required by TPW General Test Requirements #5
] + flags);

if pin2Core <> -1 then
begin
SetThreadCpuAffinity(fHttpServer.Async, pin2Core);
for i := 0 to length(fHttpServer.Async.threads) - 1 do
SetThreadCpuAffinity(fHttpServer.Async.threads[i], pin2Core);
end;
fHttpServer.HttpQueueLength := 10000; // needed e.g. from wrk/ab benchmarks
fHttpServer.ServerName := 'M';
// use default routing using RTTI on the TRawAsyncServer published methods
fHttpServer.Route.RunMethods([urmGet], self);
// wait for the server to be ready and raise exception e.g. on binding issue
Expand All @@ -230,7 +239,6 @@ destructor TRawAsyncServer.Destroy;
fStore.Free;
fModel.Free;
fDBPool.free;
ObjArrayClear(fRawCache);
inherited Destroy;
end;

Expand Down Expand Up @@ -325,6 +333,7 @@ function TRawAsyncServer.GetRawRandomWorlds(cnt: PtrInt; out res: TWorlds): bool
pStmt.SendPipelinePrepared;
pConn.PipelineSync;
end;
pConn.Flush; // in case we use modified libpq what not flush inside PQPipelineSync - flush manually
for i := 0 to cnt - 1 do
begin
pStmt.GetPipelineResult;
Expand Down Expand Up @@ -485,18 +494,6 @@ function TRawAsyncServer.rawqueries(ctxt: THttpServerRequest): cardinal;
result := HTTP_SUCCESS;
end;

function TRawAsyncServer.rawcached(ctxt: THttpServerRequest): cardinal;
var
i: PtrInt;
res: TOrmWorlds;
begin
SetLength(res, GetQueriesParamValue(ctxt, 'COUNT='));
for i := 0 to length(res) - 1 do
res[i] := fRawCache[ComputeRandomWorld - 1];
ctxt.SetOutJson(@res, TypeInfo(TOrmWorlds));
result := HTTP_SUCCESS;
end;

function FortuneCompareByMessage(const A, B): integer;
begin
result := StrComp(pointer(TFortune(A).message), pointer(TFortune(B).message));
Expand Down Expand Up @@ -544,24 +541,18 @@ function ComputeUpdateSql(cnt: integer): RawUtf8;
LastComputeUpdateSqlSafe.Lock;
if cnt <> LastComputeUpdateSqlCnt then
begin
// update table set randomNumber = CASE id when ? then ? when ? then ? ...
// when ? then ? else randomNumber end where id in (?,?,?,?,?)
// - this weird syntax gives best number for TFB /rawupdates?queries=20 but
// seems not good for smaller or higher count - we won't include it in the
// ORM but only for our RAW results - as other frameworks (e.g. ntex) do
// update table set .. from values (), (), ... where id = id
// we won't include it in the ORM but only for our RAW results
LastComputeUpdateSqlCnt := cnt;
W := TTextWriter.CreateOwnedStream(tmp);
W := TTextWriter.CreateOwnedStream(tmp{%H-});
try
W.AddShort('update world set randomnumber = case id');
for i := 1 to cnt do
W.AddShort(' when ? then ?');
W.AddShort(' else randomNumber end where id in (');
repeat
W.Add('?', ',');
dec(cnt);
until cnt = 0;
W.AddShort('UPDATE world SET randomNumber = v.randomNumber FROM (VALUES');
for i := 1 to cnt do begin
W.AddShort('(?::integer, ?::integer)');
W.Add(',');
end;
W.CancelLastComma;
W.Add(')');
W.AddShort(' order by 1) AS v (id, randomNumber) WHERE world.id = v.id');
W.SetText(LastComputeUpdateSql);
finally
W.Free;
Expand Down Expand Up @@ -603,13 +594,12 @@ function TRawAsyncServer.rawupdates(ctxt: THttpServerRequest): cardinal;
end
else
begin
// fill parameters for update up to 20 items as CASE .. WHEN .. THEN ..
// fill parameters for update up to 20 items as VALUES(?,?),(?,?),...
stmt := conn.NewStatementPrepared(ComputeUpdateSql(cnt), false, true);
for i := 0 to cnt - 1 do
begin
stmt.Bind(i * 2 + 1, res[i].id);
stmt.Bind(i * 2 + 2, res[i].randomNumber);
stmt.Bind(cnt * 2 + i + 1, res[i].id);
end;
end;
stmt.ExecutePrepared;
Expand All @@ -621,55 +611,93 @@ function TRawAsyncServer.rawupdates(ctxt: THttpServerRequest): cardinal;

var
rawServers: array of TRawAsyncServer;
threads, servers, i: integer;
threads, servers, i, k, cpuIdx: integer;
pinServers2Cores: boolean;
cpuMask: TCpuSet;
accessibleCPUCount: PtrInt;
flags: THttpServerOptions;

procedure ComputeExecutionContextFromParams;
function FindCmdLineSwitchVal(const Switch: string; out Value: string): Boolean;
var
cores: integer;
I, L: integer;
S, T: string;
begin
// user specified some values at command line: raw [threads] [cores] [servers]
// in practice, [cores] is just ignored
if not TryStrToInt(ParamStr(1), threads) then
threads := SystemInfo.dwNumberOfProcessors * 4;
if not TryStrToInt(ParamStr(2), cores) then
cores := 16;
if not TryStrToInt(ParamStr(3), servers) then
servers := 1;
if threads < 2 then
threads := 2
Result := False;
S := Switch;
Value := '';
S := UpperCase(S);
I := ParamCount;
while (Not Result) and (I>0) do
begin
L := Length(Paramstr(I));
if (L>0) and (ParamStr(I)[1] in SwitchChars) then
begin
T := Copy(ParamStr(I),2,L-1);
T := UpperCase(T);
Result := S=T;
if Result and (I <> ParamCount) then
Value := ParamStr(I+1)
end;
Dec(i);
end;
end;

procedure ComputeExecutionContextFromParams(cpusAccesible: PtrInt);
var
sw: string;
begin
// user specified some values at command line: raw [-s serversCount] [-t threadsPerServer] [-p]
if not FindCmdLineSwitchVal('t', sw) or not TryStrToInt(sw, threads) then
threads := cpusAccesible * 4;
if not FindCmdLineSwitchVal('s', sw) or not TryStrToInt(sw, servers) then
servers := 1;
pinServers2Cores := FindCmdLineSwitch('p', true) or FindCmdLineSwitch('-pin', true);
if threads < 1 then
threads := 1
else if threads > 256 then
threads := 256; // max. threads for THttpAsyncServer
{if SystemInfo.dwNumberOfProcessors > cores then
SystemInfo.dwNumberOfProcessors := cores; // for hsoThreadCpuAffinity}

if servers < 1 then
servers := 1
else if servers > 256 then
servers := 256;
end;

procedure ComputeExecutionContextFromNumberOfProcessors;
var
logicalcores: integer;
procedure ComputeExecutionContextFromNumberOfProcessors(cpusAccessible: PtrInt);
begin
// automatically guess best parameters depending on available CPU cores
logicalcores := SystemInfo.dwNumberOfProcessors;
if logicalcores >= 12 then
if cpusAccessible >= 6 then
begin
// high-end CPU - scale using several listeners (one per core)
// scale using several listeners (one per core)
// see https://synopse.info/forum/viewtopic.php?pid=39263#p39263
servers := logicalcores;
servers := cpusAccessible;
threads := 8;
pinServers2Cores := true;
end
else
begin
// regular CPU - a single instance and a few threads per core
// low-level CPU - a single instance and a few threads per core
servers := 1;
threads := logicalcores * 4;
threads := cpusAccessible * 4;
pinServers2Cores := false;
end;
end;

function sched_getaffinity(pid: integer; cpusetsize: SizeUInt;
cpuset: pointer): integer; cdecl external clib name 'sched_getaffinity';

begin
if FindCmdLineSwitch('?') or FindCmdLineSwitch('h') or FindCmdLineSwitch('-help', ['-'], false) then
begin
writeln('Usage: ' + UTF8ToString(ExeVersion.ProgramName) + ' [-s serversCount] [-t threadsPerServer] [-p]');
writeln('Options:');
writeln(' -?, --help displays this message');
writeln(' -s serversCount count of servers (listener sockets)');
writeln(' -t, threadsPerServer per-server thread poll size');
writeln(' -p, --pin pin each server to CPU starting from 0');
exit;
end;

// setup logs
{$ifdef WITH_LOGS}
TSynLog.Family.Level := LOG_VERBOSE; // disable logs for benchmarking
Expand All @@ -690,26 +718,48 @@ procedure ComputeExecutionContextFromNumberOfProcessors;
TypeInfo(TFortune), 'id:integer message:RawUtf8']);

// setup execution context
if ParamCount > 1 then
ComputeExecutionContextFromParams
ResetCpuSet(cpuMask);
sched_getaffinity(0, SizeOf(cpuMask), @cpuMask);
accessibleCPUCount := GetBitsCount(cpuMask, SizeOf(cpuMask) * sizeof(byte));

if ParamCount > 0 then
ComputeExecutionContextFromParams(accessibleCPUCount)
else
ComputeExecutionContextFromNumberOfProcessors;
ComputeExecutionContextFromNumberOfProcessors(accessibleCPUCount);
flags := [];
if servers > 1 then
include(flags, hsoReusePort); // allow several bindings on the same port

// start the server instance(s), in hsoReusePort mode if needed
SetLength(rawServers, servers);
for i := 0 to servers - 1 do
rawServers[i] := TRawAsyncServer.Create(threads, flags);
SetLength(rawServers{%H-}, servers);
cpuIdx := -1; // do not pin to CPU by default
for i := 0 to servers - 1 do begin
if pinServers2Cores then
begin
k := i mod accessibleCPUCount;
cpuIdx := -1;
// find real CPU index according to the cpuMask
repeat
inc(cpuIdx);
if GetBit(cpuMask, cpuIdx) then
dec(k);
until k = -1;
writeln('Pin ', i, '''s server to ', cpuIdx, ' CPU');
end;
rawServers[i] := TRawAsyncServer.Create(threads, flags, cpuIdx)
end;

try
// display some information and wait for SIGTERM
{$I-}
writeln;
writeln(rawServers[0].fHttpServer.ClassName,
' running on localhost:', rawServers[0].fHttpServer.SockPort);
writeln(' num thread=', threads,
', num CPU=', SystemInfo.dwNumberOfProcessors,
', total CPU=', SystemInfo.dwNumberOfProcessors,
', accessible CPU=', accessibleCPUCount,
', num servers=', servers,
', pinned=', pinServers2Cores,
', total workers=', threads * servers,
', db=', rawServers[0].fDbPool.DbmsEngineName);
writeln(' options=', GetSetName(TypeInfo(THttpServerOptions), flags));
Expand Down
12 changes: 6 additions & 6 deletions packages/lazarus/mormot2.pas
Expand Up @@ -8,7 +8,7 @@
interface

uses
mormot.app.console, mormot.app.daemon, mormot.app.agl, mormot.core.base,
mormot.app.console, mormot.app.daemon, mormot.core.base,
mormot.core.buffers, mormot.core.collections, mormot.core.data,
mormot.core.datetime, mormot.core.fpcx64mm, mormot.core.interfaces,
mormot.core.json, mormot.core.log, mormot.core.mustache, mormot.core.os,
Expand All @@ -28,18 +28,18 @@ interface
mormot.lib.static, mormot.lib.winhttp, mormot.lib.z, mormot.net.async,
mormot.net.client, mormot.net.http, mormot.net.relay, mormot.net.rtsphttp,
mormot.net.server, mormot.net.sock, mormot.net.tunnel, mormot.net.ws.client,
mormot.net.ws.core, mormot.net.ws.server, mormot.net.acme, mormot.orm.base,
mormot.net.ws.core, mormot.net.ws.server, mormot.orm.base,
mormot.orm.client, mormot.orm.core, mormot.orm.mongodb, mormot.orm.rest,
mormot.orm.server, mormot.orm.sql, mormot.orm.sqlite3, mormot.orm.storage,
mormot.rest.client, mormot.rest.core, mormot.rest.http.client,
mormot.rest.http.server, mormot.rest.memserver, mormot.rest.mvc,
mormot.rest.server, mormot.rest.sqlite3, mormot.script.core,
mormot.script.quickjs, mormot.soa.client, mormot.soa.codegen,
mormot.soa.core, mormot.soa.server, mormot.db.rad.ui, mormot.db.rad.ui.orm,
mormot.db.rad.ui.sql, mormot.net.tftp.client, mormot.net.tftp.server,
mormot.lib.gdiplus, mormot.db.rad.ui.cds, mormot.core.os.mac,
mormot.misc.pecoff, mormot.lib.pkcs11, mormot.net.ldap, mormot.core.fpclibcmm,
mormot.net.dns, mormot.lib.win7zip;
mormot.db.rad.ui.sql, mormot.net.tftp.client, mormot.net.tftp.server,
mormot.lib.gdiplus, mormot.net.acme, mormot.db.rad.ui.cds,
mormot.core.os.mac, mormot.app.agl, mormot.misc.pecoff, mormot.lib.pkcs11,
mormot.net.ldap, mormot.core.fpclibcmm, mormot.net.dns, mormot.lib.win7zip;

implementation

Expand Down

0 comments on commit 8a02b93

Please sign in to comment.