Permalink
Browse files

refined message routing back to erlang client

  • Loading branch information...
1 parent 21d1c32 commit c078404df11c317f5a5d4c8705b583aae4c25c87 ingo committed Mar 8, 2010
View
5 README
@@ -16,6 +16,7 @@ Erlang >= R13B03
GNU make
Ant >= 1.8 (lower may work as well)
commons-cli-1.2
+log4j--1.2.15
NOTE: Dependency management is by hand for now. Later we may
use Ivy or Maven for Java dependencies.
@@ -36,8 +37,8 @@ RUN
$ cd erl
$ bin/shell
-erl 1> nerlo_jsrv:start().
-erl 2> nerlo_jsrv:stop().
+erl 1> ej_srv:start().
+erl 2> ej_srv:stop().
View
@@ -16,7 +16,7 @@
-behaviour(gen_server).
% public interface
--export([send/2, call/2]).
+-export([send/2, call/2, call/3]).
-export([start/0, start/1, start/2, start_link/0, start_link/1, start_link/2, stop/0]).
% gen_server exports
@@ -33,15 +33,6 @@
-define(STARTSPEC, {local, ?SRVNAME}).
-define(PEERNAME, jnode).
-define(PEERSTR, atom_to_list(?PEERNAME)).
-
-%% -define(TAG_OK, ok).
-%% -define(TAG_ERROR, error).
-%% -define(TAG_DATA, data).
-%% -define(TAG_CALL, call).
-%% -define(TAG_NODE, node).
-%% -define(EJMSG(Tag,Body), {self(), {Tag, Body}}).
-%% -define(EJMSGPART(Key, Value), {Key, Value}).
-
-define(BINDIR, ".").
-define(JNODEBIN, "jnode").
@@ -74,20 +65,26 @@ start_link(N,Bindir) ->
stop() ->
gen_server:cast(?SRVNAME, {'STOP'}).
-% @doc Send a message to the peer.
+% @doc Send a message to the peer and return immediately.
send(Tag,Msg = [_|_]) ->
Ref = ?EJMSGREF(self(),erlang:make_ref()),
gen_server:call(?SRVNAME, {send, Ref, Tag, Msg}).
-% @doc blocking call
-call(Tag,Msg = [_|_]) ->
+% @doc Send a message to the peer and wait for an answer.
+% This runs with a default timeout of 10 seconds.
+call(Tag,Msg) ->
+ call(Tag,Msg,10).
+
+% @doc Send a message to the peer and wait for an answer.
+% After Timeout seconds {error,timeout} will be returned.
+% Set Timeout to 'infinity' to wait forever.
+call(Tag,Msg = [_|_],Timeout) ->
Ref = ?EJMSGREF(self(),erlang:make_ref()),
gen_server:call(?SRVNAME, {send, Ref, Tag, Msg}),
receive
- {From, Ref, Result} -> Result;
- Any -> {any,Any}
+ {From, Ref, Result} -> Result
after
- 2000 -> timeout
+ Timeout * 1000 -> {error,timeout}
end.
% @hidden
@@ -99,7 +96,12 @@ init(S) ->
{ok,Cwd} = file:get_cwd(),
log:debug(self(), "cwd: ~p", [Cwd]),
timer:start(),
- Peer = handshake(S#jsrv.bindir),
+ Bindir =
+ if
+ S#jsrv.bindir =:= ?BINDIR -> Cwd;
+ true -> S#jsrv.bindir
+ end,
+ Peer = handshake(Bindir),
S2 = S#jsrv{peer=Peer},
Workers =
lists:foldl(fun(_I,Acc) ->
@@ -108,14 +110,9 @@ init(S) ->
_Any -> Acc
end
end, [], lists:seq(1,S#jsrv.n)),
- Bindir =
- if
- S2#jsrv.bindir =:= ?BINDIR -> Cwd;
- true -> S2#jsrv.bindir
- end,
S2#jsrv{workers=Workers,bindir=Bindir}
end,
- log:info(self(), "~p initialized with state ~w", [?MODULE, S1]),
+ log:info(self(), "~p initialized with state ~p", [?MODULE, S1]),
{ok,S1}.
% @hidden
@@ -142,7 +139,7 @@ handle_cast({'STOP'}, S) ->
no -> shutdown(S#jsrv.peer,S)
end,
timer:send_after(500,?EJMSG(erlang:make_ref(), ?TAG_OK,[?EJMSGPART(call,bye)])),
- log:info(self(),"stopping with state: ~w", [S]),
+ log:info(self(),"stopping with state: ~p", [S]),
{noreply, S#jsrv{stopping=true}};
handle_cast(Msg,S) ->
log:info(self(),"cannot handle cast: ~p", [Msg]),
@@ -174,16 +171,14 @@ handle_info({From,Ref,{?TAG_OK,[?EJMSGPART(call,bye)]}},S) ->
handle_info({'STOP'},S) ->
case S#jsrv.worker of
yes ->
- log:info(self(),"stopping with state: ~w", [S]),
+ log:info(self(),"stopping with state: ~p", [S]),
{stop, normal, S};
no ->
{noreply,S}
end;
% messages to be routed to client
-handle_info({From,Ref,Msg},S) ->
- % TODO check Ref
+handle_info({From,Ref={Client,Id},Msg},S) ->
log:debug(self(), "got result: ~p ~p ~p", [From,Ref,Msg]),
- {Client,Id} = Ref,
Client ! {self(),Ref,Msg},
{noreply, S};
handle_info(Msg,S) ->
View
@@ -1 +1,2 @@
+!.gitignore
*
@@ -1,6 +1,5 @@
package org.ister.ej;
-import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -12,8 +11,6 @@
/**
* Main class.
- *
- * We should read node spec from command line and/or conf file.
*
* @author ingo
*/
@@ -4,7 +4,6 @@
package org.ister.ej;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
@@ -26,9 +25,6 @@
* KEY := Atom
* VALUE := Atom | String | Binary | Int
*
- * Should be threadsafe. But, what exactly does "threadsafe" mean...?
- * This one is immutable.
- *
* @author ingo
*
*/
@@ -47,7 +43,7 @@
*/
public Msg(OtpErlangPid self, MsgRef ref, OtpErlangTuple tuple) {
this.from = (OtpErlangPid) self.clone();
- this.ref = ref;
+ this.ref = (MsgRef) ref.clone();
this.msg = (OtpErlangTuple) tuple.clone();
this.map = msgToMap();
}
@@ -6,7 +6,7 @@
import com.ericsson.otp.erlang.OtpErlangTuple;
/**
- * An ej message ref.
+ * An ej message reference.
*
* @author ingo
*
@@ -17,9 +17,15 @@
private final OtpErlangRef ref;
private final OtpErlangTuple t;
+ private MsgRef(MsgRef other) {
+ this.pid = (OtpErlangPid) other.pid.clone();
+ this.ref = (OtpErlangRef) other.ref.clone();
+ this.t = (OtpErlangTuple) other.t.clone();
+ }
+
public MsgRef(OtpErlangPid pid, OtpErlangRef ref) {
- this.pid = pid;
- this.ref = ref;
+ this.pid = (OtpErlangPid) pid.clone();
+ this.ref = (OtpErlangRef) ref.clone();
OtpErlangObject[] r = {this.pid, this.ref};
this.t = new OtpErlangTuple(r);
}
@@ -34,13 +40,18 @@ public MsgRef(OtpErlangTuple t) throws IllegalArgumentException {
if (! (t.elementAt(1) instanceof OtpErlangRef)) {
throw new IllegalArgumentException("ref has no ref at position 1");
}
- this.pid = (OtpErlangPid) t.elementAt(0);
- this.ref = (OtpErlangRef) t.elementAt(1);
- this.t = t;
+ this.pid = (OtpErlangPid) t.elementAt(0).clone();
+ this.ref = (OtpErlangRef) t.elementAt(1).clone();
+ this.t = (OtpErlangTuple) t.clone();
}
public OtpErlangTuple toTuple() {
- return this.t;
+ return (OtpErlangTuple) this.t.clone();
+ }
+
+ @Override
+ public Object clone() {
+ return new MsgRef(this);
}
}
@@ -2,14 +2,10 @@
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
-import org.ister.nerlo.Bundle;
-import org.ister.nerlo.example.SimpleFiber;
import com.ericsson.otp.erlang.*;
@@ -29,7 +25,7 @@
* </pre>
*
* If this has been started canonically from within Erlang with
- * nerlo_jsrv:start() you may send messages using nerlo_jsrv:send(Tag,Msg).
+ * ej_srv:start() you may send messages using ej_srv:send/2 or ej_srv:call/2,3.
*
* @author ingo
*/
@@ -49,7 +45,6 @@
private OtpErlangPid self = null;
private OtpErlangPid peerpid = null;
-// private Bundle bundle;
/**
* Create with custom setup.
@@ -66,7 +61,6 @@ private Node(String name, String peer, Properties props) throws IOException {
this.mboxname = name;
this.peernode = peer;
this.node = getNode();
-// this.bundle = Bundle.getInstance();
}
/**
@@ -126,9 +120,6 @@ public void run() throws Exception {
continue;
} catch (IllegalArgumentException e) {
log.error("parsing message\n" + e.toString());
-// } catch (Exception e) {
-// System.out.println("Error: unexpected exception in while\n" + e.toString());
-// System.exit(1);
}
}
}
@@ -188,7 +179,6 @@ private void handshake(Msg msg) {
private void shutdown(Msg msg, OtpNode node) {
log.info("shutdown request from: " + msg.getFrom().toString());
this.handler.shutdown();
-// this.bundle.shutdown();
try {
Map<String, Object> map = new HashMap<String, Object>(2);
map.put("call", "bye");
@@ -202,9 +192,6 @@ private void shutdown(Msg msg, OtpNode node) {
System.exit(0);
}
}
-
-
-
private MsgHandler getHandler() {
String className = Main.getProperty("ej.msgHandler", null);
@@ -247,9 +234,5 @@ private OtpNode getNode() throws IOException {
throw e;
}
}
-
- private MsgRef createRef() {
- return new MsgRef(this.self, this.node.createRef());
- }
}
@@ -46,7 +46,6 @@ private void job(Msg msg) {
log.info("future returned: " + res);
}
Map<String, Object> map = new HashMap<String, Object>(2);
- map.put("job", "done");
map.put("result", l.toString());
Msg answer = Msg.answer(node.getSelf(), MsgTag.OK, map, msg);
node.sendPeer(answer);

0 comments on commit c078404

Please sign in to comment.