Skip to content

Commit

Permalink
Changed code to use QMeta::ChunkMap instead of json object.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Apr 8, 2024
1 parent 71acedf commit 23e0bcd
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 30 deletions.
28 changes: 14 additions & 14 deletions src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

// Qserv headers
#include "qmeta/Exceptions.h"
#include "qmeta/QMeta.h"
#include "util/Bug.h"
#include "util/TimeUtils.h"

Expand Down Expand Up @@ -69,11 +68,8 @@ bool CzarChunkMap::_read() {
return false;
}

auto const& jsChunks = qChunkMap.chunks;
LOGS(_log, LOG_LVL_DEBUG, "chunkMap=" << jsChunks);

// Make the new maps.
auto [chunkMapPtr, wcMapPtr] = makeNewMaps(jsChunks);
auto [chunkMapPtr, wcMapPtr] = makeNewMaps(qChunkMap);

verify(*chunkMapPtr, *wcMapPtr);
LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
Expand All @@ -88,18 +84,22 @@ bool CzarChunkMap::_read() {
}

pair<shared_ptr<CzarChunkMap::ChunkMap>, shared_ptr<CzarChunkMap::WorkerChunkMap>> CzarChunkMap::makeNewMaps(
nlohmann::json const& jsChunks) {
qmeta::QMeta::ChunkMap const& qChunkMap) {
// Create new maps.
auto wcMapPtr = make_shared<WorkerChunkMap>();
auto chunkMapPtr = make_shared<ChunkMap>();

for (auto const& [workerId, dbs] : jsChunks.items()) {
for (auto const& [dbName, tables] : dbs.items()) {
for (auto const& [tableName, chunks] : tables.items()) {
for (auto const& [index, chunkNumNSz] : chunks.items()) {
// Workers -> Databases map
for (auto const& [workerId, dbs] : qChunkMap.workers) {
// Databases -> Tables map
for (auto const& [dbName, tables] : dbs) {
// Tables -> Chunks map
for (auto const& [tableName, chunks] : tables) {
// vector of ChunkInfo
for (qmeta::QMeta::ChunkMap::ChunkInfo const& chunkInfo : chunks) {
try {
int64_t chunkNum = chunkNumNSz.at(0);
int64_t sz = chunkNumNSz.at(1);
int64_t chunkNum = chunkInfo.chunk;
int64_t sz = chunkInfo.size;
LOGS(_log, LOG_LVL_DEBUG,
"workerdId=" << workerId << " db=" << dbName << " table=" << tableName
<< " chunk=" << chunkNum << " sz=" << sz);
Expand All @@ -109,12 +109,12 @@ pair<shared_ptr<CzarChunkMap::ChunkMap>, shared_ptr<CzarChunkMap::WorkerChunkMap
throw ChunkMapException(
ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
" chunk=" + to_string(chunkInfo.chunk) + " " + exc.what());
} catch (out_of_range const& exc) {
throw ChunkMapException(
ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
" chunk=" + to_string(chunkInfo.chunk) + " " + exc.what());
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions src/czar/CzarChunkMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@

// Qserv headers
#include "global/clock_defs.h"
#include "qmeta/QMeta.h"
#include "util/Issue.h"

namespace lsst::qserv::qmeta {
class QMeta;
}

namespace lsst::qserv::czar {

class ChunkMapException : public util::Issue {
Expand Down Expand Up @@ -177,9 +174,9 @@ class CzarChunkMap {
/// descending.
static void calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize);

/// Make new ChunkMap and WorkerChunkMap from the data in `jsChunks`.
/// Make new ChunkMap and WorkerChunkMap from the data in `qChunkMap`.
static std::pair<std::shared_ptr<CzarChunkMap::ChunkMap>, std::shared_ptr<CzarChunkMap::WorkerChunkMap>>
makeNewMaps(nlohmann::json const& jsChunks);
makeNewMaps(qmeta::QMeta::ChunkMap const& qChunkMap);

/// Verify that all chunks belong to at least one worker and that all chunks are represented in shared
/// scans.
Expand Down
43 changes: 41 additions & 2 deletions src/czar/testCzar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

// Qserv headers
#include "czar/CzarChunkMap.h"
#include "qmeta/QMeta.h"

namespace test = boost::test_tools;
using namespace lsst::qserv;
Expand All @@ -49,6 +50,42 @@ using namespace std;

BOOST_AUTO_TEST_SUITE(Suite)

void insertIntoQChunkMap(qmeta::QMeta::ChunkMap& qChunkMap, string const& workerId, string const& dbName,
string const& tableName, unsigned int chunkNum, size_t sz) {
qChunkMap.workers[workerId][dbName][tableName].push_back(qmeta::QMeta::ChunkMap::ChunkInfo{chunkNum, sz});
}

qmeta::QMeta::ChunkMap convertJsonToChunkMap(nlohmann::json const& jsChunks) {
qmeta::QMeta::ChunkMap qChunkMap;
for (auto const& [workerId, dbs] : jsChunks.items()) {
for (auto const& [dbName, tables] : dbs.items()) {
for (auto const& [tableName, chunks] : tables.items()) {
for (auto const& [index, chunkNumNSz] : chunks.items()) {
try {
int64_t chunkNum = chunkNumNSz.at(0);
int64_t sz = chunkNumNSz.at(1);
LOGS(_log, LOG_LVL_DEBUG,
"workerdId=" << workerId << " db=" << dbName << " table=" << tableName
<< " chunk=" << chunkNum << " sz=" << sz);
insertIntoQChunkMap(qChunkMap, workerId, dbName, tableName, chunkNum, sz);
} catch (invalid_argument const& exc) {
throw czar::ChunkMapException(
ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
} catch (out_of_range const& exc) {
throw czar::ChunkMapException(
ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
}
}
}
}
}
return qChunkMap;
}

BOOST_AUTO_TEST_CASE(CzarChunkMap) {
// Each chunk only occurs on one worker
string test1 = R"(
Expand Down Expand Up @@ -149,12 +186,14 @@ BOOST_AUTO_TEST_CASE(CzarChunkMap) {
)";

auto jsTest1 = nlohmann::json::parse(test1);
auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(jsTest1);
qmeta::QMeta::ChunkMap qChunkMap1 = convertJsonToChunkMap(jsTest1);
auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(qChunkMap1);
czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure.
LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 1 passed");

auto jsTest2 = nlohmann::json::parse(test2);
tie(chunkMapPtr, wcMapPtr) = czar::CzarChunkMap::makeNewMaps(jsTest2);
qmeta::QMeta::ChunkMap qChunkMap2 = convertJsonToChunkMap(jsTest2);
tie(chunkMapPtr, wcMapPtr) = czar::CzarChunkMap::makeNewMaps(qChunkMap2);
czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure.
LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 2 passed");
}
Expand Down
1 change: 0 additions & 1 deletion src/qmeta/QMetaMysql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,6 @@ void QMetaMysql::addQueryMessages(QueryId queryId, shared_ptr<qdisp::MessageStor
}
}

//&&& HEAD
QMeta::ChunkMap QMetaMysql::getChunkMap(chrono::time_point<chrono::system_clock> const& prevUpdateTime) {
lock_guard<mutex> lock(_dbMutex);

Expand Down
9 changes: 2 additions & 7 deletions src/qmeta/testQMeta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,11 @@ BOOST_AUTO_TEST_CASE(messWithQueryStats) {
}
BOOST_CHECK(caught);
}
/* &&&
<<<<<<< HEAD
=======
*/

BOOST_AUTO_TEST_CASE(getChunkMap) {
// The test assumes that the underlying tables exists and it's empty.
QMeta::ChunkMap chunkMap;
BOOST_CHECK_THROW(qMeta->getChunkMap(), EmptyTableError);
}
/* &&&
>>>>>>> 07d082050 (Added code to read chunk disposition map and organize for czar use.)
*/

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit 23e0bcd

Please sign in to comment.