## Comments
This notebook uses the output of the "MACAW_Phase_1_tests" script and assumes the file names that script creates.  
Change the naming convention in `get_all_duplicate_pairs` (the `pd.read_csv` line) to fit yours.  
Additionally, you may need to change how that function is called in the "Duplicate Main" section if you did not include the model ID in the file names.
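
The naming convention described above can be sketched as follows. This is a minimal illustration, not part of the notebook: the helper name `results_path_for` and its `suffix` parameter are hypothetical, and the `_phase_1.csv` suffix is taken from the file name assumed in `get_all_duplicate_pairs`.

```python
from pathlib import Path


def results_path_for(model_file, results_dir, suffix="_phase_1.csv"):
    """Build the expected test-result path for a model file (hypothetical helper)."""
    # Path.stem drops the final extension: "AA4_curated.xml" -> "AA4_curated"
    model_id = Path(model_file).stem
    return Path(results_dir) / f"{model_id}{suffix}"


print(results_path_for("AA4_curated.xml", "macaw_csv").as_posix())
# macaw_csv/AA4_curated_phase_1.csv
```

If your result files follow a different pattern, changing `suffix` (or the f-string) is the only adjustment needed in this sketch.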

## Imports

In [19]:
import cobra
from cobra.io import read_sbml_model, write_sbml_model
import os
import pandas as pd

## Directories

In [32]:
# Path to your models
model_dir = '/home/lisa/Dokumente/Programmierung/Models/03_charge_balance/'

# Path to the .csv files created by the "MACAW_Phase_1_tests" script
test_results_dir = '/home/lisa/Dokumente/Programmierung/Scripts/Frowin/macaw_csv/'

# Path where the new, fixed models (xml files) should be stored
fixed_model_dir = '/home/lisa/Dokumente/Programmierung/Scripts/Frowin/macaw_csv/'

## Functions

In [21]:
# Summarizes the results of the diphosphate test across all files in test_results_dir
def create_diphsophate_overview(test_results_dir):

    # Create lists to store the reactions that should be irreversible and those that should be flipped and made irreversible
    irreversible = list()
    flipped_and_irr = list()

    # Iterate through the test results directory and read each CSV file
    for file in os.listdir(test_results_dir):
        if file.endswith(".csv"):
            df = pd.read_csv(test_results_dir+file)

            # Filter the rows by the test result in column index 6 (the 7th column)
            # and collect the reaction ids (column index 0) in the corresponding list
            irreversible_df = df[df.iloc[:, 6] == 'should be irreversible'] 
            irreversible.extend(irreversible_df.iloc[:, 0].tolist())
            flipped_and_irr_df = df[df.iloc[:, 6] == 'should be flipped and made irreversible']
            flipped_and_irr.extend(flipped_and_irr_df.iloc[:, 0].tolist())

    # Remove duplicates from the lists
    irreversible = list(set(irreversible))
    flipped_and_irr = list(set(flipped_and_irr))

    # Print the results
    print(f"{len(irreversible)} reactions should be Irreversible reactions:")
    for i in irreversible:
        print(i)
    print("-------------------------------------------------------------------------\n")
    print(f"{len(flipped_and_irr)} reactions should be Flipped and irreversible reactions:")
    for i in flipped_and_irr:
        print(i)

In [22]:
# Use the .csv files from the phase_1 MACAW tests to create a list of all 'duplicate pairs' for a model
# This will be four lists of tuples that represent all reactions in the model that were flagged by 
# MACAW as an exact, directional, coefficient or redox duplicate

# This takes the model_id as input (without any format suffix)
def get_all_duplicate_pairs(model_id):

    # The name of the test_result file corresponding to this model is assumed and the csv is read
    # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!CHANGE THIS DEPENDING ON YOUR NAMING CONVENTION!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    test_results = pd.read_csv(test_results_dir+f"{model_id}_phase_1.csv")

    # Filter reactions with entries not equal to "ok" in the specified columns
    duplicates_df = test_results.loc[
        (test_results['duplicate_test_exact'] != 'ok') |
        (test_results['duplicate_test_directions'] != 'ok') |
        (test_results['duplicate_test_coefficients'] != 'ok') |
        (test_results['duplicate_test_redox'] != 'ok'),
        ['reaction_id', 'duplicate_test_exact', 'duplicate_test_directions', 'duplicate_test_coefficients', 'duplicate_test_redox']
    ]

    # Create a dict containing the model reactions as keys and the test results for each test as value (dict)
    # This will contain all reactions that were flagged by at least one duplicate test
    model_duplicates_dict = {
        row['reaction_id']: {
            "Exact": row['duplicate_test_exact'],
            "Directions": row['duplicate_test_directions'],
            "Coefficient": row['duplicate_test_coefficients'],
            "Redox": row['duplicate_test_redox']
        }
        for _, row in duplicates_df.iterrows()
    }

    # Create lists for all types of duplicates
    exact_duplicates = list()
    directional_duplicates = list()
    coefficient_duplicates = list()
    redox_duplicates = list()

    # Iterate through all reactions and corresponding duplicate test results
    for reaction_id, tests in model_duplicates_dict.items():

        # Append all reactions flagged by MACAW and the corresponding duplicate reactions
        if tests['Exact'] != 'ok':
            exact_duplicates.append((reaction_id, tests['Exact']))
        if tests['Directions'] != 'ok':
            directional_duplicates.append((reaction_id, tests['Directions']))
        if tests['Coefficient'] != 'ok':
            coefficient_duplicates.append((reaction_id, tests['Coefficient']))
        if tests['Redox'] != 'ok':
            redox_duplicates.append((reaction_id, tests['Redox']))

    
    # Apply the formate_duplicate_pairs function to split reactions with multiple duplicates of the same type
    # into single pairs (see function below)
    exact_duplicates = formate_duplicate_pairs(exact_duplicates)
    directional_duplicates = formate_duplicate_pairs(directional_duplicates)
    coefficient_duplicates = formate_duplicate_pairs(coefficient_duplicates)
    redox_duplicates = formate_duplicate_pairs(redox_duplicates)

    # Returns all 4 lists 
    return exact_duplicates, directional_duplicates, coefficient_duplicates, redox_duplicates

In [23]:
# A function to create singular reaction pairs in all duplicate lists
# This is necessary since some reactions have several duplicate reactions

# The function takes a list of tuples as input
def formate_duplicate_pairs(duplicate_list):

    # Create a new duplicate list for the formated results at the end = output list
    formated_duplicate_list = list()

    # Iterate through all pairs in the input list and check if there are multiple duplicates for the same reaction
    # This is indicated by the presence of a semicolon
    for r1, r2 in duplicate_list:

        # If a semicolon is found new pairs will be created and added to the output list
        if ";" in r2:

            # Split the string by semicolon and create new pairs
            r2_list = r2.split(";")

            # Create a list for new pairs
            new_pairs = list()
            
            # Create new pairs from all duplicates and add them to the new_pairs list
            # The new pairs will be sorted to later remove doubles
            for r in r2_list:
                new_pairs.append(tuple(sorted((r1, r))))

            # Extend the output list with new pairs
            formated_duplicate_list.extend(new_pairs)

        # When no semicolon is found the pair is sorted and just added to the output list
        else:
            formated_duplicate_list.append(tuple(sorted((r1, r2))))

    # Remove all repeated pairs (pairs of duplicates that appear in the list more than once)
    formated_duplicate_list = list(set(formated_duplicate_list))

    # Return the formated list of duplicates 
    # This is also a list of tuples and most likely longer than the input
    return formated_duplicate_list
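
To make the pair splitting concrete, here is a condensed, self-contained sketch of the same idea. The name `split_pairs` and the ids `R_A`, `R_B`, `R_C` are hypothetical; like the function above, it assumes that multiple duplicates are separated by a semicolon without surrounding spaces.

```python
def split_pairs(duplicate_list):
    """Split 'r1 -> "a;b"' entries into sorted, unique (r1, a), (r1, b) pairs."""
    pairs = set()
    for r1, r2 in duplicate_list:
        for partner in r2.split(";"):
            # Sorting each pair makes (x, y) and (y, x) collapse into one entry
            pairs.add(tuple(sorted((r1, partner))))
    return sorted(pairs)


flagged = [("HACD7", "HACD7i"), ("HACD7i", "HACD7"), ("R_A", "R_B;R_C")]
print(split_pairs(flagged))
# [('HACD7', 'HACD7i'), ('R_A', 'R_B'), ('R_A', 'R_C')]
```

Unlike the notebook function, this sketch returns the pairs in sorted order, which only matters if you want reproducible output.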

In [36]:
# Creates a dictionary containing all reactions in the model as keys
# The value is always a list of the sorted genes corresponding to this reaction 
# OR ["None"] if no GPR could be found for this reaction in this model

# Input is a cobra model object
def create_gpr_list_dir(model):

    # Create the output dict
    gpr_list_dir = {}

    # Iterate through all reactions in the model
    for reaction in model.reactions:

        # Get the GPR as string
        gpr_string = reaction.gene_reaction_rule

        # If no GPR is given set ["None"] as the value in the output dict
        if gpr_string == '':
            gpr_list_dir[reaction.id] = ["None"]

        # If a GPR is found, format it into a list by first splitting it at the 'or' relations
        # Genes with an 'and' relation are split further and added to the GPR list as a sublist
        else:
            gpr_list_or_split = list(gpr_string.split(' or '))

            # Create a list where both 'or' and 'and' relations are kept = Value list
            gpr_list_or_and_split = []

            # Check if an entry in the list contains an 'and'
            for entry in gpr_list_or_split:

                # If yes, split this entry into a new list, clean it and add the list to the value list
                # (match ' and ' with spaces so gene ids that merely contain "and" are not split)
                if ' and ' in entry:
                    new_entry = list(entry.split(' and '))
                    new_entry_cleaned = [s.replace("(", "").replace(")", "") for s in new_entry]

                    # Sort this sublist alphabetically
                    gpr_list_or_and_split.append(sorted(new_entry_cleaned))

                # If no then just append the entry into the output list
                else:
                    gpr_list_or_and_split.append(entry)

            # Sort the output list alphabetically by treating each entry as a string 
            # Add the sorted value list to the output dict
            gpr_list_dir[reaction.id] = sorted(gpr_list_or_and_split, key=lambda x: ' '.join(x) if isinstance(x, list) else x)

    # Output is a dict with an entry for each reaction in the input model
    return gpr_list_dir
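
The list format produced for a single reaction can be illustrated without a cobra model. The sketch below applies the same splitting to a plain string; the function name `parse_gpr` and the gene ids `g1` to `g3` are hypothetical. Like the function above, it assumes the rule is a flat "or" of genes and "and" complexes; nested rules such as `(a or b) and c` are not handled correctly by this approach.

```python
def parse_gpr(gpr_string):
    """Turn a flat GPR string into a sorted list of genes and 'and' sublists."""
    if gpr_string == "":
        return ["None"]
    parsed = []
    for entry in gpr_string.split(" or "):
        if " and " in entry:
            genes = [g.replace("(", "").replace(")", "") for g in entry.split(" and ")]
            parsed.append(sorted(genes))
        else:
            parsed.append(entry)
    # Sort by the string form so lists and plain strings can be compared
    return sorted(parsed, key=lambda x: " ".join(x) if isinstance(x, list) else x)


print(parse_gpr("g3 or (g2 and g1)"))
# [['g1', 'g2'], 'g3']
```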

In [25]:
# A function to compare GPR lists created by the create_gpr_list_dir function
# The output is highly variable and specific
# Only strings describing the relation of the GPR for both input reactions are returned

# Input are two reaction_ids
def compare_gpr_lists(r1, r2):

    # GPR lists for both input reactions are fetched from the dir
    r1_gpr = gpr_list_dir[r1]
    r2_gpr = gpr_list_dir[r2]

    # If either of the reactions has no GPR it is checked if they are both boundary reactions or not
    if (r1_gpr[0] == "None") or (r2_gpr[0] == "None"):
        if (model.reactions.get_by_id(r1) in model.boundary) and (model.reactions.get_by_id(r2) in model.boundary):
            return 'No GPR and both boundary reactions'
        else:
            return 'No GPR but one not a boundary reaction'
    
    # Do they have the same GPR list?
    elif r1_gpr == r2_gpr:
        return 'Same GPR list'
    
    # Are the lists the same length, and do they share genes or not?
    elif len(r1_gpr) == len(r2_gpr):
        if any(item in r1_gpr for item in r2_gpr):
            return 'Same length of GPR list and share some genes'
        else:
            return 'Same length of GPR list but no common genes'
    
    # The lists have different lengths:
    # Is one list a sublist of the other?
    # Do the lists share genes or not?
    else:
        if (len(r1_gpr) > len(r2_gpr)) and all(item in r1_gpr for item in r2_gpr):
            return 'r2 in r1'
        elif (len(r2_gpr) > len(r1_gpr)) and all(item in r2_gpr for item in r1_gpr):
            return 'r1 in r2'
        elif any(item in r1_gpr for item in r2_gpr):
            return 'Different lenght of GPR list and share some genes'
        else:
            return 'Different lenght of GPR list but no common genes'
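
One consequence of this comparison is that each entry of a GPR list counts as one unit, so an 'and' complex only matches an identical complex and never its individual genes. A condensed sketch of the comparison branches, with the no-GPR and boundary checks left out (the function name `classify`, the shortened status strings and the gene ids are hypothetical):

```python
def classify(r1_gpr, r2_gpr):
    """Condensed version of the GPR comparison, without the no-GPR checks."""
    if r1_gpr == r2_gpr:
        return "same"
    if len(r1_gpr) > len(r2_gpr) and all(item in r1_gpr for item in r2_gpr):
        return "r2 in r1"
    if len(r2_gpr) > len(r1_gpr) and all(item in r2_gpr for item in r1_gpr):
        return "r1 in r2"
    shared = any(item in r1_gpr for item in r2_gpr)
    return "share some genes" if shared else "no common genes"


print(classify(["g1", "g2"], ["g1"]))          # r2 in r1
print(classify(["g1", "g2"], ["g2", "g3"]))    # share some genes
print(classify([["g1", "g2"]], ["g1"]))        # no common genes
```

The last call shows the unit behaviour: `g1` is part of the complex `['g1', 'g2']`, but the two lists are still reported as having no common genes.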
    

In [45]:
# Exact duplicates will be removed or combined based on their GPR 

def fix_exact_duplicates():

    # Compare the GPR lists of both reactions and apply fixes depending on the returned status
    for r1, r2 in exact_duplicates:
        status = compare_gpr_lists(r1, r2)

        # Delete one reaction if they have the same GPR
        if status == "Same GPR list":
            model.remove_reactions([model.reactions.get_by_id(r2)])

        # If the GPR list of r2 is a sublist of the r1 GPR list, delete r2
        elif status == "r2 in r1":
            model.remove_reactions([model.reactions.get_by_id(r2)])

        # If the GPR list of r1 is a sublist of the r2 GPR list, delete r1
        elif status == "r1 in r2":
            model.remove_reactions([model.reactions.get_by_id(r1)])

        # If both reactions share genes in their GPR lists, combine the reactions by
        # adding all GPR entries of r2 to r1 and setting the merged list as the GPR string of r1
        # Then delete r2
        elif status.endswith('and share some genes'):
            # Merge without set(): list.extend() returns None and 'and' sublists are unhashable
            new_gpr_list = list(gpr_list_dir[r1])
            for item in gpr_list_dir[r2]:
                if item not in new_gpr_list:
                    new_gpr_list.append(item)
            new_gpr_string = create_gpr_strings(new_gpr_list)
            model.reactions.get_by_id(r1).gene_reaction_rule = new_gpr_string
            model.remove_reactions([model.reactions.get_by_id(r2)])

        # For all status where no fix was found yet
        else:
            print(f'exact: No fix possible for {r1} and {r2} with status: {status}')
            continue

        print(f"exact: Fixed reactions {r1} and {r2} with status {status}")


In [27]:
# A function to transform a GPR list back into a GPR string 

def create_gpr_strings(gpr_list):
    result = []
    for item in gpr_list:
        if isinstance(item, list):
            # Join sublist with ' and ' and wrap in parentheses
            sub_str = ' and '.join(item)
            result.append(f'({sub_str})')
        else:
            result.append(item)
    return ' or '.join(result)
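
As an illustration of how two GPR lists can be merged and written back as a rule string, here is a self-contained sketch. The names `merge_gpr_lists` and `gpr_list_to_string` and the gene ids are hypothetical. The merge uses a membership check instead of `set()`, because the 'and' sublists are plain lists and therefore not hashable.

```python
def merge_gpr_lists(first, second):
    """Combine two GPR lists, keeping order and dropping repeated entries."""
    merged = []
    for item in first + second:
        if item not in merged:
            merged.append(item)
    return merged


def gpr_list_to_string(gpr_list):
    """Join entries with ' or '; wrap 'and' sublists in parentheses."""
    parts = []
    for item in gpr_list:
        if isinstance(item, list):
            parts.append("(" + " and ".join(item) + ")")
        else:
            parts.append(item)
    return " or ".join(parts)


merged = merge_gpr_lists([["g1", "g2"], "g3"], ["g3", "g4"])
print(merged)                      # [['g1', 'g2'], 'g3', 'g4']
print(gpr_list_to_string(merged))  # (g1 and g2) or g3 or g4
```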

In [46]:
# Directional duplicates will be removed or kept based on their GPR

def fix_directional_duplicates():

    # Compare the GPR lists of both reactions and apply fixes depending on the returned status
    for r1, r2 in directional_duplicates:
        status = compare_gpr_lists(r1, r2)

        # Delete one reaction based on the directionality of both if the GPR list is the same
        # Always keep the reversible reaction or make r1 reversible if both are irreversible and delete r2
        if status == "Same GPR list":
            if model.reactions.get_by_id(r1).reversibility:
                model.remove_reactions([model.reactions.get_by_id(r2)])
            elif model.reactions.get_by_id(r2).reversibility:
                model.remove_reactions([model.reactions.get_by_id(r1)])
            else:
                model.reactions.get_by_id(r1).bounds = (-1000.0, 1000.0)
                model.remove_reactions([model.reactions.get_by_id(r2)])

        # If the GPR list of r2 is a sublist of the r1 GPR list and r1 is reversible, delete r2
        # (if r1 is not reversible, the pair falls through to the "No fix possible" branch)
        elif status == "r2 in r1" and model.reactions.get_by_id(r1).reversibility:
            model.remove_reactions([model.reactions.get_by_id(r2)])

        # If the GPR list of r1 is a sublist of the r2 GPR list and r2 is reversible, delete r1
        elif status == "r1 in r2" and model.reactions.get_by_id(r2).reversibility:
            model.remove_reactions([model.reactions.get_by_id(r1)])

        # For all status where no fix was found yet
        else:
            print(f'direct: No fix possible for {r1} and {r2} with status: {status}')
            continue

        print(f"direct: Fixed reactions {r1} and {r2} with status {status}")
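
Making a reaction reversible means allowing negative flux, so its lower bound has to drop below zero. The sketch below uses plain tuples instead of cobra reactions and assumes that reversibility is defined as lower bound < 0 < upper bound, which to my understanding mirrors cobra's `Reaction.reversibility`; the helper name `is_reversible` is hypothetical.

```python
def is_reversible(bounds):
    """True if flux may run in both directions (lower < 0 < upper)."""
    lower, upper = bounds
    return lower < 0 < upper


print(is_reversible((0.0, 1000.0)))      # False: forward only
print(is_reversible((1000.0, 1000.0)))   # False: flux is fixed at 1000
print(is_reversible((-1000.0, 1000.0)))  # True
```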

## Diphosphate test summary
This gives you an overview of all reactions flagged by the diphosphate test across all your models.

In [29]:
create_diphsophate_overview(test_results_dir)

17 reactions should be Irreversible reactions:
ADK2
GALT
FACOAL150_ISO
FACOAL160_ISO
FACOAL150_anteiso
SERASr
UDPACGLP
NNATr
FACOAL170_anteiso
ADK2_1
FACOAL170_ISO
AADb
APAT_1
NAPRT
FACOAL180_2
FACOALPHDCA
FACOAL140_ISO
-------------------------------------------------------------------------

1 reactions should be Flipped and irreversible reactions:
ORPT


## Duplicate Main  
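This section is unfinished: only exact and directional duplicates are fixed so far. The coefficient and redox duplicate lists are created but not yet handled, so feel free to investigate them or anything else.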

In [48]:
# Iterate over all xml files in the model_dir and apply all functions
for file in (f for f in os.listdir(model_dir) if f.endswith(".xml")):

    # Remove the .xml suffix to generate only the model id
    model_id = file[:-4]

    # Prints the model that is investigated right now
    print(f"Model: {model_id}")

    # Parse the model
    model = read_sbml_model(model_dir+file)

    # Create duplicate lists for all four types of duplicates MACAW tests for
    exact_duplicates, directional_duplicates, coefficient_duplicates, redox_duplicates = get_all_duplicate_pairs(model_id)

    # Creates a dict containing the GPR information for all reactions in the model
    gpr_list_dir = create_gpr_list_dir(model)

    # Apply fixes for exact duplicates
    fix_exact_duplicates()

    # Apply fixes for directional duplicates
    fix_directional_duplicates()

    print("-----------------")

    # Save the fixed model (uncomment to write the file)
    # write_sbml_model(model, fixed_model_dir+f"{model_id}_phase_1.xml")

Model: AA4_curated
['WP_014172105_1', 'WP_032678623_1']
exact: No fix possible for ATPM and NTP1 with status: No GPR but one not a boundary reaction
exact: Fixed reactions OPETDC and OPTCCL with status Same GPR list
direct: No fix possible for GLBRAN2 and GLDBRAN2 with status: Same length of GPR list but no common genes
direct: No fix possible for INDOLEt2pp and INDOLEt2rpp with status: Different lenght of GPR list but no common genes
direct: Fixed reactions ACACT10 and ACACT5r with status Same GPR list
direct: No fix possible for ALCD2ir and ALCD2x with status: Different lenght of GPR list but no common genes
direct: No fix possible for GLBRAN3 and GLDBRAN3 with status: No GPR but one not a boundary reaction
direct: Fixed reactions HACD7 and HACD7i with status Same GPR list
direct: No fix possible for ACTD and ACTD_1 with status: Different lenght of GPR list but no common genes
direct: Fixed reactions HACD5 and HACD5i with status Same GPR list
direct: Fixed reactions HACD8 and HACD8i 