diff --git a/examples/polars/README.md b/examples/polars/README.md
new file mode 100644
index 00000000..9dc89753
--- /dev/null
+++ b/examples/polars/README.md
@@ -0,0 +1,23 @@
+# Classic Hamilton Hello World
+
+In this example we show you how to create a simple hello world dataflow that
+creates a polars dataframe as a result. It performs a series of transforms on the
+input to create columns that appear in the output.
+
+File organization:
+
+* `my_functions.py` houses the logic that we want to compute.
+Note (1) how the functions are named, and what input
+parameters they require. That is how we create a DAG modeling the dataflow we want to happen.
+Note (2) that we have a custom extract_columns decorator there for now. This is because we don't have
+a way to parameterize the dataframe library for the extract_columns function yet. This will be fixed in the future.
+* `my_script.py` houses how to get Hamilton to create the DAG, specifying that we want a polars dataframe and
+exercise it with some inputs.
+
+To run things:
+```bash
+> python my_script.py
+```
+
+If you have questions, or need help with this example,
+join us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg), and we'll try to help!
diff --git a/examples/polars/my_functions.py b/examples/polars/my_functions.py
new file mode 100644
index 00000000..bb88a81c
--- /dev/null
+++ b/examples/polars/my_functions.py
@@ -0,0 +1,48 @@
+import polars as pl
+
+from hamilton.function_modifiers import extract_columns
+
+
+@extract_columns("signups", "spend")  # , df_type=pl.DataFrame, series_type=pl.Series)
+def base_df(base_df_location: str) -> pl.DataFrame:
+    """Loads base dataframe of data.
+
+    :param base_df_location: just showing that we could load this from a file...
+    :return:
+    """
+    return pl.DataFrame(
+        {
+            "signups": pl.Series([1, 10, 50, 100, 200, 400]),
+            "spend": pl.Series([10, 10, 20, 40, 40, 50]),
+        }
+    )
+
+
+def avg_3wk_spend(spend: pl.Series) -> pl.Series:
+    """Rolling 3 week average spend."""
+    return spend.rolling_mean(3)
+
+
+def spend_per_signup(spend: pl.Series, signups: pl.Series) -> pl.Series:
+    """The cost per signup in relation to spend."""
+    return spend / signups
+
+
+def spend_mean(spend: pl.Series) -> float:
+    """Shows function creating a scalar. In this case it computes the mean of the entire column."""
+    return spend.mean()
+
+
+def spend_zero_mean(spend: pl.Series, spend_mean: float) -> pl.Series:
+    """Shows function that takes a scalar. In this case to zero mean spend."""
+    return spend - spend_mean
+
+
+def spend_std_dev(spend: pl.Series) -> float:
+    """Function that computes the standard deviation of the spend column."""
+    return spend.std()
+
+
+def spend_zero_mean_unit_variance(spend_zero_mean: pl.Series, spend_std_dev: float) -> pl.Series:
+    """Function showing one way to make spend have zero mean and unit variance."""
+    return spend_zero_mean / spend_std_dev
diff --git a/examples/polars/my_script.py b/examples/polars/my_script.py
new file mode 100644
index 00000000..4ace51ef
--- /dev/null
+++ b/examples/polars/my_script.py
@@ -0,0 +1,55 @@
+import logging
+import sys
+from typing import Dict
+
+import polars as pl
+
+from hamilton import base, driver
+
+logging.basicConfig(stream=sys.stdout)
+
+
+class PolarsDataFrameResult(base.ResultMixin):
+    """We need to create a result builder for our use case. Hamilton doesn't have a standard one
+    to use just yet. If you think it should -- let's chat and figure out a way to make it happen!
+    """
+
+    def build_result(self, **outputs: Dict[str, pl.Series]) -> pl.DataFrame:
+        """This is the method that Hamilton will call to build the final result. It will pass in the results
+        of the requested outputs that you passed in to the execute() method.
+
+        :param outputs: The results of the requested outputs.
+        :return: a polars DataFrame.
+        """
+        if len(outputs) == 1:
+            (value,) = outputs.values()  # this works because it's length 1.
+            if isinstance(value, pl.DataFrame):  # it's a dataframe
+                return value
+            elif not isinstance(value, pl.Series):  # it's a single scalar
+                return pl.DataFrame([outputs])
+        # TODO: check for length of outputs and determine what should
+        # happen for mixed outputs that include scalars for example.
+        return pl.DataFrame(outputs)
+
+
+adapter = base.SimplePythonGraphAdapter(result_builder=PolarsDataFrameResult())
+config = {
+    "base_df_location": "dummy_value",
+}
+import my_functions  # where our functions are defined
+
+dr = driver.Driver(config, my_functions, adapter=adapter)
+# note -- we cannot request scalar outputs like we could do with Pandas.
+output_columns = [
+    "spend",
+    "signups",
+    "avg_3wk_spend",
+    "spend_per_signup",
+    "spend_zero_mean_unit_variance",
+]
+# let's create the dataframe!
+df = dr.execute(output_columns)
+print(df)
+
+# To visualize do `pip install sf-hamilton[visualization]` if you want these to work
+# dr.visualize_execution(output_columns, './my_dag.dot', {})
+# dr.display_all_functions('./my_full_dag.dot')
diff --git a/examples/polars/requirements.txt b/examples/polars/requirements.txt
new file mode 100644
index 00000000..3874d6b9
--- /dev/null
+++ b/examples/polars/requirements.txt
@@ -0,0 +1,2 @@
+polars
+sf-hamilton
diff --git a/hamilton/function_modifiers/expanders.py b/hamilton/function_modifiers/expanders.py
index c1372c51..eeae725e 100644
--- a/hamilton/function_modifiers/expanders.py
+++ b/hamilton/function_modifiers/expanders.py
@@ -1,6 +1,6 @@
 import functools
 import inspect
