Skip to content

Commit

Permalink
Make the SlaveUpdatePuller schedule its thread through the JobScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Dec 18, 2015
1 parent b2812c8 commit 2373c4a
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 31 deletions.
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingJobScheduler implements JobScheduler
{
private final AtomicInteger counter;
private final Neo4jJobScheduler delegate;

public CountingJobScheduler( AtomicInteger counter, Neo4jJobScheduler delegate )
{
this.counter = counter;
this.delegate = delegate;
}

@Override
public void init()
{
delegate.init();
}

@Override
public JobHandle schedule( Group group, Runnable job )
{
counter.getAndIncrement();
return delegate.schedule( group, job );
}

@Override
public JobHandle scheduleRecurring( Group group, Runnable runnable, long period,
TimeUnit timeUnit )
{
counter.getAndIncrement();
return delegate.scheduleRecurring( group, runnable, period, timeUnit );
}

@Override
public JobHandle scheduleRecurring( Group group, Runnable runnable, long initialDelay, long period,
TimeUnit timeUnit )
{
counter.getAndIncrement();
return delegate.scheduleRecurring( group, runnable, initialDelay, period, timeUnit );
}

@Override
public void shutdown()
{
delegate.shutdown();
}

@Override
public void start() throws Throwable
{
delegate.start();
}

@Override
public void stop() throws Throwable
{
delegate.stop();
}
}
Expand Up @@ -72,7 +72,7 @@ public PullerFactory( RequestContextFactory requestContextFactory, Master master
public UpdatePuller createUpdatePuller( LifeSupport life ) public UpdatePuller createUpdatePuller( LifeSupport life )
{ {
return life.add( new SlaveUpdatePuller( requestContextFactory, master, lastUpdateTime, logging, serverId, return life.add( new SlaveUpdatePuller( requestContextFactory, master, lastUpdateTime, logging, serverId,
availabilityGuard, invalidEpochHandler ) ); availabilityGuard, invalidEpochHandler, jobScheduler ) );
} }


public TransactionObligationFulfiller createObligationFulfiller( LifeSupport life, UpdatePuller updatePuller ) public TransactionObligationFulfiller createObligationFulfiller( LifeSupport life, UpdatePuller updatePuller )
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.com.TransactionStream; import org.neo4j.com.TransactionStream;
import org.neo4j.com.storecopy.TransactionCommittingResponseUnpacker; import org.neo4j.com.storecopy.TransactionCommittingResponseUnpacker;
import org.neo4j.com.storecopy.TransactionObligationFulfiller; import org.neo4j.com.storecopy.TransactionObligationFulfiller;
import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.helpers.Pair; import org.neo4j.helpers.Pair;
import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.RequestContextFactory;
Expand All @@ -40,6 +41,7 @@
import org.neo4j.kernel.ha.com.slave.MasterClient; import org.neo4j.kernel.ha.com.slave.MasterClient;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.CappedOperation; import org.neo4j.kernel.impl.util.CappedOperation;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.logging.Logging; import org.neo4j.kernel.logging.Logging;
Expand Down Expand Up @@ -126,7 +128,8 @@ public boolean evaluate( int currentTicket, int targetTicket )
}; };


private volatile boolean halted; private volatile boolean halted;
private final AtomicInteger targetTicket = new AtomicInteger(), currentTicket = new AtomicInteger(); private final AtomicInteger targetTicket = new AtomicInteger();
private final AtomicInteger currentTicket = new AtomicInteger();
private final RequestContextFactory requestContextFactory; private final RequestContextFactory requestContextFactory;
private final Master master; private final Master master;
private final StringLogger logger; private final StringLogger logger;
Expand All @@ -135,18 +138,27 @@ public boolean evaluate( int currentTicket, int targetTicket )
private final InstanceId instanceId; private final InstanceId instanceId;
private final AvailabilityGuard availabilityGuard; private final AvailabilityGuard availabilityGuard;
private InvalidEpochExceptionHandler invalidEpochHandler; private InvalidEpochExceptionHandler invalidEpochHandler;
private Thread me; private final JobScheduler jobScheduler;
private volatile Thread updatePullingThread;
private volatile BinaryLatch shutdownLatch; // Store under synchronised(this), load in update puller thread


