Skip to content

Commit

Permalink
Merge "Fix READ_TIMEOUT error protocol encoding" from Pekka
Browse files Browse the repository at this point in the history
"This series fixes READ_TIMEOUT error CQL binary protocol encoding.
Please note that the "data_present" flag is always set to false. We need
to revisit that later."
  • Loading branch information
avikivity committed Jul 27, 2015
2 parents 2e745be + a4225fa commit 584b70a
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 17 deletions.
56 changes: 56 additions & 0 deletions exceptions/request_timeout_exception.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/

#pragma once

#include "exceptions/exceptions.hh"
#include "db/consistency_level.hh"

namespace exceptions {

/**
 * Base class for timeout errors: a cassandra_exception whose message reports
 * how many responses had been received when the operation timed out.
 *
 * The three fields are public and are filled in verbatim from the
 * constructor arguments.
 */
class request_timeout_exception : public cassandra_exception {
public:
    /// Consistency level supplied by the code raising the exception.
    db::consistency_level consistency;
    /// Number of responses received; also formatted into the error message.
    int32_t received;
    /// Presumably the number of responses the request was waiting for -- confirm with callers.
    int32_t block_for;

    /**
     * @param code        error code forwarded to cassandra_exception
     * @param consistency stored in the `consistency` field
     * @param received    stored in `received` and embedded in the message
     * @param block_for   stored in the `block_for` field
     */
    request_timeout_exception(exception_code code, db::consistency_level consistency, int32_t received, int32_t block_for)
        : cassandra_exception(code, sprint("Operation timed out - received only %d responses.", received))
        , consistency(consistency)
        , received(received)
        , block_for(block_for)
    {
    }
};

/**
 * Timeout error for reads: a request_timeout_exception constructed with
 * exception_code::READ_TIMEOUT plus an extra `data_present` flag.
 */
class read_timeout_exception : public request_timeout_exception {
public:
    /// Flag supplied by the raiser; presumably tells whether the data
    /// response had arrived before the timeout -- confirm against the
    /// protocol specification.
    bool data_present;

    /**
     * @param consistency  forwarded to request_timeout_exception
     * @param received     forwarded to request_timeout_exception
     * @param block_for    forwarded to request_timeout_exception
     * @param data_present stored in the `data_present` field
     */
    read_timeout_exception(db::consistency_level consistency, int32_t received, int32_t block_for, bool data_present)
        : request_timeout_exception(exception_code::READ_TIMEOUT, consistency, received, block_for)
        , data_present(data_present)
    {
    }
};

}
32 changes: 15 additions & 17 deletions service/storage_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "core/future-util.hh"
#include "db/read_repair_decision.hh"
#include "db/config.hh"
#include "exceptions/request_timeout_exception.hh"
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/iterator/counting_iterator.hpp>
Expand Down Expand Up @@ -1375,14 +1376,9 @@ class digest_mismatch_exception : public std::runtime_error {
digest_mismatch_exception() : std::runtime_error("Digest mismatch") {}
};

// Read-timeout error local to this file: a cassandra_exception built with
// exception_code::READ_TIMEOUT and a message that reports how many responses
// were awaited (block_for) and how many arrived (got).
// NOTE(review): this page also shows exceptions::read_timeout_exception in
// exceptions/request_timeout_exception.hh; this local definition looks like
// the removed side of the diff -- confirm before relying on it.
class read_timeout_exception : public cassandra_exception {
public:
// block_for and got are only formatted into the message; they are not stored.
read_timeout_exception(size_t block_for, size_t got) :
cassandra_exception(exception_code::READ_TIMEOUT, sprint("Read operation timed out: waited for %lu got %lu", block_for, got)) {}
};

