In [132]:
import pandas as pd
import numpy as np
import random

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import FunctionTransformer
from collections import ChainMap
from sklearn import set_config

In [133]:
# Ask scikit-learn to render estimators as diagrams when they are displayed in the notebook.
set_config(display="diagram")

In [308]:
from sklearn.base import BaseEstimator, TransformerMixin

class CommunitySetter(BaseEstimator, TransformerMixin):
    """Label every row of a DataFrame with a fixed community name.

    The name is written to a ``community_name`` column, which is then
    appended to the existing index as an additional level.

    Parameters
    ----------
    community_name
        Value stored in the ``community_name`` index level of every row.
    """

    def __init__(self, community_name):
        self.community_name = community_name

    def fit(self, x, y=None):
        """No-op: nothing is learned from the data; returns ``self``."""
        return self

    def transform(self, df):
        """Return ``df`` with ``community_name`` appended to its index."""
        labelled = df.assign(community_name=self.community_name)
        return labelled.set_index(["community_name"], append=True)


class ItemSelector(BaseEstimator, TransformerMixin):
    """Pick one entry out of a keyed collection via ``data[key]``.

    The input is expected to be organised feature-first: the outer index
    addresses a feature and the inner one addresses samples, so that

    >> len(data[key]) == n_samples

    This is the reverse of the usual scikit-learn feature-matrix layout, in
    which the first index runs over samples.

    Anything that supports item access (``data[key]``) works, for example a
    dict of lists, a 2D numpy array, a pandas DataFrame or a numpy record
    array.

    >> data = {'a': [1, 5, 2, 5, 2, 8],
               'b': [9, 4, 1, 4, 1, 3]}
    >> ds = ItemSelector(key='a')
    >> data['a'] == ds.transform(data)

    Sample-first data (e.g. a list of dicts) is not supported; for that
    layout consider a transformer along the lines of
    `sklearn.feature_extraction.DictVectorizer`.

    Parameters
    ----------
    key : hashable, required
        Key of the entry to extract from the collection.
    """

    def __init__(self, key):
        self.key = key

    def fit(self, x, y=None):
        """No-op: nothing is learned from the data; returns ``self``."""
        return self

    def transform(self, data_dict):
        """Return the entry of ``data_dict`` stored under ``self.key``."""
        wanted = self.key
        return data_dict[wanted]

In [199]:
class ptest:
    """Synthetic fixture holding two related, randomly filled tables.

    Parameters
    ----------
    name : optional
        Label stored on the instance as ``self.name``.
    seed : optional
        When given, all random values are drawn from a private
        ``random.Random(seed)``, so two instances built with the same seed
        hold identical tables. When ``None`` (the default) the global
        ``random`` module state is used, exactly as before this parameter
        existed.
    """

    def __init__(self, name=None, seed=None):
        self.name = name
        # Both the ``random`` module and a ``random.Random`` instance expose
        # ``randint``, so the rest of the code does not care which one it gets.
        rng = random if seed is None else random.Random(seed)

        # Table ``a``: 100 rows indexed by A_id ("a0".."a99"). B_id is a random
        # key "b0".."b9" pointing into table ``b``; x, y, z are random ints in
        # [0, 1000]. The draw order (B_id, x, y, z, b1, b2) is kept as in the
        # original so a globally seeded run yields the same data as before.
        self.a = pd.DataFrame({
            "A_id": [f"a{i}" for i in range(100)],
            "B_id": [f"b{rng.randint(0, 9)}" for _ in range(100)],
            "x": [rng.randint(0, 1000) for _ in range(100)],
            "y": [rng.randint(0, 1000) for _ in range(100)],
            "z": [rng.randint(0, 1000) for _ in range(100)],
        }).set_index("A_id")

        # Table ``b``: 10 rows indexed by B_id ("b0".."b9") with two random
        # int columns in [0, 50].
        self.b = pd.DataFrame({
            "B_id": [f"b{i}" for i in range(10)],
            "b1": [rng.randint(0, 50) for _ in range(10)],
            "b2": [rng.randint(0, 50) for _ in range(10)],
        }).set_index("B_id")

In [200]:
# Seed the global RNG before any fixture is generated so that the random
# tables (and every output derived from them) are reproducible on
# Restart & Run All.
random.seed(0)
p = ptest()

