Permalink
Browse files

Channel-based versions of StdInClient and StdOutServer

  • Loading branch information...
pzemtsov committed Jan 29, 2015
1 parent cd1153d commit 18767998b9eca38382adc0fb4921aa3c0fe6d3c2
Showing with 38 additions and 37 deletions.
  1. +29 −25 StdInClient.java
  2. +9 −12 StdOutServer.java
View
@@ -1,53 +1,57 @@
-import java.io.EOFException;
+import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.FileChannel;
public class StdInClient
{
- static byte[] type = new byte[4];
- static byte[] msg = new byte [100];
- static byte[] tmp = new byte [4];
-
+ private static ByteBuffer buf = ByteBuffer.allocateDirect (1024*1024);
public static void main (String [] args) throws Exception
{
- InputStream in = System.in;
+ FileChannel chan = new FileInputStream ("/proc/self/fd/0").getChannel ();
+
+ byte[] type = new byte [4];
+ byte[] msg = new byte [1024];
+ buf.limit (0);
+
while (true) {
long t0 = System.currentTimeMillis ();
long sum = 0;
int N = 10000000;
for (int i = 0; i < N; i++) {
- readBytes (in, type, 4);
- int len = readInt (in);
- readBytes (in, msg, len);
+ ensure (4, chan);
+ buf.get (type);
+ ensure (4, chan);
+ int len = buf.getInt ();
+ ensure (len, chan);
+ buf.get (msg, 0, len);
processMessage (type, msg, len);
- sum += msg.length + 8;
+ sum += len + 8;
}
long t1 = System.currentTimeMillis ();
long t = t1 - t0;
System.out.printf ("Time for %d msg: %d; speed: %d msg/s; %.1f MB/s\n",
N, t, N * 1000L / t, sum * 0.001 / t);
}
}
-
- private static void readBytes (InputStream in, byte[] buffer, int expectedSize) throws IOException
+
+ private static void ensure (int len, ByteChannel chan) throws IOException
{
- int totalReadSize = 0;
- while (totalReadSize < expectedSize) {
- int readSize = in.read(buffer, totalReadSize, expectedSize - totalReadSize);
- if (readSize < 0) throw new EOFException ();
- totalReadSize += readSize;
+ if (buf.remaining () < len) {
+ buf.compact ();
+ buf.flip ();
+ do {
+ buf.position (buf.limit ());
+ buf.limit (buf.capacity ());
+ chan.read (buf);
+ buf.flip ();
+ } while (buf.remaining () < len);
}
}
- private static final int readInt (InputStream in) throws IOException
- {
- byte[] b = tmp;
- readBytes (in, b, 4);
- return (((b[0] & 0xFF) << 24) + ((b[1] & 0xFF) << 16) + ((b[2] & 0xFF) << 8) + ((b[3] & 0xFF) << 0));
- }
-
private static void processMessage (byte [] type, byte [] msg, int len)
{
}
View
@@ -1,6 +1,6 @@
-import java.io.OutputStream;
+import java.io.FileOutputStream;
import java.nio.ByteBuffer;
-
+import java.nio.channels.FileChannel;
public class StdOutServer
{
@@ -9,22 +9,19 @@ public static void main (String [] args) throws Exception
byte [] MESSAGE_TYPE = new byte[] {1, 2, 3, 4};
int MESSAGE_LEN = 100;
int N = 1024;
+ byte [] message = new byte [MESSAGE_LEN];
- byte [] buf = new byte [(4 + 4 + MESSAGE_LEN) * N];
- ByteBuffer buffer = ByteBuffer.wrap (buf);
- int num = 0;
+ ByteBuffer buffer = ByteBuffer.allocateDirect ((4 + 4 + MESSAGE_LEN) * N);
for (int i = 0; i < N; i++) {
buffer.put (MESSAGE_TYPE);
buffer.putInt (MESSAGE_LEN);
- for (int j = 0; j < MESSAGE_LEN; j++) {
- buffer.put ((byte) num);
- num += 3;
- }
+ buffer.put (message);
}
-
- OutputStream out = System.out;
+
+ FileChannel channel = new FileOutputStream ("/proc/self/fd/1").getChannel ();
while (true) {
- out.write (buf);
+ channel.write (buffer);
+ buffer.flip ();
}
}
}

0 comments on commit 1876799

Please sign in to comment.