# Evaluation script for the correction and reconstruction of an ancient genome
## Author: Meret Häusler
## Last Updated: 31.03.2025

In [6]:
import glob
import pathlib
import pandas as pd
import glob

from Bio import SeqIO

In [None]:
# Count number of Ns in genome reconstruction

dirpath = "../01-Correction_HQ439467/*/*.fasta"
dirlist = glob.glob(dirpath)

with open('../02-Evaluation_HQ439467/HQ.sim.cont.reco.genome.stats.csv', 'w+') as f:

    f.write("COV_COND,COR_MODE,CNT_N\n")
    
    for file in dirlist:
        # Parse fasta
        seq = SeqIO.read(file, "fasta")
        
        # Initialise N counter
        N_cnt = 0
        
        # Get COV_COND
        cond = file.split("CONT.", 1)[-1].split(".truncated")[0]
        
        # Get correction method
        cor_met = pathlib.PurePath(file).parent.name
        
        for base in seq.seq:
              if base == 'N':
                  N_cnt += 1

        f.write(f"{cond},{cor_met},{N_cnt}\n")

In [None]:
def compareDamagedPositions(dict_pos_gt, dict_uncor, dict_cor):
    """
    dict_pos_gt Dictionary with index of corrected positions and respective GT call {1: 'G', 2: 'A', ...}
    dict_uncor  Dictionary with index of corrected positions and respective uncorrected call
    dict_cor    Dictionary with index of corrected positions and respective corrected call
    """

    # Define counter
    cnt_imp = 0
    cnt_simp = 0
    cnt_swor = 0
    cnt_wor = 0
    cnt_sc = 0
    cnt_diff = 0

    # Iterate over corrected positions in GT dictionary
    for idx, base_gt in dict_pos_gt.items():

        # Get uncorrected base call
        base_nocor = dict_uncor.get(idx)

        # Get corrected base call
        # Note: If no entry with given index, it means that at this position no correction took place. 
        #       Therefore, the base was called with the straightforward cov-freq filter as in the no-correction condition.
        base_cor = dict_cor.get(idx, base_nocor)

        if base_cor != base_nocor:  # Corrected and Uncorrected base call differ
            cnt_diff += 1                   # Count number of positions where correction had effect
            if base_cor == base_gt:         # Corrected base call is same as ref --> Improvement
                cnt_imp += 1
            elif base_cor == 'N':       
                if base_nocor != base_gt:   # Corrected base call is 'N' and Uncorrected base call is wrong --> Semi-improvement
                    cnt_simp += 1
                else:                       # Corrected base call is 'N' and Uncorrected base call is correct --> Semi-Worsening (Too conservative)
                    cnt_swor += 1
            #elif base_cor != base_gt:       # Corrected base call is different from ref --> Worsening
            #    cnt_wor += 1
        elif base_gt != base_nocor and base_gt != base_cor and base_nocor != 'N' and base_cor != 'N':   # Corrected and Uncorrected base call are not 'N' and not Ground truth
            cnt_wor += 1
        else:                               # Sanity check counter
            cnt_sc += 1


    return cnt_imp, cnt_simp, cnt_swor, cnt_wor, cnt_diff

In [None]:
# Iterate over no_correction files
dirpath_NoCor = "../01-Correction_HQ439467/no_correction/*.log"
dirlist_NoCor = glob.glob(dirpath_NoCor)

with open('../02-Evaluation_HQ439467/TEST.sim.cont.reco.eval.cor-pos.csv', 'w+') as f:

    f.write("COV_COND,COR_MODE,Impr,SImpr,SWors,Wors,#DAMPOS_UNION,#DAMPOS_SPEC,#POS_COR_CHANGED\n")

    for file in dirlist_NoCor:  
        # Parse no_correction file
        log_uncor = pd.read_table(file, sep='\t', header=10)

        # Extract Coverage condition and Run
        cond = file.split("CONT.", 1)[-1].split(".truncated")[0]
        
        # Initialise sets
        set_pos_gt = set()    # for all corrected positions and respective GT calls
        set_uncor = set()     # for uncorrected calls at corrected positions
        # Initialise dicts for
        dict_refbasedSil = {}  # for ref-based silencing calls at corrected positions
        dict_reffreeSil = {}   # for ref-free silencing calls at corrected positions
        dict_reffreeWei = {}   # for ref-free weighting calls at corrected positions
         
        # Collect for each correction condition corresponding log file
        dirlist_Cor = [l for l in glob.glob("../01-Correction_HQ439467/*/*"+str(cond)+"*.log") if "no-cor" not in l]

        # Iterate of log files for all correction methods
        for file_cor in dirlist_Cor:
            
            # Parse log file
            header_cnt = 12 if "ref-free_weighting" in file_cor else 10
            log_cor = pd.read_table(file_cor, sep='\t', header=header_cnt)

            # Add corrected positions to set for gt at corrected positions
            set_pos_gt.update(list(zip(log_cor.POS, log_cor.REF)))

            # Filter for base calls in uncorrected files based on position in corrected log file
            calls_uncor = log_uncor[log_uncor['POS'].isin(log_cor.POS)]['BASE_CALL']
            set_uncor.update(list(zip(log_cor.POS, calls_uncor)))
            
            # Check which correction method and add to respective dict
            cor_met = pathlib.PurePath(file_cor).parent.name
            match cor_met:
                case 'ref-based_silencing':
                    dict_refbasedSil = dict(list(zip(log_cor.POS, log_cor.BASE_CALL)))
                case 'ref-free_silencing':
                    dict_reffreeSil = dict(list(zip(log_cor.POS, log_cor.BASE_CALL)))
                case 'ref-free_weighting':
                    dict_reffreeWei = dict(list(zip(log_cor.POS, log_cor.BASE_CALL)))


        # Convert sets to dicts
        dict_pos_gt = dict(set_pos_gt)
        dict_uncor = dict(set_uncor)

        # Define a mapping of correction methods to their corresponding dictionaries
        correction_methods = {
            'ref-based_silencing': dict_refbasedSil,
            'ref-free_silencing': dict_reffreeSil,
            'ref-free_weighting': dict_reffreeWei
        }

        # Iterate over correction methods
        for corMet, dict_cor in correction_methods.items():

            # Run analysis
            cnt_imp, cnt_simp, cnt_swor, cnt_wor, cnt_diff = compareDamagedPositions(dict_pos_gt, dict_uncor, dict_cor)

            # Write Cov/Run, CorMode, counts to file
            f.write(f"{cond},{corMet},{cnt_imp},{cnt_simp},{cnt_swor},{cnt_wor},{len(set_pos_gt)}, {len(dict_cor)}, {cnt_diff}\n")