class abstract_read_resolver {
protected:
db::consistency_level _cl;
size_t _targets_count;
promise<> _done_promise; // all target responded
bool _timedout = false; // will be true if request timeouts
Expand All @@ -1392,10 +1388,13 @@ class abstract_read_resolver {
virtual void on_timeout() {}
virtual size_t response_count() const = 0;
public:
abstract_read_resolver(size_t target_count, std::chrono::high_resolution_clock::time_point timeout) : _targets_count(target_count) {
abstract_read_resolver(db::consistency_level cl, size_t target_count, std::chrono::high_resolution_clock::time_point timeout)
: _cl(cl)
, _targets_count(target_count)
{
_timeout.set_callback([this] {
_timedout = true;
_done_promise.set_exception(read_timeout_exception(_targets_count, response_count()));
_done_promise.set_exception(read_timeout_exception(_cl, response_count(), _targets_count, false /* FIXME */));
on_timeout();
});
_timeout.arm(timeout);
Expand All @@ -1410,15 +1409,14 @@ class abstract_read_resolver {
};

class digest_read_resolver : public abstract_read_resolver {
db::consistency_level _cl;
size_t _block_for;
size_t _cl_responses = 0;
promise<> _cl_promise; // cl is reached
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _data_results;
std::vector<query::result_digest> _digest_results;

virtual void on_timeout() override {
_cl_promise.set_exception(read_timeout_exception(_block_for, _cl_responses));
_cl_promise.set_exception(read_timeout_exception(_cl, _cl_responses, _block_for, false /* FIXME */));
// we will not need them any more
_data_results.clear();
_digest_results.clear();
Expand All @@ -1432,7 +1430,7 @@ class digest_read_resolver : public abstract_read_resolver {
return std::find_if(_digest_results.begin() + 1, _digest_results.end(), [&first] (query::result_digest digest) { return digest != first; }) == _digest_results.end();
}
public:
digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(targets_count, timeout), _cl(cl), _block_for(block_for) {}
digest_read_resolver(db::consistency_level cl, size_t block_for, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(cl, targets_count, timeout), _block_for(block_for) {}
void add_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<query::result>> result) {
if (!_timedout) {
_digest_results.emplace_back(result->digest());
Expand Down Expand Up @@ -1497,7 +1495,7 @@ class data_read_resolver : public abstract_read_resolver {
}

public:
data_read_resolver(size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(targets_count, timeout) {
data_read_resolver(db::consistency_level cl, size_t targets_count, std::chrono::high_resolution_clock::time_point timeout) : abstract_read_resolver(cl, targets_count, timeout) {
_data_results.reserve(targets_count);
}
void add_mutate_data(gms::inet_address from, foreign_ptr<lw_shared_ptr<reconcilable_result>> result) {
Expand Down Expand Up @@ -1655,8 +1653,8 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
return when_all(make_data_requests(resolver, _targets.begin(), _targets.begin() + 1),
_targets.size() > 1 ? make_digest_requests(resolver, _targets.begin() + 1, _targets.end()) : make_ready_future()).discard_result();
}
void reconciliate(std::chrono::high_resolution_clock::time_point timeout) {
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_targets.size(), timeout);
void reconciliate(db::consistency_level cl, std::chrono::high_resolution_clock::time_point timeout) {
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(cl, _targets.size(), timeout);
auto exec = shared_from_this();

make_mutation_data_requests(data_resolver, _targets.begin(), _targets.end()).finally([exec]{});
Expand All @@ -1682,7 +1680,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
// hold on to executor until all queries are complete
});

digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<> f) {
digest_resolver->has_cl().then_wrapped([this, exec, digest_resolver, timeout] (future<> f) {
try {
f.get();
exec->_result_promise.set_value(digest_resolver->resolve()); // can throw digest missmatch exception
Expand All @@ -1702,7 +1700,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
done.discard_result(); // no need for background check, discard done future explicitly
}
} catch (digest_mismatch_exception& ex) {
exec->reconciliate(timeout);
exec->reconciliate(_cl, timeout);
} catch (read_timeout_exception& ex) {
exec->_result_promise.set_exception(ex);
}
Expand Down Expand Up @@ -1735,7 +1733,7 @@ class range_slice_read_executor : public abstract_read_executor {
range_slice_read_executor(lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl, std::vector<gms::inet_address> targets) :
abstract_read_executor(std::move(cmd), std::move(pr), cl, targets.size(), std::move(targets)) {}
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(std::chrono::high_resolution_clock::time_point timeout) override {
reconciliate(timeout);
reconciliate(_cl, timeout);
return _result_promise.get_future();
}
};
Expand Down
21 changes: 21 additions & 0 deletions transport/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "service/query_state.hh"
#include "service/client_state.hh"
#include "transport/protocol_exception.hh"
#include "exceptions/request_timeout_exception.hh"

#include <cassert>
#include <string>
Expand Down Expand Up @@ -164,6 +165,7 @@ class cql_server::connection {
future<> process_batch(uint16_t stream, temporary_buffer<char> buf);
future<> process_register(uint16_t stream, temporary_buffer<char> buf);

future<> write_read_timeout_error(int16_t stream, exceptions::exception_code err, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present);
future<> write_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name);
future<> write_error(int16_t stream, exceptions::exception_code err, sstring msg);
future<> write_ready(int16_t stream);
Expand Down Expand Up @@ -214,6 +216,7 @@ class cql_server::response {
, _opcode{opcode}
{ }
scattered_message<char> make_message(uint8_t version);
void write_byte(uint8_t b);
void write_int(int32_t n);
void write_long(int64_t n);
void write_short(int16_t n);
Expand Down Expand Up @@ -375,6 +378,8 @@ future<> cql_server::connection::process_request() {
}).then_wrapped([stream = f.stream, this] (future<> f) {
try {
f.get();
} catch (const exceptions::read_timeout_exception& ex) {
write_read_timeout_error(stream, ex.code(), ex.consistency, ex.received, ex.block_for, ex.data_present);
} catch (const exceptions::already_exists_exception& ex) {
write_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name);
} catch (const exceptions::cassandra_exception& ex) {
Expand Down Expand Up @@ -489,6 +494,17 @@ future<> cql_server::connection::process_register(uint16_t stream, temporary_buf
return write_ready(stream);
}

/**
 * Builds an ERROR response for a read timeout on `stream` and passes it to
 * write_response().
 *
 * Fields are appended in this order: error code (write_int), consistency
 * level (write_consistency), received (write_int), blockfor (write_int),
 * data_present (write_byte).
 *
 * NOTE(review): the CQL binary protocol specification describes an ERROR body
 * as an [int] code followed by a [string] message, with exception-specific
 * fields after that. No message string is written here and the signature does
 * not carry one -- confirm whether clients can decode this frame.
 *
 * @return the future returned by write_response()
 */
future<> cql_server::connection::write_read_timeout_error(int16_t stream, exceptions::exception_code err, db::consistency_level cl, int32_t received, int32_t blockfor, bool data_present)
{
    auto reply = make_shared<cql_server::response>(stream, cql_binary_opcode::ERROR);
    const auto code = static_cast<int32_t>(err);
    reply->write_int(code);
    reply->write_consistency(cl);
    reply->write_int(received);
    reply->write_int(blockfor);
    // The bool converts implicitly to 0 or 1 for write_byte's uint8_t parameter.
    reply->write_byte(data_present);
    return write_response(reply);
}

future<> cql_server::connection::write_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name)
{
auto response = make_shared<cql_server::response>(stream, cql_binary_opcode::ERROR);
Expand Down Expand Up @@ -880,6 +896,11 @@ sstring cql_server::response::make_frame(uint8_t version, size_t length)
}
}

/**
 * Appends exactly one byte with value `b` to the response body.
 *
 * @param b the byte value to append
 */
void cql_server::response::write_byte(uint8_t b)
{
    // The count/value form of insert() takes the number of copies first and
    // the value second. With the arguments the other way round, `b` copies of
    // the value 1 are appended, so write_byte(0) would append nothing.
    _body.insert(_body.end(), 1, b);
}

void cql_server::response::write_int(int32_t n)
{
auto u = htonl(n);
Expand Down

0 comments on commit 584b70a

Please sign in to comment.