From 77d363048becf2d37faf891650a2a4b2c95be11d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 7 Dec 2016 17:24:40 +0100 Subject: [PATCH] Make it possible for protocols to modify the wireformat of LockResults --- .../org/neo4j/kernel/ha/HaRequestType210.java | 17 ++-- .../org/neo4j/kernel/ha/MasterClient214.java | 85 ++++++++++++++++++- .../kernel/ha/com/slave/MasterClient.java | 75 +--------------- .../org/neo4j/kernel/ha/lock/LockStatus.java | 12 --- 4 files changed, 95 insertions(+), 94 deletions(-) diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java index df0a10cb01a5b..9a374337e4fb2 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/HaRequestType210.java @@ -45,16 +45,17 @@ import static org.neo4j.com.Protocol.VOID_SERIALIZER; import static org.neo4j.com.Protocol.readBoolean; import static org.neo4j.com.Protocol.readString; -import static org.neo4j.kernel.ha.com.slave.MasterClient.LOCK_SERIALIZER; public class HaRequestType210 extends AbstractHaRequestTypes { - public HaRequestType210( LogEntryReader entryReader ) + public HaRequestType210( + LogEntryReader entryReader, + ObjectSerializer lockResultObjectSerializer ) { registerAllocateIds(); registerCreateRelationshipType(); - registerAcquireExclusiveLock(); - registerAcquireSharedLock(); + registerAcquireExclusiveLock( lockResultObjectSerializer ); + registerAcquireSharedLock( lockResultObjectSerializer ); registerCommit( entryReader ); registerPullUpdates(); registerEndLockSession(); @@ -95,7 +96,7 @@ private void registerCreateRelationshipType() register( Type.CREATE_RELATIONSHIP_TYPE, createRelationshipTypeTarget, INTEGER_SERIALIZER ); } - private void registerAcquireExclusiveLock() + private void registerAcquireExclusiveLock( ObjectSerializer lockResultObjectSerializer ) { register( Type.ACQUIRE_EXCLUSIVE_LOCK, new AquireLockCall() { @@ -105,10 +106,10 @@ protected Response lock( Master master, RequestContext context, Reso { return master.acquireExclusiveLock( context, type, ids ); } - }, LOCK_SERIALIZER, true ); + }, lockResultObjectSerializer, true ); } - private void registerAcquireSharedLock() + private void registerAcquireSharedLock( ObjectSerializer lockResultObjectSerializer ) { register( Type.ACQUIRE_SHARED_LOCK, new AquireLockCall() { @@ -118,7 +119,7 @@ protected Response lock( Master master, RequestContext context, Reso { return master.acquireSharedLock( context, type, ids ); } - }, LOCK_SERIALIZER, true ); + }, lockResultObjectSerializer, true ); } private void registerCommit( LogEntryReader entryReader ) diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java index 51cdeb7693869..73c92c1223cbc 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/MasterClient214.java @@ -21,10 +21,14 @@ import org.jboss.netty.buffer.ChannelBuffer; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; import org.neo4j.com.Client; import org.neo4j.com.Deserializer; +import org.neo4j.com.ObjectSerializer; import org.neo4j.com.Protocol; import org.neo4j.com.Protocol214; import org.neo4j.com.ProtocolVersion; @@ -35,24 +39,29 @@ import org.neo4j.com.monitor.RequestMonitor; import org.neo4j.com.storecopy.ResponseUnpacker; import org.neo4j.com.storecopy.StoreWriter; +import org.neo4j.helpers.Exceptions; import org.neo4j.kernel.ha.com.master.HandshakeResult; import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.com.master.MasterServer; import org.neo4j.kernel.ha.com.slave.MasterClient; import org.neo4j.kernel.ha.id.IdAllocation; import org.neo4j.kernel.ha.lock.LockResult; +import org.neo4j.kernel.ha.lock.LockStatus; import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.store.id.IdRange; import org.neo4j.kernel.impl.store.id.IdType; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; +import org.neo4j.kernel.impl.util.HexPrinter; import org.neo4j.kernel.monitoring.ByteCounterMonitor; import org.neo4j.logging.LogProvider; import org.neo4j.storageengine.api.lock.ResourceType; +import static java.lang.String.format; import static org.neo4j.com.Protocol.EMPTY_SERIALIZER; import static org.neo4j.com.Protocol.VOID_DESERIALIZER; +import static org.neo4j.com.Protocol.readString; import static org.neo4j.com.Protocol.writeString; import static org.neo4j.com.ProtocolVersion.INTERNAL_PROTOCOL_VERSION; @@ -67,8 +76,65 @@ public class MasterClient214 extends Client implements MasterClient { public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 8, INTERNAL_PROTOCOL_VERSION ); + private static final ObjectSerializer LOCK_RESULT_OBJECT_SERIALIZER = ( responseObject, result ) -> + { + result.writeByte( responseObject.getStatus().ordinal() ); + if ( responseObject.getStatus() == LockStatus.DEAD_LOCKED ) + { + writeString( result, responseObject.getMessage() ); + } + }; + + private static final Deserializer LOCK_RESULT_DESERIALIZER = new Deserializer() + { + @Override + public LockResult read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOException + { + byte statusOrdinal = buffer.readByte(); + LockStatus status; + try + { + status = LockStatus.values()[statusOrdinal]; + } + catch ( ArrayIndexOutOfBoundsException e ) + { + int maxBytesToPrint = 1024 * 40; + throw Exceptions.withMessage( e, + format( "%s | read invalid ordinal %d. First %db of this channel buffer is:%n%s", + e.getMessage(), statusOrdinal, maxBytesToPrint, + beginningOfBufferAsHexString( buffer, maxBytesToPrint ) ) ); + } + return status == LockStatus.DEAD_LOCKED ? new LockResult( LockStatus.DEAD_LOCKED, readString( buffer ) ) + : new LockResult( status ); + } + + private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesToPrint ) + { + // read buffer from pos 0 - writeIndex + int prevIndex = buffer.readerIndex(); + buffer.readerIndex( 0 ); + try + { + ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream( buffer.readableBytes() ); + PrintStream stream = new PrintStream( byteArrayStream ); + HexPrinter printer = new HexPrinter( stream ).withLineNumberDigits( 4 ); + for ( int i = 0; buffer.readable() && i < maxBytesToPrint; i++ ) + { + printer.append( buffer.readByte() ); + } + stream.flush(); + return byteArrayStream.toString(); + } + finally + { + buffer.readerIndex( prevIndex ); + } + } + }; + private final long lockReadTimeoutMillis; private final HaRequestTypes requestTypes; + private final Deserializer lockResultDeserializer; public MasterClient214( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp, LogProvider logProvider, StoreId storeId, long readTimeoutMillis, @@ -81,7 +147,8 @@ public MasterClient214( String destinationHostNameOrIp, int destinationPort, Str MasterServer.FRAME_LENGTH, readTimeoutMillis, maxConcurrentChannels, chunkSize, responseUnpacker, byteCounterMonitor, requestMonitor, entryReader ); this.lockReadTimeoutMillis = lockReadTimeoutMillis; - this.requestTypes = new HaRequestType210( entryReader ); + this.requestTypes = new HaRequestType210( entryReader, createLockResultSerializer() ); + this.lockResultDeserializer = createLockResultDeserializer(); } @Override @@ -96,6 +163,18 @@ public ProtocolVersion getProtocolVersion() return PROTOCOL_VERSION; } + @Override + public ObjectSerializer createLockResultSerializer() + { + return LOCK_RESULT_OBJECT_SERIALIZER; + } + + @Override + public Deserializer createLockResultDeserializer() + { + return LOCK_RESULT_DESERIALIZER; + } + @Override protected long getReadTimeout( RequestType type, long readTimeout ) { @@ -163,7 +242,7 @@ public Response acquireSharedLock( RequestContext context, ResourceType type, long... resourceIds ) { return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_SHARED_LOCK ), context, - new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); + new AcquireLockSerializer( type, resourceIds ), lockResultDeserializer ); } @Override @@ -171,7 +250,7 @@ public Response acquireExclusiveLock( RequestContext context, ResourceType type, long... resourceIds ) { return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_EXCLUSIVE_LOCK ), context, - new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); + new AcquireLockSerializer( type, resourceIds ), lockResultDeserializer ); } @Override diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClient.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClient.java index 5eb3100a1100c..ddcc312536efe 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClient.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/MasterClient.java @@ -19,13 +19,6 @@ */ package org.neo4j.kernel.ha.com.slave; -import org.jboss.netty.buffer.ChannelBuffer; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.nio.ByteBuffer; - import org.neo4j.com.ComExceptionHandler; import org.neo4j.com.Deserializer; import org.neo4j.com.ObjectSerializer; @@ -34,77 +27,13 @@ import org.neo4j.com.Response; import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler; import org.neo4j.com.storecopy.StoreWriter; -import org.neo4j.helpers.Exceptions; import org.neo4j.kernel.ha.MasterClient320; import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.lock.LockResult; -import org.neo4j.kernel.ha.lock.LockStatus; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; -import org.neo4j.kernel.impl.util.HexPrinter; - -import static java.lang.String.format; -import static org.neo4j.com.Protocol.readString; -import static org.neo4j.com.Protocol.writeString; public interface MasterClient extends Master { - static final ObjectSerializer LOCK_SERIALIZER = new ObjectSerializer() - { - @Override - public void write( LockResult responseObject, ChannelBuffer result ) throws IOException - { - result.writeByte( responseObject.getStatus().ordinal() ); - if ( responseObject.getStatus().hasMessage() ) - { - writeString( result, responseObject.getMessage() ); - } - } - }; - - static final Deserializer LOCK_RESULT_DESERIALIZER = new Deserializer() - { - @Override - public LockResult read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOException - { - byte statusOrdinal = buffer.readByte(); - LockStatus status; - try - { - status = LockStatus.values()[statusOrdinal]; - } - catch ( ArrayIndexOutOfBoundsException e ) - { - int maxBytesToPrint = 1024*40; - throw Exceptions.withMessage( e, format( "%s | read invalid ordinal %d. First %db of this channel buffer is:%n%s", - e.getMessage(), statusOrdinal, maxBytesToPrint, beginningOfBufferAsHexString( buffer, maxBytesToPrint ) ) ); - } - return status.hasMessage() ? new LockResult( LockStatus.DEAD_LOCKED, readString( buffer ) ) : new LockResult( status ); - } - - private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesToPrint ) - { - // read buffer from pos 0 - writeIndex - int prevIndex = buffer.readerIndex(); - buffer.readerIndex( 0 ); - try - { - ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream( buffer.readableBytes() ); - PrintStream stream = new PrintStream( byteArrayStream ); - HexPrinter printer = new HexPrinter( stream ).withLineNumberDigits( 4 ); - for ( int i = 0; buffer.readable() && i < maxBytesToPrint; i++ ) - { - printer.append( buffer.readByte() ); - } - stream.flush(); - return byteArrayStream.toString(); - } - finally - { - buffer.readerIndex( prevIndex ); - } - } - }; - public static final ProtocolVersion CURRENT = MasterClient320.PROTOCOL_VERSION; @Override @@ -127,4 +56,8 @@ private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesT public void setComExceptionHandler( ComExceptionHandler handler ); public ProtocolVersion getProtocolVersion(); + + ObjectSerializer createLockResultSerializer(); + + Deserializer createLockResultDeserializer(); } diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockStatus.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockStatus.java index 32f9a2f3cab81..be7b6f096b8b8 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockStatus.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/LockStatus.java @@ -24,16 +24,4 @@ public enum LockStatus OK_LOCKED, NOT_LOCKED, DEAD_LOCKED - { - @Override - public boolean hasMessage() - { - return true; - } - }; - - public boolean hasMessage() - { - return false; - } }