
Replace SourceDataset and maybe Source* with Intake and SourceIntake #43

Closed
benjaminleighton opened this issue Sep 22, 2023 · 9 comments

Comments

@benjaminleighton
Collaborator

Hi @rafa-guedes, talking to @pbranson this afternoon, he suggested that some of the functionality going into SourceDataset(s) and similar could instead go into intake catalogs combined with SourceIntake. This makes sense to me as well. It means that filtering and xarray wrappers can sit in intake and we don't have to rebuild that functionality here.

For example we could replace

SourceFile(uri='bathy_temp.tif')

With something like:

ic = intake.open_rasterio('bathy_temp.tif')
SourceIntake(dataset_id='rasterio', catalog_yaml=ic.yaml())

Here catalog_yaml is an alternative to catalog_uri that allows the catalog YAML to be embedded directly in a serialized SourceIntake object,

and might require an extension of the SourceIntake object like:

from pathlib import Path
from typing import Literal, Optional

import intake
import xarray as xr
import yaml
from intake.catalog import Catalog
from pydantic import Field

from rompy.core.source import SourceBase  # import path illustrative

class SourceIntake(SourceBase):
    """Source dataset from intake catalog."""

    model_type: Literal["intake"] = Field(
        default="intake",
        description="Model type discriminator",
    )
    dataset_id: str = Field(description="The id of the dataset to read in the catalog")
    catalog_uri: Optional[str | Path] = Field(description="The URI of the catalog to read from")
    catalog_yaml: Optional[str] = Field(description="The YAML string of the catalog to read from")
    kwargs: dict = Field(
        default={},
        description="Keyword arguments to define intake dataset parameters",
    )

    def __str__(self) -> str:
        return f"SourceIntake(catalog_uri={self.catalog_uri}, dataset_id={self.dataset_id})"

    @property
    def catalog(self) -> Catalog:
        """The intake catalog instance."""
        if self.catalog_uri:
            return intake.open_catalog(self.catalog_uri)
        else:
            return intake.Catalog.from_dict(yaml.safe_load(self.catalog_yaml))

    def _open(self) -> xr.Dataset:
        return self.catalog[self.dataset_id](**self.kwargs).to_dask()

What do you think?

@rafa-guedes
Collaborator

@benjaminleighton, @pbranson, I really like this idea. This would allow us to dial these Source objects down to a minimum and make better use of intake. I'd only suggest some changes to how we define the field signatures in this new SourceIntake class. I haven't been able to make this work yet though - have you got a working example? For instance, if I try this using one of the rompy test files:

import yaml
import intake

ds = intake.open_netcdf("tests/data/gebco-5km.nc")
cat = intake.Catalog.from_dict(yaml.safe_load(ds.yaml()))
list(cat)

I get:

['sources']

But I can't seem to load dask from it by doing, for example, cat.sources.to_dask() or similar - I get an AttributeError: 'dict' object has no attribute '_catalog' error.

@pbranson
Member

@rafa-guedes Yes, I find Catalog.from_dict somewhat misleading. You need to use the YAMLFileCatalog class from intake.catalog.local.

I spent some time playing around with dynamic generation of intake catalogs again and have come up with an example, which I think is the cleanest.

The part I have struggled with the most is the intake catalogs: because the schema is so simple, intake expects that you just define the dictionary and dump it to YAML. There are two key bits of boilerplate that make the code cleaner:

import fsspec
import yaml

def as_entry(source):
    # Helper to dereference the dictionary created by intake's source._yaml()
    # (https://github.com/intake/intake/blob/4bbb8c2935700c318928ffdba88c824282ac9970/intake/source/base.py#L295C23-L295C23).
    # Not sure why the source (entry) yaml gets nested under "sources" - this dereferences it.
    return list(source._yaml()["sources"].values())[0]

# Taking the dictionary to an in-memory catalog object is also a bit terse;
# it expects a filesystem and a file-like object path.
def to_memory(cat_dict, tmp='temp.yaml'):
    fs = fsspec.filesystem('memory')
    fs_map = fs.get_mapper()
    fs_map[f'/{tmp}'] = yaml.dump(cat_dict).encode('utf-8')
    return fs, tmp
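As a sanity check of the dereference pattern (independent of intake - the nested dict below just mimics the shape that source._yaml() produces, so the entry name, driver and path are illustrative):

```python
import yaml

# Mimic the nested structure produced by intake's source._yaml():
# a single entry wrapped inside a top-level "sources" mapping.
raw = {
    "sources": {
        "era5": {  # entry name is illustrative
            "driver": "netcdf",
            "args": {"urlpath": "era5-20230101.nc"},
        }
    }
}

def as_entry(source_dict):
    # Dereference the single entry nested under "sources"
    return list(source_dict["sources"].values())[0]

entry = as_entry(raw)

# Re-wrap the entry into a minimal catalog dict and round-trip it through YAML
cat_dict = {"metadata": {"version": 1}, "sources": {"era5": entry}}
cat_yaml = yaml.dump(cat_dict)
```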

With these helpers, the example above could be changed to:

import fsspec
from intake.catalog.local import YAMLFileCatalog

class SourceIntake(SourceBase):
    """Source dataset from intake catalog."""

    model_type: Literal["intake"] = Field(
        default="intake",
        description="Model type discriminator",
    )
    dataset_id: str = Field(description="The id of the dataset to read in the catalog")
    catalog_uri: Optional[str | Path] = Field(description="The URI of the catalog to read from")
    catalog_yaml: Optional[str] = Field(description="The YAML string of the catalog to read from")
    kwargs: dict = Field(
        default={},
        description="Keyword arguments to define intake dataset parameters",
    )

    def __str__(self) -> str:
        return f"SourceIntake(catalog_uri={self.catalog_uri}, dataset_id={self.dataset_id})"

    @property
    def catalog(self) -> Catalog:
        """The intake catalog instance."""
        if self.catalog_uri:
            return intake.open_catalog(self.catalog_uri)
        else:
            fs = fsspec.filesystem('memory')
            fs_map = fs.get_mapper()
            fs_map['/temp.yaml'] = self.catalog_yaml.encode('utf-8')
            return YAMLFileCatalog('temp.yaml', fs=fs)

    def _open(self) -> xr.Dataset:
        return self.catalog[self.dataset_id](**self.kwargs).to_dask()
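The in-memory handoff in the catalog property can be sanity-checked without intake - a minimal sketch (catalog contents are illustrative, and fs.open is used instead of the mapper for clarity):

```python
import fsspec
import yaml

# An illustrative single-entry catalog, serialized to YAML
catalog_yaml = yaml.dump({
    "metadata": {"version": 1},
    "sources": {"demo": {"driver": "netcdf", "args": {"urlpath": "demo.nc"}}},
})

# Write the catalog text into fsspec's in-memory filesystem, as the
# catalog property does before handing the path to YAMLFileCatalog
fs = fsspec.filesystem("memory")
with fs.open("/temp.yaml", "wb") as f:
    f.write(catalog_yaml.encode("utf-8"))

# Read it back through the same filesystem to confirm the round trip
with fs.open("/temp.yaml", "rb") as f:
    loaded = yaml.safe_load(f)
```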

A full example is here:
https://gist.github.com/pbranson/5c65962ec8120d4844d4e34e72655f2b

@pbranson
Member

xref: intake/intake#771

@pbranson
Member

Worth watching this if you have time: Martin gives an overview of his thinking and prototyping for intake 2, which seems to overlap a lot with some of the challenges we are attempting to tackle with intake drivers, filters and catalogs: https://discourse.pangeo.io/t/sep-27-2023-intake-2-the-future-martin-durant/3706/3

@rafa-guedes
Collaborator

This looks interesting @pbranson thanks for sharing the link.

@rafa-guedes
Collaborator

Sorry @pbranson I let this slip by. Thanks for sharing your example / notebook, I tested here and it works. I just wonder though what would be the workflow for using the SourceIntake class this way - The way I managed to make it work based on your example and explanation does not look super straightforward (I may be missing something). For example, using one of the test files in rompy after making the changes to the SourceIntake class:

source = intake.open_netcdf("/source/rompy/tests/data/era5-20230101.nc")
yaml_source = list(source._yaml()["sources"].values())[0] # Or define this in rompy as a helper function such as the as_entry() one you defined in the notebook
catalog_yaml = {"metadata": {"version": 1}, "sources": {"era5": yaml_source}}
rompy_source = SourceIntake(dataset_id="era5", catalog_yaml=catalog_yaml)

I'm happy to implement these changes to SourceIntake if they are going to be useful, and also happy to review a pull request with the required changes. I think we may still want to keep other source classes such as SourceFile, since it may be easier to use with an existing file in some cases.

@pbranson
Member

The use case here is for circumstances where a catalog is generated by an external system and is passed in as YAML. For instance from some other database that indexes forcing files. The catalog yaml can be serialised.

I agree that for the single-file case its utility may not make as much sense, other than leveraging the intake layer to take the file from source to an in-memory container.

Also note that the interface should be more like this (not as terse):

source = intake.open_netcdf("/source/rompy/tests/data/era5-20230101.nc")
source.name = "era5"
rompy_source = SourceIntake(dataset_id="era5", catalog_yaml=source.yaml())
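For illustration, source.yaml() serializes a single-entry catalog of roughly this shape (the driver, args and metadata values here are illustrative, not the exact output):

```yaml
sources:
  era5:
    driver: netcdf
    description: ''
    args:
      urlpath: /source/rompy/tests/data/era5-20230101.nc
      chunks: {}
    metadata: {}
```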

Assuming you are using a SourceIntake:

class SourceIntake(SourceBase):
    """Source dataset from intake catalog."""

    model_type: Literal["intake"] = Field(
        default="intake",
        description="Model type discriminator",
    )
    dataset_id: str = Field(description="The id of the dataset to read in the catalog")
    catalog_uri: Optional[str | Path] = Field(description="The URI of the catalog to read from")
    catalog_yaml: Optional[str] = Field(description="The YAML string of the catalog to read from")
    kwargs: dict = Field(
        default={},
        description="Keyword arguments to define intake dataset parameters",
    )

    def __str__(self) -> str:
        return f"SourceIntake(catalog_uri={self.catalog_uri}, dataset_id={self.dataset_id})"

    @property
    def catalog(self) -> Catalog:
        """The intake catalog instance."""
        if self.catalog_uri:
            return intake.open_catalog(self.catalog_uri)
        else:
            fs = fsspec.filesystem('memory')
            fs_map = fs.get_mapper()
            fs_map['/temp.yaml'] = self.catalog_yaml.encode('utf-8')
            return YAMLFileCatalog('temp.yaml', fs=fs)

    def _open(self) -> xr.Dataset:
        return self.catalog[self.dataset_id](**self.kwargs).to_dask()

@rafa-guedes
Collaborator

That looks easier Paul. I have opened a pull request #70 to implement this, assigned it to you.

@rafa-guedes
Collaborator

Implemented in #70
