Skip to content

Commit

Permalink
Flush the same log channel as the one written to
Browse files Browse the repository at this point in the history
Since forcing the log channel was not synchronized with the preceding
writing of the buffer to the channel, it was possible that the force of
the channel ended up forcing the next channel. In order to make it
easier to reason about the forcing of the channel we introduce a
Flushable object that we obtain under said sycnronization but flush
outside of it. This gives us the best of both worlds, shorter
synchronized regions, but the ability to reason about what we force.

Note that since the previous API called this method flush() but the new
API uses the existing java.io.Flushable interface, which defines a
method called flush() this description uses these two words almost
interchangeably, but uses force to refer to previous behaviour and
flush when talking about new behaviour. The implementation of flush()
is to call the same underlying force-method as the previous force()
method called, but binds directly to that object instead of through
a volatile lookup.
  • Loading branch information
thobe committed Jul 10, 2015
1 parent ba5cfcb commit 042d187
Show file tree
Hide file tree
Showing 16 changed files with 100 additions and 66 deletions.
Expand Up @@ -132,4 +132,10 @@ public void close() throws IOException
{
throw new UnsupportedOperationException();
}

@Override
public void flush() throws IOException
{
force( false );
}
}
3 changes: 2 additions & 1 deletion community/io/src/main/java/org/neo4j/io/fs/StoreChannel.java
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.io.fs;

import java.io.Flushable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
Expand All @@ -28,7 +29,7 @@
import java.nio.channels.SeekableByteChannel;

