Skip to content
This repository has been archived by the owner on Dec 11, 2020. It is now read-only.

Commit

Permalink
Logger changes for go game ctrl
Browse files Browse the repository at this point in the history
  • Loading branch information
jma127 committed May 23, 2018
1 parent 8605f95 commit 83a4b99
Showing 1 changed file with 98 additions and 65 deletions.
163 changes: 98 additions & 65 deletions src_cpp/elfgames/go/game_ctrl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <chrono>
#include <fstream>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
Expand All @@ -25,6 +24,7 @@
#include "elf/base/ctrl.h"
#include "elf/concurrency/ConcurrentQueue.h"
#include "elf/concurrency/Counter.h"
#include "elf/logging/IndexedLoggerFactory.h"

#include "game_stats.h"
#include "go_game_specific.h"
Expand All @@ -36,6 +36,20 @@ using ThreadedCtrlBase =
using Ctrl = elf::CtrlT<elf::concurrency::ConcurrentQueue>;
using Addr = elf::Addr;

namespace elfgames {
namespace go {
namespace game_ctrl {

// Returns a pointer to the logger factory shared by the controllers in this
// header. The factory is a function-local static, so it is constructed on the
// first call and the same instance is returned on every later call. Each
// logger it creates is obtained from spdlog::stderr_color_mt(name).
inline elf::logging::IndexedLoggerFactory* getLoggerFactory() {
  using Factory = elf::logging::IndexedLoggerFactory;

  // Callback handed to the factory: builds the spdlog logger for a given name.
  const auto make_stderr_logger = [](const std::string& logger_name) {
    return spdlog::stderr_color_mt(logger_name);
  };

  static Factory instance(make_stderr_logger);
  return &instance;
}

} // namespace game_ctrl
} // namespace go
} // namespace elfgames

