Skip to content

Commit

Permalink
Add a read_full_buffer argument to ConnectionFileDescriptor::Read
Browse files Browse the repository at this point in the history
Summary:
AdbClient was attempting to handle the case where the socket input arrived in pieces, but it was
failing to handle the case where the connection was closed before that happened. In this case, it
would just spin in an infinite loop calling Connection::Read. (This was also the cause of the
spurious timeouts on the darwin->android buildbot. The exact cause of the premature EOF remains
to be investigated, but is likely a server bug.)

Since this wait-for-a-certain-number-of-bytes seems like a useful functionality to have, I am
moving it (with the infinite loop fixed) to the Connection class, and adding an
appropriate test for it.

Reviewers: clayborg, zturner, ovyalov

Subscribers: tberghammer, danalbert, lldb-commits

Differential Revision: http://reviews.llvm.org/D19533

llvm-svn: 268380
  • Loading branch information
labath committed May 3, 2016
1 parent fe98b2f commit 2407602
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 127 deletions.
15 changes: 10 additions & 5 deletions lldb/include/lldb/Core/Connection.h
Expand Up @@ -114,6 +114,14 @@ class Connection
/// @param[in] timeout_usec
/// The number of microseconds to wait for the data.
///
/// @param[in] read_full_buffer
/// If true, continues reading until the specified number of bytes is
/// read or some exceptional event occurs, which would prevent the
/// buffer from being filled (timeout, end of file, I/O error, etc.).
/// If false, the function returns as soon as at least some part of
/// the data is available (traditional behavior of the read system
/// call).
///
/// @param[out] status
/// On return, indicates whether the call was successful or terminated
/// due to some error condition.
Expand All @@ -129,11 +137,8 @@ class Connection
/// @see size_t Communication::Read (void *, size_t, uint32_t);
//------------------------------------------------------------------
virtual size_t
Read (void *dst,
size_t dst_len,
uint32_t timeout_usec,
lldb::ConnectionStatus &status,
Error *error_ptr) = 0;
Read(void *dst, size_t dst_len, uint32_t timeout_usec, bool read_full_buffer, lldb::ConnectionStatus &status,
Error *error_ptr) = 0;

//------------------------------------------------------------------
/// The actual write function that attempts to write to the
Expand Down
7 changes: 2 additions & 5 deletions lldb/include/lldb/Core/ConnectionSharedMemory.h
Expand Up @@ -43,11 +43,8 @@ class ConnectionSharedMemory :
Disconnect (Error *error_ptr) override;

size_t
Read (void *dst,
size_t dst_len,
uint32_t timeout_usec,
lldb::ConnectionStatus &status,
Error *error_ptr) override;
Read(void *dst, size_t dst_len, uint32_t timeout_usec, bool read_full_buffer, lldb::ConnectionStatus &status,
Error *error_ptr) override;

size_t
Write (const void *src, size_t src_len, lldb::ConnectionStatus &status, Error *error_ptr) override;
Expand Down
4 changes: 3 additions & 1 deletion lldb/include/lldb/Host/posix/ConnectionFileDescriptorPosix.h
Expand Up @@ -59,7 +59,9 @@ class ConnectionFileDescriptor : public Connection

lldb::ConnectionStatus Disconnect(Error *error_ptr) override;

size_t Read(void *dst, size_t dst_len, uint32_t timeout_usec, lldb::ConnectionStatus &status, Error *error_ptr) override;
size_t
Read(void *dst, size_t dst_len, uint32_t timeout_usec, bool read_full_buffer, lldb::ConnectionStatus &status,
Error *error_ptr) override;

size_t Write(const void *src, size_t src_len, lldb::ConnectionStatus &status, Error *error_ptr) override;

Expand Down
6 changes: 4 additions & 2 deletions lldb/source/Core/Communication.cpp
Expand Up @@ -191,7 +191,8 @@ Communication::Read (void *dst, size_t dst_len, uint32_t timeout_usec, Connectio
lldb::ConnectionSP connection_sp (m_connection_sp);
if (connection_sp)
{
return connection_sp->Read (dst, dst_len, timeout_usec, status, error_ptr);
const bool read_full_buffer = false;
return connection_sp->Read(dst, dst_len, timeout_usec, read_full_buffer, status, error_ptr);
}

if (error_ptr)
Expand Down Expand Up @@ -326,7 +327,8 @@ Communication::ReadFromConnection (void *dst,
Error *error_ptr)
{
lldb::ConnectionSP connection_sp(m_connection_sp);
return (connection_sp ? connection_sp->Read(dst, dst_len, timeout_usec, status, error_ptr) : 0);
const bool read_full_buffer = false;
return (connection_sp ? connection_sp->Read(dst, dst_len, read_full_buffer, timeout_usec, status, error_ptr) : 0);
}

bool
Expand Down
7 changes: 2 additions & 5 deletions lldb/source/Core/ConnectionSharedMemory.cpp
Expand Up @@ -94,11 +94,8 @@ ConnectionSharedMemory::Disconnect (Error *error_ptr)
}

