Skip to content

Commit

Permalink
Add a new HA protocol version for 3.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Dec 14, 2016
1 parent c72decf commit 76a0ab4
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 36 deletions.
2 changes: 1 addition & 1 deletion enterprise/com/src/main/java/org/neo4j/com/Client.java
Expand Up @@ -179,7 +179,7 @@ private ComException traceComException( ComException exception, String tracePoin

protected Protocol createProtocol( int chunkSize, byte applicationProtocolVersion )
{
return new Protocol310( chunkSize, applicationProtocolVersion, getInternalProtocolVersion() );
return new Protocol320( chunkSize, applicationProtocolVersion, getInternalProtocolVersion() );
}

public abstract ProtocolVersion getProtocolVersion();
Expand Down
28 changes: 28 additions & 0 deletions enterprise/com/src/main/java/org/neo4j/com/Protocol320.java
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.com;

public class Protocol320 extends Protocol310
{
public Protocol320( int chunkSize, byte applicationProtocolVersion, byte internalProtocolVersion )
{
super( chunkSize, applicationProtocolVersion, internalProtocolVersion );
}
}
Expand Up @@ -36,7 +36,6 @@

public class MasterClient310 extends MasterClient214
{

/* Version 1 first version
* Version 2 since 2012-01-24
* Version 3 since 2012-02-16
Expand Down
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha;

import org.neo4j.com.Protocol;
import org.neo4j.com.Protocol320;
import org.neo4j.com.ProtocolVersion;
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.com.storecopy.ResponseUnpacker;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.logging.LogProvider;

import static org.neo4j.com.ProtocolVersion.INTERNAL_PROTOCOL_VERSION;

public class MasterClient320 extends MasterClient310
{
/* Version 1 first version
* Version 2 since 2012-01-24
* Version 3 since 2012-02-16
* Version 4 since 2012-07-05
* Version 5 since ?
* Version 6 since 2014-01-07
* Version 7 since 2014-03-18
* Version 8 since 2014-08-27
* Version 9 since 3.1.0, 2016-09-20
* Version 10 since 3.2.0, 2016-12-07
*/
public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 10, INTERNAL_PROTOCOL_VERSION );

public MasterClient320( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId,
long readTimeoutMillis, long lockReadTimeout, int maxConcurrentChannels, int chunkSize,
ResponseUnpacker unpacker,
ByteCounterMonitor byteCounterMonitor,
RequestMonitor requestMonitor,
LogEntryReader<ReadableClosablePositionAwareChannel> entryReader )
{
super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, readTimeoutMillis,
lockReadTimeout, maxConcurrentChannels, chunkSize, unpacker, byteCounterMonitor, requestMonitor,
entryReader );
}

@Override
protected Protocol createProtocol( int chunkSize, byte applicationProtocolVersion )
{
return new Protocol320( chunkSize, applicationProtocolVersion, getInternalProtocolVersion() );
}

@Override
public ProtocolVersion getProtocolVersion()
{
return PROTOCOL_VERSION;
}
}
Expand Up @@ -35,7 +35,7 @@
import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler;
import org.neo4j.com.storecopy.StoreWriter;
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.ha.MasterClient310;
import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.lock.LockResult;
import org.neo4j.kernel.ha.lock.LockStatus;
Expand Down Expand Up @@ -105,7 +105,7 @@ private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesT
}
};

public static final ProtocolVersion CURRENT = MasterClient310.PROTOCOL_VERSION;
public static final ProtocolVersion CURRENT = MasterClient320.PROTOCOL_VERSION;

@Override
public Response<Integer> createRelationshipType( RequestContext context, final String name );
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.com.storecopy.ResponseUnpacker;
import org.neo4j.kernel.ha.MasterClient214;
import org.neo4j.kernel.ha.MasterClient310;
import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
Expand Down Expand Up @@ -76,11 +77,13 @@ public MasterClientResolver( LogProvider logProvider, ResponseUnpacker responseU
this.responseUnpacker = responseUnpacker;
this.invalidEpochHandler = invalidEpochHandler;

protocolToFactoryMapping = new HashMap<>( 2, 1 );
protocolToFactoryMapping = new HashMap<>( 3, 1 );
protocolToFactoryMapping.put( MasterClient214.PROTOCOL_VERSION, new F214( logProvider, readTimeoutMillis, lockReadTimeout,
channels, chunkSize ) );
protocolToFactoryMapping.put( MasterClient310.PROTOCOL_VERSION, new F310( logProvider, readTimeoutMillis, lockReadTimeout,
channels, chunkSize ) );
protocolToFactoryMapping.put( MasterClient320.PROTOCOL_VERSION, new F320( logProvider, readTimeoutMillis, lockReadTimeout,
channels, chunkSize ) );
}

@Override
Expand Down Expand Up @@ -120,7 +123,7 @@ private MasterClientFactory getFor( ProtocolVersion protocolVersion )

private MasterClientFactory assignDefaultFactory()
{
return getFor( MasterClient310.PROTOCOL_VERSION );
return getFor( MasterClient320.PROTOCOL_VERSION );
}

