
close streams in prototype datasets #6647

Merged
pmeier merged 5 commits into pytorch:main on Oct 6, 2022

Conversation

@pmeier (Collaborator) commented Sep 26, 2022

Redo of #6128 with the changes I suggested there:

  • Instead of closing every single EncodedImage.from_file(buffer); buffer.close() individually, I moved the closing into EncodedImage.from_file (sketched below). So far we don't have a case where we need to read from the same stream twice. Even if we do at some point, e.g. for optical flow (although we are probably better off with caching a single image), we can always introduce a keyword close: bool = True to override the behavior.
  • The patch above accounts for the largest chunk of changes in Closing streams to avoid testing issues #6128. A few datasets handle other files as well, and I closed those directly. Plus, I also added a .close() call to read_mat.
  • I've added a test that checks that a dataset doesn't leave unclosed streams after a full iteration.
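A minimal sketch of the first bullet, assuming a file-like input. EncodedImage.from_file is the real entry point, but this body and the close keyword are illustrative, not the actual torchvision implementation:

    from typing import BinaryIO

    class EncodedImage:
        def __init__(self, data: bytes) -> None:
            self.data = data

        @classmethod
        def from_file(cls, file: BinaryIO, *, close: bool = True) -> "EncodedImage":
            data = file.read()
            if close:
                # Close the stream here once, instead of at every call site.
                file.close()
            return cls(data)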

from collections import deque

def consume(iterator):
    deque(iterator, maxlen=0)  # exhaust the iterator without storing items

def next_consume(iterator):
    item = next(iterator)  # keep only the first sample
    consume(iterator)  # drain the rest so no streams are left dangling
    return item
pmeier (Collaborator, Author):

This is useful since we often just want the first sample, but still need to make sure to consume the rest to avoid dangling streams. list(iterator) would also do the trick, but it keeps everything in memory for no reason.

test/test_prototype_datasets_builtin.py
def test_no_simple_tensors(self, dataset_mock, config):
    dataset, _ = dataset_mock.load(config)

    simple_tensors = {key for key, value in next_consume(iter(dataset)).items() if features.is_simple_tensor(value)}
pmeier (Collaborator, Author):

This is a drive-by since I was already touching the line: the term "vanilla" tensor is no longer used. In the prototype transforms we use "simple tensor" now and also have features.is_simple_tensor to check for them.
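For illustration, a hedged example of the distinction. features.is_simple_tensor is named in the comment above; the Label usage is an assumption about the prototype features namespace:

    import torch
    from torchvision.prototype import features

    # A plain tensor is a "simple tensor" ...
    assert features.is_simple_tensor(torch.rand(3, 16, 16))
    # ... while a feature subclass such as features.Label is not.
    assert not features.is_simple_tensor(features.Label(0))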

Comment on lines 106 to 110
split_dp, to_be_closed_dp = (
(extra_split_dp, split_dp) if self._split == "train_noval" else (split_dp, extra_split_dp)
)
for _, file in to_be_closed_dp:
    file.close()
pmeier (Collaborator, Author):

This is somewhat problematic in the split == "train_noval" case. We want to close all file handles that are coming from the split_dp. Unfortunately, split_dp comes from a Demultiplexer. Thus, by fully iterating over it here, we are loading everything into the demux buffer.

Is there an idiom to "mark" a datapipe to be closed at runtime even if we don't return the datapipe? The only thing I came up with is changing the classifier function of the Demultiplexer to drop the samples that would go into split_dp if split == "train_noval".
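For context, a hedged sketch of the buffering behavior described above, using torchdata's demux with made-up toy data:

    from torchdata.datapipes.iter import IterableWrapper

    source = IterableWrapper(range(6))
    evens, odds = source.demux(num_instances=2, classifier_fn=lambda x: x % 2)

    # Draining one branch forces the Demultiplexer to buffer every sample
    # that belongs to the still-unread `odds` branch.
    list(evens)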

@VitalyFedyunin replied:

It is incorrect to consume a DataPipe at construction time.

        for _, file in to_be_closed_dp:
            file.close()

It will not affect the executed graph.

Ideally, we want to have something like dp.close(), which will effectively remove dangling pieces of the graph.

But for now you can either use a trick like split_dp = split_dp.concat(to_be_closed_dp.filter(lambda x: False)) or do code branching before Demux and avoid creating dangling pieces.
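A hedged, self-contained version of that trick; the datapipes here are stand-ins for the real split_dp / to_be_closed_dp:

    from torchdata.datapipes.iter import IterableWrapper

    split_dp = IterableWrapper(["a", "b"])           # branch we keep
    to_be_closed_dp = IterableWrapper(["c", "d"])    # branch that would dangle

    # The filter drops every sample, so nothing extra is yielded, but the
    # branch stays attached to the graph and is cleaned up with it.
    split_dp = split_dp.concat(to_be_closed_dp.filter(lambda x: False))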

pmeier (Collaborator, Author):

It is incorrect to consume a DataPipe at construction time.

I don't think that is fully correct for our case. At construction, we load a couple of files and weave them together into one datapipe. These files are always loaded, but for some configurations not all of them are needed. So we should be able to simply close them during the construction of the dataset datapipe, since they will never make it into the graph, correct?

I agree we shouldn't do this for datapipes that stem from a Demultiplexer if the other parts make it into the final graph.

pmeier (Collaborator, Author):

do code branching before Demux and avoid creating dangling pieces.

If possible, I think that is the better solution, since the other one still iterates over all items even though the loop should effectively be done. I guess that could be irritating as well.

I implemented branching in afb0ec2. PTAL
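A hedged sketch of the branching approach; the condition and datapipes are placeholders for the actual SBD logic in afb0ec2:

    from torchdata.datapipes.iter import IterableWrapper

    samples = IterableWrapper([("train/a", 0), ("noval/b", 1)])
    need_both_branches = False  # stand-in for `self._split == "train_noval"`

    if need_both_branches:
        split_dp, extra_split_dp = samples.demux(num_instances=2, classifier_fn=lambda s: s[1])
    else:
        # No Demultiplexer at all, hence no dangling branch left to close.
        split_dp = samples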

@VitalyFedyunin left a comment:

Thank you for taking this over!

        drop_none=True,
    )
else:
    archive_dp = resource_dps[0]

@VitalyFedyunin:

What will happen with resource_dps[1] in this case? Is it disconnected from the graph, or does it remain unconsumed?

pmeier (Collaborator, Author):

If self._split != "train_noval", we have only one element in resource_dps. Meaning, it will not be loaded at all and thus also does not need to be closed.
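A hedged sketch of how a dataset can request the second resource only when it is needed; the function and resource names are illustrative, not the real SBD code:

    def resources_for_split(split: str) -> list:
        # Request the extra split file only when it is actually used, so
        # resource_dps never contains an unconsumed entry.
        resources = ["archive.tgz"]
        if split == "train_noval":
            resources.append("extra_split.txt")
        return resources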

@pmeier merged commit 7eb5d7f into pytorch:main on Oct 6, 2022
facebook-github-bot pushed a commit that referenced this pull request Oct 17, 2022
Summary:
* close streams in prototype datasets

* refactor prototype SBD to avoid closing demux streams at construction time

* mypy

Reviewed By: NicolasHug

Differential Revision: D40427477

fbshipit-source-id: 854554f283ff281f8c9eb0e2786644116a4b4dd8