From 55acb857c1f2e56bb5b213a009a4ae1bb4dd5496 Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Tue, 3 Sep 2013 19:01:56 +0800 Subject: [PATCH 01/12] Added column_name() method --- include/libcql/cql_result.hpp | 6 ++++++ .../internal/cql_message_result_impl.hpp | 6 ++++++ .../libcql/internal/cql_result_metadata.hpp | 7 +++++++ .../internal/cql_message_result_impl.cpp | 12 +++++++++++ src/libcql/internal/cql_result_metadata.cpp | 20 +++++++++++++++++-- 5 files changed, 49 insertions(+), 2 deletions(-) diff --git a/include/libcql/cql_result.hpp b/include/libcql/cql_result.hpp index fbb9d42..09f92f1 100644 --- a/include/libcql/cql_result.hpp +++ b/include/libcql/cql_result.hpp @@ -59,6 +59,12 @@ namespace cql { virtual bool exists(const std::string& column) const = 0; + virtual bool + column_name(int i, + std::string& output_keyspace, + std::string& output_table, + std::string& output_column) const = 0; + virtual bool column_class(int i, std::string& output) const = 0; diff --git a/include/libcql/internal/cql_message_result_impl.hpp b/include/libcql/internal/cql_message_result_impl.hpp index 5dcb7a0..4c9313b 100644 --- a/include/libcql/internal/cql_message_result_impl.hpp +++ b/include/libcql/internal/cql_message_result_impl.hpp @@ -78,6 +78,12 @@ namespace cql { bool exists(const std::string& column) const; + bool + column_name(int i, + std::string& output_keyspace, + std::string& output_table, + std::string& output_column) const; + bool column_class(int i, std::string& output) const; diff --git a/include/libcql/internal/cql_result_metadata.hpp b/include/libcql/internal/cql_result_metadata.hpp index 91341b0..732fa5f 100644 --- a/include/libcql/internal/cql_result_metadata.hpp +++ b/include/libcql/internal/cql_result_metadata.hpp @@ -82,6 +82,12 @@ namespace cql { const std::string& table, const std::string& column) const; + bool + column_name(int i, + std::string& output_keyspace, + std::string& output_table, + std::string& output_column) const; + bool column_class(int i, std::string& output) const; @@ -179,6 +185,7 @@ namespace cql { private: struct option_t { + column_name_t name; cql::cql_column_type_enum primary_type; cql::cql_column_type_enum collection_primary_type; cql::cql_column_type_enum collection_secondary_type; diff --git a/src/libcql/internal/cql_message_result_impl.cpp b/src/libcql/internal/cql_message_result_impl.cpp index 23542b2..5749b5f 100644 --- a/src/libcql/internal/cql_message_result_impl.cpp +++ b/src/libcql/internal/cql_message_result_impl.cpp @@ -219,6 +219,18 @@ cql::cql_message_result_impl_t::exists(const std::string& column) const return _metadata.exists(column); } +bool +cql::cql_message_result_impl_t::column_name(int i, + std::string& output_keyspace, + std::string& output_table, + std::string& output_column) const +{ + return _metadata.column_name(i, + output_keyspace, + output_table, + output_column); +} + bool cql::cql_message_result_impl_t::column_class(int i, std::string& output) const diff --git a/src/libcql/internal/cql_result_metadata.cpp b/src/libcql/internal/cql_result_metadata.cpp index bad6539..f006bb7 100644 --- a/src/libcql/internal/cql_result_metadata.cpp +++ b/src/libcql/internal/cql_result_metadata.cpp @@ -106,8 +106,8 @@ cql::cql_result_metadata_t::read(cql::cql_byte_t* input) input = cql::decode_option(input, option.collection_secondary_type, option.collection_secondary_class); } - column_name_t name(keyspace_name, table_name, column_name); - _column_name_idx.insert(column_name_idx_t::value_type(name, i)); + option.name = column_name_t(keyspace_name, table_name, column_name); + _column_name_idx.insert(column_name_idx_t::value_type(option.name, i)); _columns.push_back(option); } return input; @@ -173,6 +173,22 @@ cql::cql_result_metadata_t::global_table(const std::string& table) _global_table_name = table; } +bool +cql::cql_result_metadata_t::column_name(int i, + std::string& output_keyspace, + std::string& output_table, + std::string& output_column) const +{ + if (i > _column_count || i < 0) { + return false; + } + + output_keyspace = _columns[i].name.get<0>(); + output_table = _columns[i].name.get<1>(); + output_column = _columns[i].name.get<2>(); + return true; +} + bool cql::cql_result_metadata_t::column_class(int i, std::string& output) const From cc6bb3d4c2e7bb00449e8f5c8a070555b856d0ea Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Tue, 1 Oct 2013 14:30:12 +0800 Subject: [PATCH 02/12] Added timestamp to bigint output --- src/libcql/internal/cql_message_result_impl.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libcql/internal/cql_message_result_impl.cpp b/src/libcql/internal/cql_message_result_impl.cpp index 5749b5f..cf1e301 100644 --- a/src/libcql/internal/cql_message_result_impl.cpp +++ b/src/libcql/internal/cql_message_result_impl.cpp @@ -409,7 +409,8 @@ bool cql::cql_message_result_impl_t::get_bigint(int i, cql::cql_bigint_t& output) const { - if (is_valid(i, cql::CQL_COLUMN_TYPE_BIGINT)) { + if (is_valid(i, cql::CQL_COLUMN_TYPE_BIGINT) + || is_valid(i, cql::CQL_COLUMN_TYPE_TIMESTAMP)) { cql::decode_bigint(_row[i] + sizeof(cql_int_t), output); return true; } From a51046dafac80a620c47e9490439bb3ecac197ca Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Mon, 16 Dec 2013 01:50:06 +0800 Subject: [PATCH 03/12] Modified bigint accessor to also read counters --- src/libcql/internal/cql_message_result_impl.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libcql/internal/cql_message_result_impl.cpp b/src/libcql/internal/cql_message_result_impl.cpp index cf1e301..e5e53e1 100644 --- a/src/libcql/internal/cql_message_result_impl.cpp +++ b/src/libcql/internal/cql_message_result_impl.cpp @@ -410,7 +410,8 @@ cql::cql_message_result_impl_t::get_bigint(int i, cql::cql_bigint_t& output) const { if (is_valid(i, cql::CQL_COLUMN_TYPE_BIGINT) - || is_valid(i, cql::CQL_COLUMN_TYPE_TIMESTAMP)) { + || is_valid(i, cql::CQL_COLUMN_TYPE_TIMESTAMP) + || is_valid(i, cql::CQL_COLUMN_TYPE_COUNTER)) { cql::decode_bigint(_row[i] + sizeof(cql_int_t), output); return true; } From 7d68fbf818fb735f6874a6231e92265f7616cc3c Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Mon, 20 Jan 2014 16:11:16 +0800 Subject: [PATCH 04/12] Fixed bug in credential transmission --- src/libcql/internal/cql_message_credentials_impl.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libcql/internal/cql_message_credentials_impl.cpp b/src/libcql/internal/cql_message_credentials_impl.cpp index af7ba5a..c465bf5 100644 --- a/src/libcql/internal/cql_message_credentials_impl.cpp +++ b/src/libcql/internal/cql_message_credentials_impl.cpp @@ -102,11 +102,11 @@ cql::cql_message_credentials_impl_t::consume(cql::cql_error_t*) bool cql::cql_message_credentials_impl_t::prepare(cql::cql_error_t*) { - size_t size = 0; + size_t size = 2; BOOST_FOREACH(const credentials_map_t::value_type& pair, _credentials) { size += pair.first.size(); size += pair.second.size(); - size += 2; + size += 4; } _buffer->resize(size); From 2c498b8a4a4ea560a3ab6f453bbb145e47562a42 Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Mon, 20 Jan 2014 16:12:23 +0800 Subject: [PATCH 05/12] Catching connect errors in connect callback --- include/libcql/internal/cql_client_impl.hpp | 24 +++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/include/libcql/internal/cql_client_impl.hpp b/include/libcql/internal/cql_client_impl.hpp index 80c392e..f877524 100644 --- a/include/libcql/internal/cql_client_impl.hpp +++ b/include/libcql/internal/cql_client_impl.hpp @@ -148,6 +148,10 @@ namespace cql { _port = port; _connect_callback = callback; _connect_errback = errback; + _connect_message_callback_wrap message_callback(callback); + _connect_error_callback_wrap error_callback(errback); + _callback_storage.put(_reserved_stream_id, callback_pair_t(message_callback, error_callback)); + resolve(); } @@ -335,6 +339,26 @@ namespace cql { } private: + struct _connect_message_callback_wrap + { + cql_connection_callback_t callback; + _connect_message_callback_wrap(cql_connection_callback_t _callback) : callback(_callback) {}; + void operator()(cql_client_t& client, const cql::cql_stream_id_t, cql::cql_result_t*) + { + callback(client); + } + }; + + struct _connect_error_callback_wrap + { + cql_connection_errback_t callback; + _connect_error_callback_wrap(cql_connection_errback_t _callback) : callback(_callback) {}; + void operator()(cql_client_t& client, const cql::cql_stream_id_t, const cql::cql_error_t& error) + { + callback(client, error); + } + }; + inline cql::cql_error_t create_stream_id_error() { From 7e74cebe32bd29e32f91d7af99f74eb915780d81 Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Fri, 24 Jan 2014 10:34:39 +0800 Subject: [PATCH 06/12] Replacing broken small_indexed_storage impl. Fixes crashes --- .../libcql/internal/cql_callback_storage.hpp | 132 +++++------------- 1 file changed, 37 insertions(+), 95 deletions(-) diff --git a/include/libcql/internal/cql_callback_storage.hpp b/include/libcql/internal/cql_callback_storage.hpp index 18577b9..036e47a 100644 --- a/include/libcql/internal/cql_callback_storage.hpp +++ b/include/libcql/internal/cql_callback_storage.hpp @@ -21,6 +21,7 @@ #include #include +#include #include namespace cql { @@ -28,138 +29,79 @@ namespace cql { template class small_indexed_storage { private: - template - class entry_t { - struct { - struct { - // -1 - there is no next free index - // -2 - flags that his one has been allocated (for checks and stuff) - int32_t index; // signed because might be -1 - int32_t count; // doesn't matter never should be negative (see index restrictions) - } next_free; - _value_t value; - } e; - - public: - _value_t& - value() - { - return e.value; - } - - const _value_t& - value() const - { - return e.value; - } - - int8_t - next_free_index() const - { - return e.next_free.index; - } - - // not counting this (so 0 means there are no free blocks behind this one) - int8_t - next_free_cnt() const - { - return e.next_free.count; - } - - // count does not include this (so 0 means there are no free blocks behind this one) - // you won't need to set cnt in any case except initial allocation - void - set_next_free(int32_t index, - int32_t cnt = 0) - { - e.next_free.index = index; - e.next_free.count = cnt; - } - - void - set_value(const _value_t& val) - { - e.value = val; - } - - bool - is_allocated() - { - return e.next_free.index == -2; - } - - void - set_allocated() - { - e.next_free.index = -2; - } + template + struct bucket { + t value; + int is_allocated; }; - - typedef entry_t array_entry_t; - array_entry_t* array; - int32_t next_free_index; + + std::vector< bucket* > array; + std::vector frees; public: - explicit - small_indexed_storage(uint16_t size) : - next_free_index(0) + small_indexed_storage(int32_t size) { - array = new array_entry_t[size]; - array[0].set_next_free(-1, size-1); + array.reserve(size); } - ~small_indexed_storage() { - delete [] array; + for(size_t i=0; i= 0) { - if (array[next_free_index].next_free_cnt() > 0) { - array[++next_free_index].set_next_free(array[result].next_free_index(), array[result].next_free_cnt()-1); - } - else { - next_free_index = array[next_free_index].next_free_index(); - } - // mark it allocated - array[result].set_allocated(); + if (frees.size()) { + int32_t ret = frees.back(); + frees.pop_back(); + return ret; + } else { + bucket* ptr = (bucket*)calloc(1, sizeof(bucket)); + array.push_back(ptr); + return array.size() - 1; } - - return result; } void release(int32_t index) { - array[index].set_next_free(next_free_index); - next_free_index = index; + if (array[index]->is_allocated) { + array[index]->value.~value_t(); + array[index]->is_allocated = 0; + } + frees.push_back(index); } bool has(int32_t index) const { - return array[index].is_allocated(); + return (int32_t)array.size() > index && array[index]->is_allocated; } value_t& get(int32_t index) { - return array[index].value(); + return array[index]->value; } const value_t& get(int32_t index) const { - return array[index].value(); + return array[index]->value; } void put(int32_t index, const value_t& val) { - array[index].set_value(val); + if (array[index]->is_allocated) { + array[index]->value.~value_t(); + } + array[index]->is_allocated = 1; + new (&array[index]->value) value_t(val); } }; From 1b8a29a1a2c26cc75e841ea7dd23d8ae454aeac2 Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Fri, 24 Jan 2014 11:37:07 +0800 Subject: [PATCH 07/12] Fixing race condition resulting in block forever with very quick query results --- include/libcql/internal/cql_client_impl.hpp | 81 +++++++-------------- 1 file changed, 28 insertions(+), 53 deletions(-) diff --git a/include/libcql/internal/cql_client_impl.hpp b/include/libcql/internal/cql_client_impl.hpp index f877524..e6b28a8 100644 --- a/include/libcql/internal/cql_client_impl.hpp +++ b/include/libcql/internal/cql_client_impl.hpp @@ -151,7 +151,7 @@ namespace cql { _connect_message_callback_wrap message_callback(callback); _connect_error_callback_wrap error_callback(errback); _callback_storage.put(_reserved_stream_id, callback_pair_t(message_callback, error_callback)); - + resolve(); } @@ -202,20 +202,7 @@ namespace cql { cql::cql_client_t::cql_message_callback_t callback, cql::cql_client_t::cql_message_errback_t errback) { - cql::cql_stream_id_t stream = create_request(new cql::cql_message_query_impl_t(query, consistency), - boost::bind(&cql_client_impl_t::write_handle, - this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - - if (stream != -1) { - _callback_storage.put(stream, callback_pair_t(callback, errback)); - return stream; - } - else { - errback(*this, stream, create_stream_id_error()); - return -1; - } + return _execute(new cql::cql_message_query_impl_t(query, consistency), callback, errback); } cql::cql_stream_id_t @@ -223,20 +210,7 @@ namespace cql { cql::cql_client_t::cql_message_callback_t callback, cql::cql_client_t::cql_message_errback_t errback) { - cql::cql_stream_id_t stream = create_request(new cql::cql_message_prepare_impl_t(query), - boost::bind(&cql_client_impl_t::write_handle, - this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - - if (stream != -1) { - _callback_storage.put(stream, callback_pair_t(callback, errback)); - return stream; - } - else { - errback(*this, stream, create_stream_id_error()); - return -1; - } + return _execute(new cql::cql_message_prepare_impl_t(query), callback, errback); } cql::cql_stream_id_t @@ -244,20 +218,7 @@ namespace cql { cql::cql_client_t::cql_message_callback_t callback, cql::cql_client_t::cql_message_errback_t errback) { - cql::cql_stream_id_t stream = create_request(message->impl(), - boost::bind(&cql_client_impl_t::write_handle, - this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - - if (stream != -1) { - _callback_storage.put(stream, callback_pair_t(callback, errback)); - return stream; - } - else { - errback(*this, stream, create_stream_id_error()); - return -1; - } + return _execute(message->impl(), callback, errback); } bool @@ -339,6 +300,25 @@ namespace cql { } private: + cql::cql_stream_id_t + _execute(cql::cql_message_t* message, + cql::cql_client_t::cql_message_callback_t& callback, + cql::cql_client_t::cql_message_errback_t& errback) + { + cql::cql_stream_id_t stream = _callback_storage.allocate(); + if (stream == -1) { + errback(*this, stream, create_stream_id_error()); + return -1; + } + _callback_storage.put(stream, callback_pair_t(callback, errback)); + return create_request(message, + boost::bind(&cql_client_impl_t::write_handle, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred), + stream); + } + struct _connect_message_callback_wrap { cql_connection_callback_t callback; @@ -487,15 +467,11 @@ namespace cql { cql::cql_stream_id_t create_request(cql::cql_message_t* message, write_callback_t callback, - bool use_reserved_stream_id = false) + cql::cql_stream_id_t id) { cql::cql_error_t err; message->prepare(&err); - cql::cql_stream_id_t id = use_reserved_stream_id ? _reserved_stream_id : _callback_storage.allocate(); - if (id == -1) { - return id; - } cql::cql_header_impl_t header(CQL_VERSION_1_REQUEST, CQL_FLAG_NOFLAG, id, @@ -718,8 +694,7 @@ namespace cql { this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), - true); - + _reserved_stream_id); _events_registered = true; } @@ -731,7 +706,7 @@ namespace cql { this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), - true); + _reserved_stream_id); // start listening header_read(); @@ -747,7 +722,7 @@ namespace cql { this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), - true); + _reserved_stream_id); } void @@ -760,7 +735,7 @@ namespace cql { this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred), - true); + _reserved_stream_id); } inline void From a1d178afd7aa9651f98d07f6e09ec17792c726a1 Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Fri, 24 Jan 2014 13:14:33 +0800 Subject: [PATCH 08/12] Fixed memory leak --- .../libcql/internal/cql_callback_storage.hpp | 3 +- include/libcql/internal/cql_client_impl.hpp | 49 ++++++++++++++----- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/include/libcql/internal/cql_callback_storage.hpp b/include/libcql/internal/cql_callback_storage.hpp index 036e47a..788f20d 100644 --- a/include/libcql/internal/cql_callback_storage.hpp +++ b/include/libcql/internal/cql_callback_storage.hpp @@ -34,7 +34,7 @@ namespace cql { t value; int is_allocated; }; - + std::vector< bucket* > array; std::vector frees; public: @@ -69,6 +69,7 @@ namespace cql { release(int32_t index) { if (array[index]->is_allocated) { + array[index]->value.release(); array[index]->value.~value_t(); array[index]->is_allocated = 0; } diff --git a/include/libcql/internal/cql_client_impl.hpp b/include/libcql/internal/cql_client_impl.hpp index e6b28a8..0afb1d3 100644 --- a/include/libcql/internal/cql_client_impl.hpp +++ b/include/libcql/internal/cql_client_impl.hpp @@ -79,8 +79,30 @@ namespace cql { public: typedef std::list request_buffer_t; - typedef std::pair callback_pair_t; - typedef cql::small_indexed_storage callback_storage_t; + + struct callback_item_t + { + callback_item_t(cql_message_callback_t _message_callback, + cql_message_errback_t _error_callback, + cql::cql_message_t* _message = 0) : message_callback(_message_callback), + error_callback(_error_callback), + message(_message) + {} + + void release() + { + if (message) { + delete(message); + message = 0; + } + } + + cql_message_callback_t message_callback; + cql_message_errback_t error_callback; + cql::cql_message_t* message; + }; + + typedef cql::small_indexed_storage callback_storage_t; typedef boost::function write_callback_t; @@ -150,7 +172,7 @@ namespace cql { _connect_errback = errback; _connect_message_callback_wrap message_callback(callback); _connect_error_callback_wrap error_callback(errback); - _callback_storage.put(_reserved_stream_id, callback_pair_t(message_callback, error_callback)); + _callback_storage.put(_reserved_stream_id, callback_item_t(message_callback, error_callback)); resolve(); } @@ -202,7 +224,7 @@ namespace cql { cql::cql_client_t::cql_message_callback_t callback, cql::cql_client_t::cql_message_errback_t errback) { - return _execute(new cql::cql_message_query_impl_t(query, consistency), callback, errback); + return _execute(new cql::cql_message_query_impl_t(query, consistency), callback, errback, true); } cql::cql_stream_id_t @@ -210,7 +232,7 @@ namespace cql { cql::cql_client_t::cql_message_callback_t callback, cql::cql_client_t::cql_message_errback_t errback) { - return _execute(new cql::cql_message_prepare_impl_t(query), callback, errback); + return _execute(new cql::cql_message_prepare_impl_t(query), callback, errback, true); } cql::cql_stream_id_t @@ -303,14 +325,19 @@ namespace cql { cql::cql_stream_id_t _execute(cql::cql_message_t* message, cql::cql_client_t::cql_message_callback_t& callback, - cql::cql_client_t::cql_message_errback_t& errback) + cql::cql_client_t::cql_message_errback_t& errback, + bool cleanup = false) { cql::cql_stream_id_t stream = _callback_storage.allocate(); if (stream == -1) { errback(*this, stream, create_stream_id_error()); return -1; } - _callback_storage.put(stream, callback_pair_t(callback, errback)); + if (cleanup) { + _callback_storage.put(stream, callback_item_t(callback, errback, message)); + } else { + _callback_storage.put(stream, callback_item_t(callback, errback)); + } return create_request(message, boost::bind(&cql_client_impl_t::write_handle, this, @@ -612,9 +639,9 @@ namespace cql { log(CQL_LOG_DEBUG, "received result message " + header.str()); cql_stream_id_t stream_id = header.stream(); if (_callback_storage.has(stream_id)) { - callback_pair_t callback_pair = _callback_storage.get(stream_id); + callback_item_t callback_pair = _callback_storage.get(stream_id); _callback_storage.release(stream_id); - callback_pair.first(*this, header.stream(), dynamic_cast(_response_message.release())); + callback_pair.message_callback(*this, header.stream(), dynamic_cast(_response_message.release())); } else { log(CQL_LOG_INFO, "no callback found for message " + header.str()); } @@ -633,14 +660,14 @@ namespace cql { { cql_stream_id_t stream_id = header.stream(); if (_callback_storage.has(stream_id)) { - callback_pair_t callback_pair = _callback_storage.get(stream_id); + callback_item_t callback_pair = _callback_storage.get(stream_id); _callback_storage.release(stream_id); cql::cql_message_error_impl_t* m = dynamic_cast(_response_message.get()); cql::cql_error_t cql_error; cql_error.cassandra = true; cql_error.code = m->code(); cql_error.message = m->message(); - callback_pair.second(*this, header.stream(), cql_error); + callback_pair.error_callback(*this, header.stream(), cql_error); } else { log(CQL_LOG_INFO, "no callback found for message " + header.str() + " " + _response_message->str()); } From ebf5dea3299b5638ae23be1eb139481ce923dade Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Tue, 28 Jan 2014 02:20:56 +0800 Subject: [PATCH 09/12] Commenting expensive log function calls --- include/libcql/internal/cql_client_impl.hpp | 55 ++++++++++++--------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/include/libcql/internal/cql_client_impl.hpp b/include/libcql/internal/cql_client_impl.hpp index 0afb1d3..7afed40 100644 --- a/include/libcql/internal/cql_client_impl.hpp +++ b/include/libcql/internal/cql_client_impl.hpp @@ -70,6 +70,12 @@ #include "libcql/internal/cql_message_supported_impl.hpp" #include "libcql/cql_serialization.hpp" +#ifndef CLIENTLOG +#define LOG(Lvl, Message) {}; +#else +#define LOG(Lvl, Message) log(Lvl, Message); +#endif + namespace cql { template @@ -259,7 +265,7 @@ namespace cql { close() { _closing = true; - log(CQL_LOG_INFO, "closing connection"); + LOG(CQL_LOG_INFO, "closing connection"); boost::system::error_code ec; _transport->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); _transport->lowest_layer().close(); @@ -424,7 +430,7 @@ namespace cql { void resolve() { - log(CQL_LOG_DEBUG, "resolving remote host " + _server + ":" + boost::lexical_cast(_port)); + LOG(CQL_LOG_DEBUG, "resolving remote host " + _server + ":" + boost::lexical_cast(_port)); boost::asio::ip::tcp::resolver::query query(_server, boost::lexical_cast(_port)); _resolver.async_resolve(query, boost::bind(&cql_client_impl_t::resolve_handle, @@ -438,7 +444,7 @@ namespace cql { boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { if (!err) { - log(CQL_LOG_DEBUG, "resolved remote host, attempting to connect"); + LOG(CQL_LOG_DEBUG, "resolved remote host, attempting to connect"); #if BOOST_VERSION >= 104800 boost::asio::async_connect(_transport->lowest_layer(), endpoint_iterator, @@ -453,7 +459,7 @@ namespace cql { #endif } else { - log(CQL_LOG_CRITICAL, "error resolving remote host " + err.message()); + LOG(CQL_LOG_CRITICAL, "error resolving remote host " + err.message()); check_transport_err(err); } } @@ -462,7 +468,7 @@ namespace cql { connect_handle(const boost::system::error_code& err) { if (!err) { - log(CQL_LOG_DEBUG, "connection successful to remote host"); + LOG(CQL_LOG_DEBUG, "connection successful to remote host"); if (_transport->requires_handshake()) { _transport->async_handshake(boost::bind(&cql_client_impl_t::handshake_handle, this, @@ -473,7 +479,7 @@ namespace cql { } } else { - log(CQL_LOG_CRITICAL, "error connecting to remote host " + err.message()); + LOG(CQL_LOG_CRITICAL, "error connecting to remote host " + err.message()); check_transport_err(err); } } @@ -482,11 +488,11 @@ namespace cql { handshake_handle(const boost::system::error_code& err) { if (!err) { - log(CQL_LOG_DEBUG, "successful ssl handshake with remote host"); + LOG(CQL_LOG_DEBUG, "successful ssl handshake with remote host"); options_write(); } else { - log(CQL_LOG_CRITICAL, "error performing ssl handshake " + err.message()); + LOG(CQL_LOG_CRITICAL, "error performing ssl handshake " + err.message()); check_transport_err(err); } } @@ -506,7 +512,7 @@ namespace cql { message->size()); header.prepare(&err); - log(CQL_LOG_DEBUG, "sending message: " + header.str() + " " + message->str()); + LOG(CQL_LOG_DEBUG, "sending message: " + header.str() + " " + message->str()); std::vector buf; buf.push_back(boost::asio::buffer(&(*header.buffer())[0], header.size())); @@ -536,10 +542,11 @@ namespace cql { } } if (!err) { - log(CQL_LOG_DEBUG, "wrote to socket " + boost::lexical_cast(num_bytes) + " bytes"); + num_bytes++; + LOG(CQL_LOG_DEBUG, "wrote to socket " + boost::lexical_cast(num_bytes) + " bytes"); } else { - log(CQL_LOG_ERROR, "error writing to socket " + err.message()); + LOG(CQL_LOG_ERROR, "error writing to socket " + err.message()); check_transport_err(err); } } @@ -563,15 +570,15 @@ namespace cql { if (!err) { cql::cql_error_t decode_err; if (_response_header.consume(&decode_err)) { - log(CQL_LOG_DEBUG, "received header for message " + _response_header.str()); + LOG(CQL_LOG_DEBUG, "received header for message " + _response_header.str()); body_read(_response_header); } else { - log(CQL_LOG_ERROR, "error decoding header " + _response_header.str()); + LOG(CQL_LOG_ERROR, "error decoding header " + _response_header.str()); } } else { - log(CQL_LOG_ERROR, "error reading header " + err.message()); + LOG(CQL_LOG_ERROR, "error reading header " + err.message()); check_transport_err(err); } } @@ -624,7 +631,7 @@ namespace cql { body_read_handle(const cql::cql_header_impl_t& header, const boost::system::error_code& err) { - log(CQL_LOG_DEBUG, "received body for message " + header.str()); + LOG(CQL_LOG_DEBUG, "received body for message " + header.str()); if (!err) { @@ -636,21 +643,21 @@ namespace cql { case CQL_OPCODE_RESULT: { - log(CQL_LOG_DEBUG, "received result message " + header.str()); + LOG(CQL_LOG_DEBUG, "received result message " + header.str()); cql_stream_id_t stream_id = header.stream(); if (_callback_storage.has(stream_id)) { callback_item_t callback_pair = _callback_storage.get(stream_id); _callback_storage.release(stream_id); callback_pair.message_callback(*this, header.stream(), dynamic_cast(_response_message.release())); } else { - log(CQL_LOG_INFO, "no callback found for message " + header.str()); + LOG(CQL_LOG_INFO, "no callback found for message " + header.str()); } break; } case CQL_OPCODE_EVENT: - log(CQL_LOG_DEBUG, "received event message"); + LOG(CQL_LOG_DEBUG, "received event message"); if (_event_callback) { _event_callback(*this, dynamic_cast(_response_message.release())); } @@ -669,12 +676,12 @@ namespace cql { cql_error.message = m->message(); callback_pair.error_callback(*this, header.stream(), cql_error); } else { - log(CQL_LOG_INFO, "no callback found for message " + header.str() + " " + _response_message->str()); + LOG(CQL_LOG_INFO, "no callback found for message " + header.str() + " " + _response_message->str()); } break; } case CQL_OPCODE_READY: - log(CQL_LOG_DEBUG, "received ready message"); + LOG(CQL_LOG_DEBUG, "received ready message"); if (!_events_registered) { events_register(); } @@ -688,7 +695,7 @@ namespace cql { break; case CQL_OPCODE_SUPPORTED: - log(CQL_LOG_DEBUG, "received supported message " + _response_message->str()); + LOG(CQL_LOG_DEBUG, "received supported message " + _response_message->str()); startup_write(); break; @@ -697,15 +704,15 @@ namespace cql { break; default: - log(CQL_LOG_ERROR, "unhandled opcode " + header.str()); + LOG(CQL_LOG_ERROR, "unhandled opcode " + header.str()); } } else { - log(CQL_LOG_ERROR, "error deserializing result message " + consume_error.message); + LOG(CQL_LOG_ERROR, "error deserializing result message " + consume_error.message); } } else { - log(CQL_LOG_ERROR, "error reading body " + err.message()); + LOG(CQL_LOG_ERROR, "error reading body " + err.message()); check_transport_err(err); } header_read(); // loop From 0d7b3a49fd0c4ab07b907a2652de308b0fb15a9b Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Sat, 8 Feb 2014 03:00:01 +0800 Subject: [PATCH 10/12] Moving buffer up to cql_message_t, less copying, removing shared_ptr<> --- include/libcql/cql_future_result.hpp | 2 +- include/libcql/internal/cql_client_impl.hpp | 43 ++++++------------- include/libcql/internal/cql_header_impl.hpp | 2 +- include/libcql/internal/cql_message.hpp | 20 +++++++-- .../internal/cql_message_credentials_impl.hpp | 11 ----- .../internal/cql_message_error_impl.hpp | 8 +--- .../internal/cql_message_event_impl.hpp | 10 ----- .../internal/cql_message_execute_impl.hpp | 8 +--- .../internal/cql_message_options_impl.hpp | 12 ------ .../internal/cql_message_prepare_impl.hpp | 11 ----- .../internal/cql_message_query_impl.hpp | 11 ----- .../internal/cql_message_ready_impl.hpp | 12 ------ .../internal/cql_message_register_impl.hpp | 11 ----- .../internal/cql_message_result_impl.hpp | 8 ---- .../internal/cql_message_startup_impl.hpp | 11 ----- .../internal/cql_message_supported_impl.hpp | 11 ----- src/libcql/internal/cql_header_impl.cpp | 12 +++--- .../internal/cql_message_credentials_impl.cpp | 26 ++--------- .../internal/cql_message_error_impl.cpp | 29 ++----------- .../internal/cql_message_event_impl.cpp | 26 +---------- .../internal/cql_message_execute_impl.cpp | 25 ++--------- .../internal/cql_message_options_impl.cpp | 20 --------- .../internal/cql_message_prepare_impl.cpp | 27 ++---------- .../internal/cql_message_query_impl.cpp | 31 ++----------- .../internal/cql_message_ready_impl.cpp | 20 --------- .../internal/cql_message_register_impl.cpp | 26 ++--------- .../internal/cql_message_result_impl.cpp | 26 +---------- .../internal/cql_message_startup_impl.cpp | 30 ++----------- .../internal/cql_message_supported_impl.cpp | 24 +---------- src/libcql/internal/cql_result_metadata.cpp | 15 ++++--- 30 files changed, 77 insertions(+), 451 deletions(-) diff --git a/include/libcql/cql_future_result.hpp b/include/libcql/cql_future_result.hpp index 8b6c895..eac69c9 100644 --- a/include/libcql/cql_future_result.hpp +++ b/include/libcql/cql_future_result.hpp @@ -56,7 +56,7 @@ namespace cql { cql::cql_client_t* client; cql::cql_stream_id_t stream; - boost::shared_ptr result; + cql::cql_result_t* result; cql::cql_error_t error; }; diff --git a/include/libcql/internal/cql_client_impl.hpp b/include/libcql/internal/cql_client_impl.hpp index 7afed40..2954c52 100644 --- a/include/libcql/internal/cql_client_impl.hpp +++ b/include/libcql/internal/cql_client_impl.hpp @@ -84,8 +84,6 @@ namespace cql { { public: - typedef std::list request_buffer_t; - struct callback_item_t { callback_item_t(cql_message_callback_t _message_callback, @@ -118,7 +116,6 @@ namespace cql { _port(0), _resolver(io_service), _transport(transport), - _request_buffer(0), _callback_storage(128), _connect_callback(0), _connect_errback(0), @@ -138,7 +135,6 @@ namespace cql { _port(0), _resolver(io_service), _transport(transport), - _request_buffer(0), _callback_storage(128), _connect_callback(0), _connect_errback(0), @@ -515,14 +511,9 @@ namespace cql { LOG(CQL_LOG_DEBUG, "sending message: " + header.str() + " " + message->str()); std::vector buf; - buf.push_back(boost::asio::buffer(&(*header.buffer())[0], header.size())); - _request_buffer.push_back(header.buffer()); + buf.push_back(boost::asio::buffer(&header.buffer()[0], header.size())); if (header.length() != 0) { - buf.push_back(boost::asio::buffer(&(*message->buffer())[0], message->size())); - _request_buffer.push_back(message->buffer()); - } - else { - _request_buffer.push_back(cql_message_buffer_t()); + buf.push_back(boost::asio::buffer(&message->buffer()[0], message->size())); } boost::asio::async_write(*_transport, buf, callback); @@ -534,13 +525,6 @@ namespace cql { write_handle(const boost::system::error_code& err, std::size_t num_bytes) { - if (!_request_buffer.empty()) { - // the write request is complete free the request buffers - _request_buffer.pop_front(); - if (!_request_buffer.empty()) { - _request_buffer.pop_front(); - } - } if (!err) { num_bytes++; LOG(CQL_LOG_DEBUG, "wrote to socket " + boost::lexical_cast(num_bytes) + " bytes"); @@ -555,7 +539,7 @@ namespace cql { header_read() { boost::asio::async_read(*_transport, - boost::asio::buffer(&(*_response_header.buffer())[0], _response_header.size()), + boost::asio::buffer(&_response_header.buffer()[0], _response_header.size()), #if BOOST_VERSION >= 104800 boost::asio::transfer_exactly(sizeof(cql::cql_header_impl_t)), #else @@ -590,34 +574,36 @@ namespace cql { switch (header.opcode()) { case CQL_OPCODE_ERROR: - _response_message.reset(new cql::cql_message_error_impl_t(header.length())); + _response_message.reset(new cql::cql_message_error_impl_t()); break; case CQL_OPCODE_RESULT: - _response_message.reset(new cql::cql_message_result_impl_t(header.length())); + if (!_response_message.get() || _response_message->opcode() != header.opcode()) { + _response_message.reset(new cql::cql_message_result_impl_t()); + } break; case CQL_OPCODE_SUPPORTED: - _response_message.reset(new cql::cql_message_supported_impl_t(header.length())); + _response_message.reset(new cql::cql_message_supported_impl_t()); break; case CQL_OPCODE_READY: - _response_message.reset(new cql::cql_message_ready_impl_t(header.length())); + _response_message.reset(new cql::cql_message_ready_impl_t()); break; case CQL_OPCODE_EVENT: - _response_message.reset(new cql::cql_message_event_impl_t(header.length())); + _response_message.reset(new cql::cql_message_event_impl_t()); break; default: // need some bucket to put the data so we can get past the unknown // body in the stream it will be discarded by the body_read_handle - _response_message.reset(new cql::cql_message_result_impl_t(header.length())); + _response_message.reset(new cql::cql_message_result_impl_t()); break; } - + _response_message->buffer().resize(header.length()); boost::asio::async_read(*_transport, - boost::asio::buffer(&(*_response_message->buffer())[0], _response_message->size()), + boost::asio::buffer(&_response_message->buffer()[0], _response_message->size()), #if BOOST_VERSION >= 104800 boost::asio::transfer_exactly(header.length()), #else @@ -648,7 +634,7 @@ namespace cql { if (_callback_storage.has(stream_id)) { callback_item_t callback_pair = _callback_storage.get(stream_id); _callback_storage.release(stream_id); - callback_pair.message_callback(*this, header.stream(), dynamic_cast(_response_message.release())); + callback_pair.message_callback(*this, header.stream(), dynamic_cast(_response_message.get())); } else { LOG(CQL_LOG_INFO, "no callback found for message " + header.str()); } @@ -794,7 +780,6 @@ namespace cql { boost::asio::ip::tcp::resolver _resolver; std::auto_ptr _transport; cql::cql_stream_id_t _stream_counter; - request_buffer_t _request_buffer; cql::cql_header_impl_t _response_header; std::auto_ptr _response_message; callback_storage_t _callback_storage; diff --git a/include/libcql/internal/cql_header_impl.hpp b/include/libcql/internal/cql_header_impl.hpp index 6f9b490..a10e990 100644 --- a/include/libcql/internal/cql_header_impl.hpp +++ b/include/libcql/internal/cql_header_impl.hpp @@ -47,7 +47,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t + cql_message_buffer_t& buffer(); cql_int_t diff --git a/include/libcql/internal/cql_message.hpp b/include/libcql/internal/cql_message.hpp index 9c76d78..b54a3d3 100644 --- a/include/libcql/internal/cql_message.hpp +++ b/include/libcql/internal/cql_message.hpp @@ -27,16 +27,22 @@ namespace cql { struct cql_error_t; - typedef boost::shared_ptr > cql_message_buffer_t; + typedef std::vector cql_message_buffer_t; class cql_message_t { public: + cql_message_t() {} + cql_message_t(size_t size) : _buffer(size) {} + virtual cql::cql_opcode_enum opcode() const = 0; virtual cql_int_t - size() const = 0; + size() const + { + return _buffer.size(); + } virtual std::string str() const = 0; @@ -47,11 +53,17 @@ namespace cql { virtual bool prepare(cql::cql_error_t* err) = 0; - virtual cql_message_buffer_t - buffer() = 0; + cql_message_buffer_t& + buffer() + { + return _buffer; + } virtual ~cql_message_t(){}; + protected: + cql::cql_message_buffer_t _buffer; + }; } // namespace cql diff --git a/include/libcql/internal/cql_message_credentials_impl.hpp b/include/libcql/internal/cql_message_credentials_impl.hpp index de32243..ca6bc3f 100644 --- a/include/libcql/internal/cql_message_credentials_impl.hpp +++ b/include/libcql/internal/cql_message_credentials_impl.hpp @@ -30,10 +30,6 @@ namespace cql { public: - cql_message_credentials_impl_t(); - - cql_message_credentials_impl_t(size_t size); - void credentials(const std::map& c); @@ -43,9 +39,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -55,13 +48,9 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - private: typedef std::map credentials_map_t; - cql::cql_message_buffer_t _buffer; credentials_map_t _credentials; }; diff --git a/include/libcql/internal/cql_message_error_impl.hpp b/include/libcql/internal/cql_message_error_impl.hpp index f0cbb34..0e36c1c 100644 --- a/include/libcql/internal/cql_message_error_impl.hpp +++ b/include/libcql/internal/cql_message_error_impl.hpp @@ -32,8 +32,6 @@ namespace cql { cql_message_error_impl_t(); - cql_message_error_impl_t(size_t size); - cql_message_error_impl_t(cql_int_t code, const std::string& message); @@ -52,9 +50,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -64,11 +59,10 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t + cql_message_buffer_t& buffer(); private: - cql::cql_message_buffer_t _buffer; cql_int_t _code; std::string _message; }; diff --git a/include/libcql/internal/cql_message_event_impl.hpp b/include/libcql/internal/cql_message_event_impl.hpp index 1ba47ed..4afd978 100644 --- a/include/libcql/internal/cql_message_event_impl.hpp +++ b/include/libcql/internal/cql_message_event_impl.hpp @@ -34,14 +34,9 @@ namespace cql { cql_message_event_impl_t(); - cql_message_event_impl_t(size_t size); - cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -75,12 +70,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - private: - cql::cql_message_buffer_t _buffer; - cql_event_enum _event_type; cql_event_topology_enum _topology_change; cql_event_schema_enum _schema_change; diff --git a/include/libcql/internal/cql_message_execute_impl.hpp b/include/libcql/internal/cql_message_execute_impl.hpp index 5d2bdc6..c19ed54 100644 --- a/include/libcql/internal/cql_message_execute_impl.hpp +++ b/include/libcql/internal/cql_message_execute_impl.hpp @@ -38,8 +38,6 @@ namespace cql { cql_message_execute_impl_t(); - cql_message_execute_impl_t(size_t size); - cql_message_execute_impl_t(const std::vector& id, cql::cql_consistency_enum consistency); @@ -85,9 +83,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -97,13 +92,12 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t + cql_message_buffer_t& buffer(); private: typedef std::list params_container_t; - cql::cql_message_buffer_t _buffer; std::vector _query_id; cql::cql_consistency_enum _consistency; params_container_t _params; diff --git a/include/libcql/internal/cql_message_options_impl.hpp b/include/libcql/internal/cql_message_options_impl.hpp index 3d5c168..f6bd0da 100644 --- a/include/libcql/internal/cql_message_options_impl.hpp +++ b/include/libcql/internal/cql_message_options_impl.hpp @@ -29,16 +29,10 @@ namespace cql { { public: - cql_message_options_impl_t(); - - cql_message_options_impl_t(size_t size); cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -47,12 +41,6 @@ namespace cql { bool prepare(cql::cql_error_t* err); - - cql_message_buffer_t - buffer(); - - private: - cql::cql_message_buffer_t _buffer; }; } // namespace cql diff --git a/include/libcql/internal/cql_message_prepare_impl.hpp b/include/libcql/internal/cql_message_prepare_impl.hpp index b03db50..1bfddf7 100644 --- a/include/libcql/internal/cql_message_prepare_impl.hpp +++ b/include/libcql/internal/cql_message_prepare_impl.hpp @@ -30,10 +30,6 @@ namespace cql { public: - cql_message_prepare_impl_t(); - - cql_message_prepare_impl_t(size_t size); - cql_message_prepare_impl_t(const std::string& query); const std::string& @@ -45,9 +41,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -57,11 +50,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - private: - cql::cql_message_buffer_t _buffer; std::string _query; }; diff --git a/include/libcql/internal/cql_message_query_impl.hpp b/include/libcql/internal/cql_message_query_impl.hpp index f431f8d..317e215 100644 --- a/include/libcql/internal/cql_message_query_impl.hpp +++ b/include/libcql/internal/cql_message_query_impl.hpp @@ -30,10 +30,6 @@ namespace cql { public: - cql_message_query_impl_t(); - - cql_message_query_impl_t(size_t size); - cql_message_query_impl_t(const std::string& query, cql::cql_short_t consistency); @@ -52,9 +48,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -64,11 +57,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - private: - cql::cql_message_buffer_t _buffer; cql::cql_short_t _consistency; std::string _query; }; diff --git a/include/libcql/internal/cql_message_ready_impl.hpp b/include/libcql/internal/cql_message_ready_impl.hpp index f6fc695..06aa032 100644 --- a/include/libcql/internal/cql_message_ready_impl.hpp +++ b/include/libcql/internal/cql_message_ready_impl.hpp @@ -29,16 +29,10 @@ namespace cql { { public: - cql_message_ready_impl_t(); - - cql_message_ready_impl_t(size_t size); cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -47,12 +41,6 @@ namespace cql { bool prepare(cql::cql_error_t* err); - - cql_message_buffer_t - buffer(); - - private: - cql::cql_message_buffer_t _buffer; }; } // namespace cql diff --git a/include/libcql/internal/cql_message_register_impl.hpp b/include/libcql/internal/cql_message_register_impl.hpp index fe3319c..bdd8fc0 100644 --- a/include/libcql/internal/cql_message_register_impl.hpp +++ b/include/libcql/internal/cql_message_register_impl.hpp @@ -30,16 +30,9 @@ namespace cql { public: - cql_message_register_impl_t(); - - cql_message_register_impl_t(size_t size); - cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - void events(const std::list& c); @@ -55,11 +48,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - private: - cql::cql_message_buffer_t _buffer; std::list _events; }; diff --git a/include/libcql/internal/cql_message_result_impl.hpp b/include/libcql/internal/cql_message_result_impl.hpp index 4c9313b..6904539 100644 --- a/include/libcql/internal/cql_message_result_impl.hpp +++ b/include/libcql/internal/cql_message_result_impl.hpp @@ -35,7 +35,6 @@ namespace cql { { public: - cql_message_result_impl_t(size_t size); cql_message_result_impl_t(); @@ -45,9 +44,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -69,9 +65,6 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - const cql_result_metadata_t& get_metadata(); @@ -217,7 +210,6 @@ namespace cql { private: - cql::cql_message_buffer_t _buffer; cql::cql_byte_t* _pos; std::vector _row; size_t _row_pos; diff --git a/include/libcql/internal/cql_message_startup_impl.hpp b/include/libcql/internal/cql_message_startup_impl.hpp index 35fa6b7..c8d5928 100644 --- a/include/libcql/internal/cql_message_startup_impl.hpp +++ b/include/libcql/internal/cql_message_startup_impl.hpp @@ -30,10 +30,6 @@ namespace cql { public: - cql_message_startup_impl_t(); - - cql_message_startup_impl_t(size_t size); - void compression(const std::string& c); @@ -49,9 +45,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -61,11 +54,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - private: - cql::cql_message_buffer_t _buffer; std::string _version; std::string _compression; }; diff --git a/include/libcql/internal/cql_message_supported_impl.hpp b/include/libcql/internal/cql_message_supported_impl.hpp index 99e25d1..cb72135 100644 --- a/include/libcql/internal/cql_message_supported_impl.hpp +++ b/include/libcql/internal/cql_message_supported_impl.hpp @@ -34,10 +34,6 @@ namespace cql { public: - cql_message_supported_impl_t(); - - cql_message_supported_impl_t(size_t size); - void compressions(const std::list& c); @@ -53,9 +49,6 @@ namespace cql { cql::cql_opcode_enum opcode() const; - cql_int_t - size() const; - std::string str() const; @@ -65,11 +58,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t - buffer(); - private: - cql::cql_message_buffer_t _buffer; std::list _versions; std::list _compressions; }; diff --git a/src/libcql/internal/cql_header_impl.cpp b/src/libcql/internal/cql_header_impl.cpp index 0cab026..f10b3ee 100644 --- a/src/libcql/internal/cql_header_impl.cpp +++ b/src/libcql/internal/cql_header_impl.cpp @@ -29,7 +29,7 @@ #define CQL_HEADER_SIZE sizeof(_version) + sizeof(_flags) + sizeof(_stream) + sizeof(_opcode) + sizeof(_length) cql::cql_header_impl_t::cql_header_impl_t() : - _buffer(new std::vector(CQL_HEADER_SIZE, 0)), + _buffer(CQL_HEADER_SIZE, 0), _version(0), _flags(0), _stream(0), @@ -42,7 +42,7 @@ cql::cql_header_impl_t::cql_header_impl_t(cql::cql_byte_t version, cql::cql_stream_id_t stream, cql::cql_byte_t opcode, cql::cql_int_t length) : - _buffer(new std::vector(CQL_HEADER_SIZE, 0)), + _buffer(CQL_HEADER_SIZE, 0), _version(version), _flags(flags), _stream(stream), @@ -50,7 +50,7 @@ cql::cql_header_impl_t::cql_header_impl_t(cql::cql_byte_t version, _length(length) {} -cql::cql_message_buffer_t +cql::cql_message_buffer_t& cql::cql_header_impl_t::buffer() { return _buffer; @@ -62,7 +62,7 @@ cql::cql_header_impl_t::str() const std::stringstream output; output << std::setfill('0') << "0x"; - BOOST_FOREACH(cql::cql_byte_t b, *_buffer) { + BOOST_FOREACH(cql::cql_byte_t b, _buffer) { output << std::setw(2) << cql::hex(b); } @@ -78,7 +78,7 @@ cql::cql_header_impl_t::str() const bool cql::cql_header_impl_t::prepare(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); stream.put(_version); @@ -92,7 +92,7 @@ cql::cql_header_impl_t::prepare(cql::cql_error_t*) bool cql::cql_header_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); _version = stream.get(); diff --git a/src/libcql/internal/cql_message_credentials_impl.cpp b/src/libcql/internal/cql_message_credentials_impl.cpp index c465bf5..45d3824 100644 --- a/src/libcql/internal/cql_message_credentials_impl.cpp +++ b/src/libcql/internal/cql_message_credentials_impl.cpp @@ -36,20 +36,6 @@ #include "libcql/internal/cql_message_credentials_impl.hpp" -cql::cql_message_credentials_impl_t::cql_message_credentials_impl_t() : - _buffer(new std::vector(0)) -{} - -cql::cql_message_credentials_impl_t::cql_message_credentials_impl_t(size_t size) : - _buffer(new std::vector(size)) -{} - -cql::cql_message_buffer_t -cql::cql_message_credentials_impl_t::buffer() -{ - return _buffer; -} - void cql::cql_message_credentials_impl_t::credentials(const std::map& c) { @@ -68,12 +54,6 @@ cql::cql_message_credentials_impl_t::opcode() const return CQL_OPCODE_CREDENTIALS; } -cql::cql_int_t -cql::cql_message_credentials_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_credentials_impl_t::str() const { @@ -92,7 +72,7 @@ cql::cql_message_credentials_impl_t::str() const bool cql::cql_message_credentials_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); cql::decode_string_map(stream, _credentials); @@ -108,9 +88,9 @@ cql::cql_message_credentials_impl_t::prepare(cql::cql_error_t*) size += pair.second.size(); size += 4; } - _buffer->resize(size); + _buffer.resize(size); - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); cql::encode_string_map(stream, _credentials); diff --git a/src/libcql/internal/cql_message_error_impl.cpp b/src/libcql/internal/cql_message_error_impl.cpp index 11aff4e..9fec1d7 100644 --- a/src/libcql/internal/cql_message_error_impl.cpp +++ b/src/libcql/internal/cql_message_error_impl.cpp @@ -26,20 +26,11 @@ #include "libcql/internal/cql_message_error_impl.hpp" cql::cql_message_error_impl_t::cql_message_error_impl_t() : - _buffer(new std::vector()), - _code(0), - _message() -{} - -cql::cql_message_error_impl_t::cql_message_error_impl_t(size_t size) : - _buffer(new std::vector(size)), - _code(0), - _message() + _code(0) {} cql::cql_message_error_impl_t::cql_message_error_impl_t(cql::cql_int_t code, const std::string& message) : - _buffer(new std::vector(0)), _code(code), _message(message) {} @@ -74,12 +65,6 @@ cql::cql_message_error_impl_t::opcode() const return CQL_OPCODE_ERROR; } -cql::cql_int_t -cql::cql_message_error_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_error_impl_t::str() const { @@ -89,7 +74,7 @@ cql::cql_message_error_impl_t::str() const bool cql::cql_message_error_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream input(&buffer); cql::decode_int(input, _code); @@ -100,18 +85,12 @@ cql::cql_message_error_impl_t::consume(cql::cql_error_t*) bool cql::cql_message_error_impl_t::prepare(cql::cql_error_t*) { - _buffer->resize(sizeof(_code) + sizeof(cql::cql_short_t) + _message.size()); + _buffer.resize(sizeof(_code) + sizeof(cql::cql_short_t) + _message.size()); - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream output(&buffer); cql::encode_int(output, _code); cql::encode_string(output, _message); return true; } - -cql::cql_message_buffer_t -cql::cql_message_error_impl_t::buffer() -{ - return _buffer; -} diff --git a/src/libcql/internal/cql_message_event_impl.cpp b/src/libcql/internal/cql_message_event_impl.cpp index 4439f06..d48ea40 100644 --- a/src/libcql/internal/cql_message_event_impl.cpp +++ b/src/libcql/internal/cql_message_event_impl.cpp @@ -24,7 +24,6 @@ cql::cql_message_event_impl_t::cql_message_event_impl_t() : - _buffer(new std::vector(0)), _event_type(CQL_EVENT_TYPE_UNKOWN), _topology_change(CQL_EVENT_TOPOLOGY_UNKNOWN), _schema_change(CQL_EVENT_SCHEMA_UNKNOWN), @@ -32,21 +31,6 @@ cql::cql_message_event_impl_t::cql_message_event_impl_t() : _port(0) {} -cql::cql_message_event_impl_t::cql_message_event_impl_t(size_t size) : - _buffer(new std::vector(size)), - _event_type(CQL_EVENT_TYPE_UNKOWN), - _topology_change(CQL_EVENT_TOPOLOGY_UNKNOWN), - _schema_change(CQL_EVENT_SCHEMA_UNKNOWN), - _status_change(CQL_EVENT_STATUS_UNKNOWN), - _port(0) -{} - -cql::cql_message_buffer_t -cql::cql_message_event_impl_t::buffer() -{ - return _buffer; -} - cql::cql_opcode_enum cql::cql_message_event_impl_t::opcode() const { @@ -107,12 +91,6 @@ cql::cql_message_event_impl_t::port() const return _port; } -cql::cql_int_t -cql::cql_message_event_impl_t::size() const -{ - return _buffer->size(); -} - bool cql::cql_message_event_impl_t::consume(cql::cql_error_t*) { @@ -125,7 +103,7 @@ cql::cql_message_event_impl_t::consume(cql::cql_error_t*) _status_change = CQL_EVENT_STATUS_UNKNOWN; _schema_change = CQL_EVENT_SCHEMA_UNKNOWN; - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); std::string event_type; @@ -193,7 +171,7 @@ cql::cql_message_event_impl_t::consume(cql::cql_error_t*) bool cql::cql_message_event_impl_t::prepare(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); diff --git a/src/libcql/internal/cql_message_execute_impl.cpp b/src/libcql/internal/cql_message_execute_impl.cpp index 1b5fcf3..60e6825 100644 --- a/src/libcql/internal/cql_message_execute_impl.cpp +++ b/src/libcql/internal/cql_message_execute_impl.cpp @@ -30,28 +30,15 @@ #include "libcql/internal/cql_message_execute_impl.hpp" cql::cql_message_execute_impl_t::cql_message_execute_impl_t() : - _buffer(new std::vector()), - _consistency(cql::CQL_CONSISTENCY_ONE) -{} - -cql::cql_message_execute_impl_t::cql_message_execute_impl_t(size_t size) : - _buffer(new std::vector(size)), _consistency(cql::CQL_CONSISTENCY_ONE) {} cql::cql_message_execute_impl_t::cql_message_execute_impl_t(const std::vector& id, cql::cql_consistency_enum consistency) : - _buffer(new std::vector()), _query_id(id), _consistency(consistency) {} -cql::cql_message_buffer_t -cql::cql_message_execute_impl_t::buffer() -{ - return _buffer; -} - const std::vector& cql::cql_message_execute_impl_t::query_id() const { @@ -148,12 +135,6 @@ cql::cql_message_execute_impl_t::opcode() const return CQL_OPCODE_EXECUTE; } -cql::cql_int_t -cql::cql_message_execute_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_execute_impl_t::str() const { @@ -174,7 +155,7 @@ cql::cql_message_execute_impl_t::str() const bool cql::cql_message_execute_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); _params.clear(); @@ -237,9 +218,9 @@ cql::cql_message_execute_impl_t::prepare(cql::cql_error_t*) BOOST_FOREACH(const param_t& p, _params) { size += p.size() + sizeof(cql_int_t); } - _buffer->resize(size); + _buffer.resize(size); - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); cql::encode_short_bytes(stream, _query_id); diff --git a/src/libcql/internal/cql_message_options_impl.cpp b/src/libcql/internal/cql_message_options_impl.cpp index f04c0e4..f4cf9c8 100644 --- a/src/libcql/internal/cql_message_options_impl.cpp +++ b/src/libcql/internal/cql_message_options_impl.cpp @@ -19,26 +19,6 @@ #include "libcql/internal/cql_defines.hpp" #include "libcql/internal/cql_message_options_impl.hpp" -cql::cql_message_options_impl_t::cql_message_options_impl_t() : - _buffer(new std::vector(0)) -{} - -cql::cql_message_options_impl_t::cql_message_options_impl_t(size_t size) : - _buffer(new std::vector(size, 0)) -{} - -cql::cql_message_buffer_t -cql::cql_message_options_impl_t::buffer() -{ - return _buffer; -} - -cql::cql_int_t -cql::cql_message_options_impl_t::size() const -{ - return _buffer->size(); -} - cql::cql_opcode_enum cql::cql_message_options_impl_t::opcode() const { diff --git a/src/libcql/internal/cql_message_prepare_impl.cpp b/src/libcql/internal/cql_message_prepare_impl.cpp index 942cd4e..602a4d5 100644 --- a/src/libcql/internal/cql_message_prepare_impl.cpp +++ b/src/libcql/internal/cql_message_prepare_impl.cpp @@ -23,25 +23,10 @@ #include "libcql/internal/cql_message_prepare_impl.hpp" -cql::cql_message_prepare_impl_t::cql_message_prepare_impl_t() : - _buffer(new std::vector()) -{} - -cql::cql_message_prepare_impl_t::cql_message_prepare_impl_t(size_t size) : - _buffer(new std::vector(size)) -{} - cql::cql_message_prepare_impl_t::cql_message_prepare_impl_t(const std::string& query) : - _buffer(new std::vector()), _query(query) {} -cql::cql_message_buffer_t -cql::cql_message_prepare_impl_t::buffer() -{ - return _buffer; -} - const std::string& cql::cql_message_prepare_impl_t::query() const { @@ -60,12 +45,6 @@ cql::cql_message_prepare_impl_t::opcode() const return CQL_OPCODE_PREPARE; } -cql::cql_int_t -cql::cql_message_prepare_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_prepare_impl_t::str() const { @@ -75,7 +54,7 @@ cql::cql_message_prepare_impl_t::str() const bool cql::cql_message_prepare_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); cql::decode_long_string(stream, _query); return true; @@ -84,8 +63,8 @@ cql::cql_message_prepare_impl_t::consume(cql::cql_error_t*) bool cql::cql_message_prepare_impl_t::prepare(cql::cql_error_t*) { - _buffer->resize(_query.size() + sizeof(cql::cql_int_t)); - cql::vector_stream_t buffer(*_buffer); + _buffer.resize(_query.size() + sizeof(cql::cql_int_t)); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); cql::encode_long_string(stream, _query); return true; diff --git a/src/libcql/internal/cql_message_query_impl.cpp b/src/libcql/internal/cql_message_query_impl.cpp index ed45b59..4a04b94 100644 --- a/src/libcql/internal/cql_message_query_impl.cpp +++ b/src/libcql/internal/cql_message_query_impl.cpp @@ -26,31 +26,12 @@ #include "libcql/internal/cql_message_query_impl.hpp" -cql::cql_message_query_impl_t::cql_message_query_impl_t() : - _buffer(new std::vector()), - _consistency(0), - _query() -{} - -cql::cql_message_query_impl_t::cql_message_query_impl_t(size_t size) : - _buffer(new std::vector(size)), - _consistency(0), - _query() -{} - cql::cql_message_query_impl_t::cql_message_query_impl_t(const std::string& query, cql::cql_short_t consistency) : - _buffer(new std::vector()), _consistency(consistency), _query(query) {} -cql::cql_message_buffer_t -cql::cql_message_query_impl_t::buffer() -{ - return _buffer; -} - const std::string& cql::cql_message_query_impl_t::query() const { @@ -81,12 +62,6 @@ cql::cql_message_query_impl_t::opcode() const return CQL_OPCODE_QUERY; } -cql::cql_int_t -cql::cql_message_query_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_query_impl_t::str() const { @@ -96,7 +71,7 @@ cql::cql_message_query_impl_t::str() const bool cql::cql_message_query_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); cql::decode_long_string(stream, _query); stream.read(reinterpret_cast(&_consistency), sizeof(_consistency)); @@ -107,9 +82,9 @@ cql::cql_message_query_impl_t::consume(cql::cql_error_t*) bool cql::cql_message_query_impl_t::prepare(cql::cql_error_t*) { - _buffer->resize(_query.size() + sizeof(cql::cql_int_t) + sizeof(cql::cql_short_t)); + _buffer.resize(_query.size() + sizeof(cql::cql_int_t) + sizeof(cql::cql_short_t)); - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); cql::encode_long_string(stream, _query); cql::encode_short(stream, _consistency); diff --git a/src/libcql/internal/cql_message_ready_impl.cpp b/src/libcql/internal/cql_message_ready_impl.cpp index e6f92e4..1eb9bb7 100644 --- a/src/libcql/internal/cql_message_ready_impl.cpp +++ b/src/libcql/internal/cql_message_ready_impl.cpp @@ -19,32 +19,12 @@ #include "libcql/internal/cql_defines.hpp" #include "libcql/internal/cql_message_ready_impl.hpp" -cql::cql_message_ready_impl_t::cql_message_ready_impl_t() : - _buffer(new std::vector()) -{} - -cql::cql_message_ready_impl_t::cql_message_ready_impl_t(size_t size) : - _buffer(new std::vector(size)) -{} - -cql::cql_message_buffer_t -cql::cql_message_ready_impl_t::buffer() -{ - return _buffer; -} - cql::cql_opcode_enum cql::cql_message_ready_impl_t::opcode() const { return CQL_OPCODE_READY; } -cql::cql_int_t -cql::cql_message_ready_impl_t::size() const -{ - return 0; -} - std::string cql::cql_message_ready_impl_t::str() const { diff --git a/src/libcql/internal/cql_message_register_impl.cpp b/src/libcql/internal/cql_message_register_impl.cpp index 52f5578..837230d 100644 --- a/src/libcql/internal/cql_message_register_impl.cpp +++ b/src/libcql/internal/cql_message_register_impl.cpp @@ -25,32 +25,12 @@ #include "libcql/internal/cql_message_register_impl.hpp" -cql::cql_message_register_impl_t::cql_message_register_impl_t() : - _buffer(new std::vector()) -{} - -cql::cql_message_register_impl_t::cql_message_register_impl_t(size_t size) : - _buffer(new std::vector(size)) -{} - -cql::cql_message_buffer_t -cql::cql_message_register_impl_t::buffer() -{ - return _buffer; -} - cql::cql_opcode_enum cql::cql_message_register_impl_t::opcode() const { return CQL_OPCODE_REGISTER; } -cql::cql_int_t -cql::cql_message_register_impl_t::size() const -{ - return _buffer->size(); -} - void cql::cql_message_register_impl_t::events(const std::list& c) { @@ -72,7 +52,7 @@ cql::cql_message_register_impl_t::str() const bool cql::cql_message_register_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); cql::decode_string_list(stream, _events); @@ -86,9 +66,9 @@ cql::cql_message_register_impl_t::prepare(cql::cql_error_t*) BOOST_FOREACH(const std::string& event, _events) { size += event.size() + sizeof(cql_short_t); } - _buffer->resize(size); + _buffer.resize(size); - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); cql::encode_string_list(stream, _events); diff --git a/src/libcql/internal/cql_message_result_impl.cpp b/src/libcql/internal/cql_message_result_impl.cpp index e5e53e1..a800efe 100644 --- a/src/libcql/internal/cql_message_result_impl.cpp +++ b/src/libcql/internal/cql_message_result_impl.cpp @@ -58,7 +58,6 @@ result_type_string(cql::cql_short_t t) } cql::cql_message_result_impl_t::cql_message_result_impl_t() : - _buffer(new std::vector()), _pos(0), _row_pos(0), _row_count(0), @@ -67,22 +66,6 @@ cql::cql_message_result_impl_t::cql_message_result_impl_t() : _result_type(cql::CQL_RESULT_VOID) {} -cql::cql_message_result_impl_t::cql_message_result_impl_t(size_t size) : - _buffer(new std::vector(size)), - _pos(0), - _row_pos(0), - _row_count(0), - _column_count(0), - _query_id(0), - _result_type(cql::CQL_RESULT_VOID) -{} - -cql::cql_message_buffer_t -cql::cql_message_result_impl_t::buffer() -{ - return _buffer; -} - const cql::cql_result_metadata_t& cql::cql_message_result_impl_t::get_metadata() { @@ -101,12 +84,6 @@ cql::cql_message_result_impl_t::opcode() const return CQL_OPCODE_RESULT; } -cql::cql_int_t -cql::cql_message_result_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_result_impl_t::str() const { @@ -128,10 +105,11 @@ cql::cql_message_result_impl_t::str() const bool cql::cql_message_result_impl_t::consume(cql::cql_error_t*) { + _column_count = 0; _keyspace_name.clear(); _row_count = 0; _row_pos = 0; - _pos = &((*_buffer)[0]); + _pos = &(_buffer[0]); cql::cql_int_t result_type = 0; _pos = cql::decode_int(_pos, result_type); diff --git a/src/libcql/internal/cql_message_startup_impl.cpp b/src/libcql/internal/cql_message_startup_impl.cpp index 6cf898e..55408c2 100644 --- a/src/libcql/internal/cql_message_startup_impl.cpp +++ b/src/libcql/internal/cql_message_startup_impl.cpp @@ -25,24 +25,6 @@ #include "libcql/internal/cql_message_startup_impl.hpp" -cql::cql_message_startup_impl_t::cql_message_startup_impl_t() : - _buffer(new std::vector()), - _version(), - _compression() -{} - -cql::cql_message_startup_impl_t::cql_message_startup_impl_t(size_t size) : - _buffer(new std::vector(size)), - _version(), - _compression() -{} - -cql::cql_message_buffer_t -cql::cql_message_startup_impl_t::buffer() -{ - return _buffer; -} - void cql::cql_message_startup_impl_t::compression(const std::string& c) { @@ -73,12 +55,6 @@ cql::cql_message_startup_impl_t::opcode() const return CQL_OPCODE_STARTUP; } -cql::cql_int_t -cql::cql_message_startup_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_startup_impl_t::str() const { @@ -90,7 +66,7 @@ cql::cql_message_startup_impl_t::str() const bool cql::cql_message_startup_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); std::map startup; @@ -123,8 +99,8 @@ cql::cql_message_startup_impl_t::prepare(cql::cql_error_t*) size += strlen(CQL_COMPRESSION) + _compression.size() + (2* sizeof(cql::cql_short_t)); } - _buffer->resize(size); - cql::vector_stream_t buffer(*_buffer); + _buffer.resize(size); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); cql::encode_string_map(stream, startup); return true; diff --git a/src/libcql/internal/cql_message_supported_impl.cpp b/src/libcql/internal/cql_message_supported_impl.cpp index f7625b5..5b20183 100644 --- a/src/libcql/internal/cql_message_supported_impl.cpp +++ b/src/libcql/internal/cql_message_supported_impl.cpp @@ -24,20 +24,6 @@ #include "libcql/internal/cql_message_supported_impl.hpp" -cql::cql_message_supported_impl_t::cql_message_supported_impl_t() : - _buffer(new std::vector()) -{} - -cql::cql_message_supported_impl_t::cql_message_supported_impl_t(size_t size) : - _buffer(new std::vector(size)) -{} - -cql::cql_message_buffer_t -cql::cql_message_supported_impl_t::buffer() -{ - return _buffer; -} - void cql::cql_message_supported_impl_t::compressions(const std::list& c) { @@ -68,12 +54,6 @@ cql::cql_message_supported_impl_t::opcode() const return CQL_OPCODE_SUPPORTED; } -cql::cql_int_t -cql::cql_message_supported_impl_t::size() const -{ - return _buffer->size(); -} - std::string cql::cql_message_supported_impl_t::str() const { @@ -86,7 +66,7 @@ cql::cql_message_supported_impl_t::str() const bool cql::cql_message_supported_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::istream stream(&buffer); std::map > supported; @@ -104,7 +84,7 @@ cql::cql_message_supported_impl_t::consume(cql::cql_error_t*) bool cql::cql_message_supported_impl_t::prepare(cql::cql_error_t*) { - cql::vector_stream_t buffer(*_buffer); + cql::vector_stream_t buffer(_buffer); std::ostream stream(&buffer); std::map > supported; diff --git a/src/libcql/internal/cql_result_metadata.cpp b/src/libcql/internal/cql_result_metadata.cpp index f006bb7..1f9d4ab 100644 --- a/src/libcql/internal/cql_result_metadata.cpp +++ b/src/libcql/internal/cql_result_metadata.cpp @@ -77,9 +77,13 @@ cql::cql_result_metadata_t::read(cql::cql_byte_t* input) input = cql::decode_string(input, _global_table_name); } + if (_columns.size() < (size_t)_column_count) { + _columns.resize(_column_count); + } for (int i = 0; i < _column_count; ++i) { - std::string keyspace_name; - std::string table_name; + option_t& option = _columns[i]; + std::string& keyspace_name = option.name.get<0>(); + std::string& table_name = option.name.get<1>(); if (!(_flags & CQL_RESULT_ROWS_FLAGS_GLOBAL_TABLES_SPEC)) { input = cql::decode_string(input, keyspace_name); @@ -89,10 +93,9 @@ cql::cql_result_metadata_t::read(cql::cql_byte_t* input) keyspace_name = _global_keyspace_name; table_name = _global_table_name; } - std::string column_name; + std::string& column_name = option.name.get<2>(); input = cql::decode_string(input, column_name); - option_t option; input = cql::decode_option(input, option.primary_type, option.primary_class); if (option.primary_type == cql::CQL_COLUMN_TYPE_SET || option.primary_type == cql::CQL_COLUMN_TYPE_LIST) { @@ -106,9 +109,9 @@ cql::cql_result_metadata_t::read(cql::cql_byte_t* input) input = cql::decode_option(input, option.collection_secondary_type, option.collection_secondary_class); } - option.name = column_name_t(keyspace_name, table_name, column_name); +#if 0 _column_name_idx.insert(column_name_idx_t::value_type(option.name, i)); - _columns.push_back(option); +#endif } return input; } From c90e5ffd6bf81b2ed276e9c8b9786c916f3bfa3b Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Sun, 9 Feb 2014 00:39:07 +0800 Subject: [PATCH 11/12] Removing copying of header in body_read_handle --- include/libcql/internal/cql_client_impl.hpp | 38 +++++++++++---------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/include/libcql/internal/cql_client_impl.hpp b/include/libcql/internal/cql_client_impl.hpp index 2954c52..a1fb474 100644 --- a/include/libcql/internal/cql_client_impl.hpp +++ b/include/libcql/internal/cql_client_impl.hpp @@ -32,6 +32,7 @@ #include #include +#include #include #if BOOST_VERSION >= 104800 #include @@ -510,12 +511,14 @@ namespace cql { LOG(CQL_LOG_DEBUG, "sending message: " + header.str() + " " + message->str()); - std::vector buf; - buf.push_back(boost::asio::buffer(&header.buffer()[0], header.size())); - if (header.length() != 0) { - buf.push_back(boost::asio::buffer(&message->buffer()[0], message->size())); + if (header.length() == 0) { + boost::asio::async_write(*_transport, boost::asio::buffer(&header.buffer()[0], header.size()), callback); + } else { + boost::array buf; + buf[0] = boost::asio::buffer(&header.buffer()[0], header.size()); + buf[1] = boost::asio::buffer(&message->buffer()[0], message->size()); + boost::asio::async_write(*_transport, buf, callback); } - boost::asio::async_write(*_transport, buf, callback); // we have to keep the buffers around until the write is complete return id; @@ -609,34 +612,33 @@ namespace cql { #else boost::asio::transfer_all(), #endif - boost::bind(&cql_client_impl_t::body_read_handle, this, header, boost::asio::placeholders::error)); + boost::bind(&cql_client_impl_t::body_read_handle, this, boost::asio::placeholders::error)); } void - body_read_handle(const cql::cql_header_impl_t& header, - const boost::system::error_code& err) + body_read_handle(const boost::system::error_code& err) { - LOG(CQL_LOG_DEBUG, "received body for message " + header.str()); + LOG(CQL_LOG_DEBUG, "received body for message " + _response_header.str()); if (!err) { cql::cql_error_t consume_error; if (_response_message->consume(&consume_error)) { - switch (header.opcode()) { + switch (_response_header.opcode()) { case CQL_OPCODE_RESULT: { - LOG(CQL_LOG_DEBUG, "received result message " + header.str()); - cql_stream_id_t stream_id = header.stream(); + LOG(CQL_LOG_DEBUG, "received result message " + _response_header.str()); + cql_stream_id_t stream_id = _response_header.stream(); if (_callback_storage.has(stream_id)) { callback_item_t callback_pair = _callback_storage.get(stream_id); _callback_storage.release(stream_id); - callback_pair.message_callback(*this, header.stream(), dynamic_cast(_response_message.get())); + callback_pair.message_callback(*this, _response_header.stream(), dynamic_cast(_response_message.get())); } else { - LOG(CQL_LOG_INFO, "no callback found for message " + header.str()); + LOG(CQL_LOG_INFO, "no callback found for message " + _response_header.str()); } break; @@ -651,7 +653,7 @@ namespace cql { case CQL_OPCODE_ERROR: { - cql_stream_id_t stream_id = header.stream(); + cql_stream_id_t stream_id = _response_header.stream(); if (_callback_storage.has(stream_id)) { callback_item_t callback_pair = _callback_storage.get(stream_id); _callback_storage.release(stream_id); @@ -660,9 +662,9 @@ namespace cql { cql_error.cassandra = true; cql_error.code = m->code(); cql_error.message = m->message(); - callback_pair.error_callback(*this, header.stream(), cql_error); + callback_pair.error_callback(*this, _response_header.stream(), cql_error); } else { - LOG(CQL_LOG_INFO, "no callback found for message " + header.str() + " " + _response_message->str()); + LOG(CQL_LOG_INFO, "no callback found for message " + _response_header.str() + " " + _response_message->str()); } break; } @@ -690,7 +692,7 @@ namespace cql { break; default: - LOG(CQL_LOG_ERROR, "unhandled opcode " + header.str()); + LOG(CQL_LOG_ERROR, "unhandled opcode " + _response_header.str()); } } else { From bedf2b31a03dfbc61dd20895822afa851a2b719a Mon Sep 17 00:00:00 2001 From: Dominic Letz Date: Sun, 9 Feb 2014 01:29:05 +0800 Subject: [PATCH 12/12] Removing heap allocation of cql header --- include/libcql/cql_vector_stream.hpp | 7 +++++++ include/libcql/internal/cql_header_impl.hpp | 7 +++++-- src/libcql/internal/cql_header_impl.cpp | 12 ++++-------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/include/libcql/cql_vector_stream.hpp b/include/libcql/cql_vector_stream.hpp index 69cc6b1..4a042dd 100644 --- a/include/libcql/cql_vector_stream.hpp +++ b/include/libcql/cql_vector_stream.hpp @@ -27,6 +27,13 @@ namespace cql { struct vector_stream_t : std::streambuf { + vector_stream_t(cql::cql_byte_t* begin, cql::cql_byte_t* end) + { + char* start = reinterpret_cast(begin); + this->setg(start, start, start + (end - begin)); + this->setp(start, start + (end - begin)); + } + vector_stream_t(std::vector& vec) { char* start = reinterpret_cast(&vec[0]); diff --git a/include/libcql/internal/cql_header_impl.hpp b/include/libcql/internal/cql_header_impl.hpp index a10e990..0b39e40 100644 --- a/include/libcql/internal/cql_header_impl.hpp +++ b/include/libcql/internal/cql_header_impl.hpp @@ -19,9 +19,12 @@ #ifndef CQL_HEADER_IMPL_H_ #define CQL_HEADER_IMPL_H_ +#include #include "libcql/cql.hpp" #include "libcql/internal/cql_message.hpp" +#define CQL_HEADER_SIZE sizeof(_version) + sizeof(_flags) + sizeof(_stream) + sizeof(_opcode) + sizeof(_length) + namespace cql { struct cql_error_t; @@ -47,7 +50,7 @@ namespace cql { bool prepare(cql::cql_error_t* err); - cql_message_buffer_t& + cql::cql_byte_t* buffer(); cql_int_t @@ -84,12 +87,12 @@ namespace cql { length(cql_int_t v); private: - cql_message_buffer_t _buffer; cql::cql_byte_t _version; cql::cql_byte_t _flags; cql::cql_stream_id_t _stream; cql::cql_byte_t _opcode; cql::cql_int_t _length; + boost::array _buffer; }; } // namespace cql diff --git a/src/libcql/internal/cql_header_impl.cpp b/src/libcql/internal/cql_header_impl.cpp index f10b3ee..5fa1b8e 100644 --- a/src/libcql/internal/cql_header_impl.cpp +++ b/src/libcql/internal/cql_header_impl.cpp @@ -26,10 +26,7 @@ #include "libcql/internal/cql_header_impl.hpp" -#define CQL_HEADER_SIZE sizeof(_version) + sizeof(_flags) + sizeof(_stream) + sizeof(_opcode) + sizeof(_length) - cql::cql_header_impl_t::cql_header_impl_t() : - _buffer(CQL_HEADER_SIZE, 0), _version(0), _flags(0), _stream(0), @@ -42,7 +39,6 @@ cql::cql_header_impl_t::cql_header_impl_t(cql::cql_byte_t version, cql::cql_stream_id_t stream, cql::cql_byte_t opcode, cql::cql_int_t length) : - _buffer(CQL_HEADER_SIZE, 0), _version(version), _flags(flags), _stream(stream), @@ -50,10 +46,10 @@ cql::cql_header_impl_t::cql_header_impl_t(cql::cql_byte_t version, _length(length) {} -cql::cql_message_buffer_t& +cql::cql_byte_t* cql::cql_header_impl_t::buffer() { - return _buffer; + return _buffer.elems; } std::string @@ -78,7 +74,7 @@ cql::cql_header_impl_t::str() const bool cql::cql_header_impl_t::prepare(cql::cql_error_t*) { - cql::vector_stream_t buffer(_buffer); + cql::vector_stream_t buffer(_buffer.begin(), _buffer.end()); std::ostream stream(&buffer); stream.put(_version); @@ -92,7 +88,7 @@ cql::cql_header_impl_t::prepare(cql::cql_error_t*) bool cql::cql_header_impl_t::consume(cql::cql_error_t*) { - cql::vector_stream_t buffer(_buffer); + cql::vector_stream_t buffer(_buffer.begin(), _buffer.end()); std::istream stream(&buffer); _version = stream.get();