private abstract static class StaticMasterClientFactory implements MasterClientFactory
Expand Down Expand Up @@ -156,8 +159,8 @@ public MasterClient instantiate( String destinationHostNameOrIp, int destination
{
return life.add( new MasterClient214( destinationHostNameOrIp, destinationPort, originHostNameOrIp,
logProvider, storeId, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize,
responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient310.class ),
monitors.newMonitor( RequestMonitor.class, MasterClient310.class ), logEntryReader.get() ) );
responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient320.class ),
monitors.newMonitor( RequestMonitor.class, MasterClient320.class ), logEntryReader.get() ) );
}
}

Expand All @@ -175,8 +178,27 @@ public MasterClient instantiate( String destinationHostNameOrIp, int destination
{
return life.add( new MasterClient310( destinationHostNameOrIp, destinationPort, originHostNameOrIp,
logProvider, storeId, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize,
responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient310.class ),
monitors.newMonitor( RequestMonitor.class, MasterClient310.class ), logEntryReader.get() ) );
responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient320.class ),
monitors.newMonitor( RequestMonitor.class, MasterClient320.class ), logEntryReader.get() ) );
}
}

private final class F320 extends StaticMasterClientFactory
{
private F320( LogProvider logProvider, int readTimeoutMillis, int lockReadTimeout, int maxConcurrentChannels,
int chunkSize )
{
super( logProvider, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize );
}

@Override
public MasterClient instantiate( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
Monitors monitors, StoreId storeId, LifeSupport life )
{
return life.add( new MasterClient320( destinationHostNameOrIp, destinationPort, originHostNameOrIp,
logProvider, storeId, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize,
responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient320.class ),
monitors.newMonitor( RequestMonitor.class, MasterClient320.class ), logEntryReader.get() ) );
}
}
}
Expand Up @@ -40,7 +40,7 @@
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.MasterClient310;
import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.ha.com.master.ConversationManager;
import org.neo4j.kernel.ha.com.master.HandshakeResult;
import org.neo4j.kernel.ha.com.master.MasterImpl;
Expand Down Expand Up @@ -111,7 +111,7 @@ public void newClientsShouldNotIgnoreStoreIdDifferences() throws Throwable
newMasterServer( masterImplSPI );

StoreId storeId = StoreIdTestFactory.newStoreIdForCurrentVersion( 5, 6, 7, 8 );
MasterClient masterClient = newMasterClient310( storeId );
MasterClient masterClient = newMasterClient320( storeId );

// When
masterClient.handshake( 1, storeId );
Expand All @@ -135,7 +135,7 @@ public void clientShouldReadAndApplyTransactionLogsOnNewLockSessionRequest() thr
ResponseUnpacker unpacker = life.add(
new TransactionCommittingResponseUnpacker( deps, DEFAULT_BATCH_SIZE, 0 ) );

MasterClient masterClient = newMasterClient310( StoreId.DEFAULT, unpacker );
MasterClient masterClient = newMasterClient320( StoreId.DEFAULT, unpacker );

// When
masterClient.newLockSession( new RequestContext( 1, 2, 3, 4, 5 ) );
Expand All @@ -160,7 +160,7 @@ public void endLockSessionDoesNotUnpackResponse() throws Throwable

newMasterServer( masterImplSPI );

MasterClient client = newMasterClient310( storeId, responseUnpacker );
MasterClient client = newMasterClient320( storeId, responseUnpacker );

HandshakeResult handshakeResult;
try ( Response<HandshakeResult> handshakeResponse = client.handshake( 1, storeId ) )
Expand Down Expand Up @@ -205,17 +205,17 @@ private MasterServer newMasterServer( MasterImpl masterImpl ) throws Throwable
ConversationManager.class ), logEntryReader ) );
}

private MasterClient newMasterClient310( StoreId storeId ) throws Throwable
private MasterClient newMasterClient320( StoreId storeId ) throws Throwable
{
return newMasterClient310( storeId, NO_OP_RESPONSE_UNPACKER );
return newMasterClient320( storeId, NO_OP_RESPONSE_UNPACKER );
}

private MasterClient newMasterClient310( StoreId storeId, ResponseUnpacker responseUnpacker ) throws Throwable
private MasterClient newMasterClient320( StoreId storeId, ResponseUnpacker responseUnpacker ) throws Throwable
{
return life.add( new MasterClient310( MASTER_SERVER_HOST, MASTER_SERVER_PORT, null, NullLogProvider.getInstance(),
return life.add( new MasterClient320( MASTER_SERVER_HOST, MASTER_SERVER_PORT, null, NullLogProvider.getInstance(),
storeId, TIMEOUT, TIMEOUT, 1, CHUNK_SIZE, responseUnpacker,
monitors.newMonitor( ByteCounterMonitor.class, MasterClient310.class ),
monitors.newMonitor( RequestMonitor.class, MasterClient310.class ), logEntryReader ) );
monitors.newMonitor( ByteCounterMonitor.class, MasterClient320.class ),
monitors.newMonitor( RequestMonitor.class, MasterClient320.class ), logEntryReader ) );
}

