In [None]:
from itertools import combinations
from math import comb
import numpy as np
import matplotlib.pyplot as plt
import math
import operator
from scipy.spatial.distance import cdist

import os
from scipy.io import savemat, loadmat


In [None]:
def features_coupling_case1(alpha):
    # Input:
    # alpha: vector of random numbers unif(0,1)

    # Output:
    # f: vector of dim(10), features f1, f2, ..., f10
    import numpy as np
    f = np.zeros(10)
    
    # linear correlation btw f1, f2, f3
    # triadic
    # 2*f1 + f2 = 3*f3
    f[0] = alpha[0]
    f[1]=alpha[1]
    f[2]=(2*alpha[0]+alpha[1])/3 
    
    # linear correlation btw f4, f5, f6
    # triadic
    # f4 + f5 = 4*f6
    f[3]=(3*alpha[2]+2*alpha[3])/5 
    f[4]=(alpha[2]+2*alpha[3])/3 
    f[5]=(2*alpha[2]+alpha[3])/3 
    
    # linear correlation btw f7, f8, f9
    # triadic
    # f8 + f9 = 2*f7
    f[6]=(alpha[4]+alpha[5])/2
    f[7]=(alpha[4]+3*alpha[5])/4 
    f[8]=(3*alpha[4]+alpha[5])/4 

    # f10, singleton
    f[9]=alpha[6]

    return f

def features_coupling_case2(alpha):
    # Input:
    # alpha: vector of random numbers unif(0,1)

    # Output:
    # f: vector of dim(10), features f1, f2, ..., f10
    import numpy as np
    f = np.zeros(10)
    
    f[0] = alpha[1]
    f[1]=alpha[1]**2
    f[2]=alpha[1]**4 
    
    # linear correlation btw f4, f5, f6
    # triadic
    # f4 + f5 = 4*f6
    f[3]=(3*alpha[2]+2*alpha[3])/5 
    f[4]=(alpha[2]+2*alpha[3])/3 
    f[5]=(2*alpha[2]+alpha[3])/3 
    
    f[6]=(alpha[4]+alpha[5])/2
    f[7]=(alpha[4]+3*alpha[5])/4 
    f[8]=(3*alpha[4]+alpha[5])/4 

    # f10, singleton
    f[9]=alpha[6]

    return f

def features_coupling_case3(alpha):
    # Input:
    # alpha: vector of random numbers unif(0,1)

    # Output:
    # f: vector of dim(10), features f1, f2, ..., f10
    import numpy as np
    f = np.zeros(10)
    
    f[0] = alpha[1]
    f[1]=alpha[1]**2
    f[2]=alpha[1]**4 
    
    # linear correlation btw f4, f5, f6
    # triadic
    # f4 + f5 = 4*f6
    f[3]=(3*alpha[2]+2*alpha[3])/5 
    f[4]=(alpha[2]+2*alpha[3])/3 
    f[5]=(2*alpha[2]+alpha[3])/3 
    
    f[6]=(alpha[4]**2 + alpha[5]**4)/2
    f[7]=( np.sin(alpha[4]) + alpha[5])/(np.sin(1)+1) 
    f[8]=(alpha[4]**3 + np.tan(alpha[5]) )/(1+np.tan(1)) 

    # f10, singleton
    f[9]=alpha[6]

    return f

In [None]:
################################ ALTERNATIVE WAY. #############################
##### Uses Matrix operations, faster than previous code.  ####################

