Flow & bind functions #146
@minmax

from typing import Any

from result import Result, as_result


@as_result(KeyError)
def process_step1(input: dict[str, Any]) -> dict[str, Any]:
    del input["test1"]
    return input


@as_result(ZeroDivisionError)
def process_step2(input: dict[str, Any]) -> dict[str, Any]:
    input["test2"] /= input["test3"]
    return input


def main(input: dict[str, Any]) -> Result[dict[str, Any], Exception]:
    return (
        process_step1(input)
        .and_then(process_step2)
        # Step 1 runs again: 'test1' is already gone, so this yields Err(KeyError('test1'))
        .and_then(process_step1)
    )


if __name__ == '__main__':
    final_result = main({"test1": 1, "test2": 3, "test3": 3})
    if final_result.is_ok():
        print(f"Result: {final_result.unwrap()}")
    if final_result.is_err():
        print(f"{type(final_result.unwrap_err())}: {final_result.unwrap_err()}")

Unfortunately, there are no examples of this in the README or anywhere else, as far as I can find. The docstrings are not very clear on this case either, so it was not really possible (at least for me) to understand how to use it this way. With a bit of trial and error, though, I was able to figure it out.
@golgor - great example, @as_result seems to be Maybe the last part of an example can be something like:
? Also thought of this:
Here is what I found to work with and_then (aka bind):

from result import Result, Ok, Err


def inc(x: int | float) -> Result[int | float, None]:
    return Ok(x + 1)


print(Ok(1).and_then(inc))  # Ok(2)
print(Err(None).and_then(inc))  # Err(None)
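I think you already have

from result import Ok


def flow(value, *callables):
    # Start from Ok(value) and pass it through each Result-returning callable
    r = Ok(value)
    for f in callables:
        r = r.and_then(f)  # an Err short-circuits: later callables are not called
    return r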
To be honest, I have no experience with functional programming languages, but I understand what you mean. It takes a function as input (that is how a decorator works) and wraps the output in a Result. I also looked into using a match statement, and it makes the code easier to read. You can even handle different kinds of errors separately:

match final_result:
    case Ok(content):
        print(f"Result: {content}")
    case Err(content):
        if isinstance(content, KeyError):
            print(f"KeyError: {content}")
        elif isinstance(content, ZeroDivisionError):
            print(f"ZeroDivisionError: {content}")

I have this use case at work where we receive IoT data as JSON structures (i.e. Python dicts) and want to perform a series of transforms on them. I did some testing with a wrapper class for the "pipeline data". My main idea was to properly manage more complicated structures such as dicts. I also want to get some kind of result from each step. I'm not sure whether it just makes things more complicated, and I haven't tested it properly, but it might give you some ideas. Sorry for the lengthy code:

from __future__ import annotations
import copy
import json
from typing import Any, Generic, TypeVar

from result import Err, Ok, Result, as_result

Message = dict[str, Any]
V = TypeVar("V", bound=Message)


@as_result(KeyError)
def process_step1(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    del data["test1"]
    input.add_step("Remove 'test1'", data)
    return input


@as_result(ZeroDivisionError)
def process_step2(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    data["test2"] /= data["test3"]
    input.add_step("divide test2 with test3", data)
    return input


@as_result(ValueError)
def process_step3(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    data["test3"] = 5
    input.add_step("set test3 to 5", data)
    return input


@as_result(ValueError)
def process_step4(input: PipelineData[Message]) -> PipelineData[Message]:
    data = input.get_last_step()
    data["test1"] = 5
    input.add_step("recreate and set test1 to 5", data)
    return input


def main(input: PipelineData[Message]) -> Result[PipelineData[Message], Exception]:
    return (
        process_step1(input)
        .and_then(process_step2)
        .and_then(process_step3)
        .and_then(process_step4)
    )
class PipelineData(Generic[V]):
    """A class to hold data for processing in a pipeline.

    It keeps a record of all processing steps and the data from each step, so it is possible to get the data from
    any previous step at any time in the pipeline. It also provides the option to make the steps 'immutable' by
    not allowing a step name to be reused, i.e. overwriting one step with another.

    It is advised to set the step name to something descriptive of what the step does, e.g. 'Sum of run_log'.
    This will make it much easier to debug the pipeline if something goes wrong.
    """

    def __init__(self, data: V, safe: bool = True) -> None:
        """Initializes a new instance of PipelineData.

        Saves the provided data in the processing steps under the name 'original'.

        Args:
            data (V): Any kind of data to be processed in the pipeline.
            safe (bool): If True, adding a step whose name already exists raises ValueError.
        """
        self._safe = safe
        self._processing_steps: dict[str, V] = {"original": data}

    def add_step(self, step_name: str, data: V) -> None:
        """Add a processing step.

        This saves a new processing step and the data from that step.
        """
        if self._safe and step_name in self._processing_steps:
            raise ValueError(f"Step name '{step_name}' already exists!")
        self._processing_steps[step_name] = data

    def get_last_step(self) -> V:
        """Get the data from the last processing step.

        This is intended to serve the data from the last processing step ready for the next step. The data is copied
        using deepcopy to avoid referencing the old data when changes are made.

        Returns:
            V: The data from the last processing step.
        """
        # Copy only the last step rather than the whole history
        return copy.deepcopy(self._processing_steps[self.last_step_name])

    @property
    def last_step_name(self) -> str:
        """Helper function to get the name of the last processing step.

        Returns:
            str: The name of the last step performed.
        """
        return list(self._processing_steps)[-1]

    @property
    def original_data(self) -> V:
        """Helper function to get the original data.

        Returns:
            V: The original data as provided during initialization.
        """
        return self._processing_steps["original"]

    def __str__(self) -> str:
        return json.dumps(self._processing_steps, indent=4)
if __name__ == "__main__":
    data = PipelineData({"test1": 1, "test2": 3, "test3": 3})
    final_result = main(data)
    match final_result:
        case Ok(content):
            print(f"Result: {content}")
        case Err(content):
            if isinstance(content, KeyError):
                print(f"KeyError: {content}")
            elif isinstance(content, ZeroDivisionError):
                print(f"ZeroDivisionError: {content}")
            else:
                # e.g. the ValueError caught by process_step3/process_step4
                print(f"{type(content).__name__}: {content}")
Realised As for printing - better use

The big example of a JSON pipeline... I would suggest keeping it as simple as possible: it seems like too many concerns are jammed into one class, which seems risky. If I were doing something similar, I'd keep the original data and a list of transformations; I'm not sure a dict is the right data structure for this.
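One way to read the suggestion to keep the original data and a list of transformations, as a minimal stdlib-only sketch: the pipeline holds the untouched input and an ordered list of named step functions, and running it returns the final value together with a per-step history. The names `Pipeline`, `then`, `run`, `drop_test1` and `divide` are hypothetical; each step could equally be wrapped with `@as_result` to get the `Result` behaviour discussed in this thread.

```python
from __future__ import annotations

import copy
from dataclasses import dataclass, field
from typing import Any, Callable

Message = dict[str, Any]
Step = tuple[str, Callable[[Message], Message]]


@dataclass(frozen=True)  # frozen: attributes cannot be reassigned after creation
class Pipeline:
    original: Message
    steps: list[Step] = field(default_factory=list)

    def then(self, name: str, func: Callable[[Message], Message]) -> Pipeline:
        # Return a new pipeline instead of mutating this one
        return Pipeline(self.original, [*self.steps, (name, func)])

    def run(self) -> tuple[Message, list[tuple[str, Message]]]:
        history: list[tuple[str, Message]] = []
        data = copy.deepcopy(self.original)  # never touch the original input
        for name, func in self.steps:
            data = func(copy.deepcopy(data))  # each step gets its own copy
            history.append((name, data))
        return data, history


def drop_test1(d: Message) -> Message:
    del d["test1"]
    return d


def divide(d: Message) -> Message:
    d["test2"] /= d["test3"]
    return d


pipeline = (
    Pipeline({"test1": 1, "test2": 3, "test3": 3})
    .then("remove test1", drop_test1)
    .then("divide test2 by test3", divide)
)
final, history = pipeline.run()
print(final)  # {'test2': 1.0, 'test3': 3}
print([name for name, _ in history])  # ['remove test1', 'divide test2 by test3']
print(pipeline.original)  # {'test1': 1, 'test2': 3, 'test3': 3}
```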
That won't work with the standard Exception classes: I get an error from both Pylance (Pyright) and a runtime error when executing. The following will work, but still raises a Pyright error:

match final_result:
    case Ok(content):
        print(f"Result: {content}")
    case Err(KeyError):
        print(f"KeyError: {KeyError}")
    case Err(ZeroDivisionError):
        print(f"ZeroDivisionError: {ZeroDivisionError}")
    case _:
        print("Something else went wrong!")

I do see your concern about the
Here is code that works and also passes a mypy check:

Overall, just printing the final result is fine; there is not much that
In practice, I tried to use my own implementation of

Actually, I propose to close the ticket as not useful.

import functools
from typing import Any, Callable, Never, TypeVar, overload

from result import Ok, Result

T = TypeVar("T")
R = TypeVar("R")
E = TypeVar("E")
R1 = TypeVar("R1")
R2 = TypeVar("R2")
R3 = TypeVar("R3")
R4 = TypeVar("R4")
R5 = TypeVar("R5")
E1 = TypeVar("E1")
E2 = TypeVar("E2")
E3 = TypeVar("E3")
E4 = TypeVar("E4")
E5 = TypeVar("E5")

# A Result that can only be Ok: the type of the initial value fed into the pipe
OnlyOk = Result[T, Never]
AnyResult = Result[Any, Any]


@overload
def flow(
    val: T,
    *functions: *tuple[Callable[[OnlyOk[T]], Result[R, E]]],
) -> Result[R, E]: ...


@overload
def flow(
    val: T,
    *functions: *tuple[
        Callable[[OnlyOk[T]], Result[R1, E1]],
        Callable[[Result[R1, E1]], Result[R, E]],
    ],
) -> Result[R, E]: ...


...  # (further overloads elided in the original comment)


@overload
def flow(
    val: T,
    *functions: *tuple[
        Callable[[OnlyOk[T]], Result[R1, E1]],
        Callable[[Result[R1, E1]], Result[R2, E2]],
        Callable[[Result[R2, E2]], Result[R3, E3]],
        Callable[[Result[R3, E3]], Result[R4, E4]],
        Callable[[Result[R4, E4]], Result[R5, E5]],
        Callable[[Result[R5, E5]], Result[R, E]],
    ],
) -> Result[R, E]: ...


def flow(
    val: T,
    *functions: *tuple[
        Callable[[OnlyOk[T]], AnyResult],
        *tuple[Callable[[AnyResult], AnyResult], ...],
    ],
) -> Result[R, E]:
    """Build a pipe from functions and pass value into it.

    Typing:
    -------
    flow(
        A,
        (Container[A] -> Container[B]),
        (Container[B] -> Container[C]),
    ) -> Container[C]

    Example:
    -------
    flow(
        1.5,
        bind(int),
        bind(str)
    ) == Ok("1")
    """
    # Wrap the value in Ok, then feed each function the previous function's Result
    return functools.reduce(
        lambda value, func: func(value),
        functions,
        Ok(val),
    )
What do you think about adding some helper functions like flow & bind, just like returns already has? I've tried using them, and IMHO they look better than a chain of .and_then().and_then().
I haven't tested my code with mypy, but Pyright is fine with the types of flow & bind, so I think it's possible to adapt them.