Skip to content

Commit

Permalink
Better error handling in GetIndexSnapshotRequestHandler
Browse files Browse the repository at this point in the history
* Now resets protocol if to far away response was given.

* Resets protocol and respond with E_UNKNOWN if any exception gets thrown
  • Loading branch information
RagnarW committed Apr 11, 2018
1 parent 8177e12 commit fb2e5ea
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 22 deletions.
Expand Up @@ -62,24 +62,23 @@ public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier<
protected void channelRead0( ChannelHandlerContext ctx, GetIndexFilesRequest snapshotRequest ) throws IOException protected void channelRead0( ChannelHandlerContext ctx, GetIndexFilesRequest snapshotRequest ) throws IOException
{ {
CloseablesListener closeablesListener = new CloseablesListener(); CloseablesListener closeablesListener = new CloseablesListener();
NeoStoreDataSource neoStoreDataSource = dataSource.get(); StoreCopyFinishedResponse.Status responseStatus = StoreCopyFinishedResponse.Status.E_UNKNOWN;
if ( !hasSameStoreId( snapshotRequest.expectedStoreId(), neoStoreDataSource ) ) try
{ {
storeFileStreamingProtocol.end( ctx, StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH ); NeoStoreDataSource neoStoreDataSource = dataSource.get();
protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); if ( !hasSameStoreId( snapshotRequest.expectedStoreId(), neoStoreDataSource ) )
} {
else if ( !isTransactionWithinReach( snapshotRequest.requiredTransactionId(), checkpointerSupplier.get() ) ) responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH;
{ }
storeFileStreamingProtocol.end( ctx, StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND ); else if ( !isTransactionWithinReach( snapshotRequest.requiredTransactionId(), checkpointerSupplier.get() ) )
}
else
{
StoreCopyFinishedResponse.Status status = StoreCopyFinishedResponse.Status.E_UNKNOWN;
File storeDir = neoStoreDataSource.getStoreDir();
ResourceIterator<StoreFileMetadata> resourceIterator =
neoStoreDataSource.getNeoStoreFileListing().getNeoStoreFileIndexListing().getSnapshot( snapshotRequest.indexId() );
try
{ {
responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND;
}
else
{
File storeDir = neoStoreDataSource.getStoreDir();
ResourceIterator<StoreFileMetadata> resourceIterator =
neoStoreDataSource.getNeoStoreFileListing().getNeoStoreFileIndexListing().getSnapshot( snapshotRequest.indexId() );
closeablesListener.add( resourceIterator ); closeablesListener.add( resourceIterator );
while ( resourceIterator.hasNext() ) while ( resourceIterator.hasNext() )
{ {
Expand All @@ -89,13 +88,13 @@ else if ( !isTransactionWithinReach( snapshotRequest.requiredTransactionId(), ch
int recordSize = storeFileMetadata.recordSize(); int recordSize = storeFileMetadata.recordSize();
storeFileStreamingProtocol.stream( ctx, new StoreResource( file, relativePath, recordSize, pageCache, fs ) ); storeFileStreamingProtocol.stream( ctx, new StoreResource( file, relativePath, recordSize, pageCache, fs ) );
} }
status = StoreCopyFinishedResponse.Status.SUCCESS; responseStatus = StoreCopyFinishedResponse.Status.SUCCESS;
}
finally
{
storeFileStreamingProtocol.end( ctx, status ).addListener( closeablesListener );
protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE );
} }
} }
finally
{
storeFileStreamingProtocol.end( ctx, responseStatus );
protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE );
}
} }
} }
@@ -0,0 +1,136 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.causalclustering.catchup.storecopy;

import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Before;
import org.junit.Test;

import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.NeoStoreDataSource;
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GetIndexSnapshotRequestHandlerTest
{
private static final StoreId STORE_ID_MISMATCHING = new StoreId( 1, 1, 1, 1 );
private static final StoreId STORE_ID_MATCHING = new StoreId( 1, 2, 3, 4 );
private final DefaultFileSystemAbstraction fileSystemAbstraction = new DefaultFileSystemAbstraction();

private final NeoStoreDataSource neoStoreDataSource = mock( NeoStoreDataSource.class );
private final CheckPointer checkPointer = new FakeCheckPointer();
private final PageCache pageCache = mock( PageCache.class );
private EmbeddedChannel embeddedChannel;
private CatchupServerProtocol catchupServerProtocol;

@Before
public void setup()
{
catchupServerProtocol = new CatchupServerProtocol();
catchupServerProtocol.expect( CatchupServerProtocol.State.GET_STORE_FILE );
GetIndexSnapshotRequestHandler getIndexSnapshotRequestHandler =
new GetIndexSnapshotRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, () -> checkPointer, new StoreFileStreamingProtocol(),
pageCache, fileSystemAbstraction );
when( neoStoreDataSource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) );
embeddedChannel = new EmbeddedChannel( getIndexSnapshotRequestHandler );
}

@Test
public void shouldGiveProperErrorOnStoreIdMismatch()
{
embeddedChannel.writeInbound( new GetIndexFilesRequest( STORE_ID_MISMATCHING, 1, 1 ) );

assertEquals( ResponseMessageType.STORE_COPY_FINISHED, embeddedChannel.readOutbound() );
StoreCopyFinishedResponse expectedResponse = new StoreCopyFinishedResponse( StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH );
assertEquals( expectedResponse, embeddedChannel.readOutbound() );

assertTrue( catchupServerProtocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) );
}

@Test
public void shouldGiveProperErrorOnTxBehind()
{
embeddedChannel.writeInbound( new GetIndexFilesRequest( STORE_ID_MATCHING, 1, 2 ) );

assertEquals( ResponseMessageType.STORE_COPY_FINISHED, embeddedChannel.readOutbound() );
StoreCopyFinishedResponse expectedResponse = new StoreCopyFinishedResponse( StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND );
assertEquals( expectedResponse, embeddedChannel.readOutbound() );

assertTrue( catchupServerProtocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) );
}

@Test
public void shouldResetProtocolAndGiveErrorOnUncheckedException()
{
when( neoStoreDataSource.getStoreId() ).thenThrow( new IllegalStateException() );

try
{
embeddedChannel.writeInbound( new GetIndexFilesRequest( STORE_ID_MATCHING, 1, 1 ) );
fail();
}
catch ( IllegalStateException ignore )
{

}
assertEquals( ResponseMessageType.STORE_COPY_FINISHED, embeddedChannel.readOutbound() );
StoreCopyFinishedResponse expectedResponse = new StoreCopyFinishedResponse( StoreCopyFinishedResponse.Status.E_UNKNOWN );
assertEquals( expectedResponse, embeddedChannel.readOutbound() );

assertTrue( catchupServerProtocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) );
}

private class FakeCheckPointer implements CheckPointer
{
@Override
public long checkPointIfNeeded( TriggerInfo triggerInfo )
{
return 1;
}

@Override
public long tryCheckPoint( TriggerInfo triggerInfo )
{
return 1;
}

@Override
public long forceCheckPoint( TriggerInfo triggerInfo )
{
return 1;
}

@Override
public long lastCheckPointedTransactionId()
{
return 1;
}
}
}

0 comments on commit fb2e5ea

Please sign in to comment.