In [1]:
from itertools import product
import itertools
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from mindreadingautobots.entropy_and_bayesian import boolean
from mindreadingautobots.sequence_generators import make_datasets, data_io
import os

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

## Find a trick (Joker) function 

Recall that we start with uniformly random bitstrings $X$ (each length $n$ bitstrings), then we generate $Y=f(X)$ and create a pair $(X, Y)$. This pair $X,Y$ has a joint distribution. By applying bitflips to turn $X \rightarrow Z$, we end up with a new joinst distr $p_{ZY}$ where $Z$ is a bitflipped version of $X$ and $Y=f(X)$.

By trick function, we mean that for data with  we are looking for $g^*$ (optimal prediction for _noisy_ data) is more accuracte and less sensitive than $f$ (function used to generate noiseless data).

We want an example of data and noise such that MLD (for the noisy data) evaluated on noiseless data is _worse_ than MLD (for noiseless data) evaluated on noiseless data.

Suppose $f^*(z^{n-1}) = \argmax_{x'} p_{X|Z^{n-1}}(x|z^{n-1})$ is our MLD for noisy data and $g*(x^{n-1}) = \argmax_{x'} p_{X|X^{n-1}}(x|x^{n-1})$ is our MLD for noiseless data. We build $f^*$ analytically, then compare it to $g^*$. 

Our example will be $k=3$ majority function (in which case $g^*$ is just majority).

**Searching for weight-based counterexamples**

Instead of defining boolean functions as $f:\{0,1\}^n \rightarrow \{0,1\}$, we define $f$ such that 
$$
f(x) = g(\text{wt}(x))
$$
what this does is, there are only $2^{n+1}$ possible functions $g$, instead of $2^{2^n}$ possible functions $f$. For example, if $n=3$, then a possible $g$ is 
\begin{equation}
     \text{wt}(x) \rightarrow \begin{cases}
        0 \rightarrow 0 \\
        1 \rightarrow 1 \\
        2 \rightarrow 0 \\
        3 \rightarrow 1
    \end{cases}
\end{equation}

## Searching for joker functions - weight based functions

**constraints**:
- we want both senstivities to be $\gg 0$
- we want function accuracies to be $\gg 1/2$ (this is equivalent to $p$ not big)
- we want functions that are more balanced (`imbal` closer to 0)