class generate_synthetic_data:
    def __init__(self, n_features, N, coupling_func, *random_seed):
        self.n_features = n_features
        self.N = N
        self.coupling_func = coupling_func
        if random_seed:
            np.random.seed(random_seed)
        
    def data_matrix(self):
        # Inputs:
        # n_features: number of features for an instance
        # N: number of instances
    
        # Output:
        # f_mat: (n_features x N) dim matrix of artificial data
        # obtained from pre-defined coupling among the features.
        # f_mat = feature matrix
        
        # Initialize the feature matrix
        f_mat = np.zeros((self.n_features, self.N))
        
        for i in range(self.N):
            alpha = np.random.uniform(size=self.n_features)
            
            # assign values to each column
            f_vector = self.coupling_func(alpha)
    
            # Add a check
            if len(f_vector)!=self.n_features:
                print("Error: No. of synthetic coupled features do not match with n_features!!")
                break
            else:
                f_mat[:, i] = f_vector
            
        return f_mat


class disentangle_parenclitic_hypergraph:
    
    def __init__(self, f_mat, M, random_seed, dist_type, \
                 theta, uniform_size, frac=0.0, noise=(0.0,0.0), p=10):
        # Input:
        # f_mat: data matrix (num features x num instances), (n_features x N)
        # M: No. of random points to be introduced
        # random_seed for reproducibility of added noise
        
        # dist_type: for our analysis "min" is to be set
        # but other distrances like mean, max, percentile-median can also be given as input

        # l: no. of features set (f_i, f_j, f_k) to select which has 
        
        # noise is tuple (mean, std) for gaussian sampling
        self.n_features = f_mat.shape[0]
        self.N = f_mat.shape[1]
        self.f_mat = f_mat
        
        self.M = M
        
        self.random_seed = random_seed
        
        self.dist_type = dist_type
        if self.dist_type=="percentile_median":
            self.p = p
            # else ignore the parameter 'p'
        
        self.theta = theta
        self.uniform_size = uniform_size

        #choosing fraction of (fN) elements of synthetic data
        self.frac = frac

        # Initializes the dictionary whose "keys" being all the possible hyperedges
        # "values" being count of number of times a feature hyperedge(i,j,k)
        # has a random number (identity m) with maximum deviation from N points.
        #self.L = {}
        #for h in self.k_uniform_hyperedges():
        #    self.L[h]=0                   
        #self.all_hyperedges = list(self.k_uniform_hyperedges())
        self.n_comb = math.comb(n_features, uniform_size)# 120
        self.L = {h: 0 for h in self.k_uniform_hyperedges()}
        
        np.random.seed(self.random_seed)

        if self.frac!=0.0:
            self.noise_mean = noise[0]
            self.noise_std = noise[1]
        
            self.add_noise()

    def add_noise(self):
        f_ind = np.random.randint(self.n_features, size=int(self.frac*self.n_features*self.N))
        N_ind = np.random.randint(self.N, size=int(self.frac*self.n_features*self.N))

        #ind_list = list(zip(f_ind, N_ind))
        noise_list = np.random.normal(self.noise_mean, self.noise_std, size=len(f_ind))
        for i in range(len(f_ind)):
            self.f_mat[f_ind[i], N_ind[i]] += noise_list[i]
        return None    
    
    def k_uniform_hyperedges(self):
        # This function is used to generalize to k-uniform hypergraph
        # where k is not just equal to 3 referring to triadic interaction
        
        # Inputs:
        # n_features: no. of features of the data
        # uniform_size: size of hyperedges of k-uniform hypergraph
    
        # Outputs:
        # list of all k-uniform hyperedges
    
        return combinations(range(self.n_features), self.uniform_size)
    
    
    def inter_cluster_distance(self, distance_list):
        # Computes the inter-cluster distances between two clusters
        # singleton cluster which is a random number with n_features "m_random"
        
        # Single linkage
        if self.dist_type == "min":
            return np.min(distance_list, axis=1)
    
        # Complete linkage
        elif self.dist_type == "max":
            return np.max(distance_list, axis=1)
    
        # Average linkage
        elif self.dist_type == "mean":
            return np.mean(distance_list, axis=1)

        # Sorted dist Percentile median
        # robust against noise and outliers
        elif self.dist_type == "percentile_median":
            d_p = np.percentile(np.sort(distance_list, axis=1), self.p)
            D_p = distance_list[:, distance_list<=d_p]
            return np.median(D_p, axis=1)
    
        else:
            print(" Error: type = min/max/mean/percentile_median ")

    
    def compute_pairwise_distances(self, h, random_matrix):
        # computing pairwise distances between N points and 1 random point 
        # in (fi, fj, fk, ...) k-projected feature space.
    
        # Inputs:
        # h: hyperedge with its elements being indices (f1, f2, f3, ...)
        # f_mat: synthetic data with higher order interaction embedded into it
        # m_random: uniform random number belonging to R^(n_features)
    
        # Output:
        # dist: vector of dim(N). pairwise distance between N individuals and
        # random number m in the k-projected feature space (f1, f2, f3, ...) 
    
        # ssd -> sum of squared difference
        #N=f_mat.shape[1]
        #ssd = np.zeros(N)
        #for ind in h:
        #    ssd += (f_mat[ind, :] - m_random[ind])**2
        #dist=np.sqrt(ssd)
        # M x N matrix
        
        dist = cdist(random_matrix[:, np.array(h)], self.f_mat[np.array(h),:].T)
        
        return dist

    def freq_maxDev_h(self):
        random_matrix = np.random.uniform(size=(self.M, self.n_features))

        L_matrix = np.zeros((self.M, self.n_comb))

        for i, h in enumerate(self.k_uniform_hyperedges()):
            MxN_dist_matrix = self.compute_pairwise_distances(h, random_matrix)
            L_matrix[:, i] = self.inter_cluster_distance(MxN_dist_matrix)
        
        #print(L_matrix.shape)
        for i in range(self.M):
            # indices of top 'l' hyperedges of range(n_comb)
            ind_list = np.argwhere(L_matrix[i,:]>self.theta).flatten()
            #ind_list = np.argsort(L_matrix[i, :])[-self.l:]
            for j in ind_list:
                h_paren = list(self.k_uniform_hyperedges())[j]
                self.L[h_paren]+=1

        return self.L


    def overlap_coefficient(self):
        L = self.freq_maxDev_h()
        
        cluster_B_keys = [(0,1,2), (3,4,5), (6,7,8)]
        cluster_A_keys = set(list(L.keys())) - set(cluster_B_keys)
        
        cluster_B = [L[key] for key in cluster_B_keys]
        cluster_A = [L[key] for key in cluster_A_keys]
        
        cluster_A, cluster_B = np.array(cluster_A), np.array(cluster_B)
        
        # Assuming cluster_A and cluster_B are the two clusters with continuous values in 1D
    
        # Calculate the intersection of the two clusters
        intersection = max(0, min(cluster_A.max(), cluster_B.max()) - max(cluster_A.min(), cluster_B.min()))
        
        # Calculate the minimum range spanned by the clusters
        min_range = min(cluster_A.max() - cluster_A.min(), cluster_B.max() - cluster_B.min())
        # Compute the overlap coefficient
        if min_range == 0:
            overlap_coefficient = 1.0
        else:
            overlap_coefficient = intersection / min_range
        return overlap_coefficient

    def cluster_ranges(self):
        L = self.freq_maxDev_h()

        cluster_B_keys = [(0,1,2), (3,4,5), (6,7,8)]
        cluster_A_keys = set(list(L.keys())) - set(cluster_B_keys)
        
        cluster_B = [L[key] for key in cluster_B_keys]
        cluster_A = [L[key] for key in cluster_A_keys]
        
        cluster_A, cluster_B = np.array(cluster_A), np.array(cluster_B)
        
        return np.array([min(cluster_A), max(cluster_A), min(cluster_B), max(cluster_B)])
        

