<a href="https://colab.research.google.com/github/yongchanzzz/bioinformatics/blob/main/consensus_creator_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# consensus_creator on Colab
Consensus Creator from GitHub:
https://github.com/TaizoAyase/consensus_creator

In [None]:
#@title Import necessary libraries and upload files
import re
import csv
from google.colab import files
uploaded_files = files.upload()

In [None]:
#@title Choose parameters
thresholds_input = "0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.4, 1.6, 1.8, 2, 2.5, 3, 4" #@param {type:"string"}

def parse_thresholds(input_str):
    # Split the input string by commas and convert each to a float
    thresholds = [float(num.strip()) for num in input_str.split(',')]
    return thresholds

# Parse the input string to get a list of thresholds
default_thresholds = parse_thresholds(thresholds_input)


In [None]:
#@title Run code and downlaod

#!/usr/bin/env python
"""
Consensus Creator ver 2.01 adapted to Google Colab

This progam reads pre-aligned sequences as a single FASTA file, and creates
consensus sequences. The program code was written by Yongchan Lee, 2017,
and modified by Mizuki Takemoto. Adapted to Google Colab, 2024.
Theoretical framework is based on the work by Steipe B et al., 1994.

-Reference
1) Steipe, B., Schiller, B., Pluckthun, A. & Steinbacher, S.
Sequence statistics reliably predict stabilizing mutations in a protein
domain. J. Mol. Biol. 240, 188-92 (1994).


MIT License

Copyright (c) 2020 Mizuki Takemoto

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

from __future__ import division, print_function
from copy import deepcopy
from datetime import datetime
import numpy as np
import sys

class AminoAcidsCounter(object):
    # TODO: key error if unusual letter is contained

    all_aa_letters = set("ACDEFGHIKLMNPQRSTVWY-")

    def __init__(self, num):
        self.id = num
        self.dict = {}
        for k in self.all_aa_letters:
            self.dict[k] = 0

    def calc_sum(self):
        n_total = sum(self.dict.values())
        self.n_gap = self.dict.pop("-")
        return n_total - self.n_gap

    def get_max_aa(self):
        return max([(v, k) for k, v in self.dict.items()])[1]


class MultiFasta(object):
    def __init__(self, filename):
        self.filename = filename
        self.fasta_ary = []
        self.find_all_fasta(filename)

    def __getitem__(self, i):
        return self.fasta_ary[i]

    def find_all_fasta(self, filename):
        with open(filename, "r") as f:
            line = f.readline().rstrip("\r\n")

            # the first line must be a header
            if not re.match("^>", line):
                raise

            header = re.sub("^>", "", line)

            line = f.readline().rstrip("\r\n")
            seq = ""

            while line:
                if re.match("^>", line):
                    self.fasta_ary.append(SingleFasta(header, seq))
                    header = re.sub("^>", "", line)
                    seq = ""
                else:
                    seq += line
                line = f.readline().rstrip("\r\n")

    def get_all(self, n):
        return [fasta.seq[n] for fasta in self.fasta_ary]

class SingleFasta(object):
    def __init__(self, header, seq):
        self.header = header
        self.seq = self.clean_sequence(seq)

    @staticmethod
    def clean_sequence(seq):
        valid_aa_letters = set("ACDEFGHIKLMNPQRSTVWY-")
        return ''.join(aa if aa in valid_aa_letters else '-' for aa in seq)

    @property
    def length(self):
        return len(self.seq)

d = datetime.now()
timestamp_str = d.strftime('%Y%m%d_%H%M%S')
output_file_path = f"Consensus_{timestamp_str}.txt"

with open(output_file_path, 'w') as file:
    sys.stdout = file
    print("Imported file:")
    for filename in uploaded_files.keys():
        print(filename)
    print()
    print("Threshold values:")
    print(", ".join([str(e) for e in default_thresholds]))
    print()
sys.stdout = sys.__stdout__

# Load Multi-Fasta file #
for filename in uploaded_files.keys():
    multifasta = MultiFasta(filename)
    # proceed with your logic

n_length = multifasta[0].length
n_seq = len(multifasta.fasta_ary)

# raise Error if all sequences do not have same length
bool_ary = [fasta.length == n_length for fasta in multifasta.fasta_ary]
if not all(bool_ary):
    raise ValueError

# make counts for each amino acids
counts = {}
for i in range(n_length):
    counter = AminoAcidsCounter(i)
    for aa in multifasta.get_all(i):
        counter.dict[aa] += 1
    counts[i] = counter

# sum up over all sequences
all_count = {}
for i in range(n_length):
    all_count[i] = counts[i].calc_sum()

# header
with open(output_file_path, 'a') as file:
    sys.stdout = file
    print("AA_Pos    WT  Freq    Con Freq    FCon/FWT")

seq_id = 1
target_seq = []
consensus_seq = []
ratio_ary = []

for i in range(n_length):
    target_aa = multifasta[0].seq[i]

    # skip gap
    if target_aa == "-":
        continue

    target_freq = counts[i].dict[target_aa] / all_count[i] * 100
    max_aa = counts[i].get_max_aa()
    max_freq = counts[i].dict[max_aa] / all_count[i] * 100

    ratio = max_freq / target_freq
    ratio_ary.append(ratio)

    # save amino acid sequences
    target_seq.append(target_aa)
    consensus_seq.append(max_aa)

    output = (
        "{seq_id:<6d}{target_aa:>6s}  {target_freq:5.1f}    "
        "{max_aa:<3s} {max_freq:5.1f}    {ratio:4.2f}"
    ).format(**locals())
    # output = "%s, %5.2f, %s, %5.2f" % (target_aa, target_freq, max_aa, max_freq)
    with open(output_file_path, 'a') as file:
        sys.stdout = file
        print(output)
    sys.stdout = sys.__stdout__

    seq_id += 1

# output sequences
title = multifasta[0].header
with open(output_file_path, 'a') as file:
    sys.stdout = file
    print()
    print(">WT_" + title)
    print("".join(target_seq))
    print()
    print(">Consensus_" + title)
    print("".join(consensus_seq))
    print()
sys.stdout = sys.__stdout__

# print suggested mutation and seq
target_seq = np.array(target_seq)
consensus_seq = np.array(consensus_seq)
ratio_ary = np.array(ratio_ary)

for t in default_thresholds:
    bool_ary = np.log(ratio_ary) > t

    target = deepcopy(target_seq)

    # mutation suggestion
    base_seq = ["-" for i in range(seq_id - 1)]
    base_seq = np.array(base_seq)
    base_seq[bool_ary] = consensus_seq[bool_ary]

    n_mutation = sum(bool_ary)
    header = ">Threshold_value:{t} (suggested_mutations:{n_mutation})".format(**locals())

    with open(output_file_path, 'a') as file:
        sys.stdout = file
        print(header)
        print("".join(base_seq))
        print()
    sys.stdout = sys.__stdout__

    # construct suggestion
    target[bool_ary] = consensus_seq[bool_ary]
    header = ">Construct_threshold_value:{}".format(t)
    with open(output_file_path, 'a') as file:
        sys.stdout = file
        print(header)
        print("".join(target))
        print()
    sys.stdout = sys.__stdout__

print(f"Data written to {output_file_path}")
files.download(output_file_path)

In [None]:
#@title Plot histogram
# Plots for ln(Fcons/FWT)
import numpy as np
import matplotlib.pyplot as plt

target_arr    = np.array(target_seq)
consensus_arr = np.array(consensus_seq)
ratio_arr     = np.array(ratio_ary, dtype=float)

# Masks: finite and strictly >1
finite_mask = np.isfinite(ratio_arr)
valid_mask  = finite_mask & (ratio_arr > 1.0)

ratios_gt1   = ratio_arr[valid_mask]
ln_ratios_gt1 = np.log(ratios_gt1)

if ratios_gt1.size == 0:
    raise ValueError("No positions with Fcons/FWT > 1")

# Define bins
nbins = 50 #@param {type:"integer"}
edges_raw = np.linspace(ratios_gt1.min(), ratios_gt1.max(), nbins + 1)
edges_ln  = np.linspace(ln_ratios_gt1.min(), ln_ratios_gt1.max(), nbins + 1)

plt.figure()
plt.hist(ln_ratios_gt1, bins=edges_ln)
plt.xlabel("ln(Fcons/FWT)")
plt.ylabel("Number of residues")
plt.title("Histogram of ln(Fcons/FWT) (Fcons/FWT > 1)")
plt.tight_layout()
plt.show()

cons_diff = (consensus_arr != target_arr)
thresholds_ln, counts_ln = [], []
for t in edges_ln[:-1]:
    mask = valid_mask & cons_diff & (np.log(ratio_arr) > t)
    thresholds_ln.append(t)
    counts_ln.append(int(mask.sum()))

plt.figure()
plt.plot(thresholds_ln, counts_ln, marker="o")
plt.xlabel("Threshold on ln(Fcons/FWT)")
plt.ylabel("Suggested mutations")
plt.title("Suggested mutations vs threshold")
plt.tight_layout()
plt.show()