In [12]:
import copy
from abc import ABC, abstractmethod

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
plt.style.use('dark_background')

In [13]:
def linear(n=1000,a=0,b=0.1,start_date='2000-01-01',seed=None):
    """Simulate a noisy linear relation ``y1 = a + b*x1 + noise`` on a daily index.

    Parameters
    ----------
    n : int
        Number of daily observations.
    a, b : float
        Intercept and slope of the linear relation.
    start_date : str
        First date of the daily index.
    seed : int, optional
        When given, draws come from ``np.random.default_rng(seed)`` so the
        result is reproducible. When omitted, the legacy global ``np.random``
        state is used, exactly as before.

    Returns
    -------
    pd.DataFrame
        Columns ``['y1', 'x1']`` indexed by ``n`` consecutive days.
    """
    # Both the module and a Generator expose .normal, so one code path serves both.
    rng = np.random if seed is None else np.random.default_rng(seed)
    # Draw order (x first, then the noise on y) is kept so globally-seeded runs are unchanged.
    x = rng.normal(0, 0.01, n)
    y = a + b*x + rng.normal(0, 0.01, n)
    dates = pd.date_range(start_date, periods=n, freq='D')
    return pd.DataFrame({'y1': y, 'x1': x}, index=dates)
# Two separate draws from the same linear process over the same date range
# (no seed is set, so the values differ between the two frames and between runs).
df1 = linear(n=1000,a=0,b=0.1,start_date='2000-01-01')
df2 = linear(n=1000,a=0,b=0.1,start_date='2000-01-01')

# print() emits the plain-text repr; pandas truncates the 1000 rows in the output.
print(df1)
print(df2)

                  y1        x1
2000-01-01  0.001233  0.002310
2000-01-02  0.010105 -0.002970
2000-01-03  0.003227  0.021693
2000-01-04 -0.000184 -0.009007
2000-01-05 -0.006589 -0.008609
...              ...       ...
2002-09-22 -0.011448  0.000793
2002-09-23  0.014882  0.008922
2002-09-24  0.002278  0.012777
2002-09-25  0.004196  0.012272
2002-09-26  0.001667  0.016200

[1000 rows x 2 columns]
                  y1        x1
2000-01-01 -0.003928  0.011351
2000-01-02  0.006271 -0.004165
2000-01-03 -0.016271  0.002832
2000-01-04  0.007524  0.011465
2000-01-05 -0.004329  0.010114
...              ...       ...
2002-09-22  0.001471  0.002168
2002-09-23  0.000751  0.014955
2002-09-24  0.016390 -0.006712
2002-09-25  0.000866  0.017879
2002-09-26  0.006694  0.005148

[1000 rows x 2 columns]


