diff --git a/src/replica/ingest/CMakeLists.txt b/src/replica/ingest/CMakeLists.txt
index 2f6408664..8e9b1bd15 100644
--- a/src/replica/ingest/CMakeLists.txt
+++ b/src/replica/ingest/CMakeLists.txt
@@ -2,6 +2,7 @@ add_library(replica_ingest OBJECT)
add_dependencies(replica_ingest replica_proto)
target_sources(replica_ingest PRIVATE
IngestClient.cc
+ IngestDataHttpSvcMod.cc
IngestFileSvc.cc
IngestHttpSvc.cc
IngestHttpSvcMod.cc
diff --git a/src/replica/ingest/IngestDataHttpSvcMod.cc b/src/replica/ingest/IngestDataHttpSvcMod.cc
new file mode 100644
index 000000000..d110ccfd5
--- /dev/null
+++ b/src/replica/ingest/IngestDataHttpSvcMod.cc
@@ -0,0 +1,384 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+
+// Class header
+#include "replica/ingest/IngestDataHttpSvcMod.h"
+
+// Qserv header
+#include "http/BinaryEncoding.h"
+#include "http/Exceptions.h"
+#include "http/Method.h"
+#include "replica/config/Configuration.h"
+#include "replica/services/DatabaseServices.h"
+#include "replica/util/Csv.h"
+#include "util/String.h"
+
+// System headers
+#include <algorithm>
+#include <cctype>
+#include <cerrno>
+#include <sstream>
+#include <stdexcept>
+
+using namespace std;
+using json = nlohmann::json;
+namespace qhttp = lsst::qserv::qhttp;
+
+namespace {
+/// @return requestor's IP address
+string senderIpAddr(qhttp::Request::Ptr const& req) {
+ ostringstream ss;
+ ss << req->remoteAddr.address();
+ return ss.str();
+}
+
+/// These keywords are found in all known binary column types of MySQL.
+vector<string> const binColTypePatterns = {"BIT", "BINARY", "BLOB"};
+
+/**
+ * @param type The name of the column type.
+ * @return 'true' if the type represents the binary column type in MySQL.
+ */
+bool isBinaryColumnType(string const& type) {
+ string typeUpperCase = type;
+ transform(typeUpperCase.cbegin(), typeUpperCase.cend(), typeUpperCase.begin(),
+ [](unsigned char c) { return toupper(c); });
+ for (string const& pattern : binColTypePatterns) {
+ if (string::npos != typeUpperCase.find(pattern)) return true;
+ }
+ return false;
+}
+
+} // namespace
+
+namespace lsst::qserv::replica {
+
+void IngestDataHttpSvcMod::process(ServiceProvider::Ptr const& serviceProvider, string const& workerName,
+ qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp,
+ string const& subModuleName, http::AuthType const authType) {
+ IngestDataHttpSvcMod module(serviceProvider, workerName, req, resp);
+ module.execute(subModuleName, authType);
+}
+
+IngestDataHttpSvcMod::IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider,
+ string const& workerName, qhttp::Request::Ptr const& req,
+ qhttp::Response::Ptr const& resp)
+ : http::ModuleBase(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp),
+ IngestFileSvc(serviceProvider, workerName) {}
+
+string IngestDataHttpSvcMod::context() const { return "INGEST-DATA-HTTP-SVC "; }
+
+json IngestDataHttpSvcMod::executeImpl(string const& subModuleName) {
+ debug(__func__, "subModuleName: '" + subModuleName + "'");
+ if (subModuleName == "SYNC-PROCESS-DATA") return _syncProcessData();
+ throw invalid_argument(context() + "::" + string(__func__) + " unsupported sub-module: '" +
+ subModuleName + "'");
+}
+
+json IngestDataHttpSvcMod::_syncProcessData() {
+ debug(__func__);
+ checkApiVersion(__func__, 34);
+
+ auto const context_ = context() + __func__;
+ auto const config = serviceProvider()->config();
+ auto const databaseServices = serviceProvider()->databaseServices();
+
+ // Fill out parameters in the contribution descriptor. This information is needed
+ // for bookkeeping and monitoring purposes. The descriptor's state will be kept
+ // updated in the Replication/Ingest's database as the contribution processing
+ // will be happening.
+ _contrib.transactionId = body().required<TransactionId>("transaction_id");
+ _contrib.table = body().required<string>("table");
+ _contrib.chunk = body().required<unsigned int>("chunk");
+ _contrib.isOverlap = body().required<int>("overlap") != 0;
+ _contrib.worker = workerName();
+
+ // To indicate the JSON-formatted data were streamed directly into the service
+ _contrib.url = "data-json://" + ::senderIpAddr(req()) + "/";
+ _contrib.charsetName =
+ body().optional<string>("charset_name", config->get<string>("worker", "ingest-charset-name"));
+
+ // Note the double quotes enforced around the fields. This is compatible with
+ // the JSON way for packaging strings.
+ _contrib.dialectInput.fieldsEnclosedBy = R"(")";
+
+ // Retries are allowed before an attempt to load data into MySQL. When such attempt
+ // is made the persistent state of the destination table is supposed to be changed.
+ _contrib.retryAllowed = true;
+
+ // This parameter sets a limit for the number of warnings (should there be any)
+ // reported by MySQL after a contribution loading attempt. Warnings are an important
+ // mechanism for debugging problems with the ingested data.
+ _contrib.maxNumWarnings = body().optional<unsigned int>(
+ "max_num_warnings", config->get<unsigned int>("worker", "loader-max-warnings"));
+
+ // This is needed for decoding values of the binary columns should they be present
+ // in the table schema.
+ http::BinaryEncodingMode const binaryEncodingMode =
+ http::parseBinaryEncoding(body().optional("binary_encoding", "hex"));
+
+ // Rows are expected to be supplied in the JSON array
+ if (!body().has("rows")) {
+ throw http::Error(context_, "a collection of rows is missing in the request");
+ }
+ json const& rows = body().objJson.at("rows");
+ if (!rows.is_array()) {
+ throw http::Error(context_, "a collection of rows found in the request is not the JSON array");
+ }
+ if (rows.empty()) {
+ throw http::Error(context_, "a collection of rows in the request is empty");
+ }
+
+ debug(__func__, "transaction_id: " + to_string(_contrib.transactionId));
+ debug(__func__, "table: '" + _contrib.table + "'");
+ debug(__func__, "chunk: " + to_string(_contrib.chunk));
+ debug(__func__, "overlap: " + string(_contrib.isOverlap ? "1" : "0"));
+ debug(__func__, "charset_name: '" + _contrib.charsetName + "'");
+ debug(__func__, "max_num_warnings: " + to_string(_contrib.maxNumWarnings));
+ debug(__func__, "binary_encoding: '" + http::binaryEncoding2string(binaryEncodingMode) + "'");
+ debug(__func__, "rows.size: " + to_string(rows.size()));
+
+ // Attempts to pass invalid transaction identifiers or tables are not recorded
+ // as transaction contributions in the persistent state of the Replication/Ingest
+ // system since it's impossible to determine a context of these operations.
+ // The following operations will throw exceptions should any problems with
+ // validating the context of the request be encountered.
+ TransactionInfo const trans = databaseServices->transaction(_contrib.transactionId);
+ _contrib.database = trans.database;
+
+ DatabaseInfo const database = config->databaseInfo(_contrib.database);
+ TableInfo const table = database.findTable(_contrib.table);
+
+ // Scan table schema for the binary columns and build 0-based index.
+ // The index will be required for decoding the input data of the binary columns.
+ //
+ // NOTES:
+ // - The transaction identifier column will not be added to the index since it's
+ // a special column added by the Ingest system. The column is not supposed to be
+ // known to (or used by) the ingest workflows.
+ // - The index size will be also used for validating sizes of the input rows.
+ bool const failed = true;
+ if (table.columns.empty() || (table.columns.front().name != "qserv_trans_id")) {
+ _contrib.error = "incomplete or missing table schema";
+ _contrib = databaseServices->createdTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+ vector<bool> isBinary;
+ for (auto const& coldef : table.columns) {
+ if (coldef.name == "qserv_trans_id") continue;
+ isBinary.push_back(::isBinaryColumnType(coldef.type));
+ }
+
+ // Make sure the transaction is in the right state.
+ if (trans.state != TransactionInfo::State::STARTED) {
+ _contrib.error = "transactionId=" + to_string(_contrib.transactionId) + " is not active";
+ _contrib = databaseServices->createdTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+
+ // Register the validated contribution and mark it as started.
+ _contrib = databaseServices->createdTransactionContrib(_contrib);
+ try {
+ _contrib.tmpFile =
+ openFile(_contrib.transactionId, _contrib.table, csv::Dialect(_contrib.dialectInput),
+ _contrib.charsetName, _contrib.chunk, _contrib.isOverlap);
+ _contrib = databaseServices->startedTransactionContrib(_contrib);
+ } catch (exception const& ex) {
+ _contrib.systemError = errno;
+ _contrib.error = ex.what();
+ _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+
+ // Begin reading, validating and transforming the input data into a valid CSV stream.
+ // Note using the string as a buffer. To reduce repeated memory allocations/deallocations
+ // when processing rows the algorithm assumes that the capacity of the string is automatically
+ // increased to fit the whole row on the first iteration of the loop. After that the string
+ // buffer is expected to stay like that for the rest of the data extraction (see a note below on
+ // the string clear before processing each row).
+ string row;
+
+ // The storage overhead for the transaction identifier prepended at each row.
+ // The number is used for estimating and reporting the overall number of bytes
+ // in the input contribution.
+ size_t const numBytesInTransactionId = sizeof(uint32_t) +
+ 2 * _contrib.dialectInput.fieldsEnclosedBy.size() +
+ _contrib.dialectInput.fieldsTerminatedBy.size();
+
+ for (size_t rowIdx = 0; rowIdx < rows.size(); ++rowIdx) {
+ json const& jsonRow = rows[rowIdx];
+
+ // C++ doesn't require the string clear method to keep the previously allocated
+ // buffers. The following operation ensures that the amount of memory allocated
+ // during the previous iteration (if any) would stay the same.
+ size_t const capacity = row.capacity();
+ row.clear();
+ row.reserve(capacity);
+
+ // These tests would prevent a problem with the input data before making an actual
+ // table loading attempt.
+ if (!jsonRow.is_array()) {
+ _contrib.error = "a row found in the request is not the JSON array";
+ _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+ if (jsonRow.size() != isBinary.size()) {
+ _contrib.error = "the row size in the request doesn't match the table schema";
+ _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+
+ // Extract and process columns
+ for (size_t colIdx = 0; colIdx < jsonRow.size(); ++colIdx) {
+ json const& jsonColumn = jsonRow[colIdx];
+ if (colIdx == 0) row.append(_contrib.dialectInput.fieldsTerminatedBy);
+ row.append(_contrib.dialectInput.fieldsEnclosedBy);
+ if (isBinary[colIdx]) {
+ switch (binaryEncodingMode) {
+ case http::BinaryEncodingMode::HEX:
+ row.append(_translateHexString(context_, jsonColumn, rowIdx, colIdx));
+ break;
+ case http::BinaryEncodingMode::ARRAY: {
+ u8string const str = _translateByteArray(context_, jsonColumn, rowIdx, colIdx);
+ row.append(reinterpret_cast<char const*>(str.data()), str.size());
+ break;
+ }
+ default:
+ _contrib.error = "unsupported binary encoding mode '" +
+ http::binaryEncoding2string(binaryEncodingMode) + "'";
+ _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+ } else {
+ row.append(_translatePrimitiveType(context_, jsonColumn, rowIdx, colIdx));
+ }
+ row.append(_contrib.dialectInput.fieldsEnclosedBy);
+ }
+ row.append(_contrib.dialectInput.linesTerminatedBy);
+ try {
+ writeRowIntoFile(row.data(), row.size());
+ _contrib.numRows++;
+ _contrib.numBytes += numBytesInTransactionId + row.size();
+ } catch (exception const& ex) {
+ _contrib.error = "failed to write the row into the temporary file at row " + to_string(rowIdx) +
+ ", ex: " + string(ex.what());
+ _contrib = databaseServices->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+ }
+
+ // Report that processing of the input data and preparing the contribution file is over.
+ _contrib = databaseServices->readTransactionContrib(_contrib);
+
+ // Begin making irreversible changes to the destination table.
+ _contrib.retryAllowed = false;
+ try {
+ loadDataIntoTable(_contrib.maxNumWarnings);
+ _contrib.numWarnings = numWarnings();
+ _contrib.warnings = warnings();
+ _contrib.numRowsLoaded = numRowsLoaded();
+ _contrib = databaseServices->loadedTransactionContrib(_contrib);
+ closeFile();
+ } catch (exception const& ex) {
+ _contrib.error = "MySQL load failed, ex: " + string(ex.what());
+ _contrib.systemError = errno;
+ databaseServices->loadedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+ }
+ return json::object({{"contrib", _contrib.toJson()}});
+}
+
+string IngestDataHttpSvcMod::_translateHexString(string const& context_, json const& jsonColumn,
+ size_t rowIdx, size_t colIdx) {
+ if (jsonColumn.is_string()) {
+ try {
+ return util::String::fromHex(jsonColumn.get<string>());
+ } catch (exception const& ex) {
+ _contrib.error = "failed to decode a value of the '" +
+ http::binaryEncoding2string(http::BinaryEncodingMode::HEX) +
+ "' binary encoded column at row " + to_string(rowIdx) + " and column " +
+ to_string(colIdx) + ", ex: " + string(ex.what());
+ }
+ } else {
+ _contrib.error = "unsupported type name '" + string(jsonColumn.type_name()) + "' found at row " +
+ to_string(rowIdx) + " and column " + to_string(colIdx) +
+ " where the string type was expected";
+ }
+ bool const failed = true;
+ _contrib = serviceProvider()->databaseServices()->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+}
+
+u8string IngestDataHttpSvcMod::_translateByteArray(string const& context_, json const& jsonColumn,
+ size_t rowIdx, size_t colIdx) {
+ if (jsonColumn.is_array()) {
+ try {
+ // An array of unsigned 8-bit numbers is expected here.
+ return jsonColumn.get<u8string>();
+ } catch (exception const& ex) {
+ _contrib.error = "failed to decode a value of the '" +
+ http::binaryEncoding2string(http::BinaryEncodingMode::ARRAY) +
+ "' binary encoded column at row " + to_string(rowIdx) + " and column " +
+ to_string(colIdx) + ", ex: " + string(ex.what());
+ }
+ } else {
+ _contrib.error = "unsupported type name '" + string(jsonColumn.type_name()) + "' found at row " +
+ to_string(rowIdx) + " and column " + to_string(colIdx) +
+ " where the array type was expected";
+ }
+ bool const failed = true;
+ _contrib = serviceProvider()->databaseServices()->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+}
+
+string IngestDataHttpSvcMod::_translatePrimitiveType(string const& context_, json const& jsonColumn,
+ size_t rowIdx, size_t colIdx) {
+ if (jsonColumn.is_boolean()) {
+ return string(jsonColumn.get<bool>() ? "1" : "0");
+ } else if (jsonColumn.is_number() || jsonColumn.is_string()) {
+ return to_string(jsonColumn);
+ } else {
+ _contrib.error = "unsupported type name '" + string(jsonColumn.type_name()) + "' found at row " +
+ to_string(rowIdx) + " and column " + to_string(colIdx) +
+ " where the boolean, numeric or string type was expected";
+ }
+ bool const failed = true;
+ _contrib = serviceProvider()->databaseServices()->startedTransactionContrib(_contrib, failed);
+ _failed(context_);
+ throw http::Error(context_, _contrib.error);
+}
+
+void IngestDataHttpSvcMod::_failed(string const& context_) {
+ error(context_, _contrib.error);
+ closeFile();
+}
+
+} // namespace lsst::qserv::replica
diff --git a/src/replica/ingest/IngestDataHttpSvcMod.h b/src/replica/ingest/IngestDataHttpSvcMod.h
new file mode 100644
index 000000000..5d8d0f60d
--- /dev/null
+++ b/src/replica/ingest/IngestDataHttpSvcMod.h
@@ -0,0 +1,114 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+#ifndef LSST_QSERV_INGESTDATAHTTPSVCMOD_H
+#define LSST_QSERV_INGESTDATAHTTPSVCMOD_H
+
+// System headers
+#include <string>
+
+// Third party headers
+#include "nlohmann/json.hpp"
+
+// Qserv headers
+#include "http/ModuleBase.h"
+#include "qhttp/Request.h"
+#include "qhttp/Response.h"
+#include "replica/ingest/IngestFileSvc.h"
+#include "replica/ingest/TransactionContrib.h"
+#include "replica/services/ServiceProvider.h"
+
+// This header declarations
+namespace lsst::qserv::replica {
+
+/**
+ * Class IngestDataHttpSvcMod processes chunk/table contribution requests made over HTTP.
+ * The class is used by the HTTP server built into the worker Ingest service.
+ * Unlike class IngestHttpSvcMod, the current class is meant to be used for ingesting
+ * payloads that are pushed directly into the service over the HTTP protocol.
+ */
+class IngestDataHttpSvcMod : public http::ModuleBase, public IngestFileSvc {
+public:
+ IngestDataHttpSvcMod() = delete;
+ IngestDataHttpSvcMod(IngestDataHttpSvcMod const&) = delete;
+ IngestDataHttpSvcMod& operator=(IngestDataHttpSvcMod const&) = delete;
+
+ virtual ~IngestDataHttpSvcMod() = default;
+
+ /**
+ * Process a request.
+ *
+ * Supported values for parameter 'subModuleName':
+ *
+ * SYNC-PROCESS-DATA for synchronous execution of the table contribution requests
+ *
+ * @param serviceProvider The provider of services is needed to access
+ * the configuration and the database services.
+ * @param workerName The name of a worker this service is acting upon (used to pull
+ * worker-specific configuration options for the service).
+ * @param req The HTTP request.
+ * @param resp The HTTP response channel.
+ * @param subModuleName The name of a submodule to be called.
+ * @param authType The authorization requirements for the module
+ * @throws std::invalid_argument for unknown values of parameter 'subModuleName'
+ */
+ static void process(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName,
+ qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp,
+ std::string const& subModuleName,
+ http::AuthType const authType = http::AuthType::REQUIRED);
+
+protected:
+ /// @see http::ModuleBase::context()
+ virtual std::string context() const final;
+
+ /// @see http::ModuleBase::executeImpl()
+ virtual nlohmann::json executeImpl(std::string const& subModuleName) final;
+
+private:
+ /// @see method IngestDataHttpSvcMod::create()
+ IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName,
+ qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp);
+
+ /// Process a table contribution request (SYNC).
+ nlohmann::json _syncProcessData();
+
+ /**
+ * Close the temporary file if needed and post an error message.
+ * @param context_ The caller's context.
+ */
+ void _failed(std::string const& context_);
+
+ // The following three methods translate an input JSON object into a string.
+ // Methods throw http::Error for any problems encountered during object processing
+ // or translation.
+
+ std::string _translateHexString(std::string const& context_, nlohmann::json const& jsonColumn,
+ size_t rowIdx, size_t colIdx);
+ std::u8string _translateByteArray(std::string const& context_, nlohmann::json const& jsonColumn,
+ size_t rowIdx, size_t colIdx);
+ std::string _translatePrimitiveType(std::string const& context_, nlohmann::json const& jsonColumn,
+ size_t rowIdx, size_t colIdx);
+
+ TransactionContribInfo _contrib; ///< A state of the contribution processing
+};
+
+} // namespace lsst::qserv::replica
+
+#endif // LSST_QSERV_INGESTDATAHTTPSVCMOD_H
diff --git a/src/replica/ingest/IngestHttpSvc.cc b/src/replica/ingest/IngestHttpSvc.cc
index c0cd3b44d..aa47c6999 100644
--- a/src/replica/ingest/IngestHttpSvc.cc
+++ b/src/replica/ingest/IngestHttpSvc.cc
@@ -30,6 +30,7 @@
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "replica/config/Configuration.h"
+#include "replica/ingest/IngestDataHttpSvcMod.h"
#include "replica/ingest/IngestHttpSvcMod.h"
#include "replica/ingest/IngestRequestMgr.h"
@@ -75,6 +76,11 @@ void IngestHttpSvc::registerServices() {
{"instance_id", self->serviceProvider()->instanceId()}});
http::MetaModule::process(::context_, info, req, resp, "VERSION");
}},
+ {"POST", "/ingest/data",
+ [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
+ IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp,
+ "SYNC-PROCESS-DATA");
+ }},
{"POST", "/ingest/file",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
diff --git a/src/replica/ingest/IngestHttpSvcMod.h b/src/replica/ingest/IngestHttpSvcMod.h
index 7773882da..a015e21ac 100644
--- a/src/replica/ingest/IngestHttpSvcMod.h
+++ b/src/replica/ingest/IngestHttpSvcMod.h
@@ -40,7 +40,7 @@ namespace lsst::qserv::replica {
/**
* Class IngestHttpSvcMod processes chunk/table contribution requests made over HTTP.
- * The class is used by the HTTP server build into the worker Ingest service.
+ * The class is used by the HTTP server built into the worker Ingest service.
*/
class IngestHttpSvcMod : public http::ModuleBase {
public: