Skip to content

Commit

Permalink
Merge "Fixes for incremental backup" from Glauber
Browse files Browse the repository at this point in the history
"The control over backups is now moved to the CF itself, from the storage
service. That allows us to simplify the code (while making it correct) for cases
in which the storage service is not available.

With this change, we no longer need the database config passed down to the
storage_service object. So that patch is reverted."
  • Loading branch information
avikivity committed Oct 5, 2015
2 parents b74a9d9 + 651937b commit e342914
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 28 deletions.
29 changes: 25 additions & 4 deletions api/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,36 @@ void set_storage_service(http_context& ctx, routes& r) {
});

ss::is_incremental_backups_enabled.set(r, [](std::unique_ptr<request> req) {
return make_ready_future<json::json_return_type>(service::get_local_storage_service().incremental_backups_enabled());
// If this is issued in parallel with an ongoing change, we may see values not agreeing.
// Reissuing is asking for trouble, so we will just return true upon seeing any true value.
return service::get_local_storage_service().db().map_reduce(adder<bool>(), [] (database& db) {
for (auto& pair: db.get_keyspaces()) {
auto& ks = pair.second;
if (ks.incremental_backups_enabled()) {
return true;
}
}
return false;
}).then([] (bool val) {
return make_ready_future<json::json_return_type>(val);
});
});