In [19]:
class ModelPipe:
    """A model with its per-variable transforms, plus optional keyed child pipes.

    Attributes
    ----------
    key : str
        Name of this pipe ('Master' when no key is given).
    model : object
        Deep copy of the model passed in; used through ``model.estimate(**kwargs)``
        and ``model.get_weight(**kwargs)``.
    transforms : dict
        Deep copy of the mapping variable name -> transform object.
    model_pipes : dict
        Child pipes registered through :meth:`add`, keyed by their name.
    """

    def __init__(self, key:str = 'Master', model=None, transforms=None):
        # `transforms` defaults to None rather than a shared mutable dict.
        self.key = key or 'Master'
        # Deep copies keep this pipe independent from the caller's objects.
        self.model = copy.deepcopy(model)
        self.transforms = copy.deepcopy(transforms) if transforms else {}
        self.model_pipes = {}

    # add pipes
    def add(self, key, model, transforms):
        """Register (or replace) a child pipe under `key` and return self for chaining."""
        # TODO: check types
        self.model_pipes[key] = ModelPipe(key, model, transforms)
        return self

    # estimate model
    def estimate(self, data):
        '''
        Estimate model pipe on data: estimate the transforms, apply them,
        then estimate the model. Returns self.
        '''
        # estimate transforms
        self.estimate_transforms(data)
        # apply transforms
        self.apply_transforms(data)
        # estimate model
        self.estimate_model(data)
        return self

    def estimate_transforms(self, data):
        """Estimate each transform on the attribute of `data` named by its key."""
        for variable, transform in self.transforms.items():
            transform.estimate(getattr(data, variable))

    def apply_transforms(self, data):
        """Placeholder: applying the estimated transforms to `data` is not implemented yet."""
        # TODO: apply each estimated transform to getattr(data, variable).
        pass

    def estimate_model(self, data):
        """Estimate the model on ``data.as_dict()`` and keep a copy of the data. Returns self."""
        # store estimate data - to be used later to make sure
        # that the evaluation data matches what is expected....
        # maybe not necessary to store all fields, perhaps we can
        # just store some metainfo like cols
        self._estimate_data = data.copy()
        # just put here all dicts - easier to read
        self.model.estimate(**data.as_dict())
        return self

    # get weight
    def get_weight(self, xq, x, y, z, t, apply_transform_x = True, apply_transform_t = True, apply_transform_y = True):
        """Transform the inputs that are present and ask the model for a weight.

        Every input that is None is forwarded untouched; `y` is now guarded the
        same way as `x`, `t` and `xq`.

        NOTE(review): transform_x / transform_y / transform_t are not defined
        on this class in the visible notebook, so any non-None input with its
        apply flag set still raises AttributeError - confirm where they live.
        """
        # process inputs
        if y is not None and apply_transform_y:
            y = self.transform_y(y, True)
        if x is not None and apply_transform_x:
            x = self.transform_x(x, True)
        if t is not None and apply_transform_t:
            t = self.transform_t(t, True)
        if xq is not None and apply_transform_x:
            xq = self.transform_x(xq, True)
        return self.model.get_weight(y=y, x=x, xq=xq, z=z, t=t)

    def live(self, data, idx = None):
        """Return the model weight for the observation of `data` at `idx`."""
        # just to make clear that this has exactly the same functional form as what is
        # done in evaluate

        # get data at idx (including multisequence filter)
        # assumes data.model_input(idx) returns a mapping with the keyword
        # arguments of get_weight (xq, x, y, z, t), mirroring data.as_dict()
        # in estimate_model - TODO confirm. Passing it as one positional
        # argument could never satisfy get_weight's signature.
        model_input = data.model_input(idx)

        # apply transforms + get weight from model
        return self.get_weight(**model_input)

    def evaluate(self, data):
        """Evaluate the model using the test data and return performance metrics."""
        # this will change fields s, weight_* in data object inplace
        # iterate on data and run live
        for i in range(data.n):
            data.w[i] = self.live(data, i)
        # compute performance: row-wise dot product of weights and y
        data.s = np.einsum('ij,ij->i', data.w, data.y)
        return data

    def do_something(self, factor=1):
        """Demonstrate recursive propagation of a call through the child pipes.

        Returns a dict with this pipe's own result under 'self' and the
        results of every child pipe under 'children'.
        """
        # Do something with self. The pipe has no `value` attribute; its key identifies it.
        result = f"Value: {self.key} (x{factor})"
        # Recursively call do_something on all children (stored in model_pipes) and collect their results.
        children_results = {key: child.do_something(factor) for key, child in self.model_pipes.items()}
        return {"self": result, "children": children_results}

# Usage:
# NOTE(review): `A`, `add_child` and `children` are not defined in the visible
# notebook; the output stored under this cell presumably comes from an earlier
# kernel state in which a class `A` existed - confirm before re-running.
root = A("root")
root.add_child("child1", A(10))
root.add_child("child2", A(20))
# Further nesting:
root.children["child1"].add_child("grandchild", A(5))

# Calling do_something on the root propagates through the tree.
import pprint
pprint.pprint(root.do_something(factor=2))


{'children': {'child1': {'children': {'grandchild': {'children': {},
                                                     'self': 'Value: 5 (x2)'}},
                         'self': 'Value: 10 (x2)'},
              'child2': {'children': {}, 'self': 'Value: 20 (x2)'}},
 'self': 'Value: root (x2)'}


