diff --git a/actors/core/src/main/java/com/ea/orbit/actors/providers/IActorClassFinder.java b/actors/core/src/main/java/com/ea/orbit/actors/providers/IActorClassFinder.java new file mode 100644 index 000000000..7af371051 --- /dev/null +++ b/actors/core/src/main/java/com/ea/orbit/actors/providers/IActorClassFinder.java @@ -0,0 +1,9 @@ +package com.ea.orbit.actors.providers; + +import com.ea.orbit.actors.IActor; +import com.ea.orbit.actors.providers.IOrbitProvider; + +public interface IActorClassFinder extends IOrbitProvider +{ + Class findActorImplementation(Class iActorInterface); +} diff --git a/actors/core/src/main/java/com/ea/orbit/actors/runtime/ClassPathSearch.java b/actors/core/src/main/java/com/ea/orbit/actors/runtime/ClassPathSearch.java index d7b0f556f..27645c55e 100644 --- a/actors/core/src/main/java/com/ea/orbit/actors/runtime/ClassPathSearch.java +++ b/actors/core/src/main/java/com/ea/orbit/actors/runtime/ClassPathSearch.java @@ -3,6 +3,9 @@ import com.ea.orbit.concurrent.ConcurrentHashSet; import com.ea.orbit.util.ClassPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; @@ -11,6 +14,7 @@ */ public class ClassPathSearch { + private static final Logger logger = LoggerFactory.getLogger(ClassPathSearch.class); private final ConcurrentHashSet unprocessed = new ConcurrentHashSet<>(); private final ConcurrentHashMap, Class> concreteImplementations = new ConcurrentHashMap<>(); @@ -43,47 +47,59 @@ public Class findImplementation(Class theInterface) final String expectedName = theInterface.getName(); // cloning the list of unprocessed since it might be modified by the operations on the stream. + if (logger.isDebugEnabled()) + { + logger.debug("Searching implementation class for: " + theInterface); + } // searching implementationClass = Stream.of(unprocessed.toArray()) .map(o -> (String) o) .sorted((a, b) -> { - // order by closest name to the interface, closest beginning then closest ending. - int sa = commonStart(expectedName, a); - int sb = commonStart(expectedName, b); - return (sa != sb) ? (sb - sa) : (commonEnd(expectedName, b) - commonEnd(expectedName, a)); + // order by closest name to the interface. + int sa = commonStart(expectedName, a) + commonEnd(expectedName, a); + int sb = commonStart(expectedName, b) + commonEnd(expectedName, b); + return (sa != sb) ? (sb - sa) : a.length() - b.length(); }) + // use this for development: .peek(System.out::println) .map(cn -> { + if (logger.isDebugEnabled()) + { + logger.debug("Checking: " + cn); + } // this returns non null if there is a match // it also culls the list try { Class clazz = Class.forName(cn); - if (!clazz.isInterface() && theInterface.isAssignableFrom(clazz)) + if (!clazz.isInterface()) { - // when searching for IHello1, must avoid: - // IHello1 <- IHello2 - // IHello1 <- HelloImpl1 - // IHello2 <- HelloImpl2 (wrong return, lest strict. + if (theInterface.isAssignableFrom(clazz)) + { + // when searching for IHello1, must avoid: + // IHello1 <- IHello2 + // IHello1 <- HelloImpl1 + // IHello2 <-s HelloImpl2 (wrong return, lest strict. - // However the application **should not** do this kind of class tree. + // However the application **should not** do this kind of class tree. - // Important: this makes ClassPathSearch non generic. - for (Class i : clazz.getInterfaces()) - { - if (i != theInterface && theInterface.isAssignableFrom(i)) + // Important: this makes ClassPathSearch non generic. + for (Class i : clazz.getInterfaces()) { - return null; + if (i != theInterface && theInterface.isAssignableFrom(i)) + { + return null; + } } + // found the best match! + return clazz; } - // found the best match! - return clazz; - } - for (Class base : classesOfInterest) - { - if (base.isAssignableFrom(clazz)) + for (Class base : classesOfInterest) { - // keep the classes that are part of the classes of interest list - return null; + if (base.isAssignableFrom(clazz)) + { + // keep the classes that are part of the classes of interest list + return null; + } } } // culling the list for the next search diff --git a/actors/stage/src/main/java/com/ea/orbit/actors/runtime/ActorClassFinder.java b/actors/stage/src/main/java/com/ea/orbit/actors/runtime/ActorClassFinder.java new file mode 100644 index 000000000..4cf4fa72f --- /dev/null +++ b/actors/stage/src/main/java/com/ea/orbit/actors/runtime/ActorClassFinder.java @@ -0,0 +1,116 @@ +package com.ea.orbit.actors.runtime; + +import com.ea.orbit.actors.IActor; +import com.ea.orbit.actors.providers.IActorClassFinder; +import com.ea.orbit.concurrent.Task; +import com.ea.orbit.exception.UncheckedException; +import com.ea.orbit.util.ClassPath; +import com.ea.orbit.util.IOUtils; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class ActorClassFinder implements IActorClassFinder +{ + private static final ClassPathSearch search = new ClassPathSearch(IActor.class); + + private ConcurrentHashMap, Class> cache = new ConcurrentHashMap<>(); + private Execution execution; + + public ActorClassFinder(Execution execution) + { + this.execution = execution; + } + + public ActorClassFinder() + { + + } + + @Override + public Class findActorImplementation(Class iActorInterface) + { + Class r = cache.get(iActorInterface); + return r != null ? (Class) r : search.findImplementation(iActorInterface); + } + + @Override + public Task start() + { + if (execution == null) + { + return Task.done(); + } + try + { + if (execution.getAutoDiscovery()) + { + final List actorInterfacesRes = ClassPath.get().getAllResources().stream() + .filter(r -> r.getResourceName().startsWith("META-INF/orbit/actors/interfaces")).collect(Collectors.toList()); + final List actorClassesRes = ClassPath.get().getAllResources().stream() + .filter(r -> r.getResourceName().startsWith("META-INF/orbit/actors/classes")).collect(Collectors.toList()); + + for (ClassPath.ResourceInfo irs : actorInterfacesRes) + { + // pre register factories + String nameFactoryName = IOUtils.toString(irs.url().openStream()); + ActorFactory factory = (ActorFactory) classForName(nameFactoryName).newInstance(); + execution.registerFactory(factory); + } + + for (ClassPath.ResourceInfo irs : actorClassesRes) + { + String className = irs.getResourceName().substring("META-INF/orbit/actors/classes".length() + 1); + Class actorClass = classForName(className); + cacheBestInterface(actorClass); + } + } + List> actorClasses = execution.getActorClasses(); + if (actorClasses != null && !actorClasses.isEmpty()) + { + for (Class actorClass : actorClasses) + { + cacheBestInterface(actorClass); + } + } + } + + catch (Throwable e) + { + throw new UncheckedException(e); + } + return Task.done(); + } + + private void cacheBestInterface(Class actorClass) + { + Class bestInterface = null; + for (Class interfaceClass : actorClass.getInterfaces()) + { + if (IActor.class.isAssignableFrom(interfaceClass) && interfaceClass != IActor.class) + { + bestInterface = (bestInterface == null) ? interfaceClass + : bestInterface.isAssignableFrom(interfaceClass) ? interfaceClass + : bestInterface; + } + } + if (bestInterface != null) + { + cache.put(actorClass, bestInterface); + } + } + + private Class classForName(String className) + { + try + { + return Class.forName(className); + } + catch (ClassNotFoundException e) + { + throw new UncheckedException(e); + } + } + +} \ No newline at end of file diff --git a/actors/stage/src/main/java/com/ea/orbit/actors/runtime/Execution.java b/actors/stage/src/main/java/com/ea/orbit/actors/runtime/Execution.java index 59247133c..de09c571e 100644 --- a/actors/stage/src/main/java/com/ea/orbit/actors/runtime/Execution.java +++ b/actors/stage/src/main/java/com/ea/orbit/actors/runtime/Execution.java @@ -34,6 +34,7 @@ import com.ea.orbit.actors.IRemindable; import com.ea.orbit.actors.annotation.StatelessWorker; import com.ea.orbit.actors.cluster.INodeAddress; +import com.ea.orbit.actors.providers.IActorClassFinder; import com.ea.orbit.actors.providers.ILifetimeProvider; import com.ea.orbit.actors.providers.IOrbitProvider; import com.ea.orbit.actors.providers.IStorageProvider; @@ -41,24 +42,21 @@ import com.ea.orbit.concurrent.ExecutorUtils; import com.ea.orbit.concurrent.Task; import com.ea.orbit.exception.UncheckedException; -import com.ea.orbit.util.ClassPath; -import com.ea.orbit.util.IOUtils; -import com.google.common.collect.MapMaker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.MapMaker; + import java.lang.ref.WeakReference; import java.time.Clock; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; @@ -78,6 +76,7 @@ public class Execution implements IRuntime { private static final Logger logger = LoggerFactory.getLogger(Execution.class); + private IActorClassFinder finder; private Map, InterfaceDescriptor> descriptorMapByInterface = new HashMap<>(); private Map descriptorMapByInterfaceId = new HashMap<>(); private Map localActors = new ConcurrentHashMap<>(); @@ -101,12 +100,8 @@ public class Execution implements IRuntime @Config("orbit.actors.autoDiscovery") private boolean autoDiscovery = true; private List orbitProviders = new ArrayList<>(); - private List actorClassPatterns = new ArrayList<>(); private List> actorClasses = new ArrayList<>(); - // list of the actor classes available in this node, computed during startup - private List availableActors; - private final WeakReference cachedRef = new WeakReference<>(this); public Execution() @@ -132,7 +127,6 @@ public ExecutorService getExecutor() public void setActorClassPatterns(List actorClassPatterns) { - this.actorClassPatterns = actorClassPatterns; } public void setActorClasses(List> actorClasses) @@ -142,21 +136,37 @@ public void setActorClasses(List> actorClasses) public boolean canActivateActor(String interfaceName, int interfaceId) { - try + Class aInterface = (Class) classForName(interfaceName); + final InterfaceDescriptor descriptor = getDescriptor(aInterface); + if (descriptor == null || descriptor.cannotActivate) { - final InterfaceDescriptor descriptor = getDescriptor(Class.forName(interfaceName)); - return descriptor != null && descriptor.concreteClassName != null; + return false; } - catch (ClassNotFoundException e) + if (descriptor.concreteClassName != null) { - throw new UncheckedException(e); + return !descriptor.cannotActivate; } + final Class concreteClass = finder.findActorImplementation(aInterface); + descriptor.cannotActivate = concreteClass == null; + descriptor.concreteClassName = concreteClass != null ? concreteClass.getName() : null; + return !descriptor.cannotActivate; + } + + public List> getActorClasses() + { + return actorClasses; + } + + public void registerFactory(ActorFactory factory) + { + // TODO: will enable caching the reference factory } private static class InterfaceDescriptor { ActorFactory factory; ActorInvoker invoker; + boolean cannotActivate; String concreteClassName; boolean isObserver; @@ -581,85 +591,13 @@ public Task unregisterReminder(final IRemindable actor, final String reminder @SuppressWarnings("unchecked") public void start() { - final List actorClassesRes = ClassPath.get().getAllResources().stream().filter(r -> r.getResourceName().startsWith("META-INF/orbit/actors/classes")).collect(Collectors.toList()); - final List> actorInterfaces = new ArrayList<>(); - - if (actorClassPatterns != null && actorClassPatterns.size() > 0) + finder = getFirstProvider(IActorClassFinder.class); + if (finder == null) { - // finding classes in the classpath - Set> iActors = ClassPath.get().getAllResources().stream() - .filter(r -> r.getResourceName().endsWith(".class")) - .map(r -> r.getResourceName().substring(0, r.getResourceName().length() - 6).replace("/", ".")) - .filter(cn -> actorClassPatterns.stream().anyMatch(i -> i.matcher(cn).matches())) - .map(cn -> classForName(cn, true)) - .filter(c -> c != null && (IActor.class.isAssignableFrom(c) || (c.isInterface() && IActorObserver.class.isAssignableFrom(c)))) - .collect(Collectors.toSet()); - actorClasses.addAll(iActors.stream().filter(c -> OrbitActor.class.isAssignableFrom(c)).collect(Collectors.toList())); - actorInterfaces.addAll(iActors.stream().filter(c -> c.isInterface()).collect(Collectors.toList())); + finder = new ActorClassFinder(this); + finder.start().join(); } - List actorInterfacesRes = ClassPath.get().getAllResources().stream().filter(r -> r.getResourceName().startsWith("META-INF/orbit/actors/interfaces")).collect(Collectors.toList()); - try - { - for (ClassPath.ResourceInfo irs : actorInterfacesRes) - { - InterfaceDescriptor descriptor = new InterfaceDescriptor(); - String nameFactoryName = IOUtils.toString(irs.url().openStream()); - descriptor.factory = (ActorFactory) classForName(nameFactoryName).newInstance(); - descriptor.invoker = (ActorInvoker) descriptor.factory.getInvoker(); - descriptor.isObserver = IActorObserver.class.isAssignableFrom(descriptor.factory.getInterface()); - - descriptorMapByInterface.put(descriptor.factory.getInterface(), descriptor); - descriptorMapByInterfaceId.put(descriptor.factory.getInterfaceId(), descriptor); - } - - // install other interface not listed by resources - actorInterfaces.forEach(c -> getDescriptor(c)); - - List availableActors = new ArrayList<>(); - if (autoDiscovery) - { - for (ClassPath.ResourceInfo irs : actorClassesRes) - { - String className = irs.getResourceName().substring("META-INF/orbit/actors/classes".length() + 1); - Class actorClass = classForName(className); - for (Class interfaceClass : actorClass.getInterfaces()) - { - final InterfaceDescriptor interfaceDescriptor = descriptorMapByInterface.get(interfaceClass); - if (interfaceDescriptor != null) - { - availableActors.add(interfaceDescriptor.factory.getInterface().getName()); - interfaceDescriptor.concreteClassName = IOUtils.toString(irs.url().openStream()); - break; - } - } - } - } - if (actorClasses != null && !actorClasses.isEmpty()) - { - for (Class actorClass : actorClasses) - { - for (Class interfaceClass : actorClass.getInterfaces()) - { - if (IActor.class.isAssignableFrom(interfaceClass)) - { - final InterfaceDescriptor interfaceDescriptor = getDescriptor(interfaceClass); - if (interfaceDescriptor != null) - { - availableActors.add(interfaceDescriptor.factory.getInterface().getName()); - interfaceDescriptor.concreteClassName = actorClass.getName(); - break; - } - } - } - } - } - this.availableActors = Collections.unmodifiableList(availableActors); - } - catch (Throwable e) - { - throw new UncheckedException(e); - } getDescriptor(IHosting.class); createObjectReference(IHosting.class, hosting, ""); diff --git a/actors/test/actor-tests/src/test/java/com/ea/orbit/actors/test/AsymmetricalStagesTest.java b/actors/test/actor-tests/src/test/java/com/ea/orbit/actors/test/AsymmetricalStagesTest.java index 48333892c..cb663ecec 100644 --- a/actors/test/actor-tests/src/test/java/com/ea/orbit/actors/test/AsymmetricalStagesTest.java +++ b/actors/test/actor-tests/src/test/java/com/ea/orbit/actors/test/AsymmetricalStagesTest.java @@ -30,7 +30,7 @@ import com.ea.orbit.actors.IActor; import com.ea.orbit.actors.OrbitStage; -import com.ea.orbit.actors.runtime.Execution; +import com.ea.orbit.actors.runtime.ActorClassFinder; import com.ea.orbit.actors.test.actors.ISomeActor; import com.ea.orbit.actors.test.actors.ISomeMatch; import com.ea.orbit.actors.test.actors.ISomePlayer; @@ -50,7 +50,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; /** * Test nodes that do not contain the same collection of actors. @@ -64,8 +66,8 @@ public class AsymmetricalStagesTest extends ActorBaseTest public void asymmetricalNodeTest() throws ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException { // Asymmetrical nodes, one has Match other has Player, both have SomeActor - OrbitStage stage1 = createStage(SomeActor.class, SomePlayer.class); - OrbitStage stage2 = createStage(SomeActor.class, SomeMatch.class); + OrbitStage stage1 = createStage(SomeMatch.class); + OrbitStage stage2 = createStage(SomePlayer.class); final List> tasksA = new ArrayList<>(); final List> tasksP = new ArrayList<>(); @@ -111,11 +113,21 @@ private void setField(Object target, String name, Object value) throws IllegalAc f.set(target, value); } - public OrbitStage createStage(Class... classes) throws ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException + public OrbitStage createStage(Class... excludedActorClasses) throws ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException { OrbitStage stage = new OrbitStage(); + List> excludedClasses = Arrays.asList(excludedActorClasses); stage.setAutoDiscovery(false); - Stream.of(classes).forEach(c -> stage.addProvider(c)); + stage.addProvider(new ActorClassFinder() + { + @Override + public Class findActorImplementation(Class iActorInterface) + { + Class c = super.findActorImplementation(iActorInterface); + return excludedClasses.contains(c) ? null : c; + } + }); + Stream.of(excludedActorClasses).forEach(c -> stage.addProvider(c)); stage.setMode(OrbitStage.StageMode.HOST); stage.setExecutionPool(commonPool); stage.setMessagingPool(commonPool);