In [1]:
import numpy as np
import pandas as pd
from torch.nn import MSELoss
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import copy
from IPython.display import display, Math, Latex

In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [3]:
def descriptive_length_of_fraction(numerator, denominator):
    return torch.log2((1 + torch.abs(torch.tensor(denominator))) * torch.abs(torch.tensor(numerator)))


def logplus(number):
    return torch.log2(1 + number**2) / 2


def descriptive_length_of_real_number(real_number, precision_floor=1e-8):
    return logplus(real_number / torch.tensor(precision_floor))

In [4]:
print(descriptive_length_of_fraction(1.0, 3.0))
print(descriptive_length_of_real_number(0.3333333345))

tensor(2.)
tensor(24.9905)


In [5]:
def FormFractionRepresentation(fraction: torch.tensor) -> str:
    if fraction[1].item() != 1:
        return r"\frac{" + str(int(fraction[0].item())) + "}{" + str(int(fraction[1].item())) + "}"
    return str(int(fraction[0].item()))


def FormReal(number: torch.tensor) -> str:
    return "{:.3}".format(number.item())


def AddRationalInName(name: str) -> str:
    if 'lambda' in name:
        position = name.find('lambda')
    else:
        position = name.find('power')
    return name[:position] + "rational_" + name[position:]

In [6]:
def info(formula):
    print("depth: {}, number of variables: {}, total parameters: {}".format(
        formula.depth, formula.num_variables, len(formula.parameters)))
    
def PrintFormula(formula, mode="slow"):
#     info(network)
    if mode == "slow":
        display(Math(str(formula)))   
    else:
        print(formula)
        

In [7]:
class RecursiveFormula(nn.Module):
    """
    Class used for representing formulas
    
    Attributes:
        depth
        num_variables
        subformulas - list of subformulas of smaller depth, which are used for computing
    """
    
    def __init__(self, depth=0, num_variables=1):
        super(RecursiveFormula, self).__init__()
        self.depth = depth
        self.num_variables = num_variables
        self.subformulas = nn.ModuleList()
        # When depth is zero, formula is just a real number
        if depth == 0:
            new_lambda = nn.Parameter((2 * torch.randn((1, 1)))).requires_grad_(True)
            self.register_parameter("lambda_0", new_lambda)
            new_rational_lambda = nn.Parameter(torch.tensor([0., 0.])).requires_grad_(False)
            self.register_parameter("rational_lambda_0", new_rational_lambda)
        else:
            for i in range(self.num_variables):
                # When depth is 1, we do not need to create subformulas, since they would be just real numbers
                if self.depth != 1:
                    subformula = RecursiveFormula(self.depth - 1, self.num_variables)
                    self.subformulas.append(subformula)
                new_lambda = nn.Parameter((2 * torch.randn((1, 1)))).requires_grad_(True)
                new_power = nn.Parameter((2 * torch.randn((1, 1)))).requires_grad_(True)
                new_rational_lambda = nn.Parameter(torch.tensor([0., 0.])).requires_grad_(False)
                new_rational_power = nn.Parameter(torch.tensor([0., 0.])).requires_grad_(False)
                self.register_parameter("lambda_{}".format(i), new_lambda)
                self.register_parameter("power_{}".format(i), new_power)
                self.register_parameter("rational_lambda_{}".format(i), new_rational_lambda)
                self.register_parameter("rational_power_{}".format(i), new_rational_power)
            self.last_subformula = RecursiveFormula(self.depth - 1, self.num_variables)
                                    
    def forward(self, x):
        """
        Iterate over subformulas, recursively computing result using results of subformulas
        """
        # When depth is 0, we just return the corresponding number
        if self.depth == 0:
            return self.get_lambda(0).repeat(x.shape[0], 1)
        
        ans = torch.zeros(x.shape[0], 1)
        for i in range(self.num_variables):
            x_powered = torch.t(x[:, i]**self.get_power(i))
            subformula_result = torch.ones((x.shape[0], 1))
            # When depth is 1, we do not need to compute subformulas
            if self.depth != 1:
                subformula_result = self.subformulas[i](x)
            ans += self.get_lambda(i) * x_powered * subformula_result           
        ans += self.last_subformula(x)
        return ans
    
    def simplify(self, X_val, y_val, max_denominator=10, inplace=False):
        """
        Simplifies the formula, iterating over all its parameters and trying to substitute them with close rational number
        
        Parameters:
            X_val: torch.tensor, shape (n_samples, n_features)
                Training vector, where n_samples is the number of samples and n_features is the number of features.
            y_val: torch.tensor, shape (n_samples, 1)
                Target vector relative to X.
            max_denominator: int
                algorithm tries rational numbers with denominator not greater than max_denominator
            inplace: bool
                if True, when modify the original formula
                otherwise return new formula, leaving original one unchanged
        Returns:
            self, if inplace set to True
            simplified_version otherwise
        """
        
        simplified_version = copy.deepcopy(self)  
        simplified_state_dict = simplified_version.state_dict()
        
        # Iterate over all parameters
        for key, value in self.state_dict().items():
            if "rational" not in key: # We do not simplify rational parameters - they will be the result of simplification
                simplified_version_for_iteration = copy.deepcopy(simplified_version)
                simplified_state_dict_for_iteration = simplified_version_for_iteration.state_dict()
                y_predict = simplified_version(X_val)
                loss = MSELoss()(y_val, y_predict)
                descriptive_length_of_loss = descriptive_length_of_real_number(loss)
                descriptive_length_of_existing_parameter = descriptive_length_of_real_number(value)

                # Iterate over all possible denominators
                for possible_denominator in range(1, max_denominator + 1):