In [20]:
# Scratch check of the built-in slice constructor (step defaults to None).
slice(0,5)

slice(0, 5, None)

In [None]:
# single-dataset case
# NOTE(review): the `Model` class defined in this notebook is an ABC with
# abstract methods, so a concrete subclass is presumably meant here.
model = Model()

# data should be transformed!
# better create a model pipeline
# NOTE(review): `transforms` and `data` are not defined in the visible cells.
# The In [19] ModelPipe takes `key` as its first positional parameter, so
# `model` would bind to `key` there - confirm the intended signature.
model_pipe = ModelPipe(model, transforms)

model_pipe.estimate(data)
model_pipe.evaluate(data)
model_pipe.get_weight(data)

# this yields a workflow like
class Workflow:
    """Drive estimation, evaluation and cross-validation of a single model pipe."""

    def __init__(self, model_pipe):
        self.model_pipe = model_pipe

    def cvbt(self, data):
        """Estimate a fresh copy of the pipe per split and evaluate it on that split.

        Assumes `data.split()` returns an iterable of splits that supports
        ``splits - split`` to obtain the complementary training data - TODO
        confirm against the dataset implementation.

        Returns
        -------
        list
            One evaluation result per split, in iteration order.
        """
        # split data
        splits = data.split()
        results = []
        # for each split
        for split in splits:
            # Deep copy so each fold starts from the unestimated pipe and the
            # workflow's own pipe is never modified.
            tmp = copy.deepcopy(self.model_pipe)
            tmp.estimate(splits - split)
            # Evaluate on the held-out split (not on the pipe itself).
            results.append(tmp.evaluate(split))
        return results

    def estimate(self, data):
        """Estimate the wrapped pipe on `data`."""
        # save data for estimation
        self.model_pipe.estimate(data)

    def evaluate(self, data):
        """Evaluate the wrapped pipe on `data` and return whatever it returns."""
        return self.model_pipe.evaluate(data)

    def get_weight(self, data):
        """Return the weight computed by the wrapped pipe for `data`."""
        return self.model_pipe.get_weight(data)
# then we can proceed with the post-processing - rethink the output format
# probably outputs can live in data (the pipe modifies it and we pick them up from there...)

# even better
# NOTE(review): transforms1/model1/... are placeholders not defined in the
# visible cells, and the argument order differs from the add() signatures
# defined elsewhere in this notebook - confirm.
model_pipe = ModelPipe()
model_pipe.add(transforms1, model1)
model_pipe.add(transforms2, model2)
# when we estimate, we estimate all models with their corresponding transforms
# then the weights are averaged out!


In [None]:
# now with more data
# same thing I think
class Workflow:
    """Drive estimation, evaluation and cross-validation of a dict-like set of model pipes."""

    def __init__(self, model_pipes):
        # model_pipes is a dict-like object that itself exposes
        # estimate / evaluate / get_weight for dict-like data
        self.model_pipes = model_pipes

    def cvbt(self, data):
        """Estimate a fresh copy of the pipes per split and evaluate it on that split.

        Assumes `data.split()` returns an iterable of splits that supports
        ``splits - split`` to obtain the complementary training data - TODO
        confirm against the dataset implementation.

        Returns
        -------
        list
            One evaluation result per split, in iteration order.
        """
        # data must be dict-like and
        # there should be a model pipe for each key...

        # split data
        splits = data.split()
        results = []
        # for each split
        for split in splits:
            # Deep copy so each fold starts from the unestimated pipes.
            tmp = copy.deepcopy(self.model_pipes)
            tmp.estimate(splits - split)
            # Evaluate on the held-out split (not on the pipes themselves).
            results.append(tmp.evaluate(split))
        # all of this is done in an individual format
        return results

    def estimate(self, data):
        """Estimate the wrapped pipes on `data`."""
        # save data for estimation
        self.model_pipes.estimate(data)

    def evaluate(self, data):
        """Evaluate the wrapped pipes on `data` and return whatever they return."""
        return self.model_pipes.evaluate(data)

    def get_weight(self, data):
        """Return the weight computed by the wrapped pipes for `data`."""
        return self.model_pipes.get_weight(data)
        
