-
Notifications
You must be signed in to change notification settings - Fork 14.5k
[lldb] Update JSONTransport to use MainLoop for reading. #148300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This updates JSONTransport to use a MainLoop for reading messages. This also allows us to read in larger chunks than we did previously. With the event driven reading operations we can read in chunks and store the contents in an internal buffer. Separately we can parse the buffer and split the contents up into messages. Our previous version approach would read a byte at a time, which is less efficient.
@llvm/pr-subscribers-lldb Author: John Harrison (ashgti) ChangesThis updates JSONTransport to use a MainLoop for reading messages. This also allows us to read in larger chunks than we did previously. With the event driven reading operations we can read in chunks and store the contents in an internal buffer. Separately we can parse the buffer and split the contents up into messages. Our previous version approach would read a byte at a time, which is less efficient. Patch is 47.17 KiB, truncated to 20.00 KiB below, full version: https://github.com/llvm/llvm-project/pull/148300.diff 11 Files Affected:
diff --git a/lldb/include/lldb/Host/JSONTransport.h b/lldb/include/lldb/Host/JSONTransport.h
index 4087cdf2b42f7..170aa4a8b8811 100644
--- a/lldb/include/lldb/Host/JSONTransport.h
+++ b/lldb/include/lldb/Host/JSONTransport.h
@@ -13,13 +13,15 @@
#ifndef LLDB_HOST_JSONTRANSPORT_H
#define LLDB_HOST_JSONTRANSPORT_H
+#include "lldb/Host/MainLoopBase.h"
#include "lldb/lldb-forward.h"
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/FormatVariadic.h"
#include "llvm/Support/JSON.h"
-#include <chrono>
+#include <string>
#include <system_error>
+#include <vector>
namespace lldb_private {
@@ -68,6 +70,10 @@ class TransportInvalidError : public llvm::ErrorInfo<TransportInvalidError> {
/// A transport class that uses JSON for communication.
class JSONTransport {
public:
+ using ReadHandleUP = MainLoopBase::ReadHandleUP;
+ template <typename T>
+ using Callback = std::function<void(MainLoopBase &, llvm::Expected<T>)>;
+
JSONTransport(lldb::IOObjectSP input, lldb::IOObjectSP output);
virtual ~JSONTransport() = default;
@@ -83,24 +89,59 @@ class JSONTransport {
return WriteImpl(message);
}
- /// Reads the next message from the input stream.
+ /// Registers the transport with the MainLoop.
template <typename T>
- llvm::Expected<T> Read(const std::chrono::microseconds &timeout) {
- llvm::Expected<std::string> message = ReadImpl(timeout);
- if (!message)
- return message.takeError();
- return llvm::json::parse<T>(/*JSON=*/*message);
+ llvm::Expected<ReadHandleUP> RegisterReadObject(MainLoopBase &loop,
+ Callback<T> callback) {
+ Status error;
+ ReadHandleUP handle = loop.RegisterReadObject(
+ m_input,
+ [&](MainLoopBase &loop) {
+ char buf[1024];
+ size_t len = sizeof(buf);
+ do {
+ if (llvm::Error error = m_input->Read(buf, len).takeError()) {
+ callback(loop, std::move(error));
+ return;
+ }
+
+ if (len == 0) // EOF
+ break;
+
+ m_buffer.append(std::string(buf, len));
+ } while (len == sizeof(buf));
+
+ llvm::Expected<std::vector<std::string>> messages = Parse();
+ if (llvm::Error error = messages.takeError()) {
+ callback(loop, std::move(error));
+ return;
+ }
+
+ for (const auto &message : *messages)
+ if constexpr (std::is_same<T, std::string>::value)
+ callback(loop, message);
+ else
+ callback(loop, llvm::json::parse<T>(message));
+
+ // On EOF, request termination after handling all the messages.
+ if (len == 0)
+ callback(loop, llvm::make_error<TransportEOFError>());
+ },
+ error);
+ if (error.Fail())
+ return error.takeError();
+ return handle;
}
protected:
virtual void Log(llvm::StringRef message);
virtual llvm::Error WriteImpl(const std::string &message) = 0;
- virtual llvm::Expected<std::string>
- ReadImpl(const std::chrono::microseconds &timeout) = 0;
+ virtual llvm::Expected<std::vector<std::string>> Parse() = 0;
lldb::IOObjectSP m_input;
lldb::IOObjectSP m_output;
+ std::string m_buffer;
};
/// A transport class for JSON with a HTTP header.
@@ -111,14 +152,12 @@ class HTTPDelimitedJSONTransport : public JSONTransport {
virtual ~HTTPDelimitedJSONTransport() = default;
protected:
- virtual llvm::Error WriteImpl(const std::string &message) override;
- virtual llvm::Expected<std::string>
- ReadImpl(const std::chrono::microseconds &timeout) override;
-
- // FIXME: Support any header.
- static constexpr llvm::StringLiteral kHeaderContentLength =
- "Content-Length: ";
- static constexpr llvm::StringLiteral kHeaderSeparator = "\r\n\r\n";
+ llvm::Error WriteImpl(const std::string &message) override;
+ llvm::Expected<std::vector<std::string>> Parse() override;
+
+ static constexpr llvm::StringLiteral kHeaderContentLength = "Content-Length";
+ static constexpr llvm::StringLiteral kHeaderFieldSeparator = ":";
+ static constexpr llvm::StringLiteral kHeaderSeparator = "\r\n";
};
/// A transport class for JSON RPC.
@@ -129,9 +168,8 @@ class JSONRPCTransport : public JSONTransport {
virtual ~JSONRPCTransport() = default;
protected:
- virtual llvm::Error WriteImpl(const std::string &message) override;
- virtual llvm::Expected<std::string>
- ReadImpl(const std::chrono::microseconds &timeout) override;
+ llvm::Error WriteImpl(const std::string &message) override;
+ llvm::Expected<std::vector<std::string>> Parse() override;
static constexpr llvm::StringLiteral kMessageSeparator = "\n";
};
diff --git a/lldb/source/Host/common/JSONTransport.cpp b/lldb/source/Host/common/JSONTransport.cpp
index 546c12c8f7114..01922daf8e285 100644
--- a/lldb/source/Host/common/JSONTransport.cpp
+++ b/lldb/source/Host/common/JSONTransport.cpp
@@ -7,17 +7,14 @@
//===----------------------------------------------------------------------===//
#include "lldb/Host/JSONTransport.h"
-#include "lldb/Utility/IOObject.h"
#include "lldb/Utility/LLDBLog.h"
#include "lldb/Utility/Log.h"
-#include "lldb/Utility/SelectHelper.h"
#include "lldb/Utility/Status.h"
#include "lldb/lldb-forward.h"
#include "llvm/ADT/StringExtras.h"
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/Error.h"
#include "llvm/Support/raw_ostream.h"
-#include <optional>
#include <string>
#include <utility>
@@ -25,64 +22,6 @@ using namespace llvm;
using namespace lldb;
using namespace lldb_private;
-/// ReadFull attempts to read the specified number of bytes. If EOF is
-/// encountered, an empty string is returned.
-static Expected<std::string>
-ReadFull(IOObject &descriptor, size_t length,
- std::optional<std::chrono::microseconds> timeout = std::nullopt) {
- if (!descriptor.IsValid())
- return llvm::make_error<TransportInvalidError>();
-
- bool timeout_supported = true;
- // FIXME: SelectHelper does not work with NativeFile on Win32.
-#if _WIN32
- timeout_supported = descriptor.GetFdType() == IOObject::eFDTypeSocket;
-#endif
-
- if (timeout && timeout_supported) {
- SelectHelper sh;
- sh.SetTimeout(*timeout);
- sh.FDSetRead(
- reinterpret_cast<lldb::socket_t>(descriptor.GetWaitableHandle()));
- Status status = sh.Select();
- if (status.Fail()) {
- // Convert timeouts into a specific error.
- if (status.GetType() == lldb::eErrorTypePOSIX &&
- status.GetError() == ETIMEDOUT)
- return make_error<TransportTimeoutError>();
- return status.takeError();
- }
- }
-
- std::string data;
- data.resize(length);
- Status status = descriptor.Read(data.data(), length);
- if (status.Fail())
- return status.takeError();
-
- // Read returns '' on EOF.
- if (length == 0)
- return make_error<TransportEOFError>();
-
- // Return the actual number of bytes read.
- return data.substr(0, length);
-}
-
-static Expected<std::string>
-ReadUntil(IOObject &descriptor, StringRef delimiter,
- std::optional<std::chrono::microseconds> timeout = std::nullopt) {
- std::string buffer;
- buffer.reserve(delimiter.size() + 1);
- while (!llvm::StringRef(buffer).ends_with(delimiter)) {
- Expected<std::string> next =
- ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout);
- if (auto Err = next.takeError())
- return std::move(Err);
- buffer += *next;
- }
- return buffer.substr(0, buffer.size() - delimiter.size());
-}
-
JSONTransport::JSONTransport(IOObjectSP input, IOObjectSP output)
: m_input(std::move(input)), m_output(std::move(output)) {}
@@ -90,44 +29,55 @@ void JSONTransport::Log(llvm::StringRef message) {
LLDB_LOG(GetLog(LLDBLog::Host), "{0}", message);
}
-Expected<std::string>
-HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
- if (!m_input || !m_input->IsValid())
- return llvm::make_error<TransportInvalidError>();
+Expected<std::vector<std::string>> HTTPDelimitedJSONTransport::Parse() {
+ if (m_buffer.empty())
+ return std::vector<std::string>{};
+
+ std::vector<std::string> messages;
+ llvm::StringRef buf = m_buffer;
+ size_t content_length = 0, end_of_last_message = 0, cursor = 0;
+ do {
+ auto idx = buf.find(kHeaderSeparator, cursor);
+ if (idx == StringRef::npos)
+ break;
+
+ auto header = buf.slice(cursor, idx);
+ cursor = idx + kHeaderSeparator.size();
+
+ // An empty line separates the headers from the message body.
+ if (header.empty()) {
+ // Not enough data, wait for the next chunk to arrive.
+ if (content_length + cursor > buf.size())
+ break;
+
+ std::string body = buf.substr(cursor, content_length).str();
+ end_of_last_message = cursor + content_length;
+ cursor += content_length;
+ Log(llvm::formatv("--> {0}", body).str());
+ messages.push_back(body);
+ content_length = 0;
+ continue;
+ }
+
+ // HTTP Headers are `<field-name>: [<field-value>]`.
+ if (!header.contains(kHeaderFieldSeparator))
+ return make_error<StringError>("malformed content header",
+ inconvertibleErrorCode());
+
+ auto [name, value] = header.split(kHeaderFieldSeparator);
+ if (name.lower() == kHeaderContentLength.lower()) {
+ value = value.trim();
+ if (value.trim().consumeInteger(10, content_length))
+ return make_error<StringError>(
+ formatv("invalid content length: {0}", value).str(),
+ inconvertibleErrorCode());
+ }
+ } while (cursor < buf.size());
- IOObject *input = m_input.get();
- Expected<std::string> message_header =
- ReadFull(*input, kHeaderContentLength.size(), timeout);
- if (!message_header)
- return message_header.takeError();
- if (*message_header != kHeaderContentLength)
- return createStringError(formatv("expected '{0}' and got '{1}'",
- kHeaderContentLength, *message_header)
- .str());
-
- Expected<std::string> raw_length = ReadUntil(*input, kHeaderSeparator);
- if (!raw_length)
- return handleErrors(raw_length.takeError(),
- [&](const TransportEOFError &E) -> llvm::Error {
- return createStringError(
- "unexpected EOF while reading header separator");
- });
-
- size_t length;
- if (!to_integer(*raw_length, length))
- return createStringError(
- formatv("invalid content length {0}", *raw_length).str());
-
- Expected<std::string> raw_json = ReadFull(*input, length);
- if (!raw_json)
- return handleErrors(
- raw_json.takeError(), [&](const TransportEOFError &E) -> llvm::Error {
- return createStringError("unexpected EOF while reading JSON");
- });
-
- Log(llvm::formatv("--> {0}", *raw_json).str());
-
- return raw_json;
+ // Store the remainder of the buffer for the next read callback.
+ m_buffer = buf.substr(end_of_last_message);
+
+ return messages;
}
Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) {
@@ -138,25 +88,29 @@ Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) {
std::string Output;
raw_string_ostream OS(Output);
- OS << kHeaderContentLength << message.length() << kHeaderSeparator << message;
+ OS << kHeaderContentLength << kHeaderFieldSeparator << ' ' << message.length()
+ << kHeaderSeparator << kHeaderSeparator << message;
size_t num_bytes = Output.size();
return m_output->Write(Output.data(), num_bytes).takeError();
}
-Expected<std::string>
-JSONRPCTransport::ReadImpl(const std::chrono::microseconds &timeout) {
- if (!m_input || !m_input->IsValid())
- return make_error<TransportInvalidError>();
-
- IOObject *input = m_input.get();
- Expected<std::string> raw_json =
- ReadUntil(*input, kMessageSeparator, timeout);
- if (!raw_json)
- return raw_json.takeError();
-
- Log(llvm::formatv("--> {0}", *raw_json).str());
-
- return *raw_json;
+Expected<std::vector<std::string>> JSONRPCTransport::Parse() {
+ std::vector<std::string> messages;
+ StringRef buf = m_buffer;
+ do {
+ size_t idx = buf.find(kMessageSeparator);
+ if (idx == StringRef::npos)
+ break;
+ std::string raw_json = buf.substr(0, idx).str();
+ buf = buf.substr(idx + 1);
+ Log(llvm::formatv("--> {0}", raw_json).str());
+ messages.push_back(raw_json);
+ } while (!buf.empty());
+
+ // Store the remainder of the buffer for the next read callback.
+ m_buffer = buf.str();
+
+ return messages;
}
Error JSONRPCTransport::WriteImpl(const std::string &message) {
diff --git a/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py b/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
index b72b98de412b4..3c21d7fca5536 100644
--- a/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
+++ b/lldb/test/API/tools/lldb-dap/io/TestDAP_io.py
@@ -48,18 +48,18 @@ def test_invalid_header(self):
lldb-dap handles invalid message headers.
"""
process = self.launch()
- process.stdin.write(b"not the corret message header")
+ process.stdin.write(b"not the correct message header")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), 0)
def test_partial_header(self):
"""
- lldb-dap handles parital message headers.
+ lldb-dap handles partial message headers.
"""
process = self.launch()
process.stdin.write(b"Content-Length: ")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), 0)
def test_incorrect_content_length(self):
"""
@@ -68,7 +68,7 @@ def test_incorrect_content_length(self):
process = self.launch()
process.stdin.write(b"Content-Length: abc")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), 0)
def test_partial_content_length(self):
"""
@@ -77,4 +77,4 @@ def test_partial_content_length(self):
process = self.launch()
process.stdin.write(b"Content-Length: 10\r\n\r\n{")
process.stdin.close()
- self.assertEqual(process.wait(timeout=5.0), 1)
+ self.assertEqual(process.wait(timeout=5.0), 0)
diff --git a/lldb/tools/lldb-dap/DAP.cpp b/lldb/tools/lldb-dap/DAP.cpp
index fd89f52595ec6..63f9c9ddb7390 100644
--- a/lldb/tools/lldb-dap/DAP.cpp
+++ b/lldb/tools/lldb-dap/DAP.cpp
@@ -23,13 +23,14 @@
#include "Transport.h"
#include "lldb/API/SBBreakpoint.h"
#include "lldb/API/SBCommandInterpreter.h"
-#include "lldb/API/SBCommandReturnObject.h"
#include "lldb/API/SBEvent.h"
#include "lldb/API/SBLanguageRuntime.h"
#include "lldb/API/SBListener.h"
#include "lldb/API/SBProcess.h"
#include "lldb/API/SBStream.h"
-#include "lldb/Utility/IOObject.h"
+#include "lldb/Host/JSONTransport.h"
+#include "lldb/Host/MainLoop.h"
+#include "lldb/Host/MainLoopBase.h"
#include "lldb/Utility/Status.h"
#include "lldb/lldb-defines.h"
#include "lldb/lldb-enumerations.h"
@@ -52,7 +53,7 @@
#include <cstdarg>
#include <cstdint>
#include <cstdio>
-#include <fstream>
+#include <functional>
#include <future>
#include <memory>
#include <mutex>
@@ -919,6 +920,8 @@ llvm::Error DAP::Disconnect(bool terminateDebuggee) {
SendTerminatedEvent();
disconnecting = true;
+ m_loop.AddPendingCallback(
+ [](MainLoopBase &loop) { loop.RequestTermination(); });
return ToError(error);
}
@@ -949,75 +952,76 @@ static std::optional<T> getArgumentsIfRequest(const Message &pm,
return args;
}
-llvm::Error DAP::Loop() {
- // Can't use \a std::future<llvm::Error> because it doesn't compile on
- // Windows.
- std::future<lldb::SBError> queue_reader =
- std::async(std::launch::async, [&]() -> lldb::SBError {
- llvm::set_thread_name(transport.GetClientName() + ".transport_handler");
- auto cleanup = llvm::make_scope_exit([&]() {
- // Ensure we're marked as disconnecting when the reader exits.
- disconnecting = true;
- m_queue_cv.notify_all();
- });
-
- while (!disconnecting) {
- llvm::Expected<Message> next =
- transport.Read<protocol::Message>(std::chrono::seconds(1));
- if (next.errorIsA<TransportEOFError>()) {
- consumeError(next.takeError());
- break;
- }
+Status DAP::TransportHandler() {
+ llvm::set_thread_name(transport.GetClientName() + ".transport_handler");
- // If the read timed out, continue to check if we should disconnect.
- if (next.errorIsA<TransportTimeoutError>()) {
- consumeError(next.takeError());
- continue;
- }
+ auto cleanup = llvm::make_scope_exit([&]() {
+ // Ensure we're marked as disconnecting when the reader exits.
+ disconnecting = true;
+ m_queue_cv.notify_all();
+ });
- if (llvm::Error err = next.takeError()) {
- lldb::SBError errWrapper;
- errWrapper.SetErrorString(llvm::toString(std::move(err)).c_str());
- return errWrapper;
- }
+ Status status;
+ auto handle = transport.RegisterReadObject<protocol::Message>(
+ m_loop,
+ [&](MainLoopBase &loop, llvm::Expected<protocol::Message> message) {
+ if (message.errorIsA<TransportEOFError>()) {
+ llvm::consumeError(message.takeError());
+ loop.RequestTermination();
+ return;
+ }
- if (const protocol::Request *req =
- std::get_if<protocol::Request>(&*next);
- req && req->arguments == "disconnect")
- disconnecting = true;
-
- const std::optional<CancelArguments> cancel_args =
- getArgumentsIfRequest<CancelArguments>(*next, "cancel");
- if (cancel_args) {
- {
- std::lock_guard<std::mutex> guard(m_cancelled_requests_mutex);
- if (cancel_args->requestId)
- m_cancelled_requests.insert(*cancel_args->requestId);
- }
+ if (llvm::Error err = message.takeError()) {
+ status = Status::FromError(std::move(err));
+ loop.RequestTermination();
+ return;
+ }
- // If a cancel is requested for the active request, make a best
- // effort attempt to interrupt.
- std::lock_guard<std::mutex> guard(m_active_request_mutex);
- if (m_active_request &&
- cancel_args->requestId == m_active_request->seq) {
- DAP_LOG(
- log,
- "({0}) interrupting inflight request (command={1} seq={2})",
- transport.GetClientName(), m_active_request->command,
- m_active_request->seq);
- debugger.RequestInterrupt();
- }
- }
+ if (const protocol::Request *req =
+ std::get_if<protocol::Request>(&*message);
+ req && req->arguments == "disconnect")
+ disconnecting = true;
+ const std::optional<CancelArguments> cancel_args =
+ getArgumentsIfRequest<CancelArguments>(*message, "cancel");
+ if (cancel_args) {
{
- std::lock_guard<std::mutex> guard(m_queue_mutex);
- m_queue.push_back(std::move(*next));
+ std::lock_guard<std::mutex> guard(m_cancelled_requests_mutex);
+ if (cancel_args->requestId)
+ m_cancelled_requests.insert(*cancel_args->requestId);
+ }
+
+ // If a cancel is requested for the active request, make a best
+ // effort attempt to interrupt.
+ std::lock_guard<std::mutex> guard(m_active_request_mutex);
+ if (m_active_request &&
+ cancel_args->requestId == m_active_request->seq) {
+ DAP_LOG(log,
+ "({0}) interrupting inflight request (command={1} seq={2})",
+ transport.GetClientName(), m_active_request->command,
+ m_active_request->seq);
+ debugger.RequestInterrupt();
}
- m_queue_cv.notify_one();
}
- return lldb::SBError();
+ {
+ std::loc...
[truncated]
|
This should fix #146576 |
process.stdin.close() | ||
self.assertEqual(process.wait(timeout=5.0), 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you quickly explain what these 1 and 0 means in both versions of the test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the comments in the test.
These are testing how lldb-dap handles incorrect input, like partial messages or errors handling messages.
For example,
# This should be fine, no messages in the buffer, nothing left to do so it exit cleanly.
$ printf "" | ~/Projects/lldb-build/bin/lldb-dap
# exit status 0
# malformed message header
$ printf "\r\n" | ~/Projects/lldb-build/bin/lldb-dap
DAP session error: [1:0, byte=0]: Unexpected EOF
# exit status 1
# incomplete message
$ printf "Content-Length: 10\r\n" | ~/Projects/lldb-build/bin/lldb-dap
DAP session error: transport EOF with unhandled contents Content-Length: 10
# exit status 1
# parse error
$ printf "Content-Length: 3\r\n\r\naaa" | ~/Projects/lldb-build/bin/lldb-dap
DAP session error: [1:1, byte=1]: Invalid JSON value
# exit status 1
consumeError(next.takeError()); | ||
break; | ||
} | ||
Status DAP::TransportHandler() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you return an llvm::Error? I find Status not as powerful as Error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We ran into issues when building with MSVC 2019 and using an std::future<llvm::Error>
(on line 1023), see #137388
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty cool that the MainLoop can be used for this.
auto idx = buf.find(kHeaderSeparator, cursor); | ||
if (idx == StringRef::npos) | ||
break; | ||
|
||
auto header = buf.slice(cursor, idx); | ||
cursor = idx + kHeaderSeparator.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can get rid of idx
if you use auto [head, tail] = buf.split(kHeaderSeparator)
and then make buf = tail
while !buf.empty()
. Less "lumberjacky" as @labath might call it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure exactly how to use that though because we need to track how much of the buffer we've consumed and where we are for parsing the headers.
end_of_last_message
is tracking the index of the amount of the buffer we've consumed and cursor
is tracking the position of the headers we've parsed so far.
If I use split, I lose track of the index in the overall buffer for marking what we've consumed when we break out of the loop.
I added Windows support to MainLoop for pipes in 1a7b7e2, which unblocks this. |
This updates JSONTransport to use a MainLoop for reading messages.
This also allows us to read in larger chunks than we did previously. With the event driven reading operations we can read in chunks and store the contents in an internal buffer. Separately we can parse the buffer and split the contents up into messages.
Our previous version approach would read a byte at a time, which is less efficient.