Skip to content

Commit

Permalink
distributed_loader: drop execute_futures function
Browse files Browse the repository at this point in the history
execute_futures() is just a local reimplementation of
when_all_succeed(). Use the former directly.

Message-Id: <20210208114816.GA1658725@scylladb.com>
  • Loading branch information
Gleb Natapov authored and tgrabiec committed Feb 8, 2021
1 parent a05adb8 commit b9a5aff
Showing 1 changed file with 3 additions and 28 deletions.
31 changes: 3 additions & 28 deletions distributed_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@

extern logging::logger dblog;

static future<> execute_futures(std::vector<future<>>& futures);

static const std::unordered_set<std::string_view> system_keyspaces = {
db::system_keyspace::NAME, db::schema_tables::NAME
};
Expand Down Expand Up @@ -484,29 +482,6 @@ distributed_loader::get_sstables_from_upload_dir(distributed<database>& db, sstr
});
}

static future<> execute_futures(std::vector<future<>>& futures) {
return seastar::when_all(futures.begin(), futures.end()).then([] (std::vector<future<>> ret) {
std::exception_ptr eptr;

for (auto& f : ret) {
try {
if (eptr) {
f.ignore_ready_future();
} else {
f.get();
}
} catch(...) {
eptr = std::current_exception();
}
}

if (eptr) {
return make_exception_future<>(eptr);
}
return make_ready_future<>();
});
}

future<> distributed_loader::cleanup_column_family_temp_sst_dirs(sstring sstdir) {
return do_with(std::vector<future<>>(), [sstdir = std::move(sstdir)] (std::vector<future<>>& futures) {
return lister::scan_dir(sstdir, { directory_entry_type::directory }, [&futures] (fs::path sstdir, directory_entry de) {
Expand All @@ -520,7 +495,7 @@ future<> distributed_loader::cleanup_column_family_temp_sst_dirs(sstring sstdir)
}
return make_ready_future<>();
}).then([&futures] {
return execute_futures(futures);
return when_all_succeed(futures.begin(), futures.end()).discard_result();
});
});
}
Expand All @@ -547,7 +522,7 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
}
return make_ready_future<>();
}).then([&futures] {
return execute_futures(futures);
return when_all_succeed(futures.begin(), futures.end()).discard_result();
});
});
}
Expand Down Expand Up @@ -774,7 +749,7 @@ future<> distributed_loader::init_non_system_keyspaces(distributed<database>& db
}));
}

execute_futures(futures).get();
when_all_succeed(futures.begin(), futures.end()).discard_result().get();

db.invoke_on_all([] (database& db) {
return parallel_for_each(db.get_non_system_column_families(), [] (lw_shared_ptr<table> table) {
Expand Down

0 comments on commit b9a5aff

Please sign in to comment.