Skip to content

Commit

Permalink
Add a feature toggle that will enable INFO level log output of the co…
Browse files Browse the repository at this point in the history
…unts store update lock.

In very rare cases we see threads getting stuck on this lock, and this will help with debugging those cases.
  • Loading branch information
chrisvest committed Jan 16, 2019
1 parent 5b64499 commit ed885b8
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 52 deletions.
Expand Up @@ -94,7 +94,7 @@ public CountsTracker( final LogProvider logProvider, FileSystemAbstraction fs, P
public CountsTracker( final LogProvider logProvider, FileSystemAbstraction fs, PageCache pages, Config config, public CountsTracker( final LogProvider logProvider, FileSystemAbstraction fs, PageCache pages, Config config,
File baseFile, SystemNanoClock clock, VersionContextSupplier versionContextSupplier ) File baseFile, SystemNanoClock clock, VersionContextSupplier versionContextSupplier )
{ {
super( fs, pages, baseFile, new CountsTrackerRotationMonitor( logProvider ), super( fs, pages, baseFile, new CountsTrackerRotationMonitor( logProvider ), logProvider.getLog( CountsTracker.class ).infoLogger(),
new RotationTimerFactory( clock, config.get( counts_store_rotation_timeout ).toMillis() ), new RotationTimerFactory( clock, config.get( counts_store_rotation_timeout ).toMillis() ),
versionContextSupplier, 16, 16, HEADER_FIELDS ); versionContextSupplier, 16, 16, HEADER_FIELDS );
} }
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
Expand All @@ -31,12 +30,12 @@
import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.impl.FileIsNotMappedException; import org.neo4j.io.pagecache.impl.FileIsNotMappedException;
import org.neo4j.io.pagecache.tracing.cursor.context.VersionContextSupplier; import org.neo4j.io.pagecache.tracing.cursor.context.VersionContextSupplier;
import org.neo4j.kernel.impl.locking.LockWrapper;
import org.neo4j.kernel.impl.store.UnderlyingStorageException; import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Logger;


import static org.neo4j.kernel.impl.locking.LockWrapper.readLock; import static org.neo4j.kernel.impl.store.kvstore.LockWrapper.readLock;
import static org.neo4j.kernel.impl.locking.LockWrapper.writeLock; import static org.neo4j.kernel.impl.store.kvstore.LockWrapper.writeLock;


/** /**
* The base for building a key value store based on rotating immutable * The base for building a key value store based on rotating immutable
Expand All @@ -48,18 +47,19 @@
@State( State.Strategy.CONCURRENT_HASH_MAP ) @State( State.Strategy.CONCURRENT_HASH_MAP )
public abstract class AbstractKeyValueStore<Key> extends LifecycleAdapter public abstract class AbstractKeyValueStore<Key> extends LifecycleAdapter
{ {
private final ReadWriteLock updateLock = new ReentrantReadWriteLock( /*fair=*/true ); private final UpdateLock updateLock = new UpdateLock();
private final Format format; private final Format format;
final RotationStrategy rotationStrategy; final RotationStrategy rotationStrategy;
private final RotationTimerFactory rotationTimerFactory; private final RotationTimerFactory rotationTimerFactory;
private final Logger logger;
volatile ProgressiveState<Key> state; volatile ProgressiveState<Key> state;
private DataInitializer<EntryUpdater<Key>> stateInitializer; private DataInitializer<EntryUpdater<Key>> stateInitializer;
private final FileSystemAbstraction fs; private final FileSystemAbstraction fs;
final int keySize; final int keySize;
final int valueSize; final int valueSize;
private volatile boolean stopped; private volatile boolean stopped;


