Experimental: Thread pool executor
timj committed Jan 17, 2024
1 parent 0fdcedd commit b16c8b9
Showing 1 changed file with 58 additions and 6 deletions.
python/lsst/daf/butler/datastores/fileDatastore.py (58 additions, 6 deletions)
@@ -31,6 +31,7 @@
 
 __all__ = ("FileDatastore",)
 
+import concurrent.futures
 import contextlib
 import hashlib
 import logging
@@ -1585,11 +1586,35 @@ def _mexists_check_expected(
         # guessed names.
         records = {}
         id_to_ref = {}
-        for missing_ref in refs:
-            expected = self._get_expected_dataset_locations_info(missing_ref)
-            dataset_id = missing_ref.id
-            records[dataset_id] = [info for _, info in expected]
-            id_to_ref[dataset_id] = missing_ref
+        if True:
+
+            def _populate_missing_records(
+                refs: list[DatasetRef],
+            ) -> tuple[dict[DatasetId, list], dict[DatasetId, DatasetRef]]:
+                _id_to_ref = {}
+                _records = {}
+                for missing_ref in refs:
+                    expected = self._get_expected_dataset_locations_info(missing_ref)
+                    dataset_id = missing_ref.id
+                    _records[dataset_id] = [info for _, info in expected]
+                    _id_to_ref[dataset_id] = missing_ref
+                return _records, _id_to_ref
+
+            executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
+            future_chunks = [
+                executor.submit(_populate_missing_records, chunk) for chunk in chunk_iterable(refs, 1_000)
+            ]
+            for future in concurrent.futures.as_completed(future_chunks):
+                # Let it fail if any chunk fails.
+                _r, _id = future.result()
+                records.update(_r)
+                id_to_ref.update(_id)
+        else:
+            for missing_ref in refs:
+                expected = self._get_expected_dataset_locations_info(missing_ref)
+                dataset_id = missing_ref.id
+                records[dataset_id] = [info for _, info in expected]
+                id_to_ref[dataset_id] = missing_ref
 
         dataset_existence.update(
             self._process_mexists_records(
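
The pattern introduced above, which recurs in the later hunks of this commit, is: split the inputs into chunks, submit each chunk to a thread pool, and merge the per-chunk results as the futures complete. A minimal self-contained sketch of that pattern, with a hypothetical process_chunk standing in for _populate_missing_records and a local chunked helper standing in for the repository's chunk_iterable:

import concurrent.futures
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def chunked(items: Iterable[T], n: int) -> Iterator[list[T]]:
    # Simple stand-in for the chunk_iterable helper used in the diff:
    # yield successive lists of up to n items.
    it = iter(items)
    while chunk := list(islice(it, n)):
        yield chunk

def process_chunk(chunk: list[int]) -> dict[int, str]:
    # Hypothetical per-chunk work; the commit builds per-dataset
    # location records at this point.
    return {i: f"record-{i}" for i in chunk}

records: dict[int, str] = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(process_chunk, chunk) for chunk in chunked(range(10_000), 1_000)]
    for future in concurrent.futures.as_completed(futures):
        # An exception raised in a worker re-raises here, matching the
        # commit's "let it fail if any chunk fails" behavior.
        records.update(future.result())

Unlike the experimental code, the sketch uses the executor as a context manager, so the pool is shut down and its worker threads joined once all futures have been consumed.
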
@@ -2411,7 +2436,13 @@ def transfer_from(
             # This should be chunked in case we end up having to check
             # the file store since we need some log output to show
             # progress.
-            for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=10_000):
+            def _check_missing_files(
+                source_datastore: Datastore,
+                id_to_ref: dict[DatasetId, DatasetRef],
+                artifact_existence: dict[ResourcePath, bool],
+                missing_ids_chunk: list[DatasetId],
+            ) -> dict[DatasetId, list]:
+                source_records = defaultdict(list)
                 records = {}
                 for missing in missing_ids_chunk:
                     # Ask the source datastore where the missing artifacts
@@ -2454,6 +2485,27 @@ def transfer_from(
 
                     # Rely on source_records being a defaultdict.
                     source_records[missing].extend(dataset_records)
+                return source_records
+
+            if True:
+                executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
+                future_chunks = [
+                    executor.submit(
+                        _check_missing_files, source_datastore, id_to_ref, artifact_existence, chunk
+                    )
+                    for chunk in chunk_iterable(missing_ids, chunk_size=5_000)
+                ]
+                for future in concurrent.futures.as_completed(future_chunks):
+                    # Let it fail if any chunk fails.
+                    new_records = future.result()
+                    for k, v in new_records.items():
+                        source_records[k].extend(v)
+            else:
+                for missing_ids_chunk in chunk_iterable(missing_ids, chunk_size=5_000):
+                    new_records = _check_missing_files(source_datastore, id_to_ref, artifact_existence, missing_ids_chunk)
+                    for k, v in new_records.items():
+                        source_records[k].extend(v)
+
             log.verbose("Completed scan for missing data files")
 
             # See if we already have these records
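
Both new code paths are gated by a literal if True:, with the original serial loop preserved in the else: branch, so the experiment can be switched off by editing a single line. A sketch of the same toggle expressed as a named parameter instead of a literal; scan_chunks, process_chunk, and use_threads are hypothetical names, not part of the commit:

import concurrent.futures

def process_chunk(chunk: list[int]) -> dict[int, str]:
    # Hypothetical per-chunk work, as in the earlier sketch.
    return {i: f"record-{i}" for i in chunk}

def scan_chunks(chunks: list[list[int]], use_threads: bool = True) -> dict[int, str]:
    # Named flag in place of the commit's `if True:` toggle.
    results: dict[int, str] = {}
    if use_threads:
        # The context manager joins the worker threads on exit, which
        # the experimental code above does not do explicitly.
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
            for future in concurrent.futures.as_completed(futures):
                results.update(future.result())
    else:
        for chunk in chunks:
            results.update(process_chunk(chunk))
    return results

chunks = [list(range(i, i + 1_000)) for i in range(0, 10_000, 1_000)]
records = scan_chunks(chunks)
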