Skip to content

Commit

Permalink
Make LockResult have a message independent of its LockStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Dec 14, 2016
1 parent ed8c0dc commit d363df2
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 125 deletions.
26 changes: 26 additions & 0 deletions enterprise/com/src/main/java/org/neo4j/com/Client.java
Expand Up @@ -30,6 +30,8 @@
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.queue.BlockingReadHandler;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -46,6 +48,7 @@
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.util.HexPrinter;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.logging.Log;
Expand Down Expand Up @@ -432,6 +435,29 @@ private static BlockingReadHandler<ChannelBuffer> extractBlockingReadHandler( Ch
return (BlockingReadHandler<ChannelBuffer>) pipeline.get( BLOCKING_CHANNEL_HANDLER_NAME );
}

protected static String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesToPrint )
{
// read buffer from pos 0 - writeIndex
int prevIndex = buffer.readerIndex();
buffer.readerIndex( 0 );
try
{
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream( buffer.readableBytes() );
PrintStream stream = new PrintStream( byteArrayStream );
HexPrinter printer = new HexPrinter( stream ).withLineNumberDigits( 4 );
for ( int i = 0; buffer.readable() && i < maxBytesToPrint; i++ )
{
printer.append( buffer.readByte() );
}
stream.flush();
return byteArrayStream.toString();
}
finally
{
buffer.readerIndex( prevIndex );
}
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -21,10 +21,7 @@

import org.jboss.netty.buffer.ChannelBuffer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;

import org.neo4j.com.Client;
import org.neo4j.com.Deserializer;
Expand Down Expand Up @@ -53,7 +50,6 @@
import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.impl.util.HexPrinter;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.lock.ResourceType;
Expand Down Expand Up @@ -85,53 +81,32 @@ public class MasterClient214 extends Client<Master> implements MasterClient
}
};

public static final Deserializer<LockResult> LOCK_RESULT_DESERIALIZER = new Deserializer<LockResult>()
public static final Deserializer<LockResult> LOCK_RESULT_DESERIALIZER = ( buffer, temporaryBuffer ) ->
{
@Override
public LockResult read( ChannelBuffer buffer, ByteBuffer temporaryBuffer ) throws IOException
byte statusOrdinal = buffer.readByte();
LockStatus status;
try
{
byte statusOrdinal = buffer.readByte();
LockStatus status;
try
{
status = LockStatus.values()[statusOrdinal];
}
catch ( ArrayIndexOutOfBoundsException e )
{
int maxBytesToPrint = 1024 * 40;
throw Exceptions.withMessage( e,
format( "%s | read invalid ordinal %d. First %db of this channel buffer is:%n%s",
e.getMessage(), statusOrdinal, maxBytesToPrint,
beginningOfBufferAsHexString( buffer, maxBytesToPrint ) ) );
}
return status == LockStatus.DEAD_LOCKED ? new LockResult( LockStatus.DEAD_LOCKED, readString( buffer ) )
: new LockResult( status );
status = LockStatus.values()[statusOrdinal];
}

private String beginningOfBufferAsHexString( ChannelBuffer buffer, int maxBytesToPrint )
catch ( ArrayIndexOutOfBoundsException e )
{
// read buffer from pos 0 - writeIndex
int prevIndex = buffer.readerIndex();
buffer.readerIndex( 0 );
try
{
ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream( buffer.readableBytes() );
PrintStream stream = new PrintStream( byteArrayStream );
HexPrinter printer = new HexPrinter( stream ).withLineNumberDigits( 4 );
for ( int i = 0; buffer.readable() && i < maxBytesToPrint; i++ )
{
printer.append( buffer.readByte() );
}
stream.flush();
return byteArrayStream.toString();
}
finally
{
buffer.readerIndex( prevIndex );
}
throw withInvalidOrdinalMessage( buffer, statusOrdinal, e );
}
return status == LockStatus.DEAD_LOCKED ? new LockResult( LockStatus.DEAD_LOCKED, readString( buffer ) )
: new LockResult( status );
};

