Skip to content

Commit

Permalink
Always close transactions
Browse files Browse the repository at this point in the history
In cases where unexpected errors occur we must still make sure
we always close all opened transactions, or catastrophe ensues.
  • Loading branch information
pontusmelke committed Sep 29, 2016
1 parent 7c8bb17 commit 75f2d1f
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 168 deletions.
Expand Up @@ -21,12 +21,9 @@
package org.neo4j.bolt.v1.messaging;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.neo4j.bolt.v1.runtime.BoltResponseHandler;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.bolt.v1.runtime.spi.Record;
import org.neo4j.logging.Log;
Expand All @@ -47,12 +44,12 @@ public class BoltMessageRouter implements BoltRequestMessageHandler<RuntimeExcep
private BoltWorker worker;

public BoltMessageRouter( Log log, BoltWorker worker, BoltResponseMessageHandler<IOException> output,
Runnable onEachCompletedRequest )
Runnable onEachCompletedRequest )
{
this.initHandler = new InitHandler( output, onEachCompletedRequest, log );
this.runHandler = new RunHandler( output, onEachCompletedRequest, log );
this.resultHandler = new ResultHandler( output, onEachCompletedRequest, log );
this.defaultHandler = new MessageProcessingHandler( output, onEachCompletedRequest, log );
this.initHandler = new InitHandler( output, onEachCompletedRequest, worker, log );
this.runHandler = new RunHandler( output, onEachCompletedRequest, worker, log );
this.resultHandler = new ResultHandler( output, onEachCompletedRequest, worker, log );
this.defaultHandler = new MessageProcessingHandler( output, onEachCompletedRequest, worker, log );

this.worker = worker;
}
Expand Down Expand Up @@ -95,137 +92,30 @@ public void onPullAll()
worker.enqueue( session -> session.pullAll( resultHandler ) );
}

static class MessageProcessingHandler implements BoltResponseHandler
{
protected final Map<String, Object> metadata = new HashMap<>();

// TODO: move this somewhere more sane (when modules are unified)
static void publishError( BoltResponseMessageHandler<IOException> out, Neo4jError error )
throws IOException
{
if ( !error.status().code().classification().shouldRespondToClient() )
{
// If not intended for client, we only return an error reference. This must
// be cross-referenced with the log files for full error detail.
out.onFailure( error.status(), String.format(
"An unexpected failure occurred, see details in the database " +
"logs, reference number %s.", error.reference() ) );
}
else
{
// If intended for client, we forward the message as-is.
out.onFailure( error.status(), error.message() );
}
}

protected final Log log;

protected final BoltResponseMessageHandler<IOException> handler;

private Neo4jError error;
private final Runnable onFinish;
private boolean ignored;

MessageProcessingHandler( BoltResponseMessageHandler<IOException> handler, Runnable onFinish, Log logger )
{
this.handler = handler;
this.onFinish = onFinish;
this.log = logger;
}

@Override
public void onStart()
{
}

@Override
public void onRecords( BoltResult result, boolean pull ) throws Exception
{
}

@Override
public void onMetadata( String key, Object value )
{
metadata.put( key, value );
}

@Override
public void markIgnored()
{
this.ignored = true;
}

@Override
public void markFailed( Neo4jError error )
{
this.error = error;
}

@Override
public void onFinish()
{
try
{
if ( ignored )
{
handler.onIgnored();
}
else if ( error != null )
{
publishError( handler, error );
}
else
{
handler.onSuccess( getMetadata() );
}
}
catch ( Throwable e )
{
// TODO: we've lost the ability to communicate with the client. Shut down the session, close transactions.
log.error( "Failed to write response to driver", e );
}
finally
{
onFinish.run();
clearState();
}
}

Map<String, Object> getMetadata()
{
return metadata;
}

void clearState()
{
error = null;
ignored = false;
metadata.clear();
}
}

private static class InitHandler extends MessageProcessingHandler
{
InitHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, Log log )
InitHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, BoltWorker worker, Log log )
{
super( handler, onCompleted, log );
super( handler, onCompleted, worker, log );
}

}

