Permalink
Browse files

Merge pull request #61 from akkumar/httpclient4x

Upgrade to httpclient 4.x from commons-httpclient 3.0.x
  • Loading branch information...
2 parents a01e4de + 390f9a5 commit ce113eec31ce359ad224c3306659ac16fe96a3c6 @afeinberg afeinberg committed Jan 1, 2012
View
5 .classpath
@@ -14,11 +14,9 @@
<classpathentry kind="src" path="contrib/krati/test"/>
<classpathentry kind="src" path="contrib/collections/src/java"/>
<classpathentry kind="src" path="contrib/collections/test"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="lib/catalina-ant.jar"/>
<classpathentry kind="lib" path="lib/commons-codec-1.3.jar"/>
<classpathentry kind="lib" path="lib/commons-dbcp-1.2.2.jar"/>
- <classpathentry kind="lib" path="lib/commons-httpclient-3.1.jar"/>
<classpathentry kind="lib" path="lib/colt-1.2.0.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/commons-cli-2.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.20.2-core.jar"/>
@@ -53,5 +51,8 @@
<classpathentry kind="lib" path="lib/libthrift-0.5.0.jar"/>
<classpathentry kind="lib" path="lib/compress-lzf-0.9.1.jar"/>
<classpathentry kind="lib" path="lib/snappy-0.2.jar"/>
+ <classpathentry kind="lib" path="lib/httpclient-4.1.2.jar" />
+ <classpathentry kind="lib" path="lib/httpcore-4.1.2.jar" />
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="output" path="classes"/>
</classpath>
View
7 .settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Fri Sep 18 15:20:52 BST 2009
+#Fri Dec 30 14:37:10 PST 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.codeComplete.argumentPrefixes=
org.eclipse.jdt.core.codeComplete.argumentSuffixes=
@@ -149,9 +149,12 @@ org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=true
org.eclipse.jdt.core.formatter.indentation.size=4
+org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_member=insert
+org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert
+org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
+org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert
org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
View
BIN lib/commons-httpclient-3.1.jar
Binary file not shown.
View
BIN lib/httpclient-4.1.2.jar
Binary file not shown.
View
BIN lib/httpcore-4.1.2.jar
Binary file not shown.
View
63 src/java/voldemort/client/HttpStoreClientFactory.java
@@ -22,15 +22,16 @@
import java.util.Collection;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
-import org.apache.commons.httpclient.HostConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpVersion;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.params.HttpClientParams;
-import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
-import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.http.HttpVersion;
+import org.apache.http.client.params.CookiePolicy;
+import org.apache.http.client.params.HttpClientParams;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.conn.SchemeRegistryFactory;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
import voldemort.client.protocol.RequestFormatFactory;
import voldemort.client.protocol.RequestFormatType;
@@ -42,6 +43,7 @@
import voldemort.store.http.HttpStore;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.ByteArray;
+import voldemort.utils.VoldemortIOUtils;
/**
* A {@link voldemort.client.StoreClientFactory StoreClientFactory} that creates
@@ -55,31 +57,33 @@
private static final String VOLDEMORT_USER_AGENT = "vldmrt/0.01";
- private final HttpClient httpClient;
+ private final DefaultHttpClient httpClient;
private final RequestFormatFactory requestFormatFactory;
private final boolean reroute;
public HttpStoreClientFactory(ClientConfig config) {
super(config);
- HostConfiguration hostConfig = new HostConfiguration();
- hostConfig.getParams().setParameter("http.protocol.version", HttpVersion.HTTP_1_1);
- MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
- this.httpClient = new HttpClient(connectionManager);
- this.httpClient.setHostConfiguration(hostConfig);
- HttpClientParams clientParams = this.httpClient.getParams();
- clientParams.setConnectionManagerTimeout(config.getConnectionTimeout(TimeUnit.MILLISECONDS));
- clientParams.setSoTimeout(config.getSocketTimeout(TimeUnit.MILLISECONDS));
- clientParams.setParameter(HttpMethodParams.RETRY_HANDLER,
- new DefaultHttpMethodRetryHandler(0, false));
- clientParams.setCookiePolicy(CookiePolicy.IGNORE_COOKIES);
- clientParams.setParameter("http.useragent", VOLDEMORT_USER_AGENT);
- HttpConnectionManagerParams managerParams = this.httpClient.getHttpConnectionManager()
- .getParams();
- managerParams.setConnectionTimeout(config.getConnectionTimeout(TimeUnit.MILLISECONDS));
- managerParams.setMaxTotalConnections(config.getMaxTotalConnections());
- managerParams.setStaleCheckingEnabled(false);
- managerParams.setMaxConnectionsPerHost(httpClient.getHostConfiguration(),
- config.getMaxConnectionsPerNode());
+ ThreadSafeClientConnManager mgr = new ThreadSafeClientConnManager(SchemeRegistryFactory.createDefault(),
+ config.getConnectionTimeout(TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
+ mgr.setMaxTotal(config.getMaxTotalConnections());
+ mgr.setDefaultMaxPerRoute(config.getMaxConnectionsPerNode());
+
+ this.httpClient = new DefaultHttpClient(mgr);
+ HttpParams clientParams = this.httpClient.getParams();
+
+ HttpProtocolParams.setUserAgent(clientParams, VOLDEMORT_USER_AGENT);
+ HttpProtocolParams.setVersion(clientParams, HttpVersion.HTTP_1_1);
+
+ HttpConnectionParams.setConnectionTimeout(clientParams,
+ config.getConnectionTimeout(TimeUnit.MILLISECONDS));
+ HttpConnectionParams.setSoTimeout(clientParams,
+ config.getSocketTimeout(TimeUnit.MILLISECONDS));
+ HttpConnectionParams.setStaleCheckingEnabled(clientParams, false);
+
+ this.httpClient.setHttpRequestRetryHandler(new DefaultHttpRequestRetryHandler(0, false));
+ HttpClientParams.setCookiePolicy(clientParams, CookiePolicy.IGNORE_COOKIES);
+
this.reroute = config.getRoutingTier().equals(RoutingTier.SERVER);
this.requestFormatFactory = new RequestFormatFactory();
}
@@ -137,6 +141,7 @@ protected void validateUrl(URI url) {
public void close() {
super.close();
// should timeout connections on its own
+ VoldemortIOUtils.closeQuietly(this.httpClient);
}
}
View
77 src/java/voldemort/store/http/HttpStore.java
@@ -24,10 +24,11 @@
import java.util.List;
import java.util.Map;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpException;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormat;
@@ -38,6 +39,7 @@
import voldemort.store.StoreUtils;
import voldemort.store.UnreachableStoreException;
import voldemort.utils.ByteArray;
+import voldemort.utils.VoldemortIOUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
@@ -70,78 +72,75 @@ public HttpStore(String storeName,
public boolean delete(ByteArray key, Version version) throws VoldemortException {
StoreUtils.assertValidKey(key);
- PostMethod method = null;
+ DataInputStream input = null;
try {
- method = new PostMethod(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeDeleteRequest(new DataOutputStream(outputBytes),
storeName,
key,
(VectorClock) version,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readDeleteResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
} finally {
- if(method != null)
- method.releaseConnection();
+ IOUtils.closeQuietly(input);
}
}
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
StoreUtils.assertValidKey(key);
- PostMethod method = null;
+ DataInputStream input = null;
try {
- method = new PostMethod(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeGetRequest(new DataOutputStream(outputBytes),
storeName,
key,
transforms,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readGetResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
} finally {
- if(method != null)
- method.releaseConnection();
+ IOUtils.closeQuietly(input);
}
}
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
StoreUtils.assertValidKeys(keys);
- PostMethod method = null;
+ DataInputStream input = null;
try {
- method = new PostMethod(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeGetAllRequest(new DataOutputStream(outputBytes),
storeName,
keys,
transforms,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readGetAllResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
} finally {
- if(method != null)
- method.releaseConnection();
+ IOUtils.closeQuietly(input);
}
}
public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
throws VoldemortException {
StoreUtils.assertValidKey(key);
- PostMethod method = null;
+ DataInputStream input = null;
try {
- method = new PostMethod(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writePutRequest(new DataOutputStream(outputBytes),
storeName,
@@ -150,29 +149,32 @@ public void put(ByteArray key, Versioned<byte[]> versioned, byte[] transforms)
transforms,
(VectorClock) versioned.getVersion(),
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
requestFormat.readPutResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
} finally {
- if(method != null)
- method.releaseConnection();
+ IOUtils.closeQuietly(input);
}
}
- private DataInputStream executeRequest(PostMethod method, ByteArrayOutputStream output) {
+ private DataInputStream executeRequest(HttpPost method, ByteArrayOutputStream output) {
+ HttpResponse response = null;
try {
- method.setRequestEntity(new ByteArrayRequestEntity(output.toByteArray()));
- int response = httpClient.executeMethod(method);
- if(response != HttpURLConnection.HTTP_OK)
+ method.setEntity(new ByteArrayEntity(output.toByteArray()));
+ response = httpClient.execute(method);
+ int statusCode = response.getStatusLine().getStatusCode();
+ if(statusCode != HttpURLConnection.HTTP_OK) {
+ String message = response.getStatusLine().getReasonPhrase();
+ VoldemortIOUtils.closeQuietly(response);
throw new UnreachableStoreException("HTTP request to store " + storeName
- + " returned status code " + response + " "
- + method.getStatusText());
- return new DataInputStream(method.getResponseBodyAsStream());
- } catch(HttpException e) {
- throw new VoldemortException(e);
+ + " returned status code " + statusCode + " "
+ + message);
+ }
+ return new DataInputStream(response.getEntity().getContent());
} catch(IOException e) {
+ VoldemortIOUtils.closeQuietly(response);
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
}
@@ -190,22 +192,21 @@ public Object getCapability(StoreCapabilityType capability) {
public List<Version> getVersions(ByteArray key) {
StoreUtils.assertValidKey(key);
- PostMethod method = null;
+ DataInputStream input = null;
try {
- method = new PostMethod(this.storeUrl);
+ HttpPost method = new HttpPost(this.storeUrl);
ByteArrayOutputStream outputBytes = new ByteArrayOutputStream();
requestFormat.writeGetVersionRequest(new DataOutputStream(outputBytes),
storeName,
key,
reroute);
- DataInputStream input = executeRequest(method, outputBytes);
+ input = executeRequest(method, outputBytes);
return requestFormat.readGetVersionResponse(input);
} catch(IOException e) {
throw new UnreachableStoreException("Could not connect to " + storeUrl + " for "
+ storeName, e);
} finally {
- if(method != null)
- method.releaseConnection();
+ IOUtils.closeQuietly(input);
}
}
}
View
126 src/java/voldemort/store/readonly/swapper/HttpStoreSwapper.java
@@ -1,6 +1,8 @@
package voldemort.store.readonly.swapper;
import java.io.File;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -9,14 +11,17 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.params.HttpParams;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.store.readonly.ReadOnlyUtils;
+import voldemort.utils.VoldemortIOUtils;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
@@ -64,24 +69,37 @@ public HttpStoreSwapper(Cluster cluster,
public String call() throws Exception {
String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
- PostMethod post = new PostMethod(url);
- post.addParameter("operation", "fetch");
+ HttpPost post = new HttpPost(url);
+ HttpParams params = post.getParams();
+ params.setParameter("operation", "fetch");
String storeDir = basePath + "/node-" + node.getId();
- post.addParameter("dir", storeDir);
- post.addParameter("store", storeName);
+ params.setParameter("dir", storeDir);
+ params.setParameter("store", storeName);
if(pushVersion > 0)
- post.addParameter("pushVersion", Long.toString(pushVersion));
+ params.setParameter("pushVersion", Long.toString(pushVersion));
logger.info("Invoking fetch for node " + node.getId() + " for " + storeDir);
- int responseCode = httpClient.executeMethod(post);
- String response = post.getResponseBodyAsString(30000);
-
- if(responseCode != 200)
- throw new VoldemortException("Fetch request on node " + node.getId() + " ("
- + url + ") failed: " + post.getStatusText());
- logger.info("Fetch succeeded on node " + node.getId());
- return response.trim();
+ HttpResponse httpResponse = null;
+ try {
+ httpResponse = httpClient.execute(post);
+ int responseCode = httpResponse.getStatusLine().getStatusCode();
+ InputStream is = httpResponse.getEntity().getContent();
+ String response = VoldemortIOUtils.toString(is, 30000);
+
+ if(responseCode != HttpURLConnection.HTTP_OK)
+ throw new VoldemortException("Fetch request on node "
+ + node.getId()
+ + " ("
+ + url
+ + ") failed: "
+ + httpResponse.getStatusLine()
+ .getReasonPhrase());
+ logger.info("Fetch succeeded on node " + node.getId());
+ return response.trim();
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse);
+ }
}
}));
}
@@ -104,17 +122,20 @@ public String call() throws Exception {
if(deleteFailedFetch) {
// Delete data from successful nodes
for(int successfulNodeId: results.keySet()) {
+ HttpResponse httpResponse = null;
try {
String url = cluster.getNodeById(successfulNodeId).getHttpUrl() + "/"
+ readOnlyMgmtPath;
- PostMethod post = new PostMethod(url);
- post.addParameter("operation", "failed-fetch");
- post.addParameter("dir", results.get(successfulNodeId));
- post.addParameter("store", storeName);
+ HttpPost post = new HttpPost(url);
+ HttpParams params = post.getParams();
+ params.setParameter("operation", "failed-fetch");
+ params.setParameter("dir", results.get(successfulNodeId));
+ params.setParameter("store", storeName);
logger.info("Deleting fetched data from node " + successfulNodeId);
- int responseCode = httpClient.executeMethod(post);
- String response = post.getStatusText();
+ httpResponse = httpClient.execute(post);
+ int responseCode = httpResponse.getStatusLine().getStatusCode();
+ String response = httpResponse.getStatusLine().getReasonPhrase();
if(responseCode == 200) {
logger.info("Deleted successfully on node " + successfulNodeId);
@@ -124,6 +145,8 @@ public String call() throws Exception {
} catch(Exception e) {
logger.error("Exception thrown during delete operation on node "
+ successfulNodeId + " : ", e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse);
}
}
}
@@ -148,47 +171,57 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
HashMap<Integer, Exception> exceptions = Maps.newHashMap();
for(final Node node: cluster.getNodes()) {
+ HttpResponse httpResponse = null;
try {
String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
- PostMethod post = new PostMethod(url);
- post.addParameter("operation", "swap");
+ HttpPost post = new HttpPost(url);
+ HttpParams params = post.getParams();
+ params.setParameter("operation", "swap");
String dir = fetchFiles.get(node.getId());
logger.info("Attempting swap for node " + node.getId() + " dir = " + dir);
- post.addParameter("dir", dir);
- post.addParameter("store", storeName);
+ params.setParameter("dir", dir);
+ params.setParameter("store", storeName);
- int responseCode = httpClient.executeMethod(post);
- String previousDir = post.getResponseBodyAsString(30000);
+ httpResponse = httpClient.execute(post);
+ int responseCode = httpResponse.getStatusLine().getStatusCode();
+ String previousDir = VoldemortIOUtils.toString(httpResponse.getEntity()
+ .getContent(), 30000);
- if(responseCode != 200)
+ if(responseCode != HttpURLConnection.HTTP_OK)
throw new VoldemortException("Swap request on node " + node.getId() + " ("
- + url + ") failed: " + post.getStatusText());
+ + url + ") failed: "
+ + httpResponse.getStatusLine().getReasonPhrase());
logger.info("Swap succeeded on node " + node.getId());
previousDirs.put(node.getId(), previousDir);
} catch(Exception e) {
exceptions.put(node.getId(), e);
logger.error("Exception thrown during swap operation on node " + node.getId()
+ ": ", e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse, node.toString());
}
}
if(!exceptions.isEmpty()) {
if(rollbackFailedSwap) {
// Rollback data on successful nodes
for(int successfulNodeId: previousDirs.keySet()) {
+ HttpResponse httpResponse = null;
try {
String url = cluster.getNodeById(successfulNodeId).getHttpUrl() + "/"
+ readOnlyMgmtPath;
- PostMethod post = new PostMethod(url);
- post.addParameter("operation", "rollback");
- post.addParameter("store", storeName);
- post.addParameter("pushVersion",
- Long.toString(ReadOnlyUtils.getVersionId(new File(previousDirs.get(successfulNodeId)))));
+ HttpPost post = new HttpPost(url);
+ HttpParams params = post.getParams();
+ params.setParameter("operation", "rollback");
+ params.setParameter("store", storeName);
+ params.setParameter("pushVersion",
+ Long.toString(ReadOnlyUtils.getVersionId(new File(previousDirs.get(successfulNodeId)))));
logger.info("Rolling back data on successful node " + successfulNodeId);
- int responseCode = httpClient.executeMethod(post);
- String response = post.getStatusText();
+ httpResponse = httpClient.execute(post);
+ int responseCode = httpResponse.getStatusLine().getStatusCode();
+ String response = httpResponse.getStatusLine().getReasonPhrase();
if(responseCode == 200) {
logger.info("Rollback succeeded for node " + successfulNodeId);
@@ -199,6 +232,8 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
logger.error("Exception thrown during rollback ( after swap ) operation on node "
+ successfulNodeId + " : ",
e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse);
}
}
}
@@ -219,17 +254,21 @@ public void invokeSwap(final String storeName, final List<String> fetchFiles) {
public void invokeRollback(String storeName, final long pushVersion) {
Exception exception = null;
for(Node node: cluster.getNodes()) {
+ HttpResponse httpResponse = null;
try {
logger.info("Attempting rollback for node " + node.getId() + " storeName = "
+ storeName);
String url = node.getHttpUrl() + "/" + readOnlyMgmtPath;
- PostMethod post = new PostMethod(url);
- post.addParameter("operation", "rollback");
- post.addParameter("store", storeName);
- post.addParameter("pushVersion", Long.toString(pushVersion));
+ HttpPost post = new HttpPost(url);
+ HttpParams params = post.getParams();
- int responseCode = httpClient.executeMethod(post);
- String response = post.getStatusText();
+ params.setParameter("operation", "rollback");
+ params.setParameter("store", storeName);
+ params.setParameter("pushVersion", Long.toString(pushVersion));
+
+ httpResponse = httpClient.execute(post);
+ int responseCode = httpResponse.getStatusLine().getStatusCode();
+ String response = httpResponse.getStatusLine().getReasonPhrase();
if(responseCode == 200) {
logger.info("Rollback succeeded for node " + node.getId());
} else {
@@ -239,10 +278,13 @@ public void invokeRollback(String storeName, final long pushVersion) {
exception = e;
logger.error("Exception thrown during rollback operation on node " + node.getId()
+ ": ", e);
+ } finally {
+ VoldemortIOUtils.closeQuietly(httpResponse, String.valueOf(node));
}
}
if(exception != null)
throw new VoldemortException(exception);
}
+
}
View
28 src/java/voldemort/store/readonly/swapper/StoreSwapper.java
@@ -11,18 +11,19 @@
import joptsimple.OptionParser;
import joptsimple.OptionSet;
-import org.apache.commons.httpclient.HostConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.io.FileUtils;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
import org.apache.log4j.Logger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.utils.CmdUtils;
import voldemort.utils.Time;
+import voldemort.utils.VoldemortIOUtils;
import voldemort.xml.ClusterMapper;
import com.google.common.base.Joiner;
@@ -115,20 +116,22 @@ public static void main(String[] args) throws Exception {
StoreSwapper swapper = null;
AdminClient adminClient = null;
+ DefaultHttpClient httpClient = null;
if(useAdminServices) {
adminClient = new AdminClient(cluster, new AdminClientConfig());
swapper = new AdminStoreSwapper(cluster, executor, adminClient, timeoutMs);
} else {
- HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
-
int numConnections = cluster.getNumberOfNodes() + 3;
- manager.getParams().setMaxTotalConnections(numConnections);
- manager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
- numConnections);
- HttpClient client = new HttpClient(manager);
- client.getParams().setParameter("http.socket.timeout", timeoutMs);
+ ThreadSafeClientConnManager connectionManager = new ThreadSafeClientConnManager();
+ httpClient = new DefaultHttpClient(connectionManager);
+
+ HttpParams clientParams = httpClient.getParams();
+
+ connectionManager.setMaxTotal(numConnections);
+ connectionManager.setDefaultMaxPerRoute(numConnections);
+ HttpConnectionParams.setSoTimeout(clientParams, timeoutMs);
- swapper = new HttpStoreSwapper(cluster, executor, client, mgmtPath);
+ swapper = new HttpStoreSwapper(cluster, executor, httpClient, mgmtPath);
}
try {
@@ -146,6 +149,7 @@ public static void main(String[] args) throws Exception {
adminClient.stop();
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
+ VoldemortIOUtils.closeQuietly(httpClient);
}
System.exit(0);
}
View
108 src/java/voldemort/utils/VoldemortIOUtils.java
@@ -0,0 +1,108 @@
+/*
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.io.Writer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.log4j.Logger;
+
+public class VoldemortIOUtils {
+
+ private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+
+ private static final Logger logger = Logger.getLogger(VoldemortIOUtils.class);
+
+ public static String toString(final InputStream input, final long limit) throws IOException {
+ return toString(input, null, limit);
+ }
+
+ public static void closeQuietly(final HttpResponse httpResponse, final String context) {
+ if(httpResponse != null && httpResponse.getEntity() != null) {
+ try {
+ IOUtils.closeQuietly(httpResponse.getEntity().getContent());
+ } catch(Exception e) {
+ logger.error("Error closing entity connection : " + context, e);
+ }
+ }
+ }
+
+ public static void closeQuietly(final HttpResponse httpResponse) {
+ closeQuietly(httpResponse, "");
+ }
+
+ public static void closeQuietly(HttpClient httpClient) {
+ if(httpClient != null) {
+ httpClient.getConnectionManager().shutdown();
+ }
+ }
+
+ public static String toString(final InputStream input, final String encoding, final long limit)
+ throws IOException {
+ StringWriter sw = new StringWriter();
+ copy(input, sw, encoding, limit);
+ return sw.toString();
+ }
+
+ public static void copy(InputStream input, Writer output, String encoding, final long limit)
+ throws IOException {
+ if(encoding == null) {
+ copy(input, output, limit);
+ } else {
+ InputStreamReader in = new InputStreamReader(input, encoding);
+ copy(in, output, limit);
+ }
+ }
+
+ public static void copy(InputStream input, Writer output, final long limit) throws IOException {
+ InputStreamReader in = new InputStreamReader(input);
+ copy(in, output, limit);
+ }
+
+ public static int copy(Reader input, Writer output, long limit) throws IOException {
+ long count = copyLarge(input, output, limit);
+ if(count > Integer.MAX_VALUE) {
+ return -1;
+ }
+ return (int) count;
+ }
+
+ public static long copyLarge(Reader input, Writer output, long limit) throws IOException {
+ char[] buffer = new char[DEFAULT_BUFFER_SIZE];
+ long count = 0;
+ int n = 0;
+ long remaining = limit;
+ while(remaining > 0) {
+ n = (remaining > DEFAULT_BUFFER_SIZE) ? input.read(buffer)
+ : input.read(buffer, 0, (int) remaining);
+ if(n == -1) {
+ break;
+ }
+ output.write(buffer, 0, n);
+ count += n;
+ remaining -= n;
+ }
+ return count;
+ }
+}
View
24 test/common/voldemort/ServerTestUtils.java
@@ -1,12 +1,12 @@
/*
* Copyright 2008-2009 LinkedIn, Inc
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -31,8 +31,9 @@
import java.util.Properties;
import java.util.Set;
-import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.io.FileUtils;
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
@@ -81,8 +82,8 @@
/**
* Helper functions for testing with real server implementations
- *
- *
+ *
+ *
*/
public class ServerTestUtils {
@@ -210,11 +211,12 @@ public static Context getJettyServer(String clusterXml,
return context;
}
- public static HttpStore getHttpStore(String storeName, RequestFormatType format, int port) {
+ public static HttpStore getHttpStore(String storeName, RequestFormatType format, int port,
+ final HttpClient httpClient) {
return new HttpStore(storeName,
"localhost",
port,
- new HttpClient(),
+ httpClient,
new RequestFormatFactory().getRequestFormat(format),
false);
}
@@ -286,7 +288,7 @@ public static Cluster getLocalCluster(int numberOfNodes, int[] ports, int[][] pa
/**
* Update a cluster by replacing the specified server with a new host, i.e.
* new ports since they are all localhost
- *
+ *
* @param original The original cluster to be updated
* @param serverIds The ids of the server to be replaced with new hosts
* @return updated cluster
@@ -326,7 +328,7 @@ public static Cluster updateClusterWithNewHost(Cluster original, int... serverId
/**
* Returns a list of zones with their proximity list being in increasing
* order
- *
+ *
* @param numberOfZones The number of zones to return
* @return List of zones
*/
@@ -350,7 +352,7 @@ public static Cluster updateClusterWithNewHost(Cluster original, int... serverId
* Returns a cluster with <b>numberOfNodes</b> nodes in <b>numberOfZones</b>
* zones. It is important that <b>numberOfNodes</b> be divisible by
* <b>numberOfZones</b>
- *
+ *
* @param numberOfNodes Number of nodes in the cluster
* @param partitionsPerNode Number of partitions in one node
* @param numberOfZones Number of zones
View
84 test/integration/voldemort/performance/HttpClientBench.java
@@ -16,17 +16,23 @@
package voldemort.performance;
-import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
-import org.apache.commons.httpclient.HostConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.HttpVersion;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.HttpClientParams;
-import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
-import org.apache.commons.httpclient.params.HttpMethodParams;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpVersion;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.params.CookiePolicy;
+import org.apache.http.client.params.HttpClientParams;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.conn.SchemeRegistryFactory;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
+
+import voldemort.utils.VoldemortIOUtils;
public class HttpClientBench {
@@ -52,46 +58,48 @@ public static void main(String[] args) throws Exception {
@Override
public void doOperation(int index) {
- GetMethod get = new GetMethod(url);
+ HttpResponse response = null;
try {
- client.executeMethod(get);
- get.getResponseBody();
+ HttpGet get = new HttpGet(url);
+ response = client.execute(get);
+ response.getEntity().consumeContent();
} catch(Exception e) {
e.printStackTrace();
} finally {
- get.releaseConnection();
+ VoldemortIOUtils.closeQuietly(response);
}
}
};
perfTest.run(numRequests, numThreads);
perfTest.printStats();
-
+ VoldemortIOUtils.closeQuietly(client);
System.exit(1);
}
private static HttpClient createClient() {
- HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
- HttpClient httpClient = new HttpClient(connectionManager);
- HttpClientParams clientParams = httpClient.getParams();
- clientParams.setConnectionManagerTimeout(DEFAULT_CONNECTION_MANAGER_TIMEOUT);
- clientParams.setSoTimeout(500);
- clientParams.setParameter(HttpMethodParams.RETRY_HANDLER,
- new DefaultHttpMethodRetryHandler(0, false));
- clientParams.setCookiePolicy(CookiePolicy.IGNORE_COOKIES);
- clientParams.setBooleanParameter("http.tcp.nodelay", false);
- clientParams.setIntParameter("http.socket.receivebuffer", 60000);
- clientParams.setParameter("http.useragent", VOLDEMORT_USER_AGENT);
- HostConfiguration hostConfig = new HostConfiguration();
- hostConfig.setHost("localhost");
- hostConfig.getParams().setParameter("http.protocol.version", HttpVersion.HTTP_1_1);
- httpClient.setHostConfiguration(hostConfig);
- HttpConnectionManagerParams managerParams = httpClient.getHttpConnectionManager()
- .getParams();
- managerParams.setConnectionTimeout(DEFAULT_CONNECTION_MANAGER_TIMEOUT);
- managerParams.setMaxTotalConnections(DEFAULT_MAX_CONNECTIONS);
- managerParams.setMaxConnectionsPerHost(httpClient.getHostConfiguration(),
- DEFAULT_MAX_HOST_CONNECTIONS);
- managerParams.setStaleCheckingEnabled(false);
+ ThreadSafeClientConnManager connectionManager = new ThreadSafeClientConnManager(SchemeRegistryFactory.createDefault(),
+ DEFAULT_CONNECTION_MANAGER_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+
+ DefaultHttpClient httpClient = new DefaultHttpClient(connectionManager);
+
+ HttpParams clientParams = httpClient.getParams();
+
+ HttpConnectionParams.setSocketBufferSize(clientParams, 60000);
+ HttpConnectionParams.setTcpNoDelay(clientParams, false);
+ HttpProtocolParams.setUserAgent(clientParams, VOLDEMORT_USER_AGENT);
+ HttpProtocolParams.setVersion(clientParams, HttpVersion.HTTP_1_1);
+ // HostConfiguration hostConfig = new HostConfiguration();
+ // hostConfig.setHost("localhost");
+
+ HttpConnectionParams.setConnectionTimeout(clientParams, DEFAULT_CONNECTION_MANAGER_TIMEOUT);
+ HttpConnectionParams.setSoTimeout(clientParams, 500);
+ httpClient.setHttpRequestRetryHandler(new DefaultHttpRequestRetryHandler(0, false));
+ HttpClientParams.setCookiePolicy(clientParams, CookiePolicy.IGNORE_COOKIES);
+
+ connectionManager.setMaxTotal(DEFAULT_MAX_CONNECTIONS);
+ connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_HOST_CONNECTIONS);
+ HttpConnectionParams.setStaleCheckingEnabled(clientParams, false);
return httpClient;
}
View
59 test/integration/voldemort/performance/RemoteStoreComparisonTest.java
@@ -16,15 +16,18 @@
package voldemort.performance;
-import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
-import org.apache.commons.httpclient.HostConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpVersion;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.params.HttpClientParams;
-import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
-import org.apache.commons.httpclient.params.HttpMethodParams;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpVersion;
+import org.apache.http.client.params.CookiePolicy;
+import org.apache.http.client.params.HttpClientParams;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.conn.SchemeRegistryFactory;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
@@ -42,6 +45,7 @@
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.utils.ByteArray;
import voldemort.utils.Utils;
+import voldemort.utils.VoldemortIOUtils;
import voldemort.versioning.Versioned;
public class RemoteStoreComparisonTest {
@@ -154,21 +158,25 @@ public void doOperation(int i) {
numThreads,
8080);
httpService.start();
- HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager());
- HttpClientParams clientParams = httpClient.getParams();
- clientParams.setParameter(HttpMethodParams.RETRY_HANDLER,
- new DefaultHttpMethodRetryHandler(0, false));
- clientParams.setCookiePolicy(CookiePolicy.IGNORE_COOKIES);
- clientParams.setParameter("http.useragent", "test-agent");
- HostConfiguration hostConfig = new HostConfiguration();
- hostConfig.getParams().setParameter("http.protocol.version", HttpVersion.HTTP_1_1);
- httpClient.setHostConfiguration(hostConfig);
- HttpConnectionManagerParams managerParams = httpClient.getHttpConnectionManager()
- .getParams();
- managerParams.setConnectionTimeout(10000);
- managerParams.setMaxTotalConnections(numThreads);
- managerParams.setStaleCheckingEnabled(false);
- managerParams.setMaxConnectionsPerHost(httpClient.getHostConfiguration(), numThreads);
+
+ ThreadSafeClientConnManager connectionManager = new ThreadSafeClientConnManager(SchemeRegistryFactory.createDefault(),
+ 10000,
+ TimeUnit.MILLISECONDS);
+
+ DefaultHttpClient httpClient = new DefaultHttpClient(connectionManager);
+
+ HttpParams clientParams = httpClient.getParams();
+ httpClient.setHttpRequestRetryHandler(new DefaultHttpRequestRetryHandler(0, false));
+ HttpClientParams.setCookiePolicy(clientParams, CookiePolicy.IGNORE_COOKIES);
+ HttpProtocolParams.setUserAgent(clientParams, "test-agent");
+ HttpProtocolParams.setVersion(clientParams, HttpVersion.HTTP_1_1);
+
+ HttpConnectionParams.setConnectionTimeout(clientParams, 10000);
+
+ connectionManager.setMaxTotal(numThreads);
+ connectionManager.setDefaultMaxPerRoute(numThreads);
+ HttpConnectionParams.setStaleCheckingEnabled(clientParams, false);
+
final HttpStore httpStore = new HttpStore("test",
"localhost",
8080,
@@ -203,5 +211,8 @@ public void doOperation(int i) {
httpReadTest.printStats();
httpService.stop();
+
+ VoldemortIOUtils.closeQuietly(httpClient);
+
}
}
View
12 test/unit/voldemort/client/HttpStoreClientFactoryTest.java
@@ -16,6 +16,8 @@
package voldemort.client;
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
import org.junit.After;
import org.junit.Before;
import org.mortbay.jetty.Server;
@@ -25,16 +27,18 @@
import voldemort.client.protocol.RequestFormatType;
import voldemort.serialization.SerializerFactory;
import voldemort.store.http.HttpStore;
+import voldemort.utils.VoldemortIOUtils;
/**
- *
+ *
*/
public class HttpStoreClientFactoryTest extends AbstractStoreClientFactoryTest {
private HttpStore httpStore;
private Server server;
private Context context;
private String url;
+ private HttpClient httpClient;
@Override
@Before
@@ -46,9 +50,11 @@ public void setUp() throws Exception {
RequestFormatType.VOLDEMORT_V1,
getLocalNode().getHttpPort());
server = context.getServer();
+ httpClient = new DefaultHttpClient();
httpStore = ServerTestUtils.getHttpStore(getValidStoreName(),
RequestFormatType.VOLDEMORT_V1,
- getLocalNode().getHttpPort());
+ getLocalNode().getHttpPort(),
+ httpClient);
url = getLocalNode().getHttpUrl().toString();
}
@@ -58,6 +64,8 @@ public void tearDown() throws Exception {
httpStore.close();
server.stop();
context.destroy();
+ VoldemortIOUtils.closeQuietly(httpClient);
+
}
@Override
View
28 test/unit/voldemort/store/http/HttpStoreTest.java
@@ -16,7 +16,14 @@
package voldemort.store.http;
-import org.apache.commons.httpclient.HttpClient;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.SchemeRegistryFactory;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
@@ -31,6 +38,7 @@
import voldemort.store.Store;
import voldemort.store.UnreachableStoreException;
import voldemort.utils.ByteArray;
+import voldemort.utils.VoldemortIOUtils;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
import voldemort.xml.ClusterMapper;
@@ -45,6 +53,7 @@
private HttpStore httpStore;
private Server server;
private Context context;
+ private HttpClient httpClient;
@Override
public void setUp() throws Exception {
@@ -59,14 +68,22 @@ public void setUp() throws Exception {
server = context.getServer();
httpStore = ServerTestUtils.getHttpStore("users",
RequestFormatType.VOLDEMORT_V1,
- node.getHttpPort());
+ node.getHttpPort(),
+ httpClient);
}
public <T extends Exception> void testBadUrlOrPort(String url, int port, Class<T> expected) {
ByteArray key = new ByteArray("test".getBytes());
RequestFormat requestFormat = new RequestFormatFactory().getRequestFormat(RequestFormatType.VOLDEMORT_V1);
- HttpClient client = new HttpClient();
- client.getHttpConnectionManager().getParams().setConnectionTimeout(5000);
+
+ ThreadSafeClientConnManager connectionManager = new ThreadSafeClientConnManager(SchemeRegistryFactory.createDefault(),
+ 5000,
+ TimeUnit.MILLISECONDS);
+
+ DefaultHttpClient client = new DefaultHttpClient(connectionManager);
+ HttpParams clientParams = client.getParams();
+ HttpConnectionParams.setConnectionTimeout(clientParams, 5000);
+
HttpStore badUrlHttpStore = new HttpStore("test", url, port, client, requestFormat, false);
try {
badUrlHttpStore.put(key,
@@ -85,6 +102,8 @@ public void setUp() throws Exception {
} catch(Exception e) {
assertTrue(e.getClass().equals(expected));
}
+
+ client.getConnectionManager().shutdown();
}
public void testBadUrl() {
@@ -105,6 +124,7 @@ public void tearDown() throws Exception {
httpStore.close();
server.stop();
context.destroy();
+ VoldemortIOUtils.closeQuietly(httpClient);
}
@Override
View
38 test/unit/voldemort/store/readonly/swapper/StoreSwapperTest.java
@@ -27,10 +27,8 @@
import junit.framework.TestCase;
-import org.apache.commons.httpclient.HostConfiguration;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpConnectionManager;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -52,12 +50,14 @@
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.utils.Utils;
+import voldemort.utils.VoldemortIOUtils;
import voldemort.xml.StoreDefinitionsMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
+*
*/
public class StoreSwapperTest extends TestCase {
@@ -152,14 +152,16 @@ public void testAdminStoreSwapper() throws Exception {
@Test
public void testHttpStoreSwapper() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
-
+ DefaultHttpClient client = null;
try {
// Use the http store swapper
- HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
- manager.getParams().setMaxTotalConnections(10);
- manager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
- 10);
- HttpClient client = new HttpClient(manager);
+ ThreadSafeClientConnManager connectionManager = new ThreadSafeClientConnManager();
+
+ connectionManager.setMaxTotal(10);
+ connectionManager.setDefaultMaxPerRoute(10);
+
+ client = new DefaultHttpClient(connectionManager);
+
StoreSwapper swapper = new HttpStoreSwapper(cluster,
executor,
client,
@@ -169,6 +171,7 @@ public void testHttpStoreSwapper() throws Exception {
testFetchSwap(swapper);
} finally {
executor.shutdown();
+ VoldemortIOUtils.closeQuietly(client);
}
}
@@ -193,14 +196,16 @@ public void testAdminStoreSwapperWithoutRollback() throws Exception {
@Test
public void testHttpStoreSwapperWithoutRollback() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
-
+ DefaultHttpClient client = null;
try {
// Use the http store swapper
- HttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
- manager.getParams().setMaxTotalConnections(10);
- manager.getParams().setMaxConnectionsPerHost(HostConfiguration.ANY_HOST_CONFIGURATION,
- 10);
- HttpClient client = new HttpClient(manager);
+
+ ThreadSafeClientConnManager connectionManager = new ThreadSafeClientConnManager();
+
+ connectionManager.setMaxTotal(10);
+ connectionManager.setDefaultMaxPerRoute(10);
+
+ client = new DefaultHttpClient(connectionManager);
StoreSwapper swapper = new HttpStoreSwapper(cluster,
executor,
client,
@@ -210,6 +215,7 @@ public void testHttpStoreSwapperWithoutRollback() throws Exception {
testFetchSwapWithoutRollback(swapper);
} finally {
executor.shutdown();
+ VoldemortIOUtils.closeQuietly(client);
}
}
View
52 test/unit/voldemort/utils/VoldemortIOUtilsTest.java
@@ -0,0 +1,52 @@
+package voldemort.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpVersion;
+import org.apache.http.message.BasicHttpResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class VoldemortIOUtilsTest {
+
+ @Test
+ public void testToString() throws IOException {
+ // some input file which is > 30K;
+ final int upperBound = 30000;
+ InputStream is = getClass().getResourceAsStream("Xtranslcl.c.input");
+ String str = IOUtils.toString(is);
+ Assert.assertTrue(str.length() > 0);
+ Assert.assertTrue(str.length() > upperBound);
+
+ InputStream is2 = getClass().getResourceAsStream("Xtranslcl.c.input");
+ String str2 = VoldemortIOUtils.toString(is2, upperBound);
+ Assert.assertEquals(upperBound, str2.length());
+ }
+
+ @Test
+ public void testToStringSmall() throws IOException {
+ final int upperBound = 30000;
+ InputStream is = getClass().getResourceAsStream("maze.c.input");
+ String str2 = VoldemortIOUtils.toString(is, upperBound);
+ Assert.assertTrue(str2.length() <= upperBound);
+ }
+
+ @Test
+ public void testCloseQuietlyNullHttpResponse() {
+ VoldemortIOUtils.closeQuietly((HttpResponse) null);
+ }
+
+ @Test
+ public void testCloseQuietlyNullEntity() {
+ HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1,
+ HttpURLConnection.HTTP_OK,
+ "");
+ response.setEntity(null);
+ VoldemortIOUtils.closeQuietly(response);
+ }
+
+}
View
2,562 test/unit/voldemort/utils/Xtranslcl.c.input
@@ -0,0 +1,2562 @@
+/*
+
+Copyright 1993, 1994, 1998 The Open Group
+
+Permission to use, copy, modify, distribute, and sell this software and its
+documentation for any purpose is hereby granted without fee, provided that
+the above copyright notice appear in all copies and that both that
+copyright notice and this permission notice appear in supporting
+documentation.
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE OPEN GROUP BE LIABLE FOR ANY CLAIM, DAMAGES OR
+OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+Except as contained in this notice, the name of The Open Group shall
+not be used in advertising or otherwise to promote the sale, use or
+other dealings in this Software without prior written authorization
+from The Open Group.
+
+ * Copyright 1993, 1994 NCR Corporation - Dayton, Ohio, USA
+ *
+ * All Rights Reserved
+ *
+ * Permission to use, copy, modify, and distribute this software and its
+ * documentation for any purpose and without fee is hereby granted, provided
+ * that the above copyright notice appear in all copies and that both that
+ * copyright notice and this permission notice appear in supporting
+ * documentation, and that the name NCR not be used in advertising
+ * or publicity pertaining to distribution of the software without specific,
+ * written prior permission. NCR makes no representations about the
+ * suitability of this software for any purpose. It is provided "as is"
+ * without express or implied warranty.
+ *
+ * NCR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
+ * INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
+ * NO EVENT SHALL NCR BE LIABLE FOR ANY SPECIAL, INDIRECT OR
+ * CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
+ * OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+ * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+ * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/*
+ *
+ * The connection code/ideas in lib/X and server/os for SVR4/Intel
+ * environments was contributed by the following companies/groups:
+ *
+ * MetroLink Inc
+ * NCR
+ * Pittsburgh Powercomputing Corporation (PPc)/Quarterdeck Office Systems
+ * SGCS
+ * Unix System Laboratories (USL) / Novell
+ * XFree86
+ *
+ * The goal is to have common connection code among all SVR4/Intel vendors.
+ *
+ * ALL THE ABOVE COMPANIES DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS
+ * SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS,
+ * IN NO EVENT SHALL THESE COMPANIES * BE LIABLE FOR ANY SPECIAL, INDIRECT
+ * OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
+ * OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+ * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE
+ * OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <errno.h>
+#include <ctype.h>
+#include <sys/signal.h>
+#include <sys/ioctl.h>
+#include <sys/stat.h>
+#if defined(SVR4) || defined(__SVR4)
+#include <sys/filio.h>
+#endif
+#ifdef sun
+# include <stropts.h>
+#else
+# include <sys/stropts.h>
+#endif
+#include <sys/wait.h>
+#include <sys/types.h>
+
+/*
+ * The local transports should be treated the same as a UNIX domain socket
+ * wrt authentication, etc. Because of this, we will use struct sockaddr_un
+ * for the address format. This will simplify the code in other places like
+ * The X Server.
+ */
+
+#include <sys/socket.h>
+#ifndef X_NO_SYS_UN
+#include <sys/un.h>
+#endif
+
+
+/* Types of local connections supported:
+ * - PTS
+ * - named pipes
+ * - SCO
+ */
+#if !defined(sun)
+# define LOCAL_TRANS_PTS
+#endif
+#if defined(SVR4) || defined(__SVR4)
+# define LOCAL_TRANS_NAMED
+#endif
+#if defined(__SCO__) || defined(__UNIXWARE__)
+# define LOCAL_TRANS_SCO
+#endif
+
+static int TRANS(LocalClose)(XtransConnInfo ciptr);
+
+/*
+ * These functions actually implement the local connection mechanisms.
+ */
+
+/* Type Not Supported */
+
+static int
+TRANS(OpenFail)(XtransConnInfo ciptr, char *port)
+
+{
+ return -1;
+}
+
+#ifdef TRANS_REOPEN
+
+static int
+TRANS(ReopenFail)(XtransConnInfo ciptr, int fd, char *port)
+
+{
+ return 0;
+}
+
+#endif /* TRANS_REOPEN */
+
+
+
+static int
+TRANS(FillAddrInfo)(XtransConnInfo ciptr, char *sun_path, char *peer_sun_path)
+
+{
+ struct sockaddr_un *sunaddr;
+ struct sockaddr_un *p_sunaddr;
+
+ ciptr->family = AF_UNIX;
+ ciptr->addrlen = sizeof (struct sockaddr_un);
+
+ if ((sunaddr = (struct sockaddr_un *) xalloc (ciptr->addrlen)) == NULL)
+ {
+ PRMSG(1,"FillAddrInfo: failed to allocate memory for addr\n", 0, 0, 0);
+ return 0;
+ }
+
+ sunaddr->sun_family = AF_UNIX;
+
+ if (strlen(sun_path) > sizeof(sunaddr->sun_path) - 1) {
+ PRMSG(1, "FillAddrInfo: path too long\n", 0, 0, 0);
+ xfree((char *) sunaddr);
+ return 0;
+ }
+ strcpy (sunaddr->sun_path, sun_path);
+#if defined(BSD44SOCKETS)
+ sunaddr->sun_len = strlen (sunaddr->sun_path);
+#endif
+
+ ciptr->addr = (char *) sunaddr;
+
+ ciptr->peeraddrlen = sizeof (struct sockaddr_un);
+
+ if ((p_sunaddr = (struct sockaddr_un *) xalloc (
+ ciptr->peeraddrlen)) == NULL)
+ {
+ PRMSG(1,
+ "FillAddrInfo: failed to allocate memory for peer addr\n",
+ 0,0,0);
+ xfree ((char *) sunaddr);
+ ciptr->addr = NULL;
+
+ return 0;
+ }
+
+ p_sunaddr->sun_family = AF_UNIX;
+
+ if (strlen(peer_sun_path) > sizeof(p_sunaddr->sun_path) - 1) {
+ PRMSG(1, "FillAddrInfo: peer path too long\n", 0, 0, 0);
+ xfree((char *) p_sunaddr);
+ return 0;
+ }
+ strcpy (p_sunaddr->sun_path, peer_sun_path);
+#if defined(BSD44SOCKETS)
+ p_sunaddr->sun_len = strlen (p_sunaddr->sun_path);
+#endif
+
+ ciptr->peeraddr = (char *) p_sunaddr;
+
+ return 1;
+}
+
+
+
+#ifdef LOCAL_TRANS_PTS
+/* PTS */
+
+#if defined(SYSV) && !defined(__SCO__)
+#define SIGNAL_T int
+#else
+#define SIGNAL_T void
+#endif /* SYSV */
+
+typedef SIGNAL_T (*PFV)();
+
+extern PFV signal();
+
+extern char *ptsname(
+ int
+);
+
+static void _dummy(int sig)
+
+{
+}
+#endif /* LOCAL_TRANS_PTS */
+
+#ifndef sun
+#define X_STREAMS_DIR "/dev/X"
+#define DEV_SPX "/dev/spx"
+#else
+#ifndef X11_t
+#define X_STREAMS_DIR "/dev/X"
+#else
+#define X_STREAMS_DIR "/tmp/.X11-pipe"
+#endif
+#endif
+
+#define DEV_PTMX "/dev/ptmx"
+
+#if defined(X11_t)
+
+#define PTSNODENAME "/dev/X/server."
+#ifdef sun
+#define NAMEDNODENAME "/tmp/.X11-pipe/X"
+#else
+#define NAMEDNODENAME "/dev/X/Nserver."
+
+#define SCORNODENAME "/dev/X%1sR"
+#define SCOSNODENAME "/dev/X%1sS"
+#endif /* !sun */
+#endif
+#if defined(XIM_t)
+#ifdef sun
+#define NAMEDNODENAME "/tmp/.XIM-pipe/XIM"
+#else
+#define PTSNODENAME "/dev/X/XIM."
+#define NAMEDNODENAME "/dev/X/NXIM."
+#define SCORNODENAME "/dev/XIM.%sR"
+#define SCOSNODENAME "/dev/XIM.%sS"
+#endif
+#endif
+#if defined(FS_t) || defined (FONT_t)
+#ifdef sun
+#define NAMEDNODENAME "/tmp/.font-pipe/fs"
+#else
+/*
+ * USL has already defined something here. We need to check with them
+ * and see if their choice is usable here.
+ */
+#define PTSNODENAME "/dev/X/fontserver."
+#define NAMEDNODENAME "/dev/X/Nfontserver."
+#define SCORNODENAME "/dev/fontserver.%sR"
+#define SCOSNODENAME "/dev/fontserver.%sS"
+#endif
+#endif
+#if defined(ICE_t)
+#ifdef sun
+#define NAMEDNODENAME "/tmp/.ICE-pipe/"
+#else
+#define PTSNODENAME "/dev/X/ICE."
+#define NAMEDNODENAME "/dev/X/NICE."
+#define SCORNODENAME "/dev/ICE.%sR"
+#define SCOSNODENAME "/dev/ICE.%sS"
+#endif
+#endif
+#if defined(TEST_t)
+#ifdef sun
+#define NAMEDNODENAME "/tmp/.Test-unix/test"
+#endif
+#define PTSNODENAME "/dev/X/transtest."
+#define NAMEDNODENAME "/dev/X/Ntranstest."
+#define SCORNODENAME "/dev/transtest.%sR"
+#define SCOSNODENAME "/dev/transtest.%sS"
+#endif
+
+
+
+#ifdef LOCAL_TRANS_PTS
+#ifdef TRANS_CLIENT
+
+static int
+TRANS(PTSOpenClient)(XtransConnInfo ciptr, char *port)
+
+{
+#ifdef PTSNODENAME
+ int fd,server,exitval,alarm_time,ret;
+ char server_path[64];
+ char *slave, namelen;
+ char buf[20]; /* MAX_PATH_LEN?? */
+ PFV savef;
+ pid_t saved_pid;
+#endif
+
+ PRMSG(2,"PTSOpenClient(%s)\n", port, 0,0 );
+
+#if !defined(PTSNODENAME)
+ PRMSG(1,"PTSOpenClient: Protocol is not supported by a pts connection\n", 0,0,0);
+ return -1;
+#else
+ if (port && *port ) {
+ if( *port == '/' ) { /* A full pathname */
+ (void) sprintf(server_path, "%s", port);
+ } else {
+ (void) sprintf(server_path, "%s%s", PTSNODENAME, port);
+ }
+ } else {
+ (void) sprintf(server_path, "%s%d", PTSNODENAME, getpid());
+ }
+
+
+ /*
+ * Open the node the on which the server is listening.
+ */
+
+ if ((server = open (server_path, O_RDWR)) < 0) {
+ PRMSG(1,"PTSOpenClient: failed to open %s\n", server_path, 0,0);
+ return -1;
+ }
+
+
+ /*
+ * Open the streams based pipe that will be this connection.
+ */
+
+ if ((fd = open(DEV_PTMX, O_RDWR)) < 0) {
+ PRMSG(1,"PTSOpenClient: failed to open %s\n", DEV_PTMX, 0,0);
+ close(server);
+ return(-1);
+ }
+
+ (void) grantpt(fd);
+ (void) unlockpt(fd);
+
+ slave = ptsname(fd); /* get name */
+
+ if( slave == NULL ) {
+ PRMSG(1,"PTSOpenClient: failed to get ptsname()\n", 0,0,0);
+ close(fd);
+ close(server);
+ return -1;
+ }
+
+ /*
+ * This is neccesary for the case where a program is setuid to non-root.
+ * grantpt() calls /usr/lib/pt_chmod which is set-uid root. This program will
+ * set the owner of the pt device incorrectly if the uid is not restored
+ * before it is called. The problem is that once it gets restored, it
+ * cannot be changed back to its original condition, hence the fork().
+ */
+
+ if(!(saved_pid=fork())) {
+ uid_t saved_euid;
+
+ saved_euid = geteuid();
+ /** sets the euid to the actual/real uid **/
+ if (setuid( getuid() ) == -1) {
+ exit(1);
+ }
+ if( chown( slave, saved_euid, -1 ) < 0 ) {
+ exit( 1 );
+ }
+
+ exit( 0 );
+ }
+
+ waitpid(saved_pid, &exitval, 0);
+ if (WIFEXITED(exitval) && WEXITSTATUS(exitval) != 0) {
+ close(fd);
+ close(server);
+ PRMSG(1, "PTSOpenClient: cannot set the owner of %s\n",
+ slave, 0, 0);
+ return(-1);
+ }
+ if (chmod(slave, 0666) < 0) {
+ close(fd);
+ close(server);
+ PRMSG(1,"PTSOpenClient: Cannot chmod %s\n", slave, 0,0);
+ return(-1);
+ }
+
+ /*
+ * write slave name to server
+ */
+
+ namelen = strlen(slave);
+ buf[0] = namelen;
+ (void) sprintf(&buf[1], slave);
+ (void) write(server, buf, namelen+1);
+ (void) close(server);
+
+ /*
+ * wait for server to respond
+ */
+
+ savef = signal(SIGALRM, _dummy);
+ alarm_time = alarm (30); /* CONNECT_TIMEOUT */
+
+ ret = read(fd, buf, 1);
+
+ (void) alarm(alarm_time);
+ (void) signal(SIGALRM, savef);
+
+ if (ret != 1) {
+ PRMSG(1,
+ "PTSOpenClient: failed to get acknoledgement from server\n", 0, 0, 0);
+ (void) close(fd);
+ fd = -1;
+ }
+
+ /*
+ * Everything looks good: fill in the XtransConnInfo structure.
+ */
+
+ if (TRANS(FillAddrInfo) (ciptr, slave, server_path) == 0)
+ {
+ PRMSG(1,"PTSOpenClient: failed to fill in addr info\n", 0, 0, 0);
+ close(fd);
+ return -1;
+ }
+
+ return(fd);
+
+#endif /* !PTSNODENAME */
+}
+
+#endif /* TRANS_CLIENT */
+
+
+#ifdef TRANS_SERVER
+
+static int
+TRANS(PTSOpenServer)(XtransConnInfo ciptr, char *port)
+
+{
+#ifdef PTSNODENAME
+ int fd, server;
+ char server_path[64], *slave;
+ int mode;
+#endif
+
+ PRMSG(2,"PTSOpenServer(%s)\n", port, 0,0 );
+
+#if !defined(PTSNODENAME)
+ PRMSG(1,"PTSOpenServer: Protocol is not supported by a pts connection\n", 0,0,0);
+ return -1;
+#else
+ if (port && *port ) {
+ if( *port == '/' ) { /* A full pathname */
+ (void) sprintf(server_path, "%s", port);
+ } else {
+ (void) sprintf(server_path, "%s%s", PTSNODENAME, port);
+ }
+ } else {
+ (void) sprintf(server_path, "%s%d", PTSNODENAME, getpid());
+ }
+
+#ifdef HAS_STICKY_DIR_BIT
+ mode = 01777;
+#else
+ mode = 0777;
+#endif
+ if (trans_mkdir(X_STREAMS_DIR, mode) == -1) {
+ PRMSG (1, "PTSOpenServer: mkdir(%s) failed, errno = %d\n",
+ X_STREAMS_DIR, errno, 0);
+ return(-1);
+ }
+
+#if 0
+ if( (fd=open(server_path, O_RDWR)) >= 0 ) {
+ /*
+ * This doesn't prevent the server from starting up, and doesn't
+ * prevent clients from trying to connect to the in-use PTS (which
+ * is often in use by something other than another server).
+ */
+ PRMSG(1, "PTSOpenServer: A server is already running on port %s\n", port, 0,0 );
+ PRMSG(1, "PTSOpenServer: Remove %s if this is incorrect.\n", server_path, 0,0 );
+ close(fd);
+ return(-1);
+ }
+#else
+ /* Just remove the old path (which is what happens with UNIXCONN) */
+#endif
+
+ unlink(server_path);
+
+ if( (fd=open(DEV_PTMX, O_RDWR)) < 0) {
+ PRMSG(1, "PTSOpenServer: Unable to open %s\n", DEV_PTMX, 0,0 );
+ return(-1);
+ }
+
+ grantpt(fd);
+ unlockpt(fd);
+
+ if( (slave=ptsname(fd)) == NULL) {
+ PRMSG(1, "PTSOpenServer: Unable to get slave device name\n", 0,0,0 );
+ close(fd);
+ return(-1);
+ }
+
+ if( link(slave,server_path) < 0 ) {
+ PRMSG(1, "PTSOpenServer: Unable to link %s to %s\n", slave, server_path,0 );
+ close(fd);
+ return(-1);
+ }
+
+ if( chmod(server_path, 0666) < 0 ) {
+ PRMSG(1, "PTSOpenServer: Unable to chmod %s to 0666\n", server_path,0,0 );
+ close(fd);
+ return(-1);
+ }
+
+ if( (server=open(server_path, O_RDWR)) < 0 ) {
+ PRMSG(1, "PTSOpenServer: Unable to open server device %s\n", server_path,0,0 );
+ close(fd);
+ return(-1);
+ }
+
+ close(server);
+
+ /*
+ * Everything looks good: fill in the XtransConnInfo structure.
+ */
+
+ if (TRANS(FillAddrInfo) (ciptr, server_path, server_path) == 0)
+ {
+ PRMSG(1,"PTSOpenServer: failed to fill in addr info\n", 0, 0, 0);
+ close(fd);
+ return -1;
+ }
+
+ return fd;
+
+#endif /* !PTSNODENAME */
+}
+
+static int
+TRANS(PTSAccept)(XtransConnInfo ciptr, XtransConnInfo newciptr, int *status)
+
+{
+ int newfd;
+ int in;
+ unsigned char length;
+ char buf[256];
+ struct sockaddr_un *sunaddr;
+
+ PRMSG(2,"PTSAccept(%x->%d)\n",ciptr,ciptr->fd,0);
+
+ if( (in=read(ciptr->fd,&length,1)) <= 0 ){
+ if( !in ) {
+ PRMSG(2,
+ "PTSAccept: Incoming connection closed\n",0,0,0);
+ }
+ else {
+ PRMSG(1,
+ "PTSAccept: Error reading incoming connection. errno=%d \n",
+ errno,0,0);
+ }
+ *status = TRANS_ACCEPT_MISC_ERROR;
+ return -1;
+ }
+
+ if( (in=read(ciptr->fd,buf,length)) <= 0 ){
+ if( !in ) {
+ PRMSG(2,
+ "PTSAccept: Incoming connection closed\n",0,0,0);
+ }
+ else {
+ PRMSG(1,
+"PTSAccept: Error reading device name for new connection. errno=%d \n",
+ errno,0,0);
+ }
+ *status = TRANS_ACCEPT_MISC_ERROR;
+ return -1;
+ }
+
+ buf[length] = '\0';
+
+ if( (newfd=open(buf,O_RDWR)) < 0 ) {
+ PRMSG(1, "PTSAccept: Failed to open %s\n",buf,0,0);
+ *status = TRANS_ACCEPT_MISC_ERROR;
+ return -1;
+ }
+
+ write(newfd,"1",1);
+
+ /*
+ * Everything looks good: fill in the XtransConnInfo structure.
+ */
+
+ newciptr->addrlen=ciptr->addrlen;
+ if( (newciptr->addr=(char *)xalloc(newciptr->addrlen)) == NULL ) {
+ PRMSG(1,"PTSAccept: failed to allocate memory for peer addr\n",
+ 0,0,0);
+ close(newfd);
+ *status = TRANS_ACCEPT_BAD_MALLOC;
+ return -1;
+ }
+
+ memcpy(newciptr->addr,ciptr->addr,newciptr->addrlen);
+
+ newciptr->peeraddrlen=sizeof(struct sockaddr_un);
+ if( (sunaddr=(struct sockaddr_un *)xalloc(newciptr->peeraddrlen)) == NULL ) {
+ PRMSG(1,"PTSAccept: failed to allocate memory for peer addr\n",
+ 0,0,0);
+ xfree(newciptr->addr);
+ close(newfd);
+ *status = TRANS_ACCEPT_BAD_MALLOC;
+ return -1;
+ }
+
+ sunaddr->sun_family=AF_UNIX;
+ strcpy(sunaddr->sun_path,buf);
+#if defined(BSD44SOCKETS)
+ sunaddr->sun_len=strlen(sunaddr->sun_path);
+#endif
+
+ newciptr->peeraddr=(char *)sunaddr;
+
+ *status = 0;
+
+ return newfd;
+}
+
+#endif /* TRANS_SERVER */
+#endif /* LOCAL_TRANS_PTS */
+
+
+#ifdef LOCAL_TRANS_NAMED
+
+/* NAMED */
+
+#ifdef TRANS_CLIENT
+
+static int
+TRANS(NAMEDOpenClient)(XtransConnInfo ciptr, char *port)
+
+{
+#ifdef NAMEDNODENAME
+ int fd;
+ char server_path[64];
+ struct stat filestat;
+# ifndef sun
+ extern int isastream(int);
+# endif
+#endif
+
+ PRMSG(2,"NAMEDOpenClient(%s)\n", port, 0,0 );
+
+#if !defined(NAMEDNODENAME)
+ PRMSG(1,"NAMEDOpenClient: Protocol is not supported by a NAMED connection\n", 0,0,0);
+ return -1;
+#else
+ if ( port && *port ) {
+ if( *port == '/' ) { /* A full pathname */
+ (void) snprintf(server_path, sizeof(server_path), "%s", port);
+ } else {
+ (void) snprintf(server_path, sizeof(server_path), "%s%s", NAMEDNODENAME, port);
+ }
+ } else {
+ (void) snprintf(server_path, sizeof(server_path), "%s%ld", NAMEDNODENAME, (long)getpid());
+ }
+
+ if ((fd = open(server_path, O_RDWR)) < 0) {
+ PRMSG(1,"NAMEDOpenClient: Cannot open %s for NAMED connection\n", server_path, 0,0 );
+ return -1;
+ }
+
+ if (fstat(fd, &filestat) < 0 ) {
+ PRMSG(1,"NAMEDOpenClient: Cannot stat %s for NAMED connection\n", server_path, 0,0 );
+ (void) close(fd);
+ return -1;
+ }
+
+ if ((filestat.st_mode & S_IFMT) != S_IFIFO) {
+ PRMSG(1,"NAMEDOpenClient: Device %s is not a FIFO\n", server_path, 0,0 );
+ /* Is this really a failure? */
+ (void) close(fd);
+ return -1;
+ }
+
+
+ if (isastream(fd) <= 0) {
+ PRMSG(1,"NAMEDOpenClient: %s is not a streams device\n", server_path, 0,0 );
+ (void) close(fd);
+ return -1;
+ }
+
+ /*
+ * Everything looks good: fill in the XtransConnInfo structure.
+ */
+
+ if (TRANS(FillAddrInfo) (ciptr, server_path, server_path) == 0)
+ {
+ PRMSG(1,"NAMEDOpenClient: failed to fill in addr info\n",
+ 0,0,0);
+ close(fd);
+ return -1;
+ }
+
+ return(fd);
+
+#endif /* !NAMEDNODENAME */
+}
+
+#endif /* TRANS_CLIENT */
+
+
+#ifdef TRANS_SERVER
+
+
+#ifdef NAMEDNODENAME
+static int
+TRANS(NAMEDOpenPipe)(const char *server_path)
+{
+ PRMSG(2,"NAMEDOpenPipe(%s)\n", server_path, 0,0 );
+
+ int fd, pipefd[2];
+ struct stat sbuf;
+ int mode;
+
+#if defined(sun) && defined(X11_t)
+ mode = 0775; /* Solaris requires uid or gid 0 to create X11 pipes */
+#else
+#ifdef HAS_STICKY_DIR_BIT
+ mode = 01777;
+#else
+ mode = 0777;
+#endif
+#endif
+ if (trans_mkdir(X_STREAMS_DIR, mode) == -1) {
+ PRMSG (1, "NAMEDOpenPipe: mkdir(%s) failed, errno = %d\n",
+ X_STREAMS_DIR, errno, 0);
+ return(-1);
+ }
+
+ if(stat(server_path, &sbuf) != 0) {
+ if (errno == ENOENT) {
+ if ((fd = creat(server_path, (mode_t)0666)) == -1) {
+ PRMSG(1, "NAMEDOpenPipe: Can't open %s\n", server_path, 0,0 );
+ return(-1);
+ }
+ close(fd);
+ if (chmod(server_path, (mode_t)0666) < 0) {
+ PRMSG(1, "NAMEDOpenPipe: Can't open %s\n", server_path, 0,0 );
+ return(-1);
+ }
+ } else {
+ PRMSG(1, "NAMEDOpenPipe: stat on %s failed\n", server_path, 0,0 );
+ return(-1);
+ }
+ }
+
+ if( pipe(pipefd) != 0) {
+ PRMSG(1, "NAMEDOpenPipe: pipe() failed, errno=%d\n",errno, 0,0 );
+ return(-1);
+ }
+
+ if( ioctl(pipefd[0], I_PUSH, "connld") != 0) {
+ PRMSG(1, "NAMEDOpenPipe: ioctl(I_PUSH,\"connld\") failed, errno=%d\n",errno, 0,0 );
+ close(pipefd[0]);
+ close(pipefd[1]);
+ return(-1);
+ }
+
+ if( fattach(pipefd[0], server_path) != 0) {
+ PRMSG(1, "NAMEDOpenPipe: fattach(%s) failed, errno=%d\n", server_path,errno, 0 );
+ close(pipefd[0]);
+ close(pipefd[1]);
+ return(-1);
+ }
+
+ return(pipefd[1]);
+}
+#endif
+
+static int
+TRANS(NAMEDOpenServer)(XtransConnInfo ciptr, char *port)
+{
+#ifdef NAMEDNODENAME
+ int fd;
+ char server_path[64];
+#endif
+
+ PRMSG(2,"NAMEDOpenServer(%s)\n", port, 0,0 );
+
+#if !defined(NAMEDNODENAME)
+ PRMSG(1,"NAMEDOpenServer: Protocol is not supported by a NAMED connection\n", 0,0,0);
+ return -1;
+#else
+ if ( port && *port ) {
+ if( *port == '/' ) { /* A full pathname */
+ (void) snprintf(server_path, sizeof(server_path), "%s", port);
+ } else {
+ (void) snprintf(server_path, sizeof(server_path), "%s%s",
+ NAMEDNODENAME, port);
+ }
+ } else {
+ (void) snprintf(server_path, sizeof(server_path), "%s%ld",
+ NAMEDNODENAME, (long)getpid());
+ }
+
+ fd = TRANS(NAMEDOpenPipe)(server_path);
+ if (fd < 0) {
+ return -1;
+ }
+
+ /*
+ * Everything looks good: fill in the XtransConnInfo structure.
+ */
+
+ if (TRANS(FillAddrInfo) (ciptr, server_path, server_path) == 0)
+ {
+ PRMSG(1,"NAMEDOpenServer: failed to fill in addr info\n", 0,0,0);
+ TRANS(LocalClose)(ciptr);
+ return -1;
+ }
+
+ return fd;
+
+#endif /* !NAMEDNODENAME */
+}
+
+static int
+TRANS(NAMEDResetListener) (XtransConnInfo ciptr)
+
+{
+ int status = TRANS_RESET_NOOP;
+ struct sockaddr_un *sockname=(struct sockaddr_un *) ciptr->addr;
+ struct stat statb;
+
+ PRMSG(2,"NAMEDResetListener(%p, %d)\n", ciptr, ciptr->fd, 0 );
+
+ if (ciptr->fd != -1) {
+ /*
+ * see if the pipe has disappeared
+ */
+
+ if (stat (sockname->sun_path, &statb) == -1 ||
+ (statb.st_mode & S_IFMT) != S_IFIFO) {
+ PRMSG(3, "Pipe %s trashed, recreating\n", sockname->sun_path, 0, 0);
+ TRANS(LocalClose)(ciptr);
+ ciptr->fd = TRANS(NAMEDOpenPipe)(sockname->sun_path);
+ if (ciptr->fd >= 0)
+ return TRANS_RESET_NEW_FD;
+ else
+ return TRANS_CREATE_LISTENER_FAILED;
+ }
+ }
+ return TRANS_RESET_NOOP;
+}
+
+static int
+TRANS(NAMEDAccept)(XtransConnInfo ciptr, XtransConnInfo newciptr, int *status)
+
+{
+ struct strrecvfd str;
+
+ PRMSG(2,"NAMEDAccept(%x->%d)\n", ciptr, ciptr->fd, 0 );
+
+ if( ioctl(ciptr->fd, I_RECVFD, &str ) < 0 ) {
+ PRMSG(1, "NAMEDAccept: ioctl(I_RECVFD) failed, errno=%d\n", errno, 0,0 );
+ *status = TRANS_ACCEPT_MISC_ERROR;
+ return(-1);
+ }
+
+ /*
+ * Everything looks good: fill in the XtransConnInfo structure.
+ */
+ newciptr->family=ciptr->family;
+ newciptr->addrlen=ciptr->addrlen;
+ if( (newciptr->addr=(char *)xalloc(newciptr->addrlen)) == NULL ) {
+ PRMSG(1,
+ "NAMEDAccept: failed to allocate memory for pipe addr\n",
+ 0,0,0);
+ close(str.fd);
+ *status = TRANS_ACCEPT_BAD_MALLOC;
+ return -1;
+ }
+
+ memcpy(newciptr->addr,ciptr->addr,newciptr->addrlen);
+
+ newciptr->peeraddrlen=newciptr->addrlen;
+ if( (newciptr->peeraddr=(char *)xalloc(newciptr->peeraddrlen)) == NULL ) {
+ PRMSG(1,
+ "NAMEDAccept: failed to allocate memory for peer addr\n",
+ 0,0,0);
+ xfree(newciptr->addr);
+ close(str.fd);
+ *status = TRANS_ACCEPT_BAD_MALLOC;
+ return -1;
+ }
+
+ memcpy(newciptr->peeraddr,newciptr->addr,newciptr->peeraddrlen);
+
+ *status = 0;
+
+ return str.fd;
+}
+
+#endif /* TRANS_SERVER */
+
+#endif /* LOCAL_TRANS_NAMED */
+