Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions gtests/src/integration/objects/cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ class Cluster : public Object<CassCluster, cass_cluster_free> {
return *this;
}

/**
* Assign the local address to bind; passing an empty string will clear
* the local address.
*
* @param name An IP address or hostname
* @return Cluster object
*/
Cluster& with_local_address(const std::string& name) {
EXPECT_EQ(CASS_OK, cass_cluster_set_local_address(get(),
name.c_str()));
return *this;
}

/**
* Assign the number of connections made to each node/server for each
* connections thread
Expand Down
93 changes: 93 additions & 0 deletions gtests/src/integration/tests/test_control_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@
* Control connection integration tests; single node cluster
*/
class ControlConnectionTests : public Integration {

public:

void SetUp() {
// Call the parent setup function (don't automatically start session,
// because we don't want any connections established until we have
// set up the cluster).
is_session_requested_ = false;
Integration::SetUp();
}

protected:
/**
* Execute multiple requests and ensure the expected nodes are used during
Expand Down Expand Up @@ -186,6 +197,87 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, ConnectUsingInvalidPort) {
}
}

/**
* Perform session connection using unresolvable local IP address
*
* This test will attempt to perform a connection using an unresolvable local
* IP address and ensure the control connection is not established against a
* single node cluster.
*
* @test_category control_connection
* @since core:1.0.0
* @expected_result Control connection will not be established
*/
CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
ConnectUsingUnresolvableLocalIpAddress) {
CHECK_FAILURE;

// Attempt to connect to the server using an unresolvable local IP address
Cluster cluster = default_cluster();
EXPECT_EQ(CASS_ERROR_LIB_HOST_RESOLUTION,
cass_cluster_set_local_address(cluster.get(), "unknown.invalid"));
}

/**
* Perform session connection using unbindable local IP address
*
* This test will attempt to perform a connection using an unbindable local IP
* address and ensure the control connection is not established against a
* single node cluster.
*
* @test_category control_connection
* @since core:1.0.0
* @expected_result Control connection will not be established
*/
CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
ConnectUsingUnbindableLocalIpAddress) {
CHECK_FAILURE;

// Attempt to connect to the server using an unbindable local IP address
logger_.add_critera("Unable to bind local address: address not available");
Cluster cluster = default_cluster().with_local_address("1.1.1.1");
try {
cluster.connect();
FAIL() << "Connection was established using unbindable local IP address";
} catch (Session::Exception& se) {
ASSERT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, se.error_code());
ASSERT_GE(logger_.count(), 1u);
}
}

/**
* Perform session connection using valid local IP address but invalid
* remote address
*
* This test will attempt to perform a connection using a valid local IP
* address and invalid remote address and ensure the control connection is
* not established against a single node cluster.
*
* @test_category control_connection
* @since core:1.0.0
* @expected_result Control connection will not be established
*/
CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
ConnectUsingValidLocalIpAddressButInvalidRemote) {
CHECK_FAILURE;

// Attempt to connect to the server using an valid local IP address
// but invalid remote address. The specified remote is not routable
// from the specified local.
logger_.add_critera("Unable to establish a control connection to host " \
"1.1.1.1 because of the following error: " \
"Connect error 'operation not permitted'");
Cluster cluster = Cluster::build().with_contact_points("1.1.1.1")
.with_local_address("127.0.0.1");
try {
cluster.connect();
FAIL() << "Connection was established using invalid IP address";
} catch (Session::Exception& se) {
ASSERT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, se.error_code());
ASSERT_GE(logger_.count(), 1u);
}
}