#                     print("trying denominator", possible_denominator)
                    simplified_parameter_numerator = torch.round(value * possible_denominator)
                    simplified_state_dict_for_iteration[key] = simplified_parameter_numerator / possible_denominator
                    simplified_version_for_iteration.load_state_dict(simplified_state_dict_for_iteration)
                    descriptive_length_of_simplified_parameter = descriptive_length_of_fraction(simplified_parameter_numerator, possible_denominator)
#                     print(simplified_parameter_numerator, possible_denominator)
                    y_predict_simplified = simplified_version_for_iteration(X_val)
                    loss_of_simplified_model = MSELoss()(y_val, y_predict_simplified)
                    descriptive_length_of_loss_of_simplified_model = descriptive_length_of_real_number(loss_of_simplified_model)                
                    # If the descriptive length did not improve, revert the change.
#                     print("descriptive_length_of_loss_of_simplified_model", descriptive_length_of_loss_of_simplified_model)
#                     print("descriptive_length_of_simplified_parameter", descriptive_length_of_simplified_parameter)
#                     print("descriptive_length_of_loss", descriptive_length_of_loss)
#                     print("descriptive_length_of_existing_parameter", descriptive_length_of_existing_parameter)

                    if descriptive_length_of_loss_of_simplified_model + descriptive_length_of_simplified_parameter > descriptive_length_of_loss + descriptive_length_of_existing_parameter:
                        simplified_version_for_iteration.load_state_dict(simplified_state_dict)
                    else:
                        # If we are successful, we update everything
                        simplified_state_dict[AddRationalInName(key)] = torch.tensor([simplified_parameter_numerator, possible_denominator])
                        simplified_version.load_state_dict(simplified_state_dict)
                        simplified_version_for_iteration = copy.deepcopy(simplified_version)
                        simplified_state_dict_for_iteration = simplified_version_for_iteration.state_dict()

                simplified_state_dict = simplified_state_dict_for_iteration
                simplified_version.load_state_dict(simplified_state_dict)
        
        if inplace:
            self = copy.deepcopy(simplified_version)
        else:
            return simplified_version      
    
    def get_lambda(self, i):
        return self.__getattr__('lambda_{}'.format(i))
    
    def get_rational_lambda(self, i):
        return self.__getattr__('rational_lambda_{}'.format(i))
    
    def get_power(self, i):
        return self.__getattr__('power_{}'.format(i))
    
    def get_rational_power(self, i):
        return self.__getattr__('rational_power_{}'.format(i))
    
    def __repr__(self):
        """
        Return tex-style string, recursively combining result from representation of subformulas
        """
        if self.depth == 0:
            if self.get_rational_lambda(0)[1] > 0: # if it is equal to 0, it means that there is no rational value
                return FormFractionRepresentation(self.get_rational_lambda(0))            
            return FormReal(self.get_lambda(0))
        
        ans = ["("]
        for i in range(self.num_variables):
            # First we add lambda
            if i != 0 and self.get_lambda(i) > 0:
                ans.append(" + ")
            if self.get_rational_lambda(i)[1] > 0:
                ans.append(FormFractionRepresentation(self.get_rational_lambda(i)))
            else:
                ans.append(FormReal(self.get_lambda(i)))   
            # Then we add variable and its power
            ans.append("x_{}^".format(i + 1) + "{")
            if self.get_rational_power(i)[1] > 0:
                ans.append(FormFractionRepresentation(self.get_rational_power(i)))
            else:
                ans.append(FormReal(self.get_power(i)))  
            ans += "}"    
            # Then we add the corresponding subformula
            if self.depth != 1:
                ans.append(str(self.subformulas[i]))
        if self.last_subformula.get_lambda(0) > 0:        
            ans.append(" + ")
        ans.append(str(self.last_subformula))
        ans.append(")")
        ans = ''.join(ans)
        return ans

