k/fetch_request: moved op_context implementation to cc file
Signed-off-by: Michal Maslanka <michal@vectorized.io>
mmaslankaprv committed Nov 19, 2020
1 parent 0626c3e commit b0b7932
Showing 2 changed files with 81 additions and 65 deletions.
66 changes: 66 additions & 0 deletions src/v/kafka/requests/fetch_request.cc
@@ -563,4 +563,70 @@ fetch_api::process(request_context&& rctx, ss::smp_service_group ssg) {
});
}

void op_context::reset_context() {
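    // clear the initial-fetch flag and rewind the response iterator so that
    // results from later fetch passes overwrite existing entries in place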
    initial_fetch = false;
    response_iterator = response.begin();
}

// decode request and initialize budgets
op_context::op_context(request_context&& ctx, ss::smp_service_group ssg)
  : rctx(std::move(ctx))
  , ssg(ssg)
  , response_size(0)
  , response_error(false)
  , response_iterator(response.begin()) {
    /*
     * decode request and prepare the initial response
     */
    request.decode(rctx);
    if (likely(!request.topics.empty())) {
        response.partitions.reserve(request.topics.size());
    }

    if (auto delay = request.debounce_delay(); delay) {
        deadline = model::timeout_clock::now() + delay.value();
    }

    /*
     * TODO: max size is multifaceted. it needs to be absolute, but also
     * integrate with other resource constraints that are dynamic within the
     * kafka server itself.
     */
    static constexpr size_t MAX_SIZE = 128 << 20;
    bytes_left = std::min(MAX_SIZE, size_t(request.max_bytes));
}

// insert and reserve space for a new topic in the response
void op_context::start_response_topic(const fetch_request::topic& topic) {
    auto& p = response.partitions.emplace_back(topic.name);
    p.responses.reserve(topic.partitions.size());
}

// add to the response the result of fetching from a partition
void op_context::set_partition_response(
  fetch_response::partition_response&& r) {
    if (r.error != error_code::none) {
        response_error = true;
    }
    if (r.record_set) {
        response_size += r.record_set->size_bytes();
        bytes_left -= std::min(bytes_left, r.record_set->size_bytes());
    }
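    // results of the initial fetch are appended; after reset_context() the
    // entry referenced by response_iterator is overwritten in place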
    if (!initial_fetch) {
        // replace response
        *response_iterator->partition_response = std::move(r);
    } else {
        response.partitions.back().responses.push_back(std::move(r));
    }
}

bool op_context::should_stop_fetch() const {
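    // stop when the request does not allow waiting, enough data has been
    // accumulated, there is nothing left to fetch, or an error was encountered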
    return !request.debounce_delay() || over_min_bytes() || request.empty()
           || response_error;
}

bool op_context::over_min_bytes() const {
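    // the accumulated response size satisfies the request's min_bytes threshold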
    return static_cast<int32_t>(response_size) >= request.min_bytes;
}

} // namespace kafka
80 changes: 15 additions & 65 deletions src/v/kafka/requests/fetch_request.h
@@ -335,6 +335,21 @@ std::ostream& operator<<(std::ostream&, const fetch_response&);
 * Fetch operation context
 */
struct op_context {
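    // reset iteration state between fetch passes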
    void reset_context();

    // decode request and initialize budgets
    op_context(request_context&& ctx, ss::smp_service_group ssg);

    // insert and reserve space for a new topic in the response
    void start_response_topic(const fetch_request::topic& topic);

    // add to the response the result of fetching from a partition
    void set_partition_response(fetch_response::partition_response&& r);

    bool should_stop_fetch() const;

    bool over_min_bytes() const;

    request_context rctx;
    ss::smp_service_group ssg;
    fetch_request request;
@@ -352,71 +367,6 @@ struct op_context {
    bool initial_fetch = true;

    fetch_response::iterator response_iterator;

    void reset_context() {
        initial_fetch = false;
        response_iterator = response.begin();
    }

    // decode request and initialize budgets
    op_context(request_context&& ctx, ss::smp_service_group ssg)
      : rctx(std::move(ctx))
      , ssg(ssg)
      , response_size(0)
      , response_error(false)
      , response_iterator(response.begin()) {
        /*
         * decode request and prepare the initial response
         */
        request.decode(rctx);
        if (likely(!request.topics.empty())) {
            response.partitions.reserve(request.topics.size());
        }

        if (auto delay = request.debounce_delay(); delay) {
            deadline = model::timeout_clock::now() + delay.value();
        }

        /*
         * TODO: max size is multifaceted. it needs to be absolute, but also
         * integrate with other resource constraints that are dynamic within the
         * kafka server itself.
         */
        static constexpr size_t MAX_SIZE = 128 << 20;
        bytes_left = std::min(MAX_SIZE, size_t(request.max_bytes));
    }

    // insert and reserve space for a new topic in the response
    void start_response_topic(const fetch_request::topic& topic) {
        auto& p = response.partitions.emplace_back(topic.name);
        p.responses.reserve(topic.partitions.size());
    }

    // add to the response the result of fetching from a partition
    void set_partition_response(fetch_response::partition_response&& r) {
        if (r.error != error_code::none) {
            response_error = true;
        }
        if (r.record_set) {
            response_size += r.record_set->size_bytes();
            bytes_left -= std::min(bytes_left, r.record_set->size_bytes());
        }
        if (!initial_fetch) {
            // replace response
            *response_iterator->partition_response = std::move(r);
        } else {
            response.partitions.back().responses.push_back(std::move(r));
        }
    }

    bool should_stop_fetch() const {
        return !request.debounce_delay() || over_min_bytes() || request.empty()
               || response_error;
    }

    bool over_min_bytes() const {
        return static_cast<int32_t>(response_size) >= request.min_bytes;
    }
};

struct fetch_config {
