Skip to content

Commit

Permalink
Introduce a way to close cleanly raft log segments
Browse files Browse the repository at this point in the history
Before these changes the store channels, reader pools, and writers
were abruptaly closed on db shutdown.
  • Loading branch information
davidegrohmann committed May 24, 2016
1 parent 854461c commit 15737fa
Show file tree
Hide file tree
Showing 11 changed files with 464 additions and 244 deletions.
Expand Up @@ -33,11 +33,6 @@ public interface ReadableRaftLog
*/ */
long prevIndex(); long prevIndex();


// /**
// * @return The index of the last committed entry.
// */
// long commitIndex();

/** /**
* Reads the term associated with the entry at the supplied index. * Reads the term associated with the entry at the supplied index.
* *
Expand Down
Expand Up @@ -95,8 +95,7 @@ public PhysicalRaftLogFile( FileSystemAbstraction fileSystem, PhysicalRaftLogFil
public void init() throws IOException public void init() throws IOException
{ {
// Make sure at least a bare bones log file is available before recovery // Make sure at least a bare bones log file is available before recovery
channel = openLogChannelForVersion( logFiles.getHighestLogVersion() ); openLogChannelForVersion( logFiles.getHighestLogVersion() ).close();
channel.close();
} }


@Override @Override
Expand Down
Expand Up @@ -32,7 +32,7 @@
* segment to the next in a transparent manner. It can thus be mainly viewed as a factory for a * segment to the next in a transparent manner. It can thus be mainly viewed as a factory for a
* smart segment-crossing cursor. * smart segment-crossing cursor.
*/ */
class EntryStore class EntryStore implements AutoCloseable
{ {
private Segments segments; private Segments segments;


Expand Down Expand Up @@ -123,4 +123,10 @@ public EntryRecord get()
} }
}; };
} }

@Override
public void close() throws DisposedException
{
segments.close();
}
} }
Expand Up @@ -38,6 +38,7 @@
import org.neo4j.storageengine.api.ReadPastEndException; import org.neo4j.storageengine.api.ReadPastEndException;
import org.neo4j.storageengine.api.WritableChannel; import org.neo4j.storageengine.api.WritableChannel;


import static java.lang.String.format;
import static org.neo4j.coreedge.raft.log.EntryRecord.read; import static org.neo4j.coreedge.raft.log.EntryRecord.read;


