// Patch summary (review notes only; the patch text below is unchanged).
//
// Adds bookkeeping for the rename requests an MDS sends to another MDS while
// reintegrating or migrating stray dentries:
//   * CDentry.h: new member `reintegration_reqid`, initialised to 0.
//   * MDSMetaRequest.h (new file): base struct holding an op code and a tid,
//     with a virtual destructor and get_op()/get_tid() accessors.
//   * MDSRank.cc: CEPH_MSG_CLIENT_REPLY is added to the message types accepted
//     by is_valid_message(), and in handle_message() it shares the
//     CEPH_MSG_CLIENT_REQUEST case that calls server->dispatch(m).
//   * MDSRank.h: new map member `internal_client_requests`.
//   * Server.cc/.h: dispatch() forwards CEPH_MSG_CLIENT_REPLY to the new
//     handle_client_reply(), which asserts reply->is_safe(), logs and returns
//     when the reply's tid is not in mds->internal_client_requests, and
//     otherwise erases that entry (the RENAME case itself does nothing else).
//     reply_client_request() additionally sends the reply over the request's
//     connection when client_inst.name.is_mds() and the op is
//     CEPH_MDS_OP_RENAME.
//   * StrayManager.cc: reintegrate_stray() and migrate_stray() return early
//     when the dentry's reintegration_reqid is non-zero; otherwise they issue
//     a single tid, set it on the rename request, and emplace a
//     std::make_unique(CEPH_MDS_OP_RENAME, tid, dentry) object under that tid
//     before sending (presumably a StrayEvalRequest; the template argument is
//     missing in this copy).
//   * StrayManager.h: StrayEvalRequest takes CDentry::PIN_PURGING on the
//     dentry and stores the tid in reintegration_reqid on construction; its
//     destructor resets the field to 0 and drops the pin.
//
// NOTE(review): this copy appears to have lost its line breaks and any text
// that sat inside angle brackets (e.g. `std::map> batch_ops`, `cref_t &m`,
// `#include #include "Mutation.h"`); presumably template arguments and a
// system include name were stripped -- recover them from the original patch
// before applying.
// NOTE(review): within this diff the only erase of internal_client_requests
// is in handle_client_reply(); confirm what releases the entry (and with it
// the dentry pin and reintegration_reqid) if the peer never replies.
diff --git a/src/mds/CDentry.h b/src/mds/CDentry.h index 4dca5816ae6ff..1c2b6f892cec5 100644 --- a/src/mds/CDentry.h +++ b/src/mds/CDentry.h @@ -376,6 +376,8 @@ class CDentry : public MDSCacheObject, public LRUObject, public Counter mempool::mds_co::map client_lease_map; std::map> batch_ops; + ceph_tid_t reintegration_reqid = 0; + protected: friend class Migrator; diff --git a/src/mds/MDSMetaRequest.h b/src/mds/MDSMetaRequest.h new file mode 100644 index 0000000000000..ad47204106869 --- /dev/null +++ b/src/mds/MDSMetaRequest.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MDS_META_REQUEST_H +#define CEPH_MDS_META_REQUEST_H + +#include "include/types.h" + +struct MDSMetaRequest { +private: + int op; + ceph_tid_t tid; +public: + explicit MDSMetaRequest(int o, ceph_tid_t t) : + op(o), tid(t) { } + virtual ~MDSMetaRequest() { } + + int get_op() { return op; } + ceph_tid_t get_tid() { return tid; } +}; + +#endif // !CEPH_MDS_META_REQUEST_H diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 725784b10105a..46c66f3b3d88e 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -1191,6 +1191,7 @@ bool MDSRank::is_valid_message(const cref_t &m) { type == CEPH_MSG_CLIENT_RECONNECT || type == CEPH_MSG_CLIENT_RECLAIM || type == CEPH_MSG_CLIENT_REQUEST || + type == CEPH_MSG_CLIENT_REPLY || type == MSG_MDS_PEER_REQUEST || type == MSG_MDS_HEARTBEAT || type == MSG_MDS_TABLE_REQUEST || @@ -1244,6 +1245,7 @@ void MDSRank::handle_message(const cref_t &m) ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT); // fall-thru case CEPH_MSG_CLIENT_REQUEST: + case CEPH_MSG_CLIENT_REPLY: 
server->dispatch(m); break; case MSG_MDS_PEER_REQUEST: diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index d01a5894df479..48043df4baf61 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -43,6 +43,7 @@ #include "Server.h" #include "MetricsHandler.h" #include "osdc/Journaler.h" +#include "MDSMetaRequest.h" // Full .h import instead of forward declaration for PerfCounter, for the // benefit of those including this header and using MDSRank::logger @@ -423,6 +424,8 @@ class MDSRank { PerfCounters *logger = nullptr, *mlogger = nullptr; OpTracker op_tracker; + std::map> internal_client_requests; + // The last different state I held before current MDSMap::DaemonState last_state = MDSMap::STATE_BOOT; // The state assigned to me by the MDSMap diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 01a190c6bf8fb..d9f8ef6601509 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -31,6 +31,7 @@ #include "Mutation.h" #include "MetricsHandler.h" #include "cephfs_features.h" +#include "MDSContext.h" #include "msg/Messenger.h" @@ -360,6 +361,9 @@ void Server::dispatch(const cref_t &m) case CEPH_MSG_CLIENT_REQUEST: handle_client_request(ref_cast(m)); return; + case CEPH_MSG_CLIENT_REPLY: + handle_client_reply(ref_cast(m)); + return; case CEPH_MSG_CLIENT_RECLAIM: handle_client_reclaim(ref_cast(m)); return; @@ -2340,6 +2344,10 @@ void Server::reply_client_request(MDRequestRef& mdr, const ref_t & mds->send_message_client(reply, session); } + if (client_inst.name.is_mds() && reply->get_op() == CEPH_MDS_OP_RENAME) { + mds->send_message(reply, mdr->client_request->get_connection()); + } + if (req->is_queued_for_replay() && (mdr->has_completed || reply->get_result() < 0)) { if (reply->get_result() < 0) { @@ -2572,6 +2580,28 @@ void Server::handle_client_request(const cref_t &req) return; } +void Server::handle_client_reply(const cref_t &reply) +{ + dout(4) << "handle_client_reply " << *reply << dendl; + + ceph_assert(reply->is_safe()); + ceph_tid_t tid = 
reply->get_tid(); + + if (mds->internal_client_requests.count(tid) == 0) { + dout(1) << " no pending request on tid " << tid << dendl; + return; + } + + switch (reply->get_op()) { + case CEPH_MDS_OP_RENAME: + break; + default: + dout(5) << " unknown client op " << reply->get_op() << dendl; + } + + mds->internal_client_requests.erase(tid); +} + void Server::handle_osd_map() { /* Note that we check the OSDMAP_FULL flag directly rather than diff --git a/src/mds/Server.h b/src/mds/Server.h index 61096a5b68a49..6b2f9c188f5ac 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -159,6 +159,7 @@ class Server { // -- requests -- void handle_client_request(const cref_t &m); + void handle_client_reply(const cref_t &m); void journal_and_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn, LogEvent *le, MDSLogContextBase *fin); diff --git a/src/mds/StrayManager.cc b/src/mds/StrayManager.cc index 325209da6e0e3..e9ec153d3fdb3 100644 --- a/src/mds/StrayManager.cc +++ b/src/mds/StrayManager.cc @@ -673,24 +673,41 @@ void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn) { dout(10) << __func__ << " " << *straydn << " to " << *rdn << dendl; + if (straydn->reintegration_reqid) { + dout(20) << __func__ << ": stray dentry " << *straydn + << " is already under reintegrating" << dendl; + return; + } + logger->inc(l_mdc_strays_reintegrated); - + // rename it to remote linkage . 
filepath src(straydn->get_name(), straydn->get_dir()->ino()); filepath dst(rdn->get_name(), rdn->get_dir()->ino()); + ceph_tid_t tid = mds->issue_tid(); + auto req = make_message(CEPH_MDS_OP_RENAME); req->set_filepath(dst); req->set_filepath2(src); - req->set_tid(mds->issue_tid()); + req->set_tid(tid); + + auto ptr = std::make_unique(CEPH_MDS_OP_RENAME, tid, straydn); + mds->internal_client_requests.emplace(tid, std::move(ptr)); mds->send_message_mds(req, rdn->authority().first); } - + void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to) { dout(10) << __func__ << " " << *dn << " to mds." << to << dendl; + if (dn->reintegration_reqid) { + dout(20) << __func__ << ": stray dentry " << *dn + << " is already under migrating" << dendl; + return; + } + logger->inc(l_mdc_strays_migrated); // rename it to another mds. @@ -700,10 +717,15 @@ void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to) filepath src(dn->get_name(), dirino); filepath dst(dn->get_name(), MDS_INO_STRAY(to, MDS_INO_STRAY_INDEX(dirino))); + ceph_tid_t tid = mds->issue_tid(); + auto req = make_message(CEPH_MDS_OP_RENAME); req->set_filepath(dst); req->set_filepath2(src); - req->set_tid(mds->issue_tid()); + req->set_tid(tid); + + auto ptr = std::make_unique(CEPH_MDS_OP_RENAME, tid, dn); + mds->internal_client_requests.emplace(tid, std::move(ptr)); mds->send_message_mds(req, to); } diff --git a/src/mds/StrayManager.h b/src/mds/StrayManager.h index 86b6941a51313..874fbbb9a8dcd 100644 --- a/src/mds/StrayManager.h +++ b/src/mds/StrayManager.h @@ -19,15 +19,30 @@ #include #include "Mutation.h" #include "PurgeQueue.h" +#include "MDSMetaRequest.h" +#include "CDentry.h" class MDSRank; class CInode; -class CDentry; class StrayManager { // My public interface is for consumption by MDCache public: + struct StrayEvalRequest : public MDSMetaRequest { + CDentry *dentry; + public: + explicit StrayEvalRequest(int o, ceph_tid_t t, CDentry *d) : + MDSMetaRequest(o, t), dentry(d) { + 
dentry->get(CDentry::PIN_PURGING); + dentry->reintegration_reqid = t; + } + ~StrayEvalRequest() { + dentry->reintegration_reqid = 0; + dentry->put(CDentry::PIN_PURGING); + } + }; + explicit StrayManager(MDSRank *mds, PurgeQueue &purge_queue_); void set_logger(PerfCounters *l) {logger = l;} void activate();