diff --git a/mixerclient/BUILD b/mixerclient/BUILD index 37c7ff7ae855..a67bb5fcf6ae 100644 --- a/mixerclient/BUILD +++ b/mixerclient/BUILD @@ -19,11 +19,15 @@ load("@protobuf_git//:protobuf.bzl", "cc_proto_library") cc_library( name = "mixer_client_lib", srcs = [ + "src/attribute_context.cc", + "src/attribute_context.h", + "src/attribute_converter.h", "src/client_impl.cc", "src/client_impl.h", - "src/stream_transport.h", "src/signature.cc", "src/signature.h", + "src/stream_transport.h", + "src/transport.h", "utils/md5.cc", "utils/md5.h", ], @@ -104,4 +108,3 @@ cc_test( "//external:googletest_main", ], ) - diff --git a/mixerclient/src/attribute_context.cc b/mixerclient/src/attribute_context.cc new file mode 100644 index 000000000000..08f4411cddc1 --- /dev/null +++ b/mixerclient/src/attribute_context.cc @@ -0,0 +1,100 @@ +/* Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "src/attribute_context.h" +#include "google/protobuf/timestamp.pb.h" + +using ::google::protobuf::Map; +using ::google::protobuf::Timestamp; + +namespace istio { +namespace mixer_client { +namespace { + +// TODO: add code to build context to reduce attributes. +// Only check these attributes to build context. +std::set kContextSet = {"serviceName", "peerId", "location", + "apiName", "apiVersion"}; + +// Convert timestamp from time_point to Timestamp +Timestamp CreateTimestamp(std::chrono::system_clock::time_point tp) { + Timestamp time_stamp; + long long nanos = std::chrono::duration_cast( + tp.time_since_epoch()) + .count(); + + time_stamp.set_seconds(nanos / 1000000000); + time_stamp.set_nanos(nanos % 1000000000); + return time_stamp; +} + +} // namespace + +void AttributeContext::FillProto(const Attributes& attributes, + ::istio::mixer::v1::Attributes* pb) { + // TODO build context use kContextSet to reduce attributes. + + // Fill attributes. + int next_dict_index = dict_map_.size(); + bool dict_changed = false; + for (const auto& it : attributes.attributes) { + const std::string& name = it.first; + + // Find index for the name. + int index; + const auto& dict_it = dict_map_.find(name); + if (dict_it == dict_map_.end()) { + dict_changed = true; + index = ++next_dict_index; + // Assume attribute names are a fixed name set. + // so not need to remove names from dictionary. + dict_map_[name] = index; + } else { + index = dict_it->second; + } + + // Fill the attribute to proper map. + switch (it.second.type) { + case Attributes::Value::ValueType::STRING: + (*pb->mutable_string_attributes())[index] = it.second.str_v; + break; + case Attributes::Value::ValueType::BYTES: + (*pb->mutable_bytes_attributes())[index] = it.second.str_v; + break; + case Attributes::Value::ValueType::INT64: + (*pb->mutable_int64_attributes())[index] = it.second.value.int64_v; + break; + case Attributes::Value::ValueType::DOUBLE: + (*pb->mutable_double_attributes())[index] = it.second.value.double_v; + break; + case Attributes::Value::ValueType::BOOL: + (*pb->mutable_bool_attributes())[index] = it.second.value.bool_v; + break; + case Attributes::Value::ValueType::TIME: + (*pb->mutable_timestamp_attributes())[index] = + CreateTimestamp(it.second.time_v); + break; + } + } + + if (dict_changed) { + Map* dict = pb->mutable_dictionary(); + for (const auto& it : dict_map_) { + (*dict)[it.second] = it.first; + } + } +} + +} // namespace mixer_client +} // namespace istio diff --git a/mixerclient/src/attribute_context.h b/mixerclient/src/attribute_context.h new file mode 100644 index 000000000000..2c94672885a6 --- /dev/null +++ b/mixerclient/src/attribute_context.h @@ -0,0 +1,46 @@ +/* Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MIXERCLIENT_ATTRIBUTE_CONTEXT_H +#define MIXERCLIENT_ATTRIBUTE_CONTEXT_H + +#include "include/client.h" + +namespace istio { +namespace mixer_client { + +// A class to manage dictionary and context. +class AttributeContext { + public: + AttributeContext() : request_index_(0) {} + + // Fill Attributes protobuf. + void FillProto(const Attributes &attributes, + ::istio::mixer::v1::Attributes *pb); + // Increments request_index + int64_t IncRequestIndex() { return ++request_index_; } + + private: + // dictionary map. + std::map dict_map_; + + // The request_index for this context. + int64_t request_index_; +}; + +} // namespace mixer_client +} // namespace istio + +#endif // MIXERCLIENT_ATTRIBUTE_CONTEXT_H diff --git a/mixerclient/src/attribute_converter.h b/mixerclient/src/attribute_converter.h new file mode 100644 index 000000000000..7a2077f57a2e --- /dev/null +++ b/mixerclient/src/attribute_converter.h @@ -0,0 +1,47 @@ +/* Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MIXERCLIENT_ATTRIBUTE_CONVERTER_H +#define MIXERCLIENT_ATTRIBUTE_CONVERTER_H + +#include "include/client.h" + +namespace istio { +namespace mixer_client { + +// Underlying stream ID. +typedef int64_t StreamID; + +// An interface to convert from struct to protobuf. +// It is called by StreamTransport after picking a stream to use. +// It will be implemented by Transport with correct attribute +// context. +template +class AttributeConverter { + public: + // virtual destructor + virtual ~AttributeConverter() {} + + // Convert attributes from struct to protobuf + // It requires an attribute context. A different stream_id + // requires a new attribute context. + virtual void FillProto(StreamID stream_id, const Attributes& attributes, + RequestType* request) = 0; +}; + +} // namespace mixer_client +} // namespace istio + +#endif // MIXERCLIENT_ATTRIBUTE_CONVERTER_H diff --git a/mixerclient/src/client_impl.cc b/mixerclient/src/client_impl.cc index fc5650e022c2..e69ebafdb9f9 100644 --- a/mixerclient/src/client_impl.cc +++ b/mixerclient/src/client_impl.cc @@ -37,21 +37,18 @@ MixerClientImpl::MixerClientImpl(const MixerClientOptions &options) MixerClientImpl::~MixerClientImpl() {} void MixerClientImpl::Check(const Attributes &attributes, DoneFunc on_done) { - CheckRequest request; CheckResponse response; - check_transport_.Call(request, &response, on_done); + check_transport_.Send(attributes, &response, on_done); } void MixerClientImpl::Report(const Attributes &attributes, DoneFunc on_done) { - ReportRequest request; ReportResponse response; - report_transport_.Call(request, &response, on_done); + report_transport_.Send(attributes, &response, on_done); } void MixerClientImpl::Quota(const Attributes &attributes, DoneFunc on_done) { - QuotaRequest request; QuotaResponse response; - quota_transport_.Call(request, &response, on_done); + quota_transport_.Send(attributes, &response, on_done); } // Creates a MixerClient object. diff --git a/mixerclient/src/client_impl.h b/mixerclient/src/client_impl.h index 9cd2f803a523..f936e9a54604 100644 --- a/mixerclient/src/client_impl.h +++ b/mixerclient/src/client_impl.h @@ -17,7 +17,7 @@ #define MIXERCLIENT_CLIENT_IMPL_H #include "include/client.h" -#include "src/stream_transport.h" +#include "src/transport.h" namespace istio { namespace mixer_client { @@ -36,14 +36,12 @@ class MixerClientImpl : public MixerClient { private: MixerClientOptions options_; - StreamTransport<::istio::mixer::v1::CheckRequest, - ::istio::mixer::v1::CheckResponse> + Transport<::istio::mixer::v1::CheckRequest, ::istio::mixer::v1::CheckResponse> check_transport_; - StreamTransport<::istio::mixer::v1::ReportRequest, - ::istio::mixer::v1::ReportResponse> + Transport<::istio::mixer::v1::ReportRequest, + ::istio::mixer::v1::ReportResponse> report_transport_; - StreamTransport<::istio::mixer::v1::QuotaRequest, - ::istio::mixer::v1::QuotaResponse> + Transport<::istio::mixer::v1::QuotaRequest, ::istio::mixer::v1::QuotaResponse> quota_transport_; GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(MixerClientImpl); diff --git a/mixerclient/src/stream_transport.h b/mixerclient/src/stream_transport.h index e264df84dd2d..bd38e6636451 100644 --- a/mixerclient/src/stream_transport.h +++ b/mixerclient/src/stream_transport.h @@ -17,6 +17,7 @@ #define MIXERCLIENT_STREAM_TRANSPORT_H #include "include/client.h" +#include "src/attribute_converter.h" namespace istio { namespace mixer_client { @@ -89,11 +90,15 @@ class ReaderImpl : public ReadInterface { template class StreamTransport { public: - StreamTransport(TransportInterface* transport) - : transport_(transport), reader_(nullptr), writer_(nullptr) {} + StreamTransport(TransportInterface* transport, + AttributeConverter* converter) + : transport_(transport), + converter_(converter), + reader_(nullptr), + writer_(nullptr) {} // Make a ping-pong call. - void Call(const RequestType& request, ResponseType* response, + void Call(const Attributes& attributes, ResponseType* response, DoneFunc on_done) { if (transport_ == nullptr) { on_done(::google::protobuf::util::Status( @@ -119,6 +124,10 @@ class StreamTransport { delete writer; }); } + RequestType request; + // Cast the writer_ raw pointer as StreamID. + converter_->FillProto(reinterpret_cast(writer_), attributes, + &request); reader_->AddRequest(request.request_index(), response, on_done); writer_->Write(request); } @@ -126,6 +135,8 @@ class StreamTransport { private: // The transport interface to create a new stream. TransportInterface* transport_; + // Attribute converter. + AttributeConverter* converter_; // The reader object for current stream. ReaderImpl* reader_; // The writer object for current stream. diff --git a/mixerclient/src/stream_transport_test.cc b/mixerclient/src/stream_transport_test.cc index 6f86c2e09e3e..c846627f5245 100644 --- a/mixerclient/src/stream_transport_test.cc +++ b/mixerclient/src/stream_transport_test.cc @@ -32,6 +32,9 @@ using ::testing::_; namespace istio { namespace mixer_client { +const int64_t kRequestIndex1 = 111; +const int64_t kRequestIndex2 = 222; + class MockTransport : public TransportInterface { public: MOCK_METHOD1(NewStream, CheckWriterPtr(CheckReaderRawPtr)); @@ -47,11 +50,18 @@ class MockWriter : public WriteInterface { MOCK_METHOD0_T(is_write_closed, bool()); }; +template +class MockAttributeConverter : public AttributeConverter { + public: + MOCK_METHOD3_T(FillProto, void(StreamID, const Attributes&, T*)); +}; + class TransportImplTest : public ::testing::Test { public: - TransportImplTest() : stream_(&mock_transport_) {} + TransportImplTest() : stream_(&mock_transport_, &mock_converter_) {} MockTransport mock_transport_; + MockAttributeConverter mock_converter_; StreamTransport stream_; }; @@ -66,22 +76,26 @@ TEST_F(TransportImplTest, TestSingleCheck) { return CheckWriterPtr(writer); })); + EXPECT_CALL(mock_converter_, FillProto(_, _, _)) + .WillOnce(Invoke([](StreamID, const Attributes&, CheckRequest* request) { + request->set_request_index(kRequestIndex1); + })); + CheckRequest request_out; EXPECT_CALL(*writer, Write(_)) .WillOnce( Invoke([&request_out](const CheckRequest& r) { request_out = r; })); - CheckRequest request_in; - request_in.set_request_index(111); + Attributes attributes; CheckResponse response_in; - response_in.set_request_index(request_in.request_index()); + response_in.set_request_index(kRequestIndex1); CheckResponse response_out; Status status_out = Status::UNKNOWN; - stream_.Call(request_in, &response_out, + stream_.Call(attributes, &response_out, [&status_out](Status status) { status_out = status; }); // Write request. - EXPECT_TRUE(MessageDifferencer::Equals(request_in, request_out)); + EXPECT_EQ(request_out.request_index(), kRequestIndex1); // But not response EXPECT_FALSE(status_out.ok()); EXPECT_FALSE(MessageDifferencer::Equals(response_in, response_out)); @@ -105,28 +119,30 @@ TEST_F(TransportImplTest, TestTwoOutOfOrderChecks) { reader = r; return CheckWriterPtr(writer); })); + int64_t index_array[2] = {kRequestIndex1, kRequestIndex2}; + int index_idx = 0; + EXPECT_CALL(mock_converter_, FillProto(_, _, _)) + .WillRepeatedly(Invoke([&index_array, &index_idx]( + StreamID, const Attributes&, CheckRequest* request) { + request->set_request_index(index_array[index_idx++]); + })); + EXPECT_CALL(*writer, Write(_)).Times(2); EXPECT_CALL(*writer, is_write_closed()).WillOnce(Return(false)); - // Send two requests: 1 and 2 - // But OnRead() is called out of order: 2 and 1 - CheckRequest request_in_1; - request_in_1.set_request_index(111); - CheckResponse response_in_1; - response_in_1.set_request_index(request_in_1.request_index()); - - CheckRequest request_in_2; - request_in_2.set_request_index(222); - CheckResponse response_in_2; - response_in_2.set_request_index(request_in_2.request_index()); - + Attributes attributes; CheckResponse response_out_1; - stream_.Call(request_in_1, &response_out_1, [](Status status) {}); + stream_.Call(attributes, &response_out_1, [](Status status) {}); CheckResponse response_out_2; - stream_.Call(request_in_2, &response_out_2, [](Status status) {}); + stream_.Call(attributes, &response_out_2, [](Status status) {}); // Write response in wrong order + CheckResponse response_in_2; + response_in_2.set_request_index(kRequestIndex2); reader->OnRead(response_in_2); + + CheckResponse response_in_1; + response_in_1.set_request_index(kRequestIndex1); reader->OnRead(response_in_1); EXPECT_TRUE(MessageDifferencer::Equals(response_in_1, response_out_1)); @@ -145,14 +161,16 @@ TEST_F(TransportImplTest, TestCheckWithStreamClose) { reader = r; return CheckWriterPtr(writer); })); + EXPECT_CALL(mock_converter_, FillProto(_, _, _)) + .WillOnce(Invoke([](StreamID, const Attributes&, CheckRequest* request) { + request->set_request_index(kRequestIndex1); + })); EXPECT_CALL(*writer, Write(_)).Times(1); - CheckRequest request_in; - request_in.set_request_index(111); - + Attributes attributes; CheckResponse response_out; Status status_out = Status::UNKNOWN; - stream_.Call(request_in, &response_out, + stream_.Call(attributes, &response_out, [&status_out](Status status) { status_out = status; }); // Close the stream @@ -168,6 +186,21 @@ TEST_F(TransportImplTest, TestHalfClose) { CheckReaderRawPtr readers[2]; int idx = 0; + EXPECT_CALL(mock_transport_, NewStream(An())) + .WillRepeatedly(Invoke( + [&writers, &readers, &idx](CheckReaderRawPtr r) -> CheckWriterPtr { + readers[idx] = r; + return CheckWriterPtr(writers[idx++]); + })); + + int64_t index_array[2] = {kRequestIndex1, kRequestIndex2}; + int index_idx = 0; + EXPECT_CALL(mock_converter_, FillProto(_, _, _)) + .WillRepeatedly(Invoke([&index_array, &index_idx]( + StreamID, const Attributes&, CheckRequest* request) { + request->set_request_index(index_array[index_idx++]); + })); + EXPECT_CALL(mock_transport_, NewStream(An())) .WillRepeatedly(Invoke( [&writers, &readers, &idx](CheckReaderRawPtr r) -> CheckWriterPtr { @@ -180,25 +213,21 @@ TEST_F(TransportImplTest, TestHalfClose) { EXPECT_CALL(*writers[1], Write(_)).Times(1); // Send the first request. - CheckRequest request_in_1; - request_in_1.set_request_index(111); - CheckResponse response_in_1; - response_in_1.set_request_index(request_in_1.request_index()); + Attributes attributes; CheckResponse response_out_1; - stream_.Call(request_in_1, &response_out_1, [](Status status) {}); - - // Send the second request - CheckRequest request_in_2; - request_in_2.set_request_index(222); - CheckResponse response_in_2; - response_in_2.set_request_index(request_in_2.request_index()); + stream_.Call(attributes, &response_out_1, [](Status status) {}); CheckResponse response_out_2; - stream_.Call(request_in_2, &response_out_2, [](Status status) {}); + stream_.Call(attributes, &response_out_2, [](Status status) {}); // Write responses + CheckResponse response_in_2; + response_in_2.set_request_index(kRequestIndex2); readers[1]->OnRead(response_in_2); + + CheckResponse response_in_1; + response_in_1.set_request_index(kRequestIndex1); readers[0]->OnRead(response_in_1); EXPECT_TRUE(MessageDifferencer::Equals(response_in_1, response_out_1)); diff --git a/mixerclient/src/transport.h b/mixerclient/src/transport.h new file mode 100644 index 000000000000..c3f73d3b89fb --- /dev/null +++ b/mixerclient/src/transport.h @@ -0,0 +1,63 @@ +/* Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MIXERCLIENT_SRC_TRANSPORT_H +#define MIXERCLIENT_SRC_TRANSPORT_H + +#include "src/attribute_context.h" +#include "src/attribute_converter.h" +#include "src/stream_transport.h" + +namespace istio { +namespace mixer_client { + +// A class to transport the request +template +class Transport : public AttributeConverter { + public: + Transport(TransportInterface* transport) + : stream_(transport, this), last_stream_id_(0) {} + + // Send the attributes + void Send(const Attributes& attributes, ResponseType* response, + DoneFunc on_done) { + stream_.Call(attributes, response, on_done); + } + + // Convert to a protobuf + void FillProto(StreamID stream_id, const Attributes& attributes, + RequestType* request) { + if (stream_id != last_stream_id_) { + attribute_context_.reset(new AttributeContext); + last_stream_id_ = stream_id; + } + attribute_context_->FillProto(attributes, + request->mutable_attribute_update()); + request->set_request_index(attribute_context_->IncRequestIndex()); + } + + private: + // A stream transport + StreamTransport stream_; + // The attribute context for sending attributes + std::unique_ptr attribute_context_; + // Last used underlying stream id; + StreamID last_stream_id_; +}; + +} // namespace mixer_client +} // namespace istio + +#endif // MIXERCLIENT_SRC_TRANSPORT_H