Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

objects.transfer: minor refactoring, move lazier taskset inside custom executor #6591

Merged
merged 1 commit into from Sep 13, 2021

Conversation

skshetry
Copy link
Member

No description provided.

@skshetry skshetry added refactoring Factoring and re-factoring A: object-storage Related to the object/content-addressable storage labels Sep 10, 2021
@skshetry skshetry self-assigned this Sep 10, 2021
@skshetry skshetry requested a review from a team as a code owner September 10, 2021 11:47
@skshetry skshetry added this to In progress in DVC 07 Sep - 21 Sep 2021 via automation Sep 10, 2021
Comment on lines 8 to 34
class ThreadPoolExecutor(futures.ThreadPoolExecutor):
@property
def max_workers(self) -> int:
return self._max_workers

def imap_unordered(
self, fn: Callable[..., _T], *iterables: Iterable[Any]
) -> Iterator[_T]:
"""Lazier version of map that does not preserve ordering of results.

It does not create all the futures at once to reduce memory usage.
"""

def create_taskset(n: int) -> Set[futures.Future]:
return {self.submit(fn, *args) for args in islice(it, n)}

it = zip(*iterables)
tasks = create_taskset(self.max_workers * 5)
while tasks:
done, tasks = futures.wait(
tasks, return_when=futures.FIRST_COMPLETED
)
for fut in done:
yield fut.result()
tasks.update(create_taskset(len(done)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, there was a plan to try using async here e.g. with fsspec's put/get methods. Maybe it would be reasonable to do that instead now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate? I am not sure I understand.

Copy link
Member

@efiop efiop Sep 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we use thread pool executor right now to parallelize uploading/downloading of objects, but fsspec provides put/get methods that accept batches and async filesystems do that with async, effectively managing the coroutines themselves, instead of us manually using threads.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what you are suggesting is to have something like TransferManager that handles multithreaded and async transfer as well right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skshetry Ah, no, I meant that we possibly won't need transfer manager at all, and could just fs.put(obj_list) and let it do its thing. So we would be effectively delegating transfer management to particular filesystem's put/get methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skshetry Btw, happy to jump on a call to talk about it a bit more.

Copy link
Member Author

@skshetry skshetry Sep 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should wait for fsspec changes to be done, this will likely require us to have colored APIs too (odb.add_async, fs.utils.transfer_async etc.). And, there's a problem of running fsspec's sync FS in async mode too.

As this change is a very minor restructuring, I'd prefer to merge this as-is.

break
except (FileNotFoundError, ObjectFormatError):
pass
dir_obj = find_tree_by_obj_id([cache_odb, src], dir_hash)
Copy link
Member Author

@skshetry skshetry Sep 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we call it dir_obj instead of tree?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a terminology mess (+ .dir objects as we call them, and not .tree). Also driving me insane for 3.0 🙂

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.dir is a physical repr thing similar to how it's json and kept in some structure, it's fine till 3.0. Though I'd say that we s/dir_obj/tree in all of these abstractions right away.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a lot of leftovers from when remote was migrated to odb, and yeah it would be good to just drop the old terminology entirely whenever possible

@skshetry skshetry merged commit d2d6c73 into iterative:master Sep 13, 2021
DVC 07 Sep - 21 Sep 2021 automation moved this from In progress to Done Sep 13, 2021
@skshetry skshetry deleted the transfer-minor-refactoring branch September 13, 2021 05:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A: object-storage Related to the object/content-addressable storage refactoring Factoring and re-factoring
Projects
No open projects
Development

Successfully merging this pull request may close these issues.

None yet

3 participants