Skip to content

Commit

Permalink
trying to make objects short lived
Browse files Browse the repository at this point in the history
  • Loading branch information
miniway committed Sep 2, 2012
1 parent 4e350f3 commit e8e97a7
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 81 deletions.
10 changes: 1 addition & 9 deletions src/main/java/zmq/Command.java
Expand Up @@ -22,7 +22,7 @@
package zmq;

// This structure defines the commands that can be sent between threads.
public class Command implements IReplaceable {
public class Command {

// Object to process the command.
private ZObject destination;
Expand Down Expand Up @@ -103,12 +103,4 @@ public String toString() {
return super.toString() + "[" + type + ", " + destination + "]";
}

@Override
public void replace(Object src_) {
Command src = (Command) src_;
destination = src.destination;
type = src.type;
arg = src.arg;
}

}
1 change: 1 addition & 0 deletions src/main/java/zmq/Dealer.java
Expand Up @@ -87,6 +87,7 @@ private Msg xxrecv (int flags_)
if (prefetched) {
msg_ = prefetched_msg ;
prefetched = false;
prefetched_msg = null;
return msg_;
}

Expand Down
29 changes: 0 additions & 29 deletions src/main/java/zmq/IReplaceable.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/java/zmq/Mailbox.java
Expand Up @@ -51,7 +51,7 @@ public class Mailbox {
private final String name;

public Mailbox(String name_) {
cpipe = new YPipe<Command>(Command.class, Config.command_pipe_granularity.getValue(), true);
cpipe = new YPipe<Command>(Command.class, Config.command_pipe_granularity.getValue());
sync = new ReentrantLock();
signaler = new Signaler();

Expand Down
8 changes: 1 addition & 7 deletions src/main/java/zmq/Msg.java
Expand Up @@ -24,7 +24,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;

public class Msg implements IReplaceable {
public class Msg {

// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
Expand Down Expand Up @@ -287,11 +287,5 @@ public void put(String str, int i) {
put(str.getBytes(), i);
}

@Override
public void replace(Object src) {
clone((Msg)src);
}



}
4 changes: 2 additions & 2 deletions src/main/java/zmq/Router.java
Expand Up @@ -247,10 +247,12 @@ protected Msg xrecv (int flags_)
if (prefetched) {
if (!identity_sent) {
msg_ = prefetched_id;
prefetched_id = null;
identity_sent = true;
}
else {
msg_ = prefetched_msg;
prefetched_msg = null;
prefetched = false;
}
more_in = msg_.has_more();
Expand Down Expand Up @@ -321,8 +323,6 @@ protected boolean xhas_in ()
assert ((prefetched_msg.flags () & Msg.identity) == 0);

Blob identity = pipe[0].get_identity ();
//prefetched_id = new Msg(identity.size());
//prefetched_id.put(identity.data());
prefetched_id = new Msg(identity.data());
prefetched_id.set_flags (Msg.more);

Expand Down
10 changes: 3 additions & 7 deletions src/main/java/zmq/YPipe.java
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.atomic.AtomicLong;


public class YPipe<T extends IReplaceable> {
public class YPipe<T> {

// Allocation-efficient queue to store pipe items.
// Front of the queue points to the first prefetched item, back of
Expand All @@ -48,12 +48,8 @@ public class YPipe<T extends IReplaceable> {
// atomic operations.
private final AtomicLong c;

public YPipe(Class<T> klass, int qsize ) {
this(klass, qsize, false);
}

public YPipe(Class<T> klass, int qsize, boolean allocate) {
queue = new YQueue<T>(klass, qsize, allocate);
public YPipe(Class<T> klass, int qsize) {
queue = new YQueue<T>(klass, qsize);
queue.push();
w = r = f = queue.back_pos();
c = new AtomicLong(queue.back_pos());
Expand Down
27 changes: 7 additions & 20 deletions src/main/java/zmq/YQueue.java
Expand Up @@ -23,7 +23,7 @@
import java.lang.reflect.Array;


public class YQueue<T extends IReplaceable> {
public class YQueue<T> {

// Individual memory chunk to hold N elements.
private class Chunk
Expand All @@ -34,21 +34,14 @@ private class Chunk
Chunk next;

@SuppressWarnings("unchecked")
protected Chunk(Class<T> klass, int size, long memory_ptr, boolean allocate) {
protected Chunk(Class<T> klass, int size, long memory_ptr) {
values = (T[])(Array.newInstance(klass, size));
pos = new long[size];
assert values != null;
prev = next = null;
for (int i=0; i != values.length; i++) {
pos[i] = memory_ptr;
memory_ptr++;
if (allocate) {
try {
values[i] = klass.newInstance();
} catch (Exception e) {
throw new ZError.InstantiationException(e);
}
}
}

}
Expand All @@ -67,7 +60,6 @@ protected Chunk(Class<T> klass, int size, long memory_ptr, boolean allocate) {
private Chunk spare_chunk;
private final Class<T> klass;
private final int size;
private boolean allocate;

// People are likely to produce and consume at similar rates. In
// this scenario holding onto the most recently freed chunk saves
Expand All @@ -76,13 +68,12 @@ protected Chunk(Class<T> klass, int size, long memory_ptr, boolean allocate) {
private long memory_ptr;


public YQueue(Class<T> klass, int size, boolean allocate) {
public YQueue(Class<T> klass, int size) {

this.klass = klass;
this.size = size;
this.allocate = allocate;
memory_ptr = 0;
begin_chunk = new Chunk(klass, size, memory_ptr, allocate);
begin_chunk = new Chunk(klass, size, memory_ptr);
memory_ptr += size;
begin_pos = 0;
back_pos = 0;
Expand Down Expand Up @@ -113,17 +104,13 @@ public T back() {
}

public T back(T val) {
if (allocate)
back_chunk.values[back_pos].replace(val);
else
back_chunk.values [back_pos] = val;
back_chunk.values [back_pos] = val;
return val;
}


public void pop() {
if (!allocate)
begin_chunk.values [begin_pos] = null;
begin_chunk.values [begin_pos] = null;
begin_pos++;
if (begin_pos == size) {
begin_chunk = begin_chunk.next;
Expand All @@ -149,7 +136,7 @@ public void push() {
end_chunk.next = sc;
sc.prev = end_chunk;
} else {
end_chunk.next = new Chunk(klass, size, memory_ptr, allocate);
end_chunk.next = new Chunk(klass, size, memory_ptr);
memory_ptr += size;
end_chunk.next.prev = end_chunk;
}
Expand Down
14 changes: 8 additions & 6 deletions src/test/java/zmq/TestYQueue.java
Expand Up @@ -29,7 +29,7 @@ public class TestYQueue {
@Test
public void testReuse() {
// yqueue has a first empty entry
YQueue<Msg> p = new YQueue<Msg>(Msg.class, 3, true);
YQueue<Msg> p = new YQueue<Msg>(Msg.class, 3);

p.push();

Expand All @@ -43,22 +43,24 @@ public void testReuse() {
m7.put("1234567".getBytes(),0);

p.back(m1);
Msg front = p.front();
assertThat(front.size(), is(1));
assertThat(p.back_pos(), is(0L));

p.push();
p.back(m2); p.push(); // might allocated new chunk
p.back(m3); p.push();
assertThat(p.back_pos(), is(3L));

assertThat(p.front_pos(), is(0L));
p.pop();
p.pop();
p.pop(); // offer the old chunk
assertThat(p.front_pos(), is(3L));

p.back(m4); p.push();
p.back(m5); p.push();// might reuse the old chunk
p.back(m6); p.push();
p.back(m7); p.push();

assertThat(front.size(), is(7));
assertThat(front.data().length, is(7));
assertThat(p.back_pos(), is(0L));

}
}

0 comments on commit e8e97a7

Please sign in to comment.