# Задание 1

## вопрос
Выберите любой домен из базы данных Pfam. Единственное условие – в выравнивании seed должно быть хотя бы 20 последовательностей. Кратко опишите функцию домена. Заполните таблицу
## ответ

note: accession в stockholm файле указан вместе с `.26`, хотя в браузере выводится без него


|     AC Pfam|ID Pfam|#SEED|#All|#SW|#architectures|#3D|Taxonomy|#eukaryota|#archaea|#bacteria|
|------------|-------|-----|----|---|--------------|---|--------|----------|--------|---------|
|PF01625(.26)|   PMSR|  518| 66k|327|           232| 30|     42k|      7771|    2050|    54948|


Выбранный домен - Peptide methionine sulfoxide reductase

Функция домена - This enzyme repairs damaged proteins. Methionine sulfoxide in proteins is reduced to methionine.

# Задание 2

## вопрос
Скачайте выравнивание seed для Вашего домена и откройте его в
JalView. Раскрасьте выравнивание по свойствам аминокислот и консервативности колонок. Найдите в выравнивании консервативные и функционально консервативные колонки. Найдите хотя бы один максимально достоверный блок (см. презентацию к семинару 11). Заполните таблицу
## ответ

ДЗ дано для практики использования JalView. Но, так как я зачем-то выбрал в качестве примера выравнивание размером 518, то находить консервативные колонки визуально очень сложно. Видно, например, что абсолютно консервативная колонка есть в единственном экземпляре. Но так же видно, что многие другие колонки почти полностью консервативны или функционально-консервативны. Необходимо найти колонки, где нарушений условий консервативности минимальны, но из 518 строк сделать эту задачу визуально оч муторно. Поэтому я напишу для этого свою программу и выведу в таблицу именно ее. Так как ДЗ все-таки по использованию JalView, то вывод программы будет использован для того, чтоб в JalView подобрать threshold так, чтобы получилось +- тоже самое, что в выводе моей программы, при этом не придется вглядываться в детали и пытаться глазами подсчитать сколько строк нарушают консервативность

In [1]:
from collections.abc import Iterable


aa_classes = {
    'A': 'F',
    'V': 'F',
    'L': 'F',
    'I': 'F',
    'J': 'F',
    'M': 'F',
    'P': 'F',
    'F': 'R',
    'Y': 'R',
    'W': 'R',
    'G': 'Z',
    'S': 'Z',
    'T': 'Z',
    'C': 'Z',
    'Q': 'Z',
    'N': 'Z',
    'E': 'N',
    'D': 'N',
    'K': 'P',
    'R': 'P',
    'H': 'P',
}


def residues_from_class(classes: Iterable[str]) -> Iterable[str]:
    cls = [cls.upper() for cls in classes]
    return set([it[0] for it in filter(lambda it: it[1] in cls, aa_classes.items())])


class Column:
    def __init__(self, pos: int, rows: int) -> None:
        self.pos = pos
        self.rows = rows
        self.residues = dict()
        self.classes  = dict()

    def __str__(self) -> str:
        indices  = sum(self.residues.values())
        residues = ''.join(self.residues.keys())
        s = f'{self.pos + 1}:[{residues}]'
        if indices == self.rows:
            return s
        return f'{s}[{indices}/{self.rows}]'

    def add(self, residue: str) -> None:
        residue = residue.upper()
        self.residues[residue] = 1 + self.residues.get(residue, 0)
        cls = aa_classes.get(residue)
        if cls is not None:
            self.classes[cls] = 1 + self.classes.get(cls, 0)

    def remove(self, residues: Iterable[str]) -> int:
        """
        Removes all residues from column
        :param residues: set of residues to delete
        :return: number of deleted residues
        """
        deleted = 0
        for res in residues:
            if res not in self.residues.keys():
                continue
            deleted += 1
            num = self.residues.pop(res)
            cls = aa_classes.get(res)
            if cls is None or cls not in self.classes.keys():
                continue
            self.classes[cls] -= num
            if self.classes[cls] == 0:
                self.classes.pop(cls)
        return deleted


def cons_to_md(alnrows: int, full: Iterable[Column], func: Iterable[Column], reliable: Iterable[Iterable[int]]) -> str:
    import io


    maxlen = max([len(str(col)) for col in full + func + ['seed', '100%-cons', 'func-cons', 'max-reliable']])
    header_cell = '-' * maxlen
    buf = io.StringIO()
    buf.write(f"|{'seed'.ljust(maxlen)}|{'100%-cons'.ljust(maxlen)}|{'func-cons'.ljust(maxlen)}|{'max-reliable'.ljust(maxlen)}|\n" + \
        "|" + '|'.join([header_cell]*4) + "|\n",
    )
    rows = max(len(full), len(func), len(reliable))
    for i in range(rows):
        buf.write('|' + f'{alnrows if i == 0 else ""}'.ljust(maxlen) + '|')
        for var in [full, func, reliable]:
            buf.write(f'{var[i] if i < len(var) else ""}'.ljust(maxlen) + '|')
        buf.write(f'\n')

    return buf.getvalue()


