Skip to content

Commit

Permalink
bounded retrieval of data from stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Karthik Kumar committed Dec 30, 2011
1 parent c1efb91 commit b1e5807
Show file tree
Hide file tree
Showing 5 changed files with 2,698 additions and 7 deletions.
15 changes: 8 additions & 7 deletions src/java/voldemort/store/readonly/swapper/HttpStoreSwapper.java
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;


import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
Expand All @@ -20,6 +19,7 @@
import voldemort.cluster.Cluster; import voldemort.cluster.Cluster;
import voldemort.cluster.Node; import voldemort.cluster.Node;
import voldemort.store.readonly.ReadOnlyUtils; import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.VoldemortIOUtils;


import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
Expand Down Expand Up @@ -80,9 +80,9 @@ public String call() throws Exception {


HttpResponse httpResponse = httpClient.execute(post); HttpResponse httpResponse = httpClient.execute(post);
int responseCode = httpResponse.getStatusLine().getStatusCode(); int responseCode = httpResponse.getStatusLine().getStatusCode();
String response = IOUtils.toString(httpResponse.getEntity().getContent()); String response = VoldemortIOUtils.toString(httpResponse.getEntity()
// TODO: clip response to 30K . .getContent(),
// post.getResponseBodyAsString(30000); 30000);


if(responseCode != 200) if(responseCode != 200)
throw new VoldemortException("Fetch request on node " throw new VoldemortException("Fetch request on node "
Expand Down Expand Up @@ -174,9 +174,9 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {


HttpResponse httpResponse = httpClient.execute(post); HttpResponse httpResponse = httpClient.execute(post);
int responseCode = httpResponse.getStatusLine().getStatusCode(); int responseCode = httpResponse.getStatusLine().getStatusCode();
String previousDir = IOUtils.toString(httpResponse.getEntity().getContent()); String previousDir = VoldemortIOUtils.toString(httpResponse.getEntity()
// TODO: clip response to 30K . .getContent(),
// post.getResponseBodyAsString(30000); 30000);


if(responseCode != 200) if(responseCode != 200)
throw new VoldemortException("Swap request on node " + node.getId() + " (" throw new VoldemortException("Swap request on node " + node.getId() + " ("
Expand Down Expand Up @@ -269,4 +269,5 @@ public void invokeRollback(String storeName, final long pushVersion) {
if(exception != null) if(exception != null)
throw new VoldemortException(exception); throw new VoldemortException(exception);
} }

} }
87 changes: 87 additions & 0 deletions src/java/voldemort/utils/VoldemortIOUtils.java
@@ -0,0 +1,87 @@
/*
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.utils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;

public class VoldemortIOUtils {

private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;

public static String toString(final InputStream input, final long limit)
throws IOException {
return toString(input, null, limit);
}

public static String toString(final InputStream input,
final String encoding,
final long limit) throws IOException {
StringWriter sw = new StringWriter();
copy(input, sw, encoding, limit);
return sw.toString();
}

public static void copy(InputStream input, Writer output, String encoding, final long limit)
throws IOException {
if(encoding == null) {
copy(input, output, limit);
} else {
InputStreamReader in = new InputStreamReader(input, encoding);
copy(in, output, limit);
}
}

public static void copy(InputStream input, Writer output, final long limit) throws IOException {
InputStreamReader in = new InputStreamReader(input);
copy(in, output, limit);
}

public static int copy(Reader input, Writer output, long limit) throws IOException {
long count = copyLarge(input, output, limit);
if(count > Integer.MAX_VALUE) {
return -1;
}
return (int) count;
}

public static long copyLarge(Reader input, Writer output, long limit) throws IOException {
char[] buffer = new char[DEFAULT_BUFFER_SIZE];
long count = 0;
int n = 0;
long remaining = limit;
while(remaining > 0) {
if(remaining > DEFAULT_BUFFER_SIZE) {
n = input.read(buffer);
} else {
char[] remainingbuffer = new char[(int) remaining];
n = input.read(remainingbuffer);
}
if(n == -1) {
break;
}
output.write(buffer, 0, n);
count += n;
remaining -= n;
}
return count;
}
}
34 changes: 34 additions & 0 deletions test/unit/voldemort/utils/VoldemortIOUtilsTest.java
@@ -0,0 +1,34 @@
package voldemort.utils;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;

public class VoldemortIOUtilsTest {

@Test
public void testToString() throws IOException {
// some input file which is > 30K;
final int upperBound = 30000;
InputStream is = getClass().getResourceAsStream("Xtranslcl.c.input");
String str = IOUtils.toString(is);
Assert.assertTrue(str.length() > 0);
Assert.assertTrue(str.length() > upperBound);

InputStream is2 = getClass().getResourceAsStream("Xtranslcl.c.input");
String str2 = VoldemortIOUtils.toString(is2, upperBound);
Assert.assertEquals(upperBound, str2.length());
}

@Test
public void testToStringSmall() throws IOException {
final int upperBound = 30000;
InputStream is = getClass().getResourceAsStream("maze.c.input");
String str2 = VoldemortIOUtils.toString(is, upperBound);
Assert.assertTrue(str2.length() <= upperBound);
}

}

0 comments on commit b1e5807

Please sign in to comment.