# even better
# NOTE(review): key*/transforms*/model* are placeholders not defined in the visible cells.
model_pipe = ModelPipe()
model_pipe.add(key1, transforms1, model1)
model_pipe.add(key1, transforms2, model2)
model_pipe.add(key2, transforms3, model3)
# for key1 we have two models to be averaged out
# for key2 there is only one model

# when we estimate, we estimate all models with their corresponding transforms
# then the weights are averaged out!
        
        

In [None]:
# now with a portfolio model
# how to proceed???
# find a way to estimate how different models mix together!
# NOTE(review): no ModelPipe defined in the visible notebook has a `pipe`
# method (the others use `add`); portfolio_model/key*/... are placeholders.
model_pipe = ModelPipe(portfolio_model)
model_pipe.pipe(key1, transforms1, model1)
model_pipe.pipe(key1, transforms2, model2)
model_pipe.pipe(key2, transforms3, model3)

# how should a model pipe be structured?



In [3]:

class ModelPipe:
    """Tree of model pipes: an optional portfolio model plus keyed child pipes."""

    def __init__(self,
                 portfolio_model: 'PortfolioModel' = None
                ):
        self.child_models = {}
        self.portfolio_model = portfolio_model

    def view(self, level = 0):
        """Print this pipe and, one space deeper per level, every child pipe."""
        print(f"{' '*level}ModelPipe")
        for k in self.child_models:
            print(f"{' '*level}key: {k}")
            self.child_models[k].view(level+1)

    def add(self, key:str, model:'Model', transform:'Transform' = None):
        """Create an empty child pipe under `key` unless one already exists.

        `model` and `transform` are accepted but not used yet.
        """
        if key in self.child_models:
            return
        self.child_models[key] = ModelPipe()
        #self.child_models[key].add(key = key, model = model, transform = transform)

    def estimate(self, dataset:'Dataset'):
        """Placeholder: does nothing yet."""
        # estimate portfolio_model

        # if there is a master model we need to
        # stack all data, fit the model and associate the
        # model to each self.models...
        return None

    def evaluate(self, dataset:'Dataset'):
        """Placeholder: does nothing yet."""
        return None

# Smoke test: register one child under 'key1' and print the resulting tree.
# The model argument (5) is ignored by add() as currently written.
mp = ModelPipe()
mp.add('key1', 5)
mp.view()
    

ModelPipe
key: key1
 ModelPipe


In [None]:
# for a single dataset!

# base model pipe unit
class ModelPipeUnit:
    """Smallest pipe: one model together with the transforms that feed it."""

    def __init__(self, model = None, transforms = None):
        self.model, self.transforms = model, transforms

    def estimate(self, dataset):
        """Placeholder; intended to apply the transforms and then estimate the model."""
        # apply transforms
        # estimate model
        return None

    def evaluate(self, dataset):
        """Placeholder that returns `dataset` unchanged.

        Intended steps: transform the dataset, evaluate the model and
        change the dataset in place.
        """
        return dataset

# list of model pipes
# objective here is to be able to average many models
# of we could just build an ensemble model
# but each model may have it's own data processing and so
# this makes more sense!

