Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [2.4]Throw an exception after all the threads in thread pool finished (#32810) #33314

Merged
merged 1 commit into from
May 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,22 @@
}

std::vector<FieldDataPtr> datas;
for (int i = 0; i < futures.size(); ++i) {
auto res = futures[i].get();
datas.emplace_back(res->GetFieldData());
std::exception_ptr first_exception = nullptr;
for (auto& future : futures) {
try {
auto res = future.get();
datas.emplace_back(res->GetFieldData());
} catch (...) {
if (!first_exception) {
first_exception = std::current_exception();

Check warning on line 585 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L583-L585

Added lines #L583 - L585 were not covered by tests
}
}
}
ReleaseArrowUnused();
if (first_exception) {
std::rethrow_exception(first_exception);

Check warning on line 591 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L591

Added line #L591 was not covered by tests
}

return datas;
}

Expand Down Expand Up @@ -612,12 +623,22 @@
}

std::map<std::string, int64_t> remote_paths_to_size;
std::exception_ptr first_exception = nullptr;
for (auto& future : futures) {
auto res = future.get();
remote_paths_to_size[res.first] = res.second;
try {
auto res = future.get();
remote_paths_to_size[res.first] = res.second;
} catch (...) {
if (!first_exception) {
first_exception = std::current_exception();

Check warning on line 633 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L631-L633

Added lines #L631 - L633 were not covered by tests
}
}
}

ReleaseArrowUnused();
if (first_exception) {
std::rethrow_exception(first_exception);

Check warning on line 639 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L639

Added line #L639 was not covered by tests
}

return remote_paths_to_size;
}

Expand Down Expand Up @@ -650,12 +671,22 @@
}

std::map<std::string, int64_t> remote_paths_to_size;
std::exception_ptr first_exception = nullptr;
for (auto& future : futures) {
auto res = future.get();
remote_paths_to_size[res.first] = res.second;
try {
auto res = future.get();
remote_paths_to_size[res.first] = res.second;
} catch (...) {
if (!first_exception) {
first_exception = std::current_exception();

Check warning on line 681 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L679-L681

Added lines #L679 - L681 were not covered by tests
}
}
}

ReleaseArrowUnused();
if (first_exception) {
std::rethrow_exception(first_exception);

Check warning on line 687 in internal/core/src/storage/Util.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/Util.cpp#L687

Added line #L687 was not covered by tests
}

return remote_paths_to_size;
}

Expand Down
Loading