Skip to content

Commit

Permalink
apache/arrow GH-41862: [C++][Dataset] Try out possible fix number 1
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed May 29, 2024
1 parent 4a2df66 commit 6a0c1e7
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions cpp/src/arrow/dataset/dataset_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,21 +330,21 @@ class DatasetWriterDirectoryQueue {

   Status FinishCurrentFile() {
     if (latest_open_file_) {
-      ARROW_RETURN_NOT_OK(latest_open_file_->Finish());
+      Status st = latest_open_file_->Finish();
       latest_open_file_tasks_.reset();
-      latest_open_file_ = nullptr;
+      latest_open_file_.reset();
+      ARROW_RETURN_NOT_OK(st);
     }
     rows_written_ = 0;
     return GetNextFilename().Value(&current_filename_);
   }

   Status OpenFileQueue(const std::string& filename) {
-    auto file_queue =
-        std::make_unique<DatasetWriterFileQueue>(schema_, write_options_, writer_state_);
-    latest_open_file_ = file_queue.get();
+    latest_open_file_ =
+        std::make_shared<DatasetWriterFileQueue>(schema_, write_options_, writer_state_);
     // Create a dedicated throttle for write jobs to this file and keep it alive until we
     // are finished and have closed the file.
-    auto file_finish_task = [this, file_queue = std::move(file_queue)] {
+    auto file_finish_task = [this, file_queue = latest_open_file_] {
       writer_state_->open_files_throttle.Release(1);
       return Status::OK();
     };
Expand Down Expand Up @@ -413,9 +413,10 @@ class DatasetWriterDirectoryQueue {

   Status Finish() {
     if (latest_open_file_) {
-      ARROW_RETURN_NOT_OK(latest_open_file_->Finish());
+      Status st = latest_open_file_->Finish();
       latest_open_file_tasks_.reset();
-      latest_open_file_ = nullptr;
+      latest_open_file_.reset();
+      ARROW_RETURN_NOT_OK(st);
     }
     used_filenames_.clear();
     return Status::OK();
Expand All @@ -431,7 +432,7 @@ class DatasetWriterDirectoryQueue {
   Future<> init_future_;
   std::string current_filename_;
   std::unordered_set<std::string> used_filenames_;
-  DatasetWriterFileQueue* latest_open_file_ = nullptr;
+  std::shared_ptr<DatasetWriterFileQueue> latest_open_file_;
   std::unique_ptr<util::ThrottledAsyncTaskScheduler> latest_open_file_tasks_;
   uint64_t rows_written_ = 0;
   uint32_t file_counter_ = 0;
Expand Down

0 comments on commit 6a0c1e7

Please sign in to comment.