In [1]:
import glob
import re
import polars as pl
import pyarrow.dataset as ds
import pyarrow as pa
import os
import subprocess
from io import StringIO
from concurrent.futures import ThreadPoolExecutor
# from concurrent.futures import ProcessPoolExecutor
# from multiprocessing import get_context
# from pathos.multiprocessing import ProcessPool as Pool
from tqdm import tqdm

# os.environ["POLARS_MAX_THREADS"] = "4"

Message from Huong:

> Here's the main script for computing signal (can be ATAC, DNase,...) zscores that Jill made for ENCODE4 cCRE pipeline https://github.com/weng-lab/ENCODE-cCREs/blob/master/Version-4/cCRE-Pipeline/4_Calculate-Signal-Zscores.sh
>
> Other scripts you need are Retrieve-Signal.sh and log-zscore-normalization.py in ~/GitHub/ENCODE-cCREs/Version-4/cCRE-Pipeline/Toolkit
>
> Just adapt these scripts to your working directories. Anchors files are at /data/projects/encode/Registry/V4/{GRCh38,mm10}/{GRCh38,mm10}-Anchors.bed cCREs files are at /data/projects/encode/Registry/V4/{GRCh38,mm10}/{GRCh38,mm10}-cCREs.bed

In [2]:
GENOME = "GRCh38"
MODE = "ATAC"
OUTDIR = "./run_ccre_pipeline"
FILES_CATALOG = os.path.join(OUTDIR, f"{MODE}-List.txt")
SIGNAL_OUT = os.path.join(OUTDIR, "signal-output")
BIGWIGAVERAGEOVERBED = os.path.join(OUTDIR, "bigwigaverageoverbed-output")
ANCHORS = "/zata/data/zlab/zusers/moorej3/moorej.ghpcc.project/ENCODE/Encyclopedia/V7/Registry/V7-hg38/hg38-Anchors.bed"
WIDTH = 0 if MODE in ["DNase", "CTCF", "POL2", "ATAC"] else 500
LITTLE = os.path.join(OUTDIR, 'little.txt')

os.makedirs(OUTDIR, exist_ok=True)
os.makedirs(SIGNAL_OUT, exist_ok=True)
os.makedirs(BIGWIGAVERAGEOVERBED, exist_ok=True)

print(GENOME, MODE, OUTDIR, FILES_CATALOG, SIGNAL_OUT, BIGWIGAVERAGEOVERBED, ANCHORS, WIDTH, sep="\n")

GRCh38
ATAC
./run_ccre_pipeline
./run_ccre_pipeline/ATAC-List.txt
./run_ccre_pipeline/signal-output
./run_ccre_pipeline/bigwigaverageoverbed-output
/zata/data/zlab/zusers/moorej3/moorej.ghpcc.project/ENCODE/Encyclopedia/V7/Registry/V7-hg38/hg38-Anchors.bed
0


In [3]:
peaks = pl.read_csv(ANCHORS, separator='\t', has_header=False, new_columns=['chrom', 'start', 'end', 'name'])

little = peaks.with_columns(
    pl.col('start') - WIDTH,
    pl.col('end') + WIDTH,
).with_columns(
    pl.col('start').clip(lower_bound=0)
).unique().sort('chrom', 'start')

little.write_csv(LITTLE, separator='\t', include_header=False)

In [4]:
bigwig_files = glob.glob("../results/macs3_signal/*.bigWig")
pattern = r"../results/macs3_signal/(.*)-GRCH38.bigWig"

records = []

for file in bigwig_files:
    records.append(dict(sample=re.search(pattern, file).group(1), bigwig=file))
    
input_df = pl.from_records(records).sort('sample')
args = list(zip(input_df['sample'].to_list(), input_df['bigwig'].to_list(), [LITTLE] * input_df.shape[0]))
input_df.write_csv(FILES_CATALOG, separator="\t", include_header=False)

