In [None]:
import numpy as np
from pysr import PySRRegressor, jl
import random
import matplotlib.pyplot as plt
from collections import defaultdict
from sympy import *
import warnings
from itertools import permutations

warnings.filterwarnings("ignore")


Detected IPython. Loading juliacall extension. See https://juliapy.github.io/PythonCall.jl/stable/compat/#IPython


In [2]:

def create_loss(signals, states, penalty=0):
    """
    Description: Create a custom loss function for the symbolic regression based on the number of signals and states.
    The function follows RSA's steps to turn a literal listener into a pragmatic speaker and then into a pragmatic listener, 
    while using the predicted formula as a utility function. 
    The loss function calculates the discrepancy using a multinomial loss.

    Parameters:
    signals: (int): Number of signals
    states: (int): Number of states
    """
    str1 = """function pragmatic_speaker_objective(tree, dataset::Dataset{T,L}, options) where {T,L}"""
    str2 = f"""
    # Matrix dimensions
    num_rows = {signals}
    num_cols = {states}
    matrix_size = num_rows * num_cols

    # Get predictions
    prediction, completion = eval_tree_array(tree, dataset.X, options)
    if !completion
        return L(Inf)  # Return a high loss if evaluation fails
    end

    # exp on the predictions
    prediction = exp.(prediction)

    # Normalize each column of the prediction matrix
    for j in 1:num_cols
        column = prediction[j:num_cols:end]
        
        column_sum = sum(column)
        normalized_column = column_sum != 0 ? column / column_sum : column

        prediction[j:num_cols:end] .= normalized_column
    end

    # Normalize each row of the prediction matrix
    for j in 1:num_rows
        row = prediction[(j-1)*num_cols+1:j*num_cols]
        
        row_sum = sum(row)
        normalized_row = row_sum != 0 ? row / row_sum : row

        prediction[(j-1)*num_cols+1:j*num_cols] .= normalized_row
    end

    # Calculate the loss for each row and add it to the total loss
    total_loss = 0.0
    for i in 1:num_cols:length(prediction)
        row_start = i
        row_end = row_start + num_cols - 1

        predicted_row = prediction[row_start:row_end]
        target_row = dataset.y[row_start:row_end]
        
        # Multinomial loss for the row
        row_loss = -sum(target_row .* log.(predicted_row))
        total_loss += row_loss
    end
    complexity = count_nodes(tree)
    return (total_loss / num_rows) * {1+penalty}^complexity
end
"""
    return str1+str2



def create_language_tables(n_signals, n_states, e):
    """
    Description:
    This function generates all possible language tables for a given language size.
    The language size is defined by the amount of signals and the amount of states.
    
    Parameters:
    n_signals (int): The amount of signals in the language table.
    n_states (int): The amount of states in the language table.
    e (float): The epsilon value to add to the language tables.
    """
    possible_tables = np.array(np.meshgrid(*[[0, 1]] * (n_signals * n_states))).T.reshape(-1, n_signals, n_states)
    true_tables = np.array([matrix for matrix in possible_tables if np.all(np.sum(matrix, axis=0)) and np.all(np.sum(matrix, axis=1)) and np.all(min(n_signals, n_states) < np.sum(matrix) < n_signals * n_states)])
    return true_tables + e


def pragmatic_listener(matrix, alpha, priors, costs, utility=True):
    """
    Description:
    This function calculates all the matrices following the RSA framework.

    Parameters:
    matrix (array): The language table.
    alpha (float): The alpha value for the pragmatic speaker.
    priors (array): The priors for the states.
    costs (array): The costs for the signals.
    """
    if priors is None:
        priors = np.ones(matrix.shape[1])/matrix.shape[1]

    if costs is None:
        costs = np.zeros(matrix.shape[0])

    def rownorm(mat):
        return (mat.T / mat.sum(axis=1)).T

    def safelog(vals):
        with np.errstate(divide='ignore'):
            return np.log(vals)

    literal_listener = rownorm(matrix * priors)

    if utility is True:
        utilities = alpha * (safelog(literal_listener.T) + costs)
        speaker = rownorm(np.exp(utilities))
    elif utility is False:
        utilities = literal_listener.T
        speaker = rownorm(utilities)
    
    pragmatic_listener = rownorm(speaker.T * priors)

    return literal_listener, speaker, pragmatic_listener


