Skip to content

Commit

Permalink
Single active update puller
Browse files Browse the repository at this point in the history
After introduction of detached state components lifecycle was change in a way that now it's possible to start new set of components while you still have old one active.
For case of update puller it mean that for some time we will have 2 update pullers active and they will try to apply transactions concurrently.
That is incorrect and causing errors during transactions apply phase.
This commit introduce new phase in component switch lifecycle, just after old component shutdown. And we will use this phase to start new update puller

@pair-with @lutovich
  • Loading branch information
MishaDemianenko committed Sep 9, 2016
1 parent 84a4f36 commit b36c99a
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 81 deletions.
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.ha;

/**
* Masters implementation of update puller that does nothing since master should not pull updates.
*/
public class MasterUpdatePuller implements UpdatePuller
{

public static final MasterUpdatePuller INSTANCE = new MasterUpdatePuller();

private MasterUpdatePuller()
{
}

@Override
public void start()
{
// no-op
}

@Override
public void stop()
{
// no-op
}

@Override
public void pullUpdates() throws InterruptedException
{
// no-op
}

@Override
public boolean tryPullUpdates() throws InterruptedException
{
return false;
}

@Override
public void pullUpdates( Condition condition, boolean assertPullerActive )
throws InterruptedException
{
// no-op
}
}
Expand Up @@ -42,7 +42,6 @@
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.CappedLogger; import org.neo4j.kernel.impl.util.CappedLogger;
import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;


Expand Down Expand Up @@ -113,7 +112,7 @@
* *
* @see org.neo4j.kernel.ha.UpdatePuller * @see org.neo4j.kernel.ha.UpdatePuller
*/ */
public class SlaveUpdatePuller extends LifecycleAdapter implements Runnable, UpdatePuller public class SlaveUpdatePuller implements Runnable, UpdatePuller
{ {
public interface Monitor public interface Monitor
{ {
Expand All @@ -128,14 +127,7 @@ public interface Monitor
"org.neo4j.kernel.ha.SlaveUpdatePuller.AVAILABILITY_AWAIT_MILLIS", 5000 ); "org.neo4j.kernel.ha.SlaveUpdatePuller.AVAILABILITY_AWAIT_MILLIS", 5000 );
public static final String UPDATE_PULLER_THREAD_PREFIX = "UpdatePuller@"; public static final String UPDATE_PULLER_THREAD_PREFIX = "UpdatePuller@";


static final Condition NEXT_TICKET = new Condition() static final Condition NEXT_TICKET = ( currentTicket, targetTicket ) -> currentTicket >= targetTicket;
{
@Override
public boolean evaluate( int currentTicket, int targetTicket )
{
return currentTicket >= targetTicket;
}
};


private volatile boolean halted; private volatile boolean halted;
private final AtomicInteger targetTicket = new AtomicInteger(); private final AtomicInteger targetTicket = new AtomicInteger();
Expand Down Expand Up @@ -213,7 +205,7 @@ private void periodicallyPullUpdates()
} }


