Skip to content

Commit

Permalink
mds: use monotonic waits in Beacon
Browse files Browse the repository at this point in the history
This guarantees that the sender thread cannot be disrupted by system clock
changes. This commit also simplifies the sender thread by manually managing the
thread and avoiding unnecessary context creation.

Fixes: http://tracker.ceph.com/issues/26962

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
  • Loading branch information
batrick committed Aug 21, 2018
1 parent 7a69ebb commit a5fc29b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 63 deletions.
107 changes: 58 additions & 49 deletions src/mds/Beacon.cc
Expand Up @@ -27,42 +27,65 @@

#include "Beacon.h"

#include <chrono>

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds.beacon." << name << ' '

using namespace std::chrono_literals;

/**
 * Build a Beacon bound to the given monitor client.
 *
 * Only members are initialised here; the constructor body is empty.
 *
 * @param cct   context forwarded to the Dispatcher base class
 * @param monc  monitor client stored for later use
 * @param name  copied into the `name` member
 */
Beacon::Beacon(CephContext *cct, MonClient *monc, std::string_view name)
  : Dispatcher(cct),
    // Snapshot the configured beacon period once, at construction time.
    beacon_interval(g_conf()->mds_beacon_interval),
    monc(monc),
    name(name)
{}

/**
 * Destructor: delegates all teardown to shutdown().
 */
Beacon::~Beacon()
{
  this->shutdown();
}

Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string_view name_) :
Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
name(name_)
/**
 * Ask the sender thread to stop and wait for it to exit.
 *
 * Only the first call does any work: it sets `finished` under the mutex and
 * then joins the sender thread. Later calls return immediately.
 *
 * The join is skipped when no thread is attached to `sender` (for example
 * when init() was never called). std::thread::join() throws
 * std::system_error on a non-joinable thread, and because ~Beacon() calls
 * shutdown(), that exception would otherwise escape from a destructor.
 */
void Beacon::shutdown()
{
  std::unique_lock<std::mutex> lock(mutex);
  if (finished) {
    return;  // already shut down
  }
  finished = true;

  // Release the mutex before joining: the sender thread re-acquires it when
  // its timed wait returns, and only then can it observe `finished`.
  lock.unlock();

  if (sender.joinable()) {
    sender.join();
  }
}

// Initialise beacon state from the given MDSMap and the standby-related
// configuration values, then start the periodic sender.
//
// NOTE(review): this span appears to show the before and after lines of a
// change rendered together: two lock acquisitions, a timer-based start
// followed by the opening of a timer-based shutdown(), and then a
// std::thread-based sender. Confirm against the actual file which lines are
// live.
void Beacon::init(const MDSMap &mdsmap)
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);

_notify_mdsmap(mdsmap);
// Copy the standby settings out of the configuration.
standby_for_rank = mds_rank_t(g_conf()->mds_standby_for_rank);
standby_for_name = g_conf()->mds_standby_for_name;
standby_for_fscid = fs_cluster_id_t(g_conf()->mds_standby_for_fscid);
standby_replay = g_conf()->mds_standby_replay;

// Spawn threads and start messaging
timer.init();
_send();
}


void Beacon::shutdown()
{
Mutex::Locker l(lock);
if (sender) {
timer.cancel_event(sender);
sender = NULL;
}
timer.shutdown();
// Sender thread: loops until `finished` is set, holding `mutex` except while
// it is blocked in wait_for().
sender = std::thread([this]() {
std::unique_lock<std::mutex> lock(mutex);
// Local condition variable used purely as a timed sleep that releases the
// mutex while waiting.
std::condition_variable c; // no one wakes us
while (!finished) {
auto now = clock::now();
// Seconds elapsed since the last beacon was sent.
auto since = std::chrono::duration<double>(now-last_send).count();
auto interval = beacon_interval;
// Send once at least 90% of the interval has elapsed; otherwise sleep only
// for the remainder of the current interval.
if (since >= interval*.90) {
_send();
} else {
interval -= since;
}
dout(20) << "sender thread waiting interval " << interval << "s" << dendl;
// `interval` is in seconds; multiplying by 1s turns it into a chrono duration.
c.wait_for(lock, interval*1s);
}
});
}

bool Beacon::ms_can_fast_dispatch2(const Message::const_ref& m) const
Expand Down Expand Up @@ -96,7 +119,7 @@ bool Beacon::ms_dispatch2(const Message::ref& m)
*/
void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);

version_t seq = m->get_seq();

Expand All @@ -120,9 +143,7 @@ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)
seq_stamp.erase(seq_stamp.begin(), ++it);

// Wake a waiter up if present
if (awaiting_seq == seq) {
waiting_cond.Signal();
}
cvar.notify_all();
} else {
dout(1) << "discarding unexpected beacon reply " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " dne" << dendl;
Expand All @@ -132,28 +153,26 @@ void Beacon::handle_mds_beacon(const MMDSBeacon::const_ref &m)

// Locked entry point for sending a beacon: acquires the beacon lock and
// delegates to _send().
//
// NOTE(review): two lock acquisitions appear here (a Mutex::Locker on `lock`
// and a std::unique_lock on `mutex`). This reads like the before and after
// lines of a change shown together; confirm which one the actual file keeps.
void Beacon::send()
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);
_send();
}


// Send a beacon, then block while the oldest entry in seq_stamp is at or
// below the sequence number just sent, up to a time limit.
//
// duration: upper bound on the wait, in seconds (see the log message below).
//
// NOTE(review): this span appears to interleave two variants of the same
// function: a utime_t / ceph_clock_now() / waiting_cond.WaitUntil loop and a
// clock::now() / cvar.wait_for loop, plus two lock acquisitions and both a
// member and a local `awaiting_seq`. Confirm against the actual file which
// lines are live.
void Beacon::send_and_wait(const double duration)
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);
_send();
awaiting_seq = last_seq;
// Remember the sequence number to wait for.
auto awaiting_seq = last_seq;
dout(20) << __func__ << ": awaiting " << awaiting_seq
<< " for up to " << duration << "s" << dendl;