private static class RunHandler extends MessageProcessingHandler
{
RunHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, Log log )
RunHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, BoltWorker worker, Log log )
{
super( handler, onCompleted, log );
super( handler, onCompleted, worker, log );
}

}

private static class ResultHandler extends MessageProcessingHandler
{
ResultHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, Log log )
ResultHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, BoltWorker worker,
Log log )
{
super( handler, onCompleted, log );
super( handler, onCompleted, worker, log );
}

@Override
Expand Down
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.messaging;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.neo4j.bolt.v1.runtime.BoltResponseHandler;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.logging.Log;

class MessageProcessingHandler implements BoltResponseHandler
{
protected final Map<String,Object> metadata = new HashMap<>();

// TODO: move this somewhere more sane (when modules are unified)
static void publishError( BoltResponseMessageHandler<IOException> out, Neo4jError error )
throws IOException
{
if ( !error.status().code().classification().shouldRespondToClient() )
{
// If not intended for client, we only return an error reference. This must
// be cross-referenced with the log files for full error detail.
out.onFailure( error.status(), String.format(
"An unexpected failure occurred, see details in the database " +
"logs, reference number %s.", error.reference() ) );
}
else
{
// If intended for client, we forward the message as-is.
out.onFailure( error.status(), error.message() );
}
}

protected final Log log;
protected final BoltWorker worker;
protected final BoltResponseMessageHandler<IOException> handler;

private Neo4jError error;
private final Runnable onFinish;
private boolean ignored;

MessageProcessingHandler( BoltResponseMessageHandler<IOException> handler, Runnable onFinish, BoltWorker worker,
Log logger )
{
this.handler = handler;
this.onFinish = onFinish;
this.worker = worker;
this.log = logger;
}

@Override
public void onStart()
{
}

@Override
public void onRecords( BoltResult result, boolean pull ) throws Exception
{
}

@Override
public void onMetadata( String key, Object value )
{
metadata.put( key, value );
}

@Override
public void markIgnored()
{
this.ignored = true;
}

@Override
public void markFailed( Neo4jError error )
{
this.error = error;
}

@Override
public void onFinish()
{
try
{
if ( ignored )
{
handler.onIgnored();
}
else if ( error != null )
{
publishError( handler, error );
}
else
{
handler.onSuccess( getMetadata() );
}
}
catch ( Throwable e )
{
worker.halt();
log.error( "Failed to write response to driver", e );
}
finally
{
onFinish.run();
clearState();
}
}

Map<String,Object> getMetadata()
{
return metadata;
}

void clearState()
{
error = null;
ignored = false;
metadata.clear();
}
}
Expand Up @@ -264,24 +264,20 @@ public boolean isClosed()

public void close()
{
if ( !ctx.closed )
try
{
if ( onClose != null )
//Only run onClose, once
if ( !ctx.closed && onClose != null )
{
onClose.run();
}
try
{
ctx.statementProcessor.reset();
}
catch ( TransactionFailureException e )
{
throw new RuntimeException( e );
}
finally
{
ctx.closed = true;
}
}
finally
{
ctx.closed = true;
//However a new transaction may have been created
//so we must always to reset
reset();
}
}

Expand Down Expand Up @@ -628,7 +624,6 @@ State resetMachine( BoltStateMachine machine ) throws BoltConnectionFatality
throw new BoltConnectionFatality( e.getMessage() );
}
}

}

private static void fail( BoltStateMachine machine, Neo4jError neo4jError )
Expand All @@ -641,6 +636,18 @@ private static void fail( BoltStateMachine machine, Neo4jError neo4jError )
machine.ctx.markFailed( neo4jError );
}

private void reset()
{
try
{
ctx.statementProcessor.reset();
}
catch ( TransactionFailureException e )
{
throw new RuntimeException( e );
}
}

static class MutableConnectionState implements BoltResponseHandler
{
private static final NullStatementProcessor NULL_STATEMENT_PROCESSOR = new NullStatementProcessor();
Expand Down

0 comments on commit 75f2d1f

Please sign in to comment.