Skip to content

Commit

Permalink
The REST service for ingesting JSON-formatted contributions
Browse files Browse the repository at this point in the history
The data are sent into the Ingest service directly in the request's body
  • Loading branch information
iagaponenko committed Apr 12, 2024
1 parent f7e5c49 commit 78445c1
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/replica/ingest/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ add_library(replica_ingest OBJECT)
add_dependencies(replica_ingest replica_proto)
target_sources(replica_ingest PRIVATE
IngestClient.cc
IngestDataHttpSvcMod.cc
IngestFileSvc.cc
IngestHttpSvc.cc
IngestHttpSvcMod.cc
Expand Down
113 changes: 113 additions & 0 deletions src/replica/ingest/IngestDataHttpSvcMod.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/ingest/IngestDataHttpSvcMod.h"

// Qserv headers
#include "http/BinaryEncoding.h"
#include "http/Method.h"
#include "replica/config/Configuration.h"
#include "replica/util/Csv.h"

// System headers
#include <sstream>
#include <stdexcept>

using namespace std;
using json = nlohmann::json;
namespace qhttp = lsst::qserv::qhttp;

namespace {
/// Render the address of the remote endpoint of a request as text.
/// @param req The HTTP request whose sender is to be identified.
/// @return The requestor's IP address as produced by the stream insertion operator.
string senderIpAddr(qhttp::Request::Ptr const& req) {
    ostringstream addrStream;
    addrStream << req->remoteAddr.address();
    return addrStream.str();
}
} // namespace

namespace lsst::qserv::replica {

/// Static entry point of the module: a short-lived module object is bound to
/// the given request/response pair, and the requested sub-module is executed
/// under the specified authorization requirements. The object is destroyed
/// as soon as the execution finishes.
void IngestDataHttpSvcMod::process(ServiceProvider::Ptr const& serviceProvider, string const& workerName,
                                   qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp,
                                   string const& subModuleName, http::AuthType const authType) {
    IngestDataHttpSvcMod(serviceProvider, workerName, req, resp).execute(subModuleName, authType);
}

/// Initialize both base classes from the service provider:
///  - http::ModuleBase receives the provider's authorization keys (authKey() and
///    adminAuthKey()) together with the request/response pair;
///  - IngestFileSvc receives the provider itself and the name of the worker.
/// The constructor body is empty since the class adds no data members of its own here.
IngestDataHttpSvcMod::IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider,
string const& workerName, qhttp::Request::Ptr const& req,
qhttp::Response::Ptr const& resp)
: http::ModuleBase(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp),
IngestFileSvc(serviceProvider, workerName) {}

/// @return The fixed label of this module. The trailing space is a part of
///   the value; the label is used as a prefix of the exception message
///   composed by executeImpl().
string IngestDataHttpSvcMod::context() const {
    return string("INGEST-DATA-HTTP-SVC ");
}

/// Dispatch a request to the handler of the named sub-module.
/// @param subModuleName The name of a sub-module; only "SYNC-PROCESS-DATA" is supported.
/// @return The JSON result produced by the sub-module's handler.
/// @throws std::invalid_argument for any other name of the sub-module.
json IngestDataHttpSvcMod::executeImpl(string const& subModuleName) {
    debug(__func__, "subModuleName: '" + subModuleName + "'");

    // Reject unknown sub-modules first so that the supported path reads straight through.
    if (subModuleName != "SYNC-PROCESS-DATA") {
        throw invalid_argument(context() + "::" + string(__func__) + " unsupported sub-module: '" +
                               subModuleName + "'");
    }
    return _syncProcessData();
}

/**
 * Handler of the sub-module SYNC-PROCESS-DATA.
 *
 * In its present state the method calls checkApiVersion() with version 34,
 * pulls the parameters of a contribution from the request's body, reports them
 * via debug(), and prepares the pseudo-URL of the contribution and the CSV
 * dialect. The ingest step itself is not implemented yet (see the TODO at the
 * end of the method), hence a default-constructed JSON value is returned.
 *
 * Attributes read with body().required(): "transaction_id", "table", "chunk",
 * "overlap" (an integer, where any non-zero value means 'true').
 *
 * Attributes read with body().optional(): "charset_name" (the default is the
 * worker's configuration parameter "ingest-charset-name"), "binary_encoding"
 * (the default is "hex"), "max_num_warnings" (the default is 0).
 *
 * @return A default-constructed JSON value.
 */
json IngestDataHttpSvcMod::_syncProcessData() const {
    debug(__func__);
    checkApiVersion(__func__, 34);

    auto const config = serviceProvider()->config();

    // The parameters are extracted in the fixed order since the extraction may
    // fail, and the order determines which problem is reported first.
    TransactionId const transactionId = body().required<TransactionId>("transaction_id");
    string const table = body().required<string>("table");
    unsigned int const chunk = body().required<unsigned int>("chunk");
    int const overlapFlag = body().required<int>("overlap");
    bool const isOverlap = overlapFlag != 0;

    string const defaultCharsetName = config->get<string>("worker", "ingest-charset-name");
    string const charsetName = body().optional<string>("charset_name", defaultCharsetName);

    string const binaryEncodingName = body().optional<string>("binary_encoding", "hex");
    http::BinaryEncodingMode const binaryEncodingMode = http::parseBinaryEncoding(binaryEncodingName);

    unsigned int const maxNumWarnings = body().optional<unsigned int>("max_num_warnings", 0);

    debug(__func__, "transaction_id: " + to_string(transactionId));
    debug(__func__, "table: '" + table + "'");
    debug(__func__, "chunk: " + to_string(chunk));
    debug(__func__, "overlap: " + string(isOverlap ? "1" : "0"));
    debug(__func__, "charset_name: '" + charsetName + "'");
    debug(__func__, "binary_encoding: '" + http::binaryEncoding2string(binaryEncodingMode) + "'");
    debug(__func__, "max_num_warnings: " + to_string(maxNumWarnings));

    // The pseudo-URL tells that the data were streamed directly into the service
    // by the sender at the given address (rather than pulled from a file or an object store).
    string const url = "data-json://" + ::senderIpAddr(req()) + "/";

    // The fields are always enclosed in double quotes, which matches the way
    // strings are packaged in JSON. All other attributes of the dialect are
    // set to the defaults.
    csv::DialectInput dialectInput;
    dialectInput.fieldsTerminatedBy = csv::Dialect::defaultFieldsTerminatedBy;
    dialectInput.fieldsEnclosedBy = R"(")";
    dialectInput.fieldsEscapedBy = csv::Dialect::defaultFieldsEscapedBy;
    dialectInput.linesTerminatedBy = csv::Dialect::defaultLinesTerminatedBy;

    // TODO: the actual processing of the contribution is not implemented yet.
    // The intended sequence is outlined below:
    //
    // auto const request = _createRequest();
    // request->process();
    // return json::object({{"contrib", request->transactionContribInfo().toJson()}});

    return json();
}

} // namespace lsst::qserv::replica
94 changes: 94 additions & 0 deletions src/replica/ingest/IngestDataHttpSvcMod.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_INGESTDATAHTTPSVCMOD_H
#define LSST_QSERV_INGESTDATAHTTPSVCMOD_H

