Skip to content

Commit

Permalink
Merge pull request #28334 from vespa-engine/geirst/simplify-threading…
Browse files Browse the repository at this point in the history
…-service

Simplify IThreadingService as there only is one field writer executor.
  • Loading branch information
baldersheim authored Aug 31, 2023
2 parents 0cdcbd8 + 1d5c830 commit 46716a0
Show file tree
Hide file tree
Showing 15 changed files with 38 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ Fixture::initViewSet(ViewSet &views)
TuneFileIndexManager(), TuneFileAttributes(), views._fileHeaderContext);
auto attrMgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(),
views._fileHeaderContext, std::make_shared<search::attribute::Interlock>(),
views._service.write().attributeFieldWriter(), views._service.write().shared(), views._hwInfo);
views._service.write().field_writer(), views._service.write().shared(), views._hwInfo);
auto summaryMgr = make_shared<SummaryManager>
(_summaryExecutor, search::LogDocumentStore::Config(), search::GrowStrategy(), BASE_DIR,
TuneFileSummary(), views._fileHeaderContext,views._noTlSyncer, search::IBucketizer::SP());
Expand Down Expand Up @@ -318,7 +318,7 @@ struct MyFastAccessFeedView
StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), 0, SubDbType::NOTREADY);
auto mgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(),
_fileHeaderContext, std::make_shared<search::attribute::Interlock>(),
_writeService.attributeFieldWriter(), _writeService.shared(), _hwInfo);
_writeService.field_writer(), _writeService.shared(), _hwInfo);
auto writer = std::make_shared<AttributeWriter>(mgr);
FastAccessFeedView::Context fastUpdateCtx(writer, _docIdLimit);
_feedView.set(std::make_shared<FastAccessFeedView>(std::move(storeOnlyCtx), params, fastUpdateCtx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@ class ExecutorThreadingServiceTest : public ::testing::Test {
ThreadingServiceConfig::make()))
{
}
SequencedTaskExecutor* index_inverter() {
return to_concrete_type(service->indexFieldInverter());
}
SequencedTaskExecutor* index_writer() {
return to_concrete_type(service->indexFieldWriter());
}
SequencedTaskExecutor* attribute_writer() {
return to_concrete_type(service->attributeFieldWriter());
}
SequencedTaskExecutor* field_writer() {
return to_concrete_type(*field_writer_executor);
}
Expand All @@ -57,9 +48,7 @@ assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t ex

TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside)
{
EXPECT_EQ(field_writer(), index_inverter());
EXPECT_EQ(field_writer(), index_writer());
EXPECT_EQ(field_writer(), attribute_writer());
EXPECT_EQ(field_writer(), &service->field_writer());
assert_executor(field_writer(), 3, 200);
}

Expand All @@ -69,9 +58,7 @@ TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated)
EXPECT_EQ(5, service->master_task_limit());
EXPECT_EQ(7, service->index().getTaskLimit());
EXPECT_EQ(11, service->summary().getTaskLimit());
EXPECT_EQ(7, index_inverter()->first_executor()->getTaskLimit());
EXPECT_EQ(7, index_writer()->first_executor()->getTaskLimit());
EXPECT_EQ(7, attribute_writer()->first_executor()->getTaskLimit());
EXPECT_EQ(7, field_writer()->first_executor()->getTaskLimit());
}

