-
Notifications
You must be signed in to change notification settings - Fork 0
/
vcf_to_stockholm.py
187 lines (129 loc) · 4.26 KB
/
vcf_to_stockholm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/local/bin/python
# Python 3
# vcf_to_stockholm.py
# 2023
# Mathias Scharmann
# usage example
# python vcf_to_stockholm.py --vcf bla.vcf
"""
takes vcf and outputs in STOCKHOLM format
https://en.wikipedia.org/wiki/Stockholm_format
this format is required by rapidnj
https://birc.au.dk/software/rapidnj
example:
rapidnj bla.vcf.sth -n -o t -a kim -c 12 -m 10000 -t d -b 100 -x bla.vcf.sth.rapidNJ_tree.tre
"""
import argparse
import os
########################## HEAD
# this function checks if file exists and exits script if not
def extant_file(x):
"""
'Type' for argparse - checks that file exists but does not open.
"""
if not os.path.exists(x):
print ("Error: {0} does not exist".format(x) )
exit()
x = str(x)
return x
######
# parses arguments from command line
def get_commandline_arguments ():
parser = argparse.ArgumentParser()
parser.add_argument("--vcf", required=True, type=extant_file,
help="name/path of vcf input file", metavar="FILE")
args = parser.parse_args()
# finish
return args
################################## CORE
def parse_vcf(vcf_file):
with open(vcf_file, "r") as INFILE:
for line in INFILE:
if line.startswith("##"):
continue
if line.startswith("#"):
header_line = line.lstrip("#").strip("\n").split("\t")
# print (header_line)
break
samples = sorted(header_line[9:])
### freebayes vcf line:
# E3379_L96 43 . ATCG ATTG,ATCA,GTCA,GTCG 4179.3 . AB=BLABLA 0/0:1:
### STACKS vcf line:
# #CHROM POS ID REF ALT QUAL FILTER INFO FORMAT sample_203-ampullaria-Brunei-3
# un 2312 25 G A . PASS NS=17;AF=0.529,0.471 GT:DP:GL 1/1:32:.,.,.
print ("samples in the input file: ")
samples_vcf_idx = {}
for sample in samples:
print (sample)
vcf_idx = header_line.index(sample)
samples_vcf_idx[sample] = vcf_idx
# print popdict_vcf_idx
# now start the main business of walking through the vcf:
out_columns = []
with open(vcf_file, "r") as INFILE:
linecnt = 0
snpcnt = 0
for line in INFILE:
linecnt += 1
if line.startswith("#"):
continue
if len(line) < 2: # empty lines or so
continue
fields = line.strip("\n").split("\t")
if len(fields[3]) > 1:
continue # exclude variants that are not SNPs; theres no way to code those things in a phylip format!
snpcnt += 1
# column = []
column = ""
variants = [fields[3]] + fields[4].split(",")
# print progress every 10k sites only
if (snpcnt / 10000.0).is_integer():
print ("processed " + str(snpcnt) + " SNPs")
for sample in samples:
idxes = [int(x) for x in fields[samples_vcf_idx[sample]].split(":")[0].split("/") if not x == "." ]
if len(idxes) > 0:
alleles = [variants[x] for x in idxes ]
# get IUPAC code if necessary (heterozygotes)
# column.append( get_IUPAC_amb_code ( "".join(sorted(alleles)) ) )
column += get_IUPAC_amb_code ( "".join(sorted(alleles)) )
else:
#column.append("-")
column += "-"
out_columns.append(column)
# columns to rows/lines:
outlines = []
ntaxa = len(samples)
len_align = snpcnt
print("writing output:")
print("sequences: " + str( ntaxa) )
print("sites: " + str( len_align) )
header = "# STOCKHOLM 1.0"
outlines.append(header)
totalgaps = 0
for idx, sample in enumerate(samples):
seq = "".join( [ x[idx] for x in out_columns ] )
out_line = sample + " " + seq
outlines.append(out_line)
totalgaps += seq.count("-")
print("proportion gap: " + str( round( totalgaps/float(ntaxa*len_align), 4 ) ))
with open(vcf_file + ".sth", "w") as OUTFILE:
OUTFILE.write("\n".join(outlines)+"\n")
OUTFILE.write("//"+"\n")
def get_IUPAC_amb_code (nucs):
# nucs must be a "".joined string of the sorted list of nucleotides
# the IUPAC ambiguity codes:
# the keys in this IUPAC ambiguity code dictionary are produced by:
# "".join(sorted(list("NUCLEOTIDES"))) -> consistent lookup possible without additional loops!
combs = {'AC':'M', 'GT':'K', 'CG':'S', 'AT':'W', 'AG':'R', 'CT':'Y',
'ACG':'V', 'CGT':'B', 'AGT':'D', 'ACT':'H', 'ACGT':'N','AA':'A','TT':'T','CC':'C','GG':'G','NN':'N'
}
try:
out_code = combs[nucs]
except KeyError:
out_code = "N"
return out_code
################################## MAIN
args = get_commandline_arguments ()
#print args
parse_vcf(args.vcf)
print ("Done!")