Permalink
Browse files

fix #163

  • Loading branch information...
pron committed May 22, 2017
1 parent 5d0cfbc commit eb8a29a9d79e43de9c35e4ab4e63733b7d5150fb
@@ -89,7 +89,7 @@
private transient volatile ActorRef<Message> wrapperRef;
private transient /*final*/ AtomicReference<Class<?>> classRef;
private final Set<LifecycleListener> lifecycleListeners = Collections.newSetFromMap(MapUtil.<LifecycleListener, Boolean>newConcurrentHashMap());
private final Set<ActorImpl> observed = Collections.newSetFromMap(MapUtil.<ActorImpl, Boolean>newConcurrentHashMap());
private final Set<ActorRef> observed = Collections.newSetFromMap(MapUtil.<ActorRef, Boolean>newConcurrentHashMap());
private volatile V result;
private volatile Throwable exception;
private volatile Throwable deathCause;
@@ -770,9 +770,11 @@ protected Message handleLifecycleMessage(LifecycleMessage m) {
if (m instanceof ExitMessage) {
ExitMessage exit = (ExitMessage) m;
removeObserverListeners(exit.getActor());
if (exit.getWatch() == null)
if (!observed.remove(exit.getActor()))
return null;
if (exit.getWatch() == null) {
throw new LifecycleException(m);
else if (forwardWatch)
} else if (forwardWatch)
return (Message) m; // this is a false cast! forwardWatch must only be used in untyped languages
}
return null;
@@ -881,7 +883,7 @@ final void checkThrownIn0() {
throw (RuntimeException) exception;
}
}
/**
* Links this actor to another.
*
@@ -900,12 +902,20 @@ public final Actor link(ActorRef other) {
if (this.isDone()) {
other1.getLifecycleListener().dead(ref, getDeathCause());
} else {
observed.add(other);
addLifecycleListener(other1.getLifecycleListener());
other1.addLifecycleListener(this.getLifecycleListener());
other1.linked(myRef());
}
return this;
}
@Override
protected void linked(ActorRef actor) {
observed.add(actor);
addLifecycleListener(getActorRefImpl(actor).getLifecycleListener());
}
/**
* Un-links this actor from another. This operation is symmetric.
*
@@ -916,10 +926,19 @@ public final Actor link(ActorRef other) {
public final Actor unlink(ActorRef other) {
final ActorImpl other1 = getActorRefImpl(other);
record(1, "Actor", "unlink", "Uninking actors %s, %s", this, other1);
observed.remove(other);
removeLifecycleListener(other1.getLifecycleListener());
other1.removeLifecycleListener(this.getLifecycleListener());
other1.unlinked(myRef());
return this;
}
@Override
protected void unlinked(ActorRef actor) {
observed.remove(actor);
removeLifecycleListener(getActorRefImpl(actor).getLifecycleListener());
}
/**
* Makes this actor watch another actor.
@@ -941,8 +960,8 @@ public final Object watch(ActorRef other) {
final LifecycleListener listener = new ActorLifecycleListener(myRef(), id);
record(1, "Actor", "watch", "Actor %s to watch %s (listener: %s)", this, other, listener);
final ActorImpl other1 = getActorRefImpl(other);
observed.add(other);
other1.addLifecycleListener(listener);
observed.add(other1);
return id;
}
@@ -957,8 +976,8 @@ public final void unwatch(ActorRef other, Object watchId) {
final LifecycleListener listener = new ActorLifecycleListener(myRef(), watchId);
record(1, "Actor", "unwatch", "Actor %s to stop watching %s (listener: %s)", this, other, listener);
final ActorImpl other1 = getActorRefImpl(other);
observed.remove(other);
other1.removeLifecycleListener(listener);
observed.remove(getActorRefImpl(other));
}
/**
@@ -1041,19 +1060,12 @@ private void die(Throwable cause) {
} catch (Exception e) {
record(1, "Actor", "die", "Actor %s notifying listener %s of death failed with excetpion %s", this, listener, e);
}
// avoid memory leak in links:
if (listener instanceof ActorLifecycleListener) {
ActorLifecycleListener l = (ActorLifecycleListener) listener;
if (l.getId() == null) // link
l.getObserver().getImpl().removeObserverListeners(myRef());
}
}
// avoid memory leaks:
lifecycleListeners.clear();
for (ActorImpl a : observed)
a.removeObserverListeners(myRef());
for (ActorRef a : observed)
getActorRefImpl(a).removeObserverListeners(myRef());
observed.clear();
}
@@ -118,9 +118,13 @@ public void close(Throwable t) {
protected abstract void addLifecycleListener(LifecycleListener listener);
protected abstract void removeLifecycleListener(LifecycleListener listener);
protected abstract void removeObserverListeners(ActorRef actor);
protected abstract void linked(ActorRef actor);
protected abstract void unlinked(ActorRef actor);
protected LifecycleListener getLifecycleListener() {
if (lifecycleListener == null) // maybe benign race
lifecycleListener = new ActorLifecycleListener(ref(), null);
@@ -191,19 +191,10 @@ protected void die(Throwable cause) {
} catch (Exception e) {
record(1, "Actor", "die", "Actor %s notifying listener %s of death failed with excetpion %s", this, listener, e);
}
// avoid memory leak in links:
if (listener instanceof ActorLifecycleListener) {
ActorLifecycleListener l = (ActorLifecycleListener) listener;
if (l.getId() == null) // link
l.getObserver().getImpl().removeObserverListeners(ref());
}
}
// avoid memory leaks:
lifecycleListeners.clear();
for (ActorImpl a : observed)
a.removeObserverListeners(ref());
observed.clear();
}
@@ -71,4 +71,14 @@ protected void removeLifecycleListener(LifecycleListener listener) {
protected void removeObserverListeners(ActorRef actor) {
throw new UnsupportedOperationException();
}
@Override
protected void linked(ActorRef actor) {
throw new UnsupportedOperationException();
}
@Override
protected void unlinked(ActorRef actor) {
throw new UnsupportedOperationException();
}
}
@@ -26,20 +26,26 @@
public abstract class RemoteActor<Message> extends ActorImpl<Message> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteActor.class);
private final transient ActorImpl<Message> actor;
protected RemoteActor(ActorRef<Message> actor) {
super(actor.getName(),
RemoteChannelProxyFactoryService.create(actor.getImpl().mailbox(), ((Actor)actor.getImpl()).getGlobalId()),
super(actor.getName(),
RemoteChannelProxyFactoryService.create(actor.getImpl().mailbox(), ((Actor) actor.getImpl()).getGlobalId()),
actor);
this.actor = actor.getImpl();
}
protected void handleAdminMessage(RemoteActorAdminMessage msg) {
if (msg instanceof RemoteActorRegisterListenerAdminMessage) {
actor.addLifecycleListener(((RemoteActorRegisterListenerAdminMessage) msg).getListener());
final RemoteActorRegisterListenerAdminMessage reg = (RemoteActorRegisterListenerAdminMessage)msg;
if (reg.isLink())
actor.linked(((ActorLifecycleListener)reg.getListener()).getObserver());
else
actor.addLifecycleListener(reg.getListener());
} else if (msg instanceof RemoteActorUnregisterListenerAdminMessage) {
final RemoteActorUnregisterListenerAdminMessage unreg = (RemoteActorUnregisterListenerAdminMessage) msg;
if (unreg.getObserver() != null)
if (unreg.isLink())
actor.unlinked(((ActorLifecycleListener)unreg.getListener()).getObserver());
else if (unreg.getObserver() != null)
actor.removeObserverListeners(unreg.getObserver());
else
actor.removeLifecycleListener(unreg.getListener());
@@ -72,17 +78,27 @@ public boolean trySend(Message message) {
@Override
protected void addLifecycleListener(LifecycleListener listener) {
internalSendNonSuspendable(new RemoteActorRegisterListenerAdminMessage(listener));
internalSendNonSuspendable(new RemoteActorRegisterListenerAdminMessage(listener, false));
}
@Override
protected void removeLifecycleListener(LifecycleListener listener) {
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(listener));
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(listener, false));
}
@Override
protected void linked(ActorRef actor) {
internalSendNonSuspendable(new RemoteActorRegisterListenerAdminMessage(getActorRefImpl(actor).getLifecycleListener(), true));
}
@Override
protected void unlinked(ActorRef actor) {
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(getActorRefImpl(actor).getLifecycleListener(), true));
}
@Override
protected void removeObserverListeners(ActorRef observer) {
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(observer));
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(observer, false));
}
@Override
@@ -104,33 +120,42 @@ protected static ActorImpl getImpl(ActorRef<?> actor) {
private static class RemoteActorRegisterListenerAdminMessage extends RemoteActorAdminMessage {
private final LifecycleListener listener;
private final boolean link;
@Override
public String toString() {
return "RemoteActorListenerAdminMessage{" + "listener=" + listener + '}';
}
public RemoteActorRegisterListenerAdminMessage(LifecycleListener listener) {
public RemoteActorRegisterListenerAdminMessage(LifecycleListener listener, boolean link) {
this.listener = listener;
this.link = link;
}
public LifecycleListener getListener() {
return listener;
}
public boolean isLink() {
return link;
}
@Override
public String toString() {
return "RemoteActorListenerAdminMessage{" + "listener=" + listener + ", link=" + link + '}';
}
}
private static class RemoteActorUnregisterListenerAdminMessage extends RemoteActorAdminMessage {
private final ActorRef observer;
private final LifecycleListener listener;
private final boolean link;
public RemoteActorUnregisterListenerAdminMessage(ActorRef observer) {
public RemoteActorUnregisterListenerAdminMessage(ActorRef observer, boolean link) {
this.observer = observer;
this.listener = null;
this.link = link;
}
public RemoteActorUnregisterListenerAdminMessage(LifecycleListener listener) {
public RemoteActorUnregisterListenerAdminMessage(LifecycleListener listener, boolean link) {
this.listener = listener;
this.observer = null;
this.link = link;
}
public ActorRef getObserver() {
@@ -140,6 +165,15 @@ public ActorRef getObserver() {
public LifecycleListener getListener() {
return listener;
}
public boolean isLink() {
return link;
}
@Override
public String toString() {
return "RemoteActorUnregisterListenerAdminMessage{" + "observer=" + observer + ", listener=" + listener + ", link=" + link + '}';
}
}
private static class RemoteActorInterruptAdminMessage extends RemoteActorAdminMessage {

0 comments on commit eb8a29a

Please sign in to comment.