GTEST_MAIN_RUN_ALL_TESTS()
Expand Down
4 changes: 2 additions & 2 deletions searchcore/src/tests/proton/index/fusionrunner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ void Test::createIndex(const string &dir, uint32_t id, bool fusion) {
DocBuilder doc_builder(add_fields);
auto schema = SchemaBuilder(doc_builder).add_all_indexes().build();
MemoryIndex memory_index(schema, MockFieldLengthInspector(),
_service.write().indexFieldInverter(),
_service.write().indexFieldWriter());
_service.write().field_writer(),
_service.write().field_writer());
addDocument(doc_builder, memory_index, *_selector, id, id + 0, term);
addDocument(doc_builder, memory_index, *_selector, id, id + 1, "bar");
addDocument(doc_builder, memory_index, *_selector, id, id + 2, "baz");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ MemoryIndexWrapper::MemoryIndexWrapper(const search::index::Schema& schema,
const TuneFileIndexing& tuneFileIndexing,
searchcorespi::index::IThreadingService& threadingService,
search::SerialNum serialNum)
: _index(schema, inspector, threadingService.indexFieldInverter(),
threadingService.indexFieldWriter()),
: _index(schema, inspector, threadingService.field_writer(),
threadingService.field_writer()),
_serialNum(serialNum),
_fileHeaderContext(fileHeaderContext),
_tuneFileIndexing(tuneFileIndexing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ ExecutorThreadingServiceMetrics::update(const ExecutorThreadingServiceStats &sta
master.update(stats.getMasterExecutorStats());
index.update(stats.getIndexExecutorStats());
summary.update(stats.getSummaryExecutorStats());
indexFieldInverter.update(stats.getIndexFieldInverterExecutorStats());
indexFieldWriter.update(stats.getIndexFieldWriterExecutorStats());
attributeFieldWriter.update(stats.getAttributeFieldWriterExecutorStats());
vespalib::ExecutorStats empty_stats;
indexFieldInverter.update(empty_stats);
indexFieldWriter.update(empty_stats);
attributeFieldWriter.update(empty_stats);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@ namespace proton {

ExecutorThreadingServiceStats::ExecutorThreadingServiceStats(Stats masterExecutorStats,
Stats indexExecutorStats,
Stats summaryExecutorStats,
Stats indexFieldInverterExecutorStats,
Stats indexFieldWriterExecutorStats,
Stats attributeFieldWriterExecutorStats)
Stats summaryExecutorStats)
: _masterExecutorStats(masterExecutorStats),
_indexExecutorStats(indexExecutorStats),
_summaryExecutorStats(summaryExecutorStats),
_indexFieldInverterExecutorStats(indexFieldInverterExecutorStats),
_indexFieldWriterExecutorStats(indexFieldWriterExecutorStats),
_attributeFieldWriterExecutorStats(attributeFieldWriterExecutorStats)
_summaryExecutorStats(summaryExecutorStats)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,15 @@ class ExecutorThreadingServiceStats {
Stats _masterExecutorStats;
Stats _indexExecutorStats;
Stats _summaryExecutorStats;
Stats _indexFieldInverterExecutorStats;
Stats _indexFieldWriterExecutorStats;
Stats _attributeFieldWriterExecutorStats;
public:
ExecutorThreadingServiceStats(Stats masterExecutorStats,
Stats indexExecutorStats,
Stats summaryExecutorStats,
Stats indexFieldInverterExecutorStats,
Stats indexFieldWriterExecutorStats,
Stats attributeFieldWriterExecutorStats);
Stats summaryExecutorStats);
~ExecutorThreadingServiceStats();

const Stats &getMasterExecutorStats() const { return _masterExecutorStats; }
const Stats &getIndexExecutorStats() const { return _indexExecutorStats; }
const Stats &getSummaryExecutorStats() const { return _summaryExecutorStats; }
const Stats &getIndexFieldInverterExecutorStats() const { return _indexFieldInverterExecutorStats; }
const Stats &getIndexFieldWriterExecutorStats() const { return _indexFieldWriterExecutorStats; }
const Stats &getAttributeFieldWriterExecutorStats() const { return _attributeFieldWriterExecutorStats; }
};

}
4 changes: 2 additions & 2 deletions searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ DocumentDB::applySubDBConfig(const DocumentDBConfig &newConfigSnapshot,
auto newDocType = newRepo->getDocumentType(_docTypeName.getName());
assert(newDocType != nullptr);
DocumentDBReferenceResolver resolver(*registry, *newDocType, newConfigSnapshot.getImportedFieldsConfig(), *oldDocType,
_refCount, _writeService.attributeFieldWriter(), _state.getAllowReconfig());
_refCount, _writeService.field_writer(), _state.getAllowReconfig());
_subDBs.applyConfig(newConfigSnapshot, *_activeConfigSnapshot, serialNum, params, resolver, prepared_reconfig);
}

