diff --git a/enterprise/com/src/main/java/org/neo4j/com/Protocol201.java b/enterprise/com/src/main/java/org/neo4j/com/Protocol201.java deleted file mode 100644 index 3449ab4ea4d65..0000000000000 --- a/enterprise/com/src/main/java/org/neo4j/com/Protocol201.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.com; - -import java.nio.ByteBuffer; - -import org.jboss.netty.buffer.ChannelBuffer; - -import org.neo4j.kernel.impl.store.StoreId; - -public class Protocol201 extends Protocol -{ - public Protocol201( int chunkSize, byte applicationProtocolVersion, byte internalProtocolVersion ) - { - super( chunkSize, applicationProtocolVersion, internalProtocolVersion ); - } - - @Override - protected StoreId readStoreId( ChannelBuffer source, ByteBuffer byteBuffer ) - { - byteBuffer.clear(); - byteBuffer.limit( 8 + 8 + 8 ); // creation time, random id, store version - source.readBytes( byteBuffer ); - byteBuffer.flip(); - // read order matters - see Server.writeStoreId() for version 2.1.3 - long creationTime = byteBuffer.getLong(); - long randomId = byteBuffer.getLong(); - long storeVersion = byteBuffer.getLong(); - return new StoreId( creationTime, randomId, storeVersion, -1, -1 ); - } -} diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient210.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient210.java deleted file mode 100644 index 22f2ed888b6dd..0000000000000 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient210.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.kernel.ha; - -import org.jboss.netty.buffer.ChannelBuffer; - -import java.io.IOException; - -import org.neo4j.com.Client; -import org.neo4j.com.Deserializer; -import org.neo4j.com.Protocol; -import org.neo4j.com.Protocol201; -import org.neo4j.com.ProtocolVersion; -import org.neo4j.com.RequestContext; -import org.neo4j.com.RequestType; -import org.neo4j.com.Response; -import org.neo4j.com.Serializer; -import org.neo4j.com.monitor.RequestMonitor; -import org.neo4j.com.storecopy.ResponseUnpacker; -import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler; -import org.neo4j.com.storecopy.StoreWriter; -import org.neo4j.kernel.ha.HaRequestTypes.Type; -import org.neo4j.kernel.ha.com.master.HandshakeResult; -import org.neo4j.kernel.ha.com.master.Master; -import org.neo4j.kernel.ha.com.master.MasterServer; -import org.neo4j.kernel.ha.com.slave.MasterClient; -import org.neo4j.kernel.ha.id.IdAllocation; -import org.neo4j.kernel.ha.lock.LockResult; -import org.neo4j.kernel.impl.store.StoreId; -import org.neo4j.kernel.impl.store.id.IdRange; -import org.neo4j.kernel.impl.store.id.IdType; -import org.neo4j.kernel.impl.transaction.TransactionRepresentation; -import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; -import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; -import org.neo4j.kernel.monitoring.ByteCounterMonitor; -import org.neo4j.logging.LogProvider; -import org.neo4j.storageengine.api.lock.ResourceType; - -import static org.neo4j.com.Protocol.EMPTY_SERIALIZER; -import static org.neo4j.com.Protocol.VOID_DESERIALIZER; -import static org.neo4j.com.Protocol.writeString; -import static org.neo4j.com.ProtocolVersion.INTERNAL_PROTOCOL_VERSION; - -/** - * The {@link org.neo4j.kernel.ha.com.master.Master} a slave should use to communicate with its master. It - * serializes requests and sends them to the master, more specifically - * {@link org.neo4j.kernel.ha.com.master.MasterServer} (which delegates to {@link org.neo4j.kernel.ha.com.master.MasterImpl} - * on the master side. - */ -public class MasterClient210 extends Client implements MasterClient -{ - /* Version 1 first version - * Version 2 since 2012-01-24 - * Version 3 since 2012-02-16 - * Version 4 since 2012-07-05 - * Version 5 since ? - * Version 6 since 2014-01-07 - * Version 7 since 2014-03-18 - */ - public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 7, INTERNAL_PROTOCOL_VERSION ); - - private final long lockReadTimeoutMillis; - private final HaRequestTypes requestTypes; - - public MasterClient210( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp, - LogProvider logProvider, StoreId storeId, long readTimeoutMillis, - long lockReadTimeoutMillis, int maxConcurrentChannels, int chunkSize, - ResponseUnpacker responseUnpacker, - ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor, - LogEntryReader entryReader ) - { - super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, - MasterServer.FRAME_LENGTH, readTimeoutMillis, maxConcurrentChannels, chunkSize, - responseUnpacker, byteCounterMonitor, requestMonitor, entryReader ); - this.lockReadTimeoutMillis = lockReadTimeoutMillis; - this.requestTypes = new HaRequestType210( entryReader ); - } - - @Override - protected Protocol createProtocol( int chunkSize, byte applicationProtocolVersion ) - { - return new Protocol201( chunkSize, applicationProtocolVersion, getInternalProtocolVersion() ); - } - - @Override - protected long getReadTimeout( RequestType type, long readTimeout ) - { - if ( Type.ACQUIRE_EXCLUSIVE_LOCK.is( type ) || Type.ACQUIRE_SHARED_LOCK.is( type ) ) - { - return lockReadTimeoutMillis; - } - if ( Type.COPY_STORE.is( type ) ) - { - return readTimeout * 2; - } - return readTimeout; - } - - @Override - protected boolean shouldCheckStoreId( RequestType type ) - { - return !Type.COPY_STORE.is( type ); - } - - @Override - public Response allocateIds( RequestContext context, final IdType idType ) - { - Serializer serializer = buffer -> buffer.writeByte( idType.ordinal() ); - Deserializer deserializer = ( buffer, temporaryBuffer ) -> readIdAllocation( buffer ); - return sendRequest( requestTypes.type( Type.ALLOCATE_IDS ), context, serializer, deserializer ); - } - - @Override - public Response createRelationshipType( RequestContext context, final String name ) - { - Serializer serializer = buffer -> writeString( buffer, name ); - Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readInt(); - return sendRequest( requestTypes.type( Type.CREATE_RELATIONSHIP_TYPE ), context, serializer, deserializer ); - } - - @Override - public Response createPropertyKey( RequestContext context, final String name ) - { - Serializer serializer = buffer -> writeString( buffer, name ); - Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readInt(); - return sendRequest( requestTypes.type( Type.CREATE_PROPERTY_KEY ), context, serializer, deserializer ); - } - - @Override - public Response createLabel( RequestContext context, final String name ) - { - Serializer serializer = buffer -> writeString( buffer, name ); - Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readInt(); - return sendRequest( requestTypes.type( Type.CREATE_LABEL ), context, serializer, deserializer ); - } - - @Override - public Response newLockSession( RequestContext context ) - { - return sendRequest( requestTypes.type( Type.NEW_LOCK_SESSION ), context, EMPTY_SERIALIZER, VOID_DESERIALIZER ); - } - - @Override - public Response acquireSharedLock( - RequestContext context, ResourceType type, long... resourceIds ) - { - return sendRequest( requestTypes.type( Type.ACQUIRE_SHARED_LOCK ), context, - new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); - } - - @Override - public Response acquireExclusiveLock( - RequestContext context, ResourceType type, long... resourceIds ) - { - return sendRequest( requestTypes.type( Type.ACQUIRE_EXCLUSIVE_LOCK ), context, - new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); - } - - @Override - public Response commit( RequestContext context, TransactionRepresentation tx ) - { - Serializer serializer = new Protocol.TransactionSerializer( tx ); - Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readLong(); - return sendRequest( requestTypes.type( Type.COMMIT ), context, serializer, deserializer ); - } - - @Override - public Response endLockSession( RequestContext context, final boolean success ) - { - Serializer serializer = buffer -> buffer.writeByte( success ? 1 : 0 ); - return sendRequest( requestTypes.type( Type.END_LOCK_SESSION ), context, serializer, VOID_DESERIALIZER ); - } - - @Override - public Response pullUpdates( RequestContext context ) - { - return pullUpdates( context, ResponseUnpacker.NO_OP_TX_HANDLER ); - } - - @Override - public Response pullUpdates( RequestContext context, TxHandler txHandler ) - { - return sendRequest( requestTypes.type( Type.PULL_UPDATES ), context, - EMPTY_SERIALIZER, VOID_DESERIALIZER, null, txHandler ); - } - - @Override - public Response handshake( final long txId, StoreId storeId ) - { - Serializer serializer = buffer -> buffer.writeLong( txId ); - Deserializer deserializer = - ( buffer, temporaryBuffer ) -> new HandshakeResult( buffer.readLong(), buffer.readLong() ); - return sendRequest( requestTypes.type( Type.HANDSHAKE ), RequestContext.EMPTY, - serializer, deserializer, storeId, ResponseUnpacker.NO_OP_TX_HANDLER ); - } - - @Override - public Response copyStore( RequestContext context, final StoreWriter writer ) - { - context = stripFromTransactions( context ); - return sendRequest( requestTypes.type( Type.COPY_STORE ), context, EMPTY_SERIALIZER, - createFileStreamDeserializer( writer ) ); - } - - protected Deserializer createFileStreamDeserializer( StoreWriter writer ) - { - return new Protocol.FileStreamsDeserializer210( writer ); - } - - private RequestContext stripFromTransactions( RequestContext context ) - { - return new RequestContext( context.getEpoch(), context.machineId(), context.getEventIdentifier(), - 0, context.getChecksum() ); - } - - @Override - public ProtocolVersion getProtocolVersion() - { - return PROTOCOL_VERSION; - } - - private static IdAllocation readIdAllocation( ChannelBuffer buffer ) - { - int numberOfDefragIds = buffer.readInt(); - long[] defragIds = new long[numberOfDefragIds]; - for ( int i = 0; i < numberOfDefragIds; i++ ) - { - defragIds[i] = buffer.readLong(); - } - long rangeStart = buffer.readLong(); - int rangeLength = buffer.readInt(); - long highId = buffer.readLong(); - long defragCount = buffer.readLong(); - return new IdAllocation( new IdRange( defragIds, rangeStart, rangeLength ), - highId, defragCount ); - } - - private static class AcquireLockSerializer implements Serializer - { - private final ResourceType type; - private final long[] resourceIds; - - AcquireLockSerializer( ResourceType type, long... resourceIds ) - { - this.type = type; - this.resourceIds = resourceIds; - } - - @Override - public void write( ChannelBuffer buffer ) throws IOException - { - buffer.writeInt( type.typeId() ); - buffer.writeInt( resourceIds.length ); - for ( long entity : resourceIds ) - { - buffer.writeLong( entity ); - } - } - } -} diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java index eec986de3f06a..9d172753b8d23 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java @@ -19,32 +19,78 @@ */ package org.neo4j.kernel.ha; +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; + +import org.neo4j.com.Client; +import org.neo4j.com.Deserializer; import org.neo4j.com.Protocol; import org.neo4j.com.Protocol214; import org.neo4j.com.ProtocolVersion; +import org.neo4j.com.RequestContext; +import org.neo4j.com.RequestType; +import org.neo4j.com.Response; +import org.neo4j.com.Serializer; import org.neo4j.com.monitor.RequestMonitor; import org.neo4j.com.storecopy.ResponseUnpacker; +import org.neo4j.com.storecopy.StoreWriter; +import org.neo4j.kernel.ha.com.master.HandshakeResult; +import org.neo4j.kernel.ha.com.master.Master; +import org.neo4j.kernel.ha.com.master.MasterServer; +import org.neo4j.kernel.ha.com.slave.MasterClient; +import org.neo4j.kernel.ha.id.IdAllocation; +import org.neo4j.kernel.ha.lock.LockResult; import org.neo4j.kernel.impl.store.StoreId; +import org.neo4j.kernel.impl.store.id.IdRange; +import org.neo4j.kernel.impl.store.id.IdType; +import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.monitoring.ByteCounterMonitor; import org.neo4j.logging.LogProvider; +import org.neo4j.storageengine.api.lock.ResourceType; +import static org.neo4j.com.Protocol.EMPTY_SERIALIZER; +import static org.neo4j.com.Protocol.VOID_DESERIALIZER; +import static org.neo4j.com.Protocol.writeString; import static org.neo4j.com.ProtocolVersion.INTERNAL_PROTOCOL_VERSION; -public class MasterClient214 extends MasterClient210 +/** + * The {@link org.neo4j.kernel.ha.com.master.Master} a slave should use to communicate with its master. It + * serializes requests and sends them to the master, more specifically + * {@link org.neo4j.kernel.ha.com.master.MasterServer} (which delegates to + * {@link org.neo4j.kernel.ha.com.master.MasterImpl} + * on the master side. + */ +public class MasterClient214 extends Client implements MasterClient { + /* Version 1 first version + * Version 2 since 2012-01-24 + * Version 3 since 2012-02-16 + * Version 4 since 2012-07-05 + * Version 5 since ? + * Version 6 since 2014-01-07 + * Version 7 since 2014-03-18 + * Version 8 since 2014-08-27 + */ public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 8, INTERNAL_PROTOCOL_VERSION ); + private final long lockReadTimeoutMillis; + private final HaRequestTypes requestTypes; + public MasterClient214( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp, - LogProvider logProvider, StoreId storeId, long readTimeoutSeconds, - long lockReadTimeout, int maxConcurrentChannels, int chunkSize, ResponseUnpacker unpacker, + LogProvider logProvider, StoreId storeId, long readTimeoutMillis, + long lockReadTimeoutMillis, int maxConcurrentChannels, int chunkSize, + ResponseUnpacker responseUnpacker, ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor, LogEntryReader entryReader ) { - super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, readTimeoutSeconds, - lockReadTimeout, maxConcurrentChannels, chunkSize, unpacker, byteCounterMonitor, - requestMonitor, entryReader ); + super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, + MasterServer.FRAME_LENGTH, readTimeoutMillis, maxConcurrentChannels, chunkSize, + responseUnpacker, byteCounterMonitor, requestMonitor, entryReader ); + this.lockReadTimeoutMillis = lockReadTimeoutMillis; + this.requestTypes = new HaRequestType210( entryReader ); } @Override @@ -58,4 +104,179 @@ public ProtocolVersion getProtocolVersion() { return PROTOCOL_VERSION; } + + @Override + protected long getReadTimeout( RequestType type, long readTimeout ) + { + if ( HaRequestTypes.Type.ACQUIRE_EXCLUSIVE_LOCK.is( type ) || + HaRequestTypes.Type.ACQUIRE_SHARED_LOCK.is( type ) ) + { + return lockReadTimeoutMillis; + } + if ( HaRequestTypes.Type.COPY_STORE.is( type ) ) + { + return readTimeout * 2; + } + return readTimeout; + } + + @Override + protected boolean shouldCheckStoreId( RequestType type ) + { + return !HaRequestTypes.Type.COPY_STORE.is( type ); + } + + @Override + public Response allocateIds( RequestContext context, final IdType idType ) + { + Serializer serializer = buffer -> buffer.writeByte( idType.ordinal() ); + Deserializer deserializer = ( buffer, temporaryBuffer ) -> readIdAllocation( buffer ); + return sendRequest( requestTypes.type( HaRequestTypes.Type.ALLOCATE_IDS ), context, serializer, deserializer ); + } + + @Override + public Response createRelationshipType( RequestContext context, final String name ) + { + Serializer serializer = buffer -> writeString( buffer, name ); + Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readInt(); + return sendRequest( requestTypes.type( HaRequestTypes.Type.CREATE_RELATIONSHIP_TYPE ), context, serializer, + deserializer ); + } + + @Override + public Response createPropertyKey( RequestContext context, final String name ) + { + Serializer serializer = buffer -> writeString( buffer, name ); + Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readInt(); + return sendRequest( requestTypes.type( HaRequestTypes.Type.CREATE_PROPERTY_KEY ), context, serializer, + deserializer ); + } + + @Override + public Response createLabel( RequestContext context, final String name ) + { + Serializer serializer = buffer -> writeString( buffer, name ); + Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readInt(); + return sendRequest( requestTypes.type( HaRequestTypes.Type.CREATE_LABEL ), context, serializer, deserializer ); + } + + @Override + public Response newLockSession( RequestContext context ) + { + return sendRequest( requestTypes.type( HaRequestTypes.Type.NEW_LOCK_SESSION ), context, EMPTY_SERIALIZER, + VOID_DESERIALIZER ); + } + + @Override + public Response acquireSharedLock( + RequestContext context, ResourceType type, long... resourceIds ) + { + return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_SHARED_LOCK ), context, + new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); + } + + @Override + public Response acquireExclusiveLock( + RequestContext context, ResourceType type, long... resourceIds ) + { + return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_EXCLUSIVE_LOCK ), context, + new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); + } + + @Override + public Response commit( RequestContext context, TransactionRepresentation tx ) + { + Serializer serializer = new Protocol.TransactionSerializer( tx ); + Deserializer deserializer = ( buffer, temporaryBuffer ) -> buffer.readLong(); + return sendRequest( requestTypes.type( HaRequestTypes.Type.COMMIT ), context, serializer, deserializer ); + } + + @Override + public Response endLockSession( RequestContext context, final boolean success ) + { + Serializer serializer = buffer -> buffer.writeByte( success ? 1 : 0 ); + return sendRequest( requestTypes.type( HaRequestTypes.Type.END_LOCK_SESSION ), context, serializer, + VOID_DESERIALIZER ); + } + + @Override + public Response pullUpdates( RequestContext context ) + { + return pullUpdates( context, ResponseUnpacker.NO_OP_TX_HANDLER ); + } + + @Override + public Response pullUpdates( RequestContext context, ResponseUnpacker.TxHandler txHandler ) + { + return sendRequest( requestTypes.type( HaRequestTypes.Type.PULL_UPDATES ), context, + EMPTY_SERIALIZER, VOID_DESERIALIZER, null, txHandler ); + } + + @Override + public Response handshake( final long txId, StoreId storeId ) + { + Serializer serializer = buffer -> buffer.writeLong( txId ); + Deserializer deserializer = + ( buffer, temporaryBuffer ) -> new HandshakeResult( buffer.readLong(), buffer.readLong() ); + return sendRequest( requestTypes.type( HaRequestTypes.Type.HANDSHAKE ), RequestContext.EMPTY, + serializer, deserializer, storeId, ResponseUnpacker.NO_OP_TX_HANDLER ); + } + + @Override + public Response copyStore( RequestContext context, final StoreWriter writer ) + { + context = stripFromTransactions( context ); + return sendRequest( requestTypes.type( HaRequestTypes.Type.COPY_STORE ), context, EMPTY_SERIALIZER, + createFileStreamDeserializer( writer ) ); + } + + protected Deserializer createFileStreamDeserializer( StoreWriter writer ) + { + return new Protocol.FileStreamsDeserializer210( writer ); + } + + private RequestContext stripFromTransactions( RequestContext context ) + { + return new RequestContext( context.getEpoch(), context.machineId(), context.getEventIdentifier(), + 0, context.getChecksum() ); + } + + private static IdAllocation readIdAllocation( ChannelBuffer buffer ) + { + int numberOfDefragIds = buffer.readInt(); + long[] defragIds = new long[numberOfDefragIds]; + for ( int i = 0; i < numberOfDefragIds; i++ ) + { + defragIds[i] = buffer.readLong(); + } + long rangeStart = buffer.readLong(); + int rangeLength = buffer.readInt(); + long highId = buffer.readLong(); + long defragCount = buffer.readLong(); + return new IdAllocation( new IdRange( defragIds, rangeStart, rangeLength ), + highId, defragCount ); + } + + private static class AcquireLockSerializer implements Serializer + { + private final ResourceType type; + private final long[] resourceIds; + + AcquireLockSerializer( ResourceType type, long... resourceIds ) + { + this.type = type; + this.resourceIds = resourceIds; + } + + @Override + public void write( ChannelBuffer buffer ) throws IOException + { + buffer.writeInt( type.typeId() ); + buffer.writeInt( resourceIds.length ); + for ( long entity : resourceIds ) + { + buffer.writeLong( entity ); + } + } + } } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient310.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient310.java index 1cefdb281f912..b62aea5018492 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient310.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient310.java @@ -40,13 +40,13 @@ public class MasterClient310 extends MasterClient214 public MasterClient310( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp, LogProvider logProvider, StoreId storeId, - long readTimeoutSeconds, long lockReadTimeout, int maxConcurrentChannels, int chunkSize, + long readTimeoutMillis, long lockReadTimeout, int maxConcurrentChannels, int chunkSize, ResponseUnpacker unpacker, ByteCounterMonitor byteCounterMonitor, RequestMonitor requestMonitor, LogEntryReader entryReader ) { - super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, readTimeoutSeconds, + super( destinationHostNameOrIp, destinationPort, originHostNameOrIp, logProvider, storeId, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize, unpacker, byteCounterMonitor, requestMonitor, entryReader ); } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/SwitchToSlave.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/SwitchToSlave.java index fb50b46d9e81e..c18d747008be6 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/SwitchToSlave.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/cluster/SwitchToSlave.java @@ -49,7 +49,7 @@ import org.neo4j.kernel.ha.BranchedDataPolicy; import org.neo4j.kernel.ha.DelegateInvocationHandler; import org.neo4j.kernel.ha.HaSettings; -import org.neo4j.kernel.ha.MasterClient210; +import org.neo4j.kernel.ha.MasterClient214; import org.neo4j.kernel.ha.PullerFactory; import org.neo4j.kernel.ha.StoreOutOfDateException; import org.neo4j.kernel.ha.StoreUnableToParticipateInClusterException; @@ -561,7 +561,7 @@ private MasterClient newMasterClient( URI masterUri, URI me, StoreId storeId, Li { MasterClient masterClient = masterClientResolver.instantiate( masterUri.getHost(), masterUri.getPort(), me.getHost(), monitors, storeId, life ); - if ( masterClient.getProtocolVersion().compareTo( MasterClient210.PROTOCOL_VERSION ) < 0 ) + if ( masterClient.getProtocolVersion().compareTo( MasterClient214.PROTOCOL_VERSION ) < 0 ) { idGeneratorFactory.enableCompatibilityMode(); } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClientResolver.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClientResolver.java index 3f95c81d9ca78..29e4fc33e58fe 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClientResolver.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClientResolver.java @@ -29,7 +29,6 @@ import org.neo4j.com.ProtocolVersion; import org.neo4j.com.monitor.RequestMonitor; import org.neo4j.com.storecopy.ResponseUnpacker; -import org.neo4j.kernel.ha.MasterClient210; import org.neo4j.kernel.ha.MasterClient214; import org.neo4j.kernel.ha.MasterClient310; import org.neo4j.kernel.ha.com.master.InvalidEpochException; @@ -69,7 +68,7 @@ public MasterClient instantiate( String destinationHostNameOrIp, int destination public MasterClientResolver( LogProvider logProvider, ResponseUnpacker responseUnpacker, InvalidEpochExceptionHandler invalidEpochHandler, - int readTimeout, int lockReadTimeout, int channels, int chunkSize, + int readTimeoutMillis, int lockReadTimeout, int channels, int chunkSize, Supplier> logEntryReader ) { this.logEntryReader = logEntryReader; @@ -77,12 +76,10 @@ public MasterClientResolver( LogProvider logProvider, ResponseUnpacker responseU this.responseUnpacker = responseUnpacker; this.invalidEpochHandler = invalidEpochHandler; - protocolToFactoryMapping = new HashMap<>( 3, 1 ); - protocolToFactoryMapping.put( MasterClient210.PROTOCOL_VERSION, new F210( logProvider, readTimeout, lockReadTimeout, + protocolToFactoryMapping = new HashMap<>( 2, 1 ); + protocolToFactoryMapping.put( MasterClient214.PROTOCOL_VERSION, new F214( logProvider, readTimeoutMillis, lockReadTimeout, channels, chunkSize ) ); - protocolToFactoryMapping.put( MasterClient214.PROTOCOL_VERSION, new F214( logProvider, readTimeout, lockReadTimeout, - channels, chunkSize ) ); - protocolToFactoryMapping.put( MasterClient310.PROTOCOL_VERSION, new F310( logProvider, readTimeout, lockReadTimeout, + protocolToFactoryMapping.put( MasterClient310.PROTOCOL_VERSION, new F310( logProvider, readTimeoutMillis, lockReadTimeout, channels, chunkSize ) ); } @@ -129,47 +126,28 @@ private MasterClientFactory assignDefaultFactory() private abstract static class StaticMasterClientFactory implements MasterClientFactory { final LogProvider logProvider; - final int readTimeoutSeconds; + final int readTimeoutMillis; final int lockReadTimeout; final int maxConcurrentChannels; final int chunkSize; - StaticMasterClientFactory( LogProvider logProvider, int readTimeoutSeconds, int lockReadTimeout, + StaticMasterClientFactory( LogProvider logProvider, int readTimeoutMillis, int lockReadTimeout, int maxConcurrentChannels, int chunkSize ) { this.logProvider = logProvider; - this.readTimeoutSeconds = readTimeoutSeconds; + this.readTimeoutMillis = readTimeoutMillis; this.lockReadTimeout = lockReadTimeout; this.maxConcurrentChannels = maxConcurrentChannels; this.chunkSize = chunkSize; } } - private final class F210 extends StaticMasterClientFactory - { - private F210( LogProvider logProvider, int readTimeoutSeconds, int lockReadTimeout, int maxConcurrentChannels, - int chunkSize ) - { - super( logProvider, readTimeoutSeconds, lockReadTimeout, maxConcurrentChannels, chunkSize ); - } - - @Override - public MasterClient instantiate( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp, - Monitors monitors, StoreId storeId, LifeSupport life ) - { - return life.add( new MasterClient210( destinationHostNameOrIp, destinationPort, originHostNameOrIp, - logProvider, storeId, readTimeoutSeconds, lockReadTimeout, maxConcurrentChannels, chunkSize, - responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient210.class ), - monitors.newMonitor( RequestMonitor.class, MasterClient210.class ), logEntryReader.get() ) ); - } - } - private final class F214 extends StaticMasterClientFactory { - private F214( LogProvider logProvider, int readTimeoutSeconds, int lockReadTimeout, int maxConcurrentChannels, + private F214( LogProvider logProvider, int readTimeoutMillis, int lockReadTimeout, int maxConcurrentChannels, int chunkSize ) { - super( logProvider, readTimeoutSeconds, lockReadTimeout, maxConcurrentChannels, chunkSize ); + super( logProvider, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize ); } @Override @@ -177,7 +155,7 @@ public MasterClient instantiate( String destinationHostNameOrIp, int destination Monitors monitors, StoreId storeId, LifeSupport life ) { return life.add( new MasterClient214( destinationHostNameOrIp, destinationPort, originHostNameOrIp, - logProvider, storeId, readTimeoutSeconds, lockReadTimeout, maxConcurrentChannels, chunkSize, + logProvider, storeId, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize, responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient214.class ), monitors.newMonitor( RequestMonitor.class, MasterClient214.class ), logEntryReader.get() ) ); } @@ -185,10 +163,10 @@ public MasterClient instantiate( String destinationHostNameOrIp, int destination private final class F310 extends StaticMasterClientFactory { - private F310( LogProvider logProvider, int readTimeoutSeconds, int lockReadTimeout, int maxConcurrentChannels, + private F310( LogProvider logProvider, int readTimeoutMillis, int lockReadTimeout, int maxConcurrentChannels, int chunkSize ) { - super( logProvider, readTimeoutSeconds, lockReadTimeout, maxConcurrentChannels, chunkSize ); + super( logProvider, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize ); } @Override @@ -196,7 +174,7 @@ public MasterClient instantiate( String destinationHostNameOrIp, int destination Monitors monitors, StoreId storeId, LifeSupport life ) { return life.add( new MasterClient310( destinationHostNameOrIp, destinationPort, originHostNameOrIp, - logProvider, storeId, readTimeoutSeconds, lockReadTimeout, maxConcurrentChannels, chunkSize, + logProvider, storeId, readTimeoutMillis, lockReadTimeout, maxConcurrentChannels, chunkSize, responseUnpacker, monitors.newMonitor( ByteCounterMonitor.class, MasterClient214.class ), monitors.newMonitor( RequestMonitor.class, MasterClient214.class ), logEntryReader.get() ) ); } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/slave/MasterClientResolverTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/slave/MasterClientResolverTest.java index e0fb034b594bf..b7561b57ef183 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/slave/MasterClientResolverTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/com/slave/MasterClientResolverTest.java @@ -24,8 +24,8 @@ import org.neo4j.com.IllegalProtocolVersionException; import org.neo4j.com.storecopy.ResponseUnpacker; import org.neo4j.function.Suppliers; -import org.neo4j.kernel.ha.MasterClient210; import org.neo4j.kernel.ha.MasterClient214; +import org.neo4j.kernel.ha.MasterClient310; import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; @@ -55,7 +55,7 @@ ResponseUnpacker.NO_OP_RESPONSE_UNPACKER, mock( InvalidEpochExceptionHandler.cla life.start(); MasterClient masterClient1 = resolver.instantiate( "cluster://localhost", 44, null, new Monitors(), StoreId.DEFAULT, life ); - assertThat( masterClient1, instanceOf( MasterClient214.class ) ); + assertThat( masterClient1, instanceOf( MasterClient310.class ) ); } finally { @@ -63,8 +63,8 @@ ResponseUnpacker.NO_OP_RESPONSE_UNPACKER, mock( InvalidEpochExceptionHandler.cla } IllegalProtocolVersionException illegalProtocolVersionException = new IllegalProtocolVersionException( - MasterClient210.PROTOCOL_VERSION.getApplicationProtocol(), MasterClient214.PROTOCOL_VERSION.getApplicationProtocol(), + MasterClient310.PROTOCOL_VERSION.getApplicationProtocol(), "Protocol is too modern" ); // When @@ -78,7 +78,7 @@ ResponseUnpacker.NO_OP_RESPONSE_UNPACKER, mock( InvalidEpochExceptionHandler.cla MasterClient masterClient2 = resolver.instantiate( "cluster://localhost", 55, null, new Monitors(), StoreId.DEFAULT, life ); - assertThat( masterClient2, instanceOf( MasterClient210.class ) ); + assertThat( masterClient2, instanceOf( MasterClient214.class ) ); } finally { diff --git a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/cluster/NetworkMetrics.java b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/cluster/NetworkMetrics.java index 0128e710adf9f..20aed301eb68c 100644 --- a/enterprise/metrics/src/main/java/org/neo4j/metrics/source/cluster/NetworkMetrics.java +++ b/enterprise/metrics/src/main/java/org/neo4j/metrics/source/cluster/NetworkMetrics.java @@ -23,7 +23,7 @@ import com.codahale.metrics.MetricRegistry; import org.neo4j.com.storecopy.ToNetworkStoreWriter; -import org.neo4j.kernel.ha.MasterClient210; +import org.neo4j.kernel.ha.MasterClient214; import org.neo4j.kernel.ha.com.master.MasterServer; import org.neo4j.kernel.impl.annotations.Documented; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -64,7 +64,7 @@ public void start() monitors.addMonitorListener( masterNetworkTransactionWrites, MasterServer.class.getName() ); monitors.addMonitorListener( masterNetworkStoreWrites, ToNetworkStoreWriter.class.getName(), ToNetworkStoreWriter.STORE_COPIER_MONITOR_TAG ); - monitors.addMonitorListener( slaveNetworkTransactionWrites, MasterClient210.class.getName() ); + monitors.addMonitorListener( slaveNetworkTransactionWrites, MasterClient214.class.getName() ); registry.register( MASTER_NETWORK_TX_WRITES, (Gauge) masterNetworkTransactionWrites::getBytesWritten ); registry.register( MASTER_NETWORK_STORE_WRITES, (Gauge) masterNetworkStoreWrites::getBytesWritten );