Skip to content

Commit

Permalink
Merge pull request ceph#31480 from ukernel/wip-20
Browse files Browse the repository at this point in the history
client: auto reconnect after blacklisted

Reviewed-by: Jeff Layton <jlayton@redhat.com>
  • Loading branch information
gregsfortytwo committed Apr 3, 2020
2 parents 45288ad + 107b91d commit 40fb155
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 76 deletions.
2 changes: 1 addition & 1 deletion qa/tasks/cephfs/test_client_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def test_reconnect_after_blacklisted(self):
self.mount_a.umount_wait()

if isinstance(self.mount_a, FuseMount):
self.skipTest("Not implemented in FUSE client yet")
self.mount_a.mount(mount_options=['--client_reconnect_stale=1', '--fuse_disable_pagecache=1'])
else:
try:
self.mount_a.mount(mount_options=['recover_session=clean'])
Expand Down
157 changes: 103 additions & 54 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1728,16 +1728,14 @@ int Client::make_request(MetaRequest *request,
// open a session?
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);

if (session->state == MetaSession::STATE_REJECTED) {
request->abort(-EPERM);
break;
}
// wait
if (session->state == MetaSession::STATE_OPENING) {
ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
wait_on_context_list(session->waiting_for_open);
// Abort requests on REJECT from MDS
if (rejected_by_mds.count(mds)) {
request->abort(-EPERM);
break;
}
continue;
}

Expand Down Expand Up @@ -2039,20 +2037,6 @@ MetaSession *Client::_open_mds_session(mds_rank_t mds)
ceph_assert(em.second); /* not already present */
MetaSession *session = &em.first->second;

// Maybe skip sending a request to open if this MDS daemon
// has previously sent us a REJECT.
if (rejected_by_mds.count(mds)) {
if (rejected_by_mds[mds] == session->addrs) {
ldout(cct, 4) << __func__ << " mds." << mds << " skipping "
"because we were rejected" << dendl;
return session;
} else {
ldout(cct, 4) << __func__ << " mds." << mds << " old inst "
"rejected us, trying with new inst" << dendl;
rejected_by_mds.erase(mds);
}
}

auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_OPEN);
m->metadata = metadata;
m->supported_features = feature_bitset_t(CEPHFS_FEATURES_CLIENT_SUPPORTED);
Expand All @@ -2067,16 +2051,20 @@ void Client::_close_mds_session(MetaSession *s)
s->con->send_message2(make_message<MClientSession>(CEPH_SESSION_REQUEST_CLOSE, s->seq));
}

void Client::_closed_mds_session(MetaSession *s)
void Client::_closed_mds_session(MetaSession *s, int err, bool rejected)
{
ldout(cct, 5) << __func__ << " mds." << s->mds_num << " seq " << s->seq << dendl;
s->state = MetaSession::STATE_CLOSED;
if (rejected && s->state != MetaSession::STATE_CLOSING)
s->state = MetaSession::STATE_REJECTED;
else
s->state = MetaSession::STATE_CLOSED;
s->con->mark_down();
signal_context_list(s->waiting_for_open);
mount_cond.notify_all();
remove_session_caps(s);
remove_session_caps(s, err);
kick_requests_closed(s);
mds_sessions.erase(s->mds_num);
if (s->state == MetaSession::STATE_CLOSED)
mds_sessions.erase(s->mds_num);
}

void Client::handle_client_session(const MConstRef<MClientSession>& m)
Expand All @@ -2098,9 +2086,8 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
if (!missing_features.empty()) {
lderr(cct) << "mds." << from << " lacks required features '"
<< missing_features << "', closing session " << dendl;
rejected_by_mds[session->mds_num] = session->addrs;
_close_mds_session(session);
_closed_mds_session(session);
_closed_mds_session(session, -EPERM, true);
break;
}
session->mds_features = std::move(m->supported_features);
Expand Down Expand Up @@ -2163,8 +2150,7 @@ void Client::handle_client_session(const MConstRef<MClientSession>& m)
error_str = "unknown error";
lderr(cct) << "mds." << from << " rejected us (" << error_str << ")" << dendl;

rejected_by_mds[session->mds_num] = session->addrs;
_closed_mds_session(session);
_closed_mds_session(session, -EPERM, true);
}
break;

Expand Down Expand Up @@ -2192,6 +2178,10 @@ void Client::_kick_stale_sessions()

for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
MetaSession &s = it->second;
if (s.state == MetaSession::STATE_REJECTED) {
mds_sessions.erase(it++);
continue;
}
++it;
if (s.state == MetaSession::STATE_STALE)
_closed_mds_session(&s);
Expand Down Expand Up @@ -3219,8 +3209,10 @@ void Client::put_cap_ref(Inode *in, int cap)
}
}

