From 862a3a2150f3db47fc0baf5d6e036af178fd398a Mon Sep 17 00:00:00 2001 From: Przemek Hugh Kaznowski Date: Wed, 14 Mar 2018 10:46:07 +0000 Subject: [PATCH] Resolved issue with port range not working for backup --- ...ansactionBackupServiceAddressResolver.java | 54 ++++++++++++++ .../TransactionBackupServiceProvider.java | 57 ++++++++------- .../TransactionBackupServiceProviderTest.java | 72 +++++++++++++++++++ 3 files changed, 157 insertions(+), 26 deletions(-) create mode 100644 enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceAddressResolver.java create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/TransactionBackupServiceProviderTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceAddressResolver.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceAddressResolver.java new file mode 100644 index 0000000000000..2701125ee2eba --- /dev/null +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceAddressResolver.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core; + +import org.neo4j.helpers.AdvertisedSocketAddress; +import org.neo4j.helpers.HostnamePort; +import org.neo4j.helpers.ListenSocketAddress; +import org.neo4j.helpers.SocketAddressParser; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; + +class TransactionBackupServiceAddressResolver +{ + ListenSocketAddress backupAddressForTxProtocol( Config config ) + { + // We cannot use the backup address setting directly as IPv6 isn't processed during config read + String settingName = OnlineBackupSettings.online_backup_server.name(); + HostnamePort resolvedValueFromConfig = resolved( config ); + String modifiedLiteralValueToAvoidRange = String.format( "%s:%d", resolvedValueFromConfig.getHost(), resolvedValueFromConfig.getPort() ); + String defaultHostname = resolvedValueFromConfig.getHost(); + AdvertisedSocketAddress advertisedSocketAddress = + SocketAddressParser.deriveSocketAddress( settingName, modifiedLiteralValueToAvoidRange, defaultHostname, resolvedValueFromConfig.getPort(), + AdvertisedSocketAddress::new ); + return new ListenSocketAddress( advertisedSocketAddress.getHostname(), advertisedSocketAddress.getPort() ); + } + + private HostnamePort resolved( Config config ) + { + int defaultPort = Config.defaults().get( OnlineBackupSettings.online_backup_server ).getPort(); + HostnamePort resolved = config.get( OnlineBackupSettings.online_backup_server ); + if ( resolved.getPort() == 0 ) // Was the port not specified by user? + { + resolved = new HostnamePort( resolved.getHost(), defaultPort ); + } + return resolved; + } +} diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java index 4cb56eac144ab..75148f1f8a55d 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/TransactionBackupServiceProvider.java @@ -28,17 +28,17 @@ import org.neo4j.causalclustering.core.state.CoreSnapshotService; import org.neo4j.causalclustering.handlers.PipelineWrapper; import org.neo4j.causalclustering.identity.StoreId; -import org.neo4j.helpers.AdvertisedSocketAddress; -import org.neo4j.helpers.HostnamePort; -import org.neo4j.helpers.ListenSocketAddress; -import org.neo4j.helpers.SocketAddressParser; import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; import org.neo4j.kernel.impl.factory.PlatformModule; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; +import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex; +import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; public class TransactionBackupServiceProvider @@ -46,54 +46,59 @@ public class TransactionBackupServiceProvider private final LogProvider logProvider; private final LogProvider userLogProvider; private final Supplier localDatabaseStoreIdSupplier; - private final PlatformModule platformModule; + private final Dependencies dependencies; + private final Monitors monitors; + private final PageCache pageCache; + private final StoreCopyCheckPointMutex storeCopyCheckPointMutex; private final Supplier localDatabaseDataSourceSupplier; private final BooleanSupplier localDatabaseIsAvailable; private final CoreSnapshotService coreSnapshotService; private final FileSystemAbstraction fileSystem; private final PipelineWrapper serverPipelineWrapper; + private final TransactionBackupServiceAddressResolver transactionBackupServiceAddressResolver; public TransactionBackupServiceProvider( LogProvider logProvider, LogProvider userLogProvider, Supplier localDatabaseStoreIdSupplier, - PlatformModule platformModule, Supplier localDatabaseDataSourceSupplier, BooleanSupplier localDatabaseIsAvailable, - CoreSnapshotService coreSnapshotService, FileSystemAbstraction fileSystem, PipelineWrapper serverPipelineWrapper ) + Dependencies dependencies, Monitors monitors, PageCache pageCache, StoreCopyCheckPointMutex storeCopyCheckPointMutex, + Supplier localDatabaseDataSourceSupplier, BooleanSupplier localDatabaseIsAvailable, CoreSnapshotService coreSnapshotService, + FileSystemAbstraction fileSystem, PipelineWrapper serverPipelineWrapper ) { this.logProvider = logProvider; this.userLogProvider = userLogProvider; this.localDatabaseStoreIdSupplier = localDatabaseStoreIdSupplier; - this.platformModule = platformModule; + this.dependencies = dependencies; + this.monitors = monitors; + this.pageCache = pageCache; + this.storeCopyCheckPointMutex = storeCopyCheckPointMutex; this.localDatabaseDataSourceSupplier = localDatabaseDataSourceSupplier; this.localDatabaseIsAvailable = localDatabaseIsAvailable; this.coreSnapshotService = coreSnapshotService; this.fileSystem = fileSystem; this.serverPipelineWrapper = serverPipelineWrapper; + this.transactionBackupServiceAddressResolver = new TransactionBackupServiceAddressResolver(); + } + + public TransactionBackupServiceProvider( LogProvider logProvider, LogProvider userLogProvider, Supplier localDatabaseStoreIdSupplier, + PlatformModule platformModule, Supplier localDatabaseDataSourceSupplier, BooleanSupplier localDatabaseIsAvailable, + CoreSnapshotService coreSnapshotService, FileSystemAbstraction fileSystem, PipelineWrapper serverPipelineWrapper ) + { + this( logProvider, userLogProvider, localDatabaseStoreIdSupplier, platformModule.dependencies, platformModule.monitors, platformModule.pageCache, + platformModule.storeCopyCheckPointMutex, localDatabaseDataSourceSupplier, localDatabaseIsAvailable, coreSnapshotService, fileSystem, + serverPipelineWrapper ); } public Optional resolveIfBackupEnabled( Config config ) { if ( config.get( OnlineBackupSettings.online_backup_enabled ) ) { - return Optional.of( new CatchupServer( logProvider, userLogProvider, localDatabaseStoreIdSupplier, - platformModule.dependencies.provideDependency( TransactionIdStore.class ), - platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabaseDataSourceSupplier, localDatabaseIsAvailable, - coreSnapshotService, platformModule.monitors, new CheckpointerSupplier( platformModule.dependencies ), fileSystem, platformModule.pageCache, - backupAddressForTxProtocol( config ), platformModule.storeCopyCheckPointMutex, serverPipelineWrapper ) ); + return Optional.of( + new CatchupServer( logProvider, userLogProvider, localDatabaseStoreIdSupplier, dependencies.provideDependency( TransactionIdStore.class ), + dependencies.provideDependency( LogicalTransactionStore.class ), localDatabaseDataSourceSupplier, localDatabaseIsAvailable, + coreSnapshotService, monitors, new CheckpointerSupplier( dependencies ), fileSystem, pageCache, + transactionBackupServiceAddressResolver.backupAddressForTxProtocol( config ), storeCopyCheckPointMutex, serverPipelineWrapper ) ); } else { return Optional.empty(); } } - - private static ListenSocketAddress backupAddressForTxProtocol( Config config ) - { - // We cannot use the backup address setting directly as IPv6 isn't processed during config read - String settingName = OnlineBackupSettings.online_backup_server.name(); - String literalValue = config.getRaw( settingName ).orElse( OnlineBackupSettings.online_backup_server.getDefaultValue() ); - HostnamePort resolvedValueFromConfig = config.get( OnlineBackupSettings.online_backup_server ); - String defaultHostname = resolvedValueFromConfig.getHost(); - Integer defaultPort = resolvedValueFromConfig.getPort(); - AdvertisedSocketAddress advertisedSocketAddress = - SocketAddressParser.deriveSocketAddress( settingName, literalValue, defaultHostname, defaultPort, AdvertisedSocketAddress::new ); - return new ListenSocketAddress( advertisedSocketAddress.getHostname(), advertisedSocketAddress.getPort() ); - } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/TransactionBackupServiceProviderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/TransactionBackupServiceProviderTest.java new file mode 100644 index 0000000000000..8039c706563a7 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/TransactionBackupServiceProviderTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import org.neo4j.helpers.HostnamePort; +import org.neo4j.helpers.ListenSocketAddress; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings; + +public class TransactionBackupServiceProviderTest +{ + private static final HostnamePort DEFAULT = Config.defaults().get( OnlineBackupSettings.online_backup_server ); + + @Test + public void backupWithRangeCanBeProcessed() + { + // given + TransactionBackupServiceAddressResolver subject = new TransactionBackupServiceAddressResolver(); + + // and config with range + Config config = Config.defaults(); + config.augment( OnlineBackupSettings.online_backup_server, "127.0.0.1:6362-6372" ); + + // then + Assert.assertEquals( new ListenSocketAddress( "127.0.0.1", 6362 ), subject.backupAddressForTxProtocol( config ) ); + } + + @Test + public void backupOverrideWithoutPortGetsDefaultPort() + { + // with params + List params = Arrays.asList( "127.0.0.1:", "127.0.0.1" ); + for ( String testedValue : params ) + { + // given + TransactionBackupServiceAddressResolver subject = new TransactionBackupServiceAddressResolver(); + + // and config without a port + Config config = Config.defaults(); + config.augment( OnlineBackupSettings.online_backup_server, testedValue ); + + // when + ListenSocketAddress resolvedAddress = subject.backupAddressForTxProtocol( config ); + + // then + Assert.assertEquals( new ListenSocketAddress( "127.0.0.1", DEFAULT.getPort() ), resolvedAddress ); + } + } +}