# Complete Sattistical Correlation and Association Analysis

## Stat test selection based on data distribution and data type:

### Numerical data
##### Pearson correlation if both variables are normally distributed
##### Spearman correlation if at least one variable is not normal.

### Categorical data vs. numerical data
##### Mann-Whitney U test for binary categorical data
##### Kruskal-Wallis test for multi-category comparisons

### Categorical data vs. categorical data
##### Chi-square test for general associations
##### Fisher’s Exact Test if expected frequencies are low

### False Discovery Rate (FDR) adjustment to reduce false positives

In [1]:
import pandas as pd
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.stats.multitest import multipletests
from statsmodels.stats.diagnostic import lilliefors
import os

# Ensure output directories exist
os.makedirs("data_dist", exist_ok=True)
os.makedirs("norm_res", exist_ok=True)
os.makedirs("corr_res", exist_ok=True)
os.makedirs("violin_plots", exist_ok=True)
os.makedirs("corr_plot", exist_ok=True)

# Load data
data_file_path = "input/all_cat_for_corr.csv"
data = pd.read_csv(data_file_path, encoding="ISO-8859-1")

# Identify categorical and numerical columns
categorical_cols = [col for col in data.columns if col.endswith("CAT")]
boolean_cols = data.select_dtypes(include=['bool']).columns.tolist()
categorical_cols += [col for col in boolean_cols if col not in categorical_cols]

numerical_cols = [col for col in data.columns if col not in categorical_cols and col != "strain_id"]

# Convert categorical data properly (including boolean columns)
data[categorical_cols] = data[categorical_cols].astype("category")

# Convert numerical data properly
data[numerical_cols] = data[numerical_cols].apply(pd.to_numeric, errors='coerce')

# Remove infinite values from numerical data
data.replace([np.inf, -np.inf], np.nan, inplace=True)

# **Step 1: Normality Testing & Distribution Plots**
normality_results = {}
for column in numerical_cols:
    if data[column].nunique() > 1:
        plt.figure()
        sns.histplot(data[column].dropna(), kde=True)
        plt.title(f'Distribution of {column}')
        plt.savefig(f'data_dist/distribution_{column}.png')
        plt.close()

        try:
            sample_size = len(data[column].dropna())
            if sample_size < 500:
                stat, p = stats.shapiro(data[column].dropna())
                test_type = "Shapiro-Wilk"
            else:
                stat, p = lilliefors(data[column].dropna())
                test_type = "Lilliefors"

            normality_results[column] = {'Test': test_type, 'Statistic': stat, 'p-value': p}

        except Exception:
            normality_results[column] = {'Test': 'Error', 'Statistic': 'Error', 'p-value': 'Error'}

pd.DataFrame(normality_results).T.to_csv('norm_res/normality_results.csv')

# **Step 2: Correlations (Num-Num and Cat-Num)**
results = []
p_values = []  
processed_pairs = set()

for phenotype in numerical_cols:
    for metadata in categorical_cols + numerical_cols:
        if phenotype == metadata or (metadata, phenotype) in processed_pairs:
            continue

        processed_pairs.add((phenotype, metadata))

        valid_data = data[[phenotype, metadata]].dropna()
        if valid_data.shape[0] <= 5:
            continue

        try:
            if metadata in numerical_cols:
                normal_p1 = normality_results.get(phenotype, {}).get('p-value', 1)
                normal_p2 = normality_results.get(metadata, {}).get('p-value', 1)

                if normal_p1 > 0.05 and normal_p2 > 0.05:
                    stat, p = stats.pearsonr(valid_data[metadata], valid_data[phenotype])
                    test_type = "Pearson"
                else:
                    stat, p = stats.spearmanr(valid_data[metadata], valid_data[phenotype])
                    test_type = "Spearman"

            else:
                filtered_groups = [valid_data[valid_data[metadata] == cat][phenotype].dropna() 
                                   for cat in valid_data[metadata].unique() 
                                   if len(valid_data[valid_data[metadata] == cat]) > 5]

                if len(filtered_groups) < 2:
                    continue

                if len(filtered_groups) == 2:
                    stat, p = stats.mannwhitneyu(*filtered_groups, alternative="two-sided", method="asymptotic")
                    test_type = "Mann-Whitney U"
                else:
                    stat, p = stats.kruskal(*filtered_groups)
                    test_type = "Kruskal-Wallis"

            if np.isnan(stat) or np.isnan(p):
                continue

            results.append({'Metadata': metadata, 'Phenotype': phenotype, 'Test': test_type, 'Statistic': stat, 'p-value': p})
            p_values.append(p)

        except Exception:
            continue

