Skip to content

Commit

Permalink
Merge PR ceph#53280 into wip-vshankar-testing-20230926.081818
Browse files Browse the repository at this point in the history
* refs/pull/53280/head:
	mds: fix issuing redundant reintegrate/migrate_stray requests
	mds: record the internal client request and receive client reply

Reviewed-by: Patrick Donnelly <pdonnell@redhat.com>
  • Loading branch information
vshankar committed Sep 26, 2023
2 parents 7e9ae82 + 53d9e65 commit a2e911c
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/mds/CDentry.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ class CDentry : public MDSCacheObject, public LRUObject, public Counter<CDentry>
mempool::mds_co::map<client_t,ClientLease*> client_lease_map;
std::map<int, std::unique_ptr<BatchOp>> batch_ops;

ceph_tid_t reintegration_reqid = 0;


protected:
friend class Migrator;
Expand Down
33 changes: 33 additions & 0 deletions src/mds/MDSMetaRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2023 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_MDS_META_REQUEST_H
#define CEPH_MDS_META_REQUEST_H

#include "include/types.h"

struct MDSMetaRequest {
private:
int op;
ceph_tid_t tid;
public:
explicit MDSMetaRequest(int o, ceph_tid_t t) :
op(o), tid(t) { }
virtual ~MDSMetaRequest() { }

int get_op() { return op; }
ceph_tid_t get_tid() { return tid; }
};

#endif // !CEPH_MDS_META_REQUEST_H
2 changes: 2 additions & 0 deletions src/mds/MDSRank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,7 @@ bool MDSRank::is_valid_message(const cref_t<Message> &m) {
type == CEPH_MSG_CLIENT_RECONNECT ||
type == CEPH_MSG_CLIENT_RECLAIM ||
type == CEPH_MSG_CLIENT_REQUEST ||
type == CEPH_MSG_CLIENT_REPLY ||
type == MSG_MDS_PEER_REQUEST ||
type == MSG_MDS_HEARTBEAT ||
type == MSG_MDS_TABLE_REQUEST ||
Expand Down Expand Up @@ -1244,6 +1245,7 @@ void MDSRank::handle_message(const cref_t<Message> &m)
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
// fall-thru
case CEPH_MSG_CLIENT_REQUEST:
case CEPH_MSG_CLIENT_REPLY:
server->dispatch(m);
break;
case MSG_MDS_PEER_REQUEST:
Expand Down
3 changes: 3 additions & 0 deletions src/mds/MDSRank.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "Server.h"
#include "MetricsHandler.h"
#include "osdc/Journaler.h"
#include "MDSMetaRequest.h"

// Full .h import instead of forward declaration for PerfCounter, for the
// benefit of those including this header and using MDSRank::logger
Expand Down Expand Up @@ -423,6 +424,8 @@ class MDSRank {
PerfCounters *logger = nullptr, *mlogger = nullptr;
OpTracker op_tracker;

std::map<ceph_tid_t, std::unique_ptr<MDSMetaRequest>> internal_client_requests;

// The last different state I held before current
MDSMap::DaemonState last_state = MDSMap::STATE_BOOT;
// The state assigned to me by the MDSMap
Expand Down
30 changes: 30 additions & 0 deletions src/mds/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "Mutation.h"
#include "MetricsHandler.h"
#include "cephfs_features.h"
#include "MDSContext.h"

#include "msg/Messenger.h"

Expand Down Expand Up @@ -360,6 +361,9 @@ void Server::dispatch(const cref_t<Message> &m)
case CEPH_MSG_CLIENT_REQUEST:
handle_client_request(ref_cast<MClientRequest>(m));
return;
case CEPH_MSG_CLIENT_REPLY:
handle_client_reply(ref_cast<MClientReply>(m));
return;
case CEPH_MSG_CLIENT_RECLAIM:
handle_client_reclaim(ref_cast<MClientReclaim>(m));
return;
Expand Down Expand Up @@ -2340,6 +2344,10 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &
mds->send_message_client(reply, session);
}

if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) {
mds->send_message(reply, mdr->client_request->get_connection());
}

