Skip to content

Commit

Permalink
Merge pull request #7909 from MishaDemianenko/3.0-single-update-puller
Browse files Browse the repository at this point in the history
Single active update puller
  • Loading branch information
MishaDemianenko committed Sep 12, 2016
2 parents 9133078 + b36c99a commit 6eabd27
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha;

/**
* Masters implementation of update puller that does nothing since master should not pull updates.
*/
public class MasterUpdatePuller implements UpdatePuller
{

public static final MasterUpdatePuller INSTANCE = new MasterUpdatePuller();

private MasterUpdatePuller()
{
}

@Override
public void start()
{
// no-op
}

@Override
public void stop()
{
// no-op
}

@Override
public void pullUpdates() throws InterruptedException
{
// no-op
}

@Override
public boolean tryPullUpdates() throws InterruptedException
{
return false;
}

@Override
public void pullUpdates( Condition condition, boolean assertPullerActive )
throws InterruptedException
{
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.CappedLogger;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

Expand Down Expand Up @@ -113,7 +112,7 @@
*
* @see org.neo4j.kernel.ha.UpdatePuller
*/
public class SlaveUpdatePuller extends LifecycleAdapter implements Runnable, UpdatePuller
public class SlaveUpdatePuller implements Runnable, UpdatePuller
{
public interface Monitor
{
Expand All @@ -128,14 +127,7 @@ public interface Monitor
"org.neo4j.kernel.ha.SlaveUpdatePuller.AVAILABILITY_AWAIT_MILLIS", 5000 );
public static final String UPDATE_PULLER_THREAD_PREFIX = "UpdatePuller@";

static final Condition NEXT_TICKET = new Condition()
{
@Override
public boolean evaluate( int currentTicket, int targetTicket )
{
return currentTicket >= targetTicket;
}
};
static final Condition NEXT_TICKET = ( currentTicket, targetTicket ) -> currentTicket >= targetTicket;

private volatile boolean halted;
private final AtomicInteger targetTicket = new AtomicInteger();
Expand Down Expand Up @@ -213,7 +205,7 @@ private void periodicallyPullUpdates()
}

@Override
public synchronized void init()
public void start()
{
if ( shutdownLatch != null )
{
Expand All @@ -224,18 +216,8 @@ public synchronized void init()
jobScheduler.schedule( JobScheduler.Groups.pullUpdates, this );
}

@Override
public void start() // for removing throw declaration
{
}

@Override
public void stop() // for removing throw declaration
{
}

@Override
public synchronized void shutdown()
{
if ( shutdownLatch == null )
{
Expand Down Expand Up @@ -279,7 +261,7 @@ public void pullUpdates( Condition condition, boolean strictlyAssertActive ) thr
*
* @param condition {@link UpdatePuller.Condition} to wait for.
* @param strictlyAssertActive if {@code true} then observing an inactive update puller, whether
* {@link #shutdown() halted}, will throw an {@link IllegalStateException},
* {@link #stop() halted}, will throw an {@link IllegalStateException},
* otherwise if {@code false} just stop waiting and return {@code false}.
* @return whether or not the condition was met. If {@code strictlyAssertActive} either
* {@code true} will be returned or exception thrown, if puller became inactive.
Expand Down
33 changes: 11 additions & 22 deletions enterprise/ha/src/main/java/org/neo4j/kernel/ha/UpdatePuller.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
* <p>
* On a running instance of a store there should be only one active implementation of this interface.
* <p>
* Typically master instance should use {@link #NONE} implementation since master is owner of data in cluster env
* and its up to slaves to pull updates.
*
* @see SlaveUpdatePuller
* @see MasterUpdatePuller
*/
public interface UpdatePuller
{
Expand All @@ -48,6 +47,16 @@ public interface UpdatePuller
*/
boolean tryPullUpdates() throws InterruptedException;

/**
* Start update pulling
*/
void start();

/**
* Terminate update pulling
*/
void stop();

/**
* Pull updates and waits for the supplied condition to be
* fulfilled as part of the update pulling happening.
Expand All @@ -69,24 +78,4 @@ interface Condition
boolean evaluate( int currentTicket, int targetTicket );
}

UpdatePuller NONE = new UpdatePuller()
{
@Override
public void pullUpdates() throws InterruptedException
{
}

@Override
public boolean tryPullUpdates() throws InterruptedException
{
return false;
}

@Override
public void pullUpdates( Condition condition, boolean assertPullerActive )
throws InterruptedException
{

}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,17 @@ public final void switchToPending()
private void updateDelegate( T newValue )
{
T oldDelegate = delegate.setDelegate( newValue );
shutdownDelegate( oldDelegate );
shutdownOldDelegate( oldDelegate );
startNewDelegate( newValue );
}

protected void shutdownDelegate( T oldDelegate )
protected void startNewDelegate( T newValue )
{
// no-op by default
}

protected void shutdownOldDelegate( T oldDelegate )
{
// no-op by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected Locks getPendingImpl()
}

@Override
protected void shutdownDelegate( Locks oldLocks )
protected void shutdownOldDelegate( Locks oldLocks )
{
if ( oldLocks != null )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.ha.cluster.modeswitch;

import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.MasterUpdatePuller;
import org.neo4j.kernel.ha.PullerFactory;
import org.neo4j.kernel.ha.SlaveUpdatePuller;
import org.neo4j.kernel.ha.UpdatePuller;
Expand All @@ -44,26 +45,31 @@ public UpdatePullerSwitcher( DelegateInvocationHandler<UpdatePuller> delegate, P
@Override
protected UpdatePuller getMasterImpl()
{
return UpdatePuller.NONE;
return MasterUpdatePuller.INSTANCE;
}

@Override
protected UpdatePuller getSlaveImpl()
{
SlaveUpdatePuller slaveUpdatePuller = pullerFactory.createSlaveUpdatePuller();
slaveUpdatePuller.init();
slaveUpdatePuller.start();
return slaveUpdatePuller;
return pullerFactory.createSlaveUpdatePuller();
}

@Override
protected void shutdownDelegate( UpdatePuller updatePuller )
protected void shutdownOldDelegate( UpdatePuller updatePuller )
{
if ( updatePuller != null && updatePuller instanceof SlaveUpdatePuller )
if ( updatePuller != null )
{
SlaveUpdatePuller slaveUpdatePuller = (SlaveUpdatePuller) updatePuller;
slaveUpdatePuller.stop();
slaveUpdatePuller.shutdown();
updatePuller.stop();
}
}

@Override
protected void startNewDelegate( UpdatePuller updatePuller )
{
if ( updatePuller != null )
{
updatePuller.start();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.ha;

import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -42,15 +41,13 @@
import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.master.Slave;
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
import org.neo4j.kernel.impl.util.CountingJobScheduler;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.CleanupRule;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertFalse;
Expand All @@ -59,7 +56,6 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.contains;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -97,27 +93,22 @@ public void setUp() throws Throwable
when( availabilityGuard.isAvailable( anyLong() ) ).thenReturn( true );
jobScheduler.init();
jobScheduler.start();
updatePuller.init();
updatePuller.start();
}

@After
public void tearDown() throws Throwable
{
updatePuller.stop();
updatePuller.shutdown();
jobScheduler.stop();
jobScheduler.shutdown();
}

@Test
public void initialisationMustBeIdempotent() throws Throwable
{
updatePuller.init();
updatePuller.start();
updatePuller.init();
updatePuller.start();
updatePuller.init();
updatePuller.start();
assertThat( scheduledJobs.get(), is( 1 ) );
}
Expand All @@ -135,7 +126,7 @@ public void shouldStopPullingAfterStop() throws Throwable
verify( monitor, times( 1 ) ).pulledUpdates( anyLong() );

// WHEN
updatePuller.shutdown();
updatePuller.stop();
updatePuller.pullUpdates();

// THEN
Expand Down Expand Up @@ -168,7 +159,7 @@ public void keepPullingUpdatesOnConsecutiveCalls() throws Throwable
public void falseOnTryPullUpdatesOnInactivePuller() throws Throwable
{
// GIVEN
updatePuller.shutdown();
updatePuller.stop();

// WHEN
boolean result = updatePuller.tryPullUpdates();
Expand All @@ -182,7 +173,7 @@ public void shouldThrowIfPullerInitiallyInactiveStrict() throws Throwable
{
// GIVEN
Condition condition = mock( Condition.class );
updatePuller.shutdown();
updatePuller.stop();

// WHEN
try
Expand All @@ -207,7 +198,7 @@ public void shouldThrowIfPullerBecomesInactiveWhileWaitingStrict() throws Except
@Override
public Boolean answer( InvocationOnMock invocation ) throws Throwable
{
updatePuller.shutdown();
updatePuller.stop();
return false;
}
} );
Expand Down Expand Up @@ -315,13 +306,4 @@ public void shouldCapExcessiveInvalidEpochExceptionLogging() throws Exception
repeat( new InvalidEpochException( 2, 1 ), SlaveUpdatePuller.LOG_CAP + 1 ) );
}

private AssertableLogProvider.LogMatcher[] repeat( AssertableLogProvider.LogMatcher item, int logCap )
{
AssertableLogProvider.LogMatcher[] items = new AssertableLogProvider.LogMatcher[logCap];
for ( int i = 0; i < logCap; i++ )
{
items[i] = item;
}
return items;
}
}

0 comments on commit 6eabd27

Please sign in to comment.