
ENH: Todo list for v0.4. #393

Closed
19 of 21 tasks
tobiasraabe opened this issue Jul 4, 2023 · 9 comments

@tobiasraabe
Member

tobiasraabe commented Jul 4, 2023

Todo

Discussion

  • Should products be mandatory for tasks? Advantage: better user feedback; disadvantage: breaks old behavior; workaround: add a stub product if your task has no products.
  • Should pytask support positional arguments? Third-party functions might behave differently. Workaround: it is easy for users to build a wrapper.
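For the positional-arguments point, the wrapper workaround could be as small as this sketch (`kwargs_only` is an illustrative name, not pytask API):

```python
import functools
from typing import Any, Callable


def kwargs_only(func: Callable[..., Any], arg_names: tuple[str, ...]) -> Callable[..., Any]:
    """Adapt a positional-args function so it can be called with kwargs only."""

    @functools.wraps(func)
    def wrapper(**kwargs: Any) -> Any:
        # Reorder the keyword arguments into the positional order the
        # wrapped function expects.
        return func(*(kwargs[name] for name in arg_names))

    return wrapper


def concat(a: str, b: str) -> str:  # stands in for a third-party, positional-style function
    return a + b


task_func = kwargs_only(concat, ("a", "b"))
print(task_func(a="foo", b="bar"))  # foobar
```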
@tobiasraabe tobiasraabe added the enhancement New feature or request label Jul 4, 2023
@tobiasraabe tobiasraabe added this to the v0.4.0 milestone Jul 4, 2023
@NickCrews
Contributor

NickCrews commented Jul 19, 2023

Hey @tobiasraabe, I'm not sure if this is the right place for this discussion, but I just noticed this issue so I thought I'd chime in.

I have wrapped pytask in my own way so that the underlying processing function is independent of the task, so that

  1. the user can inject args into the process directly, instead of needing to supply the file path and load it
  2. the processing function can be reused as part of the python API of a library
# in lib.py
from pathlib import Path

import pandas as pd


def process(df: pd.DataFrame, nrows: int, annotations: Path) -> pd.DataFrame:
    ...

Separately, the tasks can be defined, along with how to load and save the inputs and outputs:

# tasks.py
import typing
from pathlib import Path
from typing import Any

import pytask

import lib


class Loader(typing.Protocol):
    @property
    def path(self) -> Path:
        ...
    
    def load(self) -> Any:
        ...


class Saver(typing.Protocol):
    @property
    def path(self) -> Path:
        ...
    
    def save(self, data: Any) -> None:
        ...

class DataFrameLoader:
    ...

class DataFrameSaver:
    ...

class JsonLoader:
   ...

class MyTask:

    def __init__(self, func, *, loaders: dict[str, Loader], saver: Saver):
        self.func = func
        self.loaders = loaders
        self.saver = saver
  
    def to_pytask(self):

        depends_on = [l.path for l in self.loaders.values()]
        
        @pytask.mark.depends_on(depends_on)
        @pytask.mark.produces(self.saver.path)
        def my_task(depends_on, produces):
            kwargs = {name: loader.load() for name, loader in self.loaders.items()}
            result = self.func(**kwargs)
            self.saver.save(result)

        return my_task


task_process = MyTask(
    func=lib.process,
    loaders={
        "df": DataFrameLoader("input.csv"),
        "nrows": JsonLoader("config.json", "nrows"),
        "annotations": "annotations.txt",  # IDK how to support passing in literal paths, probably could avoid a PathLoader if we were clever.
    },
    saver=DataFrameSaver("output.csv"),
).to_pytask()

This could also be turned into a decorator API. I have glossed over a few things: I have an additional layer that makes it possible to use positional args in addition to kwargs, and a layer for translating the raw output of a function (in case the function returns two things), so that you can have a separate saver for each.
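The decorator form could be sketched like this (a self-contained toy: `MiniTask` is a minimal stand-in rather than the pytask-backed wrapper, and the `task` helper is a hypothetical name):

```python
from typing import Any, Callable


class MiniTask:
    """Toy stand-in for the MyTask wrapper; no pytask involved here."""

    def __init__(
        self,
        func: Callable[..., Any],
        *,
        loaders: dict[str, Callable[[], Any]],
        saver: Callable[[Any], None],
    ) -> None:
        self.func = func
        self.loaders = loaders
        self.saver = saver

    def run(self) -> None:
        # Load every named input, call the pure function, dispatch the result.
        kwargs = {name: load() for name, load in self.loaders.items()}
        self.saver(self.func(**kwargs))


def task(*, loaders: dict[str, Callable[[], Any]], saver: Callable[[Any], None]):
    """Hypothetical decorator that wraps a plain function into a MiniTask."""

    def decorator(func: Callable[..., Any]) -> MiniTask:
        return MiniTask(func, loaders=loaders, saver=saver)

    return decorator


results: list[int] = []


@task(loaders={"x": lambda: 21}, saver=results.append)
def double(x: int) -> int:
    return x * 2


double.run()
print(results)  # [42]
```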

I didn't suggest this as a feature earlier because it really was breaking to the whole API, but if we are going to make breaking changes anyway, could we lean all the way into direct dependency injection?

@tobiasraabe
Member Author

Hi @NickCrews! It is definitely the right time to talk about changes! I started collecting ideas in this
discussion. We should probably move it there so other people are also notified.

Unfortunately, the Python API is immature, almost nonexistent. Sorry that you had to work around it like this. But I am really interested in your use case, and I am trying to figure out what the desired interface would be for you.

  1. How do you run this workflow? Do you use the CLI or pytask.main()?
  2. Assuming that you want to use the latter and want to pass tasks to the function directly without collecting them from files, there might be two direct ways:
    • There is a Task class in pytask. A user could create such a class and set all necessary attributes like depends_on and produces. Since some validation happens before the object is instantiated, it might not be a super safe way.
    • The second option could be the @pytask.mark.task decorator. Your example could become this (note: I am using some features from the new pytask interface in v0.4):
"""At first we create loader and saver classes.They inherit from pytask's
internal `FilePathNode` because then they are internally handled as paths, meaning checking
modification times of the files, etc., and by
overwriting the `.value` attribute/property, you can control was is being injected into the
task.

"""
from pytask import FilePathNode
from pathlib import Path
import pandas as pd
from typing import Any


class DataFrameLoader(FilePathNode):
    """Create a loader that works like a pytask node for paths, but instead of
    injecting a path into the task, it injects the loaded object.
    
    """
    path: Path

    @property
    def value(self):
        """This property controls what enters the task under the function argument."""
        return pd.read_pickle(self.path)
    

class JsonLoader(FilePathNode):
    ...


class DataFrameSaver(FilePathNode):
    path: Path

    @property
    def value(self):
        """The DataFrameSaver injects itself into the task so the user can use the save
        method."""
        return self

    def save(self, data: Any) -> None:
        ...


"""Now, we are using the @pytask.mark.decorator to turn a function into a task and
inject arguments via the `kwargs` keyword.
"""
from typing import Annotated
# Annotation for products that is new in pytask v0.4.
from pytask import Product
import pytask


def process(
    df: pd.DataFrame,
    nrows: int,
    saver: Annotated[DataFrameSaver, Product]
) -> None:
    # Function logic...
    saver.save(...)


task_example = pytask.mark.task(kwargs={
    "df": DataFrameLoader("input.csv"),
    "nrows": JsonLoader("config.json", "nrows"),
    "saver": DataFrameSaver("output.csv")
})(process)


# We need to develop an interface for pytask that accepts a collection of tasks instead
# of paths.
from pytask import run_tasks


result = run_tasks([task_example])

I hope this example still covers everything that you need. If I have missed an important detail of your workflow, please, tell me.

This solution would also be cool because, except for the run_tasks function, everything should already be there in the v0.4 release.

@NickCrews
Contributor

How do you run this workflow? Do you use the CLI or pytask.main()?

I use the CLI, but only because there isn't a good Python API. In addition to run_tasks, a basic need would be to filter tasks, similar to the -k option on the command line (but ideally more flexible, since we have the full power of Python). With those two things I could make a functional API for my app to execute certain tasks. A later bonus would be an API to interrogate the task graph to find upstream/downstream tasks, file deps, and outputs of a task, etc., but those would be nice-to-haves.

Your example looks good! I think it would work for me. I like your use of typing.Annotated, but I'm not quite sure why that is required. Is there a comprehensive overview of all the changes coming? I would love to browse.

There are a few things that I think could be improved though:

  1. Don't require me to import and inherit from FilePathNode. A Saver only needs to provide two things (I think; correct me if I'm missing something): a path known at scheduling time and a function to do the saving at run time. A Loader likewise only needs two things: a path known at scheduling time and a function to do the loading at run time. Since that is so simple, I would prefer the simplicity, future-proofing, and decoupling of a duck-typed Protocol. When creating a Task, we can add some translation code that turns this external representation into the FilePathNode objects used internally.

Am I missing something with the purpose of the .value property of a Saver? Is that just some boilerplate required to satisfy the requirements of a FilePathNode, or could it return something other than self for some Savers? If it's boilerplate, that is another argument for what I propose above.

  2. Can we avoid passing in the saver as an arg to the processor function? That requires the processor functions to have an unpythonic, uncomposable API. A function should ideally just do the processing, leaving the IO as a separate concern. This IO orchestration is what I think pytask does really well, so it should be part of the framework. Can we pass in the output transformer as a separate thing to pytask.mark.task()? In fact, I actually have the output handling separated into two steps:
    a. Take the raw output of the function (which might be a single value, or maybe a tuple of two values, or a dict, etc) and translate this into a dict of output_name: value, so every bit of output has a name.
    b. Take this dict of values, and for each output, dispatch it to the proper Saver.
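A distilled sketch of steps (a) and (b), with plain callables standing in for Savers (the names here are illustrative, not pytask API):

```python
from typing import Any, Callable


def dispatch_outputs(
    raw: Any,
    names: tuple[str, ...],
    savers: dict[str, Callable[[Any], None]],
) -> None:
    # Step (a): give every piece of the raw output a name.
    values = raw if isinstance(raw, tuple) else (raw,)
    named = dict(zip(names, values))
    # Step (b): dispatch each named output to its saver.
    for name, value in named.items():
        savers[name](value)


sink: dict[str, Any] = {}
dispatch_outputs(
    raw=([1, 2, 3], {"rows": 3}),
    names=("table", "stats"),
    savers={
        "table": lambda v: sink.update(table=v),
        "stats": lambda v: sink.update(stats=v),
    },
)
print(sink)  # {'table': [1, 2, 3], 'stats': {'rows': 3}}
```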

Here is my full wrapper code that shows the above. It also takes another step of assigning a unique name to every single input and output (which in this case has a simple 1:1 mapping to a Path, but in theory these could be separated), and adds a DataStore that acts as a cache, so that once data is loaded or saved, you don't have to load it again. The DataStore also separates the concerns of IO from the Task object. The Task just says self.datastore.load(name_of_data) and it gets the data. The DataStore is the one that actually holds and orchestrates the Loader and Saver objects. Hopefully there is something in there that you find interesting.

from __future__ import annotations

from collections.abc import Mapping
from functools import cached_property
import logging
from pathlib import Path
from typing import Any, Callable, Hashable, Iterable, Protocol, TypeVar

import fire
import ibis
from ibis.expr.types import Table
import pytask

from noatak import io

logger = logging.getLogger(__name__)


class PInputs(Protocol):
    """The specification for the inputs to a task function.

    This is used to convert input data from a name:value mapping to
    the args and kwargs that the task function expects.
    """

    @property
    def names(self) -> frozenset[str]:
        """The names of all inputs"""
        raise NotImplementedError

    def convert_input(
        self, input: Mapping[str, Any]
    ) -> tuple[tuple[Any], dict[str, Any]]:
        """
        Convert a k:v mapping to args and kwargs suitable for passing to a function
        """
        raise NotImplementedError

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({set(sorted(self.names))})"


class Inputs(PInputs):
    def __init__(self, args: Iterable[str], kwargs: Mapping[str, str]):
        self.args = frozenset(args)
        self.kwargs = dict(kwargs)  # TODO make this immutable
        # map of arg_name: input_name
        # arg_name must be the key because one input might be used for multiple args

    @cached_property
    def names(self) -> frozenset[str]:
        return frozenset(self.args) | frozenset(self.kwargs.values())

    def convert_input(
        self, input: Mapping[str, Any]
    ) -> tuple[tuple[Any], dict[str, Any]]:
        args = tuple(input[arg] for arg in self.args)
        kwargs = {arg: input[name] for arg, name in self.kwargs.items()}
        return args, kwargs

    @classmethod
    def make(
        cls,
        inputs: str | Iterable[str] | Mapping[str, str] | None,
    ):
        if inputs is None:
            args = tuple()
            kwargs = {}
            return cls(args, kwargs)
        elif isinstance(inputs, str):
            args = (inputs,)
            kwargs = {}
            return cls(args, kwargs)
        try:
            kwargs = dict(inputs.items())
            args = tuple()
            return cls(args, kwargs)
        except AttributeError:
            args = tuple(inputs)
            kwargs = {}
            return cls(args, kwargs)


class POutputs(Protocol):
    """The specification for the outputs of a task function."""

    @property
    def names(self) -> frozenset[str]:
        """The names of all outputs"""
        return self._names

    def convert_output(self, output: Any) -> dict[str, Any]:
        """Convert the output of a task function to a name:value mapping."""
        raise NotImplementedError

    @classmethod
    def make(cls, names: str | Iterable[str] | Mapping[str, Hashable] | None):
        if names is None:
            return NoneOutputs()
        elif isinstance(names, str):
            return ScalarOutputs(names)
        try:
            kwouts = dict(names.items())
            return MappingOutputs(kwouts)
        except AttributeError:
            return IterableOutputs(names)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({set(sorted(self.names))})"


class NoneOutputs(POutputs):
    _names = frozenset()

    def convert_output(self, output: Any) -> dict[str, Any]:
        return dict()


class ScalarOutputs(POutputs):
    def __init__(self, name: str):
        self._name = name
        self._names = frozenset((name,))

    def convert_output(self, output: Any) -> dict[str, Any]:
        return {self._name: output}


class IterableOutputs(POutputs):
    def __init__(self, names: Iterable[str]):
        self._names = tuple(names)

    def convert_output(self, output: Any) -> dict[str, Any]:
        return dict(zip(self._names, output))


class MappingOutputs(POutputs):
    def __init__(self, names: Mapping[str, str]):
        self._kwargs = dict(names)
        self._names = frozenset(names.keys())

    def convert_output(self, output: Any) -> dict[str, Any]:
        return {name: output[raw_name] for name, raw_name in self._kwargs.items()}


class PLoader(Protocol):
    @property
    def names(self) -> frozenset[str]:
        """The names of the data that can be loaded."""
        raise NotImplementedError

    def path(self, name: str) -> Path:
        raise NotImplementedError

    def load(self, name: str) -> Any:
        raise NotImplementedError


class PSaver(Protocol):
    @property
    def names(self) -> frozenset[str]:
        """The names of the data that can be saved."""
        raise NotImplementedError

    def path(self, name: str) -> Path:
        raise NotImplementedError

    def save(self, name: str, data: Any) -> None:
        raise NotImplementedError


_LoaderOrSaverT = TypeVar("_LoaderOrSaverT", PLoader, PSaver)


class DataStore:
    def __init__(
        self,
        loaders: Iterable[PLoader] = [],
        savers: Iterable[PSaver] = [],
        data: dict[str, Any] | None = None,
    ):
        self._loaders = frozenset(loaders)
        self._savers = frozenset(savers)
        self.datas = dict(data) if data is not None else {}

        try:
            self._loader_map = self.build_map(loaders)
        except ValueError as e:
            raise ValueError("Duplicate loader names") from e

        try:
            self._saver_map = self.build_map(savers)
        except ValueError as e:
            raise ValueError("Duplicate saver names") from e

    def evolve(self, loaders=None, savers=None, data=None):
        return DataStore(
            loaders=loaders or self._loaders,
            savers=savers or self._savers,
            data=data or self.datas,
        )

    def get_loader(self, name: str) -> PLoader:
        try:
            return self._loader_map[name]
        except KeyError as e:
            raise ValueError(f"No loader for {name}") from e

    def get_saver(self, name: str) -> PSaver:
        try:
            return self._saver_map[name]
        except KeyError as e:
            raise ValueError(f"No saver for {name}") from e

    def load(self, name: str) -> Any:
        if name in self.datas:
            return self.datas[name]
        result = self.get_loader(name).load(name)
        self.datas[name] = result
        return result

    def save(self, name: str, data: Any) -> None:
        saver = self.get_saver(name)
        saver.save(name, data)
        self.datas[name] = data

    @staticmethod
    def build_map(ls: Iterable[_LoaderOrSaverT]) -> dict[str, _LoaderOrSaverT]:
        m = {}
        duplicates = set()
        for loader in ls:
            for name in loader.names:
                if name in m:
                    duplicates.add(name)
                else:
                    m[name] = loader
        if duplicates:
            raise ValueError(duplicates)
        return m


class Task:
    def __init__(
        self,
        func,
        *,
        inputs: PInputs,
        outputs: POutputs,
        data_store: DataStore,
        name: str | None = None,
    ):
        self.func = func
        self.inputs = inputs
        self.outputs = outputs
        self.data_store = data_store
        self.name = name or func.__name__

    def load(self) -> tuple[tuple[Any], dict[str, Any]]:
        logger.info(f"Reading {self.inputs}")
        input = {name: self.data_store.load(name) for name in self.inputs.names}
        return self.inputs.convert_input(input)

    def save(self, raw_output: Any):
        output = self.outputs.convert_output(raw_output)
        logger.info(f"Writing {self.outputs}")
        for name, data in output.items():
            self.data_store.save(name, data)

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def cli(self, verbose: bool = False):
        level = logging.DEBUG if verbose else logging.INFO
        logging.basicConfig(level=level, force=True)
        self.run()

    def fire(self):
        fire.Fire(self.cli)

    def run(self):
        logger.info(f"Running {self.func.__name__}")
        args, kwargs = self.load()
        result = self.func(*args, **kwargs)
        self.save(result)

    def to_pytask(self):
        """Convert into a pytask task function."""
        input_paths = frozenset(
            (self.data_store.get_loader(name).path(name) for name in self.inputs.names)
        )

        output_paths = frozenset(
            (self.data_store.get_saver(name).path(name) for name in self.outputs.names)
        )

        @pytask.mark.depends_on(input_paths)
        @pytask.mark.produces(output_paths)
        def my_task(depends_on, produces):
            self.cli()

        return my_task


class _TableLoaderSaverBase:
    def __init__(self, names: Iterable[str], path_getter):
        self.names = frozenset(names)
        self.path_getter = path_getter

    def path(self, name: str) -> Path:
        return self.path_getter(name)


class TableLoader(_TableLoaderSaverBase):
    def load(self, name: str) -> Table:
        return ibis.read_parquet(self.path(name))


class TableSaver(_TableLoaderSaverBase):
    def save(self, name: str, data: Table) -> None:
        io.to_parquet(data, self.path(name))


def make_task(
    ins, outs, path_getter
) -> Callable[[Callable], Task]:
    """
    Decorator that creates a task that reads and writes tables with the given names.
    """
    i = Inputs.make(ins)
    o = POutputs.make(outs)
    loader = TableLoader(i.names, path_getter)
    saver = TableSaver(o.names, path_getter)
    data_store = DataStore(loaders=[loader], savers=[saver])

    def wrapper(func):
        return Task(func, inputs=i, outputs=o, data_store=data_store)

    return wrapper


def run_task(name: str):
    # s flag means don't capture stdout, so
    # we get logging output as tasks run
    session = pytask.main({"k": name, "s": True})
    if session.exit_code != 0:
        raise RuntimeError(f"Task {name} failed", session)

@NickCrews
Contributor

Also an interesting API worth exploring would be if Loaders and Savers exposed their path using the __fspath__ protocol used by os.PathLike. But maybe an explicit .path attribute/method would be better.
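That idea could look roughly like this (a hedged sketch; `DataFrameSaver` here is illustrative, not an existing pytask class):

```python
import os
from pathlib import Path


class DataFrameSaver:
    """Illustrative saver that exposes its target file via os.PathLike."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)

    def __fspath__(self) -> str:
        # os.fspath(), open(), Path(), and most stdlib path APIs
        # accept this object directly thanks to this method.
        return os.fspath(self._path)


saver = DataFrameSaver("output.csv")
print(os.fspath(saver))  # output.csv
print(Path(saver))       # turns the saver into a plain Path
```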

@tobiasraabe
Member Author

tobiasraabe commented Jul 28, 2023

a basic need would be to filter tasks

Should be possible by passing main({"expression": "..."}), shouldn't it? A more flexible approach is implemented, for example, in https://github.com/darrenburns/ward. I think it even allows you to search for code in the test function body.

A later bonus would be an API to be able to interrogate the task graph

I am sure you found it, but just in case and for now: you find everything under session.dag, which is a networkx.DiGraph. We could build on top of it.

Is there a comprehensive overview of all the changes coming?

I created #400 to track the changes in v0.4, which are developed on the v4 branch to keep the main branch clean for patches. In #392, you find preliminary documentation for the new features. Follow the RTD link in the GitHub Actions checks for a built version.

Don't require me to import and inherit from FilePathNode.

That is a good point and I really like how you use protocols and how they might better fit to pytask. Let me explain what is happening.

  • pytask uses nominal subtyping for nodes, which are dependencies, products, and tasks. You find the module here: https://github.com/pytask-dev/pytask/blob/main/src/_pytask/nodes.py. There is a common MetaNode that all nodes inherit from; it requires nodes to have a name and a state method (to produce a value for detecting changes, like hashes or modification timestamps), and it should probably also add .value, which may be missing.
  • The idea is that as long as a user plugs in a node with these specs, pytask can handle it.
  • I suggested inheriting from FilePathNode for code sharing and as a quick solution. Your node thereby reuses the necessary methods, which works because it also relies on paths. And we get some other benefits that are only implemented for FilePathNodes, like prettier formatting of paths.
  • I see how a protocol for nodes with paths could be beneficial, because then every node with a path gets these additional benefits without necessarily needing to inherit from FilePathNode.
  • The same argument applies to MetaNode: it could also be a protocol and save an import.

I have to think about it further, but it sounds like protocols are preferable over ABCs.
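For illustration, structural typing with a Protocol lets user classes qualify without importing or inheriting anything from pytask (the `PathNode` protocol below is a sketch, not pytask's actual interface):

```python
from pathlib import Path
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class PathNode(Protocol):
    """Structural interface: anything with a `path` and a `load()` qualifies."""

    path: Path

    def load(self) -> Any: ...


class TextLoader:  # note: no pytask import, no inheritance
    def __init__(self, path: str) -> None:
        self.path = Path(path)

    def load(self) -> str:
        return self.path.read_text()


loader = TextLoader("input.txt")
# isinstance() against a runtime-checkable protocol checks attribute
# presence, so the structural match succeeds without subclassing.
print(isinstance(loader, PathNode))  # True
```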

Am I missing something with the purpose of the .value property of a Saver?

.value determines what is injected into the task. It should be part of the protocol for nodes.

Can we avoid passing in the saver ...

I have to think further about this one. I find it super interesting, and allowing task functions to return values could make some things a lot simpler for users. Thanks for sharing your code examples! I will take a look soon.

@NickCrews
Contributor

main({"expression": "..."})

I don't see "expression" as one of the options on the CLI. Per https://pytask-dev--392.org.readthedocs.build/en/392/reference_guides/api.html#pytask.main, I thought that main() expects the same args as the CLI. What would "expression" do? I think giving main() actually typed args would help here. But exposing main() might not even be the best idea. I think ideally it would be more granular, with pytask.find_tasks(), so you could filter tasks like filtered = [t for t in pytask.find_tasks() if is_good_task(t)] and then call pytask.run_tasks(filtered).

Note that my example was one of my first uses of Protocols, so I'm not quite using them correctly and am treating them more like ABCs.

Thanks for the explanation of MetaNode, etc. That makes sense, and I like how you did that internally. I think those should stay internal details, though, and the public interface should be: you give pytask something with a .path() and a .load()/.save() method, and pytask can create Nodes from those.

@tobiasraabe
Member Author

I agree it is quite opaque; I had to check myself. Internally, the config is a big dictionary, and this line determines the name of the option internally. So "expression" should be the key for -k and "marker_expression" for -m. I partially agree with a typed main(). It needs to be somewhat flexible since plugins can add more options, but we cannot dynamically change the signature; something like **kwargs might help.
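Under those internal names, a programmatic invocation might look like this (a hedged sketch; the key names follow the explanation above and could change, so treat them as assumptions):

```python
# Internal config keys (per the explanation above), not the CLI flag names.
config = {
    "expression": "task_process",   # what -k selects on the CLI
    "marker_expression": "slow",    # what -m selects on the CLI
}
# Running it requires a pytask project on disk:
# import pytask
# session = pytask.main(config)
```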

Your examples made me reread this amazing guide (https://hynek.me/articles/python-subclassing-redux/) and I think now I understand protocols a lot better.

@NickCrews
Contributor

The new release looks awesome, @tobiasraabe! Thanks so much for your work on this! It adds many of the features I was missing.

@tobiasraabe
Member Author

Thank you for the nice words, @NickCrews. Our discussions helped to shape this release significantly. Thanks for your ideas and time!