def block_contains(
        msa,
        col_start: int,
        col_end: int,
        symbol: str = '-',
        row_start: int = 0,
        row_end: int = -1,
) -> bool:
    """
    Check whether block specified by coordinates contains symbol
    :param row_end: if not given or given negative, then uses len(msa)
    """
    if row_end < 0:
        row_end = len(msa)
    assert 0 <= col_start < col_end and 0 <= row_start < row_end <= len(msa)

    for row in range(row_start, row_end):
        seq = msa[row].seq
        assert col_end <= len(seq)
        if symbol in seq[col_start:col_end]:
            return True

    return False


def find_conservatives(
        msa,
        full_threshold: int,
        func_threshold: int,
        mark: str = None,
    ):
    full = list()
    func = list()
    reliable = list()

    alnlen = msa.get_alignment_length()
    msa.column_annotations['FBB_FULL'] = ['-'] * alnlen
    msa.column_annotations['FBB_FUNC'] = ['-'] * alnlen
    msa.column_annotations['FBB_RLBL'] = ['-'] * alnlen

    for col in range(len(msa[0].seq)):
        column = Column(col, len(msa))

        for row in range(len(msa)):
            assert len(msa[row].seq) == len(msa[0].seq)
            column.add(msa[row].seq[col])

        if len(column.classes) == 0 or len(column.residues) == 0:
            continue

        if '-' in column.residues.keys():
            continue

        if len(column.residues) == 1:
            full.append(column)
            msa.column_annotations['FBB_FULL'][col] = '1'
            continue

        full_cons = list(filter(lambda x: x[1] >= full_threshold, column.residues.items()))
        full_cons.sort(key=lambda x: -1 * x[1])
        if len(full_cons) != 0:
            column.remove(set(column.residues.keys()) - {full_cons[0][0]})
            full.append(column)
            msa.column_annotations['FBB_FULL'][col] = '1'
            continue

        if len(column.classes) == 1:
            func.append(column)
            msa.column_annotations['FBB_FUNC'][col] = '1'
            continue

        func_cons = list(filter(lambda x: x[1] >= func_threshold, column.classes.items()))
        func_cons.sort(key=lambda x: -1 * x[1])
        if len(func_cons) != 0:
            column.remove(set(column.residues.keys()) - residues_from_class(func_cons[0][0]))
            func.append(column)
            msa.column_annotations['FBB_FUNC'][col] = '1'

    msa.column_annotations['FBB_FULL'] = ''.join(msa.column_annotations['FBB_FULL'])
    msa.column_annotations['FBB_FUNC'] = ''.join(msa.column_annotations['FBB_FUNC'])

    if len(full) + len(func) < 2:
        msa.column_annotations['FBB_RLBL'] = ''.join(msa.column_annotations['FBB_RLBL'])
        if mark is not None and len(mark) != 0:
            mark_case_by_conservateveness(msa, mark)
        return msa, full, func, reliable

    cols = sorted(full + func, key=lambda x: x.pos)
    flag = False
    start = 0
    for i in range(1, len(cols)):
        gaped = block_contains(msa, cols[i-1].pos, cols[i].pos + 1, '-')
        if not flag and not gaped:
            flag = True
            start = cols[i-1].pos
        elif flag and gaped:
            flag = False
            reliable.append((start + 1, cols[i-1].pos + 1))
            for col in range(start, cols[i-1].pos + 1):
                msa.column_annotations['FBB_RLBL'][col] = '1'

    msa.column_annotations['FBB_RLBL'] = ''.join(msa.column_annotations['FBB_RLBL'])
    if mark is not None and len(mark) != 0:
        mark_case_by_conservateveness(msa, mark)
    return msa, full, func, reliable


def mark_case_by_conservateveness(msa, index_name: str):
    """
    Changes cases of all sequences in msa corresponding to boolean index
    """
    from Bio.Seq import Seq


    for aln in msa:
        seq = list(aln.seq)
        for i in range(len(seq)):
            if msa.column_annotations[index_name][i] != '1':
                seq[i] = seq[i].lower()
            else:
                seq[i] = seq[i].upper()
        aln.seq = Seq(''.join(seq))
    return msa


def write_to_sth(msa, output):
    from Bio.AlignIO.StockholmIO import StockholmWriter


    writer = StockholmWriter(output)
    for annotation_name in msa.column_annotations.keys():
        if annotation_name.startswith('FBB_'):
            writer.pfam_gc_mapping[annotation_name] = annotation_name
    writer.write_alignment(msa)