-from typing import Any, Callable, Collection, Dict, Tuple, Union
+from typing import Any, Callable, Collection, Dict, Optional, Tuple, Type, Union
 
 import pandas as pd
 import typing_inspect
@@ -307,15 +307,75 @@ class parameterized_inputs(parameterize_sources):
     pass
 
 
+# prototyping some functionality here -- use singledispatch which will at run time
+# determine the right function to call.
+@functools.singledispatch
+def get_column(df, column_name: str):
+    raise NotImplementedError()
+
+
+@get_column.register(pd.DataFrame)
+def get_column_pandas(df: pd.DataFrame, column_name: str) -> pd.Series:
+    return df[column_name]
+
+
+# but singledispatch doesn't work with types -- so we need a way to
+# register functions to resolve what the column type is for a given dataframe type.
+df_type_to_column_type_functions = []
+
+
+def register_df_type_to_column_type_function(func: Callable):
+    global df_type_to_column_type_functions
+    df_type_to_column_type_functions.append(func)
+
+
+def pandas_series(dataframe_type: Type) -> Optional[Type]:
+    if dataframe_type == pd.DataFrame:
+        return pd.Series
+    return None
+
+
+register_df_type_to_column_type_function(pandas_series)
+
+try:
+    import polars as pl
+
+    @get_column.register(pl.DataFrame)
+    def get_column_polars(df: pl.DataFrame, column_name: str) -> pl.Series:
+        return df[column_name]
+
+    def polars_series(dataframe_type: Type) -> Optional[Type]:
+        if dataframe_type == pl.DataFrame:
+            return pl.Series
+        return None
+
+    register_df_type_to_column_type_function(polars_series)
+except ModuleNotFoundError:
+    pass
+
+
+def get_column_type_from_df_type(dataframe_type: Type) -> Type:
+    for func in df_type_to_column_type_functions:
+        series_type = func(dataframe_type)
+        if series_type is not None:
+            return series_type
+    raise ValueError(f"Unknown dataframe type: {dataframe_type}. 
Cannot get Column type.")
+
+
 class extract_columns(base.NodeExpander):
-    def __init__(self, *columns: Union[Tuple[str, str], str], fill_with: Any = None):
+    def __init__(
+        self,
+        *columns: Union[Tuple[str, str], str],
+        fill_with: Any = None,
+    ):
         """Constructor for a modifier that expands a single function into the following nodes:
         - n functions, each of which take in the original dataframe and output a specific column
         - 1 function that outputs the original dataframe
 
         :param columns: Columns to extract, that can be a list of tuples of (name, documentation) or just names.
         :param fill_with: If you want to extract a column that doesn't exist, do you want to fill it with a default value?
-        Or do you want to error out? Leave empty/None to error out, set fill_value to dynamically create a column.
+        Or do you want to error out? Leave empty/None to error out, set fill_with to dynamically create a column. Note:
+        fill_with only works with dataframe libraries that support scalar assignment.
         """
         if not columns:
             raise base.InvalidDecoratorException(
@@ -334,10 +394,13 @@ def validate(self, fn: Callable):
         :param fn: Function to validate.
         :raises: InvalidDecoratorException If the function does not output a Dataframe
         """
+
         output_type = inspect.signature(fn).return_annotation
-        if not issubclass(output_type, pd.DataFrame):
+        try:
+            get_column_type_from_df_type(output_type)
+        except (NotImplementedError, ValueError):
             raise base.InvalidDecoratorException(
-                f"For extracting columns, output type must be pandas dataframe, not: {output_type}"
+                f"Error {fn} does not output a dataframe. The extract_columns decorator needs a function that outputs a dataframe."
             )
 
     def expand_node(
@@ -354,11 +417,12 @@ def expand_node(
         fn = node_.callable
         base_doc = node_.documentation
 
-        def df_generator(*args, **kwargs) -> pd.DataFrame:
+        def df_generator(*args, **kwargs):
             df_generated = fn(*args, **kwargs)
             if self.fill_with is not None:
                 for col in self.columns:
                     if col not in df_generated:
+                        # this won't work for polars for example
                         df_generated[col] = self.fill_with
             return df_generated
 
@@ -371,22 +435,25 @@ def df_generator(*args, **kwargs):
             def extractor_fn(
                 column_to_extract: str = column, **kwargs
-            ) -> pd.Series:  # avoiding problems with closures
+            ):  # avoiding problems with closures
                 df = kwargs[node_.name]
                 if column_to_extract not in df:
                     raise base.InvalidDecoratorException(
                         f"No such column: {column_to_extract} produced by {node_.name}. "
                         f"It only produced {str(df.columns)}"
                     )
-                return kwargs[node_.name][column_to_extract]
+                return get_column(df, column_to_extract)
+
+            output_type = inspect.signature(fn).return_annotation
+            series_type = get_column_type_from_df_type(output_type)
             output_nodes.append(
                 node.Node(
                     column,
-                    pd.Series,
+                    series_type,  # set output type here
                     doc_string,
                     extractor_fn,
-                    input_types={node_.name: output_type},  # set input type requirement here
-                    input_types={node_.name: pd.DataFrame},
+                    input_types={node_.name: output_type},  # set input type requirement here
                     tags=node_.tags.copy(),
                 )
             )