Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Polars example #263

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions examples/polars/README.md
@@ -0,0 +1,23 @@
# Classic Hamilton Hello World -- Polars Edition

In this example we show you how to create a simple hello world dataflow that
creates a polars dataframe as a result. It performs a series of transforms on the
input to create columns that appear in the output.

File organization:

* `my_functions.py` houses the logic that we want to compute.
Note (1) how the functions are named, and what input
parameters they require. That is how we create a DAG modeling the dataflow we want to happen.
Note (2) that we have a custom extract_columns decorator there for now. This is because we don't have
a way to parameterize the dataframe library for the extract_columns function yet. This will be fixed in the future.
* `my_script.py` houses how to get Hamilton to create the DAG, specifying that we want a polars dataframe and
exercise it with some inputs.

To run things:
```bash
> python my_script.py
```

If you have questions, or need help with this example,
join us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg), and we'll try to help!
48 changes: 48 additions & 0 deletions examples/polars/my_functions.py
@@ -0,0 +1,48 @@
import polars as pl

from hamilton.function_modifiers import extract_columns


@extract_columns("signups", "spend")  # , df_type=pl.DataFrame, series_type=pl.Series)
def base_df(base_df_location: str) -> pl.DataFrame:
    """Creates the base dataframe the rest of the example builds on.

    :param base_df_location: illustrative input -- a real pipeline would load from this location.
    :return: a polars dataframe with `signups` and `spend` columns.
    """
    data = {
        "signups": pl.Series([1, 10, 50, 100, 200, 400]),
        "spend": pl.Series([10, 10, 20, 40, 40, 50]),
    }
    return pl.DataFrame(data)


def avg_3wk_spend(spend: pl.Series) -> pl.Series:
    """Computes a rolling three-week average of spend."""
    windowed = spend.rolling_mean(3)
    return windowed


def spend_per_signup(spend: pl.Series, signups: pl.Series) -> pl.Series:
    """Computes how much spend it took to acquire each signup."""
    ratio = spend / signups
    return ratio


def spend_mean(spend: pl.Series) -> float:
    """Example of a function producing a scalar: the mean of the whole spend column."""
    average = spend.mean()
    return average


def spend_zero_mean(spend: pl.Series, spend_mean: float) -> pl.Series:
    """Example of a function consuming a scalar: subtracts the mean so spend is centered at zero."""
    centered = spend - spend_mean
    return centered


def spend_std_dev(spend: pl.Series) -> float:
    """Computes the standard deviation of the spend column (another scalar-producing function)."""
    deviation = spend.std()
    return deviation


def spend_zero_mean_unit_variance(spend_zero_mean: pl.Series, spend_std_dev: float) -> pl.Series:
    """One way to standardize spend: divide the zero-mean column by its standard deviation."""
    standardized = spend_zero_mean / spend_std_dev
    return standardized
55 changes: 55 additions & 0 deletions examples/polars/my_script.py
@@ -0,0 +1,55 @@
import logging
import sys
from typing import Dict

import polars as pl

from hamilton import base, driver

logging.basicConfig(stream=sys.stdout)


class PolarsDataFrameResult(base.ResultMixin):
    """We need to create a result builder for our use case. Hamilton doesn't have a standard one
    to use just yet. If you think it should -- let's chat and figure out a way to make it happen!
    """

    def build_result(self, **outputs: pl.Series) -> pl.DataFrame:
        """This is the method that Hamilton will call to build the final result. It will pass in the results
        of the requested outputs that you passed in to the execute() method.

        :param outputs: the results of the requested outputs, keyed by output name. Despite the
            ``**`` signature, each value is one output -- presumably a pl.Series, a pl.DataFrame,
            or a scalar; verify against what the requested nodes actually return.
        :return: a polars DataFrame.
        """
        if len(outputs) == 1:
            (value,) = outputs.values()  # this works because it's length 1.
            if isinstance(value, pl.DataFrame):  # it's a dataframe
                return value
            elif not isinstance(value, pl.Series):  # it's a single scalar
                # wrap the lone scalar in a one-row dataframe keyed by the output's name
                return pl.DataFrame([outputs])
        # TODO: check for length of outputs and determine what should
        # happen for mixed outputs that include scalars for example.
        return pl.DataFrame(outputs)


# Wire the custom result builder into a graph adapter so execute() produces a polars dataframe.
adapter = base.SimplePythonGraphAdapter(result_builder=PolarsDataFrameResult())
config = {
    # just shows how an input flows into the DAG; no file is actually read for this value.
    "base_df_location": "dummy_value",
}
import my_functions  # where our functions are defined

dr = driver.Driver(config, my_functions, adapter=adapter)
# note -- we cannot request scalar outputs like we could do with Pandas.
output_columns = [
    "spend",
    "signups",
    "avg_3wk_spend",
    "spend_per_signup",
    "spend_zero_mean_unit_variance",
]
# let's create the dataframe!
df = dr.execute(output_columns)
print(df)

# To visualize do `pip install sf-hamilton[visualization]` if you want these to work
# dr.visualize_execution(output_columns, './my_dag.dot', {})
# dr.display_all_functions('./my_full_dag.dot')
2 changes: 2 additions & 0 deletions examples/polars/requirements.txt
@@ -0,0 +1,2 @@
polars
sf-hamilton
87 changes: 77 additions & 10 deletions hamilton/function_modifiers/expanders.py
@@ -1,6 +1,6 @@
import functools
import inspect
from typing import Any, Callable, Collection, Dict, Tuple, Union
from typing import Any, Callable, Collection, Dict, Optional, Tuple, Type, Union