struct CtrlInfo {
int num_games;
GameOptions options;
Expand Down Expand Up @@ -82,15 +96,19 @@ struct CtrlInfo {
class ThreadedDispatcher : public ThreadedCtrlBase {
public:
ThreadedDispatcher(CtrlInfo& info)
: ThreadedCtrlBase(info.ctrl, 500), ctrl_info_(info) {
: ThreadedCtrlBase(info.ctrl, 500),
ctrl_info_(info),
logger_(elfgames::go::game_ctrl::getLoggerFactory()->makeLogger(
"elfgames::go::ThreadedDispatcher-",
"")) {
start<std::pair<Addr, MsgRestart>, MsgRestart, MsgRequest>();
}

// Called by game threads
// Registers a mailbox named "game_<game_idx>" on ctrl_ for the MsgRequest and
// MsgRestart message types, logs the registration at debug level, and then
// increments game_counter_.
//
// game_idx: index of the game; used to build the mailbox name.
void RegGame(int game_idx) {
ctrl_.RegMailbox<MsgRequest, MsgRestart>(
"game_" + std::to_string(game_idx));
// cout << "Register game " << game_idx << endl;
logger_->debug("Register game {}", game_idx);
// before_loop() blocks on game_counter_ until it reaches num_games, so each
// registration must be counted here.
game_counter_.increment();
}

Expand All @@ -114,11 +132,10 @@ class ThreadedDispatcher : public ThreadedCtrlBase {

// Wait for confirm from the other side. If result is nontrivial.
if (msg.result == RestartReply::UPDATE_MODEL) {
// std::cout << "[" << msg.game_idx << "] On receive done. " << endl;
logger_->debug("(game_idx {}) on receive done", msg.game_idx);
MsgRestart msg2;
ctrl_.waitMail(&msg2);
// std::cout << "[" << msg.game_idx << "] Broadcast update complete. " <<
// endl;
logger_->debug("(game_idx {}) broadcast update complete", msg.game_idx);
}
return msg;
}
Expand All @@ -133,15 +150,13 @@ class ThreadedDispatcher : public ThreadedCtrlBase {
void before_loop() override {
// Wait for all games + this processing thread.
int num_games = ctrl_info_.num_games;
std::cout << "Wait all games[" << num_games << "] to register their mailbox"
<< std::endl;
logger_->info("Wait all {} games to register their mailbox", num_games);
game_counter_.waitUntilCount(num_games);
game_counter_.reset();
std::cout << "All games [" << num_games << "] registered" << std::endl;
logger_->info("All {} games have registered their mailbox", num_games);
}

void on_thread() override {
// cout << "Register Recv threads" << endl;
MsgRequest msg;
if (ctrl_.peekMail(&msg, 0)) {
process_request(msg);
Expand All @@ -154,15 +169,14 @@ class ThreadedDispatcher : public ThreadedCtrlBase {
return false;
}

std::cout << elf_utils::now()
<< ", EvalCtrl get new request: " << request.info() << std::endl;
logger_->info("process_request called: {}", request.info());
curr_request_ = request;

MsgRequest wait_request;
wait_request.vers.set_wait();

std::vector<Addr> addrs = ctrl_.filterPrefix(std::string("game"));
// std::cout << "EvalCtrl: #addrs: " << addrs.size() << std::endl;
logger_->debug("process_request # addrs: {}", addrs.size());

// Check request
size_t n = curr_request_.client_ctrl.num_game_thread_used < 0
Expand All @@ -184,8 +198,10 @@ class ThreadedDispatcher : public ThreadedCtrlBase {
// Wait until we get all confirmations.
for (size_t i = 0; i < addrs.size(); ++i) {
ctrl_.waitMail(&msg);
// std::cout << "EvalCtrl: Get confirm from " << msg.second.result << ",
// game_idx = " << msg.second.game_idx << std::endl;
logger_->debug(
"process_request confirm received: from {}, game_idx {}",
msg.second.result,
msg.second.game_idx);
switch (msg.second.result) {
case RestartReply::UPDATE_MODEL:
addrs_to_reply.push_back(msg.first);
Expand All @@ -201,10 +217,12 @@ class ThreadedDispatcher : public ThreadedCtrlBase {

if (update_model) {
// Once it is done, send to Python side.
std::cout << elf_utils::now() << " Get actionable request: black_ver = "
<< request.vers.black_ver
<< ", white_ver = " << request.vers.white_ver
<< ", #addrs_to_reply: " << addrs_to_reply.size() << std::endl;
logger_->info(
"process_request actionable request received: black_ver {}, "
"white_ver {}, addrs_to_reply.size {}",
request.vers.black_ver,
request.vers.white_ver,
addrs_to_reply.size());
elf::FuncsWithState funcs = ctrl_info_.client->BindStateToFunctions(
{start_target_}, &request.vers);
ctrl_info_.client->sendWait({start_target_}, &funcs);
Expand All @@ -215,12 +233,20 @@ class ThreadedDispatcher : public ThreadedCtrlBase {
}
return true;
}

private:
std::shared_ptr<spdlog::logger> logger_;
};

class ThreadedSelfplay : public ThreadedCtrlBase {
public:
ThreadedSelfplay(CtrlInfo& info)
: ThreadedCtrlBase(info.ctrl, 10000), ctrl_info_(info), rng_(time(NULL)) {
: ThreadedCtrlBase(info.ctrl, 10000),
ctrl_info_(info),
rng_(time(NULL)),
logger_(elfgames::go::game_ctrl::getLoggerFactory()->makeLogger(
"elfgames::go::ThreadedSelfplay-",
"")) {
start<int64_t>();
}

Expand All @@ -229,14 +255,13 @@ class ThreadedSelfplay : public ThreadedCtrlBase {
while (
(res = ctrl_info_.selfplay_ctrl->needWaitForMoreSample(selfplay_ver)) ==
SelfPlaySubCtrl::CtrlResult::INSUFFICIENT_SAMPLE) {
std::cout << elf_utils::now() << ", Insufficient sample for model "
<< selfplay_ver << "... waiting 30s" << std::endl;
logger_->info(
"INSUFFICIENT_SAMPLE for model {}, waiting 30s", selfplay_ver);
std::this_thread::sleep_for(30s);
}

if (res == SelfPlaySubCtrl::CtrlResult::SUFFICIENT_SAMPLE) {
std::cout << elf_utils::now() << ", Sufficient sample for model "
<< selfplay_ver << std::endl;
logger_->info("SUFFICIENT_SAMPLE for model {}", selfplay_ver);
ctrl_info_.selfplay_ctrl->notifyCurrentWeightUpdate();
}
}
Expand Down Expand Up @@ -264,8 +289,7 @@ class ThreadedSelfplay : public ThreadedCtrlBase {

// After setCurrModel, new model from python side with the old selfplay_ver
// will not enter the replay buffer
std::cout << "Updating .. old_ver: " << old_ver << ", new_ver: " << ver
<< std::endl;
logger_->info("Updating from version {} to version {}", old_ver, ver);
// A better model is found, clean up old games (or not?)
if (!ctrl_info_.options.keep_prev_selfplay) {
ctrl_info_.reader->clear();
Expand All @@ -287,6 +311,9 @@ class ThreadedSelfplay : public ThreadedCtrlBase {
ctrl_info_.client->BindStateToFunctions({train_ctrl_}, &msg);
ctrl_info_.client->sendWait({train_ctrl_}, &funcs);
}

private:
std::shared_ptr<spdlog::logger> logger_;
};

class ThreadedWriterCtrl : public ThreadedCtrlBase {
Expand All @@ -295,7 +322,10 @@ class ThreadedWriterCtrl : public ThreadedCtrlBase {
: ThreadedCtrlBase(info.ctrl, 0),
ctrl_info_(info),
request_destination_(request_dest),
records_(info.writer->identity()) {
records_(info.writer->identity()),
logger_(elfgames::go::game_ctrl::getLoggerFactory()->makeLogger(
"elfgames::go::ThreadedWriterCtrl-",
"")) {
start<>();
}

Expand All @@ -321,22 +351,20 @@ class ThreadedWriterCtrl : public ThreadedCtrlBase {
std::string smsg;
// Will block..
if (!ctrl_info_.writer->getReplyNoblock(&smsg)) {
std::cout << elf_utils::now()
<< ", WriterCtrl: no message, sleep for a while .. "
<< std::endl;
logger_->info("no message, sleeping for 10s");
std::this_thread::sleep_for(10s);
return;
}

std::cout << elf_utils::now() << " In reply func: Message got..."
<< std::endl;
// cout << smsg << endl;
logger_->info("received message");

json j = json::parse(smsg);
MsgRequestSeq msg = MsgRequestSeq::createFromJson(j);
if (msg.seq != seq_) {
std::cout << "Warning! The sequence number [" << msg.seq
<< "] in the msg is different from " << seq_ << std::endl;
logger_->warn(
"expected sequence number {}, but got {} in the message",
seq_,
msg.seq);
}

ctrl_.sendMail(request_destination_, msg.request);
Expand All @@ -351,21 +379,19 @@ class ThreadedWriterCtrl : public ThreadedCtrlBase {
// Send data.
std::lock_guard<std::mutex> lock(record_mutex_);

/*
std::cout << "Sending state update[" << records_.identity << "][" <<
elf_utils::now() << "]"; for (const auto& s : records_.states) { std::cout
<< s.second.info() << ", ";
}
std::cout << std::endl;
*/
std::cout << "Sending state update[" << elf_utils::now()
<< "], #records: " << records_.records.size()
<< ", #states: " << records_.states.size() << std::endl;
logger_->debug(
"Sending state update (identity: {}) with {} records and {} states",
records_.identity,
records_.records.size(),
records_.states.size());

ctrl_info_.writer->Insert(records_.dumpJsonString());
records_.clear();
seq_ = msg.seq + 1;
}

private:
std::shared_ptr<spdlog::logger> logger_;
};

class TrainCtrl {
Expand All @@ -377,7 +403,10 @@ class TrainCtrl {
const GameOptions& options,
const elf::ai::tree_search::TSOptions& mcts_opt)
: ctrl_info_(num_games, client, options, mcts_opt, reader),
rng_(time(NULL)) {
rng_(time(NULL)),
logger_(elfgames::go::game_ctrl::getLoggerFactory()->makeLogger(
"elfgames::go::TrainCtrl-",
"")) {
using std::placeholders::_1;
using std::placeholders::_2;

Expand All @@ -403,7 +432,7 @@ class TrainCtrl {
}

bool setInitialVersion(int64_t init_version) {
std::cout << "Setting init version: " << init_version << std::endl;
logger_->info("setting init_version: {}", init_version);
ctrl_info_.eval_ctrl->setBaselineModel(init_version);

if (ctrl_info_.selfplay_ctrl->getCurrModel() < 0) {
Expand All @@ -414,8 +443,10 @@ class TrainCtrl {
}

bool setEvalMode(int64_t new_ver, int64_t old_ver) {
std::cout << "Setting eval mode: new: " << new_ver << ", old: " << old_ver
<< std::endl;
logger_->info(
"setting eval mode from old value ({}) to new value ({})",
old_ver,
new_ver);
ctrl_info_.client_mgr->setSelfplayOnlyRatio(0.0);
ctrl_info_.eval_ctrl->setBaselineModel(old_ver);
ctrl_info_.eval_ctrl->addNewModelForEvaluation(old_ver, new_ver);
Expand Down Expand Up @@ -478,8 +509,10 @@ class TrainCtrl {
ClientInfo& info = ctrl_info_.client_mgr->getClient(identity);

if (info.justAllocated()) {
std::cout << "New allocated: " << identity << ", "
<< ctrl_info_.client_mgr->info() << std::endl;
logger_->info(
"newly allocated: identity {}, info {}",
identity,
ctrl_info_.client_mgr->info());
}

MsgRequestSeq request;
Expand All @@ -493,14 +526,6 @@ class TrainCtrl {
bool on_receive_data(const Addr&, const Records& rs) {
ClientInfo& info = ctrl_info_.client_mgr->getClient(rs.identity);

// Print out the stats.
/*
std::cout << "State update[" << rs.identity << "][" << elf_utils::now() <<
"]"; for (const auto& s : rs.states) { std::cout << s.second.info() << ", ";
}
std::cout << std::endl;
*/

for (const auto& s : rs.states) {
info.stateUpdate(s.second);
}
Expand Down Expand Up @@ -530,11 +555,15 @@ class TrainCtrl {
valid_eval++;
}

std::cout << "TrainCtrl: Receive data[" << recv_count_ << "] from "
<< rs.identity << ", #state_update: " << rs.states.size()
<< ", #records: " << rs.records.size()
<< ", #valid_selfplay: " << valid_selfplay
<< ", #valid_eval: " << valid_eval << std::endl;
logger_->info(
"received {} records from {}, with {} state updates, {} records, {} "
"valid selfplays, and {} evals",
recv_count_,
rs.identity,
rs.states.size(),
rs.records.size(),
valid_selfplay,
valid_eval);
}
return true;
}
Expand Down Expand Up @@ -570,18 +599,21 @@ class TrainCtrl {
}
break;
case CLIENT_INVALID:
std::cout << "Warning! Invalid client_type! " << std::endl;
logger_->warn("invalid client type");
break;
}
}

private:
CtrlInfo ctrl_info_;

bool eval_mode_ = false;

std::unique_ptr<ThreadedSelfplay> threaded_selfplay_;
int recv_count_ = 0;
std::mt19937 rng_;

std::shared_ptr<spdlog::logger> logger_;
};

// Ctrl for evaluation/selfplay client.
Expand Down Expand Up @@ -658,6 +690,7 @@ class EvalCtrl {
return true;
}

private:
CtrlInfo ctrl_info_;

std::string end_target_ = "game_end";
Expand Down

0 comments on commit 83a4b99

Please sign in to comment.