Skip to content

Commit

Permalink
Resolved issue with port range not working for backup
Browse files Browse the repository at this point in the history
  • Loading branch information
phughk committed Mar 14, 2018
1 parent 5c123b4 commit 862a3a2
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 26 deletions.
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core;

import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.SocketAddressParser;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;

class TransactionBackupServiceAddressResolver
{
ListenSocketAddress backupAddressForTxProtocol( Config config )
{
// We cannot use the backup address setting directly as IPv6 isn't processed during config read
String settingName = OnlineBackupSettings.online_backup_server.name();
HostnamePort resolvedValueFromConfig = resolved( config );
String modifiedLiteralValueToAvoidRange = String.format( "%s:%d", resolvedValueFromConfig.getHost(), resolvedValueFromConfig.getPort() );
String defaultHostname = resolvedValueFromConfig.getHost();
AdvertisedSocketAddress advertisedSocketAddress =
SocketAddressParser.deriveSocketAddress( settingName, modifiedLiteralValueToAvoidRange, defaultHostname, resolvedValueFromConfig.getPort(),
AdvertisedSocketAddress::new );
return new ListenSocketAddress( advertisedSocketAddress.getHostname(), advertisedSocketAddress.getPort() );
}

private HostnamePort resolved( Config config )
{
int defaultPort = Config.defaults().get( OnlineBackupSettings.online_backup_server ).getPort();
HostnamePort resolved = config.get( OnlineBackupSettings.online_backup_server );
if ( resolved.getPort() == 0 ) // Was the port not specified by user?
{
resolved = new HostnamePort( resolved.getHost(), defaultPort );
}
return resolved;
}
}
Expand Up @@ -28,72 +28,77 @@
import org.neo4j.causalclustering.core.state.CoreSnapshotService;
import org.neo4j.causalclustering.handlers.PipelineWrapper;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.SocketAddressParser;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
import org.neo4j.kernel.impl.factory.PlatformModule;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex;
import org.neo4j.kernel.impl.util.Dependencies;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

public class TransactionBackupServiceProvider
{
private final LogProvider logProvider;
private final LogProvider userLogProvider;
private final Supplier<StoreId> localDatabaseStoreIdSupplier;
private final PlatformModule platformModule;
private final Dependencies dependencies;
private final Monitors monitors;
private final PageCache pageCache;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final Supplier<NeoStoreDataSource> localDatabaseDataSourceSupplier;
private final BooleanSupplier localDatabaseIsAvailable;
private final CoreSnapshotService coreSnapshotService;
private final FileSystemAbstraction fileSystem;
private final PipelineWrapper serverPipelineWrapper;
private final TransactionBackupServiceAddressResolver transactionBackupServiceAddressResolver;

public TransactionBackupServiceProvider( LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> localDatabaseStoreIdSupplier,
PlatformModule platformModule, Supplier<NeoStoreDataSource> localDatabaseDataSourceSupplier, BooleanSupplier localDatabaseIsAvailable,
CoreSnapshotService coreSnapshotService, FileSystemAbstraction fileSystem, PipelineWrapper serverPipelineWrapper )
Dependencies dependencies, Monitors monitors, PageCache pageCache, StoreCopyCheckPointMutex storeCopyCheckPointMutex,
Supplier<NeoStoreDataSource> localDatabaseDataSourceSupplier, BooleanSupplier localDatabaseIsAvailable, CoreSnapshotService coreSnapshotService,
FileSystemAbstraction fileSystem, PipelineWrapper serverPipelineWrapper )
{
this.logProvider = logProvider;
this.userLogProvider = userLogProvider;
this.localDatabaseStoreIdSupplier = localDatabaseStoreIdSupplier;
this.platformModule = platformModule;
this.dependencies = dependencies;
this.monitors = monitors;
this.pageCache = pageCache;
this.storeCopyCheckPointMutex = storeCopyCheckPointMutex;
this.localDatabaseDataSourceSupplier = localDatabaseDataSourceSupplier;
this.localDatabaseIsAvailable = localDatabaseIsAvailable;
this.coreSnapshotService = coreSnapshotService;
this.fileSystem = fileSystem;
this.serverPipelineWrapper = serverPipelineWrapper;
this.transactionBackupServiceAddressResolver = new TransactionBackupServiceAddressResolver();
}

