From 2496a81254e6bc7f8e10988acb89d58606697331 Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Wed, 12 Jul 2017 14:59:24 +0200 Subject: [PATCH] Refactoring to pull RoutingTable and Rediscovery in to RoutingTableManager to simplify the logic in LoadBalancer. --- .../Neo4j.Driver.Tests.csproj | 1 + .../Routing/ClusterConnectionPoolTests.cs | 18 + .../Routing/LoadBalancerTests.cs | 737 ++++-------------- .../Routing/RoundRobinRoutingTableTests.cs | 25 +- .../Routing/RoutingTableManagerTests.cs | 445 +++++++++++ .../Neo4j.Driver/Internal/ConnectionPool.cs | 5 +- .../Internal/Routing/ClusterConnectionPool.cs | 6 +- .../Routing/IClusterConnectionPoolManager.cs | 29 + .../Internal/Routing/IRoutingTable.cs | 19 +- .../Internal/Routing/IRoutingTableManager.cs | 26 + .../Internal/Routing/LoadBalancer.cs | 271 ++----- .../Routing/RoundRobinRoutingTable.cs | 22 +- .../Internal/Routing/RoutingTableManager.cs | 234 ++++++ 13 files changed, 990 insertions(+), 848 deletions(-) create mode 100644 Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoutingTableManagerTests.cs create mode 100644 Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs create mode 100644 Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTableManager.cs create mode 100644 Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj b/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj index d800e715b..07444a6af 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Neo4j.Driver.Tests.csproj @@ -116,6 +116,7 @@ + diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs index 420bbdf5d..50b6822b2 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/ClusterConnectionPoolTests.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using FluentAssertions; using Moq; @@ -32,6 +33,23 @@ public class ClusterConnectionPoolTests { private static Uri ServerUri { get; } = new Uri("bolt+routing://1234:5678"); + public class Constructor + { + [Fact] + public void ShouldEnsureInitialRouter() + { + var uris = new HashSet{new Uri("bolt://123:456")}; + var config = Config.DefaultConfig; + var connSettings = new ConnectionSettings(ServerUri, new Mock().Object, config); + var poolSettings = new ConnectionPoolSettings(config); + + var pool = new ClusterConnectionPool(connSettings, poolSettings, uris, null); + + pool.ToString().Should().Be( + "[{bolt://123:456/ : _availableConnections: {[]}, _inUseConnections: {[]}}]"); + } + } + public class TryAcquireMethod { [Fact] diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs index d95997728..abf356aaa 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/LoadBalancerTests.cs @@ -16,526 +16,34 @@ // limitations under the License. using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading; +// Copyright (c) 2002-2017 "Neo Technology," +// Network Engine for Objects in Lund AB [http://neotechnology.com] +// +// This file is part of Neo4j. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. using FluentAssertions; using Moq; -using Neo4j.Driver.Internal; using Neo4j.Driver.Internal.Connector; using Neo4j.Driver.Internal.Routing; using Neo4j.Driver.V1; using Xunit; +using static Neo4j.Driver.Tests.Routing.RoutingTableManagerTests; namespace Neo4j.Driver.Tests.Routing { public class LoadBalancerTests { - public class Constructor - { - [Fact] - public void ShouldEnsureInitialRouter() - { - var uri = new Uri("bolt://123:456"); - var config = Config.DefaultConfig; - var routingSettings = new RoutingSettings(new Dictionary()); - var connSettings = new ConnectionSettings(uri, new Mock().Object, config); - var poolSettings = new ConnectionPoolSettings(config); - - var loadbalancer = new LoadBalancer(routingSettings, connSettings, poolSettings, null); - - loadbalancer.ToString().Should().Be( - "_routingTable: {[_routers: bolt://123:456/], [_detachedRouters: ], [_readers: ], [_writers: ]}, " + - "_clusterConnectionPool: {[{bolt://123:456/ : _availableConnections: {[]}, _inUseConnections: {[]}}]}"); - } - } - - public class AcquireConnectionMethod - { - public class UpdateRoutingTableWithInitialUriFallbackMethod - { - [Fact] - public void ShouldPrependInitialRouterIfWriterIsAbsent() - { - // Given - var uri = new Uri("bolt+routing://123:456"); - - var routingTableMock = new Mock(); - routingTableMock.Setup(x => x.PrependRouters(It.IsAny>())) - .Callback>(r => r.Single().Should().Be(uri)); - - var poolMock = new Mock(); - poolMock.Setup(x => x.Add(It.IsAny>())) - .Callback>(r => r.Single().Should().Be(uri)); - - var balancer = new LoadBalancer(poolMock.Object, routingTableMock.Object, uri); - balancer.IsReadingInAbsenceOfWriter = true; - var routingTableReturnMock = new Mock(); - - // When - // should throw an exception as the initial routers should not be tried again - var exception = Record.Exception(()=> - balancer.UpdateRoutingTableWithInitialUriFallback(c => c != null ? null : routingTableReturnMock.Object)); - exception.Should().BeOfType(); - - // Then - poolMock.Verify(x => x.Add(It.IsAny>()), Times.Once); - routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once); - } - - [Fact] - public void ShouldAddInitialUriWhenNoAvailableRouters() - { - // Given - var uri = new Uri("bolt+routing://123:456"); - - var routingTableMock = new Mock(); - routingTableMock.Setup(x => x.PrependRouters(It.IsAny>())) - .Callback>(r => r.Single().Should().Be(uri)); - - var poolMock = new Mock(); - poolMock.Setup(x => x.Add(It.IsAny>())) - .Callback>(r => r.Single().Should().Be(uri)); - - var balancer = new LoadBalancer(poolMock.Object, routingTableMock.Object, uri); - - var routingTableReturnMock = new Mock(); - - // When - balancer.UpdateRoutingTableWithInitialUriFallback(c => c != null ? null : routingTableReturnMock.Object); - - // Then - poolMock.Verify(x => x.Add(It.IsAny>()), Times.Once); - routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once); - } - - [Fact] - public void ShouldNotTryInitialUriIfAlreadyTried() - { - // Given - var a = new Uri("bolt+routing://123:456"); - var b = new Uri("bolt+routing://123:789"); - var s = a; // should not be retried - var t = new Uri("bolt+routing://222:123"); // this should be retried - - var routingTableMock = new Mock(); - routingTableMock.Setup(x => x.PrependRouters(It.IsAny>())) - // ensure the retried is only t - .Callback>(set => set.Single().Should().Be(t)); - - var poolMock = new Mock(); - poolMock.Setup(x => x.Add(It.IsAny>())) - // ensure the retried is only t - .Callback>(set => set.Single().Should().Be(t)); - - var balancer = new LoadBalancer(poolMock.Object, routingTableMock.Object); - - Func, IRoutingTable> updateRoutingTableFunc = set => - { - if (set != null) - { - set.Add(a); - set.Add(b); - return null; - } - else - { - return new Mock().Object; - } - }; - Func> resolveInitialUriFunc = () => new HashSet {s, t}; - // When - balancer.UpdateRoutingTableWithInitialUriFallback(updateRoutingTableFunc, resolveInitialUriFunc); - - // Then - // verify the method is actually called - poolMock.Verify(x => x.Add(It.IsAny>()), Times.Once); - routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once); - } - } - - public class UpdateRoutingTableMethod - { - [Fact] - public void ConcurrentUpdateRequestsHaveNoEffect() - { - // Given - var uri = new Uri("bolt+routing://123:456"); - var balancer = SetupLoadBalancer(new[] {uri}); - var balancerRoutingTable = NewRoutingTable(new[] {uri}); - - var anotherUri = new Uri("bolt+routing://123:789"); - var updateCount = 0; - var directReturnCount = 0; - Func updateRoutingTableFunc = - connection => - { - if (!balancerRoutingTable.IsStale(AccessMode.Write)) - { - Interlocked.Add(ref directReturnCount, 1); - return balancerRoutingTable; - } - Interlocked.Add(ref updateCount, 1); - // needs to return a valid routing table to stop update routing table - var newTable = NewRoutingTable(new[] { anotherUri }, new[] { anotherUri }, new[] { anotherUri }); - balancerRoutingTable = newTable; - return newTable; - }; - - // When calling update routing table in many threads - var size = 10; - Thread[] threads = new Thread[size]; - for (var i = 0; i < size; i++) - { - var thread = new Thread(() => - { - var result = balancer.UpdateRoutingTable(updateRoutingTableFunc); - result.All().Should().ContainInOrder(anotherUri); - }); - threads[i] = thread; - thread.Start(); - } - - foreach (var thread in threads) - { - thread.Join(); - } - updateCount.Should().Be(1); - directReturnCount.Should().Be(size-1); - } - - [Fact] - public void UpdateReplaceEntireRoutingTable() - { - // Given - var uriA = new Uri("bolt+routing://123a:456"); - var uriB = new Uri("bolt+routing://123b:456"); - var uriC = new Uri("bolt+routing://123c:456"); - var balancer = SetupLoadBalancer(new[] { uriA }, new[] { uriB }, new[] { uriC }); - - // When - var uriX = new Uri("bolt+routing://123x:789"); - var uriY = new Uri("bolt+routing://123y:789"); - var uriZ = new Uri("bolt+routing://123z:789"); - var result = balancer.UpdateRoutingTable(connection - => NewRoutingTable(new[] {uriX}, new[] { uriY }, new[] { uriZ })); - - // Then - result.All().Should().Contain(new [] {uriX, uriY, uriZ}); - } - - [Fact] - public void ShouldForgetAndTryNextRouterWhenFailedWithConnectionError() - { - // Given - var uriA = new Uri("bolt+routing://123:456"); - var uriB = new Uri("bolt+routing://123:789"); - - // This ensures that uri and uri2 will return in order - var routingTable = new ListBasedRoutingTable(new List {uriA, uriB}); - var balancer = SetupLoadBalancer(routingTable); - - // When - var newRoutingTable = balancer.UpdateRoutingTable(connection => - { - // the second connectin will give a new routingTable - if (connection.Server.Address.Equals(uriA.ToString())) // uriA - { - ((ClusterConnection)connection).OnError(new ServiceUnavailableException("failed init")); - } - if (connection.Server.Address.Equals(uriB.ToString())) // uriB - { - return NewRoutingTable(new[] {uriA}, new [] {uriA}, new []{uriA}); - } - - throw new NotSupportedException($"Unknown uri: {connection.Server.Address}"); - }); - - // Then - newRoutingTable.All().Should().ContainInOrder(uriA); - routingTable.All().Should().ContainInOrder(uriB); - } - - [Fact] - public void ShouldPropagateServiceUnavailable() - { - var balancer = SetupLoadBalancer(new[] {new Uri("bolt+routing://123:456")}); - - var exception = Record.Exception(()=>balancer.UpdateRoutingTable( - conn => { throw new ServiceUnavailableException("Procedure not found"); })); - - exception.Should().BeOfType(); - exception.Message.Should().Be("Procedure not found"); - } - - [Fact] - public void ShouldTryNextRouterIfNoReader() - { - // Given - var uriA = new Uri("bolt+routing://123:1"); - var uriB = new Uri("bolt+routing://123:2"); - - var uriX = new Uri("bolt+routing://456:1"); - var uriY = new Uri("bolt+routing://789:2"); - - var balancer = SetupLoadBalancer(new ListBasedRoutingTable(new List {uriA, uriB})); - - // When - var updateRoutingTable = balancer.UpdateRoutingTable(conn => - { - if (conn.Server.Address.Equals(uriA.ToString())) - { - return NewRoutingTable(new[] {uriX}, new Uri[0], new[] {uriX}); - } - if (conn.Server.Address.Equals(uriB.ToString())) - { - return NewRoutingTable(new[] {uriY}, new[] {uriY}, new[] {uriY}); - } - throw new NotSupportedException($"Unknown uri: {conn.Server.Address}"); - }); - - // Then - updateRoutingTable.All().Should().ContainInOrder(uriY); - balancer.IsReadingInAbsenceOfWriter.Should().BeFalse(); - } - - [Fact] - public void ShouldAcceptRoutingTableIfNoWriter() - { - // Given - var uriA = new Uri("bolt+routing://123:1"); - var uriX = new Uri("bolt+routing://456:1"); - - var balancer = SetupLoadBalancer(new ListBasedRoutingTable(new List { uriA })); - - // When - var updateRoutingTable = balancer.UpdateRoutingTable(conn => - { - if (conn.Server.Address.Equals(uriA.ToString())) - { - return NewRoutingTable(new[] {uriX}, new[] {uriX}); - } - throw new NotSupportedException($"Unknown uri: {conn.Server.Address}"); - }); - - // Then - updateRoutingTable.All().Should().ContainInOrder(uriX); - balancer.IsReadingInAbsenceOfWriter.Should().BeTrue(); - } - - [Theory] - [InlineData(1, 1, 1)] - [InlineData(2, 1, 1)] - [InlineData(1, 2, 1)] - [InlineData(2, 2, 1)] - [InlineData(1, 1, 2)] - [InlineData(2, 1, 2)] - [InlineData(1, 2, 2)] - [InlineData(2, 2, 2)] - [InlineData(3, 1, 2)] - [InlineData(3, 2, 1)] - public void ShouldAcceptValidRoutingTables(int routerCount, int writerCount, int readerCount) - { - var balancer = SetupLoadBalancer(new[] {new Uri("bolt+routing://123:45")}); - var newRoutingTable = NewRoutingTable(routerCount, readerCount, writerCount); - var result = balancer.UpdateRoutingTable(connection => newRoutingTable); - - // Then - result.All().Should().Contain(newRoutingTable.All()); - newRoutingTable.All().Should().Contain(result.All()); - balancer.IsReadingInAbsenceOfWriter.Should().BeFalse(); - } - - [Fact] - public void ShouldPropagateProtocolError() - { - var balancer = SetupLoadBalancer(new[] { new Uri("bolt+routing://123:456") }); - - var exception = Record.Exception(() => balancer.UpdateRoutingTable( - conn => { throw new ProtocolException("Cannot parse procedure result"); })); - - exception.Should().BeOfType(); - exception.Message.Should().Be("Cannot parse procedure result"); - } - - [Fact] - public void ShouldPropagateAuthenticationException() - { - // Given - var uri = new Uri("bolt+routing://123:456"); - // a routing table which knows a uri - var routingTable = NewRoutingTable(new[] {uri}); - - var mockedClusterPool = new Mock(); - var balancer = new LoadBalancer(mockedClusterPool.Object, routingTable); - - var mockedConn = new Mock(); - var conn = mockedConn.Object; - mockedClusterPool.Setup(x => x.TryAcquire(uri, out conn)).Callback(() => - { - throw new AuthenticationException("Failed to auth the client to the server."); - }); - - // When - var error = Record.Exception(() => balancer.UpdateRoutingTable()); - - // Then - error.Should().BeOfType(); - error.Message.Should().Contain("Failed to auth the client to the server."); - - // while the server is not removed - routingTable.All().Should().ContainInOrder(uri); - } - } - - public class AcquireReadWriteConnectionMethod - { - [Theory] - [InlineData(AccessMode.Read)] - [InlineData(AccessMode.Write)] - public void ShouldThrowSessionExpiredExceptionIfNoServerAvailable(AccessMode mode) - { - // Given - var mock = CreateRoutingTable(mode, null, false); - var balancer = new LoadBalancer(null, mock.Object); - - // When - var error = Record.Exception(()=>balancer.Acquire(mode)); - - // Then - error.Should().BeOfType(); - error.Message.Should().Contain("Failed to connect to any"); - } - - [Theory] - [InlineData(AccessMode.Read)] - [InlineData(AccessMode.Write)] - public void ShouldReturnConnectionWithCorrectMode(AccessMode mode) - { - // Given - var uri = new Uri("bolt+routing://123:456"); - var mock = CreateRoutingTable(mode, uri); - mock.Setup(m => m.All()).Returns(new HashSet { uri }); - var balancer = SetupLoadBalancer(mock.Object); - - // When - var acquiredConn = balancer.Acquire(mode); - - // Then - acquiredConn.Server.Address.Should().Be(uri.ToString()); - } - - private static Mock CreateRoutingTable(AccessMode mode, Uri uri, bool hasNext = true) - { - var mock = new Mock(); - mock.Setup(m => m.IsStale(It.IsAny())).Returns(false); - if (mode == AccessMode.Read) - { - mock.SetupSequence(m => m.TryNextReader(out uri)).Returns(hasNext).Returns(false); - } - else - { - mock.SetupSequence(m => m.TryNextWriter(out uri)).Returns(hasNext).Returns(false); - } - return mock; - } - - [Theory] - [InlineData(AccessMode.Read)] - [InlineData(AccessMode.Write)] - public void ShouldForgetServerWhenFailedToEstablishConn(AccessMode mode) - { - // Given - var uri = new Uri("bolt+routing://123:456"); - var routingTableMock = CreateRoutingTable(mode, uri); - - var clusterConnPoolMock = new Mock(); - IConnection conn = null; - clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)) - .Callback(() => - { - throw new ServiceUnavailableException("failed init"); - }); - - var balancer = new LoadBalancer(clusterConnPoolMock.Object, routingTableMock.Object); - - // When - var error = Record.Exception(() => balancer.Acquire(mode)); - - // Then - error.Should().BeOfType(); - error.Message.Should().Contain("Failed to connect to any"); - - // should be removed - routingTableMock.Verify(m=>m.Remove(uri), Times.Once); - clusterConnPoolMock.Verify(m=>m.Purge(uri), Times.Once); - } - - [Theory] - [InlineData(AccessMode.Read)] - [InlineData(AccessMode.Write)] - public void ShouldThrowErrorDirectlyIfSecurityError(AccessMode mode) - { - // Given - var uri = new Uri("bolt+routing://123:456"); - var routingTableMock = CreateRoutingTable(mode, uri); - - var clusterConnPoolMock = new Mock(); - IConnection conn = null; - clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)) - .Callback(() => - { - throw new SecurityException("Failed to establish ssl connection with the server"); - }); - - var balancer = new LoadBalancer(clusterConnPoolMock.Object, routingTableMock.Object); - - // When - var error = Record.Exception(() => balancer.Acquire(mode)); - - // Then - error.Should().BeOfType(); - error.Message.Should().Contain("ssl connection with the server"); - - // while the server is not removed - routingTableMock.Verify(m => m.Remove(uri), Times.Never); - clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never); - } - - [Theory] - [InlineData(AccessMode.Read)] - [InlineData(AccessMode.Write)] - public void ShouldThrowErrorDirectlyIfProtocolError(AccessMode mode) - { - // Given - var uri = new Uri("bolt+routing://123:456"); - var routingTableMock = CreateRoutingTable(mode, uri); - - var clusterConnPoolMock = new Mock(); - IConnection conn = null; - clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)).Returns(false) - .Callback(() => - { - throw new ProtocolException("do not understand struct 0x01"); - }); - - var balancer = new LoadBalancer(clusterConnPoolMock.Object, routingTableMock.Object); - - // When - var error = Record.Exception(() => balancer.Acquire(mode)); - - // Then - error.Should().BeOfType(); - error.Message.Should().Contain("do not understand struct 0x01"); - - // while the server is not removed - routingTableMock.Verify(m => m.Remove(uri), Times.Never); - clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never); - } - } - } - public class ClusterErrorHandlerTests { public class OnConnectionErrorMethod @@ -546,7 +54,9 @@ public void ShouldRmoveFromLoadBalancer() var clusterPoolMock = new Mock(); var routingTableMock = new Mock(); var uri = new Uri("https://neo4j.com"); - var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableMock.Object); + var routingTableManagerMock = new Mock(); + routingTableManagerMock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object); + var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object); loadBalancer.OnConnectionError(uri, new ClientException()); clusterPoolMock.Verify(x=>x.Purge(uri),Times.Once); @@ -563,7 +73,9 @@ public void ShouldRemoveWriterFromRoutingTable() var clusterPoolMock = new Mock(); var routingTableMock = new Mock(); var uri = new Uri("https://neo4j.com"); - var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableMock.Object); + var routingTableManagerMock = new Mock(); + routingTableManagerMock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object); + var loadBalancer = new LoadBalancer(clusterPoolMock.Object, routingTableManagerMock.Object); loadBalancer.OnWriteError(uri); clusterPoolMock.Verify(x => x.Purge(uri), Times.Never); @@ -573,127 +85,140 @@ public void ShouldRemoveWriterFromRoutingTable() } } - internal class ListBasedRoutingTable : IRoutingTable + public class AcquireMethod { - private readonly List _routers; - private readonly List _removed; - private int _count = 0; - - public ListBasedRoutingTable(List routers) + [Theory] + [InlineData(AccessMode.Read)] + [InlineData(AccessMode.Write)] + public void ShouldThrowSessionExpiredExceptionIfNoServerAvailable(AccessMode mode) { - _routers = routers; - _removed = new List(); - } - public bool IsStale(AccessMode mode) - { - return false; - } + // Given + var mock = new Mock(); + mock.Setup(x => x.RoutingTable).Returns(NewMockedRoutingTable(mode, null, false).Object); + var balancer = new LoadBalancer(null, mock.Object); - public bool TryNextRouter(out Uri uri) - { - var pos = _count++ % _routers.Count; - uri = _routers[pos]; - return true; - } + // When + var error = Record.Exception(() => balancer.Acquire(mode)); - public bool TryNextReader(out Uri uri) - { - throw new NotSupportedException(); + // Then + error.Should().BeOfType(); + error.Message.Should().Contain("Failed to connect to any"); } - public bool TryNextWriter(out Uri uri) + [Theory] + [InlineData(AccessMode.Read)] + [InlineData(AccessMode.Write)] + public void ShouldReturnConnectionWithCorrectMode(AccessMode mode) { - throw new NotSupportedException(); - } + // Given + var uri = new Uri("bolt+routing://123:456"); + var mock = new Mock(); + var routingTableMock = NewMockedRoutingTable(mode, uri); + mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object); - public void Remove(Uri uri) - { - _removed.Add(uri); - } + var clusterPoolMock = new Mock(); + var mockedConn = new Mock(); + mockedConn.Setup(x => x.Server.Address).Returns(uri.ToString); + var conn = mockedConn.Object; + clusterPoolMock.Setup(x => x.TryAcquire(uri, out conn)).Returns(true); + var balancer = new LoadBalancer(clusterPoolMock.Object, mock.Object); + + // When + var acquiredConn = balancer.Acquire(mode); - public void RemoveWriter(Uri uri) - { - throw new NotSupportedException(); + // Then + acquiredConn.Server.Address.Should().Be(uri.ToString()); } - public ISet All() + [Theory] + [InlineData(AccessMode.Read)] + [InlineData(AccessMode.Write)] + public void ShouldForgetServerWhenFailedToEstablishConn(AccessMode mode) { - return new HashSet(_routers.Distinct().Except(_removed.Distinct())); - } + // Given + var uri = new Uri("bolt+routing://123:456"); + var routingTableMock = NewMockedRoutingTable(mode, uri); + var mock = new Mock(); + mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object); - public void Clear() - { - throw new NotSupportedException(); - } + var clusterConnPoolMock = new Mock(); + IConnection conn = null; + clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)) + .Callback(() => throw new ServiceUnavailableException("failed init")); - public void PrependRouters(IEnumerable uris) - { - throw new NotSupportedException(); - } - } + var balancer = new LoadBalancer(clusterConnPoolMock.Object, mock.Object); - private static IRoutingTable NewRoutingTable(int routerCount, int readerCount, int writerCount) - { - return NewRoutingTable(GenerateServerUris(routerCount), GenerateServerUris(readerCount), - GenerateServerUris(writerCount)); - } + // When + var error = Record.Exception(() => balancer.Acquire(mode)); - private static IEnumerable GenerateServerUris(int count) - { - var uris = new Uri[count]; - for (var i = 0; i < count; i++) - { - uris[i] = new Uri($"bolt+routing://127.0.0.1:{i + 9001}"); - } - return uris; - } + // Then + error.Should().BeOfType(); + error.Message.Should().Contain("Failed to connect to any"); - private static IRoutingTable NewRoutingTable( - IEnumerable routers = null, - IEnumerable readers = null, - IEnumerable writers = null) - { - // assign default value of uri - if (routers == null) - { - routers = new Uri[0]; + // should be removed + routingTableMock.Verify(m => m.Remove(uri), Times.Once); + clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Once); } - if (readers == null) + + [Theory] + [InlineData(AccessMode.Read)] + [InlineData(AccessMode.Write)] + public void ShouldThrowErrorDirectlyIfSecurityError(AccessMode mode) { - readers = new Uri[0]; + // Given + var uri = new Uri("bolt+routing://123:456"); + var routingTableMock = NewMockedRoutingTable(mode, uri); + var mock = new Mock(); + mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object); + + var clusterConnPoolMock = new Mock(); + IConnection conn = null; + clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)) + .Callback(() => throw new SecurityException("Failed to establish ssl connection with the server")); + + var balancer = new LoadBalancer(clusterConnPoolMock.Object, mock.Object); + + // When + var error = Record.Exception(() => balancer.Acquire(mode)); + + // Then + error.Should().BeOfType(); + error.Message.Should().Contain("ssl connection with the server"); + + // while the server is not removed + routingTableMock.Verify(m => m.Remove(uri), Times.Never); + clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never); } - if (writers == null) + + [Theory] + [InlineData(AccessMode.Read)] + [InlineData(AccessMode.Write)] + public void ShouldThrowErrorDirectlyIfProtocolError(AccessMode mode) { - writers = new Uri[0]; - } - return new RoundRobinRoutingTable(routers, readers, writers, new Stopwatch(), 1000); - } + // Given + var uri = new Uri("bolt+routing://123:456"); + var routingTableMock = NewMockedRoutingTable(mode, uri); + var mock = new Mock(); + mock.Setup(x => x.RoutingTable).Returns(routingTableMock.Object); - private static LoadBalancer SetupLoadBalancer( - IEnumerable routers = null, - IEnumerable readers = null, - IEnumerable writers = null) - { - // create a routing table which knows a few servers - var routingTable = NewRoutingTable(routers, readers, writers); - return SetupLoadBalancer(routingTable); - } + var clusterConnPoolMock = new Mock(); + IConnection conn = null; + clusterConnPoolMock.Setup(x => x.TryAcquire(uri, out conn)).Returns(false) + .Callback(() => throw new ProtocolException("do not understand struct 0x01")); - private static LoadBalancer SetupLoadBalancer(IRoutingTable routingTable) - { - var uris = routingTable.All(); + var balancer = new LoadBalancer(clusterConnPoolMock.Object, mock.Object); - // create a mocked cluster connection pool, which will return the same connection for each different uri - var mockedClusterPool = new Mock(); + // When + var error = Record.Exception(() => balancer.Acquire(mode)); - foreach (var uri in uris) - { - var mockedConn = new Mock(); - mockedConn.Setup(x => x.Server.Address).Returns(uri.ToString); - var conn = mockedConn.Object; - mockedClusterPool.Setup(x => x.TryAcquire(uri, out conn)).Returns(true); + // Then + error.Should().BeOfType(); + error.Message.Should().Contain("do not understand struct 0x01"); + + // while the server is not removed + routingTableMock.Verify(m => m.Remove(uri), Times.Never); + clusterConnPoolMock.Verify(m => m.Purge(uri), Times.Never); } - return new LoadBalancer(mockedClusterPool.Object, routingTable); } } } diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs index d8a986f2c..b261a9999 100644 --- a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoundRobinRoutingTableTests.cs @@ -17,7 +17,10 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using FluentAssertions; +using Moq; +using Neo4j.Driver.Internal; using Neo4j.Driver.Internal.Routing; using Neo4j.Driver.V1; using Xunit; @@ -36,6 +39,23 @@ private static IEnumerable CreateUriArray(int count) return uris; } + public class Constructor + { + [Fact] + public void ShouldEnsureInitialRouter() + { + var initUri = new Uri("bolt://123:456"); + var routers = new HashSet{initUri}; + var table = new RoundRobinRoutingTable(routers); + + Uri uri; + table.TryNextRouter(out uri).Should().BeTrue(); + uri.Should().Be(initUri); + + table.All().Single().Should().Be(initUri); + } + } + public class IsStatleMethod { [Theory] [InlineData(1, 2, 1, 5 * 60, false)] // 1 router, 2 reader, 1 writer @@ -49,7 +69,6 @@ public void ShouldBeStaleInReadModeIfOnlyHaveOneRouter(int routerCount, int read CreateUriArray(routerCount), CreateUriArray(readerCount), CreateUriArray(writerCount), - new Stopwatch(), expireAfterSeconds); table.IsStale(AccessMode.Read).Should().Be(isStale); } @@ -66,7 +85,6 @@ public void ShouldBeStaleInWriteModeIfOnlyHaveOneRouter(int routerCount, int rea CreateUriArray(routerCount), CreateUriArray(readerCount), CreateUriArray(writerCount), - new Stopwatch(), expireAfterSeconds); table.IsStale(AccessMode.Write).Should().Be(isStale); } @@ -82,7 +100,7 @@ public void ShouldInjectInFront() CreateUriArray(3), CreateUriArray(0), CreateUriArray(0), - new Stopwatch(), 5 * 60); + 5 * 60); Uri router; table.TryNextRouter(out router); var head = new Uri("http://neo4j:10"); @@ -102,6 +120,5 @@ public void ShouldInjectInFront() router.Should().Be(head); } } - } } diff --git a/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoutingTableManagerTests.cs b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoutingTableManagerTests.cs new file mode 100644 index 000000000..219e14693 --- /dev/null +++ b/Neo4j.Driver/Neo4j.Driver.Tests/Routing/RoutingTableManagerTests.cs @@ -0,0 +1,445 @@ +// Copyright (c) 2002-2017 "Neo Technology," +// Network Engine for Objects in Lund AB [http://neotechnology.com] +// +// This file is part of Neo4j. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +using System; +using System.Collections.Generic; +using System.Linq; +using FluentAssertions; +using Moq; +using Neo4j.Driver.Internal; +using Neo4j.Driver.Internal.Connector; +using Neo4j.Driver.Internal.Routing; +using Neo4j.Driver.V1; +using Xunit; + +namespace Neo4j.Driver.Tests.Routing +{ + public class RoutingTableManagerTests + { + + internal static RoutingTableManager NewRoutingTableManager( + IRoutingTable routingTable, + IClusterConnectionPoolManager poolManager, + Uri seedUri) + { + return new RoutingTableManager( + routingTable, + new RoutingSettings(new Dictionary()), + poolManager, seedUri, null); + } + + internal static Mock NewMockedRoutingTable(AccessMode mode, Uri uri, bool hasNext = true) + { + var mock = new Mock(); + mock.Setup(m => m.IsStale(It.IsAny())).Returns(false); + mock.SetupSequence(m => m.TryNext(mode, out uri)).Returns(hasNext).Returns(false); + return mock; + } + + private static IRoutingTable NewRoutingTable( + IEnumerable routers = null, + IEnumerable readers = null, + IEnumerable writers = null) + { + // assign default value of uri + if (routers == null) + { + routers = new Uri[0]; + } + if (readers == null) + { + readers = new Uri[0]; + } + if (writers == null) + { + writers = new Uri[0]; + } + return new RoundRobinRoutingTable(routers, readers, writers, 1000); + } + + public class UpdateRoutingTableWithInitialUriFallbackMethod + { + [Fact] + public void ShouldPrependInitialRouterIfWriterIsAbsent() + { + // Given + var uri = new Uri("bolt+routing://123:456"); + + var routingTableMock = new Mock(); + routingTableMock.Setup(x => x.PrependRouters(It.IsAny>())) + .Callback>(r => r.Single().Should().Be(uri)); + + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.AddConnectionPool(It.IsAny>())) + .Callback>(r => r.Single().Should().Be(uri)); + + var manager = NewRoutingTableManager(routingTableMock.Object, poolManagerMock.Object, null); + manager.IsReadingInAbsenceOfWriter = true; + var routingTableReturnMock = new Mock(); + + // When + // should throw an exception as the initial routers should not be tried again + var exception = Record.Exception(() => + manager.UpdateRoutingTableWithInitialUriFallback(new HashSet {uri}, c => c != null + ? null + : routingTableReturnMock.Object)); + exception.Should().BeOfType(); + + // Then + poolManagerMock.Verify(x => x.AddConnectionPool(It.IsAny>()), Times.Once); + routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once); + } + + [Fact] + public void ShouldAddInitialUriWhenNoAvailableRouters() + { + // Given + var uri = new Uri("bolt+routing://123:456"); + + var routingTableMock = new Mock(); + routingTableMock.Setup(x => x.PrependRouters(It.IsAny>())) + .Callback>(r => r.Single().Should().Be(uri)); + + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.AddConnectionPool(It.IsAny>())) + .Callback>(r => r.Single().Should().Be(uri)); + + var manager = NewRoutingTableManager(routingTableMock.Object, poolManagerMock.Object, null); + var routingTableReturnMock = new Mock(); + + // When + manager.UpdateRoutingTableWithInitialUriFallback(new HashSet {uri}, c => c != null + ? null + : routingTableReturnMock.Object); + + // Then + poolManagerMock.Verify(x => x.AddConnectionPool(It.IsAny>()), Times.Once); + routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once); + } + + [Fact] + public void ShouldNotTryInitialUriIfAlreadyTried() + { + // Given + var a = new Uri("bolt+routing://123:456"); + var b = new Uri("bolt+routing://123:789"); + var s = a; // should not be retried + var t = new Uri("bolt+routing://222:123"); // this should be retried + + var routingTableMock = new Mock(); + routingTableMock.Setup(x => x.PrependRouters(It.IsAny>())) + .Callback>(r => r.Single().Should().Be(t)); + + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.AddConnectionPool(It.IsAny>())) + .Callback>(r => r.Single().Should().Be(t)); + + var manager = NewRoutingTableManager(routingTableMock.Object, poolManagerMock.Object, null); + + IRoutingTable UpdateRoutingTableFunc(ISet set) + { + if (set != null) + { + set.Add(a); + set.Add(b); + return null; + } + else + { + return new Mock().Object; + } + } + + // When + var initialUriSet = new HashSet {s, t}; + manager.UpdateRoutingTableWithInitialUriFallback(initialUriSet, UpdateRoutingTableFunc); + + // Then + // verify the method is actually called + poolManagerMock.Verify(x => x.AddConnectionPool(It.IsAny>()), Times.Once); + routingTableMock.Verify(x => x.PrependRouters(It.IsAny>()), Times.Once); + } + } + + public class UpdateRoutingTableMethod + { + [Fact] + public void ShouldForgetAndTryNextRouterWhenConnectionIsNull() + { + // Given + var uriA = new Uri("bolt+routing://123:456"); + var uriB = new Uri("bolt+routing://123:789"); + + // This ensures that uri and uri2 will return in order + var routingTable = new ListBasedRoutingTable(new List {uriA, uriB}); + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.CreateClusterConnection(It.IsAny())) + .Returns((ClusterConnection) null); + var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null); + + // When + var newRoutingTable = manager.UpdateRoutingTable(connection => + throw new NotSupportedException($"Unknown uri: {connection.Server.Address}")); + + // Then + newRoutingTable.Should().BeNull(); + routingTable.All().Should().BeEmpty(); + } + + [Fact] + public void ShouldForgetAndTryNextRouterWhenFailedWithConnectionError() + { + // Given + var uriA = new Uri("bolt+routing://123:456"); + var uriB = new Uri("bolt+routing://123:789"); + var connA = new Mock().Object; + var connB = new Mock().Object; + + // This ensures that uri and uri2 will return in order + var routingTable = new ListBasedRoutingTable(new List {uriA, uriB}); + var poolManagerMock = new Mock(); + poolManagerMock.SetupSequence(x => x.CreateClusterConnection(It.IsAny())) + .Returns(connA).Returns(connB); + var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null); + + // When + var newRoutingTable = manager.UpdateRoutingTable(connection => + { + // the second connectin will give a new routingTable + if (connection.Equals(connA)) // uriA + { + routingTable.Remove(uriA); + throw new SessionExpiredException("failed init"); + } + if (connection.Equals(connB)) // uriB + { + return NewRoutingTable(new[] {uriA}, new[] {uriA}, new[] {uriA}); + } + + throw new NotSupportedException($"Unknown uri: {connection.Server.Address}"); + }); + + // Then + newRoutingTable.All().Should().ContainInOrder(uriA); + routingTable.All().Should().ContainInOrder(uriB); + } + + [Fact] + public void ShouldPropagateServiceUnavailable() + { + var uri = new Uri("bolt+routing://123:456"); + var routingTable = new ListBasedRoutingTable(new List { uri }); + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.CreateClusterConnection(uri)) + .Returns(new Mock().Object); + + var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null); + + var exception = Record.Exception(() => manager.UpdateRoutingTable( + conn => throw new ServiceUnavailableException("Procedure not found"))); + + exception.Should().BeOfType(); + exception.Message.Should().Be("Procedure not found"); + } + + + [Fact] + public void ShouldPropagateProtocolError() + { + var uri = new Uri("bolt+routing://123:456"); + var routingTable = new ListBasedRoutingTable(new List {uri}); + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.CreateClusterConnection(uri)) + .Returns(new Mock().Object); + var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null); + + var exception = Record.Exception(() => manager.UpdateRoutingTable( + conn => throw new ProtocolException("Cannot parse procedure result"))); + + exception.Should().BeOfType(); + exception.Message.Should().Be("Cannot parse procedure result"); + } + + [Fact] + public void ShouldPropagateAuthenticationException() + { + // Given + var uri = new Uri("bolt+routing://123:456"); + var routingTable = new ListBasedRoutingTable(new List {uri}); + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.CreateClusterConnection(uri)) + .Callback(() => throw new AuthenticationException("Failed to auth the client to the server.")); + var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null); + + // When + var error = Record.Exception(() => manager.UpdateRoutingTable()); + + // Then + error.Should().BeOfType(); + error.Message.Should().Contain("Failed to auth the client to the server."); + + // while the server is not removed + routingTable.All().Should().ContainInOrder(uri); + } + + [Fact] + public void ShouldTryNextRouterIfNoReader() + { + // Given + var uriA = new Uri("bolt+routing://123:1"); + var uriB = new Uri("bolt+routing://123:2"); + var connA = new Mock().Object; + var connB = new Mock().Object; + + var uriX = new Uri("bolt+routing://456:1"); + var uriY = new Uri("bolt+routing://789:2"); + + var routingTable = new ListBasedRoutingTable(new List {uriA, uriB}); + var poolManagerMock = new Mock(); + poolManagerMock.SetupSequence(x => x.CreateClusterConnection(It.IsAny())) + .Returns(connA).Returns(connB); + var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null); + + + // When + var updateRoutingTable = manager.UpdateRoutingTable(conn => + { + if (conn.Equals(connA)) + { + return NewRoutingTable(new[] {uriX}, new Uri[0], new[] {uriX}); + } + if (conn.Equals(connB)) + { + return NewRoutingTable(new[] {uriY}, new[] {uriY}, new[] {uriY}); + } + throw new NotSupportedException($"Unknown uri: {conn.Server.Address}"); + }); + + // Then + updateRoutingTable.All().Should().ContainInOrder(uriY); + manager.IsReadingInAbsenceOfWriter.Should().BeFalse(); + } + + [Fact] + public void ShouldAcceptRoutingTableIfNoWriter() + { + // Given + var uriA = new Uri("bolt+routing://123:1"); + var connA = new Mock().Object; + var uriX = new Uri("bolt+routing://456:1"); + + var routingTable = new ListBasedRoutingTable(new List {uriA}); + var poolManagerMock = new Mock(); + poolManagerMock.Setup(x => x.CreateClusterConnection(It.IsAny())) + .Returns(connA); + var manager = NewRoutingTableManager(routingTable, poolManagerMock.Object, null); + + // When + var updateRoutingTable = manager.UpdateRoutingTable(conn => + { + if (conn.Equals(connA)) + { + return NewRoutingTable(new[] {uriX}, new[] {uriX}); + } + throw new NotSupportedException($"Unknown uri: {conn.Server.Address}"); + }); + + // Then + updateRoutingTable.All().Should().ContainInOrder(uriX); + manager.IsReadingInAbsenceOfWriter.Should().BeTrue(); + } + } + + public class TryAcquireConnectionMethod + { + + + } + + internal class ListBasedRoutingTable : IRoutingTable + { + private readonly List _routers; + private List _removed; + private int _count = 0; + + public ListBasedRoutingTable(List routers) + { + _routers = routers; + _removed = new List(); + } + + public bool IsStale(AccessMode mode) + { + return false; + } + + public bool TryNextRouter(out Uri uri) + { + if (_count >= _routers.Count) + { + uri = null; + return false; + } + else + { + uri = _routers[_count++]; + return true; + } + } + + public bool TryNextReader(out Uri uri) + { + throw new NotSupportedException(); + } + + public bool TryNextWriter(out Uri uri) + { + throw new NotSupportedException(); + } + + public bool TryNext(AccessMode mode, out Uri uri) + { + throw new NotSupportedException(); + } + + public void Remove(Uri uri) + { + _removed.Add(uri); + } + + public void RemoveWriter(Uri uri) + { + throw new NotSupportedException(); + } + + public ISet All() + { + return new HashSet(_routers.Distinct().Except(_removed.Distinct())); + } + + public void Clear() + { + throw new NotSupportedException(); + } + + public void PrependRouters(IEnumerable uris) + { + throw new NotSupportedException(); + } + } + } +} + diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs index 723a40c16..406d0271d 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs @@ -296,15 +296,14 @@ public void Release(IPooledConnection connection) { if (_disposeCalled) { - // pool already disposed + // pool already disposed. return; } if (!_inUseConnections.TryRemove(connection)) { - // pool already disposed + // pool already disposed. return; } - if (IsConnectionReusable(connection)) { if (IsPoolFull()) diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs index 842abcfd2..4ae3ed676 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/ClusterConnectionPool.cs @@ -37,13 +37,13 @@ internal class ClusterConnectionPool : LoggerBase, IClusterConnectionPool public ClusterConnectionPool( ConnectionSettings connectionSettings, ConnectionPoolSettings poolSettings, - ILogger logger + IEnumerable initUris, ILogger logger ) : base(logger) { _connectionSettings = connectionSettings; _poolSettings = poolSettings; - + Add(initUris); } internal ClusterConnectionPool( @@ -53,7 +53,7 @@ ILogger logger ConnectionPoolSettings poolSettings=null, ILogger logger=null ) : - this(connSettings, poolSettings, logger) + this(connSettings, poolSettings, new HashSet(), logger) { _fakePool = connectionPool; _pools = clusterPool; diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs new file mode 100644 index 000000000..3b0eabe4f --- /dev/null +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IClusterConnectionPoolManager.cs @@ -0,0 +1,29 @@ +// Copyright (c) 2002-2017 "Neo Technology," +// Network Engine for Objects in Lund AB [http://neotechnology.com] +// +// This file is part of Neo4j. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +using System; +using System.Collections.Generic; +using Neo4j.Driver.Internal.Connector; + +namespace Neo4j.Driver.Internal.Routing +{ + internal interface IClusterConnectionPoolManager + { + void AddConnectionPool(IEnumerable uris); + void UpdateConnectionPool(IEnumerable uris); + IConnection CreateClusterConnection(Uri uri); + } +} \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs index 4d9d9114a..2181a5d30 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTable.cs @@ -1,3 +1,19 @@ +// Copyright (c) 2002-2017 "Neo Technology," +// Network Engine for Objects in Lund AB [http://neotechnology.com] +// +// This file is part of Neo4j. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. using System; using System.Collections.Generic; using Neo4j.Driver.V1; @@ -8,8 +24,7 @@ internal interface IRoutingTable { bool IsStale(AccessMode mode); bool TryNextRouter(out Uri uri); - bool TryNextReader(out Uri uri); - bool TryNextWriter(out Uri uri); + bool TryNext(AccessMode mode, out Uri uri); void Remove(Uri uri); void RemoveWriter(Uri uri); ISet All(); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTableManager.cs new file mode 100644 index 000000000..1cae7a6c8 --- /dev/null +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/IRoutingTableManager.cs @@ -0,0 +1,26 @@ +// Copyright (c) 2002-2017 "Neo Technology," +// Network Engine for Objects in Lund AB [http://neotechnology.com] +// +// This file is part of Neo4j. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +using Neo4j.Driver.V1; + +namespace Neo4j.Driver.Internal.Routing +{ + internal interface IRoutingTableManager + { + IRoutingTable RoutingTable { get; } + void EnsureRoutingTableForMode(AccessMode mode); + } +} \ No newline at end of file diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs index 7d7ab7014..9adf4874a 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/LoadBalancer.cs @@ -16,7 +16,6 @@ // limitations under the License. using System; using System.Collections.Generic; -using System.Diagnostics; using System.Threading.Tasks; using Neo4j.Driver.Internal.Connector; using Neo4j.Driver.V1; @@ -24,18 +23,13 @@ namespace Neo4j.Driver.Internal.Routing { - internal class LoadBalancer : IConnectionProvider, IClusterErrorHandler + internal class LoadBalancer : IConnectionProvider, IClusterErrorHandler, IClusterConnectionPoolManager { - private IRoutingTable _routingTable; + private readonly IRoutingTableManager _routingTableManager; private readonly IClusterConnectionPool _clusterConnectionPool; - private readonly IDictionary _routingContext; private readonly ILogger _logger; - private readonly object _syncLock = new object(); - private readonly Stopwatch _stopwatch; - private readonly Uri _seed; private volatile bool _disposeCalled = false; - private bool _isReadingInAbsenceOfWriter = false; public LoadBalancer( RoutingSettings routingSettings, @@ -43,29 +37,20 @@ internal class LoadBalancer : IConnectionProvider, IClusterErrorHandler ConnectionPoolSettings poolSettings, ILogger logger) { + var uris = connectionSettings.InitialServerUri.Resolve(); _clusterConnectionPool = new ClusterConnectionPool( - connectionSettings, poolSettings, logger); - - _stopwatch = new Stopwatch(); - _routingTable = new RoundRobinRoutingTable(_stopwatch); - - _seed = connectionSettings.InitialServerUri; - _routingContext = routingSettings.RoutingContext; + connectionSettings, poolSettings, uris, logger); + _routingTableManager = new RoutingTableManager(routingSettings, this, connectionSettings.InitialServerUri, uris, logger); _logger = logger; - - var uris = _seed.Resolve(); - PrependRouters(uris); } // for test only internal LoadBalancer( IClusterConnectionPool clusterConnPool, - IRoutingTable routingTable, - Uri seed = null) + IRoutingTableManager routingTableManager) { _clusterConnectionPool = clusterConnPool; - _routingTable = routingTable; - _seed = seed; + _routingTableManager = routingTableManager; } public IConnection Acquire(AccessMode mode) @@ -75,18 +60,7 @@ public IConnection Acquire(AccessMode mode) ThrowObjectDisposedException(); } - IConnection conn = null; - switch (mode) - { - case AccessMode.Read: - conn = AcquireReadConnection(); - break; - case AccessMode.Write: - conn = AcquireWriteConnection(); - break; - default: - throw new InvalidOperationException($"Unknown access mode {mode}."); - } + IConnection conn = AcquireConnection(mode); if (_disposeCalled) { @@ -103,13 +77,28 @@ public Task AcquireAsync(AccessMode mode) public void OnConnectionError(Uri uri, Exception e) { _logger?.Info($"Server at {uri} is no longer available due to error: {e.Message}."); - _routingTable.Remove(uri); + _routingTableManager.RoutingTable.Remove(uri); _clusterConnectionPool.Purge(uri); } public void OnWriteError(Uri uri) { - _routingTable.RemoveWriter(uri); + _routingTableManager.RoutingTable.RemoveWriter(uri); + } + + public void AddConnectionPool(IEnumerable uris) + { + _clusterConnectionPool.Add(uris); + } + + public void UpdateConnectionPool(IEnumerable uris) + { + _clusterConnectionPool.Update(uris); + } + + public IConnection CreateClusterConnection(Uri uri) + { + return CreateClusterConnection(uri, AccessMode.Write); } public void Dispose() @@ -124,216 +113,52 @@ protected virtual void Dispose(bool isDisposing) return; _disposeCalled = true; // We cannot set routing table and cluster conn pool to null as we do not want get NPE in concurrent call of dispose and acquire - _routingTable.Clear(); + _routingTableManager.RoutingTable.Clear(); _clusterConnectionPool.Dispose(); // cannot set logger to null here otherwise we might concurrent call log and set log to null. } - private void PrependRouters(ISet uris) - { - _routingTable.PrependRouters(uris); - _clusterConnectionPool.Add(uris); - } - - private IConnection AcquireReadConnection() + private IConnection AcquireConnection(AccessMode mode) { - EnsureRoutingTableForMode(AccessMode.Read); + _routingTableManager.EnsureRoutingTableForMode(mode); while (true) { Uri uri; - if (!_routingTable.TryNextReader(out uri)) + if (!_routingTableManager.RoutingTable.TryNext(mode, out uri)) { // no server known to routingTable break; } - IConnection conn = CreateClusterConnection(uri, AccessMode.Read); - if (conn != null) - { - return conn; - } - } - throw new SessionExpiredException("Failed to connect to any read server."); - } - - private IConnection AcquireWriteConnection() - { - EnsureRoutingTableForMode(AccessMode.Write); - while (true) - { - Uri uri; - if (!_routingTable.TryNextWriter(out uri)) - { - break; - } - - IConnection conn = CreateClusterConnection(uri); + IConnection conn = CreateClusterConnection(uri, mode); if (conn != null) { return conn; } + //else connection already removed by clusterConnection onError method } - throw new SessionExpiredException("Failed to connect to any write server."); - } - - private void EnsureRoutingTableForMode(AccessMode mode) - { - lock (_syncLock) - { - if (!IsRoutingTableStale(_routingTable, mode)) - { - return; - } - - var routingTable = UpdateRoutingTableWithInitialUriFallback(); - _clusterConnectionPool.Update(routingTable.All()); - _routingTable = routingTable; - _logger?.Info($"Updated routingTable to be {_routingTable}"); - } - } - - private bool IsRoutingTableStale(IRoutingTable routingTable, AccessMode mode = AccessMode.Read) - { - lock (_syncLock) - { - switch (mode) - { - case AccessMode.Read: - if (routingTable.IsStale(AccessMode.Read)) - { - return true; - } - _isReadingInAbsenceOfWriter = routingTable.IsStale(AccessMode.Write); - return false; - case AccessMode.Write: - return routingTable.IsStale(AccessMode.Write); - default: - throw new InvalidOperationException($"Unknown access mode {mode}."); - } - } - } - - internal IRoutingTable UpdateRoutingTableWithInitialUriFallback( - Func, IRoutingTable> updateRoutingTableFunc = null, - Func> resolveInitialUriFunc = null) - { - lock (_syncLock) - { - updateRoutingTableFunc = updateRoutingTableFunc ?? (u => UpdateRoutingTable(null, u)); - resolveInitialUriFunc = resolveInitialUriFunc ?? _seed.Resolve; - - var hasPrependedInitialRouters = false; - if (_isReadingInAbsenceOfWriter) - { - PrependRouters(resolveInitialUriFunc()); - hasPrependedInitialRouters = true; - } - - var triedUris = new HashSet(); - var routingTable = updateRoutingTableFunc(triedUris); - if (routingTable != null) - { - return routingTable; - } - - if (!hasPrependedInitialRouters) - { - var uris = resolveInitialUriFunc(); - uris.ExceptWith(triedUris); - if (uris.Count != 0) - { - PrependRouters(uris); - routingTable = updateRoutingTableFunc(null); - if (routingTable != null) - { - return routingTable; - } - } - } - // We retied and tried our best however there is just no cluster. - // This is the ultimate place we will inform the user that you need to re-create a driver - throw new ServiceUnavailableException( - "Failed to connect to any routing server. " + - "Please make sure that the cluster is up and can be accessed by the driver and retry."); - } - } - - internal IRoutingTable UpdateRoutingTable(Func rediscoveryFunc = null, - ISet < Uri> triedUris = null) - { - lock (_syncLock) - { - rediscoveryFunc = rediscoveryFunc ?? Rediscovery; - while (true) - { - Uri uri; - if (!_routingTable.TryNextRouter(out uri)) - { - // no alive server - return null; - } - triedUris?.Add(uri); - IConnection conn = CreateClusterConnection(uri); - if (conn != null) - { - try - { - var roundRobinRoutingTable = rediscoveryFunc(conn); - if (!IsRoutingTableStale(roundRobinRoutingTable)) - { - return roundRobinRoutingTable; - } - } - catch (Exception e) - { - _logger?.Info($"Failed to update routing table with server uri={uri} due to error {e.Message}"); - if (e is SessionExpiredException) - { - // ignored - // Already handled by clusterConn.OnConnectionError to remove from load balancer - } - else - { - throw; - } - } - } - } - } + var name = mode == AccessMode.Read ? "read" : "wrtie"; + throw new SessionExpiredException($"Failed to connect to any {name} server."); } - private ClusterConnection CreateClusterConnection(Uri uri, AccessMode mode = AccessMode.Write) + private IConnection CreateClusterConnection(Uri uri, AccessMode mode) { - lock (_syncLock) + try { - try + IConnection conn; + if (_clusterConnectionPool.TryAcquire(uri, out conn)) { - IConnection conn; - if (_clusterConnectionPool.TryAcquire(uri, out conn)) - { - return new ClusterConnection(conn, uri, mode, this); - } - OnConnectionError(uri, new ArgumentException( - $"Routing table {_routingTable} contains a server {uri} " + - $"that is not known to cluster connection pool {_clusterConnectionPool}.")); + return new ClusterConnection(conn, uri, mode, this); } - catch (ServiceUnavailableException e) - { - OnConnectionError(uri, e); - } - return null; + OnConnectionError(uri, new ArgumentException( + $"Routing table {_routingTableManager.RoutingTable} contains a server {uri} " + + $"that is not known to cluster connection pool {_clusterConnectionPool}.")); } - } - - private IRoutingTable Rediscovery(IConnection conn) - { - lock (_syncLock) + catch (ServiceUnavailableException e) { - var discoveryManager = new ClusterDiscoveryManager(conn, _routingContext, _logger); - discoveryManager.Rediscovery(); - return new RoundRobinRoutingTable(discoveryManager.Routers, discoveryManager.Readers, - discoveryManager.Writers, _stopwatch, discoveryManager.ExpireAfterSeconds); + OnConnectionError(uri, e); } + return null; } private void ThrowObjectDisposedException() @@ -341,15 +166,9 @@ private void ThrowObjectDisposedException() FailedToCreateConnection(this); } - internal bool IsReadingInAbsenceOfWriter - { - get { return _isReadingInAbsenceOfWriter; } - set { _isReadingInAbsenceOfWriter = value; } - } - public override string ToString() { - return $"{nameof(_routingTable)}: {{{_routingTable}}}, " + + return $"{nameof(_routingTableManager.RoutingTable)}: {{{_routingTableManager.RoutingTable}}}, " + $"{nameof(_clusterConnectionPool)}: {{{_clusterConnectionPool}}}"; } } diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs index 30804dee6..4c1831c66 100644 --- a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoundRobinRoutingTable.cs @@ -32,22 +32,23 @@ internal class RoundRobinRoutingTable : IRoutingTable private readonly Stopwatch _stopwatch; private readonly long _expireAfterSeconds; - public RoundRobinRoutingTable(Stopwatch stopwatch, long expireAfterSeconds = 0) + public RoundRobinRoutingTable(IEnumerable routers, long expireAfterSeconds = 0) { _expireAfterSeconds = expireAfterSeconds; - _stopwatch = stopwatch; + _stopwatch = new Stopwatch(); _stopwatch.Restart(); + _routers.Add(routers);// init } public RoundRobinRoutingTable(IEnumerable routers, IEnumerable readers, IEnumerable writers, - Stopwatch stopwatch, long expireAfterSeconds) + long expireAfterSeconds) { _routers.Add(routers); _readers.Add(readers); _writers.Add(writers); _expireAfterSeconds = expireAfterSeconds; - _stopwatch = stopwatch; + _stopwatch = new Stopwatch(); _stopwatch.Restart(); } @@ -74,6 +75,19 @@ public bool TryNextWriter(out Uri uri) return _writers.TryNext(out uri); } + public bool TryNext(AccessMode mode, out Uri uri) + { + switch (mode) + { + case AccessMode.Read: + return TryNextReader(out uri); + case AccessMode.Write: + return TryNextWriter(out uri); + default: + throw new InvalidOperationException($"Unknown access mode {mode}"); + } + } + public void Remove(Uri uri) { _routers.Remove(uri); diff --git a/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs new file mode 100644 index 000000000..ecf431cfe --- /dev/null +++ b/Neo4j.Driver/Neo4j.Driver/Internal/Routing/RoutingTableManager.cs @@ -0,0 +1,234 @@ +// Copyright (c) 2002-2017 "Neo Technology," +// Network Engine for Objects in Lund AB [http://neotechnology.com] +// +// This file is part of Neo4j. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +using System; +using System.Collections.Generic; +using Neo4j.Driver.Internal.Connector; +using Neo4j.Driver.V1; + +namespace Neo4j.Driver.Internal.Routing +{ + internal class RoutingTableManager : IRoutingTableManager + { + private readonly ILogger _logger; + + private readonly Uri _seedUri; + private readonly IDictionary _routingContext; + + private IRoutingTable _routingTable; + public IRoutingTable RoutingTable + { + get => _routingTable; + set => _routingTable = value; + } + + private readonly IClusterConnectionPoolManager _poolManager; + + private readonly object _syncLock = new object(); + + private bool _isReadingInAbsenceOfWriter = false; + public bool IsReadingInAbsenceOfWriter + { + get => _isReadingInAbsenceOfWriter; + set => _isReadingInAbsenceOfWriter = value; + } + + public RoutingTableManager( + RoutingSettings routingSettings, + IClusterConnectionPoolManager poolManager, + Uri seedUri, + ISet initUris, + ILogger logger) : + this(new RoundRobinRoutingTable(initUris), + routingSettings, poolManager, seedUri, logger) + { + } + + public RoutingTableManager( + IRoutingTable routingTable, + RoutingSettings routingSettings, + IClusterConnectionPoolManager poolManager, + Uri seedUri, + ILogger logger) + { + _routingTable = routingTable; + _routingContext = routingSettings.RoutingContext; + _poolManager = poolManager; + _seedUri = seedUri; + + _logger = logger; + } + + public bool TryAcquireConnection(AccessMode mode, out Uri uri) + { + return _routingTable.TryNext(mode, out uri); + } + + public void EnsureRoutingTableForMode(AccessMode mode) + { + lock (_syncLock) + { + if (!IsRoutingTableStale(_routingTable, mode)) + { + return; + } + + var routingTable = UpdateRoutingTableWithInitialUriFallback(_seedUri.Resolve()); + _poolManager.UpdateConnectionPool(routingTable.All()); + _routingTable = routingTable; + _logger?.Info($"Updated routingTable to be {_routingTable}"); + } + } + + public bool IsRoutingTableStale(IRoutingTable routingTable, AccessMode mode = AccessMode.Read) + { + lock (_syncLock) + { + switch (mode) + { + case AccessMode.Read: + if (routingTable.IsStale(AccessMode.Read)) + { + return true; + } + _isReadingInAbsenceOfWriter = routingTable.IsStale(AccessMode.Write); + return false; + case AccessMode.Write: + return routingTable.IsStale(AccessMode.Write); + default: + throw new InvalidOperationException($"Unknown access mode {mode}."); + } + } + } + + public void PrependRouters(ISet uris) + { + lock (_syncLock) + { + _routingTable.PrependRouters(uris); + _poolManager.AddConnectionPool(uris); + } + } + + public IRoutingTable UpdateRoutingTableWithInitialUriFallback(ISet initialUriSet, + Func, IRoutingTable> updateRoutingTableFunc = null) + { + lock (_syncLock) + { + updateRoutingTableFunc = updateRoutingTableFunc ?? (u => UpdateRoutingTable(null, u)); + + var hasPrependedInitialRouters = false; + if (_isReadingInAbsenceOfWriter) + { + PrependRouters(initialUriSet); + hasPrependedInitialRouters = true; + } + + var triedUris = new HashSet(); + var routingTable = updateRoutingTableFunc(triedUris); + if (routingTable != null) + { + return routingTable; + } + + if (!hasPrependedInitialRouters) + { + var uris = initialUriSet; + uris.ExceptWith(triedUris); + if (uris.Count != 0) + { + PrependRouters(uris); + routingTable = updateRoutingTableFunc(null); + if (routingTable != null) + { + return routingTable; + } + } + } + // We retied and tried our best however there is just no cluster. + // This is the ultimate place we will inform the user that you need to re-create a driver + throw new ServiceUnavailableException( + "Failed to connect to any routing server. " + + "Please make sure that the cluster is up and can be accessed by the driver and retry."); + } + } + + public IRoutingTable UpdateRoutingTable(Func rediscoveryFunc = null, + ISet triedUris = null) + { + lock (_syncLock) + { + rediscoveryFunc = rediscoveryFunc ?? Rediscovery; + while (true) + { + Uri uri; + if (!_routingTable.TryNextRouter(out uri)) + { + // no alive server + return null; + } + triedUris?.Add(uri); + IConnection conn = _poolManager.CreateClusterConnection(uri); + if (conn == null) + { + _routingTable.Remove(uri); + } + else + { + try + { + var roundRobinRoutingTable = rediscoveryFunc(conn); + if (!IsRoutingTableStale(roundRobinRoutingTable)) + { + return roundRobinRoutingTable; + } + } + catch (Exception e) + { + _logger?.Info( + $"Failed to update routing table with server uri={uri} due to error {e.Message}"); + if (e is SessionExpiredException) + { + // ignored + // Already handled by clusterConn.OnConnectionError to remove from load balancer + } + else + { + throw; + } + } + } + } + } + } + + public IRoutingTable Rediscovery(IConnection conn) + { + lock (_syncLock) + { + var discoveryManager = new ClusterDiscoveryManager(conn, _routingContext, _logger); + discoveryManager.Rediscovery(); + return new RoundRobinRoutingTable(discoveryManager.Routers, discoveryManager.Readers, + discoveryManager.Writers, discoveryManager.ExpireAfterSeconds); + } + } + + public void Clear() + { + _routingTable.Clear(); + } + } +}