diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj b/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj
index d800e715b..07444a6af 100644
--- a/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj
+++ b/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj
@@ -116,6 +116,7 @@
+
diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs
index 420bbdf5d..50b6822b2 100644
--- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs
+++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Concurrent;
+using System.Collections.Generic;
using System.Linq;
using FluentAssertions;
using Moq;
@@ -32,6 +33,23 @@ public class ClusterConnectionPoolTests
{
private static Uri ServerUri { get; } = new Uri("bolt+routing://1234:5678");
+ public class Constructor
+ {
+ [Fact]
+ public void ShouldEnsureInitialRouter()
+ {
+ var uris = new HashSet{new Uri("bolt://123:456")};
+ var config = Config.DefaultConfig;
+ var connSettings = new ConnectionSettings(ServerUri, new Mock().Object, config);
+ var poolSettings = new ConnectionPoolSettings(config);
+
+ var pool = new ClusterConnectionPool(connSettings, poolSettings, uris, null);
+
+ pool.ToString().Should().Be(
+ "[{bolt://123:456/ : _availableConnections: {[]}, _inUseConnections: {[]}}]");
+ }
+ }
+
public class TryAcquireMethod
{
[Fact]
diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs
index d95997728..abf356aaa 100644
--- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs
+++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs
@@ -16,526 +16,34 @@
// limitations under the License.
using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Threading;
+// Copyright (c) 2002-2017 "Neo Technology,"
+// Network Engine for Objects in Lund AB [http://neotechnology.com]
+//
+// This file is part of Neo4j.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
using FluentAssertions;
using Moq;
-using Neo4j.Driver.Internal;
using Neo4j.Driver.Internal.Connector;
using Neo4j.Driver.Internal.Routing;
using Neo4j.Driver.V1;
using Xunit;
+using static Neo4j.Driver.Tests.Routing.RoutingTableManagerTests;
namespace Neo4j.Driver.Tests.Routing
{
public class LoadBalancerTests
{
- public class Constructor
- {
- [Fact]
- public void ShouldEnsureInitialRouter()
- {
- var uri = new Uri("bolt://123:456");
- var config = Config.DefaultConfig;
- var routingSettings = new RoutingSettings(new Dictionary());
- var connSettings = new ConnectionSettings(uri, new Mock().Object, config);
- var poolSettings = new ConnectionPoolSettings(config);
-
- var loadbalancer = new LoadBalancer(routingSettings, connSettings, poolSettings, null);
-
- loadbalancer.ToString().Should().Be(
- "_routingTable: {[_routers: bolt://123:456/], [_detachedRouters: ], [_readers: ], [_writers: ]}, " +
- "_clusterConnectionPool: {[{bolt://123:456/ : _availableConnections: {[]}, _inUseConnections: {[]}}]}");
- }
- }
-
- public class AcquireConnectionMethod
- {
- public class UpdateRoutingTableWithInitialUriFallbackMethod
- {
- [Fact]
- public void ShouldPrependInitialRouterIfWriterIsAbsent()
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
-
- var routingTableMock = new Mock();
- routingTableMock.Setup(x => x.PrependRouters(It.IsAny>()))
- .Callback>(r => r.Single().Should().Be(uri));
-
- var poolMock = new Mock();
- poolMock.Setup(x => x.Add(It.IsAny>()))
- .Callback>(r => r.Single().Should().Be(uri));
-
- var balancer = new LoadBalancer(poolMock.Object, routingTableMock.Object, uri);
- balancer.IsReadingInAbsenceOfWriter = true;
- var routingTableReturnMock = new Mock();
-
- // When
- // should throw an exception as the initial routers should not be tried again
- var exception = Record.Exception(()=>
- balancer.UpdateRoutingTableWithInitialUriFallback(c => c != null ? null : routingTableReturnMock.Object));
- exception.Should().BeOfType();
-
- // Then
- poolMock.Verify(x => x.Add(It.IsAny>()), Times.Once);
- routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once);
- }
-
- [Fact]
- public void ShouldAddInitialUriWhenNoAvailableRouters()
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
-
- var routingTableMock = new Mock();
- routingTableMock.Setup(x => x.PrependRouters(It.IsAny>()))
- .Callback>(r => r.Single().Should().Be(uri));
-
- var poolMock = new Mock();
- poolMock.Setup(x => x.Add(It.IsAny>()))
- .Callback>(r => r.Single().Should().Be(uri));
-
- var balancer = new LoadBalancer(poolMock.Object, routingTableMock.Object, uri);
-
- var routingTableReturnMock = new Mock();
-
- // When
- balancer.UpdateRoutingTableWithInitialUriFallback(c => c != null ? null : routingTableReturnMock.Object);
-
- // Then
- poolMock.Verify(x => x.Add(It.IsAny>()), Times.Once);
- routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once);
- }
-
- [Fact]
- public void ShouldNotTryInitialUriIfAlreadyTried()
- {
- // Given
- var a = new Uri("bolt+routing://123:456");
- var b = new Uri("bolt+routing://123:789");
- var s = a; // should not be retried
- var t = new Uri("bolt+routing://222:123"); // this should be retried
-
- var routingTableMock = new Mock();
- routingTableMock.Setup(x => x.PrependRouters(It.IsAny>()))
- // ensure the retried is only t
- .Callback>(set => set.Single().Should().Be(t));
-
- var poolMock = new Mock();
- poolMock.Setup(x => x.Add(It.IsAny>()))
- // ensure the retried is only t
- .Callback>(set => set.Single().Should().Be(t));
-
- var balancer = new LoadBalancer(poolMock.Object, routingTableMock.Object);
-
- Func, IRoutingTable> updateRoutingTableFunc = set =>
- {
- if (set != null)
- {
- set.Add(a);
- set.Add(b);
- return null;
- }
- else
- {
- return new Mock().Object;
- }
- };
- Func> resolveInitialUriFunc = () => new HashSet {s, t};
- // When
- balancer.UpdateRoutingTableWithInitialUriFallback(updateRoutingTableFunc, resolveInitialUriFunc);
-
- // Then
- // verify the method is actually called
- poolMock.Verify(x => x.Add(It.IsAny>()), Times.Once);
- routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once);
- }
- }
-
- public class UpdateRoutingTableMethod
- {
- [Fact]
- public void ConcurrentUpdateRequestsHaveNoEffect()
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
- var balancer = SetupLoadBalancer(new[] {uri});
- var balancerRoutingTable = NewRoutingTable(new[] {uri});
-
- var anotherUri = new Uri("bolt+routing://123:789");
- var updateCount = 0;
- var directReturnCount = 0;
- Func updateRoutingTableFunc =
- connection =>
- {
- if (!balancerRoutingTable.IsStale(AccessMode.Write))
- {
- Interlocked.Add(ref directReturnCount, 1);
- return balancerRoutingTable;
- }
- Interlocked.Add(ref updateCount, 1);
- // needs to return a valid routing table to stop update routing table
- var newTable = NewRoutingTable(new[] { anotherUri }, new[] { anotherUri }, new[] { anotherUri });
- balancerRoutingTable = newTable;
- return newTable;
- };
-
- // When calling update routing table in many threads
- var size = 10;
- Thread[] threads = new Thread[size];
- for (var i = 0; i < size; i++)
- {
- var thread = new Thread(() =>
- {
- var result = balancer.UpdateRoutingTable(updateRoutingTableFunc);
- result.All().Should().ContainInOrder(anotherUri);
- });
- threads[i] = thread;
- thread.Start();
- }
-
- foreach (var thread in threads)
- {
- thread.Join();
- }
- updateCount.Should().Be(1);
- directReturnCount.Should().Be(size-1);
- }
-
- [Fact]
- public void UpdateReplaceEntireRoutingTable()
- {
- // Given
- var uriA = new Uri("bolt+routing://123a:456");
- var uriB = new Uri("bolt+routing://123b:456");
- var uriC = new Uri("bolt+routing://123c:456");
- var balancer = SetupLoadBalancer(new[] { uriA }, new[] { uriB }, new[] { uriC });
-
- // When
- var uriX = new Uri("bolt+routing://123x:789");
- var uriY = new Uri("bolt+routing://123y:789");
- var uriZ = new Uri("bolt+routing://123z:789");
- var result = balancer.UpdateRoutingTable(connection
- => NewRoutingTable(new[] {uriX}, new[] { uriY }, new[] { uriZ }));
-
- // Then
- result.All().Should().Contain(new [] {uriX, uriY, uriZ});
- }
-
- [Fact]
- public void ShouldForgetAndTryNextRouterWhenFailedWithConnectionError()
- {
- // Given
- var uriA = new Uri("bolt+routing://123:456");
- var uriB = new Uri("bolt+routing://123:789");
-
- // This ensures that uri and uri2 will return in order
- var routingTable = new ListBasedRoutingTable(new List {uriA, uriB});
- var balancer = SetupLoadBalancer(routingTable);
-
- // When
- var newRoutingTable = balancer.UpdateRoutingTable(connection =>
- {
- // the second connectin will give a new routingTable
- if (connection.Server.Address.Equals(uriA.ToString())) // uriA
- {
- ((ClusterConnection)connection).OnError(new ServiceUnavailableException("failed init"));
- }
- if (connection.Server.Address.Equals(uriB.ToString())) // uriB
- {
- return NewRoutingTable(new[] {uriA}, new [] {uriA}, new []{uriA});
- }
-
- throw new NotSupportedException($"Unknown uri: {connection.Server.Address}");
- });
-
- // Then
- newRoutingTable.All().Should().ContainInOrder(uriA);
- routingTable.All().Should().ContainInOrder(uriB);
- }
-
- [Fact]
- public void ShouldPropagateServiceUnavailable()
- {
- var balancer = SetupLoadBalancer(new[] {new Uri("bolt+routing://123:456")});
-
- var exception = Record.Exception(()=>balancer.UpdateRoutingTable(
- conn => { throw new ServiceUnavailableException("Procedure not found"); }));
-
- exception.Should().BeOfType();
- exception.Message.Should().Be("Procedure not found");
- }
-
- [Fact]
- public void ShouldTryNextRouterIfNoReader()
- {
- // Given
- var uriA = new Uri("bolt+routing://123:1");
- var uriB = new Uri("bolt+routing://123:2");
-
- var uriX = new Uri("bolt+routing://456:1");
- var uriY = new Uri("bolt+routing://789:2");
-
- var balancer = SetupLoadBalancer(new ListBasedRoutingTable(new List {uriA, uriB}));
-
- // When
- var updateRoutingTable = balancer.UpdateRoutingTable(conn =>
- {
- if (conn.Server.Address.Equals(uriA.ToString()))
- {
- return NewRoutingTable(new[] {uriX}, new Uri[0], new[] {uriX});
- }
- if (conn.Server.Address.Equals(uriB.ToString()))
- {
- return NewRoutingTable(new[] {uriY}, new[] {uriY}, new[] {uriY});
- }
- throw new NotSupportedException($"Unknown uri: {conn.Server.Address}");
- });
-
- // Then
- updateRoutingTable.All().Should().ContainInOrder(uriY);
- balancer.IsReadingInAbsenceOfWriter.Should().BeFalse();
- }
-
- [Fact]
- public void ShouldAcceptRoutingTableIfNoWriter()
- {
- // Given
- var uriA = new Uri("bolt+routing://123:1");
- var uriX = new Uri("bolt+routing://456:1");
-
- var balancer = SetupLoadBalancer(new ListBasedRoutingTable(new List { uriA }));
-
- // When
- var updateRoutingTable = balancer.UpdateRoutingTable(conn =>
- {
- if (conn.Server.Address.Equals(uriA.ToString()))
- {
- return NewRoutingTable(new[] {uriX}, new[] {uriX});
- }
- throw new NotSupportedException($"Unknown uri: {conn.Server.Address}");
- });
-
- // Then
- updateRoutingTable.All().Should().ContainInOrder(uriX);
- balancer.IsReadingInAbsenceOfWriter.Should().BeTrue();
- }
-
- [Theory]
- [InlineData(1, 1, 1)]
- [InlineData(2, 1, 1)]
- [InlineData(1, 2, 1)]
- [InlineData(2, 2, 1)]
- [InlineData(1, 1, 2)]
- [InlineData(2, 1, 2)]
- [InlineData(1, 2, 2)]
- [InlineData(2, 2, 2)]
- [InlineData(3, 1, 2)]
- [InlineData(3, 2, 1)]
- public void ShouldAcceptValidRoutingTables(int routerCount, int writerCount, int readerCount)
- {
- var balancer = SetupLoadBalancer(new[] {new Uri("bolt+routing://123:45")});
- var newRoutingTable = NewRoutingTable(routerCount, readerCount, writerCount);
- var result = balancer.UpdateRoutingTable(connection => newRoutingTable);
-
- // Then
- result.All().Should().Contain(newRoutingTable.All());
- newRoutingTable.All().Should().Contain(result.All());
- balancer.IsReadingInAbsenceOfWriter.Should().BeFalse();
- }
-
- [Fact]
- public void ShouldPropagateProtocolError()
- {
- var balancer = SetupLoadBalancer(new[] { new Uri("bolt+routing://123:456") });
-
- var exception = Record.Exception(() => balancer.UpdateRoutingTable(
- conn => { throw new ProtocolException("Cannot parse procedure result"); }));
-
- exception.Should().BeOfType();
- exception.Message.Should().Be("Cannot parse procedure result");
- }
-
- [Fact]
- public void ShouldPropagateAuthenticationException()
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
- // a routing table which knows a uri
- var routingTable = NewRoutingTable(new[] {uri});
-
- var mockedClusterPool = new Mock();
- var balancer = new LoadBalancer(mockedClusterPool.Object, routingTable);
-
- var mockedConn = new Mock();
- var conn = mockedConn.Object;
- mockedClusterPool.Setup(x => x.TryAcquire(uri, out conn)).Callback(() =>
- {
- throw new AuthenticationException("Failed to auth the client to the server.");
- });
-
- // When
- var error = Record.Exception(() => balancer.UpdateRoutingTable());
-
- // Then
- error.Should().BeOfType();
- error.Message.Should().Contain("Failed to auth the client to the server.");
-
- // while the server is not removed
- routingTable.All().Should().ContainInOrder(uri);
- }
- }
-
- public class AcquireReadWriteConnectionMethod
- {
- [Theory]
- [InlineData(AccessMode.Read)]
- [InlineData(AccessMode.Write)]
- public void ShouldThrowSessionExpiredExceptionIfNoServerAvailable(AccessMode mode)
- {
- // Given
- var mock = CreateRoutingTable(mode, null, false);
- var balancer = new LoadBalancer(null, mock.Object);
-
- // When
- var error = Record.Exception(()=>balancer.Acquire(mode));
-
- // Then
- error.Should().BeOfType();
- error.Message.Should().Contain("Failed to connect to any");
- }
-
- [Theory]
- [InlineData(AccessMode.Read)]
- [InlineData(AccessMode.Write)]
- public void ShouldReturnConnectionWithCorrectMode(AccessMode mode)
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
- var mock = CreateRoutingTable(mode, uri);
- mock.Setup(m => m.All()).Returns(new HashSet { uri });
- var balancer = SetupLoadBalancer(mock.Object);
-
- // When
- var acquiredConn = balancer.Acquire(mode);
-
- // Then
- acquiredConn.Server.Address.Should().Be(uri.ToString());
- }
-
- private static Mock CreateRoutingTable(AccessMode mode, Uri uri, bool hasNext = true)
- {
- var mock = new Mock();
- mock.Setup(m => m.IsStale(It.IsAny())).Returns(false);
- if (mode == AccessMode.Read)
- {
- mock.SetupSequence(m => m.TryNextReader(out uri)).Returns(hasNext).Returns(false);
- }
- else
- {
- mock.SetupSequence(m => m.TryNextWriter(out uri)).Returns(hasNext).Returns(false);
- }
- return mock;
- }
-
- [Theory]
- [InlineData(AccessMode.Read)]
- [InlineData(AccessMode.Write)]
- public void ShouldForgetServerWhenFailedToEstablishConn(AccessMode mode)
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
- var routingTableMock = CreateRoutingTable(mode, uri);
-
- var clusterConnPoolMock = new Mock();
- IConnection conn = null;
- clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn))
- .Callback(() =>
- {
- throw new ServiceUnavailableException("failed init");
- });
-
- var balancer = new LoadBalancer(clusterConnPoolMock.Object, routingTableMock.Object);
-
- // When
- var error = Record.Exception(() => balancer.Acquire(mode));
-
- // Then
- error.Should().BeOfType();
- error.Message.Should().Contain("Failed to connect to any");
-
- // should be removed
- routingTableMock.Verify(m=>m.Remove(uri), Times.Once);
- clusterConnPoolMock.Verify(m=>m.Purge(uri), Times.Once);
- }
-
- [Theory]
- [InlineData(AccessMode.Read)]
- [InlineData(AccessMode.Write)]
- public void ShouldThrowErrorDirectlyIfSecurityError(AccessMode mode)
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
- var routingTableMock = CreateRoutingTable(mode, uri);
-
- var clusterConnPoolMock = new Mock();
- IConnection conn = null;
- clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn))
- .Callback(() =>
- {
- throw new SecurityException("Failed to establish ssl connection with the server");
- });
-
- var balancer = new LoadBalancer(clusterConnPoolMock.Object, routingTableMock.Object);
-
- // When
- var error = Record.Exception(() => balancer.Acquire(mode));
-
- // Then
- error.Should().BeOfType();
- error.Message.Should().Contain("ssl connection with the server");
-
- // while the server is not removed
- routingTableMock.Verify(m => m.Remove(uri), Times.Never);
- clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never);
- }
-
- [Theory]
- [InlineData(AccessMode.Read)]
- [InlineData(AccessMode.Write)]
- public void ShouldThrowErrorDirectlyIfProtocolError(AccessMode mode)
- {
- // Given
- var uri = new Uri("bolt+routing://123:456");
- var routingTableMock = CreateRoutingTable(mode, uri);
-
- var clusterConnPoolMock = new Mock();
- IConnection conn = null;
- clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)).Returns(false)
- .Callback(() =>
- {
- throw new ProtocolException("do not understand struct 0x01");
- });
-
- var balancer = new LoadBalancer(clusterConnPoolMock.Object, routingTableMock.Object);
-
- // When
- var error = Record.Exception(() => balancer.Acquire(mode));
-
- // Then
- error.Should().BeOfType();
- error.Message.Should().Contain("do not understand struct 0x01");
-
- // while the server is not removed
- routingTableMock.Verify(m => m.Remove(uri), Times.Never);
- clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never);
- }
- }
- }
-
public class ClusterErrorHandlerTests
{
public class OnConnectionErrorMethod
@@ -546,7 +54,9 @@ public void ShouldRmoveFromLoadBalancer()
var clusterPoolMock = new Mock();
var routingTableMock = new Mock();
var uri = new Uri("https://neo4j.com");
- var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableMock.Object);
+ var routingTableManagerMock = new Mock();
+ routingTableManagerMock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object);
+ var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object);
loadBalancer.OnConnectionError(uri, new ClientException());
clusterPoolMock.Verify(x=>x.Purge(uri),Times.Once);
@@ -563,7 +73,9 @@ public void ShouldRemoveWriterFromRoutingTable()
var clusterPoolMock = new Mock();
var routingTableMock = new Mock();
var uri = new Uri("https://neo4j.com");
- var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableMock.Object);
+ var routingTableManagerMock = new Mock();
+ routingTableManagerMock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object);
+ var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object);
loadBalancer.OnWriteError(uri);
clusterPoolMock.Verify(x => x.Purge(uri), Times.Never);
@@ -573,127 +85,140 @@ public void ShouldRemoveWriterFromRoutingTable()
}
}
- internal class ListBasedRoutingTable : IRoutingTable
+ public class AcquireMethod
{
- private readonly List _routers;
- private readonly List _removed;
- private int _count = 0;
-
- public ListBasedRoutingTable(List routers)
+ [Theory]
+ [InlineData(AccessMode.Read)]
+ [InlineData(AccessMode.Write)]
+ public void ShouldThrowSessionExpiredExceptionIfNoServerAvailable(AccessMode mode)
{
- _routers = routers;
- _removed = new List();
- }
- public bool IsStale(AccessMode mode)
- {
- return false;
- }
+ // Given
+ var mock = new Mock();
+ mock.Setup(x => x.RoutingTable).Returns(NewMockedRoutingTable(mode, null, false).Object);
+ var balancer = new LoadBalancer(null, mock.Object);
- public bool TryNextRouter(out Uri uri)
- {
- var pos = _count++ % _routers.Count;
- uri = _routers[pos];
- return true;
- }
+ // When
+ var error = Record.Exception(() => balancer.Acquire(mode));
- public bool TryNextReader(out Uri uri)
- {
- throw new NotSupportedException();
+ // Then
+ error.Should().BeOfType();
+ error.Message.Should().Contain("Failed to connect to any");
}
- public bool TryNextWriter(out Uri uri)
+ [Theory]
+ [InlineData(AccessMode.Read)]
+ [InlineData(AccessMode.Write)]
+ public void ShouldReturnConnectionWithCorrectMode(AccessMode mode)
{
- throw new NotSupportedException();
- }
+ // Given
+ var uri = new Uri("bolt+routing://123:456");
+ var mock = new Mock();
+ var routingTableMock = NewMockedRoutingTable(mode, uri);
+ mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object);
- public void Remove(Uri uri)
- {
- _removed.Add(uri);
- }
+ var clusterPoolMock = new Mock();
+ var mockedConn = new Mock();
+ mockedConn.Setup(x => x.Server.Address).Returns(uri.ToString);
+ var conn = mockedConn.Object;
+ clusterPoolMock.Setup(x => x.TryAcquire(uri, out conn)).Returns(true);
+ var balancer = new LoadBalancer(clusterPoolMock.Object, mock.Object);
+
+ // When
+ var acquiredConn = balancer.Acquire(mode);
- public void RemoveWriter(Uri uri)
- {
- throw new NotSupportedException();
+ // Then
+ acquiredConn.Server.Address.Should().Be(uri.ToString());
}
- public ISet All()
+ [Theory]
+ [InlineData(AccessMode.Read)]
+ [InlineData(AccessMode.Write)]
+ public void ShouldForgetServerWhenFailedToEstablishConn(AccessMode mode)
{
- return new HashSet(_routers.Distinct().Except(_removed.Distinct()));
- }
+ // Given
+ var uri = new Uri("bolt+routing://123:456");
+ var routingTableMock = NewMockedRoutingTable(mode, uri);
+ var mock = new Mock();
+ mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object);
- public void Clear()
- {
- throw new NotSupportedException();
- }
+ var clusterConnPoolMock = new Mock();
+ IConnection conn = null;
+ clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn))
+ .Callback(() => throw new ServiceUnavailableException("failed init"));
- public void PrependRouters(IEnumerable uris)
- {
- throw new NotSupportedException();
- }
- }
+ var balancer = new LoadBalancer(clusterConnPoolMock.Object, mock.Object);
- private static IRoutingTable NewRoutingTable(int routerCount, int readerCount, int writerCount)
- {
- return NewRoutingTable(GenerateServerUris(routerCount), GenerateServerUris(readerCount),
- GenerateServerUris(writerCount));
- }
+ // When
+ var error = Record.Exception(() => balancer.Acquire(mode));
- private static IEnumerable GenerateServerUris(int count)
- {
- var uris = new Uri[count];
- for (var i = 0; i < count; i++)
- {
- uris[i] = new Uri($"bolt+routing://127.0.0.1:{i + 9001}");
- }
- return uris;
- }
+ // Then
+ error.Should().BeOfType();
+ error.Message.Should().Contain("Failed to connect to any");
- private static IRoutingTable NewRoutingTable(
- IEnumerable routers = null,
- IEnumerable readers = null,
- IEnumerable writers = null)
- {
- // assign default value of uri
- if (routers == null)
- {
- routers = new Uri[0];
+ // should be removed
+ routingTableMock.Verify(m => m.Remove(uri), Times.Once);
+ clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Once);
}
- if (readers == null)
+
+ [Theory]
+ [InlineData(AccessMode.Read)]
+ [InlineData(AccessMode.Write)]
+ public void ShouldThrowErrorDirectlyIfSecurityError(AccessMode mode)
{
- readers = new Uri[0];
+ // Given
+ var uri = new Uri("bolt+routing://123:456");
+ var routingTableMock = NewMockedRoutingTable(mode, uri);
+ var mock = new Mock();
+ mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object);
+
+ var clusterConnPoolMock = new Mock();
+ IConnection conn = null;
+ clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn))
+ .Callback(() => throw new SecurityException("Failed to establish ssl connection with the server"));
+
+ var balancer = new LoadBalancer(clusterConnPoolMock.Object, mock.Object);
+
+ // When
+ var error = Record.Exception(() => balancer.Acquire(mode));
+
+ // Then
+ error.Should().BeOfType();
+ error.Message.Should().Contain("ssl connection with the server");
+
+ // while the server is not removed
+ routingTableMock.Verify(m => m.Remove(uri), Times.Never);
+ clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never);
}
- if (writers == null)
+
+ [Theory]
+ [InlineData(AccessMode.Read)]
+ [InlineData(AccessMode.Write)]
+ public void ShouldThrowErrorDirectlyIfProtocolError(AccessMode mode)
{
- writers = new Uri[0];
- }
- return new RoundRobinRoutingTable(routers, readers, writers, new Stopwatch(), 1000);
- }
+ // Given
+ var uri = new Uri("bolt+routing://123:456");
+ var routingTableMock = NewMockedRoutingTable(mode, uri);
+ var mock = new Mock();
+ mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object);
- private static LoadBalancer SetupLoadBalancer(
- IEnumerable routers = null,
- IEnumerable readers = null,
- IEnumerable writers = null)
- {
- // create a routing table which knows a few servers
- var routingTable = NewRoutingTable(routers, readers, writers);
- return SetupLoadBalancer(routingTable);
- }
+ var clusterConnPoolMock = new Mock();
+ IConnection conn = null;
+ clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)).Returns(false)
+ .Callback(() => throw new ProtocolException("do not understand struct 0x01"));
- private static LoadBalancer SetupLoadBalancer(IRoutingTable routingTable)
- {
- var uris = routingTable.All();
+ var balancer = new LoadBalancer(clusterConnPoolMock.Object, mock.Object);
- // create a mocked cluster connection pool, which will return the same connection for each different uri
- var mockedClusterPool = new Mock();
+ // When
+ var error = Record.Exception(() => balancer.Acquire(mode));
- foreach (var uri in uris)
- {
- var mockedConn = new Mock();
- mockedConn.Setup(x => x.Server.Address).Returns(uri.ToString);
- var conn = mockedConn.Object;
- mockedClusterPool.Setup(x => x.TryAcquire(uri, out conn)).Returns(true);
+ // Then
+ error.Should().BeOfType();
+ error.Message.Should().Contain("do not understand struct 0x01");
+
+ // while the server is not removed
+ routingTableMock.Verify(m => m.Remove(uri), Times.Never);
+ clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never);
}
- return new LoadBalancer(mockedClusterPool.Object, routingTable);
}
}
}
diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs
index d8a986f2c..b261a9999 100644
--- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs
+++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs
@@ -17,7 +17,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Linq;
using FluentAssertions;
+using Moq;
+using Neo4j.Driver.Internal;
using Neo4j.Driver.Internal.Routing;
using Neo4j.Driver.V1;
using Xunit;
@@ -36,6 +39,23 @@ private static IEnumerable CreateUriArray(int count)
return uris;
}
+ public class Constructor
+ {
+ [Fact]
+ public void ShouldEnsureInitialRouter()
+ {
+ var initUri = new Uri("bolt://123:456");
+ var routers = new HashSet{initUri};
+ var table = new RoundRobinRoutingTable(routers);
+
+ Uri uri;
+ table.TryNextRouter(out uri).Should().BeTrue();
+ uri.Should().Be(initUri);
+
+ table.All().Single().Should().Be(initUri);
+ }
+ }
+
public class IsStatleMethod
{
[Theory] [InlineData(1, 2, 1, 5 * 60, false)] // 1 router, 2 reader, 1 writer
@@ -49,7 +69,6 @@ public void ShouldBeStaleInReadModeIfOnlyHaveOneRouter(int routerCount, int read
CreateUriArray(routerCount),
CreateUriArray(readerCount),
CreateUriArray(writerCount),
- new Stopwatch(),
expireAfterSeconds);
table.IsStale(AccessMode.Read).Should().Be(isStale);
}
@@ -66,7 +85,6 @@ public void ShouldBeStaleInWriteModeIfOnlyHaveOneRouter(int routerCount, int rea
CreateUriArray(routerCount),
CreateUriArray(readerCount),
CreateUriArray(writerCount),
- new Stopwatch(),
expireAfterSeconds);
table.IsStale(AccessMode.Write).Should().Be(isStale);
}
@@ -82,7 +100,7 @@ public void ShouldInjectInFront()
CreateUriArray(3),
CreateUriArray(0),
CreateUriArray(0),
- new Stopwatch(), 5 * 60);
+ 5 * 60);
Uri router;
table.TryNextRouter(out router);
var head = new Uri("http://neo4j:10");
@@ -102,6 +120,5 @@ public void ShouldInjectInFront()
router.Should().Be(head);
}
}
-
}
}
diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoutingTableManagerTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoutingTableManagerTests.cs
new file mode 100644
index 000000000..219e14693
--- /dev/null
+++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoutingTableManagerTests.cs
@@ -0,0 +1,445 @@
+// Copyright (c) 2002-2017 "Neo Technology,"
+// Network Engine for Objects in Lund AB [http://neotechnology.com]
+//
+// This file is part of Neo4j.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using FluentAssertions;
+using Moq;
+using Neo4j.Driver.Internal;
+using Neo4j.Driver.Internal.Connector;
+using Neo4j.Driver.Internal.Routing;
+using Neo4j.Driver.V1;
+using Xunit;
+
+namespace Neo4j.Driver.Tests.Routing
+{
+ public class RoutingTableManagerTests
+ {
+
+ internal static RoutingTableManager NewRoutingTableManager(
+ IRoutingTable routingTable,
+ IClusterConnectionPoolManager poolManager,
+ Uri seedUri)
+ {
+ return new RoutingTableManager(
+ routingTable,
+ new RoutingSettings(new Dictionary()),
+ poolManager, seedUri, null);
+ }
+
+ internal static Mock NewMockedRoutingTable(AccessMode mode, Uri uri, bool hasNext = true)
+ {
+ var mock = new Mock();
+ mock.Setup(m => m.IsStale(It.IsAny())).Returns(false);
+ mock.SetupSequence(m => m.TryNext(mode, out uri)).Returns(hasNext).Returns(false);
+ return mock;
+ }
+
+ private static IRoutingTable NewRoutingTable(
+ IEnumerable routers = null,
+ IEnumerable readers = null,
+ IEnumerable writers = null)
+ {
+ // assign default value of uri
+ if (routers == null)
+ {
+ routers = new Uri[0];
+ }
+ if (readers == null)
+ {
+ readers = new Uri[0];
+ }
+ if (writers == null)
+ {
+ writers = new Uri[0];
+ }
+ return new RoundRobinRoutingTable(routers, readers, writers, 1000);
+ }
+
+ public class UpdateRoutingTableWithInitialUriFallbackMethod
+ {
+ [Fact]
+ public void ShouldPrependInitialRouterIfWriterIsAbsent()
+ {
+ // Given
+ var uri = new Uri("bolt+routing://123:456");
+
+ var routingTableMock = new Mock();
+ routingTableMock.Setup(x => x.PrependRouters(It.IsAny>()))
+ .Callback>(r => r.Single().Should().Be(uri));
+
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.AddConnectionPool(It.IsAny>()))
+ .Callback>(r => r.Single().Should().Be(uri));
+
+ var manager = NewRoutingTableManager(routingTableMock.Object, poolManagerMock.Object, null);
+ manager.IsReadingInAbsenceOfWriter = true;
+ var routingTableReturnMock = new Mock();
+
+ // When
+ // should throw an exception as the initial routers should not be tried again
+ var exception = Record.Exception(() =>
+ manager.UpdateRoutingTableWithInitialUriFallback(new HashSet {uri}, c => c != null
+ ? null
+ : routingTableReturnMock.Object));
+ exception.Should().BeOfType();
+
+ // Then
+ poolManagerMock.Verify(x => x.AddConnectionPool(It.IsAny>()), Times.Once);
+ routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once);
+ }
+
+ [Fact]
+ public void ShouldAddInitialUriWhenNoAvailableRouters()
+ {
+ // Given
+ var uri = new Uri("bolt+routing://123:456");
+
+ var routingTableMock = new Mock();
+ routingTableMock.Setup(x => x.PrependRouters(It.IsAny>()))
+ .Callback>(r => r.Single().Should().Be(uri));
+
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.AddConnectionPool(It.IsAny>()))
+ .Callback>(r => r.Single().Should().Be(uri));
+
+ var manager = NewRoutingTableManager(routingTableMock.Object, poolManagerMock.Object, null);
+ var routingTableReturnMock = new Mock();
+
+ // When
+ manager.UpdateRoutingTableWithInitialUriFallback(new HashSet {uri}, c => c != null
+ ? null
+ : routingTableReturnMock.Object);
+
+ // Then
+ poolManagerMock.Verify(x => x.AddConnectionPool(It.IsAny>()), Times.Once);
+ routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once);
+ }
+
+ [Fact]
+ public void ShouldNotTryInitialUriIfAlreadyTried()
+ {
+ // Given
+ var a = new Uri("bolt+routing://123:456");
+ var b = new Uri("bolt+routing://123:789");
+ var s = a; // should not be retried
+ var t = new Uri("bolt+routing://222:123"); // this should be retried
+
+ var routingTableMock = new Mock();
+ routingTableMock.Setup(x => x.PrependRouters(It.IsAny>()))
+ .Callback>(r => r.Single().Should().Be(t));
+
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.AddConnectionPool(It.IsAny>()))
+ .Callback>(r => r.Single().Should().Be(t));
+
+ var manager = NewRoutingTableManager(routingTableMock.Object, poolManagerMock.Object, null);
+
+ IRoutingTable UpdateRoutingTableFunc(ISet set)
+ {
+ if (set != null)
+ {
+ set.Add(a);
+ set.Add(b);
+ return null;
+ }
+ else
+ {
+ return new Mock().Object;
+ }
+ }
+
+ // When
+ var initialUriSet = new HashSet {s, t};
+ manager.UpdateRoutingTableWithInitialUriFallback(initialUriSet, UpdateRoutingTableFunc);
+
+ // Then
+ // verify the method is actually called
+ poolManagerMock.Verify(x => x.AddConnectionPool(It.IsAny>()), Times.Once);
+ routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once);
+ }
+ }
+
+ public class UpdateRoutingTableMethod
+ {
+ [Fact]
+ public void ShouldForgetAndTryNextRouterWhenConnectionIsNull()
+ {
+ // Given
+ var uriA = new Uri("bolt+routing://123:456");
+ var uriB = new Uri("bolt+routing://123:789");
+
+ // This ensures that uri and uri2 will return in order
+ var routingTable = new ListBasedRoutingTable(new List {uriA, uriB});
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.CreateClusterConnection(It.IsAny()))
+ .Returns((ClusterConnection) null);
+ var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null);
+
+ // When
+ var newRoutingTable = manager.UpdateRoutingTable(connection =>
+ throw new NotSupportedException($"Unknown uri: {connection.Server.Address}"));
+
+ // Then
+ newRoutingTable.Should().BeNull();
+ routingTable.All().Should().BeEmpty();
+ }
+
+ [Fact]
+ public void ShouldForgetAndTryNextRouterWhenFailedWithConnectionError()
+ {
+ // Given
+ var uriA = new Uri("bolt+routing://123:456");
+ var uriB = new Uri("bolt+routing://123:789");
+ var connA = new Mock().Object;
+ var connB = new Mock().Object;
+
+ // This ensures that uri and uri2 will return in order
+ var routingTable = new ListBasedRoutingTable(new List {uriA, uriB});
+ var poolManagerMock = new Mock();
+ poolManagerMock.SetupSequence(x => x.CreateClusterConnection(It.IsAny()))
+ .Returns(connA).Returns(connB);
+ var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null);
+
+ // When
+ var newRoutingTable = manager.UpdateRoutingTable(connection =>
+ {
+ // the second connectin will give a new routingTable
+ if (connection.Equals(connA)) // uriA
+ {
+ routingTable.Remove(uriA);
+ throw new SessionExpiredException("failed init");
+ }
+ if (connection.Equals(connB)) // uriB
+ {
+ return NewRoutingTable(new[] {uriA}, new[] {uriA}, new[] {uriA});
+ }
+
+ throw new NotSupportedException($"Unknown uri: {connection.Server.Address}");
+ });
+
+ // Then
+ newRoutingTable.All().Should().ContainInOrder(uriA);
+ routingTable.All().Should().ContainInOrder(uriB);
+ }
+
+ [Fact]
+ public void ShouldPropagateServiceUnavailable()
+ {
+ var uri = new Uri("bolt+routing://123:456");
+ var routingTable = new ListBasedRoutingTable(new List { uri });
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.CreateClusterConnection(uri))
+ .Returns(new Mock().Object);
+
+ var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null);
+
+ var exception = Record.Exception(() => manager.UpdateRoutingTable(
+ conn => throw new ServiceUnavailableException("Procedure not found")));
+
+ exception.Should().BeOfType();
+ exception.Message.Should().Be("Procedure not found");
+ }
+
+
+ [Fact]
+ public void ShouldPropagateProtocolError()
+ {
+ var uri = new Uri("bolt+routing://123:456");
+ var routingTable = new ListBasedRoutingTable(new List {uri});
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.CreateClusterConnection(uri))
+ .Returns(new Mock().Object);
+ var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null);
+
+ var exception = Record.Exception(() => manager.UpdateRoutingTable(
+ conn => throw new ProtocolException("Cannot parse procedure result")));
+
+ exception.Should().BeOfType();
+ exception.Message.Should().Be("Cannot parse procedure result");
+ }
+
+ [Fact]
+ public void ShouldPropagateAuthenticationException()
+ {
+ // Given
+ var uri = new Uri("bolt+routing://123:456");
+ var routingTable = new ListBasedRoutingTable(new List {uri});
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.CreateClusterConnection(uri))
+ .Callback(() => throw new AuthenticationException("Failed to auth the client to the server."));
+ var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null);
+
+ // When
+ var error = Record.Exception(() => manager.UpdateRoutingTable());
+
+ // Then
+ error.Should().BeOfType();
+ error.Message.Should().Contain("Failed to auth the client to the server.");
+
+ // while the server is not removed
+ routingTable.All().Should().ContainInOrder(uri);
+ }
+
+ [Fact]
+ public void ShouldTryNextRouterIfNoReader()
+ {
+ // Given
+ var uriA = new Uri("bolt+routing://123:1");
+ var uriB = new Uri("bolt+routing://123:2");
+ var connA = new Mock().Object;
+ var connB = new Mock().Object;
+
+ var uriX = new Uri("bolt+routing://456:1");
+ var uriY = new Uri("bolt+routing://789:2");
+
+ var routingTable = new ListBasedRoutingTable(new List {uriA, uriB});
+ var poolManagerMock = new Mock();
+ poolManagerMock.SetupSequence(x => x.CreateClusterConnection(It.IsAny()))
+ .Returns(connA).Returns(connB);
+ var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null);
+
+
+ // When
+ var updateRoutingTable = manager.UpdateRoutingTable(conn =>
+ {
+ if (conn.Equals(connA))
+ {
+ return NewRoutingTable(new[] {uriX}, new Uri[0], new[] {uriX});
+ }
+ if (conn.Equals(connB))
+ {
+ return NewRoutingTable(new[] {uriY}, new[] {uriY}, new[] {uriY});
+ }
+ throw new NotSupportedException($"Unknown uri: {conn.Server.Address}");
+ });
+
+ // Then
+ updateRoutingTable.All().Should().ContainInOrder(uriY);
+ manager.IsReadingInAbsenceOfWriter.Should().BeFalse();
+ }
+
+ [Fact]
+ public void ShouldAcceptRoutingTableIfNoWriter()
+ {
+ // Given
+ var uriA = new Uri("bolt+routing://123:1");
+ var connA = new Mock().Object;
+ var uriX = new Uri("bolt+routing://456:1");
+
+ var routingTable = new ListBasedRoutingTable(new List {uriA});
+ var poolManagerMock = new Mock();
+ poolManagerMock.Setup(x => x.CreateClusterConnection(It.IsAny()))
+ .Returns(connA);
+ var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null);
+
+ // When
+ var updateRoutingTable = manager.UpdateRoutingTable(conn =>
+ {
+ if (conn.Equals(connA))
+ {
+ return NewRoutingTable(new[] {uriX}, new[] {uriX});
+ }
+ throw new NotSupportedException($"Unknown uri: {conn.Server.Address}");
+ });
+
+ // Then
+ updateRoutingTable.All().Should().ContainInOrder(uriX);
+ manager.IsReadingInAbsenceOfWriter.Should().BeTrue();
+ }
+ }
+
+ public class TryAcquireConnectionMethod
+ {
+
+
+ }
+
+ internal class ListBasedRoutingTable : IRoutingTable
+ {
+ private readonly List _routers;
+ private List _removed;
+ private int _count = 0;
+
+ public ListBasedRoutingTable(List routers)
+ {
+ _routers = routers;
+ _removed = new List();
+ }
+
+ public bool IsStale(AccessMode mode)
+ {
+ return false;
+ }
+
+ public bool TryNextRouter(out Uri uri)
+ {
+ if (_count >= _routers.Count)
+ {
+ uri = null;
+ return false;
+ }
+ else
+ {
+ uri = _routers[_count++];
+ return true;
+ }
+ }
+
+ public bool TryNextReader(out Uri uri)
+ {
+ throw new NotSupportedException();
+ }
+
+ public bool TryNextWriter(out Uri uri)
+ {
+ throw new NotSupportedException();
+ }
+
+ public bool TryNext(AccessMode mode, out Uri uri)
+ {
+ throw new NotSupportedException();
+ }
+
+ public void Remove(Uri uri)
+ {
+ _removed.Add(uri);
+ }
+
+ public void RemoveWriter(Uri uri)
+ {
+ throw new NotSupportedException();
+ }
+
+ public ISet All()
+ {
+ return new HashSet(_routers.Distinct().Except(_removed.Distinct()));
+ }
+
+ public void Clear()
+ {
+ throw new NotSupportedException();
+ }
+
+ public void PrependRouters(IEnumerable uris)
+ {
+ throw new NotSupportedException();
+ }
+ }
+ }
+}
+
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs
index 723a40c16..406d0271d 100644
--- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs
@@ -296,15 +296,14 @@ public void Release(IPooledConnection connection)
{
if (_disposeCalled)
{
- // pool already disposed
+ // pool already disposed.
return;
}
if (!_inUseConnections.TryRemove(connection))
{
- // pool already disposed
+ // pool already disposed.
return;
}
-
if (IsConnectionReusable(connection))
{
if (IsPoolFull())
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs
index 842abcfd2..4ae3ed676 100644
--- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs
@@ -37,13 +37,13 @@ internal class ClusterConnectionPool : LoggerBase, IClusterConnectionPool
public ClusterConnectionPool(
ConnectionSettings connectionSettings,
ConnectionPoolSettings poolSettings,
- ILogger logger
+ IEnumerable initUris, ILogger logger
)
: base(logger)
{
_connectionSettings = connectionSettings;
_poolSettings = poolSettings;
-
+ Add(initUris);
}
internal ClusterConnectionPool(
@@ -53,7 +53,7 @@ ILogger logger
ConnectionPoolSettings poolSettings=null,
ILogger logger=null
) :
- this(connSettings, poolSettings, logger)
+ this(connSettings, poolSettings, new HashSet(), logger)
{
_fakePool = connectionPool;
_pools = clusterPool;
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs
new file mode 100644
index 000000000..3b0eabe4f
--- /dev/null
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs
@@ -0,0 +1,29 @@
+// Copyright (c) 2002-2017 "Neo Technology,"
+// Network Engine for Objects in Lund AB [http://neotechnology.com]
+//
+// This file is part of Neo4j.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+using System;
+using System.Collections.Generic;
+using Neo4j.Driver.Internal.Connector;
+
+namespace Neo4j.Driver.Internal.Routing
+{
+ internal interface IClusterConnectionPoolManager
+ {
+ void AddConnectionPool(IEnumerable uris);
+ void UpdateConnectionPool(IEnumerable uris);
+ IConnection CreateClusterConnection(Uri uri);
+ }
+}
\ No newline at end of file
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs
index 4d9d9114a..2181a5d30 100644
--- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs
@@ -1,3 +1,19 @@
+// Copyright (c) 2002-2017 "Neo Technology,"
+// Network Engine for Objects in Lund AB [http://neotechnology.com]
+//
+// This file is part of Neo4j.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
using System;
using System.Collections.Generic;
using Neo4j.Driver.V1;
@@ -8,8 +24,7 @@ internal interface IRoutingTable
{
bool IsStale(AccessMode mode);
bool TryNextRouter(out Uri uri);
- bool TryNextReader(out Uri uri);
- bool TryNextWriter(out Uri uri);
+ bool TryNext(AccessMode mode, out Uri uri);
void Remove(Uri uri);
void RemoveWriter(Uri uri);
ISet All();
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTableManager.cs
new file mode 100644
index 000000000..1cae7a6c8
--- /dev/null
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTableManager.cs
@@ -0,0 +1,26 @@
+// Copyright (c) 2002-2017 "Neo Technology,"
+// Network Engine for Objects in Lund AB [http://neotechnology.com]
+//
+// This file is part of Neo4j.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+using Neo4j.Driver.V1;
+
+namespace Neo4j.Driver.Internal.Routing
+{
+ internal interface IRoutingTableManager
+ {
+ IRoutingTable RoutingTable { get; }
+ void EnsureRoutingTableForMode(AccessMode mode);
+ }
+}
\ No newline at end of file
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs
index 7d7ab7014..9adf4874a 100644
--- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs
@@ -16,7 +16,6 @@
// limitations under the License.
using System;
using System.Collections.Generic;
-using System.Diagnostics;
using System.Threading.Tasks;
using Neo4j.Driver.Internal.Connector;
using Neo4j.Driver.V1;
@@ -24,18 +23,13 @@
namespace Neo4j.Driver.Internal.Routing
{
- internal class LoadBalancer : IConnectionProvider, IClusterErrorHandler
+ internal class LoadBalancer : IConnectionProvider, IClusterErrorHandler, IClusterConnectionPoolManager
{
- private IRoutingTable _routingTable;
+ private readonly IRoutingTableManager _routingTableManager;
private readonly IClusterConnectionPool _clusterConnectionPool;
- private readonly IDictionary _routingContext;
private readonly ILogger _logger;
- private readonly object _syncLock = new object();
- private readonly Stopwatch _stopwatch;
- private readonly Uri _seed;
private volatile bool _disposeCalled = false;
- private bool _isReadingInAbsenceOfWriter = false;
public LoadBalancer(
RoutingSettings routingSettings,
@@ -43,29 +37,20 @@ internal class LoadBalancer : IConnectionProvider, IClusterErrorHandler
ConnectionPoolSettings poolSettings,
ILogger logger)
{
+ var uris = connectionSettings.InitialServerUri.Resolve();
_clusterConnectionPool = new ClusterConnectionPool(
- connectionSettings, poolSettings, logger);
-
- _stopwatch = new Stopwatch();
- _routingTable = new RoundRobinRoutingTable(_stopwatch);
-
- _seed = connectionSettings.InitialServerUri;
- _routingContext = routingSettings.RoutingContext;
+ connectionSettings, poolSettings, uris, logger);
+ _routingTableManager = new RoutingTableManager(routingSettings, this, connectionSettings.InitialServerUri, uris, logger);
_logger = logger;
-
- var uris = _seed.Resolve();
- PrependRouters(uris);
}
// for test only
internal LoadBalancer(
IClusterConnectionPool clusterConnPool,
- IRoutingTable routingTable,
- Uri seed = null)
+ IRoutingTableManager routingTableManager)
{
_clusterConnectionPool = clusterConnPool;
- _routingTable = routingTable;
- _seed = seed;
+ _routingTableManager = routingTableManager;
}
public IConnection Acquire(AccessMode mode)
@@ -75,18 +60,7 @@ public IConnection Acquire(AccessMode mode)
ThrowObjectDisposedException();
}
- IConnection conn = null;
- switch (mode)
- {
- case AccessMode.Read:
- conn = AcquireReadConnection();
- break;
- case AccessMode.Write:
- conn = AcquireWriteConnection();
- break;
- default:
- throw new InvalidOperationException($"Unknown access mode {mode}.");
- }
+ IConnection conn = AcquireConnection(mode);
if (_disposeCalled)
{
@@ -103,13 +77,28 @@ public Task AcquireAsync(AccessMode mode)
public void OnConnectionError(Uri uri, Exception e)
{
_logger?.Info($"Server at {uri} is no longer available due to error: {e.Message}.");
- _routingTable.Remove(uri);
+ _routingTableManager.RoutingTable.Remove(uri);
_clusterConnectionPool.Purge(uri);
}
public void OnWriteError(Uri uri)
{
- _routingTable.RemoveWriter(uri);
+ _routingTableManager.RoutingTable.RemoveWriter(uri);
+ }
+
+ public void AddConnectionPool(IEnumerable uris)
+ {
+ _clusterConnectionPool.Add(uris);
+ }
+
+ public void UpdateConnectionPool(IEnumerable uris)
+ {
+ _clusterConnectionPool.Update(uris);
+ }
+
+ public IConnection CreateClusterConnection(Uri uri)
+ {
+ return CreateClusterConnection(uri, AccessMode.Write);
}
public void Dispose()
@@ -124,216 +113,52 @@ protected virtual void Dispose(bool isDisposing)
return;
_disposeCalled = true;
// We cannot set routing table and cluster conn pool to null as we do not want get NPE in concurrent call of dispose and acquire
- _routingTable.Clear();
+ _routingTableManager.RoutingTable.Clear();
_clusterConnectionPool.Dispose();
// cannot set logger to null here otherwise we might concurrent call log and set log to null.
}
- private void PrependRouters(ISet uris)
- {
- _routingTable.PrependRouters(uris);
- _clusterConnectionPool.Add(uris);
- }
-
- private IConnection AcquireReadConnection()
+ private IConnection AcquireConnection(AccessMode mode)
{
- EnsureRoutingTableForMode(AccessMode.Read);
+ _routingTableManager.EnsureRoutingTableForMode(mode);
while (true)
{
Uri uri;
- if (!_routingTable.TryNextReader(out uri))
+ if (!_routingTableManager.RoutingTable.TryNext(mode, out uri))
{
// no server known to routingTable
break;
}
- IConnection conn = CreateClusterConnection(uri, AccessMode.Read);
- if (conn != null)
- {
- return conn;
- }
- }
- throw new SessionExpiredException("Failed to connect to any read server.");
- }
-
- private IConnection AcquireWriteConnection()
- {
- EnsureRoutingTableForMode(AccessMode.Write);
- while (true)
- {
- Uri uri;
- if (!_routingTable.TryNextWriter(out uri))
- {
- break;
- }
-
- IConnection conn = CreateClusterConnection(uri);
+ IConnection conn = CreateClusterConnection(uri, mode);
if (conn != null)
{
return conn;
}
+ //else connection already removed by clusterConnection onError method
}
- throw new SessionExpiredException("Failed to connect to any write server.");
- }
-
- private void EnsureRoutingTableForMode(AccessMode mode)
- {
- lock (_syncLock)
- {
- if (!IsRoutingTableStale(_routingTable, mode))
- {
- return;
- }
-
- var routingTable = UpdateRoutingTableWithInitialUriFallback();
- _clusterConnectionPool.Update(routingTable.All());
- _routingTable = routingTable;
- _logger?.Info($"Updated routingTable to be {_routingTable}");
- }
- }
-
- private bool IsRoutingTableStale(IRoutingTable routingTable, AccessMode mode = AccessMode.Read)
- {
- lock (_syncLock)
- {
- switch (mode)
- {
- case AccessMode.Read:
- if (routingTable.IsStale(AccessMode.Read))
- {
- return true;
- }
- _isReadingInAbsenceOfWriter = routingTable.IsStale(AccessMode.Write);
- return false;
- case AccessMode.Write:
- return routingTable.IsStale(AccessMode.Write);
- default:
- throw new InvalidOperationException($"Unknown access mode {mode}.");
- }
- }
- }
-
- internal IRoutingTable UpdateRoutingTableWithInitialUriFallback(
- Func, IRoutingTable> updateRoutingTableFunc = null,
- Func> resolveInitialUriFunc = null)
- {
- lock (_syncLock)
- {
- updateRoutingTableFunc = updateRoutingTableFunc ?? (u => UpdateRoutingTable(null, u));
- resolveInitialUriFunc = resolveInitialUriFunc ?? _seed.Resolve;
-
- var hasPrependedInitialRouters = false;
- if (_isReadingInAbsenceOfWriter)
- {
- PrependRouters(resolveInitialUriFunc());
- hasPrependedInitialRouters = true;
- }
-
- var triedUris = new HashSet();
- var routingTable = updateRoutingTableFunc(triedUris);
- if (routingTable != null)
- {
- return routingTable;
- }
-
- if (!hasPrependedInitialRouters)
- {
- var uris = resolveInitialUriFunc();
- uris.ExceptWith(triedUris);
- if (uris.Count != 0)
- {
- PrependRouters(uris);
- routingTable = updateRoutingTableFunc(null);
- if (routingTable != null)
- {
- return routingTable;
- }
- }
- }
- // We retied and tried our best however there is just no cluster.
- // This is the ultimate place we will inform the user that you need to re-create a driver
- throw new ServiceUnavailableException(
- "Failed to connect to any routing server. " +
- "Please make sure that the cluster is up and can be accessed by the driver and retry.");
- }
- }
-
- internal IRoutingTable UpdateRoutingTable(Func rediscoveryFunc = null,
- ISet < Uri> triedUris = null)
- {
- lock (_syncLock)
- {
- rediscoveryFunc = rediscoveryFunc ?? Rediscovery;
- while (true)
- {
- Uri uri;
- if (!_routingTable.TryNextRouter(out uri))
- {
- // no alive server
- return null;
- }
- triedUris?.Add(uri);
- IConnection conn = CreateClusterConnection(uri);
- if (conn != null)
- {
- try
- {
- var roundRobinRoutingTable = rediscoveryFunc(conn);
- if (!IsRoutingTableStale(roundRobinRoutingTable))
- {
- return roundRobinRoutingTable;
- }
- }
- catch (Exception e)
- {
- _logger?.Info($"Failed to update routing table with server uri={uri} due to error {e.Message}");
- if (e is SessionExpiredException)
- {
- // ignored
- // Already handled by clusterConn.OnConnectionError to remove from load balancer
- }
- else
- {
- throw;
- }
- }
- }
- }
- }
+ var name = mode == AccessMode.Read ? "read" : "wrtie";
+ throw new SessionExpiredException($"Failed to connect to any {name} server.");
}
- private ClusterConnection CreateClusterConnection(Uri uri, AccessMode mode = AccessMode.Write)
+ private IConnection CreateClusterConnection(Uri uri, AccessMode mode)
{
- lock (_syncLock)
+ try
{
- try
+ IConnection conn;
+ if (_clusterConnectionPool.TryAcquire(uri, out conn))
{
- IConnection conn;
- if (_clusterConnectionPool.TryAcquire(uri, out conn))
- {
- return new ClusterConnection(conn, uri, mode, this);
- }
- OnConnectionError(uri, new ArgumentException(
- $"Routing table {_routingTable} contains a server {uri} " +
- $"that is not known to cluster connection pool {_clusterConnectionPool}."));
+ return new ClusterConnection(conn, uri, mode, this);
}
- catch (ServiceUnavailableException e)
- {
- OnConnectionError(uri, e);
- }
- return null;
+ OnConnectionError(uri, new ArgumentException(
+ $"Routing table {_routingTableManager.RoutingTable} contains a server {uri} " +
+ $"that is not known to cluster connection pool {_clusterConnectionPool}."));
}
- }
-
- private IRoutingTable Rediscovery(IConnection conn)
- {
- lock (_syncLock)
+ catch (ServiceUnavailableException e)
{
- var discoveryManager = new ClusterDiscoveryManager(conn, _routingContext, _logger);
- discoveryManager.Rediscovery();
- return new RoundRobinRoutingTable(discoveryManager.Routers, discoveryManager.Readers,
- discoveryManager.Writers, _stopwatch, discoveryManager.ExpireAfterSeconds);
+ OnConnectionError(uri, e);
}
+ return null;
}
private void ThrowObjectDisposedException()
@@ -341,15 +166,9 @@ private void ThrowObjectDisposedException()
FailedToCreateConnection(this);
}
- internal bool IsReadingInAbsenceOfWriter
- {
- get { return _isReadingInAbsenceOfWriter; }
- set { _isReadingInAbsenceOfWriter = value; }
- }
-
public override string ToString()
{
- return $"{nameof(_routingTable)}: {{{_routingTable}}}, " +
+ return $"{nameof(_routingTableManager.RoutingTable)}: {{{_routingTableManager.RoutingTable}}}, " +
$"{nameof(_clusterConnectionPool)}: {{{_clusterConnectionPool}}}";
}
}
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs
index 30804dee6..4c1831c66 100644
--- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs
@@ -32,22 +32,23 @@ internal class RoundRobinRoutingTable : IRoutingTable
private readonly Stopwatch _stopwatch;
private readonly long _expireAfterSeconds;
- public RoundRobinRoutingTable(Stopwatch stopwatch, long expireAfterSeconds = 0)
+ public RoundRobinRoutingTable(IEnumerable routers, long expireAfterSeconds = 0)
{
_expireAfterSeconds = expireAfterSeconds;
- _stopwatch = stopwatch;
+ _stopwatch = new Stopwatch();
_stopwatch.Restart();
+ _routers.Add(routers);// init
}
public RoundRobinRoutingTable(IEnumerable routers, IEnumerable readers, IEnumerable writers,
- Stopwatch stopwatch, long expireAfterSeconds)
+ long expireAfterSeconds)
{
_routers.Add(routers);
_readers.Add(readers);
_writers.Add(writers);
_expireAfterSeconds = expireAfterSeconds;
- _stopwatch = stopwatch;
+ _stopwatch = new Stopwatch();
_stopwatch.Restart();
}
@@ -74,6 +75,19 @@ public bool TryNextWriter(out Uri uri)
return _writers.TryNext(out uri);
}
+ public bool TryNext(AccessMode mode, out Uri uri)
+ {
+ switch (mode)
+ {
+ case AccessMode.Read:
+ return TryNextReader(out uri);
+ case AccessMode.Write:
+ return TryNextWriter(out uri);
+ default:
+ throw new InvalidOperationException($"Unknown access mode {mode}");
+ }
+ }
+
public void Remove(Uri uri)
{
_routers.Remove(uri);
diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs
new file mode 100644
index 000000000..ecf431cfe
--- /dev/null
+++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs
@@ -0,0 +1,234 @@
+// Copyright (c) 2002-2017 "Neo Technology,"
+// Network Engine for Objects in Lund AB [http://neotechnology.com]
+//
+// This file is part of Neo4j.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+using System;
+using System.Collections.Generic;
+using Neo4j.Driver.Internal.Connector;
+using Neo4j.Driver.V1;
+
+namespace Neo4j.Driver.Internal.Routing
+{
+ internal class RoutingTableManager : IRoutingTableManager
+ {
+ private readonly ILogger _logger;
+
+ private readonly Uri _seedUri;
+ private readonly IDictionary _routingContext;
+
+ private IRoutingTable _routingTable;
+ public IRoutingTable RoutingTable
+ {
+ get => _routingTable;
+ set => _routingTable = value;
+ }
+
+ private readonly IClusterConnectionPoolManager _poolManager;
+
+ private readonly object _syncLock = new object();
+
+ private bool _isReadingInAbsenceOfWriter = false;
+ public bool IsReadingInAbsenceOfWriter
+ {
+ get => _isReadingInAbsenceOfWriter;
+ set => _isReadingInAbsenceOfWriter = value;
+ }
+
+ public RoutingTableManager(
+ RoutingSettings routingSettings,
+ IClusterConnectionPoolManager poolManager,
+ Uri seedUri,
+ ISet initUris,
+ ILogger logger) :
+ this(new RoundRobinRoutingTable(initUris),
+ routingSettings, poolManager, seedUri, logger)
+ {
+ }
+
+ public RoutingTableManager(
+ IRoutingTable routingTable,
+ RoutingSettings routingSettings,
+ IClusterConnectionPoolManager poolManager,
+ Uri seedUri,
+ ILogger logger)
+ {
+ _routingTable = routingTable;
+ _routingContext = routingSettings.RoutingContext;
+ _poolManager = poolManager;
+ _seedUri = seedUri;
+
+ _logger = logger;
+ }
+
+ public bool TryAcquireConnection(AccessMode mode, out Uri uri)
+ {
+ return _routingTable.TryNext(mode, out uri);
+ }
+
+ public void EnsureRoutingTableForMode(AccessMode mode)
+ {
+ lock (_syncLock)
+ {
+ if (!IsRoutingTableStale(_routingTable, mode))
+ {
+ return;
+ }
+
+ var routingTable = UpdateRoutingTableWithInitialUriFallback(_seedUri.Resolve());
+ _poolManager.UpdateConnectionPool(routingTable.All());
+ _routingTable = routingTable;
+ _logger?.Info($"Updated routingTable to be {_routingTable}");
+ }
+ }
+
+ public bool IsRoutingTableStale(IRoutingTable routingTable, AccessMode mode = AccessMode.Read)
+ {
+ lock (_syncLock)
+ {
+ switch (mode)
+ {
+ case AccessMode.Read:
+ if (routingTable.IsStale(AccessMode.Read))
+ {
+ return true;
+ }
+ _isReadingInAbsenceOfWriter = routingTable.IsStale(AccessMode.Write);
+ return false;
+ case AccessMode.Write:
+ return routingTable.IsStale(AccessMode.Write);
+ default:
+ throw new InvalidOperationException($"Unknown access mode {mode}.");
+ }
+ }
+ }
+
+ public void PrependRouters(ISet uris)
+ {
+ lock (_syncLock)
+ {
+ _routingTable.PrependRouters(uris);
+ _poolManager.AddConnectionPool(uris);
+ }
+ }
+
+ public IRoutingTable UpdateRoutingTableWithInitialUriFallback(ISet initialUriSet,
+ Func, IRoutingTable> updateRoutingTableFunc = null)
+ {
+ lock (_syncLock)
+ {
+ updateRoutingTableFunc = updateRoutingTableFunc ?? (u => UpdateRoutingTable(null, u));
+
+ var hasPrependedInitialRouters = false;
+ if (_isReadingInAbsenceOfWriter)
+ {
+ PrependRouters(initialUriSet);
+ hasPrependedInitialRouters = true;
+ }
+
+ var triedUris = new HashSet();
+ var routingTable = updateRoutingTableFunc(triedUris);
+ if (routingTable != null)
+ {
+ return routingTable;
+ }
+
+ if (!hasPrependedInitialRouters)
+ {
+ var uris = initialUriSet;
+ uris.ExceptWith(triedUris);
+ if (uris.Count != 0)
+ {
+ PrependRouters(uris);
+ routingTable = updateRoutingTableFunc(null);
+ if (routingTable != null)
+ {
+ return routingTable;
+ }
+ }
+ }
+ // We retied and tried our best however there is just no cluster.
+ // This is the ultimate place we will inform the user that you need to re-create a driver
+ throw new ServiceUnavailableException(
+ "Failed to connect to any routing server. " +
+ "Please make sure that the cluster is up and can be accessed by the driver and retry.");
+ }
+ }
+
+ public IRoutingTable UpdateRoutingTable(Func rediscoveryFunc = null,
+ ISet triedUris = null)
+ {
+ lock (_syncLock)
+ {
+ rediscoveryFunc = rediscoveryFunc ?? Rediscovery;
+ while (true)
+ {
+ Uri uri;
+ if (!_routingTable.TryNextRouter(out uri))
+ {
+ // no alive server
+ return null;
+ }
+ triedUris?.Add(uri);
+ IConnection conn = _poolManager.CreateClusterConnection(uri);
+ if (conn == null)
+ {
+ _routingTable.Remove(uri);
+ }
+ else
+ {
+ try
+ {
+ var roundRobinRoutingTable = rediscoveryFunc(conn);
+ if (!IsRoutingTableStale(roundRobinRoutingTable))
+ {
+ return roundRobinRoutingTable;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger?.Info(
+ $"Failed to update routing table with server uri={uri} due to error {e.Message}");
+ if (e is SessionExpiredException)
+ {
+ // ignored
+ // Already handled by clusterConn.OnConnectionError to remove from load balancer
+ }
+ else
+ {
+ throw;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public IRoutingTable Rediscovery(IConnection conn)
+ {
+ lock (_syncLock)
+ {
+ var discoveryManager = new ClusterDiscoveryManager(conn, _routingContext, _logger);
+ discoveryManager.Rediscovery();
+ return new RoundRobinRoutingTable(discoveryManager.Routers, discoveryManager.Readers,
+ discoveryManager.Writers, discoveryManager.ExpireAfterSeconds);
+ }
+ }
+
+ public void Clear()
+ {
+ _routingTable.Clear();
+ }
+ }
+}