# Continuous IFW method changes

#### Aims of a single function (+ parallelised)
- Calculate independent entropy of *significant feature*
- Calculate I (entropic) between that feature and those in sign_comp (significant interactions)
- Output information-theoretical omega
- When parallelised, recreate matrix as with bIFW

### Entropy calculation for toy genes

In [1]:
# import packages
import pandas as pd
import numpy as np
from functools import partial
from os.path import exists
import multiprocess
import scipy.sparse
import matplotlib.pyplot as plt
from p_tqdm import p_map
from scipy.stats import gaussian_kde
from scipy.integrate import quad
from sklearn.feature_selection import mutual_info_regression
import random

In [3]:
f1 = []
for i in range(4):
    f1.append(0.2)
f1.append(0)
for i in range(8):
    f1.append(0.8)
for i in range(3):
    f1.append(0)

f2 = []
for i in range(3):
    f2.append(1)
f2.append(0)
for i in range(10):
    f2.append(1)
for i in range(2):
    f2.append(0)
    
print(f1)

[0.2, 0.2, 0.2, 0.2, 0, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0, 0, 0]


In [6]:
# create toy data
def toy_gene(numbers, values):
    gene = []
    for i, n in enumerate(numbers):
        for times in range(n):
            gene.append(values[i])
    return gene

toy_gene([4,1,8,3], [0.2,0,0.8,0])

[0.2, 0.2, 0.2, 0.2, 0, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0, 0, 0]

In [14]:
def toy_entropy(f1, alternative_entropy = False, evaluation_check = False):
    feature = f1    
    p0 = np.sum(feature == 0) / len(feature) if len(feature) > 0 else 0
    # Probability of zero
    non_zero_data = feature[feature != 0]

    # KDE for the non-zero part
    kde = gaussian_kde(non_zero_data)

    # Get the normalization factor by integrating KDE over the range of non-zero values
    #kde_normalisation_factor, _ = quad(lambda x: kde(x), 0, max(feature))

    # Combined PDF
    def combined_pdf(x):
        if x == 0:
            return p0 #/ kde_normalisation_factor
        else:
            return (1 - p0) * kde(x) #/ kde_normalisation_factor # sometimes we don't need it, but it helps

    integration_result, _ = quad(combined_pdf, 0, max(feature))

    # Check normalization (should be close to 1)
    normalisation_result = integration_result + p0  # Add p0 for the zero-inflated part
    if normalisation_result > 1 or normalisation_result < 0.95:
        print(f'\nNormalisation check unsuccessful. It should be close to 1 and it equals {normalisation_result}. This can lead to negative entropies. Proceeding with using summation instead of integration.')

    
    # Safe logarithm function
    def safe_log(x):
        return np.log(x) if x > 0 else 0

    # Combined PDF with log for entropy calculation
    def combined_pdf_log(x):
        fx = combined_pdf(x)
        return -fx * safe_log(fx)
    
    entropy_non_zero, _ = quad(combined_pdf_log, 0, max(feature))
    entropy_zero = -p0 * safe_log(p0)
    entropy = entropy_non_zero + entropy_zero

    if alternative_entropy == False:
        return entropy
    
    if alternative_entropy == True:
        # alternative entropy estimation: use sums
        x_values = np.arange(0, max(feature) + 1)
        pdf_log_values = [combined_pdf_log(x) for x in x_values]
        entropy_estimate = np.sum(pdf_log_values)
        return entropy, entropy_estimate

    if evaluation_check == True:
        print(f'\nNormalisation check, it should be close to 1: {normalisation_result}.')
        x_values = np.arange(0, max(feature) + 1)
        pdf_values = [combined_pdf(x) for x in x_values[0:np.random.randint(0,len(x_values), 10)]]
        log_values = [safe_log(fx) for fx in pdf_values[0:np.random.randint(0,len(x_values), 10)]]
        print("\n10 random x values:", x_values[0:np.random.randint(0,len(x_values), 10)])
        print("PDF values:", pdf_values[0:np.random.randint(0,len(x_values), 10)])
        print("Log values:", log_values[0:np.random.randint(0,len(x_values), 10)])


