-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
435 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
137 changes: 137 additions & 0 deletions
137
src/main/java/com/redhat/vertx/pipeline/steps/HttpClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package com.redhat.vertx.pipeline.steps; | ||
|
||
import java.io.IOException; | ||
import java.net.MalformedURLException; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.function.Function; | ||
import java.util.logging.Logger; | ||
|
||
import com.redhat.vertx.Engine; | ||
import com.redhat.vertx.pipeline.AbstractStep; | ||
import com.redhat.vertx.pipeline.Step; | ||
import com.redhat.vertx.pipeline.templates.MissingParameterException; | ||
import io.reactivex.Completable; | ||
import io.reactivex.Maybe; | ||
import io.reactivex.MaybeSource; | ||
import io.vertx.core.http.HttpMethod; | ||
import io.vertx.core.json.JsonObject; | ||
import io.vertx.ext.web.client.WebClientOptions; | ||
import io.vertx.reactivex.core.buffer.Buffer; | ||
import io.vertx.reactivex.ext.web.client.HttpResponse; | ||
import io.vertx.reactivex.ext.web.client.WebClient; | ||
import io.vertx.reactivex.ext.web.client.HttpRequest; | ||
import org.kohsuke.MetaInfServices; | ||
|
||
@MetaInfServices(Step.class) | ||
public class HttpClient extends AbstractStep { | ||
private static Logger logger = Logger.getLogger(HttpClient.class.getName()); | ||
private static WebClient http; | ||
|
||
public String getUrl(JsonObject env) throws URISyntaxException, MalformedURLException { | ||
String url = env.getString("url"); | ||
if ( url == null ) { | ||
throw new MissingParameterException("url"); | ||
} | ||
return url; | ||
} | ||
|
||
@Override | ||
public Completable init(Engine engine, JsonObject config) { | ||
config.put("timeout",config.getString("timeout", "PT30.000S")); | ||
return super.init(engine, config); | ||
} | ||
|
||
public HttpRequest<Buffer> getHttpRequest(JsonObject env) throws URISyntaxException, MalformedURLException { | ||
// SocketAddress serverAddress = SocketAddress.domainSocketAddress("/var/run/docker.sock"); | ||
|
||
String url = getUrl(env); | ||
logger.fine(() -> "requesting " + url); | ||
URI uri = URI.create(url); | ||
String pqf = uri.getPath(); | ||
if (uri.getQuery() != null) { | ||
pqf += "?" + uri.getQuery(); | ||
} | ||
if (uri.getFragment() != null) { | ||
pqf += "#" + uri.getFragment(); | ||
} | ||
return webClient() | ||
.request(HttpMethod.GET, uri.getPort(), uri.getHost(), pqf) | ||
.putHeader("Accept","application/json"); | ||
} | ||
|
||
public Object processResponse(HttpResponse<Buffer> response) throws HttpResponseStatusException { | ||
switch (response.statusCode()) { | ||
case 200: // OK | ||
return decodeResponse(response); | ||
case 204: // no content | ||
return null; | ||
default: | ||
throw new HttpResponseStatusException(response); | ||
} | ||
} | ||
|
||
public static final Map<String, Function<HttpResponse<Buffer>,Object>> decodings; | ||
static { | ||
Map<String, Function<HttpResponse<Buffer>,Object>> d = Map.of( | ||
"text/plain",HttpResponse::bodyAsString, | ||
"text/html",HttpResponse::bodyAsString, | ||
"application/json", HttpResponse::bodyAsJsonObject, // TODO manage JsonArray and String as appropriate | ||
"application/xml", HttpResponse::bodyAsString); // TODO parse and shoehorn into JSON | ||
decodings = Collections.unmodifiableMap(d); | ||
} | ||
|
||
public Object decodeResponse(HttpResponse<Buffer> response) { | ||
if (response.body().length() == 0) { | ||
return null; | ||
} | ||
String contentType = response.getHeader("Content-type"); | ||
|
||
return decodings.getOrDefault(contentType,HttpResponse::bodyAsString).apply(response); | ||
} | ||
|
||
public MaybeSource<Object> rxProcessResponse(HttpResponse<Buffer> response) { | ||
try { | ||
Object body = processResponse(response); | ||
return (body == null)?Maybe.empty():Maybe.just(body); | ||
} catch (Exception e) { | ||
return Maybe.error(e); | ||
} | ||
} | ||
|
||
@Override | ||
public Maybe<Object> execute(JsonObject env) { | ||
HttpRequest<Buffer> request; | ||
try { | ||
request = getHttpRequest(env); | ||
} catch (Exception e) { | ||
return Maybe.error(e); | ||
} | ||
|
||
return request.rxSend().flatMapMaybe(this::rxProcessResponse); | ||
} | ||
|
||
protected WebClient webClient() { | ||
if (http == null) { | ||
WebClientOptions options = new WebClientOptions() | ||
.setUserAgent("vertx-engine") | ||
.setKeepAlive(true) | ||
.setConnectTimeout(30) | ||
.setKeepAliveTimeout(300) | ||
.setIdleTimeout(300); | ||
http = WebClient.create(getVertx(),options); | ||
} | ||
return http; | ||
} | ||
|
||
public static class HttpResponseStatusException extends IOException { | ||
public final HttpResponse<?> response; | ||
|
||
HttpResponseStatusException(HttpResponse<?> response) { | ||
super(response.statusCode() + " " + response.statusMessage()); | ||
this.response=response; | ||
} | ||
} | ||
} |
167 changes: 167 additions & 0 deletions
167
src/test/java/com/redhat/vertx/pipeline/step/HttpClientIntegrationTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
package com.redhat.vertx.pipeline.step; | ||
|
||
import java.util.Arrays; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.logging.Handler; | ||
import java.util.logging.Logger; | ||
|
||
import com.github.tomakehurst.wiremock.WireMockServer; | ||
import com.github.tomakehurst.wiremock.client.WireMock; | ||
import com.github.tomakehurst.wiremock.junit.WireMockRule; | ||
import com.redhat.ResourceUtils; | ||
import com.redhat.vertx.Engine; | ||
import com.redhat.vertx.pipeline.steps.HttpClient; | ||
import io.reactivex.Maybe; | ||
import io.reactivex.disposables.Disposable; | ||
import io.reactivex.internal.disposables.DisposableHelper; | ||
import io.vertx.core.json.JsonObject; | ||
import io.vertx.junit5.VertxExtension; | ||
import io.vertx.junit5.VertxTestContext; | ||
import io.vertx.reactivex.core.Vertx; | ||
import io.vertx.reactivex.ext.unit.TestContext; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Disabled; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.opentest4j.AssertionFailedError; | ||
|
||
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; | ||
import static com.github.tomakehurst.wiremock.client.WireMock.get; | ||
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; | ||
import static com.github.tomakehurst.wiremock.client.WireMock.matching; | ||
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; | ||
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; | ||
import static com.github.tomakehurst.wiremock.client.WireMock.verify; | ||
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
@ExtendWith(VertxExtension.class) | ||
public class HttpClientIntegrationTest { | ||
private WireMockServer wireMockServer; | ||
|
||
@BeforeEach | ||
public void setup() { | ||
wireMockServer = new WireMockRule(wireMockConfig().dynamicPort().dynamicHttpsPort()); | ||
wireMockServer.start(); | ||
WireMock.configureFor("localhost", wireMockServer.port()); | ||
} | ||
|
||
@AfterEach | ||
public void teardown() { | ||
wireMockServer.stop(); | ||
} | ||
|
||
@Test | ||
public void happyPath200(Vertx vertx, VertxTestContext testContext) throws Exception { | ||
int port = wireMockServer.port(); | ||
JsonObject payload = new JsonObject("{ \"lorem ipsum\":\"boring text\" }"); | ||
wireMockServer.stubFor(get(urlEqualTo("/my/resource")) | ||
.withHeader("Accept", matching("application/json")) | ||
.willReturn(aResponse() | ||
.withStatus(200) | ||
.withHeader("Content-Type", "application/json") | ||
.withBody(payload.encode()))); | ||
|
||
String url = "http://localhost:" + port + "/my/resource"; | ||
|
||
Engine engine = new Engine( | ||
ResourceUtils.fileContentsFromResource( | ||
"com/redhat/vertx/pipeline/step/httpClientIntegrationTest.yaml" | ||
)); | ||
JsonObject doc = new JsonObject().put("url", url); | ||
vertx.rxDeployVerticle(engine).timeout(1, TimeUnit.SECONDS).blockingGet(); | ||
JsonObject d2 = (JsonObject) engine.execute(doc).timeout(5, TimeUnit.SECONDS).blockingGet(); | ||
|
||
assertThat(d2.containsKey("response")).isTrue(); | ||
assertThat(d2.getJsonObject("response")).isEqualTo(payload); | ||
|
||
verify(getRequestedFor(urlMatching("/my/resource")) | ||
.withHeader("Accept", matching("application/json"))); | ||
|
||
testContext.completeNow(); | ||
} | ||
|
||
@Test | ||
@Disabled // TODO terminates event execution | ||
public void happyPath204(Vertx vertx, VertxTestContext testContext) throws Exception { | ||
int port = wireMockServer.port(); | ||
wireMockServer.stubFor(get(urlEqualTo("/my/resource")) | ||
.withHeader("Accept", matching("application/json")) | ||
.willReturn(aResponse().withStatus(204))); | ||
|
||
String url = "http://localhost:" + port + "/my/resource"; | ||
|
||
Engine engine = new Engine( | ||
ResourceUtils.fileContentsFromResource( | ||
"com/redhat/vertx/pipeline/step/httpClientIntegrationTest.yaml" | ||
)); | ||
JsonObject doc = new JsonObject().put("url",url); | ||
vertx.rxDeployVerticle(engine).timeout(1, TimeUnit.SECONDS).blockingGet(); | ||
AtomicReference<Disposable> docSub = new AtomicReference<>(); | ||
DisposableHelper.set(docSub, engine.execute(doc).timeout(10, TimeUnit.SECONDS) | ||
.doOnError(testContext::failNow) | ||
.doOnSuccess(r -> validate204(r, testContext)) | ||
.doAfterTerminate(() -> { | ||
DisposableHelper.dispose(docSub); | ||
testContext.completeNow(); | ||
}) | ||
.test() | ||
.assertSubscribed()); | ||
|
||
assertThat(testContext.awaitCompletion(6, TimeUnit.SECONDS)).isTrue(); | ||
} | ||
|
||
private void validate204(Object r, VertxTestContext testContext) { | ||
JsonObject jo = (JsonObject)r; | ||
assertThat(jo.containsKey("response")).isTrue(); // This could well be false because the result is empty | ||
assertThat(jo.getJsonObject("response")).isNull(); | ||
verify(getRequestedFor(urlMatching("/my/resource")) | ||
.withHeader("Accept", matching("application/json"))); | ||
testContext.completeNow(); | ||
} | ||
|
||
@Test | ||
public void serverError500(Vertx vertx, VertxTestContext testContext) throws Exception { | ||
Logger logger = Logger.getLogger(this.getClass().getName() + "#" + Thread.currentThread().getStackTrace()[0].getMethodName()); | ||
int port = wireMockServer.port(); | ||
JsonObject payload = new JsonObject("{ \"lorem ipsum\":\"boring text\" }"); | ||
wireMockServer.stubFor(get(urlEqualTo("/my/resource")) | ||
.withHeader("Accept", matching("application/json")) | ||
.willReturn(aResponse() | ||
.withStatus(500) | ||
.withHeader("Content-Type", "text/html") | ||
.withBody("<html><head/><body>500 Internal server error</body></html>"))); | ||
|
||
String url = "http://localhost:" + port + "/my/resource"; | ||
|
||
Engine engine = new Engine( | ||
ResourceUtils.fileContentsFromResource( | ||
"com/redhat/vertx/pipeline/step/httpClientIntegrationTest.yaml" | ||
)); | ||
JsonObject doc = new JsonObject().put("url", url); | ||
vertx.rxDeployVerticle(engine).timeout(1, TimeUnit.SECONDS).blockingGet(); | ||
|
||
AtomicReference<Disposable> docSub = new AtomicReference<>(); | ||
|
||
DisposableHelper.set(docSub, | ||
engine.execute(doc) | ||
.doOnError(t -> { | ||
logger.info("Evaluating exception " + t.toString()); | ||
assertThat(t).isNotInstanceOf(HttpClient.HttpResponseStatusException.class); | ||
}) | ||
.doAfterTerminate(() -> { | ||
logger.info("Disposing after terminate"); | ||
DisposableHelper.dispose(docSub); | ||
testContext.completeNow(); | ||
}) | ||
.test() | ||
.assertSubscribed()); | ||
|
||
Arrays.asList(logger.getHandlers()).forEach(Handler::flush); | ||
assertThat(testContext.awaitCompletion(6, TimeUnit.SECONDS)).isTrue(); | ||
logger.info("Complete."); | ||
} | ||
|
||
} |
Oops, something went wrong.