Skip to content

Commit

Permalink
Implement gRPC transport (envoyproxy#16)
Browse files Browse the repository at this point in the history
* Add grpc transport implementation.

* Move GrpcStream to cc file.

* Add link flags -lm -lthread

* add -lrt flag
  • Loading branch information
qiwzhang committed Jan 25, 2017
1 parent d55ffb5 commit e6d25ab
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 6 deletions.
18 changes: 18 additions & 0 deletions mixerclient/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ cc_library(
"src/check_cache.h",
"src/client_impl.cc",
"src/client_impl.h",
"src/grpc_transport.cc",
"src/grpc_transport.h",
"src/signature.cc",
"src/signature.h",
"src/stream_transport.h",
Expand Down Expand Up @@ -128,3 +130,19 @@ cc_test(
"//external:googletest_main",
],
)

cc_test(
name = "grpc_transport_test",
size = "small",
srcs = ["src/grpc_transport_test.cc"],
linkopts = [
"-lm",
"-lpthread",
"-lrt",
],
linkstatic = 1,
deps = [
":mixer_client_lib",
"//external:googletest_main",
],
)
4 changes: 2 additions & 2 deletions mixerclient/include/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class WriteInterface {
// Write a request message.
virtual void Write(const RequestType &) = 0;
// Half close the write direction.
virtual void WriteDone() = 0;
virtual void WritesDone() = 0;
// If true, write direction is closed.
virtual bool is_write_closed() = 0;
virtual bool is_write_closed() const = 0;
};

// A stream read interface implemented by Mixer client
Expand Down
4 changes: 2 additions & 2 deletions mixerclient/src/client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class WriterImpl : public WriteInterface<RequestType> {
}
reader->OnClose(done_status);
}
void WriteDone() {}
bool is_write_closed() { return true; }
void WritesDone() {}
bool is_write_closed() const { return true; }

