# ATAC-seq Peak Calling Pipeline

This notebook provides a complete pipeline for ATAC-seq peak calling:

1. **Step 1**: Convert fragment files to Tn5 cut-site BED files
2. **Step 2**: Run MACS3 peak calling on the cut-site files

## Overview

For ATAC-seq data, each fragment represents DNA between two Tn5 transposase insertion sites. This pipeline:
- Extracts the cut sites (both ends of each fragment)
- Calls peaks using MACS3 with ATAC-seq optimized parameters

## Configuration

Set all parameters here before running the pipeline.

In [1]:
import json
import os
import re
import shlex
import subprocess
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from datetime import datetime
from functools import partial
from pathlib import Path

# =============================================================================
# CONFIGURATION - Modify these parameters as needed
# =============================================================================

# Species selection. Must be one of the keys of EFFECTIVE_GENOME_SIZES below:
# the summary print at the end of this cell indexes the dict directly and will
# raise KeyError for any other value.
SPECIES = "Macaque"  # Options: Human, Bonobo, Macaque, Chimpanzee, Gorilla, Marmoset

# Effective genome sizes (in bp) for each species; the selected value is passed
# to MACS3 as --gsize by build_macs3_command.
EFFECTIVE_GENOME_SIZES = {
    'Bonobo': 2595269547,
    'Macaque': 2653677440,
    'Chimpanzee': 2792339170,
    'Gorilla': 2661668758,
    'Marmoset': 2597026658,
    'Human': 2913022398  # value from macs3 site (deeptools)
}

# Directory paths
# NOTE(review): BASE_DIR is not referenced by any cell visible in this notebook;
# the two output directories below are derived from os.getcwd() instead, and
# FRAGMENTS_INPUT_DIR is relative to the kernel's working directory. Confirm the
# notebook is always started from the intended folder.
BASE_DIR = "/cluster/home/jjanssens/jjans/analysis/adult_intestine/peaks"
FRAGMENTS_INPUT_DIR = f"../atac/consensus_peak_calling_{SPECIES}_filter/pseudobulk_bed_files/"
CUTSITES_OUTPUT_DIR = os.path.join(os.getcwd(), f"fragment_files/{SPECIES}")
PEAKS_OUTPUT_DIR = os.path.join(os.getcwd(), f"consensus_peak_calling_{SPECIES}")

# MACS3 executable path (absolute path into a specific conda environment)
MACS3_PATH = "/cluster/project/treutlein/jjans/software/miniforge3/envs/scenicplus/bin/macs3"

# Parallel processing: number of samples handled concurrently in each step
CUTSITE_WORKERS = 8   # Workers for fragment conversion (I/O bound)
MACS3_WORKERS = 15    # Workers for MACS3 peak calling (CPU bound)

# Echo the effective configuration so it is recorded in the notebook output
print(f"Species: {SPECIES}")
print(f"Genome size: {EFFECTIVE_GENOME_SIZES[SPECIES]:,}")
print(f"Cut-sites output: {CUTSITES_OUTPUT_DIR}")
print(f"Peaks output: {PEAKS_OUTPUT_DIR}")

Species: Macaque
Genome size: 2,653,677,440
Cut-sites output: /cluster/project/treutlein/USERS/jjans/analysis/adult_intestine/peaks/fragment_files/Macaque
Peaks output: /cluster/project/treutlein/USERS/jjans/analysis/adult_intestine/peaks/consensus_peak_calling_Macaque


## MACS3 Parameters

Configure MACS3 peak calling parameters. These are optimized for ATAC-seq data.

Documentation: https://github.com/macs3-project/MACS/blob/master/docs/callpeak.md

In [2]:
# =============================================================================
# MACS3 PARAMETERS
# =============================================================================

# Default `macs3 callpeak` settings. Keys are translated into command-line
# options by build_macs3_command; run_peak_calling copies this dict and applies
# any keyword overrides on top of it.
MACS3_PARAMS = {
    # Input format: BED, BAM, SAM, BEDPE, etc.
    "format": "BED",
    
    # q-value (minimum FDR) cutoff for peak detection
    "qvalue": 0.01,
    
    # Shift reads by this amount (negative for ATAC-seq to center on cut site).
    # With extsize = 146, a shift of -73 yields a 146 bp window centred on each
    # cut site.
    "shift": -73,
    
    # Extend reads to this fragment size
    "extsize": 146,
    
    # How to handle duplicate reads: "auto", "all", or integer
    "keep_dup": "all",
    
    # Minimum length of peak region
    "min_length": 200,
    
    # Boolean flags (each adds the corresponding switch when truthy)
    "nomodel": True,       # Skip model building, use shift/extsize directly
    "call_summits": True,  # Deconvolve sub-peak summits; narrowPeak is written either way, but may then hold several rows per region
    "nolambda": True,      # Use the genome-wide background lambda instead of local lambda estimates
}

# Echo the parameters so they are recorded in the notebook output
print("MACS3 Parameters:")
for key, value in MACS3_PARAMS.items():
    print(f"  {key}: {value}")

MACS3 Parameters:
  format: BED
  qvalue: 0.01
  shift: -73
  extsize: 146
  keep_dup: all
  min_length: 200
  nomodel: True
  call_summits: True
  nolambda: True


---
# Step 1: Convert Fragments to Cut-Sites

Convert paired-end fragment files to single-nucleotide Tn5 cut-site BED files.

For each fragment (chr, start, end), we extract:
- **5' cut site**: (chr, start, start+1) with + strand
- **3' cut site**: (chr, end-1, end) with - strand

In [3]:
def convert_fragments_to_cutsites(input_fragments: str, output_bed: str) -> dict:
    """
    Convert a gzipped fragments file into a gzipped BED6 file of Tn5 cut sites.

    Every fragment contributes two single-base records:
    - 5' end: (chr, start, start+1) on the + strand
    - 3' end: (chr, end-1, end) on the - strand

    Input format:  chr  start  end  [barcode]  [count]
    Output format: BED6 (chr, start, end, name, score, strand), with "." as
    name and score. Lines starting with '#' and lines with fewer than three
    fields are skipped instead of being turned into malformed records.

    The work is done by a bash pipeline (gzip -dc | awk | gzip). The result is
    written to ``<output_bed>.tmp`` and only renamed to ``output_bed`` once the
    whole pipeline succeeded, so a failed run never leaves a truncated file
    that a later step could mistake for valid output.

    Args:
        input_fragments: Path to the gzipped fragments file.
        output_bed: Path of the gzipped cut-site file to create.

    Returns:
        Dict with keys "sample" (file name up to the first '.'), "status"
        ("success" or "error"), "output_size_mb" (0 on error) and "message"
        (one-line human-readable summary). Failures are reported through the
        dict rather than raised.
    """
    sample_name = Path(input_fragments).name.split('.')[0]

    # awk program: ignore comment/short lines, emit both cut sites per fragment
    awk_cmd = r"""awk -v OFS='\t' '!/^#/ && NF >= 3 {
        print $1, $2, $2+1, ".", ".", "+";
        print $1, $3-1, $3, ".", ".", "-"
    }'"""

    tmp_bed = f"{output_bed}.tmp"

    # pipefail: without it the pipeline's exit status is gzip's alone, so an
    # unreadable or corrupt input would be reported as a success.
    # shlex.quote: paths may contain spaces or shell metacharacters.
    cmd = (
        "set -o pipefail; "
        f"gzip -dc {shlex.quote(str(input_fragments))} | {awk_cmd} | gzip > {shlex.quote(tmp_bed)}"
    )

    try:
        subprocess.run(
            cmd,
            shell=True,
            check=True,
            executable='/bin/bash',
            capture_output=True,
            text=True
        )
        # Publish the finished file under its final name in one step
        os.replace(tmp_bed, output_bed)

        output_size = os.path.getsize(output_bed) / (1024 * 1024)  # MB

        return {
            "sample": sample_name,
            "status": "success",
            "output_size_mb": round(output_size, 2),
            "message": f"‚úÖ {sample_name}: {output_size:.1f} MB"
        }
    except (subprocess.CalledProcessError, OSError) as e:
        # Best-effort cleanup of the partial temp file; it may not exist at all
        try:
            os.remove(tmp_bed)
        except OSError:
            pass

        detail = e.stderr if isinstance(e, subprocess.CalledProcessError) else str(e)
        return {
            "sample": sample_name,
            "status": "error",
            "output_size_mb": 0,
            "message": f"‚ùå {sample_name}: {detail}"
        }


def process_all_fragments(
    input_dir: str,
    output_dir: str,
    max_workers: int = 8,
    pattern: str = r"\.fragments\.tsv\.gz$"
) -> list:
    """
    Convert every fragment file in a directory to a cut-site file, in parallel.

    Each regular file in ``input_dir`` whose name matches ``pattern`` is passed
    to ``convert_fragments_to_cutsites``; the output keeps the input's file
    name and is written to ``output_dir`` (created if missing).

    Args:
        input_dir: Directory containing the fragment files.
        output_dir: Directory receiving the cut-site files. Must differ from
            ``input_dir`` because outputs reuse the input file names.
        max_workers: Number of conversions run concurrently.
        pattern: Regular expression searched in each file name.

    Returns:
        List of result dicts (keys "sample", "status", "output_size_mb",
        "message"), in completion order. Empty when no file matches.

    Raises:
        ValueError: If ``input_dir`` and ``output_dir`` are the same directory.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    # Outputs carry the same names as the inputs; writing them next to the
    # inputs would overwrite the source data.
    if input_path.resolve() == output_path.resolve():
        raise ValueError(
            f"output_dir must differ from input_dir ({input_dir}): "
            "cut-site files reuse the fragment file names and would overwrite the inputs"
        )

    output_path.mkdir(parents=True, exist_ok=True)

    # Find all fragment files; sorted so the submission order is reproducible,
    # and scanned inside `with` so the directory handle is released promptly.
    with os.scandir(input_path) as entries:
        fragment_files = sorted(
            (entry.name, entry.path)
            for entry in entries
            if entry.is_file() and re.search(pattern, entry.name)
        )

    if not fragment_files:
        print(f"‚ö†Ô∏è No fragment files found in {input_dir}")
        return []

    print(f"üìÇ Found {len(fragment_files)} fragment files")
    print(f"üìÅ Output directory: {output_dir}")
    print(f"üë∑ Workers: {max_workers}")
    print("-" * 60)

    # Build job list: (input path, output path with the same file name)
    jobs = [(path, str(output_path / name)) for name, path in fragment_files]

    # Threads suffice here: each job spends its time waiting on a shell pipeline
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(convert_fragments_to_cutsites, inp, out): inp
            for inp, out in jobs
        }

        for future in as_completed(futures):
            try:
                result = future.result()
            except Exception as exc:
                # An unexpected failure in one job must not discard the
                # results of the others; record it as an error instead.
                failed_name = Path(futures[future]).name
                result = {
                    "sample": failed_name.split('.')[0],
                    "status": "error",
                    "output_size_mb": 0,
                    "message": f"ERROR {failed_name}: {exc!r}"
                }
            print(result["message"])
            results.append(result)

    return results

print("Functions defined for fragment conversion.")

Functions defined for fragment conversion.


In [None]:
# Run fragment to cut-site conversion for every fragment file of the selected
# species, using the directories and worker count from the configuration cell.
cutsite_results = process_all_fragments(
    input_dir=FRAGMENTS_INPUT_DIR,
    output_dir=CUTSITES_OUTPUT_DIR,
    max_workers=CUTSITE_WORKERS
)

# Summary
print("\n" + "=" * 60)
print("STEP 1 SUMMARY - Fragment Conversion")
print("=" * 60)

# Split the per-file result dicts by status.
# NOTE: `successful` and `failed` are lists here; the Step 2 summary cell
# rebinds the same names to integer counters.
successful = [r for r in cutsite_results if r["status"] == "success"]
failed = [r for r in cutsite_results if r["status"] == "error"]
# Combined size of all successfully written cut-site files, in MB
total_size = sum(r["output_size_mb"] for r in successful)

print(f"Total files processed: {len(cutsite_results)}")
print(f"  ‚úÖ Successful: {len(successful)}")
print(f"  ‚ùå Failed: {len(failed)}")
print(f"  üíæ Total output size: {total_size:.1f} MB")

üìÇ Found 32 fragment files
üìÅ Output directory: /cluster/project/treutlein/USERS/jjans/analysis/adult_intestine/peaks/fragment_files/Macaque
üë∑ Workers: 8
------------------------------------------------------------
‚úÖ Adipocytes: 2.5 MB
‚úÖ Enteric_neurons: 0.6 MB
‚úÖ Enteric_glia: 8.8 MB
‚úÖ ECs: 11.4 MB
‚úÖ Specialized_Fibroblasts_KCNN3+: 11.3 MB
‚úÖ ICCs: 3.1 MB
‚úÖ EECs: 19.5 MB
‚úÖ BEST4+_cells: 23.4 MB
‚úÖ Monocytes: 8.5 MB
‚úÖ Mesothelial_cells: 13.1 MB
‚úÖ Myofibroblasts: 4.8 MB
‚úÖ Paneth_cells: 5.0 MB
‚úÖ Pericytes: 1.1 MB
‚úÖ Crypt_Fibroblasts_WNT2B+: 52.4 MB
‚úÖ Specialized_Fibroblasts_PCDH9+: 5.3 MB
‚úÖ Specialized_Fibroblasts_RALYL+: 4.4 MB
‚úÖ Colonocytes: 70.0 MB
‚úÖ Plasma_B_cells: 24.0 MB
‚úÖ Lymphatic_ECs: 67.7 MB
‚úÖ Specialized_Fibroblasts_RSPO2_3+: 21.1 MB
‚úÖ Enterocytes: 91.2 MB
‚úÖ Specialized_Fibroblasts_VCAM1+: 8.5 MB
‚úÖ Specialized_Fibroblasts_RSPO3+_only: 20.2 MB
‚úÖ Specialized_Fibroblasts_SYNM+: 19.5 MB
‚úÖ Tuft_cells: 7.5 MB
‚úÖ Villus_Fibroblas

---
# Step 2: MACS3 Peak Calling

Run MACS3 peak calling on the cut-site BED files.

**Output files per sample:**
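- `*_peaks.narrowPeak`: BED6+4 format peak calls (with `--call-summits` enabled, a region containing several summits is written as several rows, one per summit)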
- `*_peaks.xls`: Spreadsheet with peak info  
- `*_summits.bed`: Peak summit positions

In [None]:
def build_macs3_command(sample_name, fragment_path, species, out_dir, macs3_path, params):
    """
    Assemble the ``macs3 callpeak`` argument list for a single sample.

    Args:
        sample_name: Value for ``--name`` (prefix of the MACS3 output files).
        fragment_path: Input file passed as ``--treatment``.
        species: Key into EFFECTIVE_GENOME_SIZES; selects ``--gsize``.
        out_dir: Value for ``--outdir``.
        macs3_path: Path of the macs3 executable (first list element).
        params: Dict providing "format", "qvalue", "shift", "extsize",
            "keep_dup" and "min_length"; the optional truthy keys "nomodel",
            "call_summits" and "nolambda" add the matching switches.

    Returns:
        List of command-line arguments suitable for ``subprocess.run``.

    Raises:
        ValueError: If ``species`` has no entry in EFFECTIVE_GENOME_SIZES.
        KeyError: If a required key is missing from ``params``.
    """
    genome_size = EFFECTIVE_GENOME_SIZES.get(species)
    if genome_size is None:
        raise ValueError(f"Unknown species: {species}. Available: {list(EFFECTIVE_GENOME_SIZES.keys())}")

    # Fixed, per-sample part of the command line
    argv = [macs3_path, "callpeak"]
    argv += ["--treatment", fragment_path]
    argv += ["--name", sample_name]
    argv += ["--outdir", out_dir]
    argv += ["--format", params["format"]]
    argv += ["--gsize", str(genome_size)]

    # Options that take a value read from params (stringified for subprocess)
    valued_options = (
        ("--qvalue", "qvalue"),
        ("--shift", "shift"),
        ("--extsize", "extsize"),
        ("--keep-dup", "keep_dup"),
        ("--min-length", "min_length"),
    )
    for option, key in valued_options:
        argv += [option, str(params[key])]

    # On/off switches: appended only when the parameter is present and truthy
    switches = (
        ("nomodel", "--nomodel"),
        ("call_summits", "--call-summits"),
        ("nolambda", "--nolambda"),
    )
    for key, switch in switches:
        if params.get(key):
            argv.append(switch)

    return argv


def run_macs3_worker(job, species, out_dir, macs3_path, params):
    """
    Run MACS3 for one sample and summarise the outcome.

    Args:
        job: Tuple ``(sample_name, fragment_path)``.
        species: Species name forwarded to build_macs3_command.
        out_dir: Directory MACS3 writes its output files to.
        macs3_path: Path of the macs3 executable.
        params: MACS3 parameter dict forwarded to build_macs3_command.

    Returns:
        Dict with keys "sample_name", "status" ("success" or "error"),
        "peak_count" and "message". A non-zero MACS3 exit status, or a failure
        to launch the executable at all (missing file, no execute permission),
        is reported as status "error" with peak_count 0 rather than raised, so
        one bad sample cannot abort a whole batch.
    """
    sample_name, fragment_path = job

    cmd = build_macs3_command(sample_name, fragment_path, species, out_dir, macs3_path, params)

    print(f"üöÄ Starting: {sample_name}")
    try:
        subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # CalledProcessError carries MACS3's stderr; OSError (e.g. executable
        # not found) has no stderr, so fall back to the exception text.
        detail = e.stderr if isinstance(e, subprocess.CalledProcessError) else str(e)
        return {
            "sample_name": sample_name,
            "status": "error",
            "peak_count": 0,
            "message": f"‚ùå Error in {sample_name}: {detail}"
        }

    # Count rows of the narrowPeak file (0 if MACS3 did not write one).
    # NOTE: with --call-summits MACS3 may write several rows for one region
    # (one per summit), so this is a row count, not a count of distinct regions.
    narrowpeak_file = os.path.join(out_dir, f"{sample_name}_peaks.narrowPeak")
    peak_count = 0
    if os.path.exists(narrowpeak_file):
        with open(narrowpeak_file, 'r') as f:
            peak_count = sum(1 for _ in f)

    return {
        "sample_name": sample_name,
        "status": "success",
        "peak_count": peak_count,
        "message": f"‚úÖ Finished: {sample_name} ({peak_count:,} peaks)"
    }


def run_peak_calling(
    species,
    frag_dir,
    out_dir,
    macs3_path=MACS3_PATH,
    max_workers=15,
    params=None,
    **param_overrides
):
    """
    Run MACS3 peak calling in parallel for all cut-site files in a directory.

    Every file in ``frag_dir`` ending in ".fragments.tsv.gz" is one sample; the
    sample name is the file name with that suffix removed (so names containing
    dots stay intact and cannot collide). The parameters used are written to
    ``macs3_parameters.json`` and a per-sample summary to
    ``peak_counts_report.tsv``, both inside ``out_dir``.

    Args:
        species: Species name (must be in EFFECTIVE_GENOME_SIZES)
        frag_dir: Directory with cut-site fragment files
        out_dir: Output directory for peaks (created if missing)
        macs3_path: Path to macs3 executable
        max_workers: Number of MACS3 processes run concurrently (default: 15)
        params: Full parameter dict (if None, uses MACS3_PARAMS)
        **param_overrides: Individual parameters to override

    Returns:
        List of result dicts (keys "sample_name", "status", "peak_count",
        "message"), ordered by file name. Empty when no input file is found.
    """
    # Build final parameters on a copy so the caller's dict is never modified
    final_params = (params if params is not None else MACS3_PARAMS).copy()
    final_params.update(param_overrides)

    # Ensure output directory exists
    os.makedirs(out_dir, exist_ok=True)

    # Find all fragment files; sorted because os.listdir order is arbitrary
    suffix = ".fragments.tsv.gz"
    fragment_files = sorted(f for f in os.listdir(frag_dir) if f.endswith(suffix))

    if not fragment_files:
        print(f"‚ö†Ô∏è No fragment files found in {frag_dir}")
        return []

    print(f"üìÇ Found {len(fragment_files)} fragment files for {species}")
    print(f"üìÅ Output directory: {out_dir}")
    print(f"üß¨ Genome size: {EFFECTIVE_GENOME_SIZES[species]:,}")
    print(f"‚öôÔ∏è Parameters: qvalue={final_params['qvalue']}, shift={final_params['shift']}, extsize={final_params['extsize']}, min_length={final_params['min_length']}")
    print(f"üë∑ Workers: {max_workers}")
    print("-" * 60)

    # Save parameters to file so the run can be reproduced later
    params_file = os.path.join(out_dir, "macs3_parameters.json")
    params_to_save = {
        "species": species,
        "genome_size": EFFECTIVE_GENOME_SIZES[species],
        "macs3_path": macs3_path,
        "max_workers": max_workers,
        "frag_dir": frag_dir,
        "out_dir": out_dir,
        "run_date": datetime.now().isoformat(),
        "macs3_params": final_params
    }
    with open(params_file, 'w') as f:
        json.dump(params_to_save, f, indent=2)
    print(f"üíæ Parameters saved to: {params_file}")
    print("-" * 60)

    # Create jobs list: strip only the known suffix, not everything after the
    # first dot, so "A.b.fragments.tsv.gz" and "A.c.fragments.tsv.gz" differ.
    jobs = [(f[:-len(suffix)], os.path.join(frag_dir, f)) for f in fragment_files]

    # Create worker with fixed arguments
    worker = partial(
        run_macs3_worker,
        species=species,
        out_dir=out_dir,
        macs3_path=macs3_path,
        params=final_params
    )

    # Run in parallel. Each worker only launches a macs3 subprocess and waits
    # for it, so threads give the same parallelism as a process pool without
    # requiring notebook-defined functions to be picklable.
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(worker, job) for job in jobs]
        for (sample_name, _), future in zip(jobs, futures):
            try:
                results.append(future.result())
            except Exception as exc:
                # Keep the other samples' results and still write the report
                results.append({
                    "sample_name": sample_name,
                    "status": "error",
                    "peak_count": 0,
                    "message": f"ERROR in {sample_name}: {exc!r}"
                })

    # Generate peak count report (one row per sample, failures included)
    report_file = os.path.join(out_dir, "peak_counts_report.tsv")
    with open(report_file, 'w') as f:
        f.write("cell_type\tpeak_count\tstatus\n")
        for result in results:
            f.write(f"{result['sample_name']}\t{result['peak_count']}\t{result['status']}\n")
    print(f"\nüìä Peak count report saved to: {report_file}")

    return results

print("Functions defined for MACS3 peak calling.")

In [None]:
# Run MACS3 peak calling on the cut-site files produced in Step 1, using the
# species, paths and worker count from the configuration cell.
peak_results = run_peak_calling(
    species=SPECIES,
    frag_dir=CUTSITES_OUTPUT_DIR,
    out_dir=PEAKS_OUTPUT_DIR,
    macs3_path=MACS3_PATH,
    max_workers=MACS3_WORKERS,
    # Parameter overrides (uncomment to modify); each keyword replaces the
    # entry of the same name in MACS3_PARAMS for this run only:
    # qvalue=0.05,
    # min_length=150,
)

# Summary
print("\n" + "=" * 60)
print("STEP 2 SUMMARY - Peak Calling Results")
print("=" * 60)

# Running totals over all samples.
# NOTE: `successful` and `failed` are integer counters here; the Step 1 summary
# cell binds the same names to lists.
total_peaks = 0
successful = 0
failed = 0

for result in peak_results:
    print(result["message"])
    if result["status"] == "success":
        successful += 1
        total_peaks += result["peak_count"]
    else:
        failed += 1

print("\n" + "-" * 60)
print(f"Total samples processed: {len(peak_results)}")
print(f"  ‚úÖ Successful: {successful}")
print(f"  ‚ùå Failed: {failed}")
print(f"  üìä Total peaks called: {total_peaks:,}")
print(f"\nOutput saved to: {PEAKS_OUTPUT_DIR}")

---
# Summary Report

Display final statistics and output locations.

In [None]:
import pandas as pd

# Load and display peak count report written by run_peak_calling
# (columns: cell_type, peak_count, status).
report_path = os.path.join(PEAKS_OUTPUT_DIR, "peak_counts_report.tsv")

if os.path.exists(report_path):
    df = pd.read_csv(report_path, sep='\t')
    # Sorted copy for display only; the statistics below use the unsorted frame
    df_sorted = df.sort_values('peak_count', ascending=False)
    
    print("Peak counts per cell type (sorted by count):")
    print("=" * 50)
    # `display` is supplied by the IPython/Jupyter environment
    display(df_sorted)
    
    # NOTE: samples with status "error" are stored with peak_count 0 and are
    # therefore included in the mean and median below.
    print(f"\nTotal peaks: {df['peak_count'].sum():,}")
    print(f"Mean peaks per cell type: {df['peak_count'].mean():,.0f}")
    print(f"Median peaks per cell type: {df['peak_count'].median():,.0f}")
else:
    print("Peak count report not found. Run Step 2 first.")