# **Categorical vs. Categorical (Chi-square & Fisher's Exact)**
for i, cat1 in enumerate(categorical_cols):
    for cat2 in categorical_cols[i+1:]:
        valid_data = data[[cat1, cat2]].dropna()
        if valid_data.shape[0] <= 5:
            continue

        contingency_table = pd.crosstab(valid_data[cat1], valid_data[cat2])
        if contingency_table.shape[0] < 2 or contingency_table.shape[1] < 2:
            continue

        try:
            if (contingency_table.values < 5).any() and contingency_table.size == 4:
                stat, p = stats.fisher_exact(contingency_table)
                test_type = "Fisher's Exact Test"
            else:
                stat, p, _, _ = stats.chi2_contingency(contingency_table)
                test_type = "Chi-square Test"

            results.append({'Metadata': cat1, 'Phenotype': cat2, 'Test': test_type, 'Statistic': stat, 'p-value': p})
            p_values.append(p)

        except Exception:
            continue

# **Apply FDR Correction**
if p_values:
    _, corrected_p_values, _, _ = multipletests(p_values, method='fdr_bh')
    for i, result in enumerate(results):
        result['corrected p-value'] = corrected_p_values[i]
else:
    for result in results:
        result['corrected p-value'] = 'NA'

pd.DataFrame(results).to_csv('corr_res/correlation_results_with_fdr.csv', index=False)

# **Step 3: Clustered Heatmap**
if len(numerical_cols) > 1:
    correlation_matrix = data[numerical_cols].corr(method='spearman')
    clustergrid = sns.clustermap(correlation_matrix, cmap="BrBG", linewidths=0.1, figsize=(60, 50), annot=False, method='average')
    clustergrid.fig.suptitle("Clustered Numerical Correlation Heatmap", fontsize=24)
    plt.savefig("corr_plot/numerical_correlation_clustered.png")
    plt.close()

# **Step 4: Violin plots with significance and log-transform for MIC variables**
for metadata in categorical_cols:
    for phenotype in numerical_cols:
        valid_data = data[[metadata, phenotype]].dropna().copy()
        
        # Log-transform values if phenotype contains 'MIC'
        if "MIC" in phenotype.upper():
            valid_data[phenotype] = np.log10(valid_data[phenotype].replace(0, np.nan).dropna())

        if valid_data.shape[0] > 5 and valid_data[metadata].nunique() in [2, 3, 4, 5]:
            plt.figure(figsize=(12, 9))
            ax = sns.violinplot(
                x=valid_data[metadata].astype(str), 
                y=valid_data[phenotype], 
                hue=valid_data[metadata].astype(str),
                palette="Accent", 
                inner="box"
            )
            plt.xticks(rotation=45)
            plt.title(f'{metadata} vs. {phenotype}' + (' (log10-transformed)' if 'MIC' in phenotype.upper() else ''))

            groups = [valid_data[valid_data[metadata] == cat][phenotype].dropna() 
                      for cat in valid_data[metadata].unique()]

            if len(groups) == 2:
                _, p_value = stats.mannwhitneyu(*groups, method="asymptotic")
            else:
                _, p_value = stats.kruskal(*groups)

            ax.text(0.5, ax.get_ylim()[1] * 0.95, f"p = {p_value:.3g}",
                    ha='center', fontsize=12, color='black')

            plt.savefig(f'violin_plots/{metadata}_vs_{phenotype}.png')
            plt.close()

print("Analysis completed successfully!")


Analysis completed successfully!


## Post-Hoc tests for significant correlation/associations:

### Cutoff: FDR < 0.05

### Pairwise Chi-square for significant Chi-Square Test results
### Pairwise Mann-Whitney U for significant Mann-Whitney U results
### Dunn's Test for significant Kruskal-Wallis results

In [1]:
import pandas as pd
import numpy as np
import scipy.stats as stats
import scikit_posthocs as sp
import os

