Skip to content

Commit

Permalink
Allow datamill client to perform PATCH requests
Browse files Browse the repository at this point in the history
Reimplemented client request core method to rely on httpclient rather than JDK UrlConnection, as the latter doesn't allow certain Http methods (i.e. PATCH).
Adapted tests to new implementation.
  • Loading branch information
israelcolomer committed May 13, 2016
1 parent cc361a3 commit 1b0fa95
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 107 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@
<artifactId>system-rules</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
205 changes: 133 additions & 72 deletions core/src/main/java/org/chodavarapu/datamill/http/Client.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
package org.chodavarapu.datamill.http;

import com.google.common.base.Joiner;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.chodavarapu.datamill.http.impl.*;
import org.apache.http.Header;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpTrace;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.chodavarapu.datamill.http.impl.InputStreamEntity;
import org.chodavarapu.datamill.http.impl.RequestBuilderImpl;
import org.chodavarapu.datamill.http.impl.ResponseImpl;
import org.chodavarapu.datamill.http.impl.TemplateBasedUriBuilder;
import org.chodavarapu.datamill.http.impl.ValueEntity;
import org.chodavarapu.datamill.values.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,16 +33,13 @@
import rx.util.async.Async;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

Expand All @@ -42,10 +60,6 @@ public Observable<Response> request(Method method, Map<String, String> headers,
return request(method, headers, uri, new ValueEntity(entity));
}

protected URLConnection createConnection(String uri) throws IOException {
return new URL(uri).openConnection();
}

