Skip to content

Commit

Permalink
treewide: require type to compute cell memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
pdziepak committed May 31, 2018
1 parent 418c159 commit ec9d166
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 57 deletions.
2 changes: 1 addition & 1 deletion atomic_cell_or_collection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public:
bool operator==(const atomic_cell_or_collection& other) const {
return _data == other._data;
}
size_t external_memory_usage() const {
size_t external_memory_usage(const abstract_type&) const {
return _data.external_memory_usage();
}
friend std::ostream& operator<<(std::ostream&, const atomic_cell_or_collection&);
Expand Down
14 changes: 12 additions & 2 deletions flat_mutation_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,22 @@
#include <boost/range/adaptor/transformed.hpp>
#include <seastar/util/defer.hh>

static size_t compute_buffer_size(const schema& s, circular_buffer<mutation_fragment>& buffer)
{
return boost::accumulate(
buffer
| boost::adaptors::transformed([&s] (const mutation_fragment& mf) {
return mf.memory_usage(s);
}), size_t(0)
);
}

void flat_mutation_reader::impl::forward_buffer_to(const position_in_partition& pos) {
_buffer.erase(std::remove_if(_buffer.begin(), _buffer.end(), [this, &pos] (mutation_fragment& f) {
return !f.relevant_for_range_assuming_after(*_schema, pos);
}), _buffer.end());

_buffer_size = boost::accumulate(_buffer | boost::adaptors::transformed(std::mem_fn(&mutation_fragment::memory_usage)), size_t(0));
_buffer_size = compute_buffer_size(*_schema, _buffer);
}

void flat_mutation_reader::impl::clear_buffer_to_next_partition() {
Expand All @@ -41,7 +51,7 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() {
});
_buffer.erase(_buffer.begin(), next_partition_start);

_buffer_size = boost::accumulate(_buffer | boost::adaptors::transformed(std::mem_fn(&mutation_fragment::memory_usage)), size_t(0));
_buffer_size = compute_buffer_size(*_schema, _buffer);
}

flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutation_reader::impl& original) {
Expand Down
4 changes: 2 additions & 2 deletions flat_mutation_reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public:
void push_mutation_fragment(Args&&... args) {
seastar::memory::on_alloc_point(); // for exception safety tests
_buffer.emplace_back(std::forward<Args>(args)...);
_buffer_size += _buffer.back().memory_usage();
_buffer_size += _buffer.back().memory_usage(*_schema);
}
void clear_buffer() {
_buffer.erase(_buffer.begin(), _buffer.end());
Expand Down Expand Up @@ -132,7 +132,7 @@ public:
mutation_fragment pop_mutation_fragment() {
auto mf = std::move(_buffer.front());
_buffer.pop_front();
_buffer_size -= mf.memory_usage();
_buffer_size -= mf.memory_usage(*_schema);
return mf;
}

Expand Down
6 changes: 3 additions & 3 deletions frozen_mutation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,18 @@ class fragmenting_mutation_freezer {

future<stop_iteration> consume(static_row&& sr) {
_sr = std::move(sr);
_dirty_size += _sr->memory_usage();
_dirty_size += _sr->memory_usage(_schema);
return maybe_flush();
}

future<stop_iteration> consume(clustering_row&& cr) {
_dirty_size += cr.memory_usage();
_dirty_size += cr.memory_usage(_schema);
_crs.emplace_back(std::move(cr));
return maybe_flush();
}

future<stop_iteration> consume(range_tombstone&& rt) {
_dirty_size += rt.memory_usage();
_dirty_size += rt.memory_usage(_schema);
_rts.apply(_schema, std::move(rt));
return maybe_flush();
}
Expand Down
12 changes: 7 additions & 5 deletions memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,11 @@ class flush_memory_accounter {
};

class partition_snapshot_accounter {
const schema& _schema;
flush_memory_accounter& _accounter;
public:
partition_snapshot_accounter(flush_memory_accounter& acct): _accounter(acct) {}
partition_snapshot_accounter(const schema& s, flush_memory_accounter& acct)
: _schema(s), _accounter(acct) {}

// We will be passed mutation fragments here, and they are allocated using the standard
// allocator. So we can't compute the size in memtable precisely. However, precise accounting is
Expand All @@ -439,11 +441,11 @@ class partition_snapshot_accounter {
// allocation. As long as our size read here is lesser or equal to the size in the memtables, we
// are safe, and worst case we will allow a bit fewer requests in.
void operator()(const range_tombstone& rt) {
_accounter.update_bytes_read(rt.memory_usage());
_accounter.update_bytes_read(rt.memory_usage(_schema));
}

void operator()(const static_row& sr) {
_accounter.update_bytes_read(sr.external_memory_usage());
_accounter.update_bytes_read(sr.external_memory_usage(_schema));
}

void operator()(const partition_start& ph) {}
Expand All @@ -457,7 +459,7 @@ class partition_snapshot_accounter {
// and we don't know which one(s) contributed to the generation of this mutation fragment.
//
// We will add the size of the struct here, and that should be good enough.
_accounter.update_bytes_read(sizeof(rows_entry) + cr.external_memory_usage());
_accounter.update_bytes_read(sizeof(rows_entry) + cr.external_memory_usage(_schema));
}
};

Expand Down Expand Up @@ -500,7 +502,7 @@ class flush_reader final : public flat_mutation_reader::impl, private iterator_r
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key());
auto snp_schema = key_and_snp->second->schema();
auto mpsr = make_partition_snapshot_flat_reader<partition_snapshot_accounter>(snp_schema, std::move(key_and_snp->first), std::move(cr),
std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, _flushed_memory);
std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory);
if (snp_schema->version() != schema()->version()) {
_partition_reader = transform(std::move(mpsr), schema_upgrader(schema()));
} else {
Expand Down
2 changes: 1 addition & 1 deletion memtable.hh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public:
size_t size_in_allocator(allocation_strategy& allocator) {
auto size = size_in_allocator_without_rows(allocator);
for (auto&& v : _pe.versions()) {
size += v.size_in_allocator(allocator);
size += v.size_in_allocator(*_schema, allocator);
}
return size;
}
Expand Down
32 changes: 16 additions & 16 deletions mutation_fragment.hh
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ public:

position_in_partition_view position() const;

size_t external_memory_usage() const {
return _ck.external_memory_usage() + _cells.external_memory_usage();
size_t external_memory_usage(const schema& s) const {
return _ck.external_memory_usage() + _cells.external_memory_usage(s, column_kind::regular_column);
}

size_t memory_usage() const {
return sizeof(clustering_row) + external_memory_usage();
size_t memory_usage(const schema& s) const {
return sizeof(clustering_row) + external_memory_usage(s);
}

bool equal(const schema& s, const clustering_row& other) const {
Expand Down Expand Up @@ -162,12 +162,12 @@ public:

position_in_partition_view position() const;

size_t external_memory_usage() const {
return _cells.external_memory_usage();
size_t external_memory_usage(const schema& s) const {
return _cells.external_memory_usage(s, column_kind::static_column);
}

size_t memory_usage() const {
return sizeof(static_row) + external_memory_usage();
size_t memory_usage(const schema& s) const {
return sizeof(static_row) + external_memory_usage(s);
}

bool equal(const schema& s, const static_row& other) const {
Expand All @@ -193,12 +193,12 @@ public:

position_in_partition_view position() const;

size_t external_memory_usage() const {
size_t external_memory_usage(const schema&) const {
return _key.external_memory_usage();
}

size_t memory_usage() const {
return sizeof(partition_start) + external_memory_usage();
size_t memory_usage(const schema& s) const {
return sizeof(partition_start) + external_memory_usage(s);
}

bool equal(const schema& s, const partition_start& other) const {
Expand All @@ -212,12 +212,12 @@ class partition_end final {
public:
position_in_partition_view position() const;

size_t external_memory_usage() const {
size_t external_memory_usage(const schema&) const {
return 0;
}

size_t memory_usage() const {
return sizeof(partition_end) + external_memory_usage();
size_t memory_usage(const schema& s) const {
return sizeof(partition_end) + external_memory_usage(s);
}

bool equal(const schema& s, const partition_end& other) const {
Expand Down Expand Up @@ -466,9 +466,9 @@ public:
abort();
}

size_t memory_usage() const {
size_t memory_usage(const schema& s) const {
if (!_data->_size_in_bytes) {
_data->_size_in_bytes = sizeof(data) + visit([] (auto& mf) -> size_t { return mf.external_memory_usage(); });
_data->_size_in_bytes = sizeof(data) + visit([&s] (auto& mf) -> size_t { return mf.external_memory_usage(s); });
}
return *_data->_size_in_bytes;
}
Expand Down
28 changes: 15 additions & 13 deletions mutation_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1162,41 +1162,43 @@ row::find_cell(column_id id) const {
return c_a_h ? &c_a_h->cell : nullptr;
}

size_t row::external_memory_usage() const {
size_t row::external_memory_usage(const schema& s, column_kind kind) const {
size_t mem = 0;
if (_type == storage_type::vector) {
mem += _storage.vector.v.external_memory_usage();
column_id id = 0;
for (auto&& c_a_h : _storage.vector.v) {
mem += c_a_h.cell.external_memory_usage();
auto& cdef = s.column_at(kind, id++);
mem += c_a_h.cell.external_memory_usage(*cdef.type);
}
} else {
for (auto&& ce : _storage.set) {
mem += sizeof(cell_entry) + ce.cell().external_memory_usage();
auto& cdef = s.column_at(kind, ce.id());
mem += sizeof(cell_entry) + ce.cell().external_memory_usage(*cdef.type);
}
}
return mem;
}

size_t rows_entry::memory_usage() const {
size_t rows_entry::memory_usage(const schema& s) const {
size_t size = 0;
if (!dummy()) {
size += key().external_memory_usage();
}
return size +
row().cells().external_memory_usage() +
row().cells().external_memory_usage(s, column_kind::regular_column) +
sizeof(rows_entry);
}

size_t mutation_partition::external_memory_usage() const {
size_t mutation_partition::external_memory_usage(const schema& s) const {
size_t sum = 0;
auto& s = static_row();
sum += s.external_memory_usage();
sum += static_row().external_memory_usage(s, column_kind::static_column);
for (auto& clr : clustered_rows()) {
sum += clr.memory_usage();
sum += clr.memory_usage(s);
}

for (auto& rtb : row_tombstones()) {
sum += rtb.memory_usage();
sum += rtb.memory_usage(s);
}

return sum;
Expand Down Expand Up @@ -2017,12 +2019,12 @@ class reconcilable_result_builder {
}
stop_iteration consume(static_row&& sr, tombstone, bool is_alive) {
_static_row_is_alive = is_alive;
_memory_accounter.update(sr.memory_usage());
_memory_accounter.update(sr.memory_usage(_schema));
return _mutation_consumer->consume(std::move(sr));
}
stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_alive) {
_live_rows += is_alive;
auto stop = _memory_accounter.update_and_check(cr.memory_usage());
auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema));
if (is_alive) {
// We are considering finishing current read only after consuming a
// live clustering row. While sending a single live row is enough to
Expand All @@ -2034,7 +2036,7 @@ class reconcilable_result_builder {
return _mutation_consumer->consume(std::move(cr)) || _stop;
}
stop_iteration consume(range_tombstone&& rt) {
_memory_accounter.update(rt.memory_usage());
_memory_accounter.update(rt.memory_usage(_schema));
return _mutation_consumer->consume(std::move(rt));
}

Expand Down
6 changes: 3 additions & 3 deletions mutation_partition.hh
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public:

bool equal(column_kind kind, const schema& this_schema, const row& other, const schema& other_schema) const;

size_t external_memory_usage() const;
size_t external_memory_usage(const schema&, column_kind) const;

cell_hash_opt cell_hash_for(column_id id) const;

Expand Down Expand Up @@ -842,7 +842,7 @@ public:
bool equal(const schema& s, const rows_entry& other) const;
bool equal(const schema& s, const rows_entry& other, const schema& other_schema) const;

size_t memory_usage() const;
size_t memory_usage(const schema&) const;
void on_evicted(cache_tracker&) noexcept;
};

Expand Down Expand Up @@ -1108,7 +1108,7 @@ public:
bool is_static_row_live(const schema&,
gc_clock::time_point query_time = gc_clock::time_point::min()) const;

size_t external_memory_usage() const;
size_t external_memory_usage(const schema&) const;
private:
template<typename Func>
void for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const;
Expand Down
2 changes: 1 addition & 1 deletion partition_snapshot_row_cursor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public:
size_t memory_usage() const {
size_t result = 0;
for (const position_in_version& v : _current_row) {
result += v.it->memory_usage();
result += v.it->memory_usage(_schema);
}
return result;
}
Expand Down
8 changes: 4 additions & 4 deletions partition_version.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ stop_iteration partition_version::clear_gently(cache_tracker* tracker) noexcept
return _partition.clear_gently(tracker);
}

size_t partition_version::size_in_allocator(allocation_strategy& allocator) const {
size_t partition_version::size_in_allocator(const schema& s, allocation_strategy& allocator) const {
return allocator.object_memory_size_in_allocator(this) +
partition().external_memory_usage();
partition().external_memory_usage(s);
}

namespace {
Expand Down Expand Up @@ -489,7 +489,7 @@ coroutine partition_entry::apply_to_incomplete(const schema& s, partition_entry&
auto current = &*src_snp->version();
while (current) {
dirty_size += allocator.object_memory_size_in_allocator(current)
+ current->partition().static_row().external_memory_usage();
+ current->partition().static_row().external_memory_usage(s, column_kind::static_column);
dst.partition().apply(current->partition().partition_tombstone());
if (static_row_continuous) {
row& static_row = dst.partition().static_row();
Expand All @@ -500,7 +500,7 @@ coroutine partition_entry::apply_to_incomplete(const schema& s, partition_entry&
static_row.apply(s, column_kind::static_column, current->partition().static_row());
}
}
dirty_size += current->partition().row_tombstones().external_memory_usage();
dirty_size += current->partition().row_tombstones().external_memory_usage(s);
range_tombstone_list& tombstones = dst.partition().row_tombstones();
// FIXME: defer while applying range tombstones
if (can_move) {
Expand Down
2 changes: 1 addition & 1 deletion partition_version.hh
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public:
bool is_referenced_from_entry() const;
partition_version_ref& back_reference() { return *_backref; }

size_t size_in_allocator(allocation_strategy& allocator) const;
size_t size_in_allocator(const schema& s, allocation_strategy& allocator) const;
};

using partition_version_range = anchorless_list_base_hook<partition_version>::range;
Expand Down
6 changes: 3 additions & 3 deletions range_tombstone.hh
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ public:
return true;
}

size_t external_memory_usage() const {
size_t external_memory_usage(const schema&) const {
return start.external_memory_usage() + end.external_memory_usage();
}

size_t memory_usage() const {
return sizeof(range_tombstone) + external_memory_usage();
size_t memory_usage(const schema& s) const {
return sizeof(range_tombstone) + external_memory_usage(s);
}
private:
void move_assign(range_tombstone&& rt) {
Expand Down
4 changes: 2 additions & 2 deletions range_tombstone_list.hh
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ public:

friend std::ostream& operator<<(std::ostream& out, const range_tombstone_list&);
bool equal(const schema&, const range_tombstone_list&) const;
size_t external_memory_usage() const {
size_t external_memory_usage(const schema& s) const {
size_t result = 0;
for (auto& rtb : _tombstones) {
result += rtb.memory_usage();
result += rtb.memory_usage(s);
}
return result;
}
Expand Down

0 comments on commit ec9d166

Please sign in to comment.