Permalink
Browse files

Back to stream-based version of StdInClient, unbuffered

  • Loading branch information...
pzemtsov committed Jan 29, 2015
1 parent 42e481f commit b869482c4959d4e27d7e3dddd5d30627207e4401
Showing with 36 additions and 36 deletions.
  1. +36 −36 StdInClient.java
View
@@ -1,62 +1,62 @@
+import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.FileChannel;
-
+import java.io.InputStream;
public class StdInClient
{
- private static ByteBuffer buf = ByteBuffer.allocateDirect (1024*1024);
-
public static void main (String [] args) throws Exception
{
- FileChannel chan = new FileInputStream ("/proc/self/fd/0").getChannel ();
-
- byte[] type = new byte [4];
- buf.limit (0);
- ByteBuffer msgBuf = buf.duplicate ();
-
+ InputStream in = new FileInputStream ("/proc/self/fd/0");
while (true) {
long t0 = System.currentTimeMillis ();
long sum = 0;
int N = 10000000;
for (int i = 0; i < N; i++) {
- ensure (4, chan);
- buf.get (type);
- ensure (4, chan);
- int len = buf.getInt ();
- ensure (len, chan);
- msgBuf.limit (buf.position () + len);
- msgBuf.position (buf.position ());
- buf.position (buf.position () + len);
- processMessage (type, msgBuf);
- sum += len + 8;
+ byte [] type = readBytes (in, 4);
+ int len = readInt (in);
+ byte [] msg = readBytes (in, len);
+ processMessage (type, msg);
+ sum += msg.length + 8;
}
long t1 = System.currentTimeMillis ();
long t = t1 - t0;
System.out.printf ("Time for %d msg: %d; speed: %d msg/s; %.1f MB/s\n",
N, t, N * 1000L / t, sum * 0.001 / t);
}
}
-
- private static void ensure (int len, ByteChannel chan) throws IOException
+
+ private static byte [] readBytes (InputStream in, int expectedSize) throws IOException
{
- if (buf.remaining () < len) {
- buf.compact ();
- buf.flip ();
- do {
- buf.position (buf.limit ());
- buf.limit (buf.capacity ());
- chan.read (buf);
- buf.flip ();
- } while (buf.remaining () < len);
+ final byte[] buffer = new byte[expectedSize];
+ int totalReadSize = 0;
+ while (totalReadSize < expectedSize) {
+ int readSize = in.read(buffer, totalReadSize, expectedSize - totalReadSize);
+ if (readSize < 0) throw new EOFException ();
+ totalReadSize += readSize;
}
+ return buffer;
}
- static int num = 0;
-
- private static void processMessage (byte [] type, ByteBuffer buf)
+ private static final int readInt (InputStream in) throws IOException
{
+ int ch1 = in.read();
+ int ch2 = in.read();
+ int ch3 = in.read();
+ int ch4 = in.read();
+ if ((ch1 | ch2 | ch3 | ch4) < 0)
+ throw new EOFException();
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}
+
+ private static final int readInt (InputStream in) throws IOException
+ {
+ byte [] b = readBytes (in, 4);
+ return (((b[0] & 0xFF) << 24) + ((b[1] & 0xFF) << 16) + ((b[2] & 0xFF) << 8) + ((b[3] & 0xFF) << 0));
+ }
+
+ private static void processMessage (byte [] type, byte [] msg)
+ {
+ }
+
}

0 comments on commit b869482

Please sign in to comment.