size_t
ConnectionSharedMemory::Read (void *dst,
size_t dst_len,
uint32_t timeout_usec,
ConnectionStatus &status,
Error *error_ptr)
ConnectionSharedMemory::Read(void *dst, size_t dst_len, uint32_t timeout_usec, bool read_full_buffer,
ConnectionStatus &status, Error *error_ptr)
{
status = eConnectionStatusSuccess;
return 0;
Expand Down
5 changes: 3 additions & 2 deletions lldb/source/Host/common/Editline.cpp
Expand Up @@ -580,6 +580,7 @@ Editline::GetCharacter (EditLineCharType * c)
// Read an actual character
while (true)
{
const bool read_full_buffer = false; // Doesn't really matter, we're reading one byte only.
lldb::ConnectionStatus status = lldb::eConnectionStatusSuccess;
char ch = 0;

Expand All @@ -588,12 +589,12 @@ Editline::GetCharacter (EditLineCharType * c)
// for someone to interrupt us. After Read returns, immediately lock the mutex again and
// check if we were interrupted.
m_output_mutex.Unlock();
int read_count = m_input_connection.Read(&ch, 1, UINT32_MAX, status, NULL);
int read_count = m_input_connection.Read(&ch, 1, UINT32_MAX, read_full_buffer, status, NULL);
m_output_mutex.Lock();
if (m_editor_status == EditorStatus::Interrupted)
{
while (read_count > 0 && status == lldb::eConnectionStatusSuccess)
read_count = m_input_connection.Read(&ch, 1, UINT32_MAX, status, NULL);
read_count = m_input_connection.Read(&ch, 1, UINT32_MAX, read_full_buffer, status, NULL);
lldbassert(status == lldb::eConnectionStatusInterrupted);
return 0;
}
Expand Down
49 changes: 30 additions & 19 deletions lldb/source/Host/posix/ConnectionFileDescriptorPosix.cpp
Expand Up @@ -411,7 +411,8 @@ ConnectionFileDescriptor::Disconnect(Error *error_ptr)
}

size_t
ConnectionFileDescriptor::Read(void *dst, size_t dst_len, uint32_t timeout_usec, ConnectionStatus &status, Error *error_ptr)
ConnectionFileDescriptor::Read(void *dst, size_t dst_len, uint32_t timeout_usec, bool read_full_buffer,
ConnectionStatus &status, Error *error_ptr)
{
Log *log(lldb_private::GetLogIfAnyCategoriesSet(LIBLLDB_LOG_CONNECTION));

Expand All @@ -434,26 +435,36 @@ ConnectionFileDescriptor::Read(void *dst, size_t dst_len, uint32_t timeout_usec,
return 0;
}

status = BytesAvailable(timeout_usec, error_ptr);
if (status != eConnectionStatusSuccess)
return 0;

size_t total_bytes_read = 0;
char *dst_buf = static_cast<char *>(dst);
auto now = std::chrono::steady_clock::now();
const auto deadline = now + std::chrono::microseconds(timeout_usec);
Error error;
size_t bytes_read = dst_len;
error = m_read_sp->Read(dst, bytes_read);

if (log)
do
{
log->Printf("%p ConnectionFileDescriptor::Read() fd = %" PRIu64 ", dst = %p, dst_len = %" PRIu64 ") => %" PRIu64 ", error = %s",
static_cast<void *>(this), static_cast<uint64_t>(m_read_sp->GetWaitableHandle()), static_cast<void *>(dst),
static_cast<uint64_t>(dst_len), static_cast<uint64_t>(bytes_read), error.AsCString());
}
timeout_usec = std::chrono::duration_cast<std::chrono::microseconds>(deadline - now).count();
status = BytesAvailable(timeout_usec, error_ptr);
if (status != eConnectionStatusSuccess)
return 0;

if (bytes_read == 0)
{
error.Clear(); // End-of-file. Do not automatically close; pass along for the end-of-file handlers.
status = eConnectionStatusEndOfFile;
}
size_t bytes_read = dst_len - total_bytes_read;
error = m_read_sp->Read(dst_buf + total_bytes_read, bytes_read);
if (log)
{
log->Printf("%p ConnectionFileDescriptor::Read() fd = %" PRIu64 ", dst = %p, dst_len = %" PRIu64
") => %" PRIu64 ", error = %s",
this, static_cast<uint64_t>(m_read_sp->GetWaitableHandle()), dst,
static_cast<uint64_t>(dst_len), static_cast<uint64_t>(bytes_read), error.AsCString());
}
total_bytes_read += bytes_read;
if (bytes_read == 0)
{
// End-of-file. Do not automatically close; pass along for the end-of-file handlers.
error.Clear();
status = eConnectionStatusEndOfFile;
}
now = std::chrono::steady_clock::now();
} while (read_full_buffer && total_bytes_read < dst_len && status == eConnectionStatusSuccess && now < deadline);

if (error_ptr)
*error_ptr = error;
Expand Down Expand Up @@ -509,7 +520,7 @@ ConnectionFileDescriptor::Read(void *dst, size_t dst_len, uint32_t timeout_usec,

return 0;
}
return bytes_read;
return total_bytes_read;
}

