# Tutorial 05: Differential Expression Analysis

This tutorial covers differential expression (DE) analysis for identifying proteins/peptides that differ significantly between experimental groups.

## Learning Objectives

By the end of this tutorial, you will:
- Understand different statistical tests for differential expression
- Perform two-group comparisons (t-test, Mann-Whitney, permutation test)
- Perform multi-group comparisons (ANOVA, Kruskal-Wallis)
- Apply paired sample analysis for matched samples
- Adjust p-values for multiple testing (FDR correction)
- Visualize DE results using volcano plots and diagnostic plots (effect sizes, p-value distributions)
- Interpret effect sizes and fold changes

---

## 1. Setup

Import required libraries and load a dataset suitable for DE analysis.

In [None]:
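import os

import numpy as np
import polars as pl
import matplotlib.pyplot as plt
import scienceplots  # noqa: F401  (SciencePlots >= 2.0 registers its styles on import)
from matplotlib.gridspec import GridSpec

# Apply SciencePlots style
plt.style.use(["science", "no-latex"])

# Create the output directory used by every plt.savefig() call below
os.makedirs("tutorial_output", exist_ok=True)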

# Import ScpTensor
import scptensor
from scptensor.datasets import load_simulated_scrnaseq_like
from scptensor import (
    # Differential expression methods
    diff_expr_ttest,
    diff_expr_paired_ttest,
    diff_expr_mannwhitney,
    diff_expr_anova,
    diff_expr_kruskal,
    diff_expr_permutation_test,
    adjust_fdr,
    check_homoscedasticity,
    # Visualization
    volcano,
    heatmap,
    # Normalization
    norm_log,
)

print(f"ScpTensor version: {scptensor.__version__}")

## 2. Load and Prepare Data

For DE analysis, we need a dataset with clearly defined groups (e.g., treatment vs control).

In [None]:
# Load dataset with multiple cell types (simulating treatment groups)
container = load_simulated_scrnaseq_like()

print(f"Dataset loaded: {container}")
print(f"Samples: {container.n_samples}")
print(f"Features: {container.assays['proteins'].n_features}")

# Check available groups
print("\nCell type distribution:")
print(container.obs.group_by("cell_type").count().sort("cell_type"))

print("\nBatch distribution:")
print(container.obs.group_by("batch").count().sort("batch"))

### 2.1 Preprocessing: Log Normalization

Before DE analysis, apply log normalization to stabilize variance.
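
As a rough illustration of what this transform does to individual values, the sketch below applies log_base(x + offset) in plain Python. The helper `log_transform` is hypothetical and is not ScpTensor's `norm_log` implementation; it only mirrors the `base` and `offset` parameters used in this tutorial.

```python
import math


def log_transform(values, base=2.0, offset=1.0):
    """Return log_base(x + offset) for each value (hypothetical helper)."""
    return [math.log(x + offset, base) for x in values]


# The offset keeps zero intensities finite: log2(0 + 1) = 0
raw_intensities = [0.0, 1.0, 3.0, 1023.0]
print(log_transform(raw_intensities))
```

The offset matters mainly for zeros and very small intensities, where the logarithm alone would be undefined or strongly negative.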

In [None]:
# Apply log normalization
container = norm_log(
    container,
    assay_name="proteins",
    base_layer="raw",
    new_layer_name="log",
    base=2.0,
    offset=1.0,
)

print("Log normalization completed.")
print(f"Available layers: {list(container.assays['proteins'].layers.keys())}")

## 3. Two-Group Comparison: T-Test

The t-test is the most common method for comparing two groups. ScpTensor implements **Welch's t-test** by default, which does not assume equal variances between groups.
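
To make the "unequal variances" idea concrete, here is a minimal standard-library sketch of the Welch statistic and its Welch-Satterthwaite degrees of freedom for a single feature. The function `welch_t` and the toy values are assumptions for illustration only; this is not how `diff_expr_ttest` is implemented, and it does not compute a p-value.

```python
import math
from statistics import mean, variance


def welch_t(a, b):
    """Return (t statistic, Welch-Satterthwaite degrees of freedom)."""
    # Squared standard error of each group mean (sample variance / n)
    va = variance(a) / len(a)
    vb = variance(b) / len(b)
    t = (mean(a) - mean(b)) / math.sqrt(va + vb)
    df = (va + vb) ** 2 / (va ** 2 / (len(a) - 1) + vb ** 2 / (len(b) - 1))
    return t, df


group1 = [1.0, 2.0, 3.0, 4.0, 5.0]
group2 = [2.0, 4.0, 6.0, 8.0, 10.0]
t_stat, dof = welch_t(group1, group2)
print(f"t = {t_stat:.3f}, df = {dof:.2f}")
```

Because the second toy group is more variable, the degrees of freedom come out below the `n1 + n2 - 2 = 8` that Student's t-test would use, which makes the test more conservative.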

In [None]:
# Perform t-test comparing two cell types
result_ttest = diff_expr_ttest(
    container=container,
    assay_name="proteins",
    group_col="cell_type",
    group1="T-cell",
    group2="B-cell",
    layer_name="log",
    equal_var=False,  # Welch's t-test (recommended)
    missing_strategy="ignore",
)

print("T-test results summary:")
print(f"  Method: {result_ttest.method}")
print(f"  Features tested: {len(result_ttest.feature_ids)}")
print(f"  Significant features (FDR < 0.05): {(result_ttest.p_values_adj < 0.05).sum()}")
print(f"  Significant with |log2FC| > 1: {((result_ttest.p_values_adj < 0.05) & (np.abs(result_ttest.log2_fc) > 1)).sum()}")

### 3.1 Exploring T-Test Results

In [None]:
# Convert results to DataFrame
results_df = result_ttest.to_dataframe()

print("Top 10 significant features:")
print("=" * 80)
print(results_df.filter(pl.col("p_value_adj") < 0.05).head(10))

### 3.2 Volcano Plot

A volcano plot visualizes the relationship between fold change and statistical significance.

In [None]:
# Create volcano plot
fig, ax = plt.subplots(figsize=(10, 8))

# Prepare data
log2fc = result_ttest.log2_fc
neg_log_pval = -np.log10(result_ttest.p_values_adj)

# Define significance thresholds
pval_threshold = 0.05
fc_threshold = 1.0

# Color points based on significance
colors = np.full(len(log2fc), 'gray', dtype=object)
is_sig = result_ttest.p_values_adj < pval_threshold
colors[is_sig & (log2fc > fc_threshold)] = 'red'  # Up-regulated
colors[is_sig & (log2fc < -fc_threshold)] = 'blue'  # Down-regulated

# Plot scatter
ax.scatter(log2fc, neg_log_pval, c=colors, alpha=0.6, s=20)

# Add threshold lines
ax.axhline(-np.log10(pval_threshold), color='black', linestyle='--', linewidth=0.5)
ax.axvline(fc_threshold, color='black', linestyle='--', linewidth=0.5)
ax.axvline(-fc_threshold, color='black', linestyle='--', linewidth=0.5)

# Labels
ax.set_xlabel('log2 Fold Change (T-cell vs B-cell)')
ax.set_ylabel('-log10 Adjusted P-value')
ax.set_title('Volcano Plot: T-cell vs B-cell')

# Add legend
from matplotlib.patches import Patch
legend_elements = [
    Patch(facecolor='red', label=f'Up-regulated ({(is_sig & (log2fc > fc_threshold)).sum()})'),
    Patch(facecolor='blue', label=f'Down-regulated ({(is_sig & (log2fc < -fc_threshold)).sum()})'),
    Patch(facecolor='gray', label=f'Not significant ({(~is_sig).sum()})')
]
ax.legend(handles=legend_elements, loc='upper right')

plt.tight_layout()
plt.savefig('tutorial_output/volcano_ttest.png', dpi=300)
plt.show()

print("Volcano plot saved to: tutorial_output/volcano_ttest.png")

### 3.3 Effect Size Interpretation

**Cohen's d** measures the standardized difference between groups. Conventional thresholds are:
- |d| < 0.2: Negligible effect
- 0.2 <= |d| < 0.5: Small effect
- 0.5 <= |d| < 0.8: Medium effect
- |d| >= 0.8: Large effect
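
For reference, a minimal sketch of Cohen's d with a pooled standard deviation, labelled with the conventional cut-offs 0.2, 0.5, and 0.8. The helpers `cohens_d` and `effect_label` are hypothetical; the source does not state which variant of Cohen's d ScpTensor reports in `effect_sizes`, so treat the pooled form as an assumption.

```python
import math
from statistics import mean, variance


def cohens_d(a, b):
    """Cohen's d using the pooled sample standard deviation."""
    na, nb = len(a), len(b)
    pooled_var = ((na - 1) * variance(a) + (nb - 1) * variance(b)) / (na + nb - 2)
    return (mean(a) - mean(b)) / math.sqrt(pooled_var)


def effect_label(d):
    """Map |d| to the conventional negligible/small/medium/large labels."""
    size = abs(d)
    if size >= 0.8:
        return "large"
    if size >= 0.5:
        return "medium"
    if size >= 0.2:
        return "small"
    return "negligible"


d = cohens_d([1.0, 2.0, 3.0, 4.0, 5.0], [2.0, 4.0, 6.0, 8.0, 10.0])
print(f"d = {d:.2f} ({effect_label(d)})")
```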

In [None]:
# Analyze effect sizes
effect_sizes = result_ttest.effect_sizes

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Histogram of effect sizes
valid_es = effect_sizes[~np.isnan(effect_sizes)]
axes[0].hist(valid_es, bins=50, edgecolor='black', color='steelblue')
axes[0].axvline(0, color='red', linestyle='--', linewidth=1)
axes[0].axvline(0.2, color='orange', linestyle=':', linewidth=0.8, label='Small effect (0.2)')
axes[0].axvline(0.5, color='orange', linestyle='-.', linewidth=0.8, label='Medium effect (0.5)')
axes[0].axvline(0.8, color='orange', linestyle='--', linewidth=0.8, label='Large effect (0.8)')
axes[0].set_xlabel("Cohen's d")
axes[0].set_ylabel('Frequency')
axes[0].set_title("Distribution of Effect Sizes")
axes[0].legend()  # show the threshold labels defined above

# Effect size vs significance
colors = np.full(len(effect_sizes), 'gray', dtype=object)
is_sig = result_ttest.p_values_adj < 0.05
colors[is_sig] = 'red'

axes[1].scatter(effect_sizes, -np.log10(result_ttest.p_values_adj), c=colors, alpha=0.5, s=15)
axes[1].axhline(-np.log10(0.05), color='black', linestyle='--', linewidth=0.5)
axes[1].axvline(0, color='red', linestyle='--', linewidth=1)
axes[1].set_xlabel("Cohen's d (Effect Size)")
axes[1].set_ylabel('-log10 Adjusted P-value')
axes[1].set_title('Effect Size vs Significance')

plt.tight_layout()
plt.savefig('tutorial_output/effect_size_analysis.png', dpi=300)
plt.show()

print("Effect size analysis saved to: tutorial_output/effect_size_analysis.png")
print("\nEffect size summary:")
print(f"  Large effects (|d| >= 0.8): {(np.abs(effect_sizes) >= 0.8).sum()}")
print(f"  Medium effects (0.5 <= |d| < 0.8): {((np.abs(effect_sizes) >= 0.5) & (np.abs(effect_sizes) < 0.8)).sum()}")
print(f"  Small effects (0.2 <= |d| < 0.5): {((np.abs(effect_sizes) >= 0.2) & (np.abs(effect_sizes) < 0.5)).sum()}")
print(f"  Negligible effects (|d| < 0.2): {(np.abs(effect_sizes) < 0.2).sum()}")

## 4. Non-Parametric Tests

When data doesn't meet normality assumptions, use non-parametric tests.

### 4.1 Mann-Whitney U Test

The Mann-Whitney U test is a rank-based, non-parametric test that does not assume normality. It tests whether values from one group tend to be larger than values from the other.
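
The U statistic itself can be sketched by counting pairwise "wins" between the two groups. The function `mann_whitney_u` below is a hypothetical, quadratic-time illustration for one feature, not ScpTensor's implementation, and it stops short of computing a p-value.

```python
def mann_whitney_u(a, b):
    """Return (U_a, U_b): pairwise wins for each group, ties count 0.5."""
    u_a = 0.0
    for x in a:
        for y in b:
            if x > y:
                u_a += 1.0
            elif x == y:
                u_a += 0.5
    # Every one of the len(a) * len(b) pairs is won by one side or shared
    u_b = len(a) * len(b) - u_a
    return u_a, u_b


print(mann_whitney_u([1.2, 3.4, 2.2], [4.1, 5.0, 3.9]))
```

A U value near 0 or near `len(a) * len(b)` indicates that one group almost always ranks above the other, regardless of how large the differences are.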

In [None]:
# Perform Mann-Whitney U test
result_mw = diff_expr_mannwhitney(
    container=container,
    assay_name="proteins",
    group_col="cell_type",
    group1="T-cell",
    group2="B-cell",
    layer_name="log",
    alternative="two-sided",
    missing_strategy="ignore",
)

print("Mann-Whitney U test results:")
print(f"  Features tested: {len(result_mw.feature_ids)}")
print(f"  Significant features (FDR < 0.05): {(result_mw.p_values_adj < 0.05).sum()}")
print(f"  Significant with |log2FC| > 1: {((result_mw.p_values_adj < 0.05) & (np.abs(result_mw.log2_fc) > 1)).sum()}")

### 4.2 Permutation Test

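The permutation test is a resampling-based test that builds the null distribution by shuffling group labels, so it assumes only that samples are exchangeable under the null hypothesis. It is useful for small sample sizes.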
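
The label-shuffling idea can be sketched for one feature with the standard library. The function `permutation_test`, its difference-in-means statistic, and the `+1` p-value estimator are assumptions chosen for illustration; `diff_expr_permutation_test` may use a different statistic or estimator.

```python
import random
from statistics import mean


def permutation_test(a, b, n_permutations=999, seed=42):
    """Two-sided permutation p-value for a difference in group means."""
    rng = random.Random(seed)
    observed = abs(mean(a) - mean(b))
    pooled = list(a) + list(b)
    hits = 0
    for _ in range(n_permutations):
        rng.shuffle(pooled)  # randomly reassign group labels
        diff = abs(mean(pooled[:len(a)]) - mean(pooled[len(a):]))
        if diff >= observed - 1e-12:  # small tolerance for float rounding
            hits += 1
    # The +1 terms count the observed labelling itself, so p is never exactly 0
    return (hits + 1) / (n_permutations + 1)


p = permutation_test([1, 2, 3, 4, 5], [11, 12, 13, 14, 15])
print(f"p = {p:.3f}")
```

With this estimator the smallest attainable p-value is `1 / (n_permutations + 1)`, which is one reason to raise the number of permutations when many features must survive multiple-testing correction.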

In [None]:
# Perform permutation test
# Note: n_permutations=500 keeps this example fast; use 1000+ for real analysis
result_perm = diff_expr_permutation_test(
    container=container,
    assay_name="proteins",
    group_col="cell_type",
    group1="T-cell",
    group2="B-cell",
    layer_name="log",
    n_permutations=500,  # Increase for higher precision
    alternative="two-sided",
    missing_strategy="ignore",
    random_seed=42,
)

print("Permutation test results:")
print(f"  Features tested: {len(result_perm.feature_ids)}")
print(f"  Significant features (FDR < 0.05): {(result_perm.p_values_adj < 0.05).sum()}")
print(f"  Significant with |log2FC| > 1: {((result_perm.p_values_adj < 0.05) & (np.abs(result_perm.log2_fc) > 1)).sum()}")

### 4.3 Comparing Test Results

In [None]:
# Compare p-values from different tests
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# T-test vs Mann-Whitney
valid_mask = ~np.isnan(result_ttest.p_values_adj) & ~np.isnan(result_mw.p_values_adj)
axes[0].scatter(result_ttest.p_values_adj[valid_mask], 
            result_mw.p_values_adj[valid_mask], 
            alpha=0.5, s=10)
axes[0].plot([0, 1], [0, 1], 'r--', linewidth=1)
axes[0].set_xlabel('T-test Adjusted P-value')
axes[0].set_ylabel('Mann-Whitney Adjusted P-value')
axes[0].set_title('T-test vs Mann-Whitney P-values')

# T-test vs Permutation
valid_mask2 = ~np.isnan(result_ttest.p_values_adj) & ~np.isnan(result_perm.p_values_adj)
axes[1].scatter(result_ttest.p_values_adj[valid_mask2], 
            result_perm.p_values_adj[valid_mask2], 
            alpha=0.5, s=10)
axes[1].plot([0, 1], [0, 1], 'r--', linewidth=1)
axes[1].set_xlabel('T-test Adjusted P-value')
axes[1].set_ylabel('Permutation Adjusted P-value')
axes[1].set_title('T-test vs Permutation P-values')

plt.tight_layout()
plt.savefig('tutorial_output/test_comparison.png', dpi=300)
plt.show()

print("Test comparison saved to: tutorial_output/test_comparison.png")

## 5. Multi-Group Comparison: ANOVA

When comparing more than two groups, use ANOVA (parametric) or Kruskal-Wallis (non-parametric).
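
For intuition, the one-way ANOVA F statistic is the ratio of between-group to within-group mean squares. The sketch below computes it for a single feature with the standard library; `one_way_f` and the toy groups are hypothetical and not part of ScpTensor.

```python
from statistics import mean


def one_way_f(groups):
    """One-way ANOVA F statistic: between-group over within-group mean square."""
    all_values = [x for g in groups for x in g]
    grand_mean = mean(all_values)
    ss_between = sum(len(g) * (mean(g) - grand_mean) ** 2 for g in groups)
    ss_within = sum((x - mean(g)) ** 2 for g in groups for x in g)
    df_between = len(groups) - 1
    df_within = len(all_values) - len(groups)
    return (ss_between / df_between) / (ss_within / df_within)


f_stat = one_way_f([[1.0, 2.0, 3.0], [2.0, 3.0, 4.0], [5.0, 6.0, 7.0]])
print(f"F = {f_stat:.2f}")
```

A large F only says that at least one group mean differs; it does not identify which groups differ from each other.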

In [None]:
# Perform one-way ANOVA
result_anova = diff_expr_anova(
    container=container,
    assay_name="proteins",
    group_col="cell_type",
    layer_name="log",
    missing_strategy="ignore",
)

print("ANOVA results:")
print(f"  Number of groups: {result_anova.params['n_groups']}")
print(f"  Groups: {result_anova.params['groups']}")
print(f"  Features tested: {len(result_anova.feature_ids)}")
print(f"  Significant features (FDR < 0.05): {(result_anova.p_values_adj < 0.05).sum()}")

### 5.1 Visualizing ANOVA Results

In [None]:
# ANOVA results visualization
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# F-statistic distribution
f_stats = result_anova.test_statistics[~np.isnan(result_anova.test_statistics)]
axes[0].hist(f_stats, bins=50, edgecolor='black', color='lightgreen')
axes[0].set_xlabel('F-statistic')
axes[0].set_ylabel('Frequency')
axes[0].set_title('Distribution of F-statistics')

# P-value distribution
pvals = result_anova.p_values_adj[~np.isnan(result_anova.p_values_adj)]
axes[1].hist(pvals, bins=50, edgecolor='black', color='coral')
axes[1].axvline(0.05, color='red', linestyle='--', linewidth=1)
axes[1].set_xlabel('Adjusted P-value')
axes[1].set_ylabel('Frequency')
axes[1].set_title('Distribution of Adjusted P-values')

plt.tight_layout()
plt.savefig('tutorial_output/anova_results.png', dpi=300)
plt.show()

print("ANOVA results saved to: tutorial_output/anova_results.png")

### 5.2 Kruskal-Wallis Test (Non-Parametric ANOVA)

In [None]:
# Perform Kruskal-Wallis test
result_kw = diff_expr_kruskal(
    container=container,
    assay_name="proteins",
    group_col="cell_type",
    layer_name="log",
    missing_strategy="ignore",
)

print("Kruskal-Wallis results:")
print(f"  Features tested: {len(result_kw.feature_ids)}")
print(f"  Significant features (FDR < 0.05): {(result_kw.p_values_adj < 0.05).sum()}")

## 6. Paired Sample Analysis

For paired/matched samples (e.g., before-after treatment), use the paired t-test.
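
The paired t-test works on within-pair differences rather than on the two groups separately. A minimal sketch for one feature, assuming the hypothetical helper `paired_t` and toy before/after values:

```python
import math
from statistics import mean, stdev


def paired_t(before, after):
    """Return (t statistic, degrees of freedom) for matched measurements."""
    if len(before) != len(after):
        raise ValueError("paired samples must have equal length")
    # Each pair contributes one difference; between-pair variation cancels out
    diffs = [y - x for x, y in zip(before, after)]
    t = mean(diffs) / (stdev(diffs) / math.sqrt(len(diffs)))
    return t, len(diffs) - 1


t_stat, dof = paired_t([10.0, 12.0, 9.0, 11.0], [12.0, 14.0, 10.0, 14.0])
print(f"t = {t_stat:.3f}, df = {dof}")
```

This is why every sample needs exactly one partner: an unmatched sample has no difference to contribute.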

In [None]:
# Simulate paired data by adding a pair_id column
# In real data, the pairing would come from your experimental design (e.g., a patient ID)

# Assign each sample to one of n_pairs simulated pairs
n_samples = container.n_samples
n_pairs = n_samples // 2

# Add pair_id to obs
pair_ids = [f"Pair_{i % n_pairs}" for i in range(n_samples)]

container.obs = container.obs.with_columns(
    pl.Series("pair_id", pair_ids)
)

print("Added pair_id column to obs for paired analysis.")
print(f"Number of unique pairs: {len(container.obs['pair_id'].unique())}")

In [None]:
# Perform paired t-test (comparing batches as paired samples)
try:
    result_paired = diff_expr_paired_ttest(
        container=container,
        assay_name="proteins",
        group_col="batch",
        pair_id_col="pair_id",
        layer_name="log",
        missing_strategy="ignore",
    )
    
    print("Paired t-test results:")
    print(f"  Number of pairs: {result_paired.params['n_pairs']}")
    print(f"  Features tested: {len(result_paired.feature_ids)}")
    print(f"  Significant features (FDR < 0.05): {(result_paired.p_values_adj < 0.05).sum()}")
except Exception as e:
    print(f"Paired t-test not available: {e}")
    print("(Paired test requires complete pairs across all groups)")

## 7. Multiple Testing Correction

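When testing many features, adjust the p-values for multiple testing. Benjamini-Hochberg (`bh`) and Benjamini-Yekutieli (`by`) control the false discovery rate (FDR); Bonferroni, Holm, and Hommel control the stricter family-wise error rate (FWER) and therefore usually report fewer significant features.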
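
As an illustration of how an FDR adjustment works, here is a minimal standard-library sketch of the Benjamini-Hochberg step-up procedure. The function `benjamini_hochberg` is hypothetical and is not ScpTensor's `adjust_fdr`; it also assumes the input contains no missing values.

```python
def benjamini_hochberg(p_values):
    """Return BH-adjusted p-values in the original order."""
    m = len(p_values)
    order = sorted(range(m), key=lambda i: p_values[i])  # indices, ascending p
    adjusted = [0.0] * m
    running_min = 1.0
    # Walk from the largest p-value down, enforcing monotonicity
    for rank in range(m, 0, -1):
        idx = order[rank - 1]
        running_min = min(running_min, p_values[idx] * m / rank)
        adjusted[idx] = running_min
    return adjusted


print(benjamini_hochberg([0.01, 0.04, 0.03, 0.005]))
```

Each raw p-value is scaled by `m / rank`, so the smallest p-values are penalized most, and the running minimum keeps the adjusted values in the same order as the raw ones.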

In [None]:
# Compare different correction methods (adjust_fdr was imported in Setup)

# Get raw p-values from t-test
raw_pvals = result_ttest.p_values

# Apply different corrections
methods = ['bh', 'by', 'bonferroni', 'holm', 'hommel']
corrections = {}

for method in methods:
    corrections[method] = adjust_fdr(raw_pvals, method=method)

# Visualize comparison
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Count significant features at different thresholds
alphas = [0.01, 0.05, 0.1, 0.2]
x = np.arange(len(alphas))
width = 0.15

for i, method in enumerate(methods):
    counts = [(corrections[method] < alpha).sum() for alpha in alphas]
    axes[0].bar(x + i * width, counts, width, label=method.upper())

axes[0].set_xlabel('Adjusted P-value Threshold')
axes[0].set_ylabel('Number of Significant Features')
axes[0].set_xticks(x + width * 2)
axes[0].set_xticklabels([str(a) for a in alphas])
axes[0].set_title('Significant Features by Correction Method')
axes[0].legend()

# P-value inflation plot
sorted_pvals = np.sort(raw_pvals[~np.isnan(raw_pvals)])
expected = np.linspace(0, 1, len(sorted_pvals))

axes[1].scatter(expected, sorted_pvals, s=5, alpha=0.5, label='Raw p-values')
axes[1].plot([0, 1], [0, 1], 'r--', linewidth=1, label='Expected (null)')
axes[1].set_xlabel('Expected p-value (Uniform)')
axes[1].set_ylabel('Observed p-value')
axes[1].set_title('P-value Inflation Plot')
axes[1].legend()

plt.tight_layout()
plt.savefig('tutorial_output/fdr_correction.png', dpi=300)
plt.show()

print("FDR correction comparison saved to: tutorial_output/fdr_correction.png")

print("\nSignificant features (adjusted p < 0.05) by method:")
for method in methods:
    n_sig = (corrections[method] < 0.05).sum()
    print(f"  {method.upper():12s}: {n_sig:4d} features")

## 8. Homoscedasticity Testing

Before using parametric tests, check if variances are equal across groups.
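
Levene's test with median centering (the Brown-Forsythe variant) amounts to a one-way ANOVA on absolute deviations from each group's median. The sketch below computes that statistic for one feature; `brown_forsythe_w` is a hypothetical helper, not ScpTensor's `check_homoscedasticity`, and it omits the p-value.

```python
from statistics import mean, median


def brown_forsythe_w(groups):
    """Levene's statistic with median centering (Brown-Forsythe variant)."""
    # Absolute deviation of every value from its own group median
    devs = [[abs(x - median(g)) for x in g] for g in groups]
    all_devs = [z for d in devs for z in d]
    grand_mean = mean(all_devs)
    ss_between = sum(len(d) * (mean(d) - grand_mean) ** 2 for d in devs)
    ss_within = sum((z - mean(d)) ** 2 for d in devs for z in d)
    df_between = len(groups) - 1
    df_within = len(all_devs) - len(groups)
    return (ss_between / df_between) / (ss_within / df_within)


w = brown_forsythe_w([[1.0, 2.0, 3.0], [0.0, 2.0, 4.0]])
print(f"W = {w:.2f}")
```

Centering on the median rather than the mean makes the statistic less sensitive to skewed or heavy-tailed intensities.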

In [None]:
# Test for equality of variances (Levene's test)
homo_results = check_homoscedasticity(
    container=container,
    assay_name="proteins",
    group_col="cell_type",
    layer_name="log",
    test_type="levene",
    center="median",
)

print("Homoscedasticity test results (Levene's test):")
print(homo_results.head(10))

# Count features with unequal variances
n_hetero = homo_results.filter(pl.col("p_value_adj") < 0.05).shape[0]
print(f"\nFeatures with unequal variances (FDR < 0.05): {n_hetero}")
if n_hetero > 0:
    print("This supports using Welch's t-test (equal_var=False) over Student's t-test.")

## 9. Comprehensive DE Analysis Workflow

Putting it all together: a complete DE analysis pipeline.

In [None]:
# Complete workflow function
def run_de_analysis(container, group_col, group1, group2, alpha=0.05, fc_threshold=1.0):
    """Run complete differential expression analysis."""
    
    # Run t-test
    result = diff_expr_ttest(
        container=container,
        assay_name="proteins",
        group_col=group_col,
        group1=group1,
        group2=group2,
        layer_name="log",
        equal_var=False,
    )
    
    # Get significant features
    sig_df = result.get_significant(alpha=alpha, min_log2_fc=fc_threshold)
    
    return result, sig_df

# Run analysis
result, sig_features = run_de_analysis(
    container, 
    group_col="cell_type",
    group1="T-cell",
    group2="B-cell",
    alpha=0.05,
    fc_threshold=1.0
)

print("=" * 60)
print("DIFFERENTIAL EXPRESSION ANALYSIS SUMMARY")
print("=" * 60)
print(f"\nComparison: {result.params['group1']} vs {result.params['group2']}")
print(f"Method: {result.method}")
print(f"Features tested: {len(result.feature_ids)}")
print(f"\nSignificant features (FDR < 0.05, |log2FC| > 1): {len(sig_features)}")

# Print top significant features
print("\nTop 10 significant features:")
print(sig_features.head(10))

## 10. Exporting Results

In [None]:
# Export results to CSV
import os

# Create output directory
os.makedirs('tutorial_output', exist_ok=True)

# Save full results
full_results = result.to_dataframe()
full_results.write_csv('tutorial_output/de_results_full.csv')
print("Full results saved to: tutorial_output/de_results_full.csv")

# Save significant results only
sig_features.write_csv('tutorial_output/de_results_significant.csv')
print("Significant results saved to: tutorial_output/de_results_significant.csv")

# Print summary statistics
print("\nSummary:")
print(f"  Total features: {len(full_results)}")
print(f"  Tested features: {full_results.filter(pl.col('p_value').is_not_null()).shape[0]}")
print(f"  Significant (FDR < 0.05): {full_results.filter(pl.col('p_value_adj') < 0.05).shape[0]}")
print(f"  Up-regulated: {full_results.filter((pl.col('p_value_adj') < 0.05) & (pl.col('log2_fc') > 1)).shape[0]}")
print(f"  Down-regulated: {full_results.filter((pl.col('p_value_adj') < 0.05) & (pl.col('log2_fc') < -1)).shape[0]}")

## Summary

In this tutorial, you learned:

### Statistical Tests:
1. **T-test (Welch's)**: Two-group comparison that does not assume equal variances
2. **Mann-Whitney U**: Non-parametric two-group comparison
3. **Permutation test**: Resampling-based non-parametric test
4. **ANOVA**: Multi-group comparison (parametric)
5. **Kruskal-Wallis**: Multi-group comparison (non-parametric)
6. **Paired t-test**: For matched/paired samples

### Key Concepts:
- **Effect Size**: Cohen's d measures standardized difference between groups
- **Fold Change**: Log2 fold change indicates magnitude of difference
- **FDR Correction**: Benjamini-Hochberg (BH) controls false discovery rate
- **Homoscedasticity**: Test equality of variances before parametric tests

### Choosing a Test:
- **Two groups, normal distribution**: Welch's t-test
- **Two groups, non-normal**: Mann-Whitney U or permutation test
- **Multiple groups**: ANOVA or Kruskal-Wallis
- **Paired samples**: Paired t-test
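
The guidance above can be condensed into a small lookup helper. The function `choose_test` is purely illustrative and hypothetical; it encodes only the rules listed in this tutorial and is no substitute for inspecting your data.

```python
def choose_test(n_groups, paired=False, normal=True):
    """Suggest a test name following the guidance listed above."""
    if n_groups < 2:
        raise ValueError("need at least two groups")
    if paired:
        return "paired t-test"
    if n_groups == 2:
        return "Welch's t-test" if normal else "Mann-Whitney U or permutation test"
    # More than two groups: parametric vs non-parametric
    return "ANOVA" if normal else "Kruskal-Wallis"


print(choose_test(2, normal=False))
print(choose_test(4, normal=True))
```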

### Best Practices:
- Always apply appropriate normalization before DE analysis
- Check assumptions (normality, homoscedasticity) when using parametric tests
- Use FDR correction for multiple testing
- Consider both statistical significance (p-value) and effect size
- Visualize results with volcano plots

### Next Steps:
- **Tutorial 06**: Advanced Quality Control
- **Tutorial 07**: Feature Selection
- **Tutorial 08**: Custom Pipeline