Skip to content

Commit

Permalink
Started to make Execution Async
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Hegarty committed Aug 18, 2015
1 parent a3898f6 commit 35af96a
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 67 deletions.
5 changes: 5 additions & 0 deletions actors/core/pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
</plugins> </plugins>
</build> </build>
<dependencies> <dependencies>
<dependency>
<groupId>com.ea.orbit</groupId>
<artifactId>orbit-async</artifactId>
<version>${project.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.ea.orbit</groupId> <groupId>com.ea.orbit</groupId>
<artifactId>orbit-commons</artifactId> <artifactId>orbit-commons</artifactId>
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void checkReads() throws Exception
SomeMatch someMatch = Actor.getReference(SomeMatch.class, "300"); SomeMatch someMatch = Actor.getReference(SomeMatch.class, "300");
SomePlayer somePlayer = Actor.getReference(SomePlayer.class, "101"); SomePlayer somePlayer = Actor.getReference(SomePlayer.class, "101");
someMatch.addPlayer(somePlayer).get(); someMatch.addPlayer(somePlayer).get();
stage1.stop(); stage1.stop().join();
} }
assertEquals(1, database.getCollection("SomeMatch").count()); assertEquals(1, database.getCollection("SomeMatch").count());
{ {
Expand Down
6 changes: 0 additions & 6 deletions actors/stage/pom.xml
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -87,12 +87,6 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>


<dependency>
<groupId>com.ea.orbit</groupId>
<artifactId>orbit-async</artifactId>
<version>${project.version}</version>
</dependency>

<dependency> <dependency>
<groupId>org.infinispan</groupId> <groupId>org.infinispan</groupId>
<artifactId>infinispan-embedded</artifactId> <artifactId>infinispan-embedded</artifactId>
Expand Down
17 changes: 4 additions & 13 deletions actors/stage/src/main/java/com/ea/orbit/actors/Stage.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.ea.orbit.actors.runtime.cloner.KryoCloner; import com.ea.orbit.actors.runtime.cloner.KryoCloner;
import com.ea.orbit.annotation.Config; import com.ea.orbit.annotation.Config;
import com.ea.orbit.annotation.Wired; import com.ea.orbit.annotation.Wired;
import com.ea.orbit.async.Await;
import com.ea.orbit.concurrent.ExecutorUtils; import com.ea.orbit.concurrent.ExecutorUtils;
import com.ea.orbit.concurrent.Task; import com.ea.orbit.concurrent.Task;
import com.ea.orbit.container.Container; import com.ea.orbit.container.Container;
Expand Down Expand Up @@ -110,24 +111,14 @@ public enum StageMode


static static
{ {
// Initializes orbit async, but only if the application uses it.
try try
{ {
Class.forName("com.ea.orbit.async.Async"); Await.init();
try
{
// async is present in the classpath, let's make sure await is initialized
Class.forName("com.ea.orbit.async.Await");
}
catch (Exception ex)
{
// this might be a problem, logging.
logger.error("Error initializing orbit-async", ex);
}
} }
catch (Exception ex) catch (Exception ex)
{ {
// no problem, application doesn't use orbit async. // this might be a problem, logging.
logger.error("Error initializing orbit-async", ex);
} }
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import static com.ea.orbit.async.Await.await;

public class Execution implements Runtime public class Execution implements Runtime
{ {


Expand Down Expand Up @@ -441,44 +443,53 @@ public Activation(final ReferenceEntry entry, final Object instance)
} }


// gets or creates the instance // gets or creates the instance
public Object getOrCreateInstance() throws IllegalAccessException, InstantiationException, ExecutionException, InterruptedException // gets or creates the instance
public Task<Object> getOrCreateInstance()
{ {
if (instance == null) try
{ {
Object newInstance = classForName(entry.descriptor.concreteClassName).newInstance(); if (instance == null)
if (newInstance instanceof AbstractActor)
{ {
final AbstractActor<?> actor = (AbstractActor<?>) newInstance; Object newInstance = classForName(entry.descriptor.concreteClassName).newInstance();
ActorTaskContext.current().setActor(actor); if (newInstance instanceof AbstractActor)
actor.reference = entry.reference; {
final AbstractActor<?> actor = (AbstractActor<?>) newInstance;
ActorTaskContext.current().setActor(actor);
actor.reference = entry.reference;


actor.stateExtension = getStorageExtensionFor(actor); actor.stateExtension = getStorageExtensionFor(actor);


Task.allOf(getAllExtensions(LifetimeExtension.class).stream().map(v -> v.preActivation(actor))).join(); await(Task.allOf(getAllExtensions(LifetimeExtension.class).stream().map(v -> v.preActivation(actor))));


if (actor.stateExtension != null) if (actor.stateExtension != null)
{
try
{ {
actor.readState().join(); try
}
catch (Exception ex)
{
if (logger.isErrorEnabled())
{ {
logger.error("Error reading actor state for: " + entry.reference, ex); await(actor.readState());
}
catch (Exception ex)
{
if (logger.isErrorEnabled())
{
logger.error("Error reading actor state for: " + entry.reference, ex);
}
throw ex;
} }
throw ex;
} }
instance = newInstance;

await(actor.activateAsync());
await(Task.allOf(getAllExtensions(LifetimeExtension.class).stream().map(v -> v.postActivation(actor))));
} }
instance = newInstance;


actor.activateAsync().join();
Task.allOf(getAllExtensions(LifetimeExtension.class).stream().map(v -> v.postActivation(actor))).join();
} }


return Task.fromValue(instance);
}
catch(Exception e)
{
throw new UncheckedException(e);
} }
return instance;
} }
} }


Expand Down Expand Up @@ -534,30 +545,30 @@ public Task<?> stop()
{ {
// * refuse new actor activations // * refuse new actor activations
state = NodeCapabilities.NodeState.STOPPING; state = NodeCapabilities.NodeState.STOPPING;
hosting.notifyStateChange().join(); await(hosting.notifyStateChange());


// * deactivate all actors // * deactivate all actors
activationCleanup().join(); await(activationCleanup());


// * finalize all timers // * finalize all timers
timer.cancel(); timer.cancel();


// * give extensions a chance to send a message // * give extensions a chance to send a message
Task.allOf(extensions.stream().map(StageLifecycleListener::onPreStop)).join(); await(Task.allOf(extensions.stream().map(StageLifecycleListener::onPreStop)));


// * stop processing new received messages (responses still work) // * stop processing new received messages (responses still work)
// * notify rest of the cluster (no more observer messages) // * notify rest of the cluster (no more observer messages)
state = NodeCapabilities.NodeState.STOPPED; state = NodeCapabilities.NodeState.STOPPED;
hosting.notifyStateChange().join(); await(hosting.notifyStateChange());


// * wait pending tasks execution // * wait pending tasks execution
executionSerializer.shutdown(); executionSerializer.shutdown();


// * cancel all pending messages, and prevents sending new ones // * cancel all pending messages, and prevents sending new ones
messaging.stop(); await(messaging.stop());


// ** stop all extensions // ** stop all extensions
Task.allOf(extensions.stream().map(Startable::stop)).join(); await(Task.allOf(extensions.stream().map(Startable::stop)));


return Task.done(); return Task.done();
} }
Expand Down Expand Up @@ -1042,7 +1053,7 @@ private Task<?> executeMessage(
try try
{ {
bind(); bind();
final Object actor = activation.getOrCreateInstance(); final Object actor = await(activation.getOrCreateInstance());
context.setActor((AbstractActor<?>) actor); context.setActor((AbstractActor<?>) actor);


Task<?> future = descriptor.invoker.safeInvoke(actor, methodId, params); Task<?> future = descriptor.invoker.safeInvoke(actor, methodId, params);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public void nodeDropTest() throws ExecutionException, InterruptedException
stage3.bind(); stage3.bind();
assertEquals(uuid, someActor_r3.getUniqueActivationId().join()); assertEquals(uuid, someActor_r3.getUniqueActivationId().join());


stage1.stop(); stage1.stop().join();
stage2.stop(); stage2.stop().join();


// a new Activation must have been created since the initial nodes where stopped. // a new Activation must have been created since the initial nodes where stopped.
stage3.bind(); stage3.bind();
Expand All @@ -75,8 +75,8 @@ public void nodeDropTest() throws ExecutionException, InterruptedException
stage3.bind(); stage3.bind();
assertEquals(secondUUID, someActor_r4.getUniqueActivationId().join()); assertEquals(secondUUID, someActor_r4.getUniqueActivationId().join());
// BTW, timing issues will sometimes make this fail by timeout with the real network. // BTW, timing issues will sometimes make this fail by timeout with the real network.
stage3.stop(); stage3.stop().join();
stage4.stop(); stage4.stop().join();


} }


Expand Down
17 changes: 4 additions & 13 deletions container/src/main/java/com/ea/orbit/container/Container.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@


import com.ea.orbit.annotation.Config; import com.ea.orbit.annotation.Config;
import com.ea.orbit.annotation.Order; import com.ea.orbit.annotation.Order;
import com.ea.orbit.async.Await;
import com.ea.orbit.concurrent.Task; import com.ea.orbit.concurrent.Task;
import com.ea.orbit.configuration.ContainerConfig; import com.ea.orbit.configuration.ContainerConfig;
import com.ea.orbit.configuration.ContainerConfigImpl; import com.ea.orbit.configuration.ContainerConfigImpl;
Expand Down Expand Up @@ -81,24 +82,14 @@ public class Container


static static
{ {
// Initializes orbit async, but only if the application uses it.
try try
{ {
Class.forName("com.ea.orbit.async.Async"); Await.init();
try
{
// async is present in the classpath, let's make sure await is initialized
Class.forName("com.ea.orbit.async.Await");
}
catch (Exception ex)
{
// this might be a problem, logging.
logger.error("Error initializing orbit-async", ex);
}
} }
catch (Exception ex) catch (Exception ex)
{ {
// no problem, application doesn't use orbit async. // this might be a problem, logging.
logger.error("Error initializing orbit-async", ex);
} }
} }


Expand Down

0 comments on commit 35af96a

Please sign in to comment.