public interface StoreChannel
extends SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel, InterruptibleChannel
extends Flushable, SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel, InterruptibleChannel
{
/**
* Attempts to acquire an exclusive lock on this channel's file.
Expand Down
Expand Up @@ -164,4 +164,10 @@ public long size() throws IOException
{
return channel.size();
}

@Override
public void flush() throws IOException
{
force( false );
}
}
Expand Up @@ -197,4 +197,10 @@ public long size() throws IOException
adversary.injectFailure( IOException.class );
return delegate.size();
}

@Override
public void flush() throws IOException
{
force( false );
}
}
Expand Up @@ -125,4 +125,10 @@ public StoreChannel position( long newPosition ) throws IOException
delegate.position( newPosition );
return this;
}

@Override
public void flush() throws IOException
{
delegate.flush();
}
}
Expand Up @@ -148,4 +148,10 @@ public void close() throws IOException
{
inner.close();
}

@Override
public void flush() throws IOException
{
inner.flush();
}
}
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.io.Flushable;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -387,15 +388,16 @@ public void force() throws IOException
// don't append while we're doing that. The way rotation is coordinated we can't synchronize
// on logFile because it would cause deadlocks. Synchronizing on writer assumes that appenders
// also synchronize on writer.
Flushable flushable;
synchronized ( logFile )
{
writer.emptyBufferIntoChannelAndClearIt();
flushable = writer.emptyBufferIntoChannelAndClearIt();
}
// Force the writer outside of the lock.
// This allows multiple threads forcing at the same time to piggy-back onto one another.
// This allows other threads access to the buffer while the writer is being forced.
try
{
writer.force();
flushable.flush();
}
catch ( ClosedChannelException ignored )
{
Expand Down
Expand Up @@ -205,4 +205,10 @@ public int hashCode()
result = 31 * result + (int) (version ^ (version >>> 32));
return result;
}

@Override
public void flush() throws IOException
{
force( false );
}
}
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.io.Flushable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
Expand All @@ -45,19 +46,6 @@ public PhysicalWritableLogChannel( LogVersionedStoreChannel channel, int bufferS
this.buffer = ByteBuffer.allocate( bufferSize );
}

@Override
public void force() throws IOException
{
try
{
channel.force( false );
}
catch ( ClosedChannelException e )
{
handleClosedChannelException( e );
}
}

void setChannel( LogVersionedStoreChannel channel )
{
this.channel = channel;
Expand All @@ -68,9 +56,10 @@ void setChannel( LogVersionedStoreChannel channel )
* Currently that's done by acquiring the PhysicalLogFile monitor.
*/
@Override
public void emptyBufferIntoChannelAndClearIt() throws IOException
public Flushable emptyBufferIntoChannelAndClearIt() throws IOException
{
buffer.flip();
LogVersionedStoreChannel channel = this.channel;
try
{
channel.write( buffer );
Expand All @@ -80,6 +69,7 @@ public void emptyBufferIntoChannelAndClearIt() throws IOException
handleClosedChannelException( e );
}
buffer.clear();
return channel;
}

private void handleClosedChannelException( ClosedChannelException e ) throws ClosedChannelException
Expand Down
Expand Up @@ -20,20 +20,15 @@
package org.neo4j.kernel.impl.transaction.log;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;

public interface WritableLogChannel extends PositionAwareChannel, Closeable
{
/**
* Writes any changes not present in the channel yet and clears the buffer.
*/
void emptyBufferIntoChannelAndClearIt() throws IOException;

/**
* Forces the data that has already been written to the underlying channel, down to disk.
* Must not write any data to the channel.
*/
void force() throws IOException;
Flushable emptyBufferIntoChannelAndClearIt() throws IOException;

WritableLogChannel put( byte value ) throws IOException;

Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.junit.Rule;
import org.junit.Test;

import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -90,37 +91,38 @@ public static void tearDownExecutor()
@Before
public void setUp()
{
WritableLogChannel channel = new InMemoryLogChannel()
class Channel extends InMemoryLogChannel implements Flushable
{
@Override
public void force() throws IOException
public Flushable emptyBufferIntoChannelAndClearIt()
{
try
{
forceSemaphore.release();
channelCommandQueue.put( ChannelCommand.force );
channelCommandQueue.put( ChannelCommand.emptyBufferIntoChannelAndClearIt );
}
catch ( InterruptedException e )
{
throw new IOException( e );
throw new RuntimeException( e );
}
return this;
}

@Override
public void emptyBufferIntoChannelAndClearIt()
public void flush() throws IOException
{
try
{
channelCommandQueue.put( ChannelCommand.emptyBufferIntoChannelAndClearIt );
forceSemaphore.release();
channelCommandQueue.put( ChannelCommand.force );
}
catch ( InterruptedException e )
{
throw new RuntimeException( e );
throw new IOException( e );
}
}
};

when( logFile.getWriter() ).thenReturn( channel );
when( logFile.getWriter() ).thenReturn( new Channel() );
}

private Runnable createForceAfterAppendRunnable( final BatchingTransactionAppender appender )
Expand Down
Expand Up @@ -22,7 +22,10 @@
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -68,6 +71,7 @@
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -257,7 +261,17 @@ public void shouldNotCallTransactionCommittedOnFailedForceLogToDisk() throws Exc
String failureMessage = "Forces a failure";
WritableLogChannel channel = spy( new InMemoryLogChannel() );
IOException failure = new IOException( failureMessage );
doThrow( failure ).when( channel ).force();
final Flushable flushable = mock( Flushable.class );
doAnswer( new Answer<Flushable>()
{
@Override
public Flushable answer( InvocationOnMock invocation ) throws Throwable
{
invocation.callRealMethod();
return flushable;
}
} ).when( channel ).emptyBufferIntoChannelAndClearIt();
doThrow( failure ).when( flushable ).flush();
LogFile logFile = mock( LogFile.class );
when( logFile.getWriter() ).thenReturn( channel );
TransactionMetadataCache metadataCache = new TransactionMetadataCache( 10, 10 );
Expand Down Expand Up @@ -390,7 +404,9 @@ public void shouldBeAbleToWriteACheckPoint() throws Throwable
transactionIdStore, BYPASS, kernelHealth );

WritableLogChannel channel = mock( WritableLogChannel.class, RETURNS_MOCKS );
when( channel.putLong( anyLong() )).thenReturn( channel );
Flushable flushable = mock( Flushable.class );
when( channel.emptyBufferIntoChannelAndClearIt() ).thenReturn( flushable );
when( channel.putLong( anyLong() ) ).thenReturn( channel );
when( logFile.getWriter() ).thenReturn( channel );

appender.start();
Expand All @@ -402,7 +418,7 @@ public void shouldBeAbleToWriteACheckPoint() throws Throwable
verify( channel, times( 1 ) ).putLong( 1l );
verify( channel, times( 1 ) ).putLong( 2l );
verify( channel, times( 1 ) ).emptyBufferIntoChannelAndClearIt();
verify( channel, times( 1 ) ).force();
verify( flushable, times( 1 ) ).flush();
verifyZeroInteractions( kernelHealth );
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.impl.transaction.log;

import java.io.Flushable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -27,6 +28,13 @@

public class InMemoryLogChannel implements WritableLogChannel, ReadableLogChannel
{
private static final Flushable NO_OP_FLUSHABLE = new Flushable()
{
@Override
public void flush() throws IOException
{
}
};
private final byte[] bytes = new byte[1000];
private final ByteBuffer asWriter = ByteBuffer.wrap( bytes );
private final ByteBuffer asReader = ByteBuffer.wrap( bytes );
Expand Down Expand Up @@ -87,11 +95,6 @@ public InMemoryLogChannel put( byte[] bytes, int length ) throws IOException
return this;
}

@Override
public void force() throws IOException
{
}

public StoreChannel getFileChannel()
{
throw new UnsupportedOperationException();
Expand All @@ -108,8 +111,9 @@ public void close() throws IOException
}

@Override
public void emptyBufferIntoChannelAndClearIt()
public Flushable emptyBufferIntoChannelAndClearIt()
{
return NO_OP_FLUSHABLE;
}

@Override
Expand Down
Expand Up @@ -96,8 +96,7 @@ public void shouldWriteSomeDataIntoTheLog() throws Exception
long longValue = 4854587;
writer.putInt( intValue );
writer.putLong( longValue );
writer.emptyBufferIntoChannelAndClearIt();
writer.force();
writer.emptyBufferIntoChannelAndClearIt().flush();

// THEN
try ( ReadableLogChannel reader = logFile.getReader( positionMarker.newPosition() ) )
Expand Down Expand Up @@ -138,15 +137,13 @@ transactionIdStore, logVersionRepository, mock( Monitor.class ),
writer.putInt( intValue );
writer.putLong( longValue );
writer.put( someBytes, someBytes.length );
writer.emptyBufferIntoChannelAndClearIt();
writer.force();
writer.emptyBufferIntoChannelAndClearIt().flush();
writer.getCurrentPosition( positionMarker );
LogPosition position2 = positionMarker.newPosition();
long longValue2 = 123456789L;
writer.putLong( longValue2 );
writer.put( someBytes, someBytes.length );
writer.emptyBufferIntoChannelAndClearIt();
writer.force();
writer.emptyBufferIntoChannelAndClearIt().flush();

// THEN
try ( ReadableLogChannel reader = logFile.getReader( position1 ) )
Expand Down
Expand Up @@ -140,8 +140,7 @@ public void shouldWriteThroughRotation() throws Exception
channel.putShort( shortValue );
channel.putInt( intValue );
channel.putLong( longValue );
channel.emptyBufferIntoChannelAndClearIt();
channel.force();
channel.emptyBufferIntoChannelAndClearIt().flush();

// "Rotate" and continue
storeChannel = fs.open( secondFile, "rw" );
Expand Down Expand Up @@ -213,16 +212,6 @@ public void shouldThrowIllegalStateExceptionAfterClosed() throws Exception
{
// THEN we should get an IllegalStateException, not a ClosedChannelException
}

try
{
channel.force();
fail( "Should have thrown exception" );
}
catch ( IllegalStateException e )
{
// THEN we should get an IllegalStateException, not a ClosedChannelException
}
}

@Test
Expand Down

0 comments on commit 042d187

Please sign in to comment.