# Count Fasta (Windows-Size, Step-Size)
Kennosuke Wada & Yoshiko Wada (Nagahama Institute of Bio-Science and Technology. Shiga, Japan)

## Setting parameters

In [None]:
dir_fasta     = "fasta"                    # Name of the directory containing the Fasta file
ext_fasta     = [".fasta", ".fas", ".fa"]  # Fasta file extensions
count_letters = ['A', 'C', 'G', 'T']       # List of characters to be counted for frequency

nuc_length    = 6       # Length of nucleotides to be frequency counted
window_size   = 10000   # Length of the segment to count the frequency
step_size     = 10000   # Step length when counting by shifting the segment
others_th     = 20      # Ignore segments where the content of characters not included in the frequency count exceeds this value

In [None]:
import os.path
import glob
import time

In [None]:
if not os.path.isdir(dir_fasta):
    raise Exception("Fasta files should be included in the directory : " + dir_fasta + ".")

In [None]:
start_time = time.time()

In [None]:
fasta_files = []
filenames = os.path.join(dir_fasta, '*')

for file in glob.glob(filenames):
    root, ext = os.path.splitext(file)
    if ext in ext_fasta:
        fasta_files.append(file)
        print(file)

if len(fasta_files) == 0:
    raise Exception("Not found Fasta files in the directory : " + dir_fasta + ".")

In [None]:
# Supports multi-faster format in which multiple sequences are described in one fasta file

class FastaData():
    def __init__(self, filename):
        self.filename = filename
        self.name = os.path.basename(filename)
        self.dataDict = {}        # Sequence name
        self.compListDict = {}    # Mononucleotide frequency
        self.patternListDict = {} # Oligo frequency
        
        self.readData()
    
    def readData(self):
        with open(self.filename) as f:
            lines = f.readlines()
        
        key = None
        txt = ''
        
        for line in lines:
            if line.startswith('>'):
                if key != None:
                    self.dataDict[key] = txt
                key = line.rstrip('\n')
                txt = ''
            else:
                txt += line.rstrip('\n')
        
        self.dataDict[key] = txt

In [None]:
fasta_data_list = []
debug_print = False

for filename in fasta_files:
    fasta_data = FastaData(filename)
    fasta_data_list.append(fasta_data)
    
    if debug_print:
        print(fasta_data.name)
        print(len(fasta_data.dataDict))
        for key, val in fasta_data.dataDict.items():
            print(key)
            print(val)
        print('------')

In [None]:
def countPattern(sequence, nuc_length, debug=False):
    sequence = sequence.upper()    # Convert all letters to uppercase
    patterns = {}
    
    for i in range(len(sequence) - nuc_length + 1):
        substr = sequence[i:(i + nuc_length)]
        
        stopFlag = False
        for s in substr:
            if s not in count_letters:  # Excludes patterns that include characters other than the specified character
                stopFlag = True
                break
        if stopFlag:
            continue
        
        if debug:
            print(substr)
        
        if substr in patterns.keys():
            patterns[substr] += 1
        else:
            patterns[substr] = 1
    
    return patterns

In [None]:
debug_print = False

# A set of nucleotide patterns that appear even once in the entire sequence
patterns_set = set()

for fasta_data in fasta_data_list:
    print(fasta_data.name)
    
    for seq_name in fasta_data.dataDict:
        sequence = fasta_data.dataDict[seq_name]
        print(seq_name)
        #print(sequence)
        
        fasta_data.patternListDict[seq_name]= []    # List of segment frequency dictionaries
        fasta_data.compListDict[seq_name] = []      # Frequencies of ACGT and others for a segment
        
        for start in range(0, len(sequence), step_size):
            if start + window_size >= len(sequence):
                break
            subsequence = sequence[start:(start + window_size)]
            
            counts = []
            sum = 0
            for x in count_letters:
                num = subsequence.count(x)
                sum += num
                counts.append(num)
            counts.append(len(subsequence) - sum)
            
            others_percent = 100.0 * counts[-1] / len(subsequence)
            if others_percent > others_th:
                print("#" + str(start) + "_" + str(start + window_size - 1))
                print("\tOthers Percent : ", others_percent)
                continue
            
            patterns = countPattern(subsequence, nuc_length)
            fasta_data.patternListDict[seq_name].append(patterns)
            fasta_data.compListDict[seq_name].append(counts)
            
            if debug_print:
                print(len(patterns.keys()))
        
            patterns_set |= set(patterns.keys())    # Add key by union
    
    #fasta_data.dataDict.clear()  # Delete the sequence dictionary to save memory
    
    if debug_print:
        print('------')

patterns_set = sorted(patterns_set)
nuc_pow = len(count_letters) ** nuc_length

print(str(len(patterns_set)) + " / " + str(nuc_pow))
#print(patterns_set)

In [None]:
dirname = "count_" + str(nuc_length) + "_" + str(window_size) + "_" + str(step_size)
dir_count = os.path.join(dir_fasta, dirname)
if not os.path.isdir(dir_count):
    os.mkdir(dir_count)

fname = os.path.join(dir_count, "labels.txt")
labels = ""
with open(fname, "w", encoding="utf-8") as f:
    labels = "\t".join(patterns_set)
    f.write(labels)

count_letters.append('-')

for fasta_data in fasta_data_list:
    fname = os.path.join(dir_count, os.path.splitext(fasta_data.name)[0] + ".cnt")
    print(fname)
    
    with open(fname, "w", encoding="utf-8") as f:
        head  = "%NUCLEOTIDES\t" + str(nuc_length) + "\n"
        head += "%WINDOWSIZE\t" + str(window_size) + "\n"
        head += "%STEPSIZE\t" + str(step_size) + "\n"
        f.write(head)
        
        for seq_name, patternDict in fasta_data.patternListDict.items():
            print(seq_name)
            f.write(seq_name + "\n")
            
            for segmentNo, segmentDict in enumerate(patternDict):
                segmentKeys = segmentDict.keys()
                
                start = segmentNo * step_size + 1
                end   = start + window_size - 1
                
                compline = "#" + str(start) + "_" + str(end)
                
                counts = fasta_data.compListDict[seq_name][segmentNo]
                                
                for idx, count in enumerate(counts):
                    compline += "\t" + count_letters[idx] + " : " + str(count)
                f.write(compline + "\n")
                
                line = []
                for pattern in patterns_set:
                    count = 0
                    if pattern in segmentKeys:
                        count = segmentDict[pattern]
                    line.append(count)
                line = [str(x) for x in line]
                line = "\t".join(line)
                
                f.write(line + "\n")

In [None]:
end_time = time.time()
print("Elapsed Time: {0} [sec]".format(end_time - start_time))