In [23]:
def LearnFormula(X, y, n_init=10, max_iter=100000, depth=1, verbose=2, verbose_frequency=5000, max_epochs_without_improvement=500,
                minimal_acceptable_improvement=3e-6) -> RecursiveFormula:
    """
    Parameters:
        X: torch.tensor, shape (n_samples, n_features)
            Training vector, where n_samples is the number of samples and n_features is the number of features.
        y: torch.tensor, shape (n_samples, 1)
            Target vector relative to X.
        n_init: int 
            number of times algorithm will be run with different initial weights. 
            The final results will be the best output of n_init consecutive runs in terms of loss.
        max_iter: int 
            Maximum number of iterations of the algorithm for a single run.
        depth: int
            depth of formula to learn
        verbose: int
            if is equal to 0, no output
            if is equal to 1, output number of runs and losses
            if is equal to 2, output number of runs and losses and print loss every verbose_frequency epochs
        verbose_frequency: int
            if verbose equals 2, then print loss every verbose_frequency epochs
        max_epochs_without_improvement: int
            if during this number of epochs loss does not decrease more than minimal_acceptable_improvement, the learning process
            will be finished
        minimal_acceptable_improvement: float
            if during max_epochs_without_improvement number of epochs loss does not decrease more than this number, 
            the learning process will be finished
            
    Returns:
        best_formula: RecursiveFormula
            fitted formula
    """
    
    best_model = RecursiveFormula(depth, X.shape[1])
    best_loss = 1e10
    for init in range(n_init):
        if verbose > 0:
            print("Run #{}".format(init + 1))
    #     torch.random.manual_seed(seed)
        formula = RecursiveFormula(depth, X.shape[1])
        # create your optimizer
        optimizer = optim.Adam(formula.parameters(), weight_decay=1)
        criterion = nn.MSELoss()
        epochs_without_improvement = 0
        epoch = 0
        optimizer.zero_grad()
        output = formula(X)
        previous_loss = criterion(output, y).item() 
        while epoch < max_iter and epochs_without_improvement < max_epochs_without_improvement:
            optimizer.zero_grad()
            output = formula(X)
            loss = criterion(output, y)
            loss.backward()
            if verbose == 2 and (epoch + 1) % verbose_frequency == 0:
                print("Epoch {}, current loss {:.3}, current formula ".format(epoch + 1, loss.item()), end='')
                PrintFormula(formula, "fast")       
            optimizer.step()  
            epoch += 1
            if torch.abs(previous_loss - loss) < minimal_acceptable_improvement:
                epochs_without_improvement += 1
            else:
                epochs_without_improvement = 0
            previous_loss = loss.item()
        if loss < best_loss:
            best_loss = loss
            best_formula = formula
        if verbose > 0:
            print("Finished run #{}, loss {}, best loss {}".format(init + 1, loss, best_loss))
    return best_formula

