Skip to content

Commit

Permalink
Fix data race in slru
Browse files Browse the repository at this point in the history
  • Loading branch information
kssenii committed Jan 15, 2024
1 parent be37140 commit e2c5e5f
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/Interpreters/Cache/FileCache.cpp
Expand Up @@ -817,7 +817,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
}

file_segment.reserved_size += size;
chassert(file_segment.reserved_size == queue_iterator->getEntry().size);
chassert(file_segment.reserved_size == queue_iterator->getEntry()->size);

if (query_context)
{
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/Cache/FileSegment.cpp
Expand Up @@ -789,9 +789,9 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) cons

const auto & entry = it->getEntry();
UNUSED(entry);
chassert(entry.size == reserved_size);
chassert(entry.key == key());
chassert(entry.offset == offset());
chassert(entry->size == reserved_size);
chassert(entry->key == key());
chassert(entry->offset == offset());
};

if (download_state == State::DOWNLOADED)
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/Cache/IFileCachePriority.h
Expand Up @@ -31,13 +31,14 @@ class IFileCachePriority : private boost::noncopyable
std::atomic<size_t> size;
size_t hits = 0;
};
using EntryPtr = std::shared_ptr<Entry>;

class Iterator
{
public:
virtual ~Iterator() = default;

virtual const Entry & getEntry() const = 0;
virtual EntryPtr getEntry() const = 0;

virtual size_t increasePriority(const CacheGuard::Lock &) = 0;

Expand Down
56 changes: 30 additions & 26 deletions src/Interpreters/Cache/LRUFileCachePriority.cpp
Expand Up @@ -36,65 +36,66 @@ IFileCachePriority::IteratorPtr LRUFileCachePriority::add( /// NOLINT
const CacheGuard::Lock & lock,
bool /* is_startup */)
{
return std::make_shared<LRUIterator>(add(Entry(key_metadata->key, offset, size, key_metadata), lock));
return std::make_shared<LRUIterator>(add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock));
}

LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock & lock)
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(EntryPtr entry, const CacheGuard::Lock & lock)
{
if (entry.size == 0)
if (entry->size == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Adding zero size entries to LRU queue is not allowed "
"(key: {}, offset: {})", entry.key, entry.offset);
"(key: {}, offset: {})", entry->key, entry->offset);
}

#ifndef NDEBUG
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
if (queue_entry->size != 0 && queue_entry->key == entry->key && queue_entry->offset == entry->offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
"(Key: {}, offset: {}, size: {})",
entry.key, entry.offset, entry.size);
entry->key, entry->offset, entry->size);
}
#endif

const auto & size_limit = getSizeLimit(lock);
if (size_limit && current_size + entry.size > size_limit)
if (size_limit && current_size + entry->size > size_limit)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Not enough space to add {}:{} with size {}: current size: {}/{}",
entry.key, entry.offset, entry.size, current_size, size_limit);
entry->key, entry->offset, entry->size, current_size, size_limit);
}

auto iterator = queue.insert(queue.end(), entry);

updateSize(entry.size);
updateSize(entry->size);
updateElementsCount(1);

LOG_TEST(
log, "Added entry into LRU queue, key: {}, offset: {}, size: {}",
entry.key, entry.offset, entry.size);
entry->key, entry->offset, entry->size);

return LRUIterator(this, iterator);
}

LRUFileCachePriority::LRUQueue::iterator LRUFileCachePriority::remove(LRUQueue::iterator it, const CacheGuard::Lock &)
{
/// If size is 0, entry is invalidated, current_elements_num was already updated.
if (it->size)
const auto & entry = **it;
if (entry.size)
{
updateSize(-it->size);
updateSize(-entry.size);
updateElementsCount(-1);
}

LOG_TEST(
log, "Removed entry from LRU queue, key: {}, offset: {}, size: {}",
it->key, it->offset, it->size);
entry.key, entry.offset, entry.size);

return queue.erase(it);
}
Expand Down Expand Up @@ -143,27 +144,28 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
{
for (auto it = queue.begin(); it != queue.end();)
{
auto locked_key = it->key_metadata->tryLock();
if (!locked_key || it->size == 0)
const auto & entry = **it;
auto locked_key = entry.key_metadata->tryLock();
if (!locked_key || entry.size == 0)
{
it = remove(it, lock);
continue;
}

auto metadata = locked_key->tryGetByOffset(it->offset);
auto metadata = locked_key->tryGetByOffset(entry.offset);
if (!metadata)
{
it = remove(it, lock);
continue;
}

if (metadata->size() != it->size)
if (metadata->size() != entry.size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatch of file segment size in file segment metadata "
"and priority queue: {} != {} ({})",
it->size, metadata->size(), metadata->file_segment->getInfoForLog());
entry.size, metadata->size(), metadata->file_segment->getInfoForLog());
}

