Skip to content

Commit

Permalink
use creating context for WebClient requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslansennov committed Jul 5, 2017
1 parent 05e613d commit 319b78c
Showing 1 changed file with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ConsulClientImpl implements ConsulClient {
private static final List<Integer> TXN_VALID_CODES = Arrays.asList(200, 409);

private final WebClient webClient;
private final Context ctx;
private final String aclToken;
private final String dc;
private final long timeoutMs;
Expand All @@ -55,6 +56,7 @@ public ConsulClientImpl(Vertx vertx, ConsulClientOptions options) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(options);
webClient = WebClient.create(vertx, options);
ctx = vertx.getOrCreateContext();
aclToken = options.getAclToken();
dc = options.getDc();
timeoutMs = options.getTimeout();
Expand Down Expand Up @@ -591,38 +593,38 @@ private <T> void requestVoid(HttpMethod method, String path, Query query, String
request(DEFAULT_VALID_CODES, method, path, query, body, resultHandler, resp -> null);
}

private <T> void request(List<Integer> validCodes, HttpMethod method, String path, Query query, String body,
private <T> void request(List<Integer> validCodes, HttpMethod method, String path, Query q, String body,
Handler<AsyncResult<T>> resultHandler,
Function<HttpResponse<Buffer>, T> mapper) {
if (query == null) {
query = new Query();
}
if (dc != null) {
query.put("dc", dc);
}
HttpRequest<Buffer> rq = webClient.request(method, path);
query.entrySet().forEach(e -> rq.addQueryParam(e.getKey(), e.getValue()));
if (aclToken != null) {
rq.putHeader(TOKEN_HEADER, aclToken);
}
if (timeoutMs > 0) {
rq.timeout(timeoutMs);
}
rq.sendBuffer(body == null ? Buffer.buffer() : Buffer.buffer(body), h -> {
if (h.succeeded()) {
HttpResponse<Buffer> resp = h.result();
if (validCodes.contains(resp.statusCode())) {
try {
resultHandler.handle(Future.succeededFuture(mapper.apply(resp)));
} catch (Throwable throwable) {
resultHandler.handle(Future.failedFuture(throwable));
Query query = (q == null) ? new Query() : q;
ctx.runOnContext(v -> {
if (dc != null) {
query.put("dc", dc);
}
HttpRequest<Buffer> rq = webClient.request(method, path);
query.entrySet().forEach(e -> rq.addQueryParam(e.getKey(), e.getValue()));
if (aclToken != null) {
rq.putHeader(TOKEN_HEADER, aclToken);
}
if (timeoutMs > 0) {
rq.timeout(timeoutMs);
}
rq.sendBuffer(body == null ? Buffer.buffer() : Buffer.buffer(body), h -> {
if (h.succeeded()) {
HttpResponse<Buffer> resp = h.result();
if (validCodes.contains(resp.statusCode())) {
try {
resultHandler.handle(Future.succeededFuture(mapper.apply(resp)));
} catch (Throwable throwable) {
resultHandler.handle(Future.failedFuture(throwable));
}
} else {
resultHandler.handle(Future.failedFuture(resp.statusMessage()));
}
} else {
resultHandler.handle(Future.failedFuture(resp.statusMessage()));
resultHandler.handle(Future.failedFuture(h.cause()));
}
} else {
resultHandler.handle(Future.failedFuture(h.cause()));
}
});
});
}

Expand Down

0 comments on commit 319b78c

Please sign in to comment.