Skip to content

Commit

Permalink
[lldb] [Core] Split read thread support into ThreadedCommunication
Browse files Browse the repository at this point in the history
Split the read thread support from Communication into a dedicated
ThreadedCommunication subclass.  The read thread support is used only
by a subset of Communication consumers, and it adds a lot of complexity
to the base class.  Furthermore, having a dedicated subclass makes it
clear whether a particular consumer needs to account for the possibility
of read thread being running or not.

The modules currently calling `StartReadThread()` are updated to use
`ThreadedCommunication`.  The remaining modules use the simplified
`Communication` class.

`SBCommunication` is changed to use `ThreadedCommunication` in order
to avoid changing the public API.

`CommunicationKDP` is updated in order to (hopefully) compile with
the new code.  However, I do not have a Darwin box to test it, so I've
limited the changes to the bare minimum.

`GDBRemoteCommunication` is updated to become a `Broadcaster` directly.
Since it does not inherit from `ThreadedCommunication`, its event
support no longer collides with the one used for read thread and can
be implemented cleanly.  The support for
`eBroadcastBitReadThreadDidExit` is removed from the code -- since
the read thread was not used, this event was never reported.

Sponsored by: The FreeBSD Foundation
Differential Revision: https://reviews.llvm.org/D133251
  • Loading branch information
mgorny committed Sep 6, 2022
1 parent 83552e8 commit 9823d42
Show file tree
Hide file tree
Showing 18 changed files with 680 additions and 554 deletions.
2 changes: 1 addition & 1 deletion lldb/include/lldb/API/SBCommunication.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class LLDB_API SBCommunication {
SBCommunication(const SBCommunication &) = delete;
const SBCommunication &operator=(const SBCommunication &) = delete;

lldb_private::Communication *m_opaque = nullptr;
lldb_private::ThreadedCommunication *m_opaque = nullptr;
bool m_opaque_owned = false;
};

Expand Down
230 changes: 14 additions & 216 deletions lldb/include/lldb/Core/Communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,15 @@
#ifndef LLDB_CORE_COMMUNICATION_H
#define LLDB_CORE_COMMUNICATION_H

#include "lldb/Host/HostThread.h"
#include "lldb/Utility/Broadcaster.h"
#include "lldb/Utility/Timeout.h"
#include "lldb/lldb-defines.h"
#include "lldb/lldb-enumerations.h"
#include "lldb/lldb-forward.h"
#include "lldb/lldb-types.h"

#include <atomic>
#include <mutex>
#include <ratio>
#include <string>

#include <cstddef>
#include <cstdint>

