Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/Npgsql/ClusterAwareDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public class ClusterAwareDataSource: NpgsqlDataSource
/// Value = Number of Connections to that pool
/// </summary>
protected static Dictionary<int, int> poolToNumConnMap = new Dictionary<int, int>();

/// <summary>
/// Stores a map of host to their priority
/// </summary>
protected static Dictionary<string, int> hostToPriorityMap = new Dictionary<string, int>();
/// <summary>
/// Connection settings
/// </summary>
Expand Down Expand Up @@ -196,6 +201,16 @@ internal void CreatePool(List<string> hosts)
}
}

/// <summary>
/// Checks if a higher priority node is available
/// </summary>
/// <param name="poolindex"></param>
/// <returns></returns>
protected bool HasBetterNodeAvailable(int poolindex)
{
return false;
}

/// <summary>
/// gets the list of hosts
/// </summary>
Expand Down Expand Up @@ -369,6 +384,13 @@ internal override async ValueTask<NpgsqlConnector> Get(NpgsqlConnection conn, Np
CheckDisposed();

var poolIndex = conn.Settings.LoadBalanceHosts ? GetRoundRobinIndex() : 0;
var HasBetterNode = HasBetterNodeAvailable(poolIndex);
if (HasBetterNode)
{
UpdateConnectionMap(poolIndex, -1);
HasBetterNode = false;
await getConnector(conn, timeout, async, cancellationToken, exceptions);
}
if (poolIndex == -1)
break;
var timeoutPerHost = timeout.IsSet ? timeout.CheckAndGetTimeLeft() : TimeSpan.Zero;
Expand Down
18 changes: 18 additions & 0 deletions src/Npgsql/NpgsqlConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,24 @@ public double YBServersRefreshInterval
}
}
double _ybServersRefreshInterval;

/// <summary>
/// Sets the reconnect time for failed hosts
/// </summary>
[Category("Failover and load balancing")]
[Description("Sets the reconnect time for failed hosts")]
[DisplayName("Failed Host Reconnect Delay Secs")]
[NpgsqlConnectionStringProperty]
public double FailedHostReconnectDelaySecs
{
get => _failedHostReconnectDelaySecs;
set
{
_failedHostReconnectDelaySecs = value;
SetValue(nameof(YBServersRefreshInterval), value);
}
}
double _failedHostReconnectDelaySecs;

/// <summary>
/// Controls for how long the host's cached state will be considered as valid.
Expand Down
43 changes: 43 additions & 0 deletions src/Npgsql/TopologyAwareDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,54 @@ void PopulatePlacementSet(string placements, HashSet<CloudPlacement>? allowedPla
}
}

new bool HasBetterNodeAvailable(int poolindex)
{
var chosenHost = _pools[poolindex].Settings.Host;
if (chosenHost != null && hostToPriorityMap.ContainsKey(chosenHost)) {
var chosenHostPriority = hostToPriorityMap[chosenHost];
for (var i = 1; i < chosenHostPriority; i++) {
if (hostToPriorityMap.Values.Contains(i)) {
return true;
}
}
}
return false;
}

void UpdatePriorityMap(string host, string cloud, string region, string zone)
{
if (!unreachableHosts.Contains(host))
{
var priority = getPriority(cloud, region, zone);
hostToPriorityMap[host] = priority;
}
}

private int getPriority(string cloud, string region, string zone) {
CloudPlacement cp = new CloudPlacement(cloud, region, zone);
return getKeysByValue(cp);
}

private int getKeysByValue(CloudPlacement cp) {
int i;
for (i = 1; i <= MAX_PREFERENCE_VALUE; i++) {
if (allowedPlacements[i] != null && !allowedPlacements.Any()) {
if (cp.IsContainedIn(allowedPlacements[i])){
return i;
}
}
}
return MAX_PREFERENCE_VALUE + 1;
}