ReadInterface<ResponseType>* reader;
Status done_status;
Expand Down
121 changes: 121 additions & 0 deletions mixerclient/src/grpc_transport.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/* Copyright 2017 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/grpc_transport.h"
#include <thread>

namespace istio {
namespace mixer_client {
namespace {

// A gRPC stream
template <class RequestType, class ResponseType>
class GrpcStream final : public WriteInterface<RequestType> {
public:
typedef std::unique_ptr<
::grpc::ClientReaderWriterInterface<RequestType, ResponseType>>
StreamPtr;
typedef std::function<StreamPtr(::grpc::ClientContext&)> StreamNewFunc;

GrpcStream(ReadInterface<ResponseType>* reader, StreamNewFunc create_func)
: reader_(reader), write_closed_(false) {
stream_ = create_func(context_);
worker_thread_ = std::thread([this]() { WorkerThread(); });
}

~GrpcStream() { worker_thread_.join(); }

void Write(const RequestType& request) override {
if (!stream_->Write(request)) {
WritesDone();
}
}

void WritesDone() override {
stream_->WritesDone();
write_closed_ = true;
}

bool is_write_closed() const override { return write_closed_; }

private:
// The worker loop to read response messages.
void WorkerThread() {
ResponseType response;
while (stream_->Read(&response)) {
reader_->OnRead(response);
}
::grpc::Status status = stream_->Finish();
// Convert grpc status to protobuf status.
::google::protobuf::util::Status pb_status(
::google::protobuf::util::error::Code(status.error_code()),
::google::protobuf::StringPiece(status.error_message()));
reader_->OnClose(pb_status);
}

// The client context.
::grpc::ClientContext context_;
// The reader writer stream.
StreamPtr stream_;
// The thread to read response.
std::thread worker_thread_;
// The reader interface from caller.
ReadInterface<ResponseType>* reader_;
// Indicates if write is closed.
bool write_closed_;
};

typedef GrpcStream<::istio::mixer::v1::CheckRequest,
::istio::mixer::v1::CheckResponse>
CheckGrpcStream;
typedef GrpcStream<::istio::mixer::v1::ReportRequest,
::istio::mixer::v1::ReportResponse>
ReportGrpcStream;
typedef GrpcStream<::istio::mixer::v1::QuotaRequest,
::istio::mixer::v1::QuotaResponse>
QuotaGrpcStream;

} // namespace

GrpcTransport::GrpcTransport(const std::string& mixer_server) {
channel_ = CreateChannel(mixer_server, ::grpc::InsecureChannelCredentials());
stub_ = ::istio::mixer::v1::Mixer::NewStub(channel_);
}

CheckWriterPtr GrpcTransport::NewStream(CheckReaderRawPtr reader) {
return CheckWriterPtr(new CheckGrpcStream(
reader,
[this](::grpc::ClientContext& context) -> CheckGrpcStream::StreamPtr {
return stub_->Check(&context);
}));
}

ReportWriterPtr GrpcTransport::NewStream(ReportReaderRawPtr reader) {
return ReportWriterPtr(new ReportGrpcStream(
reader,
[this](::grpc::ClientContext& context) -> ReportGrpcStream::StreamPtr {
return stub_->Report(&context);
}));
}

QuotaWriterPtr GrpcTransport::NewStream(QuotaReaderRawPtr reader) {
return QuotaWriterPtr(new QuotaGrpcStream(
reader,
[this](::grpc::ClientContext& context) -> QuotaGrpcStream::StreamPtr {
return stub_->Quota(&context);
}));
}

} // namespace mixer_client
} // namespace istio
43 changes: 43 additions & 0 deletions mixerclient/src/grpc_transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MIXERCLIENT_SRC_TRANSPORT_H
#define MIXERCLIENT_SRC_TRANSPORT_H

#include <grpc++/grpc++.h>

#include "include/transport.h"
#include "mixer/api/v1/service.grpc.pb.h"

namespace istio {
namespace mixer_client {

// A gRPC implementation of Mixer transport
class GrpcTransport : public TransportInterface {
public:
GrpcTransport(const std::string& mixer_server);

CheckWriterPtr NewStream(CheckReaderRawPtr reader);
ReportWriterPtr NewStream(ReportReaderRawPtr reader);
QuotaWriterPtr NewStream(QuotaReaderRawPtr reader);

private:
std::shared_ptr<::grpc::Channel> channel_;
std::unique_ptr<::istio::mixer::v1::Mixer::Stub> stub_;
};

} // namespace mixer_client
} // namespace istio

#endif // MIXERCLIENT_SRC_TRANSPORT_H
148 changes: 148 additions & 0 deletions mixerclient/src/grpc_transport_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/* Copyright 2017 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "src/grpc_transport.h"

#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
#include "gmock/gmock.h"
#include "gtest/gtest.h"

#include <future>

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerReaderWriter;

using ::istio::mixer::v1::CheckRequest;
using ::istio::mixer::v1::CheckResponse;
using ::istio::mixer::v1::ReportRequest;
using ::istio::mixer::v1::ReportResponse;
using ::istio::mixer::v1::QuotaRequest;
using ::istio::mixer::v1::QuotaResponse;
using ::google::protobuf::util::Status;

using ::testing::Invoke;
using ::testing::_;

namespace istio {
namespace mixer_client {

// A fake Mixer gRPC server: just echo a response for each request.
class MockMixerServerImpl final : public ::istio::mixer::v1::Mixer::Service {
public:
grpc::Status Check(
ServerContext* context,
ServerReaderWriter<CheckResponse, CheckRequest>* stream) override {
CheckRequest request;
while (stream->Read(&request)) {
CheckResponse response;
// Just echo it back with same request_index.
response.set_request_index(request.request_index());
stream->Write(response);
}
return grpc::Status::OK;
}
grpc::Status Report(
ServerContext* context,
ServerReaderWriter<ReportResponse, ReportRequest>* stream) override {
ReportRequest request;
while (stream->Read(&request)) {
ReportResponse response;
// Just echo it back with same request_index.
response.set_request_index(request.request_index());
stream->Write(response);
}
return grpc::Status::OK;
}
grpc::Status Quota(
ServerContext* context,
ServerReaderWriter<QuotaResponse, QuotaRequest>* stream) override {
QuotaRequest request;
while (stream->Read(&request)) {
QuotaResponse response;
// Just echo it back with same request_index.
response.set_request_index(request.request_index());
stream->Write(response);
}
return grpc::Status::OK;
}
};

template <class T>
class MockReader : public ReadInterface<T> {
public:
MOCK_METHOD1_T(OnRead, void(const T&));
MOCK_METHOD1_T(OnClose, void(const Status&));
};

class GrpcTransportTest : public ::testing::Test {
public:
void SetUp() {
// TODO: pick a un-used port. If this port is used, the test will fail.
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();

grpc_transport_.reset(new GrpcTransport(server_address));
}

void TearDown() { server_->Shutdown(); }

// server side
MockMixerServerImpl service_;
std::unique_ptr<Server> server_;

// client side
std::unique_ptr<GrpcTransport> grpc_transport_;
};

TEST_F(GrpcTransportTest, TestSuccessCheck) {
MockReader<CheckResponse> mock_reader;
auto writer = grpc_transport_->NewStream(&mock_reader);

CheckResponse response;
EXPECT_CALL(mock_reader, OnRead(_))
.WillOnce(Invoke([&response](const CheckResponse& r) { response = r; }));

std::promise<Status> status_promise;
std::future<Status> status_future = status_promise.get_future();
EXPECT_CALL(mock_reader, OnClose(_))
.WillOnce(Invoke([&status_promise](Status status) {
std::promise<Status> moved_promise(std::move(status_promise));
moved_promise.set_value(status);
}));

CheckRequest request;
request.set_request_index(111);
writer->Write(request);

// Close the stream
writer->WritesDone();

// Wait for OnClose() to be called.
status_future.wait();
EXPECT_TRUE(status_future.get().ok());
EXPECT_EQ(response.request_index(), request.request_index());
}

} // namespace mixer_client
} // namespace istio
4 changes: 2 additions & 2 deletions mixerclient/src/stream_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ template <class T>
class MockWriter : public WriteInterface<T> {
public:
MOCK_METHOD1_T(Write, void(const T&));
MOCK_METHOD0_T(WriteDone, void());
MOCK_METHOD0_T(is_write_closed, bool());
MOCK_METHOD0_T(WritesDone, void());
MOCK_CONST_METHOD0_T(is_write_closed, bool());
};

template <class T>
Expand Down

0 comments on commit e6d25ab

Please sign in to comment.