Skip to content

Commit

Permalink
ex-209 (jebene) moved filter functionality to merge
Browse files Browse the repository at this point in the history
  • Loading branch information
jebene committed Apr 13, 2015
1 parent c24ebc7 commit 1d739a3
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 29 deletions.
95 changes: 78 additions & 17 deletions jacquard/commands/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

_DEFAULT_INCLUDED_FORMAT_TAGS = ["JQ_.*"]
_MULT_ALT_TAG = "JQ_MULT_ALT_LOCUS"
_JQ_SOMATIC_TAG = "HC_SOM"
#TODO: (jebene) is the description of this really accurate?
_MULT_ALT_HEADER = ('##INFO=<ID={},Number=0,Type=Flag,'
'Description="dbSNP Membership">').format(_MULT_ALT_TAG)
Expand Down Expand Up @@ -169,17 +170,40 @@ def _create_reader_lists(file_readers):

return buffered_readers, vcf_readers

def _is_somatic_variant(record):
for sample in record.sample_tag_values:
for tag in record.sample_tag_values[sample]:
is_somatic = record.sample_tag_values[sample][tag] == "1"
if re.search(_JQ_SOMATIC_TAG, tag) and is_somatic:
return 1

def _build_coordinates(vcf_readers, args):
passed_variants = args.include_variants == "passed"
any_passed_loci = args.include_loci == "any_passed"
somatic_variants = args.include_variants == "somatic"
any_somatic_loci = args.include_loci == "any_somatic"

def _build_coordinates(vcf_readers):
coordinate_set = set()
mult_alts = defaultdict(set)

for vcf_reader in vcf_readers:
for i, vcf_reader in enumerate(vcf_readers):
logger.info("Reading [{}] ({}/{})",
os.path.basename(vcf_reader.file_name),
i + 1,
len(vcf_readers))
try:
vcf_reader.open()

for vcf_record in vcf_reader.vcf_records():
coordinate_set.add(vcf_record.get_empty_record())
if passed_variants or any_passed_loci:
if vcf_record.filter == "PASS":
coordinate_set.add(vcf_record.get_empty_record())
elif somatic_variants or any_somatic_loci:
if _is_somatic_variant(vcf_record):
coordinate_set.add(vcf_record.get_empty_record())
else:
if "JQ_EXCLUDE" not in vcf_record.filter:
coordinate_set.add(vcf_record.get_empty_record())
ref_alt = vcf_record.ref, vcf_record.alt
locus = vcf_record.chrom, vcf_record.pos
mult_alts[locus].add(ref_alt)
Expand Down Expand Up @@ -264,7 +288,7 @@ def _build_merged_record(coordinate,
for sample, tags in record.sample_tag_values.items():
if sample not in sparse_matrix:
sparse_matrix[sample] = {}
for tag, value in tags.items():
for tag, value in list(tags.items()):
if tag in tags_to_keep:
all_tags.add(tag)
sparse_matrix[sample][tag] = value
Expand All @@ -290,28 +314,53 @@ def _build_merged_record(coordinate,

return merged_record

def _pull_matching_records(coordinate, buffered_readers):
#TODO: (jebene) alter logic to reduce number of branches
def _pull_matching_records(args, coordinate, buffered_readers):
passed_variants = args.include_variants == "passed"
all_passed_loci = args.include_loci == "all_passed"
somatic_variants = args.include_variants == "somatic"
all_somatic_loci = args.include_loci == "all_somatic"

vcf_records = []
for buffered_reader in buffered_readers:
record = buffered_reader.next_if_equals(coordinate)
if record:
vcf_records.append(record)
if passed_variants:
if record.filter == "PASS":
vcf_records.append(record)
elif all_passed_loci:
if record.filter == "PASS":
vcf_records.append(record)
else:
return []
elif somatic_variants:
if _is_somatic_variant(record):
vcf_records.append(record)
elif all_somatic_loci:
if _is_somatic_variant(record):
vcf_records.append(record)
else:
return []
else:
##accounts for any_passed/any_somatic because build_coordinates handled it
vcf_records.append(record)

return vcf_records

def _merge_records(coordinates,
def _merge_records(args,
coordinate,
buffered_readers,
writer,
all_sample_names,
tags_to_keep):

for coordinate in coordinates:
vcf_records = _pull_matching_records(coordinate, buffered_readers)
vcf_records = _pull_matching_records(args, coordinate, buffered_readers)
if vcf_records:
merged_record = _build_merged_record(coordinate,
vcf_records,
all_sample_names,
tags_to_keep)
writer.write(merged_record.text())
return merged_record
return 0

def _build_sample_list(vcf_readers):
all_sample_names = set()
Expand Down Expand Up @@ -408,6 +457,14 @@ def add_subparser(subparser):
parser.add_argument("--force", action='store_true', help="Overwrite contents of output directory")
parser.add_argument("--include_format_tags", dest='tags', help="Comma-separated list of regexs for format tags to include in output. Defaults to all JQ tags.")
parser.add_argument("--log_file", help="Log file destination")
parser.add_argument("--include_variants", choices=["passed", "somatic"], help=("passed: Only include variants which passed their respective filter\n"
"somatic: Only include somatic variants"))

parser.add_argument("--include_loci", choices=["any_passed", "all_passed", "any_soamtic", "all_somatic"], help=("any_passed: Include all variants at loci where at least one variant passed\n"
# pylint: disable=line-too-long
"all_passed: Include all variants at loci where all variants passed\n"
"any_somatic: Include all variants at loci where at least one variant was somatic\n"
"all_somatic: Include all variants at loci where all variants were somatic"))

def _predict_output(args):
desired_output_files = set([os.path.basename(args.output)])
Expand Down Expand Up @@ -443,7 +500,7 @@ def execute(args, execution_context):
buffered_readers, vcf_readers = _create_reader_lists(file_readers)
vcf_readers = _sort_readers(vcf_readers, output_path)
all_sample_names, merge_metaheaders = _build_sample_list(vcf_readers)
coordinates = _build_coordinates(vcf_readers)
coordinates = _build_coordinates(vcf_readers, args)
format_tags_to_keep = _build_format_tags(format_tag_regex, vcf_readers)
info_tags_to_keep = _build_info_tags(coordinates)
contigs_to_keep = _build_contigs(coordinates)
Expand All @@ -457,11 +514,15 @@ def execute(args, execution_context):

_write_metaheaders(file_writer, headers)

_merge_records(coordinates,
buffered_readers,
file_writer,
all_sample_names,
format_tags_to_keep)
for coordinate in coordinates:
merged_record = _merge_records(args,
coordinate,
buffered_readers,
all_sample_names,
format_tags_to_keep)
if merged_record:
file_writer.write(merged_record.text())

finally:
for vcf_reader in vcf_readers:
vcf_reader.close()
Expand Down
Loading

0 comments on commit 1d739a3

Please sign in to comment.