In [None]:
def retrieve_signal(args):
    sample, bigwig_file, little_bed_file = args
    bwavgoverbed_output = subprocess.run(["bigwigaverageoverbed", "-t", "1", bigwig_file, little_bed_file, "/dev/stdout"], check=True, capture_output=True, text=True)
    bed_file = StringIO(bwavgoverbed_output.stdout)
    bed_df = pl.read_csv(bed_file, separator="\t", has_header=False, new_columns=["name", "size", "covered", "sum", "mean0", "mean"])
    bed_df_processed = bed_df.with_columns(
        pl.when((pl.col("covered") == 0) | (pl.col("mean0") == 0))
        .then(pl.lit(0))
        .otherwise(pl.col("mean0").log10())
        .alias("log10(mean0)")
    ).with_columns(
        pl.when((pl.col("covered") == 0) | (pl.col("mean0") == 0))
        .then(pl.lit(-10))
        .otherwise((pl.col("log10(mean0)") - pl.col("log10(mean0)").mean()) / pl.col("log10(mean0)").std())
        .alias("zscore")
    ).sort(
        "zscore",
        descending=True
    ).with_columns(
        pl.col('zscore').rank(method="min", descending=True).alias("rank")
    ).sort('name')
    outfile = os.path.join(SIGNAL_OUT, sample + ".txt")
    bed_df_processed.write_csv(outfile, include_header=False, separator="\t")
    return {"sample": sample, "no_ccres_with_zscore_gt_1.64": bed_df_processed.filter(pl.col("zscore") > 1.64).count()['zscore'].item()}

Output schema?

- `name`: name field from bed, which should be unique
- `size`: size of bed (sum of exon sizes)
- `covered`: # bases within exons covered by bigWig
- `sum`: sum of values over all bases covered
- `mean0`: average over bases with non-covered bases counting as zeroes
- `mean`: average over just covered base
- `log(mean0)`: log of mean0
- `zscore`: (log(mean0) - log(mean0).mean()) / log(mean0).std()
- `rank`: rank of zscore

```python
schema = {
    'name': pl.Utf8,           # Unique identifier from BED file
    'size': pl.UInt32,         # Size of BED region (sum of exon sizes)
    'covered': pl.UInt32,      # Number of bases covered by bigWig
    'sum': pl.Float64,         # Sum of values over all covered bases
    'mean0': pl.Float64,       # Average (non-covered bases = 0)
    'mean': pl.Float64,        # Average over only covered bases
    'log10(mean0)': pl.Float64,# log10(mean0)
    'zscore': pl.Float64,      # (log_mean0 - mean) / std
    'rank': pl.UInt32          # Rank by zscore (descending)
}
```

In [6]:
# with ThreadPoolExecutor(max_workers=164) as executor:
# # with ProcessPoolExecutor(max_workers=96) as executor:
# # with Pool(96) as executor:
#     records = list(tqdm(executor.map(retrieve_signal, args), total=len(args), desc="Retrieving Signal"))

# # records = tqdm([retrieve_signal(arg) for arg in args], total=len(args), desc="Retrieving Signal")

In [7]:
# def run_bigwigaverageoverbed(args):
#     sample, bigwig_file, little_bed_file = args
    
#     result = subprocess.run(
#         ["bigwigaverageoverbed", "-t", "2", bigwig_file, little_bed_file, "/dev/stdout"],
#         capture_output=True,
#         check=True
#     )
#     bed_df = (
#         pl.read_csv(
#             result.stdout,
#             separator="\t",
#             has_header=False,
#             new_columns=["name", "size", "covered", "sum", "mean0", "mean"]
#         )
#         .with_columns(pl.lit(sample).alias("sample"))
#     )

#     new_path = os.path.join(BIGWIGAVERAGEOVERBED, sample + ".txt")
#     bed_df.write_csv(new_path, include_header=False, separator="\t")

#     return {"sample": sample, "path": new_path}

In [9]:
def run_bigwigaverageoverbed(args):
    sample, bigwig_file, little_bed_file = args
    output_file = os.path.join(BIGWIGAVERAGEOVERBED, f"sample={sample}", f"{sample}.csv")
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    cmd = f"""
    echo "name,size,covered,sum,mean0,mean" > {output_file} && \
    bigwigaverageoverbed -t 2 {bigwig_file} {little_bed_file} /dev/stdout | tr '\\t' ',' >> {output_file}
    """
    subprocess.run(
        cmd,
        shell=True,
        check=True
    )

In [10]:
with ThreadPoolExecutor(max_workers=164) as executor:
# with ProcessPoolExecutor(max_workers=96) as executor:
# with Pool(96) as executor:
    # records = list(tqdm(executor.map(run_bigwigaverageoverbed, args), total=len(args), desc="Running bigwigaverageoverbed"))
    list(tqdm(executor.map(run_bigwigaverageoverbed, args), total=len(args), desc="Running bigwigaverageoverbed"))

# records = tqdm([retrieve_signal(arg) for arg in args], total=len(args), desc="Retrieving Signal")

Running bigwigaverageoverbed: 100%|██████████| 154/154 [00:33<00:00,  4.61it/s]


In [None]:
# multi_bed_df = pl.read_csv(os.path.join(BIGWIGAVERAGEOVERBED, "*.txt"), has_header=False, separator="\t", new_columns=["name", "size", "covered", "sum", "mean0", "mean", "sample"])