int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
int Client::get_caps(Fh *fh, int need, int want, int *phave, loff_t endoff)
{
Inode *in = fh->inode.get();

int r = check_pool_perm(in, need);
if (r < 0)
return r;
Expand All @@ -3234,6 +3226,12 @@ int Client::get_caps(Inode *in, int need, int want, int *phave, loff_t endoff)
return -EBADF;
}

if ((fh->mode & CEPH_FILE_MODE_WR) && fh->gen != fd_gen)
return -EBADF;

if ((in->flags & I_ERROR_FILELOCK) && fh->has_any_filelocks())
return -EIO;

int implemented;
int have = in->caps_issued(&implemented);

Expand Down Expand Up @@ -4138,7 +4136,7 @@ void Client::remove_all_caps(Inode *in)
remove_cap(&in->caps.begin()->second, true);
}

void Client::remove_session_caps(MetaSession *s)
void Client::remove_session_caps(MetaSession *s, int err)
{
ldout(cct, 10) << __func__ << " mds." << s->mds_num << dendl;

Expand All @@ -4150,7 +4148,10 @@ void Client::remove_session_caps(MetaSession *s)
dirty_caps = in->dirty_caps | in->flushing_caps;
in->wanted_max_size = 0;
in->requested_max_size = 0;
if (in->has_any_filelocks())
in->flags |= I_ERROR_FILELOCK;
}
auto caps = cap->implemented;
if (cap->wanted | cap->issued)
in->flags |= I_CAP_DROPPED;
remove_cap(cap, false);
Expand All @@ -4165,6 +4166,20 @@ void Client::remove_session_caps(MetaSession *s)
in->mark_caps_clean();
put_inode(in.get());
}
caps &= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_BUFFER;
if (caps && !in->caps_issued_mask(caps, true)) {
if (err == -EBLACKLISTED) {
if (in->oset.dirty_or_tx) {
lderr(cct) << __func__ << " still has dirty data on " << *in << dendl;
in->set_async_err(err);
}
objectcacher->purge_set(&in->oset);
} else {
objectcacher->release_set(&in->oset);
}
_schedule_invalidate_callback(in.get(), 0, 0);
}

signal_cond_list(in->waitfor_caps);
}
s->flushing_caps_tids.clear();
Expand Down Expand Up @@ -5960,6 +5975,13 @@ int Client::mount(const std::string &mount_root, const UserPerm& perms,

void Client::_close_sessions()
{
for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
if (it->second.state == MetaSession::STATE_REJECTED)
mds_sessions.erase(it++);
else
++it;
}

while (!mds_sessions.empty()) {
// send session closes!
for (auto &p : mds_sessions) {
Expand Down Expand Up @@ -6021,7 +6043,7 @@ void Client::_abort_mds_sessions(int err)
// Force-close all sessions
while(!mds_sessions.empty()) {
auto& session = mds_sessions.begin()->second;
_closed_mds_session(&session);
_closed_mds_session(&session, err);
}
}

Expand Down Expand Up @@ -6246,6 +6268,16 @@ void Client::tick()
}

trim_cache(true);

if (blacklisted && mounted &&
last_auto_reconnect + 30 * 60 < now &&
cct->_conf.get_val<bool>("client_reconnect_stale")) {
messenger->client_reset();
fd_gen++; // invalidate open files
blacklisted = false;
_kick_stale_sessions();
last_auto_reconnect = now;
}
}

