Skip to content

Commit

Permalink
Term, Vote and RaftLog file-based implementations use writeAll instea…
Browse files Browse the repository at this point in the history
…d of write

Previous implementations did not properly check the return value of write() calls,
 so half written entries were a possibility. This ensures that the complete buffer
 contents always end up on disk or failure is raised.
Renames the previous Vote, Term and RaftLog tests to Contract tests to allow for
 file based, implementation specific testing.
  • Loading branch information
digitalstain committed Nov 30, 2015
1 parent 80d4838 commit 8d372cf
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 44 deletions.
Expand Up @@ -307,7 +307,7 @@ private void writeEntry( Entry entry ) throws IOException
buffer.putLong( entry.contentPointer ); buffer.putLong( entry.contentPointer );
buffer.flip(); buffer.flip();


entriesChannel.write( buffer, (appendIndex + 1) * ENTRY_RECORD_LENGTH ); entriesChannel.writeAll( buffer, (appendIndex + 1) * ENTRY_RECORD_LENGTH );
entriesChannel.force( false ); entriesChannel.force( false );
} }


Expand All @@ -334,8 +334,8 @@ private int writeContent( RaftLogEntry logEntry ) throws MarshallingException, I
ByteBuffer contentLengthBuffer = ByteBuffer.allocate( CONTENT_LENGTH_BYTES ); ByteBuffer contentLengthBuffer = ByteBuffer.allocate( CONTENT_LENGTH_BYTES );
contentLengthBuffer.putInt( length ); contentLengthBuffer.putInt( length );
contentLengthBuffer.flip(); contentLengthBuffer.flip();
contentChannel.write( contentLengthBuffer, contentOffset ); contentChannel.writeAll( contentLengthBuffer, contentOffset );
contentChannel.write( contentBuffer, contentOffset + CONTENT_LENGTH_BYTES ); contentChannel.writeAll( contentBuffer, contentOffset + CONTENT_LENGTH_BYTES );
contentChannel.force( false ); contentChannel.force( false );


return length; return length;
Expand All @@ -359,7 +359,7 @@ private void storeCommitIndex( long commitIndex ) throws IOException
ByteBuffer buffer = ByteBuffer.allocate( COMMIT_INDEX_BYTES ); ByteBuffer buffer = ByteBuffer.allocate( COMMIT_INDEX_BYTES );
buffer.putLong( commitIndex ); buffer.putLong( commitIndex );
buffer.flip(); buffer.flip();
commitChannel.write( buffer, 0 ); commitChannel.writeAll( buffer, 0 );
commitChannel.force( false ); commitChannel.force( false );
} }


Expand Down
Expand Up @@ -67,7 +67,7 @@ public void update( long newTerm ) throws RaftStorageException
buffer.putLong( newTerm ); buffer.putLong( newTerm );
buffer.flip(); buffer.flip();


channel.write( buffer, 0 ); channel.writeAll( buffer, 0 );
channel.force( false ); channel.force( false );
} }
catch ( IOException e ) catch ( IOException e )
Expand Down
Expand Up @@ -71,7 +71,7 @@ public void update( CoreMember votedFor ) throws RaftStorageException
CoreMemberMarshal.serialize( votedFor, byteBuf ); CoreMemberMarshal.serialize( votedFor, byteBuf );
ByteBuffer buffer = byteBuf.nioBuffer(); ByteBuffer buffer = byteBuf.nioBuffer();


channel.write( buffer, 0 ); channel.writeAll( buffer, 0 );
} }
channel.force( false ); channel.force( false );
} }
Expand Down
Expand Up @@ -19,7 +19,7 @@
*/ */
package org.neo4j.coreedge.raft.log; package org.neo4j.coreedge.raft.log;