import pandas as pd
import typing_inspect
Expand Down Expand Up @@ -307,15 +307,75 @@ class parameterized_inputs(parameterize_sources):
pass


# prototyping some functionality here -- singledispatch picks the right
# implementation at run time based on the dataframe's concrete type.
@functools.singledispatch
def get_column(df, column_name: str):
    """Extracts a single column from a dataframe; dispatch is on the runtime type of `df`."""
    raise NotImplementedError()


@get_column.register(pd.DataFrame)
def get_column_pandas(df: pd.DataFrame, column_name: str) -> pd.Series:
    """Pandas implementation of column extraction."""
    column = df[column_name]
    return column


# singledispatch dispatches on values, not on type objects -- so we keep our own
# registry of resolver functions that map a dataframe type to its column/series type.
df_type_to_column_type_functions = []


def register_df_type_to_column_type_function(func: Callable):
    """Adds a resolver to the registry. A resolver maps a dataframe type to a series type, or None."""
    df_type_to_column_type_functions.append(func)


def pandas_series(dataframe_type: Type) -> Optional[Type]:
    """Resolver for pandas: columns of a pd.DataFrame are pd.Series."""
    return pd.Series if dataframe_type == pd.DataFrame else None


register_df_type_to_column_type_function(pandas_series)

try:
    import polars as pl

    @get_column.register(pl.DataFrame)
    def get_column_polars(df: pl.DataFrame, column_name: str) -> pl.Series:
        """Polars implementation of column extraction."""
        column = df[column_name]
        return column

    def polars_series(dataframe_type: Type) -> Optional[Type]:
        """Resolver for polars: columns of a pl.DataFrame are pl.Series."""
        return pl.Series if dataframe_type == pl.DataFrame else None

    register_df_type_to_column_type_function(polars_series)
except ModuleNotFoundError:
    # polars is optional -- silently skip registration when it isn't installed.
    pass


def get_column_type_from_df_type(dataframe_type: Type) -> Type:
    """Resolves the column/series type for a given dataframe type.

    Runs through the registered resolver functions until one recognizes the type.

    :param dataframe_type: the dataframe type to resolve, e.g. pd.DataFrame.
    :return: the corresponding column type, e.g. pd.Series.
    :raises NotImplementedError: if no registered resolver recognizes the type.
    """
    for resolver in df_type_to_column_type_functions:
        series_type = resolver(dataframe_type)
        if series_type is not None:
            return series_type
    # Raise NotImplementedError (not ValueError): extract_columns.validate() catches
    # NotImplementedError to convert this into an InvalidDecoratorException, and the
    # singledispatch get_column base case raises NotImplementedError too.
    raise NotImplementedError(f"Unknown dataframe type: {dataframe_type}. Cannot get Column type.")


class extract_columns(base.NodeExpander):
def __init__(self, *columns: Union[Tuple[str, str], str], fill_with: Any = None):
def __init__(
self,
*columns: Union[Tuple[str, str], str],
fill_with: Any = None,
):
"""Constructor for a modifier that expands a single function into the following nodes:
- n functions, each of which take in the original dataframe and output a specific column
- 1 function that outputs the original dataframe

:param columns: Columns to extract, that can be a list of tuples of (name, documentation) or just names.
:param fill_with: If you want to extract a column that doesn't exist, do you want to fill it with a default value?
Or do you want to error out? Leave empty/None to error out, set fill_value to dynamically create a column.
Or do you want to error out? Leave empty/None to error out, set fill_value to dynamically create a column. Note:
fill with only works with dataframe libraries that support scalar assignment.
"""
if not columns:
raise base.InvalidDecoratorException(
Expand All @@ -334,10 +394,13 @@ def validate(self, fn: Callable):
:param fn: Function to validate.
:raises: InvalidDecoratorException If the function does not output a Dataframe
"""

output_type = inspect.signature(fn).return_annotation
if not issubclass(output_type, pd.DataFrame):
try:
get_column_type_from_df_type(output_type)
except NotImplementedError:
raise base.InvalidDecoratorException(
f"For extracting columns, output type must be pandas dataframe, not: {output_type}"
f"Error {fn} does not output a dataframe. The extract_columns decorator needs a function that outputs a dataframe."
)

def expand_node(
Expand All @@ -354,11 +417,12 @@ def expand_node(
fn = node_.callable
base_doc = node_.documentation

def df_generator(*args, **kwargs) -> pd.DataFrame:
def df_generator(*args, **kwargs):
df_generated = fn(*args, **kwargs)
if self.fill_with is not None:
for col in self.columns:
if col not in df_generated:
# this wont work for polars for example
df_generated[col] = self.fill_with
return df_generated

Expand All @@ -371,22 +435,25 @@ def df_generator(*args, **kwargs) -> pd.DataFrame:

def extractor_fn(
column_to_extract: str = column, **kwargs
) -> pd.Series: # avoiding problems with closures
): # avoiding problems with closures
df = kwargs[node_.name]
if column_to_extract not in df:
raise base.InvalidDecoratorException(
f"No such column: {column_to_extract} produced by {node_.name}. "
f"It only produced {str(df.columns)}"
)
return kwargs[node_.name][column_to_extract]
return get_column(df, column_to_extract)

output_type = inspect.signature(fn).return_annotation
series_type = get_column_type_from_df_type(output_type)

output_nodes.append(
node.Node(
column,
pd.Series,
series_type, # set output type here
doc_string,
extractor_fn,
input_types={node_.name: pd.DataFrame},
input_types={node_.name: output_type}, # set input type requirement here
tags=node_.tags.copy(),
)
)
Expand Down