Skip to content

Commit

Permalink
Make the Monitors class thread-safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Jun 4, 2015
1 parent 72d5af7 commit 05b1b47
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 62 deletions.
Expand Up @@ -23,5 +23,5 @@

public interface MonitorListenerInvocationHandler
{
public void invoke( Object proxy, Method method, Object[] args, String... tags ) throws Throwable;
void invoke( Object proxy, Method method, Object[] args, String... tags ) throws Throwable;
}
Expand Up @@ -23,14 +23,14 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.helpers.Predicate;
import org.neo4j.helpers.Predicates;
import org.neo4j.helpers.collection.Iterables;

import static org.neo4j.helpers.collection.Iterables.append;
Expand All @@ -53,6 +53,8 @@
* listening functionality, and must call addMonitorListener.
*
* Monitors is monitorable itself, through the {@link org.neo4j.kernel.monitoring.Monitors.Monitor} monitor interface.
*
* This class, and the proxy objects it produces, are thread-safe.
*/
public class Monitors
{
Expand All @@ -62,8 +64,7 @@ public interface Monitor

void monitorListenerException( Throwable throwable );

public class Adapter
implements Monitor
class Adapter implements Monitor
{
@Override
public void monitorCreated( Class<?> monitorClass, String... tags )
Expand All @@ -77,27 +78,28 @@ public void monitorListenerException( Throwable throwable )
}
}

private AtomicReference<Map<Method, List<MonitorListenerInvocationHandler>>> methodMonitorListeners = new
AtomicReference<Map<Method, List<MonitorListenerInvocationHandler>>>( new HashMap<Method,
List<MonitorListenerInvocationHandler>>() );
private List<Class<?>> monitoredInterfaces = new ArrayList<Class<?>>();
private Map<Predicate<Method>, MonitorListenerInvocationHandler> monitorListeners =
new ConcurrentHashMap<Predicate<Method>, MonitorListenerInvocationHandler>();

private Monitor monitorsMonitor;
// Concurrency: Mutation of these data structures is always guarded by the monitor lock on this Monitors instance,
// while look-ups and reads are performed concurrently. The methodMonitorListerners lists (the map values) are
// read concurrently by the proxies, while changing the listener set always produce new lists that atomically
// replace the ones already in the methodMonitorListeners map.
private final Map<Method, List<MonitorListenerInvocationHandler>> methodMonitorListeners = new ConcurrentHashMap<>();
private final List<Class<?>> monitoredInterfaces = new ArrayList<>();
private final Map<Predicate<Method>, MonitorListenerInvocationHandler> monitorListeners = new ConcurrentHashMap<>();
private final Monitor monitorsMonitor;

public Monitors()
{
monitorsMonitor = newMonitor( Monitor.class );
}

public <T> T newMonitor( Class<T> monitorClass, Class<?> owningClass, String... tags )
public synchronized <T> T newMonitor( Class<T> monitorClass, Class<?> owningClass, String... tags )
{
return newMonitor( monitorClass, toArray( String.class, append( owningClass.getName(), Iterables.<String, String>iterable(
tags ) ) ) );
Iterable<String> tagIer = append( owningClass.getName(), Iterables.<String,String>iterable( tags ) );
String[] tagArray = toArray( String.class, tagIer );
return newMonitor( monitorClass, tagArray );
}