@Override @Override
public synchronized void init() public void start()
{ {
if ( shutdownLatch != null ) if ( shutdownLatch != null )
{ {
Expand All @@ -224,18 +216,8 @@ public synchronized void init()
jobScheduler.schedule( JobScheduler.Groups.pullUpdates, this ); jobScheduler.schedule( JobScheduler.Groups.pullUpdates, this );
} }


@Override
public void start() // for removing throw declaration
{
}

@Override @Override
public void stop() // for removing throw declaration public void stop() // for removing throw declaration
{
}

@Override
public synchronized void shutdown()
{ {
if ( shutdownLatch == null ) if ( shutdownLatch == null )
{ {
Expand Down Expand Up @@ -279,7 +261,7 @@ public void pullUpdates( Condition condition, boolean strictlyAssertActive ) thr
* *
* @param condition {@link UpdatePuller.Condition} to wait for. * @param condition {@link UpdatePuller.Condition} to wait for.
* @param strictlyAssertActive if {@code true} then observing an inactive update puller, whether * @param strictlyAssertActive if {@code true} then observing an inactive update puller, whether
* {@link #shutdown() halted}, will throw an {@link IllegalStateException}, * {@link #stop() halted}, will throw an {@link IllegalStateException},
* otherwise if {@code false} just stop waiting and return {@code false}. * otherwise if {@code false} just stop waiting and return {@code false}.
* @return whether or not the condition was met. If {@code strictlyAssertActive} either * @return whether or not the condition was met. If {@code strictlyAssertActive} either
* {@code true} will be returned or exception thrown, if puller became inactive. * {@code true} will be returned or exception thrown, if puller became inactive.
Expand Down
33 changes: 11 additions & 22 deletions enterprise/ha/src/main/java/org/neo4j/kernel/ha/UpdatePuller.java
Expand Up @@ -25,10 +25,9 @@
* <p> * <p>
* On a running instance of a store there should be only one active implementation of this interface. * On a running instance of a store there should be only one active implementation of this interface.
* <p> * <p>
* Typically master instance should use {@link #NONE} implementation since master is owner of data in cluster env
* and its up to slaves to pull updates.
* *
* @see SlaveUpdatePuller * @see SlaveUpdatePuller
* @see MasterUpdatePuller
*/ */
public interface UpdatePuller public interface UpdatePuller
{ {
Expand All @@ -48,6 +47,16 @@ public interface UpdatePuller
*/ */
boolean tryPullUpdates() throws InterruptedException; boolean tryPullUpdates() throws InterruptedException;


/**
* Start update pulling
*/
void start();

/**
* Terminate update pulling
*/
void stop();

/** /**
* Pull updates and waits for the supplied condition to be * Pull updates and waits for the supplied condition to be
* fulfilled as part of the update pulling happening. * fulfilled as part of the update pulling happening.
Expand All @@ -69,24 +78,4 @@ interface Condition
boolean evaluate( int currentTicket, int targetTicket ); boolean evaluate( int currentTicket, int targetTicket );
} }


UpdatePuller NONE = new UpdatePuller()
{
@Override
public void pullUpdates() throws InterruptedException
{
}

@Override
public boolean tryPullUpdates() throws InterruptedException
{
return false;
}

@Override
public void pullUpdates( Condition condition, boolean assertPullerActive )
throws InterruptedException
{

}
};
} }
Expand Up @@ -67,10 +67,17 @@ public final void switchToPending()
private void updateDelegate( T newValue ) private void updateDelegate( T newValue )
{ {
T oldDelegate = delegate.setDelegate( newValue ); T oldDelegate = delegate.setDelegate( newValue );
shutdownDelegate( oldDelegate ); shutdownOldDelegate( oldDelegate );
startNewDelegate( newValue );
} }


protected void shutdownDelegate( T oldDelegate ) protected void startNewDelegate( T newValue )
{ {
// no-op by default
}

protected void shutdownOldDelegate( T oldDelegate )
{
// no-op by default
} }
} }
Expand Up @@ -72,7 +72,7 @@ protected Locks getPendingImpl()
} }


@Override @Override
protected void shutdownDelegate( Locks oldLocks ) protected void shutdownOldDelegate( Locks oldLocks )
{ {
if ( oldLocks != null ) if ( oldLocks != null )
{ {
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.kernel.ha.cluster.modeswitch; package org.neo4j.kernel.ha.cluster.modeswitch;


import org.neo4j.kernel.ha.DelegateInvocationHandler; import org.neo4j.kernel.ha.DelegateInvocationHandler;
import org.neo4j.kernel.ha.MasterUpdatePuller;
import org.neo4j.kernel.ha.PullerFactory; import org.neo4j.kernel.ha.PullerFactory;
import org.neo4j.kernel.ha.SlaveUpdatePuller; import org.neo4j.kernel.ha.SlaveUpdatePuller;
import org.neo4j.kernel.ha.UpdatePuller; import org.neo4j.kernel.ha.UpdatePuller;
Expand All @@ -44,26 +45,31 @@ public UpdatePullerSwitcher( DelegateInvocationHandler<UpdatePuller> delegate, P
@Override @Override
protected UpdatePuller getMasterImpl() protected UpdatePuller getMasterImpl()
{ {
return UpdatePuller.NONE; return MasterUpdatePuller.INSTANCE;
} }


@Override @Override
protected UpdatePuller getSlaveImpl() protected UpdatePuller getSlaveImpl()
{ {
SlaveUpdatePuller slaveUpdatePuller = pullerFactory.createSlaveUpdatePuller(); return pullerFactory.createSlaveUpdatePuller();
slaveUpdatePuller.init();
slaveUpdatePuller.start();
return slaveUpdatePuller;
} }


@Override @Override
protected void shutdownDelegate( UpdatePuller updatePuller ) protected void shutdownOldDelegate( UpdatePuller updatePuller )
{ {
if ( updatePuller != null && updatePuller instanceof SlaveUpdatePuller ) if ( updatePuller != null )
{ {
SlaveUpdatePuller slaveUpdatePuller = (SlaveUpdatePuller) updatePuller; updatePuller.stop();
slaveUpdatePuller.stop();
slaveUpdatePuller.shutdown();
} }
} }

@Override
protected void startNewDelegate( UpdatePuller updatePuller )
{
if ( updatePuller != null )
{
updatePuller.start();
}
}

} }
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.kernel.ha; package org.neo4j.kernel.ha;


import org.hamcrest.Matcher;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
Expand All @@ -42,15 +41,13 @@
import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.RequestContextFactory;
import org.neo4j.kernel.ha.com.master.InvalidEpochException; import org.neo4j.kernel.ha.com.master.InvalidEpochException;
import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.com.master.Slave;
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler; import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
import org.neo4j.kernel.impl.util.CountingJobScheduler; import org.neo4j.kernel.impl.util.CountingJobScheduler;
import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler; import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.CleanupRule; import org.neo4j.test.CleanupRule;


import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
Expand All @@ -59,7 +56,6 @@
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.contains;
import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -97,27 +93,22 @@ public void setUp() throws Throwable
when( availabilityGuard.isAvailable( anyLong() ) ).thenReturn( true ); when( availabilityGuard.isAvailable( anyLong() ) ).thenReturn( true );
jobScheduler.init(); jobScheduler.init();
jobScheduler.start(); jobScheduler.start();
updatePuller.init();
updatePuller.start(); updatePuller.start();
} }


@After @After
public void tearDown() throws Throwable public void tearDown() throws Throwable
{ {
updatePuller.stop(); updatePuller.stop();
updatePuller.shutdown();
jobScheduler.stop(); jobScheduler.stop();
jobScheduler.shutdown(); jobScheduler.shutdown();
} }


@Test @Test
public void initialisationMustBeIdempotent() throws Throwable public void initialisationMustBeIdempotent() throws Throwable
{ {
updatePuller.init();
updatePuller.start(); updatePuller.start();
updatePuller.init();
updatePuller.start(); updatePuller.start();
updatePuller.init();
updatePuller.start(); updatePuller.start();
assertThat( scheduledJobs.get(), is( 1 ) ); assertThat( scheduledJobs.get(), is( 1 ) );
} }
Expand All @@ -135,7 +126,7 @@ public void shouldStopPullingAfterStop() throws Throwable
verify( monitor, times( 1 ) ).pulledUpdates( anyLong() ); verify( monitor, times( 1 ) ).pulledUpdates( anyLong() );


// WHEN // WHEN
updatePuller.shutdown(); updatePuller.stop();
updatePuller.pullUpdates(); updatePuller.pullUpdates();


// THEN // THEN
Expand Down Expand Up @@ -168,7 +159,7 @@ public void keepPullingUpdatesOnConsecutiveCalls() throws Throwable
public void falseOnTryPullUpdatesOnInactivePuller() throws Throwable public void falseOnTryPullUpdatesOnInactivePuller() throws Throwable
{ {
// GIVEN // GIVEN
updatePuller.shutdown(); updatePuller.stop();


// WHEN // WHEN
boolean result = updatePuller.tryPullUpdates(); boolean result = updatePuller.tryPullUpdates();
Expand All @@ -182,7 +173,7 @@ public void shouldThrowIfPullerInitiallyInactiveStrict() throws Throwable
{ {
// GIVEN // GIVEN
Condition condition = mock( Condition.class ); Condition condition = mock( Condition.class );
updatePuller.shutdown(); updatePuller.stop();


// WHEN // WHEN
try try
Expand All @@ -207,7 +198,7 @@ public void shouldThrowIfPullerBecomesInactiveWhileWaitingStrict() throws Except
@Override @Override
public Boolean answer( InvocationOnMock invocation ) throws Throwable public Boolean answer( InvocationOnMock invocation ) throws Throwable
{ {
updatePuller.shutdown(); updatePuller.stop();
return false; return false;
} }
} ); } );
Expand Down Expand Up @@ -315,13 +306,4 @@ public void shouldCapExcessiveInvalidEpochExceptionLogging() throws Exception
repeat( new InvalidEpochException( 2, 1 ), SlaveUpdatePuller.LOG_CAP + 1 ) ); repeat( new InvalidEpochException( 2, 1 ), SlaveUpdatePuller.LOG_CAP + 1 ) );
} }


private AssertableLogProvider.LogMatcher[] repeat( AssertableLogProvider.LogMatcher item, int logCap )
{
AssertableLogProvider.LogMatcher[] items = new AssertableLogProvider.LogMatcher[logCap];
for ( int i = 0; i < logCap; i++ )
{
items[i] = item;
}
return items;
}
} }

0 comments on commit b36c99a

Please sign in to comment.