In [None]:

data_path = "./data/"
if not os.path.isdir(data_path):
    os.mkdir(data_path)

image_path = "./image/"
if not os.path.isdir(image_path):
    os.mkdir(image_path)


In [None]:
import multiprocess
from functools import partial


In [None]:
# Initialise
n_features, N, M = 10, 10**4, 10**4
dist_type = "min"

uniform_size = 3
realizations = 100
n_comb = math.comb(n_features, uniform_size)
theta_vals = np.arange(0.05, 0.8+0.05, 0.05)

In [None]:

def count_freq_hedges(args, f_mat, M, dist_type,\
                    uniform_size, frac, noise):
    random_seed2, theta = args[0], args[1]    
    class_obj = disentangle_parenclitic_hypergraph(f_mat, M, random_seed2, dist_type, theta, \
                                           uniform_size, frac, noise)
    L = class_obj.freq_maxDev_h()
 
    return list(L.values()) 

In [None]:
for perturbation in ["without_noise", "with_noise"]:
    print(perturbation)
    if perturbation=="without_noise":
        frac = 0.0
        noise = (0.0,0.0)
    else:
        frac = 0.1
        noise = (0.0,0.05)

    vary_theta_dict = {}
    for case_num in [1,2,3]:
        
        print("case num:",case_num)
        if case_num==1:
            coupling_func = features_coupling_case1
        elif case_num==2:
            coupling_func = features_coupling_case2
        elif case_num==3:
            coupling_func = features_coupling_case3
            
        L_matrix = np.zeros((realizations, len(theta_vals), n_comb))

        i=0
        # random_seed1 is for generating same data 
        for random_seed1 in range(123, 123+realizations):

            f_mat = generate_synthetic_data(n_features, N, coupling_func, random_seed1).data_matrix()

            # random_seed2 is for generating same random noise
            opt_model = partial( count_freq_hedges, f_mat=f_mat, M=M, dist_type=dist_type, \
                            uniform_size=uniform_size, frac=frac, noise=noise)

            pool = multiprocess.Pool( 16 )  
            rand_seeds = range( i+1 + i*L_matrix.shape[1], i+1+L_matrix.shape[1]+ i*L_matrix.shape[1])

            L_matrix[i,:, :] = pool.map(opt_model, zip(rand_seeds, theta_vals) )
            pool.close()
            pool.join()
            vary_theta_dict["case_%d"%case_num] = L_matrix

            i+=1
            if (i+1)%10==0:
                print(i+1)
    
    savemat(data_path+"%s.mat"%perturbation, vary_theta_dict)
    print("")


