In [57]:
import logbook
import re
import os
from Bio import SeqIO
from attic_util import util
from itertools import islice
import numpy as np
import linecache

#handle file 
class SeqGenerator:
    def __init__(self, filenames, nb_epochs, seqlen_ulim=5000):
        self.filenames = filenames
        self.nb_epochs = nb_epochs
        self.seqlen_ulim = seqlen_ulim
        self.logger = logbook.Logger(self.__class__.__name__)
        self.logger.info('Number of epochs: {}'.format(nb_epochs))

    def filehandle_generator(self):
        for curr_epoch in range(self.nb_epochs):
            for filename in self.filenames:
                with open(filename) as file:
                    self.logger.info('Opened file: {}'.format(filename))
                    self.logger.info('Memory usage: {} MB'.format(util.memory_usage()))
                    self.logger.info('Current epoch: {} / {}'.format(curr_epoch + 1, self.nb_epochs))
                    yield file
                    

    def generator(self, rng):
        for fh in self.filehandle_generator():
            print(fh)
            # SeqIO takes twice as much memory than even simple fh.readlines()
            for seq_record in SeqIO.parse(fh, "fasta"):
                whole_seq = seq_record.seq
                self.logger.info('Whole fasta seqlen: {}'.format(len(whole_seq)))
                curr_left = 0
                while curr_left < len(whole_seq):
                    seqlen = rng.randint(self.seqlen_ulim // 2, self.seqlen_ulim)
                    segment = seq_record.seq[curr_left: seqlen + curr_left]
                    curr_left += seqlen
                    self.logger.debug('input seq len: {}'.format(len(segment)))
                    yield segment
                    #print(segment)
                    
class SeqFragmenter:
    """
    Split a sequence into small sequences based on some criteria, e.g. 'N' characters
    """
    def __init__(self):
        pass

    def get_acgt_seqs(self, seq):
        return remove_empty(re.split(r'[^ACGTacgt]+', str(seq)))

def remove_empty(str_list):
    return filter(bool, str_list)  # fastest way to remove empty string


class SlidingKmerFragmenter:
    """
    Slide only a single nucleotide
    """
    def __init__(self, k_low, k_high):
        self.k_low = k_low
        self.k_high = k_high

    def apply(self, rng, seq):
        kmers=[seq[i: i + rng.randint(self.k_low, self.k_high + 1)] for i in range(len(seq) - self.k_high + 1)]
        return kmers
'''
        with open ('vocab.txt', mode = 'a+',encoding='utf-8') as f:
            for kmer in kmers_Noverlap:
                file_line=[]
                f.seek(0)
                lines=f.readlines()
                for line in lines:
                    line=line.strip('\n')
                    file_line.append(line)
                if kmer in file_line:
                    pass
                else:
                    f.write(kmer+'\n')
            f.close()
'''

class SeqMapper:
    def __init__(self, use_revcomp=True):
        self.use_revcomp = use_revcomp

    def apply(self, rng, seq): 
        seq = seq.lower()
        if self.use_revcomp and rng.rand() < 0.5:
            return seq.reverse_complement()
        else:
            return seq

'''
        file_path='/home/langmei/notebook/study_software/pro_bert_input/inputs/vocab/'
        filenames =os.listdir(file_path)
        file_name=filenames[0]
        with open (file_path+'vocab.txt', mode = 'a+',encoding='utf-8') as f:
            vocab_file=linecache.getlines(file_name)
            print(vocab_file)
            for kmer in kmers_Noverlap:
                if kmer not in vocab_file:
                     f.write(kmer+'\n')
                # f.close()
            return kmers

'''

class KmerSeqIterable:
    def __init__(self,rand_seed,seq_generator, mapper, seq_fragmenter, kmer_fragmenter):
        self.logger = logbook.Logger(self.__class__.__name__)
        self.seq_generator = seq_generator
        self.mapper = mapper
        self.kmer_fragmenter = kmer_fragmenter
        self.seq_fragmenter = seq_fragmenter
        self.rand_seed = rand_seed
        self.iter_count = 0
   
    def __iter__(self):
        all_kmer=[]
        self.iter_count += 1
        rng = np.random.RandomState(self.rand_seed)
        for seq in self.seq_generator.generator(rng):
            seq = self.mapper.apply(rng, seq)
            acgt_seq_splits = list(self.seq_fragmenter.get_acgt_seqs(seq))
            self.logger.debug('Splits of len={} to: {}'.format(len(seq), [len(f) for f in acgt_seq_splits]))
            for acgt_seq in acgt_seq_splits:
                kmers = self.kmer_fragmenter.apply(rng, acgt_seq)# list of strings
                kmers.sort()
                kmers_Noverlap=list(set(kmers))
                for kmer in kmers_Noverlap:
                    if kmer not in all_kmer:
                        all_kmer.append(kmer)
        with open ('vocab.txt', mode = 'a+',encoding='utf-8') as f:
            for kmer in all_kmer:
                    f.write(kmer+'\n')
            f.close()
        #return all_kmer
                        #print(all_kmer)
                #yield kmer_seq

    

In [65]:
nb_epochs=5
path = '/home/langmei/notebook/study_software/pro_bert_input/inputs/test'
os.chdir(path)
filenames = os.listdir()
#file_names=filenames[]
#print(filenames)
kmer_segment= SeqGenerator(filenames,nb_epochs)
rand_seed=7
rng=rng = np.random.RandomState(rand_seed)
kmer_segment.generator(rng)

<generator object SeqGenerator.generator at 0x7f0170f4f308>

In [66]:
kmer_fragmenter = SlidingKmerFragmenter(3, 8)

In [67]:
 kmer_seq_iterable = KmerSeqIterable(
        rand_seed,
        SeqGenerator(filenames, nb_epochs),
        SeqMapper(),
        SeqFragmenter(),
        kmer_fragmenter,
 )

In [68]:
kmer_seq_iterable.__iter__()

<_io.TextIOWrapper name='chr1.fa' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='chr1.fa' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='chr1.fa' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='chr1.fa' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='chr1.fa' mode='r' encoding='UTF-8'>
