Skip to content

Commit

Permalink
Merge PR ceph#23635 into master
Browse files Browse the repository at this point in the history
* refs/pull/23635/head:
	mds: use monotonic clock in beacon
	mds: simplify beacon init

Reviewed-by: Zheng Yan <zyan@redhat.com>
  • Loading branch information
batrick committed Aug 21, 2018
2 parents b613fbd + e77a2f5 commit 7a69ebb
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 136 deletions.
116 changes: 46 additions & 70 deletions src/mds/Beacon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,10 @@

Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string_view name_) :
Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
name(name_), standby_for_rank(MDS_RANK_NONE),
standby_for_fscid(FS_CLUSTER_ID_NONE), want_state(MDSMap::STATE_BOOT),
awaiting_seq(-1)
{
last_seq = 0;
was_laggy = false;

epoch = 0;
}


Beacon::~Beacon()
name(name_)
{
}


void Beacon::init(const MDSMap &mdsmap)
{
Mutex::Locker l(lock);
Expand Down Expand Up @@ -113,40 +101,30 @@ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)
version_t seq = m->get_seq();

// update lab
if (seq_stamp.count(seq)) {
utime_t now = ceph_clock_now();
if (seq_stamp[seq] > last_acked_stamp) {
last_acked_stamp = seq_stamp[seq];
utime_t rtt = now - last_acked_stamp;

dout(5) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " rtt " << rtt << dendl;

if (was_laggy && rtt < g_conf()->mds_beacon_grace) {
dout(0) << "handle_mds_beacon no longer laggy" << dendl;
was_laggy = false;
laggy_until = now;
}
} else {
// Mark myself laggy if system clock goes backwards. Hopping
// later beacons will clear it.
dout(0) << "handle_mds_beacon system clock goes backwards, "
<< "mark myself laggy" << dendl;
last_acked_stamp = now - utime_t(g_conf()->mds_beacon_grace + 1, 0);
was_laggy = true;
auto it = seq_stamp.find(seq);
if (it != seq_stamp.end()) {
auto now = clock::now();

last_acked_stamp = it->second;
auto rtt = std::chrono::duration<double>(now - last_acked_stamp).count();

dout(5) << "received beacon reply " << ceph_mds_state_name(m->get_state()) << " seq " << m->get_seq() << " rtt " << rtt << dendl;

if (laggy && rtt < g_conf()->mds_beacon_grace) {
dout(0) << " MDS is no longer laggy" << dendl;
laggy = false;
last_laggy = now;
}

// clean up seq_stamp map
while (!seq_stamp.empty() &&
seq_stamp.begin()->first <= seq)
seq_stamp.erase(seq_stamp.begin());
seq_stamp.erase(seq_stamp.begin(), ++it);

// Wake a waiter up if present
if (awaiting_seq == seq) {
waiting_cond.Signal();
}
} else {
dout(1) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " dne" << dendl;
}
}
Expand All @@ -168,8 +146,9 @@ void Beacon::send_and_wait(const double duration)
<< " for up to " << duration << "s" << dendl;

utime_t timeout;
timeout.set_from_double(ceph_clock_now() + duration);
while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
timeout.set_from_double(ceph_clock_now()+duration);
while (!seq_stamp.empty()
&& seq_stamp.begin()->first <= awaiting_seq
&& ceph_clock_now() < timeout) {
waiting_cond.WaitUntil(lock, timeout);
}
Expand All @@ -194,17 +173,20 @@ void Beacon::_send()
_send();
}));

auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();

if (!cct->get_heartbeat_map()->is_healthy()) {
/* If anything isn't progressing, let avoid sending a beacon so that
* the MDS will consider us laggy */
dout(0) << __func__ << " skipping beacon, heartbeat map not healthy" << dendl;
dout(0) << "Skipping beacon heartbeat to monitors (last acked " << since << "s ago); MDS internal heartbeat is not healthy!" << dendl;
return;
}

++last_seq;
dout(5) << __func__ << " " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;
dout(5) << "Sending beacon " << ceph_mds_state_name(want_state) << " seq " << last_seq << dendl;

seq_stamp[last_seq] = ceph_clock_now();
seq_stamp[last_seq] = now;