size_t
Expand Down
24 changes: 11 additions & 13 deletions lldb/source/Plugins/Platform/Android/AdbClient.cpp
Expand Up @@ -34,7 +34,7 @@ using namespace lldb_private::platform_android;

namespace {

const uint32_t kReadTimeout = 1000000; // 1 second
const uint32_t kReadTimeout = 4000000; // 4 seconds
const char * kOKAY = "OKAY";
const char * kFAIL = "FAIL";
const char * kDATA = "DATA";
Expand Down Expand Up @@ -251,7 +251,9 @@ AdbClient::ReadMessageStream (std::vector<char>& message, uint32_t timeout_ms)
if (elapsed_time >= timeout_ms)
return Error("Timed out");

size_t n = m_conn.Read(buffer, sizeof(buffer), 1000 * (timeout_ms - elapsed_time), status, &error);
const bool read_full_buffer = true;
size_t n =
m_conn.Read(buffer, sizeof(buffer), 1000 * (timeout_ms - elapsed_time), read_full_buffer, status, &error);
if (n > 0)
message.insert(message.end(), &buffer[0], &buffer[n]);
}
Expand Down Expand Up @@ -490,19 +492,15 @@ AdbClient::ReadSyncHeader (std::string &response_id, uint32_t &data_len)
Error
AdbClient::ReadAllBytes (void *buffer, size_t size)
{
const bool read_full_buffer = true;
Error error;
ConnectionStatus status;
char *read_buffer = static_cast<char*>(buffer);

size_t tota_read_bytes = 0;
while (tota_read_bytes < size)
{
auto read_bytes = m_conn.Read (read_buffer + tota_read_bytes, size - tota_read_bytes, kReadTimeout, status, &error);
if (error.Fail ())
return error;
tota_read_bytes += read_bytes;
}
return error;
size_t read_bytes = m_conn.Read(buffer, size, kReadTimeout, read_full_buffer, status, &error);
if (error.Fail())
return error;
if (read_bytes < size)
return Error("Unable to read full buffer.");
return Error();
}