In [2]:
# We go for a larger search space on the python script
for n in [4]:
    for p in [0.2, 0.22]:
    
        X_arr = np.array(list(itertools.product([0, 1], repeat=n)))

        # p_x = 1 / (2 ** n) # uniform distribution over x # chaos distribution and the thing with chaos distribution is its fair
        # WEIGHT-BASED FUNCTIONS

        signatures = itertools.product([0, 1], repeat=n+1)
        f_accs = []
        fn_accs = []
        fn_noiseless_accs = []
        imbal_list = []
        sentitivity_f_list = []
        sensitivity_fnstar_list = []
        sensitivity_diff_list = []

        for signature in signatures:

            hash = dict(zip(range(n+1), signature))
            func = lambda b: hash[sum(b)]


            
            noisy_lookup = np.zeros((2, 2**n)) # noisy_lookup[row,col] is the JOINT probability Pr(f(z)=row| x=col)
            true_lookup = np.zeros((2, 2**n)) # true lookup is an array with 2 rows; there is a p_x at [row, column] if  
                                            # f[column] = row]. so, true_lookup[i, j] = pr(f(x) = i| x=j)

            for i, x in enumerate(X_arr):

                func_value = func(x) 
                true_lookup[func(x), i] = 1

                # Iterate over all possible noisy strings
                for e in product([0, 1], repeat=n):

                    z = np.array(x) ^ np.array(e)
                    p_x_given_z = p ** sum(e) * (1-p)**(n - sum(e)) 

                    noisy_lookup[func_value, int(''.join(map(str, z)), 2)] += p_x_given_z 

            imbal = abs(true_lookup[0,:].sum() - true_lookup[1,:].sum())  / 2 ** n
            imbal_list.append(imbal)

            noisy_mle = np.round(noisy_lookup)  
            out = np.multiply(noisy_mle, true_lookup) / 2 ** n # "inner product" of the functions
            diff = out.sum()


            fnstar_dct = {}

            for i, x in enumerate(X_arr):
                fnstar_dct[tuple(x)] = np.argmax(noisy_lookup[:, i])

            def fnstar(x):
                return fnstar_dct[tuple(x)]

            sensitivity_f = boolean.average_sensitivity(func, X_arr)
            sensitivity_fnstar = boolean.average_sensitivity(fnstar, X_arr)
            sensitivity_diff = sensitivity_f - sensitivity_fnstar

            # accuracies on dataset
            p_zy = boolean.generate_noisy_distr(n, p, func)
            noisy_f_acc = boolean.compute_acc_noisytest(p_zy, func, n) # accuracy of f on noisy data
            noiseless_fnstar_acc = boolean.compute_acc_test(fnstar, func, n) # accuracy of fN* on noiseless data
            noisy_fnstar_acc = boolean.compute_acc_noisytest(p_zy, fnstar, n) # accuracy of fN* MLE on noisy data

            f_accs.append(noisy_f_acc)
            fn_accs.append(noisy_fnstar_acc)
            fn_noiseless_accs.append(noiseless_fnstar_acc)
            sentitivity_f_list.append(sensitivity_f)
            sensitivity_fnstar_list.append(sensitivity_fnstar)
            sensitivity_diff_list.append(sensitivity_diff)


        signatures_list = list(itertools.product([0, 1], repeat=n+1))
        df = pd.DataFrame({'signature': signatures_list, 'f_acc': f_accs, 'fn_acc': fn_accs, 'fn_noiseless_acc': fn_noiseless_accs,
                        'imbalance': imbal_list, 'sensitivity_f': sentitivity_f_list, 'sensitivity_fnstar': sensitivity_fnstar_list, 'sensitivity_diff': sensitivity_diff_list,
                        'bitflip': len(signatures_list)*[p]})
        df['acc_diff'] = df['f_acc'] - df['fn_acc']

        df_filtered = df[
            (df['imbalance'] < 1) &
            (df['sensitivity_f'] > 0) &
            (df['sensitivity_fnstar'] > 0) &
            (df['sensitivity_diff'] > 0) &
            (df['fn_acc'] > 0.6) &
            (df['f_acc'] > 0.6) &
            (df['acc_diff'] != 0)
        ]

        if len(df_filtered) > 0:

            df_filtered.to_csv(f'dentsets/weight_functions_n={n}_p={p}.csv', index=False)

### Making datasets

In [8]:
import glob
import ast

# Read all CSV files matching the pattern
file_pattern = 'dentsets/weight_functions_n=*_p=*.csv'
all_files = glob.glob(file_pattern)

minimum_acc = 0.6
maximum_imbalance = 0.25
minimum_sensitivity = 1.0
minimum_acc_diff = 0.00000001
maximum_acc_dff = 0.004

# Concatenate all dataframes
df_big = pd.concat((pd.read_csv(file) for file in all_files), ignore_index=True)

df_big_filtered = df_big[(df_big['imbalance'] < maximum_imbalance) & (df_big['sensitivity_f'] > minimum_sensitivity) & 
(df_big['sensitivity_fnstar'] > minimum_sensitivity) & (df_big['fn_acc'] > minimum_acc) & (df_big['f_acc'] > minimum_acc) & (np.abs(df_big['acc_diff']) > minimum_acc_diff) & 
(np.abs(df_big['acc_diff']) < maximum_acc_dff)]

