Skip to content

Commit

Permalink
Make it possible for protocols to modify the wireformat of LockResults
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Dec 14, 2016
1 parent 76a0ab4 commit 77d3630
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 94 deletions.
Expand Up @@ -45,16 +45,17 @@
import static org.neo4j.com.Protocol.VOID_SERIALIZER; import static org.neo4j.com.Protocol.VOID_SERIALIZER;
import static org.neo4j.com.Protocol.readBoolean; import static org.neo4j.com.Protocol.readBoolean;
import static org.neo4j.com.Protocol.readString; import static org.neo4j.com.Protocol.readString;
import static org.neo4j.kernel.ha.com.slave.MasterClient.LOCK_SERIALIZER;


public class HaRequestType210 extends AbstractHaRequestTypes public class HaRequestType210 extends AbstractHaRequestTypes
{ {
public HaRequestType210( LogEntryReader<ReadableClosablePositionAwareChannel> entryReader ) public HaRequestType210(
LogEntryReader<ReadableClosablePositionAwareChannel> entryReader,
ObjectSerializer<LockResult> lockResultObjectSerializer )
{ {
registerAllocateIds(); registerAllocateIds();
registerCreateRelationshipType(); registerCreateRelationshipType();
registerAcquireExclusiveLock(); registerAcquireExclusiveLock( lockResultObjectSerializer );
registerAcquireSharedLock(); registerAcquireSharedLock( lockResultObjectSerializer );
registerCommit( entryReader ); registerCommit( entryReader );
registerPullUpdates(); registerPullUpdates();
registerEndLockSession(); registerEndLockSession();
Expand Down Expand Up @@ -95,7 +96,7 @@ private void registerCreateRelationshipType()
register( Type.CREATE_RELATIONSHIP_TYPE, createRelationshipTypeTarget, INTEGER_SERIALIZER ); register( Type.CREATE_RELATIONSHIP_TYPE, createRelationshipTypeTarget, INTEGER_SERIALIZER );
} }


private void registerAcquireExclusiveLock() private void registerAcquireExclusiveLock( ObjectSerializer<LockResult> lockResultObjectSerializer )
{ {
register( Type.ACQUIRE_EXCLUSIVE_LOCK, new AquireLockCall() register( Type.ACQUIRE_EXCLUSIVE_LOCK, new AquireLockCall()
{ {
Expand All @@ -105,10 +106,10 @@ protected Response<LockResult> lock( Master master, RequestContext context, Reso
{ {
return master.acquireExclusiveLock( context, type, ids ); return master.acquireExclusiveLock( context, type, ids );
} }
}, LOCK_SERIALIZER, true ); }, lockResultObjectSerializer, true );
} }


private void registerAcquireSharedLock() private void registerAcquireSharedLock( ObjectSerializer<LockResult> lockResultObjectSerializer )
{ {
register( Type.ACQUIRE_SHARED_LOCK, new AquireLockCall() register( Type.ACQUIRE_SHARED_LOCK, new AquireLockCall()
{ {
Expand All @@ -118,7 +119,7 @@ protected Response<LockResult> lock( Master master, RequestContext context, Reso
{ {
return master.acquireSharedLock( context, type, ids ); return master.acquireSharedLock( context, type, ids );
} }
}, LOCK_SERIALIZER, true ); }, lockResultObjectSerializer, true );
} }


private void registerCommit( LogEntryReader<ReadableClosablePositionAwareChannel> entryReader ) private void registerCommit( LogEntryReader<ReadableClosablePositionAwareChannel> entryReader )
Expand Down
Expand Up @@ -21,10 +21,14 @@


import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;


import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;


import org.neo4j.com.Client; import org.neo4j.com.Client;
import org.neo4j.com.Deserializer; import org.neo4j.com.Deserializer;
import org.neo4j.com.ObjectSerializer;
import org.neo4j.com.Protocol; import org.neo4j.com.Protocol;
import org.neo4j.com.Protocol214; import org.neo4j.com.Protocol214;
import org.neo4j.com.ProtocolVersion; import org.neo4j.com.ProtocolVersion;
Expand All @@ -35,24 +39,29 @@
import org.neo4j.com.monitor.RequestMonitor; import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.com.storecopy.ResponseUnpacker; import org.neo4j.com.storecopy.ResponseUnpacker;
import org.neo4j.com.storecopy.StoreWriter; import org.neo4j.com.storecopy.StoreWriter;
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.ha.com.master.HandshakeResult; import org.neo4j.kernel.ha.com.master.HandshakeResult;
import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.master.MasterServer; import org.neo4j.kernel.ha.com.master.MasterServer;
import org.neo4j.kernel.ha.com.slave.MasterClient; import org.neo4j.kernel.ha.com.slave.MasterClient;
import org.neo4j.kernel.ha.id.IdAllocation; import org.neo4j.kernel.ha.id.IdAllocation;
import org.neo4j.kernel.ha.lock.LockResult; import org.neo4j.kernel.ha.lock.LockResult;
import org.neo4j.kernel.ha.lock.LockStatus;
import org.neo4j.kernel.impl.store.StoreId; import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.id.IdRange; import org.neo4j.kernel.impl.store.id.IdRange;
import org.neo4j.kernel.impl.store.id.IdType; import org.neo4j.kernel.impl.store.id.IdType;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader; import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.util.HexPrinter;
import org.neo4j.kernel.monitoring.ByteCounterMonitor; import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.lock.ResourceType; import org.neo4j.storageengine.api.lock.ResourceType;


import static java.lang.String.format;
import static org.neo4j.com.Protocol.EMPTY_SERIALIZER; import static org.neo4j.com.Protocol.EMPTY_SERIALIZER;
import static org.neo4j.com.Protocol.VOID_DESERIALIZER; import static org.neo4j.com.Protocol.VOID_DESERIALIZER;
import static org.neo4j.com.Protocol.readString;
import static org.neo4j.com.Protocol.writeString; import static org.neo4j.com.Protocol.writeString;
import static org.neo4j.com.ProtocolVersion.INTERNAL_PROTOCOL_VERSION; import static org.neo4j.com.ProtocolVersion.INTERNAL_PROTOCOL_VERSION;


Expand All @@ -67,8 +76,65 @@ public class MasterClient214 extends Client<Master> implements MasterClient
{ {
public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 8, INTERNAL_PROTOCOL_VERSION ); public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 8, INTERNAL_PROTOCOL_VERSION );


private static final ObjectSerializer<LockResult> LOCK_RESULT_OBJECT_SERIALIZER = ( responseObject, result ) ->
{
result.writeByte( responseObject.getStatus().ordinal() );
if ( responseObject.getStatus() == LockStatus.DEAD_LOCKED )
{
writeString( result, responseObject.getMessage() );
}
};

private static final Deserializer<LockResult> LOCK_RESULT_DESERIALIZER = new Deserializer<LockResult>()
{
@Override
public LockResult read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOException
{
byte statusOrdinal = buffer.readByte();
LockStatus status;
try
{
status = LockStatus.values()[statusOrdinal];
}
catch ( ArrayIndexOutOfBoundsException e )
{
int maxBytesToPrint = 1024 * 40;
throw Exceptions.withMessage( e,
format( "%s | read invalid ordinal %d. First %db of this channel buffer is:%n%s",
e.getMessage(), statusOrdinal, maxBytesToPrint,
beginningOfBufferAsHexString( buffer, maxBytesToPrint ) ) );
}
return status == LockStatus.DEAD_LOCKED ? new LockResult( LockStatus.DEAD_LOCKED, readString( buffer ) )
: new LockResult( status );
}

private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesToPrint )
{
// read buffer from pos 0 - writeIndex
int prevIndex = buffer.readerIndex();
buffer.readerIndex( 0 );
try
{
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream( buffer.readableBytes() );
PrintStream stream = new PrintStream( byteArrayStream );
HexPrinter printer = new HexPrinter( stream ).withLineNumberDigits( 4 );
for ( int i = 0; buffer.readable() && i < maxBytesToPrint; i++ )
{
printer.append( buffer.readByte() );
}
stream.flush();
return byteArrayStream.toString();
}
finally
{
buffer.readerIndex( prevIndex );
}
}
};

private final long lockReadTimeoutMillis; private final long lockReadTimeoutMillis;
private final HaRequestTypes requestTypes; private final HaRequestTypes requestTypes;
private final Deserializer<LockResult> lockResultDeserializer;


public MasterClient214( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp, public MasterClient214( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId, long readTimeoutMillis, LogProvider logProvider, StoreId storeId, long readTimeoutMillis,
Expand All @@ -81,7 +147,8 @@ public MasterClient214( String destinationHostNameOrIp, int destinationPort, Str
MasterServer.FRAME_LENGTH, readTimeoutMillis, maxConcurrentChannels, chunkSize, MasterServer.FRAME_LENGTH, readTimeoutMillis, maxConcurrentChannels, chunkSize,
responseUnpacker, byteCounterMonitor, requestMonitor, entryReader ); responseUnpacker, byteCounterMonitor, requestMonitor, entryReader );
this.lockReadTimeoutMillis = lockReadTimeoutMillis; this.lockReadTimeoutMillis = lockReadTimeoutMillis;
this.requestTypes = new HaRequestType210( entryReader ); this.requestTypes = new HaRequestType210( entryReader, createLockResultSerializer() );
this.lockResultDeserializer = createLockResultDeserializer();
} }


@Override @Override
Expand All @@ -96,6 +163,18 @@ public ProtocolVersion getProtocolVersion()
return PROTOCOL_VERSION; return PROTOCOL_VERSION;
} }


