Skip to content

Commit

Permalink
fix grouped analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
maxibor committed Jun 30, 2021
1 parent 6eb0528 commit c96f2c2
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 470 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ pip install -e .
## Quick start

```bash
pydamage analyze aligned.bam
pydamage --outdir result_directory analyze aligned.bam
```

> Note that if you specify `--outdir`, it has to be before the PyDamage subcommand, example: `pydamage --outdir test filter pydamage_results.csv`
## CLI help

Command line interface help message
Expand Down
219 changes: 113 additions & 106 deletions pydamage/damage.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def compute_damage(self):
)


def avg_coverage(pysam_cov):
def avg_coverage_contig(pysam_cov):
"""Computes average coverage of a reference
Args:
Expand Down Expand Up @@ -189,9 +189,26 @@ def test_damage(ref, bam, mode, wlen, show_al, process, verbose):
"""
al_handle = pysam.AlignmentFile(bam, mode=mode, threads=process)
try:
cov = avg_coverage(al_handle.count_coverage(contig=ref))
nb_reads_aligned = al_handle.count(contig=ref)
reflen = al_handle.get_reference_length(ref)
if ref is None:
all_references = al_handle.references
cov = np.mean(
[
avg_coverage_contig(al_handle.count_coverage(contig=ref))
for ref in all_references
]
)
nb_reads_aligned = np.sum(
[al_handle.count(contig=ref) for ref in all_references]
)
reflen = np.sum(
[al_handle.get_reference_length(ref) for ref in all_references]
)
refname = "reference"
else:
cov = avg_coverage_contig(al_handle.count_coverage(contig=ref))
nb_reads_aligned = al_handle.count(contig=ref)
reflen = al_handle.get_reference_length(ref)
refname = ref

al = al_to_damage(reference=ref, al_handle=al_handle, wlen=wlen)
al.get_damage(show_al=show_al)
Expand All @@ -214,7 +231,7 @@ def test_damage(ref, bam, mode, wlen, show_al, process, verbose):
conserved_count=conserved_count,
verbose=verbose,
)
test_res["reference"] = ref
test_res["reference"] = refname
test_res["nb_reads_aligned"] = nb_reads_aligned
test_res["coverage"] = cov
test_res["reflen"] = reflen
Expand Down Expand Up @@ -250,104 +267,94 @@ def test_damage(ref, bam, mode, wlen, show_al, process, verbose):
return False


def get_damage_group(ref, bam, mode, wlen, show_al, process):
"""Prepare data and run LR test to test for damage on reference grouped
as one.
Args:
ref (str): name of referene in alignment file
bam (str): bam file
mode (str): opening mode of alignment file
wlen (int): window length
show_al (bool): Show alignment representations
process (int): Number of process for parallelization
Returns:
list:[ct_data, ga_data, cc_data, c_data, g_data, all_bases]
"""

al_handle = pysam.AlignmentFile(bam, mode=mode, threads=process)
cov = avg_coverage(al_handle.count_coverage(contig=ref))
nb_reads_aligned = al_handle.count(contig=ref)
reflen = al_handle.get_reference_length(ref)

al = al_to_damage(reference=ref, al_handle=al_handle)
(
CT,
GA,
damage_bases,
forward,
reverse,
) = al.get_damage(wlen=wlen, show_al=show_al)

return [
CT,
GA,
damage_bases,
forward,
reverse,
cov,
nb_reads_aligned,
reflen,
]


def test_damage_group(
ct_data,
ga_data,
damage_bases,
forward,
reverse,
nb_reads_aligned,
cov,
reflen,
wlen,
verbose,
):
"""Performs damage test
Args:
ct_data (list of int): List of positions from 5' with CtoT transitions
ga_data (list of int): List of positions from 3' with GtoA transitions
damage_bases (list of int): List of positions with damage
forward (list of int): List of positions from 5' where a base is aligned
reverse(list of int): List of positions from 3' where a base is aligned
nb_reads_aligned(int): number of reads aligned
cov(float): average coverage across all references
reflen(int): length of all references
wlen (int): window length
verbose(bool): Verbose
Returns:
dict: Dictionary containing LR test results
"""
ref = "reference"
try:
if ct_data:
model_A = models.damage_model()
model_B = models.null_model()
test_res = fit_models(
ref="reference",
model_A=model_A,
model_B=model_B,
ct_data=ct_data,
cc_data=cc_data,
ga_data=ga_data,
all_bases=all_bases,
wlen=wlen,
verbose=verbose,
)
test_res["reference"] = ref
test_res["nb_reads_aligned"] = nb_reads_aligned
test_res["coverage"] = cov
test_res["reflen"] = reflen

return check_model_fit(test_res, wlen, verbose)

except (ValueError, RuntimeError) as e:
if verbose:
print(f"Model fitting for {ref} failed")
print(f"Model fitting error: {e}")
print(
f"nb_reads_aligned: {nb_reads_aligned} - coverage: {cov} "
"- reflen: {reflen}\n"
)
return False
# def get_damage_group(get_damage_list):
# """Prepare data and run LR test to test for damage on reference grouped
# as one.

# Args:
# get_damage_list (list): results of get_damage as list of dict
# Returns:
# list:[ct_data, ga_data, cc_data, c_data, g_data, all_bases]
# """

# al = al_to_damage(reference=ref, al_handle=al_handle)
# (
# CT,
# GA,
# damage_bases,
# forward,
# reverse,
# ) = al.get_damage(wlen=wlen, show_al=show_al)

# return [
# CT,
# GA,
# damage_bases,
# forward,
# reverse,
# cov,
# nb_reads_aligned,
# reflen,
# ]


# def test_damage_group(
# ct_data,
# ga_data,
# damage_bases,
# forward,
# reverse,
# nb_reads_aligned,
# cov,
# reflen,
# wlen,
# verbose,
# ):
# """Performs damage test

# Args:
# ct_data (list of int): List of positions from 5' with CtoT transitions
# ga_data (list of int): List of positions from 3' with GtoA transitions
# damage_bases (list of int): List of positions with damage
# forward (list of int): List of positions from 5' where a base is aligned
# reverse(list of int): List of positions from 3' where a base is aligned
# nb_reads_aligned(int): number of reads aligned
# cov(float): average coverage across all references
# reflen(int): length of all references
# wlen (int): window length
# verbose(bool): Verbose
# Returns:
# dict: Dictionary containing LR test results
# """
# ref = "reference"
# try:
# if ct_data:
# model_A = models.damage_model()
# model_B = models.null_model()
# test_res = fit_models(
# ref="reference",
# model_A=model_A,
# model_B=model_B,
# ct_data=ct_data,
# cc_data=cc_data,
# ga_data=ga_data,
# all_bases=all_bases,
# wlen=wlen,
# verbose=verbose,
# )
# test_res["reference"] = ref
# test_res["nb_reads_aligned"] = nb_reads_aligned
# test_res["coverage"] = cov
# test_res["reflen"] = reflen

# return check_model_fit(test_res, wlen, verbose)

# except (ValueError, RuntimeError) as e:
# if verbose:
# print(f"Model fitting for {ref} failed")
# print(f"Model fitting error: {e}")
# print(
# f"nb_reads_aligned: {nb_reads_aligned} - coverage: {cov} "
# "- reflen: {reflen}\n"
# )
# return False

0 comments on commit c96f2c2

Please sign in to comment.