Skip to content

Commit

Permalink
Enable speedb features: Constrain the i/f of SharedOptions (make immu…
Browse files Browse the repository at this point in the history
…table)
  • Loading branch information
udi-speedb committed Nov 3, 2023
1 parent 4c5ce06 commit 5292d91
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 148 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* Added a kUseBaseAddress flag and GetBaseOffset flag to OptionTypeInfo. If this flag is set and a function is used for processing options, the function is passed the base address of the struct rather than the specific field (#397)
* Static Pinning: Set the default for mid-percent capacity threshold in scoped pinning policy to 70 (#689).
* db_bench: Add support for individual scoped pinning policy parameters (#687).
* Enable speedb features: Constrain the interface of SharedOptions (make immutable) (#740).

### Bug Fixes
* db_bench: Fix SeekRandomWriteRandom valid check. Use key and value only after checking iterator is valid.
Expand Down
9 changes: 9 additions & 0 deletions examples/enable_speedb_features_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"

using namespace ROCKSDB_NAMESPACE;

Expand Down Expand Up @@ -55,11 +56,17 @@ int main() {
// as listed in the definition of SpeedbSharedOptiopns in options.h
op1.create_if_missing = true;
op1.compression = rocksdb::kNoCompression;
// NOT Having a prefix-extractor (the deafult) will result in the
// memtable_factory==HashSpdbRepFactory
//...
op1.EnableSpeedbFeatures(so1);

op2.create_if_missing = true;
op2.compression = rocksdb::kZlibCompression;
// Having a prefix-extractor will result in the
// memtable_factory==SkipListRepFactory
op2.prefix_extractor.reset(NewFixedPrefixTransform(4));

//...
op2.EnableSpeedbFeatures(so1);

Expand Down Expand Up @@ -124,6 +131,8 @@ int main() {
}
std::cout << "new_cf was created in db3" << std::endl;

// Cleanup

s = db3->DropColumnFamily(cf);
if (!s.ok()) {
std::cerr << s.ToString() << std::endl;
Expand Down
71 changes: 52 additions & 19 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2241,39 +2241,72 @@ struct LiveFilesStorageInfoOptions {
// more info and use example can be found in enable_speedb_features_example.cc
class SharedOptions {
public:
SharedOptions();
static constexpr size_t kDefaultDelayedWriteRate = 256 * 1024 * 1024ul;
static constexpr size_t kDefaultBucketSize = 1000000;
static constexpr bool kDeafultUseMerge = true;

static constexpr size_t kWbmPerCfSizeIncrease = 512 * 1024 * 1024ul;

public:
SharedOptions(size_t total_ram_size_bytes, size_t total_threads,
size_t delayed_write_rate = 256 * 1024 * 1024ul,
size_t bucket_size = 1000000, bool use_merge = true);
size_t GetTotalThreads() { return total_threads_; }
size_t GetTotalRamSizeBytes() { return total_ram_size_bytes_; }
size_t GetDelayedWriteRate() { return delayed_write_rate_; }
size_t GetBucketSize() { return bucket_size_; }
size_t IsMergeMemtableSupported() { return use_merge_; }
size_t delayed_write_rate = kDefaultDelayedWriteRate,
size_t bucket_size = kDefaultBucketSize,
bool use_merge = kDeafultUseMerge);

public:
size_t GetMaxWriteBufferManagerSize() const;

size_t GetTotalThreads() const { return total_threads_; }
size_t GetTotalRamSizeBytes() const { return total_ram_size_bytes_; }
size_t GetDelayedWriteRate() const { return delayed_write_rate_; }
size_t GetBucketSize() const { return bucket_size_; }
size_t IsMergeMemtableSupported() const { return use_merge_; }

const Cache* GetCache() const { return cache_.get(); }
const WriteController* GetWriteController() const {
return write_controller_.get();
};
const WriteBufferManager* GetWriteBufferManager() const {
return write_buffer_manager_.get();
}
const TablePinningPolicy* GetPinningPolicy() const {
return pinning_policy_.get();
}

private:
void CreateWriteBufferManager();
void CreatePinningPolicy();

// this function will increase write buffer manager by increased_by amount
// as long as the result is not bigger than the maximum size of
// total_ram_size_ /4
void IncreaseWriteBufferSize(size_t increase_by);
void CreatePinningPolicy();
size_t GetMaxWriteBufferManagerSize() const;

std::shared_ptr<Cache> cache = nullptr;
std::shared_ptr<WriteController> write_controller = nullptr;
std::shared_ptr<WriteBufferManager> write_buffer_manager = nullptr;
private:
std::shared_ptr<Cache> cache_ = nullptr;
std::shared_ptr<WriteController> write_controller_ = nullptr;
std::shared_ptr<WriteBufferManager> write_buffer_manager_ = nullptr;
std::shared_ptr<TablePinningPolicy> pinning_policy_ = nullptr;

private:
size_t total_ram_size_bytes_ = 0;
size_t total_threads_ = 0;
size_t delayed_write_rate_ = kDefaultBucketSize;
size_t bucket_size_ = kDefaultBucketSize;
bool use_merge_ = kDeafultUseMerge;

private:
// For Future Use
Env* env = Env::Default();
std::shared_ptr<RateLimiter> rate_limiter = nullptr;
std::shared_ptr<SstFileManager> sst_file_manager = nullptr;
std::shared_ptr<Logger> info_log = nullptr;
std::vector<std::shared_ptr<EventListener>> listeners;
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;
std::shared_ptr<TablePinningPolicy> pinning_policy = nullptr;

private:
size_t total_threads_ = 0;
size_t total_ram_size_bytes_ = 0;
size_t delayed_write_rate_ = 0;
size_t bucket_size_ = 1000000;
bool use_merge_ = true;
friend struct DBOptions;
friend struct ColumnFamilyOptions;
};

} // namespace ROCKSDB_NAMESPACE
62 changes: 35 additions & 27 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -551,33 +551,32 @@ Options* Options::OldDefaults(int rocksdb_major_version,
Options* Options::EnableSpeedbFeatures(SharedOptions& shared_options) {
EnableSpeedbFeaturesDB(shared_options);
EnableSpeedbFeaturesCF(shared_options);
if (memtable_factory->IsInsertConcurrentlySupported() == false) {
allow_concurrent_memtable_write = false;
}
return this;
}

SharedOptions::SharedOptions(size_t total_ram_size_bytes, size_t total_threads,
size_t delayed_write_rate, size_t bucket_size,
bool use_merge) {
total_threads_ = total_threads;
total_ram_size_bytes_ = total_ram_size_bytes;
delayed_write_rate_ = delayed_write_rate;
bucket_size_ = bucket_size;
use_merge_ = use_merge;
// initial_write_buffer_size_ is initialized to 1 to avoid from empty memory
// which might cause some problems
int initial_write_buffer_size_ = 1;
cache = NewLRUCache(total_ram_size_bytes_);
write_controller.reset(
bool use_merge)
: total_ram_size_bytes_(total_ram_size_bytes),
total_threads_(total_threads),
delayed_write_rate_(delayed_write_rate),
bucket_size_(bucket_size),
use_merge_(use_merge) {
cache_ = NewLRUCache(total_ram_size_bytes_);
write_controller_.reset(
new WriteController(true /*dynamic_delay*/, delayed_write_rate_));
write_buffer_manager.reset(new WriteBufferManager(
initial_write_buffer_size_, cache, true /*allow_stall*/));

CreateWriteBufferManager();
CreatePinningPolicy();
}

void SharedOptions::IncreaseWriteBufferSize(size_t increase_by) {
// Max write_buffer_manager->buffer_size()
size_t wbm_max_buf_size = GetMaxWriteBufferManagerSize();
size_t current_buffer_size = write_buffer_manager->buffer_size();
size_t current_buffer_size = write_buffer_manager_->buffer_size();
size_t set_buf_res = 0;

if (current_buffer_size == 1 && increase_by > 1) {
Expand All @@ -591,14 +590,25 @@ void SharedOptions::IncreaseWriteBufferSize(size_t increase_by) {
set_buf_res = wbm_max_buf_size;
}
if (set_buf_res != 0) {
write_buffer_manager->SetBufferSize(set_buf_res);
write_buffer_manager_->SetBufferSize(set_buf_res);
}
}

void SharedOptions::CreateWriteBufferManager() {
// initial_write_buffer_size_ is initialized to 1 to avoid from empty memory
// which might cause some problems
size_t initial_write_buffer_size_ = 1U;

write_buffer_manager_.reset(new WriteBufferManager(
initial_write_buffer_size_, cache_, true /*allow_stall*/,
true /* initiate_fluses */, WriteBufferManager::FlushInitiationOptions(),
WriteBufferManager::kDfltStartDelayPercentThreshold));
}

void SharedOptions::CreatePinningPolicy() {
// Calculate the size of the clean memory
auto clean_memory_capacity = cache->GetCapacity();
if (write_buffer_manager->cost_to_cache()) {
auto clean_memory_capacity = cache_->GetCapacity();
if (write_buffer_manager_->cost_to_cache()) {
// The WBM's size is increased on every call to EnableSpeedbFeaturesCF()
// up to a max size. For simplicity, calculate the space for pinning
// as if wbm is at its max size. Otherwise we would have to update the
Expand All @@ -622,7 +632,7 @@ void SharedOptions::CreatePinningPolicy() {
std::ostringstream oss;
oss << "id=speedb_scoped_pinning_policy; capacity=" << pinning_capacity;
auto s = TablePinningPolicy::CreateFromString(config_options, oss.str(),
&pinning_policy);
&pinning_policy_);
assert(s.ok());
}

Expand All @@ -631,13 +641,12 @@ size_t SharedOptions::GetMaxWriteBufferManagerSize() const {
}

DBOptions* DBOptions::EnableSpeedbFeaturesDB(SharedOptions& shared_options) {
env = shared_options.env;
IncreaseParallelism((int)shared_options.GetTotalThreads());
delayed_write_rate = shared_options.GetDelayedWriteRate();
bytes_per_sync = 1ul << 20;
use_dynamic_delay = true;
write_buffer_manager = shared_options.write_buffer_manager;
write_controller = shared_options.write_controller;
write_buffer_manager = shared_options.write_buffer_manager_;
write_controller = shared_options.write_controller_;
return this;
}

Expand Down Expand Up @@ -665,8 +674,8 @@ ColumnFamilyOptions* ColumnFamilyOptions::EnableSpeedbFeaturesCF(
// to disable flush due to write buffer full
// each new column family will ask the write buffer manager to increase the
// write buffer size by 512 * 1024 * 1024ul
shared_options.IncreaseWriteBufferSize(512 * 1024 * 1024ul);
auto db_wbf_size = shared_options.write_buffer_manager->buffer_size();
shared_options.IncreaseWriteBufferSize(SharedOptions::kWbmPerCfSizeIncrease);
auto db_wbf_size = shared_options.write_buffer_manager_->buffer_size();
// cf write_buffer_size
write_buffer_size = std::min<size_t>(db_wbf_size / 4, 64ul << 20);
max_write_buffer_number = 4;
Expand All @@ -682,15 +691,14 @@ ColumnFamilyOptions* ColumnFamilyOptions::EnableSpeedbFeaturesCF(
&block_based_table_options.filter_policy);
assert(s.ok());
block_based_table_options.cache_index_and_filter_blocks = true;
block_based_table_options.block_cache = shared_options.cache;
block_based_table_options.block_cache = shared_options.cache_;
block_based_table_options.cache_index_and_filter_blocks_with_high_priority =
true;
block_based_table_options.pinning_policy = shared_options.pinning_policy;
block_based_table_options.pinning_policy = shared_options.pinning_policy_;
table_factory.reset(NewBlockBasedTableFactory(block_based_table_options));
}
if (prefix_extractor) {
memtable_factory.reset(
NewHashSkipListRepFactory(shared_options.GetBucketSize()));
memtable_factory.reset(new SkipListFactory());
} else {
memtable_factory.reset(
NewHashSpdbRepFactory(shared_options.GetBucketSize(),
Expand Down
Loading

0 comments on commit 5292d91

Please sign in to comment.