In [1]:
import scanpy as sc
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.formula.api import ols
from statsmodels.stats.multitest import multipletests
import seaborn as sns
import matplotlib.pyplot as plt
import time
import os

sns.set_theme(style="whitegrid")

  from anndata import __version__ as anndata_version
  if Version(anndata.__version__) >= Version("0.11.0rc2"):
  if Version(anndata.__version__) >= Version("0.11.0rc2"):


In [None]:
# Data Preparation

adata = sc.read_h5ad("/Users/aayush/Visual_Studio_Code/ARP (Dr. Wu)/individual_cellbender_samples/iec_wu_scdc_cellbender_merged_qc_processed_ann.h5ad")

def rename_batches(adata):
    """
    Renames original batches to simplified groups for easier analysis and plotting.
    """
    # Original labels
    gbatch = ["KO_Inulin", "KO_Cellulose", "WO_C", "B. infantis + HMOs #"]
    gbatch_cleaned = ["B_infantis", "Control", "HMOs", "B_infantis_HMOs"] # Replaced '+' for patsy formula compatibility
    
    # Create a new column for treatments
    adata.obs['Treatment'] = adata.obs['batch_id'].astype(str)
    
    for old_val, new_val in zip(gbatch, gbatch_cleaned):
        # Using string matching to group batches
        adata.obs.loc[adata.obs['batch_id'].str.contains(old_val), 'Treatment'] = new_val
        
    # Convert to categorical
    adata.obs['Treatment'] = pd.Categorical(adata.obs['Treatment'], categories=gbatch_cleaned, ordered=True)
    return adata

# adata = rename_batches(adata)

OSError: Can't synchronously read data (can't open directory (/opt/miniconda3/envs/scanpy_env_311/lib/hdf5/plugin). Please verify its existence)

In [None]:
#ANOVA execution function
def run_anova_tests(adata, cell_type, conditions, interact=True):
    """
    Subsets data by cell type and runs a Two-Way ANOVA for all genes.
    """
    # Subset cell type
    adata_sub = adata[adata.obs['cell_type_tx'] == cell_type].copy()
    print(f"Number of cells {adata_sub.n_obs} in cell type {cell_type}")
    
    # Ensure data is normalized/log-transformed here if not already done
    
    genes = adata_sub.var_names
    results_list = []
    
    # Extract condition bools
    cond1_name, cond2_name = conditions
    obs_df = adata_sub.obs.copy()
    obs_df[cond1_name] = obs_df['Treatment'].str.contains(cond1_name).astype(int)
    obs_df[cond2_name] = obs_df['Treatment'].str.contains(cond2_name).astype(int)
    
    # Define model formula
    if interact:
        formula = f'Expression ~ C({cond1_name}) + C({cond2_name}) + C({cond1_name}):C({cond2_name})'
    else:
        formula = f'Expression ~ C({cond1_name}) + C({cond2_name})'

    for gene in genes:

        tmp_df = obs_df[[cond1_name, cond2_name]].copy()
        tmp_df['Expression'] = adata_sub[:, gene].X.toarray().flatten() if sc.issparse(adata_sub.X) else adata_sub[:, gene].X.flatten()
        
        try:
            model = ols(formula, data=tmp_df).fit()
            anova_table = sm.stats.anova_lm(model, typ=2)
            
            p_cond1 = anova_table.loc[f'C({cond1_name})', 'PR(>F)']
            p_cond2 = anova_table.loc[f'C({cond2_name})', 'PR(>F)']
            p_interact = anova_table.loc[f'C({cond1_name}):C({cond2_name})', 'PR(>F)'] if interact else np.nan
            
            results_list.append({
                'Gene': gene,
                f'pval_{cond1_name}': p_cond1,
                f'pval_{cond2_name}': p_cond2,
                'pval_Interaction': p_interact
            })
        except Exception as e:
            pass
            
    return pd.DataFrame(results_list)

In [None]:
# BH Adjustment and Filtering
def export_anova_results(results_df, output_file, interact=True):
    """
    Adjusts p-values using Benjamini-Hochberg, filters for significance, and saves.
    """
    df = results_df.copy()
    
    # Python's statsmodels handles the BH adjustment directly
    df['padj_cond1'] = multipletests(df.iloc[:, 1].fillna(1), method='fdr_bh')[1]
    df['padj_cond2'] = multipletests(df.iloc[:, 2].fillna(1), method='fdr_bh')[1]
    
    if interact:
        df['padj_Interaction'] = multipletests(df['pval_Interaction'].fillna(1), method='fdr_bh')[1]
        
    # Filter for significant results (padj <= 0.05)
    if interact:
        sig_mask = (df['padj_cond1'] <= 0.05) | (df['padj_cond2'] <= 0.05) | (df['padj_Interaction'] <= 0.05)
        df_sig = df[sig_mask].sort_values(by=['padj_Interaction', 'padj_cond1', 'padj_cond2'])
    else:
        sig_mask = (df['padj_cond1'] <= 0.05) | (df['padj_cond2'] <= 0.05)
        df_sig = df[sig_mask].sort_values(by=['padj_cond1', 'padj_cond2'])
        
    df_sig.to_csv(output_file, index=False)
    return df_sig

In [None]:
# Violin Plotting
def plot_gene_expression(adata, gene_name, cell_type, pval_row, conditions, save_filename=None):
    """
    Generates a 2x2 grid of violin plots for a specific gene.
    """
    adata_sub = adata[adata.obs['cell_type_tx'] == cell_type].copy()
    
    if gene_name not in adata_sub.var_names:
        print(f'Gene "{gene_name}" not found.')
        return
        
    expr = adata_sub[:, gene_name].X.toarray().flatten() if sc.issparse(adata_sub.X) else adata_sub[:, gene_name].X.flatten()
    
    # Build dataframe for plotting
    plot_df = pd.DataFrame({
        'Expression': expr,
        'Treatment': adata_sub.obs['Treatment'].values
    })
    
    # Desired order
    category_order = ['Control', conditions[0], conditions[1], f"{conditions[0]}_{conditions[1]}"]
    colors = ['#808000', '#FF0000', '#0000FF', '#7E2F8E'] # Mapped roughly to your MATLAB RGB values
    
    fig, axes = plt.subplots(2, 2, figsize=(10, 8))
    fig.suptitle(f'Expression of {gene_name}', fontsize=16)
    
    for i, (cat, ax) in enumerate(zip(category_order, axes.flatten())):
        sns.violinplot(data=plot_df[plot_df['Treatment'] == cat], y='Expression', 
                       ax=ax, color=colors[i], alpha=0.5, inner='box')
        ax.set_xlabel(cat.replace('_', ' '), fontsize=14)
        ax.set_ylabel('Expression')
        ax.set_ylim(plot_df['Expression'].min(), plot_df['Expression'].max() * 1.1)
        
    plt.tight_layout()
    if save_filename:
        plt.savefig(save_filename, dpi=300)
    plt.show()

In [None]:
# Cell 6: Run Pipeline
# conditions = ['B_infantis', 'HMOs']
# cell_types = adata.obs['cell_type_tx'].unique()

# for ct in cell_types:
#     print(f"Processing {ct}...")
#     t0 = time.time()
#     results = run_anova_tests(adata, cell_type=ct, conditions=conditions, interact=True)
#     print(f"ANOVA finished in {time.time() - t0:.2f} seconds")
#     
#     t1 = time.time()
#     fname = f"{ct.replace(' ', '_')}_2way_anova_results.csv"
#     sig_results = export_anova_results(results, fname, interact=True)
#     print(f"Pvalues file exported in {time.time() - t1:.2f} seconds")