Skip to content

Commit

Permalink
IPC: Fix race condition when destroying connections
Browse files Browse the repository at this point in the history
It was possible to encounter data races when when requesting connection
callbacks on the message thread, but creating/destroying connection
objects on a background thread.

This change ensures that a message will not be processed if the
destination connection is destroyed before the message is delivered.
  • Loading branch information
reuk committed Oct 29, 2020
1 parent bef6a91 commit fb83c45
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 42 deletions.
122 changes: 85 additions & 37 deletions modules/juce_events/interprocess/juce_InterprocessConnection.cpp
Expand Up @@ -32,19 +32,64 @@ struct InterprocessConnection::ConnectionThread : public Thread
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
};

class SafeActionImpl
{
public:
explicit SafeActionImpl (InterprocessConnection& p)
: ref (p) {}

template <typename Fn>
void ifSafe (Fn&& fn)
{
const ScopedLock lock (mutex);

if (safe)
fn (ref);

This comment has been minimized.

Copy link
@kunitoki

kunitoki Nov 5, 2020

do you need to keep the lock while executing the function ? can the function change the safe variable ? if not, you'll better use an atomic variable and avoid the use of this safe action class whatsoever

}

void setSafe (bool s)
{
const ScopedLock lock (mutex);
safe = s;
}

bool isSafe()
{
const ScopedLock lock (mutex);
return safe;
}

private:
CriticalSection mutex;
InterprocessConnection& ref;
bool safe = false;
};

class InterprocessConnection::SafeAction : public SafeActionImpl
{
using SafeActionImpl::SafeActionImpl;
};

//==============================================================================
InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber)
: useMessageThread (callbacksOnMessageThread),
magicMessageHeader (magicMessageHeaderNumber)
magicMessageHeader (magicMessageHeaderNumber),
safeAction (std::make_shared<SafeAction> (*this))
{
thread.reset (new ConnectionThread (*this));
}

InterprocessConnection::~InterprocessConnection()
{
// You *must* call `disconnect` in the destructor of your derived class to ensure
// that any pending messages are not delivered. If the messages were delivered after
// destroying the derived class, we'd end up calling the pure virtual implementations
// of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
// not a good idea!
jassert (! safeAction->isSafe());

callbackConnectionState = false;
disconnect();
masterReference.clear();
thread.reset();
}

Expand All @@ -54,32 +99,29 @@ bool InterprocessConnection::connectToSocket (const String& hostName,
{
disconnect();

const ScopedLock sl (pipeAndSocketLock);
socket.reset (new StreamingSocket());
auto s = std::make_unique<StreamingSocket>();

This comment has been minimized.

Copy link
@kunitoki

kunitoki Nov 5, 2020

i hope this is not the direction juce is going. please don't use acronyms and abbreviations in juce code, this kind of code is that makes juce less readable and defying its style completely.


if (socket->connect (hostName, portNumber, timeOutMillisecs))
if (s->connect (hostName, portNumber, timeOutMillisecs))
{
threadIsRunning = true;
connectionMadeInt();
thread->startThread();
const ScopedLock sl (pipeAndSocketLock);
initialiseWithSocket (std::move (s));
return true;
}

socket.reset();
return false;
}

bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutMs)
{
disconnect();

std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
auto newPipe = std::make_unique<NamedPipe>();

if (newPipe->openExisting (pipeName))
{
const ScopedLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (newPipe.release());
initialiseWithPipe (std::move (newPipe));
return true;
}

Expand All @@ -90,13 +132,13 @@ bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs,
{
disconnect();

std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
auto newPipe = std::make_unique<NamedPipe>();

if (newPipe->createNewPipe (pipeName, mustNotExist))
{
const ScopedLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (newPipe.release());
initialiseWithPipe (std::move (newPipe));
return true;
}

Expand All @@ -116,6 +158,8 @@ void InterprocessConnection::disconnect()
thread->stopThread (4000);
deletePipeAndSocket();
connectionLostInt();

safeAction->setSafe (false);
}

void InterprocessConnection::deletePipeAndSocket()
Expand Down Expand Up @@ -176,45 +220,47 @@ int InterprocessConnection::writeData (void* data, int dataSize)
}

//==============================================================================
void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
void InterprocessConnection::initialise()
{
jassert (socket == nullptr && pipe == nullptr);
socket.reset (newSocket);

safeAction->setSafe (true);
threadIsRunning = true;
connectionMadeInt();
thread->startThread();
}

void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
{
jassert (socket == nullptr && pipe == nullptr);
pipe.reset (newPipe);
socket = std::move (newSocket);
initialise();
}

threadIsRunning = true;
connectionMadeInt();
thread->startThread();
void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
{
jassert (socket == nullptr && pipe == nullptr);
pipe = std::move (newPipe);
initialise();
}