utime_t timeout;
timeout.set_from_double(ceph_clock_now()+duration);
while (!seq_stamp.empty()
&& seq_stamp.begin()->first <= awaiting_seq
&& ceph_clock_now() < timeout) {
waiting_cond.WaitUntil(lock, timeout);
auto start = clock::now();
// Re-check after every wakeup: wait_for() may return before the condition
// being waited on has changed.
while (!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq) {
auto now = clock::now();
// Remaining budget in seconds; only 95% of `duration` is used.
auto s = duration*.95-std::chrono::duration<double>(now-start).count();
if (s < 0) break;
cvar.wait_for(lock, s*1s);
}

awaiting_seq = -1;
}


Expand All @@ -162,17 +181,6 @@ void Beacon::send_and_wait(const double duration)
*/
void Beacon::_send()
{
if (sender) {
timer.cancel_event(sender);
}
sender = timer.add_event_after(
g_conf()->mds_beacon_interval,
new FunctionContext([this](int) {
assert(lock.is_locked_by_me());
sender = nullptr;
_send();
}));

auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();

Expand Down Expand Up @@ -212,14 +220,15 @@ void Beacon::_send()
beacon->set_sys_info(sys_info);
}
monc->send_mon_message(beacon.detach());
last_send = now;
}

/**
* Call this when there is a new MDSMap available
*
* Acquires the beacon lock and forwards the map to _notify_mdsmap().
*
* NOTE(review): two lock acquisitions appear below (Mutex::Locker on `lock`
* and std::unique_lock on `mutex`). This reads like the before and after lines
* of a change shown together; confirm which one the actual file keeps.
*/
void Beacon::notify_mdsmap(const MDSMap &mdsmap)
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);

_notify_mdsmap(mdsmap);
}
Expand All @@ -238,7 +247,7 @@ void Beacon::_notify_mdsmap(const MDSMap &mdsmap)

bool Beacon::is_laggy()
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);

auto now = clock::now();
auto since = std::chrono::duration<double>(now-last_acked_stamp).count();
Expand All @@ -261,7 +270,7 @@ bool Beacon::is_laggy()

void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const newstate)
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);

// Update mdsmap epoch atomically with updating want_state, so that when
// we send a beacon with the new want state it has the latest epoch, and
Expand All @@ -286,7 +295,7 @@ void Beacon::set_want_state(const MDSMap &mdsmap, MDSMap::DaemonState const news
*/
void Beacon::notify_health(MDSRank const *mds)
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);
if (!mds) {
// No MDS rank held
return;
Expand Down Expand Up @@ -489,7 +498,7 @@ void Beacon::notify_health(MDSRank const *mds)

// Return the current value of `want_state`, read while holding the beacon lock.
//
// NOTE(review): two lock acquisitions appear here (Mutex::Locker on `lock` and
// std::unique_lock on `mutex`). This reads like the before and after lines of
// a change shown together; confirm which one the actual file keeps.
MDSMap::DaemonState Beacon::get_want_state() const
{
Mutex::Locker l(lock);
std::unique_lock lock(mutex);
return want_state;
}

25 changes: 11 additions & 14 deletions src/mds/Beacon.h
Expand Up @@ -16,11 +16,12 @@
#ifndef BEACON_STATE_H
#define BEACON_STATE_H

#include <mutex>
#include <string_view>
#include <thread>

#include "include/types.h"
#include "include/Context.h"
#include "common/Mutex.h"
#include "msg/Dispatcher.h"

#include "messages/MMDSBeacon.h"
Expand All @@ -44,8 +45,8 @@ class Beacon : public Dispatcher
using clock = ceph::coarse_mono_clock;
using time = ceph::coarse_mono_time;

Beacon(CephContext *cct_, MonClient *monc_, std::string_view name);
~Beacon() override {};
Beacon(CephContext *cct, MonClient *monc, std::string_view name);
~Beacon() override;

void init(const MDSMap &mdsmap);
void shutdown();
Expand Down Expand Up @@ -77,18 +78,21 @@ class Beacon : public Dispatcher

bool is_laggy();
// Seconds elapsed between `last_laggy` and now, measured on `clock` and
// returned as a double. The value is read while holding the beacon lock.
//
// NOTE(review): two lock acquisitions appear here (Mutex::Locker on `lock` and
// std::unique_lock on `mutex`). This reads like the before and after lines of
// a change shown together; confirm which one the actual file keeps.
double last_cleared_laggy() const {
Mutex::Locker l(lock);
std::unique_lock lock(mutex);
return std::chrono::duration<double>(clock::now()-last_laggy).count();
}

private:
void _notify_mdsmap(const MDSMap &mdsmap);
void _send();

//CephContext *cct;
mutable Mutex lock;
mutable std::mutex mutex;
std::thread sender;
std::condition_variable cvar;
time last_send = clock::zero();
double beacon_interval = 5.0;
bool finished = false;
MonClient* monc;
SafeTimer timer;

// Items we duplicate from the MDS to have access under our own lock
std::string name;
Expand All @@ -110,13 +114,6 @@ class Beacon : public Dispatcher

// Health status to be copied into each beacon message
MDSHealth health;

// Ticker
Context *sender = nullptr;

version_t awaiting_seq = -1;
Cond waiting_cond;
};

#endif // BEACON_STATE_H

0 comments on commit a5fc29b

Please sign in to comment.