public TransactionBackupServiceProvider( LogProvider logProvider, LogProvider userLogProvider, Supplier<StoreId> localDatabaseStoreIdSupplier,
PlatformModule platformModule, Supplier<NeoStoreDataSource> localDatabaseDataSourceSupplier, BooleanSupplier localDatabaseIsAvailable,
CoreSnapshotService coreSnapshotService, FileSystemAbstraction fileSystem, PipelineWrapper serverPipelineWrapper )
{
this( logProvider, userLogProvider, localDatabaseStoreIdSupplier, platformModule.dependencies, platformModule.monitors, platformModule.pageCache,
platformModule.storeCopyCheckPointMutex, localDatabaseDataSourceSupplier, localDatabaseIsAvailable, coreSnapshotService, fileSystem,
serverPipelineWrapper );
}

public Optional<CatchupServer> resolveIfBackupEnabled( Config config )
{
if ( config.get( OnlineBackupSettings.online_backup_enabled ) )
{
return Optional.of( new CatchupServer( logProvider, userLogProvider, localDatabaseStoreIdSupplier,
platformModule.dependencies.provideDependency( TransactionIdStore.class ),
platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabaseDataSourceSupplier, localDatabaseIsAvailable,
coreSnapshotService, platformModule.monitors, new CheckpointerSupplier( platformModule.dependencies ), fileSystem, platformModule.pageCache,
backupAddressForTxProtocol( config ), platformModule.storeCopyCheckPointMutex, serverPipelineWrapper ) );
return Optional.of(
new CatchupServer( logProvider, userLogProvider, localDatabaseStoreIdSupplier, dependencies.provideDependency( TransactionIdStore.class ),
dependencies.provideDependency( LogicalTransactionStore.class ), localDatabaseDataSourceSupplier, localDatabaseIsAvailable,
coreSnapshotService, monitors, new CheckpointerSupplier( dependencies ), fileSystem, pageCache,
transactionBackupServiceAddressResolver.backupAddressForTxProtocol( config ), storeCopyCheckPointMutex, serverPipelineWrapper ) );
}
else
{
return Optional.empty();
}
}

private static ListenSocketAddress backupAddressForTxProtocol( Config config )
{
// We cannot use the backup address setting directly as IPv6 isn't processed during config read
String settingName = OnlineBackupSettings.online_backup_server.name();
String literalValue = config.getRaw( settingName ).orElse( OnlineBackupSettings.online_backup_server.getDefaultValue() );
HostnamePort resolvedValueFromConfig = config.get( OnlineBackupSettings.online_backup_server );
String defaultHostname = resolvedValueFromConfig.getHost();
Integer defaultPort = resolvedValueFromConfig.getPort();
AdvertisedSocketAddress advertisedSocketAddress =
SocketAddressParser.deriveSocketAddress( settingName, literalValue, defaultHostname, defaultPort, AdvertisedSocketAddress::new );
return new ListenSocketAddress( advertisedSocketAddress.getHostname(), advertisedSocketAddress.getPort() );
}
}
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.core;

import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;

public class TransactionBackupServiceProviderTest
{
private static final HostnamePort DEFAULT = Config.defaults().get( OnlineBackupSettings.online_backup_server );

@Test
public void backupWithRangeCanBeProcessed()
{
// given
TransactionBackupServiceAddressResolver subject = new TransactionBackupServiceAddressResolver();

// and config with range
Config config = Config.defaults();
config.augment( OnlineBackupSettings.online_backup_server, "127.0.0.1:6362-6372" );

// then
Assert.assertEquals( new ListenSocketAddress( "127.0.0.1", 6362 ), subject.backupAddressForTxProtocol( config ) );
}

@Test
public void backupOverrideWithoutPortGetsDefaultPort()
{
// with params
List<String> params = Arrays.asList( "127.0.0.1:", "127.0.0.1" );
for ( String testedValue : params )
{
// given
TransactionBackupServiceAddressResolver subject = new TransactionBackupServiceAddressResolver();

// and config without a port
Config config = Config.defaults();
config.augment( OnlineBackupSettings.online_backup_server, testedValue );

// when
ListenSocketAddress resolvedAddress = subject.backupAddressForTxProtocol( config );

// then
Assert.assertEquals( new ListenSocketAddress( "127.0.0.1", DEFAULT.getPort() ), resolvedAddress );
}
}
}

0 comments on commit 862a3a2

Please sign in to comment.