# maybe we should just inherit from a list to make
# this looks nicer
class ModelPipeStack(list):
    """List of pipes sharing one dataset; their weights are averaged on evaluate."""

    def add(self, model = None, transforms = None):
        """Append a new ModelPipeUnit built from `model` and `transforms`."""
        self.append(ModelPipeUnit(model, transforms))

    def estimate(self, dataset):
        """Estimate every pipe on its own copy of `dataset`."""
        # pipes cannot be empty!
        for pipe in self:
            pipe.estimate(dataset.copy())

    def evaluate(self, dataset):
        """Evaluate every pipe on a copy of `dataset` and average their weights.

        Each pipe's ``evaluate`` is expected to return an object exposing the
        weights as attribute ``w``. The element-wise mean of those weights is
        written to ``dataset.w``. With no pipes, `dataset` is returned
        untouched.

        Returns
        -------
        The input `dataset` (modified in place when the stack is not empty).
        """
        # copied datasets are changed in place by each pipe
        res = [pipe.evaluate(dataset.copy()) for pipe in self]
        if res:
            # take mean of weights to compute performance
            dataset.w = np.mean([np.asarray(r.w) for r in res], axis=0)
        return dataset

# this behaves like a dict and so maybe it can inherit from it
class ModelPipeContainer(dict):
    """Dict mapping a dataset key to the ModelPipeStack that handles it."""

    def add(self, key, model, transforms = None):
        """Add (model, transforms) to the stack stored under `key`, creating it if needed."""
        if key not in self:
            self[key] = ModelPipeStack()
        self[key].add(model, transforms)

    def estimate(self, datasets):
        """Estimate the stack of every key in the `datasets` mapping.

        Raises KeyError when `datasets` holds a key with no registered stack.
        """
        # Iterate over (key, dataset) pairs; iterating the mapping itself yields keys only.
        for k, dataset in datasets.items():
            self[k].estimate(dataset)

    def evaluate(self, datasets):
        """Evaluate the stack of every key in the `datasets` mapping and return `datasets`."""
        for k, dataset in datasets.items():
            self[k].evaluate(dataset)
        return datasets
    
class ModelPipe():
    """Top-level pipe: an optional portfolio model on top of a keyed container of pipe stacks."""

    def __init__(self, portfolio_model = None):
        self.portfolio_model = portfolio_model
        self.model_pipe_container = ModelPipeContainer()

    def add(self, key, model, transforms = None):
        """Register (model, transforms) for the dataset identified by `key`."""
        self.model_pipe_container.add(key, model, transforms)

    def estimate(self, datasets):
        """Estimate the portfolio model (when there is one) and then every pipe.

        `datasets` is a dict of datasets keyed like the registered pipes.
        """
        if self.portfolio_model is not None:
            self.portfolio_model.estimate(datasets, self.model_pipe_container)
        self.model_pipe_container.estimate(datasets)

    def evaluate(self, dataset):
        """Evaluate the registered pipes on each entry of `dataset`.

        `dataset` is a dict of datasets; the entry under key k is evaluated by
        the stack registered under k. Returns the input mapping.
        """
        # Loop variable is named differently from the parameter so the
        # mapping itself (not its last entry) is what gets returned.
        for k, ds in dataset.items():
            self.model_pipe_container[k].evaluate(ds)
        # TODO: correct predictions/weights with portfolio model!
        return dataset
    

In [None]:
# TODO: cvbt_path should become a function of (dataset, model_pipe);
# neither cvbt_path nor cvbt with this signature is defined in the visible notebook.
cvbt_path(dataset, model_pipe)
cvbt() # just run cvbt_path many times...

In [None]:
class PortfolioModel:
    """Sketch of a portfolio model (presumably decides how per-key models are weighted)."""
    def __init__(self):
        pass
    
    def estimate(self, datasets, model_pipe_container):
        """Placeholder estimate; currently only calls ``cvbt_path()`` with no arguments."""
        # cvbt with single path...
        # NOTE(review): cvbt_path is not defined in the visible notebook and is
        # sketched elsewhere as cvbt_path(dataset, model_pipe) - confirm signature.
        cvbt_path()
        # this should write a dict with k:weight
        # weights should be normalized..
    

In [None]:
# Unfinished chaining sketch; the dangling trailing '.' (a SyntaxError) was removed.
# NOTE(review): `pipe` and `Transforms` are not defined in the visible notebook.
PortfolioModel.pipe(Transforms)

In [None]:
# NOTE(review): the `Model` defined in this notebook is an ABC with abstract
# methods and declares no `apply` - this looks like an API sketch; confirm.
model = Model()
model.apply()


