Skip to content

Commit

Permalink
[#9370] Test traffic compression with encryption
Browse files Browse the repository at this point in the history
Summary:
This diff adds tests for simultaneous traffic compression and encryption.

Also refactored RefinedStream to avoid extra copy for compressed streams.
Added test that checks that we actually compress the traffic.
Added test for disabled refined streams.

Test Plan:
ybd --gtest_filter TestRpcSecureCompression.*
ybd --gtest_filter TestRpcCompression.Compression
ybd --gtest_filter ExternalMiniClusterSecureTest.InsecureCql
ybd --gtest_filter SecureConnectionTest.Compression

Reviewers: bogdan

Reviewed By: bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D12406
  • Loading branch information
spolitov committed Jul 30, 2021
1 parent 2ae5f92 commit e9c7f79
Show file tree
Hide file tree
Showing 32 changed files with 649 additions and 343 deletions.
1 change: 0 additions & 1 deletion ent/src/yb/integration-tests/CMakeLists-include.txt
Expand Up @@ -35,7 +35,6 @@ set(YB_ENT_CURRENT_SOURCE_DIR
set(INTEGRATION_TESTS_LIB_EXTENSIONS cdc_test_util PARENT_SCOPE)

set(INTEGRATION_TESTS_EXTENSIONS_TESTS
external_mini_cluster_secure_test
secure_connection_test
snapshot-test
transaction-ent-test
Expand Down
26 changes: 21 additions & 5 deletions ent/src/yb/integration-tests/external_mini_cluster_ent.cc
Expand Up @@ -21,11 +21,25 @@

using namespace std::literals;

DECLARE_string(certs_dir);
DECLARE_bool(allow_insecure_connections);
DECLARE_bool(node_to_node_encryption_use_client_certificates);
DECLARE_bool(use_client_to_server_encryption);
DECLARE_bool(use_node_to_node_encryption);
DECLARE_string(certs_dir);

namespace yb {

std::string FlagToString(bool flag) {
return flag ? "true" : "false";
}

const std::string& FlagToString(const std::string& flag) {
return flag;
}

#define YB_FORWARD_FLAG(flag_name) \
"--" BOOST_PP_STRINGIZE(flag_name) "="s + FlagToString(BOOST_PP_CAT(FLAGS_, flag_name))

void StartSecure(
std::unique_ptr<ExternalMiniCluster>* cluster,
std::unique_ptr<rpc::SecureContext>* secure_context,
Expand All @@ -39,10 +53,12 @@ void StartSecure(

ExternalMiniClusterOptions opts;
opts.extra_tserver_flags = {
"--use_node_to_node_encryption=true", "--allow_insecure_connections=false",
"--certs_dir=" + FLAGS_certs_dir,
"--node_to_node_encryption_use_client_certificates="s +
(FLAGS_node_to_node_encryption_use_client_certificates ? "true" : "false")};
YB_FORWARD_FLAG(allow_insecure_connections),
YB_FORWARD_FLAG(certs_dir),
YB_FORWARD_FLAG(node_to_node_encryption_use_client_certificates),
YB_FORWARD_FLAG(use_client_to_server_encryption),
YB_FORWARD_FLAG(use_node_to_node_encryption),
};
opts.extra_master_flags = opts.extra_tserver_flags;
opts.extra_master_flags.insert(
opts.extra_master_flags.end(), master_flags.begin(), master_flags.end());
Expand Down
22 changes: 18 additions & 4 deletions ent/src/yb/integration-tests/secure_connection_test.cc
Expand Up @@ -30,12 +30,14 @@

DECLARE_bool(TEST_private_broadcast_address);
DECLARE_bool(allow_insecure_connections);
DECLARE_bool(enable_stream_compression);
DECLARE_bool(node_to_node_encryption_use_client_certificates);
DECLARE_bool(use_client_to_server_encryption);
DECLARE_bool(use_node_to_node_encryption);
DECLARE_bool(verify_client_endpoint);
DECLARE_bool(verify_server_endpoint);
DECLARE_int32(TEST_nodes_per_cloud);
DECLARE_int32(stream_compression_algo);
DECLARE_int32(yb_client_admin_operation_timeout_sec);
DECLARE_string(TEST_public_hostname_suffix);
DECLARE_string(cert_file_pattern);
Expand Down Expand Up @@ -208,27 +210,39 @@ TEST_F_EX(SecureConnectionTest, VerifyNameOnly, SecureConnectionVerifyNameOnlyTe
TestSimpleOps();
}

class SecureConnectionCipherList : public SecureConnectionTest {
class SecureConnectionCipherListTest : public SecureConnectionTest {
void SetUp() override {
FLAGS_cipher_list = "HIGH";
FLAGS_ssl_protocols = "tls12";
SecureConnectionTest::SetUp();
}
};

TEST_F_EX(SecureConnectionTest, CipherList, SecureConnectionCipherList) {
TEST_F_EX(SecureConnectionTest, CipherList, SecureConnectionCipherListTest) {
TestSimpleOps();
}

class SecureConnectionCipherSuites : public SecureConnectionTest {
class SecureConnectionCipherSuitesTest : public SecureConnectionTest {
void SetUp() override {
FLAGS_ciphersuites = "TLS_AES_128_CCM_8_SHA256";
FLAGS_ssl_protocols = "tls13";
SecureConnectionTest::SetUp();
}
};

TEST_F_EX(SecureConnectionTest, CipherSuites, SecureConnectionCipherSuites) {
TEST_F_EX(SecureConnectionTest, CipherSuites, SecureConnectionCipherSuitesTest) {
TestSimpleOps();
}

class SecureConnectionCompressionTest : public SecureConnectionTest {
void SetUp() override {
FLAGS_enable_stream_compression = true;
FLAGS_stream_compression_algo = 1;
SecureConnectionTest::SetUp();
}
};

TEST_F_EX(SecureConnectionTest, Compression, SecureConnectionCompressionTest) {
TestSimpleOps();
}

Expand Down
1 change: 1 addition & 0 deletions ent/src/yb/server/secure.cc
Expand Up @@ -109,6 +109,7 @@ Result<std::unique_ptr<rpc::SecureContext>> SetupSecureContext(
SecureContextType type, rpc::MessengerBuilder* builder) {
auto use = type == SecureContextType::kInternal ? FLAGS_use_node_to_node_encryption
: FLAGS_use_client_to_server_encryption;
LOG(INFO) << __func__ << ": " << type << ", " << use;
if (!use) {
ApplyCompressedStream(builder, rpc::TcpStream::Factory());
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions src/yb/integration-tests/CMakeLists.txt
Expand Up @@ -161,6 +161,7 @@ endfunction()
ADD_YB_CQL_TEST(cql-index-test)
ADD_YB_CQL_TEST(cql-test)
ADD_YB_CQL_TEST(cql-tablet-split-test)
ADD_YB_CQL_TEST(external_mini_cluster_secure_test)

set(YB_TEST_LINK_LIBS ${YB_TEST_LINK_LIBS_SAVED})

Expand Down
Expand Up @@ -26,6 +26,7 @@
#include "yb/yql/cql/ql/util/errcodes.h"
#include "yb/yql/cql/ql/util/statement_result.h"

#include "yb/integration-tests/cql_test_util.h"
#include "yb/integration-tests/external_mini_cluster_ent.h"

DECLARE_bool(use_client_to_server_encryption);
Expand All @@ -46,6 +47,8 @@ class ExternalMiniClusterSecureTest :
const auto sub_dir = JoinPathSegments("ent", "test_certs");
FLAGS_certs_dir = JoinPathSegments(env_util::GetRootDir(sub_dir), sub_dir);

SetUpFlags();

MiniClusterTestWithClient::SetUp();

ASSERT_NO_FATALS(StartSecure(&cluster_, &secure_context_, &messenger_));
Expand All @@ -55,6 +58,9 @@ class ExternalMiniClusterSecureTest :
DontVerifyClusterBeforeNextTearDown(); // Verify requires insecure connection.
}

virtual void SetUpFlags() {
}

void DoTearDown() override {
messenger_->Shutdown();
MiniClusterTestWithClient::DoTearDown();
Expand Down Expand Up @@ -90,6 +96,34 @@ TEST_F(ExternalMiniClusterSecureTest, Simple) {
}
}

class ExternalMiniClusterSecureAllowInsecureTest : public ExternalMiniClusterSecureTest {
public:
void SetUpFlags() override {
FLAGS_allow_insecure_connections = true;
}
};

// Test that CQL driver could connect to cluster with not encrypted connection.
// So we are checking disabled mode of RefinedStream.
// For this test with allow insecure (i.e. not encrypted) connections.
TEST_F_EX(ExternalMiniClusterSecureTest, InsecureCql, ExternalMiniClusterSecureAllowInsecureTest) {
std::vector<std::string> hosts;
for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
hosts.push_back(cluster_->tablet_server(i)->bind_host());
}

auto cql_port = cluster_->tablet_server(0)->cql_rpc_port();
LOG(INFO) << "CQL port: " << cql_port;
auto driver = std::make_unique<CppCassandraDriver>(
hosts, cql_port, UsePartitionAwareRouting::kTrue);

auto session = ASSERT_RESULT(EstablishSession(driver.get()));
ASSERT_OK(session.ExecuteQuery("CREATE TABLE t (k INT PRIMARY KEY, v INT)"));
ASSERT_OK(session.ExecuteQuery("INSERT INTO t (k, v) VALUES (1, 2)"));
auto content = ASSERT_RESULT(session.ExecuteAndRenderToString("SELECT * FROM t"));
ASSERT_EQ(content, "1,2");
}

class ExternalMiniClusterSecureWithClientCertsTest : public ExternalMiniClusterSecureTest {
void SetUp() override {
FLAGS_node_to_node_encryption_use_client_certificates = true;
Expand Down
4 changes: 2 additions & 2 deletions src/yb/rpc/acceptor.cc
Expand Up @@ -154,8 +154,8 @@ void Acceptor::IoHandler(ev::io& io, int events) {
VLOG(2) << "calling accept() on socket " << socket.GetFd();
Status s = socket.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
if (!s.ok()) {
if (!Socket::IsTemporarySocketError(s)) {
LOG(WARNING) << "Acceptor: accept failed: " << s.ToString();
if (!s.IsTryAgain()) {
LOG(WARNING) << "Acceptor: accept failed: " << s;
}
return;
}
Expand Down
24 changes: 19 additions & 5 deletions src/yb/rpc/binary_call_parser.cc
Expand Up @@ -57,7 +57,7 @@ BinaryCallParser::BinaryCallParser(
buffer_tracker_ = MemTracker::FindOrCreateTracker("Reading", parent_tracker);
}

Result<ProcessDataResult> BinaryCallParser::Parse(
Result<ProcessCallsResult> BinaryCallParser::Parse(
const rpc::ConnectionPtr& connection, const IoVecs& data, ReadBufferFull read_buffer_full,
const MemTrackerPtr* tracker_for_throttle) {
if (call_data_.should_reject()) {
Expand Down Expand Up @@ -107,7 +107,11 @@ Result<ProcessDataResult> BinaryCallParser::Parse(
<< (*tracker_for_throttle)->LogUsage("");
if (ShouldThrottleRpc(*tracker_for_throttle, call_data_size, "Ignoring RPC call: ")) {
call_data_ = CallData(call_data_size, ShouldReject::kTrue);
return ProcessDataResult{ full_input_size, Slice(), call_data_size - call_received_size };
return ProcessCallsResult{
.consumed = full_input_size,
.buffer = Slice(),
.bytes_to_skip = call_data_size - call_received_size
};
}
}

Expand All @@ -121,7 +125,10 @@ Result<ProcessDataResult> BinaryCallParser::Parse(
VLOG(4) << "BinaryCallParser::Parse, consumed: " << consumed
<< " returning: { full_input_size: " << full_input_size
<< " buffer.size(): " << buffer.size() << " }";
return ProcessDataResult{full_input_size, buffer};
return ProcessCallsResult{
.consumed = full_input_size,
.buffer = buffer,
};
} else if (read_buffer_full && consumed == 0) {
auto consumption = blocking_mem_tracker ? blocking_mem_tracker->consumption() : -1;
auto limit = blocking_mem_tracker ? blocking_mem_tracker->limit() : -1;
Expand All @@ -132,7 +139,11 @@ Result<ProcessDataResult> BinaryCallParser::Parse(
<< ", consumption: " << consumption << " of " << limit << ". Call will be ignored.\n"
<< DumpMemoryUsage();
call_data_ = CallData(call_data_size, ShouldReject::kTrue);
return ProcessDataResult{full_input_size, Slice(), call_data_size - call_received_size};
return ProcessCallsResult{
.consumed = full_input_size,
.buffer = Slice(),
.bytes_to_skip = call_data_size - call_received_size
};
} else {
// For backward compatibility in behavior until we fix
// https://github.com/yugabyte/yugabyte-db/issues/2563.
Expand All @@ -157,7 +168,10 @@ Result<ProcessDataResult> BinaryCallParser::Parse(
consumed += total_length;
}
VLOG(4) << "BinaryCallParser::Parse, returning: { consumed: " << consumed << " buffer: empty }";
return ProcessDataResult{ consumed, Slice() };
return ProcessCallsResult {
.consumed = consumed,
.buffer = Slice(),
};
}

} // namespace rpc
Expand Down
6 changes: 3 additions & 3 deletions src/yb/rpc/binary_call_parser.h
Expand Up @@ -45,9 +45,9 @@ class BinaryCallParser {

// If tracker_for_throttle is not nullptr - throttle big requests when tracker_for_throttle
// (or any of its ancestors) exceeds soft memory limit.
Result<ProcessDataResult> Parse(const rpc::ConnectionPtr& connection, const IoVecs& data,
ReadBufferFull read_buffer_full,
const MemTrackerPtr* tracker_for_throttle);
Result<ProcessCallsResult> Parse(const rpc::ConnectionPtr& connection, const IoVecs& data,
ReadBufferFull read_buffer_full,
const MemTrackerPtr* tracker_for_throttle);

private:
MemTrackerPtr buffer_tracker_;
Expand Down

0 comments on commit e9c7f79

Please sign in to comment.