Permalink
Browse files

substantial optimizations to concurrent buffer / synchro overhead

darcs-hash:20080102174825-e5a07-c115732707cd1fb9007f3b9132af06914e65df06.gz
  • Loading branch information...
1 parent 7dfb6d8 commit f47ec01b18bcae39df77ffb4a30f66b1ee0e7a44 @league committed Jan 2, 2008
View
@@ -1,80 +0,0 @@
-// BlockingIntQueue.java -- a bounded buffer with blocking threads
-// Copyright (c)2007 Christopher League <league@contrapunctus.net>
-
-// This is free software, but it comes with ABSOLUTELY NO WARRANTY.
-// GNU Lesser General Public License 2.1 or Common Public License 1.0
-
-package net.contrapunctus.lzma;
-
-import java.util.concurrent.Semaphore;
-import java.io.PrintStream;
-
-// There is no mutex protecting producer_index and consumer_index,
-// because I expect precisely one producer and one consumer!
-
-final class BlockingIntQueue
-{
- private int[] array;
- private int producer_index;
- private int consumer_index;
- private Semaphore space;
- private Semaphore data;
-
- private static final PrintStream dbg = System.err;
- private static final boolean DEBUG;
-
- static {
- String ds = null;
- try { ds = System.getProperty("DEBUG_BlockingIntQueue"); }
- catch(SecurityException e) { }
- DEBUG = ds != null;
- }
-
- BlockingIntQueue( int size )
- {
- array = new int [size];
- producer_index = 0;
- consumer_index = 0;
- space = new Semaphore( size );
- data = new Semaphore( 0 );
- }
-
- BlockingIntQueue( )
- {
- this( 4096 );
- }
-
- void put( int x ) throws InterruptedException
- {
- if(DEBUG) willBlock(space, '<');
- space.acquire( );
- array[producer_index] = x;
- if(DEBUG) dbg.printf("%s < %02x @%d%n", this, x, producer_index);
- producer_index = (producer_index+1) % array.length;
- data.release( );
- }
-
- int take( ) throws InterruptedException
- {
- if(DEBUG) willBlock(data, '>');
- data.acquire( );
- int x = array[consumer_index];
- if(DEBUG) dbg.printf("%s > %02x @%d%n", this, x, consumer_index);
- consumer_index = (consumer_index+1) % array.length;
- space.release( );
- return x;
- }
-
- private void willBlock( Semaphore s, char dir )
- {
- if( s.availablePermits() <= 0 )
- {
- dbg.printf("%s %c blocks%n", this, dir);
- }
- }
-
- public String toString( )
- {
- return String.format("BQ@%x", hashCode());
- }
-}
@@ -6,15 +6,19 @@
package net.contrapunctus.lzma;
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.PrintStream;
+import java.util.concurrent.ArrayBlockingQueue;
class ConcurrentBufferInputStream extends InputStream
{
- protected BlockingIntQueue q;
- protected boolean eof;
+ protected ArrayBlockingQueue<byte[]> q;
+ protected byte[] buf = null;
+ protected int next = 0;
+ protected boolean eof = false;
private static final PrintStream dbg = System.err;
private static final boolean DEBUG;
@@ -26,30 +30,63 @@
DEBUG = ds != null;
}
- ConcurrentBufferInputStream( BlockingIntQueue q )
+ ConcurrentBufferInputStream( ArrayBlockingQueue<byte[]> q )
{
if(DEBUG) dbg.printf("%s << %s%n", this, q);
this.q = q;
this.eof = false;
}
- public int read( ) throws IOException
+ static InputStream create( ArrayBlockingQueue<byte[]> q )
+ {
+ InputStream in = new ConcurrentBufferInputStream( q );
+ return in;
+ }
+
+ protected byte[] guarded_take( ) throws IOException
{
- if( eof ) return -1;
try {
- int i = q.take( );
- if( i == -1 ) {
- if(DEBUG) dbg.printf("%s got EOF%n", this);
- eof = true;
- }
- else i &= 0xFF;
- return i;
+ return q.take( );
}
catch( InterruptedException exn ) {
throw new InterruptedIOException( exn.getMessage() );
}
}
+ protected boolean prepareAndCheckEOF( ) throws IOException
+ {
+ if( eof ) return true;
+ if( buf == null || next >= buf.length )
+ {
+ buf = guarded_take( );
+ next = 0;
+ if( buf.length == 0 )
+ {
+ eof = true;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public int read( ) throws IOException
+ {
+ if( prepareAndCheckEOF() ) { return -1; }
+ int x = buf[next];
+ next++;
+ return x & 0xff;
+ }
+
+ public int read( byte[] b, int off, int len ) throws IOException
+ {
+ if( prepareAndCheckEOF() ) { return -1; }
+ int k = buf.length - next;
+ if( len < k ) { k = len; }
+ System.arraycopy( buf, next, b, off, k );
+ next += k;
+ return k;
+ }
+
public String toString( )
{
return String.format("cbIn@%x", hashCode());
@@ -6,15 +6,17 @@
package net.contrapunctus.lzma;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.util.concurrent.ArrayBlockingQueue;
class ConcurrentBufferOutputStream extends OutputStream
{
- protected BlockingIntQueue q;
-
+ protected ArrayBlockingQueue<byte[]> q;
+ static final int BUFSIZE = 16384;
private static final PrintStream dbg = System.err;
private static final boolean DEBUG;
@@ -25,16 +27,23 @@
DEBUG = ds != null;
}
- ConcurrentBufferOutputStream( BlockingIntQueue q )
+ ConcurrentBufferOutputStream( ArrayBlockingQueue<byte[]> q )
{
if(DEBUG) dbg.printf("%s >> %s%n", this, q);
this.q = q;
}
- protected void guarded_put( int i ) throws IOException
+ static OutputStream create( ArrayBlockingQueue<byte[]> q )
+ {
+ OutputStream out = new ConcurrentBufferOutputStream( q );
+ out = new BufferedOutputStream( out, BUFSIZE );
+ return out;
+ }
+
+ protected void guarded_put( byte[] a ) throws IOException
{
try {
- q.put( i );
+ q.put( a );
}
catch( InterruptedException exn ) {
throw new InterruptedIOException( exn.getMessage() );
@@ -43,13 +52,23 @@ protected void guarded_put( int i ) throws IOException
public void write( int i ) throws IOException
{
- guarded_put( i & 0xff );
+ byte b[] = new byte[1];
+ b[0] = (byte) (i & 0xff);
+ guarded_put( b );
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ byte[] a = new byte [len];
+ System.arraycopy(b, off, a, 0, len);
+ guarded_put( a );
}
public void close( ) throws IOException
{
if(DEBUG) dbg.printf("%s closed%n", this);
- guarded_put( -1 );
+ byte b[] = new byte[0]; // sentinel
+ guarded_put( b );
}
public String toString( )
View
@@ -15,7 +15,7 @@
class DecoderThread extends Thread
{
- protected BlockingIntQueue q;
+ protected ArrayBlockingQueue<byte[]> q;
protected InputStream in;
protected OutputStream out;
protected Decoder dec;
@@ -33,9 +33,9 @@
DecoderThread( InputStream _in )
{
- q = new BlockingIntQueue( );
+ q = new ArrayBlockingQueue<byte[]>( 4096 );
in = _in;
- out = new ConcurrentBufferOutputStream( q );
+ out = ConcurrentBufferOutputStream.create( q );
dec = new Decoder();
exn = null;
if(DEBUG) dbg.printf("%s >> %s (%s)%n", this, out, q);
View
@@ -15,7 +15,7 @@
class EncoderThread extends Thread
{
- protected BlockingIntQueue q;
+ protected ArrayBlockingQueue<byte[]> q;
protected InputStream in;
protected OutputStream out;
protected Encoder enc;
@@ -33,8 +33,8 @@
EncoderThread( OutputStream _out )
{
- q = new BlockingIntQueue( );
- in = new ConcurrentBufferInputStream( q );
+ q = new ArrayBlockingQueue<byte[]> ( 4096 );
+ in = ConcurrentBufferInputStream.create( q );
out = _out;
enc = new Encoder();
exn = null;
View
@@ -28,7 +28,7 @@ public LzmaInputStream( InputStream _in )
{
super( null );
dth = new DecoderThread( _in );
- in = new ConcurrentBufferInputStream( dth.q );
+ in = ConcurrentBufferInputStream.create( dth.q );
if(DEBUG) dbg.printf("%s << %s (%s)%n", this, in, dth.q);
dth.start( );
}
View
@@ -34,7 +34,7 @@ public LzmaOutputStream( OutputStream _out )
{
super( null );
eth = new EncoderThread( _out );
- out = new ConcurrentBufferOutputStream( eth.q );
+ out = ConcurrentBufferOutputStream.create( eth.q );
if(DEBUG) dbg.printf("%s >> %s (%s)%n", this, out, eth.q);
eth.start( );
}

0 comments on commit f47ec01

Please sign in to comment.