diff --git a/admin/local/docker/compose/docker-compose.yml b/admin/local/docker/compose/docker-compose.yml index 5a3c37f99..dc5ca4880 100644 --- a/admin/local/docker/compose/docker-compose.yml +++ b/admin/local/docker/compose/docker-compose.yml @@ -415,6 +415,7 @@ services: --registry-host=repl-mgr-registry --controller-auto-register-workers=1 --qserv-sync-force + --qserv-chunk-map-update --debug expose: - "25081" diff --git a/src/replica/apps/MasterControllerHttpApp.cc b/src/replica/apps/MasterControllerHttpApp.cc index 462371824..2b00107fc 100644 --- a/src/replica/apps/MasterControllerHttpApp.cc +++ b/src/replica/apps/MasterControllerHttpApp.cc @@ -147,6 +147,10 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[]) " This affect replicas to be deleted from the workers during the synchronization" " stages.", _forceQservSync); + parser().flag("qserv-chunk-map-update", + "The flag which would result in updating the chunk disposition map" + " in Qserv's QMeta database.", + _qservChunkMapUpdate); parser().flag("purge", "The binary flag which, if provided, enables the 'purge' algorithm in" " the end of each replication cycle that eliminates excess replicas which" @@ -198,7 +202,7 @@ int MasterControllerHttpApp::runImpl() { _replicationTask = ReplicationTask::create( _controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec, - _forceQservSync, _replicationIntervalSec, _purge); + _forceQservSync, _qservChunkMapUpdate, _replicationIntervalSec, _purge); _replicationTask->start(); _healthMonitorTask = HealthMonitorTask::create( diff --git a/src/replica/apps/MasterControllerHttpApp.h b/src/replica/apps/MasterControllerHttpApp.h index d3e17f782..f5ef4ed02 100644 --- a/src/replica/apps/MasterControllerHttpApp.h +++ b/src/replica/apps/MasterControllerHttpApp.h @@ -132,6 +132,7 @@ class MasterControllerHttpApp : public Application { bool _purge; bool _forceQservSync; + bool _qservChunkMapUpdate; bool _permanentDelete; /// A connection URL for the MySQL service of the Qserv master database. diff --git a/src/replica/contr/ReplicationTask.cc b/src/replica/contr/ReplicationTask.cc index c7dbdea2d..360f78927 100644 --- a/src/replica/contr/ReplicationTask.cc +++ b/src/replica/contr/ReplicationTask.cc @@ -22,23 +22,35 @@ // Class header #include "replica/contr/ReplicationTask.h" +// System headers +#include + // Qserv headers +#include "replica/config/Configuration.h" #include "replica/jobs/FindAllJob.h" #include "replica/jobs/FixUpJob.h" #include "replica/jobs/ReplicateJob.h" #include "replica/jobs/RebalanceJob.h" #include "replica/jobs/PurgeJob.h" +#include "replica/mysql/DatabaseMySQL.h" +#include "replica/mysql/DatabaseMySQLGenerator.h" +#include "replica/mysql/DatabaseMySQLUtils.h" +#include "replica/services/DatabaseServices.h" +#include "replica/util/ReplicaInfo.h" using namespace std; namespace lsst::qserv::replica { +using namespace database::mysql; + ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, unsigned int qservSyncTimeoutSec, bool forceQservSync, - unsigned int replicationIntervalSec, bool purge) { + bool qservChunkMapUpdate, unsigned int replicationIntervalSec, + bool purge) { return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync, - replicationIntervalSec, purge)); + qservChunkMapUpdate, replicationIntervalSec, purge)); } bool ReplicationTask::onRun() { @@ -51,6 +63,8 @@ bool ReplicationTask::onRun() { launch(priority, saveReplicaInfo, allWorkers); sync(_qservSyncTimeoutSec, _forceQservSync); + if (_qservChunkMapUpdate) _updateChunkMap(); + launch(priority); sync(_qservSyncTimeoutSec, _forceQservSync); @@ -73,10 +87,81 @@ bool ReplicationTask::onRun() { ReplicationTask::ReplicationTask(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, unsigned int qservSyncTimeoutSec, bool forceQservSync, - unsigned int replicationIntervalSec, bool purge) + bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge) : Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec), _qservSyncTimeoutSec(qservSyncTimeoutSec), _forceQservSync(forceQservSync), + _qservChunkMapUpdate(qservChunkMapUpdate), _purge(purge) {} +void ReplicationTask::_updateChunkMap() { + // Open MySQL connection using the RAII-style handler that would automatically + // abort the transaction should any problem occured when loading data into the table. + ConnectionHandler h; + try { + h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta")); + } catch (exception const& ex) { + error("failed to connect to the czar's database server, ex: " + string(ex.what())); + return; + } + QueryGenerator const g(h.conn); + + // Get info on known chunk replicas from the persistent store of the Replication system + // and package those into ready-to-ingest data. + bool const allDatabases = true; + string const emptyDatabaseFilter; + bool const isPublished = true; + bool const includeFileInfo = true; // need this to access tables sizes + vector rows; + for (auto const& workerName : serviceProvider()->config()->workers()) { + vector replicas; + serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter, + allDatabases, isPublished, includeFileInfo); + for (auto const& replica : replicas) { + for (auto const& fileInfo : replica.fileInfo()) { + if (fileInfo.isData() && !fileInfo.isOverlap()) { + rows.push_back(g.packVals(workerName, replica.database(), fileInfo.baseTable(), + replica.chunk(), fileInfo.size)); + } + } + } + } + if (rows.empty()) { + warn("no replicas found in the persistent state of the Replication system"); + return; + } + + // Get the limit for the length of the bulk insert queries. The limit is needed + // to run the query generation. + size_t maxQueryLength = 0; + string const globalVariableName = "max_allowed_packet"; + try { + string const query = g.showVars(SqlVarScope::GLOBAL, globalVariableName); + h.conn->executeInOwnTransaction([&query, &maxQueryLength](auto conn) { + bool const noMoreThanOne = true; + if (!selectSingleValue(conn, query, maxQueryLength, "Value", noMoreThanOne)) { + throw runtime_error("no such variable found"); + } + }); + } catch (exception const& ex) { + error("failed to get a value of GLOBAL '" + globalVariableName + "', ex: " + string(ex.what())); + return; + } + + // Execute a sequence of queries atomically + vector const deleteQueries = {g.delete_("chunkMap"), g.delete_("chunkMapStatus")}; + vector insertQueries = g.insertPacked( + "chunkMap", g.packIds("worker", "database", "table", "chunk", "size"), rows, maxQueryLength); + insertQueries.push_back(g.insert("chunkMapStatus", Sql::NOW)); + try { + h.conn->executeInOwnTransaction([&deleteQueries, &insertQueries](auto conn) { + for (auto const& query : deleteQueries) conn->execute(query); + for (auto const& query : insertQueries) conn->execute(query); + }); + } catch (exception const& ex) { + error("failed to update chunk map in the Czar database, ex: " + string(ex.what())); + return; + } +} + } // namespace lsst::qserv::replica diff --git a/src/replica/contr/ReplicationTask.h b/src/replica/contr/ReplicationTask.h index 2f8191e58..5bc99c76b 100644 --- a/src/replica/contr/ReplicationTask.h +++ b/src/replica/contr/ReplicationTask.h @@ -55,6 +55,7 @@ class ReplicationTask : public Task { * @param qservSyncTimeoutSec The maximum number of seconds to be waited before giving * up on the Qserv synchronization requests. * @param forceQservSync Force chunk removal at worker resource collections if 'true'. + * @param qservChunkMapUpdate Update the chunk disposition map in Qserv's QMeta database if 'true'. * @param replicationIntervalSec The number of seconds to wait in the end of each * iteration loop before to begin the new one. * @param purge Purge excess replicas if 'true'. @@ -62,7 +63,7 @@ class ReplicationTask : public Task { */ static Ptr create(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, - unsigned int qservSyncTimeoutSec, bool forceQservSync, + unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge); protected: @@ -72,15 +73,23 @@ class ReplicationTask : public Task { private: /// @see ReplicationTask::create() ReplicationTask(Controller::Ptr const& controller, AbnormalTerminationCallbackType const& onTerminated, - unsigned int qservSyncTimeoutSec, bool forceQservSync, + unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge); + void _updateChunkMap(); + /// The maximum number of seconds to be waited before giving up /// on the Qserv synchronization requests. unsigned int const _qservSyncTimeoutSec; - bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'. - bool const _purge; ///< Purge excess replicas if 'true'. + /// Force removal at worker resource collections if 'true'. + bool const _forceQservSync; + + /// Update the chunk disposition map in Qserv's QMeta database if 'true'. + bool const _qservChunkMapUpdate; + + /// Purge excess replicas if 'true'. + bool const _purge; }; } // namespace lsst::qserv::replica diff --git a/src/replica/contr/Task.h b/src/replica/contr/Task.h index b805b32f5..cc0151912 100644 --- a/src/replica/contr/Task.h +++ b/src/replica/contr/Task.h @@ -197,6 +197,12 @@ class Task : public EventLogger, public std::enable_shared_from_this { */ void debug(std::string const& msg) { LOGS(_log, LOG_LVL_DEBUG, context() << msg); } + /** + * Log a message into the Logger's LOG_LVL_WARN stream. + * @param msg A message to be logged. + */ + void warn(std::string const& msg) { LOGS(_log, LOG_LVL_WARN, context() << msg); } + /** * Log a message into the Logger's LOG_LVL_ERROR stream. * @param msg A message to be logged.