void Client::renew_caps()
Expand Down Expand Up @@ -8678,7 +8710,7 @@ int Client::lookup_name(Inode *ino, Inode *parent, const UserPerm& perms)
Fh *Client::_create_fh(Inode *in, int flags, int cmode, const UserPerm& perms)
{
ceph_assert(in);
Fh *f = new Fh(in, flags, cmode, perms);
Fh *f = new Fh(in, flags, cmode, fd_gen, perms);

ldout(cct, 10) << __func__ << " " << in->ino << " mode " << cmode << dendl;

Expand Down Expand Up @@ -8806,7 +8838,8 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp,
if (cmode & CEPH_FILE_MODE_RD)
need |= CEPH_CAP_FILE_RD;

result = get_caps(in, need, want, &have, -1);
Fh fh(in, flags, cmode, fd_gen, perms);
result = get_caps(&fh, need, want, &have, -1);
if (result < 0) {
ldout(cct, 8) << "Unable to get caps after open of inode " << *in <<
" . Denying open: " <<
Expand Down Expand Up @@ -9137,7 +9170,7 @@ int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
r = get_caps(in, CEPH_CAP_FILE_RD, want, &have, -1);
r = get_caps(f, CEPH_CAP_FILE_RD, want, &have, -1);
if (r < 0) {
goto done;
}
Expand Down Expand Up @@ -9567,7 +9600,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_BUFFER;
int r = get_caps(in, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
int r = get_caps(f, CEPH_CAP_FILE_WR|CEPH_CAP_AUTH_SHARED, want, &have, endoff);
if (r < 0)
return r;

Expand Down Expand Up @@ -9656,9 +9689,11 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
in->truncate_size, in->truncate_seq,
&onfinish);
client_lock.unlock();
onfinish.wait();
r = onfinish.wait();
client_lock.lock();
_sync_write_commit(in);
if (r < 0)
goto done;
}

// if we get here, write was successful, update client metadata
Expand Down Expand Up @@ -10105,6 +10140,9 @@ int Client::_do_filelock(Inode *in, Fh *fh, int lock_type, int op, int sleep,
<< " type " << fl->l_type << " owner " << owner
<< " " << fl->l_start << "~" << fl->l_len << dendl;

if (in->flags & I_ERROR_FILELOCK)
return -EIO;

int lock_cmd;
if (F_RDLCK == fl->l_type)
lock_cmd = CEPH_LOCK_SHARED;
Expand Down Expand Up @@ -10278,30 +10316,39 @@ void Client::_release_filelocks(Fh *fh)
Inode *in = fh->inode.get();
ldout(cct, 10) << __func__ << " " << fh << " ino " << in->ino << dendl;

list<ceph_filelock> activated_locks;

list<pair<int, ceph_filelock> > to_release;

if (fh->fcntl_locks) {
auto &lock_state = fh->fcntl_locks;
for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
p != lock_state->held_locks.end();
++p)
to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, p->second));
for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
auto q = p++;
if (in->flags & I_ERROR_FILELOCK) {
lock_state->remove_lock(q->second, activated_locks);
} else {
to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FCNTL, q->second));
}
}
lock_state.reset();
}
if (fh->flock_locks) {
auto &lock_state = fh->flock_locks;
for(multimap<uint64_t, ceph_filelock>::iterator p = lock_state->held_locks.begin();
p != lock_state->held_locks.end();
++p)
to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, p->second));
for(auto p = lock_state->held_locks.begin(); p != lock_state->held_locks.end(); ) {
auto q = p++;
if (in->flags & I_ERROR_FILELOCK) {
lock_state->remove_lock(q->second, activated_locks);
} else {
to_release.push_back(pair<int, ceph_filelock>(CEPH_LOCK_FLOCK, q->second));
}
}
lock_state.reset();
}

if (to_release.empty())
return;
if ((in->flags & I_ERROR_FILELOCK) && !in->has_any_filelocks())
in->flags &= ~I_ERROR_FILELOCK;

// mds has already released filelocks if session was closed.
if (in->caps.empty())
if (to_release.empty())
return;

struct flock fl;
Expand Down Expand Up @@ -13254,7 +13301,10 @@ int Client::ll_read(Fh *fh, loff_t off, loff_t len, bufferlist *bl)

/* We can't return bytes written larger than INT_MAX, clamp len to that */
len = std::min(len, (loff_t)INT_MAX);
return _read(fh, off, len, bl);
int r = _read(fh, off, len, bl);
ldout(cct, 3) << "ll_read " << fh << " " << off << "~" << len << " = " << r
<< dendl;
return r;
}

int Client::ll_read_block(Inode *in, uint64_t blockid,
Expand Down Expand Up @@ -13496,7 +13546,7 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length)
}

int have;
int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
int r = get_caps(fh, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, -1);
if (r < 0)
return r;

Expand Down Expand Up @@ -14042,8 +14092,7 @@ void Client::ms_handle_remote_reset(Connection *con)
case MetaSession::STATE_OPEN:
{
objecter->maybe_request_map(); /* to check if we are blacklisted */
const auto& conf = cct->_conf;
if (conf->client_reconnect_stale) {
if (cct->_conf.get_val<bool>("client_reconnect_stale")) {
ldout(cct, 1) << "reset from mds we were open; close mds session for reconnect" << dendl;
_closed_mds_session(s);
} else {
Expand Down Expand Up @@ -14425,14 +14474,14 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,
MetaSession *session;
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);
if (session->state == MetaSession::STATE_REJECTED)
return -EPERM;
if (session->state != MetaSession::STATE_OPENING) {
// umounting?
return -EINVAL;
}
ldout(cct, 10) << "waiting for session to mds." << mds << " to open" << dendl;
wait_on_context_list(session->waiting_for_open);
if (rejected_by_mds.count(mds))
return -EPERM;
continue;
}

Expand Down

0 comments on commit 40fb155

Please sign in to comment.