In [24]:
def simple_entropy(f1):
    feature = f1
    # Probability of zero and data that is nonzero
    p0 = np.sum(feature == 0) / len(feature) if len(feature) > 0 else 0
    non_zero_data = feature[feature != 0]

    # zero  (H1)
    def safe_log(x):
        return np.log(x) if x > 0 else 0
    entropy_zero = -p0 * safe_log(p0)

    # H2 (simplified)
    # KDE for the non-zero part
    kde = gaussian_kde(non_zero_data)
    # Simplified kde integration
    def h_integrand(fx):
        return fx * safe_log(fx)
    
    kde_part_H2, _ = quad(h_integrand, 0, max(feature))
    entropy_non_zero = -h_integrand(1-p0) - (1-p0) * kde_part_H2
    
    # Combined entropy
    entropy = entropy_non_zero + entropy_zero
    return entropy


In [None]:
def toy_entropy(f1, alternative_entropy = False, evaluation_check = False):
    feature = f1 
    if isinstance(feature, np.ndarray) == False or np.issubdtype(feature.dtype,np.integer) == False:
        feature = np.array(feature.astype(int))
    
    if len(np.unique(feature)) < 3:
        raise ValueError(f"\nThere must be more than two unique counts for kde estimation.")
    
    p0 = np.sum(feature == 0) / len(feature) if len(feature) > 0 else 0
    # Probability of zero
    non_zero_data = feature[feature != 0]

    # KDE for the non-zero part
    kde = gaussian_kde(non_zero_data)

    # Get the normalization factor by integrating KDE over the range of non-zero values
    #kde_normalisation_factor, _ = quad(lambda x: kde(x), 0, max(feature))

    # Combined PDF
    def combined_pdf(x):
        if x == 0:
            return p0 #/ kde_normalisation_factor
        else:
            return (1 - p0) * kde(x) #/ kde_normalisation_factor # sometimes we don't need it, but it helps

    integration_result, _ = quad(combined_pdf, 0, max(feature))

    # Check normalization (should be close to 1)
    normalisation_result = integration_result + p0  # Add p0 for the zero-inflated part
    if normalisation_result > 1 or normalisation_result < 0.95:
        print(f'\nNormalisation check unsuccessful. It should be close to 1 and it equals {normalisation_result}. This can lead to negative entropies. Proceeding with using summation instead of integration.')

    
    # Safe logarithm function
    def safe_log(x):
        return np.log(x) if x > 0 else 0

    # Combined PDF with log for entropy calculation
    def combined_pdf_log(x):
        fx = combined_pdf(x)
        return -fx * safe_log(fx)
    
    entropy_non_zero, _ = quad(combined_pdf_log, 0, max(feature))
    entropy_zero = -p0 * safe_log(p0)
    entropy = entropy_non_zero + entropy_zero

    if alternative_entropy == False:
        return entropy
    
    if alternative_entropy == True:
        # alternative entropy estimation: use sums
        x_values = np.arange(0, max(feature) + 1)
        pdf_log_values = [combined_pdf_log(x) for x in x_values]
        entropy_estimate = np.sum(pdf_log_values)
        return entropy, entropy_estimate

    if evaluation_check == True:
        print(f'\nNormalisation check, it should be close to 1: {normalisation_result}.')
        x_values = np.arange(0, max(feature) + 1)
        pdf_values = [combined_pdf(x) for x in x_values[0:np.random.randint(0,len(x_values), 10)]]
        log_values = [safe_log(fx) for fx in pdf_values[0:np.random.randint(0,len(x_values), 10)]]
        print("\n10 random x values:", x_values[0:np.random.randint(0,len(x_values), 10)])
        print("PDF values:", pdf_values[0:np.random.randint(0,len(x_values), 10)])
        print("Log values:", log_values[0:np.random.randint(0,len(x_values), 10)])


