Skip to content

Commit

Permalink
Always write chunks of requested segment size to proxy stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
dkocher committed Aug 4, 2017
1 parent 5be4bee commit 6690358
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 10 deletions.
Expand Up @@ -40,7 +40,7 @@ public void close() throws IOException {
buffer.close();
}

protected void copy() throws IOException {
public void flush() throws IOException {
if(log.isDebugEnabled()) {
log.debug(String.format("Copy buffer%s to output", buffer));
}
Expand Down
Expand Up @@ -20,12 +20,14 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

public class MemorySegementingOutputStream extends SegmentingOutputStream {
private static final Logger log = Logger.getLogger(MemorySegementingOutputStream.class);

private final OutputStream proxy;
private final ByteArrayOutputStream buffer;
private final Integer threshold;

public MemorySegementingOutputStream(final OutputStream proxy, final Integer threshold) {
this(proxy, threshold, new ByteArrayOutputStream(threshold));
Expand All @@ -35,13 +37,32 @@ public MemorySegementingOutputStream(final OutputStream proxy, final Integer thr
super(proxy, (long) threshold, buffer);
this.proxy = proxy;
this.buffer = buffer;
this.threshold = threshold;
}

@Override
protected void copy() throws IOException {
public void flush() throws IOException {
// Copy from memory file to output
buffer.writeTo(proxy);
final byte[] content = buffer.toByteArray();
// Re-use buffer
buffer.reset();
for(int offset = 0; offset < content.length; offset += threshold) {
int len = Math.min(threshold, content.length - offset);
final byte[] bytes = Arrays.copyOfRange(content, offset, offset + len);
if(len < threshold) {
// Write to start of buffer
this.write(bytes);
}
else {
// Write out
proxy.write(bytes);
}
}
}

@Override
public void close() throws IOException {
proxy.write(buffer.toByteArray());
super.close();
}
}
Expand Up @@ -92,11 +92,19 @@ protected void afterWrite(final int n) throws IOException {

protected void checkThreshold(final int count) throws IOException {
if(written >= threshold) {
this.copy();
this.reset();
this.flush();
}
}

/**
* Copy from temporary buffer to output
*/
@Override
public void flush() throws IOException {
proxy.flush();
}

@Override
public void close() throws IOException {
if(close.get()) {
Expand All @@ -105,8 +113,8 @@ public void close() throws IOException {
}
try {
if(written > 0L || !after.get()) {
this.copy();
this.reset();
this.flush();
}
proxy.close();
}
Expand All @@ -115,11 +123,6 @@ public void close() throws IOException {
}
}

/**
* Copy from temporary buffer to output
*/
protected abstract void copy() throws IOException;

protected void reset() {
// Wait for trigger of next threshold
this.written = 0L;
Expand Down
@@ -0,0 +1,58 @@
package ch.cyberduck.core.io;

/*
* Copyright (c) 2002-2017 iterate GmbH. All rights reserved.
* https://cyberduck.io/
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/

import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;

public class MemorySegementingOutputStreamTest {

@Test
public void testCopy1() throws Exception {
final ByteArrayOutputStream proxy = new ByteArrayOutputStream(20);
final MemorySegementingOutputStream out = new MemorySegementingOutputStream(proxy, 32768);
final byte[] content = RandomUtils.nextBytes(40500);
out.write(content, 0, 32800);
out.write(content, 32800, 7700);
out.close();
assertArrayEquals(content, proxy.toByteArray());
}

@Test
public void testCopy2() throws Exception {
final ByteArrayOutputStream proxy = new ByteArrayOutputStream(40500);
final MemorySegementingOutputStream out = new MemorySegementingOutputStream(proxy, 32768);
final byte[] content = RandomUtils.nextBytes(40500);
out.write(content, 0, 32768);
out.write(content, 32768, 7732);
out.close();
assertArrayEquals(content, proxy.toByteArray());
}

@Test
public void testCopy3() throws Exception {
final ByteArrayOutputStream proxy = new ByteArrayOutputStream(40500);
final MemorySegementingOutputStream out = new MemorySegementingOutputStream(proxy, 32768);
final byte[] content = RandomUtils.nextBytes(40500);
out.write(content, 0, 32767);
out.write(content, 32767, 7733);
out.close();
assertArrayEquals(content, proxy.toByteArray());
}
}

0 comments on commit 6690358

Please sign in to comment.