Skip to content

Commit

Permalink
Log original failure when channel closed in Bolt
Browse files Browse the repository at this point in the history
Drivers can close socket connections while having ongoing transactions.
This results in transaction termination in the database. No errors can
then be reported back to the client because connection has been
terminated.

Previously Bolt server logged inability to write to a closed channel
and lost the original failure message
(`TransactionTerminatedException` in most cases).

This commit makes it always log the root cause. Also changed
`AssertableLogProvider` to use COWAL instead of a synchronized list
to allow concurrent iteration and modification.
  • Loading branch information
lutovich committed Nov 7, 2017
1 parent 75ad018 commit 5bf3c8e
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 29 deletions.
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;

import org.neo4j.bolt.v1.packstream.PackOutputClosedException;
import org.neo4j.bolt.v1.runtime.BoltResponseHandler;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.Neo4jError;
Expand All @@ -33,21 +34,6 @@ class MessageProcessingHandler implements BoltResponseHandler
{
protected final Map<String,Object> metadata = new HashMap<>();

// TODO: move this somewhere more sane (when modules are unified)
static void publishError( BoltResponseMessageHandler<IOException> out, Neo4jError error )
throws IOException
{
if ( error.isFatal() )
{
out.onFatal( error.status(), error.message() );
}
else
{
out.onFailure( error.status(), error.message() );
}

}

protected final Log log;
protected final BoltWorker worker;
protected final BoltResponseMessageHandler<IOException> handler;
Expand Down Expand Up @@ -128,10 +114,39 @@ Map<String,Object> getMetadata()
return metadata;
}

void clearState()
private void clearState()
{
error = null;
ignored = false;
metadata.clear();
}

private void publishError( BoltResponseMessageHandler<IOException> out, Neo4jError error ) throws IOException
{
try
{
if ( error.isFatal() )
{
out.onFatal( error.status(), error.message() );
}
else
{
out.onFailure( error.status(), error.message() );
}
}
catch ( PackOutputClosedException e )
{
// we tried to write error back to the client and realized that the underlying channel is closed
// log a warning, client driver might have just been stopped and closed all socket connections
log.warn( "Unable to send error back to the client. " +
"Communication channel is closed. Client has probably been stopped.", error.cause() );
}
catch ( Throwable t )
{
// some unexpected error happened while writing exception back to the client
// log it together with the original error being suppressed
t.addSuppressed( error.cause() );
log.error( "Unable to send error back to the client", t );
}
}
}
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.packstream;

import java.io.IOException;

public class PackOutputClosedException extends IOException
{
public PackOutputClosedException( String message )
{
super( message );
}
}
Expand Up @@ -28,6 +28,7 @@

import org.neo4j.bolt.v1.messaging.BoltResponseMessageBoundaryHook;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.bolt.v1.packstream.PackOutputClosedException;
import org.neo4j.bolt.v1.packstream.PackStream;

import static java.lang.Math.max;
Expand Down Expand Up @@ -160,7 +161,7 @@ private void ensure( int size ) throws IOException
assert size <= maxChunkSize : size + " > " + maxChunkSize;
if ( closed.get() )
{
throw new IOException( "Cannot write to buffer when closed" );
throw new PackOutputClosedException( "Unable to write to the closed output channel" );
}
int toWriteSize = chunkOpen ? size : size + CHUNK_HEADER_SIZE;
synchronized ( this )
Expand Down
Expand Up @@ -21,27 +21,37 @@

import org.junit.Test;

import java.util.Map;
import java.io.IOException;

import org.neo4j.bolt.v1.packstream.PackOutputClosedException;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.Log;

import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMapOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.neo4j.logging.AssertableLogProvider.inLog;
import static org.neo4j.test.matchers.CommonMatchers.hasSuppressed;

