diff --git a/jacquard/expand.py b/jacquard/expand.py index 9ce42be..30a7ff8 100644 --- a/jacquard/expand.py +++ b/jacquard/expand.py @@ -168,8 +168,21 @@ def validate_args(args): "not be read. Review inputs/usage and " "try again."), args.selected_columns_file) + columns = None + try: + columns = _read_col_spec(args.selected_columns_file) + except: + pass + if not columns: + raise utils.UsageError("The selected_columns_file .* has no rows. Review inputs/usage and try again") + + try: + vcf.VcfReader(vcf.FileReader(args.input)) + except: + raise utils.UsageError(("The expand command requires a VCF file as an " + "input, but the specified input [{}] contains no VCF " + "metaheaders. Review inputs and try again.").format(args.input)) - _read_col_spec(args.selected_columns_file) def _get_actual_columns(vcf_reader, col_spec): columns = _create_potential_column_list(vcf_reader) diff --git a/test/expand_test.py b/test/expand_test.py index 100e039..999a264 100644 --- a/test/expand_test.py +++ b/test/expand_test.py @@ -18,6 +18,14 @@ from test.utils.vcf_test import MockVcfReader, MockFileWriter +_VALID_VCF_CONTENT = ('''##source=strelka +##INFO= +##FORMAT= +#CHROM|POS|ID|REF|ALT|QUAL|FILTER|INFO|FORMAT|NORMAL|TUMOR +chr1|1|.|A|C|.|.|SOMATIC|GT|0/1|0/1 +chr2|1|.|A|C|.|.|SOMATIC|GT|0/1|0/1 +''').replace('|', "\t") + class ExpandTestCase(test_case.JacquardBaseTestCase): def setUp(self): super(ExpandTestCase, self).setUp() @@ -209,16 +217,8 @@ def test_disambiguate_column_names(self): self.assertEquals(expected, actual) def test_execute(self): - vcf_content = ('''##source=strelka -##INFO= -##FORMAT= -#CHROM|POS|ID|REF|ALT|QUAL|FILTER|INFO|FORMAT|NORMAL|TUMOR -chr1|1|.|A|C|.|.|SOMATIC|GT|0/1|0/1 -chr2|1|.|A|C|.|.|SOMATIC|GT|0/1|0/1 -''').replace('|', "\t") - with TempDirectory() as input_dir, TempDirectory() as output_dir: - input_dir.write("P1.vcf", vcf_content.encode("utf8")) + input_dir.write("P1.vcf", _VALID_VCF_CONTENT.encode("utf8")) input_file = os.path.join(input_dir.path, "P1.vcf") output_file = os.path.join(output_dir.path, "P1.txt") args = Namespace(input=input_file, @@ -250,17 +250,19 @@ def test_predict_output(self): desired_output_files) def test_validate_args_colSpecValid(self): - with TempDirectory() as col_spec_dir: - col_spec_dir.write("col_spec.txt", b"chrom\npos\ninfo") - col_spec_file = os.path.join(col_spec_dir.path, "col_spec.txt") - - args = Namespace(input="input.txt", + with TempDirectory() as input_dir: + input_dir.write("input.vcf", _VALID_VCF_CONTENT.encode("utf8")) + input_file = os.path.join(input_dir.path, "input.vcf") + input_dir.write("col_spec.txt", b"chrom\npos\ninfo") + col_spec_file = os.path.join(input_dir.path, "col_spec.txt") + + args = Namespace(input=input_file, output="expanded.txt", selected_columns_file=col_spec_file) expand.validate_args(args) self.ok() - def test_validate_args_colSpecInvalid(self): + def test_validate_args_colSpecIsNotAFile(self): with TempDirectory() as col_spec_dir: col_spec_dir.write("col_spec.txt", b"chrom\npos\ninfo") @@ -272,6 +274,36 @@ def test_validate_args_colSpecInvalid(self): expand.validate_args, args) + def test_validate_args_colSpecIsEmpty(self): + with TempDirectory() as col_spec_dir: + col_spec_dir.write("col_spec.txt", b"") + col_spec_filename = os.path.join(col_spec_dir.path, "col_spec.txt") + + args = Namespace(input="input.txt", + output="expanded.txt", + selected_columns_file=col_spec_filename) + self.assertRaisesRegexp(utils.UsageError, + "The selected_columns_file .* has no rows. Review inputs/usage and try again", + expand.validate_args, + args) + + def test_validate_args_checkInputIfVCF(self): + with TempDirectory() as input_dir: + input_dir.write("input.vcf", b"123") + input_filename = os.path.join(input_dir.path, "input.vcf") + args = Namespace(input=input_filename, + output="expanded.txt", + selected_columns_file=None) + self.assertRaisesRegexp(utils.UsageError, + ("The expand command requires a VCF file " + "as an input, but the specified input " + "\[.*input.vcf\] contains no VCF " + "metaheaders. Review inputs and try " + "again."), + expand.validate_args, + args) + + def test_create_glossary_entry(self): header = '##INFO=' actual_line = expand._create_glossary_entry(header)