In [1]:
from typing import List, Dict

def read_csv_with_header(file: str) -> List[Dict[str, str]]:
    """Read a CSV file with a header row and return one dictionary per data row.

    The first line supplies the column names; every following non-blank line
    becomes a dict mapping column name to cell text. Parsing is delegated to
    the standard ``csv`` module so quoted cells that contain commas stay in
    one piece. Surrounding whitespace is stripped from header names and cells.

    Args:
        file: Path of the CSV file to read.

    Returns:
        A list of ``{column name: value}`` dictionaries in file order. An
        empty file yields an empty list. As with ``zip``, cells beyond the
        header width are dropped and short rows omit the trailing columns.

    Raises:
        OSError: If the file cannot be opened.
    """
    import csv  # function-scope import keeps this cell runnable on its own

    # newline='' lets the csv module handle line endings itself.
    with open(file, newline='') as f:
        reader = csv.reader(f)
        header_row = next(reader, None)
        if header_row is None:
            return []
        header = [name.strip() for name in header_row]

        records = []
        for cells in reader:
            # Blank or whitespace-only lines carry no data; turning them into
            # a one-key dict would break later lookups of the other columns.
            if not cells or (len(cells) == 1 and not cells[0].strip()):
                continue
            records.append(dict(zip(header, (cell.strip() for cell in cells))))
        return records

# Load the BLAST hits from CSV; each row becomes a dict of column name -> string value.
blast = read_csv_with_header('output/blast/blast_7_9m_to_8_9m_with_known_contigs_head.csv')
# Show the first row as a quick check of the parsed columns.
print(blast[0])

{'qaccver': 'tig00000533_7_9m_to_8_9m', 'saccver': 'tig00009311', 'pident': '99.915', 'length': '102315', 'mismatch': '48', 'gapopen': '19', 'qstart': '349837', 'qend': '452136', 'sstart': '892040', 'send': '994330', 'evalue': '0.0', 'bitscore': '1.884e+005'}


In [2]:
def fix_sequence_order(blast: List[Dict[str, str]]) -> None:
    """Normalise every hit in place so that start <= end on both coordinate pairs.

    For the subject pair (``sstart``/``send``) and the query pair
    (``qstart``/``qend``), the two values are swapped whenever the start is
    numerically larger than the end. Values are compared as integers but stay
    stored as the original strings.

    Args:
        blast: Hit dictionaries to normalise; the dictionaries are modified
            directly and nothing is returned.
    """
    coordinate_pairs = (('sstart', 'send'), ('qstart', 'qend'))
    for hit in blast:
        for start_key, end_key in coordinate_pairs:
            if int(hit[start_key]) > int(hit[end_key]):
                hit[start_key], hit[end_key] = hit[end_key], hit[start_key]

# Normalise the coordinates in place: `blast` itself is modified and nothing is returned.
fix_sequence_order(blast)


In [3]:
def sort_blast_by_contig_and_start(blast: List[Dict[str, str]], start_name: str) -> List[Dict[str, str]]:
    """Return a new list of hits ordered by contig, then by numeric start position.

    Args:
        blast: Hit dictionaries; each needs a ``saccver`` entry and an entry
            under ``start_name`` that parses as an integer.
        start_name: Key of the start coordinate to order by within a contig.

    Returns:
        A new list holding the same dictionaries; ``blast`` is left untouched.
    """
    def contig_then_position(hit):
        # Contig names compare as text, positions as integers.
        return hit['saccver'], int(hit[start_name])

    ordered = list(blast)
    ordered.sort(key=contig_then_position)
    return ordered

# Two orderings of the same hits: by subject start and by query start
# (both are grouped by `saccver` first).
survey_blast_sorted = sort_blast_by_contig_and_start(blast, 'sstart')
query_blast_sorted = sort_blast_by_contig_and_start(blast, 'qstart')
# Peek at the first hit of each ordering.
print(survey_blast_sorted[0])
print(query_blast_sorted[0])