In [343]:
# One independently generated fixture per community, keyed by the same label
# that is stored as the fixture's name.
multiple_ps = {label: ptest(label) for label in ("A", "B", "C")}

In [109]:
# Display the B-level table of the single fixture.
p.b

Unnamed: 0_level_0,b1,b2
B_id,Unnamed: 1_level_1,Unnamed: 2_level_1
b0,13,26
b1,12,36
b2,25,19
b3,19,38
b4,21,16
b5,19,42
b6,28,18
b7,2,41
b8,49,25
b9,15,43


In [121]:
def x_plus_y(X):
    """Total of all ``x`` and ``y`` values of ``X.a`` within each ``B_id`` group.

    Parameters
    ----------
    X : object
        Must expose a DataFrame ``X.a`` that can be grouped by ``B_id`` and
        has the columns ``x`` and ``y``.

    Returns
    -------
    dict
        ``{'x+y': Series}``; the Series is indexed by ``B_id`` and holds
        ``sum(x) + sum(y)`` for that group.
    """
    # Built-in aggregation instead of ``groupby.apply`` with a Python lambda:
    # it is vectorised and never passes the grouping column to user code.
    per_column_totals = X.a.groupby(by="B_id")[["x", "y"]].sum()
    return {
        'x+y': per_column_totals.sum(axis=1)
    }

def x_per_b(X):
    """Count the distinct ``x`` values of ``X.a`` within each ``B_id`` group.

    Parameters
    ----------
    X : object
        Must expose a DataFrame ``X.a`` that can be grouped by ``B_id`` and
        has a column ``x``.

    Returns
    -------
    dict
        ``{'x_per_b': Series}``; the Series is indexed by ``B_id``.
    """
    x_grouped_by_b = X.a.groupby(by="B_id")["x"]
    distinct_x_counts = x_grouped_by_b.nunique()
    return {"x_per_b": distinct_x_counts}

def keep_features(X, view, keep):
    """Copy selected entries of one attribute of ``X`` into a dict.

    Parameters
    ----------
    X : object
        Object holding the attribute named by ``view``.
    view : str
        Name of the attribute of ``X`` to read from (looked up with
        ``getattr``).
    keep : iterable
        Keys to extract from that attribute via ``attribute[key]``.

    Returns
    -------
    dict
        Maps every key in ``keep`` to the corresponding entry.
    """
    picked = {}
    for feature in keep:
        # The attribute is resolved per feature, so an empty ``keep`` never
        # touches ``X`` at all.
        picked[feature] = getattr(X, view)[feature]
    return picked

In [122]:
# Feature functions to run side by side; each becomes a named
# FunctionTransformer whose step name is the function's own name.
funcs = [x_plus_y, x_per_b, keep_features]
feature_transformers = [
    (feature_func.__name__, FunctionTransformer(feature_func))
    for feature_func in funcs
]

In [123]:
pipe = Pipeline(steps=[
    # Every transformer in the union returns a dict of features. Note that
    # 'keep_features' still needs its 'view'/'keep' kw_args, which are supplied
    # afterwards through pipe.set_params(...).
    ('b_features', FeatureUnion(transformer_list=feature_transformers)),
    # Merge the per-transformer dicts into one mapping and build a single
    # DataFrame from it.
    (
        'feature_rows_to_df',
        FunctionTransformer(
            lambda feature_dicts: pd.DataFrame(dict(ChainMap(*feature_dicts)))
        ),
    ),
])

In [124]:
# keep_features has no defaults for 'view' and 'keep'; inject them into the
# nested FunctionTransformer using the '<step>__<transformer>__<param>' path.
pipe.set_params(
    b_features__keep_features__kw_args={'view': 'b', 'keep': ["b1", "b2"]},
)
pipe.transform(p)

Unnamed: 0_level_0,b1,b2,x_per_b,x+y
B_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
b0,13,26,6,4244
b1,12,36,13,16880
b2,25,19,9,9796
b3,19,38,7,8756
b4,21,16,4,3078
b5,19,42,9,10525
b6,28,18,6,6563
b7,2,41,13,13248
b8,49,25,21,21852
b9,15,43,12,9365


