
Refactor the dataset class #88

Merged 5 commits into main from feature/data_io on May 9, 2023
Conversation

@GeorgesLorre (Collaborator) commented May 8, 2023

Closes #89

  • Renamed dataset.py to data_io.py
  • Split the DataSet class into DaskDataLoader and DaskDataWriter (sketched below)
  • Added full test coverage
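For illustration, a minimal sketch of the resulting split (the constructor argument and the method names follow the snippets discussed below; the bodies are placeholders, not the actual implementation):

import dask.dataframe as dd

class DaskDataLoader:
    """Loads the dataset described by a manifest into a Dask dataframe."""

    def __init__(self, manifest):
        self.manifest = manifest

    def load_dataframe(self, spec) -> dd.DataFrame:
        # Assumption: read the index and the subsets listed in the
        # manifest and combine them into a single dataframe.
        ...

class DaskDataWriter:
    """Writes a Dask dataframe out to the locations in a manifest."""

    def __init__(self, manifest):
        self.manifest = manifest

    def write_index(self, df: dd.DataFrame) -> None:
        # Assumption: persist the index under the manifest's base path.
        ...

    def write_subsets(self, df: dd.DataFrame, spec) -> None:
        # Assumption: persist each subset declared in the component spec.
        ...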

@GeorgesLorre marked this pull request as ready for review on May 8, 2023 at 14:53
@PhilippeMoussalli (Contributor) left a comment:

Nice :) Thanks for implementing this.
Left a few comments.

"""Test writing out the index."""
with tmp_path_factory.mktemp("temp") as fn:
# override the base path of the manifest with the temp dir
manifest.update_metadata("base_path", str(fn))
Contributor:

Nice :) Thanks for fixing this

# write out index to temp dir
dw.write_index(df=dataframe)
# read written data and assert
odf = dd.read_parquet(fn / "index")
Contributor:
odf?
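For context, a consolidated sketch of the round-trip test under discussion, using a more descriptive name than odf (the manifest and dataframe fixtures and the DaskDataWriter import are assumed):

import dask.dataframe as dd

def test_write_index(tmp_path_factory, manifest, dataframe):
    fn = tmp_path_factory.mktemp("temp")
    # override the base path of the manifest with the temp dir
    manifest.update_metadata("base_path", str(fn))
    data_writer = DaskDataWriter(manifest=manifest)
    # write out the index to the temp dir
    data_writer.write_index(df=dataframe)
    # read the written data back and verify the round trip
    written_df = dd.read_parquet(fn / "index")
    assert len(written_df) == len(dataframe)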

@@ -119,6 +116,7 @@ def _create_write_dataframe_task(
     write_task = dd.to_parquet(
         df, remote_path, schema=schema, overwrite=False, compute=False
     )
+    logging.info(f"Creating write task for: {remote_path}")
Contributor:
Nice, I think we need to add more logging to make debugging easier. Maybe it's better to have this logging at the write_index and write_subset level? This way you could log both the name of the subset and the path.
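A hedged sketch of what logging at that level could look like (the write_subset signature here is an assumption; only the to_parquet call mirrors the diff above):

import logging

import dask.dataframe as dd

logger = logging.getLogger(__name__)

class DaskDataWriter:
    # (other methods elided)

    def write_subset(self, df: dd.DataFrame, *, subset_name: str, remote_path: str, schema):
        # Log the subset name together with the target path, so a failed
        # write can be traced back to a specific subset.
        logger.info(f"Creating write task for subset '{subset_name}' at {remote_path}")
        return dd.to_parquet(df, remote_path, schema=schema, overwrite=False, compute=False)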

Collaborator (Author):
Good suggestion! Will create a ticket to take a look at logging.

@@ -182,9 +184,9 @@ def _load_or_create_manifest(self) -> Manifest:
     def transform(self, dataframe: dd.DataFrame, **kwargs) -> dd.DataFrame:
         """Abstract method for applying data transformations to the input dataframe."""

-    def _process_dataset(self, dataset: FondantDataset) -> dd.DataFrame:
+    def _process_dataset(self, dataset: DaskDataLoader) -> dd.DataFrame:  # type: ignore[override]
Contributor:
Seems like dataset is defined as an optional parameter in the parent class, but we're enforcing it as mandatory in the child class. Maybe it's better to change

    def _process_dataset(self, dataset: FondantDataset)

to

    def _process_dataset(self, **kwargs)

in the parent class.

@RobbeSneyders (Member) commented May 8, 2023:
I think we can get around this issue if we implement the run() method as follows:

class Component:

    def _write_dataset(self, dataframe, *, manifest):
        output_dataset = DaskDataWriter(manifest)

        # write index and output subsets to remote storage
        output_dataset.write_index(dataframe)
        output_dataset.write_subsets(dataframe, self.spec)

    def run(self):
        """Runs the component."""
        input_manifest = self._load_or_create_manifest()

        output_df = self._process_data(input_manifest)

        output_manifest = input_manifest.evolve(component_spec=self.spec)

        self._write_dataset(output_df, manifest=output_manifest)

        self.upload_manifest(output_manifest, save_path=self.args.output_manifest_path)

class LoadComponent:

    def _process_data(self, manifest):
        return self.load(...)

class TransformComponent:

    def _process_data(self, manifest):
        dataset = DaskDataLoader(manifest)
        df = dataset.load_dataframe(self.spec)
        df = self.transform(df, **self.user_arguments)
        return df
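(Design note: constructing the loader and writer inside these hooks keeps the dataset object out of the overridden _process_data signatures, which avoids the optional-vs-mandatory parameter conflict flagged above.)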

Contributor:
Indeed, this seems more appropriate :) Thanks for the suggestion.

Contributor:
Agree with @RobbeSneyders's suggestion; I would just use more descriptive names, like `dataloader = DaskDataLoader(manifest)` instead of `dataset = DaskDataLoader(manifest)`.

@RobbeSneyders (Member) left a comment:
Thanks Georges!

Would be great if we can improve the small interface conflict @PhilippeMoussalli highlighted, but this is already a big improvement.

    returns another dataframe.
    """

    def run(self):
        """Runs the component."""
        input_manifest = self._load_or_create_manifest()
-       input_dataset = FondantDataset(input_manifest)
+       input_dataset = DaskDataLoader(input_manifest)
@NielsRogge (Contributor) commented May 9, 2023:
Suggested change:
- input_dataset = DaskDataLoader(input_manifest)
+ data_loader = DaskDataLoader(input_manifest)


        df = self._process_dataset(input_dataset)

        output_manifest = input_manifest.evolve(component_spec=self.spec)
-       output_dataset = FondantDataset(output_manifest)
+       output_dataset = DaskDataWriter(output_manifest)
Contributor:
Suggested change:
- output_dataset = DaskDataWriter(output_manifest)
+ data_writer = DaskDataWriter(output_manifest)

with tmp_path_factory.mktemp("temp") as fn:
# override the base path of the manifest with the temp dir
manifest.update_metadata("base_path", str(fn))
dw = DaskDataWriter(manifest=manifest)
Contributor:
Suggested change:
- dw = DaskDataWriter(manifest=manifest)
+ data_writer = DaskDataWriter(manifest=manifest)

@NielsRogge (Contributor) left a comment:

Thanks for improving!

@PhilippeMoussalli (Contributor) left a comment:
Thanks Georges!

@GeorgesLorre merged commit b42e13a into main on May 9, 2023.
@RobbeSneyders deleted the feature/data_io branch on May 15, 2023.
Hakimovich99 pushed a commit that referenced this pull request on Oct 16, 2023:

Closes #89

- Renamed `dataset.py` to `data_io.py`
- Split the `DataSet` class into `DaskDataLoader` and `DaskDataWriter`
- Added full test coverage
Successfully merging this pull request may close these issues:

- Split Dataset class into Loader and Writer and add tests