In [None]:
# API sketch: a Data object receives transforms and models through `apply`,
# and a Dataset collects Data objects through `add`.
# NOTE(review): Data, Dataset and `transform` are not defined in the visible cells.
data = Data()
data.apply(transform)
data.apply(model)

dataset = Dataset()
dataset.add(data)



In [None]:
class Workflow:
    """Bundle a dataset with the transformers and models that act on it."""

    def __init__(self, dataset, transformers, models):
        self.dataset, self.transformers, self.models = dataset, transformers, models

    def _transformed(self, data):
        """Pass `data` through every transformer in order and return the result."""
        for step in self.transformers:
            data = data.apply(step)
        return data

    def estimate(self):
        """Transform the stored dataset and fit every model on the result."""
        prepared = self._transformed(self.dataset)
        for model in self.models:
            model.fit(prepared)

    def predict(self, new_data):
        """Transform `new_data` and return a dict mapping each model to its prediction."""
        prepared = self._transformed(new_data)
        return {model: model.predict(prepared) for model in self.models}
    
def cvbt(workflow: Workflow):
    """Sketch of a split-wise train/evaluate driver (not runnable as written).

    NOTE(review): the `Workflow` defined in this cell has no `split` or
    `evaluate` method, the `Workflow(train_dataset, )` call omits required
    arguments, and `train_dataset`, `test_data` and `results` are never
    assigned here - presumably placeholders to be filled in.
    """
    # split
    splits = workflow.split()
    for split in splits:
        # build train workflow
        tmp_workflow = Workflow(train_dataset, )
        # evaluate on test data
        tmp_workflow.evaluate(test_data)
    return results

# NOTE(review): `...` is a literal Ellipsis placeholder; Workflow.__init__ in this
# cell requires (dataset, transformers, models), so this call fails as written.
workflow = Workflow(...)
results = cvbt(workflow)

        

In [7]:
# abstract classes

class PredictiveDistribution:
    """Holds the mean and covariance of a predictive distribution."""

    def __init__(self, mean, cov):
        self.mean, self.cov = mean, cov

    def get_weight(self):
        """Placeholder: not implemented yet, returns None."""
        return None
    

class Weight:
    """Thin wrapper around a weight array, stored as attribute ``w``."""

    def __init__(self, w:np.ndarray):
        # stored as given: no copy and no validation
        self.w = w

class Model(ABC):
    """Abstract interface of a model: estimate on data, then produce weights."""

    @abstractmethod
    def estimate(self,y: np.ndarray, **kwargs):
        """Subclasses must implement this method"""

    @abstractmethod
    def get_weight(self, **kwargs) -> Weight:
        """Subclasses must implement this method"""

# Portfolio Model class template
class PortfolioModel(ABC):
    """Abstract template for portfolio models: must offer ``view`` and ``estimate``."""

    @abstractmethod
    def view(self):
        # no default behaviour; returns None when reached through super()
        return None

    @abstractmethod
    def estimate(self, **kwargs):
        """Subclasses must implement this method"""

# Data Transform template
class Transform(ABC):
    """Abstract template for data transforms.

    Concrete transforms must provide ``view``, ``estimate``, ``transform``
    and ``inverse_transform``.
    """

    @abstractmethod
    def view(self):
        # no default behaviour; returns None when reached through super()
        return None

    @abstractmethod
    def estimate(self, **kwargs):
        """Subclasses must implement this method"""

    @abstractmethod
    def transform(self, **kwargs):
        """Subclasses must implement this method"""

    @abstractmethod
    def inverse_transform(self, **kwargs):
        """Subclasses must implement this method"""