auto result = func(*locked_key, metadata);
Expand Down Expand Up @@ -249,7 +251,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction(

LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &)
{
const auto & entry = it.getEntry();
const auto & entry = *it.getEntry();
if (entry.size == 0)
{
throw Exception(
Expand All @@ -261,7 +263,7 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
if (queue_entry->size != 0 && queue_entry->key == entry.key && queue_entry->offset == entry.offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
Expand Down Expand Up @@ -347,34 +349,36 @@ void LRUFileCachePriority::LRUIterator::invalidate()
{
assertValid();

const auto & entry = *iterator;
LOG_TEST(
cache_priority->log,
"Invalidating entry in LRU queue. Key: {}, offset: {}, previous size: {}",
iterator->key, iterator->offset, iterator->size);
entry->key, entry->offset, entry->size);

cache_priority->updateSize(-iterator->size);
cache_priority->updateSize(-entry->size);
cache_priority->updateElementsCount(-1);
iterator->size = 0;
entry->size = 0;
}

void LRUFileCachePriority::LRUIterator::updateSize(int64_t size)
{
assertValid();

const auto & entry = *iterator;
LOG_TEST(
cache_priority->log,
"Update size with {} in LRU queue for key: {}, offset: {}, previous size: {}",
size, iterator->key, iterator->offset, iterator->size);
size, entry->key, entry->offset, entry->size);

cache_priority->updateSize(size);
iterator->size += size;
entry->size += size;
}

size_t LRUFileCachePriority::LRUIterator::increasePriority(const CacheGuard::Lock &)
{
assertValid();
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, iterator);
return ++iterator->hits;
return ++((*iterator)->hits);
}

void LRUFileCachePriority::LRUIterator::assertValid() const
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/Cache/LRUFileCachePriority.h
Expand Up @@ -15,7 +15,7 @@ class LRUFileCachePriority final : public IFileCachePriority
{
private:
class LRUIterator;
using LRUQueue = std::list<Entry>;
using LRUQueue = std::list<EntryPtr>;
friend class SLRUFileCachePriority;

public:
Expand Down Expand Up @@ -76,7 +76,7 @@ class LRUFileCachePriority final : public IFileCachePriority
void iterate(IterateFunc && func, const CacheGuard::Lock &);

LRUIterator move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &);
LRUIterator add(Entry && entry, const CacheGuard::Lock &);
LRUIterator add(EntryPtr entry, const CacheGuard::Lock &);
};

class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator
Expand All @@ -91,7 +91,7 @@ class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator
LRUIterator & operator =(const LRUIterator & other);
bool operator ==(const LRUIterator & other) const;

const Entry & getEntry() const override { return *iterator; }
EntryPtr getEntry() const override { return *iterator; }

size_t increasePriority(const CacheGuard::Lock &) override;

Expand Down
21 changes: 11 additions & 10 deletions src/Interpreters/Cache/SLRUFileCachePriority.cpp
Expand Up @@ -61,18 +61,18 @@ IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT
/// because we do not know the distribution between queues after server restart.
if (probationary_queue.canFit(size, lock))
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
else
{
auto lru_iterator = protected_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = protected_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), true);
}
}
else
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
}
Expand Down Expand Up @@ -151,7 +151,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach

/// Entry is in probationary queue.
/// We need to move it to protected queue.
const size_t size = iterator.getEntry().size;
const size_t size = iterator.getEntry()->size;
if (size > protected_queue.getSizeLimit(lock))
{
/// Entry size is bigger than the whole protected queue limit.
Expand Down Expand Up @@ -205,7 +205,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach

/// All checks passed, now we can move downgrade candidates to
/// probationary queue and our entry to protected queue.
Entry entry_copy = iterator.getEntry();
EntryPtr entry = iterator.getEntry();
iterator.lru_iterator.remove(lock);

for (const auto & [key, key_candidates] : downgrade_candidates)
Expand All @@ -218,7 +218,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
}
}

iterator.lru_iterator = protected_queue.add(std::move(entry_copy), lock);
iterator.lru_iterator = protected_queue.add(entry, lock);
iterator.is_protected = true;
}

Expand Down Expand Up @@ -257,21 +257,22 @@ SLRUFileCachePriority::SLRUIterator::SLRUIterator(
bool is_protected_)
: cache_priority(cache_priority_)
, lru_iterator(lru_iterator_)
, entry(lru_iterator.getEntry())
, is_protected(is_protected_)
{
}

const SLRUFileCachePriority::Entry & SLRUFileCachePriority::SLRUIterator::getEntry() const
SLRUFileCachePriority::EntryPtr SLRUFileCachePriority::SLRUIterator::getEntry() const
{
assertValid();
return lru_iterator.getEntry();
chassert(entry == lru_iterator.getEntry());
return entry;
}

size_t SLRUFileCachePriority::SLRUIterator::increasePriority(const CacheGuard::Lock & lock)
{
assertValid();
cache_priority->increasePriority(*this, lock);
return getEntry().hits;
return getEntry()->hits;
}

void SLRUFileCachePriority::SLRUIterator::updateSize(int64_t size)
Expand Down
11 changes: 4 additions & 7 deletions src/Interpreters/Cache/SLRUFileCachePriority.h
Expand Up @@ -11,10 +11,6 @@ namespace DB
/// the head of the queue, and the record with the highest priority is stored at the tail.
class SLRUFileCachePriority : public IFileCachePriority
{
private:
using LRUIterator = LRUFileCachePriority::LRUIterator;
using LRUQueue = std::list<Entry>;

public:
class SLRUIterator;

Expand Down Expand Up @@ -62,10 +58,10 @@ class SLRUFileCachePriority::SLRUIterator : public IFileCachePriority::Iterator
public:
SLRUIterator(
SLRUFileCachePriority * cache_priority_,
LRUIterator && lru_iterator_,
LRUFileCachePriority::LRUIterator && lru_iterator_,
bool is_protected_);

const Entry & getEntry() const override;
EntryPtr getEntry() const override;

size_t increasePriority(const CacheGuard::Lock &) override;

Expand All @@ -81,7 +77,8 @@ class SLRUFileCachePriority::SLRUIterator : public IFileCachePriority::Iterator
void assertValid() const;

SLRUFileCachePriority * cache_priority;
mutable LRUIterator lru_iterator;
LRUFileCachePriority::LRUIterator lru_iterator;
const EntryPtr entry;
bool is_protected;
};

Expand Down

0 comments on commit e2c5e5f

Please sign in to comment.