def find_conservatives_from_file(
        input_filepath: str = None,
        input_format: str = None,
        full_threshold: int = None,
        func_threshold: int = None,
        output: str = None,
        mark: str = None,
    ):
    from Bio import AlignIO


    msa, full, func, reliable = find_conservatives(
        AlignIO.read(input_filepath, input_format),
        int(full_threshold),
        int(func_threshold),
        mark,
    )

    if output is None or len(output) == 0:
        return msa, full, func, reliable

    with open(output, 'w') as f:
        write_to_sth(msa, f)

    return msa, full, func, reliable


if __name__ == '__main__' and '__file__' in globals():
    import argparse


    parser = argparse.ArgumentParser(
        description='This program searches for convservative, functionally convservative and maximally reliable columns of MSA.',
    )
    parser.add_argument(
        "--input",
        "-i",
        required=True,
        dest='input_filepath',
        help="input filepath with alignment",
    )
    parser.add_argument(
        "--input-format",
        "-f",
        required=True,
        dest='input_format',
        help="format of query. required by Bio.AlignIO",
    )
    parser.add_argument(
        "--full-threshold",
        "-l",
        required=True,
        dest='full_threshold',
        help="threshold for detecting fully conservative column"
    )
    parser.add_argument(
        "--func-threshold",
        "-n",
        required=True,
        dest='func_threshold',
        help="threshold for detecting fully conservative column"
    )
    parser.add_argument(
        "--output",
        "-o",
        default='',
        help="filepath to write down stockholm output. print markdown table to stdout, if not given",
    )
    parser.add_argument(
        "--mark-conservative-by-case",
        "-m",
        dest='mark',
        default='',
        help="index name which will store sequences formated with lower case," +
            "if not conservative column, and upper case otherwise." +
            "stores nothing if not given",
    )

    args = parser.parse_args()
    msa, full, func, reliable = find_conservatives_from_file(**vars(args))
    if args.output == '':
        print(cons_to_md(len(msa), full, func, reliable))



In [3]:
from IPython import display


msa, full, func, reliable = find_conservatives_from_file('PF01625.seed.sth', 'stockholm', 515, 515)
display.display_markdown(cons_to_md(len(msa), full, func, reliable), raw=True)

|seed               |100%-cons          |func-cons          |max-reliable       |
|-------------------|-------------------|-------------------|-------------------|
|518                |7:[G][515/518]     |8:[CGS]            |(7, 10)            |
|                   |10:[W][517/518]    |46:[GCS][515/518]  |(76, 80)           |
|                   |76:[E][517/518]    |80:[IVLMA][517/518]|                   |
|                   |119:[Y][515/518]   |92:[LIVMA]         |                   |
|                   |234:[H]            |                   |                   |


По таблице видно, что бОльшая часть полностью консервативных колонок консервативны по не менее чем в 99% (не менее 515) организмах,  а так же одна полностью консервативная колонка. Схожая картина с функционально-консервативными колонками (программа не записывает полностью консервативные колонки в функционально консервативные). Всего найдено 9 колонок, из которых 3 попадают в 5-ти колоночный максимальный блок (первый). Возможно это можно будет использовать как маркер для будущих сравнений выравниваний


# Задание 3

## вопрос
Сравнение программ множественного выравнивания. Выровняйте последовательности из seed любыми двумя программами множественного выравнивания. Сравните полученные выравнивания с выравниванием seed. Для этого необходимо найти совпадающие блоки в двух выравниваниях.

## ответ

