Skip to content

Commit

Permalink
Revert "Merge pull request #7185 from davidegrohmann/3.1-fix-flaky-te…
Browse files Browse the repository at this point in the history
…st-windows"

This reverts commit 8d38e4c, reversing
changes made to 85a0387.
  • Loading branch information
jimwebber committed May 23, 2016
1 parent 8d38e4c commit dd8bb6e
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 439 deletions.
Expand Up @@ -33,6 +33,11 @@ public interface ReadableRaftLog
*/
long prevIndex();

// /**
// * @return The index of the last committed entry.
// */
// long commitIndex();

/**
* Reads the term associated with the entry at the supplied index.
*
Expand Down
Expand Up @@ -95,7 +95,8 @@ public PhysicalRaftLogFile( FileSystemAbstraction fileSystem, PhysicalRaftLogFil
public void init() throws IOException
{
// Make sure at least a bare bones log file is available before recovery
openLogChannelForVersion( logFiles.getHighestLogVersion() ).close();
channel = openLogChannelForVersion( logFiles.getHighestLogVersion() );
channel.close();
}

@Override
Expand Down
Expand Up @@ -32,7 +32,7 @@
* segment to the next in a transparent manner. It can thus be mainly viewed as a factory for a
* smart segment-crossing cursor.
*/
class EntryStore implements AutoCloseable
class EntryStore
{
private Segments segments;

Expand Down Expand Up @@ -123,10 +123,4 @@ public EntryRecord get()
}
};
}

