Permalink
Browse files

Some stability changes to tests and enabled a previously ignored test

  • Loading branch information...
1 parent efb174e commit 52b1ebda1aada55d4dffd42b2adcd210ba235664 @tinwelint committed Feb 28, 2012
@@ -28,6 +28,8 @@
import static org.neo4j.kernel.impl.nioneo.store.NeoStore.versionStringToLong;
import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
import org.junit.Before;
import org.junit.Test;
@@ -40,23 +42,32 @@
private static final byte INTERNAL_PROTOCOL_VERSION = 0;
private static final byte APPLICATION_PROTOCOL_VERSION = 0;
- private static final int PORT = 1234;
private static final String PATH = "target/tmp";
private StoreId storeIdToUse;
+ private int port;
@Before
- public void doBefore()
+ public void doBefore() throws IOException
{
storeIdToUse = new StoreId();
new File( PATH ).mkdirs();
+ port = findFreePort();
+ }
+
+ public static int findFreePort() throws IOException
+ {
+ ServerSocket server = new ServerSocket( 0 );
+ int port = server.getLocalPort();
+ server.close();
+ return port;
}
@Test
public void clientGetResponseFromServerViaComLayer() throws Exception
{
MadeUpImplementation serverImplementation = new MadeUpImplementation( storeIdToUse );
MadeUpServer server = madeUpServer( serverImplementation );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
int value1 = 10;
int value2 = 5;
@@ -69,14 +80,15 @@ public void clientGetResponseFromServerViaComLayer() throws Exception
server.shutdown();
}
- private MadeUpServer madeUpServer( MadeUpImplementation serverImplementation )
+ private MadeUpServer madeUpServer( MadeUpImplementation serverImplementation ) throws IOException
{
return madeUpServer( serverImplementation, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
}
private MadeUpServer madeUpServer( MadeUpImplementation serverImplementation, byte internalVersion, byte applicationVersion )
+ throws IOException
{
- return new MadeUpServer( serverImplementation, PORT, internalVersion, applicationVersion,
+ return new MadeUpServer( serverImplementation, port, internalVersion, applicationVersion,
TxChecksumVerifier.ALWAYS_MATCH );
}
@@ -94,7 +106,7 @@ public void makeSureClientStoreIdsMustMatch() throws Exception
{
MadeUpImplementation serverImplementation = new MadeUpImplementation( storeIdToUse );
MadeUpServer server = madeUpServer( serverImplementation );
- MadeUpClient client = new MadeUpClient( PORT,
+ MadeUpClient client = new MadeUpClient( port,
new StoreId( 10, 10, NeoStore.versionStringToLong( CommonAbstractStore.ALL_STORES_VERSION ) ),
INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
@@ -120,7 +132,7 @@ public void makeSureServerStoreIdsMustMatch() throws Exception
MadeUpImplementation serverImplementation = new MadeUpImplementation(
new StoreId( 10, 10, versionStringToLong( ALL_STORES_VERSION ) ) );
MadeUpServer server = madeUpServer( serverImplementation );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
try
{
@@ -143,7 +155,7 @@ public void makeSureClientCanStreamBigData() throws Exception
{
MadeUpImplementation serverImplementation = new MadeUpImplementation( storeIdToUse );
MadeUpServer server = madeUpServer( serverImplementation );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
client.streamSomeData( new ToAssertionWriter(), MadeUpServer.FRAME_LENGTH*3 );
@@ -166,7 +178,7 @@ public void clientThrowsServerSideErrorMidwayThroughStreaming() throws Exception
}
};
MadeUpServer server = madeUpServer( serverImplementation );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
try
{
@@ -204,7 +216,7 @@ public void throwingServerSideExceptionBackToClient() throws Exception
{
MadeUpImplementation serverImplementation = new MadeUpImplementation( storeIdToUse );
MadeUpServer server = madeUpServer( serverImplementation );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
String exceptionMessage = "The message";
try
@@ -226,7 +238,7 @@ public void applicationProtocolVersionsMustMatch() throws Exception
{
MadeUpImplementation serverImplementation = new MadeUpImplementation( storeIdToUse );
MadeUpServer server = madeUpServer( serverImplementation, INTERNAL_PROTOCOL_VERSION, (byte) (APPLICATION_PROTOCOL_VERSION+1) );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
try
{
@@ -266,7 +278,7 @@ public void internalProtocolVersionsMustMatch() throws Exception
{
MadeUpImplementation serverImplementation = new MadeUpImplementation( storeIdToUse );
MadeUpServer server = madeUpServer( serverImplementation, (byte)1, APPLICATION_PROTOCOL_VERSION );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, (byte)2, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, (byte)2, APPLICATION_PROTOCOL_VERSION );
try
{
@@ -305,7 +317,7 @@ public void serverStopsStreamingToDeadClient() throws Exception
{
MadeUpImplementation serverImplementation = new MadeUpImplementation( storeIdToUse );
MadeUpServer server = madeUpServer( serverImplementation );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
int failAtSize = MadeUpServer.FRAME_LENGTH*2;
ClientCrashingWriter writer = new ClientCrashingWriter( client, failAtSize );
@@ -340,8 +352,8 @@ public void assertMatch( long txId, int masterId, long checksum )
throw new FailingException( failureMessage );
}
};
- MadeUpServer server = new MadeUpServer( serverImplementation, PORT, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION, failingVerifier );
- MadeUpClient client = new MadeUpClient( PORT, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
+ MadeUpServer server = new MadeUpServer( serverImplementation, port, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION, failingVerifier );
+ MadeUpClient client = new MadeUpClient( port, storeIdToUse, INTERNAL_PROTOCOL_VERSION, APPLICATION_PROTOCOL_VERSION );
try
{
@@ -932,7 +932,16 @@ public String toString()
protected synchronized void reevaluateMyself( StoreId storeId )
{
- Pair<Master, Machine> master = broker.getMasterReally( true );
+ Pair<Master, Machine> master;
+ try
+ {
+ master = broker.getMasterReally( true );
+ }
+ catch ( NullPointerException e )
+ {
+ System.out.println( this );
+ throw e;
+ }
boolean iAmCurrentlyMaster = masterServer != null;
getMessageLog().logMessage( "ReevaluateMyself: machineId=" + machineId + " with master[" + master +
"] (I am master=" + iAmCurrentlyMaster + ", " + internalGraphDatabase + ")" );
@@ -68,4 +68,37 @@ else if ( unit.equals( "m" ) )
return timeUnit.toMillis( amount*multiplyFactor );
}
}
+
+ public static <T,E extends Exception> T waitForCondition( Condition<T,E> condition, int timeMillis ) throws E
+ {
+ long endTime = System.currentTimeMillis()+timeMillis;
+ T result = condition.tryToFullfill();
+ if ( result != null ) return result;
+ while ( result == null && System.currentTimeMillis() < endTime )
+ {
+ sleepWithoutInterruption( 1, "Failed waiting for " + condition + " to be fulfilled" );
+ result = condition.tryToFullfill();
+ if ( result != null ) return result;
+ }
+ throw condition.failure();
+ }
+
+ private static void sleepWithoutInterruption( long millis, String errorMessage )
+ {
+ try
+ {
+ Thread.sleep( millis );
+ }
+ catch ( InterruptedException e )
+ {
+ throw new RuntimeException( errorMessage );
+ }
+ }
+
+ public interface Condition<T, E extends Exception>
+ {
+ T tryToFullfill();
+
+ E failure();
+ }
}
@@ -20,6 +20,7 @@
package org.neo4j.test.ha;
import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
+import static org.neo4j.kernel.ha.TimeUtil.waitForCondition;
import java.io.File;
import java.io.IOException;
@@ -32,13 +33,17 @@
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.server.quorum.QuorumMXBean;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
import org.jboss.netty.handler.timeout.TimeoutException;
import org.junit.Ignore;
import org.neo4j.helpers.Format;
import org.neo4j.helpers.Predicate;
import org.neo4j.kernel.HaConfig;
+import org.neo4j.kernel.ha.TimeUtil.Condition;
import org.neo4j.kernel.ha.zookeeper.ZooKeeperClusterClient;
import org.neo4j.test.TargetDirectory;
import org.neo4j.test.subprocess.SubProcess;
@@ -80,7 +85,26 @@ public LocalhostZooKeeperCluster( TargetDirectory target, int... ports )
public static LocalhostZooKeeperCluster standardZoo( Class<?> owningTest)
{
- return new LocalhostZooKeeperCluster( owningTest, 2181, 2182, 2183 );
+ for ( int i = 0; i < 3; i++ )
+ {
+ try
+ {
+ LocalhostZooKeeperCluster zoo = new LocalhostZooKeeperCluster( owningTest, 2181, 2182, 2183 );
+ if ( zoo.ping( 20*1000 ) )
+ {
+ return zoo;
+ }
+ else
+ {
+ zoo.shutdown();
+ }
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ }
+ }
+ throw new RuntimeException( "Couldn't start ZK" );
}
private void await( ZooKeeper[] keepers, long timeout, TimeUnit unit )
@@ -209,6 +233,82 @@ public synchronized void shutdown()
}
Arrays.fill( keeper, null );
}
+
+ public boolean ping( long timeout )
+ {
+ long endTime = System.currentTimeMillis() + timeout;
+ while ( System.currentTimeMillis() < endTime )
+ {
+ ZKPinger pinger = null;
+ try
+ {
+ pinger = new ZKPinger();
+ pinger.waitForConnection( 5000 );
+ Thread.sleep( 2000 );
+ pinger.waitForConnection( 5000 );
+ Thread.sleep( 2000 );
+ pinger.waitForConnection( 5000 );
+ return true;
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+// throw new ZooKeeperException( "Unable to create zoo keeper client", e );
+ }
+ finally
+ {
+ if ( pinger != null ) pinger.close();
+ }
+ }
+ return false;
+ }
+
+ private class ZKPinger implements Watcher, Condition<Boolean, Exception>
+ {
+ private org.apache.zookeeper.ZooKeeper zk;
+ private volatile boolean isConnected;
+
+ public ZKPinger() throws IOException
+ {
+ zk = new org.apache.zookeeper.ZooKeeper( getConnectionString(), 5000, this );
+ }
+
+ public void waitForConnection( long timeout ) throws Exception
+ {
+ waitForCondition( this, 5000 );
+ }
+
+ @Override
+ public void process( WatchedEvent event )
+ {
+ isConnected = event.getState() == KeeperState.SyncConnected;
+ }
+
+ @Override
+ public Boolean tryToFullfill()
+ {
+ return isConnected ? Boolean.TRUE : null;
+ }
+
+ @Override
+ public Exception failure()
+ {
+ return new Exception( "Not connected" );
+ }
+
+ private void close()
+ {
+ try
+ {
+ zk.close();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.interrupted();
+ // OK
+ }
+ }
+ }
public static void main( String[] args ) throws Exception
{
Oops, something went wrong.

0 comments on commit 52b1ebd

Please sign in to comment.