Commit 20819ee

coverage_table & combine_peaks

siebrenf committed Jun 10, 2022
1 parent 5de418d commit 20819ee
Showing 4 changed files with 325 additions and 175 deletions.
310 changes: 183 additions & 127 deletions gimmemotifs/preprocessing.py
@@ -5,119 +5,109 @@
# distribution.

""" Data preprocessing to create GimmeMotifs input. """
# Python imports
import logging
import multiprocessing as mp
import os
from tempfile import NamedTemporaryFile
from typing import Iterable

# External imports
import genomepy
import numpy as np
import pandas as pd
import pysam
import qnorm
from fluff.track import Track  # noqa: biofluff
from pybedtools import BedTool
from sklearn.preprocessing import scale
from tqdm.auto import tqdm

# gimme imports
from gimmemotifs import mytmpdir
from gimmemotifs.utils import determine_file_type

logger = logging.getLogger("gimme.preprocessing")


def coverage_table(
    peakfile,
    datafiles: Iterable,
    window=200,
    log_transform=True,
    normalization=None,
    top=-1,
    topmethod="var",
    rmdup=True,
    rmrepeats=True,
    ncpus=12,
):
    """
    Create a dataframe with peaks from the peakfile as index.
    For each datafile, add a column with reads in these peaks.
    Peaks are normalized to the given window size, and can be normalized,
    transformed and filtered as specified.
    """
    missing = [f for f in datafiles if not os.path.isfile(f)]
    if missing:
        print(f"Could not find {len(missing)} files: {','.join(missing)}")
        raise FileNotFoundError

    for f in datafiles:
        if ".bam" in f and not os.path.isfile(f"{f}.bai"):
            print(
                f"Data file '{f}' does not have an index file. "
                f"Creating an index file..."
            )
            pysam.index(f)  # noqa

    if normalization not in [None, "quantile", "scale"]:
        raise ValueError(f"Unknown method '{normalization}' for normalization")
    if topmethod not in ["var", "std", "mean", "random"]:
        raise ValueError(f"Unknown method '{topmethod}' for selecting regions")
    logger.info("Loading data")
    df = pd.DataFrame()
    peak_bed = peakfile2bedfile(peakfile, window=window)
    regions = _load_peaks(peak_bed)
    if ncpus == 1:
        for datafile in tqdm(datafiles):
            column = _load_reads(peak_bed, datafile, regions, rmdup, rmrepeats)
            df = df.merge(column, left_index=True, right_index=True, how="outer")
    else:
        pool = mp.Pool(processes=ncpus)
        try:
            jobs = []
            for datafile in datafiles:
                jobs.append(
                    pool.apply_async(
                        _load_reads, (peak_bed, datafile, regions, rmdup, rmrepeats)
                    )
                )
            pool.close()
            for job in tqdm(jobs):
                column = job.get()
                df = df.merge(column, left_index=True, right_index=True, how="outer")
            pool.join()
        except Exception as e:
            pool = None  # noqa: force garbage collection on orphaned workers
            raise e

    # normalize & transform
    if log_transform:
        logger.info("Log transform")
        df = np.log1p(df)
    if normalization is None:
        logger.info("No normalization")
    elif normalization == "scale":
        logger.info("Normalization by scaling")
        df[:] = scale(df, axis=0)
    elif normalization == "quantile":
        logger.info("Normalization by quantile normalization")
        # stay between 1 and 4 ncpus; above 4 there are highly diminishing returns
        df = qnorm.quantile_normalize(df, ncpus=sorted([1, ncpus, 4])[1])

    # select the top peaks, based on the specified method
    if top > 0:
        idx = None
        if topmethod == "var":
            idx = df.var(1).sort_values().tail(top).index
        elif topmethod == "std":
@@ -126,71 +116,82 @@ def coverage_table(
            idx = df.mean(1).sort_values().tail(top).index
        elif topmethod == "random":
            idx = df.sample(top).index
        df = df.loc[idx]

    return df
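
A minimal usage sketch of the refactored function (file names are hypothetical): it counts reads in 200 bp windows centered on each peak, log-transforms and quantile-normalizes the matrix, and keeps the 1000 most variable regions.

    counts = coverage_table(
        peakfile="GM12878_summits.bed",
        datafiles=["H3K27ac_rep1.bam", "H3K27ac_rep2.bam"],
        window=200,
        log_transform=True,
        normalization="quantile",
        top=1000,
        topmethod="var",
        ncpus=4,
    )
    # index is "chrom:start-end"; one column of read counts per data file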


def peakfile2bedfile(peakfile, bedfile=None, window=200):
    """
    Create a BED file with peak widths normalized to the window size
    """
    if bedfile:
        peak_bed = bedfile
    else:
        # deleted when gimme exits
        peak_bed = os.path.join(mytmpdir(), "peaks.bed")

    with open(peak_bed, "w+") as f:
        half_window = window // 2
        for line in open(peakfile):
            if line.startswith("#") or line[:5] == "track":
                continue
            vals = line.strip().split("\t")

            gene = ""
            score = 0
            strand = "+"
            if len(vals) > 3:
                gene = vals[3]
            if len(vals) > 4:
                score = vals[4]
            if len(vals) > 5:
                strand = vals[5]

            center = (int(vals[2]) + int(vals[1])) // 2
            if strand == "+":
                start = center - half_window
                end = center + half_window
            else:
                start = center + half_window
                end = center - half_window

            if start >= 0:
                f.write(f"{vals[0]}\t{start}\t{end}\t{gene}\t{score}\t{strand}\n")

    return peak_bed
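
For illustration, the helper can also be called on its own (paths are hypothetical); every peak in the input is replaced by a fixed-width interval of window bp centered on its midpoint:

    peak_bed = peakfile2bedfile(
        "GM12878_summits.bed", bedfile="peaks_200bp.bed", window=200
    )
    # peaks_200bp.bed now contains one 200 bp interval per input peak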


def _load_peaks(peak_bed):
    """read a bed file and return regions in 'chr:start-end' format"""
    regions = []
    with open(peak_bed) as f:
        for line in f:
            vals = line.split("\t")
            region = f"{vals[0]}:{vals[1]}-{vals[2]}"
            regions.append(region)
    return regions


def _load_reads(peak_bed, datafile, regions, rmdup=True, rmrepeats=True):
    """
    read a bam, bed or bw file and return the number of reads per peak
    """
    track = Track.load(
        datafile,
        rmdup=rmdup,
        rmrepeats=rmrepeats,
        fragmentsize=None,
    )
    result = track.binned_stats(peak_bed, nbins=1, split=True, rpkm=False)

    reads = [float(r[3]) for r in result]
    col = [os.path.splitext(os.path.basename(datafile))[0]]
    df = pd.DataFrame(reads, columns=col, index=regions)
    return df
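
A small sketch of how the two private helpers fit together (file names hypothetical): _load_peaks supplies the region index and _load_reads returns a one-column DataFrame per track, which coverage_table then merges across data files.

    regions = _load_peaks("peaks_200bp.bed")
    column = _load_reads("peaks_200bp.bed", "H3K27ac_rep1.bam", regions)
    # one row per region, one column named after the data file ("H3K27ac_rep1")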


def combine_peaks(peaks, genome, window=200, scale_value=False):
    """
    Combine multiple MACS2 summit files and return the summit
    with the maximum value.
@@ -218,7 +219,7 @@ def combine_peaks(peaks, genome, window, scale_value):
    try:
        g = genomepy.Genome(genome)
        genome = g.sizes_file
    except FileNotFoundError:
        pass

    dfs = [read_peak_file_to_df(fname) for fname in peaks]
@@ -253,3 +254,58 @@ def combine_peaks(peaks, genome, window, scale_value):

    df = pd.DataFrame(all_summits, columns=["chrom", "start", "end"])
    return df
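
A usage sketch (sample names hypothetical): summits from several MACS2 runs are merged and, per the docstring, only the summit with the highest (optionally scaled) value is kept where summits overlap within the window; passing a genomepy genome name resolves the chromosome sizes file automatically when that genome is installed.

    summits = combine_peaks(
        peaks=["sample1_summits.bed", "sample2_summits.bed"],
        genome="hg38",
        window=200,
        scale_value=True,
    )
    summits.to_csv("combined_summits.bed", sep="\t", header=False, index=False)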


def read_peak_file_to_df(fname):
    """
    Read a MACS2 summits.bed or narrowPeak file and return a DataFrame.

    Parameters
    ----------
    fname : str
        Filename.

    Returns
    -------
    pandas.DataFrame
        DataFrame with summits.
    """
    summit_header = ["chrom", "start", "end", "name", "value"]
    ftype = determine_file_type(fname)
    # Read MACS2 summit files
    if ftype == "narrowpeak":
        header = [
            "chrom",
            "start",
            "end",
            "name",
            "value",
            "strand",
            "signalValue",
            "pval",
            "qval",
            "peak",
        ]
        df = pd.read_table(fname, names=header, dtype={"chrom": "str"})
        df["chrom"] = df["chrom"].astype(str)

        # get the summit
        df["start"] = df["start"].astype(int) + df["peak"].astype(int)
        df["end"] = df["start"] + 1

        # qval
        df["value"] = df["qval"]
        df = df[summit_header]
    elif ftype == "bed":
        df = pd.read_table(fname, names=summit_header, dtype={"chrom": "str"})
        if ((df["end"] - df["start"]) != 1).sum() != 0:
            raise ValueError(f"{fname} does not contain summits.")
    else:
        raise ValueError(
            f"Can't determine file type of {fname}. "
            "Is the file a narrowPeak or summits.bed file?"
        )

    df["log_value"] = np.log1p(df["value"])
    df["log_value_scaled"] = scale(df[["log_value"]])
    return df
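
A quick sketch of reading a narrowPeak file (file name hypothetical): each peak is reduced to its 1 bp summit, and its qval-based score is kept alongside log-transformed and scaled versions.

    peaks_df = read_peak_file_to_df("sample1_peaks.narrowPeak")
    print(peaks_df[["chrom", "start", "end", "value", "log_value_scaled"]].head())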