@Override
public ObjectSerializer<LockResult> createLockResultSerializer()
{
return LOCK_RESULT_OBJECT_SERIALIZER;
}

@Override
public Deserializer<LockResult> createLockResultDeserializer()
{
return LOCK_RESULT_DESERIALIZER;
}

@Override @Override
protected long getReadTimeout( RequestType<Master> type, long readTimeout ) protected long getReadTimeout( RequestType<Master> type, long readTimeout )
{ {
Expand Down Expand Up @@ -163,15 +242,15 @@ public Response<LockResult> acquireSharedLock(
RequestContext context, ResourceType type, long... resourceIds ) RequestContext context, ResourceType type, long... resourceIds )
{ {
return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_SHARED_LOCK ), context, return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_SHARED_LOCK ), context,
new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); new AcquireLockSerializer( type, resourceIds ), lockResultDeserializer );
} }


@Override @Override
public Response<LockResult> acquireExclusiveLock( public Response<LockResult> acquireExclusiveLock(
RequestContext context, ResourceType type, long... resourceIds ) RequestContext context, ResourceType type, long... resourceIds )
{ {
return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_EXCLUSIVE_LOCK ), context, return sendRequest( requestTypes.type( HaRequestTypes.Type.ACQUIRE_EXCLUSIVE_LOCK ), context,
new AcquireLockSerializer( type, resourceIds ), LOCK_RESULT_DESERIALIZER ); new AcquireLockSerializer( type, resourceIds ), lockResultDeserializer );
} }