df_big_filtered['distance'] = np.sqrt(df_big_filtered['acc_diff']**2 + df_big_filtered['sensitivity_diff']**2)
df_big_filtered['n'] = df_big_filtered['signature'].apply(lambda x: len(ast.literal_eval(x)) - 1)
df_big_filtered.sort_values(by='distance', ascending=False, inplace=True)
print(f'Surviving samples: {len(df_big_filtered)}')
df_big_filtered.head(20)

Surviving samples: 8


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_big_filtered['distance'] = np.sqrt(df_big_filtered['acc_diff']**2 + df_big_filtered['sensitivity_diff']**2)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_big_filtered['n'] = df_big_filtered['signature'].apply(lambda x: len(ast.literal_eval(x)) - 1)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_big_filtered.sort_values(by='distance', ascending=

Unnamed: 0,signature,f_acc,fn_acc,fn_noiseless_acc,imbalance,sensitivity_f,sensitivity_fnstar,sensitivity_diff,bitflip,acc_diff,distance,n
957,"(1, 1, 1, 1, 0, 0, 1, 1, 1)",0.620041,0.623912,0.890625,0.015625,3.5,2.625,0.875,0.2,-0.00387,0.875009,8
949,"(1, 1, 1, 0, 0, 1, 1, 1, 1)",0.620041,0.623912,0.890625,0.015625,3.5,2.625,0.875,0.2,-0.00387,0.875009,8
776,"(0, 0, 0, 1, 1, 0, 0, 0, 0)",0.620041,0.623912,0.890625,0.015625,3.5,2.625,0.875,0.2,-0.00387,0.875009,8
768,"(0, 0, 0, 0, 1, 1, 0, 0, 0)",0.620041,0.623912,0.890625,0.015625,3.5,2.625,0.875,0.2,-0.00387,0.875009,8
459,"(1, 1, 1, 0, 0, 1, 1, 0)",0.612732,0.615028,0.992188,0.109375,3.390625,3.28125,0.109375,0.2,-0.002296,0.109399,7
418,"(1, 0, 0, 1, 1, 0, 0, 0)",0.612732,0.615028,0.992188,0.109375,3.390625,3.28125,0.109375,0.2,-0.002296,0.109399,7
399,"(0, 1, 1, 0, 0, 1, 1, 1)",0.612732,0.615028,0.992188,0.109375,3.390625,3.28125,0.109375,0.2,-0.002296,0.109399,7
358,"(0, 0, 0, 1, 1, 0, 0, 1)",0.612732,0.615028,0.992188,0.109375,3.390625,3.28125,0.109375,0.2,-0.002296,0.109399,7


In [2]:
# golden_signatures = df_big_filtered['signature'].values[:5]
# golden_signatures = [ast.literal_eval(sig) for sig in golden_signatures]

In [9]:
# idx = [[3, 6, 7, 11, 13, 14, 18], 
#        [1, 2, 5, 7, 11, 12, 13], 
#        [0, 3, 5, 6, 7, 12, 13], 
#        [2, 5, 6, 9, 10, 15, 18],
#        [4, 7, 8, 11, 14, 16, 17]]

idx = [[0, 1, 2, 3, 5, 6, 7, 8]]

p_bitflips = [0.2]
n_bits = 10
seed = 1234
n_val = 15000 # number of validation examples
n_train = 10000

selected_signatures = [(0, 0, 0, 1, 1, 0, 0, 0, 0)]

for signature, subseq_idx in zip(selected_signatures, idx):

    k = len(subseq_idx)
    gen_name = "counterexample" + "".join([str(i) for i in signature])
    signature = dict(zip(range(len(signature)), signature))
    
    for p_bitflip in p_bitflips:

        p100 = int(p_bitflip*100)
        suffix = f"_nbits{n_bits}_n{n_train}_bf{p100}_seed{seed}"
        dirname = gen_name + suffix
        print(f"Generating {dirname} with p_bitflip={p_bitflip}")
        # If your dataset has a hidden subset, update this list:

        X, Z, subseq_idx = make_datasets.sparse_boolean_weightbased_k_n(n_bits, k, n_train + n_val, signature, p_bitflip=p_bitflip, seed=seed, subseq_idx=subseq_idx)
        print("idx for sparse function: save these:", subseq_idx)

        if p_bitflip == 0:
            Z = X
            
        Z_train = Z[:n_train]
        Z_val = Z[n_train:]

        # Check if the data directory exists, if not create it
        if not os.path.exists(dirname):
            os.makedirs(dirname)
            base_dir = os.path.abspath(os.path.join(os.getcwd(), "../../../../data"))
            target_dir = os.path.join(base_dir, dirname)

            # Check if the data directory exists, if not create it
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

            train_path = os.path.join(target_dir, "train.pkl")
            val_path = os.path.join(target_dir, "val.pkl")
            data_io.save_numpy_as_dict(Z_train, train_path)
            data_io.save_numpy_as_dict(Z_val, val_path)

            X_train = X[:n_train]
            X_val = X[n_train:]
            noiseless_train_path = os.path.join(target_dir, "noiseless_train.pkl")
            noiseless_val_path = os.path.join(target_dir, "noiseless_val.pkl")
            data_io.save_numpy_as_dict(X_train, noiseless_train_path)
            data_io.save_numpy_as_dict(X_val, noiseless_val_path)
            print(f"Saved {train_path}, {val_path}, {noiseless_train_path}, {noiseless_val_path}")

Generating counterexample000110000_nbits10_n10000_bf20_seed1234 with p_bitflip=0.2
idx for sparse function: save these: [0, 1, 2, 3, 5, 6, 7, 8]
Saved /u/mhzambia/ResearchDocuments/MindReadingAutobot/mindreadingautobots/data/counterexample000110000_nbits10_n10000_bf20_seed1234/train.pkl, /u/mhzambia/ResearchDocuments/MindReadingAutobot/mindreadingautobots/data/counterexample000110000_nbits10_n10000_bf20_seed1234/val.pkl, /u/mhzambia/ResearchDocuments/MindReadingAutobot/mindreadingautobots/data/counterexample000110000_nbits10_n10000_bf20_seed1234/noiseless_train.pkl, /u/mhzambia/ResearchDocuments/MindReadingAutobot/mindreadingautobots/data/counterexample000110000_nbits10_n10000_bf20_seed1234/noiseless_val.pkl


## Different search: not weight based.

In [None]:
# this code looks at all possible boolean functions that are perfectly balanced

def boolean_function_from_signature(f):
    """Given a length 2^n binary array, return the function with that boolean signature."""
    X_arr = list(itertools.product([0, 1], repeat=n))
    X_arr = [tuple(x) for x in X_arr]
    lookup = dict(zip(X_arr, f))
    def func(x):
        return lookup[tuple(x)]
    return lookup, func

n = 4
# WARNING: n=4 might take 5 minutes
assert n <= 4
k = n
# pvals = [0.01, 0.25, 0.49]
pvals = [0.49]

# a "signature" of a boolean function is a length 2**n bitstring S where f(x) = S[bin(x)]
# we will check signatures for perfectly balanced functions
sig_arr = np.array(list(itertools.product([0, 1], repeat=2**n)))
sig_arr = sig_arr[sig_arr.sum(axis=1) == 2**(n-1)] # only balanced functions

X_arr = np.array(list(itertools.product([0, 1], repeat=n)))
p_x = 1 / (2 ** k) # uniform distribution over x
for i, signature in enumerate(sig_arr):
    # iterate over signatures
    print(signature)
    dct, func = boolean_function_from_signature(signature)
    for p in pvals:        
        # noisy_lookup[row,col] is the JOINT probability Pr(f(z)=row| x=col)
        noisy_lookup = np.zeros((2, 2**n))
        true_lookup = np.zeros((2, 2**n))
        # simulate a noisy dataset essentially
        for i, x in enumerate(product([0,1], repeat=k)):
            func_value = func(x)
            # true lookup is an array with 2 rows; there is a p_x at [row, column] if 
            # f[column] = row]. so, true_lookup[i, j] = pr(f(x) = i| x=j)
            true_lookup[func(x), i] = 1
            # iterate over all of the z values that contribute to 
            for e in product([0, 1], repeat=k):
                z = np.array(x) ^ np.array(e)
                p_x_given_z = p ** sum(e) * (1-p)**(k - sum(e))
                # increment noisy_lookup at the binary index of z
                # noisy_lookup[i, j] = pr(f(z) = i,  x=j) 
                noisy_lookup[func_value, int(''.join(map(str, z)), 2)] += p_x_given_z 
        
        # the function is balanced if the sums of the two rows of true_lookup are equal
        # imbal = abs(true_lookup[0,:].sum() - true_lookup[1,:].sum())  / 2 ** n
        # round up to get argmax 
        noisy_mle = np.round(noisy_lookup)  
        out = np.multiply(noisy_mle, true_lookup) / 2 ** n # "inner product" of the functions
        diff = out.sum()
        fnstar_dct = {}
        for i, x in enumerate(X_arr):
            fnstar_dct[tuple(x)] = np.argmax(noisy_lookup[:, i])
        def fnstar(x):
            return fnstar_dct[tuple(x)]
        
        sensitivity_f = boolean.average_sensitivity(func, X_arr)
        sensitivity_fnstar = boolean.average_sensitivity(fnstar, X_arr)
        sensitivity_diff = sensitivity_f - sensitivity_fnstar
        # accuracies on dataset
        p_zy = boolean.generate_noisy_distr(k, p, func)
        noisy_f_acc = boolean.compute_acc_noisytest(p_zy, func, n) # accuracy of f on noisy data
        # noiseless_fnstar_acc = compute_acc_test(fnstar, func, n) # accuracy of fN* on noiseless data
        noisy_fnstar_acc = boolean.compute_acc_noisytest(p_zy, fnstar, n) # accuracy of fN* MLE on noisy data


#### Random search over boolean functions

In [None]:
# n = 5
n = 5
k = n
# p = 0.2
pvals = [0, 0.1, 0.15, 0.2, 0.25, 0.3]

X_arr = np.array(list(itertools.product([0, 1], repeat=n)))


p_x = 1 / (2 ** k) # uniform distribution over x

H = np.array(list(itertools.product([0, 1], repeat=n+1)))
mine = [
    [1,0,0,1,1,0],
    [0,1,1,0,1,0],
    [0,1,1,0,0,0]
]
# H = [[0, 1, 0, 0, 0]]

for p in pvals:
    print()
    print(f"!!!p={p}")
    print("boolean signature | imbal? | nacc(fN*) | taccfN* |naccfN*-naccf| S(f) | S(fN*) | S(f) - S(fN*)")
    print("--------------------------------------------------------------------------")
    for i, signature in enumerate(H):
        sss = "  "
        if list(signature) in mine:
            sss = ">>"
        hash = dict(zip(range(n+1), signature))
        func = lambda b: hash[sum(b)]
        
        # noisy_lookup[row,col] is the JOINT probability Pr(f(z)=row| x=col)
        noisy_lookup = np.zeros((2, 2**n))
        true_lookup = np.zeros((2, 2**n))
        # simulate a noisy dataset essentially
        for i, x in enumerate(product([0,1], repeat=k)):
            func_value = func(x)
            # true lookup is an array with 2 rows; there is a p_x at [row, column] if 
            # f[column] = row]. so, true_lookup[i, j] = pr(f(x) = i| x=j)
            true_lookup[func(x), i] = 1
            # iterate over all of the z values that contribute to 
            for e in product([0, 1], repeat=k):
                z = np.array(x) ^ np.array(e)
                p_x_given_z = p ** sum(e) * (1-p)**(k - sum(e))
                # increment noisy_lookup at the binary index of z
                # noisy_lookup[i, j] = pr(f(z) = i,  x=j) 
                noisy_lookup[func_value, int(''.join(map(str, z)), 2)] += p_x_given_z 
        
        # the function is balanced if the sums of the two rows of true_lookup are equal
        imbal = abs(true_lookup[0,:].sum() - true_lookup[1,:].sum())  / 2 ** n
        
        # if not balanced:
        #     continue
        # round up to get argmax 
        noisy_mle = np.round(noisy_lookup)  
        out = np.multiply(noisy_mle, true_lookup) / 2 ** n # "inner product" of the functions
        noiseless_fnstar_acc = out.sum()


        fnstar_dct = {}
        for i, x in enumerate(X_arr):
            fnstar_dct[tuple(x)] = np.argmax(noisy_lookup[:, i])
        def fnstar(x):
            return fnstar_dct[tuple(x)]
        
        sensitivity_f = average_sensitivity(func, X_arr)
        sensitivity_fnstar = average_sensitivity(fnstar, X_arr)
        sensitivity_diff = sensitivity_f - sensitivity_fnstar
        # accuracies on dataset
        #  = compute_acc_test(fnstar, func, n) # accuracy of fN* on noiseless data

        p_zy = generate_noisy_distr(k, p, func)
        noisy_f_acc = compute_acc_noisytest(p_zy, func, n) # accuracy of f on noisy data
        noisy_fnstar_acc = compute_acc_noisytest(p_zy, fnstar, n) # accuracy of fN* MLE on noisy data
        nacc_diff = noisy_fnstar_acc - noisy_f_acc



        print(f"{sss}{signature}   | {imbal:0.4f} |   {noisy_fnstar_acc:1.4f}  | {noiseless_fnstar_acc:1.4f} | {nacc_diff:1.4f}    |{sensitivity_f:1.4f}|{sensitivity_fnstar:1.4f}  |  {sensitivity_diff:1.4f} ")
        if sensitivity_fnstar == 0:
            topr = sum(noisy_mle[0,:])
            botr = sum(noisy_mle[1,:])
            assert (np.allclose(topr, 0) or np.allclose(topr, 1 << n))
            assert (np.allclose(botr, 0) or np.allclose(botr, 1 << n))

    
    



!!!p=0
boolean signature | imbal? | nacc(fN*) | taccfN* |naccfN*-naccf| S(f) | S(fN*) | S(f) - S(fN*)
--------------------------------------------------------------------------
  [0 0 0 0 0 0]   | 1.0000 |   1.0000  | 1.0000 | 0.0000    |0.0000|0.0000  |  0.0000 
  [0 0 0 0 0 1]   | 0.9375 |   1.0000  | 1.0000 | 0.0000    |0.3125|0.3125  |  0.0000 
  [0 0 0 0 1 0]   | 0.6875 |   1.0000  | 1.0000 | 0.0000    |1.5625|1.5625  |  0.0000 
  [0 0 0 0 1 1]   | 0.6250 |   1.0000  | 1.0000 | 0.0000    |1.2500|1.2500  |  0.0000 
  [0 0 0 1 0 0]   | 0.3750 |   1.0000  | 1.0000 | 0.0000    |3.1250|3.1250  |  0.0000 
  [0 0 0 1 0 1]   | 0.3125 |   1.0000  | 1.0000 | 0.0000    |3.4375|3.4375  |  0.0000 
  [0 0 0 1 1 0]   | 0.0625 |   1.0000  | 1.0000 | 0.0000    |2.1875|2.1875  |  0.0000 
  [0 0 0 1 1 1]   | 0.0000 |   1.0000  | 1.0000 | 0.0000    |1.8750|1.8750  |  0.0000 
  [0 0 1 0 0 0]   | 0.3750 |   1.0000  | 1.0000 | 0.0000    |3.1250|3.1250  |  0.0000 
  [0 0 1 0 0 1]   | 0.3125 |   1.0000  