SlaveUpdatePuller( RequestContextFactory requestContextFactory, Master master, LastUpdateTime lastUpdateTime, SlaveUpdatePuller(
Logging logging, InstanceId instanceId, AvailabilityGuard availabilityGuard, RequestContextFactory requestContextFactory,
InvalidEpochExceptionHandler invalidEpochHandler ) Master master,
LastUpdateTime lastUpdateTime,
Logging logging,
InstanceId instanceId,
AvailabilityGuard availabilityGuard,
InvalidEpochExceptionHandler invalidEpochHandler,
JobScheduler jobScheduler )
{ {
this.requestContextFactory = requestContextFactory; this.requestContextFactory = requestContextFactory;
this.master = master; this.master = master;
this.lastUpdateTime = lastUpdateTime; this.lastUpdateTime = lastUpdateTime;
this.instanceId = instanceId; this.instanceId = instanceId;
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.invalidEpochHandler = invalidEpochHandler; this.invalidEpochHandler = invalidEpochHandler;
this.jobScheduler = jobScheduler;
this.logger = logging.getMessagesLog( getClass() ); this.logger = logging.getMessagesLog( getClass() );
this.cappedLogger = new CappedOperation<Pair<String,? extends Exception>>( this.cappedLogger = new CappedOperation<Pair<String,? extends Exception>>(
CappedOperation.count( 10 ) ) CappedOperation.count( 10 ) )
Expand All @@ -161,6 +173,23 @@ protected void triggered( Pair<String,? extends Exception> event )


@Override @Override
public void run() public void run()
{
updatePullingThread = Thread.currentThread();
String oldName = updatePullingThread.getName();
updatePullingThread.setName( UPDATE_PULLER_THREAD_PREFIX + instanceId );
try
{
periodicallyPullUpdates();
}
finally
{
updatePullingThread.setName( oldName );
updatePullingThread = null;
shutdownLatch.release();
}
}

private void periodicallyPullUpdates()
{ {
while ( !halted ) while ( !halted )
{ {
Expand All @@ -177,25 +206,30 @@ public void run()
} }


@Override @Override
public void init() throws Throwable public synchronized void init() throws Throwable
{ {
// TODO Don't do this. This is just to satisfy LockSupport park/unpark if ( shutdownLatch != null )
// And we cannot have this class extend Thread since there's a naming clash with Lifecycle for stop() {
me = new Thread( this, UPDATE_PULLER_THREAD_PREFIX + instanceId ); return; // This SlaveUpdatePuller has already been initialised
me.start(); }

shutdownLatch = new BinaryLatch();
jobScheduler.schedule( JobScheduler.Group.pullUpdates, this );
} }


@Override @Override
public void shutdown() throws Throwable public synchronized void shutdown() throws Throwable
{ {
halted = true; if ( shutdownLatch == null )
while ( me.getState() != Thread.State.TERMINATED )
{ {
Thread.sleep( 1 ); return; // This SlaveUpdatePuller has already been shut down
Thread.yield();
} }
invalidEpochHandler = null;
me = null; Thread thread = updatePullingThread;
halted = true;
LockSupport.unpark( thread );
shutdownLatch.await();
shutdownLatch = null;
} }


@Override @Override
Expand Down Expand Up @@ -282,7 +316,7 @@ private boolean checkActive( boolean strictlyAssertActive )
private int poke() private int poke()
{ {
int result = this.targetTicket.incrementAndGet(); int result = this.targetTicket.incrementAndGet();
LockSupport.unpark( me ); LockSupport.unpark( updatePullingThread );
return result; return result;
} }


Expand Down
Expand Up @@ -22,13 +22,4 @@
public interface InvalidEpochExceptionHandler public interface InvalidEpochExceptionHandler
{ {
void handle(); void handle();

InvalidEpochExceptionHandler NONE = new InvalidEpochExceptionHandler()
{
@Override
public void handle()
{

}
};
} }
Expand Up @@ -19,6 +19,7 @@
*/ */
package org.neo4j.kernel.ha; package org.neo4j.kernel.ha;


import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
Expand All @@ -28,6 +29,7 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;


import org.neo4j.cluster.ClusterSettings; import org.neo4j.cluster.ClusterSettings;
import org.neo4j.cluster.InstanceId; import org.neo4j.cluster.InstanceId;
Expand All @@ -39,6 +41,9 @@
import org.neo4j.kernel.ha.com.master.InvalidEpochException; import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler; import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
import org.neo4j.kernel.impl.util.CountingJobScheduler;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.impl.util.StringLogger;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.logging.BufferingLogger; import org.neo4j.kernel.logging.BufferingLogger;
Expand All @@ -47,7 +52,9 @@
import org.neo4j.kernel.logging.Logging; import org.neo4j.kernel.logging.Logging;
import org.neo4j.test.CleanupRule; import org.neo4j.test.CleanupRule;


import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
Expand All @@ -63,6 +70,7 @@


public class SlaveUpdatePullerTest public class SlaveUpdatePullerTest
{ {
private final AtomicInteger scheduledJobs = new AtomicInteger();
private final InstanceId instanceId = new InstanceId( 1 ); private final InstanceId instanceId = new InstanceId( 1 );
private final Config config = mock( Config.class ); private final Config config = mock( Config.class );
private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class ); private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );
Expand All @@ -71,22 +79,47 @@ public class SlaveUpdatePullerTest
private final ErrorTrackingLogging logging = new ErrorTrackingLogging(); private final ErrorTrackingLogging logging = new ErrorTrackingLogging();
private final RequestContextFactory requestContextFactory = mock( RequestContextFactory.class ); private final RequestContextFactory requestContextFactory = mock( RequestContextFactory.class );
private final InvalidEpochExceptionHandler invalidEpochHandler = mock( InvalidEpochExceptionHandler.class ); private final InvalidEpochExceptionHandler invalidEpochHandler = mock( InvalidEpochExceptionHandler.class );
private final JobScheduler jobScheduler = new CountingJobScheduler(
scheduledJobs, new Neo4jJobScheduler( "SlaveUpdatePullerTest" ) );
private final SlaveUpdatePuller updatePuller = new SlaveUpdatePuller( requestContextFactory, private final SlaveUpdatePuller updatePuller = new SlaveUpdatePuller( requestContextFactory,
master, lastUpdateTime, logging, instanceId, availabilityGuard, invalidEpochHandler ); master, lastUpdateTime, logging, instanceId, availabilityGuard, invalidEpochHandler, jobScheduler );


@Rule @Rule
public final CleanupRule cleanup = new CleanupRule(); public final CleanupRule cleanup = new CleanupRule();


@Before @Before
public void setup() throws Throwable public void setUp() throws Throwable
{ {
when( config.get( HaSettings.pull_interval ) ).thenReturn( 1000l ); when( config.get( HaSettings.pull_interval ) ).thenReturn( 1000L );
when( config.get( ClusterSettings.server_id ) ).thenReturn( instanceId ); when( config.get( ClusterSettings.server_id ) ).thenReturn( instanceId );
when( availabilityGuard.isAvailable( anyLong() ) ).thenReturn( true ); when( availabilityGuard.isAvailable( anyLong() ) ).thenReturn( true );
jobScheduler.init();
jobScheduler.start();
updatePuller.init(); updatePuller.init();
updatePuller.start(); updatePuller.start();
} }


@After
public void tearDown() throws Throwable
{
updatePuller.stop();
updatePuller.shutdown();
jobScheduler.stop();
jobScheduler.shutdown();
}

@Test
public void initialisationMustBeIdempotent() throws Throwable
{
updatePuller.init();
updatePuller.start();
updatePuller.init();
updatePuller.start();
updatePuller.init();
updatePuller.start();
assertThat( scheduledJobs.get(), is( 1 ) );
}

@Test @Test
public void shouldStopPullingAfterStop() throws Throwable public void shouldStopPullingAfterStop() throws Throwable
{ {
Expand Down

0 comments on commit 2373c4a

Please sign in to comment.