@Override @Override
Expand Down
Expand Up @@ -19,13 +19,6 @@
*/ */
package org.neo4j.kernel.ha.com.slave; package org.neo4j.kernel.ha.com.slave;


import org.jboss.netty.buffer.ChannelBuffer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;

import org.neo4j.com.ComExceptionHandler; import org.neo4j.com.ComExceptionHandler;
import org.neo4j.com.Deserializer; import org.neo4j.com.Deserializer;
import org.neo4j.com.ObjectSerializer; import org.neo4j.com.ObjectSerializer;
Expand All @@ -34,77 +27,13 @@
import org.neo4j.com.Response; import org.neo4j.com.Response;
import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler; import org.neo4j.com.storecopy.ResponseUnpacker.TxHandler;
import org.neo4j.com.storecopy.StoreWriter; import org.neo4j.com.storecopy.StoreWriter;
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.ha.MasterClient320; import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.lock.LockResult; import org.neo4j.kernel.ha.lock.LockResult;
import org.neo4j.kernel.ha.lock.LockStatus;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.util.HexPrinter;

import static java.lang.String.format;
import static org.neo4j.com.Protocol.readString;
import static org.neo4j.com.Protocol.writeString;


public interface MasterClient extends Master public interface MasterClient extends Master
{ {
static final ObjectSerializer<LockResult> LOCK_SERIALIZER = new ObjectSerializer<LockResult>()
{
@Override
public void write( LockResult responseObject, ChannelBuffer result ) throws IOException
{
result.writeByte( responseObject.getStatus().ordinal() );
if ( responseObject.getStatus().hasMessage() )
{
writeString( result, responseObject.getMessage() );
}
}
};

static final Deserializer<LockResult> LOCK_RESULT_DESERIALIZER = new Deserializer<LockResult>()
{
@Override
public LockResult read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOException
{
byte statusOrdinal = buffer.readByte();
LockStatus status;
try
{
status = LockStatus.values()[statusOrdinal];
}
catch ( ArrayIndexOutOfBoundsException e )
{
int maxBytesToPrint = 1024*40;
throw Exceptions.withMessage( e, format( "%s | read invalid ordinal %d. First %db of this channel buffer is:%n%s",
e.getMessage(), statusOrdinal, maxBytesToPrint, beginningOfBufferAsHexString( buffer, maxBytesToPrint ) ) );
}
return status.hasMessage() ? new LockResult( LockStatus.DEAD_LOCKED, readString( buffer ) ) : new LockResult( status );
}

private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesToPrint )
{
// read buffer from pos 0 - writeIndex
int prevIndex = buffer.readerIndex();
buffer.readerIndex( 0 );
try
{
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream( buffer.readableBytes() );
PrintStream stream = new PrintStream( byteArrayStream );
HexPrinter printer = new HexPrinter( stream ).withLineNumberDigits( 4 );
for ( int i = 0; buffer.readable() && i < maxBytesToPrint; i++ )
{
printer.append( buffer.readByte() );
}
stream.flush();
return byteArrayStream.toString();
}
finally
{
buffer.readerIndex( prevIndex );
}
}
};

public static final ProtocolVersion CURRENT = MasterClient320.PROTOCOL_VERSION; public static final ProtocolVersion CURRENT = MasterClient320.PROTOCOL_VERSION;


@Override @Override
Expand All @@ -127,4 +56,8 @@ private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesT
public void setComExceptionHandler( ComExceptionHandler handler ); public void setComExceptionHandler( ComExceptionHandler handler );


public ProtocolVersion getProtocolVersion(); public ProtocolVersion getProtocolVersion();

ObjectSerializer<LockResult> createLockResultSerializer();

Deserializer<LockResult> createLockResultDeserializer();
} }
Expand Up @@ -24,16 +24,4 @@ public enum LockStatus
OK_LOCKED, OK_LOCKED,
NOT_LOCKED, NOT_LOCKED,
DEAD_LOCKED DEAD_LOCKED
{
@Override
public boolean hasMessage()
{
return true;
}
};

public boolean hasMessage()
{
return false;
}
} }

0 comments on commit 77d3630

Please sign in to comment.