Fix cached plan not getting invalidated #1348

Merged on Nov 7, 2023 (24 commits). The diff below shows changes from 20 of the 24 commits.

Commits (24)
80a01ba  use newdelete instead of monotonic (DavIvek, Oct 11, 2023)
b99334e  stop avoiding monotonic (DavIvek, Oct 24, 2023)
7737915  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Oct 24, 2023)
873b84f  force clear plan_cache when creating index (DavIvek, Oct 24, 2023)
96b2b92  implement lru cache with spinlock for plan caching (DavIvek, Oct 26, 2023)
39388b1  add clear() method to lru (DavIvek, Oct 26, 2023)
a91f3c4  change name of flag and do clean up (DavIvek, Oct 30, 2023)
39ccd1a  fix tests (DavIvek, Oct 30, 2023)
660b9e4  fix config test (DavIvek, Oct 30, 2023)
7f3be25  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Oct 30, 2023)
63786c4  fix lru cache (DavIvek, Nov 2, 2023)
31cedda  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 2, 2023)
376df2e  use LRUPlanCache::exists() before LRUPlanCache::get() and add test (DavIvek, Nov 2, 2023)
0796045  rename CachedPlan into PlanWrapper (DavIvek, Nov 2, 2023)
87d1e21  use one lock instead of two when checking on existing plan (DavIvek, Nov 3, 2023)
7b36ab2  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 3, 2023)
16a4846  fix get method of lru cache (DavIvek, Nov 3, 2023)
e8bba12  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 3, 2023)
3a6bf41  use std::optional in lru cache implementation (DavIvek, Nov 3, 2023)
baaf9cf  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 6, 2023)
422546a  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 6, 2023)
94a9e78  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 6, 2023)
ae2bf04  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 7, 2023)
7d455be  Merge branch 'master' into fix-memory-spike-merge-with-variable (DavIvek, Nov 7, 2023)
3 changes: 2 additions & 1 deletion src/dbms/database.cpp
@@ -21,7 +21,8 @@ namespace memgraph::dbms {

Database::Database(const storage::Config &config)
: trigger_store_(config.durability.storage_directory / "triggers"),
streams_{config.durability.storage_directory / "streams"} {
streams_{config.durability.storage_directory / "streams"},
plan_cache_{FLAGS_query_plan_cache_max_size} {
if (config.storage_mode == memgraph::storage::StorageMode::ON_DISK_TRANSACTIONAL || config.force_on_disk ||
utils::DirExists(config.disk.main_storage_directory)) {
storage_ = std::make_unique<storage::DiskStorage>(config);
8 changes: 5 additions & 3 deletions src/dbms/database.hpp
@@ -24,6 +24,8 @@
#include "query/trigger.hpp"
#include "storage/v2/storage.hpp"
#include "utils/gatekeeper.hpp"
#include "utils/lru_cache.hpp"
#include "utils/synchronized.hpp"

namespace memgraph::dbms {

@@ -145,9 +147,9 @@ class Database {
/**
* @brief Returns the PlanCache vector raw pointer
*
* @return utils::SkipList<query::PlanCacheEntry>*
* @return utils::Synchronized<utils::LRUCache<uint64_t, std::shared_ptr<PlanWrapper>>, utils::RWSpinLock>
*/
utils::SkipList<query::PlanCacheEntry> *plan_cache() { return &plan_cache_; }
query::PlanCacheLRU *plan_cache() { return &plan_cache_; }

private:
std::unique_ptr<storage::Storage> storage_; //!< Underlying storage
@@ -156,7 +158,7 @@
query::stream::Streams streams_; //!< Streams associated with the storage

// TODO: Move to a better place
utils::SkipList<query::PlanCacheEntry> plan_cache_; //!< Plan cache associated with the storage
query::PlanCacheLRU plan_cache_; //!< Plan cache associated with the storage
};

} // namespace memgraph::dbms
33 changes: 15 additions & 18 deletions src/query/cypher_query_interpreter.cpp
@@ -12,15 +12,16 @@
#include "query/cypher_query_interpreter.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/opencypher/parser.hpp"
#include "utils/synchronized.hpp"

// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(query_cost_planner, true, "Use the cost-estimating query planner.");
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.",
DEFINE_VALIDATED_int32(query_plan_cache_max_size, 1000, "Maximum number of query plans to cache.",
FLAG_IN_RANGE(0, std::numeric_limits<int32_t>::max()));

namespace memgraph::query {
CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {}
PlanWrapper::PlanWrapper(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {}

ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> &params,
utils::SkipList<QueryCacheEntry> *cache, const InterpreterConfig::Query &query_config) {
@@ -127,28 +128,24 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
std::move(symbol_table));
}

std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache,
DbAccessor *db_accessor,
const std::vector<Identifier *> &predefined_identifiers) {
std::optional<utils::SkipList<PlanCacheEntry>::Accessor> plan_cache_access;
std::shared_ptr<PlanWrapper> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, PlanCacheLRU *plan_cache,
DbAccessor *db_accessor,
const std::vector<Identifier *> &predefined_identifiers) {
if (plan_cache) {
plan_cache_access.emplace(plan_cache->access());
auto it = plan_cache_access->find(hash);
if (it != plan_cache_access->end()) {
if (it->second->IsExpired()) {
plan_cache_access->remove(hash);
} else {
return it->second;
}
auto existing_plan = plan_cache->WithLock([&](auto &cache) { return cache.get(hash); });
if (existing_plan.has_value()) {
return existing_plan.value();
}
}

auto plan = std::make_shared<CachedPlan>(
auto plan = std::make_shared<PlanWrapper>(
MakeLogicalPlan(std::move(ast_storage), query, parameters, db_accessor, predefined_identifiers));
if (plan_cache_access) {
plan_cache_access->insert({hash, plan});

if (plan_cache) {
plan_cache->WithLock([&](auto &cache) { cache.put(hash, plan); });
}

return plan;
}
} // namespace memgraph::query
37 changes: 12 additions & 25 deletions src/query/cypher_query_interpreter.hpp
@@ -17,12 +17,14 @@
#include "query/frontend/stripped.hpp"
#include "query/plan/planner.hpp"
#include "utils/flag_validation.hpp"
#include "utils/lru_cache.hpp"
#include "utils/synchronized.hpp"
#include "utils/timer.hpp"

// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_bool(query_cost_planner);
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_int32(query_plan_cache_ttl);
DECLARE_int32(query_plan_cache_max_size);

namespace memgraph::query {

@@ -45,23 +47,17 @@ class LogicalPlan {
virtual const AstStorage &GetAstStorage() const = 0;
};

class CachedPlan {
class PlanWrapper {
public:
explicit CachedPlan(std::unique_ptr<LogicalPlan> plan);
explicit PlanWrapper(std::unique_ptr<LogicalPlan> plan);

const auto &plan() const { return plan_->GetRoot(); }
double cost() const { return plan_->GetCost(); }
const auto &symbol_table() const { return plan_->GetSymbolTable(); }
const auto &ast_storage() const { return plan_->GetAstStorage(); }

bool IsExpired() const {
// NOLINTNEXTLINE (modernize-use-nullptr)
return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl);
};

private:
std::unique_ptr<LogicalPlan> plan_;
utils::Timer cache_timer_;
};

struct CachedQuery {
@@ -82,18 +78,6 @@ struct QueryCacheEntry {
CachedQuery second;
};

struct PlanCacheEntry {
bool operator==(const PlanCacheEntry &other) const { return first == other.first; }
bool operator<(const PlanCacheEntry &other) const { return first < other.first; }
bool operator==(const uint64_t &other) const { return first == other; }
bool operator<(const uint64_t &other) const { return first < other; }

uint64_t first;
// TODO: Maybe store the query string here and use it as a key with the hash
// so that we eliminate the risk of hash collisions.
std::shared_ptr<CachedPlan> second;
};

/**
* A container for data related to the parsing of a query.
*/
@@ -129,6 +113,9 @@ class SingleNodeLogicalPlan final : public LogicalPlan {
SymbolTable symbol_table_;
};

using PlanCacheLRU =
utils::Synchronized<utils::LRUCache<uint64_t, std::shared_ptr<query::PlanWrapper>>, utils::RWSpinLock>;

std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters &parameters,
DbAccessor *db_accessor,
const std::vector<Identifier *> &predefined_identifiers);
@@ -141,9 +128,9 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
* If an identifier is contained there, we inject it at that place and remove it,
* because a predefined identifier can be used only in one scope.
*/
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache,
DbAccessor *db_accessor,
const std::vector<Identifier *> &predefined_identifiers = {});
std::shared_ptr<PlanWrapper> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, PlanCacheLRU *plan_cache,
DbAccessor *db_accessor,
const std::vector<Identifier *> &predefined_identifiers = {});

} // namespace memgraph::query
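
The PlanCacheLRU alias above is the core of this change: the per-database plan cache becomes a utils::LRUCache bounded by --query_plan_cache_max_size, wrapped in utils::Synchronized with a RWSpinLock, replacing the TTL-expired SkipList<PlanCacheEntry>. A minimal sketch of how the alias is used (the types and the WithLock API come from this diff; the standalone helper below is illustrative, not PR code):

#include <cstdint>
#include <memory>

#include "query/cypher_query_interpreter.hpp"  // PlanCacheLRU, PlanWrapper

namespace mq = memgraph::query;

// Illustrative helper: return the cached plan for `hash`, or cache a freshly built one.
std::shared_ptr<mq::PlanWrapper> GetOrPut(mq::PlanCacheLRU &plan_cache, uint64_t hash,
                                          std::shared_ptr<mq::PlanWrapper> fresh_plan) {
  // All access goes through WithLock, which holds the RWSpinLock for the duration of the lambda.
  auto existing = plan_cache.WithLock([&](auto &cache) { return cache.get(hash); });
  if (existing) return *existing;  // get() also bumps the entry to most-recently-used

  plan_cache.WithLock([&](auto &cache) { cache.put(hash, fresh_plan); });
  return fresh_plan;
}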
16 changes: 5 additions & 11 deletions src/query/interpreter.cpp
@@ -1203,7 +1203,7 @@ struct TxTimeout {
};

struct PullPlan {
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters &parameters, bool is_profile_query,
explicit PullPlan(std::shared_ptr<PlanWrapper> plan, const Parameters &parameters, bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
std::shared_ptr<utils::AsyncTimer> tx_timer,
@@ -1216,7 +1216,7 @@
std::map<std::string, TypedValue> *summary);

private:
std::shared_ptr<CachedPlan> plan_ = nullptr;
std::shared_ptr<PlanWrapper> plan_ = nullptr;
plan::UniqueCursorPtr cursor_ = nullptr;
Frame frame_;
ExecutionContext ctx_;
@@ -1243,7 +1243,7 @@
bool use_monotonic_memory_;
};

PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
PullPlan::PullPlan(const std::shared_ptr<PlanWrapper> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
std::optional<std::string> username, std::atomic<TransactionStatus> *transaction_status,
std::shared_ptr<utils::AsyncTimer> tx_timer, TriggerContextCollector *trigger_context_collector,
@@ -2122,10 +2122,7 @@ PreparedQuery PrepareAnalyzeGraphQuery(ParsedQuery parsed_query, bool in_explici

// Creating an index influences computed plan costs.
auto invalidate_plan_cache = [plan_cache = current_db.db_acc_->get()->plan_cache()] {
auto access = plan_cache->access();
for (auto &kv : access) {
access.remove(kv.first);
}
plan_cache->WithLock([&](auto &cache) { cache.reset(); });
};
utils::OnScopeExit cache_invalidator(invalidate_plan_cache);

Expand Down Expand Up @@ -2170,10 +2167,7 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans

// Creating an index influences computed plan costs.
auto invalidate_plan_cache = [plan_cache = db_acc->plan_cache()] {
auto access = plan_cache->access();
for (auto &kv : access) {
access.remove(kv.first);
}
plan_cache->WithLock([&](auto &cache) { cache.reset(); });
};

auto *storage = db_acc->storage();
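
For context on the two invalidation sites above: utils::OnScopeExit runs the lambda when the preparing scope unwinds, so the plan cache is reset whether index creation succeeds or throws. A stripped-down sketch of that pattern (PlanCacheLRU and the WithLock/reset calls are from this diff; the surrounding function and the header path are assumptions):

#include "query/cypher_query_interpreter.hpp"  // memgraph::query::PlanCacheLRU
#include "utils/on_scope_exit.hpp"             // assumed location of utils::OnScopeExit

// Illustrative only: why cached plans are dropped around index creation.
void CreateIndexAndInvalidate(memgraph::query::PlanCacheLRU *plan_cache) {
  // Creating an index changes estimated plan costs, so plans cached before this
  // point may no longer be optimal; drop them all when this scope ends.
  memgraph::utils::OnScopeExit cache_invalidator(
      [plan_cache] { plan_cache->WithLock([](auto &cache) { cache.reset(); }); });

  // ... create the index here; the reset above also runs if an exception is thrown.
}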
2 changes: 1 addition & 1 deletion src/query/trigger.cpp
@@ -169,7 +169,7 @@ Trigger::TriggerPlan::TriggerPlan(std::unique_ptr<LogicalPlan> logical_plan, std
std::shared_ptr<Trigger::TriggerPlan> Trigger::GetPlan(DbAccessor *db_accessor,
const query::AuthChecker *auth_checker) const {
std::lock_guard plan_guard{plan_lock_};
if (!parsed_statements_.is_cacheable || !trigger_plan_ || trigger_plan_->cached_plan.IsExpired()) {
if (!parsed_statements_.is_cacheable || !trigger_plan_) {
auto identifiers = GetPredefinedIdentifiers(event_type_);

AstStorage ast_storage;
2 changes: 1 addition & 1 deletion src/query/trigger.hpp
@@ -62,7 +62,7 @@ struct Trigger {

explicit TriggerPlan(std::unique_ptr<LogicalPlan> logical_plan, std::vector<IdentifierInfo> identifiers);

CachedPlan cached_plan;
PlanWrapper cached_plan;
std::vector<IdentifierInfo> identifiers;
};
std::shared_ptr<TriggerPlan> GetPlan(DbAccessor *db_accessor, const query::AuthChecker *auth_checker) const;
68 changes: 68 additions & 0 deletions src/utils/lru_cache.hpp
@@ -0,0 +1,68 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

#pragma once

#include <list>
#include <optional>
#include <unordered_map>
#include <utility>

namespace memgraph::utils {

/// A simple LRU cache implementation.
/// It is not thread-safe.

template <class TKey, class TVal>
class LRUCache {
public:
LRUCache(int cache_size_) : cache_size(cache_size_){};

void put(const TKey &key, const TVal &val) {
auto it = item_map.find(key);
if (it != item_map.end()) {
item_list.erase(it->second);
item_map.erase(it);
}
item_list.push_front(std::make_pair(key, val));
item_map.insert(std::make_pair(key, item_list.begin()));
try_clean();
};
std::optional<TVal> get(const TKey &key) {
if (!exists(key)) {
return std::nullopt;
}
auto it = item_map.find(key);
item_list.splice(item_list.begin(), item_list, it->second);
return it->second->second;
}
void reset() {
item_list.clear();
item_map.clear();
};
std::size_t size() { return item_map.size(); };

private:
void try_clean() {
while (item_map.size() > cache_size) {
auto last_it_elem_it = item_list.end();
last_it_elem_it--;
item_map.erase(last_it_elem_it->first);
item_list.pop_back();
}
};
bool exists(const TKey &key) { return (item_map.count(key) > 0); };

std::list<std::pair<TKey, TVal>> item_list;
std::unordered_map<TKey, decltype(item_list.begin())> item_map;
std::size_t cache_size;
};
} // namespace memgraph::utils
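
For reference, a short standalone usage sketch of the new utils::LRUCache (everything outside the header above is illustrative): put() inserts or refreshes an entry, get() returns std::nullopt on a miss and promotes the entry to most-recently-used on a hit, and once the capacity is exceeded the least-recently-used entry is evicted.

#include <cassert>
#include <string>

#include "utils/lru_cache.hpp"

int main() {
  memgraph::utils::LRUCache<int, std::string> cache(2);  // capacity of 2 entries

  cache.put(1, "one");
  cache.put(2, "two");
  assert(cache.get(1).value() == "one");  // key 1 becomes most-recently-used

  cache.put(3, "three");                  // evicts key 2, the least-recently-used
  assert(!cache.get(2).has_value());
  assert(cache.get(1).has_value() && cache.get(3).has_value());

  cache.reset();                          // drops everything
  assert(cache.size() == 0);
  return 0;
}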
2 changes: 1 addition & 1 deletion tests/e2e/configuration/default_config.py
@@ -184,7 +184,7 @@
"Set to true to enable telemetry. We collect information about the running system (CPU and memory information) and information about the database runtime (vertex and edge counts and resource usage) to allow for easier improvement of the product.",
),
"query_cost_planner": ("true", "true", "Use the cost-estimating query planner."),
"query_plan_cache_ttl": ("60", "60", "Time to live for cached query plans, in seconds."),
"query_plan_cache_max_size": ("1000", "1000", "Maximum number of query plans to cache."),
"query_vertex_count_to_expand_existing": (
"10",
"10",
3 changes: 3 additions & 0 deletions tests/unit/CMakeLists.txt
@@ -296,6 +296,9 @@ target_link_libraries(${test_prefix}utils_java_string_formatter mg-utils)
add_unit_test(utils_resource_lock.cpp)
target_link_libraries(${test_prefix}utils_resource_lock mg-utils)

add_unit_test(lru_cache.cpp)
target_link_libraries(${test_prefix}lru_cache mg-utils)

# Test mg-storage-v2
add_unit_test(commit_log_v2.cpp)
target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2)
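
The lru_cache.cpp unit test registered above is not included in this section, so as an illustration only, a minimal test along these lines could exercise the eviction order (assuming GoogleTest, as used by the other tests in tests/unit; this is not the file from the PR):

#include <gtest/gtest.h>

#include "utils/lru_cache.hpp"

// Illustrative sketch; the actual tests/unit/lru_cache.cpp is not shown here.
TEST(LRUCache, EvictsLeastRecentlyUsed) {
  memgraph::utils::LRUCache<int, int> cache(3);
  cache.put(1, 10);
  cache.put(2, 20);
  cache.put(3, 30);

  ASSERT_TRUE(cache.get(1).has_value());  // touch key 1 so key 2 becomes the LRU entry
  cache.put(4, 40);                       // capacity exceeded -> key 2 is evicted

  EXPECT_FALSE(cache.get(2).has_value());
  EXPECT_EQ(cache.get(1).value(), 10);
  EXPECT_EQ(cache.get(4).value(), 40);
  EXPECT_EQ(cache.size(), 3U);
}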