Permalink
Browse files

Add support for +S (#schedulers) and +A (#async threads) flags to mai…

…n erjang launcher. Previously, these were not separate. Best performance is achieved with a low +S value (at present), +A threads are needed for I/O
  • Loading branch information...
krestenkrab committed Oct 14, 2010
1 parent e9e7b3e commit 9cfae75147fd2660998dcfaf32383bc52f10e3d7
View
7 ej
@@ -10,13 +10,18 @@ EJC_DIR=`dirname "$EJC_CMD"`
source $EJC_DIR/env_cfg
## -Derjang.debug.port=true
+## -Derjang.debug.inet=true
+## -Xdebug -Xnoagent -Xrunjdwp:transport=dt_socket,address=8787,server=y,suspend=n
+
exec java \
- -Derj.threads=50 \
+ -server \
-Xmx1g -Xss1m \
-XX:PermSize=128m \
-jar $EJC_DIR/erjang-0.1.jar \
-home $HOME \
-root $ERL_ROOT \
+ +A 10 \
+ +S 1 \
+e $ERTS_VSN \
$*
@@ -784,6 +784,7 @@ public static EObject try_case_end(EObject val) {
}
static kilim.Scheduler scheduler = new kilim.Scheduler(threadPoolSize());
+ static kilim.Scheduler async_scheduler = new kilim.Scheduler(asyncThreadPoolSize());
public static EAtom am_io = EAtom.intern("io");
public static EAtom am_attributes = EAtom.intern("attributes");
public static EAtom am_exports = EAtom.intern("exports");
@@ -809,6 +810,11 @@ public static void run(Task task) {
task.start();
}
+ public static void run_async(Task task) {
+ task.setScheduler(async_scheduler);
+ task.start();
+ }
+
/*
* Skeleton for receive statement:
*
@@ -992,7 +998,7 @@ public static EDriver find_driver(EString command) {
* @param job
*/
public static void run_async(final EAsync job, final EDriverTask dt) {
- run(new Task() {
+ run_async(new Task() {
@Override
public void execute() throws Pausable, Exception {
job.async();
@@ -1013,13 +1019,21 @@ public static ESeq getRemoteNodes() {
* @return
*/
public static int threadPoolSize() {
- String threads = System.getProperty("erj.threads");
+ String threads = System.getProperty("erjang.beam.option.S");
if (threads != null)
return Integer.parseInt(threads);
else
return Runtime.getRuntime().availableProcessors();
}
+ public static int asyncThreadPoolSize() {
+ String threads = System.getProperty("erjang.beam.option.A");
+ if (threads != null)
+ return Integer.parseInt(threads);
+ else
+ return 20;
+ }
+
public static ESeq registered() {
ESeq res = ERT.NIL;
for (EAtom reg : register.keySet()) {
@@ -743,6 +743,10 @@ public void output_term_from_driver_b(EObject out) {
* @throws Pausable
*
*/
+ public void eof_from_driver_b() {
+ output_term_from_driver_b(new ETuple2(port, am_eof));
+ }
+
public void eof_from_driver() throws Pausable {
output_term_from_driver(new ETuple2(port, am_eof));
}
@@ -751,6 +755,10 @@ public void exit_status_from_driver(int code) throws Pausable {
output_term_from_driver(new ETuple2(port, ERT.box(code)));
}
+ public void exit_status_from_driver_b(int code) {
+ output_term_from_driver_b(new ETuple2(port, ERT.box(code)));
+ }
+
public static ESeq all_ports() {
ESeq res = ERT.NIL;
@@ -143,18 +143,11 @@ synchronized boolean is_closing() {
private void start_input_reader(final DataInputStream stream,
final boolean is_stdin) {
- ERT.run(new Task() {
-
- public void execute0() throws Pausable {
- System.err.println("start: "+name+" "+(is_stdin?"in":"err"));
- try {
- // execute0();
- } finally {
- System.err.println(" end: "+name+" "+(is_stdin?"in":"err"));
- }
- }
-
- public void execute() throws Pausable {
+ new Thread() {
+
+ { setDaemon(true); start(); }
+
+ public void run() {
if (is_stdin) {
@@ -177,7 +170,7 @@ public void execute() throws Pausable {
stdin_thread = null;
if (task.send_eof) {
- task.eof_from_driver();
+ task.eof_from_driver_b();
}
if (task.send_exit_status) {
@@ -190,7 +183,7 @@ public void execute() throws Pausable {
continue;
}
}
- task.exit_status_from_driver(code);
+ task.exit_status_from_driver_b(code);
}
}
@@ -217,7 +210,7 @@ public void execute() throws Pausable {
}
}
- private boolean do_read() throws Pausable, IOException {
+ private boolean do_read() throws IOException {
byte[] data;
@@ -264,15 +257,15 @@ private boolean do_read() throws Pausable, IOException {
}
if (task.send_binary_data) {
- task.output_from_driver(new EBinary(data, 0, nbytes));
+ task.output_from_driver_b(new EBinary(data, 0, nbytes));
} else {
- task.output_from_driver(EString.make(data, 0, nbytes));
+ task.output_from_driver_b(EString.make(data, 0, nbytes));
}
return true;
}
- });
+ };
}
/*
@@ -508,6 +508,10 @@ static EObject system_info(EProc proc, EObject type) {
return ERT.TRUE;
} else if (type == am_schedulers) {
return ERT.box(ERT.threadPoolSize());
+ } else if (type == am_threads) {
+ return ERT.box(true);
+ } else if (type == am_thread_pool_size) {
+ return ERT.box(ERT.asyncThreadPoolSize());
} else if (type == am_break_ignored) {
return ERT.box(false);
}

0 comments on commit 9cfae75

Please sign in to comment.