/**
* Perform session connection while forcing a control connection reconnect
*
Expand Down Expand Up @@ -495,6 +587,7 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests,
CHECK_FAILURE;

// Stop the cluster and attempt to perform a request
connect();
ccm_->stop_cluster();
Result result = session_.execute(SELECT_ALL_SYSTEM_LOCAL_CQL,
CASS_CONSISTENCY_ONE, false, false);
Expand Down
68 changes: 68 additions & 0 deletions gtests/src/unit/tests/test_jenkins.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2011 Google Inc. All Rights Reserved.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>
#include <cstdint>

#include "jenkins_hash.hpp"

inline char* to_char_ptr (uint8_t* uptr) {
return reinterpret_cast<char *>(uptr);
}

inline const char* to_char_ptr (const uint8_t* uptr) {
return reinterpret_cast<const char *>(uptr);
}

TEST(JenkinsUnitTest, TestHash64) {

const uint64_t seed = 97;

const uint8_t b1[] = {
0xc7, 0x25, 0x1d, 0x5d, 0x75, 0x3a, 0x4e, 0x46, 0x22, 0x29, 0x4d, 0x6c, 0x67, 0x7a, 0xa8, 0x25,
0x71
};

const uint8_t b2[] = {
0x83, 0x8e, 0x7e, 0xf0, 0x71, 0xef, 0x9b, 0x3e, 0x4a, 0xe6, 0x12, 0x60, 0xc0, 0xa1, 0xf9, 0x94,
0x5a, 0x85, 0x9b, 0xb1, 0xf6, 0x86, 0x97, 0xe1, 0xab, 0x87, 0xc8, 0xab, 0xc1, 0x28, 0xd1, 0x72,
0x73, 0x0b, 0xda, 0x50, 0xe3, 0xe6, 0xf9, 0x42
};

const uint8_t b3[] = {
0xad, 0xe3, 0xaa, 0xb7, 0xd2, 0xbc, 0x3a, 0xe6, 0x60, 0xe4, 0xc6, 0xc1, 0x02, 0x0a, 0x3a, 0x50,
0x66, 0xb2, 0x26, 0x6c, 0x1d, 0x1b, 0x16, 0xb1, 0x1b, 0x51, 0x74, 0x9c, 0xa7, 0xbb, 0xad, 0x46,
0x25, 0x54, 0xca, 0x30, 0x3a, 0x31, 0xd0, 0x34, 0x56, 0xac, 0xb1, 0xca, 0xaf, 0x7f, 0x5c, 0xf3,
0x9e, 0x16, 0x94, 0x78, 0x84, 0xca, 0x60, 0x66, 0x27, 0x59, 0xe1, 0x99, 0xb4, 0xc4, 0xbd, 0x50,
0x48, 0x50, 0xcb, 0xa6, 0x0b, 0xe1, 0x71, 0x31, 0x49, 0x27, 0x11, 0x9e, 0xcc, 0xcd, 0xd8, 0x19,
0x09, 0xc6, 0xdf, 0x15, 0x64, 0x0d, 0xf7, 0x25, 0x5c, 0x48, 0x19, 0xc7, 0x6b, 0x10, 0x02, 0x7e,
0x31, 0x54, 0x2a, 0xd8, 0x92, 0xe5, 0xc5, 0xab, 0xe9, 0x3d, 0x57, 0x99, 0x9a, 0x93, 0x4f, 0x48,
0x3f, 0xfa, 0x73, 0x36, 0x03, 0xe1, 0xbd, 0x27, 0xe5, 0x06, 0x8a, 0x21, 0x33, 0xff, 0x91, 0x80,
0x36, 0x4d, 0x2d, 0x04, 0xc7, 0x11, 0xcc, 0x2a, 0xc0, 0xa9, 0x17, 0x18, 0x73, 0xff, 0xd5, 0x0e,
0x0d, 0x8b, 0x6f, 0x8b, 0xba, 0x8c, 0x37, 0x49, 0xb1, 0x31, 0x5b, 0xf4, 0x4d, 0xd7, 0x19, 0x10,
0x40, 0x6e, 0x61, 0x41, 0xf1, 0x55, 0xaa, 0x44, 0x79, 0x13, 0x57, 0x3b, 0x72, 0xac, 0xfe, 0xce,
0xf8, 0xd7, 0x07, 0x82, 0x05, 0xef, 0x0f, 0x53, 0x6c, 0xfe, 0x7d, 0x94, 0x48, 0xa5, 0x48, 0x42,
0x47, 0x70, 0x29, 0xe7, 0x7e, 0x53, 0xca, 0x88, 0x89, 0x8a, 0xec, 0xe5, 0x01, 0x44, 0xf5, 0xc5,
0xc9, 0x89, 0x6d, 0x6a, 0xf1, 0x26, 0x61, 0xae, 0x30, 0x50, 0x61, 0x68, 0x41, 0xac, 0x82, 0x40,
0xdb, 0x12, 0x00, 0x68, 0xad, 0x34, 0x52, 0xb2, 0xbb, 0xc5, 0x74, 0xf1, 0x3e, 0x00, 0x98, 0x6e,
0x1d, 0xc2, 0xd7, 0x7d, 0xc6, 0xc7, 0x10, 0xb2, 0xac, 0xcf, 0x8b, 0x25, 0xd9, 0x7d, 0xd5, 0x20
};

EXPECT_TRUE(cass::Hash64StringWithSeed(to_char_ptr(b1), sizeof(b1), seed) == 1789751740810280356ul);
EXPECT_TRUE(cass::Hash64StringWithSeed(to_char_ptr(b2), sizeof(b2), seed) == 4001818822847464429ul);
EXPECT_TRUE(cass::Hash64StringWithSeed(to_char_ptr(b3), sizeof(b3), seed) == 15240025333683105143ul);

}
34 changes: 19 additions & 15 deletions gtests/src/unit/tests/test_md5.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
/*
Copyright (c) DataStax, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Copyright 2011 Google Inc. All Rights Reserved.
//
// The following only applies to changes made to this file as part of YugaByte development.
//
// Portions Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
// Contains the legacy Bob Jenkins Lookup2-based hashing routines. These need to
// always return the same results as their values have been recorded in various
// places and cannot easily be updated. Original author: Sanjay Ghemawat

#include <gtest/gtest.h>

Expand Down
48 changes: 48 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,21 @@ typedef struct CassRetryPolicy_ CassRetryPolicy;
*/
typedef struct CassCustomPayload_ CassCustomPayload;