In [18]:
class MyClass:
    """Dict-backed tree node: reading a missing key creates a child node on the fly."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        """Return the entry under `key`, creating an empty MyClass child when absent."""
        try:
            return self._data[key]
        except KeyError:
            child = self._data[key] = MyClass()
            return child

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data.keys())

    def __len__(self):
        return len(self._data)

    def keys(self):
        return self._data.keys()

    def values(self):
        return self._data.values()

    def items(self):
        return self._data.items()

    def do_something(self, *args, **kwargs):
        """Print a line for this node, then recurse into every MyClass child."""
        print("Doing something on", self)
        # Non-MyClass values (leaves) are skipped.
        for child in filter(lambda entry: isinstance(entry, MyClass), self._data.values()):
            child.do_something(*args, **kwargs)

    def __repr__(self):
        return f"{self._data!r}"

# Usage example:
root = MyClass()
root['a']['b'] = 42   # Accessing 'a' automatically creates a new MyClass instance.
print("Tree structure:", root)
# Prints one line per MyClass node (root, then 'a'); the leaf value 42 is skipped.
root.do_something()

Tree structure: {'a': {'b': 42}}
Doing something on {'a': {'b': 42}}
Doing something on {'b': 42}


In [None]:
# for a single dataset..

# NOTE(review): Dataset, df, model, transforms and dataset_test are not defined
# in the visible cells - this is an API sketch.
dataset = Dataset(df)

model_pipe = ModelPipe(model, transforms)
model_pipe.estimate(dataset)

model_pipe.evaluate(dataset_test)
# best to store it all on model_pipe!
# model_pipe acts on data...
# more similar to the first version!





In [None]:

# How should a workflow operate?
# -----
# What does a model pipeline have?
# - it can act on different datasets
# - if not specified otherwise, it applies the same model to all datasets;
#   otherwise the model needs to be compatible with the data
# - if specified, it joins all the data to train a single "master" model
# - data can undergo transformations
# - it has a portfolio model that specifies how the models should be joined!
# this portfolio model can also take strategy performance statistics into account
# to decide how to allocate
# ALSO
# it must specify how the models are trained and evaluated!

# what do we have?
# - dataset
# - portfolio model
# - transforms
# - model

    

    
    
    
    
    
    
    
class ModelPipe:
    """Sketch of a model pipe that remembers the dataset it was estimated on."""

    def __init__(self):
        # Dataset seen by estimate(); evaluate() checks new data against it.
        self.estimate_dataset = None

    def estimate(self, dataset:'Dataset'):
        '''
        After estimate the model pipe is configured
        to work on data that has the same format.

        Only records `dataset` (by reference) for the compatibility check
        in evaluate; model estimation itself is not implemented yet.
        '''
        self.estimate_dataset = dataset

    def evaluate(self, dataset:'Dataset'):
        '''
        Check that the input dataset is compatible with the one
        the pipe was estimated on, then return it.

        Raises AssertionError when estimate has not been called or when
        ``estimate_dataset.is_compatible(dataset)`` is falsy.

        NOTE(review): the evaluation itself is not implemented; returning
        `dataset` follows the other evaluate methods in this notebook,
        which return the dataset they were given - confirm.
        '''
        assert self.estimate_dataset is not None, "estimate must be called before evaluate"
        assert self.estimate_dataset.is_compatible(dataset), "can only evaluate in compatible datasets"

        return dataset

    def get_weight(self):
        """Placeholder: not implemented yet."""
        pass
    


# create model pipe

# when we run estimate we can do
model.estimate(dataset)
# and the model gets trained

# when we run evaluate we can do
out = model.evaluate(dataset)
# and we get the output of an evaluation

# so, when we do cvbt, the model can make many calls to estimate
# but internally it builds the splits and the necessary calls to estimate
# and evaluate
out = model.cvbt(dataset)

# WHAT DO WE NEED?
# the dataset must make sense for the model that was defined...
# add checks for this?





# for live we have a pd.DataFrame and a trained model and we
# need to call something like
# NOTE(review): load_model is not defined in the visible notebook; if it
# unpickles the file, only trusted files should be loaded.
model = load_model('filemodel.pkl')
model.get_weight(dataset) 
# or should it be 
model.live(dataset)
# ?






# create dataset from input dataframes
dataset = Dataset({'dataset1':df1, 'dataset2':df2})



