-
Notifications
You must be signed in to change notification settings - Fork 575
/
FakeActor.java
214 lines (192 loc) · 8.25 KB
/
FakeActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
* the Eclipse Foundation
*
* or (per the licensee's choosing)
*
* under the terms of the GNU Lesser General Public License version 3.0
* as published by the Free Software Foundation.
*/
package co.paralleluniverse.actors;
import co.paralleluniverse.concurrent.util.MapUtil;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.channels.SendPort;
import co.paralleluniverse.strands.queues.QueueCapacityExceededException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
/**
* An {@link ActorRef} which is not backed by any actual {@link Actor}.
* Instead, this "fake actor" only has a channel that serves as a mailbox, but not no {@link Actor#doRun() doRun} method, or a private strand.
*
* @author pron
*/
public abstract class FakeActor<Message> extends ActorImpl<Message> {
private static final Throwable NATURAL = new Throwable();
private final Set<LifecycleListener> lifecycleListeners = Collections.newSetFromMap(MapUtil.<LifecycleListener, Boolean>newConcurrentHashMap());
private final Set<ActorImpl> observed = Collections.newSetFromMap(MapUtil.<ActorImpl, Boolean>newConcurrentHashMap());
private volatile Throwable deathCause;
public FakeActor(String name, SendPort<Message> mailbox) {
super(name, (SendPort<Object>) mailbox, null);
}
/**
* All messages sent to the mailbox are passed to this method. If this method returns a non-null value, this value will be returned
* from the {@code receive} methods. If it returns {@code null}, then {@code receive} will keep waiting.
* <p/>
* By default, this message passes all {@link LifecycleMessage} messages to {@link #handleLifecycleMessage(LifecycleMessage) handleLifecycleMessage}, while
* other messages are returned (and will be returned by {@code receive}.
*
* @param m the message
*/
protected Message filterMessage(Object m) {
if (m instanceof LifecycleMessage) {
return handleLifecycleMessage((LifecycleMessage) m);
}
return (Message) m;
}
@Override
public boolean trySend(Message message) {
record(1, "ActorRef", "trySend", "Sending %s -> %s", message, this);
Message msg = filterMessage(message);
if (msg == null)
return true;
if (mailbox().trySend(msg))
return true;
record(1, "ActorRef", "trySend", "Message not sent. Mailbox is not ready.");
return false;
}
/**
* For internal use
*
* @param message
*/
@Override
protected void internalSend(Object message) throws SuspendExecution {
record(1, "ActorRef", "send", "Sending %s -> %s", message, this);
Message msg = filterMessage(message);
if (msg == null)
return;
try {
mailbox().send(message);
} catch (InterruptedException e) {
Strand.currentStrand().interrupt();
}
}
@Override
protected void internalSendNonSuspendable(Object message) {
record(1, "ActorRef", "internalSendNonSuspendable", "Sending %s -> %s", message, this);
Message msg = filterMessage(message);
if (msg == null)
return;
if (!mailbox().trySend(msg))
throw new QueueCapacityExceededException();
}
@Override
protected void addLifecycleListener(LifecycleListener listener) {
final Throwable cause = getDeathCause();
if (isDone()) {
listener.dead(ref(), cause);
return;
}
lifecycleListeners.add(listener);
if (isDone())
listener.dead(ref(), cause);
}
/**
* Returns this actor's cause of death
*
* @return the {@link Throwable} that caused this actor's death, or {@code null} if it died by natural causes, or if it not dead.
*/
protected final Throwable getDeathCause() {
return deathCause == NATURAL ? null : deathCause;
}
/**
* Tests whether this fake actor has terminated.
*/
protected final boolean isDone() {
return deathCause != null;
}
@Override
protected void removeLifecycleListener(LifecycleListener listener) {
lifecycleListeners.remove(listener);
}
@Override
protected void removeObserverListeners(ActorRef actor) {
for (Iterator<LifecycleListener> it = lifecycleListeners.iterator(); it.hasNext();) {
LifecycleListener lifecycleListener = it.next();
if (lifecycleListener instanceof ActorLifecycleListener)
if (((ActorLifecycleListener) lifecycleListener).getObserver().equals(actor))
it.remove();
}
}
/**
* Makes this fake actor watch another actor.
*
* When the other actor dies, this actor receives an {@link ExitMessage}, that is
* handled by {@link #handleLifecycleMessage(LifecycleMessage) handleLifecycleMessage}. This message does not cause an exception to be thrown,
* unlike the case where it is received as a result of a linked actor's death.
* <p/>
* Unlike a link, a watch is asymmetric, and it is also composable, namely, calling this method twice with the same argument would result in two different values
* returned, and in an {@link ExitMessage} to be received twice.
*
* @param other the other actor
* @return a {@code watchId} object that identifies this watch in messages, and used to remove the watch by the {@link #unwatch(ActorRef, Object) unwatch} method.
* @see #unwatch(ActorRef, Object)
*/
public final Object watch(ActorRef other) {
final Object id = ActorUtil.randtag();
final ActorImpl other1 = getActorRefImpl(other);
final LifecycleListener listener = new ActorLifecycleListener(ref(), id);
record(1, "Actor", "watch", "Actor %s to watch %s (listener: %s)", this, other1, listener);
other1.addLifecycleListener(listener);
observed.add(other1);
return id;
}
/**
* Un-watches another actor.
*
* @param other the other actor
* @param watchId the object returned from the call to {@link #watch(ActorRef) watch(other)}
* @see #watch(ActorRef)
*/
public final void unwatch(ActorRef other, Object watchId) {
final ActorImpl other1 = getActorRefImpl(other);
final LifecycleListener listener = new ActorLifecycleListener(ref(), watchId);
record(1, "Actor", "unwatch", "Actor %s to stop watching %s (listener: %s)", this, other1, listener);
other1.removeLifecycleListener(listener);
observed.remove(getActorRefImpl(other));
}
protected abstract Message handleLifecycleMessage(LifecycleMessage m);
protected void die(Throwable cause) {
record(1, "Actor", "die", "Actor %s is dying of cause %s", this, cause);
this.deathCause = (cause == null ? NATURAL : cause);
for (LifecycleListener listener : lifecycleListeners) {
record(1, "Actor", "die", "Actor %s notifying listener %s of death.", this, listener);
try {
listener.dead(ref(), cause);
} catch (Exception e) {
record(1, "Actor", "die", "Actor %s notifying listener %s of death failed with excetpion %s", this, listener, e);
}
// avoid memory leak in links:
if (listener instanceof ActorLifecycleListener) {
ActorLifecycleListener l = (ActorLifecycleListener) listener;
if (l.getId() == null) // link
l.getObserver().getImpl().removeObserverListeners(ref());
}
}
// avoid memory leaks:
lifecycleListeners.clear();
for (ActorImpl a : observed)
a.removeObserverListeners(ref());
observed.clear();
}
/////////// Serialization ///////////////////////////////////
protected final Object writeReplace() throws java.io.ObjectStreamException {
return RemoteActorProxyFactoryService.create(ref(), null);
}
}