private static Response<Void> voidResponseWithTransactionLogs()
Expand Down
Expand Up @@ -20,8 +20,6 @@
package org.neo4j.kernel.ha.cluster;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -50,7 +48,7 @@
import org.neo4j.kernel.ha.BranchedDataException;
import org.neo4j.kernel.ha.BranchedDataPolicy;
import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.MasterClient310;
import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.ha.PullerFactory;
import org.neo4j.kernel.ha.SlaveUpdatePuller;
import org.neo4j.kernel.ha.UpdatePuller;
Expand Down Expand Up @@ -80,7 +78,7 @@
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.NullLogProvider;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -251,14 +249,10 @@ public void updatesPulledAndPullingScheduledOnSwitchToSlave() throws Throwable

when( pullerFactory.createUpdatePullerScheduler( updatePuller ) ).thenReturn( pullerScheduler );
// emulate lifecycle start call on scheduler
doAnswer( new Answer()
doAnswer( invocationOnMock ->
{
@Override
public Object answer( InvocationOnMock invocationOnMock ) throws Throwable
{
pullerScheduler.init();
return null;
}
pullerScheduler.init();
return null;
} ).when( communicationLife ).start();

switchToSlave.switchToSlave( communicationLife, localhost, localhost, mock( CancellationRequest.class ) );
Expand Down Expand Up @@ -297,7 +291,7 @@ private SwitchToSlaveBranchThenCopy newSwitchToSlaveSpy( PageCache pageCacheMock
when( master.getHARole() ).thenReturn( HighAvailabilityModeSwitcher.MASTER );
when( master.hasRole( eq( HighAvailabilityModeSwitcher.MASTER ) ) ).thenReturn( true );
when( master.getInstanceId() ).thenReturn( new InstanceId( 1 ) );
when( clusterMembers.getMembers() ).thenReturn( asList( master ) );
when( clusterMembers.getMembers() ).thenReturn( singletonList( master ) );

Dependencies resolver = new Dependencies();
resolver.satisfyDependencies( requestContextFactory, clusterMembers,
Expand All @@ -318,7 +312,7 @@ private SwitchToSlaveBranchThenCopy newSwitchToSlaveSpy( PageCache pageCacheMock
Response<HandshakeResult> response = mock( Response.class );
when( response.response() ).thenReturn( new HandshakeResult( 42, 2 ) );
when( masterClient.handshake( anyLong(), any( StoreId.class ) ) ).thenReturn( response );
when( masterClient.getProtocolVersion() ).thenReturn( MasterClient310.PROTOCOL_VERSION );
when( masterClient.getProtocolVersion() ).thenReturn( MasterClient320.PROTOCOL_VERSION );

TransactionIdStore transactionIdStoreMock = mock( TransactionIdStore.class );
// note that the checksum (the second member of the array) is the same as the one in the handshake mock above
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.neo4j.function.Suppliers;
import org.neo4j.kernel.ha.MasterClient214;
import org.neo4j.kernel.ha.MasterClient310;
import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
Expand Down Expand Up @@ -55,7 +56,7 @@ ResponseUnpacker.NO_OP_RESPONSE_UNPACKER, mock( InvalidEpochExceptionHandler.cla
life.start();
MasterClient masterClient1 =
resolver.instantiate( "cluster://localhost", 44, null, new Monitors(), StoreId.DEFAULT, life );
assertThat( masterClient1, instanceOf( MasterClient310.class ) );
assertThat( masterClient1, instanceOf( MasterClient320.class ) );
}
finally
{
Expand Down
Expand Up @@ -23,7 +23,7 @@
import com.codahale.metrics.MetricRegistry;

import org.neo4j.com.storecopy.ToNetworkStoreWriter;
import org.neo4j.kernel.ha.MasterClient310;
import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.ha.com.master.MasterServer;
import org.neo4j.kernel.impl.annotations.Documented;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
Expand Down Expand Up @@ -64,7 +64,7 @@ public void start()
monitors.addMonitorListener( masterNetworkTransactionWrites, MasterServer.class.getName() );
monitors.addMonitorListener( masterNetworkStoreWrites, ToNetworkStoreWriter.class.getName(),
ToNetworkStoreWriter.STORE_COPIER_MONITOR_TAG );
monitors.addMonitorListener( slaveNetworkTransactionWrites, MasterClient310.class.getName() );
monitors.addMonitorListener( slaveNetworkTransactionWrites, MasterClient320.class.getName() );

registry.register( MASTER_NETWORK_TX_WRITES, (Gauge<Long>) masterNetworkTransactionWrites::getBytesWritten );
registry.register( MASTER_NETWORK_STORE_WRITES, (Gauge<Long>) masterNetworkStoreWrites::getBytesWritten );
Expand Down

0 comments on commit 76a0ab4

Please sign in to comment.