Skip to content

Commit

Permalink
Czar and workers can send http messages to each other.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed May 16, 2024
1 parent 5915c67 commit da87adb
Show file tree
Hide file tree
Showing 40 changed files with 1,043 additions and 399 deletions.
7 changes: 0 additions & 7 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,6 @@ bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32

// This is needed to ensure the job query would be staying alive for the duration
// of the operation to prevent inconsistency witin the application.
/* &&&
auto const jobQuery = getJobQuery().lock();
if (jobQuery == nullptr) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobQuery was NULL");
return false;
}
*/
auto const jobBase = getJobBase().lock();
if (jobBase == nullptr) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobBase was NULL");
Expand Down
2 changes: 1 addition & 1 deletion src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
std::string query = aQuery;

// TODO: DM-43386 need to have WorkerChunkMap info at this point
// &&&
// &&&uj

std::string stripped;
bool async = false;
Expand Down
168 changes: 58 additions & 110 deletions src/ccontrol/UserQuerySelect.cc

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion src/ccontrol/UserQuerySelect.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class UserQuerySelect : public UserQuery {

/// Begin execution of the query over all ChunkSpecs added so far.
void submit() override;
void submitOld(); //&&&

/// Wait until the query has completed execution.
/// @return the final execution state.
Expand Down
1 change: 1 addition & 0 deletions src/czar/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ target_sources(czar PRIVATE
CzarRegistry.cc
HttpCzarSvc.cc
HttpCzarQueryModule.cc
HttpCzarWorkerModule.cc
HttpModule.cc
HttpMonitorModule.cc
HttpSvc.cc
Expand Down
10 changes: 0 additions & 10 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,6 @@ extern XrdSsiProvider* XrdSsiProviderClient;

namespace {

/* &&&
string const createAsyncResultTmpl(
"CREATE TABLE IF NOT EXISTS %1% "
"(jobId BIGINT, resultLocation VARCHAR(1024))"
"ENGINE=MEMORY;"
"INSERT INTO %1% (jobId, resultLocation) "
"VALUES (%2%, '%3%')");
*/

LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.Czar");

} // anonymous namespace
Expand Down Expand Up @@ -197,7 +188,6 @@ Czar::Czar(string const& configFilePath, string const& czarName)

Czar::~Czar() {
LOGS(_log, LOG_LVL_DEBUG, "Czar::~Czar()");
cout << "&&& Czar::~Czar()" << endl;
}

SubmitResult Czar::submitQuery(string const& query, map<string, string> const& hints) {
Expand Down
6 changes: 2 additions & 4 deletions src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@

// System headers
#include <sstream>
#include <iostream> // &&& del
//&&& #include <iostream> // &&& del

// LSST headers
#include "lsst/log/Log.h"

// Qserv headers
#include "qmeta/QMeta.h" //&&& move and check linking
#include "qmeta/QMeta.h"
#include "czar/Czar.h"
#include "czar/CzarRegistry.h"
#include "qmeta/Exceptions.h"
Expand All @@ -47,7 +47,6 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarChunkMap");
namespace lsst::qserv::czar {

CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(qmeta) {
cout << "&&& CzarChunkMap::CzarChunkMap()" << endl;
try {
auto mapsSet = _read();
if (!mapsSet) {
Expand All @@ -61,7 +60,6 @@ CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(

CzarChunkMap::~CzarChunkMap() {
LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap::~CzarChunkMap()");
cout << "&&& CzarChunkMap::~CzarChunkMap()" << endl;
}

bool CzarChunkMap::_read() {
Expand Down
2 changes: 0 additions & 2 deletions src/czar/CzarChunkMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@
#include <map>
#include <memory>
#include <mutex>
//&&&#include <set>
#include <string>
#include <sstream>

// Qserv headers
#include "global/clock_defs.h"
// #include "qmeta/QMeta.h" &&&
#include "util/Issue.h"

namespace lsst::qserv::qmeta {
Expand Down
10 changes: 0 additions & 10 deletions src/czar/CzarRegistry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarRegistry");
namespace lsst::qserv::czar {

CzarRegistry::CzarRegistry(std::shared_ptr<cconfig::CzarConfig> const& czarConfig) : _czarConfig(czarConfig) {
cout << "&&& CzarRegistry::CzarRegistry a" << endl;
// Begin periodically updating worker's status in the Replication System's registry.
// This will continue until the application gets terminated.
thread registryUpdateThread(&CzarRegistry::_registryUpdateLoop, this);
Expand All @@ -60,22 +59,16 @@ CzarRegistry::CzarRegistry(std::shared_ptr<cconfig::CzarConfig> const& czarConfi
}

CzarRegistry::~CzarRegistry() {
cout << "&&& CzarRegistry::~CzarRegistry a" << endl;
_loop = false;
if (_czarHeartbeatThrd.joinable()) {
cout << "&&& CzarRegistry::~CzarRegistry a1" << endl;
_czarHeartbeatThrd.join();
}
cout << "&&& CzarRegistry::~CzarRegistry b" << endl;
if (_czarWorkerInfoThrd.joinable()) {
cout << "&&& CzarRegistry::~CzarRegistry b1" << endl;
_czarWorkerInfoThrd.join();
}
cout << "&&& CzarRegistry::~CzarRegistry end" << endl;
}

void CzarRegistry::_registryUpdateLoop() {
cout << "&&& CzarRegistry::_registryUpdateLoop a" << endl;
auto const method = http::Method::POST;
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
to_string(_czarConfig->replicationRegistryPort()) + "/czar";
Expand Down Expand Up @@ -106,11 +99,9 @@ void CzarRegistry::_registryUpdateLoop() {
}
this_thread::sleep_for(chrono::seconds(max(1U, _czarConfig->replicationRegistryHearbeatIvalSec())));
}
cout << "&&& CzarRegistry::_registryUpdateLoop end" << endl;
}

void CzarRegistry::_registryWorkerInfoLoop() {
cout << "&&& CzarRegistry::_registryWorkerInfoLoop a" << endl;
// Get worker information from the registry
vector<string> const headers;
auto const method = http::Method::GET;
Expand Down Expand Up @@ -144,7 +135,6 @@ void CzarRegistry::_registryWorkerInfoLoop() {
}
this_thread::sleep_for(chrono::seconds(15));
}
cout << "&&& CzarRegistry::_registryWorkerInfoLoop end" << endl;
}

CzarRegistry::WorkerContactMapPtr CzarRegistry::_buildMapFromJson(nlohmann::json const& response) {
Expand Down
88 changes: 88 additions & 0 deletions src/czar/HttpCzarWorkerModule.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "czar/HttpCzarWorkerModule.h"

// System headers
#include <set>
#include <stdexcept>

// Qserv headers
#include "cconfig/CzarConfig.h"
#include "global/intTypes.h"
#include "http/Exceptions.h"
#include "http/RequestQuery.h"
#include "util/String.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace std;
using json = nlohmann::json;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.HttpCzarWorkerModule");
}

namespace lsst::qserv::czar {

void HttpCzarWorkerModule::process(string const& context, shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp, string const& subModuleName,
http::AuthType const authType) {
HttpCzarWorkerModule module(context, req, resp);
module.execute(subModuleName, authType);
}

HttpCzarWorkerModule::HttpCzarWorkerModule(string const& context, shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp)
: HttpModule(context, req, resp) {}

json HttpCzarWorkerModule::executeImpl(string const& subModuleName) {
string const func = string(__func__) + "[sub-module='" + subModuleName + "']";
debug(func);
//&&&uj this seems irrelevant for a worker enforceInstanceId(func, cconfig::CzarConfig::instance()->replicationInstanceId());
enforceCzarName(func);
if (subModuleName == "QUERYJOB-ERROR")
return _queryJobError();
else if (subModuleName == "QUERYJOB-READY")
return _queryJobReady();
throw invalid_argument(context() + func + " unsupported sub-module");
}

json HttpCzarWorkerModule::_queryJobError() {
debug(__func__);
checkApiVersion(__func__, 34);
LOGS(_log, LOG_LVL_INFO, __func__ << "&&&uj queryJobError json=" << body().objJson); //&&&
//&&&uj NEED CODE for this
return json::object();
}

json HttpCzarWorkerModule::_queryJobReady() {
debug(__func__);
checkApiVersion(__func__, 34);
LOGS(_log, LOG_LVL_INFO, __func__ << "&&&uj queryJobReady json=" << body().objJson); //&&&
//&&&uj NEED CODE for this
json ret = {{"success", 1}};
return json::object();
}

} // namespace lsst::qserv::czar
77 changes: 77 additions & 0 deletions src/czar/HttpCzarWorkerModule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_CZAR_HTTPCZARWORKERMODULE_H
#define LSST_QSERV_CZAR_HTTPCZARWORKERMODULE_H

// System headers
#include <memory>
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "czar/HttpModule.h"

// Forward declarations
namespace lsst::qserv::qhttp {
class Request;
class Response;
} // namespace lsst::qserv::qhttp

// This header declarations
namespace lsst::qserv::czar {

/// &&& doc This class is used to handle messages to this czar from workers.
class HttpCzarWorkerModule : public czar::HttpModule {
public:
/// @note supported values for parameter 'subModuleName' are:
/// 'QUERYJOB-ERROR' - error in a QUERYJOB
/// 'QUERYJOB-READY' -
/// @throws std::invalid_argument for unknown values of parameter 'subModuleName'
static void process(std::string const& context, std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp, std::string const& subModuleName,
http::AuthType const authType = http::AuthType::NONE);

HttpCzarWorkerModule() = delete;
HttpCzarWorkerModule(HttpCzarWorkerModule const&) = delete;
HttpCzarWorkerModule& operator=(HttpCzarWorkerModule const&) = delete;

~HttpCzarWorkerModule() final = default;

protected:
nlohmann::json executeImpl(std::string const& subModuleName) final;

private:
HttpCzarWorkerModule(std::string const& context, std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp);

/// &&& doc
nlohmann::json _queryJobError();

/// &&& doc
nlohmann::json _queryJobReady();

};

} // namespace lsst::qserv::czar

#endif // LSST_QSERV_CZAR_HTTPCZARWORKERMODULE_H
11 changes: 11 additions & 0 deletions src/czar/HttpSvc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
// Qserv headers
#include "cconfig/CzarConfig.h"
#include "czar/HttpMonitorModule.h"
#include "czar/HttpCzarWorkerModule.h"
#include "http/MetaModule.h"
#include "qhttp/Server.h"

Expand Down Expand Up @@ -90,6 +91,16 @@ uint16_t HttpSvc::start() {
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpMonitorModule::process(::serviceName, req, resp, "STATUS");
}}});
_httpServerPtr->addHandlers(
{{"POST", "/queryjob-error",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpCzarWorkerModule::process(::serviceName, req, resp, "QUERYJOB-ERROR");
}}});
_httpServerPtr->addHandlers(
{{"POST", "/queryjob-ready",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpCzarWorkerModule::process(::serviceName, req, resp, "QUERYJOB-READY");
}}});
_httpServerPtr->start();

// Initialize the I/O context and start the service threads. At this point
Expand Down
11 changes: 0 additions & 11 deletions src/czar/testCzar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ qmeta::QMetaChunkMap convertJsonToChunkMap(nlohmann::json const& jsChunks) {

BOOST_AUTO_TEST_CASE(CzarChunkMap) {
// Each chunk only occurs on one worker
cerr << "&&& a" << endl;
string test1 = R"(
{
"ce1c1b79-e6fb-11ee-a46b-0242c0a80308":
Expand Down Expand Up @@ -126,7 +125,6 @@ BOOST_AUTO_TEST_CASE(CzarChunkMap) {
}
}
)";
cerr << "&&& b " << test1 << endl;

/// 3 workers, each containing all chunks.
string test2 = R"(
Expand Down Expand Up @@ -187,27 +185,18 @@ BOOST_AUTO_TEST_CASE(CzarChunkMap) {
}
}
)";
cerr << "&&& c" << endl;

auto jsTest1 = nlohmann::json::parse(test1);
cerr << "&&& d" << endl;
qmeta::QMetaChunkMap qChunkMap1 = convertJsonToChunkMap(jsTest1);
cerr << "&&& e" << endl;
auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(qChunkMap1);
czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure.
cerr << "&&& f" << endl;
LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 1 passed");

cerr << "&&& g" << endl;
auto jsTest2 = nlohmann::json::parse(test2);
cerr << "&&& h" << endl;
qmeta::QMetaChunkMap qChunkMap2 = convertJsonToChunkMap(jsTest2);
cerr << "&&& i" << endl;
tie(chunkMapPtr, wcMapPtr) = czar::CzarChunkMap::makeNewMaps(qChunkMap2);
cerr << "&&& j" << endl;
czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure.
LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 2 passed");
cerr << "&&& end" << endl;
}

BOOST_AUTO_TEST_SUITE_END()
Loading

0 comments on commit da87adb

Please sign in to comment.