Skip to content

Commit

Permalink
trying performance optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
Neo(Dongmin Yu) committed Aug 31, 2012
1 parent d942daf commit c53333c
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 51 deletions.
1 change: 1 addition & 0 deletions src/main/java/zmq/DecoderBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public ByteBuffer get_buffer() {
} else
b = read_buf;
} else {
zero_copy = false;
b = buf;
b.clear();
}
Expand Down
16 changes: 7 additions & 9 deletions src/main/java/zmq/Poller.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
Expand Down Expand Up @@ -205,8 +204,6 @@ public void run() {
int rc;
try {
rc = selector.select(timeout);
} catch (ClosedSelectorException e) {
break;
} catch (IOException e) {
throw new ZError.IOException(e);
}
Expand All @@ -227,13 +224,14 @@ public void run() {
continue;
}


if (key.isAcceptable()) {
evt.accept_event();
} else if (key.isReadable() ) {
evt.in_event();
} else if (key.isWritable()) {
if (key.isWritable()) {
evt.out_event();
}

if (key.isReadable() ) {
evt.in_event();
} else if (key.isAcceptable()) {
evt.accept_event();
} else if (key.isConnectable()) {
evt.connect_event();
}
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/zmq/SessionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,47 +26,47 @@

public class SessionBase extends Own implements Pipe.IPipeEvents, IPollEvents {

Logger LOG = LoggerFactory.getLogger(SessionBase.class);
private static Logger LOG = LoggerFactory.getLogger(SessionBase.class);

// If true, this session (re)connects to the peer. Otherwise, it's
// a transient session created by the listener.
boolean connect;
private boolean connect;

// Pipe connecting the session to its socket.
Pipe pipe;
private Pipe pipe;

// This flag is true if the remainder of the message being processed
// is still in the in pipe.
boolean incomplete_in;
private boolean incomplete_in;

// True if termination have been suspended to push the pending
// messages to the network.
boolean pending;
private boolean pending;

// The protocol I/O engine connected to the session.
IEngine engine;
private IEngine engine;

// The socket the session belongs to.
SocketBase socket;
private SocketBase socket;

// I/O thread the session is living in. It will be used to plug in
// the engines into the same thread.
IOThread io_thread;
private IOThread io_thread;

// ID of the linger timer
private static int linger_timer_id = 0x20;

// True is linger timer is running.
boolean has_linger_timer;
private boolean has_linger_timer;

// If true, identity is to be sent/recvd from the network.
boolean send_identity;
boolean recv_identity;
private boolean send_identity;
private boolean recv_identity;

// Protocol and address to use when connecting.
final Address addr;
private final Address addr;

IOObject io_object;
private IOObject io_object;

public static SessionBase create(IOThread io_thread_, boolean connect_,
SocketBase socket_, Options options_, Address addr_) {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/zmq/Signaler.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ boolean wait_event (long timeout_) {
ZError.EAGAIN();
return false;
}

selector.selectedKeys().clear();



assert (rc == 1);
return true;

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/zmq/SocketBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,8 @@ public boolean send (Msg msg_, int flags_)
}

// Process pending commands, if any.
boolean brc = process_commands (0, true);
if (!brc)
boolean rc = process_commands (0, true);
if (!rc)
return false;

// Clear any user-visible flags that are set on the message.
Expand All @@ -587,8 +587,8 @@ public boolean send (Msg msg_, int flags_)
msg_.set_flags (Msg.more);

// Try to send the message.
brc = xsend (msg_, flags_);
if (brc)
rc = xsend (msg_, flags_);
if (rc)
return true;
if (!ZError.is(ZError.EAGAIN))
return false;
Expand All @@ -609,8 +609,8 @@ public boolean send (Msg msg_, int flags_)
while (true) {
if (!process_commands (timeout, false) )
return false;
brc = xsend (msg_, flags_);
if (brc)
rc = xsend (msg_, flags_);
if (rc)
break;
if (!ZError.is(ZError.EAGAIN))
return false;
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/zmq/StreamEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,25 @@ public class StreamEngine implements IEngine, IPollEvents {
//final private IOObject io_object;
private SocketChannel handle;

ByteBuffer inbuf;
int insize;
final DecoderBase decoder;
boolean input_error;
private ByteBuffer inbuf;
private int insize;
private final DecoderBase decoder;
private boolean input_error;

private Transfer outbuf;
int outsize;
final EncoderBase encoder;
private int outsize;
private final EncoderBase encoder;

// The session this engine is attached to.
SessionBase session;
private SessionBase session;

// Detached transient session.
SessionBase leftover_session;
//private SessionBase leftover_session;

Options options;
private Options options;

// String representation of endpoint
String endpoint;
private String endpoint;

boolean plugged;

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/zmq/YPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ public boolean check_read ()
{
// Was the value prefetched already? If so, return.
long h = queue.front_pos();
if (h != r && r != -1)
if (h != r)
return true;

// There's no prefetched value, so let us prefetch more values.
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
// items to prefetch, set c to -1 (using compare-and-swap).
if (c.compareAndSet (h, -1)) {
// nothing to read, h == r must be the same
} else {
Expand All @@ -140,7 +140,7 @@ public boolean check_read ()
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when items
// are being deallocated.
if (h == r || r == -1)
if (h == r)
return false;

// There was at least one value prefetched.
Expand Down
10 changes: 2 additions & 8 deletions src/main/java/zmq/YQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,6 @@ protected Chunk(Class<T> klass, int size, long memory_ptr, boolean allocate) {
}

}
protected void reset() {
if (!allocate) {
for (int i=0; i != values.length; i++) {
values[i] = null;
}
}
}
};

// Back position may point to invalid memory if the queue is empty,
Expand Down Expand Up @@ -129,9 +122,10 @@ public T back(T val) {


public void pop() {
if (!allocate)
begin_chunk.values [begin_pos] = null;
begin_pos++;
if (begin_pos == size) {
begin_chunk.reset();
begin_chunk = begin_chunk.next;
begin_chunk.prev = null;
begin_pos = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/zmq/ZError.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public static void ETERM() {
}

public static void EAGAIN() {
errno(EAGAIN);
errno.set(EAGAIN);
}

public static void EINTR() {
Expand Down
1 change: 0 additions & 1 deletion src/test/java/perf/RemoteThr.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public static void main(String[] argv) {

ZMQ.zmq_term (ctx);


}


Expand Down

0 comments on commit c53333c

Please sign in to comment.