protected static ArrayIndexOutOfBoundsException withInvalidOrdinalMessage(
ChannelBuffer buffer, byte statusOrdinal, ArrayIndexOutOfBoundsException e )
{
int maxBytesToPrint = 1024 * 40;
return Exceptions.withMessage( e,
format( "%s | read invalid ordinal %d. First %db of this channel buffer is:%n%s",
e.getMessage(), statusOrdinal, maxBytesToPrint,
beginningOfBufferAsHexString( buffer, maxBytesToPrint ) ) );
}

private final long lockReadTimeoutMillis;
private final HaRequestTypes requestTypes;
private final Deserializer<LockResult> lockResultDeserializer;
Expand Down
Expand Up @@ -36,16 +36,6 @@

public class MasterClient310 extends MasterClient214
{
/* Version 1 first version
* Version 2 since 2012-01-24
* Version 3 since 2012-02-16
* Version 4 since 2012-07-05
* Version 5 since ?
* Version 6 since 2014-01-07
* Version 7 since 2014-03-18
* Version 8 since 2014-08-27
* Version 9 since 3.1.0, 2016-09-20
*/
public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 9, INTERNAL_PROTOCOL_VERSION );

public MasterClient310( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
Expand Down
Expand Up @@ -19,17 +19,23 @@
*/
package org.neo4j.kernel.ha;

import org.neo4j.com.Deserializer;
import org.neo4j.com.ObjectSerializer;
import org.neo4j.com.Protocol;
import org.neo4j.com.Protocol320;
import org.neo4j.com.ProtocolVersion;
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.com.storecopy.ResponseUnpacker;
import org.neo4j.kernel.ha.lock.LockResult;
import org.neo4j.kernel.ha.lock.LockStatus;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
import org.neo4j.logging.LogProvider;

import static org.neo4j.com.Protocol.readString;
import static org.neo4j.com.Protocol.writeString;
import static org.neo4j.com.ProtocolVersion.INTERNAL_PROTOCOL_VERSION;

public class MasterClient320 extends MasterClient310
Expand All @@ -47,6 +53,44 @@ public class MasterClient320 extends MasterClient310
*/
public static final ProtocolVersion PROTOCOL_VERSION = new ProtocolVersion( (byte) 10, INTERNAL_PROTOCOL_VERSION );

// From 3.2.0 and onwards, LockResult messages can have messages, or not, independently of their LockStatus.
public static final ObjectSerializer<LockResult> LOCK_RESULT_OBJECT_SERIALIZER = ( responseObject, result ) ->
{
result.writeByte( responseObject.getStatus().ordinal() );
String message = responseObject.getMessage();
if ( message != null )
{
writeString( result, message );
}
else
{
result.writeInt( 0 );
}
};

public static final Deserializer<LockResult> LOCK_RESULT_DESERIALIZER = ( buffer, temporaryBuffer ) ->
{
byte statusOrdinal = buffer.readByte();
int messageLength = buffer.readInt();
LockStatus status;
try
{
status = LockStatus.values()[statusOrdinal];
}
catch ( ArrayIndexOutOfBoundsException e )
{
throw withInvalidOrdinalMessage( buffer, statusOrdinal, e );
}
if ( messageLength > 0 )
{
return new LockResult( status, readString( buffer, messageLength ) );
}
else
{
return new LockResult( status );
}
};

public MasterClient320( String destinationHostNameOrIp, int destinationPort, String originHostNameOrIp,
LogProvider logProvider, StoreId storeId,
long readTimeoutMillis, long lockReadTimeout, int maxConcurrentChannels, int chunkSize,
Expand All @@ -66,6 +110,18 @@ protected Protocol createProtocol( int chunkSize, byte applicationProtocolVersio
return new Protocol320( chunkSize, applicationProtocolVersion, getInternalProtocolVersion() );
}

@Override
public ObjectSerializer<LockResult> createLockResultSerializer()
{
return LOCK_RESULT_OBJECT_SERIALIZER;
}

@Override
public Deserializer<LockResult> createLockResultDeserializer()
{
return LOCK_RESULT_DESERIALIZER;
}

@Override
public ProtocolVersion getProtocolVersion()
{
Expand Down
Expand Up @@ -316,7 +316,9 @@ public Response<LockResult> acquireExclusiveLock( RequestContext context, Resour
}
catch ( NoSuchEntryException | ConcurrentAccessException e)
{
return spi.packTransactionObligationResponse( context, new LockResult( LockStatus.NOT_LOCKED, "Unable to acquire exclusive lock: " + e.getMessage() ) );
return spi.packTransactionObligationResponse( context, new LockResult(
LockStatus.NOT_LOCKED,
"Unable to acquire exclusive lock: " + e.getMessage() ) );
}
try
{
Expand All @@ -328,11 +330,15 @@ public Response<LockResult> acquireExclusiveLock( RequestContext context, Resour
}
catch ( DeadlockDetectedException e )
{
return spi.packTransactionObligationResponse( context, new LockResult( LockStatus.DEAD_LOCKED,"Can't acquire exclusive lock, because it would have caused a deadlock: " + e.getMessage() ) );
return spi.packTransactionObligationResponse( context, new LockResult(
LockStatus.DEAD_LOCKED,
"Can't acquire exclusive lock, because it would have caused a deadlock: " + e.getMessage() ) );
}
catch ( IllegalResourceException e )
{
return spi.packTransactionObligationResponse( context, new LockResult( LockStatus.NOT_LOCKED ) );
return spi.packTransactionObligationResponse( context, new LockResult(
LockStatus.NOT_LOCKED,
"Attempted to lock illegal resource: " + e.getMessage() ) );
}
finally
{
Expand All @@ -352,7 +358,9 @@ public Response<LockResult> acquireSharedLock( RequestContext context, ResourceT
}
catch ( NoSuchEntryException | ConcurrentAccessException e)
{
return spi.packTransactionObligationResponse( context, new LockResult( LockStatus.NOT_LOCKED, "Unable to acquire shared lock: " + e.getMessage() ) );
return spi.packTransactionObligationResponse( context, new LockResult(
LockStatus.NOT_LOCKED,
"Unable to acquire shared lock: " + e.getMessage() ) );
}
try
{
Expand All @@ -369,7 +377,9 @@ public Response<LockResult> acquireSharedLock( RequestContext context, ResourceT
}
catch ( IllegalResourceException e )
{
return spi.packTransactionObligationResponse( context, new LockResult( LockStatus.NOT_LOCKED ) );
return spi.packTransactionObligationResponse( context, new LockResult(
LockStatus.NOT_LOCKED,
"Attempted to lock illegal resource: " + e.getMessage() ) );
}
finally
{
Expand All @@ -388,12 +398,7 @@ public Map<Integer, Collection<RequestContext>> getOngoingTransactions()
Set<RequestContext> contexts = conversationManager.getActiveContexts();
for ( RequestContext context : contexts.toArray( new RequestContext[contexts.size()] ) )
{
Collection<RequestContext> txs = result.get( context.machineId() );
if ( txs == null )
{
txs = new ArrayList<>();
result.put( context.machineId(), txs );
}
Collection<RequestContext> txs = result.computeIfAbsent( context.machineId(), k -> new ArrayList<>() );
txs.add( context );
}
return result;
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.com.TxChecksumVerifier;
import org.neo4j.com.monitor.RequestMonitor;
import org.neo4j.kernel.ha.HaRequestType210;
import org.neo4j.kernel.ha.MasterClient214;
import org.neo4j.kernel.ha.MasterClient320;
import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel;
import org.neo4j.kernel.impl.transaction.log.entry.LogEntryReader;
import org.neo4j.kernel.monitoring.ByteCounterMonitor;
Expand All @@ -53,7 +53,7 @@ public MasterServer( Master requestTarget, LogProvider logProvider, Configuratio
super( requestTarget, config, logProvider, FRAME_LENGTH, CURRENT, txVerifier,
Clocks.systemClock(), byteCounterMonitor, requestMonitor );
this.conversationManager = conversationManager;
this.requestTypes = new HaRequestType210( entryReader, MasterClient214.LOCK_RESULT_OBJECT_SERIALIZER );
this.requestTypes = new HaRequestType210( entryReader, MasterClient320.LOCK_RESULT_OBJECT_SERIALIZER );
}

@Override
Expand Down

0 comments on commit d363df2

Please sign in to comment.