For end users wanting to build a new pipeline verb or add pipeline functionality to a new data source, there are two functions for building new pipeline parts, `singledispatch_pipeverb` and `make_pipesource`:
```python
from pydatapipes.pipes import singledispatch_pipeverb, make_pipesource
import pandas as pd

# generic version which defines the API and should raise NotImplementedError
@singledispatch_pipeverb
def append_col(input, x=1):
    """Appends x to the data source"""
    raise NotImplementedError("append_col is not implemented for data of type %s" % type(input))

# concrete implementation for pandas.DataFrame
@append_col.register(pd.DataFrame)
def append_col_DataFrame(input, x=1):
    # always ensure that you return new data!
    copy = input.copy()
    copy["X"] = x
    return copy

# ensure that pd.DataFrame is usable as a pipe source
make_pipesource(pd.DataFrame)
```
This can then be used in a pipeline:
```python
import pandas as pd
print(pd.DataFrame({"a": [1, 2, 3]}) >> append_col(x=3))
```

```
   a  X
0  1  3
1  2  3
2  3  3
```
The above example implements a pipeline verb for `pandas.DataFrame`, but due to the use of `singledispatch`, it is generic. By implementing additional `append_col_<data_source_type>()` functions and registering them with the original `append_col` function, `append_col` can be used with other data sources, e.g. SQL databases, HDF5, or even builtin data types like `list` or `dict`:
```python
@append_col.register(list)
def append_col_list(input, x=1):
    return input + [x]

[1, 2] >> append_col()
```

```
[1, 2, 1]
```
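The `dict` case is mentioned but not shown. The following minimal sketch uses only `functools.singledispatch` (no pipes, so it runs without pydatapipes) and names the directly callable function `append_col_`, following the underscore convention described further down. What "appending a column" should mean for a dict is our own assumption here: we add the key `"X"`, mirroring the DataFrame column.

```python
from functools import singledispatch


@singledispatch
def append_col_(input, x=1):
    # generic version: called for every type without a registered implementation
    raise NotImplementedError(
        "append_col is not implemented for data of type %s" % type(input))


@append_col_.register(list)
def append_col_list(input, x=1):
    return input + [x]


@append_col_.register(dict)
def append_col_dict(input, x=1):
    # assumption: for a dict, "appending a column" means adding the key "X";
    # a new dict is built so the caller's dict is left untouched
    return {**input, "X": x}


print(append_col_([1, 2]))         # [1, 2, 1]
print(append_col_({"a": 1}, x=3))  # {'a': 1, 'X': 3}
```

Each `register` call only adds an entry to the dispatch table of the one generic function, so supporting another data source never requires touching the existing implementations.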
If a verb has no implementation for a data source, it simply raises a `NotImplementedError`:
```python
try:
    1 >> append_col()
except NotImplementedError as e:
    print(e)
```

```
append_col is not implemented for data of type <class 'int'>
```
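To make the mechanism behind these examples more concrete, here is a sketch of how a decorator like `singledispatch_pipeverb` could work. It assumes, in line with the explanation of `PipeVerb` further down, that calling the verb only builds a helper object which is applied to the data on the left of `>>`; `PipeVerbSketch` and `sketch_singledispatch_pipeverb` are hypothetical names, and pydatapipes' actual implementation may differ.

```python
from functools import singledispatch, update_wrapper


class PipeVerbSketch:
    """Hypothetical helper object: remembers the arguments of verb(...) and
    applies the dispatching function once the data arrives via >>."""

    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __rrshift__(self, data):
        # `data >> verb(...)` ends up here when the left operand's type has no
        # __rshift__ (list) or its __rshift__ returns NotImplemented (int)
        return self.func(data, *self.args, **self.kwargs)


def sketch_singledispatch_pipeverb(func):
    """Hypothetical decorator: dispatch on the type of the first argument,
    but make calls return a deferred PipeVerbSketch instead of a result."""
    dispatcher = singledispatch(func)

    def make_verb(*args, **kwargs):
        return PipeVerbSketch(dispatcher, args, kwargs)

    update_wrapper(make_verb, func)           # copy __name__, __doc__, ...
    make_verb.register = dispatcher.register  # per-type implementations
    return make_verb


@sketch_singledispatch_pipeverb
def append_col(input, x=1):
    raise NotImplementedError(
        "append_col is not implemented for data of type %s" % type(input))


@append_col.register(list)
def append_col_list(input, x=1):
    return input + [x]


print([1, 2] >> append_col(x=3))  # [1, 2, 3]

try:
    1 >> append_col()
except NotImplementedError as e:
    print(e)  # append_col is not implemented for data of type <class 'int'>
```

The design relies on Python's reflected operators: `list` defines no `__rshift__`, and `int.__rshift__` returns `NotImplemented` for a non-integer operand, so in both cases Python falls back to `PipeVerbSketch.__rrshift__`, which performs the type dispatch.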
`singledispatch` also makes it easy to work with grouped and ungrouped `pd.DataFrame`s:
```python
@singledispatch_pipeverb
def groupby(input, columns):
    """Group the input by columns"""
    raise NotImplementedError("groupby is not implemented for data of type %s" % type(input))

@groupby.register(pd.DataFrame)
def groupby_DataFrame(input, columns):
    """Group a DataFrame"""
    return input.groupby(columns)

@singledispatch_pipeverb
def summarize_mean(input):
    """Summarize the input via mean aggregation"""
    raise NotImplementedError("summarize_mean is not implemented for data of type %s" % type(input))

@summarize_mean.register(pd.DataFrame)
def summarize_mean_DataFrame(input):
    """Summarize a DataFrame via mean aggregation"""
    return input.mean()

@summarize_mean.register(pd.core.groupby.GroupBy)
def summarize_mean_GroupBy(input):
    """Summarize a grouped DataFrame via mean aggregation"""
    return input.mean()
```
```python
df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 1, 2, 2]})
print(df >> summarize_mean())
```

```
a    2.5
b    1.5
dtype: float64
```

```python
print(df >> groupby("b") >> summarize_mean())
```

```
     a
b
1  1.5
2  3.5
```
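The same idea works for any pair of grouped and ungrouped types, not just pandas. In the following stdlib-only sketch, a list of row dicts plays the role of the DataFrame and a hypothetical `Grouped` dict subclass plays the role of `GroupBy`; the verbs are written as plain underscore functions (no pipes) so the example runs without pydatapipes. It reproduces the means shown above.

```python
from functools import singledispatch
from statistics import mean


class Grouped(dict):
    """Hypothetical grouped data: maps each group key to its list of rows."""

    def __init__(self, by):
        super().__init__()
        self.by = by  # name of the grouping column


@singledispatch
def groupby_(input, column):
    raise NotImplementedError(
        "groupby is not implemented for data of type %s" % type(input))


@groupby_.register(list)
def groupby_list(input, column):
    """Group a list of row dicts by the value in `column`."""
    groups = Grouped(column)
    for row in input:
        groups.setdefault(row[column], []).append(row)
    return groups


@singledispatch
def summarize_mean_(input):
    raise NotImplementedError(
        "summarize_mean is not implemented for data of type %s" % type(input))


@summarize_mean_.register(list)
def summarize_mean_list(input):
    """Ungrouped: one mean per column."""
    return {col: mean(row[col] for row in input) for col in input[0]}


@summarize_mean_.register(Grouped)
def summarize_mean_Grouped(input):
    """Grouped: one mean per group for every non-grouping column."""
    return {
        key: {col: mean(row[col] for row in rows)
              for col in rows[0] if col != input.by}
        for key, rows in input.items()
    }


rows = [{"a": 1, "b": 1}, {"a": 2, "b": 1}, {"a": 3, "b": 2}, {"a": 4, "b": 2}]
print(summarize_mean_(rows))                 # {'a': 2.5, 'b': 1.5}
print(summarize_mean_(groupby_(rows, "b")))  # {1: {'a': 1.5}, 2: {'a': 3.5}}
```

Because `singledispatch` picks the most specific registered class in the MRO, `Grouped` gets its own implementation even though it is also a `dict`.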
Unlike R's magrittr package, where `input %>% verb(x)` is equivalent to `verb(input, x)`, here `input >> verb(x)` can't be written as `verb(input, x)`.
The problem is that `verb(x)` constructs a helper object (`PipeVerb`) under the hood, which is then used in the right-shift (`>>`) operation. At the time `verb(...)` is called, we can't always tell whether the caller wants an object for use in a pipeline or the already computed result. As an example, consider a verb `merge(*additional_data)`. You could call it as `data >> merge(first, second)` to indicate that you want all three (`data`, `first`, and `second`) merged. On the other hand, `merge(first, second)` on its own is also valid ("merge `first` and `second` together").
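A tiny sketch makes the ambiguity visible. Here `merge` and `DeferredMerge` are hypothetical stand-ins for a verb and its `PipeVerb` helper, with "merging" reduced to concatenating lists: calling the verb only records its arguments, so `merge(first, second)` on its own yields the helper object rather than `first` and `second` merged.

```python
class DeferredMerge:
    """Hypothetical helper object standing in for PipeVerb."""

    def __init__(self, *additional_data):
        self.additional_data = additional_data

    def __rrshift__(self, data):
        # only here, once the left-hand data is known, does merging happen
        merged = list(data)
        for extra in self.additional_data:
            merged.extend(extra)
        return merged


def merge(*additional_data):
    # calling the verb only records its arguments; nothing is merged yet
    return DeferredMerge(*additional_data)


first, second = [1], [2]
pending = merge(first, second)
print(type(pending).__name__)       # DeferredMerge, not a merged list
print([0] >> merge(first, second))  # [0, 1, 2]
```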
Because of this ambiguity, the convenience decorator `singledispatch_pipeverb` is not the best option if you want to create reusable pipeline verbs. Its functionality is also available in two parts, so that you can expose both the original function (with `singledispatch` enabled) and the final pipeline verb:
```python
from pydatapipes.pipes import pipeverb, singledispatch

# first use singledispatch on the original function, but define it with a trailing underscore
@singledispatch
def my_verb_(input, x=1, y=2):
    raise NotImplementedError("my_verb is not implemented for data of type %s" % type(input))

# afterwards convert the original function to the pipeline verb:
my_verb = pipeverb(my_verb_)

# concrete implementations can be registered on both ``my_verb`` and ``my_verb_``
@my_verb_.register(list)
def my_verb_list(input, x=1, y=2):
    return input + [x, y]
```
A user can now use both versions:
```python
[1] >> my_verb(x=2, y=3)
```

```
[1, 2, 3]
```

```python
my_verb_([9], x=2, y=3)
```

```
[9, 2, 3]
```
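The following sketch shows how such a two-part split could be wired up, assuming `pipeverb` wraps an existing function in a deferred helper object and forwards `register` to the underlying `singledispatch` function; we have not checked this against pydatapipes' actual code, and `sketch_pipeverb` and `PipeVerbSketch` are hypothetical names. The tuple implementation is registered through the verb name, yet the plain function sees it too, because both share one dispatch registry.

```python
from functools import singledispatch


class PipeVerbSketch:
    """Hypothetical deferred call, applied to the data on the left of >>."""

    def __init__(self, func, args, kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def __rrshift__(self, data):
        return self.func(data, *self.args, **self.kwargs)


def sketch_pipeverb(func):
    """Hypothetical: turn a singledispatch function into a pipeline verb."""
    def verb(*args, **kwargs):
        return PipeVerbSketch(func, args, kwargs)
    # share the dispatch registry: registering on the verb registers on func
    verb.register = func.register
    return verb


@singledispatch
def my_verb_(input, x=1, y=2):
    raise NotImplementedError(
        "my_verb is not implemented for data of type %s" % type(input))


my_verb = sketch_pipeverb(my_verb_)


@my_verb_.register(list)
def my_verb_list(input, x=1, y=2):
    return input + [x, y]


@my_verb.register(tuple)  # registered via the verb name this time
def my_verb_tuple(input, x=1, y=2):
    return input + (x, y)


print([1] >> my_verb(x=2, y=3))  # [1, 2, 3]
print(my_verb_([9], x=2, y=3))   # [9, 2, 3]
print(my_verb_((0,)))            # (0, 1, 2)
```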
To work as a pipeline verb, functions must follow these rules:

- Pipelines assume that the verbs themselves are side-effect free, i.e. they do not change the inputs of the data pipeline. This means that implementations of a verb for a specific data source must ensure that the input is not changed in any way; e.g. if you want to pass on a changed version of a `pd.DataFrame`, make a copy first.
- The initial function (not the implementations for a specific data source) should usually do nothing but raise `NotImplementedError`, as it is called for all other types of data sources.
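One simple way to spot-check the first rule is to compare the input before and after calling an implementation. The helper below is hypothetical and not part of pydatapipes: it snapshots the input with `copy.deepcopy` and compares with `==`, so it will miss mutations that leave the input equal to its old value.

```python
import copy


def is_side_effect_free(func, data, *args, **kwargs):
    """Hypothetical helper: call func on data and report whether data is unchanged."""
    snapshot = copy.deepcopy(data)
    func(data, *args, **kwargs)
    return data == snapshot


def append_col_list(input, x=1):
    # follows the rule: builds and returns a new list
    return input + [x]


def append_col_list_mutating(input, x=1):
    # breaks the rule: modifies the caller's list in place
    input.append(x)
    return input


print(is_side_effect_free(append_col_list, [1, 2]))           # True
print(is_side_effect_free(append_col_list_mutating, [1, 2]))  # False
```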
The strength of the tidyverse is its coherent API design. To ensure a coherent API for pipeline verbs, it would be nice if verbs followed these conventions:

- Pipeline verbs should actually be named as verbs, e.g. use `input >> summarize()` instead of `input >> Summary()`.
- If you expose both the pipeline verb and a normal function (which can be called directly), the pipeline verb should get the "normal" verb name and the function version should get an underscore `_` appended: `x >> verb()` -> `verb_(x)`.
- The implementation function of a `verb()` for a data source of class `Type` should be called `verb_Type(...)`, e.g. `select_DataFrame()`.
So what is missing? Quite a lot :-)

- Symbolic expressions: e.g. `select(X.x)` instead of `select("x")`
- Helpers for dplyr-style column selection (e.g. `select(starts_with("y2016_"))` and `select(X[X.first_column:X.last_column])`)
- All the dplyr, tidyr, ... verbs which make the tidyverse so great
Some of this is already implemented in the other dplyr-like Python libraries (pandas-ply, dplython, and dfply), so I'm not sure how to proceed. I really like my version of pipelines, but duplicating their work feels like a waste of time. So my next step is to see whether it's possible to integrate this with one of these solutions, probably dfply, as that looks like the closest implementation.