diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/util/CountingJobScheduler.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/util/CountingJobScheduler.java
new file mode 100644
index 0000000000000..c35683e858b73
--- /dev/null
+++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/util/CountingJobScheduler.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2002-2015 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.kernel.impl.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CountingJobScheduler implements JobScheduler
+{
+ private final AtomicInteger counter;
+ private final Neo4jJobScheduler delegate;
+
+ public CountingJobScheduler( AtomicInteger counter, Neo4jJobScheduler delegate )
+ {
+ this.counter = counter;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void init()
+ {
+ delegate.init();
+ }
+
+ @Override
+ public JobHandle schedule( Group group, Runnable job )
+ {
+ counter.getAndIncrement();
+ return delegate.schedule( group, job );
+ }
+
+ @Override
+ public JobHandle scheduleRecurring( Group group, Runnable runnable, long period,
+ TimeUnit timeUnit )
+ {
+ counter.getAndIncrement();
+ return delegate.scheduleRecurring( group, runnable, period, timeUnit );
+ }
+
+ @Override
+ public JobHandle scheduleRecurring( Group group, Runnable runnable, long initialDelay, long period,
+ TimeUnit timeUnit )
+ {
+ counter.getAndIncrement();
+ return delegate.scheduleRecurring( group, runnable, initialDelay, period, timeUnit );
+ }
+
+ @Override
+ public void shutdown()
+ {
+ delegate.shutdown();
+ }
+
+ @Override
+ public void start() throws Throwable
+ {
+ delegate.start();
+ }
+
+ @Override
+ public void stop() throws Throwable
+ {
+ delegate.stop();
+ }
+}
diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/PullerFactory.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/PullerFactory.java
index 70cdd57e62730..d368b52f8d68a 100644
--- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/PullerFactory.java
+++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/PullerFactory.java
@@ -72,7 +72,7 @@ public PullerFactory( RequestContextFactory requestContextFactory, Master master
public UpdatePuller createUpdatePuller( LifeSupport life )
{
return life.add( new SlaveUpdatePuller( requestContextFactory, master, lastUpdateTime, logging, serverId,
- availabilityGuard, invalidEpochHandler ) );
+ availabilityGuard, invalidEpochHandler, jobScheduler ) );
}
public TransactionObligationFulfiller createObligationFulfiller( LifeSupport life, UpdatePuller updatePuller )
diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/SlaveUpdatePuller.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/SlaveUpdatePuller.java
index 88b4b24ece857..48b9381c74767 100644
--- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/SlaveUpdatePuller.java
+++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/SlaveUpdatePuller.java
@@ -30,6 +30,7 @@
import org.neo4j.com.TransactionStream;
import org.neo4j.com.storecopy.TransactionCommittingResponseUnpacker;
import org.neo4j.com.storecopy.TransactionObligationFulfiller;
+import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.helpers.Pair;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.ha.com.RequestContextFactory;
@@ -40,6 +41,7 @@
import org.neo4j.kernel.ha.com.slave.MasterClient;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.CappedOperation;
+import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.logging.Logging;
@@ -126,7 +128,8 @@ public boolean evaluate( int currentTicket, int targetTicket )
};
private volatile boolean halted;
- private final AtomicInteger targetTicket = new AtomicInteger(), currentTicket = new AtomicInteger();
+ private final AtomicInteger targetTicket = new AtomicInteger();
+ private final AtomicInteger currentTicket = new AtomicInteger();
private final RequestContextFactory requestContextFactory;
private final Master master;
private final StringLogger logger;
@@ -135,11 +138,19 @@ public boolean evaluate( int currentTicket, int targetTicket )
private final InstanceId instanceId;
private final AvailabilityGuard availabilityGuard;
private InvalidEpochExceptionHandler invalidEpochHandler;
- private Thread me;
+ private final JobScheduler jobScheduler;
+ private volatile Thread updatePullingThread;
+ private volatile BinaryLatch shutdownLatch; // Store under synchronised(this), load in update puller thread
- SlaveUpdatePuller( RequestContextFactory requestContextFactory, Master master, LastUpdateTime lastUpdateTime,
- Logging logging, InstanceId instanceId, AvailabilityGuard availabilityGuard,
- InvalidEpochExceptionHandler invalidEpochHandler )
+ SlaveUpdatePuller(
+ RequestContextFactory requestContextFactory,
+ Master master,
+ LastUpdateTime lastUpdateTime,
+ Logging logging,
+ InstanceId instanceId,
+ AvailabilityGuard availabilityGuard,
+ InvalidEpochExceptionHandler invalidEpochHandler,
+ JobScheduler jobScheduler )
{
this.requestContextFactory = requestContextFactory;
this.master = master;
@@ -147,6 +158,7 @@ public boolean evaluate( int currentTicket, int targetTicket )
this.instanceId = instanceId;
this.availabilityGuard = availabilityGuard;
this.invalidEpochHandler = invalidEpochHandler;
+ this.jobScheduler = jobScheduler;
this.logger = logging.getMessagesLog( getClass() );
this.cappedLogger = new CappedOperation>(
CappedOperation.count( 10 ) )
@@ -161,6 +173,23 @@ protected void triggered( Pair event )
@Override
public void run()
+ {
+ updatePullingThread = Thread.currentThread();
+ String oldName = updatePullingThread.getName();
+ updatePullingThread.setName( UPDATE_PULLER_THREAD_PREFIX + instanceId );
+ try
+ {
+ periodicallyPullUpdates();
+ }
+ finally
+ {
+ updatePullingThread.setName( oldName );
+ updatePullingThread = null;
+ shutdownLatch.release();
+ }
+ }
+
+ private void periodicallyPullUpdates()
{
while ( !halted )
{
@@ -177,25 +206,30 @@ public void run()
}
@Override
- public void init() throws Throwable
+ public synchronized void init() throws Throwable
{
- // TODO Don't do this. This is just to satisfy LockSupport park/unpark
- // And we cannot have this class extend Thread since there's a naming clash with Lifecycle for stop()
- me = new Thread( this, UPDATE_PULLER_THREAD_PREFIX + instanceId );
- me.start();
+ if ( shutdownLatch != null )
+ {
+ return; // This SlaveUpdatePuller has already been initialised
+ }
+
+ shutdownLatch = new BinaryLatch();
+ jobScheduler.schedule( JobScheduler.Group.pullUpdates, this );
}
@Override
- public void shutdown() throws Throwable
+ public synchronized void shutdown() throws Throwable
{
- halted = true;
- while ( me.getState() != Thread.State.TERMINATED )
+ if ( shutdownLatch == null )
{
- Thread.sleep( 1 );
- Thread.yield();
+ return; // This SlaveUpdatePuller has already been shut down
}
- invalidEpochHandler = null;
- me = null;
+
+ Thread thread = updatePullingThread;
+ halted = true;
+ LockSupport.unpark( thread );
+ shutdownLatch.await();
+ shutdownLatch = null;
}
@Override
@@ -282,7 +316,7 @@ private boolean checkActive( boolean strictlyAssertActive )
private int poke()
{
int result = this.targetTicket.incrementAndGet();
- LockSupport.unpark( me );
+ LockSupport.unpark( updatePullingThread );
return result;
}
diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/InvalidEpochExceptionHandler.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/InvalidEpochExceptionHandler.java
index 33c3ee17d56c6..5520b779cda15 100644
--- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/InvalidEpochExceptionHandler.java
+++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/com/slave/InvalidEpochExceptionHandler.java
@@ -22,13 +22,4 @@
public interface InvalidEpochExceptionHandler
{
void handle();
-
- InvalidEpochExceptionHandler NONE = new InvalidEpochExceptionHandler()
- {
- @Override
- public void handle()
- {
-
- }
- };
}
diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/SlaveUpdatePullerTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/SlaveUpdatePullerTest.java
index 234a961799b5f..21b5c0ad391ce 100644
--- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/SlaveUpdatePullerTest.java
+++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/SlaveUpdatePullerTest.java
@@ -19,6 +19,7 @@
*/
package org.neo4j.kernel.ha;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -28,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId;
@@ -39,6 +41,9 @@
import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
+import org.neo4j.kernel.impl.util.CountingJobScheduler;
+import org.neo4j.kernel.impl.util.JobScheduler;
+import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.logging.BufferingLogger;
@@ -47,7 +52,9 @@
import org.neo4j.kernel.logging.Logging;
import org.neo4j.test.CleanupRule;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -63,6 +70,7 @@
public class SlaveUpdatePullerTest
{
+ private final AtomicInteger scheduledJobs = new AtomicInteger();
private final InstanceId instanceId = new InstanceId( 1 );
private final Config config = mock( Config.class );
private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );
@@ -71,22 +79,47 @@ public class SlaveUpdatePullerTest
private final ErrorTrackingLogging logging = new ErrorTrackingLogging();
private final RequestContextFactory requestContextFactory = mock( RequestContextFactory.class );
private final InvalidEpochExceptionHandler invalidEpochHandler = mock( InvalidEpochExceptionHandler.class );
+ private final JobScheduler jobScheduler = new CountingJobScheduler(
+ scheduledJobs, new Neo4jJobScheduler( "SlaveUpdatePullerTest" ) );
private final SlaveUpdatePuller updatePuller = new SlaveUpdatePuller( requestContextFactory,
- master, lastUpdateTime, logging, instanceId, availabilityGuard, invalidEpochHandler );
+ master, lastUpdateTime, logging, instanceId, availabilityGuard, invalidEpochHandler, jobScheduler );
@Rule
public final CleanupRule cleanup = new CleanupRule();
@Before
- public void setup() throws Throwable
+ public void setUp() throws Throwable
{
- when( config.get( HaSettings.pull_interval ) ).thenReturn( 1000l );
+ when( config.get( HaSettings.pull_interval ) ).thenReturn( 1000L );
when( config.get( ClusterSettings.server_id ) ).thenReturn( instanceId );
when( availabilityGuard.isAvailable( anyLong() ) ).thenReturn( true );
+ jobScheduler.init();
+ jobScheduler.start();
updatePuller.init();
updatePuller.start();
}
+ @After
+ public void tearDown() throws Throwable
+ {
+ updatePuller.stop();
+ updatePuller.shutdown();
+ jobScheduler.stop();
+ jobScheduler.shutdown();
+ }
+
+ @Test
+ public void initialisationMustBeIdempotent() throws Throwable
+ {
+ updatePuller.init();
+ updatePuller.start();
+ updatePuller.init();
+ updatePuller.start();
+ updatePuller.init();
+ updatePuller.start();
+ assertThat( scheduledJobs.get(), is( 1 ) );
+ }
+
@Test
public void shouldStopPullingAfterStop() throws Throwable
{