Error
Expand Down
1 change: 1 addition & 0 deletions lldb/unittests/Host/CMakeLists.txt
@@ -1,5 +1,6 @@
add_lldb_unittest(HostTests
FileSpecTest.cpp
ConnectionFileDescriptorPosixTest.cpp
SocketAddressTest.cpp
SocketTest.cpp
SymbolsTest.cpp
Expand Down
135 changes: 135 additions & 0 deletions lldb/unittests/Host/ConnectionFileDescriptorPosixTest.cpp
@@ -0,0 +1,135 @@
//===-- ConnectionFileDescriptorPosixTest.cpp -------------------*- C++ -*-===//
//
// The LLVM Compiler Infrastructure
//
// This file is distributed under the University of Illinois Open Source
// License. See LICENSE.TXT for details.
//
//===----------------------------------------------------------------------===//

#if defined(_MSC_VER) && (_HAS_EXCEPTIONS == 0)
// Workaround for MSVC standard library bug, which fails to include <thread> when
// exceptions are disabled.
#include <eh.h>
#endif

#include "gtest/gtest.h"

#include "SocketUtil.h"

#include "lldb/Host/ConnectionFileDescriptor.h"

using namespace lldb_private;
using namespace lldb;

class ConnectionFileDescriptorPosixTest : public testing::Test
{
public:
void
SetUp() override
{
#if defined(_MSC_VER)
WSADATA data;
::WSAStartup(MAKEWORD(2, 2), &data);
#endif
}

void
TearDown() override
{
#if defined(_MSC_VER)
::WSACleanup();
#endif
}
};

TEST_F(ConnectionFileDescriptorPosixTest, ReadAll)
{
const bool read_full_buffer = true;

std::unique_ptr<TCPSocket> socket_a_up;
std::unique_ptr<TCPSocket> socket_b_up;
std::tie(socket_a_up, socket_b_up) = CreateConnectedTCPSockets();

ConnectionFileDescriptor connection_a(socket_a_up.release());

// First, make sure Read returns nothing.
const auto k_reasonable_timeout_us = 10 * 1000;
char buffer[100];
ConnectionStatus status;
Error error;
size_t bytes_read =
connection_a.Read(buffer, sizeof buffer, k_reasonable_timeout_us, read_full_buffer, status, &error);
ASSERT_TRUE(error.Success()) << error.AsCString();
ASSERT_EQ(eConnectionStatusTimedOut, status);
ASSERT_EQ(0u, bytes_read);

// Write some data, and make sure it arrives.
const char data[] = {1, 2, 3, 4};
size_t bytes_written = sizeof data;
error = socket_b_up->Write(data, bytes_written);
ASSERT_TRUE(error.Success()) << error.AsCString();
ASSERT_EQ(sizeof data, bytes_written);
bytes_read = connection_a.Read(buffer, sizeof data, k_reasonable_timeout_us, read_full_buffer, status, &error);
ASSERT_TRUE(error.Success()) << error.AsCString();
ASSERT_EQ(eConnectionStatusSuccess, status);
ASSERT_EQ(sizeof data, bytes_read);
ASSERT_EQ(0, memcmp(buffer, data, sizeof data));
memset(buffer, 0, sizeof buffer);

// Write the data in two chunks. Make sure we read all of it.
std::future<Error> future_error = std::async(std::launch::async, [&socket_b_up, data]() {
size_t bytes_written = sizeof(data) / 2;
Error error = socket_b_up->Write(data, bytes_written);
if (error.Fail())
return error;
std::this_thread::sleep_for(std::chrono::microseconds(k_reasonable_timeout_us / 10));
bytes_written = sizeof(data) / 2;
return socket_b_up->Write(data + bytes_written, bytes_written);
});
bytes_read = connection_a.Read(buffer, sizeof data, k_reasonable_timeout_us, read_full_buffer, status, &error);
ASSERT_TRUE(error.Success()) << error.AsCString();
ASSERT_EQ(eConnectionStatusSuccess, status);
ASSERT_EQ(sizeof data, bytes_read);
ASSERT_TRUE(future_error.get().Success()) << future_error.get().AsCString();
ASSERT_EQ(0, memcmp(buffer, data, sizeof data));

// Close the remote end, make sure Read result is reasonable.
socket_b_up.reset();
bytes_read = connection_a.Read(buffer, sizeof buffer, k_reasonable_timeout_us, read_full_buffer, status, &error);
ASSERT_TRUE(error.Success()) << error.AsCString();
ASSERT_EQ(eConnectionStatusEndOfFile, status);
ASSERT_EQ(0u, bytes_read);
}

TEST_F(ConnectionFileDescriptorPosixTest, Read)
{
const bool read_full_buffer = false;

std::unique_ptr<TCPSocket> socket_a_up;
std::unique_ptr<TCPSocket> socket_b_up;
std::tie(socket_a_up, socket_b_up) = CreateConnectedTCPSockets();

ConnectionFileDescriptor connection_a(socket_a_up.release());

const uint32_t k_very_large_timeout_us = 10 * 1000 * 1000;
char buffer[100];
ConnectionStatus status;
Error error;

// Write some data (but not a full buffer). Make sure it arrives, and we do not wait too long.
const char data[] = {1, 2, 3, 4};
size_t bytes_written = sizeof data;
error = socket_b_up->Write(data, bytes_written);
ASSERT_TRUE(error.Success()) << error.AsCString();
ASSERT_EQ(sizeof data, bytes_written);

const auto start = std::chrono::steady_clock::now();
size_t bytes_read =
connection_a.Read(buffer, sizeof buffer, k_very_large_timeout_us, read_full_buffer, status, &error);
ASSERT_TRUE(error.Success()) << error.AsCString();
ASSERT_EQ(eConnectionStatusSuccess, status);
ASSERT_EQ(sizeof data, bytes_read);
ASSERT_EQ(0, memcmp(buffer, data, sizeof data));
ASSERT_LT(std::chrono::steady_clock::now(), start + std::chrono::microseconds(k_very_large_timeout_us / 10));
}

0 comments on commit 2407602

Please sign in to comment.