def normalized_weighted_random_increment(weights, count):
    """
    Description:
    This function generates datapoints based on the weights given and normalizes them. 
    The weights will be the rows in the language table of the pragmatic listener.
    With infinite datapoints, the average of the generated data will be the same as the weights.
    
    Parameters:
    weights (array): The weights to generate the data from.
    count (int): The amount of datapoints to generate.
    """
    if count == 0:
        return weights
    
    increments = np.zeros_like(weights, dtype=int)
    for row in range(weights.shape[0]):
        chosen_indices = np.random.choice(weights.shape[1], size=count, p=weights[row, :])
    
        for idx in chosen_indices:
            increments[row][idx] += 1
    
    return increments #/ count


def create_random_matrix_no_zero_rows_or_cols(shape, e):
    """
    Description:
    This function creates a matrix with a given size that does not contain any rows or columns with zeroes. 
    This function is used for larger languages where generating every possible language is not feasible.

    Parameters:
    shape (tuple): The shape of the matrix.
    e (float): The epsilon value to add to the matrix, to avoid division by zero.
    """
    rows, cols = shape
    if rows == 0 or cols == 0:
        raise ValueError("Matrix dimensions must be greater than 0.")
    
    matrix = np.random.choice([0, 1], size=(rows, cols))
    
    for i in range(rows):
        if not matrix[i].any():
            matrix[i, random.randint(0, cols - 1)] = 1
    
    for j in range(cols):
        if not matrix[:, j].any():
            matrix[random.randint(0, rows - 1), j] = 1
    return matrix + e



def canonical_form(matrix):
    """
    Description:
    This function computes the canonical form of a matrix.
    The canonical form is the lexicographically smallest matrix that is structurally identical under row/column permutations
    or rotations.

    Parameters:
    matrix (array): The matrix to compute the canonical form of.
    """
    rows, cols = matrix.shape
    candidates = []
    
    # Generate all row permutations
    row_perms = permutations(range(rows))
    for rp in row_perms:
        permuted_rows = matrix[list(rp), :]
        # Generate all column permutations for each row-permuted matrix
        col_perms = permutations(range(cols))
        for cp in col_perms:
            permuted = permuted_rows[:, list(cp)]
            candidates.append(permuted)
            candidates.append(permuted.T)  # Include the transposed version

    # Return the lexicographically smallest matrix as the canonical form
    return min(candidates, key=lambda x: x.tolist())

def remove_duplicate_structures(matrices):
    """
    Description:
    This function removes duplicate matrices that are structurally identical under row/column permutations
    or rotations.

    Parameters:
    matrices (array): The matrices to remove duplicates from
    """
    seen = set()
    unique_matrices = []
    
    for matrix in matrices:
        form = canonical_form(matrix)
        form_tuple = tuple(map(tuple, form))  # Convert to hashable format
        if form_tuple not in seen:
            unique_matrices.append(matrix)
            seen.add(form_tuple)
    
    return np.array(unique_matrices)

def canonical_form_fast(matrix):
    """
    Description:
    This function computes the canonical form of a matrix. It is faster, but less accurate than the canonical_form function.
    Used for larger languages.

    Parameters:
    matrix (array): The matrix to compute the canonical form of.
    """
    rows, cols = matrix.shape
    candidates = []

    for _ in range(2):  # Original and transposed
        for _ in range(4):  # 0°, 90°, 180°, 270° rotations
            # Add lexicographically smallest row-sorted form
            candidates.append(tuple(map(tuple, np.sort(matrix, axis=0))))
            matrix = np.rot90(matrix)  # Rotate 90°
        matrix = matrix.T  # Transpose the matrix

    return min(candidates)

