
Sync before and after deleting #2268

Merged
merged 8 commits into speechbrain:develop on Nov 30, 2023

Conversation

pplantinga
Collaborator

To prevent the error in #2250, this PR adds a barrier before and after checkpoint deletion so that no process can write at the same time as the deletion.
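
For readers skimming the diff, here is a minimal sketch of the intent, assuming the speechbrain.utils.distributed helpers discussed in this thread (delete_checkpoint_synced is an illustrative name, not the actual checkpointer method):

import shutil

from speechbrain.utils.distributed import ddp_barrier, if_main_process

def delete_checkpoint_synced(ckpt_dir):
    """Illustrative only: delete a checkpoint directory with all ranks synchronized."""
    # Sync before deleting so no other process is still saving
    # (see https://github.com/speechbrain/speechbrain/issues/2250).
    ddp_barrier()
    if if_main_process():
        shutil.rmtree(ckpt_dir, ignore_errors=True)
    # Sync after deleting so no rank proceeds while files are disappearing.
    ddp_barrier()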

@pplantinga pplantinga self-assigned this Nov 26, 2023
@pplantinga pplantinga mentioned this pull request Nov 26, 2023
@TParcollet
Collaborator

TParcollet commented Nov 26, 2023

@pplantinga I checked and it's not solving my issue (the PR made by Adel is). As you can see, it hangs at the end of an epoch, once validation is done and it wants to go into the next one:
[screenshot of the hang]

@pplantinga
Collaborator Author

Looks like the issue here is an if_main_process() in the recipe which doesn't play nice with torch.distributed.barrier(). Although this could technically be said to be an issue with the recipe, not the core code, we should improve the situation so that these two functions work together better. I briefly looked into converting if_main_process() into a context manager that sets an environment variable or similar to check when running torch.distributed.barrier(), but unfortunately this doesn't allow skipping the execution of the body of the with statement without some very hacky workarounds. If anybody has a bright idea about a solution to this, I'm all figurative ears (even if I'm literally deaf).
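
For context, a hedged sketch (raw torch.distributed rather than the SpeechBrain wrappers) of the interaction being described: if only the main process enters a branch and a barrier sits inside it, the other ranks never reach the barrier and rank 0 waits forever.

import torch.distributed as dist

def branch_with_barrier():
    """Hypothetical illustration, not library code."""
    if dist.get_rank() == 0:   # roughly what if_main_process() checks
        dist.barrier()         # only rank 0 ever reaches this -> deadlock
    # ranks 1..N-1 skip the branch and never call dist.barrier()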

@pplantinga
Collaborator Author

Okay, this is ready for review again @TParcollet @Adel-Moumen. Basically, I propose replacing instances of if_main_process with run_on_main, which will signal that it is executing in single-threaded mode and skip the barriers. You can see an example in the updated TIMIT recipe.
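
As a rough sketch of the kind of recipe change being proposed (the exact TIMIT diff may differ; prepare_timit and the folder paths are placeholders):

from speechbrain.utils.distributed import run_on_main

def prepare_timit(data_folder, save_folder):
    """Stand-in for the recipe's data-preparation step (illustrative only)."""
    ...

data_folder, save_folder = "data/TIMIT", "results/prepared"

# Before (problematic if anything inside reaches a ddp_barrier):
# if if_main_process():
#     prepare_timit(data_folder, save_folder)

# After: run_on_main executes the function on the main process only, marks the
# section as main-process-only so barriers inside it are skipped, and syncs
# all ranks when it returns.
run_on_main(
    prepare_timit,
    kwargs={"data_folder": data_folder, "save_folder": save_folder},
)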

# Sync before deleting to avoid another process saving at the same time.
# This has led to errors as documented here:
# https://github.com/speechbrain/speechbrain/issues/2250
ddp_barrier()
Collaborator

@pplantinga what will happen if torch_recovery is called outside of a run_on_main? These barriers would be hit and MAIN_PROC_ENV wouldn't be 1?

Collaborator Author

Outside of run_on_main the program should be operating with multiple processes, so all should hit the barrier together. The only scenario where it would still freeze is if you are inside an if if_main_process(): block, which we should discourage use of.

Collaborator

Sounds good.

Collaborator

@Gastron Gastron Nov 29, 2023

I think a solution could be developed to catch those bugs where you branch based on the main_process, but inside that branch you call some code which should hit a DDP barrier. So this will not automatically solve problems, but should help catch bugs. This would replace the if_main_process() (almost drop-in, just adds indentation).

import os
import torch

BARRIER_PROTECTOR = "SPEECHBRAIN_DDP_BARRIER_PROTECTOR"
os.environ[BARRIER_PROTECTOR] = "0"  # environment variables must be strings

class DDPProtector(object):
    """Protects from running into a DDP barrier in a code block that has already branched."""

    def __enter__(self):
        # Increment so that we can support nested protectors
        os.environ[BARRIER_PROTECTOR] = str(int(os.environ[BARRIER_PROTECTOR]) + 1)
        return self

    def on_main_process(self):
        # Roughly what if_main_process() checks: rank 0, or not distributed at all
        if torch.distributed.is_initialized():
            return torch.distributed.get_rank() == 0
        return True

    def __exit__(self, exception_type, exception_value, traceback):
        # Decrement even if an exception was raised; let exceptions propagate
        os.environ[BARRIER_PROTECTOR] = str(int(os.environ[BARRIER_PROTECTOR]) - 1)
        return False

def ddp_barrier():
    """In DDP mode, this function will synchronize all processes.
    torch.distributed.barrier() will block processes until the whole
    group enters this function.
    """
    if int(os.environ[BARRIER_PROTECTOR]) > 0:
        raise RuntimeError(
            "DDP barrier inside a main-process-only branch; this will create a deadlock or a subtle bug."
        )
    # Check if we're in a single-threaded section, skip barrier
    # (MAIN_PROC_ENV is the module-level constant "MAIN_PROC_ONLY" discussed below)
    elif os.environ.get(MAIN_PROC_ENV, "0") == "1":
        return
    elif torch.distributed.is_initialized():
        torch.distributed.barrier()

This would simply be used to mark that you don't intend to run into DDP barriers in this part of the code:

with DDPProtector() as protector:
    if protector.on_main_process():
        ...

So when if_main_process() is replaced by this, we should catch some bugs more easily.

@TParcollet
Collaborator

It fixes my bug now. We still need Ryan to verify whether it solves issue #2250 as well.

@mravanelli mravanelli marked this pull request as ready for review November 29, 2023 15:58
Collaborator

@Gastron Gastron left a comment

I think at least the post_func logic should be checked. I also left a suggestion, which could help catch some bugs.

else:
    # But main comes here
    main_process_only(post_func)(*post_args, **post_kwargs)
Collaborator

I think the logic is now inverted: post_func is meant to run on everything except main (e.g. to load a tokenizer that was just created). With run_post_on_main, post_func is also run on main.
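
A hedged sketch of the semantics being described (simplified; the real run_on_main has more arguments and checks):

from speechbrain.utils.distributed import ddp_barrier, if_main_process

def run_on_main_sketch(func, post_func=None, run_post_on_main=False):
    """Simplified illustration of the intended func/post_func split."""
    if if_main_process():
        func()                # e.g. train a tokenizer on the main process
    ddp_barrier()             # everyone waits for main to finish
    if post_func is not None and (run_post_on_main or not if_main_process()):
        post_func()           # e.g. the other ranks load the tokenizer just created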

Collaborator Author

Aha, you are totally right about this... I'll go ahead and fix this.

Collaborator Author

Should be fixed in the latest commit.

"""
import datetime
import os
import torch
from functools import wraps

MAIN_PROC_ENV = "MAIN_PROC_ONLY"
Collaborator

Perhaps this should have a SPEECHBRAIN_ prefix just in case.

Collaborator Author

Changed to a module-level variable, which makes this unnecessary.


@TParcollet
Collaborator

@pplantinga what do you think of @Gastron's comments?

@TParcollet
Collaborator

My take is that the point of this PR is to fix bugs. We should merge it with the fixes so that unstable can be merged into develop, and then PR @Gastron's idea into dev?

@@ -103,8 +94,13 @@ def main_process_only(function):
    @wraps(function)
    def main_proc_wrapped_func(*args, **kwargs):
        """This decorated function runs only if this is the main process."""
        os.environ[MAIN_PROC_ENV] = "1"
Collaborator

@Gastron Gastron Nov 29, 2023

Additionally, I wonder whether environment variables (like MAIN_PROC_ENV here) are the right way to do this sort of process-wide communication. I think something like a variable in a module (Python modules are singletons) should be enough here. So instead of this, I think we could just have:

MAIN_PROC_FLAG = 0

def main_proc_wrapped_func(*args, **kwargs):
    # Must be declared global because it is assigned here
    global MAIN_PROC_FLAG
    MAIN_PROC_FLAG = 1
    ...  # run the wrapped function on the main process only
    MAIN_PROC_FLAG = 0


def ddp_barrier():
    # Note: as long as this doesn't locally assign MAIN_PROC_FLAG,
    # it doesn't need to be marked as global, since it is only read.
    if MAIN_PROC_FLAG == 1:
        ...  # skip the barrier in main-process-only sections

Collaborator Author

Ah yes, a module-level flag is better here.

@pplantinga
Collaborator Author

You have some nice suggestions here @Gastron. You're right that the current setup would fail if there's a run_on_main inside another run_on_main which perhaps an increment/decrement could handle.

I was trying to avoid having double-indentation for this scenario, and the run_on_main already exists as a construct so I was hoping we could just extend its use a little. I'm not too crazy about the extra context manager, but could be convinced if we aren't able to accomplish the same thing using run_on_main.
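
One way the increment/decrement idea could look, as a sketch only (a module-level counter instead of a boolean; not the merged implementation):

from functools import wraps

import torch
from speechbrain.utils.distributed import if_main_process

MAIN_PROC_DEPTH = 0  # how deeply nested we are in main-process-only sections

def main_process_only(function):
    @wraps(function)
    def wrapped(*args, **kwargs):
        global MAIN_PROC_DEPTH
        MAIN_PROC_DEPTH += 1
        try:
            if if_main_process():
                return function(*args, **kwargs)
        finally:
            MAIN_PROC_DEPTH -= 1
    return wrapped

def ddp_barrier():
    # Skip the barrier anywhere inside a main-process-only section,
    # however deeply nested the run_on_main calls are.
    if MAIN_PROC_DEPTH > 0:
        return
    if torch.distributed.is_initialized():
        torch.distributed.barrier()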

@Gastron
Collaborator

Gastron commented Nov 30, 2023

> You have some nice suggestions here @Gastron. You're right that the current setup would fail if there's a run_on_main inside another run_on_main which perhaps an increment/decrement could handle.
>
> I was trying to avoid having double-indentation for this scenario, and the run_on_main already exists as a construct so I was hoping we could just extend its use a little. I'm not too crazy about the extra context manager, but could be convinced if we aren't able to accomplish the same thing using run_on_main.

Perhaps we could encourage all user code to use run_on_main; then any branching flags, counters, or such things can be implemented behind the scenes in library code.

In the short term I think forcing run_on_main everywhere (getting rid of if_main_process) would mean a bigger refactor, since the local code (in the if if_main_process: block) would need to be moved into a new function or other callable.

@TParcollet
Collaborator

@Gastron and @pplantinga, just to clarify one thing: if we keep using if_main_process (and we should, I agree with Aku), it will still work with this PR, right? I don't see any reason why we must replace them all with run_on_main. So basically, Mirco wants to release unstable into dev this week, so we need to settle this PR. We either revert with Adel's PR or we move forward with this one. @Gastron, what is your opinion on merging this code and opening a new PR to develop your idea? I like the context manager if we can have something as simple as if_main_process. @pplantinga, if we merge, could you confirm whether if_main_process will still work, or whether it will break and we must change all of them?

@pplantinga
Collaborator Author

pplantinga commented Nov 30, 2023

In this PR, if_main_process is incompatible with ddp_barrier, meaning you can't call delete_checkpoint inside if_main_process without freezing. As far as I'm aware, none of the recipes are doing this so none of them would currently crash. But it would be a surprising behavior if someone were to try this, so eventually we should either convert to run_on_main or to Aku's suggestion. I'm fine to merge now to fix bugs but we need to address this before any releases.
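
To make the "surprising behavior" concrete, a hypothetical recipe fragment (checkpointer and old_ckpt are placeholders, and the method name follows the discussion above rather than any specific recipe):

def cleanup(checkpointer, old_ckpt):
    """Hypothetical recipe code, not an actual recipe."""
    # Problematic after this PR: deletion now hits ddp_barrier() internally,
    # so only the main process reaches the barrier and the other ranks never do.
    # if if_main_process():
    #     checkpointer.delete_checkpoint(old_ckpt)

    # Safe: call it on every rank; the barriers inside keep the ranks in sync.
    checkpointer.delete_checkpoint(old_ckpt)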

@TParcollet
Collaborator

@pplantinga could we briefly assess what is impacted by this deadly combination? If it's minor, we could easily fix it on dev.

@TParcollet
Collaborator

I don't understand why this is a problem introduced by this PR, though. if_main_process was always incompatible with the DDP barrier, no? So we should not see it anywhere?

@pplantinga
Collaborator Author

pplantinga commented Nov 30, 2023

Yes, if_main_process was always incompatible with ddp_barrier. This PR added ddp_barrier inside delete_checkpoint, which was called inside if_main_process in some recipes. I deleted the if_main_process from those recipes, so we should be good to go in terms of no more bugs.

Collaborator

@TParcollet TParcollet left a comment

I think we can merge into dev and then move forward with a proper discussion, among the people interested, on better handling of the DDP barrier.

@TParcollet
Collaborator

@Gastron do you agree? If so, we merge and plan a meeting next week (if you guys are available) to solve this design issue properly.

@Gastron
Collaborator

Gastron commented Nov 30, 2023

I think improving this incrementally makes sense; I guess we can indeed merge this and make further improvements soonish.

@mravanelli
Collaborator

Thank you all for working on this. So, based on what is discussed here, I'm going to merge it. After that, you can discuss better solutions and implement them in another PR.

@mravanelli mravanelli self-requested a review November 30, 2023 20:14
@mravanelli mravanelli merged commit 3fcbbba into speechbrain:develop Nov 30, 2023
5 checks passed
@pplantinga pplantinga deleted the feature/sync-deletion branch December 5, 2023 14:56