Программа для сравнений для сравнений будет реализована по аналогии с VerAlign. Но, для демонстрации принципа использования и построения `sp score`, будет так же добавлена возможность использовать другие эвристики, для определения консервативности VerAlign добавляет за совпадение остаток по 1 баллу, а так же отнимает за несовпадение разрывов. Но можно, например, для более чувствительного сравнения, использовать score из матриц замен, добавляя его к сумме колонки. В прямом виде это почти не будет иметь биологического смысла и сравнивать выравнивания так не выйдет. Но score колонок на основе матриц замен можно использовать, чтобы показать гибкость подхода Sum-of-Pairs. Похожим образом Sum-of-Pairs использует [AL2CO](https://doi.org/10.1093/bioinformatics/17.8.700). Та же AL2CO использует еще и Entropy-based показатель, который можно хранить и вычилять как `sp score`, переиспользуя бОльшую часть кода.  Для простоты отладки, программа будет записывать score каждой колонки в аннотацию стокгольм файла. Из-за ограничения формата, по которому аннотации колонок должны быть строго по одному символу на файл, придется хранить score в виде юникод символов. В таких же аннотациях можно хранить независимо друг от друга и все остальные индексы.  По одному из таких индексов-аннотаций (с посчитанными score'ами) можно привести остатки во всем в выравнивании в большой или малый регистр в зависимости от того, яв-ся ли они консервативными

In [12]:
ascii_zero = 48
ascii_last = 126
utf_shift = 128


def score_chr(score) -> str:
    """
    Maps columns score to single character (stockholm annotations allows only single char)
    :return: printable ascii or utf-8
    :raises ValueError: if unexpected type, got non-single-char-str, got negative score or bigger than printable utf-8
    """
    if score is True:
        return '1'
    if score is False or score is None:
        return '-'
    if isinstance(score, str):
        assert len(score) == 1
        return score
    if isinstance(score, int) or isinstance(score, float):
        assert 0 <= score <= 0x10ffff
        score = int(score)
        # add code of ascii zero
        # to print ascii digits and letters
        if score + ascii_zero < ascii_last:
            score += ascii_zero
        else:
            score += utf_shift
        return chr(score)

    raise ValueError(f'unsupported type for stockholm score annotation: {score}')


aa_classes = {
    'A': 'F',
    'V': 'F',
    'L': 'F',
    'I': 'F',
    'J': 'F',
    'M': 'F',
    'P': 'F',
    'F': 'R',
    'Y': 'R',
    'W': 'R',
    'G': 'Z',
    'S': 'Z',
    'T': 'Z',
    'C': 'Z',
    'Q': 'Z',
    'N': 'Z',
    'E': 'N',
    'D': 'N',
    'K': 'P',
    'R': 'P',
    'H': 'P',
}


for residue, cls in list(aa_classes.items()):
    aa_classes[residue.lower()] = cls


def are_residues_eq(x, y) -> bool:
    """
    :return: True if X and Y equals
    """
    return x in aa_classes and x.upper() == y.upper()


def are_residues_similar(x, y) -> bool:
    """
    :return: True if X and Y have similar properties
    """
    x = aa_classes.get(x)
    y = aa_classes.get(y)
    return x is not None and x == y


class SubsMatCmp:
    """
    Scores residues during pairwise based on Bio.Align.substitution_matrices
    Example: SubsMatCmp(substitution_matrices.load("BLOSUM62")).cmp('A', 'W') == -3.0
    """
    def __init__(self, matrix):
        self.mx = matrix

    def cmp(self, x, y) -> int:
        return int(self.mx[x.upper()][y.upper()])


def fill_indices(record) -> int:
    """
    For every residue fills letter annotation with original sequence index
    :param record: SeqRecord, with or without gaps
    :return: number of last residue
    """
    # should not be printed into file and just store indices
    record.letter_annotations['fbb_indices'] = [0] * len(record.seq)
    original_index = 0
    for seq_index in range(len(record.seq)):
        if record.seq[seq_index] == '-':
            continue
        original_index += 1
        record.letter_annotations['fbb_indices'][seq_index] = original_index
    return original_index


def fill_maparray(msa1, msa2, row) -> int:
    """
    Fills veralign maparray into letter_notations.
    Maparray contains columns indexes of same alpha-C number
    :return: number of found matched residues
    """
    matches = 0
    f, s = 0, 0
    msa1[row].letter_annotations['fbb_maparray'] = [0] * len(msa1[row])
    msa2[row].letter_annotations['fbb_maparray'] = [0] * len(msa2[row])
    while f <= len(msa1[row])-1 and s <= len(msa2[row])-1:
        res1 = msa1[row].letter_annotations['fbb_indices'][f]
        res2 = msa2[row].letter_annotations['fbb_indices'][s]
        if res1 != 0 and res1 == res2:
            matches += 1
            msa1[row].letter_annotations['fbb_maparray'][f] = s+1
            msa2[row].letter_annotations['fbb_maparray'][s] = f+1
        if f == len(msa1[row])-1 or s == len(msa2[row])-1:
            break
        if res1 == 0:
            f += 1
        if res2 == 0:
            s += 1
        if res1 != 0 and res2 != 0:
            f += 1
            s += 1
    return matches


def fill_test_with_truth(test_record, truth_record):
    """
    Fills test_rec with truth_rec into letter_annotations
    :return: mutated test_rec
    """
    if len(truth_record.seq) >= len(test_record.seq):
        test_record.letter_annotations['FBB_TR'] = str(truth_record.seq)[:len(test_record.seq)]
    elif len(truth_record.seq) < len(test_record.seq):
        residue = len(test_record.seq) - len(truth_record.seq)
        seq = str(truth_record.seq)
        seq += '_' * residue
        test_record.letter_annotations['FBB_TR'] = seq
    return test_record


def column_wise(
        test_msa,
        truth_msa,
        col_annotation: str,
        aa_cmp,
        msa_annotation_prefix: str = '',
        scorech=score_chr,
    ):
    """
    Compares corresponding columns with each other and writes bool score as column_annotations to test_msa.
    :param test_msa: msa where to save scores
    :param truth_msa: msa against which score will be calculated
    :param col_annotation: name of annotation to save scores
    :param msa_annotation_prefix: if given then writes count and percentage of scores (which tests as True) to annotations of test_sma
    :param aa_cmp: comparator of residues, takes two aa and returns bool
    :return: tuple of count and percentage of scores which tests as True
    """

    assert len(test_msa) == len(truth_msa)

    matches = 0
    alnlen = test_msa.get_alignment_length()
    test_msa.column_annotations[col_annotation] = [scorech(False)] * alnlen
    for col in range(alnlen):
        got = None
        equals = True
        for row in range(len(test_msa)):
            tst = test_msa[row].seq[col].upper()
            trh_col = test_msa[row].letter_annotations['fbb_maparray'][col]
            if got is None:
                got = trh_col
            if got != trh_col:
                equals = False
                break
            if trh_col == 0:
                equals = False
                break
            
            # substract 1 since maparray starts from 1
            trh = truth_msa[row].seq[trh_col-1].upper()
            if not aa_cmp(tst, trh):
                equals = False
                break
        
        if equals:
            matches += 1
        test_msa.column_annotations[col_annotation][col] = scorech(equals)

    pcnt = matches * 100 / alnlen
    test_msa.column_annotations[col_annotation] = ''.join(test_msa.column_annotations[col_annotation])
    if len(msa_annotation_prefix) != 0:
        test_msa.annotations[f'{msa_annotation_prefix}_cw_cnt'] = matches
        test_msa.annotations[f'{msa_annotation_prefix}_cw_pcnt'] = pcnt
    return matches, pcnt


def pairwise(
        test_msa,
        truth_msa,
        col_annotation: str,
        aa_cmp,
        threshold_index: str,
        threshold = lambda t: t > 0,
        msa_annotation_prefix: str= '',
        scorech=score_chr,
    ):
    """
    Compares corresponding cells of corresponding columns with each other and writes scores as column_annotations to test_msa.
    :param test_msa: msa where to save scores
    :param truth_msa: msa against which score will be calculated
    :param col_annotation: name of annotation to save scores
    :param msa_annotation_prefix: if given then writes sum of scores (which tests as True) to annotations of test_sma
    :param aa_cmp: comparator of residues, takes two aa and returns integer score for pair
    :param threshold: function that takes column sum of pairs and returns True if threshold matched
    :param threshold_index: where to write conservativeness
    :return: tuple (sum of column scores, amount of pairs, amount of pairs with positive score)
    """

    assert len(test_msa) == len(truth_msa)

    sp = 0
    pairs = 0
    positive_pairs = 0

    alnlen = test_msa.get_alignment_length()
    test_msa.column_annotations[col_annotation] = [scorech(False)] * alnlen
    test_msa.column_annotations[threshold_index] = [scorech(False)] * alnlen
    for col in range(alnlen):
        colsum = 0
        got = None
        equals = True
        for row in range(len(test_msa)):
            pairs += 1
            tst = test_msa[row].seq[col].upper()
            trh_col = test_msa[row].letter_annotations['fbb_maparray'][col]
            if trh_col == 0:
                continue
            if got is None:
                got = trh_col
            if got != trh_col:
                equals = False
                break

            
            # substract 1 since maparray starts from 1
            trh = truth_msa[row].seq[trh_col-1].upper()
            score = int(aa_cmp(tst, trh))
            if score > 0:
                positive_pairs += 1
            colsum += score

        if not equals:
            continue

        sp += colsum
        if threshold(colsum):
            test_msa.column_annotations[threshold_index][col] = scorech(True)
        if colsum >= 0:
            test_msa.column_annotations[col_annotation][col] = scorech(colsum)

    test_msa.column_annotations[col_annotation] = ''.join(test_msa.column_annotations[col_annotation])
    test_msa.column_annotations[threshold_index] = ''.join(test_msa.column_annotations[threshold_index])
    if len(msa_annotation_prefix) != 0:
        test_msa.annotations[f'{msa_annotation_prefix}_pw_sum'] = sp
        test_msa.annotations[f'{msa_annotation_prefix}_pw_amount'] = pairs
        test_msa.annotations[f'{msa_annotation_prefix}_pw_matched'] = positive_pairs
        test_msa.annotations[f'{msa_annotation_prefix}_sp_score'] = positive_pairs * 100 / pairs
    return sp, pairs, positive_pairs


def mark_case_by_conservateveness(msa, index_name: str):
    """
    Changes cases of all sequences in msa corresponding to boolean index
    """
    from Bio.Seq import Seq


    for aln in msa:
        seq = list(aln.seq)
        for i in range(len(seq)):
            if msa.column_annotations[index_name][i] != score_chr(True):
                seq[i] = seq[i].lower()
            else:
                seq[i] = seq[i].upper()
        aln.seq = Seq(''.join(seq))
    return msa


def find_conservative_ranges(msa, index_name: str):
    """
    Finds all ranges of conservative columns.
    :param index_name: name of boolean index which determines wheteher column is conservative
    :return: list of tuples (start_test, end_test, start_truth, end_truth) of each conservative.
    """
    # could decide column is conservative
    # even if there is gapes.
    # need to find first row without gap
    # to find out position in truth alignment
    def get_maparray(column):
        for row in range(len(msa)):
            maparray = msa[row].letter_annotations['fbb_maparray']
            if len(index) > len(maparray) or maparray[column] == 0:
                continue
            return maparray[column]

    ranges = []
    index = msa.column_annotations[index_name]
    flag = False
    start = 0
    for col in range(len(index)):
        if not flag and index[col] == score_chr(True):
            flag = True
            start = col
        elif flag and index[col] == score_chr(False):
            flag = False
            trh = get_maparray(col-1)
            start_trh = get_maparray(start)
            # should not be marked as conservative in any case
            assert trh is not None
            assert start_trh is not None
            # maparray positions starts with 1 so no need to add
            ranges.append((start+1, col, start_trh, trh))

    return ranges


def load_substitution_matrices() -> list:
    """
    Returns list of tuples (matix_name, matrix) loaded from from Bio.Align.substitution_matrices.
    Matrices having not all of residues will be filtered out
    """
    from Bio.Align import substitution_matrices as sm


    out = [(name, sm.load(name)) for name in sm.load()]
    residues = set([r.upper() for r in aa_classes.keys()])
    def check(item):
        for aa in residues:
            if aa not in item[1].alphabet and aa != 'J':
                return False
        return True

    return sorted(filter(check, out), key=lambda x: x[0])


def compare_alignments(query, subject, threshold: int = 0, mark: str = None, sub_mxs=[]):
    """
    :return: tuple (mutated_query, conservative_ranges or None if mark index not given)
    """
    query.sort(key=lambda x: x.id)
    subject.sort(key=lambda x: x.id)

    for record in query:
        fill_indices(record)
    for record in subject:
        fill_indices(record)
    for i in range(min(len(query), len(subject))):
        fill_maparray(query, subject, i)
        fill_test_with_truth(query[i], subject[i])

    column_wise(query, subject, 'FBB_EQ', are_residues_eq, msa_annotation_prefix='FBB_EQ')
    column_wise(query, subject, 'FBB_SM', are_residues_similar, msa_annotation_prefix='FBB_SM')

    pairwise(
        query,
        subject,
        'FBB_SP_EQ',
        lambda x, y: int(are_residues_eq(x, y)),
        'FBB_SP_EQ_TH',
        msa_annotation_prefix='FBB_SP_EQ',
    )
    pairwise(
        query,
        subject,
        'FBB_SP_SM',
        lambda x, y: int(are_residues_similar(x, y)),
        'FBB_SP_SM_TH',
        lambda x: x >= int(threshold),
        msa_annotation_prefix='FBB_SP_SM',
    )

    for sm in sub_mxs:
        comparator = SubsMatCmp(sm[1])
        pairwise(
            query,
            subject,
            f'FBB_SP_{sm[0]}',
            comparator.cmp,
            f'FBB_SP_{sm[0]}_TH',
            msa_annotation_prefix=f'FBB_SP_{sm[0]}',
        )

    if not isinstance(mark, str) or not len(mark) != 0:
        return query, None

    mark_case_by_conservateveness(query, mark)
    ranges = find_conservative_ranges(query, mark)

    return query, ranges


def compare_alignments_files(
        query=None,
        subject=None,
        output=None,
        query_format=None,
        subject_format=None,
        output_stats=None,
        output_matrices=None,
        **kwargs,
    ):
    import io
    from Bio import AlignIO
    from Bio.AlignIO.StockholmIO import StockholmWriter


    qry = AlignIO.read(query, query_format)
    sbj = AlignIO.read(subject, subject_format)

    if output_matrices and 'sub_mxs' not in kwargs:
        kwargs['sub_mxs'] = load_substitution_matrices()
    qry, ranges = compare_alignments(qry, sbj, **kwargs)
    if ranges:
        print("Position in query,Position in subject")
        for rng in ranges:
            print(f'{rng[0]}-{rng[1]},{rng[2]}-{rng[3]}')

    # biopython 1.83 stockholm writer lacks
    # support of writing (not reading though)
    # file-level GF annotatioins.
    # biopython required that column_annotations
    # aligned by length with alignment length,
    # and letter_annotations aligned by length
    # with length of annotated sequence.
    # also, by format spec, file starts with version info,
    # and ends with '//\n'.
    # so we end up with only option - catch all writes
    # into buffer, insert stats into GF annotations
    # between header (with version and etc) and sequences
    # and only then write it to the file.
    buf = io.StringIO()
    writer = StockholmWriter(buf)
    for record in qry:
        for annotation_name in record.letter_annotations.keys():
            if annotation_name.startswith('FBB_'):
                writer.pfam_gr_mapping[annotation_name] = annotation_name
    for annotation_name in qry.column_annotations.keys():
        if annotation_name.startswith('FBB_'):
            writer.pfam_gc_mapping[annotation_name] = annotation_name
    writer.write_alignment(qry)

    out = buf.getvalue()
    if output_stats:
        stats = []
        for key, value in sorted(qry.annotations.items()):
            stats.append(f'#=GF {key} {value}\n')
        stats = ''.join(stats)

        view = buf.getvalue()
        fst_nl = view.find('\n')
        snd_nl = view.find('\n', fst_nl+1)
        out = view[:snd_nl] + "\n" + stats + view[snd_nl+1:]

    with open(output, 'w') as f:
        f.write(out)

    return qry, sbj


if __name__ == '__main__' and '__file__' in globals():
    import argparse


    parser = argparse.ArgumentParser(
        description='This program compares two MSA and writes statistics to stockholm output.',
    )
    parser.add_argument(
        "--query",
        "-r",
        required=True,
        help="alignment to test against \"the truth\"",
    )
    parser.add_argument(
        "--subject",
        "-t",
        required=True,
        help="alignment to compare with",
    )
    parser.add_argument(
        "--query-format",
        "-q",
        dest='query_format',
        required=True,
        help="format of query. required by Bio.AlignIO",
    )
    parser.add_argument(
        "--subject-format",
        "-s",
        dest='subject_format',
        required=True,
        help="format of subject. required by Bio.AlignIO",
    )
    parser.add_argument(
        "--output",
        "-o",
        required=True,
        help="path to file to write output",
    )
    parser.add_argument(
        "--threshold",
        "-d",
        required=True,
        help="threshold for pairwise conservative detection"
    )
    parser.add_argument(
        "--mark-conservative-by-case",
        "-m",
        dest='mark',
        help="index name which will store sequences formated with lower case," +
            "if not conservative column, and upper case otherwise." +
            "stores nothing if not given",
    )
    parser.add_argument(
        "--stats",
        "-a",
        default=False,
        action='store_true',
        dest='output_stats',
        help="whether should output statistics"
    )
    parser.add_argument(
        "--matrices",
        "-i",
        default=False,
        action='store_true',
        dest='output_matrices',
        help="whether should build indices based on substitution matrices"
    )

    args = parser.parse_args()
    compare_alignments_files(**vars(args))




## Сравним колонки
Для начала воспользуемся первой программой для того, чтобы сделать первые прикидки. Посмотрим насколько в наших выравниваниях совпадают консервативные столбцы

In [4]:
from IPython import display


msa, full, func, reliable = find_conservatives_from_file('reductase.mafft.fa', 'fasta', 515, 515)
display.display_markdown(cons_to_md(len(msa), full, func, reliable), raw=True)

|seed               |100%-cons          |func-cons          |max-reliable       |
|-------------------|-------------------|-------------------|-------------------|
|518                |7:[G][515/518]     |8:[CGS]            |(7, 10)            |
|                   |10:[W][517/518]    |83:[IVLMA][517/518]|(79, 83)           |
|                   |79:[E][517/518]    |95:[LIVMA]         |                   |
|                   |127:[Y][515/518]   |                   |                   |
|                   |235:[H]            |                   |                   |


In [5]:
from IPython import display


msa, full, func, reliable = find_conservatives_from_file('reductase.muscle.fa', 'fasta', 515, 515)
display.display_markdown(cons_to_md(len(msa), full, func, reliable), raw=True)

|seed               |100%-cons          |func-cons          |max-reliable       |
|-------------------|-------------------|-------------------|-------------------|
|518                |7:[G][515/518]     |8:[CGS]            |(7, 10)            |
|                   |10:[W][517/518]    |47:[GCS][515/518]  |(82, 86)           |
|                   |82:[E][517/518]    |86:[IVLMA][517/518]|                   |
|                   |126:[Y][515/518]   |98:[LIVMA]         |                   |
|                   |228:[H]            |                   |                   |


Некоторые консервативные колонки совпадают между всеми выравниваниями. Визуально в JalView с теми же порогами (в 99%) так же приходит в голову такой вывод. Так же и по табличкам (по номерам колонок) и визуально видно, что некоторые колонки съехали, так как разные методы по-разному расставили разрывы

## Сравнение программой

### Mafft

Для начала сравним мафт и сид. Воспользуемся так же программой, данной в задании. Посмотрим будет ли разница между программами сравнений

In [20]:
compare_alignments_files(
    query='reductase.mafft.fa',
    subject='PF01625.seed.sth',
    query_format='fasta',
    subject_format='stockholm',
    output='reductase.cmp.mafft.sth',
    threshold=510,
    mark='FBB_EQ',
    output_stats=True,
    output_matrices=True,
)

Position in query,Position in subject
1-16,1-16
40-47,38-45
77-86,74-83
94-96,91-93
107-107,103-103
124-132,116-124
232-239,231-238


(<<class 'Bio.Align.MultipleSeqAlignment'> instance (518 records of length 255) at 7fddc6907700>,
 <<class 'Bio.Align.MultipleSeqAlignment'> instance (518 records of length 253) at 7fddb3dfbbe0>)

Так как программа, данная в задании, принимает на вход только фасты - переформатируем наш файл с сидом

In [21]:
from Bio import AlignIO


AlignIO.write(AlignIO.read('PF01625.seed.sth', 'stockholm'), 'PF01625.seed.fa', 'fasta')

1

Теперь сравним программы сравнения

In [22]:
%%bash

/opt/share/conda/bin/python3 compare_alignments.py -a2 'PF01625.seed.fa' -a1 reductase.mafft.fa > mafft.truth.txt
/opt/share/conda/bin/python3 msa.comparsion.py -t 'PF01625.seed.sth' -r reductase.mafft.fa -s stockholm -q fasta  -o out.sth -d 515 -m FBB_EQ > mafft.test.txt

cat mafft.truth.txt | wc -l
cat mafft.test.txt | wc -l

12
8


In [23]:
!diff mafft.truth.txt mafft.test.txt

1c1
< Position in alignment1,Position in alignment2
---
> Position in query,Position in subject
3d2
< 32-33,25-26
5,7c4
< 56-56,52-52
< 77-90,74-87
< 92-92,89-89
---
> 77-86,74-83
11d7
< 230-230,229-229


Ввидно, что по большей части выводы программ совпали. Несовпадения были, в основном, в случаях одиночных колонок, что при такой длине и ширине выравнивания, я считаю, не яв-ся критичным.

## Muscle

Теперь тоже самое с muscle

In [24]:
compare_alignments_files(
    query='reductase.muscle.fa',
    subject='PF01625.seed.sth',
    query_format='fasta',
    subject_format='stockholm',
    output='reductase.cmp.muscle.sth',
    threshold=510,
    mark='FBB_EQ',
    output_stats=True,
    output_matrices=True,
)

Position in query,Position in subject
1-15,1-15
39-47,38-46
80-89,74-83
97-99,91-93
102-104,96-98
121-132,114-125
225-232,231-238


(<<class 'Bio.Align.MultipleSeqAlignment'> instance (518 records of length 247) at 7fddc69075e0>,
 <<class 'Bio.Align.MultipleSeqAlignment'> instance (518 records of length 253) at 7fddb3b8d2e0>)

In [25]:
%%bash

/opt/share/conda/bin/python3 compare_alignments.py -a2 'PF01625.seed.fa' -a1 reductase.muscle.fa > muscle.truth.txt
/opt/share/conda/bin/python3 msa.comparsion.py -t 'PF01625.seed.sth' -r reductase.muscle.fa -s stockholm -q fasta  -o out.sth -d 515 -m FBB_EQ > muscle.test.txt

cat muscle.truth.txt | wc -l
cat muscle.test.txt | wc -l

11
8


In [26]:
!diff muscle.truth.txt muscle.test.txt

1c1
< Position in alignment1,Position in alignment2
---
> Position in query,Position in subject
5d4
< 93-93,87-87
8d6
< 115-118,108-111
10d7
< 222-223,228-229


Как видно в случае muscle программы для сравнений так же выдали крайне похожий результат. Отличия в двух из трех случаев были снова в одиночных колонках. Возможно обе программы годятся для сравнения выравниваний

## CS + SP

Сейчас результаты сравнений записаны в соответсвующие файлы - `reductase.cmp.mafft.sth`, `reductase.cmp.muscle.sth`.
Выведем сравнения в таблицу

In [28]:
from IPython import display


cs_mafft = !grep FBB_EQ_cw_pcnt reductase.cmp.mafft.sth
sp_mafft = !grep FBB_SP_EQ_sp_score reductase.cmp.mafft.sth
cs_muscle = !grep FBB_EQ_cw_pcnt reductase.cmp.muscle.sth
sp_muscle = !grep FBB_SP_EQ_sp_score reductase.cmp.muscle.sth
cs_mafft = float(cs_mafft[0].split(' ')[-1])
sp_mafft = float(sp_mafft[0].split(' ')[-1])
cs_muscle = float(cs_muscle[0].split(' ')[-1])
sp_muscle = float(sp_muscle[0].split(' ')[-1])
table=f'|method|cs score|sp score|\n|--|--|--|\n|mafft|{cs_mafft:.2f}|{sp_mafft:.2f}|\n|muscle|{cs_muscle:.2f}|{sp_muscle:.2f}|\n'
display.display_markdown(table, raw=True)

|method|cs score|sp score|
|--|--|--|
|mafft|21.57|58.15|
|muscle|25.10|62.46|


## Интерпретация

1. В обоих выравниваниях около 60% пар совпало. По колонкам в обоих выравниваниях совпало около четверти. 
2. Но, так как у muscle оба скора больше, можно уверенно говорить, что он справился лучше мафта
3. Учитывая такие низкие скоры, можно так же утверждать, что в изначальном сиде выравнивание гораздо лучше обоих - и mafft и muscle 