# Ensure output directory exists
os.makedirs("posthoc_res", exist_ok=True)

# Load correlation results
data = pd.read_csv('corr_res/correlation_results_with_fdr.csv')

# Load original dataset (single file input)
original_data = pd.read_csv('input/all_cat_for_corr.csv', encoding='ISO-8859-1')

# Initialize list for storing post-hoc results
posthoc_results = []

# Iterate over significant results
for _, row in data.iterrows():
    metadata, phenotype, test, p, fdr_p = row['Metadata'], row['Phenotype'], row['Test'], row['p-value'], row['corrected p-value']
    
    # Apply post-hoc only if the FDR-adjusted p-value is significant
    if fdr_p < 0.05:
        valid_data = original_data[[phenotype, metadata]].dropna()
        valid_data[metadata] = valid_data[metadata].astype(str)  # <-- FIX HERE

        try:
            if test == 'Chi-square Test':
                contingency_table = pd.crosstab(valid_data[metadata], valid_data[phenotype])
                comparisons_done = set()
                for row_label in contingency_table.index:
                    for col_label in contingency_table.columns:
                        if row_label != col_label:
                            comparison_key = tuple(sorted([str(row_label), str(col_label)]))
                            if comparison_key not in comparisons_done:
                                comparisons_done.add(comparison_key)
                                observed = np.array([
                                    [contingency_table.loc[row_label, col_label], sum(contingency_table.loc[row_label, :]) - contingency_table.loc[row_label, col_label]],
                                    [sum(contingency_table[col_label]) - contingency_table.loc[row_label, col_label],
                                     contingency_table.values.sum() - sum(contingency_table.loc[row_label, :]) - sum(contingency_table[col_label]) + contingency_table.loc[row_label, col_label]]
                                ])
                                chi2, post_p, _, _ = stats.chi2_contingency(observed)
                                group1_count = observed[0].sum()
                                group2_count = observed[1].sum()
                                posthoc_results.append({
                                    'Metadata': metadata,
                                    'Phenotype': phenotype,
                                    'Comparison': f'{row_label} vs {col_label}',
                                    'Post Hoc Test': 'Pairwise Chi-square',
                                    'p-value': post_p,
                                    'Group1_Count': group1_count,
                                    'Group2_Count': group2_count
                                })

            elif test == 'Mann-Whitney U':
                categories = list(valid_data[metadata].unique())
                comparisons_done = set()
                for i in range(len(categories)):
                    for j in range(i + 1, len(categories)):
                        comparison_key = tuple(sorted([categories[i], categories[j]]))
                        if comparison_key not in comparisons_done:
                            comparisons_done.add(comparison_key)
                            group1 = valid_data[valid_data[metadata] == categories[i]][phenotype]
                            group2 = valid_data[valid_data[metadata] == categories[j]][phenotype]
                            stat, post_p = stats.mannwhitneyu(group1, group2, alternative='two-sided')
                            group1_median = np.median(group1)
                            group2_median = np.median(group2)
                            posthoc_results.append({
                                'Metadata': metadata,
                                'Phenotype': phenotype,
                                'Comparison': f'{categories[i]} vs {categories[j]}',
                                'Post Hoc Test': 'Pairwise Mann-Whitney U',
                                'p-value': post_p,
                                'Group1_Median': group1_median,
                                'Group2_Median': group2_median
                            })

            elif test == 'Kruskal-Wallis':
                posthoc = sp.posthoc_dunn(valid_data, val_col=phenotype, group_col=metadata, p_adjust='bonferroni')
                comparisons_done = set()
                for group1 in posthoc.index:
                    for group2 in posthoc.columns:
                        if group1 != group2:
                            comparison_key = tuple(sorted([group1, group2]))
                            if comparison_key not in comparisons_done:
                                comparisons_done.add(comparison_key)
                                group1_vals = valid_data[valid_data[metadata] == group1][phenotype]
                                group2_vals = valid_data[valid_data[metadata] == group2][phenotype]
                                group1_median = np.median(group1_vals)
                                group2_median = np.median(group2_vals)
                                posthoc_results.append({
                                    'Metadata': metadata,
                                    'Phenotype': phenotype,
                                    'Comparison': f'{group1} vs {group2}',
                                    'Post Hoc Test': "Dunn's Test",
                                    'p-value': posthoc.loc[group1, group2],
                                    'Group1_Median': group1_median,
                                    'Group2_Median': group2_median
                                })
        except Exception as e:
            print(f"Skipping post-hoc for {metadata} vs {phenotype} due to error: {e}")