/**
* Contains the set of Cass Objects used to control a session and execute queries
*/
typedef struct SessionObjects {
CassFuture* cass_future;
CassSession* cass_session;
CassCluster* cass_cluster;
operator=(SessionObjects obj) {
cass_cluster = obj.cass_cluster;
cass_session = obj.cass_session;
cass_future = obj.cass_future;
}
};


/**
* A snapshot of the session's performance/diagnostic metrics.
*
Expand Down Expand Up @@ -946,6 +961,39 @@ CASS_EXPORT CassError
cass_cluster_set_port(CassCluster* cluster,
int port);

/**
* Sets the local address to bind when connecting to the cluster,
* if desired.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] name IP address to bind, or empty string for no binding.
* Only numeric addresses are supported; no resolution is done.
* @return CASS_OK if successful, otherwise an error occurred.
*/
CASS_EXPORT CassError
cass_cluster_set_local_address(CassCluster* cluster,
const char* name);

/**
* Same as cass_cluster_set_local_address(), but with lengths for string
* parameters.
*
* @public @memberof CassCluster
*
* @param[in] cluster
* @param[in] name
* @param[in] name_length
* @return same as cass_cluster_set_local_address()
*
* @see cass_cluster_set_local_address()
*/
CASS_EXPORT CassError
cass_cluster_set_local_address_n(CassCluster* cluster,
const char* name,
size_t name_length);

/**
* Sets the SSL context and enables SSL.
*
Expand Down
19 changes: 19 additions & 0 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,25 @@ CassError cass_cluster_set_prepare_on_up_or_add_host(CassCluster* cluster,
return CASS_OK;
}

CassError cass_cluster_set_local_address(CassCluster* cluster,
const char* name) {
return cass_cluster_set_local_address_n(cluster, name, SAFE_STRLEN(name));
}

CassError cass_cluster_set_local_address_n(CassCluster* cluster,
const char* name,
size_t name_length) {
cass::Address address; // default to AF_UNSPEC
if (name_length == 0 ||
name == NULL ||
cass::Address::from_string(std::string(name, name_length), 0, &address)) {
cluster->config().set_local_address(address);
} else {
return CASS_ERROR_LIB_HOST_RESOLUTION;
}
return CASS_OK;
}

CassError cass_cluster_set_no_compact(CassCluster* cluster,
cass_bool_t enabled) {
cluster->config().set_no_compact(enabled == cass_true);
Expand Down
8 changes: 8 additions & 0 deletions src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,13 @@ class Config {
prepare_on_up_or_add_host_ = enabled;
}

const Address* local_address() const {
return local_address_.is_valid() ? &local_address_ : NULL; }

void set_local_address(const Address& address) {
local_address_ = address;
}

bool no_compact() const { return no_compact_; }

void set_no_compact(bool enabled) {
Expand Down Expand Up @@ -448,6 +455,7 @@ class Config {
bool use_randomized_contact_points_;
bool prepare_on_all_hosts_;
bool prepare_on_up_or_add_host_;
Address local_address_;
bool no_compact_;
};

Expand Down
12 changes: 11 additions & 1 deletion src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ Connection::Connection(uv_loop_t* loop,
LOG_WARN("Unable to set tcp keepalive");
}

const Address* local_address = config_.local_address();
if (local_address) {
int rc = uv_tcp_bind(&socket_, local_address->addr(), 0);
if (rc) {
notify_error("Unable to bind local address: " + std::string(UV_ERRSTR(rc, loop_)));
return;
}
}

SslContext* ssl_context = config_.ssl_context();
if (ssl_context != NULL) {
ssl_session_.reset(ssl_context->create_session(host));
Expand Down Expand Up @@ -378,7 +387,8 @@ void Connection::set_state(ConnectionState new_state) {

switch (state_) {
case CONNECTION_STATE_NEW:
assert(new_state == CONNECTION_STATE_CONNECTING &&
assert((new_state == CONNECTION_STATE_CONNECTING ||
new_state == CONNECTION_STATE_CLOSE_DEFUNCT) &&
"Invalid connection state after new");
state_ = new_state;
break;
Expand Down
Loading