Skip to content

Commit

Permalink
Monitors can now have parents.
Browse files Browse the repository at this point in the history
Events will propagate/bubble up in the monitor tree and draw strict
boundaries between groups of monitors. This will allow us to structure
monitors the same way as dependencies instead of having one instance to
rule them all.
  • Loading branch information
klaren committed Jul 18, 2018
1 parent b537eeb commit d123cd0
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 53 deletions.
Expand Up @@ -59,6 +59,7 @@
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;

import static org.neo4j.helpers.collection.MapUtil.stringMap;

Expand All @@ -70,6 +71,7 @@ public abstract class DatabaseRule extends ExternalResource implements GraphData
private Supplier<Statement> statementSupplier;
private boolean startEagerly = true;
private Map<Setting<?>, String> config;
private final Monitors monitors = new Monitors();

/**
* Means the database will be started on first {@link #getGraphDatabaseAPI()}
Expand Down Expand Up @@ -268,6 +270,7 @@ private void create()
try
{
GraphDatabaseFactory factory = newFactory();
factory.setMonitors( monitors );
configure( factory );
databaseBuilder = newBuilder( factory );
configure( databaseBuilder );
Expand All @@ -279,6 +282,14 @@ private void create()
}
}

/**
* Exposes the {@link Monitors} instance owned by this rule, i.e. the same instance that is handed to the
* {@link GraphDatabaseFactory} via {@code setMonitors} when the database is created.
*
* @return the high level monitor in the database.
*/
public Monitors getMonitors()
{
return monitors;
}

/**
* Does nothing in this class.
* NOTE(review): presumably a hook that subclasses override to clean up their own resources -- confirm against
* the callers and subclasses, which are not visible here.
*/
protected void deleteResources()
{
}
Expand Down
Expand Up @@ -60,7 +60,7 @@ void veryLargePageListsMustBeFullyAccessible()
IntStream.range( pages - 2000, pages ).parallel().forEach( id -> verifyPageMetaDataIsAccessible( pageList, id ) );
}