namespace lldb_private {
class Connection;
class ConstString;
Expand All @@ -38,90 +31,22 @@ class Status;
/// approach has a couple of advantages: it allows a single instance of this
/// class to be used even though its connection can change. Connections could
/// negotiate for different connections based on abilities like starting with
/// Bluetooth and negotiating up to WiFi if available. It also allows this
/// class to be subclassed by any interfaces that don't want to give bytes but
/// want to validate and give out packets. This can be done by overriding:
///
/// AppendBytesToCache (const uint8_t *src, size_t src_len, bool broadcast);
///
/// Communication inherits from Broadcaster which means it can be used in
/// conjunction with Listener to wait for multiple broadcaster objects and
/// multiple events from each of those objects. Communication defines a set of
/// pre-defined event bits (see enumerations definitions that start with
/// "eBroadcastBit" below).
///
/// There are two modes in which communications can occur:
/// \li single-threaded
/// \li multi-threaded
///
/// In single-threaded mode, all reads and writes happen synchronously on the
/// calling thread.
///
/// In multi-threaded mode, a read thread is spawned that continually reads
/// data and caches any received bytes. To start the read thread clients call:
///
/// bool Communication::StartReadThread (Status *);
///
/// If true is returned a read thread has been spawned that will continually
/// execute a call to the pure virtual DoRead function:
/// Bluetooth and negotiating up to WiFi if available.
///
/// size_t Communication::ReadFromConnection (void *, size_t, uint32_t);
///
/// When bytes are received the data gets cached in \a m_bytes and this class
/// will broadcast a \b eBroadcastBitReadThreadGotBytes event. Clients that
/// want packet based communication should override AppendBytesToCache. The
/// subclasses can choose to call the built in AppendBytesToCache with the \a
/// broadcast parameter set to false. This will cause the \b
/// eBroadcastBitReadThreadGotBytes event not get broadcast, and then the
/// subclass can post a \b eBroadcastBitPacketAvailable event when a full
/// packet of data has been received.
///
/// If the connection is disconnected a \b eBroadcastBitDisconnected event
/// gets broadcast. If the read thread exits a \b
/// eBroadcastBitReadThreadDidExit event will be broadcast. Clients can also
/// post a \b eBroadcastBitReadThreadShouldExit event to this object which
/// will cause the read thread to exit.
class Communication : public Broadcaster {
/// When using this class, all reads and writes happen synchronously on the
/// calling thread. There is also a ThreadedCommunication class that supports
/// multi-threaded mode.
class Communication {
public:
FLAGS_ANONYMOUS_ENUM(){
eBroadcastBitDisconnected =
(1u << 0), ///< Sent when the communications connection is lost.
eBroadcastBitReadThreadGotBytes =
(1u << 1), ///< Sent by the read thread when bytes become available.
eBroadcastBitReadThreadDidExit =
(1u
<< 2), ///< Sent by the read thread when it exits to inform clients.
eBroadcastBitReadThreadShouldExit =
(1u << 3), ///< Sent by clients that need to cancel the read thread.
eBroadcastBitPacketAvailable =
(1u << 4), ///< Sent when data received makes a complete packet.
eBroadcastBitNoMorePendingInput = (1u << 5), ///< Sent by the read thread
///to indicate all pending
///input has been processed.
kLoUserBroadcastBit =
(1u << 16), ///< Subclasses can used bits 31:16 for any needed events.
kHiUserBroadcastBit = (1u << 31),
eAllEventBits = 0xffffffff};

typedef void (*ReadThreadBytesReceived)(void *baton, const void *src,
size_t src_len);

/// Construct the Communication object with the specified name for the
/// Broadcaster that this object inherits from.
///
/// \param[in] broadcaster_name
/// The name of the broadcaster object. This name should be as
/// complete as possible to uniquely identify this object. The
/// broadcaster name can be updated after the connect function
/// is called.
Communication(const char *broadcaster_name);
/// Construct the Communication object.
Communication();

/// Destructor.
///
/// The destructor is virtual since this class gets subclassed.
~Communication() override;
virtual ~Communication();

void Clear();
virtual void Clear();

/// Connect using the current connection by passing \a url to its connect
/// function. string.
Expand All @@ -148,7 +73,7 @@ class Communication : public Broadcaster {
///
/// \see Status& Communication::GetError ();
/// \see bool Connection::Disconnect ();
lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr);
virtual lldb::ConnectionStatus Disconnect(Status *error_ptr = nullptr);

/// Check if the connection is valid.
///
Expand All @@ -166,13 +91,6 @@ class Communication : public Broadcaster {
/// If no read thread is running, this function call the connection's
/// Connection::Read(...) function to get any available.
///
/// If a read thread has been started, this function will check for any
/// cached bytes that have already been read and return any currently
/// available bytes. If no bytes are cached, it will wait for the bytes to
/// become available by listening for the \a eBroadcastBitReadThreadGotBytes
/// event. If this function consumes all of the bytes in the cache, it will
/// reset the \a eBroadcastBitReadThreadGotBytes event bit.
///
/// \param[in] dst
/// A destination buffer that must be at least \a dst_len bytes
/// long.
Expand All @@ -188,8 +106,9 @@ class Communication : public Broadcaster {
/// The number of bytes actually read.
///
/// \see size_t Connection::Read (void *, size_t);
size_t Read(void *dst, size_t dst_len, const Timeout<std::micro> &timeout,
lldb::ConnectionStatus &status, Status *error_ptr);
virtual size_t Read(void *dst, size_t dst_len,
const Timeout<std::micro> &timeout,
lldb::ConnectionStatus &status, Status *error_ptr);

/// The actual write function that attempts to write to the communications
/// protocol.
Expand Down Expand Up @@ -237,146 +156,25 @@ class Communication : public Broadcaster {
///
/// \see
/// class Connection
void SetConnection(std::unique_ptr<Connection> connection);

/// Starts a read thread whose sole purpose it to read bytes from the
/// current connection. This function will call connection's read function:
///
/// size_t Connection::Read (void *, size_t);
///
/// When bytes are read and cached, this function will call:
///
/// Communication::AppendBytesToCache (const uint8_t * bytes, size_t len,
/// bool
/// broadcast);
///
/// Subclasses should override this function if they wish to override the
/// default action of caching the bytes and broadcasting a \b
/// eBroadcastBitReadThreadGotBytes event.
///
/// \return
/// \b True if the read thread was successfully started, \b
/// false otherwise.
///
/// \see size_t Connection::Read (void *, size_t);
/// \see void Communication::AppendBytesToCache (const uint8_t * bytes,
/// size_t len, bool broadcast);
virtual bool StartReadThread(Status *error_ptr = nullptr);

/// Stops the read thread by cancelling it.
///
/// \return
/// \b True if the read thread was successfully canceled, \b
/// false otherwise.
virtual bool StopReadThread(Status *error_ptr = nullptr);

virtual bool JoinReadThread(Status *error_ptr = nullptr);
/// Checks if there is a currently running read thread.
///
/// \return
/// \b True if the read thread is running, \b false otherwise.
bool ReadThreadIsRunning();

/// The read thread function. This function will call the "DoRead"
/// function continuously and wait for data to become available. When data
/// is received it will append the available data to the internal cache and
/// broadcast a \b eBroadcastBitReadThreadGotBytes event.
///
/// \param[in] comm_ptr
/// A pointer to an instance of this class.
///
/// \return
/// \b NULL.
///
/// \see void Communication::ReadThreadGotBytes (const uint8_t *, size_t);
lldb::thread_result_t ReadThread();

void SetReadThreadBytesReceivedCallback(ReadThreadBytesReceived callback,
void *callback_baton);

/// Wait for the read thread to process all outstanding data.
///
/// After this function returns, the read thread has processed all data that
/// has been waiting in the Connection queue.
///
void SynchronizeWithReadThread();
virtual void SetConnection(std::unique_ptr<Connection> connection);

static std::string ConnectionStatusAsString(lldb::ConnectionStatus status);

bool GetCloseOnEOF() const { return m_close_on_eof; }

void SetCloseOnEOF(bool b) { m_close_on_eof = b; }

static ConstString &GetStaticBroadcasterClass();

ConstString &GetBroadcasterClass() const override {
return GetStaticBroadcasterClass();
}

protected:
lldb::ConnectionSP m_connection_sp; ///< The connection that is current in use
///by this communications class.
HostThread m_read_thread; ///< The read thread handle in case we need to
///cancel the thread.
std::atomic<bool> m_read_thread_enabled;
std::atomic<bool> m_read_thread_did_exit;
std::string
m_bytes; ///< A buffer to cache bytes read in the ReadThread function.
std::recursive_mutex m_bytes_mutex; ///< A mutex to protect multi-threaded
///access to the cached bytes.
lldb::ConnectionStatus m_pass_status; ///< Connection status passthrough
///from read thread.
Status m_pass_error; ///< Error passthrough from read thread.
std::mutex
m_write_mutex; ///< Don't let multiple threads write at the same time...
std::mutex m_synchronize_mutex;
ReadThreadBytesReceived m_callback;
void *m_callback_baton;
bool m_close_on_eof;

size_t ReadFromConnection(void *dst, size_t dst_len,
const Timeout<std::micro> &timeout,
lldb::ConnectionStatus &status, Status *error_ptr);

/// Append new bytes that get read from the read thread into the internal
/// object byte cache. This will cause a \b eBroadcastBitReadThreadGotBytes
/// event to be broadcast if \a broadcast is true.
///
/// Subclasses can override this function in order to inspect the received
/// data and check if a packet is available.
///
/// Subclasses can also still call this function from the overridden method
/// to allow the caching to correctly happen and suppress the broadcasting
/// of the \a eBroadcastBitReadThreadGotBytes event by setting \a broadcast
/// to false.
///
/// \param[in] src
/// A source buffer that must be at least \a src_len bytes
/// long.
///
/// \param[in] src_len
/// The number of bytes to append to the cache.
virtual void AppendBytesToCache(const uint8_t *src, size_t src_len,
bool broadcast,
lldb::ConnectionStatus status);

/// Get any available bytes from our data cache. If this call empties the
/// data cache, the \b eBroadcastBitReadThreadGotBytes event will be reset
/// to signify no more bytes are available.
///
/// \param[in] dst
/// A destination buffer that must be at least \a dst_len bytes
/// long.
///
/// \param[in] dst_len
/// The number of bytes to attempt to read from the cache,
/// and also the max number of bytes that can be placed into
/// \a dst.
///
/// \return
/// The number of bytes extracted from the data cache.
size_t GetCachedBytes(void *dst, size_t dst_len);

private:
Communication(const Communication &) = delete;
const Communication &operator=(const Communication &) = delete;
Expand Down

0 comments on commit 9823d42

Please sign in to comment.