Redesign dataset format and interface #567

Closed
Tracked by #643 ...
RobbeSneyders opened this issue Oct 29, 2023 · 36 comments

Comments

@RobbeSneyders
Member

RobbeSneyders commented Oct 29, 2023

We should revisit the design of our subsets & fields:

  • Some of the main feedback we get is that the subsets and fields design is too complex and hard to understand
  • It's not clear how fields should be grouped in subsets
  • They are accessed differently in Dask & Pandas (underscore vs hierarchical columns)
  • They break on subset names with underscores
  • We still don't know how to pass unconsumed fields without losing the advantages
    Pass unconsumed columns when additionalFields is True #244
  • We can't find a way to map subsets between components that sounds like a good idea
    Enable mapping subsets and fields #352
@RobbeSneyders
Member Author

Proposed design

The component spec

The consumes and produces section will be flattened, to only contain fields:

old

consumes:
  images:
    fields:
      data:
        type: binary

produces:
  captions:
    fields:
      text:
        type: utf8

new

consumes:
  image_data:
    type: binary

produces:
  captions:
    type: utf8
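
A minimal sketch of how an old nested section could be flattened mechanically. The `flatten_section` helper and the `<subset>_<field>` naming rule are assumptions for illustration only; the example above picks the new names by hand (`image_data`, `captions`).

```python
from typing import Dict


def flatten_section(section: Dict[str, dict]) -> Dict[str, dict]:
    """Turn {subset: {"fields": {field: spec}}} into {"subset_field": spec}.

    Hypothetical helper: the "<subset>_<field>" naming rule is an assumption,
    not something the proposal prescribes.
    """
    flat: Dict[str, dict] = {}
    for subset_name, subset in section.items():
        for field_name, field_spec in subset["fields"].items():
            flat[f"{subset_name}_{field_name}"] = dict(field_spec)
    return flat


old_consumes = {"images": {"fields": {"data": {"type": "binary"}}}}
new_consumes = flatten_section(old_consumes)
print(new_consumes)
```

A real migration would probably still need manual renaming where the generated name is awkward.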

Data storage

The produced output by each component is written as a new parquet dataset.

| /Component 1 | /Component 2 | /Component 3 | /Component 4 |
| - index      | - index      | - index      | - index      |
| - field 1    |              |              |              |
| - field 2    |              |              |              |
| - field 3    |              | - field 3    |              |
|              | - field 4    |              |              |
|              | - field 5    |              |              |

The complete dataset after the fourth component can be found by looking from the right: for each field, take the rightmost dataset that contains it (format parquet-dataset:field):

- /component4:index
- /component1:field1
- /component1:field2
- /component3:field3
- /component2:field4
- /component2:field5
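
The "look from the right" rule can be sketched as a single pass over the component outputs in pipeline order, where later writers override earlier ones. `resolve_view` and the `outputs` structure are hypothetical names used only for this illustration.

```python
from typing import Dict, List, Tuple


def resolve_view(outputs: List[Tuple[str, List[str]]]) -> Dict[str, str]:
    """Map each column to the last (rightmost) dataset that wrote it."""
    view: Dict[str, str] = {}
    for location, columns in outputs:  # pipeline order, left to right
        for column in columns:
            view[column] = location  # a later writer overrides an earlier one
    return view


# The storage layout from the example above.
outputs = [
    ("/component1", ["index", "field1", "field2", "field3"]),
    ("/component2", ["index", "field4", "field5"]),
    ("/component3", ["index", "field3"]),
    ("/component4", ["index"]),
]
view = resolve_view(outputs)
for column, location in view.items():
    print(f"{location}:{column}")
```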

The manifest

The manifest provides this view at each point in the pipeline. After step 4, it will represent the view above:

{
  "metadata": {
    "base_path": "gs://bucket",
    "pipeline_name": "test_pipeline",
    "run_id": "test_pipeline_12345",
    "component_id": "component4"
  },
  "index": {
    "location": "/component4"
  },
  "fields": {
    "field1": {
      "location": "/component1",
      "type": "..."
    },
    "field2": {
      "location": "/component1",
      "type": "..."
    },
    "field3": {
      "location": "/component3",
      "type": "..."
    },
    "field4": {
      "location": "/component2",
      "type": "..."
    },
    "field5": {
      "location": "/component2",
      "type": "..."
    }
  }
}

Additional fields

We still need a mechanism to remove additional fields from the output dataset of a component if it changes the index (e.g. LaionRetrieval components, which go from a prompt id to a Laion id).

For example, if component 3 above defined additionalFields: false, its component spec would look like this:

produces:
  field3:
    type: ...
additionalFields: false

The data storage would still look exactly the same as above, but now the manifest only looks back until the output of component3:

{
  "metadata": {
    ...
  },
  "index": {
    "location": "/component4"
  },
  "fields": {
    "field3": {
      "location": "/component3",
      "type": "..."
    }
  }
}
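
A minimal sketch of how the manifest could evolve from one component to the next, including the additionalFields: false case. `evolve_manifest` is a hypothetical helper, the `"string"` types are placeholders, and the sketch assumes every component writes the index to its own location.

```python
import copy
from typing import Dict


def evolve_manifest(manifest: dict, component_id: str, produces: Dict[str, str],
                    additional_fields: bool = True) -> dict:
    """Return the manifest as it would look after `component_id` has run."""
    location = f"/{component_id}"
    evolved = copy.deepcopy(manifest)  # leave the input manifest untouched
    evolved["metadata"]["component_id"] = component_id
    evolved["index"] = {"location": location}  # assumption: index is always rewritten
    if not additional_fields:
        evolved["fields"] = {}  # stop looking back past this component
    for name, field_type in produces.items():
        evolved["fields"][name] = {"location": location, "type": field_type}
    return evolved


after_component2 = {
    "metadata": {"component_id": "component2"},
    "index": {"location": "/component2"},
    "fields": {
        "field1": {"location": "/component1", "type": "string"},
        "field2": {"location": "/component1", "type": "string"},
        "field3": {"location": "/component1", "type": "string"},
        "field4": {"location": "/component2", "type": "string"},
        "field5": {"location": "/component2", "type": "string"},
    },
}
after_component3 = evolve_manifest(
    after_component2, "component3", {"field3": "string"}, additional_fields=False
)
after_component4 = evolve_manifest(after_component3, "component4", {})
print(after_component4["fields"])
```

The data storage is never touched here; only the view recorded in the manifest changes.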

User interface

We still present all the data in a manifest as a single dataframe to the user. Both the Dask and Pandas dataframes will have the same flat columns.

| index | field1 | field2 | field3 | field4 |
| ----- | ------ | ------ | ------ | ------ |
| id    |  data  |  data  |  data  |  data  |

We can create this dataframe by creating a reverse mapping of the locations and fields in the manifest:

{
    "/component4": ["index"],
    "/component3": ["field3"],
    "/component2": ["field4", "field5"],
    "/component1": ["field1", "field2"],
}

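We then read the fields from each location and merge them together on the index:

import dask.dataframe as dd

dataframe = None  # dask has no dd.empty(); start from the first location (the index)
for location, fields in field_mapping.items():
    columns = [f for f in fields if f != "index"]  # "index" is the parquet index, not a column
    partial_df = dd.read_parquet(location, columns=columns)
    dataframe = partial_df if dataframe is None else dataframe.merge(
        partial_df, how="left", left_index=True, right_index=True)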
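
The reverse mapping itself can be derived from the manifest in a few lines. This is a sketch with a hypothetical helper name (`build_field_mapping`) and placeholder `"string"` types; listing the index location first means the merge starts from the current index.

```python
from collections import defaultdict
from typing import Dict, List


def build_field_mapping(manifest: dict) -> Dict[str, List[str]]:
    """Invert the manifest into {location: [columns to read from it]}."""
    mapping: Dict[str, List[str]] = defaultdict(list)
    # Insert the index location first so it becomes the left side of the merges.
    mapping[manifest["index"]["location"]].append("index")
    for name, field in manifest["fields"].items():
        mapping[field["location"]].append(name)
    return dict(mapping)


manifest = {
    "index": {"location": "/component4"},
    "fields": {
        "field1": {"location": "/component1", "type": "string"},
        "field2": {"location": "/component1", "type": "string"},
        "field3": {"location": "/component3", "type": "string"},
        "field4": {"location": "/component2", "type": "string"},
        "field5": {"location": "/component2", "type": "string"},
    },
}
field_mapping = build_field_mapping(manifest)
print(field_mapping)
```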

@RobbeSneyders
Member Author

As an example, let's see what the component specs, manifests, and data storage would look like for the following pipeline based on our ControlNet example:

  1. Prompt generation
  2. Laion retrieval
  3. Download images
  4. Filter resolution
  5. Crop images

1. Prompt generation

fondant_component.yaml

produces:
  text:
    type: string

data storage

| /generate_prompts
| - index
| - text

manifest

{
  "index": {
    "location": "/generate_prompts"
  },
  "fields": {
    "text": {
      "location": "/generate_prompts",
      "type": "string"
    }
  }
}

2. Laion retrieval

fondant_component.yaml

consumes:
  text:
    type: string

produces:
  image_urls:
    type: string

additionalFields: false

data storage

| /generate_prompts | /laion_retrieval |
| - index           | - index          |
| - text            |                  |
|                   | - image_urls     |

manifest

{
  "index": {
    "location": "/laion_retrieval"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    }
  }
}

3. Download images

fondant_component.yaml

consumes:
  image_urls:
    type: string

produces:
  image:
    type: bytes
  width:
    type: int32
  height:
    type: int32

data storage

| /generate_prompts | /laion_retrieval | /download_images
| - index           | - index          | - index
| - text            |                  |
|                   | - image_urls     |
|                   |                  | - image
|                   |                  | - width
|                   |                  | - height

manifest

{
  "index": {
    "location": "/download_images"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    },
    "image": {
      "location": "/download_images",
      "type": "bytes"
    },
    "width": {
      "location": "/download_images",
      "type": "int32"
    },
    "height": {
      "location": "/download_images",
      "type": "int32"
    }
  }
}

4. Filter resolution

fondant_component.yaml

consumes:
  width:
    type: int32
  height:
    type: int32

data storage

| /generate_prompts | /laion_retrieval | /download_images | /filter_resolution
| - index           | - index          | - index          | - index
| - text            |                  |                  |
|                   | - image_urls     |                  |
|                   |                  | - image          |
|                   |                  | - width          |
|                   |                  | - height         |

manifest

{
  "index": {
    "location": "/filter_resolution"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    },
    "image": {
      "location": "/download_images",
      "type": "bytes"
    },
    "width": {
      "location": "/download_images",
      "type": "int32"
    },
    "height": {
      "location": "/download_images",
      "type": "int32"
    }
  }
}

5. Crop images

fondant_component.yaml

consumes:
  image:
    type: bytes

produces:
  image:
    type: bytes

data storage

| /generate_prompts | /laion_retrieval | /download_images | /filter_resolution | /crop_images
| - index           | - index          | - index          | - index            |
| - text            |                  |                  |                    |
|                   | - image_urls     |                  |                    |
|                   |                  | - image          |                    | - image
|                   |                  | - width          |                    |
|                   |                  | - height         |                    |

manifest

{
  "index": {
    "location": "/filter_resolution"
  },
  "fields": {
    "image_urls": {
      "location": "/laion_retrieval",
      "type": "string"
    },
    "image": {
      "location": "/crop_images",
      "type": "bytes"
    },
    "width": {
      "location": "/download_images",
      "type": "int32"
    },
    "height": {
      "location": "/download_images",
      "type": "int32"
    }
  }
}

This is quite a logical and simple flow I believe. The last step shows the only issue I see with this approach: the data is cropped and overwritten, but the width and height still contain the old values. We had the same issue with the previous approach though (although additionalSubsets could alleviate it partially), so I still think this is a step forward. This will be something for the user to validate when chaining components.
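
One way to help users validate this when chaining components is a heuristic check that flags fields stored alongside an overwritten field but not re-produced. This is only a sketch: `possibly_stale_fields` is a hypothetical helper, and co-location is an assumed proxy for "related", so it can both miss and over-report fields.

```python
from typing import Dict, List


def possibly_stale_fields(manifest: dict, produces: List[str]) -> List[str]:
    """Fields stored next to an overwritten field that were not re-produced."""
    fields: Dict[str, dict] = manifest["fields"]
    # Locations of the fields that the next component is about to overwrite.
    overwritten_locations = {
        fields[name]["location"] for name in produces if name in fields
    }
    return sorted(
        name
        for name, field in fields.items()
        if field["location"] in overwritten_locations and name not in produces
    )


# Manifest after step 4 (filter resolution), before cropping.
manifest_before_crop = {
    "index": {"location": "/filter_resolution"},
    "fields": {
        "image_urls": {"location": "/laion_retrieval", "type": "string"},
        "image": {"location": "/download_images", "type": "bytes"},
        "width": {"location": "/download_images", "type": "int32"},
        "height": {"location": "/download_images", "type": "int32"},
    },
}
stale = possibly_stale_fields(manifest_before_crop, produces=["image"])
print(stale)  # ['height', 'width']
```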

@RobbeSneyders
Member Author

RobbeSneyders commented Nov 7, 2023

  • Some of the main feedback we get is that the subsets and fields design is too complex and hard to understand
    --> This design is simpler
  • It's not clear how fields should be grouped in subsets
    --> Completely flat hierarchy. Downside is that there is no semantic grouping anymore, although this can be achieved
    through underscores I guess
  • They are accessed differently in Dask & Pandas (underscore vs hierarchical columns)
    --> Both flat interface now
  • They break on subset names with underscores
    --> No longer the case, since we no longer have to split in subsets and fields
  • We still don't know how to pass unconsumed fields without losing the advantages
    Pass unconsumed columns when additionalFields is True #244
    --> This is now inherent to the data storage design
  • We can't find a way to map subsets between components that sounds like a good idea
    Enable mapping subsets and fields #352
    --> Not completely sure yet, but I'm leaning more towards mapping both the input and output now. It will definitely be
    simpler.

@mrchtr
Contributor

mrchtr commented Nov 8, 2023

I think this approach tackles a lot of the recent feedback as you have mentioned. A few questions from my side to clarify my understanding.

  • Due to the changes regarding the data storage we will start writing parquet files containing all columns that are produced by the component or do we keep the column wise storage?

  • You have mentioned an issue in the ControlNet pipeline:

The last step shows the only issue I see with this approach: the data is cropped and overwritten, but the width and height still contain the old values.

Isn't it more an issue related to the component design (not updating the width and height after image cropping) instead of an issue of your approach?

  • We can't find a way to map subsets between components that sounds like a good idea.

I still think that this is one of the biggest blockers regarding the reusability of components. From a user's perspective, we are offering different building blocks which can operate on custom dataframes. I assume a lot of users will create custom LoadComponents and define a custom data schema. Now we should offer capabilities to choose a component that applies a specific transformation, and define a column to operate on.

Naively, I would like to have an interface to choose columns to operate on and optional column names which will be written to the dataframe. We could use the custom mappings to access the data within the component and write the custom mappings to the manifest too. This should be possible without changing the ComponentSpec. Am I overlooking something?

@PhilippeMoussalli
Contributor

Thanks Robbe! Looks really promising so far,

Few questions:

  • Do we expect an increase in overhead if the user wants to consume multiple columns because of the multiple merge operations?
  • Jan already mentioned this: regarding dropping fields when additional fields is set to False. I understand the reasoning behind it, but I still feel like it's something that might not be very intuitive for end users. I agree with the mechanism, but I'm wondering if we could somehow automatically detect it without having to explicitly specify it in the component spec?

@RobbeSneyders
Member Author

  • Due to the changes regarding the data storage we will start writing parquet files containing all columns that are produced by the component or do we keep the column wise storage?

I'm not completely sure what you mean. We indeed write all columns that are produced by a component to a single parquet dataset. Parquet is a columnar storage format, so the data is still stored column-wise.

  • You have mentioned an issue in the ControlNet pipeline:

The last step shows the only issue I see with this approach: the data is cropped and overwritten, but the width and height still contain the old values.

Isn't it more an issue related to the component design (not updating the width and height after image cropping) instead of an issue of your approach?

Partially, yes. However, it might be difficult for a component to know up front all possible fields that might be related. For instance, there might have been a resolution field already in the data instead of height and width fields.

I think the user will have to handle this. We might want to provide a mechanism for the user to invalidate certain columns on the pipeline level.

  • We can't find a way to map subsets between components that sounds like a good idea.

I still think that this is one of the biggest blockers regarding the reusability of components. From a user's perspective, we are offering different building blocks which can operate on custom dataframes. I assume a lot of users will create custom LoadComponents and define a custom data schema. Now we should offer capabilities to choose a component that applies a specific transformation, and define a column to operate on.

Naively, I would like to have an interface to choose columns to operate on and optional column names which will be written to the dataframe. We could use the custom mappings to access the data within the component and write the custom mappings to the manifest too. This should be possible without changing the ComponentSpec. Am I overlooking something?

So you mean that the user should explicitly map the fields to a component every time? I think that could make sense if we can keep the interface simple enough. I'm open to proposals on this front, I think it's the main open step before we can decide to move forward with this.


  • Do we expect an increase in overhead if the user wants to consume multiple columns because of the multiple merge operations?

Not sure. We already merge the index and different subsets now, and I haven't really noticed an impact from this. We will be merging more in this new proposal, but both sides should always contain indexes, and maybe even be sorted, so the impact might be limited.

It would probably be good to test this before implementation if we choose to go this way.

  • Jan already mentioned this: regarding dropping fields when additional fields is set to False. I understand the reasoning behind it, but I still feel like it's something that might not be very intuitive for end users. I agree with the mechanism, but I'm wondering if we could somehow automatically detect it without having to explicitly specify it in the component spec?

I don't think we can detect this automatically, since this is about the semantic meaning of the index. I believe a component that changes the semantic meaning of the index needs to be able to mark this in its component spec, since it will invalidate all older data with 100% certainty. I'm open to other ways of marking this though.

@mrchtr
Contributor

mrchtr commented Nov 8, 2023

I'm not completely sure what you mean. We indeed write all columns that are produced by a component to a single parquet dataset. Parquet is a columnar storage format, so the data is still stored column-wise.

Actually, the resulting file structure looks like this:

├── component_1/
│   ├── index/
│   │   ├── part.0.parquet
│   │   ├── part.1.parquet
│   │   └── ...
│   └── text/
│       ├── part.0.parquet
│       ├── part.1.parquet
│       └── ...
└── component_2/
    ├── index/
    │   ├── part.0.parquet
    │   ├── part.1.parquet
    │   └── ...
    └── image_urls/
        ├── part.0.parquet
        ├── part.1.parquet
        └── ...

I thought we were achieving this with our custom write-to-parquet approach.

... 
write_tasks = [
    dd.to_parquet(index_df, "/component_1/index", compute=False),
    dd.to_parquet(text_df, "/component_1/text", compute=False)
]

dd.compute(*write_tasks, ...)

I was mainly wondering if your proposed approach has an effect on this. Essentially, should we continue with our approach or combine this into a single write task, which would result in something like this:

├── component_1/
│   ├── part.0.parquet
│   ├── part.1.parquet
│   └── ...
└── component_2/
    ├── part.0.parquet
    ├── part.1.parquet
    └── ...

@RobbeSneyders
Member Author

├── component_1/
│   ├── part.0.parquet
│   ├── part.1.parquet
│   └── ...
└── component_2/
    ├── part.0.parquet
    ├── part.1.parquet
    └── ...

This is indeed what I propose, and what I meant with "a single parquet dataset". I don't see any reason to split them, since we can select which columns to read from a parquet dataset.

@PhilippeMoussalli
Contributor

├── component_1/
│   ├── part.0.parquet
│   ├── part.1.parquet
│   └── ...
└── component_2/
    ├── part.0.parquet
    ├── part.1.parquet
    └── ...

This is indeed what I propose, and what I meant with "a single parquet dataset". I don't see any reason to split them, since we can select which columns to read from a parquet dataset.

Oh, I initially thought that we were planning on storing them separately (individual folder per column). I think this is better, especially for the merging, since there is a higher probability that some of the columns to consume are in the same "single parquet dataset".

@GeorgesLorre
Collaborator

Interesting, I like it so far.

Do we always need to write the index in every component, or only in the ones that modify the number of rows (filter/expand)?

@RobbeSneyders
Member Author

We need to write it in every component, but we are no longer writing it separately every time. If a component only updates the index, only the index will be written. Otherwise, it will just be written as part of the data, which is necessary anyway.

This means that we write the index less often, while we still keep the advantage of only updating the index for filtering components.

So #70 would be fixed automatically by adopting this approach.

@PhilippeMoussalli
Contributor

  • We can't find a way to map subsets between components that sounds like a good idea.

I still think that this is one of the biggest blockers regarding the reusability of components. From a user's perspective, we are offering different building blocks which can operate on custom dataframes. I assume a lot of users will create custom LoadComponents and define a custom data schema.

I am not sure if there is a better alternative than just adding a mapping directly to the ComponentOp.

So let's say the user starts with this pipeline in place

load_data (generic)
     produces:
	- image_array 
	- text_data

embed_text (custom)
    consumes: 
      - text_data
    produces:
      - image_embedding

and then later on the user decides to add a reusable component to caption the images, because they loaded other images that don't have captions:

load_data (generic)
    produces:
      - image_array

caption_data (reusable)
    consumes: 
      - images_data
    produces:
      - image_captions

embed_text (custom)
    consumes: 
      - text_data
    produces:
      - image_embedding

What we can do is add a mapper

mapper = FieldMapper()
mapper.add_input_mapping(from_input_dataset_field="image_array", to_consumed_component_field="images_data")
mapper.add_output_mapping(from_produced_component_field="image_captions", to_output_dataset_field="text_data")

caption_op = ComponentOp(dir=..., mapper=mapper)

or alternatively

caption_op = (
    ComponentOp(dir=...)
    .map_input(from_input_dataset_field="image_array", to_consumed_component_field="images_data")
    .map_output(from_produced_component_field="image_captions", to_output_dataset_field="text_data")
)

It might not seem like the smoothest experience but I think what we can do is make sure to provide clear instructions in the docs and error catching during static pipeline validation that could then point the user to the correct action to take (depending on input or output mismatch).

We can also always plot the pipeline manifest evolution and highlight in red the source of the mismatch. Open to hearing other suggestions.
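
To make the mapper idea concrete, here is a minimal sketch of what such a class could do internally. `FieldMapper` does not exist in Fondant; the method and parameter names follow the proposal above, while `map_inputs`, `map_outputs` and the type strings are assumptions for illustration.

```python
from typing import Dict


class FieldMapper:
    """Hypothetical mapper that renames fields on the way in and out of a component."""

    def __init__(self) -> None:
        self.input_mapping: Dict[str, str] = {}
        self.output_mapping: Dict[str, str] = {}

    def add_input_mapping(self, from_input_dataset_field: str,
                          to_consumed_component_field: str) -> "FieldMapper":
        self.input_mapping[from_input_dataset_field] = to_consumed_component_field
        return self  # returning self allows chaining

    def add_output_mapping(self, from_produced_component_field: str,
                           to_output_dataset_field: str) -> "FieldMapper":
        self.output_mapping[from_produced_component_field] = to_output_dataset_field
        return self

    def map_inputs(self, dataset_fields: Dict[str, str]) -> Dict[str, str]:
        """Rename dataset fields to the names the component consumes."""
        return {self.input_mapping.get(n, n): t for n, t in dataset_fields.items()}

    def map_outputs(self, produced_fields: Dict[str, str]) -> Dict[str, str]:
        """Rename produced fields to the names written to the dataset."""
        return {self.output_mapping.get(n, n): t for n, t in produced_fields.items()}


mapper = (
    FieldMapper()
    .add_input_mapping("image_array", "images_data")
    .add_output_mapping("image_captions", "text_data")
)
component_inputs = mapper.map_inputs({"image_array": "binary"})
dataset_outputs = mapper.map_outputs({"image_captions": "string"})
print(component_inputs, dataset_outputs)
```

Fields without an explicit mapping pass through unchanged, so components whose specs already align would not need a mapper at all.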

image

Now we should offer capabilities to choose a component that applies specific transformation, and define a column to operate on.
Naively, I would like to have an interface to choose columns to operate on and optional column names which will be written to the dataframe. We could use the custom mappings to access the data within the component and write the custom mappings to the manifest too. This should be possible without changing the ComponentSpec. Am I overlooking something?

Do you mean to say that it should be mandatory for every component? This might force us to define it at every op, even when it's not needed (the component spec fields align), and might overlap with the information in the component spec.

Or it might mean that we would get rid of the consumes and produces fields altogether from the component spec, but I am not sure that's a direction we want to take, since it would then be difficult to understand what the component operates on, unless we can somehow offer that functionality as part of a function with some defaults in its consume and produce arguments (again, not sure if that's possible).

@RobbeSneyders
Member Author

RobbeSneyders commented Nov 9, 2023

I can think of some other approaches. These are not fully fleshed out, but just examples to indicate some other directions we could take. They might not be better :)

Chaining ComponentOps consumes directly

from fondant.pipeline import ComponentOp

load_op = ComponentOp(
    component_dir="load_data",
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
    consumes={
        "images_data": load_op.image_array
    }
)

embed_op = ComponentOp(
    component_dir="embed_text",
    consumes={
        "text_data": caption_op.image_captions
    }
)

We should be able to calculate the output manifest of each component as we go, so each ComponentOp provides attributes for all the fields that will be available, and the IDE can recognize this.

You should only ever need to access the previous ComponentOp, but I can see how this can be confusing:

crop_op = ComponentOp.from_registry(
    name="crop_images",
    consumes={
        "images_data": embed_op.image_array,  # Correct
        "images_data": load_op.image_array,  # Incorrect
    }
)

Chaining consumes directly in pipeline

This can probably be solved by moving the explicit consumption chaining to the pipeline

from fondant.pipeline import ComponentOp

load_op = ComponentOp(
    component_dir="load_data",
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
)

embed_op = ComponentOp(
    component_dir="embed_text",
)

crop_op = ComponentOp.from_registry(
    name="crop_images",
)

pipeline = pipeline.add_op(load_op)
pipeline = pipeline.add_op(
    caption_op, 
    consumes={
        "images_data": pipeline.image_array
    }
)
pipeline = pipeline.add_op(
    embed_op, 
    consumes={
        "text_data": pipeline.image_captions
    }
)
pipeline = pipeline.add_op(
    crop_op, 
    consumes={
        "images_data": pipeline.image_array,
    }
)

Creating operations and chaining consumes directly in pipeline

Compressing everything into the pipeline add_op method.

pipeline = pipeline.add_op(
    component_dir="load_data"
)
pipeline = pipeline.add_op(
    name="caption_images", 
    consumes={
        "images_data": pipeline.image_array
    }
)
pipeline = pipeline.add_op(
    component_dir="embed_text", 
    consumes={
        "text_data": pipeline.image_captions
    }
)
pipeline = pipeline.add_op(
    name="crop_images", 
    consumes={
        "images_data": pipeline.image_array,
    }
)

These all only map inputs, but if mapping the inputs is mandatory, I don't think there's a lot of reason to map the outputs. Apart maybe for the ability to overwrite data produced in previous steps.

Is this closer to what you meant @mrchtr?

@PhilippeMoussalli
Contributor

We should be able to calculate the output manifest of each component as we go, so each ComponentOp provides attributes for all the fields that will be available, and the IDE can recognize this.

How would this work exactly? The estimation of the attributes can only happen at compile time. I am not familiar with a way to provide dynamic typed attributes.

These all only map inputs, but if mapping the inputs is mandatory, I don't think there's a lot of reason to map the outputs. Apart maybe for the ability to overwrite data produced in previous steps.

I think it might be needed in order to not break a whole pipeline (having to remap everywhere) if a component is added to a pipeline at a later stage as discussed here. I do agree though that it might be confusing.

The examples that you mentioned above have a nice interface and could be nice if we manage to make the dynamic attributes work. Do you see the consumes section as something that is mandatory then that has to be defined between components (even if they have matching fields by default)?

@RobbeSneyders
Member Author

RobbeSneyders commented Nov 9, 2023

How would this work exactly? The estimation of the attributes can only happen at compile time. I am not familiar with a way to provide dynamic typed attributes.

The code below works, Pycharm recognizes the images_data attribute on component_op.

import typing as t

class ComponentOp:
    def __init__(self, produces: t.List[str]):
        for field in produces:
            setattr(self, field, field)

component_op = ComponentOp(produces=["images_data"])

Do you see the consumes section as something that is mandatory then that has to be defined between components (even if they have matching fields by default)?

Not sure, both are technically feasible. I haven't really thought this through yet, I just wanted to showcase that there are other ways to tackle this. I would need some time to think about the different options and flesh them out completely.


I don't think this is blocking though. It's clear that there's ways to solve this, so we can already start breaking down the work needed to remove the subsets.

@PhilippeMoussalli
Contributor

How would this work exactly? The estimation of the attributes can only happen at compile time. I am not familiar with a way to provide dynamic typed attributes.

The code below works, Pycharm recognizes the images_data attribute on component_op.

import typing as t

class ComponentOp:
    def __init__(self, produces: t.List[str]):
        for field in produces:
            setattr(self, field, field)

component_op = ComponentOp(produces=["images_data"])

For me it doesn't seem to detect it, is it part of a plugin?
image

I don't think this is blocking though. It's clear that there's ways to solve this, so we can already start breaking down the work needed to remove the subsets.

I agree

@mrchtr
Contributor

mrchtr commented Nov 9, 2023

Is this closer to what you meant @mrchtr?

It is super close to my thoughts. I like the last two approaches. But I would still try to include the produce step as well.
Assuming we have a reusable text normalisation component.

...
consumes:
  text:
    type: string
produces:
   text_normalised:
     type: string

If we used the last approach, it would look like this:

pipeline = pipeline.add_op(
    name="crop_images", 
    consumes={
        "text": pipeline.custom_text_column
    },
    produces={
        "text_normalised": pipeline.custom_text_column_normalized
    }
)

We would utilise these components as operators applied to the pipeline dataframe. This approach gives us a kind of global pipeline schema to work with. I believe this would decouple the components and their transformations from a particular pipeline.

I don't think this is blocking though. It's clear that there's ways to solve this, so we can already start breaking down the work needed to remove the subsets.

I think we can indeed move this discussion to a different issue.

@RobbeSneyders
Member Author

For me it doesn't seem to detect it, is it part of a plugin?

Woops, I tested it in a Python console, which is of course at runtime :) So probably won't work statically indeed, but it would work in a notebook.
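
A small sketch of the runtime-only behaviour: field attributes resolved through `__getattr__` and advertised through `__dir__` are visible to tools that inspect live objects, but a static analyser has nothing to go on. This `ComponentOp` is a stand-in written for the illustration, not Fondant's class.

```python
import typing as t


class ComponentOp:
    """Stand-in class: exposes produced fields as attributes at runtime only."""

    def __init__(self, produces: t.List[str]):
        self._produces = list(produces)

    def __getattr__(self, name: str) -> str:
        # Only called when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)  # avoid recursion on private attributes
        if name in self._produces:
            return name
        raise AttributeError(f"Unknown field {name!r}; available: {self._produces}")

    def __dir__(self) -> t.List[str]:
        # Lets completion that inspects the live object list the fields.
        return sorted(set(super().__dir__()) | set(self._produces))


component_op = ComponentOp(produces=["images_data"])
print(component_op.images_data)
print("images_data" in dir(component_op))
print(hasattr(component_op, "image_array"))
```

An unknown field fails with an error that lists the available ones, which may be the more useful part of this approach.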

    produces={
        "text_normalised": pipeline.custom_text_column_normalized
    }

pipeline.custom_text_column_normalized will not exist here, right? I'm not sure we can use the same approach for produces.

@mrchtr
Contributor

mrchtr commented Nov 9, 2023

pipeline.custom_text_column_normalized will not exist here, right? I'm not sure we can use the same approach for produces.

My idea was that the custom produces would here add the column custom_text_column_normalized to the dataset. Basically map the components produce to a custom produce. The custom_text_column_normalized could be used by the next component as well.

If we collect all the consumes and produces fields (either the default ones or the custom ones on the right side), we would know the final schema of the dataset after the pipeline has run successfully. The ComponentOp or Pipeline would define the dataframe schema, which we could use for the validations.

If we add a consumes option to either the ComponentOp or the Pipeline interface, it would be a bit confusing not to add a produces option as well, I guess.

@RobbeSneyders
Member Author

Yes, you can do that, but you cannot access it as an attribute on the pipeline. So then you should be working with string mappings again.

    produces={
        "text_normalised": "custom_text_column_normalized"
    }

And then we probably don't want to use attributes for consumes either. So the choice would be:

  • Only support consumes remapping with a nicer interface
  • Support remapping for both consumes and produces with a less nice interface

@RobbeSneyders
Member Author

If we want to go with the last proposed interface above, it might make more sense to work with datasets as the interface (it matches our design principle 😎):

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.apply(
    component_dir="load_data"
)
dataset = dataset.apply(
    name="caption_images", 
    consumes={
        "images_data": dataset.image_array
    }
)
dataset = dataset.apply(
    component_dir="embed_text", 
    consumes={
        "text_data": dataset.image_captions
    }
)
dataset = dataset.apply(
    name="crop_images", 
    consumes={
        "images_data": dataset.image_array,
    }
)

dataset.image_array makes more sense than pipeline.image_array. This would also work great if we want to offer an eager mode in the future, since you could then inspect the dataset between components.

This is the same interface that Apache Beam uses, so we can get a lot of inspiration from there.
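
A minimal sketch of how a dataset-centred interface could track the available fields and validate each `consumes` mapping as the pipeline is built. This `Dataset` class is hypothetical; here `produces` is passed explicitly and fields are referenced by string, whereas in the proposal the produced fields would come from the component spec.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class Dataset:
    """Hypothetical immutable view of the fields available at one pipeline step."""

    fields: Dict[str, str] = field(default_factory=dict)  # field name -> type

    def apply(self, name: str, consumes: Dict[str, str],
              produces: Dict[str, str]) -> "Dataset":
        # consumes maps component field name -> dataset field name.
        missing = [src for src in consumes.values() if src not in self.fields]
        if missing:
            raise ValueError(f"{name} consumes unknown dataset fields: {missing}")
        # Produced fields are added to (or overwrite) the existing ones.
        return Dataset(fields={**self.fields, **produces})


dataset = Dataset().apply("load_data", consumes={}, produces={"image_array": "binary"})
dataset = dataset.apply(
    "caption_images",
    consumes={"images_data": "image_array"},
    produces={"image_captions": "string"},
)
dataset = dataset.apply(
    "embed_text",
    consumes={"text_data": "image_captions"},
    produces={"image_embedding": "array"},
)
print(sorted(dataset.fields))
```

Because each `apply` returns a new object, every intermediate dataset stays available for inspection, which is what an eager mode would need.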

@mrchtr
Contributor

mrchtr commented Nov 14, 2023

The dataset interface could also solve this problem:

Yes, you can do that, but you cannot access it as an attribute on the pipeline. So then you should be working with string mappings again.

If components operate on datasets, we could add a schema to the dataset. Let the user define the schema of the dataset at the beginning, or even let the first load component initialise the schema.

We could apply something like this:

produces={
    "text_normalised": dataset.schema.custom_text_column_normalized
}

If we called it writes instead of produces, it would be even clearer that we can only operate on dataset columns which were already initialised.

A Pipeline would initialise a Dataset. Components would apply different operations on a Dataset.

@RobbeSneyders
Member Author

The last remaining part of the interface which I'd like to simplify is that of generic components. I don't like that users have to overwrite the component spec, especially since they need to repeat the arguments etc.

With the interface above (with pipelines or datasets, doesn't matter), we get a mechanism that simplifies the interface for generic write components for free:

dataset = dataset.apply(
    name="write_data", 
    consumes={
        "image": dataset.image_array,
        "embedding": dataset.embeddings,
        "caption": dataset.image_captions,
    }
)

We know the schema of every field in the dataset, so this contains all the information we get from an overwritten component specification.

In the component specification, we should indicate that the component can handle a generic input dataset.

consumes:
    "*"

If this is present, the consumes mapping accepts unknown field names, otherwise it doesn't.


Two reservations on this:

  • I hope we don't overload too much functionality on a single interface here, which might make it complex to understand again
  • We don't have as nice a solution yet for generic read components

@RobbeSneyders
Member Author

If components operate on datasets, we could add a schema to the dataset. Let the user define the schema of the dataset at the beginning, or even let the first load component initialise the schema.

We could apply something like this:

produces={
    "text_normalised": dataset.schema.custom_text_column_normalized
}

The dataset is created by the previous component (or the pipeline at the start), so I still think we run into this issue:

Yes, you can do that, but you cannot access it as an attribute on the pipeline.

In essence, it's the same issue that we need to solve as the one I mention in my previous comment:

We don't have as nice a solution yet for generic read components

The cleanest solution I can think of, indeed creates the schema in the generic read component:

from fondant import type

pipeline = pipeline.add_op(
    name="load_data",
    produces={
        "text": type.string,
    }
)

But this should only be used for generic read components to overwrite the component_spec. It can't be used by any non-generic components.

@PhilippeMoussalli
Contributor

We don't have as nice a solution yet for generic read components

The cleanest solution I can think of, indeed creates the schema in the generic read component:

from fondant import type

pipeline = pipeline.add_op(
    name="load_data",
    produces={
        "text": type.string,
    }
)

But this should only be used for generic read components to overwrite the component_spec. It can't be used by any non-generic components.

I feel like we might benefit from explicitly defining I/O operators on the componentOp/Dataset class.

Maybe we can somehow tie them explicitly to specific components with the newly introduced tags or some other mechanism. Components with those tags don't need to implement a consumes or produces section, so there is no need to copy over the component spec. This is feasible since we are currently the only ones developing read/write components. We can later have some guides on how to develop I/O components, similar to Beam.

I also sometimes find the fact that a load component "produces" and a write component "consumes" to be a bit unintuitive.

from fondant.schema import type

dataset = Dataset()
dataset = dataset.read_data(
    name="load_data",
    input_dataset_schema={  # clearly mention that only a schema is needed t.Dict[str, type]
        "text": type.string
    }
)

dataset = dataset.transform(  # consumes (and maybe produces)
    name="process_text",
    consumes={
        "text": dataset.text
    }
)

dataset = dataset.write_dataset(  # columns_to_write is "consumes" behind the scenes but it makes it more explicit
    name="write_data",
    columns_to_write={
        "caption": dataset.text,
    }
)

@RobbeSneyders RobbeSneyders changed the title Revisit subsets & fields design Redesign dataset format and interface Nov 14, 2023
@RobbeSneyders
Member Author

Yes, @mrchtr and I had a call and concluded the same thing. I will try to summarize our conclusions here. Let me know if I missed anything @mrchtr.


We create a pipeline and use a read component to create the first dataset. The user needs to provide a schema for the reader, which replaces overwriting the component specification.

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    name="load_images",
    schema={
        "image": type.binary  # or pa.binary()
    }
)

For the schema, we need to decide if we want to roll our own types, or if we want to use the ones from PyArrow. If the PyArrow types suffice, I would go with those myself.

From then on, transform components can be applied to the dataset. They can provide a mapping for both consumes and produces. Matthias made the case that you might want to rename produced fields to prevent conflicts (eg. cropping the same image multiple times), which convinced me that we indeed need it. The mapping is optional, if it's not provided, we still look for a field with the original name from the specification.

dataset = dataset.apply(
    name="caption_images",
    consumes={
        "images_data": dataset.image  # or "image"
    },
    produces={
        "captions": "text"
    }
)

Note that the consumes mapping maps from a string to a dataset attribute, while the produces mapping maps from a string to a string. While I do see some advantages in using dataset attributes, I also see value in aligning both and making consumes a string to string mapping as well. An additional benefit of this is that you could chain apply() calls since you don't require the intermediate dataset to be defined. A third possibility would be to accept both a dataset attribute or a string, but this might just be confusing.

Write components also accept a schema. We can make the same choice here between a string to dataset attribute mapping or a string to string mapping. Linking to a dataset attribute here might make more sense, since we need to know both the name of the columns to map and their schema. However there is no reason we can't just look this up on the dataset using the string value.

dataset.write(
    name="write_data", 
    schema={
        "image": dataset.image,  # or "image"
        "caption": dataset.text,  # or "text"
    }
)
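To make the optional mapping concrete, here is a minimal sketch of how a string to string consumes mapping could be resolved against the dataset, falling back to the original field name when no mapping is given. The function `resolve_consumes` and the string type names are hypothetical and only serve to illustrate the lookup.

```python
# Hypothetical sketch of resolving a consumes mapping. Nothing here is part of
# the actual Fondant API; types are plain strings for brevity.

def resolve_consumes(spec_consumes, dataset_schema, mapping=None):
    """Return {component_field: dataset_column} for every consumed field.

    spec_consumes:  {component_field: type} from the component specification.
    dataset_schema: {dataset_column: type} of the incoming dataset.
    mapping:        optional {component_field: dataset_column} overrides.
    """
    mapping = mapping or {}
    unknown = set(mapping) - set(spec_consumes)
    if unknown:
        raise ValueError(f"Mapping refers to unknown fields: {sorted(unknown)}")

    resolved = {}
    for field, field_type in spec_consumes.items():
        # Fall back to the name from the specification if it is not remapped.
        column = mapping.get(field, field)
        if column not in dataset_schema:
            raise ValueError(f"Dataset has no column {column!r} for {field!r}")
        if dataset_schema[column] != field_type:
            raise ValueError(
                f"Column {column!r} has type {dataset_schema[column]}, "
                f"but {field!r} requires {field_type}"
            )
        resolved[field] = column
    return resolved


dataset_schema = {"image": "binary", "text": "utf8"}
spec_consumes = {"images_data": "binary"}
resolved = resolve_consumes(spec_consumes, dataset_schema, {"images_data": "image"})
```

With plain strings, both the column name and its type can be looked up on the dataset, which is why a dataset attribute is not strictly needed for the mapping.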

@RobbeSneyders
Member Author

RobbeSneyders commented Nov 14, 2023

Just as validation that we don't block ourselves, a small thought experiment on how branching would work.

Branching into multiple branches would be straightforward:

branch1 = dataset.apply(...)
branch2 = dataset.apply(...)

We can choose which interface to use for merging. Eg.:

dataset = branch1.merge(branch2)
dataset = Dataset.from(branch1, branch2)

Most important here will be to select a merge strategy, or to support multiple, but I can't think of anything here that would complicate this compared to our current design.

A pipeline reading from multiple sources can be created by adding multiple readers to the pipeline:

branch1 = pipeline.read(...)
branch2 = pipeline.read(...)

Again, we can get a lot of ideas from Beam here.

@RobbeSneyders
Member Author

Coming back to this part of my proposal above:

Additional fields

We still need a mechanism to remove additional fields from the output dataset of a component if it changes the index (eg. LaionRetrieval components which go from a prompt id to a Laion id).

For example, if component 3 above would define additionalFields: false, its component spec would look like this:

produces:
  field3:
    type: ...
additionalFields: false

I would actually propose a different interface here. Instead of additionalFields: false, I propose to add something like previous_index: <name>.

For example in the laion_retrieval example from above, this could be:

consumes:
  text:
    type: string

produces:
  image_urls:
    type: string

previous_index: prompt_id

In a chunking component, this could be previous_index: document_id.

This has two advantages:

  • It would let us track lineage on an entry basis across these index changes.
  • We could use the additionalFields or additionalProperties keyword inside the consumes or produces section of generic components instead. This would allow us to define some constraints on the supported fields while still following the OpenAPI schema. This would replace the "*" I proposed above.

@PhilippeMoussalli
Contributor

For the schema, we need to decide if we want to roll our own types, or if we want to use the ones from PyArrow. If the PyArrow types suffice, I would go with those myself.

I think this makes sense; the only reason we have a type class is to be able to write it to JSON schema and retrieve it from it.

Note that the consumes mapping maps from a string to a dataset attribute, while the produces mapping maps from a string to a string. While I do see some advantages in using dataset attributes, I also see value in aligning both and making consumes a string to string mapping as well. An additional benefit of this is that you could chain apply() calls since you don't require the intermediate dataset to be defined. A third possibility would be to accept both a dataset attribute or a string, but this might just be confusing.

I'm in favor of aligning both to text. Am I correct in assuming that the attribute only makes sense when you're executing cell by cell? Otherwise it might return an unrecognized attribute in your IDE.

Write components also accept a schema. We can make the same choice here between a string to dataset attribute mapping or a string to string mapping. Linking to a dataset attribute here might make more sense, since we need to know both the name of the columns to map and their schema. However there is no reason we can't just look this up on the dataset using the string value.

Would keep it to string to string just for consistency. I think we can make this optional: if not specified it will write all the fields under their original name. The schema here is really just to remap the name of the columns if needed and select the final columns to be written.

@PhilippeMoussalli
Contributor

  • additionalProperties

In a chunking component, this could be previous_index: document_id.

In the document_id example, we reset the index to a specific column called document_id, so this approach will most likely expect the user to return the original index as an additional column. We can either handle this internally or make sure to document it well.

@RobbeSneyders
Member Author

I'm in favor of aligning both to text. Am I correct in assuming that the attribute only makes sense when you're executing cell by cell? Otherwise it might return an unrecognized attribute in your IDE.

Yes it indeed only really makes sense in an interactive mode. We could still support both if we want to enable this while running interactively.

Would keep it to string to string just for consistency. I think we can make this optional: if not specified it will write all the fields under their original name. The schema here is really just to remap the name of the columns if needed and select the final columns to be written.

Ok, but just to be clear, then the schema attribute is optional. If you do provide a schema, you need to provide all fields. Otherwise there's no way to exclude fields to write.

In the document_id example, we reset the index to a specific column called document_id, so this approach will most likely expect the user to return the original index as an additional column. We can either handle this internally or make sure to document it well.

Yes indeed, the component should return it as a column. We can easily validate that it is part of the produced schema.
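As a rough illustration of that validation, the sketch below shows a toy chunking step that creates a new index while keeping the previous one as a column, plus a check that the previous index is part of the produced schema. The names `chunk_documents` and `validate_previous_index`, and the `<document_id>_<n>` chunk index format, are assumptions made for this example.

```python
# Hypothetical sketch: a component that changes the index returns the previous
# index as a regular column, and the framework validates that it is produced.

def chunk_documents(documents, chunk_size):
    """Split {document_id: text} into rows keyed by a new chunk index."""
    rows = {}
    for document_id, text in documents.items():
        for n, start in enumerate(range(0, len(text), chunk_size)):
            rows[f"{document_id}_{n}"] = {
                "document_id": document_id,  # previous index kept as a column
                "text": text[start:start + chunk_size],
            }
    return rows


def validate_previous_index(produces, previous_index):
    """Raise if the declared previous index is missing from `produces`."""
    if previous_index not in produces:
        raise ValueError(
            f"previous_index {previous_index!r} must be listed in produces"
        )


produces = {"text": "string", "document_id": "string"}
validate_previous_index(produces, "document_id")
chunks = chunk_documents({"a": "abcdef"}, chunk_size=4)
```

Keeping the previous index as a column is what would allow lineage to be tracked per entry across the index change.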

@mrchtr
Contributor

mrchtr commented Nov 15, 2023

@RobbeSneyders has nailed down our discussion perfectly.
In the meantime, I agree with @PhilippeMoussalli and believe it would be nice to have the same interface for the consumes and produces sections.

I think it is fine to use string values for both sections. As mentioned, we would lose the benefit of type safety, but I think that is negligible because we can validate during compilation at the latest.

I wanted to note down an additional thought. I mentioned a concern during @PhilippeMoussalli's implementation of the subset field mapping: we have to make sure it isn't confusing for the user which side of the dictionary entry represents the schema of the dataset and which one the schema of the component dataframe. Back then I proposed a dataclass that allows us to use explicit names.

dataset = dataset.apply(
    name="caption_images",
    consumes=[
        ColumnMapping(component_column="images_data", dataset_column="image"),
        ...
    ],
    produces=[
        ColumnMapping(component_column="captions", dataset_column="text")
    ]
)

This only affects the interface, but it might make things clearer when you look at a pipeline/component definition for the first time. In the initial draft, I didn't find it straightforward to see which column names belong to which dataframe.

If we use dataclasses, I can even think about a different interface:

dataset = dataset.apply(
    name="caption_images",
    schema_mapping=[
        Consumes(component_column="images_data", dataset_column="image"),
        Consumes(component_column="...", dataset_column="..."),
        Produces(component_column="captions", dataset_column="text"),
        Index(previous_index="...")
    ]
)

Not sure about the dataclasses myself anymore, it either complicates things or improves the understanding.

@RobbeSneyders
Member Author

I'm not a big fan of the dataclasses, it makes the interface look a lot more complex. Agree that the direction of the mapping can be confusing, but we can validate this at compilation and provide clear error messages.

@RobbeSneyders
Member Author

Working on the RAG use case, it's become clear that we need to support "generic" behavior not just for read and write components, but also for transform components. Think about the following components:

  • A RAG retrieval component which optionally takes embeddings to query
  • An evaluation component which optionally takes ground truth answers to return additional metrics
  • An aggregation component which can aggregate an arbitrary number of fields

I therefore propose to provide an option for the user to overwrite consumes and produces on any type of component. I would keep the consumes and produces names for all component types so they are aligned everywhere.

pipeline = Pipeline("my-pipe", base_path="/base_path")

dataset = pipeline.read(
    "read_component",
    arguments={},
    produces={                    # A read component can only overwrite produces.
        "image": pa.binary()
    }
)

dataset = dataset.apply(
    "generic_captioning",
    arguments={},
    consumes={                    # A transform component can overwrite consumes
        "image": pa.binary()
    },
    produces={                    # And produces
        "caption": pa.string()
    }
)

dataset.write(
    "write_component",
    consumes={                    # A write component can only overwrite consumes
        "text": pa.string(),
    }
)

Generic components could define a schema for the consumes and produces they support. Eg. for the read_from_hf_hub component, this could be:

produces:
    additionalProperties: true

Or for a generic captioning component:

consumes:
    additionalProperties:
        type: binary

produces:
    additionalProperties:
        type: string

This way we could validate the consumes and produces arguments. We can start without this for now though.


Then of course we are still left with the question on how to map fields between components.

We could overload the consumes and produces arguments for this:

pipeline = Pipeline("my-pipe", base_path="/base_path")

dataset = pipeline.read(
    "read_component",
    arguments={},
    produces={
        "image": pa.binary(),     # A read component can only specify types in produces since we need the schema info.
    }
)

dataset = dataset.apply(
    "generic_captioning",
    arguments={},
    consumes={
        "image": "image",         # A generic transform component should specify a string. The type is inferred from the column on the dataset and the column is renamed.
    },
    produces={
        "caption": pa.string(),   # A generic transform component can only specify types in produces since we need the schema info.
    }
)

dataset = dataset.apply(
    "text_beautifier",
    consumes={
        "text": "caption",        # A non-generic transform component can specify strings in consumes to rename columns.
    },
    produces={
        "text": "caption",        # A non-generic transform component can specify a string in produces to rename columns
    }
)

dataset.write(
    "write_component",
    consumes={
        "text": "text",           # A write component should specify a string. The type is inferred from the column on the dataset and the column is renamed.
    }
)

The rules can be summarized succinctly as follows, so I think (hope) this won't be too complex:

  • consumes mappings are always string to string mappings. For generic components, the schema is inferred from the dataset via the column name.
  • produces mappings are
    • string to string mappings for non-generic components to rename the columns
    • string to type mappings for generic components to define the schema

I thought about other ways to do this. Eg. by using a .rename() method on the Dataset. But that would only allow for input mapping, not output mapping.
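The rules summarized above could be checked at compile time roughly as follows. This is a sketch under assumptions: `FieldType` is a stand-in for a real type object such as a PyArrow data type, and `validate_mappings` with its `generic_produces` flag is a hypothetical helper, not an existing Fondant function.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldType:
    """Stand-in for a real type object (for example a PyArrow data type)."""
    name: str


def validate_mappings(consumes, produces, generic_produces):
    """Return a list of error messages for mappings that break the rules.

    - consumes values must always be column names (strings).
    - produces values must be strings for non-generic components (rename)
      and types for generic components (schema definition).
    """
    errors = []
    for field, value in consumes.items():
        if not isinstance(value, str):
            errors.append(f"consumes[{field!r}] must be a column name (str)")

    expected = FieldType if generic_produces else str
    for field, value in produces.items():
        if not isinstance(value, expected):
            errors.append(
                f"produces[{field!r}] must be a {expected.__name__}, "
                f"got {type(value).__name__}"
            )
    return errors


# A generic captioning component: string consumes, typed produces.
generic_errors = validate_mappings(
    consumes={"image": "image"},
    produces={"caption": FieldType("string")},
    generic_produces=True,
)

# A non-generic component given a type in produces breaks the rules.
specific_errors = validate_mappings(
    consumes={"text": "caption"},
    produces={"text": FieldType("string")},
    generic_produces=False,
)
```

Collecting all errors instead of raising on the first one makes it possible to report every mapping problem in a single compilation pass.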

@PhilippeMoussalli
Contributor

Working on the RAG use case, it's become clear that we need to support "generic" behavior not just for read and write components, but also for transform components. Think about the following components:

  • A RAG retrieval component which optionally takes embeddings to query

I feel like this information would still be better placed in the component spec rather than overwritten at the dataset level, since the spec better describes which fields this component can take: either text or text_embeddings. It would be difficult to understand that behavior if the only thing you had in the component spec was

consumes:
    additionalProperties:
        type: binary

Instead it would make more sense if it looks something like this (not sure about the final format yet)

consumes:
    anyOf:
        - text:
            type: str
        - text_embedding:
            type: binary

The component would need to have something like this

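import pandas as pd
from fondant.component import PandasTransformComponent


class Retrieval(PandasTransformComponent):

    def __init__(self, *args) -> None:
        ...

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        # Embed the text if it is provided, otherwise use the given embeddings.
        # `get_embeddings` is a placeholder for the actual embedding function.
        if "text" in dataframe.columns:
            embeddings = dataframe["text"].apply(get_embeddings)
        else:
            embeddings = dataframe["text_embeddings"]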
            
  • An evaluation component which optionally takes ground truth answers to return additional metrics

Same comment as above except for a slightly different component spec

consumes:
    - answers:
        type: str
        optional: true

produces:
    - additional_metric:
        type: int
        optional: true
        depends_on: answers
  • An aggregation component which can aggregate an arbitrary number of fields

There it makes sense to maybe include

 consumes:
     additionalProperties: true

provided that all the consumed columns will be aggregated.

I therefore propose to provide an option for the user to overwrite consumes and produces on any type of component. I would keep the consumes and produces names for all component types so they are aligned everywhere.

It seems like this would mean that we're defining some of the behavior of the components purely based on how the dataset interface is defined rather than the component spec itself. The new fields that you're defining here would still need to be defined in the component itself and some of the behavior is not captured here (e.g. additional produced column depending on optional input column, one of two columns that need to be provided) which can make it difficult to validate.

I still think it might make more sense to override the spec only for read/write components and some exceptional transform components (aggregation). The rest of the behavior and cases should be outlined in the spec. For the generic transform component, we could maybe have a method apply_generic_transformation to overwrite its produces field.

dataset = dataset.apply(
    "generic_captioning",
    arguments={},
    consumes={                    # A transform component can overwrite consumes
        "image": pa.binary()
    },
    produces={                    # And produces
        "caption": pa.string()
    }
)

The new overridden image field here would still need to be defined somehow in the component.

Then of course we are still left with the question on how to map fields between components.

The rules can be summarized succinctly as follows, so I think (hope) this won't be too complex:

  • consumes mappings are always string to string mappings. For generic components, the schema is inferred from the dataset via the column name.

  • produces mappings are

    • string to string mappings for non-generic components to rename the columns
    • string to type mappings for generic components to define the schema

I generally agree with the behavior but I think it might be a bit difficult for end users. If we have a separate method for the generic components as I mentioned above we could provide better information for this via docstrings. We could define how to use the component with which method in the hub:

  • apply would expect a string to string mapping for produces
  • apply_generic would expect a string to type mapping for produces

All in all I think having additional columns and fields is desirable but it does indeed come with additional complexity (component spec definition, additional considerations in the pipeline interface, ...). But I think this should be more of advanced use cases rather than the common definition of a component.

@RobbeSneyders
Member Author

On how to define generic stuff in the component spec, I agree that the ways you suggest might be better. But I think we can summarize it as: "Will be able to be defined by OpenAPI spec", and keep it out of scope for now.

It seems like this would mean that we're defining some of the behavior of the components purely based on how the dataset interface is defined rather than the component spec itself. The new fields that you're defining here would still need to be defined in the component itself and some of the behavior is not captured here (e.g. additional produced column depending on optional input column, one of two columns that need to be provided) which can make it difficult to validate.

This is exactly why we need to define this on the dataset. If we make the component spec generic, the arguments on the dataset need to make it specific. So we know exactly which columns will be part of the intermediate dataset at each point. Otherwise we cannot evolve our manifest.

I still think it might make more sense to override the spec only for read/write components and some exceptional transform components (aggregation). The rest of the behavior and cases should be outlined in the spec. For the generic transform component, we could maybe have a method apply_generic_transformation to overwrite its produces field.

I wouldn't want to make generic components a special type. Components could be a combination of specific & generic. Eg. a specific consumes, but generic produces based on provided arguments. Or produces that has some specific fields and some generic.

All in all I think having additional columns and fields is desirable but it does indeed come with additional complexity (component spec definition, additional considerations in the pipeline interface, ...). But I think this should be more of advanced use cases rather than the common definition of a component.

I'm not sure this will only be for advanced use cases. As mentioned, for the RAG use case, I see a lot of components that would benefit from being (partly) generic. Probably more than components that would not benefit from it.

RobbeSneyders added a commit that referenced this issue Dec 6, 2023
PR that introduces functionality to new pipeline interface as discussed
[here](#567 (comment))

* The component spec now accepts **OneOf** additionalFields or Fields in
its consumes and produces section
* The new `consumes` and `produces` are defined at the Op level
similarly to the ones in the component spec; if they are present, they
will override the default `consumes` and `produces` defined in the
component spec (manifest, dataIO)
* Some changes were added to `DataIO` just to resolve tests issues but
the new functionality of the custom consumes and produces is not
implemented yet (will be tackled in a separate PR)

---------

Co-authored-by: Robbe Sneyders <robbe.sneyders@ml6.eu>
RobbeSneyders added a commit that referenced this issue Dec 7, 2023
RobbeSneyders added a commit that referenced this issue Dec 7, 2023