In [24]:
X1 = torch.rand(100, 1) * 10
y1 = 2.5 * X1**2 + 3

X2 = torch.rand(100, 2) * 10
y2 = 2.5 * X2[:, 0]**2 + 0.3333 * X2[:, 1]**0.25 + 3    
# y = 1.2 * X[:, 0]**2.1 * X[:, 2] + 2.5 * X[:, 1]**(-3) + 1/2 * X[:, 2]**0.3333 * X[:, 1]**(-4)

In [28]:
formula1 = LearnFormula(X1, y1)

Run #1
Epoch 5000, current loss 55.4, current formula (1.26x_1^{2.33} + 1.74)
Epoch 10000, current loss 1.41, current formula (2.32x_1^{2.04} + 2.34)
Finished run #1, loss 0.8252627849578857, best loss 0.8252627849578857
Run #2
Epoch 5000, current loss 1.75, current formula (2.89x_1^{1.94} + 0.0967)
Epoch 10000, current loss 0.833, current formula (2.69x_1^{1.97} + 1.1)
Finished run #2, loss 0.8254114985466003, best loss 0.8252627849578857
Run #3
Epoch 5000, current loss 8.37, current formula (1.82x_1^{2.15} + 3.57)
Epoch 10000, current loss 1.97, current formula (2.16x_1^{2.07} + 3.22)
Epoch 15000, current loss 0.728, current formula (2.66x_1^{1.98} + 1.27)
Finished run #3, loss 0.8252681493759155, best loss 0.8252627849578857
Run #4
Epoch 5000, current loss 0.803, current formula (2.46x_1^{2.01} + 1.93)
Finished run #4, loss 0.8252605199813843, best loss 0.8252605199813843
Run #5
Epoch 5000, current loss 52.0, current formula (1.35x_1^{2.3} + 1.13)
Epoch 10000, current loss 13.8, cur

KeyboardInterrupt: 

In [None]:
simplified_formula1 = formula.simplify(X1, y1, inplace=False)

In [None]:
PrintFormula(simplified_formula1)

In [25]:
formula = LearnFormula(X2, y2)

Run #1
Epoch 5000, current loss 5.96e+03, current formula (2.73x_1^{1.39} + 4.52x_2^{1.15} + 6.39)
Epoch 10000, current loss 5.54e+03, current formula (5.27x_1^{1.02} + 8.6x_2^{0.811} + 10.8)
Epoch 15000, current loss 5.26e+03, current formula (9.14x_1^{0.717} + 12.5x_2^{0.584} + 14.6)
Epoch 20000, current loss 5.15e+03, current formula (12.7x_1^{0.547} + 14.9x_2^{0.47} + 15.3)
Epoch 25000, current loss 5.13e+03, current formula (14.6x_1^{0.494} + 14.6x_2^{0.467} + 14.6)
Epoch 30000, current loss 5.13e+03, current formula (14.6x_1^{0.493} + 14.6x_2^{0.467} + 14.6)
Finished run #1, loss 5134.796875, best loss 5134.796875
Run #2
Epoch 5000, current loss 7.34e+03, current formula (4.05x_1^{1.47} + 0.619x_2^{-1.89} + 0.481)
Epoch 10000, current loss 6.53e+03, current formula (8.24x_1^{1.12} + 0.167x_2^{-1.75} + 5.16)
Epoch 15000, current loss 6.05e+03, current formula (12.6x_1^{0.889} + 0.167x_2^{-1.61} + 9.79)
Epoch 20000, current loss 5.32e+03, current formula (16.3x_1^{0.551} + 3.91x_2^

In [26]:
simplified_formula = formula.simplify(X2, y2, inplace=False)

  


In [27]:
PrintFormula(simplified_formula)

<IPython.core.display.Math object>