/
RemoteActor.java
193 lines (160 loc) · 6.5 KB
/
RemoteActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2015, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
* the Eclipse Foundation
*
* or (per the licensee's choosing)
*
* under the terms of the GNU Lesser General Public License version 3.0
* as published by the Free Software Foundation.
*/
package co.paralleluniverse.actors;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.remote.RemoteChannelProxyFactoryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class should be extended by implementations of remote actors.
*
* @author pron
*/
public abstract class RemoteActor<Message> extends ActorImpl<Message> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteActor.class);
private final transient ActorImpl<Message> actor;
protected RemoteActor(ActorRef<Message> actor) {
super(actor.getName(),
RemoteChannelProxyFactoryService.create(actor.getImpl().mailbox(), ((Actor) actor.getImpl()).getGlobalId()),
actor);
this.actor = actor.getImpl();
}
protected void handleAdminMessage(RemoteActorAdminMessage msg) {
if (msg instanceof RemoteActorRegisterListenerAdminMessage) {
final RemoteActorRegisterListenerAdminMessage reg = (RemoteActorRegisterListenerAdminMessage)msg;
if (reg.isLink())
actor.linked(((ActorLifecycleListener)reg.getListener()).getObserver());
else
actor.addLifecycleListener(reg.getListener());
} else if (msg instanceof RemoteActorUnregisterListenerAdminMessage) {
final RemoteActorUnregisterListenerAdminMessage unreg = (RemoteActorUnregisterListenerAdminMessage) msg;
if (unreg.isLink())
actor.unlinked(((ActorLifecycleListener)unreg.getListener()).getObserver());
else if (unreg.getObserver() != null)
actor.removeObserverListeners(unreg.getObserver());
else
actor.removeLifecycleListener(unreg.getListener());
} else if (msg instanceof RemoteActorInterruptAdminMessage) {
actor.interrupt();
} else if (msg instanceof RemoteActorThrowInAdminMessage) {
actor.throwIn(((RemoteActorThrowInAdminMessage) msg).getException());
}
}
public ActorImpl<Message> getActor() {
return actor;
}
@Override
protected void internalSend(Object message) throws SuspendExecution {
actor.internalSend(message);
}
@Override
protected void internalSendNonSuspendable(Object message) {
actor.internalSendNonSuspendable(message);
}
@Override
public boolean trySend(Message message) {
internalSendNonSuspendable(message);
return true;
}
@Override
protected void addLifecycleListener(LifecycleListener listener) {
internalSendNonSuspendable(new RemoteActorRegisterListenerAdminMessage(listener, false));
}
@Override
protected void removeLifecycleListener(LifecycleListener listener) {
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(listener, false));
}
@Override
protected void linked(ActorRef actor) {
internalSendNonSuspendable(new RemoteActorRegisterListenerAdminMessage(getActorRefImpl(actor).getLifecycleListener(), true));
}
@Override
protected void unlinked(ActorRef actor) {
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(getActorRefImpl(actor).getLifecycleListener(), true));
}
@Override
protected void removeObserverListeners(ActorRef observer) {
internalSendNonSuspendable(new RemoteActorUnregisterListenerAdminMessage(observer, false));
}
@Override
protected void throwIn(RuntimeException e) {
internalSendNonSuspendable(new RemoteActorThrowInAdminMessage(e));
}
@Override
public void interrupt() {
internalSendNonSuspendable(new RemoteActorInterruptAdminMessage());
}
protected static ActorImpl getImpl(ActorRef<?> actor) {
return actor.getImpl();
}
protected static abstract class RemoteActorAdminMessage implements java.io.Serializable {
}
private static class RemoteActorRegisterListenerAdminMessage extends RemoteActorAdminMessage {
private final LifecycleListener listener;
private final boolean link;
public RemoteActorRegisterListenerAdminMessage(LifecycleListener listener, boolean link) {
this.listener = listener;
this.link = link;
}
public LifecycleListener getListener() {
return listener;
}
public boolean isLink() {
return link;
}
@Override
public String toString() {
return "RemoteActorListenerAdminMessage{" + "listener=" + listener + ", link=" + link + '}';
}
}
private static class RemoteActorUnregisterListenerAdminMessage extends RemoteActorAdminMessage {
private final ActorRef observer;
private final LifecycleListener listener;
private final boolean link;
public RemoteActorUnregisterListenerAdminMessage(ActorRef observer, boolean link) {
this.observer = observer;
this.listener = null;
this.link = link;
}
public RemoteActorUnregisterListenerAdminMessage(LifecycleListener listener, boolean link) {
this.listener = listener;
this.observer = null;
this.link = link;
}
public ActorRef getObserver() {
return observer;
}
public LifecycleListener getListener() {
return listener;
}
public boolean isLink() {
return link;
}
@Override
public String toString() {
return "RemoteActorUnregisterListenerAdminMessage{" + "observer=" + observer + ", listener=" + listener + ", link=" + link + '}';
}
}
private static class RemoteActorInterruptAdminMessage extends RemoteActorAdminMessage {
}
private static class RemoteActorThrowInAdminMessage extends RemoteActorAdminMessage {
private final RuntimeException e;
public RemoteActorThrowInAdminMessage(RuntimeException e) {
this.e = e;
}
public RuntimeException getException() {
return e;
}
}
}