In [None]:
from sklearn.preprocessing import MinMaxScaler

class compute_properties_l:
    
    def __init__(self, arr, n_features, uniform_size):
        
        # arr: (realization, l, all triplet feature combinations)
        
        
        self.arr = arr
        self.n_features = n_features
        self.uniform_size=uniform_size
        
        self.h_edges = list(combinations(range(self.n_features),\
                                    self.uniform_size))
        self.h_corr = [(0,1,2), (3,4,5), (6,7,8)]

        self.cluster_A_bool = [i not in self.h_corr for i in self.h_edges ]
        self.cluster_B_bool = [i in self.h_corr for i in self.h_edges ]
        
        self.arr_norm = np.zeros((self.arr.shape[0], \
                                  self.arr.shape[1],\
                                  self.arr.shape[2]))
        for i in range(self.arr.shape[0]):
            scaler=MinMaxScaler()
            self.arr_norm[i,:,:] = scaler.fit_transform(self.arr[i,:,:].T).T
        
    def compute_cluster_prop(self, cluster_name, prop_name):
        if cluster_name=="A":
            data = self.arr_norm[:, :, self.cluster_A_bool]
        elif cluster_name=="B":
            data = self.arr_norm[:, :, self.cluster_B_bool]
        else:
            print("Currently there are only 2 clusters 'A', 'B'.")

        mean_values = []
        std_values = []
        # looping over each l value, averaging over each realization
        for i in range(self.arr_norm.shape[1]):

            if prop_name=='min':
                min_values = np.min(data[:,i,:], axis=1)
                mean_values.append(np.mean(min_values))
                std_values.append(np.std(min_values))

            elif prop_name=='max':
                max_values = np.max(data[:,i,:], axis=1)
                mean_values.append(np.mean(max_values))
                std_values.append(np.std(max_values))
            
            elif prop_name=='centroid':
                mu_values = np.mean(data[:,i,:], axis=1)
                mean_values.append(np.mean(mu_values))
                std_values.append(np.std(mu_values))

            elif prop_name=='width':
                width_vaues = np.max(data[:,i,:], axis=1)-np.min(data[:,i,:], axis=1)
                mean_values.append(np.mean(width_vaues))
                std_values.append(np.std(width_vaues))

        return np.array(mean_values), np.array(std_values)

    def compute_intercluster_dist(self):
        data_A = self.arr_norm[:, :, self.cluster_A_bool]
        data_B = self.arr_norm[:, :, self.cluster_B_bool]

        mean_values = []
        std_values = []

        n_realizations = self.arr.shape[0]
        
        # looping over each l value, averaging over each realization
        for i in range(self.arr.shape[1]):
            dist_l = []
            for j in range(n_realizations):
                pairwise_dist=[]
                for p1 in data_A[j,i,:]:
                    for p2 in data_B[j,i,:]:
                        pairwise_dist.append(np.abs(p2-p1))
                dist_l.append(np.min(pairwise_dist))
                
            mean_values.append(np.mean(dist_l))
            std_values.append(np.std(dist_l))
        
        return np.array(mean_values), np.array(std_values)

    def overlap_coefficient(self):
        
        data_A = self.arr[:, :, self.cluster_A_bool]
        data_B = self.arr[:, :, self.cluster_B_bool]

        mean_values = []
        std_values = []
        n_realizations = self.arr.shape[0]
        
        # looping over each l value, averaging over each realization
        for i in range(self.arr.shape[1]):
            overlap_l = []
            for j in range(n_realizations):
                
                min_A, max_A, min_B, max_B = min(data_A[j,i,:]), max(data_A[j,i,:]),\
                                             min(data_B[j,i,:]), max(data_B[j,i,:])   
                # Calculate the intersection of the two clusters
                intersection = max(0, min(max_A, max_B) - max(min_A, min_B))

                # Calculate the minimum range spanned by the clusters
                min_range = min(max_A - min_A, max_B - min_B)
                # Compute the overlap coefficient
                if min_range == 0:
                    overlap_coefficient = 1.0
                else:
                    overlap_coefficient = intersection / min_range
                overlap_l.append(overlap_coefficient)
            mean_values.append(np.mean(overlap_l))
            std_values.append(np.std(overlap_l))
        return np.array(mean_values), np.array(std_values)


# Plotting

In [None]:
import matplotlib as mpl

# Get the 'tab20' colormap
cmap = mpl.colormaps.get_cmap('tab20')


In [None]:
# intercluster distance
for case_num in [1,2,3]:

    fig,ax= plt.subplots(figsize=(5,3))

    for perturbation in ['without_noise', 'with_noise']:

        arr = loadmat(data_path+"%s.mat"%perturbation)["case_%d"%case_num]
        comp = compute_properties_l(arr, n_features, uniform_size) 

        if perturbation=="without_noise":
            color = cmap(0)
        else:
            color = cmap(2)

        mean, std = comp.overlap_coefficient()

        ax.plot(theta_vals, mean, color=color, label=perturbation)
        ax.fill_between(theta_vals, mean-std,mean+std, alpha=0.3, color='grey')

    ax.set_ylabel("Overlap coef.", size=13)
    ax.set_xlabel(r"$\theta$", size=13)
    #ax.set_title("Case %s"%case_num, size=13)
    ax.legend(loc='best')

    plt.savefig(image_path+"case%s_overlap_coef.pdf"%case_num,  facecolor="white", \
    bbox_inches="tight", dpi=600 )

    plt.close()