Skip to content

Commit

Permalink
Use TranscoderInputStream to reduce confusion around ByteCount() (env…
Browse files Browse the repository at this point in the history
…oyproxy#225)

* Add TranscoderInputStream to reduce confusion

* fix_format
  • Loading branch information
lizan committed Apr 7, 2017
1 parent f71ef23 commit 987223f
Show file tree
Hide file tree
Showing 15 changed files with 125 additions and 97 deletions.
14 changes: 14 additions & 0 deletions contrib/endpoints/src/grpc/transcoding/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ cc_library(
"message_stream.h",
],
deps = [
":transcoder_input_stream",
"//external:protobuf",
],
)
Expand Down Expand Up @@ -125,6 +126,7 @@ cc_library(
"message_reader.h",
],
deps = [
":transcoder_input_stream",
"//external:protobuf",
],
)
Expand All @@ -144,6 +146,17 @@ cc_library(
],
)

cc_library(
name = "transcoder_input_stream",
srcs = [
"transcoder_input_stream.h",
],
visibility = ["//visibility:public"],
deps = [
"@protobuf_git//:protobuf",
],
)

cc_library(
name = "transcoding",
srcs = [
Expand Down Expand Up @@ -223,6 +236,7 @@ cc_library(
srcs = ["test_common.cc"],
hdrs = ["test_common.h"],
deps = [
":transcoder_input_stream",
"//external:googletest",
"//external:protobuf",
"//external:service_config",
Expand Down
10 changes: 3 additions & 7 deletions contrib/endpoints/src/grpc/transcoding/message_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <memory>

#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"

namespace google {
Expand All @@ -29,7 +28,7 @@ namespace transcoding {
namespace pb = ::google::protobuf;
namespace pbio = ::google::protobuf::io;

MessageReader::MessageReader(pbio::ZeroCopyInputStream* in)
MessageReader::MessageReader(TranscoderInputStream* in)
: in_(in),
current_message_size_(0),
have_current_message_size_(false),
Expand Down Expand Up @@ -99,7 +98,7 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
// Check if we have the current message size. If not try to read it.
if (!have_current_message_size_) {
const size_t kDelimiterSize = 5;
if (in_->ByteCount() < static_cast<pb::int64>(kDelimiterSize)) {
if (in_->BytesAvailable() < static_cast<pb::int64>(kDelimiterSize)) {
// We don't have 5 bytes available to read the length of the message.
// Find out whether the stream is finished and return false.
finished_ = IsStreamFinished(in_);
Expand All @@ -117,10 +116,7 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
have_current_message_size_ = true;
}

// We interpret ZeroCopyInputStream::ByteCount() as the number of bytes
// available for reading at the moment. Check if we have the full message
// available to read.
if (in_->ByteCount() < static_cast<pb::int64>(current_message_size_)) {
if (in_->BytesAvailable() < static_cast<pb::int64>(current_message_size_)) {
// We don't have a full message
return std::unique_ptr<pbio::ZeroCopyInputStream>();
}
Expand Down
11 changes: 3 additions & 8 deletions contrib/endpoints/src/grpc/transcoding/message_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include <memory>

#include "google/protobuf/io/zero_copy_stream.h"
#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h"
#include "google/protobuf/stubs/status.h"

namespace google {
Expand Down Expand Up @@ -49,11 +49,6 @@ namespace transcoding {
// }
// }
//
// NOTE: MesssageReader assumes that ZeroCopyInputStream::ByteCount() returns
// the number of bytes available to read at the moment. That's what
// MessageReader uses to determine whether there is a complete message
// available or not.
//
// NOTE: MessageReader is unable to recognize the case when there is an
// incomplete message at the end of the input. The callers will need to
// detect it and act appropriately.
Expand All @@ -64,7 +59,7 @@ namespace transcoding {
//
class MessageReader {
public:
MessageReader(::google::protobuf::io::ZeroCopyInputStream* in);
MessageReader(TranscoderInputStream* in);

// If a full message is available, NextMessage() returns a ZeroCopyInputStream
// over the message. Otherwise returns nullptr - this might be temporary, the
Expand All @@ -82,7 +77,7 @@ class MessageReader {
bool Finished() const { return finished_; }

private:
::google::protobuf::io::ZeroCopyInputStream* in_;
TranscoderInputStream* in_;
// The size of the current message.
unsigned int current_message_size_;
// Whether we have read the current message size or not
Expand Down
30 changes: 12 additions & 18 deletions contrib/endpoints/src/grpc/transcoding/message_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <memory>
#include <string>

#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"

namespace google {
Expand All @@ -32,12 +31,12 @@ namespace pbio = ::google::protobuf::io;
namespace {

// a ZeroCopyInputStream implementation over a MessageStream implementation
class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {
class InputStreamOverMessageStream : public TranscoderInputStream {
public:
// src - the underlying MessageStream. ZeroCopyStreamOverMessageStream doesn't
// src - the underlying MessageStream. InputStreamOverMessageStream doesn't
// maintain the ownership of src, the caller must make sure it exists
// throughtout the lifetime of ZeroCopyStreamOverMessageStream.
ZeroCopyStreamOverMessageStream(MessageStream* src)
// throughtout the lifetime of InputStreamOverMessageStream.
InputStreamOverMessageStream(MessageStream* src)
: src_(src), message_(), position_(0) {}

// ZeroCopyInputStream implementation
Expand Down Expand Up @@ -72,19 +71,15 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {

bool Skip(int) { return false; } // Not implemented (no need)

::google::protobuf::int64 ByteCount() const {
// NOTE: we are changing the ByteCount() interpretation. In our case
// ByteCount() returns the number of bytes available for reading at this
// moment. In the original interpretation it is supposed to be the number
// of bytes read so far.
// We need this such that the consumers are able to read the gRPC delimited
// message stream only if there is a full message available.
google::protobuf::int64 ByteCount() const { return 0; } // Not implemented

int64_t BytesAvailable() const {
if (position_ >= message_.size()) {
// If the current message is all done, try to read the next message
// to make sure we return the correct byte count.
const_cast<ZeroCopyStreamOverMessageStream*>(this)->ReadNextMessage();
const_cast<InputStreamOverMessageStream*>(this)->ReadNextMessage();
}
return static_cast<::google::protobuf::int64>(message_.size() - position_);
return static_cast<int64_t>(message_.size() - position_);
}

private:
Expand All @@ -109,10 +104,9 @@ class ZeroCopyStreamOverMessageStream : public pbio::ZeroCopyInputStream {

} // namespace

std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>
MessageStream::CreateZeroCopyInputStream() {
return std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>(
new ZeroCopyStreamOverMessageStream(this));
std::unique_ptr<TranscoderInputStream> MessageStream::CreateInputStream() {
return std::unique_ptr<TranscoderInputStream>(
new InputStreamOverMessageStream(this));
}

} // namespace transcoding
Expand Down
4 changes: 2 additions & 2 deletions contrib/endpoints/src/grpc/transcoding/message_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <string>

#include "contrib/endpoints/src/grpc/transcoding/transcoder_input_stream.h"
#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/stubs/status.h"

Expand Down Expand Up @@ -73,8 +74,7 @@ class MessageStream {
// Virtual destructor
virtual ~MessageStream() {}
// Creates ZeroCopyInputStream implementation based on this stream
std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream>
CreateZeroCopyInputStream();
std::unique_ptr<TranscoderInputStream> CreateInputStream();
};

} // namespace transcoding
Expand Down
58 changes: 29 additions & 29 deletions contrib/endpoints/src/grpc/transcoding/message_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {

bool Test(const Messages& messages) {
TestMessageStream test_message_stream;
auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream();
auto input_stream = test_message_stream.CreateInputStream();

const void* data = nullptr;
int size = 0;

// Check that Next() returns true and a 0-sized buffer meaning that
// nothing is available at the moment.
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -91,13 +91,13 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
test_message_stream.AddMessage(message);

// message.size() bytes must be available for reading
if (static_cast<int>(message.size()) != zero_copy_stream->ByteCount()) {
EXPECT_EQ(message.size(), zero_copy_stream->ByteCount());
if (static_cast<int>(message.size()) != input_stream->BytesAvailable()) {
EXPECT_EQ(message.size(), input_stream->BytesAvailable());
return false;
}

// Now try to read & match the message
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -120,16 +120,16 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
// Not a valid test case
continue;
}
zero_copy_stream->BackUp(backup_size);
input_stream->BackUp(backup_size);

// backup_size bytes must be available for reading again
if (static_cast<int>(backup_size) != zero_copy_stream->ByteCount()) {
EXPECT_EQ(message.size(), zero_copy_stream->ByteCount());
if (static_cast<int>(backup_size) != input_stream->BytesAvailable()) {
EXPECT_EQ(message.size(), input_stream->BytesAvailable());
return false;
}

// Now Next() must return the backed up data again.
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -143,7 +143,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
}

// At this point no data should be available
if (!zero_copy_stream->Next(&data, &size)) {
if (!input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream finished unexpectedly" << std::endl;
return false;
}
Expand All @@ -156,7 +156,7 @@ class ZeroCopyInputStreamOverMessageStreamTest : public ::testing::Test {
// Now finish the MessageStream & make sure the ZeroCopyInputStream has
// ended.
test_message_stream.Finish();
if (zero_copy_stream->Next(&data, &size)) {
if (input_stream->Next(&data, &size)) {
ADD_FAILURE() << "The stream still hasn't finished" << std::endl;
return false;
}
Expand Down Expand Up @@ -201,14 +201,14 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DifferenteSizesOneStream) {

TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
TestMessageStream test_message_stream;
auto zero_copy_stream = test_message_stream.CreateZeroCopyInputStream();
auto input_stream = test_message_stream.CreateInputStream();

const void* data = nullptr;
int size = 0;

// Check that Next() returns true and a 0-sized buffer meaning that
// nothing is available at the moment.
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(0, size);

// Test messages
Expand All @@ -221,37 +221,37 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
test_message_stream.AddMessage(message1);

// message1 is available for reading
EXPECT_EQ(message1.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message1.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message1, std::string(reinterpret_cast<const char*>(data), size));

// Back up a bit
zero_copy_stream->BackUp(5);
input_stream->BackUp(5);

// Now read the backed up data again
EXPECT_EQ(5, zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(5, input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message1.substr(message1.size() - 5),
std::string(reinterpret_cast<const char*>(data), size));

// Add message2 to the MessageStream
test_message_stream.AddMessage(message2);

// message2 is available for reading
EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message2.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message2, std::string(reinterpret_cast<const char*>(data), size));

// Back up all of message2
zero_copy_stream->BackUp(message2.size());
input_stream->BackUp(message2.size());

// Now read message2 again
EXPECT_EQ(message2.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message2.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message2, std::string(reinterpret_cast<const char*>(data), size));

// At this point no data should be available
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(0, size);

// Add both message3 & message4 & finish the MessageStream afterwards
Expand All @@ -260,16 +260,16 @@ TEST_F(ZeroCopyInputStreamOverMessageStreamTest, DirectTest) {
test_message_stream.Finish();

// Read & match both message3 & message4
EXPECT_EQ(message3.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message3.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message3, std::string(reinterpret_cast<const char*>(data), size));

EXPECT_EQ(message4.size(), zero_copy_stream->ByteCount());
EXPECT_TRUE(zero_copy_stream->Next(&data, &size));
EXPECT_EQ(message4.size(), input_stream->BytesAvailable());
EXPECT_TRUE(input_stream->Next(&data, &size));
EXPECT_EQ(message4, std::string(reinterpret_cast<const char*>(data), size));

// All done!
EXPECT_FALSE(zero_copy_stream->Next(&data, &size));
EXPECT_FALSE(input_stream->Next(&data, &size));
}

} // namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <string>

#include "google/protobuf/io/zero_copy_stream.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
#include "google/protobuf/stubs/status.h"
#include "google/protobuf/util/json_util.h"
Expand All @@ -31,7 +30,7 @@ namespace transcoding {

ResponseToJsonTranslator::ResponseToJsonTranslator(
::google::protobuf::util::TypeResolver* type_resolver, std::string type_url,
bool streaming, ::google::protobuf::io::ZeroCopyInputStream* in)
bool streaming, TranscoderInputStream* in)
: type_resolver_(type_resolver),
type_url_(std::move(type_url)),
streaming_(streaming),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ class ResponseToJsonTranslator : public MessageStream {
// format (http://www.grpc.io/docs/guides/wire.html)
ResponseToJsonTranslator(
::google::protobuf::util::TypeResolver* type_resolver,
std::string type_url, bool streaming,
::google::protobuf::io::ZeroCopyInputStream* in);
std::string type_url, bool streaming, TranscoderInputStream* in);

// MessageStream implementation
bool NextMessage(std::string* message);
Expand Down
2 changes: 1 addition & 1 deletion contrib/endpoints/src/grpc/transcoding/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void TestZeroCopyInputStream::BackUp(int count) {
position_ -= count;
}

pb::int64 TestZeroCopyInputStream::ByteCount() const {
int64_t TestZeroCopyInputStream::BytesAvailable() const {
auto total = current_.size() - position_;
for (auto chunk : chunks_) {
total += chunk.size();
Expand Down
Loading

0 comments on commit 987223f

Please sign in to comment.