if (req->is_queued_for_replay() &&
(mdr->has_completed || reply->get_result() < 0)) {
if (reply->get_result() < 0) {
Expand Down Expand Up @@ -2572,6 +2580,28 @@ void Server::handle_client_request(const cref_t<MClientRequest> &req)
return;
}

void Server::handle_client_reply(const cref_t<MClientReply> &reply)
{
dout(4) << "handle_client_reply " << *reply << dendl;

ceph_assert(reply->is_safe());
ceph_tid_t tid = reply->get_tid();

if (mds->internal_client_requests.count(tid) == 0) {
dout(1) << " no pending request on tid " << tid << dendl;
return;
}

switch (reply->get_op()) {
case CEPH_MDS_OP_RENAME:
break;
default:
dout(5) << " unknown client op " << reply->get_op() << dendl;
}

mds->internal_client_requests.erase(tid);
}

void Server::handle_osd_map()
{
/* Note that we check the OSDMAP_FULL flag directly rather than
Expand Down
1 change: 1 addition & 0 deletions src/mds/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class Server {

// -- requests --
void handle_client_request(const cref_t<MClientRequest> &m);
void handle_client_reply(const cref_t<MClientReply> &m);

void journal_and_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn,
LogEvent *le, MDSLogContextBase *fin);
Expand Down
30 changes: 26 additions & 4 deletions src/mds/StrayManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -673,24 +673,41 @@ void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
{
dout(10) << __func__ << " " << *straydn << " to " << *rdn << dendl;

if (straydn->reintegration_reqid) {
dout(20) << __func__ << ": stray dentry " << *straydn
<< " is already under reintegrating" << dendl;
return;
}

logger->inc(l_mdc_strays_reintegrated);

// rename it to remote linkage .
filepath src(straydn->get_name(), straydn->get_dir()->ino());
filepath dst(rdn->get_name(), rdn->get_dir()->ino());

ceph_tid_t tid = mds->issue_tid();

auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
req->set_tid(mds->issue_tid());
req->set_tid(tid);

auto ptr = std::make_unique<StrayEvalRequest>(CEPH_MDS_OP_RENAME, tid, straydn);
mds->internal_client_requests.emplace(tid, std::move(ptr));

mds->send_message_mds(req, rdn->authority().first);
}

void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
{
dout(10) << __func__ << " " << *dn << " to mds." << to << dendl;

if (dn->reintegration_reqid) {
dout(20) << __func__ << ": stray dentry " << *dn
<< " is already under migrating" << dendl;
return;
}

logger->inc(l_mdc_strays_migrated);

// rename it to another mds.
Expand All @@ -700,10 +717,15 @@ void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
filepath src(dn->get_name(), dirino);
filepath dst(dn->get_name(), MDS_INO_STRAY(to, MDS_INO_STRAY_INDEX(dirino)));

ceph_tid_t tid = mds->issue_tid();

auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
req->set_filepath(dst);
req->set_filepath2(src);
req->set_tid(mds->issue_tid());
req->set_tid(tid);

auto ptr = std::make_unique<StrayEvalRequest>(CEPH_MDS_OP_RENAME, tid, dn);
mds->internal_client_requests.emplace(tid, std::move(ptr));

mds->send_message_mds(req, to);
}
Expand Down
17 changes: 16 additions & 1 deletion src/mds/StrayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,30 @@
#include <list>
#include "Mutation.h"
#include "PurgeQueue.h"
#include "MDSMetaRequest.h"
#include "CDentry.h"

class MDSRank;
class CInode;
class CDentry;

class StrayManager
{
// My public interface is for consumption by MDCache
public:
struct StrayEvalRequest : public MDSMetaRequest {
CDentry *dentry;
public:
explicit StrayEvalRequest(int o, ceph_tid_t t, CDentry *d) :
MDSMetaRequest(o, t), dentry(d) {
dentry->get(CDentry::PIN_PURGING);
dentry->reintegration_reqid = t;
}
~StrayEvalRequest() {
dentry->reintegration_reqid = 0;
dentry->put(CDentry::PIN_PURGING);
}
};

explicit StrayManager(MDSRank *mds, PurgeQueue &purge_queue_);
void set_logger(PerfCounters *l) {logger = l;}
void activate();
Expand Down

0 comments on commit a2e911c

Please sign in to comment.