assert(want_state != MDSMap::STATE_NULL);

Expand Down Expand Up @@ -258,17 +240,14 @@ bool Beacon::is_laggy()
{
Mutex::Locker l(lock);

if (last_acked_stamp == utime_t())
return false;

utime_t now = ceph_clock_now();
utime_t since = now - last_acked_stamp;
auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
if (since > g_conf()->mds_beacon_grace) {
dout(1) << "is_laggy " << since << " > " << g_conf()->mds_beacon_grace
<< " since last acked beacon" << dendl;
was_laggy = true;
if (since > (g_conf()->mds_beacon_grace*2) &&
now > last_mon_reconnect + g_conf()->mds_beacon_interval) {
laggy = true;
auto last_reconnect = std::chrono::duration<double>(now-last_mon_reconnect).count();
if (since > (g_conf()->mds_beacon_grace*2) && last_reconnect > g_conf()->mds_beacon_interval) {
// maybe it's not us?
dout(1) << "initiating monitor reconnect; maybe we're not the slow one"
<< dendl;
Expand All @@ -280,13 +259,6 @@ bool Beacon::is_laggy()
return false;
}

utime_t Beacon::get_laggy_until() const
{
Mutex::Locker l(lock);

return laggy_until;
}

void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const newstate)
{
Mutex::Locker l(lock);
Expand Down Expand Up @@ -395,30 +367,34 @@ void Beacon::notify_health(MDSRank const *mds)
set<Session*> sessions;
mds->sessionmap.get_client_session_set(sessions);

utime_t cutoff = ceph_clock_now();
cutoff -= g_conf()->mds_recall_state_timeout;
utime_t last_recall = mds->mdcache->last_recall_state;
auto mds_recall_state_timeout = g_conf()->mds_recall_state_timeout;
auto last_recall = mds->mdcache->last_recall_state;
auto last_recall_span = std::chrono::duration<double>(clock::now()-last_recall).count();
bool recall_state_timedout = last_recall_span > mds_recall_state_timeout;

std::list<MDSHealthMetric> late_recall_metrics;
std::list<MDSHealthMetric> large_completed_requests_metrics;
for (set<Session*>::iterator i = sessions.begin(); i != sessions.end(); ++i) {
Session *session = *i;
if (!session->recalled_at.is_zero()) {
for (auto& session : sessions) {
if (session->recalled_at != Session::clock::zero()) {
auto last_recall_sent = session->last_recall_sent;
auto recalled_at = session->recalled_at;
auto recalled_at_span = std::chrono::duration<double>(clock::now()-recalled_at).count();

dout(20) << "Session servicing RECALL " << session->info.inst
<< ": " << session->recalled_at << " " << session->recall_release_count
<< ": " << recalled_at_span << "s ago " << session->recall_release_count
<< "/" << session->recall_count << dendl;
if (last_recall < cutoff || session->last_recall_sent < last_recall) {
if (recall_state_timedout || last_recall_sent < last_recall) {
dout(20) << " no longer recall" << dendl;
session->clear_recalled_at();
} else if (session->recalled_at < cutoff) {
dout(20) << " exceeded timeout " << session->recalled_at << " vs. " << cutoff << dendl;
} else if (recalled_at_span > mds_recall_state_timeout) {
dout(20) << " exceeded timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
std::ostringstream oss;
oss << "Client " << session->get_human_name() << " failing to respond to cache pressure";
MDSHealthMetric m(MDS_HEALTH_CLIENT_RECALL, HEALTH_WARN, oss.str());
m.metadata["client_id"] = stringify(session->get_client());
late_recall_metrics.push_back(m);
} else {
dout(20) << " within timeout " << session->recalled_at << " vs. " << cutoff << dendl;
dout(20) << " within timeout " << recalled_at_span << " vs. " << mds_recall_state_timeout << dendl;
}
}
if ((session->get_num_trim_requests_warnings() > 0 &&
Expand Down Expand Up @@ -473,7 +449,7 @@ void Beacon::notify_health(MDSRank const *mds)

{
auto complaint_time = g_conf()->osd_op_complaint_time;
auto now = ceph::coarse_mono_clock::now();
auto now = clock::now();
auto cutoff = now - ceph::make_timespan(complaint_time);

std::string count;
Expand Down
32 changes: 19 additions & 13 deletions src/mds/Beacon.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ class MDSRank;
class Beacon : public Dispatcher
{
public:
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;

Beacon(CephContext *cct_, MonClient *monc_, std::string_view name);
~Beacon() override;
~Beacon() override {};

void init(const MDSMap &mdsmap);
void shutdown();
Expand Down Expand Up @@ -73,7 +76,10 @@ class Beacon : public Dispatcher
void send_and_wait(const double duration);

bool is_laggy();
utime_t get_laggy_until() const;
double last_cleared_laggy() const {
Mutex::Locker l(lock);
return std::chrono::duration<double>(clock::now()-last_laggy).count();
}

private:
void _notify_mdsmap(const MDSMap &mdsmap);
Expand All @@ -86,29 +92,29 @@ class Beacon : public Dispatcher

// Items we duplicate from the MDS to have access under our own lock
std::string name;
version_t epoch;
version_t epoch = 0;
CompatSet compat;
mds_rank_t standby_for_rank;
mds_rank_t standby_for_rank = MDS_RANK_NONE;
std::string standby_for_name;
fs_cluster_id_t standby_for_fscid;
fs_cluster_id_t standby_for_fscid = FS_CLUSTER_ID_NONE;
bool standby_replay = false;
MDSMap::DaemonState want_state;
MDSMap::DaemonState want_state = MDSMap::STATE_BOOT;

// Internal beacon state
version_t last_seq; // last seq sent to monitor
std::map<version_t,utime_t> seq_stamp; // seq # -> time sent
utime_t last_acked_stamp; // last time we sent a beacon that got acked
utime_t last_mon_reconnect;
bool was_laggy;
utime_t laggy_until;
version_t last_seq = 0; // last seq sent to monitor
std::map<version_t,time> seq_stamp; // seq # -> time sent
time last_acked_stamp = clock::zero(); // last time we sent a beacon that got acked
time last_mon_reconnect = clock::zero();
bool laggy = false;
time last_laggy = clock::zero();

// Health status to be copied into each beacon message
MDSHealth health;

// Ticker
Context *sender = nullptr;

version_t awaiting_seq;
version_t awaiting_seq = -1;
Cond waiting_cond;
};

Expand Down
2 changes: 1 addition & 1 deletion src/mds/MDCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7536,7 +7536,7 @@ void MDCache::check_memory_usage()
mds->mlogger->set(l_mdm_heap, last.get_heap());

if (cache_toofull()) {
last_recall_state = ceph_clock_now();
last_recall_state = clock::now();
mds->server->recall_client_state();
}

Expand Down
5 changes: 4 additions & 1 deletion src/mds/MDCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ static const int PREDIRTY_SHALLOW = 4; // only go to immediate parent (for easie

class MDCache {
public:
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;

typedef std::map<mds_rank_t, MCacheExpire::ref> expiremap;

// my master
Expand Down Expand Up @@ -768,7 +771,7 @@ class MDCache {
void trim_client_leases();
void check_memory_usage();

utime_t last_recall_state;
time last_recall_state;

// shutdown
private:
Expand Down
5 changes: 0 additions & 5 deletions src/mds/MDSRank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1031,11 +1031,6 @@ void MDSRank::retry_dispatch(const Message::const_ref &m)
dec_dispatch_depth();
}

utime_t MDSRank::get_laggy_until() const
{
return beacon.get_laggy_until();
}

double MDSRank::get_dispatch_queue_max_age(utime_t now) const
{
return messenger->get_dispatch_queue_max_age(now);
Expand Down
5 changes: 4 additions & 1 deletion src/mds/MDSRank.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,10 @@ class MDSRank {
*/
void damaged_unlocked();

utime_t get_laggy_until() const;
double last_cleared_laggy() const {
return beacon.last_cleared_laggy();
}

double get_dispatch_queue_max_age(utime_t now) const;

void send_message_mds(const Message::ref& m, mds_rank_t mds);
Expand Down

0 comments on commit 7a69ebb

Please sign in to comment.