Skip to content

Commit

Permalink
Bug 1789440 - Track reply message IDs for MessageChannel async replie…
Browse files Browse the repository at this point in the history
…s, r=ipc-reviewers,mccr8 a=RyanVM

Differential Revision: https://phabricator.services.mozilla.com/D156569
  • Loading branch information
mystor committed Sep 7, 2022
1 parent 5fbf64d commit b4f40f8
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 27 deletions.
7 changes: 4 additions & 3 deletions ipc/glue/MessageChannel.cpp
Expand Up @@ -811,9 +811,10 @@ void MessageChannel::StopPostponingSends() {
}

UniquePtr<MessageChannel::UntypedCallbackHolder> MessageChannel::PopCallback(
const Message& aMsg) {
const Message& aMsg, int32_t aActorId) {
auto iter = mPendingResponses.find(aMsg.seqno());
if (iter != mPendingResponses.end()) {
if (iter != mPendingResponses.end() && iter->second->mActorId == aActorId &&
iter->second->mReplyMsgId == aMsg.type()) {
UniquePtr<MessageChannel::UntypedCallbackHolder> ret =
std::move(iter->second);
mPendingResponses.erase(iter);
Expand All @@ -823,7 +824,7 @@ UniquePtr<MessageChannel::UntypedCallbackHolder> MessageChannel::PopCallback(
return nullptr;
}

void MessageChannel::RejectPendingResponsesForActor(ActorIdType aActorId) {
void MessageChannel::RejectPendingResponsesForActor(int32_t aActorId) {
auto itr = mPendingResponses.begin();
while (itr != mPendingResponses.end()) {
if (itr->second.get()->mActorId != aActorId) {
Expand Down
39 changes: 20 additions & 19 deletions ipc/glue/MessageChannel.h
Expand Up @@ -115,29 +115,30 @@ class MessageChannel : HasResultCodes {

typedef mozilla::Monitor Monitor;

// We could templatize the actor type but it would unnecessarily
// expand the code size. Using the actor address as the
// identifier is already good enough.
typedef void* ActorIdType;

public:
using Message = IPC::Message;

struct UntypedCallbackHolder {
UntypedCallbackHolder(ActorIdType aActorId, RejectCallback&& aReject)
: mActorId(aActorId), mReject(std::move(aReject)) {}
UntypedCallbackHolder(int32_t aActorId, Message::msgid_t aReplyMsgId,
RejectCallback&& aReject)
: mActorId(aActorId),
mReplyMsgId(aReplyMsgId),
mReject(std::move(aReject)) {}

virtual ~UntypedCallbackHolder() = default;

void Reject(ResponseRejectReason&& aReason) { mReject(std::move(aReason)); }

ActorIdType mActorId;
int32_t mActorId;
Message::msgid_t mReplyMsgId;
RejectCallback mReject;
};

template <typename Value>
struct CallbackHolder : public UntypedCallbackHolder {
CallbackHolder(ActorIdType aActorId, ResolveCallback<Value>&& aResolve,
RejectCallback&& aReject)
: UntypedCallbackHolder(aActorId, std::move(aReject)),
CallbackHolder(int32_t aActorId, Message::msgid_t aReplyMsgId,
ResolveCallback<Value>&& aResolve, RejectCallback&& aReject)
: UntypedCallbackHolder(aActorId, aReplyMsgId, std::move(aReject)),
mResolve(std::move(aResolve)) {}

void Resolve(Value&& aReason) { mResolve(std::move(aReason)); }
Expand All @@ -152,7 +153,6 @@ class MessageChannel : HasResultCodes {
public:
static constexpr int32_t kNoTimeout = INT32_MIN;

typedef IPC::Message Message;
using ScopedPort = mozilla::ipc::ScopedPort;

explicit MessageChannel(const char* aName, IToplevelProtocol* aListener);
Expand Down Expand Up @@ -242,9 +242,9 @@ class MessageChannel : HasResultCodes {
// Asynchronously send a message to the other side of the channel
// and wait for asynchronous reply.
template <typename Value>
void Send(UniquePtr<Message> aMsg, ActorIdType aActorId,
ResolveCallback<Value>&& aResolve, RejectCallback&& aReject)
EXCLUDES(*mMonitor) {
void Send(UniquePtr<Message> aMsg, int32_t aActorId,
Message::msgid_t aReplyMsgId, ResolveCallback<Value>&& aResolve,
RejectCallback&& aReject) EXCLUDES(*mMonitor) {
int32_t seqno = NextSeqno();
aMsg->set_seqno(seqno);
if (!Send(std::move(aMsg))) {
Expand All @@ -253,8 +253,8 @@ class MessageChannel : HasResultCodes {
}

UniquePtr<UntypedCallbackHolder> callback =
MakeUnique<CallbackHolder<Value>>(aActorId, std::move(aResolve),
std::move(aReject));
MakeUnique<CallbackHolder<Value>>(
aActorId, aReplyMsgId, std::move(aResolve), std::move(aReject));
mPendingResponses.insert(std::make_pair(seqno, std::move(callback)));
gUnresolvedResponses++;
}
Expand All @@ -272,11 +272,12 @@ class MessageChannel : HasResultCodes {
bool CanSend() const EXCLUDES(*mMonitor);

// Remove and return a callback that needs reply
UniquePtr<UntypedCallbackHolder> PopCallback(const Message& aMsg);
UniquePtr<UntypedCallbackHolder> PopCallback(const Message& aMsg,
int32_t aActorId);

// Used to reject and remove pending responses owned by the given
// actor when it's about to be destroyed.
void RejectPendingResponsesForActor(ActorIdType aActorId);
void RejectPendingResponsesForActor(int32_t aActorId);

// If sending a sync message returns an error, this function gives a more
// descriptive error message.
Expand Down
6 changes: 4 additions & 2 deletions ipc/glue/ProtocolUtils.cpp
Expand Up @@ -551,9 +551,11 @@ void IProtocol::DestroySubtree(ActorDestroyReason aWhy) {
MOZ_ASSERT(CanRecv(), "destroying non-connected actor");
MOZ_ASSERT(mLifecycleProxy, "destroying zombie actor");

int32_t id = Id();

// If we're a managed actor, unregister from our manager
if (Manager()) {
Unregister(Id());
Unregister(id);
}

// Destroy subtree
Expand All @@ -580,7 +582,7 @@ void IProtocol::DestroySubtree(ActorDestroyReason aWhy) {
// The actor is being destroyed, reject any pending responses, invoke
// `ActorDestroy` to destroy it, and then clear our status to
// `LinkStatus::Destroyed`.
GetIPCChannel()->RejectPendingResponsesForActor(this);
GetIPCChannel()->RejectPendingResponsesForActor(id);
ActorDestroy(aWhy);
mLinkStatus = LinkStatus::Destroyed;
}
Expand Down
5 changes: 3 additions & 2 deletions ipc/glue/ProtocolUtils.h
Expand Up @@ -279,11 +279,12 @@ class IProtocol : public HasResultCodes {
UniquePtr<IPC::Message>* aReply);
template <typename Value>
void ChannelSend(UniquePtr<IPC::Message> aMsg,
IPC::Message::msgid_t aReplyMsgId,
ResolveCallback<Value>&& aResolve,
RejectCallback&& aReject) {
if (CanSend()) {
GetIPCChannel()->Send(std::move(aMsg), this, std::move(aResolve),
std::move(aReject));
GetIPCChannel()->Send(std::move(aMsg), Id(), aReplyMsgId,
std::move(aResolve), std::move(aReject));
} else {
WarnMessageDiscarded(aMsg.get());
aReject(ResponseRejectReason::SendError);
Expand Down
3 changes: 2 additions & 1 deletion ipc/ipdl/ipdl/lower.py
Expand Up @@ -4906,7 +4906,7 @@ def genRecvAsyncReplyCase(self, md):
$*{prologue}
UniquePtr<MessageChannel::UntypedCallbackHolder> untypedCallback =
GetIPCChannel()->PopCallback(${msgvar});
GetIPCChannel()->PopCallback(${msgvar}, Id());
typedef MessageChannel::CallbackHolder<${resolvetype}> CallbackHolder;
auto* callback = static_cast<CallbackHolder*>(untypedCallback.get());
Expand Down Expand Up @@ -5454,6 +5454,7 @@ def sendAsync(self, md, msgexpr, actor=None):
send,
args=[
ExprMove(msgexpr),
ExprVar(md.pqReplyId()),
ExprMove(resolvefn),
ExprMove(rejectfn),
],
Expand Down

0 comments on commit b4f40f8

Please sign in to comment.