{'qaccver': 'tig00000533_7_9m_to_8_9m', 'saccver': 'tig00000002', 'pident': '93.496', 'length': '123', 'mismatch': '6', 'gapopen': '2', 'qstart': '443667', 'qend': '443789', 'sstart': '1', 'send': '121', 'evalue': '1.67e-040', 'bitscore': '182'}
{'qaccver': 'tig00000533_7_9m_to_8_9m', 'saccver': 'tig00000002', 'pident': '78.238', 'length': '749', 'mismatch': '118', 'gapopen': '32', 'qstart': '20302', 'qend': '21033', 'sstart': '20263', 'send': '20983', 'evalue': '8.97e-118', 'bitscore': '438'}


In [4]:
def get_nonoverlapping_sequences(blast: List[Dict[str, str]], start_name: str, end_name: str) -> List[Dict[str, str]]:
    """Merge overlapping hits into non-overlapping intervals, one run per contig.

    The scan is a single pass that only compares each row with the interval
    currently being built, so ``blast`` must already be ordered by ``saccver``
    and then numerically by ``start_name``.

    Args:
        blast: Hit dictionaries with a ``saccver`` entry plus the two
            coordinate entries named below (integer-valued strings).
        start_name: Key of the interval start coordinate.
        end_name: Key of the interval end coordinate.

    Returns:
        A list of ``{"contig", "start", "end"}`` dictionaries in scan order.
        Coordinates are kept as the original strings. Empty input yields an
        empty list.

    Raises:
        Exception: Any error raised while processing a row after the first
            (for example ``KeyError`` or ``ValueError``) is re-raised
            unchanged after the offending row and the error are printed.
    """
    def start_interval(row):
        # Fresh dict for every interval, so input rows are never modified.
        return {
            "contig": row['saccver'],
            "start": row[start_name],
            "end": row[end_name],
        }

    rows = iter(blast)
    first = next(rows, None)
    if first is None:
        # Nothing to merge; previously this case raised IndexError.
        return []

    sequences = []
    current_sequence = start_interval(first)

    for row in rows:
        try:
            # A different contig, or a start beyond the current end, closes
            # the running interval and opens a new one.
            if (row['saccver'] != current_sequence['contig']
                    or int(row[start_name]) > int(current_sequence['end'])):
                sequences.append(current_sequence)
                current_sequence = start_interval(row)
            elif int(row[end_name]) > int(current_sequence['end']):
                # Overlapping hit that reaches further: extend the interval.
                current_sequence['end'] = row[end_name]
        except Exception as e:
            # Show which row failed, then let the original error propagate.
            print(row)
            print(e)
            raise

    sequences.append(current_sequence)
    return sequences
    
# Collapse overlapping hits into merged intervals: once on the subject
# coordinates and once on the query coordinates.
survey_sequences = get_nonoverlapping_sequences(survey_blast_sorted, 'sstart', 'send')
query_sequences = get_nonoverlapping_sequences(query_blast_sorted, 'qstart', 'qend')

In [5]:
def write_csv_with_header(file: str, data: List[Dict[str, str]]) -> None:
    """Write a list of dictionaries to a CSV file with a header row.

    The column order comes from the keys of the first dictionary. Rows are
    written with the standard ``csv`` module, so values containing commas or
    quotes are quoted instead of corrupting the column layout. Lines end
    with ``\\n``.

    Args:
        file: Destination path; an existing file is overwritten.
        data: Rows to write. Every row must contain all keys of the first
            row; extra keys in later rows are ignored. Empty ``data``
            produces an empty file, since no header can be derived.

    Raises:
        KeyError: If a row lacks one of the header keys.
        OSError: If the file cannot be opened for writing.
    """
    import csv  # function-scope import keeps this cell runnable on its own

    # newline='' stops the text layer from translating the writer's line endings.
    with open(file, 'w', newline='') as f:
        if not data:
            return
        header = list(data[0].keys())
        writer = csv.writer(f, lineterminator='\n')
        writer.writerow(header)
        for row in data:
            writer.writerow([row[key] for key in header])

# Save the merged intervals: one CSV for the subject-coordinate intervals
# and one for the query-coordinate intervals.
write_csv_with_header('output/blast/align_7_9m_to_8_9m_survey.csv', survey_sequences)
write_csv_with_header('output/blast/align_7_9m_to_8_9m_query.csv', query_sequences)