new List<string> GetCurrentServers(NpgsqlConnection conn)
{
NpgsqlCommand QUERY_SERVER = new NpgsqlCommand("Select * from yb_servers()",conn);
NpgsqlDataReader reader = QUERY_SERVER.ExecuteReader();
_lastServerFetchTime = DateTime.Now;
List<string> currentPrivateIps = new List<string>();
var hostConnectedTo = conn.Host;
hostToPriorityMap.Clear();

Debug.Assert(hostConnectedTo != null, nameof(hostConnectedTo) + " != null");
while (reader.Read())
Expand All @@ -169,6 +210,8 @@ void PopulatePlacementSet(string placements, HashSet<CloudPlacement>? allowedPla
var cloud = reader.GetString(reader.GetOrdinal("cloud"));
var region = reader.GetString(reader.GetOrdinal("region"));
var zone = reader.GetString(reader.GetOrdinal("zone"));

UpdatePriorityMap(host, cloud, region, zone);

UpdateCurrentHostList(currentPrivateIps, host, publicHost, cloud, region, zone);

Expand Down
2 changes: 1 addition & 1 deletion test/Npgsql.Tests/Support/AssemblySetUp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[SetUpFixture]
public class AssemblySetUp
{
[OneTimeSetUp]
// [OneTimeSetUp]
public void Setup()
{
var connString = TestUtil.ConnectionString;
Expand Down
147 changes: 147 additions & 0 deletions test/Npgsql.Tests/YBFallBackTopologyExtended.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Npgsql.Tests;

public class YBFallBackTopologyExtended : YBFallbackTopolgyTests
{

static int mlock = 0;
string connStringBuilder = "host=127.0.0.1,127.0.0.5;port=5433;database=yugabyte;userid=yugabyte;password=yugsbyte;Load Balance Hosts=true;Timeout=0;YB Servers Refresh Interval=10;Topology Keys=";

[Test]
public async Task TestFallback()
{

string? _Output = null;
string? _Error = null;

CreateCluster();
int[] count = { 4, 4, 4, -1, -1, -1 };
var conns = await CreateConnections(connStringBuilder+"aws.us-west.us-west-1a:1,aws.us-east.us-east-2a:2,aws.us-east.us-east-2b:3,aws.us-east.us-east-2c:4", count);

var cmd = "/bin/yb-ctl stop_node 1";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);

cmd = "/bin/yb-ctl stop_node 2";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);

cmd = "/bin/yb-ctl stop_node 3";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);

cmd = "/bin/yb-ctl stop_node 4";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);

count = new[] { -1, -1, -1, -1, 12, 0 };
conns = await CreateConnections(connStringBuilder+"aws.us-west.us-west-1a:1,aws.us-east.us-east-2a:2,aws.us-east.us-east-2b:3,aws.us-east.us-east-2c:4", count);

cmd = "/bin/yb-ctl start_node 4 --placement_info \"aws.us-east.us-east-2a\"";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);
Thread.Sleep(15000);

count = new[] { -1, -1, -1, 12, 0, 0 };
conns = await CreateConnections(connStringBuilder+"aws.us-west.us-west-1a:1,aws.us-east.us-east-2a:2,aws.us-east.us-east-2b:3,aws.us-east.us-east-2c:4", count);

cmd = "/bin/yb-ctl start_node 1 --placement_info \"aws.us-west.us-west-1a\"";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);

cmd = "/bin/yb-ctl start_node 2 --placement_info \"aws.us-west.us-west-1a\"";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);

Thread.Sleep(15000);

count = new[] { 6, 6, -1, -1, -1, -1 };
conns = await CreateConnections(connStringBuilder+"aws.us-west.us-west-1a:1,aws.us-east.us-east-2a:2,aws.us-east.us-east-2b:3,aws.us-east.us-east-2c:4", count);

DestroyCluster();
}
static async Task<List<NpgsqlConnection>> CreateConnections(string connString, int[] counts)
{
List<NpgsqlConnection> conns = new List<NpgsqlConnection>();
if (mlock == 0)
{
mlock = 1;
try
{
for (var i = 1; i <= 12; i++)
{
NpgsqlConnection conn = new NpgsqlConnection(connString);
conn.Open();
conns.Add(conn);
}

Console.WriteLine("Connections Created");

var j = 1;
foreach (var count in counts)
{
if (count != -1)
{
var host = "127.0.0." + j;
await VerifyOn(host, count);
}

j++;
}
CloseConnections(conns);
mlock = 0;
}
catch (Exception ex)
{
Console.WriteLine("Failure:" + ex.Message);
Console.WriteLine("Failure stacktrace: " + ex.StackTrace);
return conns;
}
}

return conns;

}

static void CloseConnections(List<NpgsqlConnection>conns)
{
foreach (var conn in conns)
{
if (conn.State != System.Data.ConnectionState.Closed)
{
conn.Close();
}
}
}

void CreateCluster()
{
string? _Output = null;
string? _Error = null;
var cmd = "/bin/yb-ctl start --rf 3 --placement_info \"aws.us-west.us-west-1a,aws.us-west.us-west-1a,aws.us-west.us-west-1a\"";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);
cmd = "/bin/yb-ctl add_node --placement_info \"aws.us-east.us-east-2a\"";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);
cmd = "/bin/yb-ctl add_node --placement_info \"aws.us-east.us-east-2b\"";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);
cmd = "/bin/yb-ctl add_node --placement_info \"aws.us-east.us-east-2c\"";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
Console.WriteLine("Output:" + _Output);
}

void DestroyCluster()
{
string? _Output = null;
string? _Error = null;
var cmd = "/bin/yb-ctl destroy";
ExecuteShellCommand(cmd, ref _Output, ref _Error );
}
}