Skip to content

Commit

Permalink
Extended Replication Controller to update chunk map at Czar
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Apr 5, 2024
1 parent f3c46cb commit 1010539
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 8 deletions.
1 change: 1 addition & 0 deletions admin/local/docker/compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ services:
--registry-host=repl-mgr-registry
--controller-auto-register-workers=1
--qserv-sync-force
--qserv-chunk-map-update
--debug
expose:
- "25081"
Expand Down
6 changes: 5 additions & 1 deletion src/replica/apps/MasterControllerHttpApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[])
" This affect replicas to be deleted from the workers during the synchronization"
" stages.",
_forceQservSync);
parser().flag("qserv-chunk-map-update",
"The flag which would result in updating the chunk disposition map"
" in Qserv's QMeta database.",
_qservChunkMapUpdate);
parser().flag("purge",
"The binary flag which, if provided, enables the 'purge' algorithm in"
" the end of each replication cycle that eliminates excess replicas which"
Expand Down Expand Up @@ -198,7 +202,7 @@ int MasterControllerHttpApp::runImpl() {

_replicationTask = ReplicationTask::create(
_controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec,
_forceQservSync, _replicationIntervalSec, _purge);
_forceQservSync, _qservChunkMapUpdate, _replicationIntervalSec, _purge);
_replicationTask->start();

_healthMonitorTask = HealthMonitorTask::create(
Expand Down
1 change: 1 addition & 0 deletions src/replica/apps/MasterControllerHttpApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class MasterControllerHttpApp : public Application {

bool _purge;
bool _forceQservSync;
bool _qservChunkMapUpdate;
bool _permanentDelete;

/// A connection URL for the MySQL service of the Qserv master database.
Expand Down
91 changes: 88 additions & 3 deletions src/replica/contr/ReplicationTask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,35 @@
// Class header
#include "replica/contr/ReplicationTask.h"

// System headers
#include <vector>

// Qserv headers
#include "replica/config/Configuration.h"
#include "replica/jobs/FindAllJob.h"
#include "replica/jobs/FixUpJob.h"
#include "replica/jobs/ReplicateJob.h"
#include "replica/jobs/RebalanceJob.h"
#include "replica/jobs/PurgeJob.h"
#include "replica/mysql/DatabaseMySQL.h"
#include "replica/mysql/DatabaseMySQLGenerator.h"
#include "replica/mysql/DatabaseMySQLUtils.h"
#include "replica/services/DatabaseServices.h"
#include "replica/util/ReplicaInfo.h"

using namespace std;

namespace lsst::qserv::replica {

using namespace database::mysql;

ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge) {
bool qservChunkMapUpdate, unsigned int replicationIntervalSec,
bool purge) {
return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync,
replicationIntervalSec, purge));
qservChunkMapUpdate, replicationIntervalSec, purge));
}

