Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

release http resources after use

  • Loading branch information...
commit 3dc4fe8dbe90f5eea833555b1c7d0fcb4c9ca412 1 parent b1e5807
Karthik K akkumar authored
55 src/java/voldemort/store/http/HttpStore.java
View
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
@@ -38,6 +39,7 @@
import voldemort.store.StoreUtils;
import voldemort.store.UnreachableStoreException;
import voldemort.utils.ByteArray;
+import voldemort.utils.VoldemortIOUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
@@ -70,39 +72,43 @@ public HttpStore(String storeName,
public boolean delete(ByteArray key, Version version) throws VoldemortException {
StoreUtils.assertValidKey(key);
- HttpPost method = null;
+ DataInputStream input = null;
try {
- method = new HttpPost(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeDeleteRequest(new DataOutputStream(outputBytes),
storeName,
key,
(VectorClock) version,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readDeleteResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
+ } finally {
+ IOUtils.closeQuietly(input);
}
}
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
StoreUtils.assertValidKey(key);
- HttpPost method = null;
+ DataInputStream input = null;
try {
- method = new HttpPost(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeGetRequest(new DataOutputStream(outputBytes),
storeName,
key,
transforms,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readGetResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
+ } finally {
+ IOUtils.closeQuietly(input);
}
}
@@ -110,29 +116,31 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
StoreUtils.assertValidKeys(keys);
- HttpPost method = null;
+ DataInputStream input = null;
try {
- method = new HttpPost(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeGetAllRequest(new DataOutputStream(outputBytes),
storeName,
keys,
transforms,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readGetAllResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
+ } finally {
+ IOUtils.closeQuietly(input);
}
}
public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
throws VoldemortException {
StoreUtils.assertValidKey(key);
- HttpPost method = null;
+ DataInputStream input = null;
try {
- method = new HttpPost(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writePutRequest(new DataOutputStream(outputBytes),
storeName,
@@ -141,25 +149,32 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
transforms,
(VectorClock) versioned.getVersion(),
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
requestFormat.readPutResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
+ } finally {
+ IOUtils.closeQuietly(input);
}
}
private DataInputStream executeRequest(HttpPost method, ByteArrayOutputStream output) {
+ HttpResponse response = null;
try {
method.setEntity(new ByteArrayEntity(output.toByteArray()));
- HttpResponse response = httpClient.execute(method);
+ response = httpClient.execute(method);
int statusCode = response.getStatusLine().getStatusCode();
- if(statusCode != HttpURLConnection.HTTP_OK)
+ if(statusCode != HttpURLConnection.HTTP_OK) {
+ String message = response.getStatusLine().getReasonPhrase();
+ VoldemortIOUtils.closeQuietly(response);
throw new UnreachableStoreException("HTTP request to store " + storeName
- + " returned status code " + response + " "
- + response.getStatusLine().getReasonPhrase());
+ + " returned status code " + statusCode + " "
+ + message);
+ }
return new DataInputStream(response.getEntity().getContent());
} catch(IOException e) {
+ VoldemortIOUtils.closeQuietly(response);
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
}
@@ -177,19 +192,21 @@ public Object getCapability(StoreCapabilityType capability) {
public List<Version> getVersions(ByteArray key) {
StoreUtils.assertValidKey(key);
- HttpPost method = null;
+ DataInputStream input = null;
try {
- method = new HttpPost(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeGetVersionRequest(new DataOutputStream(outputBytes),
storeName,
key,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readGetVersionResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
+ } finally {
+ IOUtils.closeQuietly(input);
}
}
}
60 src/java/voldemort/store/readonly/swapper/HttpStoreSwapper.java
View
@@ -1,6 +1,7 @@
package voldemort.store.readonly.swapper;
import java.io.File;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -78,22 +79,26 @@ public String call() throws Exception {
logger.info("Invoking fetch for node " + node.getId() + " for " + storeDir);
- HttpResponse httpResponse = httpClient.execute(post);
- int responseCode = httpResponse.getStatusLine().getStatusCode();
- String response = VoldemortIOUtils.toString(httpResponse.getEntity()
- .getContent(),
- 30000);
-
- if(responseCode != 200)
- throw new VoldemortException("Fetch request on node "
- + node.getId()
- + " ("
- + url
- + ") failed: "
- + httpResponse.getStatusLine()
- .getReasonPhrase());
- logger.info("Fetch succeeded on node " + node.getId());
- return response.trim();
+ HttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(post);
+ int responseCode = httpResponse.getStatusLine().getStatusCode();
+ InputStream is = httpResponse.getEntity().getContent();
+ String response = VoldemortIOUtils.toString(is, 30000);
+
+ if(responseCode != 200)
+ throw new VoldemortException("Fetch request on node "
+ + node.getId()
+ + " ("
+ + url
+ + ") failed: "
+ + httpResponse.getStatusLine()
+ .getReasonPhrase());
+ logger.info("Fetch succeeded on node " + node.getId());
+ return response.trim();
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse);
+ }
}
}));
}
@@ -116,6 +121,7 @@ public String call() throws Exception {
if(deleteFailedFetch) {
// Delete data from successful nodes
for(int successfulNodeId: results.keySet()) {
+ HttpResponse httpResponse = null;
try {
String url = cluster.getNodeById(successfulNodeId).getHttpUrl() + "/"
+ readOnlyMgmtPath;
@@ -126,7 +132,7 @@ public String call() throws Exception {
params.setParameter("store", storeName);
logger.info("Deleting fetched data from node " + successfulNodeId);
- HttpResponse httpResponse = httpClient.execute(post);
+ httpResponse = httpClient.execute(post);
int responseCode = httpResponse.getStatusLine().getStatusCode();
String response = httpResponse.getStatusLine().getReasonPhrase();
@@ -138,6 +144,8 @@ public String call() throws Exception {
} catch(Exception e) {
logger.error("Exception thrown during delete operation on node "
+ successfulNodeId + " : ", e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse);
}
}
}
@@ -162,6 +170,7 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
HashMap<Integer, Exception> exceptions = Maps.newHashMap();
for(final Node node: cluster.getNodes()) {
+ HttpResponse httpResponse = null;
try {
String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
HttpPost post = new HttpPost(url);
@@ -172,11 +181,10 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
params.setParameter("dir", dir);
params.setParameter("store", storeName);
- HttpResponse httpResponse = httpClient.execute(post);
+ httpResponse = httpClient.execute(post);
int responseCode = httpResponse.getStatusLine().getStatusCode();
String previousDir = VoldemortIOUtils.toString(httpResponse.getEntity()
- .getContent(),
- 30000);
+ .getContent(), 30000);
if(responseCode != 200)
throw new VoldemortException("Swap request on node " + node.getId() + " ("
@@ -188,6 +196,8 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
exceptions.put(node.getId(), e);
logger.error("Exception thrown during swap operation on node " + node.getId()
+ ": ", e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse, node.toString());
}
}
@@ -195,6 +205,7 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
if(rollbackFailedSwap) {
// Rollback data on successful nodes
for(int successfulNodeId: previousDirs.keySet()) {
+ HttpResponse httpResponse = null;
try {
String url = cluster.getNodeById(successfulNodeId).getHttpUrl() + "/"
+ readOnlyMgmtPath;
@@ -207,7 +218,7 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
logger.info("Rolling back data on successful node " + successfulNodeId);
- HttpResponse httpResponse = httpClient.execute(post);
+ httpResponse = httpClient.execute(post);
int responseCode = httpResponse.getStatusLine().getStatusCode();
String response = httpResponse.getStatusLine().getReasonPhrase();
@@ -220,6 +231,8 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
logger.error("Exception thrown during rollback ( after swap ) operation on node "
+ successfulNodeId + " : ",
e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse);
}
}
}
@@ -240,6 +253,7 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
public void invokeRollback(String storeName, final long pushVersion) {
Exception exception = null;
for(Node node: cluster.getNodes()) {
+ HttpResponse httpResponse = null;
try {
logger.info("Attempting rollback for node " + node.getId() + " storeName = "
+ storeName);
@@ -251,7 +265,7 @@ public void invokeRollback(String storeName, final long pushVersion) {
params.setParameter("store", storeName);
params.setParameter("pushVersion", Long.toString(pushVersion));
- HttpResponse httpResponse = httpClient.execute(post);
+ httpResponse = httpClient.execute(post);
int responseCode = httpResponse.getStatusLine().getStatusCode();
String response = httpResponse.getStatusLine().getReasonPhrase();
if(responseCode == 200) {
@@ -263,6 +277,8 @@ public void invokeRollback(String storeName, final long pushVersion) {
exception = e;
logger.error("Exception thrown during rollback operation on node " + node.getId()
+ ": ", e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse, String.valueOf(node));
}
}
28 src/java/voldemort/utils/VoldemortIOUtils.java
View
@@ -23,18 +23,36 @@
import java.io.StringWriter;
import java.io.Writer;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.log4j.Logger;
+
public class VoldemortIOUtils {
private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
- public static String toString(final InputStream input, final long limit)
- throws IOException {
+ private static final Logger logger = Logger.getLogger(VoldemortIOUtils.class);
+
+ public static String toString(final InputStream input, final long limit) throws IOException {
return toString(input, null, limit);
}
- public static String toString(final InputStream input,
- final String encoding,
- final long limit) throws IOException {
+ public static void closeQuietly(final HttpResponse httpResponse, final String context) {
+ if(httpResponse != null && httpResponse.getEntity() != null) {
+ try {
+ IOUtils.closeQuietly(httpResponse.getEntity().getContent());
+ } catch(Exception e) {
+ logger.error("Error closing entity connection : " + context, e);
+ }
+ }
+ }
+
+ public static void closeQuietly(final HttpResponse httpResponse) {
+ closeQuietly(httpResponse, "");
+ }
+
+ public static String toString(final InputStream input, final String encoding, final long limit)
+ throws IOException {
StringWriter sw = new StringWriter();
copy(input, sw, encoding, limit);
return sw.toString();
9 test/integration/voldemort/performance/HttpClientBench.java
View
@@ -30,6 +30,8 @@
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
+import voldemort.utils.VoldemortIOUtils;
+
public class HttpClientBench {
private static final int DEFAULT_CONNECTION_MANAGER_TIMEOUT = 100000;
@@ -54,12 +56,15 @@ public static void main(String[] args) throws Exception {
@Override
public void doOperation(int index) {
- HttpGet get = new HttpGet(url);
+ HttpResponse response = null;
try {
- HttpResponse response = client.execute(get);
+ HttpGet get = new HttpGet(url);
+ response = client.execute(get);
response.getEntity().consumeContent();
} catch(Exception e) {
e.printStackTrace();
+ } finally {
+ VoldemortIOUtils.closeQuietly(response);
}
}
};
18 test/unit/voldemort/utils/VoldemortIOUtilsTest.java
View
@@ -2,8 +2,12 @@
import java.io.IOException;
import java.io.InputStream;
+import java.net.HttpURLConnection;
import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpVersion;
+import org.apache.http.message.BasicHttpResponse;
import org.junit.Assert;
import org.junit.Test;
@@ -31,4 +35,18 @@ public void testToStringSmall() throws IOException {
Assert.assertTrue(str2.length() <= upperBound);
}
+ @Test
+ public void testCloseQuietlyNullHttpResponse() {
+ VoldemortIOUtils.closeQuietly(null);
+ }
+
+ @Test
+ public void testCloseQuietlyNullEntity() {
+ HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1,
+ HttpURLConnection.HTTP_OK,
+ "");
+ response.setEntity(null);
+ VoldemortIOUtils.closeQuietly(response);
+ }
+
}
Please sign in to comment.
Something went wrong with that request. Please try again.