# Save posthoc results
posthoc_df = pd.DataFrame(posthoc_results)
posthoc_df.to_csv('posthoc_res/posthoc_results.csv', index=False)

print(f'Post-hoc analysis completed successfully! {len(posthoc_results)} comparisons saved.')

Post-hoc analysis completed successfully! 12770 comparisons saved.


## Post-Hoc results interpretation

In [4]:
import pandas as pd
import numpy as np

# Load significant post-hoc results
posthoc_df = pd.read_csv('posthoc_res/posthoc_results.csv')

# Filter for significant results only
significant_results = posthoc_df[posthoc_df["p-value"] < 0.05].copy()

# Define interpretation function
def interpret_row(row):
    metadata = row['Metadata'].replace("_CAT", "")
    phenotype = row['Phenotype']
    comparison = row['Comparison']
    p_val = row['p-value']
    group1, group2 = comparison.split(' vs ')
    test_type = row['Post Hoc Test']

    # Decide if we deal with medians or counts
    if 'Group1_Median' in row and not pd.isna(row['Group1_Median']):
        median1 = row['Group1_Median']
        median2 = row['Group2_Median']

        if median1 > median2:
            interpretation = (f"'{phenotype}': '{group1}' shows significantly higher values "
                              f"than '{group2}' in '{metadata}' (median: {median1:.3f} vs {median2:.3f}; p={p_val:.2e}).")
            comparison_direction = f"{group1} > {group2}"
            fold_change = abs(median1) / abs(median2) if median2 != 0 else np.inf

        elif median2 > median1:
            interpretation = (f"'{phenotype}': '{group2}' shows significantly higher values "
                              f"than '{group1}' in '{metadata}' (median: {median2:.3f} vs {median1:.3f}; p={p_val:.2e}).")
            comparison_direction = f"{group2} > {group1}"
            fold_change = abs(median2) / abs(median1) if median1 != 0 else np.inf

        else:
            interpretation = (f"'{phenotype}': '{group1}' and '{group2}' in '{metadata}' have identical medians ({median1:.3f}), "
                              f"but differ significantly (p={p_val:.2e}).")
            comparison_direction = f"{group1} = {group2}"
            fold_change = 1.0

    elif 'Group1_Count' in row and not pd.isna(row['Group1_Count']):
        count1 = row['Group1_Count']
        count2 = row['Group2_Count']

        # Adjusted interpretation for categorical vs categorical tests
        interpretation = (f"There is a significant association between '{metadata}' ('{group1}') "
                          f"and '{phenotype}' ('{group2}') (counts: {count1} vs {count2}; p={p_val:.2e}).")
        comparison_direction = f"{metadata}:{group1} ↔ {phenotype}:{group2}"
        fold_change = count1 / count2 if count2 != 0 else np.inf

    else:
        interpretation = "Insufficient data"
        comparison_direction = "N/A"
        fold_change = np.nan

    return pd.Series({
        "Comparison_Direction": comparison_direction,
        "Fold_Change": fold_change,
        "Interpretation": interpretation
    })

# Apply interpretation
interpretation_results = significant_results.apply(interpret_row, axis=1)

# Insert 'Comparison_Direction' and 'Fold_Change' columns after 'Group2_Count'
insert_pos = significant_results.columns.get_loc("Group2_Count") + 1
significant_results.insert(loc=insert_pos, column="Comparison_Direction",
                           value=interpretation_results["Comparison_Direction"])
significant_results.insert(loc=insert_pos + 1, column="Fold_Change",
                           value=interpretation_results["Fold_Change"])

# Add Interpretation as last column
significant_results["Interpretation"] = interpretation_results["Interpretation"]

# Save results
significant_results.to_csv('posthoc_res/posthoc_interpretation.csv', index=False)

print("Post-hoc interpretation file with fold change created successfully!")


Post-hoc interpretation file with fold change created successfully!