def remove_duplicate_structures_fast(matrices):
    """
    Description:
    This function removes duplicate matrices that are structurally identical under row/column permutations
    or rotations. It is faster, but less accurate than the remove_duplicate_structures function.

    Parameters:
    matrices (array): The matrices to remove duplicates from
    """
    seen = set()
    unique_matrices = []
    
    for matrix in matrices:
        form = canonical_form_fast(matrix)
        if form not in seen:
            unique_matrices.append(matrix)
            seen.add(form)
    
    return np.array(unique_matrices)


def remove_useless_structures(matrices):    
    """
    Description:
    This function removes matrices that are not influenced by the utility function.

    Parameters:
    matrices (array): The matrices to remove useless structures from
    """
    result = []
    for matrix in matrices:
        if not np.array_equal(pragmatic_listener(matrix, 2.7, None, None, True)[2], pragmatic_listener(matrix, 2.7, None, None, False)[2]):
            result.append(matrix)
    return np.array(result)


def dynamic_replace_with_vars_sympy(formula):
    """
    Description:
    This function replaces constants with variables in the predicted formulas. 
    This allows us to group formulas that have the same structure.

    Parameters:
    formula (str): The formula to replace constants with variables in.
    """
    variable_counter = 0
    variable_names = ['alpha', 'beta', 'gamma', 'delta', 'epsilon', 'zeta', 'eta', 'theta', 'iota', 'kappa']
    variable_mapping = {}

    def relu(x):
        return Max(0, x)

    def neg(x):
        return -x

    def get_new_variable():
        nonlocal variable_counter
        var_name = variable_names[variable_counter] if variable_counter < len(variable_names) else f'var{variable_counter}'
        variable_counter += 1
        return symbols(var_name)

    formula = formula.replace('^', '**')
    
    P_lit = symbols('P_lit', positive=True)

    expr = eval(formula, {'log': log, 'P_lit': P_lit, 'neg': neg, 'relu': relu, 'abs': abs, 'exp': exp})

    # Simplify the expression and expand logs
    simplified_expr = simplify(expand(expr))

    # Replace constants with variables
    def replace_constants(expr):
        if expr.is_Number:
            abs_value = abs(expr)
            if abs_value == 0:
                return Rational(0)
            if abs_value not in variable_mapping:
                variable_mapping[abs_value] = get_new_variable()
            transformed_constant = variable_mapping[abs_value]

            # Avoid invalid operations
            if abs_value != 0:
                multiplier = Rational(expr / abs_value)
            else:
                multiplier = 1  # Handle edge case
            return multiplier * transformed_constant
        elif expr.is_Atom:  # Leave atoms (like symbols) unchanged
            return expr
        else:
            args = [replace_constants(arg) for arg in expr.args]
            if expr.func == neg:
                return -args[0]
            elif expr.func == log:
                return log(args[0])
            elif expr.func == exp:
                return exp(args[0])
            elif expr.func == abs:
                return abs(args[0])
            elif expr.func == relu:
                return relu(args[0])
            else:
                return expr.func(*args)

    transformed_expr = replace_constants(simplified_expr)
    cleaned_expr = simplify(transformed_expr)

    return cleaned_expr, variable_mapping


def dump_formulas(formulas, filename):
    with open(filename + '.pkl', 'wb') as f:
        pickle.dump(formulas, f)



In [3]:
import pandas as pd
from pysr import PySRRegressor