private void verifyPageMetaDataIsAccessible( PageList pageList, int id )
private static void verifyPageMetaDataIsAccessible( PageList pageList, int id )
{
long ref = pageList.deref( id );
pageList.incrementUsage( ref );
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;

import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.IndexSampler;
Expand All @@ -39,9 +40,22 @@ public FusionIndexSampler( Iterable<IndexSampler> samplers )
public IndexSample sampleIndex() throws IndexNotFoundKernelException
{
List<IndexSample> samples = new ArrayList<>();
Exception exception = null;
for ( IndexSampler sampler : samplers )
{
samples.add( sampler.sampleIndex() );
try
{
samples.add( sampler.sampleIndex() );
}
catch ( IndexNotFoundKernelException | RuntimeException e )
{
exception = Exceptions.chain( exception, e );
}
}
if ( exception != null )
{
Exceptions.throwIfUnchecked( exception );
throw (IndexNotFoundKernelException)exception;
}
return combineSamples( samples );
}
Expand Down
Expand Up @@ -51,11 +51,6 @@
*/
public class Monitors
{
// Concurrency: Mutation of these data structures is always guarded by the monitor lock on this Monitors instance,
// while look-ups and reads are performed concurrently. The methodMonitorListeners lists (the map values) are
// read concurrently by the proxies, while changing the listener set always produce new lists that atomically
// replace the ones already in the methodMonitorListeners map.

/** Monitor interface method -> Listeners */
private final Map<Method,Set<MonitorListenerInvocationHandler>> methodMonitorListeners = new ConcurrentHashMap<>();
private final MutableBag<Class<?>> monitoredInterfaces = MultiReaderHashBag.newBag();
Expand All @@ -70,6 +65,9 @@ public Monitors()
* Create a child monitor with a given {@code parent}. Propagation works as expected: you can subscribe to
* global monitors through the child monitor, but not the other way around, i.e. you cannot subscribe to monitors
* that are registered on the child monitor through the parent monitor.
* <p>
* Events bubble up from a child monitor towards its ancestors, so listeners on the child monitor are invoked
* before the listeners on the parent.
*
* @param parent to propagate events to and from.
*/
Expand All @@ -88,10 +86,6 @@ public <T> T newMonitor( Class<T> monitorClass, String... tags )

public void addMonitorListener( Object monitorListener, String... tags )
{
if ( parent != null )
{
parent.addMonitorListener( monitorListener, tags );
}
MonitorListenerInvocationHandler monitorListenerInvocationHandler = createInvocationHandler( monitorListener, tags );

List<Class<?>> listenerInterfaces = getAllInterfaces( monitorListener );
Expand All @@ -106,18 +100,14 @@ public void addMonitorListener( Object monitorListener, String... tags )

public void removeMonitorListener( Object monitorListener )
{
if ( parent != null )
{
parent.removeMonitorListener( monitorListener );
}
List<Class<?>> listenerInterfaces = getAllInterfaces( monitorListener );
methodsStream( listenerInterfaces ).forEach( method -> cleanupMonitorListeners( monitorListener, method ) );
listenerInterfaces.forEach( monitoredInterfaces::remove );
}

public boolean hasListeners( Class<?> monitorClass )
{
return monitoredInterfaces.contains( monitorClass ) || ((parent != null) ? parent.hasListeners( monitorClass ) : false);
return monitoredInterfaces.contains( monitorClass ) || ((parent != null) && parent.hasListeners( monitorClass ));
}

private void cleanupMonitorListeners( Object monitorListener, Method key )
Expand Down Expand Up @@ -215,19 +205,23 @@ private static class MonitorInvocationHandler implements InvocationHandler
@Override
public Object invoke( Object proxy, Method method, Object[] args )
{
invokeMonitorListeners( monitor, proxy, method, args );
invokeMonitorListeners( monitor, tags, proxy, method, args );

// Bubble up
Monitors current = monitor.parent;
while ( current != null )
{
invokeMonitorListeners( current, tags, proxy, method, args );
current = current.parent;
}
return null;
}

private void invokeMonitorListeners( Monitors monitor, Object proxy, Method method, Object[] args )
private static void invokeMonitorListeners( Monitors monitor, String[] tags, Object proxy, Method method, Object[] args )
{
Set<MonitorListenerInvocationHandler> handlers = monitor.methodMonitorListeners.get( method );
if ( handlers == null || handlers.isEmpty() )
{
if ( monitor.parent != null )
{
invokeMonitorListeners( monitor.parent, proxy, method, args );
}
return;
}
for ( MonitorListenerInvocationHandler monitorListenerInvocationHandler : handlers )
Expand Down
Expand Up @@ -158,4 +158,28 @@ public void multipleListenersRegistration()
monitors.removeMonitorListener( listener2 );
assertFalse( monitors.hasListeners( MyMonitor.class ) );
}

/**
* Verifies one-way propagation between a parent and a child {@link Monitors}: a call made through a monitor
* created on the parent only reaches the parent's listener, while a call made through a monitor created on
* the child reaches both the child's and the parent's listeners.
*/
@Test
public void eventShouldBubbleUp()
{
Monitors parent = new Monitors();
MyMonitor parentListener = mock( MyMonitor.class );
parent.addMonitorListener( parentListener );

Monitors child = new Monitors( parent );
MyMonitor childListener = mock( MyMonitor.class );
child.addMonitorListener( childListener );

// Calls on monitors from parent should not reach child listeners
MyMonitor parentMonitor = parent.newMonitor( MyMonitor.class );
parentMonitor.aVoid();
verify( parentListener, times( 1 ) ).aVoid();
verifyZeroInteractions( childListener );

// Calls on monitors from child should reach both listeners
MyMonitor childMonitor = child.newMonitor( MyMonitor.class );
childMonitor.aVoid();
// The expected count of 2 is cumulative: the earlier call through parentMonitor plus this bubbled-up call.
verify( parentListener, times( 2 ) ).aVoid();
verify( childListener, times( 1 ) ).aVoid();
}
}
Expand Up @@ -95,7 +95,7 @@ public DataSourceModule( final PlatformModule platformModule, EditionModule edit
File storeDir = platformModule.storeDir;
DiagnosticsManager diagnosticsManager = platformModule.diagnosticsManager;
this.queryExecutor = queryExecutionEngineSupplier;
Monitors monitors = platformModule.monitors;
Monitors monitors = new Monitors( platformModule.monitors );

threadToTransactionBridge = deps.satisfyDependency( new ThreadToStatementContextBridge( platformModule.availabilityGuard ) );

Expand Down
Expand Up @@ -37,11 +37,11 @@
public class MembershipWaiterLifecycle extends LifecycleAdapter
{
private final MembershipWaiter membershipWaiter;
private final Long joinCatchupTimeout;
private final long joinCatchupTimeout;
private final RaftMachine raft;
private final Log log;

public MembershipWaiterLifecycle( MembershipWaiter membershipWaiter, Long joinCatchupTimeout,
public MembershipWaiterLifecycle( MembershipWaiter membershipWaiter, long joinCatchupTimeout,
RaftMachine raft, LogProvider logProvider )
{
this.membershipWaiter = membershipWaiter;
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer;
import org.neo4j.kernel.impl.transaction.log.checkpoint.SimpleTriggerInfo;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.causalclustering.ClusterRule;

import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -134,12 +133,9 @@ private static void forceLogRotationAndPruning( CoreClusterMember core )

private static Semaphore addStoreCopyBlockingMonitor( ReadReplica readReplica )
{
DependencyResolver dependencyResolver = readReplica.database().getDependencyResolver();
Monitors monitors = dependencyResolver.resolveDependency( Monitors.class );

Semaphore semaphore = new Semaphore( 0 );

monitors.addMonitorListener( (FileCopyMonitor) file ->
readReplica.monitors().addMonitorListener( (FileCopyMonitor) file ->
{
try
{
Expand Down
6 changes: 3 additions & 3 deletions enterprise/ha/src/test/java/org/neo4j/ha/BranchedDataIT.java
Expand Up @@ -192,7 +192,7 @@ public void shouldCopyStoreFromMasterIfBranchedInLiveScenario() throws Throwable
// so anyways, when thor comes back into the cluster
cluster.info( format( "%n ==== REPAIRING CABLES ====%n" ) );
cluster.await( memberThinksItIsRole( thor, UNKNOWN ) );
BranchMonitor thorHasBranched = installBranchedDataMonitor( thor );
BranchMonitor thorHasBranched = installBranchedDataMonitor( cluster.getMonitorsByDatabase( thor ) );
thorRepairKit.repair();
cluster.await( memberThinksItIsRole( thor, SLAVE ) );
cluster.await( memberThinksItIsRole( odin, MASTER ) );
Expand All @@ -219,10 +219,10 @@ public void shouldCopyStoreFromMasterIfBranchedInLiveScenario() throws Throwable
assertTrue( hasNode( odin, "0-0" ) );
}

private BranchMonitor installBranchedDataMonitor( HighlyAvailableGraphDatabase odin )
private BranchMonitor installBranchedDataMonitor( Monitors monitors )
{
BranchMonitor monitor = new BranchMonitor();
odin.getDependencyResolver().resolveDependency( Monitors.class ).addMonitorListener( monitor );
monitors.addMonitorListener( monitor );
return monitor;
}

Expand Down
Expand Up @@ -823,8 +823,9 @@ public class ManagedCluster extends LifecycleAdapter
private final Cluster spec;
private final String name;
private final Map<InstanceId,HighlyAvailableGraphDatabase> members = new ConcurrentHashMap<>();
private final Map<HighlyAvailableGraphDatabase,Monitors> monitorsMap = new ConcurrentHashMap<>();
private final List<ObservedClusterMembers> arbiters = new ArrayList<>();
private final Set<RepairKit> pendingRepairs = Collections.synchronizedSet( new HashSet<RepairKit>() );
private final Set<RepairKit> pendingRepairs = Collections.synchronizedSet( new HashSet<>() );
private final ParallelLifecycle parallelLife = new ParallelLifecycle( DEFAULT_TIMEOUT_SECONDS, SECONDS );
private final String initialHosts;
private final File parent;
Expand Down Expand Up @@ -998,6 +999,24 @@ public HighlyAvailableGraphDatabase getMemberByServerId( InstanceId serverId )
return db;
}

/**
 * Looks up the {@link Monitors} instance that was recorded for the given cluster member.
 *
 * @param database the database whose {@link Monitors} should be returned.
 * @return the {@link Monitors} recorded for {@code database}.
 * @throws IllegalStateException if nothing is recorded for {@code database}; this might imply that the
 * server is not started yet.
 */
public Monitors getMonitorsByDatabase( HighlyAvailableGraphDatabase database )
{
    Monitors registered = monitorsMap.get( database );
    if ( registered != null )
    {
        return registered;
    }
    throw new IllegalStateException( "Monitors for db " + database + " not found" );
}

/**
* Shuts down a member of this cluster. A {@link RepairKit} is returned
* which is able to restore the instance (i.e. start it again). This method
Expand Down Expand Up @@ -1130,11 +1149,12 @@ private HighlyAvailableGraphDatabase startMemberNow( InstanceId serverId )
int haPort = PortAuthority.allocatePort();
File storeDir = new File( parent, "server" + serverId );
if ( storeDirInitializer != null )

{
storeDirInitializer.initializeStoreDir( serverId.toIntegerIndex(), storeDir );
}
GraphDatabaseBuilder builder = dbFactory.newEmbeddedDatabaseBuilder( storeDir.getAbsoluteFile() );

Monitors monitors = new Monitors();
GraphDatabaseBuilder builder = dbFactory.setMonitors( monitors ).newEmbeddedDatabaseBuilder( storeDir.getAbsoluteFile() );
builder.setConfig( ClusterSettings.cluster_name, name );
builder.setConfig( ClusterSettings.initial_hosts, initialHosts );
builder.setConfig( ClusterSettings.server_id, serverId + "" );
Expand Down Expand Up @@ -1162,6 +1182,7 @@ private HighlyAvailableGraphDatabase startMemberNow( InstanceId serverId )

HighlyAvailableGraphDatabase graphDatabase = (HighlyAvailableGraphDatabase) builder.newGraphDatabase();
members.put( serverId, graphDatabase );
monitorsMap.put( graphDatabase, monitors );
return graphDatabase;
}

Expand Down
Expand Up @@ -62,6 +62,7 @@ public class PageCacheWarmupCcIT extends PageCacheWarmupTestSupport
.withSharedReadReplicaParam( CausalClusteringSettings.upstream_selection_strategy, LeaderOnlyStrategy.IDENTITY );

private Cluster cluster;
private CoreClusterMember leader;

@Before
public void setup() throws Exception
Expand All @@ -71,7 +72,7 @@ public void setup() throws Exception

private long warmUpCluster() throws Exception
{
cluster.awaitLeader(); // Make sure we have a cluster leader.
leader = cluster.awaitLeader(); // Make sure we have a cluster leader.
cluster.coreTx( ( db, tx ) ->
{
// Create some test data to touch a bunch of pages.
Expand All @@ -83,18 +84,18 @@ private long warmUpCluster() throws Exception
{
// Wait for an initial profile on the leader. This profile might have raced with the 'createTestData'
// transaction above, so it might be incomplete.
waitForCacheProfile( db );
waitForCacheProfile( leader.monitors() );
// Now we can wait for a clean profile on the leader, and note the count for verifying later.
pagesInMemory.set( waitForCacheProfile( db ) );
pagesInMemory.set( waitForCacheProfile( leader.monitors() ) );
} );
for ( CoreClusterMember member : cluster.coreMembers() )
{
waitForCacheProfile( member.database() );
waitForCacheProfile( member.monitors() );
}
return pagesInMemory.get();
}

private void verifyWarmupHappensAfterStoreCopy( ClusterMember member, long pagesInMemory )
private static void verifyWarmupHappensAfterStoreCopy( ClusterMember member, long pagesInMemory )
{
AtomicLong pagesLoadedInWarmup = new AtomicLong();
BinaryLatch warmupLatch = injectWarmupLatch( member, pagesLoadedInWarmup );
Expand All @@ -104,7 +105,7 @@ private void verifyWarmupHappensAfterStoreCopy( ClusterMember member, long pages
assertThat( pagesLoadedInWarmup.get(), greaterThanOrEqualTo( pagesInMemory ) );
}

private BinaryLatch injectWarmupLatch( ClusterMember member, AtomicLong pagesLoadedInWarmup )
private static BinaryLatch injectWarmupLatch( ClusterMember member, AtomicLong pagesLoadedInWarmup )
{
BinaryLatch warmupLatch = new BinaryLatch();
Monitors monitors = member.monitors();
Expand Down

0 comments on commit d123cd0

Please sign in to comment.