Skip to content

Commit

Permalink
Improve NDP error reporting
Browse files Browse the repository at this point in the history
- Introduce detailed error reporting to send to neo tech on unexpected errors.
- Handle misplaced message boundary gracefully
- Add better error messages for invalid message contents
  • Loading branch information
jakewins committed Jun 3, 2015
1 parent 3f620f6 commit 7413461
Show file tree
Hide file tree
Showing 27 changed files with 626 additions and 190 deletions.
10 changes: 5 additions & 5 deletions community/io/src/test/java/org/neo4j/test/RepeatRule.java
Expand Up @@ -19,15 +19,15 @@
*/
package org.neo4j.test;

import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

/**
* Set a test to loop a number of times. If you find yourself using this in a production test, you are probably doing
* something wrong.
Expand All @@ -43,7 +43,7 @@ public class RepeatRule implements TestRule
@Target(ElementType.METHOD)
public @interface Repeat
{
public abstract int times();
int times();
}

private int count;
Expand Down
Expand Up @@ -401,7 +401,7 @@ public int hashCode()
}
}

public enum Classification
enum Classification
{
/** The Client sent a bad request - changing the request might yield a successful outcome. */
ClientError( TransactionEffect.NONE,
Expand All @@ -426,7 +426,7 @@ private enum TransactionEffect

private final String description;

private Classification( TransactionEffect transactionEffect, String description )
Classification( TransactionEffect transactionEffect, String description )
{
this.description = description;
this.rollbackTransaction = transactionEffect == TransactionEffect.ROLLBACK;
Expand Down
Expand Up @@ -94,7 +94,8 @@ public Lifecycle newKernelExtension( Dependencies dependencies ) throws Throwabl
final Config config = dependencies.config();
final GraphDatabaseService gdb = dependencies.db();
final GraphDatabaseAPI api = (GraphDatabaseAPI) gdb;
final Log log = dependencies.logService().getInternalLog( Sessions.class );
final LogService logging = dependencies.logService();
final Log log = logging.getInternalLog( Sessions.class );

final HostnamePort socketAddress = config.get( Settings.ndp_socket_address );
final HostnamePort webSocketAddress = config.get( Settings.ndp_ws_address );
Expand All @@ -104,17 +105,17 @@ public Lifecycle newKernelExtension( Dependencies dependencies ) throws Throwabl
if ( config.get( Settings.ndp_enabled ) )
{
final Sessions sessions = life.add( new ThreadedSessions(
life.add( new StandardSessions( api, log ) ),
life.add( new StandardSessions( api, logging ) ),
dependencies.scheduler(),
dependencies.logService() ) );
logging ) );

PrimitiveLongObjectMap<Function<Channel, SocketProtocol>> availableVersions = longObjectMap();
availableVersions.put( SocketProtocolV1.VERSION, new Function<Channel, SocketProtocol>()
{
@Override
public SocketProtocol apply( Channel channel )
{
return new SocketProtocolV1( log, sessions.newSession(), channel );
return new SocketProtocolV1( logging, sessions.newSession(), channel );
}
} );

Expand Down
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.ndp.messaging;

import java.io.IOException;

import org.neo4j.kernel.api.exceptions.Status;

public class NDPIOException extends IOException implements Status.HasStatus
{
private final Status status;

public NDPIOException( Status status, String message )
{
super(message);
this.status = status;
}

@Override
public Status status()
{
return status;
}
}
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.ndp.messaging.NDPIOException;
import org.neo4j.ndp.messaging.v1.infrastructure.ValueNode;
import org.neo4j.ndp.messaging.v1.infrastructure.ValuePath;
import org.neo4j.ndp.messaging.v1.infrastructure.ValueRelationship;
Expand Down Expand Up @@ -76,6 +77,22 @@ public interface MessageTypes
byte MSG_FAILURE = 0x7F;
}

static String messageTypeName( int type )
{
switch( type )
{
case MessageTypes.MSG_ACK_FAILURE: return "MSG_ACK_FAILURE";
case MessageTypes.MSG_RUN: return "MSG_RUN";
case MessageTypes.MSG_DISCARD_ALL: return "MSG_DISCARD_ALL";
case MessageTypes.MSG_PULL_ALL: return "MSG_PULL_ALL";
case MessageTypes.MSG_RECORD: return "MSG_RECORD";
case MessageTypes.MSG_SUCCESS: return "MSG_SUCCESS";
case MessageTypes.MSG_IGNORED: return "MSG_IGNORED";
case MessageTypes.MSG_FAILURE: return "MSG_FAILURE";
default: return "0x" + Integer.toHexString(type);
}
}