class SRonRSA:
    """
    Description:
    This class generates data and runs symbolic regression on it to predict formulas.
    These formulas and other information is stored in an output file.
    
    Parameters:
    datapoints (int): The amount of datapoints to generate in each language.
    size (tuple): The size of the language table.
    e (float): The epsilon value to add to the language tables.
    runs (int): The amount of runs to perform (total amount on input and output matrices generated).
    filename (str): The name of the output file.
    model_iterations (int): The amount of iterations to run the symbolic regression model.
    alpha (float): The alpha value for the utiltity function.
    """
    def __init__(self, datapoints, size, e, runs, filename, model_iterations, alpha=2.7):
        self.datapoints = datapoints
        self.size = size
        self.e = e
        self.runs = runs
        self.filename = filename
        self.model_iterations = model_iterations
        self.alpha = alpha
        self.output = pd.DataFrame(columns=['signals', 'states', 'datapoints','raw formula (score)', 'raw formula (loss)', 'predicted formula (score)', 'predicted formula (loss)', 'loss', 'alpha', 'model_iterations', 'language table', 'literal listener', 'pragmatic listener', 'distrubed pragmatic listener'])

    def create_matrices(self):
        n_signals, n_states = self.size

        if n_signals * n_states < 20:
            language_tables = create_language_tables(n_signals, n_states, 0)
        else:
            language_tables = [create_random_matrix_no_zero_rows_or_cols(self.size, 0) for _ in range(self.runs*2)]      

        if len(language_tables) * n_signals *  n_states < 5000:
            language_tables = remove_duplicate_structures(language_tables)
            language_tables = remove_useless_structures(language_tables) + self.e

        else:
            language_tables = remove_duplicate_structures_fast(language_tables)
            language_tables = remove_useless_structures(language_tables) + self.e

        mul = int(np.ceil(self.runs/len(language_tables)))
        language_tables = np.tile(language_tables, (mul, 1, 1))
        language_tables = language_tables[:self.runs]

        return language_tables

    def run(self):
        run = 0
        language_tables = self.create_matrices()

        for language in language_tables:
            run += 1
            print(f'run {run} of {len(language_tables)}')
            literal_listener, _, prag_listener = pragmatic_listener(language, self.alpha, None, None)

            distributed_listener = normalized_weighted_random_increment(prag_listener, self.datapoints)
            
            literal_listener = literal_listener.flatten().reshape(-1, 1)
            prag_listener = prag_listener.flatten().reshape(-1, 1)
            distributed_listener = distributed_listener.flatten().reshape(-1, 1)

            model = PySRRegressor(
                random_state=0,
                niterations=self.model_iterations,
                binary_operators=["*", "+", "-"],
                unary_operators=["log", "exp", "abs", "relu", "neg"],
                loss_function=create_loss(self.size[0], self.size[1]),
                temp_equation_file="/home/lev/afstuderen/debug_log.txt",
                maxsize=7,
                )

            model.fit(literal_listener, distributed_listener, variable_names=['P_lit'])
            best_loss = model.equations_['loss'].idxmin()
            best_score = model.equations_['score'].idxmax()
            raw_formula_loss = model.equations_['equation'][best_loss]
            formula_loss = dynamic_replace_with_vars_sympy(raw_formula_loss)[0]
            raw_formula_score = model.equations_['equation'][best_score]
            formula_score = dynamic_replace_with_vars_sympy(raw_formula_score)[0]
            loss = model.equations_['loss'][best_loss]


            new_row = pd.DataFrame({
                'signals': [self.size[0]], 
                'states': [self.size[1]], 
                'datapoints': [self.datapoints],
                'raw formula (score)': [str(raw_formula_score)],
                'raw formula (loss)': [str(raw_formula_loss)],
                'formula (score)': [str(formula_score)], 
                'formula (loss)': [str(formula_loss)], 
                'alpha': [self.alpha], 
                'model_iterations': [self.model_iterations], 
                'language table': [language.tobytes()], 
                'literal listener': [literal_listener.tobytes()], 
                'pragmatic listener': [prag_listener.tobytes()], 
                'distrubed pragmatic listener': [distributed_listener.tobytes()],
                'loss': [loss]
            })

            self.output = pd.concat([self.output, new_row], ignore_index=True)

        self.output.to_csv(self.filename + '.csv', index=False)
