Skip to content

Commit

Permalink
ex-231: Added progress for expand; revised progress logging for merge.
Browse files Browse the repository at this point in the history
  • Loading branch information
cgates committed Apr 22, 2015
1 parent 3a97fa2 commit 140f429
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
6 changes: 6 additions & 0 deletions jacquard/expand.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def execute(args, dummy_execution_context):

file_writer.write("#" + "\t".join(actual_columns) + "\n")

line_count = 0
vcf_reader.open()
for vcf_record in vcf_reader.vcf_records():
row_dict = _create_row_dict(vcf_reader.split_column_header,
Expand All @@ -175,6 +176,11 @@ def execute(args, dummy_execution_context):
new_line.append(".")

file_writer.write("\t".join(new_line) + "\n")
line_count +=1
if line_count % 10000 == 0:
logger.info("Expanding: {} rows processed", line_count)
logger.info("Expand complete: {} rows processed", line_count)

file_writer.close()
vcf_reader.close()

Expand Down
27 changes: 24 additions & 3 deletions jacquard/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ def _write_headers(reader, file_writer):
file_writer.write("\n".join(headers) + "\n")

def _sort_vcf(reader, temp_dir):
logger.info("Sorting vcf [{}]", reader.file_name)
vcf_records = []
sorted_dir = os.path.join(temp_dir, "tmp")
os.makedirs(sorted_dir)
Expand All @@ -359,7 +358,12 @@ def _sort_vcf(reader, temp_dir):

def _get_unsorted_readers(vcf_readers):
unsorted_readers = []
for reader in vcf_readers:
for i, reader in enumerate(vcf_readers):
logger.info("Checking sort order of [{}] ({}/{})",
reader.file_name,
i+1,
len(vcf_readers)
)
previous_record = None
reader.open()
for vcf_record in reader.vcf_records():
Expand All @@ -378,8 +382,14 @@ def _get_unsorted_readers(vcf_readers):
def _sort_readers(vcf_readers, temp_dir):
unsorted_readers = _get_unsorted_readers(vcf_readers)
sorted_readers = []
unsorted_count = 0
for reader in vcf_readers:
if reader in unsorted_readers:
unsorted_count += 1
logger.info("Sorting vcf [{}] ({}/{})",
reader.file_name,
unsorted_count,
len(unsorted_readers))
reader = _sort_vcf(reader, temp_dir)
sorted_readers.append(reader)
return sorted_readers
Expand Down Expand Up @@ -598,10 +608,10 @@ def execute(args, execution_context):
file_writer.open()

buffered_readers, vcf_readers = _create_reader_lists(file_readers)
format_tags_to_keep = _build_format_tags(format_tag_regex, vcf_readers)
vcf_readers = _sort_readers(vcf_readers, output_path)
all_sample_names, merge_metaheaders = _build_sample_list(vcf_readers)
coordinates = _build_coordinates(vcf_readers)
format_tags_to_keep = _build_format_tags(format_tag_regex, vcf_readers)
info_tags_to_keep = _build_info_tags(coordinates)
contigs_to_keep = _build_contigs(coordinates)
incoming_headers = _FILE_FORMAT + execution_context + merge_metaheaders
Expand All @@ -614,6 +624,8 @@ def execute(args, execution_context):

_write_metaheaders(file_writer, headers)

row_count = 0
next_breakpoint = 0
for coordinate in coordinates:
merged_record = _merge_records(filter_strategy,
coordinate,
Expand All @@ -623,6 +635,15 @@ def execute(args, execution_context):
if merged_record:
file_writer.write(merged_record.text())

row_count += 1
progress = 100 * row_count / len(coordinates)
if progress > next_breakpoint:
logger.info("Merging: {} rows processed ({}%)",
row_count,
next_breakpoint)
next_breakpoint = 10 * int(progress/10) + 10

logger.info("Merge complete: {} rows processed (100%)", row_count)
filter_strategy.log_statistics()

finally:
Expand Down
8 changes: 4 additions & 4 deletions test/merge_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import test.utils.test_case as test_case
from test.utils.vcf_test import MockVcfReader, MockFileReader

#TODO: (cgates): Suspect duplicate tests
#Consider tests across Filter, merge_records, pull_matching_records; simplify?

class MockBufferedReader(object):
def __init__(self, vcf_records):
Expand Down Expand Up @@ -1044,8 +1042,10 @@ def test_sort_readers_vcfsResortedAsNecessary(self):
self.assertEquals(record2.text(), actual_records[1].text())
self.assertEquals(record3.text(), actual_records[2].text())
actual_log_infos = test.utils.mock_logger.messages["INFO"]
self.assertEquals(1, len(actual_log_infos))
self.assertRegexpMatches(actual_log_infos[0], r"Sorting vcf \[unsorted.vcf\]")
self.assertEquals(3, len(actual_log_infos))
self.assertRegexpMatches(actual_log_infos[0], r"Checking sort order of \[vcfName\] \(1/2\)")
self.assertRegexpMatches(actual_log_infos[1], r"Checking sort order of \[unsorted.vcf\] \(2/2\)")
self.assertRegexpMatches(actual_log_infos[2], r"Sorting vcf \[unsorted.vcf\] \(1/1\)")
actual_log_debugs = test.utils.mock_logger.messages["DEBUG"]
self.assertEquals(1, len(actual_log_debugs))
self.assertRegexpMatches(actual_log_debugs[0],
Expand Down

0 comments on commit 140f429

Please sign in to comment.