In [17]:
toy_gene1 = toy_gene([4,1,8,3], [0.2,0,0.8,0])
toy_entropy(np.array(toy_gene1), alternative_entropy = False, evaluation_check = False)


Normalisation check unsuccessful. It should be close to 1 and it equals 0.716669630846595. This can lead to negative entropies. Proceeding with using summation instead of integration.


0.5668066919556543

In [None]:
toy_gene1 = toy_gene([4,1,8,3], [0.2,0,0.8,0])
simple_entropy(np.array(toy_gene1))

0.7358895969342187

In [20]:
# how fast are these functions?
import timeit

def my_function():
    # Your short function
    pass

# Measure the average execution time over 1,000,000 runs
execution_time = timeit.timeit(my_function, number=1_000_000)
print(f"Average time per run: {execution_time / 1_000_000:.10f} seconds")


Average time per run: 0.0000002639 seconds


In [27]:
execution_time = timeit.timeit(lambda: simple_entropy(np.array(toy_gene1)), number=1_000_000)
print(f"Average time per run: {execution_time / 1_000_000:.10f} seconds")

KeyboardInterrupt: 

Correlation metric cIFW

In [None]:
#### Omega (Ω), the correlation metric, formerly Entropy Sort Score (ESS)

# The individual function to calculate the Omega score for a given feature
def bIFW_correlation(feature_ind, sign_comp, normalised_matrix, extra_vectors = False, zero_info = True, extra_info = False):
    import numpy as np
    import scipy
    # get the list of significant features
    sign_list = [feat[1] for feat in sign_comp if feat[0] == feature_ind][0]
    MI_vector = []
    S_q_vector = []
    S_m_vector = []
    f1 = np.array(normalised_matrix.iloc[:,feature_ind])
    
    for i in sign_list:
        #define feature 2
        f2 = np.array(normalised_matrix.iloc[:,i])
        n00 = 0
        n01 = 0
        n10 = 0
        n11 = 0
        
        if len(f1) != len(f2):
            print("Fixed feature and features from matrix must be of the same length (same n of cells).")
        else:
            c = len(f1)
            for (cell1, cell2) in zip(f1, f2):
                if cell1 == cell2:
                    if cell1 == 0:
                        n00 += 1
                    elif cell1 == 1:
                        n11 += 1
                        
                elif cell1 == 0:
                    if cell2 == 1:
                        n01 += 1
                        
                elif cell1 == 1:
                    if cell2 == 0:
                        n10 += 1
                        
        # check discretization
        ns = np.array([n00, n01, n10, n11])
        n_str = ["n00", "n01", "n10", "n11"]
        nsum = np.sum(ns)
        if nsum != c:
            print("Sum of state counts do not add up.")
            MI = np.nan
            S_q = np.nan
            S_m = np.nan
            
        #calculate c's - need to have at least one of each
        else:
            #wrt to f1
            c_m0 = n00 + n01
            c_m1 = n10 + n11
            #wrt to f2
            c_q0 = n00 + n10
            c_q1 = n01 + n11
            
            cs_MI = np.array([[c_m0, c_q0], [c_m0, c_q1],
                              [c_m1, c_q0], [c_m1, c_q1]])
            cs_S = np.array([[c_m0, c_m1],
                             [c_q0, c_q1]])
            
            MI_terms = []
            zeroterms = []
            for ind, n in enumerate(ns):
                if n != 0 & np.all(cs_MI[ind]) == False: #if n and both cs are nonzero, calculate
                    MI_term = (n/c * np.log2(c * n / (cs_MI[ind][0] * cs_MI[ind][1])))
                    MI_terms.append(MI_term)
                    
                else:
                    zeroterms.append(n_str[ind])
            MI = np.sum(MI_terms)
            
            # entropies separately
            S_m_terms = []
            S_q_terms = []
            
            for ind in range(len(cs_S)):
                S_m_terms.append(cs_S[0][ind]/c * np.log2(cs_S[0][ind]/c))
                S_q_terms.append(cs_S[1][ind]/c * np.log2(cs_S[1][ind]/c))
                
            S_m = np.sum(S_m_terms) * (-1)
            S_q = np.sum(S_q_terms) * (-1)

            if extra_info == True:     
                exclude = str()
                for t in zeroterms:
                    exclude += (t + ", ")
                print("Be aware that the counts " + exclude + "were 0. This affects the calculations.")
                
        MI_vector.append(MI)
        S_q_vector.append(S_q)
        S_m_vector.append(S_m)
    
    max_entropy = [max(Sm, Sq) for Sm, Sq in zip(S_m_vector, S_q_vector)]
    
    #now calculate omega
    if len(MI_vector) != len(max_entropy):
        raise ValueError("All vectors (MI, x_max, S_q and S_m) must have the same length")    

    omega_vector = np.array(MI_vector) / np.array(max_entropy)
    if extra_vectors == True:
        return [omega_vector, MI_vector, max_entropy]            
    else:
        return [omega_vector]
    
    
