diff --git a/benchmarks/sqlite_comparison_bench.cpp b/benchmarks/sqlite_comparison_bench.cpp index de8b33a0..eea54155 100644 --- a/benchmarks/sqlite_comparison_bench.cpp +++ b/benchmarks/sqlite_comparison_bench.cpp @@ -108,15 +108,31 @@ struct SQLiteContext { static void BM_CloudSQL_Insert(benchmark::State& state) { CloudSQLContext ctx("./bench_cloudsql_insert_" + std::to_string(state.thread_index())); + // Prepare the statement once outside the hot loop + auto prepared = ctx.executor->prepare("INSERT INTO bench_table VALUES (?, ?, ?);"); + if (!prepared) { + state.SkipWithError("Failed to prepare statement"); + return; + } + + // Pre-allocate params to avoid heap allocations in the loop + std::vector params; + params.reserve(3); + params.push_back(common::Value::make_int64(0)); + params.push_back(common::Value::make_float64(3.14)); + params.push_back(common::Value::make_text("some_payload_data")); + + // Use a single transaction for the whole benchmark to reveal raw engine speed + ctx.executor->execute("BEGIN"); + + int64_t i = 0; for (auto _ : state) { - state.PauseTiming(); - std::string sql = "INSERT INTO bench_table VALUES (" + std::to_string(state.iterations()) + - ", 3.14, 'some_payload_data');"; - auto stmt = ParseSQL(sql); - state.ResumeTiming(); - - ctx.executor->execute(*stmt); + // Update only the changing value + params[0] = common::Value::make_int64(i++); + ctx.executor->execute(*prepared, params); } + + ctx.executor->execute("COMMIT"); state.SetItemsProcessed(state.iterations()); } BENCHMARK(BM_CloudSQL_Insert); diff --git a/include/common/arena_allocator.hpp b/include/common/arena_allocator.hpp new file mode 100644 index 00000000..9884e536 --- /dev/null +++ b/include/common/arena_allocator.hpp @@ -0,0 +1,119 @@ +/** + * @file arena_allocator.hpp + * @brief High-performance bump allocator for execution-scoped data + */ + +#ifndef CLOUDSQL_COMMON_ARENA_ALLOCATOR_HPP +#define CLOUDSQL_COMMON_ARENA_ALLOCATOR_HPP + +#include +#include +#include +#include 
+#include <memory_resource> +#include <vector> + +namespace cloudsql::common { + +/** + * @class ArenaAllocator + * @brief Manages memory chunks and provides fast, contiguous allocations. + * + * Implements std::pmr::memory_resource for compatibility with standard + * containers like std::pmr::vector. + */ +class ArenaAllocator : public std::pmr::memory_resource { + public: + static constexpr size_t DEFAULT_CHUNK_SIZE = 65536; // 64KB + + explicit ArenaAllocator(size_t chunk_size = DEFAULT_CHUNK_SIZE) + : chunk_size_(chunk_size), current_chunk_idx_(0), current_offset_(0) {} + + ~ArenaAllocator() override { + for (auto* chunk : chunks_) { + delete[] chunk; + } + } + + // Disable copy + ArenaAllocator(const ArenaAllocator&) = delete; + ArenaAllocator& operator=(const ArenaAllocator&) = delete; + + /** + * @brief Reset the arena, reclaiming all memory for reuse. + * + * Keeps all allocated chunks but resets pointers so they can be overwritten. + * This is an O(1) or O(N_chunks) operation with zero heap overhead. + */ + void reset() { + current_chunk_idx_ = 0; + current_offset_ = 0; + } + + protected: + /** + * @brief Internal allocation logic for PMR + */ + void* do_allocate(size_t bytes, size_t alignment) override { + if (bytes == 0) return nullptr; + + // Align the offset + size_t mask = alignment - 1; + + // Try current chunk + if (current_chunk_idx_ < chunks_.size()) { + size_t aligned_offset = (current_offset_ + mask) & ~mask; + if (aligned_offset + bytes <= chunk_size_) { + void* result = chunks_[current_chunk_idx_] + aligned_offset; + current_offset_ = aligned_offset + bytes; + return result; + } + + // Move to next existing chunk if possible + current_chunk_idx_++; + current_offset_ = 0; + return do_allocate(bytes, alignment); + } + + // Need a new chunk + if (bytes > chunk_size_) { + auto* large_chunk = new uint8_t[bytes]; + chunks_.push_back(large_chunk); + // We don't make this the "current" chunk for small allocations + // to avoid wasting space. We just return it. 
+ return large_chunk; + } + + allocate_new_chunk(); + return do_allocate(bytes, alignment); + } + + /** + * @brief PMR deallocate is a no-op for bump allocators (we reset the whole arena) + */ + void do_deallocate(void* p, size_t bytes, size_t alignment) override { + // No-op + (void)p; + (void)bytes; + (void)alignment; + } + + bool do_is_equal(const std::pmr::memory_resource& other) const noexcept override { + return this == &other; + } + + private: + void allocate_new_chunk() { + chunks_.push_back(new uint8_t[chunk_size_]); + // Don't change current_chunk_idx_ here, let the recursive call handle it + } + + size_t chunk_size_; + std::vector<uint8_t*> chunks_; + size_t current_chunk_idx_; + size_t current_offset_; +}; + +} // namespace cloudsql::common + +#endif // CLOUDSQL_COMMON_ARENA_ALLOCATOR_HPP diff --git a/include/executor/operator.hpp b/include/executor/operator.hpp index d4607b71..253d5006 100644 --- a/include/executor/operator.hpp +++ b/include/executor/operator.hpp @@ -7,6 +7,7 @@ #define CLOUDSQL_EXECUTOR_OPERATOR_HPP #include +#include <memory_resource> #include #include #include @@ -52,6 +53,8 @@ class Operator { std::string error_message_; Transaction* txn_; LockManager* lock_manager_; + std::pmr::memory_resource* mr_ = nullptr; + const std::vector<common::Value>* params_ = nullptr; public: explicit Operator(OperatorType type, Transaction* txn = nullptr, @@ -71,6 +74,14 @@ class Operator { [[nodiscard]] Transaction* get_txn() const { return txn_; } [[nodiscard]] LockManager* get_lock_manager() const { return lock_manager_; } + virtual void set_memory_resource(std::pmr::memory_resource* mr) { mr_ = mr; } + [[nodiscard]] std::pmr::memory_resource* get_memory_resource() const { + return mr_ ? 
mr_ : std::pmr::get_default_resource(); + } + + virtual void set_params(const std::vector* params) { params_ = params; } + [[nodiscard]] const std::vector* get_params() const { return params_; } + virtual bool init() { return true; } virtual bool open() { return true; } virtual bool next(Tuple& out_tuple) { @@ -191,6 +202,9 @@ class FilterOperator : public Operator { void close() override; [[nodiscard]] Schema& output_schema() override; void add_child(std::unique_ptr child) override; + + void set_memory_resource(std::pmr::memory_resource* mr) override; + void set_params(const std::vector* params) override; }; /** @@ -212,6 +226,9 @@ class ProjectOperator : public Operator { void close() override; [[nodiscard]] Schema& output_schema() override; void add_child(std::unique_ptr child) override; + + void set_memory_resource(std::pmr::memory_resource* mr) override; + void set_params(const std::vector* params) override; }; /** @@ -236,6 +253,9 @@ class SortOperator : public Operator { bool next(Tuple& out_tuple) override; void close() override; [[nodiscard]] Schema& output_schema() override; + + void set_memory_resource(std::pmr::memory_resource* mr) override; + void set_params(const std::vector* params) override; }; /** @@ -270,6 +290,9 @@ class AggregateOperator : public Operator { bool next(Tuple& out_tuple) override; void close() override; [[nodiscard]] Schema& output_schema() override; + + void set_memory_resource(std::pmr::memory_resource* mr) override; + void set_params(const std::vector* params) override; }; /** @@ -319,6 +342,9 @@ class HashJoinOperator : public Operator { void close() override; [[nodiscard]] Schema& output_schema() override; void add_child(std::unique_ptr child) override; + + void set_memory_resource(std::pmr::memory_resource* mr) override; + void set_params(const std::vector* params) override; }; /** @@ -341,6 +367,9 @@ class LimitOperator : public Operator { void close() override; [[nodiscard]] Schema& output_schema() override; void 
add_child(std::unique_ptr child) override; + + void set_memory_resource(std::pmr::memory_resource* mr) override; + void set_params(const std::vector* params) override; }; } // namespace cloudsql::executor diff --git a/include/executor/query_executor.hpp b/include/executor/query_executor.hpp index 9f55856c..6d560714 100644 --- a/include/executor/query_executor.hpp +++ b/include/executor/query_executor.hpp @@ -6,7 +6,11 @@ #ifndef CLOUDSQL_EXECUTOR_QUERY_EXECUTOR_HPP #define CLOUDSQL_EXECUTOR_QUERY_EXECUTOR_HPP +#include +#include + #include "catalog/catalog.hpp" +#include "common/arena_allocator.hpp" #include "common/cluster_manager.hpp" #include "distributed/raft_types.hpp" #include "executor/operator.hpp" @@ -18,6 +22,20 @@ namespace cloudsql::executor { +/** + * @brief Represents a pre-parsed and pre-planned SQL statement + */ +struct PreparedStatement { + std::shared_ptr stmt; + std::string sql; + + // Cached execution state for hot-path optimization + const TableInfo* table_meta = nullptr; + std::unique_ptr schema; + std::unique_ptr table; + std::vector> indexes; +}; + /** * @brief State machine for a specific data shard */ @@ -62,11 +80,32 @@ class QueryExecutor { */ void set_local_only(bool local) { is_local_only_ = local; } + /** + * @brief Prepare a SQL string into a reusable PreparedStatement + */ + std::shared_ptr prepare(const std::string& sql); + /** * @brief Execute a SQL statement and return results */ QueryResult execute(const parser::Statement& stmt); + /** + * @brief Execute a SQL string (includes parsing and cache lookup) + */ + QueryResult execute(const std::string& sql); + + /** + * @brief Execute a PreparedStatement with bound parameters + */ + QueryResult execute(const PreparedStatement& prepared, + const std::vector& params); + + /** + * @brief Get access to the query-scoped arena + */ + common::ArenaAllocator& arena() { return arena_; } + private: Catalog& catalog_; storage::BufferPoolManager& bpm_; @@ -78,6 +117,16 @@ class QueryExecutor { 
transaction::Transaction* current_txn_ = nullptr; bool is_local_only_ = false; + // Bound parameters for the current execution + const std::vector* current_params_ = nullptr; + + // Performance structures + common::ArenaAllocator arena_; + + // Global statement cache (thread-safe) + static std::unordered_map> statement_cache_; + static std::mutex cache_mutex_; + QueryResult execute_select(const parser::SelectStatement& stmt, transaction::Transaction* txn); QueryResult execute_create_table(const parser::CreateTableStatement& stmt); QueryResult execute_create_index(const parser::CreateIndexStatement& stmt); diff --git a/include/executor/types.hpp b/include/executor/types.hpp index 3c5e3848..416e1d03 100644 --- a/include/executor/types.hpp +++ b/include/executor/types.hpp @@ -11,7 +11,9 @@ #define CLOUDSQL_EXECUTOR_TYPES_HPP #include +#include #include +#include #include #include #include @@ -120,14 +122,38 @@ class Schema { /** * @brief A single data row used in the row-oriented (Volcano) execution model. + * + * Uses std::pmr::vector to support custom allocators (e.g. ArenaAllocator). */ class Tuple { private: - std::vector values_; + std::pmr::vector values_; public: Tuple() = default; - explicit Tuple(std::vector values) : values_(std::move(values)) {} + + // Explicit PMR vector constructor + explicit Tuple(std::pmr::vector values) : values_(std::move(values)) {} + + // Initializer list constructor + Tuple(std::initializer_list list) : values_(list) {} + + // Support allocation from a custom memory resource + explicit Tuple(std::pmr::memory_resource* mr) + : values_(mr ? mr : std::pmr::get_default_resource()) {} + + // Support construction from standard vector or PMR vector with specific resource + template , Tuple>>, + typename std::enable_if_t< + !std::is_same_v, std::pmr::memory_resource*>>* = nullptr> + Tuple(const VectorType& values, std::pmr::memory_resource* mr = nullptr) + : values_(values.begin(), values.end(), mr ? 
mr : std::pmr::get_default_resource()) {} + + template , Tuple>>> + explicit Tuple(VectorType&& values) + : values_(std::make_move_iterator(values.begin()), std::make_move_iterator(values.end())) {} Tuple(const Tuple& other) = default; Tuple(Tuple&& other) noexcept = default; @@ -159,8 +185,8 @@ class Tuple { [[nodiscard]] size_t size() const { return values_.size(); } [[nodiscard]] bool empty() const { return values_.empty(); } - [[nodiscard]] const std::vector& values() const { return values_; } - [[nodiscard]] std::vector& values() { return values_; } + [[nodiscard]] const std::pmr::vector& values() const { return values_; } + [[nodiscard]] std::pmr::vector& values() { return values_; } [[nodiscard]] std::string to_string() const; }; diff --git a/include/parser/expression.hpp b/include/parser/expression.hpp index 5776ba29..acf662ad 100644 --- a/include/parser/expression.hpp +++ b/include/parser/expression.hpp @@ -35,7 +35,8 @@ enum class ExprType : uint8_t { In, Like, Between, - IsNull + IsNull, + Parameter }; /** @@ -57,10 +58,11 @@ class Expression { [[nodiscard]] virtual ExprType type() const = 0; /** - * @brief Evaluate expression against an optional tuple context + * @brief Evaluate expression against an optional tuple context and bound parameters */ [[nodiscard]] virtual common::Value evaluate( - const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr) const = 0; + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const = 0; /** * @brief Evaluate expression against a batch of data (Vectorized) @@ -87,8 +89,9 @@ class BinaryExpr : public Expression { : left_(std::move(left)), op_(op), right_(std::move(right)) {} [[nodiscard]] ExprType type() const override { return ExprType::Binary; } - [[nodiscard]] common::Value evaluate(const executor::Tuple* tuple = nullptr, - const executor::Schema* schema = nullptr) const override; + [[nodiscard]] common::Value evaluate( + 
const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; void evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, executor::ColumnVector& result) const override; [[nodiscard]] std::string to_string() const override; @@ -111,8 +114,9 @@ class UnaryExpr : public Expression { UnaryExpr(TokenType op, std::unique_ptr expr) : op_(op), expr_(std::move(expr)) {} [[nodiscard]] ExprType type() const override { return ExprType::Unary; } - [[nodiscard]] common::Value evaluate(const executor::Tuple* tuple = nullptr, - const executor::Schema* schema = nullptr) const override; + [[nodiscard]] common::Value evaluate( + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; void evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, executor::ColumnVector& result) const override; [[nodiscard]] std::string to_string() const override; @@ -133,8 +137,9 @@ class ColumnExpr : public Expression { : table_name_(std::move(table)), name_(std::move(name)) {} [[nodiscard]] ExprType type() const override { return ExprType::Column; } - [[nodiscard]] common::Value evaluate(const executor::Tuple* tuple = nullptr, - const executor::Schema* schema = nullptr) const override; + [[nodiscard]] common::Value evaluate( + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; void evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, executor::ColumnVector& result) const override; [[nodiscard]] std::string to_string() const override; @@ -156,8 +161,9 @@ class ConstantExpr : public Expression { explicit ConstantExpr(common::Value val) : value_(std::move(val)) {} [[nodiscard]] ExprType type() const override { return ExprType::Constant; } - [[nodiscard]] common::Value 
evaluate(const executor::Tuple* tuple = nullptr, - const executor::Schema* schema = nullptr) const override; + [[nodiscard]] common::Value evaluate( + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; void evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, executor::ColumnVector& result) const override; [[nodiscard]] std::string to_string() const override; @@ -166,6 +172,28 @@ class ConstantExpr : public Expression { [[nodiscard]] const common::Value& value() const { return value_; } }; +/** + * @brief Bound parameter expression (e.g. ?) + */ +class ParameterExpr : public Expression { + private: + uint32_t index_; + + public: + explicit ParameterExpr(uint32_t index) : index_(index) {} + + [[nodiscard]] ExprType type() const override { return ExprType::Parameter; } + [[nodiscard]] common::Value evaluate( + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; + void evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, + executor::ColumnVector& result) const override; + [[nodiscard]] std::string to_string() const override; + [[nodiscard]] std::unique_ptr clone() const override; + + [[nodiscard]] uint32_t index() const { return index_; } +}; + /** * @brief Scalar function or aggregate expression */ @@ -179,8 +207,9 @@ class FunctionExpr : public Expression { explicit FunctionExpr(std::string name) : func_name_(std::move(name)) {} [[nodiscard]] ExprType type() const override { return ExprType::Function; } - [[nodiscard]] common::Value evaluate(const executor::Tuple* tuple = nullptr, - const executor::Schema* schema = nullptr) const override; + [[nodiscard]] common::Value evaluate( + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; void 
evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, executor::ColumnVector& result) const override; [[nodiscard]] std::string to_string() const override; @@ -208,8 +237,9 @@ class InExpr : public Expression { : column_(std::move(col)), values_(std::move(vals)), not_flag_(is_not) {} [[nodiscard]] ExprType type() const override { return ExprType::In; } - [[nodiscard]] common::Value evaluate(const executor::Tuple* tuple = nullptr, - const executor::Schema* schema = nullptr) const override; + [[nodiscard]] common::Value evaluate( + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; void evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, executor::ColumnVector& result) const override; [[nodiscard]] std::string to_string() const override; @@ -229,8 +259,9 @@ class IsNullExpr : public Expression { : expr_(std::move(expr)), not_flag_(not_flag) {} [[nodiscard]] ExprType type() const override { return ExprType::IsNull; } - [[nodiscard]] common::Value evaluate(const executor::Tuple* tuple = nullptr, - const executor::Schema* schema = nullptr) const override; + [[nodiscard]] common::Value evaluate( + const executor::Tuple* tuple = nullptr, const executor::Schema* schema = nullptr, + const std::vector* params = nullptr) const override; void evaluate_vectorized(const executor::VectorBatch& batch, const executor::Schema& schema, executor::ColumnVector& result) const override; [[nodiscard]] std::string to_string() const override; diff --git a/include/parser/parser.hpp b/include/parser/parser.hpp index e8ead47e..53cacfad 100644 --- a/include/parser/parser.hpp +++ b/include/parser/parser.hpp @@ -18,6 +18,7 @@ class Parser { std::unique_ptr lexer_; Token current_token_; bool has_current_ = false; + uint32_t param_count_ = 0; Token next_token(); Token peek_token(); diff --git a/include/storage/buffer_pool_manager.hpp 
b/include/storage/buffer_pool_manager.hpp index 8481cc67..2386149b 100644 --- a/include/storage/buffer_pool_manager.hpp +++ b/include/storage/buffer_pool_manager.hpp @@ -77,7 +77,7 @@ class BufferPoolManager { * @param[out] page_id Output param for the id of the created page * @return Pointer to the new Page, or nullptr if cannot be created */ - Page* new_page(const std::string& file_name, const uint32_t* page_id); + Page* new_page(const std::string& file_name, uint32_t* page_id); /** * @brief Delete a page @@ -102,12 +102,22 @@ class BufferPoolManager { [[nodiscard]] recovery::LogManager* get_log_manager() const { return log_manager_; } private: - /** - * @brief Generates a unique string key for file and page mapping - */ - static std::string make_page_key(const std::string& file_name, uint32_t page_id) { - return file_name + "_" + std::to_string(page_id); - } + struct PageKey { + uint32_t file_id; + uint32_t page_id; + + bool operator==(const PageKey& other) const { + return file_id == other.file_id && page_id == other.page_id; + } + + struct Hash { + std::size_t operator()(const PageKey& key) const { + return (static_cast(key.file_id) << 32) | static_cast(key.page_id); + } + }; + }; + + uint32_t get_file_id(const std::string& file_name); size_t pool_size_; StorageManager& storage_manager_; @@ -126,7 +136,11 @@ class BufferPoolManager { std::list free_list_; // Maps page keys (file+pageId) to frame IDs - std::unordered_map page_table_; + std::unordered_map page_table_; + + // Mapping from file name to internal file_id to avoid string keys in page_table_ + std::unordered_map file_id_map_; + uint32_t next_file_id_ = 1; }; } // namespace cloudsql::storage diff --git a/include/storage/heap_table.hpp b/include/storage/heap_table.hpp index 5890885f..2922ae05 100644 --- a/include/storage/heap_table.hpp +++ b/include/storage/heap_table.hpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -50,6 +51,13 @@ class HeapTable { bool operator==(const TupleId& 
other) const { return page_num == other.page_num && slot_num == other.slot_num; } + + struct Hash { + std::size_t operator()(const TupleId& tid) const { + return (static_cast(tid.page_num) << 16) ^ + static_cast(tid.slot_num); + } + }; }; /** @@ -89,13 +97,19 @@ class HeapTable { class Iterator { private: HeapTable& table_; - TupleId next_id_; /**< ID of the next record to be checked */ - TupleId last_id_; /**< ID of the record returned by the last next() call */ - bool eof_ = false; /**< End-of-file indicator */ + TupleId next_id_; /**< ID of the next record to be checked */ + TupleId last_id_; /**< ID of the record returned by the last next() call */ + bool eof_ = false; /**< End-of-file indicator */ + std::pmr::memory_resource* mr_; /**< Memory resource for tuple allocations */ public: - explicit Iterator(HeapTable& table); + explicit Iterator(HeapTable& table, std::pmr::memory_resource* mr = nullptr); + ~Iterator() = default; + Iterator(const Iterator&) = default; + Iterator& operator=(const Iterator&) = default; + Iterator(Iterator&&) noexcept = default; + Iterator& operator=(Iterator&&) noexcept = default; /** * @brief Fetches the next non-deleted record from the heap * @param[out] out_tuple Container for the retrieved record @@ -124,6 +138,10 @@ class HeapTable { executor::Schema schema_; uint32_t last_page_id_ = 0; + // Last page cache for fast insertions + Page* cached_page_ = nullptr; + uint32_t cached_page_id_ = 0xFFFFFFFF; + public: /** * @brief Constructor @@ -133,7 +151,7 @@ class HeapTable { */ HeapTable(std::string table_name, BufferPoolManager& bpm, executor::Schema schema); - ~HeapTable() = default; + ~HeapTable(); /* Disable copy semantics */ HeapTable(const HeapTable&) = delete; @@ -202,7 +220,9 @@ class HeapTable { [[nodiscard]] uint64_t tuple_count() const; /** @return An iterator starting at the first page */ - [[nodiscard]] Iterator scan() { return Iterator(*this); } + [[nodiscard]] Iterator scan(std::pmr::memory_resource* mr = nullptr) { + 
return Iterator(*this, mr); + } /** @brief Initializes the physical heap file */ bool create(); diff --git a/include/transaction/lock_manager.hpp b/include/transaction/lock_manager.hpp index 778ed3e4..698b8d60 100644 --- a/include/transaction/lock_manager.hpp +++ b/include/transaction/lock_manager.hpp @@ -33,7 +33,8 @@ class LockManager { }; std::mutex latch_; - std::unordered_map lock_table_; // RID -> LockQueue + std::unordered_map + lock_table_; // RID -> LockQueue public: LockManager() = default; @@ -48,17 +49,17 @@ class LockManager { /** * @brief Acquire a shared (read) lock on a tuple */ - bool acquire_shared(Transaction* txn, const std::string& rid); + bool acquire_shared(Transaction* txn, const storage::HeapTable::TupleId& rid); /** * @brief Acquire an exclusive (write) lock on a tuple */ - bool acquire_exclusive(Transaction* txn, const std::string& rid); + bool acquire_exclusive(Transaction* txn, const storage::HeapTable::TupleId& rid); /** - * @brief Unlock a tuple + * @brief Release a lock held by a transaction */ - bool unlock(Transaction* txn, const std::string& rid); + bool unlock(Transaction* txn, const storage::HeapTable::TupleId& rid); }; } // namespace cloudsql::transaction diff --git a/include/transaction/transaction.hpp b/include/transaction/transaction.hpp index 68bca41f..34f11780 100644 --- a/include/transaction/transaction.hpp +++ b/include/transaction/transaction.hpp @@ -73,8 +73,10 @@ class Transaction { // Locks held by this transaction (for auto-release on commit/abort) std::mutex lock_set_mutex_; - std::unordered_set shared_locks_; // RID string - std::unordered_set exclusive_locks_; + std::unordered_set + shared_locks_; + std::unordered_set + exclusive_locks_; // Changes to undo on rollback std::vector undo_logs_; @@ -102,21 +104,23 @@ class Transaction { [[nodiscard]] int32_t get_prev_lsn() const { return prev_lsn_; } void set_prev_lsn(int32_t lsn) { prev_lsn_ = lsn; } - void add_shared_lock(const std::string& rid) { + void 
add_shared_lock(const storage::HeapTable::TupleId& rid) { const std::scoped_lock lock(lock_set_mutex_); shared_locks_.insert(rid); } - void add_exclusive_lock(const std::string& rid) { + void add_exclusive_lock(const storage::HeapTable::TupleId& rid) { const std::scoped_lock lock(lock_set_mutex_); exclusive_locks_.insert(rid); } - [[nodiscard]] std::unordered_set get_shared_lock_set() { + [[nodiscard]] std::unordered_set + get_shared_lock_set() { const std::scoped_lock lock(lock_set_mutex_); return shared_locks_; } - [[nodiscard]] std::unordered_set get_exclusive_lock_set() { + [[nodiscard]] std::unordered_set + get_exclusive_lock_set() { const std::scoped_lock lock(lock_set_mutex_); return exclusive_locks_; } diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index 508be1a9..85219077 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -36,7 +36,6 @@ bool Catalog::load(const std::string& filename) { (void)database_; // Use instance member to satisfy linter std::ifstream file(filename); if (!file.is_open()) { - std::cerr << "Cannot open catalog file: " << filename << "\n"; return false; } // Simplified - just read database name @@ -58,7 +57,6 @@ bool Catalog::save(const std::string& filename) const { (void)database_; // Use instance member to satisfy linter std::ofstream file(filename); if (!file.is_open()) { - std::cerr << "Cannot open catalog file for writing: " << filename << "\n"; return false; } file << "# System Catalog\n"; @@ -71,8 +69,6 @@ bool Catalog::save(const std::string& filename) const { * @brief Create a new table */ oid_t Catalog::create_table(const std::string& table_name, std::vector columns) { - std::cerr << "--- [Catalog] create_table CALLED for " << table_name << " ---" << std::endl; - // Compute shards from ClusterManager for serialization std::vector shards; if (cluster_manager_ != nullptr) { @@ -179,9 +175,6 @@ oid_t Catalog::create_table_local(const std::string& table_name, std::vectorshards.push_back(shard); } 
- std::cerr << "--- [Catalog] Table " << table_name << " initialized with " - << table->shards.size() << " shards ---" << std::endl; - const oid_t id = table->table_id; tables_[id] = std::move(table); version_++; @@ -217,8 +210,6 @@ bool Catalog::drop_table_local(oid_t table_id) { void Catalog::apply(const raft::LogEntry& entry) { if (entry.data.empty()) return; - std::cerr << "--- [Catalog] apply CALLED for entry type " << (int)entry.data[0] << " ---" - << std::endl; uint8_t type = entry.data[0]; if (type == 1) { // CreateTable @@ -300,11 +291,8 @@ std::optional Catalog::get_table_by_name(const std::string& table_na } } - std::cerr << "--- [Catalog] Table NOT FOUND: " << table_name << ". Catalog contains: "; for (auto& pair : tables_) { - std::cerr << pair.second->name << ", "; } - std::cerr << " ---" << std::endl; return std::nullopt; } diff --git a/src/executor/operator.cpp b/src/executor/operator.cpp index 1708e84a..2f67b092 100644 --- a/src/executor/operator.cpp +++ b/src/executor/operator.cpp @@ -48,7 +48,7 @@ bool SeqScanOperator::init() { bool SeqScanOperator::open() { set_state(ExecState::Open); - iterator_ = std::make_unique(table_->scan()); + iterator_ = std::make_unique(table_->scan(get_memory_resource())); return true; } @@ -237,7 +237,7 @@ bool FilterOperator::next(Tuple& out_tuple) { Tuple tuple; while (child_->next(tuple)) { /* Evaluate condition against the current tuple context */ - const common::Value result = condition_->evaluate(&tuple, &schema_); + const common::Value result = condition_->evaluate(&tuple, &schema_, get_params()); if (result.as_bool()) { out_tuple = std::move(tuple); return true; @@ -260,6 +260,16 @@ void FilterOperator::add_child(std::unique_ptr child) { child_ = std::move(child); } +void FilterOperator::set_memory_resource(std::pmr::memory_resource* mr) { + Operator::set_memory_resource(mr); + if (child_) child_->set_memory_resource(mr); +} + +void FilterOperator::set_params(const std::vector* params) { + 
Operator::set_params(params); + if (child_) child_->set_params(params); +} + /* --- ProjectOperator --- */ ProjectOperator::ProjectOperator(std::unique_ptr child, @@ -306,12 +316,12 @@ bool ProjectOperator::next(Tuple& out_tuple) { return false; } - std::vector output_values; + std::pmr::vector output_values(get_memory_resource()); output_values.reserve(columns_.size()); auto input_schema = child_->output_schema(); for (const auto& col : columns_) { /* Evaluate projection expression with input tuple context */ - common::Value v = col->evaluate(&input, &input_schema); + common::Value v = col->evaluate(&input, &input_schema, get_params()); output_values.push_back(std::move(v)); } out_tuple = Tuple(std::move(output_values)); @@ -331,6 +341,17 @@ void ProjectOperator::add_child(std::unique_ptr child) { child_ = std::move(child); } +/* Override propagation for ProjectOperator */ +void ProjectOperator::set_memory_resource(std::pmr::memory_resource* mr) { + Operator::set_memory_resource(mr); + if (child_) child_->set_memory_resource(mr); +} + +void ProjectOperator::set_params(const std::vector* params) { + Operator::set_params(params); + if (child_) child_->set_params(params); +} + /* --- SortOperator --- */ SortOperator::SortOperator(std::unique_ptr child, @@ -361,21 +382,21 @@ bool SortOperator::open() { } /* Perform sort using child schema for evaluation */ - std::stable_sort(sorted_tuples_.begin(), sorted_tuples_.end(), - [this](const Tuple& a, const Tuple& b) { - for (size_t i = 0; i < sort_keys_.size(); ++i) { - const common::Value val_a = sort_keys_[i]->evaluate(&a, &schema_); - const common::Value val_b = sort_keys_[i]->evaluate(&b, &schema_); - const bool asc = ascending_[i]; - if (val_a < val_b) { - return asc; - } - if (val_b < val_a) { - return !asc; - } - } - return false; - }); + std::stable_sort( + sorted_tuples_.begin(), sorted_tuples_.end(), [this](const Tuple& a, const Tuple& b) { + for (size_t i = 0; i < sort_keys_.size(); ++i) { + const common::Value 
val_a = sort_keys_[i]->evaluate(&a, &schema_, get_params()); + const common::Value val_b = sort_keys_[i]->evaluate(&b, &schema_, get_params()); + const bool asc = ascending_[i]; + if (val_a < val_b) { + return asc; + } + if (val_b < val_a) { + return !asc; + } + } + return false; + }); current_index_ = 0; set_state(ExecState::Open); @@ -401,6 +422,17 @@ Schema& SortOperator::output_schema() { return schema_; } +/* Override propagation for SortOperator */ +void SortOperator::set_memory_resource(std::pmr::memory_resource* mr) { + Operator::set_memory_resource(mr); + if (child_) child_->set_memory_resource(mr); +} + +void SortOperator::set_params(const std::vector* params) { + Operator::set_params(params); + if (child_) child_->set_params(params); +} + /* --- AggregateOperator --- */ AggregateOperator::AggregateOperator(std::unique_ptr child, @@ -470,7 +502,8 @@ bool AggregateOperator::open() { if (!is_global) { key = ""; for (const auto& gb : group_by_) { - auto val = gb ? gb->evaluate(&tuple, &child_schema) : common::Value::make_null(); + auto val = gb ? 
gb->evaluate(&tuple, &child_schema, get_params()) + : common::Value::make_null(); key += val.to_string() + "|"; gb_vals.push_back(std::move(val)); } @@ -486,7 +519,7 @@ bool AggregateOperator::open() { for (size_t i = 0; i < aggregates_.size(); ++i) { common::Value val; if (aggregates_[i].expr) { - val = aggregates_[i].expr->evaluate(&tuple, &child_schema); + val = aggregates_[i].expr->evaluate(&tuple, &child_schema, get_params()); } else { val = common::Value::make_int64(static_cast(1)); } @@ -578,6 +611,17 @@ Schema& AggregateOperator::output_schema() { return schema_; } +/* Override propagation for AggregateOperator */ +void AggregateOperator::set_memory_resource(std::pmr::memory_resource* mr) { + Operator::set_memory_resource(mr); + if (child_) child_->set_memory_resource(mr); +} + +void AggregateOperator::set_params(const std::vector* params) { + Operator::set_params(params); + if (child_) child_->set_params(params); +} + /* --- HashJoinOperator --- */ HashJoinOperator::HashJoinOperator(std::unique_ptr left, std::unique_ptr right, @@ -623,7 +667,7 @@ bool HashJoinOperator::open() { Tuple right_tuple; auto right_schema = right_->output_schema(); while (right_->next(right_tuple)) { - const common::Value key = right_key_->evaluate(&right_tuple, &right_schema); + const common::Value key = right_key_->evaluate(&right_tuple, &right_schema, get_params()); hash_table_.emplace(key.to_string(), BuildTuple{std::move(right_tuple), false}); } @@ -645,7 +689,9 @@ bool HashJoinOperator::next(Tuple& out_tuple) { if (iter_state.current != iter_state.end) { auto& build_tuple = iter_state.current->second; const auto& right_tuple = build_tuple.tuple; - std::vector joined_values = left_tuple_->values(); + std::pmr::vector joined_values(left_tuple_->values().begin(), + left_tuple_->values().end(), + get_memory_resource()); joined_values.insert(joined_values.end(), right_tuple.values().begin(), right_tuple.values().end()); @@ -661,7 +707,9 @@ bool HashJoinOperator::next(Tuple& 
out_tuple) { match_iter_ = std::nullopt; if ((join_type_ == JoinType::Left || join_type_ == JoinType::Full) && !left_had_match_) { - std::vector joined_values = left_tuple_->values(); + std::pmr::vector joined_values(left_tuple_->values().begin(), + left_tuple_->values().end(), + get_memory_resource()); for (size_t i = 0; i < right_schema.column_count(); ++i) { joined_values.push_back(common::Value::make_null()); } @@ -677,7 +725,8 @@ bool HashJoinOperator::next(Tuple& out_tuple) { if (left_->next(next_left)) { left_tuple_ = std::move(next_left); left_had_match_ = false; - const common::Value key = left_key_->evaluate(&(left_tuple_.value()), &left_schema); + const common::Value key = + left_key_->evaluate(&(left_tuple_.value()), &left_schema, get_params()); /* Look up in hash table */ auto range = hash_table_.equal_range(key.to_string()); @@ -685,7 +734,9 @@ bool HashJoinOperator::next(Tuple& out_tuple) { match_iter_ = {range.first, range.second}; } else if (join_type_ == JoinType::Left || join_type_ == JoinType::Full) { /* No match found immediately, emit NULLs if Left/Full join */ - std::vector joined_values = left_tuple_->values(); + std::pmr::vector joined_values(left_tuple_->values().begin(), + left_tuple_->values().end(), + get_memory_resource()); for (size_t i = 0; i < right_schema.column_count(); ++i) { joined_values.push_back(common::Value::make_null()); } @@ -708,7 +759,7 @@ bool HashJoinOperator::next(Tuple& out_tuple) { auto& it = right_idx_iter_.value(); while (it != hash_table_.end()) { if (!it->second.matched) { - std::vector joined_values; + std::pmr::vector joined_values(get_memory_resource()); for (size_t i = 0; i < left_schema.column_count(); ++i) { joined_values.push_back(common::Value::make_null()); } @@ -749,6 +800,19 @@ void HashJoinOperator::add_child(std::unique_ptr child) { } } +/* Override propagation for HashJoinOperator */ +void HashJoinOperator::set_memory_resource(std::pmr::memory_resource* mr) { + Operator::set_memory_resource(mr); + 
if (left_) left_->set_memory_resource(mr); + if (right_) right_->set_memory_resource(mr); +} + +void HashJoinOperator::set_params(const std::vector* params) { + Operator::set_params(params); + if (left_) left_->set_params(params); + if (right_) right_->set_params(params); +} + /* --- LimitOperator --- */ LimitOperator::LimitOperator(std::unique_ptr child, int64_t limit, int64_t offset) @@ -808,4 +872,15 @@ void LimitOperator::add_child(std::unique_ptr child) { child_ = std::move(child); } +/* Override propagation for LimitOperator */ +void LimitOperator::set_memory_resource(std::pmr::memory_resource* mr) { + Operator::set_memory_resource(mr); + if (child_) child_->set_memory_resource(mr); +} + +void LimitOperator::set_params(const std::vector* params) { + Operator::set_params(params); + if (child_) child_->set_params(params); +} + } // namespace cloudsql::executor diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index 68657296..af17fb35 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -28,6 +28,8 @@ #include "executor/types.hpp" #include "network/rpc_message.hpp" #include "parser/expression.hpp" +#include "parser/lexer.hpp" +#include "parser/parser.hpp" #include "parser/statement.hpp" #include "parser/token.hpp" #include "recovery/log_manager.hpp" @@ -41,6 +43,10 @@ namespace cloudsql::executor { +// Define static members for statement cache +std::unordered_map> QueryExecutor::statement_cache_; +std::mutex QueryExecutor::cache_mutex_; + namespace { enum class IndexOp { Insert, Remove }; @@ -124,6 +130,150 @@ QueryExecutor::~QueryExecutor() { } } +std::shared_ptr QueryExecutor::prepare(const std::string& sql) { + auto lexer = std::make_unique(sql); + parser::Parser parser(std::move(lexer)); + auto stmt = parser.parse_statement(); + if (!stmt) return nullptr; + + auto prepared = std::make_shared(); + prepared->stmt = std::shared_ptr(stmt.release()); + prepared->sql = sql; + + // Cache metadata for 
INSERT fast-path + if (prepared->stmt->type() == parser::StmtType::Insert) { + const auto& insert_stmt = dynamic_cast(*prepared->stmt); + if (insert_stmt.table()) { + const std::string table_name = insert_stmt.table()->to_string(); + auto table_meta_opt = catalog_.get_table_by_name(table_name); + if (table_meta_opt.has_value()) { + prepared->table_meta = table_meta_opt.value(); + prepared->schema = std::make_unique(); + for (const auto& col : prepared->table_meta->columns) { + prepared->schema->add_column(col.name, col.type); + } + prepared->table = + std::make_unique(table_name, bpm_, *prepared->schema); + + // Cache B-tree index objects + for (const auto& idx_info : prepared->table_meta->indexes) { + if (!idx_info.column_positions.empty()) { + uint16_t pos = idx_info.column_positions[0]; + common::ValueType ktype = prepared->table_meta->columns[pos].type; + prepared->indexes.push_back( + std::make_unique(idx_info.name, bpm_, ktype)); + } + } + } + } + } + + return prepared; +} + +QueryResult QueryExecutor::execute(const PreparedStatement& prepared, + const std::vector& params) { + // Fast-path for INSERT + if (prepared.stmt->type() == parser::StmtType::Insert && prepared.table) { + const auto start = std::chrono::high_resolution_clock::now(); + QueryResult result; + current_params_ = ¶ms; + + const bool is_auto_commit = (current_txn_ == nullptr); + transaction::Transaction* txn = current_txn_; + if (is_auto_commit) txn = transaction_manager_.begin(); + + try { + const auto& insert_stmt = dynamic_cast(*prepared.stmt); + uint64_t rows_inserted = 0; + const uint64_t xmin = (txn != nullptr) ? 
txn->get_id() : 0; + + for (const auto& row_exprs : insert_stmt.values()) { + std::pmr::vector values(&arena_); + values.reserve(row_exprs.size()); + for (const auto& expr : row_exprs) { + values.push_back(expr->evaluate(nullptr, nullptr, current_params_)); + } + + const Tuple tuple(std::move(values)); + const auto tid = prepared.table->insert(tuple, xmin); + + // Index updates using cached index objects + std::string err; + size_t cached_idx_ptr = 0; + for (const auto& idx_info : prepared.table_meta->indexes) { + if (!idx_info.column_positions.empty()) { + uint16_t pos = idx_info.column_positions[0]; + if (!apply_index_write(*prepared.indexes[cached_idx_ptr++], tuple.get(pos), + tid, IndexOp::Insert, err)) { + throw std::runtime_error(err); + } + } + } + + if (txn != nullptr) { + txn->add_undo_log(transaction::UndoLog::Type::INSERT, prepared.table_meta->name, + tid); + if (!lock_manager_.acquire_exclusive(txn, tid)) { + throw std::runtime_error("Failed to acquire exclusive lock"); + } + } + rows_inserted++; + } + + if (is_auto_commit && txn != nullptr) transaction_manager_.commit(txn); + result.set_rows_affected(rows_inserted); + } catch (const std::exception& e) { + if (is_auto_commit && txn != nullptr) transaction_manager_.abort(txn); + result.set_error(std::string("Execution error: ") + e.what()); + } + + current_params_ = nullptr; + const auto end = std::chrono::high_resolution_clock::now(); + result.set_execution_time( + std::chrono::duration_cast(end - start).count()); + arena_.reset(); + return result; + } + + // Fallback for other statement types + current_params_ = ¶ms; + QueryResult res = execute(*(prepared.stmt)); + current_params_ = nullptr; + return res; +} + +QueryResult QueryExecutor::execute(const std::string& sql) { + std::shared_ptr stmt = nullptr; + + { + std::lock_guard lock(cache_mutex_); + auto it = statement_cache_.find(sql); + if (it != statement_cache_.end()) { + stmt = it->second; + } + } + + if (!stmt) { + auto lexer = 
std::make_unique(sql); + parser::Parser parser(std::move(lexer)); + auto parsed_stmt = parser.parse_statement(); + if (parsed_stmt) { + stmt = std::shared_ptr(parsed_stmt.release()); + std::lock_guard lock(cache_mutex_); + statement_cache_[sql] = stmt; + } + } + + if (!stmt) { + QueryResult res; + res.set_error("Failed to parse SQL statement"); + return res; + } + + return execute(*stmt); +} + QueryResult QueryExecutor::execute(const parser::Statement& stmt) { const auto start = std::chrono::high_resolution_clock::now(); QueryResult result; @@ -190,6 +340,9 @@ QueryResult QueryExecutor::execute(const parser::Statement& stmt) { const auto duration = std::chrono::duration_cast(end - start); result.set_execution_time(static_cast(duration.count())); + // Reset arena for the next query to reclaim zero-allocation memory + arena_.reset(); + return result; } @@ -237,6 +390,8 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, } /* Initialize and open operators */ + root->set_memory_resource(&arena_); + root->set_params(current_params_); if (!root->init() || !root->open()) { result.set_error(root->error().empty() ? "Failed to open execution plan" : root->error()); return result; @@ -248,7 +403,8 @@ QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, /* Pull tuples (Volcano model) */ Tuple tuple; while (root->next(tuple)) { - result.add_row(std::move(tuple)); + // MUST deep-copy tuple to default allocator (heap) so it outlives the arena reset + result.add_row(Tuple(tuple.values(), nullptr)); } root->close(); @@ -412,10 +568,12 @@ QueryResult QueryExecutor::execute_insert(const parser::InsertStatement& stmt, const uint64_t xmin = (txn != nullptr) ? 
txn->get_id() : 0; for (const auto& row_exprs : stmt.values()) { - std::vector values; + // Zero-allocation vector construction via Arena + std::pmr::vector values(&arena_); values.reserve(row_exprs.size()); for (const auto& expr : row_exprs) { - values.push_back(expr->evaluate()); + // Include bound parameters in expression evaluation + values.push_back(expr->evaluate(nullptr, nullptr, current_params_)); } const Tuple tuple(std::move(values)); @@ -431,8 +589,6 @@ QueryResult QueryExecutor::execute_insert(const parser::InsertStatement& stmt, if (shard_info_opt.has_value()) { const auto& shard_info = shard_info_opt.value(); - std::cerr << "--- [QueryExecutor] Routing tuple to data node " - << shard_info.node_address << " ---" << std::endl; network::RpcClient client(shard_info.node_address, shard_info.port); if (client.connect()) { network::ExecuteFragmentArgs args; @@ -483,8 +639,7 @@ QueryResult QueryExecutor::execute_insert(const parser::InsertStatement& stmt, /* Record undo log and Acquire Exclusive Lock if in transaction */ if (txn != nullptr) { txn->add_undo_log(transaction::UndoLog::Type::INSERT, table_name, tid); - if (!lock_manager_.acquire_exclusive( - txn, std::to_string(tid.page_num) + ":" + std::to_string(tid.slot_num))) { + if (!lock_manager_.acquire_exclusive(txn, tid)) { throw std::runtime_error("Failed to acquire exclusive lock"); } } @@ -523,7 +678,8 @@ QueryResult QueryExecutor::execute_delete(const parser::DeleteStatement& stmt, while (iter.next_meta(meta)) { bool match = true; if (stmt.where()) { - match = stmt.where()->evaluate(&meta.tuple, &schema).as_bool(); + // Support parameters in DELETE WHERE + match = stmt.where()->evaluate(&meta.tuple, &schema, current_params_).as_bool(); } if (match && meta.xmax == 0) { @@ -632,7 +788,7 @@ QueryResult QueryExecutor::execute_update(const parser::UpdateStatement& stmt, while (iter.next_meta(meta)) { bool match = true; if (stmt.where()) { - match = stmt.where()->evaluate(&meta.tuple, &schema).as_bool(); + 
match = stmt.where()->evaluate(&meta.tuple, &schema, current_params_).as_bool(); } if (match && meta.xmax == 0) { @@ -642,7 +798,7 @@ QueryResult QueryExecutor::execute_update(const parser::UpdateStatement& stmt, const std::string col_name = col_expr->to_string(); const size_t idx = schema.find_column(col_name); if (idx != static_cast(-1)) { - new_tuple.set(idx, val_expr->evaluate(&meta.tuple, &schema)); + new_tuple.set(idx, val_expr->evaluate(&meta.tuple, &schema, current_params_)); } } updates.push_back({iter.current_id(), meta.tuple, std::move(new_tuple)}); @@ -734,9 +890,6 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen } } - std::cerr << "--- [BuildPlan] Table " << base_table_name - << " found in SHUFFLE buffer. Schema size=" << buffer_schema.column_count() - << " ---" << std::endl; current_root = std::make_unique( context_id_, base_table_name, std::move(data), std::move(buffer_schema)); } else { @@ -767,12 +920,12 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen if (bin_expr->left().type() == parser::ExprType::Column && bin_expr->right().type() == parser::ExprType::Constant) { col_name = bin_expr->left().to_string(); - const_val = bin_expr->right().evaluate(); + const_val = bin_expr->right().evaluate(nullptr, nullptr, current_params_); eligible = true; } else if (bin_expr->right().type() == parser::ExprType::Column && bin_expr->left().type() == parser::ExprType::Constant) { col_name = bin_expr->right().to_string(); - const_val = bin_expr->left().evaluate(); + const_val = bin_expr->left().evaluate(nullptr, nullptr, current_params_); eligible = true; } @@ -803,15 +956,16 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen if (!index_used) { current_root = std::make_unique( - std::make_unique(base_table_name, bpm_, base_schema), txn, + std::make_shared(base_table_name, bpm_, base_schema), txn, &lock_manager_); } } if (!current_root) return nullptr; - std::cerr << "--- [BuildPlan] Base root 
schema size=" - << current_root->output_schema().column_count() << " ---" << std::endl; + // Propagate memory resource and bound parameters to the operator tree + current_root->set_memory_resource(&arena_); + current_root->set_params(current_params_); /* 2. Add JOINs */ for (const auto& join : stmt.joins()) { @@ -831,9 +985,6 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen } } - std::cerr << "--- [BuildPlan] JOIN Table " << join_table_name - << " found in SHUFFLE buffer. Schema size=" << buffer_schema.column_count() - << " ---" << std::endl; join_scan = std::make_unique( context_id_, join_table_name, std::move(data), std::move(buffer_schema)); } else { @@ -849,13 +1000,13 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen } join_scan = std::make_unique( - std::make_unique(join_table_name, bpm_, join_schema), txn, + std::make_shared(join_table_name, bpm_, join_schema), txn, &lock_manager_); - std::cerr << "--- [BuildPlan] JOIN Table " << join_table_name - << " from LOCAL. Schema size=" << join_scan->output_schema().column_count() - << " ---" << std::endl; } + join_scan->set_memory_resource(&arena_); + join_scan->set_params(current_params_); + bool use_hash_join = false; std::unique_ptr left_key = nullptr; std::unique_ptr right_key = nullptr; @@ -904,11 +1055,12 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen exec_join_type = executor::JoinType::Full; } - current_root = std::make_unique( + auto join_op = std::make_unique( std::move(current_root), std::move(join_scan), std::move(left_key), std::move(right_key), exec_join_type); - std::cerr << "--- [BuildPlan] Added HashJoin. 
Combined schema size=" - << current_root->output_schema().column_count() << " ---" << std::endl; + join_op->set_memory_resource(&arena_); + join_op->set_params(current_params_); + current_root = std::move(join_op); } else { /* TODO: Implement NestedLoopJoin for non-equality or missing conditions */ return nullptr; @@ -917,8 +1069,11 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen /* 3. Filter (WHERE) - Only if not already handled by IndexScan */ if (stmt.where()) { - current_root = + auto filter_op = std::make_unique(std::move(current_root), stmt.where()->clone()); + filter_op->set_memory_resource(&arena_); + filter_op->set_params(current_params_); + current_root = std::move(filter_op); } /* 3. Aggregate (GROUP BY or implicit aggregates) */ @@ -971,13 +1126,19 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen for (const auto& gb : stmt.group_by()) { group_by.push_back(gb->clone()); } - current_root = std::make_unique(std::move(current_root), - std::move(group_by), std::move(aggs)); + auto agg_op = std::make_unique(std::move(current_root), + std::move(group_by), std::move(aggs)); + agg_op->set_memory_resource(&arena_); + agg_op->set_params(current_params_); + current_root = std::move(agg_op); /* 3.5. 
Having */ if (stmt.having()) { - current_root = + auto having_filter = std::make_unique(std::move(current_root), stmt.having()->clone()); + having_filter->set_memory_resource(&arena_); + having_filter->set_params(current_params_); + current_root = std::move(having_filter); } } @@ -989,8 +1150,11 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen sort_keys.push_back(ob->clone()); ascending.push_back(true); /* Default to ASC */ } - current_root = std::make_unique(std::move(current_root), std::move(sort_keys), + auto sort_op = std::make_unique(std::move(current_root), std::move(sort_keys), std::move(ascending)); + sort_op->set_memory_resource(&arena_); + sort_op->set_params(current_params_); + current_root = std::move(sort_op); } /* 5. Project (SELECT columns) */ @@ -999,16 +1163,20 @@ std::unique_ptr QueryExecutor::build_plan(const parser::SelectStatemen for (const auto& col : stmt.columns()) { projection.push_back(col->clone()); } - current_root = + auto project_op = std::make_unique(std::move(current_root), std::move(projection)); - std::cerr << "--- [BuildPlan] Added Projection. Result schema size=" - << current_root->output_schema().column_count() << " ---" << std::endl; + project_op->set_memory_resource(&arena_); + project_op->set_params(current_params_); + current_root = std::move(project_op); } /* 6. 
Limit */ if (stmt.has_limit() || stmt.has_offset()) { - current_root = + auto limit_op = std::make_unique(std::move(current_root), stmt.limit(), stmt.offset()); + limit_op->set_memory_resource(&arena_); + limit_op->set_params(current_params_); + current_root = std::move(limit_op); } return current_root; diff --git a/src/parser/expression.cpp b/src/parser/expression.cpp index 746625dc..f878d58c 100644 --- a/src/parser/expression.cpp +++ b/src/parser/expression.cpp @@ -21,10 +21,10 @@ namespace cloudsql::parser { /** * @brief Evaluate binary expression */ -common::Value BinaryExpr::evaluate(const executor::Tuple* tuple, - const executor::Schema* schema) const { - const common::Value left_val = left_->evaluate(tuple, schema); - const common::Value right_val = right_->evaluate(tuple, schema); +common::Value BinaryExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { + const common::Value left_val = left_->evaluate(tuple, schema, params); + const common::Value right_val = right_->evaluate(tuple, schema, params); switch (op_) { case TokenType::Plus: @@ -186,17 +186,20 @@ std::unique_ptr BinaryExpr::clone() const { /** * @brief Evaluate unary expression */ -common::Value UnaryExpr::evaluate(const executor::Tuple* tuple, - const executor::Schema* schema) const { - const common::Value val = expr_->evaluate(tuple, schema); +common::Value UnaryExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { + const common::Value val = expr_->evaluate(tuple, schema, params); switch (op_) { case TokenType::Minus: if (val.is_numeric()) { - return common::Value(-val.to_float64()); + if (val.type() == common::ValueType::TYPE_FLOAT64) { + return common::Value::make_float64(-val.to_float64()); + } + return common::Value::make_int64(-val.to_int64()); } break; case TokenType::Not: - return common::Value(!val.as_bool()); + return common::Value::make_bool(!val.as_bool()); 
default: break; } @@ -229,8 +232,9 @@ std::unique_ptr UnaryExpr::clone() const { /** * @brief Evaluate column expression using tuple and schema */ -common::Value ColumnExpr::evaluate(const executor::Tuple* tuple, - const executor::Schema* schema) const { +common::Value ColumnExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { + (void)params; if (tuple == nullptr || schema == nullptr) { return common::Value::make_null(); } @@ -288,10 +292,11 @@ std::unique_ptr ColumnExpr::clone() const { : std::make_unique(name_); } -common::Value ConstantExpr::evaluate(const executor::Tuple* tuple, - const executor::Schema* schema) const { +common::Value ConstantExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { (void)tuple; (void)schema; + (void)params; return value_; } @@ -316,11 +321,41 @@ std::unique_ptr ConstantExpr::clone() const { return std::make_unique(value_); } +/** + * @brief Evaluate parameter expression + */ +common::Value ParameterExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { + (void)tuple; + (void)schema; + if (params == nullptr || index_ >= params->size()) { + return common::Value::make_null(); + } + return (*params)[index_]; +} + +void ParameterExpr::evaluate_vectorized(const executor::VectorBatch& batch, + const executor::Schema& schema, + executor::ColumnVector& result) const { + (void)batch; + (void)schema; + (void)result; +} + +std::string ParameterExpr::to_string() const { + return "?"; +} + +std::unique_ptr ParameterExpr::clone() const { + return std::make_unique(index_); +} + /** * @brief Evaluate function expression */ -common::Value FunctionExpr::evaluate(const executor::Tuple* tuple, - const executor::Schema* schema) const { +common::Value FunctionExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { + 
(void)params; if (tuple == nullptr || schema == nullptr) { return common::Value::make_null(); } @@ -381,10 +416,11 @@ std::unique_ptr FunctionExpr::clone() const { /** * @brief Evaluate IN expression */ -common::Value InExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema) const { - const common::Value col_val = column_->evaluate(tuple, schema); +common::Value InExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { + const common::Value col_val = column_->evaluate(tuple, schema, params); for (const auto& val : values_) { - if (col_val == val->evaluate(tuple, schema)) { + if (col_val == val->evaluate(tuple, schema, params)) { return common::Value(!not_flag_); } } @@ -431,9 +467,9 @@ std::unique_ptr InExpr::clone() const { /** * @brief Evaluate IS NULL expression */ -common::Value IsNullExpr::evaluate(const executor::Tuple* tuple, - const executor::Schema* schema) const { - const common::Value val = expr_->evaluate(tuple, schema); +common::Value IsNullExpr::evaluate(const executor::Tuple* tuple, const executor::Schema* schema, + const std::vector* params) const { + const common::Value val = expr_->evaluate(tuple, schema, params); const bool result = val.is_null(); return common::Value(not_flag_ ? 
!result : result); } diff --git a/src/parser/lexer.cpp b/src/parser/lexer.cpp index 45a4e29a..361f2c38 100644 --- a/src/parser/lexer.cpp +++ b/src/parser/lexer.cpp @@ -288,6 +288,8 @@ Token Lexer::read_operator() { return {TokenType::Ne, "!="}; } return {TokenType::Error, "!"}; + case '?': + return {TokenType::Param, "?"}; default: return {TokenType::Error, std::string(1, c)}; } diff --git a/src/parser/parser.cpp b/src/parser/parser.cpp index 5fa4b54a..ea9845b7 100644 --- a/src/parser/parser.cpp +++ b/src/parser/parser.cpp @@ -769,6 +769,11 @@ std::unique_ptr Parser::parse_primary() { // NOLINT(misc-no-recursi return std::make_unique("*"); } + if (tok.type() == TokenType::Param) { + static_cast(next_token()); + return std::make_unique(param_count_++); + } + if (tok.type() == TokenType::Identifier || tok.is_keyword()) { const Token id = next_token(); diff --git a/src/storage/buffer_pool_manager.cpp b/src/storage/buffer_pool_manager.cpp index b3ecb017..91e55f0e 100644 --- a/src/storage/buffer_pool_manager.cpp +++ b/src/storage/buffer_pool_manager.cpp @@ -43,10 +43,22 @@ BufferPoolManager::~BufferPoolManager() { } } +uint32_t BufferPoolManager::get_file_id(const std::string& file_name) { + auto it = file_id_map_.find(file_name); + if (it != file_id_map_.end()) { + return it->second; + } + uint32_t id = next_file_id_++; + file_id_map_[file_name] = id; + return id; +} + Page* BufferPoolManager::fetch_page(const std::string& file_name, uint32_t page_id) { const std::scoped_lock lock(latch_); - const std::string key = make_page_key(file_name, page_id); + const uint32_t file_id = get_file_id(file_name); + const PageKey key{file_id, page_id}; + if (page_table_.find(key) != page_table_.end()) { const uint32_t frame_id = page_table_[key]; Page* const page = &pages_[frame_id]; @@ -68,7 +80,10 @@ Page* BufferPoolManager::fetch_page(const std::string& file_name, uint32_t page_ storage_manager_.write_page(page->file_name_, page->page_id_, page->get_data()); } - 
page_table_.erase(make_page_key(page->file_name_, page->page_id_)); + if (!page->file_name_.empty()) { + const uint32_t old_file_id = get_file_id(page->file_name_); + page_table_.erase({old_file_id, page->page_id_}); + } page_table_[key] = frame_id; page->page_id_ = page_id; @@ -88,7 +103,9 @@ Page* BufferPoolManager::fetch_page(const std::string& file_name, uint32_t page_ bool BufferPoolManager::unpin_page(const std::string& file_name, uint32_t page_id, bool is_dirty) { const std::scoped_lock lock(latch_); - const std::string key = make_page_key(file_name, page_id); + const uint32_t file_id = get_file_id(file_name); + const PageKey key{file_id, page_id}; + if (page_table_.find(key) == page_table_.end()) { return false; } @@ -115,7 +132,9 @@ bool BufferPoolManager::unpin_page(const std::string& file_name, uint32_t page_i bool BufferPoolManager::flush_page(const std::string& file_name, uint32_t page_id) { const std::scoped_lock lock(latch_); - const std::string key = make_page_key(file_name, page_id); + const uint32_t file_id = get_file_id(file_name); + const PageKey key{file_id, page_id}; + if (page_table_.find(key) == page_table_.end()) { return false; } @@ -128,15 +147,16 @@ bool BufferPoolManager::flush_page(const std::string& file_name, uint32_t page_i return true; } -Page* BufferPoolManager::new_page(const std::string& file_name, const uint32_t* page_id) { +Page* BufferPoolManager::new_page(const std::string& file_name, uint32_t* page_id) { const std::scoped_lock lock(latch_); const uint32_t target_page_id = storage_manager_.allocate_page(file_name); if (page_id != nullptr) { - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) - const_cast(*page_id) = target_page_id; + *page_id = target_page_id; } - const std::string key = make_page_key(file_name, target_page_id); + + const uint32_t file_id = get_file_id(file_name); + const PageKey key{file_id, target_page_id}; uint32_t frame_id = 0; if (!free_list_.empty()) { @@ -151,7 +171,10 @@ Page* 
BufferPoolManager::new_page(const std::string& file_name, const uint32_t* storage_manager_.write_page(page->file_name_, page->page_id_, page->get_data()); } - page_table_.erase(make_page_key(page->file_name_, page->page_id_)); + if (!page->file_name_.empty()) { + const uint32_t old_file_id = get_file_id(page->file_name_); + page_table_.erase({old_file_id, page->page_id_}); + } page_table_[key] = frame_id; page->page_id_ = target_page_id; @@ -167,7 +190,9 @@ Page* BufferPoolManager::new_page(const std::string& file_name, const uint32_t* bool BufferPoolManager::delete_page(const std::string& file_name, uint32_t page_id) { const std::scoped_lock lock(latch_); - const std::string key = make_page_key(file_name, page_id); + const uint32_t file_id = get_file_id(file_name); + const PageKey key{file_id, page_id}; + if (page_table_.find(key) != page_table_.end()) { const uint32_t frame_id = page_table_[key]; Page* const page = &pages_[frame_id]; diff --git a/src/storage/heap_table.cpp b/src/storage/heap_table.cpp index b350032a..256a1181 100644 --- a/src/storage/heap_table.cpp +++ b/src/storage/heap_table.cpp @@ -38,9 +38,27 @@ HeapTable::HeapTable(std::string table_name, BufferPoolManager& bpm, executor::S schema_(std::move(schema)), last_page_id_(0) {} +HeapTable::~HeapTable() { + // Note: In some tests, the BufferPoolManager might be destroyed before the HeapTable + // causing this to potentially access a dangling reference if we are not careful. + if (cached_page_ != nullptr) { + try { + bpm_.unpin_page(filename_, cached_page_id_, true); + } catch (...) { + // Ignore errors during destruction if BPM is already gone + } + cached_page_ = nullptr; + } +} + /* --- Iterator Implementation --- */ -HeapTable::Iterator::Iterator(HeapTable& table) : table_(table), next_id_(0, 0), last_id_(0, 0) {} +HeapTable::Iterator::Iterator(HeapTable& table, std::pmr::memory_resource* mr) + : table_(table), + next_id_(0, 0), + last_id_(0, 0), + eof_(false), + mr_(mr ? 
mr : std::pmr::new_delete_resource()) {} bool HeapTable::Iterator::next(executor::Tuple& out_tuple) { TupleMeta meta; @@ -88,23 +106,27 @@ bool HeapTable::Iterator::next_meta(TupleMeta& out_meta) { if (offset != 0) { /* Found a record: Deserialize it in-place from the pinned buffer */ const uint8_t* const data = reinterpret_cast(buffer + offset); - const size_t data_len = Page::PAGE_SIZE - offset; - if (data_len < 16) { + // Read Tuple Length (first 2 bytes) + uint16_t tuple_data_len; + std::memcpy(&tuple_data_len, data, 2); + + const size_t record_len = static_cast(tuple_data_len); + if (record_len < 18) { // 2 len + 8 xmin + 8 xmax table_.bpm_.unpin_page(table_.filename_, next_id_.page_num, false); return false; } // Read MVCC Header - std::memcpy(&out_meta.xmin, data, 8); - std::memcpy(&out_meta.xmax, data + 8, 8); + std::memcpy(&out_meta.xmin, data + 2, 8); + std::memcpy(&out_meta.xmax, data + 10, 8); - size_t cursor = 16; - std::vector values; + size_t cursor = 18; + std::pmr::vector values(mr_); values.reserve(table_.schema_.column_count()); for (size_t i = 0; i < table_.schema_.column_count(); ++i) { - if (cursor >= data_len) break; + if (cursor >= record_len) break; auto type = static_cast(data[cursor++]); if (type == common::ValueType::TYPE_NULL) { values.push_back(common::Value::make_null()); @@ -118,7 +140,7 @@ bool HeapTable::Iterator::next_meta(TupleMeta& out_meta) { type == common::ValueType::TYPE_INT64 || type == common::ValueType::TYPE_FLOAT32 || type == common::ValueType::TYPE_FLOAT64) { - if (cursor + 8 > data_len) break; + if (cursor + 8 > record_len) break; if (type == common::ValueType::TYPE_FLOAT32 || type == common::ValueType::TYPE_FLOAT64) { @@ -135,11 +157,11 @@ bool HeapTable::Iterator::next_meta(TupleMeta& out_meta) { } cursor += 8; } else { - if (cursor + 4 > data_len) break; + if (cursor + 4 > record_len) break; uint32_t len; std::memcpy(&len, data + cursor, 4); cursor += 4; - if (cursor + len > data_len) break; + if (cursor + len > 
record_len) break; std::string s(reinterpret_cast(data + cursor), len); cursor += len; values.push_back(common::Value::make_text(s)); @@ -166,52 +188,83 @@ bool HeapTable::Iterator::next_meta(TupleMeta& out_meta) { /* --- HeapTable Methods --- */ HeapTable::TupleId HeapTable::insert(const executor::Tuple& tuple, uint64_t xmin) { - uint32_t page_num = last_page_id_; - - /* Pre-serialize tuple to binary to determine size and avoid repeat work */ - std::vector payload; - payload.reserve(16 + (tuple.size() * 9)); + /* Optimization: Use stack buffer for serialization to avoid heap allocations */ + std::array stack_buf{}; + std::vector heap_payload; + uint8_t* payload_ptr = stack_buf.data(); + size_t payload_capacity = stack_buf.size(); + size_t payload_size = 0; + + auto ensure_capacity = [&](size_t needed) { + if (payload_size + needed > payload_capacity) { + if (heap_payload.empty()) { + heap_payload.assign(stack_buf.begin(), stack_buf.begin() + payload_size); + } + heap_payload.resize(payload_size + needed + 256); + payload_ptr = heap_payload.data(); + payload_capacity = heap_payload.size(); + } + }; uint64_t xmax = 0; - payload.resize(16); - std::memcpy(payload.data(), &xmin, 8); - std::memcpy(payload.data() + 8, &xmax, 8); + payload_size = 18; // 2 len + 8 xmin + 8 xmax + // placeholder for length + std::memset(payload_ptr, 0, 2); + std::memcpy(payload_ptr + 2, &xmin, 8); + std::memcpy(payload_ptr + 10, &xmax, 8); for (const auto& val : tuple.values()) { + ensure_capacity(1); auto type = static_cast(val.type()); - payload.push_back(type); + payload_ptr[payload_size++] = type; if (val.is_null()) continue; if (val.is_numeric()) { - size_t off = payload.size(); - payload.resize(off + 8); + ensure_capacity(8); if (val.is_integer()) { int64_t v = val.to_int64(); - std::memcpy(payload.data() + off, &v, 8); + std::memcpy(payload_ptr + payload_size, &v, 8); } else { double v = val.to_float64(); - std::memcpy(payload.data() + off, &v, 8); + std::memcpy(payload_ptr + 
payload_size, &v, 8); } + payload_size += 8; + } else if (val.type() == common::ValueType::TYPE_BOOL) { + ensure_capacity(8); + int64_t v = val.as_bool() ? 1 : 0; + std::memcpy(payload_ptr + payload_size, &v, 8); + payload_size += 8; } else { - const std::string& s = val.to_string(); + const std::string& s = val.as_text(); uint32_t len = static_cast(s.size()); - size_t off = payload.size(); - payload.resize(off + 4 + len); - std::memcpy(payload.data() + off, &len, 4); - std::memcpy(payload.data() + off + 4, s.data(), len); + ensure_capacity(4 + len); + std::memcpy(payload_ptr + payload_size, &len, 4); + std::memcpy(payload_ptr + payload_size + 4, s.data(), len); + payload_size += 4 + len; } } - const auto required = static_cast(payload.size()); + const auto required = static_cast(payload_size); + std::memcpy(payload_ptr, &required, 2); // set final length while (true) { - Page* page = bpm_.fetch_page(filename_, page_num); - if (!page) { - page = bpm_.new_page(filename_, &page_num); - if (!page) return {0, 0}; + // Use cached page if available + if (cached_page_ == nullptr || cached_page_id_ != last_page_id_) { + if (cached_page_ != nullptr) { + bpm_.unpin_page(filename_, cached_page_id_, true); + } + cached_page_id_ = last_page_id_; + cached_page_ = bpm_.fetch_page(filename_, cached_page_id_); + if (!cached_page_) { + cached_page_ = bpm_.new_page(filename_, &cached_page_id_); + if (!cached_page_) { + return {0, 0}; // Buffer pool full or allocation failed + } + last_page_id_ = cached_page_id_; + } } - auto* buffer = page->get_data(); + auto* buffer = cached_page_->get_data(); PageHeader header{}; std::memcpy(&header, buffer, sizeof(PageHeader)); @@ -228,25 +281,25 @@ HeapTable::TupleId HeapTable::insert(const executor::Tuple& tuple, uint64_t xmin const uint16_t offset = header.free_space_offset; // Copy binary payload directly to page buffer - std::memcpy(buffer + offset, payload.data(), payload.size()); + std::memcpy(buffer + offset, payload_ptr, payload_size); /* 
Update slot directory */ std::memcpy(buffer + sizeof(PageHeader) + (header.num_slots * sizeof(uint16_t)), &offset, sizeof(uint16_t)); - TupleId tid(page_num, header.num_slots); + TupleId tid(cached_page_id_, header.num_slots); header.num_slots++; header.free_space_offset += required; std::memcpy(buffer, &header, sizeof(PageHeader)); - bpm_.unpin_page(filename_, page_num, true); - last_page_id_ = page_num; + // Keep page pinned for next insertion return tid; } - /* Page is full; attempt insertion in the next page */ - bpm_.unpin_page(filename_, page_num, false); - page_num++; + /* Page is full; unpin and move to next */ + bpm_.unpin_page(filename_, cached_page_id_, true); + cached_page_ = nullptr; + last_page_id_++; } } @@ -254,6 +307,22 @@ HeapTable::TupleId HeapTable::insert(const executor::Tuple& tuple, uint64_t xmin * @brief Logical deletion: update xmax field in the record blob */ bool HeapTable::remove(const TupleId& tuple_id, uint64_t xmax) { + // If target page is currently cached, we must use it or flush it + if (cached_page_ != nullptr && cached_page_id_ == tuple_id.page_num) { + auto* buffer = cached_page_->get_data(); + PageHeader header{}; + std::memcpy(&header, buffer, sizeof(PageHeader)); + + uint16_t offset = 0; + std::memcpy(&offset, buffer + sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)), + sizeof(uint16_t)); + if (offset != 0) { + std::memcpy(buffer + offset + 10, &xmax, 8); + return true; + } + return false; + } + Page* page = bpm_.fetch_page(filename_, tuple_id.page_num); if (!page) return false; @@ -273,8 +342,8 @@ bool HeapTable::remove(const TupleId& tuple_id, uint64_t xmax) { return false; } - /* In binary format, xmax is at offset + 8 */ - std::memcpy(buffer + offset + 8, &xmax, 8); + /* In binary format, xmax is at offset + 10 (2 len + 8 xmin) */ + std::memcpy(buffer + offset + 10, &xmax, 8); bpm_.unpin_page(filename_, tuple_id.page_num, true); return true; @@ -284,6 +353,14 @@ bool HeapTable::remove(const TupleId& tuple_id, 
uint64_t xmax) { * @brief Physical deletion: zero out slot offset (rollback only) */ bool HeapTable::physical_remove(const TupleId& tuple_id) { + if (cached_page_ != nullptr && cached_page_id_ == tuple_id.page_num) { + auto* buffer = cached_page_->get_data(); + const uint16_t zero = 0; + std::memcpy(buffer + sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)), &zero, + sizeof(uint16_t)); + return true; + } + Page* page = bpm_.fetch_page(filename_, tuple_id.page_num); if (!page) return false; @@ -319,6 +396,69 @@ bool HeapTable::update(const TupleId& tuple_id, const executor::Tuple& tuple, ui } bool HeapTable::get_meta(const TupleId& tuple_id, TupleMeta& out_meta) const { + if (cached_page_ != nullptr && cached_page_id_ == tuple_id.page_num) { + auto* buffer = cached_page_->get_data(); + uint16_t offset = 0; + std::memcpy(&offset, buffer + sizeof(PageHeader) + (tuple_id.slot_num * sizeof(uint16_t)), + sizeof(uint16_t)); + if (offset == 0) return false; + + const uint8_t* const data = reinterpret_cast(buffer + offset); + + uint16_t tuple_data_len; + std::memcpy(&tuple_data_len, data, 2); + const size_t record_len = static_cast(tuple_data_len); + if (record_len < 18) return false; + + std::memcpy(&out_meta.xmin, data + 2, 8); + std::memcpy(&out_meta.xmax, data + 10, 8); + + size_t cursor = 18; + std::vector values; + values.reserve(schema_.column_count()); + + for (size_t i = 0; i < schema_.column_count(); ++i) { + if (cursor >= record_len) break; + auto type = static_cast(data[cursor++]); + if (type == common::ValueType::TYPE_NULL) { + values.push_back(common::Value::make_null()); + continue; + } + + if (type == common::ValueType::TYPE_BOOL || type == common::ValueType::TYPE_INT8 || + type == common::ValueType::TYPE_INT16 || type == common::ValueType::TYPE_INT32 || + type == common::ValueType::TYPE_INT64 || type == common::ValueType::TYPE_FLOAT32 || + type == common::ValueType::TYPE_FLOAT64) { + if (cursor + 8 > record_len) break; + if (type == 
common::ValueType::TYPE_FLOAT32 || + type == common::ValueType::TYPE_FLOAT64) { + double v; + std::memcpy(&v, data + cursor, 8); + values.push_back(common::Value::make_float64(v)); + } else { + int64_t v; + std::memcpy(&v, data + cursor, 8); + if (type == common::ValueType::TYPE_BOOL) + values.push_back(common::Value::make_bool(v != 0)); + else + values.push_back(common::Value::make_int64(v)); + } + cursor += 8; + } else { + if (cursor + 4 > record_len) break; + uint32_t len; + std::memcpy(&len, data + cursor, 4); + cursor += 4; + if (cursor + len > record_len) break; + std::string s(reinterpret_cast(data + cursor), len); + cursor += len; + values.push_back(common::Value::make_text(s)); + } + } + out_meta.tuple = executor::Tuple(std::move(values)); + return true; + } + Page* page = bpm_.fetch_page(filename_, tuple_id.page_num); if (!page) return false; @@ -339,23 +479,25 @@ bool HeapTable::get_meta(const TupleId& tuple_id, TupleMeta& out_meta) const { } const uint8_t* const data = reinterpret_cast(buffer + offset); - const size_t data_len = Page::PAGE_SIZE - offset; - if (data_len < 16) { + uint16_t tuple_data_len; + std::memcpy(&tuple_data_len, data, 2); + const size_t record_len = static_cast(tuple_data_len); + if (record_len < 18) { bpm_.unpin_page(filename_, tuple_id.page_num, false); return false; } // Read MVCC Header - std::memcpy(&out_meta.xmin, data, 8); - std::memcpy(&out_meta.xmax, data + 8, 8); + std::memcpy(&out_meta.xmin, data + 2, 8); + std::memcpy(&out_meta.xmax, data + 10, 8); - size_t cursor = 16; + size_t cursor = 18; std::vector values; values.reserve(schema_.column_count()); for (size_t i = 0; i < schema_.column_count(); ++i) { - if (cursor >= data_len) break; + if (cursor >= record_len) break; auto type = static_cast(data[cursor++]); if (type == common::ValueType::TYPE_NULL) { values.push_back(common::Value::make_null()); @@ -366,7 +508,7 @@ bool HeapTable::get_meta(const TupleId& tuple_id, TupleMeta& out_meta) const { type == 
common::ValueType::TYPE_INT16 || type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT64 || type == common::ValueType::TYPE_FLOAT32 || type == common::ValueType::TYPE_FLOAT64) { - if (cursor + 8 > data_len) break; + if (cursor + 8 > record_len) break; if (type == common::ValueType::TYPE_FLOAT32 || type == common::ValueType::TYPE_FLOAT64) { @@ -383,11 +525,11 @@ bool HeapTable::get_meta(const TupleId& tuple_id, TupleMeta& out_meta) const { } cursor += 8; } else { - if (cursor + 4 > data_len) break; + if (cursor + 4 > record_len) break; uint32_t len; std::memcpy(&len, data + cursor, 4); cursor += 4; - if (cursor + len > data_len) break; + if (cursor + len > record_len) break; std::string s(reinterpret_cast(data + cursor), len); cursor += len; values.push_back(common::Value::make_text(s)); @@ -429,7 +571,7 @@ uint64_t HeapTable::tuple_count() const { sizeof(uint16_t)); if (offset != 0) { uint64_t xmax = 0; - std::memcpy(&xmax, buffer + offset + 8, 8); + std::memcpy(&xmax, buffer + offset + 10, 8); // 2 len + 8 xmin if (xmax == 0) count++; } } @@ -462,11 +604,19 @@ bool HeapTable::create() { } bool HeapTable::drop() { + if (cached_page_ != nullptr) { + bpm_.unpin_page(filename_, cached_page_id_, false); + cached_page_ = nullptr; + } static_cast(bpm_.close_file(filename_)); return (std::remove(filename_.c_str()) == 0); } bool HeapTable::read_page(uint32_t page_num, char* buffer) const { + if (cached_page_ != nullptr && cached_page_id_ == page_num) { + std::memcpy(buffer, cached_page_->get_data(), Page::PAGE_SIZE); + return true; + } Page* page = bpm_.fetch_page(filename_, page_num); if (!page) return false; std::memcpy(buffer, page->get_data(), Page::PAGE_SIZE); @@ -475,6 +625,10 @@ bool HeapTable::read_page(uint32_t page_num, char* buffer) const { } bool HeapTable::write_page(uint32_t page_num, const char* buffer) { + if (cached_page_ != nullptr && cached_page_id_ == page_num) { + std::memcpy(cached_page_->get_data(), buffer, Page::PAGE_SIZE); + 
return true; + } Page* page = bpm_.fetch_page(filename_, page_num); if (!page) { page = bpm_.new_page(filename_, &page_num); diff --git a/src/transaction/lock_manager.cpp b/src/transaction/lock_manager.cpp index 837e89f8..3ca01594 100644 --- a/src/transaction/lock_manager.cpp +++ b/src/transaction/lock_manager.cpp @@ -14,7 +14,7 @@ namespace cloudsql::transaction { -bool LockManager::acquire_shared(Transaction* txn, const std::string& rid) { +bool LockManager::acquire_shared(Transaction* txn, const storage::HeapTable::TupleId& rid) { std::unique_lock lock(latch_); auto& queue = lock_table_[rid]; @@ -60,7 +60,7 @@ bool LockManager::acquire_shared(Transaction* txn, const std::string& rid) { return true; } -bool LockManager::acquire_exclusive(Transaction* txn, const std::string& rid) { +bool LockManager::acquire_exclusive(Transaction* txn, const storage::HeapTable::TupleId& rid) { std::unique_lock lock(latch_); auto& queue = lock_table_[rid]; @@ -82,14 +82,6 @@ bool LockManager::acquire_exclusive(Transaction* txn, const std::string& rid) { } } - if (upgrade) { - /* Release S lock temporarily or just modify request? */ - /* Simple upgrade: drop S, queue X. Real implementation needs care for deadlocks/starvation - */ - /* For now, let's just queue a new X request and wait. This is simplistic. */ - /* NOTE: Upgrades are deadlock-prone without proper handling. */ - } - queue.request_queue.push_back({txn->get_id(), LockMode::EXCLUSIVE, false}); const auto my_req = std::prev(queue.request_queue.end()); @@ -102,8 +94,6 @@ bool LockManager::acquire_exclusive(Transaction* txn, const std::string& rid) { /* Exclusive requires NO other locks held by OTHERS */ bool can_grant = true; for (auto iter = queue.request_queue.begin(); iter != my_req; ++iter) { - /* If it's us (upgrade case), we ignore our own previous lock? 
*/ - /* Simplified: Strictly FIFO for X locks relative to others */ if (iter->txn_id != txn->get_id()) { can_grant = false; break; @@ -122,7 +112,7 @@ bool LockManager::acquire_exclusive(Transaction* txn, const std::string& rid) { return true; } -bool LockManager::unlock(Transaction* txn, const std::string& rid) { +bool LockManager::unlock(Transaction* txn, const storage::HeapTable::TupleId& rid) { const std::unique_lock lock(latch_); if (lock_table_.find(rid) == lock_table_.end()) { return false; diff --git a/test_data/idx_id.idx b/test_data/idx_id.idx deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/cloudSQL_tests.cpp b/tests/cloudSQL_tests.cpp index 82a5b9e8..aa8fad93 100644 --- a/tests/cloudSQL_tests.cpp +++ b/tests/cloudSQL_tests.cpp @@ -190,21 +190,27 @@ TEST(CloudSQLTests, StoragePersistence) { Schema schema; schema.add_column("data", ValueType::TYPE_TEXT); { - StorageManager disk_manager("./test_data"); - BufferPoolManager sm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager); - HeapTable table(filename, sm, schema); - static_cast(table.create()); - static_cast(table.insert(Tuple({Value::make_text("Persistent data")}))); - sm.flush_all_pages(); + auto disk_manager = std::make_unique("./test_data"); + auto sm = std::make_unique( + cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, *disk_manager); + { + HeapTable table(filename, *sm, schema); + static_cast(table.create()); + static_cast(table.insert(Tuple({Value::make_text("Persistent data")}))); + } + sm->flush_all_pages(); } { - StorageManager disk_manager("./test_data"); - BufferPoolManager sm(cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, disk_manager); - HeapTable table(filename, sm, schema); - auto iter = table.scan(); - Tuple t; - EXPECT_TRUE(iter.next(t)); - EXPECT_STREQ(t.get(0).as_text().c_str(), "Persistent data"); + auto disk_manager = std::make_unique("./test_data"); + auto sm = std::make_unique( + cloudsql::config::Config::DEFAULT_BUFFER_POOL_SIZE, 
*disk_manager); + { + HeapTable table(filename, *sm, schema); + auto iter = table.scan(); + Tuple t; + EXPECT_TRUE(iter.next(t)); + EXPECT_STREQ(t.get(0).as_text().c_str(), "Persistent data"); + } } static_cast(std::remove(filepath.c_str())); } diff --git a/tests/lock_manager_tests.cpp b/tests/lock_manager_tests.cpp index bf5db78f..e59ed496 100644 --- a/tests/lock_manager_tests.cpp +++ b/tests/lock_manager_tests.cpp @@ -9,10 +9,12 @@ #include #include +#include "storage/heap_table.hpp" #include "transaction/lock_manager.hpp" #include "transaction/transaction.hpp" using namespace cloudsql::transaction; +using namespace cloudsql::storage; namespace { @@ -22,35 +24,38 @@ TEST(LockManagerTests, Shared) { LockManager lm; Transaction txn1(1); Transaction txn2(2); + HeapTable::TupleId rid1(1, 1); - EXPECT_TRUE(lm.acquire_shared(&txn1, "RID1")); - EXPECT_TRUE(lm.acquire_shared(&txn2, "RID1")); + EXPECT_TRUE(lm.acquire_shared(&txn1, rid1)); + EXPECT_TRUE(lm.acquire_shared(&txn2, rid1)); - static_cast(lm.unlock(&txn1, "RID1")); - static_cast(lm.unlock(&txn2, "RID1")); + static_cast(lm.unlock(&txn1, rid1)); + static_cast(lm.unlock(&txn2, rid1)); } TEST(LockManagerTests, Exclusive) { LockManager lm; Transaction txn1(1); Transaction txn2(2); + HeapTable::TupleId rid1(1, 1); - EXPECT_TRUE(lm.acquire_exclusive(&txn1, "RID1")); - EXPECT_FALSE(lm.acquire_shared(&txn2, "RID1")); + EXPECT_TRUE(lm.acquire_exclusive(&txn1, rid1)); + EXPECT_FALSE(lm.acquire_shared(&txn2, rid1)); - static_cast(lm.unlock(&txn1, "RID1")); - EXPECT_TRUE(lm.acquire_shared(&txn2, "RID1")); - static_cast(lm.unlock(&txn2, "RID1")); + static_cast(lm.unlock(&txn1, rid1)); + EXPECT_TRUE(lm.acquire_shared(&txn2, rid1)); + static_cast(lm.unlock(&txn2, rid1)); } TEST(LockManagerTests, Upgrade) { LockManager lm; Transaction txn1(1); + HeapTable::TupleId rid1(1, 1); - EXPECT_TRUE(lm.acquire_shared(&txn1, "RID1")); - EXPECT_TRUE(lm.acquire_exclusive(&txn1, "RID1")); + EXPECT_TRUE(lm.acquire_shared(&txn1, rid1)); + 
EXPECT_TRUE(lm.acquire_exclusive(&txn1, rid1)); - static_cast(lm.unlock(&txn1, "RID1")); + static_cast(lm.unlock(&txn1, rid1)); } TEST(LockManagerTests, Wait) { @@ -58,20 +63,21 @@ TEST(LockManagerTests, Wait) { Transaction txn1(1); Transaction txn2(2); Transaction txn3(3); + HeapTable::TupleId rid1(1, 1); std::atomic shared_granted{0}; // 1. Get Exclusive - EXPECT_TRUE(lm.acquire_exclusive(&txn1, "RID1")); + EXPECT_TRUE(lm.acquire_exclusive(&txn1, rid1)); // 2. Try to get Shared from two other txns (should block) std::thread t2([&]() { - if (lm.acquire_shared(&txn2, "RID1")) { + if (lm.acquire_shared(&txn2, rid1)) { shared_granted++; } }); std::thread t3([&]() { - if (lm.acquire_shared(&txn3, "RID1")) { + if (lm.acquire_shared(&txn3, rid1)) { shared_granted++; } }); @@ -81,41 +87,43 @@ TEST(LockManagerTests, Wait) { EXPECT_EQ(shared_granted.load(), 0); // 3. Release Exclusive (should grant both shared) - static_cast(lm.unlock(&txn1, "RID1")); + static_cast(lm.unlock(&txn1, rid1)); t2.join(); t3.join(); EXPECT_EQ(shared_granted.load(), 2); - static_cast(lm.unlock(&txn2, "RID1")); - static_cast(lm.unlock(&txn3, "RID1")); + static_cast(lm.unlock(&txn2, rid1)); + static_cast(lm.unlock(&txn3, rid1)); } TEST(LockManagerTests, Deadlock) { LockManager lm; Transaction txn1(1); Transaction txn2(2); + HeapTable::TupleId ridA(1, 1); + HeapTable::TupleId ridB(1, 2); // txn1 holds A, txn2 holds B - EXPECT_TRUE(lm.acquire_exclusive(&txn1, "A")); - EXPECT_TRUE(lm.acquire_exclusive(&txn2, "B")); + EXPECT_TRUE(lm.acquire_exclusive(&txn1, ridA)); + EXPECT_TRUE(lm.acquire_exclusive(&txn2, ridB)); // txn1 waits for B - std::thread t1([&]() { static_cast(lm.acquire_exclusive(&txn1, "B")); }); + std::thread t1([&]() { static_cast(lm.acquire_exclusive(&txn1, ridB)); }); // Small sleep to ensure t1 is waiting std::this_thread::sleep_for(TEST_SLEEP_MS); // txn2 waits for A -> Deadlock! 
- static_cast(lm.unlock(&txn1, "A")); - static_cast(lm.acquire_exclusive(&txn2, "A")); + static_cast(lm.unlock(&txn1, ridA)); + static_cast(lm.acquire_exclusive(&txn2, ridA)); - static_cast(lm.unlock(&txn2, "B")); + static_cast(lm.unlock(&txn2, ridB)); t1.join(); - static_cast(lm.unlock(&txn1, "B")); - static_cast(lm.unlock(&txn2, "A")); + static_cast(lm.unlock(&txn1, ridB)); + static_cast(lm.unlock(&txn2, ridA)); } } // namespace diff --git a/tests/transaction_coverage_tests.cpp b/tests/transaction_coverage_tests.cpp index 0cfbad8c..8492d9ca 100644 --- a/tests/transaction_coverage_tests.cpp +++ b/tests/transaction_coverage_tests.cpp @@ -91,19 +91,20 @@ TEST(TransactionCoverageTestsStandalone, LockManagerConcurrency) { std::atomic stop{false}; Transaction writer_txn(100); + storage::HeapTable::TupleId rid(1, 1); // Writers holds exclusive lock initially - ASSERT_TRUE(lm.acquire_exclusive(&writer_txn, "RESOURCE")); + ASSERT_TRUE(lm.acquire_exclusive(&writer_txn, rid)); for (int i = 0; i < num_readers; ++i) { - readers.emplace_back([&, i]() { + readers.emplace_back([&, i, rid]() { Transaction reader_txn(i); - if (lm.acquire_shared(&reader_txn, "RESOURCE")) { + if (lm.acquire_shared(&reader_txn, rid)) { shared_granted++; while (!stop) { std::this_thread::yield(); } - lm.unlock(&reader_txn, "RESOURCE"); + lm.unlock(&reader_txn, rid); } }); } @@ -113,7 +114,7 @@ TEST(TransactionCoverageTestsStandalone, LockManagerConcurrency) { EXPECT_EQ(shared_granted.load(), 0); // Release writer lock, readers should proceed - lm.unlock(&writer_txn, "RESOURCE"); + lm.unlock(&writer_txn, rid); // Wait for all readers to get the lock for (int i = 0; i < 50 && shared_granted.load() < num_readers; ++i) { @@ -139,21 +140,22 @@ TEST_F(TransactionCoverageTests, DeepRollback) { txn = tm.begin(); // 1. 
Insert some data - auto rid1 = - table.insert(executor::Tuple({common::Value::make_int64(1), common::Value::make_text("A")}), - txn->get_id()); + auto rid1 = table.insert(executor::Tuple(std::pmr::vector( + {common::Value::make_int64(1), common::Value::make_text("A")})), + txn->get_id()); txn->add_undo_log(UndoLog::Type::INSERT, "rollback_stress", rid1); - auto rid2 = - table.insert(executor::Tuple({common::Value::make_int64(2), common::Value::make_text("B")}), - txn->get_id()); + auto rid2 = table.insert(executor::Tuple(std::pmr::vector( + {common::Value::make_int64(2), common::Value::make_text("B")})), + txn->get_id()); txn->add_undo_log(UndoLog::Type::INSERT, "rollback_stress", rid2); // 2. Update data table.remove(rid1, txn->get_id()); // Mark old version deleted - auto rid1_new = table.insert( - executor::Tuple({common::Value::make_int64(1), common::Value::make_text("A_NEW")}), - txn->get_id()); + auto rid1_new = + table.insert(executor::Tuple(std::pmr::vector( + {common::Value::make_int64(1), common::Value::make_text("A_NEW")})), + txn->get_id()); txn->add_undo_log(UndoLog::Type::UPDATE, "rollback_stress", rid1_new, rid1); // 3. Delete data