public Observable<Response> request(Method method, Map<String, String> headers, String uri, Entity entity) {
return request(method, headers != null ? Multimaps.forMap(headers) : null, uri, null, null, null, entity);
}
Expand All @@ -61,108 +75,155 @@ public Observable<Response> request(
if (uriParameters != null && uriParameters.size() > 0) {
uri = uriBuilder.build(uri, uriParameters);
}
URIBuilder uriBuilder = new URIBuilder();


int firstSlash = uri.indexOf('/');
uriBuilder.setScheme(uri.substring(0, firstSlash - 1));

String uriNoScheme = uri.substring(firstSlash + 1, uri.length());
uriBuilder.setHost(uriNoScheme.substring(1));

uri = appendQueryParameters(uri, queryParameters);
URI anURI = null;
try {
anURI = appendQueryParameters(uriBuilder, queryParameters);
} catch (URISyntaxException e) {
e.printStackTrace();
}

final URI theURI = anURI;
final String composedUri = uri;

return Async.fromCallable(() -> {
URLConnection urlConnection = createConnection(composedUri);
HttpURLConnection httpConnection = (HttpURLConnection) urlConnection;
PipedOutputStream pipedOutputStream = null;
PipedInputStream pipedInputStream = null;

try {


CloseableHttpClient httpclient = HttpClients.createDefault();

httpConnection.setRequestMethod(method.toString());
HttpUriRequest request = buildHttpRequest(method, theURI);

if (options != null && options.size() > 0) {
Object connectTimeout = options.get(Request.OPTION_CONNECT_TIMEOUT);
if (connectTimeout instanceof Integer) {
httpConnection.setConnectTimeout((int) connectTimeout);
if (options != null && options.size() > 0) {
Object connectTimeout = options.get(Request.OPTION_CONNECT_TIMEOUT);
if (connectTimeout instanceof Integer) {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout((int) connectTimeout)
.build();
((HttpRequestBase) request).setConfig(requestConfig);
}
}
}

if (headers != null) {
for (Map.Entry<String, String> header : headers.entries()) {
httpConnection.addRequestProperty(header.getKey(), header.getValue());
if (headers != null) {
for (Map.Entry<String, String> header : headers.entries()) {
request.addHeader(header.getKey(), header.getValue());
}
}
}

if (entity != null) {
writeEntityOutOverConnection(entity, httpConnection);
}
if (entity != null) {
if (!(request instanceof HttpEntityEnclosingRequestBase)) {
throw new IllegalArgumentException("Expecting to write an entity for a request type that does not support it!");
}

pipedOutputStream = buildPipedOutputStream();
pipedInputStream = buildPipedInputStream();
pipedInputStream.connect(pipedOutputStream);

BasicHttpEntity httpEntity = new BasicHttpEntity();
httpEntity.setContent(pipedInputStream);
((HttpEntityEnclosingRequestBase) request).setEntity(httpEntity);

logger.debug("Making HTTP request {} {}", method.name(), composedUri);
if (headers != null && logger.isDebugEnabled()) {
logger.debug(" HTTP request headers:");
for (Map.Entry<String, String> header : headers.entries()) {
logger.debug(" {}: {}", header.getKey(), header.getValue());
writeEntityOutOverConnection(entity, pipedOutputStream);
}
}

int responseCode = httpConnection.getResponseCode();
InputStream inputStream = httpConnection.getInputStream();
logger.debug("Making HTTP request {} {}", method.name(), composedUri);
if (headers != null && logger.isDebugEnabled()) {
logger.debug(" HTTP request headers:");
for (Map.Entry<String, String> header : headers.entries()) {
logger.debug(" {}: {}", header.getKey(), header.getValue());
}
}

CloseableHttpResponse httpResponse = httpclient.execute(request);

int responseCode = httpResponse.getStatusLine().getStatusCode();
Map<String, String> combinedHeaders = new HashMap<>();

Map<String, List<String>> responseHeaders = httpConnection.getHeaderFields();
Map<String, String> combinedHeaders = new HashMap<>();
for (Map.Entry<String, List<String>> header : responseHeaders.entrySet()) {
if (header.getValue().size() > 1) {
combinedHeaders.put(header.getKey(), Joiner.on(',').join(header.getValue()));
} else {
combinedHeaders.put(header.getKey(), header.getValue().get(0));
for (Header header : httpResponse.getAllHeaders()) {
combinedHeaders.put(header.getName(), header.getValue());
}
}

return new ResponseImpl(Status.valueOf(responseCode), combinedHeaders, new InputStreamEntity(inputStream));
return new ResponseImpl(Status.valueOf(responseCode), combinedHeaders, new InputStreamEntity(httpResponse.getEntity().getContent()));
} finally {
if (pipedInputStream != null) {
try {
pipedInputStream.close();
} catch (IOException e) {}
}
if (pipedOutputStream != null) {
try {
pipedOutputStream.close();
} catch (IOException e) {}
}
}
}, Schedulers.io());
}

private String appendQueryParameters(String uri, Multimap<String, String> queryParameters) {
if (queryParameters != null && queryParameters.size() > 0) {
try {
StringBuilder queryBuilder = new StringBuilder("?");
Iterator<Map.Entry<String, String>> iterator = queryParameters.entries().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> parameter = iterator.next();
protected PipedOutputStream buildPipedOutputStream() {
return new PipedOutputStream();
}

queryBuilder.append(URLEncoder.encode(parameter.getKey(), "UTF-8"));
queryBuilder.append('=');
protected PipedInputStream buildPipedInputStream() {
return new PipedInputStream();
}

if (parameter.getValue() != null) {
queryBuilder.append(URLEncoder.encode(parameter.getValue()));
}
protected HttpUriRequest buildHttpRequest(Method method, URI uri) {
switch (method) {
case OPTIONS: return new HttpOptions(uri);
case GET: return new HttpGet(uri);
case HEAD: return new HttpHead(uri);
case POST: return new HttpPost(uri);
case PUT: return new HttpPut(uri);
case DELETE:return new HttpDelete(uri);
case TRACE: return new HttpTrace(uri);
case PATCH: return new HttpPatch(uri);
default: throw new IllegalArgumentException("Method " + method + " is not implemented!");
}
}

if (iterator.hasNext()) {
queryBuilder.append('&');
}
private URI appendQueryParameters(URIBuilder uriBuilder, Multimap<String, String> queryParameters) throws URISyntaxException {
if (queryParameters != null && queryParameters.size() > 0) {
queryParameters.entries().stream().forEach(entry -> {
try {
uriBuilder.setParameter(URLEncoder.encode(entry.getKey(), "UTF-8"), entry.getValue());
}

uri = uri + queryBuilder.toString();
} catch (UnsupportedEncodingException e) {
}
catch (UnsupportedEncodingException e) {}
});
}
return uri;
return uriBuilder.build();
}

private void writeEntityOutOverConnection(Entity entity, HttpURLConnection httpConnection) throws IOException {
httpConnection.setDoOutput(true);
OutputStream outputStream = httpConnection.getOutputStream();
private void writeEntityOutOverConnection(Entity entity, PipedOutputStream pipedOutputStream) throws IOException {
entity.asChunks().observeOn(Schedulers.io())
.doOnNext(bytes -> {
try {
outputStream.write(bytes);
pipedOutputStream.write(bytes);
} catch (IOException e) {
throw new HttpException("Error writing entity!", e);
}
})
.doOnCompleted(() -> {
try {
outputStream.close();
pipedOutputStream.close();
onEntitySendingCompletion(entity);
} catch (IOException e) {
throw new HttpException("Error while closing stream!", e);
}
})
.doOnError(e -> {
try {
outputStream.close();
pipedOutputStream.close();
onErrorSendingEntity(entity);
} catch (IOException closing) {
onErrorSendingEntity(entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public enum Method {
OPTIONS, GET, HEAD, POST, PUT, DELETE, TRACE, CONNECT, PATCH, UNKNOWN;
OPTIONS, GET, HEAD, POST, PUT, DELETE, TRACE, PATCH, UNKNOWN;

private static final Set<Method> methods =
EnumSet.allOf(Method.class);
Expand Down
Loading

0 comments on commit 1b0fa95

Please sign in to comment.