# The parallelised function to calculate the Omega for all features
def parallel_bIFW_correlation(binarised_data, sign_matrix, Use_Cores=-1):
    global binarised_dataset
    binarised_dataset = binarised_data
    print(f"Data loaded. Shape: {binarised_dataset.shape}. Proceeding to obtain indices for efficient ESS calculation.\n")
    nonzero = np.nonzero(sign_matrix.to_numpy())
    sign_comp = []
    for f in np.unique(nonzero[0]):
        #print(f"Gene {f} has a significant interaction with genes {nonzero[1][nonzero[0] == f]}")
        l = nonzero[1][nonzero[0] == f]
        sign_comp.append([f,l])
        #print(f"Gene {f} has a significant interaction with {len(l)} genes")
    Feature_Inds = [feat[0] for feat in sign_comp]
    print(f"Indices obtained. Proceeding with calculating ESSs in parallel.\n")
    
    ## Identify number of cores to use.
    Cores_Available = multiprocess.cpu_count()
    print("Cores Available: " + str(Cores_Available))
    if Use_Cores == -1:
        Use_Cores = Cores_Available - 1 # -1 Is an arbitrary buffer of idle cores that I set.
        if Use_Cores < 1:
            Use_Cores = 1
    print("Cores Used: " + str(Use_Cores))
    ## Perform calculations
    with np.errstate(divide='ignore',invalid='ignore'):
        allscores = p_map(partial(bIFW_correlation, sign_comp=sign_comp, normalised_matrix=binarised_dataset), Feature_Inds, num_cpus=Use_Cores)
    print(f"Calculations complete. Proceeding with matrix reconstruction.\n")
    omega = [row[0] for row in allscores]
    
    # Use allscores to build square matrix
    n = binarised_dataset.shape[1]
    indices = (nonzero[0], nonzero[1])
    values = [value for sublist in omega for value in sublist]
    # Initialize a zero matrix
    matrix = np.zeros((n, n), dtype=float)
    for row, col, value in zip(indices[0], indices[1], values):
        #print(f"Placing value {value} at position ({row}, {col})")  # Debug print
        matrix[row, col] = value
    
    print("Matrix construction complete. Saving to dataframe.")
    m = pd.DataFrame(matrix)
    return allscores, m

allscores, omega_matrix = parallel_bIFW_correlation(binarised_data=binarised_df, sign_matrix=chip_masked, Use_Cores=4)

Data loaded. Shape: (3000, 2546). Proceeding to obtain indices for efficient ESS calculation.

Indices obtained. Proceeding with calculating ESSs in parallel.

Cores Available: 8
Cores Used: 4


  0%|          | 0/2546 [00:00<?, ?it/s]

Calculations complete. Proceeding with matrix reconstruction.

Matrix construction complete. Saving to dataframe.