Expand Down Expand Up @@ -535,7 +535,7 @@ DocumentDB::tearDownReferences()
auto docType = repo->getDocumentType(_docTypeName.getName());
assert(docType != nullptr);
DocumentDBReferenceResolver resolver(*registry, *docType, activeConfig->getImportedFieldsConfig(), *docType,
_refCount, _writeService.attributeFieldWriter(), false);
_refCount, _writeService.field_writer(), false);
_subDBs.tearDownReferences(resolver);
registry->remove(_docTypeName.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ ExecutorThreadingServiceExplorer::get_state(const vespalib::slime::Inserter& ins
convert_executor_to_slime(&_service.master(), object.setObject("master"));
convert_executor_to_slime(&_service.index(), object.setObject("index"));
convert_executor_to_slime(&_service.summary(), object.setObject("summary"));
convert_executor_to_slime(&_service.indexFieldInverter(), object.setObject("index_field_inverter"));
convert_executor_to_slime(&_service.indexFieldWriter(), object.setObject("index_field_writer"));
convert_executor_to_slime(&_service.attributeFieldWriter(), object.setObject("attribute_field_writer"));
convert_executor_to_slime(&_service.field_writer(), object.setObject("field_writer"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedEx
_summaryExecutor(createExecutorWithOneThread(cfg, CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))),
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
_index_field_inverter(field_writer),
_index_field_writer(field_writer),
_attribute_field_writer(field_writer),
_field_writer(field_writer),
_invokeRegistrations()
{
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
Expand All @@ -92,11 +90,11 @@ void
ExecutorThreadingService::shutdown()
{
_masterExecutor.shutdown().sync();
_attribute_field_writer.sync_all();
_field_writer.sync_all();
_summaryExecutor->shutdown().sync();
_indexExecutor->shutdown().sync();
_index_field_inverter.sync_all();
_index_field_writer.sync_all();
_field_writer.sync_all();
_field_writer.sync_all();
}

void
Expand All @@ -107,10 +105,7 @@ ExecutorThreadingService::set_task_limits(uint32_t master_task_limit,
_master_task_limit.store(master_task_limit, std::memory_order_release);
_indexExecutor->setTaskLimit(field_task_limit);
_summaryExecutor->setTaskLimit(summary_task_limit);
// TODO: Move this to a common place when the field writer is always shared.
_index_field_inverter.setTaskLimit(field_task_limit);
_index_field_writer.setTaskLimit(field_task_limit);
_attribute_field_writer.setTaskLimit(field_task_limit);
_field_writer.setTaskLimit(field_task_limit);
}

ExecutorThreadingServiceStats
Expand All @@ -119,26 +114,13 @@ ExecutorThreadingService::getStats()
auto master_stats = _masterExecutor.getStats();
auto index_stats = _indexExecutor->getStats();
auto summary_stats = _summaryExecutor->getStats();
vespalib::ExecutorStats empty_stats;
// In this case the field writer stats are reported at a higher level.
return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats,
empty_stats, empty_stats, empty_stats);
return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats);
}

vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::indexFieldInverter() {
return _index_field_inverter;
ExecutorThreadingService::field_writer() {
return _field_writer;
}

vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::indexFieldWriter() {
return _index_field_writer;
}

vespalib::ISequencedTaskExecutor &
ExecutorThreadingService::attributeFieldWriter() {
return _attribute_field_writer;
}

} // namespace proton

Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService
std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor;
SyncableExecutorThreadService _masterService;
ExecutorThreadService _indexService;
vespalib::ISequencedTaskExecutor& _index_field_inverter;
vespalib::ISequencedTaskExecutor& _index_field_writer;
vespalib::ISequencedTaskExecutor& _attribute_field_writer;
vespalib::ISequencedTaskExecutor& _field_writer;
std::vector<Registration> _invokeRegistrations;

public:
Expand Down Expand Up @@ -77,9 +75,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService
return _sharedExecutor;
}