public AbstractKeyValueStore( FileSystemAbstraction fs, PageCache pages, File base, RotationMonitor monitor, public AbstractKeyValueStore( FileSystemAbstraction fs, PageCache pages, File base, RotationMonitor monitor, Logger logger,
RotationTimerFactory timerFactory, VersionContextSupplier versionContextSupplier, int keySize, RotationTimerFactory timerFactory, VersionContextSupplier versionContextSupplier, int keySize,
int valueSize, HeaderField<?>... headerFields ) int valueSize, HeaderField<?>... headerFields )
{ {
Expand All @@ -72,6 +72,7 @@ public AbstractKeyValueStore( FileSystemAbstraction fs, PageCache pages, File ba
monitor = RotationMonitor.NONE; monitor = RotationMonitor.NONE;
} }
this.format = new Format( headerFields ); this.format = new Format( headerFields );
this.logger = logger;
this.rotationStrategy = rotation.value().create( fs, pages, format, monitor, base, rotation.parameters() ); this.rotationStrategy = rotation.value().create( fs, pages, format, monitor, base, rotation.parameters() );
this.rotationTimerFactory = timerFactory; this.rotationTimerFactory = timerFactory;
this.state = new DeadState.Stopped<>( format, getClass().getAnnotation( State.class ).value(), this.state = new DeadState.Stopped<>( format, getClass().getAnnotation( State.class ).value(),
Expand Down Expand Up @@ -173,7 +174,7 @@ public final File currentFile()
@Override @Override
public final void init() throws IOException public final void init() throws IOException
{ {
try ( LockWrapper ignored = writeLock( updateLock ) ) try ( LockWrapper ignored = writeLock( updateLock, logger ) )
{ {
state = state.initialize( rotationStrategy ); state = state.initialize( rotationStrategy );
} }
Expand All @@ -182,31 +183,31 @@ public final void init() throws IOException
@Override @Override
public final void start() throws IOException public final void start() throws IOException
{ {
try ( LockWrapper ignored = writeLock( updateLock ) ) try ( LockWrapper ignored = writeLock( updateLock, logger ) )
{ {
state = state.start( stateInitializer ); state = state.start( stateInitializer );
} }
} }


protected final Optional<EntryUpdater<Key>> updater( final long version ) protected final Optional<EntryUpdater<Key>> updater( final long version )
{ {
try ( LockWrapper lock = readLock( updateLock ) ) try ( LockWrapper lock = readLock( updateLock, logger ) )
{ {
return state.optionalUpdater( version, lock.get() ); return state.optionalUpdater( version, lock.get() );
} }
} }


protected final EntryUpdater<Key> updater() protected final EntryUpdater<Key> updater()
{ {
try ( LockWrapper lock = readLock( updateLock ) ) try ( LockWrapper lock = readLock( updateLock, logger ) )
{ {
return state.unsafeUpdater( lock.get() ); return state.unsafeUpdater( lock.get() );
} }
} }


protected final EntryUpdater<Key> resetter( long version ) protected final EntryUpdater<Key> resetter( long version )
{ {
try ( LockWrapper lock = writeLock( updateLock ) ) try ( LockWrapper lock = writeLock( updateLock, logger ) )
{ {
ProgressiveState<Key> current = state; ProgressiveState<Key> current = state;
return current.resetter( lock.get(), new RotationTask( version ) ); return current.resetter( lock.get(), new RotationTask( version ) );
Expand All @@ -227,7 +228,7 @@ protected final EntryUpdater<Key> resetter( long version )
*/ */
protected final PreparedRotation prepareRotation( final long version ) protected final PreparedRotation prepareRotation( final long version )
{ {
try ( LockWrapper ignored = writeLock( updateLock ) ) try ( LockWrapper ignored = writeLock( updateLock, logger ) )
{ {
ProgressiveState<Key> prior = state; ProgressiveState<Key> prior = state;
if ( prior.storedVersion() == version && !prior.hasChanges() ) if ( prior.storedVersion() == version && !prior.hasChanges() )
Expand All @@ -243,7 +244,7 @@ protected final PreparedRotation prepareRotation( final long version )
@Override @Override
public final void shutdown() throws IOException public final void shutdown() throws IOException
{ {
try ( LockWrapper ignored = writeLock( updateLock ) ) try ( LockWrapper ignored = writeLock( updateLock, logger ) )
{ {
stopped = true; stopped = true;
state = state.stop(); state = state.stop();
Expand Down Expand Up @@ -290,7 +291,7 @@ public long rotate() throws IOException
@Override @Override
public void run() public void run()
{ {
try ( LockWrapper ignored = writeLock( updateLock ) ) try ( LockWrapper ignored = writeLock( updateLock, logger ) )
{ {
rotate( true ); rotate( true );
} }
Expand All @@ -309,7 +310,7 @@ private long rotate( boolean force ) throws IOException
final long version = rotation.rotationVersion(); final long version = rotation.rotationVersion();
ProgressiveState<Key> next = rotation.rotate( force, rotationStrategy, rotationTimerFactory, ProgressiveState<Key> next = rotation.rotate( force, rotationStrategy, rotationTimerFactory,
value -> updateHeaders( value, version ) ); value -> updateHeaders( value, version ) );
try ( LockWrapper ignored = writeLock( updateLock ) ) try ( LockWrapper ignored = writeLock( updateLock, logger ) )
{ {
state = next; state = next;
} }
Expand All @@ -319,7 +320,7 @@ private long rotate( boolean force ) throws IOException
{ {
// Rotation failed. Here we assume that rotation state remembers this so that closing it // Rotation failed. Here we assume that rotation state remembers this so that closing it
// won't close the state as it was before rotation began, which we're reverting to right here. // won't close the state as it was before rotation began, which we're reverting to right here.
try ( LockWrapper ignored = writeLock( updateLock ) ) try ( LockWrapper ignored = writeLock( updateLock, logger ) )
{ {
// Only mark as failed if we're still running. // Only mark as failed if we're still running.
// If shutdown has been called while being in rotation state then shutdown will fail // If shutdown has been called while being in rotation state then shutdown will fail
Expand Down
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.store.kvstore;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.neo4j.logging.Logger;
import org.neo4j.util.FeatureToggles;

public class LockWrapper implements AutoCloseable
{
private static final boolean debugLocking = FeatureToggles.flag( AbstractKeyValueStore.class, "debugLocking", false );

public static LockWrapper readLock( UpdateLock lock, Logger logger )
{
return new LockWrapper( lock.readLock(), lock, logger );
}

public static LockWrapper writeLock( UpdateLock lock, Logger logger )
{
return new LockWrapper( lock.writeLock(), lock, logger );
}

private Lock lock;

private LockWrapper( Lock lock, UpdateLock managingLock, Logger logger )
{
this.lock = lock;
if ( debugLocking )
{
if ( !lock.tryLock() )
{
logger.log( Thread.currentThread() + " may block on " + lock + " of " + managingLock );
while ( !tryLockBlocking( lock, managingLock, logger ) )
{
logger.log( Thread.currentThread() + " still blocked on " + lock + " of " + managingLock );
}
}
}
else
{
lock.lock();
}
}

private static boolean tryLockBlocking( Lock lock, UpdateLock managingLock, Logger logger )
{
try
{
return lock.tryLock( 1, TimeUnit.HOURS );
}
catch ( InterruptedException e )
{
logger.log( Thread.currentThread() + " ignoring interrupt while blocked on " + lock + " of " + managingLock );
}
return false;
}

@Override
public void close()
{
if ( lock != null )
{
lock.unlock();
lock = null;
}
}

public Lock get()
{
return lock;
}
}
Expand Up @@ -17,41 +17,27 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.neo4j.kernel.impl.locking; package org.neo4j.kernel.impl.store.kvstore;


import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock;


public class LockWrapper implements AutoCloseable class UpdateLock extends ReentrantReadWriteLock
{ {
public static LockWrapper readLock( java.util.concurrent.locks.ReadWriteLock lock ) UpdateLock()
{ {
return new LockWrapper( lock.readLock() ); super( true /* always fair */ );
}

public static LockWrapper writeLock( java.util.concurrent.locks.ReadWriteLock lock )
{
return new LockWrapper( lock.writeLock() );
}

private java.util.concurrent.locks.Lock lock;

public LockWrapper( java.util.concurrent.locks.Lock lock )
{
(this.lock = lock).lock();
} }


@Override @Override
public void close() public String toString()
{
if ( lock != null )
{
lock.unlock();
lock = null;
}
}

public Lock get()
{ {
return lock; return "AbstractKeyValyeStore-UpdateLock[owner = " + getOwner() +
", is write locked = " + isWriteLocked() +
", writer holds count = " + getWriteHoldCount() +
", read holds count = " + getReadHoldCount() +
", readers count = " + getReadLockCount() +
", threads waiting for write lock = " + getQueuedWriterThreads() +
", threads waiting for read lock = " + getQueuedReaderThreads() +
"] " + super.toString();
} }
} }
Expand Up @@ -480,11 +480,6 @@ public void shouldLeaveStoreInGoodStateAfterRotationFailure() throws Exception
} }
} }


private static ValueUpdate longValue( long value )
{
return target -> target.putLong( 0, value );
}

private Store createTestStore() private Store createTestStore()
{ {
return createTestStore( TimeUnit.SECONDS.toMillis( 100 ) ); return createTestStore( TimeUnit.SECONDS.toMillis( 100 ) );
Expand Down Expand Up @@ -607,7 +602,7 @@ private Store( HeaderField<?>... headerFields )


private Store( long rotationTimeout, HeaderField<?>... headerFields ) private Store( long rotationTimeout, HeaderField<?>... headerFields )
{ {
super( resourceManager.fileSystem(), resourceManager.pageCache(), resourceManager.testPath(), null, super( resourceManager.fileSystem(), resourceManager.pageCache(), resourceManager.testPath(), null, null,
new RotationTimerFactory( Clocks.nanoClock(), rotationTimeout ), new RotationTimerFactory( Clocks.nanoClock(), rotationTimeout ),
EmptyVersionContextSupplier.EMPTY, 16, 16, headerFields ); EmptyVersionContextSupplier.EMPTY, 16, 16, headerFields );
this.headerFields = headerFields; this.headerFields = headerFields;
Expand Down
Expand Up @@ -34,7 +34,7 @@
import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess;
import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.api.index.IndexingService; import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.locking.LockWrapper; import org.neo4j.kernel.impl.store.kvstore.LockWrapper;
import org.neo4j.kernel.impl.transaction.TransactionRepresentation; import org.neo4j.kernel.impl.transaction.TransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor; import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
Expand Down

0 comments on commit ed885b8

Please sign in to comment.