public static class Writer implements MessageFormat.Writer
{
public static final Runnable NO_OP = new Runnable()
Expand Down Expand Up @@ -171,10 +188,10 @@ public void handleFailureMessage( Neo4jError cause )
packer.packMapHeader( 2 );

packer.pack( "code" );
packValue( cause.status().code().serialize() );
packer.pack( cause.status().code().serialize() );

packer.pack( "message" );
packValue( cause.message() );
packer.pack( cause.message() );
onMessageComplete.run();
}

Expand Down Expand Up @@ -287,7 +304,7 @@ else if ( obj instanceof float[] )
packer.packListHeader( arr.length );
for ( int i = 0; i < arr.length; i++ )
{
packValue( arr[i] );
packer.pack( arr[i] );
}
}
else if ( obj instanceof double[] )
Expand All @@ -296,7 +313,7 @@ else if ( obj instanceof double[] )
packer.packListHeader( arr.length );
for ( int i = 0; i < arr.length; i++ )
{
packValue( arr[i] );
packer.pack( arr[i] );
}
}
else if ( obj instanceof boolean[] )
Expand All @@ -305,7 +322,7 @@ else if ( obj instanceof boolean[] )
packer.packListHeader( arr.length );
for ( int i = 0; i < arr.length; i++ )
{
packValue( arr[i] );
packer.pack( arr[i] );
}
}
else if ( obj.getClass().isArray() )
Expand Down Expand Up @@ -394,36 +411,55 @@ public boolean hasNext() throws IOException
@Override
public <E extends Exception> void read( MessageHandler<E> output ) throws IOException, E
{
unpacker.unpackStructHeader();
int type = (int) unpacker.unpackLong();
switch ( type )
{
case MessageTypes.MSG_RUN:
unpackRunMessage( output );
break;
case MessageTypes.MSG_DISCARD_ALL:
unpackDiscardAllMessage( output );
break;
case MessageTypes.MSG_PULL_ALL:
unpackPullAllMessage( output );
break;
case MessageTypes.MSG_RECORD:
unpackRecordMessage( output );
break;
case MessageTypes.MSG_SUCCESS:
unpackSuccessMessage( output );
break;
case MessageTypes.MSG_FAILURE:
unpackFailureMessage( output );
break;
case MessageTypes.MSG_ACK_FAILURE:
unpackAckFailureMessage( output );
break;
case MessageTypes.MSG_IGNORED:
unpackIgnoredMessage( output );
break;
default:
throw new IOException( "Unknown message type: " + type );
try
{
unpacker.unpackStructHeader();
int type = (int) unpacker.unpackLong();

try
{
switch ( type )
{
case MessageTypes.MSG_RUN:
unpackRunMessage( output );
break;
case MessageTypes.MSG_DISCARD_ALL:
unpackDiscardAllMessage( output );
break;
case MessageTypes.MSG_PULL_ALL:
unpackPullAllMessage( output );
break;
case MessageTypes.MSG_RECORD:
unpackRecordMessage( output );
break;
case MessageTypes.MSG_SUCCESS:
unpackSuccessMessage( output );
break;
case MessageTypes.MSG_FAILURE:
unpackFailureMessage( output );
break;
case MessageTypes.MSG_ACK_FAILURE:
unpackAckFailureMessage( output );
break;
case MessageTypes.MSG_IGNORED:
unpackIgnoredMessage( output );
break;
default:
throw new NDPIOException( Status.Request.Invalid,
"0x" + Integer.toHexString(type) + " is not a valid message type." );
}
}
catch( PackStream.PackstreamException e )
{
throw new NDPIOException( Status.Request.InvalidFormat,
"Unable to read " + messageTypeName (type) + " message. " +
"Error was: " + e.getMessage() );
}
}
catch( PackStream.PackstreamException e )
{
throw new NDPIOException( Status.Request.InvalidFormat, "Unable to read message type. " +
"Error was: " + e.getMessage() );
}
}

Expand Down

0 comments on commit 7413461

Please sign in to comment.