In [87]:
# Show every (nested) parameter of the pipeline.
pipe.get_params()

{'memory': None,
 'steps': [('features',
   FeatureUnion(transformer_list=[('x_plus_y',
                                   FunctionTransformer(func=<function x_plus_y at 0x0000020D0EDA85E0>)),
                                  ('x_per_b',
                                   FunctionTransformer(func=<function x_per_b at 0x0000020D0EC1E160>)),
                                  ('keep_features',
                                   FunctionTransformer(func=<function keep_features at 0x0000020D0EDB6F70>))])),
  ('feature_rows_to_df',
   FunctionTransformer(func=<function <lambda> at 0x0000020D0EFE5AF0>))],
 'verbose': False,
 'features': FeatureUnion(transformer_list=[('x_plus_y',
                                 FunctionTransformer(func=<function x_plus_y at 0x0000020D0EDA85E0>)),
                                ('x_per_b',
                                 FunctionTransformer(func=<function x_per_b at 0x0000020D0EC1E160>)),
                                ('keep_features',
                  

In [347]:
def generate_community_pipeline(communities, feature_pipe):
    """Build a pipeline that runs ``feature_pipe`` per community and stacks the results.

    Parameters
    ----------
    communities : mapping
        Only its values are read; each value must expose a ``name``
        attribute. The returned pipeline selects each community with
        ``data[community.name]`` from whatever is passed to ``transform``
        (presumably this same mapping, i.e. keys are assumed to equal the
        ``name`` attributes -- TODO confirm).
    feature_pipe : transformer
        Feature pipeline applied to each selected community. The very same
        instance is placed in every per-community branch.

    Returns
    -------
    Pipeline
        A two-step pipeline: a FeatureUnion with one branch per community,
        followed by a step that concatenates the per-community frames
        row-wise.
    """
    community_transformers = []
    for community in communities.values():
        branch = Pipeline([
            ('selector', ItemSelector(key=community.name)),
            ('features', feature_pipe),
            ('add_community_name', CommunitySetter(community_name=community.name)),
            # NOTE(review): the frame is wrapped in a dict, presumably so the
            # FeatureUnion carries it through as one opaque element -- confirm.
            ('package', FunctionTransformer(lambda frame: {'features': frame})),
        ])
        community_transformers.append((community.name, branch))

    union_step = FeatureUnion(transformer_list=community_transformers)
    # Unwrap each branch's {'features': frame} package and stack the frames.
    stack_step = FunctionTransformer(
        lambda packages: pd.concat(
            [package['features'] for package in packages.tolist()],
            axis=0,
        )
    )

    return Pipeline([
        ('community_features', union_step),
        ('stack_communities', stack_step),
    ])

In [348]:
# Wrap the single-community feature pipeline `pipe` into a pipeline covering
# every community. The builder defined in this notebook is
# `generate_community_pipeline`; the previous name `pipeline_generator` is not
# defined anywhere here and raises NameError on a fresh kernel.
pipe_all = generate_community_pipeline(multiple_ps, pipe)

In [349]:
# Run the multi-community pipeline on the dict of fixtures and display the result.
pipe_all.transform(multiple_ps)

Unnamed: 0_level_0,Unnamed: 1_level_0,b1,b2,x_per_b,x+y
B_id,community_name,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
b0,A,5,37,13,18108
b1,A,26,29,8,8548
b2,A,12,29,12,11206
b3,A,42,40,8,8156
b4,A,31,42,7,7541
b5,A,48,47,10,10349
b6,A,39,28,9,7073
b7,A,8,18,12,12734
b8,A,36,14,9,8856
b9,A,10,27,9,8924


In [244]:
# Display the assembled multi-community pipeline.
pipe_all

In [None]:
            #lambda dfs: pd.concat(dfs, axis=1)

# NOTE(review): scratch cell. The string literal below only holds the text of a
# lambda; it is never evaluated as code. It appears to be a draft of the
# community-stacking step -- confirm whether it can be removed.
"""
lambda pkgs: pd.concat(
    [
        df.assign(community_name=cname).set_index(["community_name"], append=True)
        for cname, df in dict(pkgs).items()
    ],
    axis=1
)
"""