diff --git a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java index 612a0b0a..4958f46f 100755 --- a/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java +++ b/gateleen-playground/src/main/java/org/swisspush/gateleen/playground/Server.java @@ -1,6 +1,9 @@ package org.swisspush.gateleen.playground; -import io.vertx.core.*; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpServer; @@ -55,7 +58,6 @@ import org.swisspush.gateleen.monitoring.MonitoringHandler; import org.swisspush.gateleen.monitoring.ResetMetricsController; import org.swisspush.gateleen.qos.QoSHandler; -import org.swisspush.gateleen.queue.queuing.QueueBrowser; import org.swisspush.gateleen.queue.queuing.QueueClient; import org.swisspush.gateleen.queue.queuing.QueueProcessor; import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker; @@ -332,8 +334,6 @@ public void start() { queueCircuitBreakerConfigurationResourceManager, requestHandler, circuitBreakerPort); new QueueProcessor(vertx, selfClient, monitoringHandler, queueCircuitBreaker); - final QueueBrowser queueBrowser = new QueueBrowser(vertx, SERVER_ROOT + "/queuing", Address.redisquesAddress(), - monitoringHandler); LogController logController = new LogController(); logController.registerLogConfiguratorMBean(JMX_DOMAIN); @@ -365,7 +365,7 @@ public void start() { .delegateHandler(delegateHandler) .customHttpResponseHandler(customHttpResponseHandler) .contentTypeConstraintHandler(contentTypeConstraintHandler) - .build(vertx, redisProvider, Server.class, router, monitoringHandler, queueBrowser); + .build(vertx, redisProvider, Server.class, router, monitoringHandler); Handler routingContextHandlerrNew = runConfig.buildRoutingContextHandler(); selfClient.setRoutingContexttHandler(routingContextHandlerrNew); diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueBrowser.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueBrowser.java deleted file mode 100644 index 3bc4ce1a..00000000 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueBrowser.java +++ /dev/null @@ -1,381 +0,0 @@ -package org.swisspush.gateleen.queue.queuing; - -import org.swisspush.gateleen.monitoring.MonitoringHandler; -import org.swisspush.gateleen.core.util.StatusCode; -import org.swisspush.gateleen.core.util.StringUtils; -import io.vertx.core.AsyncResult; -import io.vertx.ext.web.Router; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; -import io.vertx.core.eventbus.Message; -import io.vertx.core.http.HttpServerRequest; -import io.vertx.core.http.HttpServerResponse; -import io.vertx.core.json.DecodeException; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; - -import java.nio.charset.Charset; -import java.util.*; - -import static org.swisspush.redisques.util.RedisquesAPI.*; - -/** - * @author https://github.com/lbovet [Laurent Bovet] - * @deprecated Use http api from vertx-redisques (version greater than v2.2.4) directly. See https://github.com/swisspush/vertx-redisques - */ -public class QueueBrowser implements Handler { - - public static final String APPLICATION_JSON = "application/json"; - public static final String CONTENT_TYPE = "content-type"; - public static final String UTF_8 = "UTF-8"; - public static final String PAYLOAD = "payload"; - private static Logger log = LoggerFactory.getLogger(QueueBrowser.class); - private static final int DEFAULT_QUEUE_NUM = 1000; - private static final int DEFAULT_MAX_QUEUEITEM_COUNT = 49; - private static final String SHOW_EMPTY_QUEUES_PARAM = "showEmptyQueues"; - private EventBus eb; - private final String redisquesAddress; - - private Router router; - - public QueueBrowser(Vertx vertx, String prefix, final String redisquesAddress, final MonitoringHandler monitoringHandler) { - this.router = Router.router(vertx); - this.redisquesAddress = redisquesAddress; - eb = vertx.eventBus(); - - // List queuing features - router.get(prefix + "/").handler(ctx -> { - JsonObject result = new JsonObject(); - JsonArray items = new JsonArray(); - items.add("locks/"); - items.add("monitoring"); - items.add("queues/"); - result.put(lastPart(ctx.request().path(), "/"), items); - ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON); - ctx.response().end(result.encode()); - }); - - // List queues - router.get(prefix + "/queues/").handler(ctx -> monitoringHandler.updateQueuesSizesInformation(DEFAULT_QUEUE_NUM, false, new MonitoringHandler.MonitoringCallback() { - @Override - public void onDone(JsonObject result) { - JsonArray array = result.getJsonArray("queues"); - JsonArray resultArray = new JsonArray(); - for (int i = 0; i < array.size(); i++) { - JsonObject arrayEntry = array.getJsonObject(i); - resultArray.add(arrayEntry.getString("name")); - } - result.put(lastPart(ctx.request().path(), "/"), resultArray); - jsonResponse(ctx.response(), result); - } - - @Override - public void onFail(String errorMessage, int statusCode) { - ctx.response().setStatusCode(statusCode); - ctx.response().setStatusMessage(errorMessage); - ctx.response().end(); - } - })); - - // List queue items - router.getWithRegex(prefix + "/queues/[^/]+").handler(ctx -> { - final String queue = lastPart(ctx.request().path(), "/"); - String limitParam = null; - if (ctx.request() != null && ctx.request().params().contains("limit")) { - limitParam = ctx.request().params().get("limit"); - } - eb.request(redisquesAddress, buildGetQueueItemsOperation(queue, limitParam), (Handler>>) reply -> { - JsonObject replyBody = reply.result().body(); - if (OK.equals(replyBody.getString(STATUS))) { - List list = reply.result().body().getJsonArray(VALUE).getList(); - JsonArray items = new JsonArray(); - for (Object item : list.toArray()) { - items.add((String) item); - } - JsonObject result = new JsonObject().put(queue, items); - jsonResponse(ctx.response(), result); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().end(reply.result().body().getString("message")); - log.warn("Error in routerMatcher.getWithRegEx. Command = '" + (replyBody.getString("command") == null ? "" : replyBody.getString("command")) + "'."); - } - }); - }); - - // Delete all queue items - router.deleteWithRegex(prefix + "/queues/[^/]+").handler(ctx -> { - final String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildDeleteAllQueueItemsOperation(queue), reply -> ctx.response().end()); - }); - - // Get item - router.getWithRegex(prefix + "/queues/([^/]+)/[0-9]+").handler(ctx -> { - final String queue = lastPart(ctx.request().path().substring(0, ctx.request().path().length() - 2), "/"); - final int index = Integer.parseInt(lastPart(ctx.request().path(), "/")); - eb.request(redisquesAddress, buildGetQueueItemOperation(queue, index), (Handler>>) reply -> { - JsonObject replyBody = reply.result().body(); - if (OK.equals(replyBody.getString(STATUS))) { - ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON); - ctx.response().end(decode(reply.result().body().getString(VALUE))); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().setStatusMessage(StatusCode.NOT_FOUND.getStatusMessage()); - ctx.response().end("Not Found"); - } - }); - }); - - // Replace item - router.putWithRegex(prefix + "/queues/([^/]+)/[0-9]+").handler(ctx -> { - final String queue = part(ctx.request().path(), "/", 2); - checkLocked(queue, ctx.request(), aVoid -> { - final int index = Integer.parseInt(lastPart(ctx.request().path(), "/")); - ctx.request().bodyHandler(buffer -> { - String strBuffer = encode(buffer.toString()); - eb.request(redisquesAddress, buildReplaceQueueItemOperation(queue, index, strBuffer), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.NOT_FOUND)); - }); - }); - }); - - // Delete item - router.deleteWithRegex(prefix + "/queues/([^/]+)/[0-9]+").handler(ctx -> { - final String queue = part(ctx.request().path(), "/", 2); - final int index = Integer.parseInt(lastPart(ctx.request().path(), "/")); - checkLocked(queue, ctx.request(), aVoid -> eb.request(redisquesAddress, buildDeleteQueueItemOperation(queue, index), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.NOT_FOUND))); - }); - - // Add item - router.postWithRegex(prefix + "/queues/([^/]+)/").handler(ctx -> { - final String queue = part(ctx.request().path(), "/", 1); - ctx.request().bodyHandler(buffer -> { - String strBuffer = encode(buffer.toString()); - eb.request(redisquesAddress, buildAddQueueItemOperation(queue, strBuffer), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.BAD_REQUEST)); - }); - }); - - // get all locks - router.getWithRegex(prefix + "/locks/").handler(ctx -> eb.request(redisquesAddress, buildGetAllLocksOperation(), - (Handler>>) reply -> { - if (OK.equals(reply.result().body().getString(STATUS))) { - jsonResponse(ctx.response(), reply.result().body().getJsonObject(VALUE)); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().setStatusMessage(StatusCode.NOT_FOUND.getStatusMessage()); - ctx.response().end("Not Found"); - } - })); - - // add lock - router.putWithRegex(prefix + "/locks/[^/]+").handler(ctx -> { - String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildPutLockOperation(queue, extractUser(ctx.request())), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.BAD_REQUEST)); - }); - - // get single lock - router.getWithRegex(prefix + "/locks/[^/]+").handler(ctx -> { - String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildGetLockOperation(queue), (Handler>>) reply -> { - if (OK.equals(reply.result().body().getString(STATUS))) { - ctx.response().putHeader(CONTENT_TYPE, APPLICATION_JSON); - ctx.response().end(reply.result().body().getString(VALUE)); - } else { - ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); - ctx.response().setStatusMessage(StatusCode.NOT_FOUND.getStatusMessage()); - ctx.response().end(NO_SUCH_LOCK); - } - }); - }); - - // delete single lock - router.deleteWithRegex(prefix + "/locks/[^/]+").handler(ctx -> { - String queue = lastPart(ctx.request().path(), "/"); - eb.request(redisquesAddress, buildDeleteLockOperation(queue), - (Handler>>) reply -> checkReply(reply.result(), ctx.request(), StatusCode.BAD_REQUEST)); - }); - - // Gathering queues monitoring informations - router.getWithRegex(prefix + "/monitoring/[^/]*").handler(ctx -> { - int numQueues = extractNumOfQueuesValue(ctx.request().path(), "/"); - boolean showEmptyQueues = showEmptyQueues(ctx.request().params()); - monitoringHandler.updateQueuesSizesInformation(numQueues, showEmptyQueues, new MonitoringHandler.MonitoringCallback() { - @Override - public void onDone(JsonObject result) { - jsonResponse(ctx.response(), result); - } - - @Override - public void onFail(String errorMessage, int statusCode) { - ctx.response().setStatusCode(statusCode); - ctx.response().setStatusMessage(errorMessage); - ctx.response().end(); - } - }); - }); - } - - private String extractUser(HttpServerRequest request) { - String user = request.headers().get("x-rp-usr"); - if (user == null) { - user = "Unknown"; - } - return user; - } - - private void checkReply(Message reply, HttpServerRequest request, StatusCode statusCode) { - if (OK.equals(reply.body().getString(STATUS))) { - request.response().end(); - } else { - request.response().setStatusCode(statusCode.getStatusCode()); - request.response().setStatusMessage(statusCode.getStatusMessage()); - request.response().end(statusCode.getStatusMessage()); - } - } - - private String lastPart(String source, String separator) { - String[] tokens = source.split(separator); - return tokens[tokens.length - 1]; - } - - private String part(String source, String separator, int pos) { - String[] tokens = source.split(separator); - return tokens[tokens.length - pos]; - } - - private boolean showEmptyQueues(MultiMap requestParams) { - String showEmptyQueues = StringUtils.getStringOrEmpty(requestParams.get(SHOW_EMPTY_QUEUES_PARAM)); - return showEmptyQueues.equalsIgnoreCase("true") || showEmptyQueues.equals("1"); - } - - private int getMaxQueueItemCountIndex(HttpServerRequest request) { - int defaultMaxIndex = DEFAULT_MAX_QUEUEITEM_COUNT; - if (request != null && request.params().contains("limit")) { - String limitParam = request.params().get("limit"); - try { - int maxIndex = Integer.parseInt(limitParam) - 1; - if (maxIndex >= 0) { - defaultMaxIndex = maxIndex; - } - } catch (NumberFormatException ex) { - log.warn("Invalid limit parameter '{}' configured for max queue item count. Using default {}", limitParam, DEFAULT_MAX_QUEUEITEM_COUNT); - } - } - return defaultMaxIndex; - } - - private int extractNumOfQueuesValue(String source, String separator) { - String numberOfQueuesStr = lastPart(source, separator); - Integer numQueues; - try { - numQueues = Integer.parseInt(numberOfQueuesStr); - } catch (Exception e) { - numQueues = DEFAULT_QUEUE_NUM; - log.warn("Queue size monitoring url was used with wrong or without number of queues param. Using default {}", DEFAULT_QUEUE_NUM); - } - - return numQueues; - } - - public void handle(HttpServerRequest request) { - router.handle(request); - } - - /** - * Encode the payload from a payloadString or payloadObjet. - * - * @param decoded decoded - * @return String - */ - public String encode(String decoded) { - JsonObject object = new JsonObject(decoded); - - String payloadString; - JsonObject payloadObject = object.getJsonObject("payloadObject"); - if (payloadObject != null) { - payloadString = payloadObject.encode(); - } else { - payloadString = object.getString("payloadString"); - } - - if (payloadString != null) { - object.put(PAYLOAD, payloadString.getBytes(Charset.forName(UTF_8))); - object.remove("payloadString"); - object.remove("payloadObject"); - } - - // update the content-length - int length = 0; - if (object.containsKey(PAYLOAD)) { - length = object.getBinary(PAYLOAD).length; - } - JsonArray newHeaders = new JsonArray(); - for (Object headerObj : object.getJsonArray("headers")) { - JsonArray header = (JsonArray) headerObj; - String key = header.getString(0); - if (key.equalsIgnoreCase("content-length")) { - JsonArray contentLengthHeader = new JsonArray(); - contentLengthHeader.add("Content-Length"); - contentLengthHeader.add(Integer.toString(length)); - newHeaders.add(contentLengthHeader); - } else { - newHeaders.add(header); - } - } - object.put("headers", newHeaders); - - return object.toString(); - } - - /** - * Decode the payload if the content-type is text or json. - * - * @param encoded encoded - * @return String - */ - public String decode(String encoded) { - JsonObject object = new JsonObject(encoded); - JsonArray headers = object.getJsonArray("headers"); - for (Object headerObj : headers) { - JsonArray header = (JsonArray) headerObj; - String key = header.getString(0); - String value = header.getString(1); - if (key.equalsIgnoreCase(CONTENT_TYPE) && (value.contains("text/") || value.contains(APPLICATION_JSON))) { - try { - object.put("payloadObject", new JsonObject(new String(object.getBinary(PAYLOAD), Charset.forName(UTF_8)))); - } catch (DecodeException e) { - object.put("payloadString", new String(object.getBinary(PAYLOAD), Charset.forName(UTF_8))); - } - object.remove(PAYLOAD); - break; - } - } - return object.toString(); - } - - private void checkLocked(String queue, final HttpServerRequest request, final Handler handler) { - request.pause(); - eb.request(redisquesAddress, buildGetLockOperation(queue), (Handler>>) reply -> { - if (NO_SUCH_LOCK.equals(reply.result().body().getString(STATUS))) { - request.resume(); - request.response().setStatusCode(StatusCode.CONFLICT.getStatusCode()); - request.response().setStatusMessage("Queue must be locked to perform this operation"); - request.response().end("Queue must be locked to perform this operation"); - } else { - handler.handle(null); - request.resume(); - } - }); - } - - private void jsonResponse(HttpServerResponse response, JsonObject object) { - response.putHeader(CONTENT_TYPE, APPLICATION_JSON); - response.end(object.encode()); - } -} diff --git a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java index 2d904301..a6f9f273 100755 --- a/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java +++ b/gateleen-runconfig/src/main/java/org/swisspush/gateleen/runconfig/RunConfig.java @@ -35,7 +35,6 @@ import org.swisspush.gateleen.monitoring.MonitoringHandler; import org.swisspush.gateleen.packing.PackingHandler; import org.swisspush.gateleen.qos.QoSHandler; -import org.swisspush.gateleen.queue.queuing.QueueBrowser; import org.swisspush.gateleen.queue.queuing.QueuingHandler; import org.swisspush.gateleen.queue.queuing.circuitbreaker.configuration.QueueCircuitBreakerConfigurationResourceManager; import org.swisspush.gateleen.queue.queuing.splitter.QueueSplitter; @@ -101,7 +100,6 @@ public class RunConfig { private final ExpansionHandler expansionHandler; private final DeltaHandler deltaHandler; private final MonitoringHandler monitoringHandler; - private final QueueBrowser queueBrowser; private final Authorizer authorizer; private final CopyResourceHandler copyResourceHandler; private final QoSHandler qosHandler; @@ -113,7 +111,7 @@ public class RunConfig { private final CustomHttpResponseHandler customHttpResponseHandler; public RunConfig(Vertx vertx, RedisProvider redisProvider, Class verticleClass, Router router, MonitoringHandler monitoringHandler, - QueueBrowser queueBrowser, CORSHandler corsHandler, SchedulerResourceManager schedulerResourceManager, + CORSHandler corsHandler, SchedulerResourceManager schedulerResourceManager, ValidationResourceManager validationResourceManager, LoggingResourceManager loggingResourceManager, ConfigurationResourceManager configurationResourceManager, QueueCircuitBreakerConfigurationResourceManager queueCircuitBreakerConfigurationResourceManager, @@ -129,7 +127,6 @@ public RunConfig(Vertx vertx, RedisProvider redisProvider, Class verticleClass, this.verticleClass = verticleClass; this.router = router; this.monitoringHandler = monitoringHandler; - this.queueBrowser = queueBrowser; this.corsHandler = corsHandler; this.schedulerResourceManager = schedulerResourceManager; this.validationResourceManager = validationResourceManager; @@ -164,7 +161,6 @@ private RunConfig(RunConfigBuilder builder) { builder.verticleClass, builder.router, builder.monitoringHandler, - builder.queueBrowser, builder.corsHandler, builder.schedulerResourceManager, builder.validationResourceManager, @@ -222,7 +218,6 @@ public static class RunConfigBuilder { private Class verticleClass; private Router router; private MonitoringHandler monitoringHandler; - private QueueBrowser queueBrowser; private CORSHandler corsHandler; private SchedulerResourceManager schedulerResourceManager; private ValidationResourceManager validationResourceManager; @@ -377,13 +372,12 @@ public RunConfigBuilder cacheHandler(CacheHandler cacheHandler) { return this; } - public RunConfig build(Vertx vertx, RedisProvider redisProvider, Class verticleClass, Router router, MonitoringHandler monitoringHandler, QueueBrowser queueBrowser) { + public RunConfig build(Vertx vertx, RedisProvider redisProvider, Class verticleClass, Router router, MonitoringHandler monitoringHandler) { this.vertx = vertx; this.redisProvider = redisProvider; this.verticleClass = verticleClass; this.router = router; this.monitoringHandler = monitoringHandler; - this.queueBrowser = queueBrowser; return new RunConfig(this); } } @@ -633,10 +627,6 @@ private void handleRequest(final RoutingContext ctx) { if (copyResourceHandler != null && copyResourceHandler.handle(request)) { return; } - if (request.path().startsWith(SERVER_ROOT + "/queuing/")) { - queueBrowser.handle(request); - return; - } if (hookHandler != null && hookHandler.handle(ctx)) { return; } diff --git a/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java b/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java index b02253c5..3d9f7d1a 100755 --- a/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java +++ b/gateleen-test/src/test/java/org/swisspush/gateleen/AbstractTest.java @@ -57,7 +57,6 @@ import org.swisspush.gateleen.monitoring.MonitoringHandler; import org.swisspush.gateleen.monitoring.ResetMetricsController; import org.swisspush.gateleen.qos.QoSHandler; -import org.swisspush.gateleen.queue.queuing.QueueBrowser; import org.swisspush.gateleen.queue.queuing.QueueClient; import org.swisspush.gateleen.queue.queuing.QueueProcessor; import org.swisspush.gateleen.queue.queuing.circuitbreaker.QueueCircuitBreaker; @@ -219,8 +218,6 @@ public static void setupBeforeClass(TestContext context) { CIRCUIT_BREAKER_REST_API_PORT); new QueueProcessor(vertx, selfClient, monitoringHandler, queueCircuitBreaker); - final QueueBrowser queueBrowser = new QueueBrowser(vertx, SERVER_ROOT + "/queuing", - Address.redisquesAddress(), monitoringHandler); new CustomRedisMonitor(vertx, redisProvider, "main", "rest-storage", 10).start(); Router router = Router.builder() @@ -266,7 +263,7 @@ public static void setupBeforeClass(TestContext context) { .delegateHandler(delegateHandler) .mergeHandler(mergeHandler) .customHttpResponseHandler(customHttpResponseHandler) - .build(vertx, redisProvider, AbstractTest.class, router, monitoringHandler, queueBrowser); + .build(vertx, redisProvider, AbstractTest.class, router, monitoringHandler); Handler routingContextHandlerrNew = runConfig.buildRoutingContextHandler(); selfClient.setRoutingContexttHandler(routingContextHandlerrNew); mainServer = vertx.createHttpServer();