public class InMemoryRaftLogTest extends RaftLogTest public class InMemoryRaftLogContractTest extends RaftLogContractTest
{ {
@Override @Override
public RaftLog createRaftLog() public RaftLog createRaftLog()
Expand Down
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.log;

import java.io.File;
import java.io.IOException;

import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.monitoring.Monitors;

public class NaiveDurableRaftLogContractTest extends RaftLogContractTest
{
@Override
public RaftLog createRaftLog() throws IOException
{
FileSystemAbstraction fileSystem = new EphemeralFileSystemAbstraction();
File directory = new File( "raft-log" );
fileSystem.mkdir( directory );

return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer(), new Monitors() );
}
}
Expand Up @@ -19,22 +19,62 @@
*/ */
package org.neo4j.coreedge.raft.log; package org.neo4j.coreedge.raft.log;


import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File; import java.io.File;
import java.io.IOException; import java.nio.ByteBuffer;


import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.junit.Test;
import org.mockito.Matchers;
import org.neo4j.coreedge.raft.ReplicatedInteger;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreFileChannel;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;


public class NaiveDurableRaftLogTest extends RaftLogTest public class NaiveDurableRaftLogTest
{ {
@Override @Test
public RaftLog createRaftLog() throws IOException public void shouldCallWriteAllWhenStoringEntries() throws Exception
{ {
FileSystemAbstraction fileSystem = new EphemeralFileSystemAbstraction(); // Given
File directory = new File( "raft-log" ); FileSystemAbstraction fsa = mock( FileSystemAbstraction.class );
fileSystem.mkdir( directory ); StoreFileChannel entriesChannel = mock( StoreFileChannel.class );
StoreFileChannel contentChannel = mock( StoreFileChannel.class );
StoreFileChannel commitChannel = mock( StoreFileChannel.class );

File directory = new File(".");

File entriesFile = new File( directory, "entries.log");
File contentFile = new File( directory, "content.log");
File commitFile = new File( directory, "commit.log");

when( fsa.open( Matchers.eq( entriesFile ), anyString() ) ).thenReturn( entriesChannel );
when( fsa.open( Matchers.eq( contentFile ), anyString() ) ).thenReturn( contentChannel );
when( fsa.open( Matchers.eq( commitFile ), anyString() ) ).thenReturn( commitChannel );

NaiveDurableRaftLog log = new NaiveDurableRaftLog( fsa, directory, new DummyRaftableContentSerializer(), new Monitors() );

// When
log.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );

// Then
verify( entriesChannel ).writeAll( any( ByteBuffer.class ), anyInt() );
verify( entriesChannel ).force( anyBoolean() );
verify( contentChannel, times( 2 ) ).writeAll( any( ByteBuffer.class ), anyInt() );
verify( contentChannel ).force( anyBoolean() );

// When
log.commit( 2 );


return new NaiveDurableRaftLog( fileSystem, directory, new DummyRaftableContentSerializer(), new Monitors() ); // Then
verify( commitChannel ).writeAll( any( ByteBuffer.class ), anyInt() );
verify( commitChannel ).force( anyBoolean() );
} }
} }
Expand Up @@ -36,7 +36,7 @@
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;


public abstract class RaftLogTest public abstract class RaftLogContractTest
{ {
public abstract RaftLog createRaftLog() throws Exception; public abstract RaftLog createRaftLog() throws Exception;


Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state;

import java.io.File;

import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;

public class DurableTermStoreContractTest extends TermStoreContractTest
{
@Override
public TermStore createTermStore()
{
FileSystemAbstraction fileSystem = new EphemeralFileSystemAbstraction();
File directory = new File( "raft-log" );
fileSystem.mkdir( directory );
return new DurableTermStore( fileSystem, directory );
}
}
Expand Up @@ -19,19 +19,70 @@
*/ */
package org.neo4j.coreedge.raft.state; package org.neo4j.coreedge.raft.state;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File; import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;


import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.junit.Test;
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreFileChannel;


public class DurableTermStoreTest extends TermStoreTest public class DurableTermStoreTest
{ {
@Override @Test
public TermStore createTermStore() public void shouldCallWriteAllAndForceOnVoteUpdate() throws Exception
{ {
FileSystemAbstraction fileSystem = new EphemeralFileSystemAbstraction(); // Given
File directory = new File( "raft-log" ); StoreFileChannel channel = mock( StoreFileChannel.class );
fileSystem.mkdir( directory ); FileSystemAbstraction fsa = mock( FileSystemAbstraction.class );
return new DurableTermStore( fileSystem, directory ); when( fsa.open( any( File.class ), anyString() ) ).thenReturn( channel );

DurableTermStore store = new DurableTermStore( fsa, new File("") );

// When
store.update( 100L );

// Then
verify( channel ).writeAll( any( ByteBuffer.class ), anyInt() );
verify( channel ).force( anyBoolean() );
}

@Test
public void termShouldRemainUnchangedOnFailureToWriteToDisk() throws Exception
{
// Given
StoreFileChannel channel = mock( StoreFileChannel.class );
FileSystemAbstraction fsa = mock( FileSystemAbstraction.class );
when( fsa.open( any( File.class ), anyString() ) ).thenReturn( channel );
doThrow( new IOException() ).when( channel ).writeAll( any( ByteBuffer.class ), anyInt() );

DurableTermStore store = new DurableTermStore( fsa, new File("") );

// Then
// Sanity check more than anything else, to make sure the failed update below will retain the value
assertEquals( 0, store.currentTerm() );

// When
try
{
store.update( 2 );
fail( "Test setup should have caused an exception here");
}
catch( Exception e )
{}

// Then
assertEquals( 0, store.currentTerm() );
} }
} }
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state;

import java.io.File;

import org.junit.Rule;

import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.test.TargetDirectory;

public class DurableVoteStoreContractTest extends VoteStoreContractTest
{
@Rule
public final TargetDirectory.TestDirectory testDirectory = TargetDirectory.testDirForTest( getClass() );

@Override
public VoteStore<CoreMember> createVoteStore()
{
FileSystemAbstraction fileSystem = new DefaultFileSystemAbstraction();
File directory = testDirectory.directory( "raft-log" );
return new DurableVoteStore( fileSystem, directory );
}


}

0 comments on commit 8d372cf

Please sign in to comment.