schema = pa.schema([
    ("name", pa.string()),
    ("size", pa.int64()),
    ("covered", pa.int64()),
    ("sum", pa.float64()),
    ("mean0", pa.float64()),
    ("mean", pa.float64()),
    ("sample", pa.string())
])

dataset = ds.dataset(BIGWIGAVERAGEOVERBED, format='csv', partitioning='hive', schema=schema)
multi_bed_df = pl.scan_pyarrow_dataset(dataset)
# multi_bed_df = pl.scan_csv(BIGWIGAVERAGEOVERBED, has_header=False, separator="\t", new_columns=["name", "size", "covered", "sum", "mean0", "mean", "sample"])

multi_bed_df_processed = multi_bed_df.with_columns(
    pl.when((pl.col("covered") == 0) | (pl.col("mean0") == 0))
    .then(pl.lit(0))
    .otherwise(pl.col("mean0").log10())
    .over('sample').alias("log10(mean0)")
).with_columns(
    pl.when((pl.col("covered") == 0) | (pl.col("mean0") == 0))
    .then(pl.lit(-10))
    .otherwise((pl.col("log10(mean0)") - pl.col("log10(mean0)").mean()) / pl.col("log10(mean0)").std())
    .over('sample').alias("zscore")
).sort(
    "zscore",
    descending=True
).with_columns(
    pl.col('zscore').rank(method="min", descending=True).over('sample').alias("rank")
).sort('sample', 'name').collect()

display(multi_bed_df_processed)

name,size,covered,sum,mean0,mean,sample,log10(mean0),zscore,rank
str,i64,i64,f64,f64,f64,str,f64,f64,u32
"""EH38D0001261""",291,291,1527.0,5.247,5.247,"""EA100001""",0.719911,1.00846,393360
"""EH38D0001472""",314,314,1492.0,4.752,4.752,"""EA100001""",0.676876,0.902282,496040
"""EH38D0002482""",341,341,3707.0,10.871,10.871,"""EA100001""",1.036269,1.789002,39076
"""EH38D0004420""",223,223,918.0,4.117,4.117,"""EA100001""",0.614581,0.748582,664624
"""EH38D0004750""",201,201,508.0,2.527,2.527,"""EA100001""",0.402605,0.225581,1328020
…,…,…,…,…,…,…,…,…,…
"""EH38F0086744""",163,163,823.0,5.049,5.049,"""EA100154""",0.703205,0.254985,1297489
"""EH38F0086745""",342,342,1241.0,3.629,3.629,"""EA100154""",0.559787,-0.184023,1873464
"""EH38F0086746""",320,320,570.0,1.781,1.781,"""EA100154""",0.250664,-1.130258,2605437
"""EH38F0086747""",350,350,2607.0,7.449,7.449,"""EA100154""",0.872098,0.77197,618278


In [None]:
# records_df = pl.from_records(records)
# records_df.write_csv(os.path.join(OUTDIR, "ccres_qc.tsv"), separator="\t")
# print(*records_df.sort('sample')['no_ccres_with_zscore_gt_1.64'].to_list(), sep="\n")

In [None]:
multi_bed_zscore_gt_threshold = multi_bed_df_processed.filter(pl.col("zscore") > 1.64).group_by("sample").len('zscore').sort('sample')
display(multi_bed_zscore_gt_threshold)

In [None]:
schema = {
    'name': pl.Utf8,           # Unique identifier from BED file
    'size': pl.UInt32,         # Size of BED region (sum of exon sizes)
    'covered': pl.UInt32,      # Number of bases covered by bigWig
    'sum': pl.Float64,         # Sum of values over all covered bases
    'mean0': pl.Float64,       # Average (non-covered bases = 0)
    'mean': pl.Float64,        # Average over only covered bases
    'log10(mean0)': pl.Float64,   # log10(mean0)
    'zscore': pl.Float64,      # (log10(mean0) - log10(mean)) / std(log10(mean0))
    'rank': pl.UInt32          # Rank by zscore (descending)
}

all_signals = pl.scan_csv(os.path.join(SIGNAL_OUT, "*.txt"), separator="\t", has_header=False, schema=schema)
maxz_df = all_signals.select('name', 'zscore').group_by('name').agg(pl.col('zscore').max().alias('maxz')).collect()
display(maxz_df)

In [None]:
maxz_df.sort('name').write_csv(os.path.join(OUTDIR, f'hg38-{MODE}-maxz.txt'), separator='\t', include_header=False)