// System headers
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "http/ModuleBase.h"
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "replica/ingest/IngestFileSvc.h"
#include "replica/services/ServiceProvider.h"

// This header's declarations
namespace lsst::qserv::replica {

/**
* Class IngestDataHttpSvcMod processes chunk/table contribution requests made over HTTP.
* The class is used by the HTTP server built into the worker Ingest service.
* Unlike class IngestHttpSvcMod, the current class is meant to be used for ingesting
* payloads that are pushed directly into the service over the HTTP protocol
* (the data are carried in the body of a request).
*
* The class can't be instantiated directly by a client code. The only entry point
* is the static method process().
*/
class IngestDataHttpSvcMod : public http::ModuleBase, public IngestFileSvc {
public:
// The default construction and the copy semantics are prohibited.
IngestDataHttpSvcMod() = delete;
IngestDataHttpSvcMod(IngestDataHttpSvcMod const&) = delete;
IngestDataHttpSvcMod& operator=(IngestDataHttpSvcMod const&) = delete;

virtual ~IngestDataHttpSvcMod() = default;

/**
* Process a request.
*
* Supported values for parameter 'subModuleName':
*
* SYNC-PROCESS-DATA for synchronous execution of the table contribution requests
*
* @param serviceProvider The provider of services is needed to access
* the configuration and the database services.
* @param workerName The name of a worker this service is acting upon (used to pull
* worker-specific configuration options for the service).
* @param req The HTTP request.
* @param resp The HTTP response channel.
* @param subModuleName The name of a submodule to be called.
* @param authType The authorization requirements for the module
* (the default is http::AuthType::REQUIRED).
* @throws std::invalid_argument for unknown values of parameter 'subModuleName'
*/
static void process(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName,
qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp,
std::string const& subModuleName,
http::AuthType const authType = http::AuthType::REQUIRED);

protected:
/// @see http::ModuleBase::context()
virtual std::string context() const final;

/// @see http::ModuleBase::executeImpl()
virtual nlohmann::json executeImpl(std::string const& subModuleName) final;

private:
/// The constructor is private: objects of the class are made only by
/// the static method IngestDataHttpSvcMod::process(), which also documents
/// the parameters.
IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName,
qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp);

/// Process a table contribution request (SYNC).
/// This is the handler of the sub-module SYNC-PROCESS-DATA.
/// @return The JSON result to be reported by executeImpl().
nlohmann::json _syncProcessData() const;
};

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_INGESTDATAHTTPSVCMOD_H
6 changes: 6 additions & 0 deletions src/replica/ingest/IngestHttpSvc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "replica/config/Configuration.h"
#include "replica/ingest/IngestDataHttpSvcMod.h"
#include "replica/ingest/IngestHttpSvcMod.h"
#include "replica/ingest/IngestRequestMgr.h"

Expand Down Expand Up @@ -75,6 +76,11 @@ void IngestHttpSvc::registerServices() {
{"instance_id", self->serviceProvider()->instanceId()}});
http::MetaModule::process(::context_, info, req, resp, "VERSION");
}},
{"POST", "/ingest/data",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp,
"SYNC-PROCESS-DATA");
}},
{"POST", "/ingest/file",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName,
Expand Down
2 changes: 1 addition & 1 deletion src/replica/ingest/IngestHttpSvcMod.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace lsst::qserv::replica {

/**
* Class IngestHttpSvcMod processes chunk/table contribution requests made over HTTP.
* The class is used by the HTTP server build into the worker Ingest service.
* The class is used by the HTTP server built into the worker Ingest service.
*/
class IngestHttpSvcMod : public http::ModuleBase {
public:
Expand Down

0 comments on commit 78445c1

Please sign in to comment.