From 640ffe04b7be08372ccf8ecd486e318264bd9df5 Mon Sep 17 00:00:00 2001 From: Zhen Date: Fri, 18 Dec 2015 16:43:14 +0100 Subject: [PATCH] Fixed flaky tests caused by a race in websocket client's stop method --- .../v1/docs/BoltFullExchangesDocTest.java | 2 +- .../RejectTransportEncryptionIT.java | 3 +- .../transport/socket/client/Connection.java | 2 +- .../socket/client/SocketConnection.java | 6 -- .../socket/client/WebSocketConnection.java | 52 ++++++------- .../client/WebSocketConnectionTest.java | 74 +++++++++++++++++++ 6 files changed, 101 insertions(+), 38 deletions(-) create mode 100644 community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnectionTest.java diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/docs/BoltFullExchangesDocTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/docs/BoltFullExchangesDocTest.java index 6b65e9eb5d7a..e93b59ab44f3 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/docs/BoltFullExchangesDocTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/docs/BoltFullExchangesDocTest.java @@ -91,7 +91,7 @@ public static Collection documentedFullProtocolExamples() @After public void shutdown() throws Exception { - client.close(); + client.disconnect(); } @Test diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/RejectTransportEncryptionIT.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/RejectTransportEncryptionIT.java index ab89c4eb3a5f..288c52184a49 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/RejectTransportEncryptionIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/integration/RejectTransportEncryptionIT.java @@ -66,8 +66,7 @@ public static Collection transports() return asList( new Object[]{ (Factory) SecureWebSocketConnection::new, - new IOException( "Failed to connect to the server within 30 seconds" ) - // TODO shutdown ssl connections properly + new IOException( "Failed to connect to the server within 10 seconds" ) }, new Object[]{ (Factory) SecureSocketConnection::new, diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/Connection.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/Connection.java index 593f9863de15..260da9ec90de 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/Connection.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/Connection.java @@ -23,7 +23,7 @@ import org.neo4j.helpers.HostnamePort; -public interface Connection extends AutoCloseable +public interface Connection { Connection connect( HostnamePort address ) throws Exception; diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/SocketConnection.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/SocketConnection.java index 582447b56c89..5aa73cd16e39 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/SocketConnection.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/SocketConnection.java @@ -104,10 +104,4 @@ public void disconnect() throws IOException socket.close(); } } - - @Override - public void close() throws Exception - { - disconnect(); - } } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnection.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnection.java index 6c874e69f7a1..d48be2cf8071 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnection.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnection.java @@ -43,6 +43,8 @@ public class WebSocketConnection implements Connection, WebSocketListener private final Supplier clientSupplier; private final Function uriGenerator; + private final byte[] POISON_PILL = "poison".getBytes(); + private WebSocketClient client; private RemoteEndpoint server; @@ -50,10 +52,10 @@ public class WebSocketConnection implements Connection, WebSocketListener private final LinkedBlockingQueue received = new LinkedBlockingQueue<>(); // Current input data being handled, popped off of 'received' queue - private byte[] currentRecieveBuffer = null; + private byte[] currentReceiveBuffer = null; // Index into the current receive buffer - private int currentRecieveIndex = 0; + private int currentReceiveIndex = 0; public WebSocketConnection() { @@ -66,6 +68,12 @@ public WebSocketConnection( Supplier clientSupplier, Function 0 ) { waitForRecievedData( length, remaining, target ); - for ( int i = 0; i < Math.min( remaining, currentRecieveBuffer.length - currentRecieveIndex ); i++ ) + for ( int i = 0; i < Math.min( remaining, currentReceiveBuffer.length - currentReceiveIndex ); i++ ) { - target[length - remaining] = currentRecieveBuffer[currentRecieveIndex++]; + target[length - remaining] = currentReceiveBuffer[currentReceiveIndex++]; remaining--; } } @@ -124,19 +133,21 @@ private void waitForRecievedData( int length, int remaining, byte[] target ) throws InterruptedException, IOException { long start = System.currentTimeMillis(); - while ( currentRecieveBuffer == null || currentRecieveIndex >= currentRecieveBuffer.length ) + while ( currentReceiveBuffer == null || currentReceiveIndex >= currentReceiveBuffer.length ) { - currentRecieveIndex = 0; - currentRecieveBuffer = received.poll( 10, MILLISECONDS ); + currentReceiveIndex = 0; + currentReceiveBuffer = received.poll( 10, MILLISECONDS ); - if( client.isStopped() || client.isStopping() ) + if( (currentReceiveBuffer == null && ( client.isStopped() || client.isStopping() ) ) || + currentReceiveBuffer == POISON_PILL ) { + // no data received throw new IOException( "Connection closed while waiting for data from the server." ); } if ( System.currentTimeMillis() - start > 30_000 ) { throw new IOException( "Waited 30 seconds for " + remaining + " bytes, " + - "" + (length - remaining) + " was recieved: " + + "" + (length - remaining) + " was received: " + HexPrinter.hex( ByteBuffer.wrap( target ), 0, length - remaining ) ); } } @@ -145,17 +156,9 @@ private void waitForRecievedData( int length, int remaining, byte[] target ) @Override public void disconnect() throws Exception { - close(); + client.stop(); } - @Override - public void close() throws Exception - { - if ( client != null ) - { - client.stop(); - } - } @Override public void onWebSocketBinary( byte[] bytes, int i, int i2 ) @@ -166,14 +169,7 @@ public void onWebSocketBinary( byte[] bytes, int i, int i2 ) @Override public void onWebSocketClose( int i, String s ) { - try - { - close(); - } - catch ( Exception e ) - { - throw new RuntimeException( e ); - } + received.add( POISON_PILL ); } @Override diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnectionTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnectionTest.java new file mode 100644 index 000000000000..9f6044a87906 --- /dev/null +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/transport/socket/client/WebSocketConnectionTest.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.bolt.v1.transport.socket.client; + +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class WebSocketConnectionTest +{ + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void shouldNotThrowAnyExceptionWhenDataReceivedBeforeClose() throws Throwable + { + // Given + WebSocketClient client = mock( WebSocketClient.class ); + WebSocketConnection conn = new WebSocketConnection( client ); + when( client.isStopped() ).thenReturn( true ); + + byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + // When + conn.onWebSocketBinary( data, 0, 10 ); + conn.recv( 10 ); + + // Then + // no exception + } + + + @Test + public void shouldThrowIOExceptionWhenNotEnoughDataReceivedBeforeClose() throws Throwable + { + // Given + WebSocketClient client = mock( WebSocketClient.class ); + WebSocketConnection conn = new WebSocketConnection( client ); + when( client.isStopped() ).thenReturn( true, true ); + + byte[] data = {0, 1, 2, 3}; + + // When && Then + conn.onWebSocketBinary( data, 0, 4 ); + + expectedException.expect( IOException.class ); + expectedException.expectMessage( "Connection closed while waiting for data from the server." ); + conn.recv( 10 ); + } +}