/** /**
Expand All @@ -49,8 +50,9 @@
* *
* Concurrent reading is thread-safe. * Concurrent reading is thread-safe.
*/ */
class SegmentFile class SegmentFile implements AutoCloseable
{ {
static final String CLOSED_ERROR_MESSAGE = "segment file '%s' is closed";
private static final SegmentHeader.Marshal headerMarshal = new SegmentHeader.Marshal(); private static final SegmentHeader.Marshal headerMarshal = new SegmentHeader.Marshal();


private final Log log; private final Log log;
Expand All @@ -65,13 +67,10 @@ class SegmentFile
private boolean markedForDisposal; private boolean markedForDisposal;
private Runnable onDisposal; private Runnable onDisposal;
private volatile boolean isDisposed; private volatile boolean isDisposed;
private volatile boolean closed;


SegmentFile( SegmentFile( FileSystemAbstraction fileSystem, File file, ChannelMarshal<ReplicatedContent> contentMarshal,
FileSystemAbstraction fileSystem, LogProvider logProvider, SegmentHeader header )
File file,
ChannelMarshal<ReplicatedContent> contentMarshal,
LogProvider logProvider,
SegmentHeader header )
{ {
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
this.file = file; this.file = file;
Expand All @@ -82,12 +81,9 @@ class SegmentFile
readerPool = new StoreChannelPool( fileSystem, file, "r", logProvider ); readerPool = new StoreChannelPool( fileSystem, file, "r", logProvider );
} }


static SegmentFile create( static SegmentFile create( FileSystemAbstraction fileSystem, File file,
FileSystemAbstraction fileSystem, ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, SegmentHeader header )
File file, throws IOException
ChannelMarshal<ReplicatedContent> contentMarshal,
LogProvider logProvider,
SegmentHeader header ) throws IOException
{ {
SegmentFile segment = new SegmentFile( fileSystem, file, contentMarshal, logProvider, header ); SegmentFile segment = new SegmentFile( fileSystem, file, contentMarshal, logProvider, header );


Expand Down Expand Up @@ -159,6 +155,11 @@ private LogPosition findCachedStartingPosition( long offsetIndex )


private PhysicalFlushableChannel getOrCreateWriter() throws IOException private PhysicalFlushableChannel getOrCreateWriter() throws IOException
{ {
if ( closed )
{
throw new RuntimeException( format( CLOSED_ERROR_MESSAGE, file ) );
}

if ( bufferedWriter == null ) if ( bufferedWriter == null )
{ {
StoreChannel channel = fileSystem.open( file, "rw" ); StoreChannel channel = fileSystem.open( file, "rw" );
Expand All @@ -173,7 +174,7 @@ private PhysicalFlushableChannel getOrCreateWriter() throws IOException
*/ */
WritableChannel writer() throws IOException, DisposedException WritableChannel writer() throws IOException, DisposedException
{ {
if( markedForDisposal ) if ( markedForDisposal )
{ {
throw new DisposedException(); throw new DisposedException();
} }
Expand Down Expand Up @@ -223,7 +224,7 @@ void flush() throws IOException
*/ */
void markForDisposal( Runnable onDisposal ) throws DisposedException void markForDisposal( Runnable onDisposal ) throws DisposedException
{ {
if( markedForDisposal ) if ( markedForDisposal )
{ {
throw new DisposedException(); throw new DisposedException();
} }
Expand All @@ -236,7 +237,7 @@ void markForDisposal( Runnable onDisposal ) throws DisposedException


private synchronized void checkFullDisposal() private synchronized void checkFullDisposal()
{ {
if ( bufferedWriter == null && readerPool.isDisposed() ) if ( markedForDisposal && bufferedWriter == null && readerPool.isDisposed() )
{ {
isDisposed = true; isDisposed = true;
onDisposal.run(); onDisposal.run();
Expand All @@ -262,4 +263,17 @@ public long size()
{ {
return fileSystem.getFileSize( file ); return fileSystem.getFileSize( file );
} }

@Override
public void close() throws DisposedException
{
if ( closed )
{
throw new RuntimeException( format( CLOSED_ERROR_MESSAGE, file ) );
}

closed = true;
closeWriter();
readerPool.close();
}
} }
Expand Up @@ -101,6 +101,12 @@ public synchronized void start() throws IOException, RaftLogCompactedException,
entryStore = new EntryStore( state.segments ); entryStore = new EntryStore( state.segments );
} }


@Override
public void shutdown() throws DisposedException
{
entryStore.close();
}

@Override @Override
public synchronized long append( RaftLogEntry entry ) throws IOException public synchronized long append( RaftLogEntry entry ) throws IOException
{ {
Expand Down
Expand Up @@ -41,7 +41,7 @@
/** /**
* Keeps track of all the segments that the RAFT log consists of. * Keeps track of all the segments that the RAFT log consists of.
*/ */
class Segments class Segments implements AutoCloseable
{ {
private final OpenEndRangeMap<Long/*minIndex*/,SegmentFile> rangeMap = new OpenEndRangeMap<>(); private final OpenEndRangeMap<Long/*minIndex*/,SegmentFile> rangeMap = new OpenEndRangeMap<>();
private final List<SegmentFile> segmentFiles; private final List<SegmentFile> segmentFiles;
Expand Down Expand Up @@ -248,4 +248,33 @@ public ListIterator<SegmentFile> getSegmentFileIteratorAtEnd()
{ {
return segmentFiles.listIterator( segmentFiles.size() ); return segmentFiles.listIterator( segmentFiles.size() );
} }

@Override
public void close() throws DisposedException
{
RuntimeException error = null;
for ( SegmentFile segment : segmentFiles )
{
try
{
segment.close();
}
catch ( RuntimeException ex )
{
if ( error == null )
{
error = ex;
}
else
{
error.addSuppressed( ex );
}
}
}

if ( error != null )
{
throw error;
}
}
} }
Expand Up @@ -32,8 +32,10 @@
/** /**
* Keeps a pool of store channels available. * Keeps a pool of store channels available.
*/ */
class StoreChannelPool class StoreChannelPool implements AutoCloseable
{ {
static final String CLOSED_ERROR_MESSAGE = "store channel pool is closed";

private final FileSystemAbstraction fsa; private final FileSystemAbstraction fsa;
private final File file; private final File file;
private final String mode; private final String mode;
Expand All @@ -44,13 +46,13 @@ class StoreChannelPool
private boolean markedForDisposal; private boolean markedForDisposal;
private Runnable onDisposal; private Runnable onDisposal;
private boolean disposed; private boolean disposed;
private boolean closed;


StoreChannelPool( FileSystemAbstraction fsa, File file, String mode, LogProvider logProvider ) StoreChannelPool( FileSystemAbstraction fsa, File file, String mode, LogProvider logProvider )
{ {
this.fsa = fsa; this.fsa = fsa;
this.file = file; this.file = file;
this.mode = mode; this.mode = mode;

this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
} }


Expand All @@ -62,7 +64,11 @@ private StoreChannel create() throws IOException


synchronized StoreChannel acquire( long byteOffset ) throws IOException, DisposedException synchronized StoreChannel acquire( long byteOffset ) throws IOException, DisposedException
{ {
if( markedForDisposal ) if ( closed )
{
throw new RuntimeException( CLOSED_ERROR_MESSAGE );
}
if ( markedForDisposal )
{ {
throw new DisposedException(); throw new DisposedException();
} }
Expand All @@ -78,6 +84,10 @@ synchronized StoreChannel acquire( long byteOffset ) throws IOException, Dispose


synchronized void release( StoreChannel channel ) synchronized void release( StoreChannel channel )
{ {
if ( closed )
{
throw new RuntimeException( CLOSED_ERROR_MESSAGE );
}
channels.addFirst( channel ); channels.addFirst( channel );
checkForDisposal(); checkForDisposal();
} }
Expand Down Expand Up @@ -109,7 +119,7 @@ private void closeAllChannels()


synchronized void markForDisposal( Runnable onDisposal ) throws DisposedException synchronized void markForDisposal( Runnable onDisposal ) throws DisposedException
{ {
if( markedForDisposal ) if ( markedForDisposal )
{ {
throw new DisposedException(); throw new DisposedException();
} }
Expand All @@ -124,4 +134,15 @@ boolean isDisposed()
{ {
return disposed; return disposed;
} }

@Override
public synchronized void close() throws DisposedException
{
if ( closed )
{
throw new RuntimeException( CLOSED_ERROR_MESSAGE );
}
closed = true;
closeAllChannels();
}
} }
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log.segmented;

import org.junit.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class EntryStoreTest
{
@Test
public void segmentsShouldBeClosedWhenEntryStoreIsClosed() throws Exception
{
Segments segments = mock( Segments.class );
new EntryStore( segments ).close();
verify( segments ).close();
}
}

0 comments on commit 15737fa

Please sign in to comment.