From a7fb49b879cb224a72f7c5b7bdbe325280c2d213 Mon Sep 17 00:00:00 2001 From: Ludwig Richter Date: Sat, 31 Jul 2021 11:46:59 +0200 Subject: [PATCH] feat(services): Move MongoDB implementation to own extension https://github.com/wuespace/telestion-extension-mongodb --- .../services/database/DataListener.java | 52 ------ .../services/database/DataOperation.java | 13 -- .../services/database/DataRequest.java | 15 -- .../services/database/DataService.java | 171 ------------------ .../services/database/DbRequest.java | 24 --- .../services/database/DbResponse.java | 15 -- .../database/MongoDatabaseService.java | 139 -------------- .../database/RandomPositionPublisher.java | 49 ----- 8 files changed, 478 deletions(-) delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataOperation.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java deleted file mode 100644 index 1f869124..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataListener.java +++ /dev/null @@ -1,52 +0,0 @@ -package de.wuespace.telestion.services.database; - -import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.services.message.Address; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import java.util.Collections; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.api.config.Config; - -public final class DataListener extends AbstractVerticle { - private final Configuration forcedConfig; - private Configuration config; - - private final Logger logger = LoggerFactory.getLogger(DataListener.class); - - private final String save = Address.incoming(DataService.class, "save"); - - public DataListener() { - this.forcedConfig = null; - } - - public DataListener(List listeningAddresses) { - this.forcedConfig = new Configuration(listeningAddresses); - } - - @Override - public void start(Promise startPromise) throws Exception { - config = Config.get(forcedConfig, config(), Configuration.class); - this.registerConsumers(); - startPromise.complete(); - } - - private void registerConsumers() { - config.listeningAddresses().forEach(address -> { - vertx.eventBus().consumer(address, document -> { - JsonMessage.on(JsonMessage.class, document, msg -> { - vertx.eventBus().publish(save, msg.json()); - }); - }); - }); - } - - private static record Configuration(@JsonProperty List listeningAddresses) { - private Configuration() { - this(Collections.emptyList()); - } - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataOperation.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataOperation.java deleted file mode 100644 index 9ea899e0..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataOperation.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.wuespace.telestion.services.database; - -import com.fasterxml.jackson.annotation.JsonProperty; -import io.vertx.core.json.JsonObject; -import de.wuespace.telestion.api.message.JsonMessage; - -public record DataOperation( - @JsonProperty JsonObject data, - @JsonProperty JsonObject params) implements JsonMessage { - private DataOperation() { - this(new JsonObject(), new JsonObject()); - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java deleted file mode 100644 index 10faface..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataRequest.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.wuespace.telestion.services.database; - -import com.fasterxml.jackson.annotation.JsonProperty; -import io.vertx.core.json.JsonObject; -import de.wuespace.telestion.api.message.JsonMessage; - -public record DataRequest( - @JsonProperty String className, - @JsonProperty JsonObject query, - @JsonProperty String operation, - @JsonProperty JsonObject operationParams) implements JsonMessage { - private DataRequest() { - this("", new JsonObject(), "", new JsonObject()); - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java deleted file mode 100644 index ad3d49b1..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DataService.java +++ /dev/null @@ -1,171 +0,0 @@ -package de.wuespace.telestion.services.database; - -import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.services.message.Address; -import io.vertx.core.*; -import io.vertx.core.json.JsonObject; -import java.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.api.config.Config; - -/** - * DataService is a verticle which is the interface to a underlying database implementation. - * All data requests should come to the DataService and will be parsed and executed. - * TODO: DataOperations like Integrate, Differentiate, Offset, Sum, ... - * TODO: MongoDB Queries explanation and implementation in fetchLatestData. - * - */ -public final class DataService extends AbstractVerticle { - private final Configuration forcedConfig; - private Configuration config; - - private final Logger logger = LoggerFactory.getLogger(DataService.class); - /** - * DataService Eventbus Addresses. - */ - private final String inSave = Address.incoming(DataService.class, "save"); - private final String inFind = Address.incoming(DataService.class, "find"); - private final String dbSave = Address.incoming(MongoDatabaseService.class, "save"); - private final String dbFind = Address.incoming(MongoDatabaseService.class, "find"); - - /** - * If this constructor is used, settings have to be specified in the config file. - */ - public DataService() { - this.forcedConfig = null; - } - - /** - * This constructor supplies default options. - * @param dataOperationMap Map of String->DataOperation for incoming dataRequests - */ - public DataService(Map dataOperationMap) { - this.forcedConfig = new Configuration(dataOperationMap); - } - - @Override - public void start(Promise startPromise) throws Exception { - config = Config.get(forcedConfig, config(), Configuration.class); - this.registerConsumers(); - startPromise.complete(); - } - - /** - * Method to register consumers to the eventbus. - */ - private void registerConsumers() { - vertx.eventBus().consumer(inFind, request -> { - JsonMessage.on(DataRequest.class, request, req -> { - this.dataRequestDispatcher(req, res -> { - if (res.failed()) { - logger.error(res.cause().getMessage()); - request.fail(-1, res.cause().getMessage()); - return; - } - request.reply(res.result()); - }); - }); - }); - vertx.eventBus().consumer(inSave, document -> { - JsonMessage.on(JsonMessage.class, document, doc -> { - vertx.eventBus().request(dbSave, doc.json(), res -> { - if (res.failed()) { - logger.error(res.cause().getMessage()); - document.fail(-1, res.cause().getMessage()); - return; - } - document.reply(res.result().body()); - }); - }); - }); - } - - /** - * Parse and dispatch incoming DataRequests. - * - * @param request Determines which dataType should be retrieved and if an Operation should be executed. - * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. - */ - private void dataRequestDispatcher(DataRequest request, Handler> resultHandler) { - // TODO: If className is empty, check if query exists and just pass the query to the DatabaseClient - try { - var dataType = Class.forName(request.className()); - if (request.operation().isEmpty()) { - this.fetchLatestData(dataType, request.query(), res -> { - if (res.failed()) { - resultHandler.handle(Future.failedFuture(res.cause().getMessage())); - return; - } - resultHandler.handle(Future.succeededFuture(res.result())); - }); - } else { - var dataOperation = new DataOperation(new JsonObject(), request.operationParams()); - this.fetchLatestData(dataType, request.query(), res -> { - if (res.failed()) { - resultHandler.handle(Future.failedFuture(res.cause().getMessage())); - return; - } - dataOperation.data().put("data", res.result()); - }); - this.applyManipulation(request.operation(), dataOperation, resultHandler); - } - } catch (ClassNotFoundException e) { - logger.error("ClassNotFoundException: {}", e.getMessage()); - } - } - - /** - * Request data from another verticle and handle the result of the request. - * - * @param address Address String of the desired verticle. - * @param message Object to send to the desired verticle. - * @param resultHandler Handles the result of the requested operation. - */ - private void requestResultHandler( - String address, JsonMessage message, Handler> resultHandler) { - JsonObject result = new JsonObject(); - vertx.eventBus().request(address, message, reply -> { - if (reply.failed()) { - logger.error(reply.cause().getMessage()); - resultHandler.handle(Future.failedFuture(reply.cause().getMessage())); - return; - } - result.put("data", reply.result().body()); - resultHandler.handle(Future.succeededFuture(result)); - }); - } - - /** - * Method to fetch the latest data of a specified data type. - * - * @param dataType Determines which data type should be fetched. - * @param query MongoDB query, can be empty JsonObject if no specific query is needed. - * @param resultHandler Handles the request to the underlying database. Can be failed or succeeded. - */ - private void fetchLatestData(Class dataType, JsonObject query, - Handler> resultHandler) { - DbRequest dbRequest = new DbRequest(dataType, query); - this.requestResultHandler(dbFind, dbRequest, resultHandler); - } - - /** - * Apply data operation to fetched data. - * - * @param dataOperation Determines which manipulation should be applied. - * @param resultHandler Handles the request to the data operation verticle. Can be failed or succeeded. - */ - private void applyManipulation(String operationAddress, DataOperation dataOperation, - Handler> resultHandler) { - this.requestResultHandler(operationAddress, dataOperation, resultHandler); - } - - private static record Configuration( - @JsonProperty Map dataOperationMap - ) { - private Configuration() { - this(Collections.emptyMap()); - } - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java deleted file mode 100644 index 7808bb13..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbRequest.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.wuespace.telestion.services.database; - -import com.fasterxml.jackson.annotation.JsonProperty; -import io.vertx.core.json.JsonObject; -import de.wuespace.telestion.api.message.JsonMessage; - -/** - * Record to provide the structure of a database request. - * - * @param dataType class of the data type - * @param query MongoDb query looks like this: { key: value } - * with key meaning the name of the field in the document. - * IN condition: { key: { $in: ["value1", "value2", ...] }} - * AND condition: { key1: "value1", key2: { $lt: value } } with $lt meaning less than - * OR condition: { $or: [{ key1: "value1" }, { key2: { $gt: value2 }}] } - * @see MongoDB manual for more information - */ -public record DbRequest( - @JsonProperty Class dataType, - @JsonProperty JsonObject query) implements JsonMessage { - private DbRequest() { - this(null, new JsonObject()); - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java deleted file mode 100644 index c48b7382..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/DbResponse.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.wuespace.telestion.services.database; - -import com.fasterxml.jackson.annotation.JsonProperty; -import io.vertx.core.json.JsonObject; -import java.util.Collections; -import java.util.List; -import de.wuespace.telestion.api.message.JsonMessage; - -public record DbResponse( - @JsonProperty Class dataType, - @JsonProperty List result) implements JsonMessage { - private DbResponse() { - this(null, Collections.emptyList()); - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java deleted file mode 100644 index 9456c06f..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/MongoDatabaseService.java +++ /dev/null @@ -1,139 +0,0 @@ -package de.wuespace.telestion.services.database; - -import com.fasterxml.jackson.annotation.JsonProperty; -import io.vertx.core.*; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.mongo.FindOptions; -import io.vertx.ext.mongo.MongoClient; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.api.config.Config; -import de.wuespace.telestion.services.message.Address; - -/** - * MongoDatabaseService is a verticle which connects to a local running MongoDB-Database and listens for incoming - * database requests to process. - * TODO: Each database implementation (currently only MongoDB) is written in its own DBClient, - * TODO: but listens to the same address for DBRequests. The address is the interface to the database implementation, - * TODO: so that the used DB can be replaced easily by spawning another DBClient. - * Mongo specific: - * Data is always saved in their exclusive collection which is always named after their Class.name / MessageType. - */ -public final class MongoDatabaseService extends AbstractVerticle { - private final Logger logger = LoggerFactory.getLogger(MongoDatabaseService.class); - private final Configuration forcedConfig; - private Configuration config; - private MongoClient client; - - /** - * MongoDB Eventbus Addresses. - */ - private final String inSave = Address.incoming(MongoDatabaseService.class, "save"); - private final String outSave = Address.outgoing(MongoDatabaseService.class, "save"); - private final String inFind = Address.incoming(MongoDatabaseService.class, "find"); - - /** - * This constructor supplies default options. - * - * @param dbName the name of the local running database - * @param dbPoolName the name of the database pool - */ - public MongoDatabaseService(String dbName, String dbPoolName) { - this.forcedConfig = new Configuration( - new JsonObject().put("db_name", dbName).put("useObjectId", true), dbPoolName); - } - - /** - * If this constructor is used at all, settings have to be specified in the config file. - */ - public MongoDatabaseService() { - this.forcedConfig = null; - } - - @Override - public void start(Promise startPromise) throws Exception { - config = Config.get(forcedConfig, config(), Configuration.class); - this.client = MongoClient.createShared(vertx, config.dbConfig, config.dbPoolName); - this.registerConsumers(); - startPromise.complete(); - } - - /** - * Method to register consumers to the eventbus. - */ - private void registerConsumers() { - vertx.eventBus().consumer(inSave, document -> { - JsonMessage.on(JsonMessage.class, document, this::save); - }); - vertx.eventBus().consumer(inFind, request -> { - JsonMessage.on(DbRequest.class, request, dbRequest -> { - this.findLatest(dbRequest, result -> { - if (result.failed()) { - request.fail(-1, result.cause().getMessage()); - } - if (result.succeeded()) { - request.reply(result.result()); - } - }); - }); - }); - } - - /** - * Save the received document to the database. - * If a MongoDB-ObjectId is specified data will be upserted, meaning if the id does not exist it will be inserted, - * otherwise it will be updated. Else it will be inserted with a new id. - * If the save was successful the database looks for the newly saved document and publishes it to the database - * outgoing address concatenated with "/Class.name". With this behaviour clients (e.g. Frontend) can listen - * to the outgoing address of a specific data value and will always be provided with the most recent data. - * - * @param document a JsonMessage validated through the JsonMessage.on method - */ - private void save(JsonMessage document) { - var object = document.json(); - client.save(document.className(), object, res -> { - if (res.failed()) { - logger.error("DB Save failed: ", res.cause()); - return; - } - String id = res.result(); - client.find(document.className(), new JsonObject().put("_id", id), rec -> { - if (rec.failed()) { - logger.error("DB Find failed: ", rec.cause()); - return; - } - DbResponse dbRes = new DbResponse(document.getClass(), rec.result()); - vertx.eventBus().publish(outSave.concat("/").concat(document.className()), dbRes.json()); - }); - }); - } - - /** - * Find the latest entry of the requested data type. - * - * @param request DbRequest = { class of requested data type, query? } - * @param handler Result handler, can be failed or succeeded - */ - private void findLatest(DbRequest request, Handler>> handler) { - FindOptions findOptions = new FindOptions() - .setSort(new JsonObject().put("_id", -1)).setLimit(1); // last item - client.findWithOptions(request.dataType().getName(), - request.query(), - findOptions, res -> { - if (res.failed()) { - logger.error("DB Request failed: ", res.cause()); - handler.handle(Future.failedFuture(res.cause())); - return; - } - handler.handle(Future.succeededFuture(res.result())); - }); - } - - private static record Configuration(@JsonProperty JsonObject dbConfig, @JsonProperty String dbPoolName) { - private Configuration() { - this(new JsonObject().put("db_name", "raketenpraktikum").put("useObjectId", true), "raketenpraktikumPool"); - } - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java deleted file mode 100644 index 21b46f5e..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/database/RandomPositionPublisher.java +++ /dev/null @@ -1,49 +0,0 @@ -package de.wuespace.telestion.services.database; - -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import java.time.Duration; -import java.util.Random; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import de.wuespace.telestion.services.message.Address; - -/** - * Test class.
- * Will be removed upon first release. - */ -public final class RandomPositionPublisher extends AbstractVerticle { - private static final Logger logger = LoggerFactory.getLogger(RandomPositionPublisher.class); - private final Random rand = new Random(555326456); - - private final String inSave = Address.incoming(DataService.class, "save"); - - @Override - public void start(Promise startPromise) { - vertx.setPeriodic(Duration.ofSeconds(3).toMillis(), timerId -> publishPosition()); - startPromise.complete(); - } - - /** - * Publishes random Position around Kiruna. - */ - private void publishPosition() { - var x = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("x", 67.8915); - var y = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("y", 21.0836); - var z = (double) vertx.sharedData().getLocalMap("randPos").getOrDefault("z", 0.0); - - //final Position pos = new Position(x, y, z); - - x += rand.nextDouble() * 0.02; - y += rand.nextDouble() * 0.02; - // z += rand.nextDouble()*0.02; - vertx.sharedData().getLocalMap("randPos").put("x", x); - vertx.sharedData().getLocalMap("randPos").put("y", y); - vertx.sharedData().getLocalMap("randPos").put("z", z); - - //vertx.eventBus().publish(Address.outgoing(RandomPositionPublisher.class, "MockPos"), pos.json()); - //vertx.eventBus().publish(inSave, pos.json()); - //logger.debug("Sending current pos: {} on {}", pos, RandomPositionPublisher.class.getName()); - } -} -