diff --git a/README.md b/README.md index 9e5d5402b..bb58528a1 100644 --- a/README.md +++ b/README.md @@ -64,12 +64,15 @@ Once that is done, a new proxy will be available on the port returned. All you h - status - the HTTP status code to return for URLs that are blacklisted - DELETE /proxy/[port]/blacklist - Clears all URL patterns from the blacklist - PUT /proxy/[port]/limit - Limit the bandwidth through the proxy. Takes the following parameters: - - downstreamKbps - Sets the downstream kbps - - upstreamKbps - Sets the upstream kbps + - downstreamKbps - Sets the downstream bandwidth limit in kbps + - upstreamKbps - Sets the upstream bandwidth limit kbps + - downstreamMaxKB - Specifies how many kilobytes in total the client is allowed to download through the proxy. + - upstreamMaxKB - Specifies how many kilobytes in total the client is allowed to upload through the proxy. - latency - Add the given latency to each HTTP request - enable - (true/false) a boolean that enable bandwidth limiter. By default the limit is disabled, although setting any of the properties above will implicitly enable throttling - payloadPercentage - a number ]0, 100] specifying what percentage of data sent is payload. e.g. use this to take into account overhead due to tcp/ip. - maxBitsPerSecond - The max bits per seconds you want this instance of StreamManager to respect. + - GET /proxy/[port]/limit - Displays the amount of data remaining to be uploaded/downloaded until the limit is reached. - POST /proxy/[port]/headers - Set and override HTTP Request headers. For example setting a custom User-Agent. - Payload data should be json encoded set of headers (not url-encoded) - POST /proxy/[port]/hosts - Overrides normal DNS lookups and remaps the given hosts with the associated IP address diff --git a/src/main/java/net/lightbody/bmp/proxy/bricks/ProxyResource.java b/src/main/java/net/lightbody/bmp/proxy/bricks/ProxyResource.java index 028b9615f..482d5b36c 100644 --- a/src/main/java/net/lightbody/bmp/proxy/bricks/ProxyResource.java +++ b/src/main/java/net/lightbody/bmp/proxy/bricks/ProxyResource.java @@ -12,7 +12,6 @@ import com.google.sitebricks.http.Get; import com.google.sitebricks.http.Post; import com.google.sitebricks.http.Put; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintWriter; @@ -28,9 +27,6 @@ import javax.script.ScriptEngine; import javax.script.ScriptEngineManager; import javax.script.ScriptException; - - - import net.lightbody.bmp.core.har.Har; import net.lightbody.bmp.proxy.ProxyExistsException; import net.lightbody.bmp.proxy.ProxyManager; @@ -40,7 +36,6 @@ import net.lightbody.bmp.proxy.http.BrowserMobHttpResponse; import net.lightbody.bmp.proxy.http.RequestInterceptor; import net.lightbody.bmp.proxy.http.ResponseInterceptor; - import org.java_bandwidthlimiter.StreamManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -348,6 +343,20 @@ public Reply limit(@Named("port") int port, Request request) { streamManager.enable(); } catch (NumberFormatException e) { } } + String upstreamMaxKB = request.param("upstreamMaxKB"); + if (upstreamMaxKB != null) { + try { + streamManager.setUpstreamMaxKB(Integer.parseInt(upstreamMaxKB)); + streamManager.enable(); + } catch (NumberFormatException e) { } + } + String downstreamMaxKB = request.param("downstreamMaxKB"); + if (downstreamMaxKB != null) { + try { + streamManager.setDownstreamMaxKB(Integer.parseInt(downstreamMaxKB)); + streamManager.enable(); + } catch (NumberFormatException e) { } + } String latency = request.param("latency"); if (latency != null) { try { @@ -378,6 +387,16 @@ public Reply limit(@Named("port") int port, Request request) { return Reply.saying().ok(); } + @Get + @At("/:port/limit") + public Reply getLimits(@Named("port") int port, Request request) { + ProxyServer proxy = proxyManager.get(port); + if (proxy == null) { + return Reply.saying().notFound(); + } + return Reply.with(new BandwidthLimitDescriptor(proxy.getStreamManager())).as(Json.class); + } + @Put @At("/:port/timeout") public Reply timeout(@Named("port") int port, Request request) { @@ -559,4 +578,53 @@ public void setProxyList(Collection proxyList) { this.proxyList = proxyList; } } + + public static class BandwidthLimitDescriptor { + private long maxUpstreamKB; + private long remainingUpstreamKB; + private long maxDownstreamKB; + private long remainingDownstreamKB; + + public BandwidthLimitDescriptor(){ + } + + public BandwidthLimitDescriptor(StreamManager manager){ + this.maxDownstreamKB = manager.getMaxDownstreamKB(); + this.remainingDownstreamKB = manager.getRemainingDownstreamKB(); + this.maxUpstreamKB = manager.getMaxUpstreamKB(); + this.remainingUpstreamKB = manager.getRemainingUpstreamKB(); + } + + public long getMaxUpstreamKB() { + return maxUpstreamKB; + } + + public void setMaxUpstreamKB(long maxUpstreamKB) { + this.maxUpstreamKB = maxUpstreamKB; + } + + public long getRemainingUpstreamKB() { + return remainingUpstreamKB; + } + + public void setRemainingUpstreamKB(long remainingUpstreamKB) { + this.remainingUpstreamKB = remainingUpstreamKB; + } + + public long getMaxDownstreamKB() { + return maxDownstreamKB; + } + + public void setMaxDownstreamKB(long maxDownstreamKB) { + this.maxDownstreamKB = maxDownstreamKB; + } + + public long getRemainingDownstreamKB() { + return remainingDownstreamKB; + } + + public void setRemainingDownstreamKB(long remainingDownstreamKB) { + this.remainingDownstreamKB = remainingDownstreamKB; + } + } } diff --git a/src/main/java/org/java_bandwidthlimiter/BandwidthLimiter.java b/src/main/java/org/java_bandwidthlimiter/BandwidthLimiter.java index cc1c0de78..e1a27c331 100644 --- a/src/main/java/org/java_bandwidthlimiter/BandwidthLimiter.java +++ b/src/main/java/org/java_bandwidthlimiter/BandwidthLimiter.java @@ -36,6 +36,10 @@ public interface BandwidthLimiter { public void setDownstreamKbps(long downstreamKbps); public void setUpstreamKbps(long upstreamKbps); + + public void setDownstreamMaxKB(long downstreamMaxKB); + + public void setUpstreamMaxKB(long upstreamMaxKB); public void setLatency(long latency); } diff --git a/src/main/java/org/java_bandwidthlimiter/MaximumTransferExceededException.java b/src/main/java/org/java_bandwidthlimiter/MaximumTransferExceededException.java new file mode 100644 index 000000000..3b00e68ca --- /dev/null +++ b/src/main/java/org/java_bandwidthlimiter/MaximumTransferExceededException.java @@ -0,0 +1,22 @@ +package org.java_bandwidthlimiter; + +import java.io.IOException; + +public class MaximumTransferExceededException extends IOException { + private final boolean isUpstream; + private final long limit; + + public boolean isUpstream() { + return isUpstream; + } + + public long getLimit() { + return limit; + } + + public MaximumTransferExceededException(long limit, boolean isUpstream) { + super("Maximum " + (isUpstream? "upstream" : "downstream") + " transfer allowance of " + limit + " KB exceeded."); + this.isUpstream = isUpstream; + this.limit = limit; + } +} diff --git a/src/main/java/org/java_bandwidthlimiter/StreamManager.java b/src/main/java/org/java_bandwidthlimiter/StreamManager.java index 7dbfd4a22..5e3743b48 100644 --- a/src/main/java/org/java_bandwidthlimiter/StreamManager.java +++ b/src/main/java/org/java_bandwidthlimiter/StreamManager.java @@ -32,6 +32,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Random; +import static org.java_bandwidthlimiter.BandwidthLimiter.OneSecond; /** * A class that manages the bandwidth of all the registered streams (both input and output streams). @@ -61,6 +62,8 @@ private class StreamParams { public long remainingBps; public long nextResetTimestamp; public long nextResetSubIntervals; + public long maxBytes; + public long remainingBytes; private long timeToNextReset() { return nextResetTimestamp - System.currentTimeMillis(); @@ -83,11 +86,11 @@ private void adjustBytes(long bytesNumber) { //even calls to setDownstreamKbps and setUpstreamKbps will be forced to honor this upperbound. private long maxBytesPerSecond; - private StreamParams downStream = new StreamParams(); - private StreamParams upStream = new StreamParams(); + private final StreamParams downStream = new StreamParams(); + private final StreamParams upStream = new StreamParams(); private long latency = 0; - private Random randomGenerator = new Random(); + private final Random randomGenerator = new Random(); /** * Create an instance of StreamManager. @@ -101,11 +104,13 @@ private void adjustBytes(long bytesNumber) { * */ public StreamManager(long maxBitsPerSecond) { - setMaxBitsPerSecondThreshold( maxBitsPerSecond ); - setDownstreamKbps(maxBitsPerSecond / 1000); - setUpstreamKbps(maxBitsPerSecond / 1000); - setPayloadPercentage(95); - disable(); + this.maxBytesPerSecond = maxBitsPerSecond/8; + setMaxBps(this.downStream, this.maxBytesPerSecond); + setMaxBps(this.upStream, this.maxBytesPerSecond); + this.actualPayloadPercentage = 0.95; + setMaxBytes(this.downStream, 0); + setMaxBytes(this.upStream, 0); + this.enabled = false; } @@ -154,11 +159,31 @@ public void setUpstreamKbps(long upstreamKbps) { public void setLatency(long latency) { this.latency = latency; } + + /** + * Specifies how many kilobytes in total the client is allowed to download. + * When the limit is used up, MaximumTransferExceededException is thrown + * @param downstreamMaxKB + */ + @Override + public void setDownstreamMaxKB(long downstreamMaxKB) { + setMaxBytes(this.downStream, downstreamMaxKB * 1000); + } + + /** + * Specifies how many kilobytes in total the client is allowed to upload. + * When the limit is used up, MaximumTransferExceededException is thrown + * @param upstreamMaxKB + */ + @Override + public void setUpstreamMaxKB(long upstreamMaxKB) { + setMaxBytes(this.upStream, upstreamMaxKB * 1000); + } public long getLatency() { return latency; } - + /** * To take into account overhead due to underlying protocols (e.g. TCP/IP) * @param payloadPercentage a ] 0 , 100] value. where 100 means that the required @@ -192,7 +217,7 @@ public InputStream registerStream(InputStream in) { * Register an output stream. * A client would then use the returned OutputStream, which will throttle * the one passed as parameter. - * * @param in The OutputStream that will be throttled + * @param out The OutputStream that will be throttled * @return a new throttled OutputStream (wrapping the one given as parameter) * */ @@ -215,6 +240,22 @@ public void setMaxBitsPerSecondThreshold(long maxBitsPerSecond) { setMaxBps(this.downStream, this.downStream.maxBps); setMaxBps(this.upStream, this.upStream.maxBps); } + + public long getMaxUpstreamKB(){ + return this.upStream.maxBytes/1000; + } + + public long getRemainingUpstreamKB(){ + return this.upStream.remainingBytes/1000; + } + + public long getMaxDownstreamKB(){ + return this.downStream.maxBytes/1000; + } + + public long getRemainingDownstreamKB(){ + return this.downStream.remainingBytes/1000; + } private void setMaxBps( StreamParams direction, long maxBps ) { synchronized (direction) { @@ -226,6 +267,13 @@ private void setMaxBps( StreamParams direction, long maxBps ) { direction.reset(); } } + + private void setMaxBytes( StreamParams direction, long maxBytes ) { + synchronized (direction) { + direction.maxBytes = maxBytes; + direction.remainingBytes = maxBytes; + } + } private long timeToNextReset(StreamParams direction) { synchronized (direction) { @@ -255,7 +303,7 @@ private int getAllowedBytesWrite(ManagedOutputStream stream, int bufferLength) { private int getAllowedBytesUnFair(StreamParams direction, int bufferLength) { //this is an unfair allocation of bytes/second because it gives as many //bytes as possible to anyone who ask for them - int allowed = 0; + int allowed; synchronized(direction) { resetCounterIfNecessary(direction); //this stream desires to read up to bufferLength bytes @@ -288,6 +336,11 @@ private int manageRead(ManagedInputStream stream, byte[] b, int off, int len) th if(allowed > 0) { // Read a maximum of "allowed" bytes int bytesRead = stream.doRead(b, off, allowed); + + // check if we exceeded the transfer limit + if(this.downStream.maxBytes > 0 && (this.downStream.remainingBytes -= bytesRead) < 0){ + throw new MaximumTransferExceededException(getMaxDownstreamKB(), false); + } // If less than the "allowed" bytes were read, adjust how many we can still read for this period of time adjustBytes(this.downStream, allowed - bytesRead); @@ -318,7 +371,7 @@ private void manageWrite(ManagedOutputStream stream, byte[] b, int off, int len) assert maxBytesPerSecond > 0; int bytesWritten = 0; - int allowed = 0; + int allowed; // we need a while loop since the write doesn't return a "written bytes" count, // rather it expects that all of them are written // hence we loop here until all of them have been written @@ -326,7 +379,11 @@ private void manageWrite(ManagedOutputStream stream, byte[] b, int off, int len) allowed = getAllowedBytesWrite(stream, len); if(allowed > 0) { stream.doWrite(b, off, allowed); - bytesWritten += allowed; + bytesWritten += allowed; + // check if we exceeded the transfer limit + if(this.upStream.maxBytes > 0 && (this.upStream.remainingBytes -= allowed) < 0){ + throw new MaximumTransferExceededException(this.getMaxUpstreamKB(), true); + } } else { long sleepTime = timeToNextReset(this.upStream); if( sleepTime > 0 ) { @@ -361,7 +418,7 @@ private class ManagedOutputStream extends OutputStream { long lastActivity; boolean roundUp; //just an helper buffer so we don't allocate it all the time when calling void write(int b) which writes ONE byte! - private byte[] oneByteBuff = new byte[1]; + private final byte[] oneByteBuff = new byte[1]; public ManagedOutputStream(OutputStream stream, StreamManager manager) { assert manager != null; @@ -407,10 +464,12 @@ public void doWrite(byte[] b, int offset, int length) throws IOException { stream.write(b, offset, length); } + @Override public void flush() throws IOException { stream.flush(); } + @Override public void close() throws IOException { stream.close(); } @@ -422,7 +481,7 @@ private class ManagedInputStream extends InputStream { long lastActivity; boolean roundUp; //just an helper buffer so we don't allocate it all the time when calling int read() which reads ONE byte! - private byte[] oneByteBuff = new byte[1]; + private final byte[] oneByteBuff = new byte[1]; public ManagedInputStream(InputStream stream, StreamManager manager) { assert manager != null; @@ -445,17 +504,20 @@ public boolean getRoundUp() { return roundUp; } + @Override public int read() throws IOException { read(oneByteBuff, 0, 1); return oneByteBuff[0]; } + @Override public int read(byte[] b) throws IOException { int length = b.length; int bytesRead = read(b, 0, length); return bytesRead; } + @Override public int read(byte[] b, int off, int len) throws IOException { int readBytes = manager.manageRead(this, b, off, len); if( readBytes > 0 ) { @@ -472,26 +534,32 @@ public int doRead(byte[] b, int offset, int length) throws IOException { return stream.read(b, offset, length); } + @Override public long skip(long n) throws IOException { return stream.skip(n); } + @Override public int available() throws IOException { return stream.available(); } + @Override public void close() throws IOException { stream.close(); } + @Override public void mark(int readLimit) { stream.mark(readLimit); } + @Override public void reset() throws IOException { stream.reset(); } + @Override public boolean markSupported() { return stream.markSupported(); } diff --git a/src/test/java/org/java_bandwidthlimiter/StreamManagerTest.java b/src/test/java/org/java_bandwidthlimiter/StreamManagerTest.java new file mode 100644 index 000000000..4614dc298 --- /dev/null +++ b/src/test/java/org/java_bandwidthlimiter/StreamManagerTest.java @@ -0,0 +1,87 @@ +package org.java_bandwidthlimiter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import org.junit.Before; +import org.junit.Test; + +public class StreamManagerTest { + private final int BUFFER = 256; + private final long LIMIT_KB = 8; + private byte[] data; + + @Before + public void setUp(){ + data = new byte[10*1024]; + for(int i = 0; i < data.length; i++){ + data[i] = 127; + } + } + + @Test + public void testDownstreamDataLimit(){ + StreamManager sm = new StreamManager( BandwidthLimiter.OneMbps ); + sm.setDownstreamMaxKB(LIMIT_KB); + sm.enable(); + InputStream in = sm.registerStream(new ByteArrayInputStream(data)); + byte[] buffer = new byte[BUFFER]; + int read = 0; + try{ + while(read < data.length){ + read += in.read(buffer); + } + + fail(); + + }catch(IOException ex){ + assertThat(ex, instanceOf(MaximumTransferExceededException.class)); + + MaximumTransferExceededException ex2 = (MaximumTransferExceededException)ex; + + assertFalse(ex2.isUpstream()); + assertEquals(LIMIT_KB, ex2.getLimit()); + assertTrue(read < LIMIT_KB*1000 + BUFFER); + assertTrue(read >= LIMIT_KB*1000 - BUFFER); + } + } + + @Test + public void testUpstreamDataLimit(){ + StreamManager sm = new StreamManager( BandwidthLimiter.OneMbps ); + sm.setUpstreamMaxKB(LIMIT_KB); + sm.enable(); + InputStream in = new ByteArrayInputStream(data); + ByteArrayOutputStream out1 = new ByteArrayOutputStream(data.length); + OutputStream out = sm.registerStream(out1); + byte[] buffer = new byte[BUFFER]; + int read = 0; + try{ + while(read < data.length){ + read += in.read(buffer); + out.write(buffer); + } + + fail(); + + }catch(IOException ex){ + assertThat(ex, instanceOf(MaximumTransferExceededException.class)); + + MaximumTransferExceededException ex2 = (MaximumTransferExceededException)ex; + + assertTrue(ex2.isUpstream()); + assertEquals(LIMIT_KB, ex2.getLimit()); + assertTrue(out1.size() < LIMIT_KB*1000 + BUFFER); + assertTrue(out1.size() >= LIMIT_KB*1000 - BUFFER); + } + } + +}