Permalink
Browse files

Refactor migration

  • Loading branch information...
pron committed Jul 19, 2014
1 parent d236d08 commit 3e5cdea525884fc6f1d0895dac881cc999c243e5
@@ -15,10 +15,10 @@
import co.paralleluniverse.actors.ActorImpl.ActorLifecycleListener;
import static co.paralleluniverse.actors.ActorImpl.getActorRefImpl;
import co.paralleluniverse.actors.spi.MigrationRecord;
import co.paralleluniverse.common.util.Debug;
import co.paralleluniverse.common.util.Objects;
import co.paralleluniverse.concurrent.util.MapUtil;
import co.paralleluniverse.concurrent.util.ThreadAccess;
import co.paralleluniverse.fibers.DefaultFiberScheduler;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.FiberScheduler;
@@ -97,7 +97,7 @@
private boolean hasMonitor;
private ActorSpec<?, Message, V> spec;
private Object aux;
private transient /*final*/ ActorRunner<V> runner;
private /*final*/ ActorRunner<V> runner;
private boolean migrating;
/**
@@ -599,7 +599,7 @@ public final void join(long timeout, TimeUnit unit) throws ExecutionException, I
* Tests whether this actor has been started, i.e. whether the strand executing it has been started.
*/
public final boolean isStarted() {
return runner.isStarted();
return runner == null || runner.isStarted(); // runner == null iff migrateAndRestart
}
/**
@@ -637,7 +637,8 @@ protected final boolean isInActor() {
/////////// Lifecycle ///////////////////////////////////
final V run0() throws InterruptedException, SuspendExecution {
JMXActorsMonitor.getInstance().actorStarted(ref);
if (!(runner.getStrand() instanceof Fiber))
final Strand strand = runner.getStrand(); // runner might be nulled by running actor
if (!strand.isFiber())
currentActor.set(this);
try {
if (this instanceof MigratingActor && globalId == null)
@@ -670,9 +671,9 @@ final V run0() throws InterruptedException, SuspendExecution {
throw t;
} finally {
record(1, "Actor", "die", "Actor %s is now dead of %s", this, getDeathCause());
if (!(runner.getStrand() instanceof Fiber))
if (!strand.isFiber())
currentActor.set(null);
JMXActorsMonitor.getInstance().actorTerminated(ref, runner.getStrand());
JMXActorsMonitor.getInstance().actorTerminated(ref, strand);
}
}
@@ -994,10 +995,11 @@ public void migrateAndRestart() throws SuspendExecution {
record(1, "Actor", "migrateAndRestart", "Actor %s is migrating.", this);
verifyOnActorStrand();
this.runner = null;
migrating = true;
try {
preMigrate();
MigrationService.migrate(getGlobalId(), this, Serialization.getInstance().write(new MigrationRecord(this, null)));
MigrationService.migrate(getGlobalId(), this, Serialization.getInstance().write(this));
postMigrate();
throw Migrate.MIGRATE;
} finally {
@@ -1020,7 +1022,7 @@ public void migrate() throws SuspendExecution {
@Override
public void write(Fiber fiber, ByteArraySerializer ser) {
final byte[] buf = ser.write(new MigrationRecord(Actor.this, fiber));
final byte[] buf = ser.write(fiber);
new Fiber<Void>() {
@Override
protected Void run() throws SuspendExecution, InterruptedException {
@@ -1041,6 +1043,20 @@ private void preMigrate() {
}
}
/**
* Returns the actor associated with the given strand, or {@code null} if none is.
*/
public static Actor getActor(Strand s) {
final ActorRunner runner;
if (s.isFiber())
runner = (ActorRunner) ((Fiber) s.getUnderlying()).getTarget();
else
runner = (ActorRunner) Strand.unwrapSuspendable(ThreadAccess.getTarget((Thread) s.getUnderlying()));
if (runner == null)
return null;
return runner.getActor();
}
private void postMigrate() {
final RemoteActor<Message> remote = RemoteActorProxyFactoryService.create(ref, getGlobalId());
ref.setImpl(remote);
@@ -1061,12 +1077,13 @@ private void postMigrate() {
}
public static <M> ActorRef<M> hire(ActorRef<M> ref, FiberScheduler scheduler) throws SuspendExecution {
MigrationRecord mr = MigrationService.hire(ref, Fiber.newFiberSerializer());
Actor actor = MigrationService.hire(ref, Fiber.newFiberSerializer());
final Actor<M, ?> actor = (Actor<M, ?>) mr.getActor();
final Fiber<?> fiber = mr.getFiber();
final Fiber<?> fiber = actor.runner != null ? (Fiber) actor.getStrand() : null;
actor.setRef(ref);
actor.runner = fiber != null ? (ActorRunner) fiber.getTarget() : new ActorRunner<>(ref);
if (fiber == null)
actor.runner = new ActorRunner<>(ref);
// actor.runner = fiber != null ? (ActorRunner) fiber.getTarget() : new ActorRunner<>(ref);
actor.ref.setImpl(actor);
assert ref == actor.ref : ref + " - " + actor.ref;
@@ -29,7 +29,7 @@
*
* @author pron
*/
class ActorRunner<V> implements SuspendableCallable<V>, Stranded, Joinable<V> {
class ActorRunner<V> implements SuspendableCallable<V>, Stranded, Joinable<V>, java.io.Serializable {
private /*final*/ transient ActorRef<?> actorRef;
private volatile Actor<?, V> actor;
private Strand strand;
@@ -103,10 +103,10 @@ public void await(int iter, long timeout, TimeUnit unit) throws SuspendExecution
@Override
protected Object writeReplace() throws java.io.ObjectStreamException {
if (!actor.isStarted())
throw new IllegalStateException("Owning actor " + actor + " not started");
if (actor.isMigrating())
return new SerializedMailbox(config);
if (!actor.isStarted())
throw new IllegalStateException("Owning actor " + actor + " not started");
return RemoteChannelProxyFactoryService.create(this, actor.getGlobalId());
}
@@ -13,7 +13,6 @@
*/
package co.paralleluniverse.actors;
import co.paralleluniverse.actors.spi.MigrationRecord;
import co.paralleluniverse.actors.spi.Migrator;
import co.paralleluniverse.common.util.ServiceUtil;
import co.paralleluniverse.fibers.SuspendExecution;
@@ -41,11 +40,15 @@ public static Object registerMigratingActor() throws SuspendExecution {
return res;
}
public static void migrate(Object id, Actor actor, byte[] serializedMigrationRecord) throws SuspendExecution {
migrator.migrate(actor.getGlobalId(), actor, serializedMigrationRecord);
public static void migrate(Object id, byte[] serialized) throws SuspendExecution {
migrate(id, null, serialized);
}
public static MigrationRecord hire(ActorRef<?> actorRef, ByteArraySerializer ser) throws SuspendExecution {
public static void migrate(Object id, Actor actor, byte[] serialized) throws SuspendExecution {
migrator.migrate(actor.getGlobalId(), actor, serialized);
}
public static Actor hire(ActorRef<?> actorRef, ByteArraySerializer ser) throws SuspendExecution {
return migrator.hire(actorRef, actorRef.getImpl(), ser);
}
}
@@ -25,6 +25,6 @@
*/
public interface Migrator {
Object registerMigratingActor() throws SuspendExecution;
void migrate(Object id, Actor actor, byte[] serializedMigrationRecord) throws SuspendExecution;
MigrationRecord hire(ActorRef actorRef, ActorImpl actorImpl, ByteArraySerializer ser) throws SuspendExecution;
void migrate(Object id, Actor actor, byte[] serialized) throws SuspendExecution;
Actor hire(ActorRef actorRef, ActorImpl actorImpl, ByteArraySerializer ser) throws SuspendExecution;
}
@@ -1914,6 +1914,8 @@ public Fiber read(Kryo kryo, Input input, Class<Fiber> type) {
ThreadAccess.setInheritablehreadLocals(currentThread, null);
try {
final Registration reg = kryo.readClass(input);
if (reg == null)
return null;
f = (Fiber) new FieldSerializer(kryo, reg.getType()).read(kryo, input, reg.getType());
f.fiberLocals = ThreadAccess.getThreadLocals(currentThread);
@@ -42,6 +42,13 @@ public static Serialization getInstance() {
return tlInstance.get();
}
public static Serialization newInstance() {
if (instance != null)
return instance;
else
return new Serialization(new KryoSerializer());
}
public static Kryo getKryo() {
return ((KryoSerializer) tlInstance.get().bas).getKryo();
}
@@ -781,37 +781,67 @@ public static Strand clone(Strand strand, final SuspendableRunnable target) {
* as the target of a thread.
*/
public static Runnable toRunnable(final SuspendableRunnable runnable) {
return new Runnable() {
@Override
public void run() {
try {
runnable.run();
} catch (SuspendExecution ex) {
throw new AssertionError(ex);
} catch (InterruptedException ex) {
}
}
};
return new SuspendableRunnableRunnable(runnable);
}
/**
* A utility method that converts a {@link SuspendableCallable} to a {@link Runnable} so that it could run
* as the target of a thread. The return value of the callable is ignored.
*/
public static Runnable toRunnable(final SuspendableCallable<?> callable) {
return new Runnable() {
@Override
public void run() {
try {
callable.run();
} catch (SuspendExecution ex) {
throw new AssertionError(ex);
} catch (InterruptedException ex) {
} catch (Exception e) {
throw Exceptions.rethrow(e);
}
return new SuspendableCallableRunnable(callable);
}
/**
* Returns the {@link SuspendableCallable} or {@link SuspendableRunnable}, wrapped by the given {@code Runnable}
* by {@code toRunnable}.
*/
public static Object unwrapSuspendable(Runnable r) {
if (r instanceof SuspendableCallableRunnable)
return ((SuspendableCallableRunnable) r).callable;
if (r instanceof SuspendableRunnableRunnable)
return ((SuspendableRunnableRunnable) r).runnable;
return null;
}
private static class SuspendableRunnableRunnable implements Runnable {
private final SuspendableRunnable runnable;
public SuspendableRunnableRunnable(SuspendableRunnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
try {
runnable.run();
} catch (SuspendExecution ex) {
throw new AssertionError(ex);
} catch (InterruptedException ex) {
} catch (Exception e) {
throw Exceptions.rethrow(e);
}
};
}
}
private static class SuspendableCallableRunnable implements Runnable {
private final SuspendableCallable<?> callable;
public SuspendableCallableRunnable(SuspendableCallable<?> callable) {
this.callable = callable;
}
@Override
public void run() {
try {
callable.run();
} catch (SuspendExecution ex) {
throw new AssertionError(ex);
} catch (InterruptedException ex) {
} catch (Exception e) {
throw Exceptions.rethrow(e);
}
}
}
private static Thread cloneThread(Thread thread, Runnable target) {
@@ -48,6 +48,7 @@
private final co.paralleluniverse.galaxy.Grid grid1;
private final Grid grid;
@SuppressWarnings("LeakingThisInConstructor")
public GlxGlobalRegistry() {
assert INSTANCE == null;
try {
@@ -73,7 +74,7 @@ public Object register(ActorRef<?> actor, Object globalId) throws SuspendExecuti
final long root = store.getRoot(rootName, globalId != null ? (Long) globalId : -1, txn);
// assert globalId == null || ((Long) globalId) == root; -- it's OK to replace the actor's globalId -- until it's too late
store.getx(root, txn);
store.set(root, Serialization.getInstance().write(actor), txn);
store.set(root, Serialization.newInstance().write(actor), txn);
LOG.debug("Registered actor {} at rootId {}", actor, Long.toHexString(root));
store.commit(txn);
return root; // root is the global id
@@ -289,7 +290,7 @@ public void received(Cache cache, long id, long version, ByteBuffer data) {
}
LOG.debug("Store returned null for root {}. Registering actor {} at rootId {}", rootName, actor, root);
store.set(root, Serialization.getInstance().write(actor), txn);
store.set(root, Serialization.newInstance().write(actor), txn);
} else
actor = deserActor(rootName, buf);
@@ -309,7 +310,7 @@ public void received(Cache cache, long id, long version, ByteBuffer data) {
private <Message> ActorRef<Message> deserActor(final String rootName, byte[] buf) {
try {
final ActorRef<Message> actor = (ActorRef<Message>) Serialization.getInstance().read(buf);
final ActorRef<Message> actor = (ActorRef<Message>) Serialization.newInstance().read(buf);
LOG.debug("Deserialized actor {} for root {}", actor, rootName);
return actor;
} catch (Exception e) {
@@ -18,6 +18,7 @@
import co.paralleluniverse.actors.ActorRef;
import co.paralleluniverse.actors.spi.MigrationRecord;
import co.paralleluniverse.actors.spi.Migrator;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.galaxy.quasar.Grid;
import co.paralleluniverse.galaxy.quasar.Store;
@@ -58,19 +59,19 @@ public Object registerMigratingActor() throws SuspendExecution {
}
@Override
public void migrate(Object id, Actor actor, byte[] serializedMigrationRecord) throws SuspendExecution {
public void migrate(Object id, Actor actor, byte[] serialized) throws SuspendExecution {
final long _id = (Long) id;
try {
store.setListener(_id, null);
store.set(_id, serializedMigrationRecord, null);
store.set(_id, serialized, null);
store.release(_id);
} catch (co.paralleluniverse.galaxy.TimeoutException e) {
throw new RuntimeException(e);
}
}
@Override
public MigrationRecord hire(ActorRef actorRef, ActorImpl impl, ByteArraySerializer ser) throws SuspendExecution {
public Actor hire(ActorRef actorRef, ActorImpl impl, ByteArraySerializer ser) throws SuspendExecution {
final GlxGlobalChannelId gcid = ((GlxRemoteActor) impl).getId();
if (!gcid.isGlobal())
throw new IllegalArgumentException("Actor " + actorRef + " is not a migrating actor");
@@ -80,9 +81,18 @@ public MigrationRecord hire(ActorRef actorRef, ActorImpl impl, ByteArraySerializ
store.setListener(id, null);
final byte[] buf = store.getx(id, null);
assert buf != null : actorRef + " " + impl;
final MigrationRecord mr = (MigrationRecord) ser.read(buf);
GlobalRemoteChannelReceiver.getReceiver(mr.getActor().getMailbox(), id);
return mr;
final Object obj = ser.read(buf);
final Actor actor;
if (obj instanceof Actor)
actor = (Actor) obj;
else if (obj instanceof Fiber)
actor = Actor.getActor((Fiber) obj);
else
throw new IllegalArgumentException("Serialized object " + obj + " is not an actor or a fiber");
GlobalRemoteChannelReceiver.getReceiver(actor.getMailbox(), id);
return actor;
} catch (co.paralleluniverse.galaxy.TimeoutException e) {
throw new RuntimeException(e);
}
Oops, something went wrong.

0 comments on commit 3e5cdea

Please sign in to comment.