Skip to content

Commit

Permalink
tx/types: move batch parsing logic into types header
Browse files Browse the repository at this point in the history
Break the monolith of rm_stm further and move unnecessary logic
out of it.
  • Loading branch information
bharathv committed May 9, 2024
1 parent f38999c commit 62915b8
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 176 deletions.
156 changes: 2 additions & 154 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,51 +51,6 @@ ss::sstring abort_idx_name(model::offset first, model::offset last) {
return fmt::format("abort.idx.{}.{}", first, last);
}

// Builds a single-record raft_data batch flagged as both control and
// transactional for producer `pid`. The record key carries the current
// control record version followed by the control record type `crt`.
model::record_batch make_tx_control_batch(
  model::producer_identity pid, model::control_record_type crt) {
    // Key: control record version, then the control type as int16.
    iobuf key_buf;
    kafka::protocol::encoder key_writer(key_buf);
    key_writer.write(model::current_control_record_version());
    key_writer.write(static_cast<int16_t>(crt));

    // Value: a zero int16 followed by a zero int32.
    iobuf value_buf;
    kafka::protocol::encoder value_writer(value_buf);
    value_writer.write(static_cast<int16_t>(0));
    value_writer.write(static_cast<int32_t>(0));

    storage::record_batch_builder batch_builder(
      model::record_batch_type::raft_data, model::offset(0));
    batch_builder.set_producer_identity(pid.id, pid.epoch);
    batch_builder.set_control_type();
    batch_builder.set_transactional_type();
    // The record is added without any headers.
    batch_builder.add_raw_kw(
      std::move(key_buf),
      std::move(value_buf),
      std::vector<model::record_header>());

    return std::move(batch_builder).build();
}

// Returns the control record type stored in the key of a control batch.
// vasserts that the batch is of type raft_data, has the control attribute
// set, holds exactly one record, and uses the current control record version.
model::control_record_type parse_control_batch(const model::record_batch& b) {
    const auto& hdr = b.header();

    vassert(
      hdr.type == model::record_batch_type::raft_data,
      "expect data batch type got {}",
      hdr.type);
    vassert(
      hdr.attrs.is_control(), "expect control attrs got {}", hdr.attrs);
    vassert(
      b.record_count() == 1, "control batch must contain a single record");

    // The batch is const, so operate on a copy of its records; the key
    // buffer is released from that copy and handed to the decoder.
    auto records = b.copy_records();
    auto& only_record = *records.begin();
    kafka::protocol::decoder key_decoder(only_record.release_key());

    // Key layout: int16 version followed by int16 control type.
    const auto version = model::control_record_version(
      key_decoder.read_int16());
    vassert(
      version == model::current_control_record_version,
      "unknown control record version");
    return model::control_record_type(key_decoder.read_int16());
}