@SuppressWarnings( "unchecked" )
public class MessageProcessingHandlerTest
{
@Test
public void shouldCallHaltOnUnexpectedFailures() throws Exception
{
// Given
BoltResponseMessageHandler msgHandler = mock( BoltResponseMessageHandler.class );
BoltResponseMessageHandler<IOException> msgHandler = newResponseHandlerMock();
doThrow( new RuntimeException( "Something went horribly wrong" ) )
.when( msgHandler )
.onSuccess( any(Map.class) );
.onSuccess( anyMapOf( String.class, Object.class ) );

BoltWorker worker = mock( BoltWorker.class );
MessageProcessingHandler handler =
Expand All @@ -52,6 +62,96 @@ public void shouldCallHaltOnUnexpectedFailures() throws Exception
handler.onFinish();

// Then
verify( worker ).halt();
verify( worker ).halt();
}

@Test
public void shouldLogOriginalErrorWhenOutputIsClosed() throws Exception
{
testLoggingOfOriginalErrorWhenOutputIsClosed( false );
}

@Test
public void shouldLogOriginalFatalErrorWhenOutputIsClosed() throws Exception
{
testLoggingOfOriginalErrorWhenOutputIsClosed( true );
}

@Test
public void shouldLogWriteErrorAndOriginalErrorWhenUnknownFailure() throws Exception
{
testLoggingOfWriteErrorAndOriginalErrorWhenUnknownFailure( false );
}

@Test
public void shouldLogWriteErrorAndOriginalFatalErrorWhenUnknownFailure() throws Exception
{
testLoggingOfWriteErrorAndOriginalErrorWhenUnknownFailure( true );
}

private static void testLoggingOfOriginalErrorWhenOutputIsClosed( boolean fatalError ) throws Exception
{
AssertableLogProvider logProvider = new AssertableLogProvider();
Log log = logProvider.getLog( "Test" );

PackOutputClosedException outputClosed = new PackOutputClosedException( "Output closed" );
BoltResponseMessageHandler<IOException> responseHandler = newResponseHandlerMock( fatalError, outputClosed );

MessageProcessingHandler handler = new MessageProcessingHandler( responseHandler, mock( Runnable.class ),
mock( BoltWorker.class ), log );

RuntimeException originalError = new RuntimeException( "Hi, I'm the original error" );
markFailed( handler, fatalError, originalError );

logProvider.assertExactly( inLog( "Test" ).warn(
startsWith( "Unable to send error back to the client" ),
equalTo( originalError ) ) );
}

private static void testLoggingOfWriteErrorAndOriginalErrorWhenUnknownFailure( boolean fatalError ) throws Exception
{
AssertableLogProvider logProvider = new AssertableLogProvider();
Log log = logProvider.getLog( "Test" );

RuntimeException outputError = new RuntimeException( "Output failed" );
BoltResponseMessageHandler<IOException> responseHandler = newResponseHandlerMock( fatalError, outputError );

MessageProcessingHandler handler = new MessageProcessingHandler( responseHandler, mock( Runnable.class ),
mock( BoltWorker.class ), log );

RuntimeException originalError = new RuntimeException( "Hi, I'm the original error" );
markFailed( handler, fatalError, originalError );

logProvider.assertExactly( inLog( "Test" ).error(
startsWith( "Unable to send error back to the client" ),
both( equalTo( outputError ) ).and( hasSuppressed( originalError ) ) ) );
}

private static void markFailed( MessageProcessingHandler handler, boolean fatalError, Throwable error )
{
Neo4jError neo4jError = fatalError ? Neo4jError.fatalFrom( error ) : Neo4jError.from( error );
handler.markFailed( neo4jError );
handler.onFinish();
}

private static BoltResponseMessageHandler<IOException> newResponseHandlerMock( boolean fatalError, Throwable error )
throws Exception
{
BoltResponseMessageHandler<IOException> handler = newResponseHandlerMock();
if ( fatalError )
{
doThrow( error ).when( handler ).onFatal( any( Status.class ), anyString() );
}
else
{
doThrow( error ).when( handler ).onFailure( any( Status.class ), anyString() );
}
return handler;
}

@SuppressWarnings( "unchecked" )
private static BoltResponseMessageHandler<IOException> newResponseHandlerMock()
{
return mock( BoltResponseMessageHandler.class );
}
}
Expand Up @@ -23,10 +23,15 @@
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.helpers.Exceptions;

import static java.util.stream.Collectors.joining;

public final class CommonMatchers
{
private CommonMatchers()
Expand All @@ -43,6 +48,45 @@ public static <T> Matcher<? super Iterable<T>> matchesOneToOneInAnyOrder( Matche
return new MatchesOneToOneInAnyOrder<>( expectedMatchers );
}

/**
* Checks that exception has expected array or suppressed exceptions.
*
* @param expectedSuppressedErrors expected suppressed exceptions.
* @return new matcher.
*/
public static Matcher<Throwable> hasSuppressed( Throwable... expectedSuppressedErrors )
{
return new TypeSafeMatcher<Throwable>()
{
@Override
protected boolean matchesSafely( Throwable item )
{
return Arrays.equals( item.getSuppressed(), expectedSuppressedErrors );
}

@Override
public void describeTo( Description description )
{
description.appendText( "a throwable with suppressed:\n" );

if ( expectedSuppressedErrors.length == 0 )
{
description.appendText( "a throwable without suppressed" );
}
else
{
description.appendText( "a throwable with suppressed:\n" );

String expectedSuppressedAsString = Arrays.stream( expectedSuppressedErrors )
.map( Exceptions::stringify )
.collect( joining( "\n", "[\n", "]" ) );

description.appendText( expectedSuppressedAsString );
}
}
};
}

private static class MatchesOneToOneInAnyOrder<T> extends TypeSafeMatcher<Iterable<T>>
{
private final Matcher<? super T>[] expectedMatchers;
Expand Down
Expand Up @@ -28,12 +28,12 @@
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.IllegalFormatException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
Expand All @@ -52,7 +52,7 @@
public class AssertableLogProvider extends AbstractLogProvider<Log> implements TestRule
{
private final boolean debugEnabled;
private final List<LogCall> logCalls = Collections.synchronizedList( new ArrayList<LogCall>() );
private final List<LogCall> logCalls = new CopyOnWriteArrayList<>();

public AssertableLogProvider()
{
Expand Down

0 comments on commit 5bf3c8e

Please sign in to comment.