vespalib::ISequencedTaskExecutor &indexFieldInverter() override;
vespalib::ISequencedTaskExecutor &indexFieldWriter() override;
vespalib::ISequencedTaskExecutor &attributeFieldWriter() override;
vespalib::ISequencedTaskExecutor &field_writer() override;
FNET_Transport &transport() override { return _transport; }
const vespalib::Clock &clock() const override { return _clock; }
ExecutorThreadingServiceStats getStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ FastAccessDocSubDB::createAttributeManagerInitializer(const DocumentDBConfig &co
configSnapshot.getTuneFileDocumentDBSP()->_attr,
_fileHeaderContext,
_attribute_interlock,
_writeService.attributeFieldWriter(),
_writeService.field_writer(),
_writeService.shared(),
attrFactory,
_hwInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ ThreadingServiceObserver::ThreadingServiceObserver(searchcorespi::index::IThread
_index(service.index()),
_summary(service.summary()),
_shared(service.shared()),
_indexFieldInverter(_service.indexFieldInverter()),
_indexFieldWriter(_service.indexFieldWriter()),
_attributeFieldWriter(_service.attributeFieldWriter())
_field_writer(_service.field_writer())
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ class ThreadingServiceObserver : public searchcorespi::index::IThreadingService
ThreadServiceObserver _index;
ThreadExecutorObserver _summary;
vespalib::Executor & _shared;
vespalib::SequencedTaskExecutorObserver _indexFieldInverter;
vespalib::SequencedTaskExecutorObserver _indexFieldWriter;
vespalib::SequencedTaskExecutorObserver _attributeFieldWriter;
vespalib::SequencedTaskExecutorObserver _field_writer;

public:
ThreadingServiceObserver(searchcorespi::index::IThreadingService &service);
Expand Down Expand Up @@ -51,17 +49,9 @@ class ThreadingServiceObserver : public searchcorespi::index::IThreadingService
}
FNET_Transport & transport() override { return _service.transport(); }
const vespalib::Clock & clock() const override { return _service.clock(); }
vespalib::ISequencedTaskExecutor &indexFieldInverter() override {
return _indexFieldInverter;
vespalib::ISequencedTaskExecutor &field_writer() override {
return _field_writer;
}
vespalib::ISequencedTaskExecutor &indexFieldWriter() override {
return _indexFieldWriter;
}

vespalib::ISequencedTaskExecutor &attributeFieldWriter() override {
return _attributeFieldWriter;
}

};

}
25 changes: 5 additions & 20 deletions searchcore/src/vespa/searchcorespi/index/ithreadingservice.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ namespace searchcorespi::index {
*
* 2. The "index" write thread used for doing changes to the memory
* index, either directly (for data not bound to a field) or via
* index field inverter executor or index field writer executor.
* field writer executor (index field inverter / index field writer).
*
* 3. The "summary" thread is used for doing changes to the document store.
*
* 4. The "index field inverter" executor is used to populate field
* 4. The field writer executor ("index field inverter") is used to populate field
* inverters with data from document fields. Scheduled tasks for
* the same field are executed in sequence.
*
* 5. The "index field writer" executor is used to sort data in field
* 5. The field writer executor ("index field writer") is used to sort data in field
* inverters before pushing the data to the memory field indexes.
* Scheduled tasks for the same field are executed in sequence.
*
* 6. The "attribute field writer" executor is used to write data to attribute vectors.
* 6. The field writer executor ("attribute field writer") is used to write data to attribute vectors.
* Each attribute is always handled by the same thread,
* and scheduled tasks for the same attribute are executed in sequence.
*
Expand All @@ -47,19 +47,6 @@ namespace searchcorespi::index {
* task to the index field inverter executor and the index field
* writer executor.
*
* The index field inverter executor and index field writer executor
* are separate to allow for double buffering, i.e. populate one set
* of field inverters using the index field inverter executor while
* another set of field inverters are handled by the index field
* writer executor.
*
* We might decide to allow index field inverter tasks to schedule
* tasks to the index field writer executor, so draining logic needs
* to sync index field inverter executor before syncing index field
* writer executor.
*
* TODO: * indexFieldInverter and indexFieldWriter can be collapsed to one. Both need sequencing,
* but they sequence on different things so efficiency will be the same and just depends on #threads
*/
struct IThreadingService
{
Expand All @@ -80,9 +67,7 @@ struct IThreadingService
virtual vespalib::Executor &shared() = 0;
virtual FNET_Transport &transport() = 0;
virtual const vespalib::Clock &clock() const = 0;
virtual vespalib::ISequencedTaskExecutor &indexFieldInverter() = 0;
virtual vespalib::ISequencedTaskExecutor &indexFieldWriter() = 0;
virtual vespalib::ISequencedTaskExecutor &attributeFieldWriter() = 0;
virtual vespalib::ISequencedTaskExecutor &field_writer() = 0;
};

}

0 comments on commit 46716a0

Please sign in to comment.