//==============================================================================
struct ConnectionStateMessage : public MessageManager::MessageBase
{
ConnectionStateMessage (InterprocessConnection* ipc, bool connected) noexcept
: owner (ipc), connectionMade (connected)
ConnectionStateMessage (std::shared_ptr<SafeActionImpl> ipc, bool connected) noexcept
: safeAction (ipc), connectionMade (connected)
{}

void messageCallback() override
{
if (auto* ipc = owner.get())
safeAction->ifSafe ([this] (InterprocessConnection& owner)
{
if (connectionMade)
ipc->connectionMade();
owner.connectionMade();
else
ipc->connectionLost();
}
owner.connectionLost();
});
}

WeakReference<InterprocessConnection> owner;
std::shared_ptr<SafeActionImpl> safeAction;
bool connectionMade;

JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
Expand All @@ -227,7 +273,7 @@ void InterprocessConnection::connectionMadeInt()
callbackConnectionState = true;

if (useMessageThread)
(new ConnectionStateMessage (this, true))->post();
(new ConnectionStateMessage (safeAction, true))->post();
else
connectionMade();
}
Expand All @@ -240,25 +286,27 @@ void InterprocessConnection::connectionLostInt()
callbackConnectionState = false;

if (useMessageThread)
(new ConnectionStateMessage (this, false))->post();
(new ConnectionStateMessage (safeAction, false))->post();
else
connectionLost();
}
}

struct DataDeliveryMessage : public Message
{
DataDeliveryMessage (InterprocessConnection* ipc, const MemoryBlock& d)
: owner (ipc), data (d)
DataDeliveryMessage (std::shared_ptr<SafeActionImpl> ipc, const MemoryBlock& d)
: safeAction (ipc), data (d)
{}

void messageCallback() override
{
if (auto* ipc = owner.get())
ipc->messageReceived (data);
safeAction->ifSafe ([this] (InterprocessConnection& owner)

This comment has been minimized.

Copy link
@kunitoki

kunitoki Nov 5, 2020

why not simply use the ReferenceCountedObject and WeakReference instead of rolling the SafeAction stuff ?

{
owner.messageReceived (data);
});
}

WeakReference<InterprocessConnection> owner;
std::shared_ptr<SafeActionImpl> safeAction;
MemoryBlock data;
};

Expand All @@ -267,7 +315,7 @@ void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
jassert (callbackConnectionState);

if (useMessageThread)
(new DataDeliveryMessage (this, data))->post();
(new DataDeliveryMessage (safeAction, data))->post();
else
messageReceived (data);
}
Expand Down
18 changes: 14 additions & 4 deletions modules/juce_events/interprocess/juce_InterprocessConnection.h
Expand Up @@ -43,6 +43,9 @@ class MemoryBlock;
To act as a socket server and create connections for one or more client, see the
InterprocessConnectionServer class.
IMPORTANT NOTE: Your derived Connection class *must* call `disconnect` in its destructor
in order to cancel any pending messages before the class is destroyed.
@see InterprocessConnectionServer, Socket, NamedPipe
@tags{Events}
Expand Down Expand Up @@ -117,7 +120,11 @@ class JUCE_API InterprocessConnection
*/
bool createPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist = false);

/** Disconnects and closes any currently-open sockets or pipes. */
/** Disconnects and closes any currently-open sockets or pipes.
Derived classes *must* call this in their destructors in order to avoid undefined
behaviour.
*/
void disconnect();

/** True if a socket or pipe is currently active. */
Expand Down Expand Up @@ -187,8 +194,9 @@ class JUCE_API InterprocessConnection
int pipeReceiveMessageTimeout = -1;

friend class InterprocessConnectionServer;
void initialiseWithSocket (StreamingSocket*);
void initialiseWithPipe (NamedPipe*);
void initialise();
void initialiseWithSocket (std::unique_ptr<StreamingSocket>);
void initialiseWithPipe (std::unique_ptr<NamedPipe>);
void deletePipeAndSocket();
void connectionMadeInt();
void connectionLostInt();
Expand All @@ -200,10 +208,12 @@ class JUCE_API InterprocessConnection
std::unique_ptr<ConnectionThread> thread;
std::atomic<bool> threadIsRunning { false };

class SafeAction;
std::shared_ptr<SafeAction> safeAction;

void runThread();
int writeData (void*, int);

JUCE_DECLARE_WEAK_REFERENCEABLE (InterprocessConnection)
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (InterprocessConnection)
};

Expand Down
Expand Up @@ -73,7 +73,7 @@ void InterprocessConnectionServer::run()

if (clientSocket != nullptr)
if (auto* newConnection = createConnectionObject())
newConnection->initialiseWithSocket (clientSocket.release());
newConnection->initialiseWithSocket (std::move (clientSocket));
}
}

Expand Down

0 comments on commit fb83c45

Please sign in to comment.