ss::set_incremental_backups_enabled.set(r, [](std::unique_ptr<request> req) {
auto val_str = req->get_query_param("value");
bool value = (val_str == "True") || (val_str == "true") || (val_str == "1");
return service::get_storage_service().invoke_on_all([value] (auto& local_service) {
local_service.incremental_backups_set_value(value);
}).then([] {;
return service::get_local_storage_service().db().invoke_on_all([value] (database& db) {
// Change both KS and CF, so they are in sync
for (auto& pair: db.get_keyspaces()) {
auto& ks = pair.second;
ks.set_incremental_backups(value);
}

for (auto& pair: db.get_column_families()) {
auto cf_ptr = pair.second;
cf_ptr->set_incremental_backups(value);
}
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
Expand Down
6 changes: 4 additions & 2 deletions database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
newtab->set_unshared();
dblog.debug("Flushing to {}", newtab->get_filename());
return newtab->write_components(*old).then([this, newtab, old] {
return newtab->open_data().then([this, newtab] {;
return newtab->open_data().then([this, newtab] {
// Note that due to our sharded architecture, it is possible that
// in the face of a value change some shards will backup sstables
// while others won't.
Expand All @@ -510,7 +510,7 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
//
// The code as is guarantees that we'll never partially backup a
// single sstable, so that is enough of a guarantee.
if (!service::get_local_storage_service().incremental_backups_enabled()) {
if (!incremental_backups_enabled()) {
return make_ready_future<>();
}
auto dir = newtab->get_dir() + "/backups/";
Expand Down Expand Up @@ -1139,6 +1139,7 @@ keyspace::make_column_family_config(const schema& s) const {
cfg.enable_cache = _config.enable_cache;
cfg.max_memtable_size = _config.max_memtable_size;
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
cfg.enable_incremental_backups = _config.enable_incremental_backups;

return cfg;
}
Expand Down Expand Up @@ -1480,6 +1481,7 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
cfg.max_memtable_size = std::numeric_limits<size_t>::max();
}
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
cfg.enable_incremental_backups = _cfg->incremental_backups();
return cfg;
}

Expand Down
28 changes: 28 additions & 0 deletions database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public:
bool enable_disk_reads = true;
bool enable_cache = true;
bool enable_commitlog = true;
bool enable_incremental_backups = false;
size_t max_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
};
Expand Down Expand Up @@ -217,6 +218,14 @@ public:

future<> snapshot(sstring name);

const bool incremental_backups_enabled() const {
return _config.enable_incremental_backups;
}

void set_incremental_backups(bool val) {
_config.enable_incremental_backups = val;
}

lw_shared_ptr<sstable_list> get_sstables();
size_t sstables_count();
int64_t get_unleveled_sstables() const;
Expand Down Expand Up @@ -370,6 +379,7 @@ public:
bool enable_disk_reads = true;
bool enable_disk_writes = true;
bool enable_cache = true;
bool enable_incremental_backups = false;
size_t max_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
};
Expand Down Expand Up @@ -398,6 +408,14 @@ public:
// FIXME to allow simple registration at boostrap
void set_replication_strategy(std::unique_ptr<locator::abstract_replication_strategy> replication_strategy);

const bool incremental_backups_enabled() const {
return _config.enable_incremental_backups;
}

void set_incremental_backups(bool val) {
_config.enable_incremental_backups = val;
}

const sstring& datadir() const {
return _config.datadir;
}
Expand Down Expand Up @@ -522,9 +540,19 @@ public:
const std::unordered_map<sstring, keyspace>& get_keyspaces() const {
return _keyspaces;
}

std::unordered_map<sstring, keyspace>& get_keyspaces() {
return _keyspaces;
}

const std::unordered_map<utils::UUID, lw_shared_ptr<column_family>>& get_column_families() const {
return _column_families;
}

std::unordered_map<utils::UUID, lw_shared_ptr<column_family>>& get_column_families() {
return _column_families;
}

const std::unordered_map<std::pair<sstring, sstring>, utils::UUID, utils::tuple_hash>&
get_column_families_mapping() const {
return _ks_cf_to_uuid;
Expand Down
4 changes: 2 additions & 2 deletions init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
// duplicated in cql_test_env.cc
// until proper shutdown is done.

future<> init_storage_service(distributed<database>& db, const db::config& cfg) {
return service::init_storage_service(db, cfg).then([] {
future<> init_storage_service(distributed<database>& db) {
return service::init_storage_service(db).then([] {
// #293 - do not stop anything
//engine().at_exit([] { return service::deinit_storage_service(); });
});
Expand Down
2 changes: 1 addition & 1 deletion init.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@
#include "db/config.hh"
#include "database.hh"

future<> init_storage_service(distributed<database>& db, const db::config& cfg);
future<> init_storage_service(distributed<database>& db);
future<> init_ms_fd_gossiper(sstring listen_address, db::seed_provider_type seed_provider, sstring cluster_name = "Test Cluster");
6 changes: 3 additions & 3 deletions main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ int main(int ac, char** av) {
return i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).then([] {
// #293 - do not stop anything
// engine().at_exit([] { return i_endpoint_snitch::stop_snitch(); });
}).then([&db] {
return init_storage_service(db);
}).then([&db, cfg] {
return init_storage_service(db, std::ref(*cfg));
}).then([&db, cfg] {
return db.start(std::ref(*cfg)).then([&db] {
return db.start(std::move(*cfg)).then([&db] {
engine().at_exit([&db] {

// #293 - do not stop anything - not even db (for real)
Expand Down
21 changes: 7 additions & 14 deletions service/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,12 @@ class storage_service : public gms::i_endpoint_state_change_subscriber, public s
private final AtomicLong notificationSerialNumber = new AtomicLong();
#endif
distributed<database>& _db;
std::unique_ptr<db::config> _cfg;
public:
storage_service(distributed<database>& db, const db::config& cfg)
: _db(db)
, _cfg(std::make_unique<db::config>(cfg)) {
storage_service(distributed<database>& db)
: _db(db) {
}
static int RING_DELAY; // delay after which we assume ring has stablized

bool incremental_backups_enabled() {
return _cfg->incremental_backups();
}

void incremental_backups_set_value(bool val) {
_cfg->incremental_backups() = val;
}

// Needed by distributed<>
future<> stop();

Expand All @@ -106,6 +96,9 @@ public:

void gossip_snitch_info();

distributed<database>& db() {
return _db;
}
private:
bool is_auto_bootstrap();
inet_address get_broadcast_address() {
Expand Down Expand Up @@ -2963,8 +2956,8 @@ inline future<std::map<dht::token, gms::inet_address>> get_token_to_endpoint() {
});
}

inline future<> init_storage_service(distributed<database>& db, const db::config& cfg) {
return service::get_storage_service().start(std::ref(db), cfg).then([] {
inline future<> init_storage_service(distributed<database>& db) {
return service::get_storage_service().start(std::ref(db)).then([] {
print("Start Storage service ...\n");
});
}
Expand Down
2 changes: 1 addition & 1 deletion tests/cql_test_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
// Simpler to copy the code from init.cc than trying to do clever parameterization
// and whatnot.
static future<> tst_init_storage_service(distributed<database>& db) {
return service::init_storage_service(db, db::config()).then([] {
return service::init_storage_service(db).then([] {
engine().at_exit([] { return service::deinit_storage_service(); });
});
}
Expand Down
2 changes: 1 addition & 1 deletion tests/gossip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ int main(int ac, char ** av) {
auto config = app.configuration();
logging::logger_registry().set_logger_level("gossip", logging::log_level::trace);
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
service::init_storage_service(db, db::config()).then([listen, config] {
service::init_storage_service(db).then([listen, config] {
return net::get_messaging_service().start(listen);
}).then([config] {
auto& server = net::get_local_messaging_service();
Expand Down
3 changes: 3 additions & 0 deletions tests/mutation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
column_family::config cfg;
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;
return with_column_family(s, cfg, [s] (column_family& cf) {
const column_definition& r1_col = *s->get_column_definition("r1");
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
Expand Down Expand Up @@ -319,6 +320,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
cfg.enable_disk_reads = true;
cfg.enable_disk_writes = true;
cfg.enable_cache = true;
cfg.enable_incremental_backups = false;

return with_column_family(s, cfg, [s](column_family& cf) {
return seastar::async([s, &cf] {
Expand Down Expand Up @@ -391,6 +393,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
column_family::config cfg;
cfg.enable_disk_reads = false;
cfg.enable_disk_writes = false;
cfg.enable_incremental_backups = false;
auto cm = make_lw_shared<compaction_manager>();
return do_with(make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm), [s, cm] (auto& cf_ptr) mutable {
column_family& cf = *cf_ptr;
Expand Down
1 change: 1 addition & 0 deletions tests/sstable_datafile_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ SEASTAR_TEST_CASE(compaction_manager_test) {
column_family::config cfg;
cfg.datadir = tmp->path;
cfg.enable_commitlog = false;
cfg.enable_incremental_backups = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm);
cf->start();
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
Expand Down

0 comments on commit e342914

Please sign in to comment.