-
Notifications
You must be signed in to change notification settings - Fork 0
/
split_fasta.py
65 lines (52 loc) · 1.76 KB
/
split_fasta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Feb 8 16:39:20 2020
@author: paulo
"""
import os
import gzip
import glob
import itertools
from collections import Counter
from Bio import SeqIO
def is_header(line):
    """Return True when *line* is a FASTA header line (begins with ``>``).

    The first character is taken with a slice rather than an index so that
    an empty string yields False instead of raising IndexError.
    """
    return line[:1] == '>'
def parse_multi_fasta_file_compressed_or_not(filename):
    """Yield ``(name, sequence)`` pairs from a FASTA file.

    The file is opened with ``gzip`` in text mode when *filename* ends in
    ``.gz`` and with the built-in ``open`` otherwise.

    *name* is the header line without the leading ``>`` and surrounding
    whitespace; *sequence* is every following non-header line, stripped and
    concatenated.

    The file is parsed line by line rather than by pairing up
    ``itertools.groupby`` groups, so that:

    * a header immediately followed by another header yields a record with
      an empty sequence instead of shifting sequences onto the wrong name;
    * a header at the very end of the file yields ``(name, '')`` instead of
      leaking ``StopIteration`` out of the generator (a ``RuntimeError``
      since PEP 479);
    * lines that appear before the first header are ignored.
    """
    opener = gzip.open if filename.endswith('.gz') else open
    with opener(filename, 'rt') as handle:
        name = None  # None until the first header has been seen
        chunks = []
        for line in handle:
            if line.startswith('>'):
                # A new header closes the record collected so far.
                if name is not None:
                    yield name, ''.join(chunks)
                name = line[1:].strip()
                chunks = []
            elif name is not None:
                chunks.append(line.strip())
        # Flush the final record, which has no following header.
        if name is not None:
            yield name, ''.join(chunks)
def count_fasta_files(filename):
    """Return the number of FASTA records (header lines) in *filename*.

    The file is opened with ``gzip`` in text mode when *filename* ends in
    ``.gz`` and with the built-in ``open`` otherwise.

    Every line starting with ``>`` is counted on its own. Counting groups of
    consecutive headers would undercount whenever two headers are adjacent,
    i.e. when a record has an empty sequence.
    """
    opener = gzip.open if filename.endswith('.gz') else open
    with opener(filename, 'rt') as handle:
        return sum(1 for line in handle if line.startswith('>'))
def split_fasta_batch(iterator, file_size):
    """Yield successive lists of at most *file_size* entries from *iterator*.

    Parameters:
        iterator: any iterable of entries (for example the records produced
            by a FASTA parser).
        file_size: maximum number of entries per yielded list; must be at
            least 1.

    Yields:
        Lists of entries in their original order. Every list holds
        *file_size* entries except possibly the last one, which holds the
        remainder. Nothing is yielded for an empty input.

    Raises:
        ValueError: if *file_size* is smaller than 1 (raised when iteration
            starts, since this is a generator).

    Entries are never inspected for truthiness, so falsy entries and
    ``None`` are batched like any other value and do not end iteration early.
    """
    if file_size < 1:
        raise ValueError('file_size must be at least 1')
    batch = []
    for entry in iterator:
        batch.append(entry)
        if len(batch) >= file_size:
            yield batch
            batch = []
    # Emit the trailing, partially filled batch.
    if batch:
        yield batch
# Split the gzip-compressed uniprot_sprot FASTA file into plain FASTA files
# holding at most 20000 records each, numbered from 1.
filename = 'data/uniprot_sprot.fasta.gz'
# The output directory may not exist yet; writing into a missing directory
# would fail on the first batch.
os.makedirs('split_data', exist_ok=True)
with gzip.open(filename, 'rt') as fh:
    batches = split_fasta_batch(SeqIO.parse(fh, 'fasta'), 20000)
    for i, batch in enumerate(batches, start=1):
        # Use a separate name so the input path in `filename` is not clobbered.
        out_path = 'split_data/uniprot_sprot_{}.fasta'.format(i)
        count = SeqIO.write(batch, out_path, 'fasta')
        print('Wrote {} records to {}'.format(count, out_path))