Skip to content

Commit

Permalink
fix: misc. issues in RpcBenchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Dec 7, 2023
1 parent e2d7962 commit e32ce38
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 30 deletions.
19 changes: 11 additions & 8 deletions src/RpcBenchmark/Client/BenchmarkRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ public BenchmarkRunner(ClientCommand command, Func<ITestService> clientFactory,
}
}

public async Task RunAll(string title)
public async Task RunAll(string title, CancellationToken cancellationToken)
{
WriteLine($"{title}:");
if (Command.Benchmark == BenchmarkKind.Calls) {
await RunOne("Sum", w => w.Sum);
await RunOne("GetUser", w => w.GetUser);
await RunOne("SayHello", w => w.SayHello);
await RunOne("Sum", w => w.Sum, cancellationToken);
await RunOne("GetUser", w => w.GetUser, cancellationToken);
await RunOne("SayHello", w => w.SayHello, cancellationToken);
}
else {
await RunOne("StreamS", w => w.StreamS);
await RunOne("StreamL", w => w.StreamL);
await RunOne("StreamS", w => w.StreamS, cancellationToken);
await RunOne("StreamL", w => w.StreamL, cancellationToken);
}

// Dispose clients
Expand Down Expand Up @@ -77,10 +77,13 @@ protected override async Task<double> Benchmark(CancellationToken cancellationTo
private Task<double> GetCallFrequency(double duration, CancellationToken cancellationToken)
=> Benchmarks.CallFrequency(Workers, duration, cancellationToken, _currentOperationFactory, w => w.WhenReady());

private Task RunOne(string title, Func<BenchmarkWorker, Func<CancellationToken, Task>> operationFactory)
private Task RunOne(
string title,
Func<BenchmarkWorker, Func<CancellationToken, Task>> operationFactory,
CancellationToken cancellationToken)
{
Title = title;
Interlocked.Exchange(ref _currentOperationFactory, operationFactory);
return Run();
return Run(cancellationToken);
}
}
69 changes: 50 additions & 19 deletions src/RpcBenchmark/Client/Clients/ClientFactories.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Net.Security;
using System.Net.WebSockets;
using Grpc.Net.Client;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;
using Stl.RestEase;
using Stl.Rpc;
Expand Down Expand Up @@ -50,6 +51,14 @@ private Func<ITestService> CreateClientFactory<TClient>()
where TClient : class, ITestService
{
var services = CreateBaseServiceCollection();
#if false
services.AddLogging(logging => logging
.AddDebug()
.SetMinimumLevel(LogLevel.Debug)
.AddFilter("Microsoft", LogLevel.Debug)
);
#endif

if (typeof(TClient) == typeof(ITestService))
services.AddRpc().AddClient<ITestService>();
else
Expand All @@ -67,28 +76,35 @@ private IServiceCollection CreateBaseServiceCollection()

// Rpc
services.AddRpc().AddWebSocketClient(c => RpcWebSocketClient.Options.Default with {
HostUrlResolver = (_, _) => BaseUrl,
WebSocketChannelOptions = WebSocketChannel<RpcMessage>.Options.Default with {
WriteFrameSize = 4350,
},
WebSocketOwnerFactory = (_, peer) => {
var ws = new ClientWebSocket();
ws.Options.RemoteCertificateValidationCallback = (_, _, _, _) => true;
return new WebSocketOwner(peer.Ref.Key, ws, c);
},
});
HostUrlResolver = (_, _) => BaseUrl,
WebSocketChannelOptions = WebSocketChannel<RpcMessage>.Options.Default with {
WriteFrameSize = 4350,
},
WebSocketOwnerFactory = (_, peer) => {
var ws = new ClientWebSocket();
ws.Options.HttpVersion = HttpVersion.Version11;
ws.Options.RemoteCertificateValidationCallback = (_, _, _, _) => true;
return new WebSocketOwner(peer.Ref.Key, ws, c);
},
});

