From 75f2d1fa934f0660f662499c4f0820266387d735 Mon Sep 17 00:00:00 2001 From: Pontus Melke Date: Wed, 28 Sep 2016 10:11:13 +0200 Subject: [PATCH] Always close transactions In cases where unexpected errors occur we must still make sure we always close all opened transactions, or catastrophe ensues. --- .../bolt/v1/messaging/BoltMessageRouter.java | 136 ++--------------- .../messaging/MessageProcessingHandler.java | 141 ++++++++++++++++++ .../bolt/v1/runtime/BoltStateMachine.java | 37 +++-- .../v1/runtime/TransactionStateMachine.java | 62 ++++---- .../bolt/v1/transport/BoltProtocolV1.java | 9 +- .../MessageProcessingHandlerTest.java | 57 +++++++ .../bolt/v1/runtime/BoltStateMachineTest.java | 26 ++++ 7 files changed, 300 insertions(+), 168 deletions(-) create mode 100644 community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java create mode 100644 community/bolt/src/test/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandlerTest.java diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java index b75faf9046e27..da6d755985d3c 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/BoltMessageRouter.java @@ -21,12 +21,9 @@ package org.neo4j.bolt.v1.messaging; import java.io.IOException; -import java.util.HashMap; import java.util.Map; -import org.neo4j.bolt.v1.runtime.BoltResponseHandler; import org.neo4j.bolt.v1.runtime.BoltWorker; -import org.neo4j.bolt.v1.runtime.Neo4jError; import org.neo4j.bolt.v1.runtime.spi.BoltResult; import org.neo4j.bolt.v1.runtime.spi.Record; import org.neo4j.logging.Log; @@ -47,12 +44,12 @@ public class BoltMessageRouter implements BoltRequestMessageHandler output, - Runnable onEachCompletedRequest ) + Runnable onEachCompletedRequest ) { - this.initHandler = new InitHandler( output, onEachCompletedRequest, log ); - this.runHandler = new RunHandler( output, onEachCompletedRequest, log ); - this.resultHandler = new ResultHandler( output, onEachCompletedRequest, log ); - this.defaultHandler = new MessageProcessingHandler( output, onEachCompletedRequest, log ); + this.initHandler = new InitHandler( output, onEachCompletedRequest, worker, log ); + this.runHandler = new RunHandler( output, onEachCompletedRequest, worker, log ); + this.resultHandler = new ResultHandler( output, onEachCompletedRequest, worker, log ); + this.defaultHandler = new MessageProcessingHandler( output, onEachCompletedRequest, worker, log ); this.worker = worker; } @@ -95,137 +92,30 @@ public void onPullAll() worker.enqueue( session -> session.pullAll( resultHandler ) ); } - static class MessageProcessingHandler implements BoltResponseHandler - { - protected final Map metadata = new HashMap<>(); - - // TODO: move this somewhere more sane (when modules are unified) - static void publishError( BoltResponseMessageHandler out, Neo4jError error ) - throws IOException - { - if ( !error.status().code().classification().shouldRespondToClient() ) - { - // If not intended for client, we only return an error reference. This must - // be cross-referenced with the log files for full error detail. - out.onFailure( error.status(), String.format( - "An unexpected failure occurred, see details in the database " + - "logs, reference number %s.", error.reference() ) ); - } - else - { - // If intended for client, we forward the message as-is. - out.onFailure( error.status(), error.message() ); - } - } - - protected final Log log; - - protected final BoltResponseMessageHandler handler; - - private Neo4jError error; - private final Runnable onFinish; - private boolean ignored; - - MessageProcessingHandler( BoltResponseMessageHandler handler, Runnable onFinish, Log logger ) - { - this.handler = handler; - this.onFinish = onFinish; - this.log = logger; - } - - @Override - public void onStart() - { - } - - @Override - public void onRecords( BoltResult result, boolean pull ) throws Exception - { - } - - @Override - public void onMetadata( String key, Object value ) - { - metadata.put( key, value ); - } - - @Override - public void markIgnored() - { - this.ignored = true; - } - - @Override - public void markFailed( Neo4jError error ) - { - this.error = error; - } - - @Override - public void onFinish() - { - try - { - if ( ignored ) - { - handler.onIgnored(); - } - else if ( error != null ) - { - publishError( handler, error ); - } - else - { - handler.onSuccess( getMetadata() ); - } - } - catch ( Throwable e ) - { - // TODO: we've lost the ability to communicate with the client. Shut down the session, close transactions. - log.error( "Failed to write response to driver", e ); - } - finally - { - onFinish.run(); - clearState(); - } - } - - Map getMetadata() - { - return metadata; - } - - void clearState() - { - error = null; - ignored = false; - metadata.clear(); - } - } - private static class InitHandler extends MessageProcessingHandler { - InitHandler( BoltResponseMessageHandler handler, Runnable onCompleted, Log log ) + InitHandler( BoltResponseMessageHandler handler, Runnable onCompleted, BoltWorker worker, Log log ) { - super( handler, onCompleted, log ); + super( handler, onCompleted, worker, log ); } } private static class RunHandler extends MessageProcessingHandler { - RunHandler( BoltResponseMessageHandler handler, Runnable onCompleted, Log log ) + RunHandler( BoltResponseMessageHandler handler, Runnable onCompleted, BoltWorker worker, Log log ) { - super( handler, onCompleted, log ); + super( handler, onCompleted, worker, log ); } } + private static class ResultHandler extends MessageProcessingHandler { - ResultHandler( BoltResponseMessageHandler handler, Runnable onCompleted, Log log ) + ResultHandler( BoltResponseMessageHandler handler, Runnable onCompleted, BoltWorker worker, + Log log ) { - super( handler, onCompleted, log ); + super( handler, onCompleted, worker, log ); } @Override diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java new file mode 100644 index 0000000000000..acd1076dafb77 --- /dev/null +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/messaging/MessageProcessingHandler.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.v1.messaging; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.neo4j.bolt.v1.runtime.BoltResponseHandler; +import org.neo4j.bolt.v1.runtime.BoltWorker; +import org.neo4j.bolt.v1.runtime.Neo4jError; +import org.neo4j.bolt.v1.runtime.spi.BoltResult; +import org.neo4j.logging.Log; + +class MessageProcessingHandler implements BoltResponseHandler +{ + protected final Map metadata = new HashMap<>(); + + // TODO: move this somewhere more sane (when modules are unified) + static void publishError( BoltResponseMessageHandler out, Neo4jError error ) + throws IOException + { + if ( !error.status().code().classification().shouldRespondToClient() ) + { + // If not intended for client, we only return an error reference. This must + // be cross-referenced with the log files for full error detail. + out.onFailure( error.status(), String.format( + "An unexpected failure occurred, see details in the database " + + "logs, reference number %s.", error.reference() ) ); + } + else + { + // If intended for client, we forward the message as-is. + out.onFailure( error.status(), error.message() ); + } + } + + protected final Log log; + protected final BoltWorker worker; + protected final BoltResponseMessageHandler handler; + + private Neo4jError error; + private final Runnable onFinish; + private boolean ignored; + + MessageProcessingHandler( BoltResponseMessageHandler handler, Runnable onFinish, BoltWorker worker, + Log logger ) + { + this.handler = handler; + this.onFinish = onFinish; + this.worker = worker; + this.log = logger; + } + + @Override + public void onStart() + { + } + + @Override + public void onRecords( BoltResult result, boolean pull ) throws Exception + { + } + + @Override + public void onMetadata( String key, Object value ) + { + metadata.put( key, value ); + } + + @Override + public void markIgnored() + { + this.ignored = true; + } + + @Override + public void markFailed( Neo4jError error ) + { + this.error = error; + } + + @Override + public void onFinish() + { + try + { + if ( ignored ) + { + handler.onIgnored(); + } + else if ( error != null ) + { + publishError( handler, error ); + } + else + { + handler.onSuccess( getMetadata() ); + } + } + catch ( Throwable e ) + { + worker.halt(); + log.error( "Failed to write response to driver", e ); + } + finally + { + onFinish.run(); + clearState(); + } + } + + Map getMetadata() + { + return metadata; + } + + void clearState() + { + error = null; + ignored = false; + metadata.clear(); + } +} diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java index c48b0bbe9967f..cff369e0ca1ec 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java @@ -264,24 +264,20 @@ public boolean isClosed() public void close() { - if ( !ctx.closed ) + try { - if ( onClose != null ) + //Only run onClose, once + if ( !ctx.closed && onClose != null ) { onClose.run(); } - try - { - ctx.statementProcessor.reset(); - } - catch ( TransactionFailureException e ) - { - throw new RuntimeException( e ); - } - finally - { - ctx.closed = true; - } + } + finally + { + ctx.closed = true; + //However a new transaction may have been created + //so we must always to reset + reset(); } } @@ -628,7 +624,6 @@ State resetMachine( BoltStateMachine machine ) throws BoltConnectionFatality throw new BoltConnectionFatality( e.getMessage() ); } } - } private static void fail( BoltStateMachine machine, Neo4jError neo4jError ) @@ -641,6 +636,18 @@ private static void fail( BoltStateMachine machine, Neo4jError neo4jError ) machine.ctx.markFailed( neo4jError ); } + private void reset() + { + try + { + ctx.statementProcessor.reset(); + } + catch ( TransactionFailureException e ) + { + throw new RuntimeException( e ); + } + } + static class MutableConnectionState implements BoltResponseHandler { private static final NullStatementProcessor NULL_STATEMENT_PROCESSOR = new NullStatementProcessor(); diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java index 82cdc670753bf..3e9e34b5fa622 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/TransactionStateMachine.java @@ -199,15 +199,7 @@ private BoltResultHandle execute( MutableTransactionState ctx, SPI spi, { return executeQuery( ctx, spi, statement, params, () -> { - try // On fail - { - ctx.currentTransaction.failure(); - ctx.currentTransaction.close(); - } - finally - { - ctx.currentTransaction = null; - } + closeTransaction( ctx, false ); } ); } @@ -218,12 +210,7 @@ void streamResult( MutableTransactionState ctx, assert ctx.currentResult != null; resultConsumer.accept( ctx.currentResult ); ctx.currentResult.close(); - if ( ctx.currentTransaction != null ) - { - ctx.currentTransaction.success(); - ctx.currentTransaction.close(); - ctx.currentTransaction = null; - } + closeTransaction( ctx, true ); } }, EXPLICIT_TRANSACTION @@ -239,10 +226,7 @@ State run( MutableTransactionState ctx, SPI spi, String statement, Map. + */ +package org.neo4j.bolt.v1.messaging; + +import org.junit.Test; + +import java.util.Map; + +import org.neo4j.bolt.v1.runtime.BoltWorker; +import org.neo4j.logging.Log; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@SuppressWarnings( "unchecked" ) +public class MessageProcessingHandlerTest +{ + @Test + public void shouldCallHaltOnUnexpectedFailures() throws Exception + { + // Given + BoltResponseMessageHandler msgHandler = mock( BoltResponseMessageHandler.class ); + doThrow( new RuntimeException( "Something went horribly wrong" ) ) + .when( msgHandler ) + .onSuccess( any(Map.class) ); + + BoltWorker worker = mock( BoltWorker.class ); + MessageProcessingHandler handler = + new MessageProcessingHandler( msgHandler, mock( Runnable.class ), + worker, mock( Log.class ) ); + + // When + handler.onFinish(); + + // Then + verify( worker ).halt(); + } +} diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java index 6205535bd82cd..d5befeaa88e5f 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java @@ -31,13 +31,17 @@ import org.neo4j.kernel.api.exceptions.Status; import static java.util.Collections.emptyMap; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.neo4j.bolt.testing.BoltMatchers.canReset; import static org.neo4j.bolt.testing.BoltMatchers.failedWithStatus; import static org.neo4j.bolt.testing.BoltMatchers.hasNoTransaction; @@ -322,6 +326,7 @@ public void testRollbackError() throws Throwable // And given that transaction will fail to roll back TransactionStateMachine txMachine = (TransactionStateMachine) machine.ctx.statementProcessor; + when( txMachine.ctx.currentTransaction.isOpen() ).thenReturn( true ); doThrow( new TransactionFailureException( "No Mr. Bond, I expect you to die." ) ). when( txMachine.ctx.currentTransaction ).close(); @@ -445,4 +450,25 @@ public void shouldTerminateOnAuthExpiryDuringSTREAMING() throws Throwable assertException( () -> machine.discardAll( responseHandler ), BoltConnectionAuthFatality.class, "Auth expired!" ); } + + @Test + public void callResetEvenThoughAlreadyClosed() throws Throwable + { + // Given + BoltStateMachine machine = newMachine( READY ); + + // When we close + TransactionStateMachine statementProcessor = (TransactionStateMachine) machine.statementProcessor(); + machine.close(); + assertThat(statementProcessor.ctx.currentTransaction, nullValue()); + assertTrue(machine.ctx.closed); + + //But someone runs a query and thus opens a new transaction + statementProcessor.run( "RETURN 1", Collections.emptyMap() ); + assertThat(statementProcessor.ctx.currentTransaction, notNullValue()); + + // Then, when we close again we should make sure the transaction is closed againg + machine.close(); + assertThat(statementProcessor.ctx.currentTransaction, nullValue()); + } }