Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the merge audio dirs job for fairseq training #454

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions fairseq/manifest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections import defaultdict
import errno
import glob
import numpy as np
import os
Expand Down Expand Up @@ -29,21 +30,39 @@ def __init__(
self.audio_dir_paths = audio_dir_paths
self.file_extension = file_extension
self.path_must_contain = path_must_contain
self.concurrent = len(audio_dir_paths)

self.out_audio_dir_path = self.output_path("audio", directory=True)

self.rqmt = {"time": 8, "cpu": 4, "mem": 4}
michelwi marked this conversation as resolved.
Show resolved Hide resolved

def tasks(self):
yield Task("run", mini_task=True)
yield Task("run", rqmt=self.rqmt, args=range(1, self.concurrent + 1))

def run(self):
for dir_path in self.audio_dir_paths:
search_path = os.path.join(dir_path, "*." + self.file_extension)
for file in glob.iglob(search_path, recursive=True):
if self.path_must_contain and self.path_must_contain not in file:
continue
base_name = os.path.basename(file)
dst = os.path.join(self.out_audio_dir_path, base_name)
os.symlink(file, dst)
def run(self, task_id):
dir_path = self.audio_dir_paths[task_id - 1]
search_path = os.path.join(dir_path, "*." + self.file_extension)
for file in glob.iglob(search_path, recursive=True):
if self.path_must_contain and self.path_must_contain not in file:
continue
base_name = os.path.basename(file)
creation_complete = False
dst = os.path.join(self.out_audio_dir_path, base_name)
i = 2
while not creation_complete:
while os.path.exists(dst):
if os.path.realpath(dst) == file:
creation_complete = True
break
dst = f"{os.path.splitext(dst)[0]}_{i}.{self.file_extension}"
i += 1
else:
try:
os.symlink(file, dst)
creation_complete = True
except OSError as err:
if err.errno != errno.EEXIST:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would there be an OSError with errno.EEXIST (i.e. FileExistsError)? You have just completed the while loop and this implies that it did not exist.

Oh, you are running several tasks in parallel and there could be race conditions. if folder1/a.wav is in self.audio_dir_paths[0] and folder2/a.wav is in self.audio_dir_paths[1]. But then the correct solution is not to ignore the error but make a thread save implementation.... which I now realize is the purpose of the double while loop..

Ok. nevermind my previous comment then. (But do we really need parallelism for symlinking a bunch of files?)

Except for the issue with the ever expanding underscores. That should probably be fixed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the issue for the ever expanding underscores, sorry I couldn't find the relevant comments, could you please refer me to that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while os.path.exists(dst):
    dst = f"{os.path.splitext(dst)[0]}_{i}.{self.file_extension}"

this will keep appending underscores to the filename:

a.wav
a_2.wav
a_2_3.wav
...

raise err
michelwi marked this conversation as resolved.
Show resolved Hide resolved


class CreateManifestJob(Job):
Expand Down
Loading