From fb2e5ead7a6f39ca3966f77f497a2aa5c1784487 Mon Sep 17 00:00:00 2001 From: RagnarW Date: Fri, 6 Apr 2018 16:54:13 +0200 Subject: [PATCH] Better error handling in GetIndexSnapshotRequestHandler * Now resets protocol if to far away response was given. * Resets protocol and respond with E_UNKNOWN if any exception gets thrown --- .../GetIndexSnapshotRequestHandler.java | 43 +++--- .../GetIndexSnapshotRequestHandlerTest.java | 136 ++++++++++++++++++ 2 files changed, 157 insertions(+), 22 deletions(-) create mode 100644 enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandlerTest.java diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandler.java index 816108078e6b..0ba3190743ec 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandler.java @@ -62,24 +62,23 @@ public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier< protected void channelRead0( ChannelHandlerContext ctx, GetIndexFilesRequest snapshotRequest ) throws IOException { CloseablesListener closeablesListener = new CloseablesListener(); - NeoStoreDataSource neoStoreDataSource = dataSource.get(); - if ( !hasSameStoreId( snapshotRequest.expectedStoreId(), neoStoreDataSource ) ) + StoreCopyFinishedResponse.Status responseStatus = StoreCopyFinishedResponse.Status.E_UNKNOWN; + try { - storeFileStreamingProtocol.end( ctx, StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH ); - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); - } - else if ( !isTransactionWithinReach( snapshotRequest.requiredTransactionId(), checkpointerSupplier.get() ) ) - { - storeFileStreamingProtocol.end( ctx, StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND ); - } - else - { - StoreCopyFinishedResponse.Status status = StoreCopyFinishedResponse.Status.E_UNKNOWN; - File storeDir = neoStoreDataSource.getStoreDir(); - ResourceIterator resourceIterator = - neoStoreDataSource.getNeoStoreFileListing().getNeoStoreFileIndexListing().getSnapshot( snapshotRequest.indexId() ); - try + NeoStoreDataSource neoStoreDataSource = dataSource.get(); + if ( !hasSameStoreId( snapshotRequest.expectedStoreId(), neoStoreDataSource ) ) + { + responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH; + } + else if ( !isTransactionWithinReach( snapshotRequest.requiredTransactionId(), checkpointerSupplier.get() ) ) { + responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND; + } + else + { + File storeDir = neoStoreDataSource.getStoreDir(); + ResourceIterator resourceIterator = + neoStoreDataSource.getNeoStoreFileListing().getNeoStoreFileIndexListing().getSnapshot( snapshotRequest.indexId() ); closeablesListener.add( resourceIterator ); while ( resourceIterator.hasNext() ) { @@ -89,13 +88,13 @@ else if ( !isTransactionWithinReach( snapshotRequest.requiredTransactionId(), ch int recordSize = storeFileMetadata.recordSize(); storeFileStreamingProtocol.stream( ctx, new StoreResource( file, relativePath, recordSize, pageCache, fs ) ); } - status = StoreCopyFinishedResponse.Status.SUCCESS; - } - finally - { - storeFileStreamingProtocol.end( ctx, status ).addListener( closeablesListener ); - protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + responseStatus = StoreCopyFinishedResponse.Status.SUCCESS; } } + finally + { + storeFileStreamingProtocol.end( ctx, responseStatus ); + protocol.expect( CatchupServerProtocol.State.MESSAGE_TYPE ); + } } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandlerTest.java new file mode 100644 index 000000000000..c33e52a65252 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/GetIndexSnapshotRequestHandlerTest.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.catchup.storecopy; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Before; +import org.junit.Test; + +import org.neo4j.causalclustering.catchup.CatchupServerProtocol; +import org.neo4j.causalclustering.catchup.ResponseMessageType; +import org.neo4j.causalclustering.identity.StoreId; +import org.neo4j.io.fs.DefaultFileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.NeoStoreDataSource; +import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; +import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GetIndexSnapshotRequestHandlerTest +{ + private static final StoreId STORE_ID_MISMATCHING = new StoreId( 1, 1, 1, 1 ); + private static final StoreId STORE_ID_MATCHING = new StoreId( 1, 2, 3, 4 ); + private final DefaultFileSystemAbstraction fileSystemAbstraction = new DefaultFileSystemAbstraction(); + + private final NeoStoreDataSource neoStoreDataSource = mock( NeoStoreDataSource.class ); + private final CheckPointer checkPointer = new FakeCheckPointer(); + private final PageCache pageCache = mock( PageCache.class ); + private EmbeddedChannel embeddedChannel; + private CatchupServerProtocol catchupServerProtocol; + + @Before + public void setup() + { + catchupServerProtocol = new CatchupServerProtocol(); + catchupServerProtocol.expect( CatchupServerProtocol.State.GET_STORE_FILE ); + GetIndexSnapshotRequestHandler getIndexSnapshotRequestHandler = + new GetIndexSnapshotRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, () -> checkPointer, new StoreFileStreamingProtocol(), + pageCache, fileSystemAbstraction ); + when( neoStoreDataSource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) ); + embeddedChannel = new EmbeddedChannel( getIndexSnapshotRequestHandler ); + } + + @Test + public void shouldGiveProperErrorOnStoreIdMismatch() + { + embeddedChannel.writeInbound( new GetIndexFilesRequest( STORE_ID_MISMATCHING, 1, 1 ) ); + + assertEquals( ResponseMessageType.STORE_COPY_FINISHED, embeddedChannel.readOutbound() ); + StoreCopyFinishedResponse expectedResponse = new StoreCopyFinishedResponse( StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH ); + assertEquals( expectedResponse, embeddedChannel.readOutbound() ); + + assertTrue( catchupServerProtocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) ); + } + + @Test + public void shouldGiveProperErrorOnTxBehind() + { + embeddedChannel.writeInbound( new GetIndexFilesRequest( STORE_ID_MATCHING, 1, 2 ) ); + + assertEquals( ResponseMessageType.STORE_COPY_FINISHED, embeddedChannel.readOutbound() ); + StoreCopyFinishedResponse expectedResponse = new StoreCopyFinishedResponse( StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND ); + assertEquals( expectedResponse, embeddedChannel.readOutbound() ); + + assertTrue( catchupServerProtocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) ); + } + + @Test + public void shouldResetProtocolAndGiveErrorOnUncheckedException() + { + when( neoStoreDataSource.getStoreId() ).thenThrow( new IllegalStateException() ); + + try + { + embeddedChannel.writeInbound( new GetIndexFilesRequest( STORE_ID_MATCHING, 1, 1 ) ); + fail(); + } + catch ( IllegalStateException ignore ) + { + + } + assertEquals( ResponseMessageType.STORE_COPY_FINISHED, embeddedChannel.readOutbound() ); + StoreCopyFinishedResponse expectedResponse = new StoreCopyFinishedResponse( StoreCopyFinishedResponse.Status.E_UNKNOWN ); + assertEquals( expectedResponse, embeddedChannel.readOutbound() ); + + assertTrue( catchupServerProtocol.isExpecting( CatchupServerProtocol.State.MESSAGE_TYPE ) ); + } + + private class FakeCheckPointer implements CheckPointer + { + @Override + public long checkPointIfNeeded( TriggerInfo triggerInfo ) + { + return 1; + } + + @Override + public long tryCheckPoint( TriggerInfo triggerInfo ) + { + return 1; + } + + @Override + public long forceCheckPoint( TriggerInfo triggerInfo ) + { + return 1; + } + + @Override + public long lastCheckPointedTransactionId() + { + return 1; + } + } +}