
[datasets] Add remote filesystem support to datasets module #1244

Merged
75 commits merged into ludwig-ai:master on Aug 18, 2021

Conversation

ANarayan
Collaborator

This PR adds support for reading and writing to remote filesystems (e.g. S3 buckets) in the Datasets module.

cc: @w4nderlust , @tgaddair


@tgaddair tgaddair left a comment


Nice! Few comments about moving some of this into fs_utils.

else:
    shutil.copytree(tmpdir, self.raw_dataset_path)

#os.rename(self.raw_temp_path, self.raw_dataset_path)

Can remove this line.

os.rename(self.raw_temp_path, self.raw_dataset_path)
#os.makedirs(self.raw_temp_path, exist_ok=True)

with tempfile.TemporaryDirectory() as tmpdir:

Can we simplify to something like this?

with fs_utils.upload_output_file(self.raw_dataset_path) as tmpdir:
    ...

If that doesn't work, we should still make something like this in fs_utils as we do the same thing below.
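A helper along the lines the reviewer suggests could be sketched with fsspec roughly as below. The name `upload_output_directory`, the trailing-separator upload semantics, and the deferred fsspec import are assumptions for illustration, not the PR's actual implementation:

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def upload_output_directory(url):
    """Yield a local directory to write into; sync it to `url` on exit (sketch)."""
    if "://" not in url:
        # Local path: create it and let callers write in place.
        os.makedirs(url, exist_ok=True)
        yield url
        return
    import fsspec  # only needed for remote targets like s3://...

    protocol = url.split("://", 1)[0]
    with tempfile.TemporaryDirectory() as tmpdir:
        yield tmpdir
        fs = fsspec.filesystem(protocol)
        # Trailing separator: upload the *contents* of tmpdir to url.
        fs.put(tmpdir + os.sep, url, recursive=True)
```

Callers then write into the yielded directory exactly as they would for a local dataset path, and the remote upload (if any) happens when the block exits.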

with ZipFile(BytesIO(zipresp.read())) as zfile:
    zfile.extractall(self.raw_temp_path)
os.rename(self.raw_temp_path, self.raw_dataset_path)
#os.makedirs(self.raw_temp_path, exist_ok=True)

Can remove this.

shutil.copyfile(f, os.path.join(self.raw_temp_path, f.name))

os.rename(self.raw_temp_path, self.raw_dataset_path)
#os.makedirs(self.raw_temp_path, exist_ok=True)

Can remove this.

os.rename(self.raw_temp_path, self.raw_dataset_path)
#os.makedirs(self.raw_temp_path, exist_ok=True)

with tempfile.TemporaryDirectory() as tmpdir:

Same as above regarding using fs_utils.

filename))

os.rename(self.raw_temp_path, self.raw_dataset_path)
with tempfile.TemporaryDirectory() as tmpdir:

Same as above.

@@ -26,7 +30,13 @@ class IdentityProcessMixin:
processed_dataset_path: str

def process_downloaded_dataset(self):
os.rename(self.raw_dataset_path, self.processed_dataset_path)
protocol, _ = split_protocol(self.processed_dataset_path)

I would move this into fs_utils as a utility function, like fs_utils.rename.

protocol, _ = split_protocol(self.processed_dataset_path)
if protocol is not None:
    fs = fsspec.filesystem(protocol)
    fs.copy(self.raw_dataset_path,

Maybe we can use fsspec.mv or fsspec.rename here. Otherwise the behavior is different between local and remote versions.
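A `fs_utils.rename`-style helper that keeps local and remote behavior consistent, as suggested, could look roughly like this. The dispatch on `"://"` and the deferred fsspec import are illustrative assumptions; the real fs_utils helper may differ:

```python
import os


def rename(src, dst):
    """Move src to dst for both local paths and remote URLs (sketch)."""
    if "://" not in dst:
        # Plain local path; os.rename is atomic on the same filesystem.
        os.rename(src, dst)
        return
    import fsspec  # only needed for remote targets

    protocol = dst.split("://", 1)[0]
    fs = fsspec.filesystem(protocol)
    # fsspec's mv copies then deletes, so the end state matches the
    # local os.rename case even on object stores.
    fs.mv(src, dst, recursive=True)
```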

os.makedirs(self.processed_dataset_path, exist_ok=True)
#os.makedirs(self.processed_dataset_path, exist_ok=True)
#makedirs(self.process_downloaded_dataset, exist_ok=True)
with fsspec.open(self.processed_dataset_path, mode="wb") as f:

Is it necessary to write this file? Does it work without this?

@@ -65,7 +66,8 @@ def process_downloaded_dataset(self):
final_train = pd.merge(
train_dfs['train_transaction'], train_dfs['train_identity'], on='TransactionID', how='left')

os.makedirs(self.processed_dataset_path, exist_ok=True)
with fsspec.open(self.processed_dataset_path, mode="wb") as f:

Do we need to create this file?

@w4nderlust
Collaborator

The new changes look good to me. Are all the places where fsspec is used covered? Asking just to be sure.
Other than that, let's address the failures, but the structure of the code looks fine to me. Thanks!


@tgaddair tgaddair left a comment


Nice! Just a couple things we can refactor.

df.to_csv(os.path.join(self.processed_temp_path, self.csv_filename),
          index=False)
os.rename(self.processed_temp_path, self.processed_dataset_path)

protocol, _ = split_protocol(self.processed_dataset_path)

Can we make this fs_utils.mv(self.processed_temp_path, self.processed_dataset_path)?


We should probably also remove self.processed_temp_path at the end of the remote case, right?

download_func(self.competition_name, path=self.raw_temp_path)
#os.makedirs(self.raw_temp_path, exist_ok=True)

with tempfile.TemporaryDirectory() as tmpdir:

Can this also use fs_utils.upload_output_directory?

def makedirs(url, exist_ok=False):
    fs, path = get_fs_and_path(url)
    return fs.makedirs(path, exist_ok=exist_ok)
if fs == "s3":

Not sure why this is needed for s3, since directories don't exist. Is this so checks like exists(dirname) will pass?

This may also be true for other object stores. Maybe something more generic we could do here is:

fs.makedirs(path, exist_ok=exist_ok)
if not exists(path):
    # Directory was not created -> no directory concept in filesystem
    with fsspec.open(url, mode="wb") as f:
        pass

What do you think?
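Made concrete with fsspec, the reviewer's idea could be sketched as below. Using `fsspec.core.url_to_fs` in place of the PR's `get_fs_and_path` helper is an assumption; the marker-object fallback is the generic behavior being proposed, not code from the PR:

```python
import fsspec
from fsspec.core import url_to_fs


def makedirs(url, exist_ok=False):
    """makedirs that also works on object stores without real directories."""
    fs, path = url_to_fs(url)
    fs.makedirs(path, exist_ok=exist_ok)
    if not fs.exists(path):
        # Directory was not created -> the filesystem (e.g. s3) has no
        # directory concept; touch an empty marker object so later
        # exists() checks on the "directory" pass.
        with fsspec.open(url, mode="wb"):
            pass
```

On a local filesystem the `makedirs` call creates a real directory and the fallback never runs; on S3-like stores the empty object serves only as a placeholder.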

download_func = api.dataset_download_files
# Download all files for a competition/dataset
download_func(self.competition_name, path=self.raw_temp_path)
#os.makedirs(self.raw_temp_path, exist_ok=True)

Can remove this line.


@tgaddair tgaddair left a comment


LGTM!

@tgaddair tgaddair merged commit 30d1fdf into ludwig-ai:master Aug 18, 2021
3 participants