// SignalR
services.AddSingleton(c => {
var connection = new HubConnectionBuilder()
.WithUrl($"{BaseUrl}hubs/testService", options => {
options.HttpMessageHandlerFactory = message => {
if (message is HttpClientHandler httpClientHandler)
httpClientHandler.ServerCertificateCustomValidationCallback += (_, _, _, _) => true;
return message;
};
options.WebSocketConfiguration = wso => {
wso.RemoteCertificateValidationCallback = (_, _, _, _) => true;
.WithUrl($"{BaseUrl}hubs/testService", options => {
options.Transports = HttpTransportType.WebSockets;
options.WebSocketFactory = async (context, ct) => {
var ws = new ClientWebSocket();
ws.Options.HttpVersion = HttpVersion.Version11;
ws.Options.RemoteCertificateValidationCallback = (_, _, _, _) => true;
try {
await ws.ConnectAsync(context.Uri, ct).ConfigureAwait(false);
return ws;
}
catch {
ws.Dispose();
throw;
}
};
})
.Build();
Expand All @@ -111,10 +127,25 @@ private IServiceCollection CreateBaseServiceCollection()

// gRPC
services.AddSingleton(c => {
/*
var messageHandler = new SocketsHttpHandler() {
PooledConnectionLifetime = TimeSpan.FromDays(1),
EnableMultipleHttp2Connections = true,
MaxConnectionsPerServer = 20_000,
SslOptions = new SslClientAuthenticationOptions() {
RemoteCertificateValidationCallback = (_, _, _, _) => true,
}
};
var httpClient = new HttpClient(messageHandler) {
DefaultRequestVersion = HttpVersion.Version20,
};
*/
var channelOptions = new GrpcChannelOptions() {
// HttpClient = httpClient,
HttpHandler = new SocketsHttpHandler {
PooledConnectionLifetime = TimeSpan.FromDays(1),
EnableMultipleHttp2Connections = true,
MaxConnectionsPerServer = 50_000,
MaxConnectionsPerServer = 20_000,
SslOptions = new SslClientAuthenticationOptions() {
RemoteCertificateValidationCallback = (_, _, _, _) => true,
},
Expand Down
6 changes: 4 additions & 2 deletions src/RpcBenchmark/CommandLine/ClientCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

namespace Samples.RpcBenchmark;

#pragma warning disable VSTHRD103

[GeneratedParser]
[Command]
[Description("Starts the client part of this benchmark.")]
Expand Down Expand Up @@ -81,19 +83,19 @@ public override async Task<int> RunAsync()
WriteLine($" Client count: {(WorkersValue + ClientConcurrencyValue - 1) / ClientConcurrencyValue}");
WriteLine($" Client concurrency: {ClientConcurrencyValue}");
WriteLine($" Total worker count: {WorkersValue}");
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);

// Run
WriteLine();
var clientFactories = new ClientFactories(Url);
var benchmarkKinds = Libraries.Split(",").Select(LibraryKindExt.Parse).ToArray();
foreach (var benchmarkKind in benchmarkKinds) {
var (name, factory) = clientFactories[benchmarkKind];
await new BenchmarkRunner(this, factory, name.Contains("Stream")).RunAll(name);
await new BenchmarkRunner(this, factory, name.Contains("Stream")).RunAll(name, cancellationToken);
}

if (Wait)
ReadKey();
// ReSharper disable once MethodHasAsyncOverload
StopTokenSource.Cancel(); // Stops the server if it's running
return 0;
}
Expand Down
7 changes: 6 additions & 1 deletion src/RpcBenchmark/CommandLine/ServerCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Ookii.CommandLine;
using Ookii.CommandLine.Commands;
using Samples.RpcBenchmark.Server;
Expand Down Expand Up @@ -39,7 +40,7 @@ public override async Task<int> RunAsync()
// Core services
var services = builder.Services;
services.AddSignalR(hub => {
hub.MaximumParallelInvocationsPerClient = int.MaxValue;
hub.MaximumParallelInvocationsPerClient = 100_000_000; // Can't be int.MaxValue!
});
var rpc = services.AddRpc();
rpc.AddWebSocketServer();
Expand All @@ -54,6 +55,10 @@ public override async Task<int> RunAsync()

// Kestrel
builder.WebHost.ConfigureKestrel(kestrel => {
kestrel.AddServerHeader = false;
kestrel.ConfigureEndpointDefaults(listen => {
listen.Protocols = HttpProtocols.Http1 | HttpProtocols.Http2;
});
var limits = kestrel.Limits;
limits.MaxConcurrentConnections = 20_000;
limits.MaxConcurrentUpgradedConnections = 20_000;
Expand Down

0 comments on commit e32ce38

Please sign in to comment.