@Override
public void close() throws DisposedException
{
segments.close();
}
}
Expand Up @@ -43,13 +43,13 @@
/**
* Keeps track of a segment of the RAFT log, i.e. a consecutive set of entries.
* A segment can have several concurrent readers but just a single writer.
* <p>
*
* The single writer should perform all write and control operations,
* since these are not thread-safe.
* <p>
*
* Concurrent reading is thread-safe.
*/
class SegmentFile implements AutoCloseable
class SegmentFile
{
private static final SegmentHeader.Marshal headerMarshal = new SegmentHeader.Marshal();

Expand All @@ -66,8 +66,12 @@ class SegmentFile implements AutoCloseable
private Runnable onDisposal;
private volatile boolean isDisposed;

SegmentFile( FileSystemAbstraction fileSystem, File file, ChannelMarshal<ReplicatedContent> contentMarshal,
LogProvider logProvider, SegmentHeader header )
SegmentFile(
FileSystemAbstraction fileSystem,
File file,
ChannelMarshal<ReplicatedContent> contentMarshal,
LogProvider logProvider,
SegmentHeader header )
{
this.fileSystem = fileSystem;
this.file = file;
Expand All @@ -78,9 +82,12 @@ class SegmentFile implements AutoCloseable
readerPool = new StoreChannelPool( fileSystem, file, "r", logProvider );
}

static SegmentFile create( FileSystemAbstraction fileSystem, File file,
ChannelMarshal<ReplicatedContent> contentMarshal, LogProvider logProvider, SegmentHeader header )
throws IOException
static SegmentFile create(
FileSystemAbstraction fileSystem,
File file,
ChannelMarshal<ReplicatedContent> contentMarshal,
LogProvider logProvider,
SegmentHeader header ) throws IOException
{
SegmentFile segment = new SegmentFile( fileSystem, file, contentMarshal, logProvider, header );

Expand Down Expand Up @@ -166,7 +173,7 @@ private PhysicalFlushableChannel getOrCreateWriter() throws IOException
*/
WritableChannel writer() throws IOException, DisposedException
{
if ( markedForDisposal )
if( markedForDisposal )
{
throw new DisposedException();
}
Expand Down Expand Up @@ -216,7 +223,7 @@ void flush() throws IOException
*/
void markForDisposal( Runnable onDisposal ) throws DisposedException
{
if ( markedForDisposal )
if( markedForDisposal )
{
throw new DisposedException();
}
Expand Down Expand Up @@ -255,15 +262,4 @@ public long size()
{
return fileSystem.getFileSize( file );
}

@Override
public void close() throws DisposedException
{
closeWriter();
if ( !markedForDisposal )
{
markForDisposal( () -> {
} );
}
}
}
Expand Up @@ -101,12 +101,6 @@ public synchronized void start() throws IOException, RaftLogCompactedException,
entryStore = new EntryStore( state.segments );
}

@Override
public void shutdown() throws DisposedException
{
entryStore.close();
}

@Override
public synchronized long append( RaftLogEntry entry ) throws IOException
{
Expand Down
Expand Up @@ -41,7 +41,7 @@
/**
* Keeps track of all the segments that the RAFT log consists of.
*/
class Segments implements AutoCloseable
class Segments
{
private final OpenEndRangeMap<Long/*minIndex*/,SegmentFile> rangeMap = new OpenEndRangeMap<>();
private final List<SegmentFile> segmentFiles;
Expand Down Expand Up @@ -248,33 +248,4 @@ public ListIterator<SegmentFile> getSegmentFileIteratorAtEnd()
{
return segmentFiles.listIterator( segmentFiles.size() );
}

@Override
public void close() throws DisposedException
{
DisposedException error = null;
for ( SegmentFile segment : segmentFiles )
{
try
{
segment.close();
}
catch ( DisposedException ex )
{
if ( error == null )
{
error = ex;
}
else
{
error.addSuppressed( ex );
}
}
}

if ( error != null )
{
throw error;
}
}
}
Expand Up @@ -35,19 +35,18 @@
import org.neo4j.coreedge.raft.ReplicatedString;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.test.rule.TargetDirectory;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;

public abstract class ConcurrentStressIT<T extends RaftLog & Lifecycle>
public abstract class ConcurrentStressIT
{
private static final int MAX_CONTENT_SIZE = 2048;
@Rule
public final TargetDirectory.TestDirectory dir = TargetDirectory.testDirForTest( getClass() );

protected abstract T createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable;
protected abstract RaftLog createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable;

@Test
public void readAndWrite() throws Throwable
Expand All @@ -58,36 +57,30 @@ public void readAndWrite() throws Throwable
private void readAndWrite( int nReaders, int time, TimeUnit unit ) throws Throwable
{
DefaultFileSystemAbstraction fsa = new DefaultFileSystemAbstraction();
T raftLog = createRaftLog( fsa, dir.directory() );
RaftLog raftLog = createRaftLog( fsa, dir.directory() );

try
{
ExecutorService es = Executors.newCachedThreadPool();

Collection<Future<Long>> futures = new ArrayList<>();
ExecutorService es = Executors.newCachedThreadPool();

futures.add( es.submit( new TimedTask( () -> {
write( raftLog );
}, time, unit ) ) );
Collection<Future<Long>> futures = new ArrayList<>();

for ( int i = 0; i < nReaders; i++ )
{
futures.add( es.submit( new TimedTask( () -> {
read( raftLog );
}, time, unit ) ) );
}
futures.add(
es.submit( new TimedTask( () -> {
write( raftLog );
}, time, unit ) )
);

for ( Future<Long> f : futures )
{
long iterations = f.get();
}

es.shutdown();
for ( int i = 0; i < nReaders; i++ )
{
futures.add(
es.submit( new TimedTask( () -> {
read( raftLog );
}, time, unit ) )
);
}
finally

for ( Future<Long> f : futures )
{
//noinspection ThrowFromFinallyBlock
raftLog.shutdown();
long iterations = f.get();
}
}

Expand Down Expand Up @@ -148,16 +141,14 @@ private void write( RaftLog raftLog )
}
}

private static final CharSequence CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

private String stringForIndex( long index )
{
int len = ((int) index) % MAX_CONTENT_SIZE + 1;
StringBuilder str = new StringBuilder( len );

while ( len-- > 0 )
{
str.append( CHARS.charAt( len % CHARS.length() ) );
str.append( (char) len );
}

return str.toString();
Expand Down
Expand Up @@ -24,41 +24,12 @@
import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.lifecycle.Lifecycle;

public class ConcurrentStressIT extends org.neo4j.coreedge.raft.log.ConcurrentStressIT<ConcurrentStressIT.LifecycledInMemoryRaftLog>
public class ConcurrentStressIT extends org.neo4j.coreedge.raft.log.ConcurrentStressIT
{
@Override
public LifecycledInMemoryRaftLog createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable
public RaftLog createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable
{
return new LifecycledInMemoryRaftLog();
}

public static class LifecycledInMemoryRaftLog extends InMemoryRaftLog implements Lifecycle
{

@Override
public void init() throws Throwable
{

}

@Override
public void start() throws Throwable
{

}

@Override
public void stop() throws Throwable
{

}

@Override
public void shutdown() throws Throwable
{

}
return new InMemoryRaftLog();
}
}
Expand Up @@ -28,10 +28,10 @@

import static org.neo4j.coreedge.server.CoreEdgeClusterSettings.raft_log_pruning;

public class ConcurrentStressIT extends org.neo4j.coreedge.raft.log.ConcurrentStressIT<SegmentedRaftLog>
public class ConcurrentStressIT extends org.neo4j.coreedge.raft.log.ConcurrentStressIT
{
@Override
public SegmentedRaftLog createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable
public RaftLog createRaftLog( FileSystemAbstraction fsa, File dir ) throws Throwable
{
SegmentedRaftLog raftLog = new SegmentedRaftLog( fsa, dir, 8 * 1024 * 1024, new DummyRaftableContentSerializer(), NullLogProvider.getInstance(), 8,
raft_log_pruning.getDefaultValue());
Expand Down

This file was deleted.

0 comments on commit dd8bb6e

Please sign in to comment.