raft::replicate_options make_replicate_options() {
auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
Expand All @@ -104,116 +59,9 @@ raft::replicate_options make_replicate_options() {

} // namespace

// Decodes a tx_fence batch into a fence_batch_data.
//
// The record value is parsed first: an int8 version, then -- for versions at
// or above fence_control_record_v1_version -- the tx sequence and the
// transaction timeout, then -- for versions at or above
// fence_control_record_version -- the `tm` partition id. The record key is
// parsed afterwards and cross-checked against the batch header (batch type
// and producer id).
//
// vasserts on: a record count other than one, a version greater than
// fence_control_record_version, or a key that disagrees with the header.
fence_batch_data read_fence_batch(model::record_batch&& b) {
const auto& hdr = b.header();
auto bid = model::batch_identity::from(hdr);

vassert(
b.record_count() == 1,
"model::record_batch_type::tx_fence batch must contain a single record");
// NOTE(review): `b` is taken by rvalue reference but its records are copied
// here rather than moved out -- confirm whether that is intentional.
auto r = b.copy_records();
auto& record = *r.begin();
auto val_buf = record.release_value();

iobuf_parser val_reader(std::move(val_buf));
auto version = reflection::adl<int8_t>{}.from(val_reader);
// Only an upper bound is enforced on the version.
vassert(
version <= fence_control_record_version,
"unknown fence record version: {} expected: {}",
version,
fence_control_record_version);

// Both stay empty for records older than v1.
std::optional<model::tx_seq> tx_seq{};
std::optional<std::chrono::milliseconds> transaction_timeout_ms;
if (version >= fence_control_record_v1_version) {
tx_seq = reflection::adl<model::tx_seq>{}.from(val_reader);
transaction_timeout_ms
= reflection::adl<std::chrono::milliseconds>{}.from(val_reader);
}
// Falls back to the legacy tm partition when the record carries no
// partition field.
model::partition_id tm{model::legacy_tm_ntp.tp.partition};
if (version >= fence_control_record_version) {
tm = reflection::adl<model::partition_id>{}.from(val_reader);
}

// Key layout: batch type followed by the producer id as int64.
auto key_buf = record.release_key();
iobuf_parser key_reader(std::move(key_buf));
auto batch_type = reflection::adl<model::record_batch_type>{}.from(
key_reader);
vassert(
hdr.type == batch_type,
"broken model::record_batch_type::tx_fence batch. expected batch type {} "
"got: {}",
hdr.type,
batch_type);
auto p_id = model::producer_id(reflection::adl<int64_t>{}.from(key_reader));
vassert(
p_id == bid.pid.id,
"broken model::record_batch_type::tx_fence batch. expected pid {} got: "
"{}",
bid.pid.id,
p_id);
return fence_batch_data{bid, tx_seq, transaction_timeout_ms, tm};
}

// Builds a single-record tx_fence control batch in the v1 value format:
// record version (fence_control_record_v1_version), tx sequence and
// transaction timeout.
model::record_batch make_fence_batch_v1(
  model::producer_identity pid,
  model::tx_seq tx_seq,
  std::chrono::milliseconds transaction_timeout_ms) {
    // Key: the tx_fence batch type followed by the producer id; the epoch
    // is not part of the key.
    iobuf key_buf;
    auto producer_id = pid.id;
    reflection::serialize(
      key_buf, model::record_batch_type::tx_fence, producer_id);

    iobuf value_buf;
    reflection::serialize(
      value_buf,
      fence_control_record_v1_version,
      tx_seq,
      transaction_timeout_ms);

    storage::record_batch_builder fence_builder(
      model::record_batch_type::tx_fence, model::offset(0));
    fence_builder.set_producer_identity(pid.id, pid.epoch);
    fence_builder.set_control_type();
    fence_builder.add_raw_kv(std::move(key_buf), std::move(value_buf));

    return std::move(fence_builder).build();
}

// Builds a single-record tx_fence control batch in the current value format:
// record version (fence_control_record_version), tx sequence, transaction
// timeout and the `tm` partition id.
model::record_batch make_fence_batch_v2(
  model::producer_identity pid,
  model::tx_seq tx_seq,
  std::chrono::milliseconds transaction_timeout_ms,
  model::partition_id tm) {
    // the key byte representation must not change because it's used in
    // compaction
    iobuf key_buf;
    auto producer_id = pid.id;
    reflection::serialize(
      key_buf, model::record_batch_type::tx_fence, producer_id);

    iobuf value_buf;
    reflection::serialize(
      value_buf,
      fence_control_record_version,
      tx_seq,
      transaction_timeout_ms,
      tm);

    storage::record_batch_builder fence_builder(
      model::record_batch_type::tx_fence, model::offset(0));
    fence_builder.set_producer_identity(pid.id, pid.epoch);
    fence_builder.set_control_type();
    fence_builder.add_raw_kv(std::move(key_buf), std::move(value_buf));

    return std::move(fence_builder).build();
}

// Picks the fence batch format: the v2 layout (which carries `tm`) when
// is_transaction_partitioning() reports true, otherwise the v1 layout,
// in which case `tm` is not written.
model::record_batch rm_stm::make_fence_batch(
  model::producer_identity pid,
  model::tx_seq tx_seq,
  std::chrono::milliseconds transaction_timeout_ms,
  model::partition_id tm) {
    if (!is_transaction_partitioning()) {
        return make_fence_batch_v1(pid, tx_seq, transaction_timeout_ms);
    }
    return make_fence_batch_v2(pid, tx_seq, transaction_timeout_ms, tm);
}

// Member entry point for decoding a control batch; forwards the batch to the
// free-function control batch parser and returns its result.
// NOTE(review): two return statements are present and only the first is
// reachable as written -- this looks like the before/after pair of a rendered
// diff (unqualified call replaced by tx::parse_control_batch); confirm which
// one the real file keeps.
model::control_record_type
rm_stm::parse_tx_control_batch(const model::record_batch& b) {
return parse_control_batch(b);
return tx::parse_control_batch(b);
}

void rm_stm::log_state::forget(const model::producer_identity& pid) {
Expand Down Expand Up @@ -1742,7 +1590,7 @@ ss::future<> rm_stm::apply(const model::record_batch& b) {
b.header().producer_id);
} else if (hdr.type == model::record_batch_type::raft_data) {
if (hdr.attrs.is_control()) {
apply_control(bid.pid, parse_control_batch(b));
apply_control(bid.pid, tx::parse_control_batch(b));
} else {
apply_data(bid, hdr);
}
Expand Down
18 changes: 0 additions & 18 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ class rm_stm final : public raft::persisted_stm<> {
producer_ptr maybe_create_producer(model::producer_identity);
void cleanup_producer_state(model::producer_identity);
ss::future<> reset_producers();
model::record_batch make_fence_batch(
model::producer_identity,
model::tx_seq,
std::chrono::milliseconds,
model::partition_id);
ss::future<checked<model::term_id, tx_errc>> do_begin_tx(
model::producer_identity,
model::tx_seq,
Expand Down Expand Up @@ -467,17 +462,4 @@ class rm_stm_factory : public state_machine_factory {
ss::sharded<topic_table>& _topics;
};

// Builds a tx_fence batch for `pid` using the v1 value layout (version,
// tx sequence, transaction timeout).
model::record_batch make_fence_batch_v1(
model::producer_identity pid,
model::tx_seq tx_seq,
std::chrono::milliseconds transaction_timeout_ms);

// Builds a tx_fence batch for `pid` using the v2 value layout, which
// additionally stores the partition id `tm`.
model::record_batch make_fence_batch_v2(
model::producer_identity pid,
model::tx_seq tx_seq,
std::chrono::milliseconds transaction_timeout_ms,
model::partition_id tm);

// Decodes a tx_fence batch; takes the batch by rvalue reference.
tx::fence_batch_data read_fence_batch(model::record_batch&& b);

} // namespace cluster
122 changes: 122 additions & 0 deletions src/v/cluster/rm_stm_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "cluster/rm_stm_types.h"

#include "storage/record_batch_builder.h"

namespace cluster::tx {

bool deprecated_seq_entry::operator==(const deprecated_seq_entry& other) const {
Expand Down Expand Up @@ -122,6 +124,126 @@ std::ostream& operator<<(std::ostream& o, const abort_snapshot& as) {
as.aborted.size());
return o;
}

// Builds a single-record tx_fence control batch for producer `pid`. The
// value holds the record version (fence_control_record_version), the tx
// sequence, the transaction timeout and the `tm` partition id.
model::record_batch make_fence_batch(
  model::producer_identity pid,
  model::tx_seq tx_seq,
  std::chrono::milliseconds transaction_timeout_ms,
  model::partition_id tm) {
    // the key byte representation must not change because it's used in
    // compaction
    iobuf key_buf;
    auto producer_id = pid.id;
    reflection::serialize(
      key_buf, model::record_batch_type::tx_fence, producer_id);

    iobuf value_buf;
    reflection::serialize(
      value_buf,
      fence_control_record_version,
      tx_seq,
      transaction_timeout_ms,
      tm);

    storage::record_batch_builder fence_builder(
      model::record_batch_type::tx_fence, model::offset(0));
    fence_builder.set_producer_identity(pid.id, pid.epoch);
    fence_builder.set_control_type();
    fence_builder.add_raw_kv(std::move(key_buf), std::move(value_buf));

    return std::move(fence_builder).build();
}

// Decodes a tx_fence batch into a fence_batch_data.
//
// The record value is parsed first: an int8 version, then -- for versions at
// or above fence_control_record_v1_version -- the tx sequence and the
// transaction timeout, then -- for versions at or above
// fence_control_record_version -- the `tm` partition id. The record key is
// parsed afterwards and cross-checked against the batch header (batch type
// and producer id).
//
// vasserts on: a record count other than one, a version greater than
// fence_control_record_version, or a key that disagrees with the header.
fence_batch_data read_fence_batch(model::record_batch&& b) {
const auto& hdr = b.header();
auto bid = model::batch_identity::from(hdr);

vassert(
b.record_count() == 1,
"model::record_batch_type::tx_fence batch must contain a single record");
// NOTE(review): `b` is taken by rvalue reference but its records are copied
// here rather than moved out -- confirm whether that is intentional.
auto r = b.copy_records();
auto& record = *r.begin();
auto val_buf = record.release_value();

iobuf_parser val_reader(std::move(val_buf));
auto version = reflection::adl<int8_t>{}.from(val_reader);
// Only an upper bound is enforced on the version.
vassert(
version <= fence_control_record_version,
"unknown fence record version: {} expected: {}",
version,
fence_control_record_version);

// Both stay empty for records older than v1.
std::optional<model::tx_seq> tx_seq{};
std::optional<std::chrono::milliseconds> transaction_timeout_ms;
if (version >= fence_control_record_v1_version) {
tx_seq = reflection::adl<model::tx_seq>{}.from(val_reader);
transaction_timeout_ms
= reflection::adl<std::chrono::milliseconds>{}.from(val_reader);
}
// Falls back to the legacy tm partition when the record carries no
// partition field.
model::partition_id tm{model::legacy_tm_ntp.tp.partition};
if (version >= fence_control_record_version) {
tm = reflection::adl<model::partition_id>{}.from(val_reader);
}

// Key layout: batch type followed by the producer id as int64.
auto key_buf = record.release_key();
iobuf_parser key_reader(std::move(key_buf));
auto batch_type = reflection::adl<model::record_batch_type>{}.from(
key_reader);
vassert(
hdr.type == batch_type,
"broken model::record_batch_type::tx_fence batch. expected batch type {} "
"got: {}",
hdr.type,
batch_type);
auto p_id = model::producer_id(reflection::adl<int64_t>{}.from(key_reader));
vassert(
p_id == bid.pid.id,
"broken model::record_batch_type::tx_fence batch. expected pid {} got: "
"{}",
bid.pid.id,
p_id);
return fence_batch_data{bid, tx_seq, transaction_timeout_ms, tm};
}

// Returns the control record type stored in the key of a control batch.
// vasserts that the batch is of type raft_data, has the control attribute
// set, holds exactly one record, and uses the current control record version.
model::control_record_type parse_control_batch(const model::record_batch& b) {
    const auto& hdr = b.header();
    vassert(
      hdr.type == model::record_batch_type::raft_data,
      "expect data batch type got {}",
      hdr.type);
    vassert(
      hdr.attrs.is_control(), "expect control attrs got {}", hdr.attrs);
    vassert(
      b.record_count() == 1, "control batch must contain a single record");

    // The batch is const, so operate on a copy of its records; the key
    // buffer is released from that copy and handed to the decoder.
    auto records = b.copy_records();
    auto& only_record = *records.begin();
    kafka::protocol::decoder key_decoder(only_record.release_key());

    // Key layout: int16 version followed by int16 control type.
    const auto version = model::control_record_version(
      key_decoder.read_int16());
    vassert(
      version == model::current_control_record_version,
      "unknown control record version");
    return model::control_record_type(key_decoder.read_int16());
}

// Builds a single-record raft_data batch flagged as both control and
// transactional for producer `pid`. The record key carries the current
// control record version followed by the control record type `crt`.
model::record_batch make_tx_control_batch(
  model::producer_identity pid, model::control_record_type crt) {
    // Key: control record version, then the control type as int16.
    iobuf key_buf;
    kafka::protocol::encoder key_writer(key_buf);
    key_writer.write(model::current_control_record_version());
    key_writer.write(static_cast<int16_t>(crt));

    // Value: a zero int16 followed by a zero int32.
    iobuf value_buf;
    kafka::protocol::encoder value_writer(value_buf);
    value_writer.write(static_cast<int16_t>(0));
    value_writer.write(static_cast<int32_t>(0));

    storage::record_batch_builder batch_builder(
      model::record_batch_type::raft_data, model::offset(0));
    batch_builder.set_producer_identity(pid.id, pid.epoch);
    batch_builder.set_control_type();
    batch_builder.set_transactional_type();
    // The record is added without any headers.
    batch_builder.add_raw_kw(
      std::move(key_buf),
      std::move(value_buf),
      std::vector<model::record_header>());

    return std::move(batch_builder).build();
}

}; // namespace cluster::tx

namespace reflection {
Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/rm_stm_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once

#include "cluster/types.h"
#include "kafka/protocol/wire.h"
#include "reflection/async_adl.h"

namespace cluster::tx {
Expand Down Expand Up @@ -66,6 +67,19 @@ struct tx_data {
model::partition_id tm_partition;
};

// Builds a tx_fence batch for the given producer identity, carrying the tx
// sequence, the transaction timeout and a partition id.
model::record_batch make_fence_batch(
model::producer_identity,
model::tx_seq,
std::chrono::milliseconds,
model::partition_id);

// Decodes a tx_fence batch; takes the batch by rvalue reference.
tx::fence_batch_data read_fence_batch(model::record_batch&& b);

// Extracts the control record type from a control batch.
model::control_record_type parse_control_batch(const model::record_batch&);

// Builds a transactional control batch of the given control record type for
// producer `pid`.
model::record_batch
make_tx_control_batch(model::producer_identity pid, model::control_record_type);

// snapshot related types

struct abort_index {
Expand Down
Loading

0 comments on commit 62915b8

Please sign in to comment.