bool ReplicationTask::onRun() {
Expand All @@ -51,6 +63,8 @@ bool ReplicationTask::onRun() {
launch<FindAllJob>(priority, saveReplicaInfo, allWorkers);
sync(_qservSyncTimeoutSec, _forceQservSync);

if (_qservChunkMapUpdate) _updateChunkMap();

launch<FixUpJob>(priority);
sync(_qservSyncTimeoutSec, _forceQservSync);

Expand All @@ -73,10 +87,81 @@ bool ReplicationTask::onRun() {
ReplicationTask::ReplicationTask(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge)
bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge)
: Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec),
_qservSyncTimeoutSec(qservSyncTimeoutSec),
_forceQservSync(forceQservSync),
_qservChunkMapUpdate(qservChunkMapUpdate),
_purge(purge) {}

void ReplicationTask::_updateChunkMap() {
// Open MySQL connection using the RAII-style handler that would automatically
// abort the transaction should any problem occured when loading data into the table.
ConnectionHandler h;
try {
h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta"));
} catch (exception const& ex) {
error("failed to connect to the czar's database server, ex: " + string(ex.what()));
return;
}
QueryGenerator const g(h.conn);

// Get info on known chunk replicas from the persistent store of the Replication system
// and package those into ready-to-ingest data.
bool const allDatabases = true;
string const emptyDatabaseFilter;
bool const isPublished = true;
bool const includeFileInfo = true; // need this to access tables sizes
vector<string> rows;
for (auto const& workerName : serviceProvider()->config()->workers()) {
vector<ReplicaInfo> replicas;
serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter,
allDatabases, isPublished, includeFileInfo);
for (auto const& replica : replicas) {
for (auto const& fileInfo : replica.fileInfo()) {
if (fileInfo.isData() && !fileInfo.isOverlap()) {
rows.push_back(g.packVals(workerName, replica.database(), fileInfo.baseTable(),
replica.chunk(), fileInfo.size));
}
}
}
}
if (rows.empty()) {
warn("no replicas found in the persistent state of the Replication system");
return;
}

// Get the limit for the length of the bulk insert queries. The limit is needed
// to run the query generation.
size_t maxQueryLength = 0;
string const globalVariableName = "max_allowed_packet";
try {
string const query = g.showVars(SqlVarScope::GLOBAL, globalVariableName);
h.conn->executeInOwnTransaction([&query, &maxQueryLength](auto conn) {
bool const noMoreThanOne = true;
if (!selectSingleValue(conn, query, maxQueryLength, "Value", noMoreThanOne)) {
throw runtime_error("no such variable found");
}
});
} catch (exception const& ex) {
error("failed to get a value of GLOBAL '" + globalVariableName + "', ex: " + string(ex.what()));
return;
}

// Execute a sequence of queries atomically
vector<string> const deleteQueries = {g.delete_("chunkMap"), g.delete_("chunkMapStatus")};
vector<string> insertQueries = g.insertPacked(
"chunkMap", g.packIds("worker", "database", "table", "chunk", "size"), rows, maxQueryLength);
insertQueries.push_back(g.insert("chunkMapStatus", Sql::NOW));
try {
h.conn->executeInOwnTransaction([&deleteQueries, &insertQueries](auto conn) {
for (auto const& query : deleteQueries) conn->execute(query);
for (auto const& query : insertQueries) conn->execute(query);
});
} catch (exception const& ex) {
error("failed to update chunk map in the Czar database, ex: " + string(ex.what()));
return;
}
}

} // namespace lsst::qserv::replica
17 changes: 13 additions & 4 deletions src/replica/contr/ReplicationTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ class ReplicationTask : public Task {
* @param qservSyncTimeoutSec The maximum number of seconds to wait before giving
* up on the Qserv synchronization requests.
* @param forceQservSync Force chunk removal at worker resource collections if 'true'.
* @param qservChunkMapUpdate Update the chunk disposition map in Qserv's QMeta database if 'true'.
* @param replicationIntervalSec The number of seconds to wait at the end of each
* iteration of the loop before beginning the next one.
* @param purge Purge excess replicas if 'true'.
* @return The smart pointer to a new object
*/
static Ptr create(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate,
unsigned int replicationIntervalSec, bool purge);

protected:
Expand All @@ -72,15 +73,23 @@ class ReplicationTask : public Task {
private:
/// @see ReplicationTask::create()
ReplicationTask(Controller::Ptr const& controller, AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate,
unsigned int replicationIntervalSec, bool purge);

void _updateChunkMap();

/// The maximum number of seconds to wait before giving up
/// on the Qserv synchronization requests.
unsigned int const _qservSyncTimeoutSec;

bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'.
bool const _purge; ///< Purge excess replicas if 'true'.
/// Force removal at worker resource collections if 'true'.
bool const _forceQservSync;

/// Update the chunk disposition map in Qserv's QMeta database if 'true'.
bool const _qservChunkMapUpdate;

/// Purge excess replicas if 'true'.
bool const _purge;
};

} // namespace lsst::qserv::replica
Expand Down
6 changes: 6 additions & 0 deletions src/replica/contr/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ class Task : public EventLogger, public std::enable_shared_from_this<Task> {
*/
void debug(std::string const& msg) {
    // The value of context() goes in front of the message text.
    LOGS(_log, LOG_LVL_DEBUG, context() << msg);
}

/**
 * Report a message at the LOG_LVL_WARN level of the Logger.
 * The value of context() is put in front of the message text.
 * @param msg The text of the message to be reported.
 */
void warn(std::string const& msg) {
    LOGS(_log, LOG_LVL_WARN, context() << msg);
}

/**
* Log a message into the Logger's LOG_LVL_ERROR stream.
* @param msg A message to be logged.
Expand Down

0 comments on commit 1010539

Please sign in to comment.