Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/inc/cpp-pcp-client/connector/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ using CloseCode = CloseCodeValues::value_;

class LIBCPP_PCP_CLIENT_EXPORT Connection {
public:
/// To keep track of WebSocket timings
ConnectionTimings timings;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you make this public?

Copy link
Contributor Author

@parisiale parisiale Jun 13, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the new AssociationTimings will be stored in the Connector, it makes more sense to me to allow Connector accessing directly the ConnectionTimings instead of having a copy from a Connection getter call. This is done considering that the actual user interface is provided by Connector, that does not expose the Connection instance.

No strong opinion here, I'm happy to make it private again if you think it's the case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems ok.


/// The Connection class provides the necessary logic to establish
/// and use a PCP connection over WebSocket.
/// The constructor throws an connection_config_error if it fails
Expand Down Expand Up @@ -184,9 +187,6 @@ class LIBCPP_PCP_CLIENT_EXPORT Connection {
/// Exponential backoff interval for re-connect
uint32_t connection_backoff_ms_ { CONNECTION_BACKOFF_MS };

/// Keep track of connection timings
ConnectionTimings connection_timings_;

/// To manage the connection state
Util::mutex state_mutex_;

Expand Down
10 changes: 10 additions & 0 deletions lib/inc/cpp-pcp-client/connector/connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
#include <cpp-pcp-client/connector/client_metadata.hpp>
#include <cpp-pcp-client/connector/session_association.hpp>
#include <cpp-pcp-client/connector/errors.hpp>
#include <cpp-pcp-client/connector/timings.hpp>

#include <cpp-pcp-client/validator/validator.hpp>
#include <cpp-pcp-client/validator/schema.hpp>

#include <cpp-pcp-client/protocol/chunks.hpp>
#include <cpp-pcp-client/protocol/message.hpp>

#include <cpp-pcp-client/util/thread.hpp>

#include <cpp-pcp-client/export.h>

#include <memory>
Expand Down Expand Up @@ -130,6 +135,11 @@ class LIBCPP_PCP_CLIENT_EXPORT Connector {
/// then, false otherwise.
bool isAssociated() const;

/// Returns the connection timings of the underlying connection
/// if established, otherwise the ConnectionTimings' default
/// constructor.
ConnectionTimings getConnectionTimings() const;

/// Starts the Monitoring Task in a separate thread.
/// Such task will periodically check the state of the
/// underlying connection and re-establish it in case it has
Expand Down
9 changes: 6 additions & 3 deletions lib/inc/cpp-pcp-client/connector/timings.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef CPP_PCP_CLIENT_SRC_CONNECTOR_TIMINGS_H_
#define CPP_PCP_CLIENT_SRC_CONNECTOR_TIMINGS_H_

#include <cpp-pcp-client/export.h>

#include <boost/chrono/chrono.hpp>

#include <string>
Expand All @@ -11,8 +13,7 @@ namespace PCPClient {
// ConnectionTimings
//

class ConnectionTimings {
public:
struct LIBCPP_PCP_CLIENT_EXPORT ConnectionTimings {
using Duration_us = boost::chrono::duration<int, boost::micro>;

boost::chrono::high_resolution_clock::time_point start;
Expand All @@ -24,7 +25,9 @@ class ConnectionTimings {
bool connection_started { false };
bool connection_failed { false };

ConnectionTimings();
/// Sets the `start` time_point member to the current instant,
/// the other time_points to epoch, and the flags to false
void reset();

/// Time interval to establish the TCP connection [us]
Duration_us getTCPInterval() const;
Expand Down
106 changes: 67 additions & 39 deletions lib/src/connector/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Connection::Connection(std::string broker_ws_uri,

Connection::Connection(std::vector<std::string> broker_ws_uris,
ClientMetadata client_metadata)
: broker_ws_uris_ { std::move(broker_ws_uris) },
: timings {},
broker_ws_uris_ { std::move(broker_ws_uris) },
client_metadata_ { std::move(client_metadata) },
connection_state_ { ConnectionState::initialized },
connection_target_index_ { 0u },
Expand Down Expand Up @@ -120,27 +121,32 @@ Connection::Connection(std::vector<std::string> broker_ws_uris,
}
}

Connection::~Connection() {
Connection::~Connection()
{
cleanUp();
}

ConnectionState Connection::getConnectionState() const {
ConnectionState Connection::getConnectionState() const
{
return connection_state_.load();
}

//
// Callback modifiers
//

void Connection::setOnOpenCallback(std::function<void()> c_b) {
void Connection::setOnOpenCallback(std::function<void()> c_b)
{
onOpen_callback = c_b;
}

void Connection::setOnMessageCallback(std::function<void(std::string msg)> c_b) {
void Connection::setOnMessageCallback(std::function<void(std::string msg)> c_b)
{
onMessage_callback_ = c_b;
}

void Connection::resetCallbacks() {
void Connection::resetCallbacks()
{
onOpen_callback = [](){}; // NOLINT [false positive readability/braces]
onMessage_callback_ = [](std::string message){}; // NOLINT [false positive readability/braces]
}
Expand All @@ -149,11 +155,13 @@ void Connection::resetCallbacks() {
// Synchronous calls
//

inline static void doSleep(int ms = CONNECTION_MIN_INTERVAL_MS) {
inline static void doSleep(int ms = CONNECTION_MIN_INTERVAL_MS)
{
Util::this_thread::sleep_for(Util::chrono::milliseconds(ms));
}

void Connection::connect(int max_connect_attempts) {
void Connection::connect(int max_connect_attempts)
{
// FSM
// - states are ConnectionState:
// * initialized - connecting - open - closing - closed
Expand Down Expand Up @@ -233,7 +241,8 @@ void Connection::connect(int max_connect_attempts) {
throw connection_fatal_error { msg };
}

void Connection::send(const std::string& msg) {
void Connection::send(const std::string& msg)
{
websocketpp::lib::error_code ec;
endpoint_->send(connection_handle_,
msg,
Expand All @@ -244,7 +253,8 @@ void Connection::send(const std::string& msg) {
lth_loc::format("failed to send message: {1}", ec.message()) };
}

void Connection::send(void* const serialized_msg_ptr, size_t msg_len) {
void Connection::send(void* const serialized_msg_ptr, size_t msg_len)
{
websocketpp::lib::error_code ec;
endpoint_->send(connection_handle_,
serialized_msg_ptr,
Expand All @@ -256,7 +266,8 @@ void Connection::send(void* const serialized_msg_ptr, size_t msg_len) {
lth_loc::format("failed to send message: {1}", ec.message()) };
}

void Connection::ping(const std::string& binary_payload) {
void Connection::ping(const std::string& binary_payload)
{
websocketpp::lib::error_code ec;
endpoint_->ping(connection_handle_, binary_payload, ec);
if (ec)
Expand All @@ -265,7 +276,8 @@ void Connection::ping(const std::string& binary_payload) {
ec.message()) };
}

void Connection::close(CloseCode code, const std::string& reason) {
void Connection::close(CloseCode code, const std::string& reason)
{
LOG_DEBUG("About to close the WebSocket connection");
Util::lock_guard<Util::mutex> the_lock { state_mutex_ };
auto c_s = connection_state_.load();
Expand All @@ -284,7 +296,8 @@ void Connection::close(CloseCode code, const std::string& reason) {
// Private interface
//

void Connection::connectAndWait() {
void Connection::connectAndWait()
{
connect_();
lth_util::Timer timer {};
while (connection_state_.load() != ConnectionState::open
Expand All @@ -293,15 +306,17 @@ void Connection::connectAndWait() {
}
}

void Connection::tryClose() {
void Connection::tryClose()
{
try {
close();
} catch (connection_processing_error& e) {
LOG_WARNING("Cleanup failure: {1}", e.what());
}
}

void Connection::cleanUp() {
void Connection::cleanUp()
{
auto c_s = connection_state_.load();

switch (c_s) {
Expand Down Expand Up @@ -347,9 +362,10 @@ void Connection::cleanUp() {
endpoint_thread_->join();
}

void Connection::connect_() {
void Connection::connect_()
{
connection_state_ = ConnectionState::connecting;
connection_timings_ = ConnectionTimings();
timings.reset();
websocketpp::lib::error_code ec;
auto ws_uri = getWsUri();
WS_Client_Type::connection_ptr connection_ptr {
Expand All @@ -373,12 +389,14 @@ void Connection::connect_() {
}
}

std::string const& Connection::getWsUri() {
std::string const& Connection::getWsUri()
{
auto c_t = connection_target_index_.load();
return broker_ws_uris_[c_t % broker_ws_uris_.size()];
}

void Connection::switchWsUri() {
void Connection::switchWsUri()
{
auto old_t = getWsUri();
++connection_target_index_;
auto current_t = getWsUri();
Expand All @@ -392,7 +410,8 @@ void Connection::switchWsUri() {
//

template <typename Verifier>
class verbose_verification {
class verbose_verification
{
public:
verbose_verification(Verifier verifier)
: verifier_(verifier)
Expand Down Expand Up @@ -422,7 +441,8 @@ make_verbose_verification(Verifier verifier)
return verbose_verification<Verifier>(verifier);
}

WS_Context_Ptr Connection::onTlsInit(WS_Connection_Handle hdl) {
WS_Context_Ptr Connection::onTlsInit(WS_Connection_Handle hdl)
{
LOG_DEBUG("WebSocket TLS initialization event; about to validate the certificate");
// NB: for TLS certificates, refer to:
// www.boost.org/doc/libs/1_56_0/doc/html/boost_asio/reference/ssl__context.html
Expand Down Expand Up @@ -457,42 +477,47 @@ WS_Context_Ptr Connection::onTlsInit(WS_Connection_Handle hdl) {
return ctx;
}

void Connection::onClose(WS_Connection_Handle hdl) {
void Connection::onClose(WS_Connection_Handle hdl)
{
Util::lock_guard<Util::mutex> the_lock { state_mutex_ };
connection_timings_.close = Util::chrono::high_resolution_clock::now();
timings.close = Util::chrono::high_resolution_clock::now();
auto con = endpoint_->get_con_from_hdl(hdl);
LOG_DEBUG("WebSocket on close event: {1} (code: {2}) - {3}",
con->get_ec().message(), con->get_remote_close_code(),
connection_timings_.toString());
timings.toString());
connection_state_ = ConnectionState::closed;
}

void Connection::onFail(WS_Connection_Handle hdl) {
void Connection::onFail(WS_Connection_Handle hdl)
{
Util::lock_guard<Util::mutex> the_lock { state_mutex_ };
connection_timings_.close = Util::chrono::high_resolution_clock::now();
connection_timings_.connection_failed = true;
timings.close = Util::chrono::high_resolution_clock::now();
timings.connection_failed = true;
auto con = endpoint_->get_con_from_hdl(hdl);
LOG_DEBUG("WebSocket on fail event - {1}", connection_timings_.toString());
LOG_DEBUG("WebSocket on fail event - {1}", timings.toString());
LOG_WARNING("WebSocket on fail event (connection loss): {1} (code: {2})",
con->get_ec().message(), con->get_remote_close_code());
connection_state_ = ConnectionState::closed;
}

bool Connection::onPing(WS_Connection_Handle hdl, std::string binary_payload) {
bool Connection::onPing(WS_Connection_Handle hdl, std::string binary_payload)
{
LOG_TRACE("WebSocket onPing event - payload: {1}", binary_payload);
// Returning true so the transport layer will send back a pong
return true;
}

void Connection::onPong(WS_Connection_Handle hdl, std::string binary_payload) {
void Connection::onPong(WS_Connection_Handle hdl, std::string binary_payload)
{
LOG_DEBUG("WebSocket onPong event");
if (consecutive_pong_timeouts_) {
consecutive_pong_timeouts_ = 0;
}
}

void Connection::onPongTimeout(WS_Connection_Handle hdl,
std::string binary_payload) {
std::string binary_payload)
{
++consecutive_pong_timeouts_;
if (consecutive_pong_timeouts_ >= client_metadata_.pong_timeouts_before_retry) {
LOG_WARNING("WebSocket onPongTimeout event ({1} consecutive); "
Expand All @@ -506,20 +531,22 @@ void Connection::onPongTimeout(WS_Connection_Handle hdl,
}
}

void Connection::onPreTCPInit(WS_Connection_Handle hdl) {
connection_timings_.tcp_pre_init = Util::chrono::high_resolution_clock::now();
void Connection::onPreTCPInit(WS_Connection_Handle hdl)
{
timings.tcp_pre_init = Util::chrono::high_resolution_clock::now();
LOG_TRACE("WebSocket pre-TCP initialization event");
}

void Connection::onPostTCPInit(WS_Connection_Handle hdl) {
connection_timings_.tcp_post_init = Util::chrono::high_resolution_clock::now();
void Connection::onPostTCPInit(WS_Connection_Handle hdl)
{
timings.tcp_post_init = Util::chrono::high_resolution_clock::now();
LOG_TRACE("WebSocket post-TCP initialization event");
}

void Connection::onOpen(WS_Connection_Handle hdl) {
connection_timings_.open = Util::chrono::high_resolution_clock::now();
connection_timings_.connection_started = true;
LOG_DEBUG("WebSocket on open event - {1}", connection_timings_.toString());
timings.open = Util::chrono::high_resolution_clock::now();
timings.connection_started = true;
LOG_DEBUG("WebSocket on open event - {1}", timings.toString());
LOG_INFO("Successfully established a WebSocket connection with the PCP "
"broker at {1}", getWsUri());
connection_state_ = ConnectionState::open;
Expand All @@ -541,7 +568,8 @@ void Connection::onOpen(WS_Connection_Handle hdl) {
}

void Connection::onMessage(WS_Connection_Handle hdl,
WS_Client_Type::message_ptr msg) {
WS_Client_Type::message_ptr msg)
{
if (onMessage_callback_) {
try {
// NB: on_message_callback_ should not raise; in case of
Expand Down
5 changes: 5 additions & 0 deletions lib/src/connector/connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ bool Connector::isAssociated() const
return isConnected() && session_association_.success.load();
}

ConnectionTimings Connector::getConnectionTimings() const
{
return (connection_ptr_ == nullptr ? ConnectionTimings() : connection_ptr_->timings);
}

void Connector::startMonitoring(const uint32_t max_connect_attempts,
const uint32_t connection_check_interval_s)
{
Expand Down
15 changes: 13 additions & 2 deletions lib/src/connector/timings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@ namespace PCPClient {

namespace lth_loc = leatherman::locale;

ConnectionTimings::ConnectionTimings()
: start { boost::chrono::high_resolution_clock::now() } {
//
// ConnectionTimings
//

void ConnectionTimings::reset()
{
start = boost::chrono::high_resolution_clock::now();
tcp_pre_init = boost::chrono::high_resolution_clock::time_point();
tcp_post_init = boost::chrono::high_resolution_clock::time_point();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repeated line

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should be tcp_pre_init in here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

open = boost::chrono::high_resolution_clock::time_point();
close = boost::chrono::high_resolution_clock::time_point();
connection_started = false;
connection_failed = false;
}

ConnectionTimings::Duration_us ConnectionTimings::getTCPInterval() const {
Expand Down
Loading