public <T> T newMonitor( Class<T> monitorClass, String... tags )
public synchronized <T> T newMonitor( Class<T> monitorClass, String... tags )
{
if ( !monitoredInterfaces.contains( monitorClass ) )
{
Expand All @@ -113,8 +115,8 @@ public <T> T newMonitor( Class<T> monitorClass, String... tags )
MonitorInvocationHandler monitorInvocationHandler = new MonitorInvocationHandler( tags );
try
{
return monitorClass.cast( Proxy.newProxyInstance( classLoader, new Class<?>[]{monitorClass},
monitorInvocationHandler ) );
return monitorClass.cast( Proxy.newProxyInstance(
classLoader, new Class<?>[]{monitorClass}, monitorInvocationHandler ) );
}
finally
{
Expand All @@ -125,40 +127,39 @@ public <T> T newMonitor( Class<T> monitorClass, String... tags )
}
}

public void addMonitorListener( final Object monitorListener, String... tags )
public synchronized void addMonitorListener( final Object monitorListener, String... tags )
{
MonitorListenerInvocationHandler monitorListenerInvocationHandler = tags.length == 0 ? new
UntaggedMonitorListenerInvocationHandler( monitorListener ) :
new TaggedMonitorListenerInvocationHandler( monitorListener, tags );
MonitorListenerInvocationHandler monitorListenerInvocationHandler =
tags.length == 0 ? new UntaggedMonitorListenerInvocationHandler( monitorListener )
: new TaggedMonitorListenerInvocationHandler( monitorListener, tags );

for ( Class<?> monitorInterface : getInterfacesOf( monitorListener.getClass() ) )
{
for ( final Method method : monitorInterface.getMethods() )
{
monitorListeners.put( new Predicate<Method>()
{
@Override
public boolean accept( Method item )
{
return method.equals( item );
}
}, monitorListenerInvocationHandler );
monitorListeners.put(
Predicates.equalTo( method ),
monitorListenerInvocationHandler );

recalculateMethodListeners( method );
}
}
}

public void removeMonitorListener( Object monitorListener )
public synchronized void removeMonitorListener( Object monitorListener )
{
Iterator<Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler>> iter = monitorListeners.entrySet
().iterator();
Iterator<Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler>> iter =
monitorListeners.entrySet().iterator();

while ( iter.hasNext() )
{
Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler> handlerEntry = iter.next();
if ( handlerEntry.getValue() instanceof UntaggedMonitorListenerInvocationHandler )
{
if ( ((UntaggedMonitorListenerInvocationHandler)
handlerEntry.getValue()).getMonitorListener() == monitorListener )
UntaggedMonitorListenerInvocationHandler handler =
(UntaggedMonitorListenerInvocationHandler) handlerEntry.getValue();

if ( handler.getMonitorListener() == monitorListener )
{
iter.remove();
}
Expand All @@ -168,18 +169,19 @@ public void removeMonitorListener( Object monitorListener )
recalculateAllMethodListeners();
}

public void addMonitorListener( MonitorListenerInvocationHandler invocationHandler,
Predicate<Method> methodSpecification )
public synchronized void addMonitorListener(
MonitorListenerInvocationHandler invocationHandler, Predicate<Method> methodSpecification )
{
monitorListeners.put( methodSpecification, invocationHandler );

recalculateAllMethodListeners();
}

public void removeMonitorListener( MonitorListenerInvocationHandler invocationHandler )
public synchronized void removeMonitorListener( MonitorListenerInvocationHandler invocationHandler )
{
Iterator<Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler>> iter = monitorListeners.entrySet
().iterator();
Iterator<Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler>> iter =
monitorListeners.entrySet().iterator();

while ( iter.hasNext() )
{
Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler> handlerEntry = iter.next();
Expand All @@ -194,35 +196,31 @@ public void removeMonitorListener( MonitorListenerInvocationHandler invocationHa

private void recalculateMethodListeners( Method method )
{
List<MonitorListenerInvocationHandler> listeners = new ArrayList<MonitorListenerInvocationHandler>();
for ( Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler> handlerEntry : monitorListeners
.entrySet() )
List<MonitorListenerInvocationHandler> listeners = new ArrayList<>();
for ( Map.Entry<Predicate<Method>, MonitorListenerInvocationHandler> handlerEntry : monitorListeners.entrySet() )
{
if ( handlerEntry.getKey().accept( method ) )
{
listeners.add( handlerEntry.getValue() );
}
}
methodMonitorListeners.get().put( method, listeners );
methodMonitorListeners.put( method, listeners );
}

private void recalculateAllMethodListeners()
{
for ( Method method : methodMonitorListeners.get().keySet() )
for ( Method method : methodMonitorListeners.keySet() )
{
recalculateMethodListeners( method );
}
}

private Iterable<Class<?>> getInterfacesOf( Class<?> aClass )
{
List<Class<?>> interfaces = new ArrayList<Class<?>>();
List<Class<?>> interfaces = new ArrayList<>();
while ( aClass != null )
{
for ( Class<?> classInterface : aClass.getInterfaces() )
{
interfaces.add( classInterface );
}
Collections.addAll( interfaces, aClass.getInterfaces() );
aClass = aClass.getSuperclass();
}
return interfaces;
Expand Down Expand Up @@ -253,7 +251,7 @@ public void invoke( Object proxy, Method method, Object[] args, String... tags )
private static class TaggedMonitorListenerInvocationHandler
extends UntaggedMonitorListenerInvocationHandler
{
private String[] tags;
private final String[] tags;

public TaggedMonitorListenerInvocationHandler( Object monitorListener, String... tags )
{
Expand All @@ -262,16 +260,13 @@ public TaggedMonitorListenerInvocationHandler( Object monitorListener, String...
}

@Override
public void invoke( Object proxy, Method method, Object[] args, String... tags )
throws Throwable
public void invoke( Object proxy, Method method, Object[] args, String... tags ) throws Throwable
{
required:
for ( int i = 0; i < this.tags.length; i++ )
for ( String requiredTag : this.tags )
{
String requiredTag = this.tags[i];
for ( int j = 0; j < tags.length; j++ )
for ( String tag : tags )
{
String tag = tags[j];
if ( requiredTag.equals( tag ) )
{
continue required;
Expand All @@ -284,7 +279,6 @@ public void invoke( Object proxy, Method method, Object[] args, String... tags )
}
}


private class MonitorInvocationHandler implements InvocationHandler
{
private String[] tags;
Expand All @@ -303,12 +297,12 @@ public Object invoke( Object proxy, Method method, Object[] args ) throws Throwa

private void invokeMonitorListeners( Object proxy, Method method, Object[] args )
{
List<MonitorListenerInvocationHandler> handlers = methodMonitorListeners.get().get( method );
List<MonitorListenerInvocationHandler> handlers = methodMonitorListeners.get( method );

if ( handlers != null )
{
for ( int i = 0; i < handlers.size(); i++ )
for ( MonitorListenerInvocationHandler monitorListenerInvocationHandler : handlers )
{
MonitorListenerInvocationHandler monitorListenerInvocationHandler = handlers.get( i );
try
{
monitorListenerInvocationHandler.invoke( proxy, method, args, tags );
Expand Down

0 comments on commit 05b1b47

Please sign in to comment.