Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test Picker poisoning when try to call Listener() simultaneously with PickAsync() #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 106 additions & 0 deletions test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -551,5 +551,111 @@ public async Task ResolveServiceConfig_ErrorOnSecondResolve_PickSuccess()
var balancer = (ChildHandlerLoadBalancer)channel.ConnectionManager._balancer!;
return (T?)balancer._current?.LoadBalancer;
}

[Test]
public async Task PickAsync_ErrorWhenInParallelWithUpdateChannelState()
{
// Arrange
var services = new ServiceCollection();

// add logger
services.AddNUnitLogger();
var loggerFactory = services.BuildServiceProvider().GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<ResolverTests>();

// add resolver and balancer
services.AddSingleton<ResolverFactory, CustomResolverFactory>();
services.AddSingleton<LoadBalancerFactory, CustomBalancerFactory>();

var connectionStep = 0;
services.AddSingleton<ISubchannelTransportFactory>(
new TestSubchannelTransportFactory((_, _) =>
{
// the first call should return `Ready` connection but then one of connections should be broken
// (as it happens in real life)
var connectivityState = connectionStep switch
{
> 0 => Random.Shared.NextDouble() switch
{
// use 0.7 probability of broken connection just to make the bug occur sooner
< 0.7 => ConnectivityState.TransientFailure,
_ => ConnectivityState.Ready
},
_ => ConnectivityState.Ready
};

Interlocked.Increment(ref connectionStep);

return Task.FromResult(new TryConnectResult(connectivityState));
}));

var channelOptions = new GrpcChannelOptions
{
Credentials = ChannelCredentials.Insecure,
ServiceProvider = services.BuildServiceProvider(),
};

// Act & Assert
var channel = GrpcChannel.ForAddress("test:///test_addr", channelOptions);
await channel.ConnectionManager.ConnectAsync(waitForReady: false, CancellationToken.None);

// the point is to perform a lot of `picks` to catch `PickAsync()` and `UpdateChannelState()` simultaneous
// execution when ` _balancer.UpdateChannelState(state)` runs after `GetPickerAsync()` internals but before
// the task it returned completed
var pickAsyncTask = Task.Run(async () =>
{
var counter = 0;
var exceptionsCounter = 0;

while (counter < 1000)
{
// short delay
await Task.Delay(20);

try
{
// I used counter to verify that exception is not always occurs at the first or the second
// (or any other) predefined step
counter++;

var (subchannel, address, _) = await channel.ConnectionManager.PickAsync(
new PickContext(),
waitForReady: false,
CancellationToken.None);

logger.LogInformation(
"[ {Counter} ] PickAsync result: subchannel = `{Subchannel}`, address = `{Address}`",
counter,
subchannel,
address);
}
catch (Exception ex)
{
logger.LogError(ex, "[ {Counter} ] PickAsync Error", counter);

exceptionsCounter++;
switch (exceptionsCounter)
{
// in real life renews are not too often so we stop renews here to show that this resolver
// state is broken and will forever throw exceptions (at least until the next renew came)
case 1:
CustomResolver.StopRenew();
break;

// restart renew to show that renews to _different_ addresses can fix errors
case > 5 and < 50:
CustomResolver.RestartRenew();
break;

case > 50:
throw;
}
}
}
});

// in some time this will definitely fail
await pickAsyncTask;
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#if SUPPORT_LOAD_BALANCING
using Grpc.Net.Client.Balancer;
using Microsoft.Extensions.Logging;

namespace Grpc.Tests.Shared;

public class CustomBalancer(
IChannelControlHelper controller,
ILoggerFactory loggerFactory)
: SubchannelsLoadBalancer(controller, loggerFactory)
{
protected override SubchannelPicker CreatePicker(IReadOnlyList<Subchannel> readySubchannels)
{
return new CustomPicker(readySubchannels);
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#if SUPPORT_LOAD_BALANCING
using Grpc.Net.Client.Balancer;

namespace Grpc.Tests.Shared;

public class CustomBalancerFactory : LoadBalancerFactory
{
public override string Name => "test";

public override LoadBalancer Create(LoadBalancerOptions options)
{
return new CustomBalancer(options.Controller, options.LoggerFactory);
}
}
#endif
13 changes: 13 additions & 0 deletions test/Grpc.Net.Client.Tests/Infrastructure/Balancer/CustomPicker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#if SUPPORT_LOAD_BALANCING
using Grpc.Net.Client.Balancer;

namespace Grpc.Tests.Shared;

internal class CustomPicker(IReadOnlyList<Subchannel> subchannels) : SubchannelPicker
{
public override PickResult Pick(PickContext context)
{
return PickResult.ForSubchannel(subchannels[Random.Shared.Next(0, subchannels.Count)]);
}
}
#endif
109 changes: 109 additions & 0 deletions test/Grpc.Net.Client.Tests/Infrastructure/Balancer/CustomResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#if SUPPORT_LOAD_BALANCING
using Grpc.Net.Client.Balancer;
using Microsoft.Extensions.Logging;

namespace Grpc.Tests.Shared;

/// <summary>
/// Test Resolver but a bit similar to really used in our project
/// </summary>
internal class CustomResolver(ILoggerFactory loggerFactory) : PollingResolver(loggerFactory)
{
private const int AddressesUpdatePeriodMs = 500;

private static bool ShouldRenew = true;

// address lists should be different
private readonly string[][] _addresses = [
[ "test_addr_01", "test_addr_02" ],
[ "test_addr_03", "test_addr_04" ],
];

private volatile int _updateAddressesIndex;
private Timer? _timer;
private readonly object _lock = new ();
private readonly object _lock2 = new ();

private ResolverResult _result = ResolverResult.ForResult([]);

protected override void OnStarted()
{
// periodically renew address lists
_timer = new Timer(
NotifyListenerAboutResolverResult,
null,
AddressesUpdatePeriodMs,
AddressesUpdatePeriodMs);

NotifyListenerAboutResolverResult();
}

public static void StopRenew()
{
ShouldRenew = false;
}

public static void RestartRenew()
{
ShouldRenew = true;
}

protected override async Task ResolveAsync(CancellationToken cancellationToken)
{
await Task.Yield(); // fix used to avoid deadlock

lock (_lock)
{
Listener(_result);
}
}

private void NotifyListenerAboutResolverResult(object? state = null)
{
if (!ShouldRenew)
{
return;
}

lock (_lock)
{
UpdateResolverState();
Listener(_result);
}
}

// fill `_result` field with addresses we will use during the next `Listener()` call
private void UpdateResolverState()
{
var addresses = _addresses[_updateAddressesIndex];

var balancerAddresses = addresses
.Select(host => new BalancerAddress(host, 4242))
.ToArray();

_result = ResolverResult.ForResult(balancerAddresses);

if (_result.Addresses?.Count == 0)
{
return;
}

// choose next addresses list (or the first one if the end of the list is reached)
lock (_lock2)
{
Interlocked.Increment(ref _updateAddressesIndex);
Interlocked.CompareExchange(ref _updateAddressesIndex, 0, _addresses.Length);
}
}

protected override void Dispose(bool disposing)
{
if (!disposing)
{
return;
}

_timer?.Dispose();
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#if SUPPORT_LOAD_BALANCING
using Grpc.Net.Client.Balancer;

namespace Grpc.Tests.Shared;

public class CustomResolverFactory : ResolverFactory
{
public override string Name => "test";

public override Resolver Create(ResolverOptions options)
{
return new CustomResolver(options.LoggerFactory);
}
}
#endif