Skip to content

Commit

Permalink
Allow read queries to run on Pending machines
Browse files Browse the repository at this point in the history
Before this change in HA when switching to pending the Locks delegate
would have set to null, which prevents to create any transactions.
Indeed, at transaction creation time we create a lock client through
the Locks for such transaction.  Basically failing all transaction
creation.

This commit changes the Locks to be a ReadOnlyLocks when switching to
pending. Such Locks allows for creation of ReadOnlyClient which fails
all possible lock acquisition, i.e., write transaction.  In such a way
read transaction can execute on machine in pending state.

It also improves the AbstractComponentSwitcher to be less racy by
switching directly from old delegate to new delegate, without setting
the delegate to null in between those 2 steps.
  • Loading branch information
davidegrohmann committed Sep 5, 2016
1 parent 839b8f2 commit 9603a65
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 69 deletions.
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.locking;

import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.storageengine.api.lock.ResourceType;

public class ReadOnlyLocks implements Locks
{
private static final ReadOnlyClient READ_ONLY_CLIENT = new ReadOnlyClient();

@Override
public Client newClient()
{
return READ_ONLY_CLIENT;
}

@Override
public void accept( Visitor visitor )
{
}

@Override
public void close()
{
}

private static class ReadOnlyClient extends NoOpClient
{
@Override
public void acquireShared( ResourceType resourceType, long... resourceIds ) throws AcquireLockTimeoutException
{
fail();
}

@Override
public void acquireExclusive( ResourceType resourceType, long... resourceIds ) throws AcquireLockTimeoutException
{
fail();
}

@Override
public boolean tryExclusiveLock( ResourceType resourceType, long resourceId )
{
fail();
return false;
}

@Override
public boolean trySharedLock( ResourceType resourceType, long resourceId )
{
fail();
return false;
}

@Override
public void releaseShared( ResourceType resourceType, long resourceId )
{
fail();
}

@Override
public void releaseExclusive( ResourceType resourceType, long resourceId )
{
fail();
}

private void fail()
{
throw new IllegalStateException( "Cannot acquire locks in read only mode" );
}
}
}
Expand Up @@ -66,12 +66,16 @@ protected T create()
* such that future calls to {@link #harden()} cannot affect any reference received
* from {@link #cement()} prior to this call.
* @param delegate the new delegate to set.
*
* @return the old delegate
*/
public void setDelegate( T delegate )
public T setDelegate( T delegate )
{
T oldDelegate = this.delegate;
this.delegate = delegate;
harden();
concrete.invalidate();
return oldDelegate;
}

/**
Expand Down
Expand Up @@ -213,7 +213,7 @@ private void periodicallyPullUpdates()
}

@Override
public synchronized void init() throws Throwable
public synchronized void init()
{
if ( shutdownLatch != null )
{
Expand All @@ -225,7 +225,17 @@ public synchronized void init() throws Throwable
}

@Override
public synchronized void shutdown() throws Throwable
public void start() // for removing throw declaration
{
}

@Override
public void stop() // for removing throw declaration
{
}

@Override
public synchronized void shutdown()
{
if ( shutdownLatch == null )
{
Expand Down
Expand Up @@ -41,35 +41,36 @@ protected AbstractComponentSwitcher( DelegateInvocationHandler<T> delegate )

protected abstract T getSlaveImpl();

@Override
public void switchToMaster()
protected T getPendingImpl()
{
shutdownCurrent();
T component = getMasterImpl();
updateDelegate( component );
return null;
}

@Override
public void switchToSlave()
public final void switchToMaster()
{
shutdownCurrent();
T component = getSlaveImpl();
updateDelegate( component );
updateDelegate( getMasterImpl() );
}

@Override
public void switchToPending()
public final void switchToSlave()
{
shutdownCurrent();
updateDelegate( getSlaveImpl() );
}

protected void shutdownCurrent()
@Override
public final void switchToPending()
{
updateDelegate( null );
updateDelegate( getPendingImpl() );
}

private void updateDelegate( T newValue )
{
delegate.setDelegate( newValue );
T oldDelegate = delegate.setDelegate( newValue );
shutdownDelegate( oldDelegate );
}

protected void shutdownDelegate( T oldDelegate )
{
}
}
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.kernel.ha.com.master.Master;
import org.neo4j.kernel.ha.lock.SlaveLockManager;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ReadOnlyLocks;
import org.neo4j.logging.LogProvider;

public class LockManagerSwitcher extends AbstractComponentSwitcher<Locks>
Expand All @@ -38,8 +39,6 @@ public class LockManagerSwitcher extends AbstractComponentSwitcher<Locks>
private final LogProvider logProvider;
private final Config config;

private volatile Locks currentLocks;

public LockManagerSwitcher( DelegateInvocationHandler<Locks> delegate, DelegateInvocationHandler<Master> master,
RequestContextFactory requestContextFactory, AvailabilityGuard availabilityGuard,
Factory<Locks> locksFactory, LogProvider logProvider, Config config )
Expand All @@ -56,31 +55,28 @@ public LockManagerSwitcher( DelegateInvocationHandler<Locks> delegate, DelegateI
@Override
protected Locks getMasterImpl()
{
currentLocks = locksFactory.newInstance();
return currentLocks;
return locksFactory.newInstance();
}

@Override
protected Locks getSlaveImpl()
{
currentLocks = new SlaveLockManager( locksFactory.newInstance(), requestContextFactory, master.cement(),
return new SlaveLockManager( locksFactory.newInstance(), requestContextFactory, master.cement(),
availabilityGuard, logProvider, config );
return currentLocks;
}

@Override
protected void shutdownCurrent()
protected Locks getPendingImpl()
{
super.shutdownCurrent();
closeCurrentLocks();
return new ReadOnlyLocks();
}

private void closeCurrentLocks()
@Override
protected void shutdownDelegate( Locks oldLocks )
{
if ( currentLocks != null )
if ( oldLocks != null )
{
currentLocks.close();
currentLocks = null;
oldLocks.close();
}
}
}
Expand Up @@ -23,7 +23,6 @@
import org.neo4j.kernel.ha.PullerFactory;
import org.neo4j.kernel.ha.SlaveUpdatePuller;
import org.neo4j.kernel.ha.UpdatePuller;
import org.neo4j.kernel.lifecycle.LifeSupport;

/**
* UpdatePullerSwitcher will provide different implementations of {@link UpdatePuller}
Expand All @@ -36,9 +35,6 @@ public class UpdatePullerSwitcher extends AbstractComponentSwitcher<UpdatePuller
{
private final PullerFactory pullerFactory;

// Field is volatile because it is used by threads from executor in HighAvailabilityModeSwitcher
private volatile LifeSupport life;

public UpdatePullerSwitcher( DelegateInvocationHandler<UpdatePuller> delegate, PullerFactory pullerFactory )
{
super( delegate );
Expand All @@ -55,30 +51,19 @@ protected UpdatePuller getMasterImpl()
protected UpdatePuller getSlaveImpl()
{
SlaveUpdatePuller slaveUpdatePuller = pullerFactory.createSlaveUpdatePuller();
startUpdatePuller( slaveUpdatePuller );
slaveUpdatePuller.init();
slaveUpdatePuller.start();
return slaveUpdatePuller;
}

@Override
protected void shutdownCurrent()
{
super.shutdownCurrent();
shutdownCurrentPuller();
}

private void shutdownCurrentPuller()
protected void shutdownDelegate( UpdatePuller updatePuller )
{
if ( life != null )
if ( updatePuller != null && updatePuller instanceof SlaveUpdatePuller )
{
life.shutdown();
life = null;
SlaveUpdatePuller slaveUpdatePuller = (SlaveUpdatePuller) updatePuller;
slaveUpdatePuller.stop();
slaveUpdatePuller.shutdown();
}
}

private void startUpdatePuller( SlaveUpdatePuller slaveUpdatePuller )
{
life = new LifeSupport();
life.add( slaveUpdatePuller );
life.start();
}
}
Expand Up @@ -21,13 +21,9 @@

import org.junit.Test;

import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.kernel.ha.DelegateInvocationHandler;

import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public class AbstractComponentSwitcherTest
{
Expand All @@ -54,22 +50,14 @@ public void switchesToSlave() throws Throwable
}

@Test
public void switchesToPending()
public void switchesToPending() throws Throwable
{
DelegateInvocationHandler<Component> delegate = new DelegateInvocationHandler<>( Component.class );
TestComponentSwitcher switcher = new TestComponentSwitcher( delegate );

switcher.switchToPending();

try
{
delegateClass( delegate );
fail( "Exception expected" );
}
catch ( Throwable throwable )
{
assertThat( throwable, instanceOf( TransactionFailureException.class ) );
}
assertEquals( delegateClass( delegate ), PendingComponent.class );
}

private static Class<?> delegateClass( DelegateInvocationHandler<?> invocationHandler ) throws Throwable
Expand All @@ -95,6 +83,12 @@ protected Component getSlaveImpl()
{
return new SlaveComponent();
}

@Override
protected Component getPendingImpl()
{
return new PendingComponent();
}
}

private interface Component
Expand All @@ -108,4 +102,8 @@ private static class MasterComponent implements Component
private static class SlaveComponent implements Component
{
}

private static class PendingComponent implements Component
{
}
}
Expand Up @@ -59,8 +59,8 @@ public void masterUpdatePuller()
@Test
public void slaveUpdatePuller() throws Throwable
{
UpdatePuller newPuller = modeSwitcher.getSlaveImpl();
assertSame( newPuller, slaveUpdatePuller );
UpdatePuller updatePuller = modeSwitcher.getSlaveImpl();
assertSame( slaveUpdatePuller, updatePuller );
verify( slaveUpdatePuller ).start();
}
}

0 comments on commit 9603a65

Please sign in to comment.