## Import ##

In [None]:
!pip install biodivine_aeon==1.3.0a3



In [None]:
from biodivine_aeon import *
import requests
import itertools # itertools are used to export results as .xsxl

## Functions ##

### Implementation Functions ###

In [None]:
class EnrichmentBehaviourClass:
    def __init__(self, ) -> None:
        self.attractors = list()
        self.attractor_types = list()

    def add_attractor(self, attractor):
        self.attractors.append(attractor)
        self.attractor_types.append(attractor.attractor_type)

    def goterm_intersection(self):
        intersect = self.attractors[0].go_terms_set
        for attractor in self.attractors[1:]:
            intersect = intersect.intersection(attractor.go_terms_set)
        return intersect

    def goterm_unique(self):
        unique = []
        for attractor in self.attractors:
            unique_set = attractor.go_terms_set
            for attractor2 in self.attractors:
                if attractor == attractor2: continue
                unique_set = unique_set.difference(attractor2.go_terms_set)
            unique.append(unique_set)
        return unique

    def __str__(self):
        result = "Behaviour class \n"
        for attractor in self.attractors:
            result += " |-- " + str(attractor) + "\n"
        return result

    def __repr__(self):
        result = "Behaviour class \n"
        for attractor in self.attractors:
            result += " |-- " + str(attractor) + "\n"
        return result


class EnrichmentAttractor:
    def __init__(self, attractor_type, enrichment_result, fdr) -> None:
        self.fdr = fdr
        self.attractor_type = attractor_type
        self.goterms = dict()
        self.go_terms_set = set()

        if enrichment_result is None:
            self.mapped_ids = []
            self.unmapped_ids = []
            return

        self.mapped_ids = enrichment_result.mapped_ids
        self.unmapped_ids = enrichment_result.unmapped_ids

        for process in enrichment_result.result:
            go_term = EnrichmentGOterm(process)
            if go_term.fdr > self.fdr: continue
            self.goterms[go_term.go_id] = go_term
            self.go_terms_set.add(go_term.go_id)

    def get_goterms_by_set(self, wanted):
        return [self.goterms[go_id] for go_id in wanted if go_id in self.goterms]

    def get_all_goterms(self):
        return self.goterms

    def get_plus_goterms(self):
        return [goterm for goterm in self.goterms.values() if goterm.plus_minus == "+"]

    def get_minus_goterms(self):
        return [goterm for goterm in self.goterms.values() if goterm.plus_minus == "-"]

    def __str__(self):
        return f"{self.attractor_type}"

    def __repr__(self):
        return f"{self.attractor_type}"


class EnrichmentGOterm:
    def __init__(self, process) -> None:
        self.go_id = process.get("term", {}).get("id", "")
        self.process_name = process["term"]["label"]
        self.fold_enrichment = process["fold_enrichment"]
        self.fdr = process["fdr"]
        self.expected = process["expected"]
        self.number_in_reference = process["number_in_reference"]
        self.p_value = process["pValue"]
        self.plus_minus = process["plus_minus"]

    def __repr__(self):
        return f"{self.plus_minus}{self.process_name}"

    def __str__(self):
        return f"{self.plus_minus}{self.process_name}"


In [None]:
class EnrichmentResult:
    def __init__(self, enrichmentData):
        self.data = enrichmentData

        self.input = self.data["results"]["input_list"]
        self.organism = self.input["organism"]
        self.mapped_ids = self.input["mapped_ids"]
        self.mapped_count = self.input["mapped_count"]
        self.unmapped_ids = self.input["unmapped_ids"]
        self.unmapped_count = self.input["unmapped_count"]
        self.result = self.data["results"]["result"]  # Sorted by FDR


def prepare_list_for_enrichment(nodes):
    as_string = str(nodes)[1:-1]
    as_string = as_string.replace("\'", "")
    return as_string


def prepare_enrichment_result(enrichment):
    if isinstance(enrichment, dict) and 'search' in enrichment and isinstance(enrichment['search'], dict) and 'error' in enrichment['search']:
        return None

    enrichment_result = EnrichmentResult(enrichment)
    return enrichment_result


def get_enrichment(genes_string, organism_id, goterm_type, test_type="FISHER", correction="FDR"):
    input_genes = genes_string
    organism = organism_id
    test_type = test_type   # FISHER, BINOMIAL
    correction = correction # FDR, BONFERRONI, NONE
    # refInputList  <- potential extension
    # refOrganism

    match goterm_type:
        case "MF": data_type = "GO:0003674"
        case "BP": data_type = "GO:0008150"
        case "CC": data_type = "GO:0005575"
        case _:
            print('Wrong goterm_type. Use "MF","BP","CC" instead. Ending get_enrichment function.')
            return

    req_link = f"https://pantherdb.org/services/oai/pantherdb/enrich/overrep?geneInputList={input_genes}&organism={organism}&annotDataSet={data_type}&enrichmentTestType={test_type}&correction={correction}"
    headers = {"Content-Type": "application/json"}
    response = requests.get(req_link, headers=headers)
    if response.status_code == 200:
        data = response.json()
        return data

    print("Failed to get data. Ending get_enrichment function.")
    return


In [None]:
def make_union(attractors):
    unionized = attractors[0]
    for attractor in attractors[1:]:
        unionized = unionized.union(attractor)
    return unionized


def get_stability_percentage(node, stg, attractor):
    variable_on = stg.mk_subspace({node: True})

    on_in_attractor = attractor.intersect(variable_on).vertices().cardinality()
    off_in_attractor = attractor.minus(variable_on).vertices().cardinality()

    return round((on_in_attractor / (on_in_attractor + off_in_attractor)) * 100.0, 2)


def get_stabilities(classifiers, attractors, stg, calculate_unstable=True):
    all_dict = dict()
    unionized_attractors = make_union(attractors)
    for classifier in classifiers:
        subset = unionized_attractors.intersect_colors(classifiers[classifier])
        enclosing = subset.vertices().enclosing_named_subspace()
        unstable_variables = dict()

        if calculate_unstable:
            all_variables = set(stg.network_variable_names())
            stable_variables = set(enclosing)
            to_calculate = all_variables.difference(stable_variables)

            for variable in to_calculate:
                unstable_variables[variable] = get_stability_percentage(variable, stg, subset)

        stability_dict = dict()
        for node in enclosing:
            if enclosing[node] == True:
                stability_dict[node] = 100
            else:
                stability_dict[node] = 0
        if calculate_unstable:
            stability_dict = stability_dict | unstable_variables

        all_dict[classifier] = stability_dict
    return all_dict


def get_stable_nodes(classifiers, attractors, stg, lower_bound=0, upper_bound=100):
    calculate_unstable = True
    if lower_bound == 100 or upper_bound == 0:
        calculate_unstable = False

    stabilities = get_stabilities(classifiers, attractors, stg, calculate_unstable)

    result_dict = dict()
    for one_class in stabilities:
        print(one_class)
        print("  |", end='')

        sub = []
        for node in stabilities[one_class]:
            if stabilities[one_class][node] > upper_bound:
                continue
            if stabilities[one_class][node] < lower_bound:
                continue

            print(node + ": " + str(stabilities[one_class][node]) + "|", end='')
            sub.append(node)

        print()
        result_dict[one_class] = sub
    return result_dict


In [None]:
def get_evaluated_nodes(phenotype, evaluation=True):
    result_list = []
    for node in phenotype:
        if node[0] == "+" and evaluation:
            result_list.append(node[1:])
        elif node[0] == "-" and not evaluation:
            result_list.append(node[1:])
    return result_list


### Extension Functions ###

In [None]:
from openpyxl import Workbook, load_workbook
from openpyxl.utils import get_column_letter
import os

def append_column_to_xlsx(filepath, data, column_name="Col"):
    """
    Adds data as a new column to an existing XLSX file.
    If the file does not exist, it will be created.
    """
    if os.path.exists(filepath):
        wb = load_workbook(filepath)
        ws = wb.active
    else:
        wb = Workbook()
        ws = wb.active

    if ws.max_column == 1 and ws.cell(row=1, column=1).value is None:
        col_index = 1
    else:
        col_index = ws.max_column + 1

    col_letter = get_column_letter(col_index)

    ws[f"{col_letter}1"] = column_name

    for i, value in enumerate(data, start=2):
        ws[f"{col_letter}{i}"] = value

    wb.save(filepath)

## Analysis ##

In [None]:
network = BooleanNetwork.from_file("deathReceptorSignaling-TTT.aeon")
human_id = "9606"

### Pipeline1 ###

The function pipeline1() encapsulates the algorithm described in the workflows folder. In addition to that, it creates .xsxl file for easier visualisation and postprocessing.

In [None]:
def pipeline1(behaviour_classes):
  ctx = SymbolicSpaceContext(network)
  stg = AsynchronousGraph(network, ctx)

  classification = Classification.classify_stable_phenotypes(ctx, stg)
  attractors = Attractors.attractors(stg)
  attractorClassifs = Classification.classify_attractor_bifurcation(stg, attractors)

  attractors_types = list(attractorClassifs)[0].feature_list()

  class_results = []
  for res in range(len(classification)):
    a = list(classification)[res]
    class_results.append(a.feature_list())

  write_combination = str(combination[0])[0] + str(combination[1])[0] + str(combination[2])[0]
  new_behaviour_class = EnrichmentBehaviourClass()
  i = 0;

  for nodes, attractor_type in zip(class_results, attractors_types):
    to_enrich = get_evaluated_nodes(nodes)
    to_enrich = prepare_list_for_enrichment(to_enrich)
    enrichment = get_enrichment(to_enrich, human_id, "BP")
    enrichment_result = prepare_enrichment_result(enrichment)
    calculated_attractor = EnrichmentAttractor(attractor_type, enrichment_result, 0.05)
    new_behaviour_class.add_attractor(calculated_attractor)

    append_column_to_xlsx("deathReceptorPipeline1.xlsx", calculated_attractor.goterms, column_name=("["+ str(model_number) +"]" + "[" + write_combination+str(i) + "]"))
    i += 1

  behaviour_classes.append(new_behaviour_class)

The main body iterates over all evaluations of input nodes and stores the results in one list.

In [None]:
combinations = list(itertools.product([True, False], repeat=3))
results = dict()
model_number = 0
behaviour_classes = []

for combination in combinations:
  print(model_number, "|", combination[0], combination[1], combination[2])

  network.set_update_function("TNF", str(combination[0]).lower())
  network.set_update_function("FADD", str(combination[1]).lower())
  network.set_update_function("FASL", str(combination[2]).lower())

  pipeline1(behaviour_classes)
  model_number += 1

0 | True True True
1 | True True False
2 | True False True
3 | True False False
4 | False True True
5 | False True False
6 | False False True
7 | False False False


Description of which attractors exists in each parametrisation.

In [None]:
i = 0
for a in behaviour_classes:
  print(str(i) + " ", end="")
  print(a)
  i += 1

0 Behaviour class 
 |-- stability
 |-- stability
 |-- stability

1 Behaviour class 
 |-- stability
 |-- stability
 |-- stability

2 Behaviour class 
 |-- stability
 |-- stability
 |-- stability

3 Behaviour class 
 |-- stability
 |-- stability
 |-- stability

4 Behaviour class 
 |-- stability
 |-- stability
 |-- stability

5 Behaviour class 
 |-- stability
 |-- stability
 |-- stability
 |-- stability

6 Behaviour class 
 |-- stability
 |-- stability
 |-- stability
 |-- stability

7 Behaviour class 
 |-- stability
 |-- stability
 |-- stability
 |-- stability



Showcase for how many attractors no GO Terms were found and in which behaviour class they are present.

In [None]:
for behaviour_class in behaviour_classes:
  print("--")
  for attractor in behaviour_class.attractors:
    print(len(attractor.go_terms_set))

--
143
255
208
--
120
240
196
--
3
90
143
--
0
7
119
--
68
66
106
--
0
81
0
0
--
0
0
0
46
--
0
0
23
0


Which attractors contain given output nodes.

In [None]:
i = 0
for behaviour_class in behaviour_classes:
  print("-- " + str(i))
  i += 1
  j = 0
  for attractor in behaviour_class.attractors:
    print(attractor.unmapped_ids, end='')
    print(" [" + str(j) + "]")
    j += 1

-- 0
Cyt_c,DISC_FAS,DISC_TNF,MOMP,MPT,NonACD,SMAC,TNFR [0]
ATP,Cyt_c,DISC_FAS,DISC_TNF,MOMP,SMAC,TNFR,apoptosis,apoptosome [1]
ATP,DISC_FAS,DISC_TNF,IKK,NFkB,RIP1,RIP1k,RIP1ub,TNFR,cFLIP,cIAP,survival [2]
-- 1
Cyt_c,DISC_TNF,MOMP,MPT,NonACD,SMAC,TNFR [0]
ATP,Cyt_c,DISC_TNF,MOMP,SMAC,TNFR,apoptosis,apoptosome [1]
ATP,DISC_TNF,IKK,NFkB,RIP1,RIP1k,RIP1ub,TNFR,cFLIP,cIAP,survival [2]
-- 2
Cyt_c,MOMP,MPT,NonACD,RIP1,RIP1k,SMAC,TNFR [0]
ATP,IKK,NFkB,RIP1,RIP1k,RIP1ub,TNFR,cFLIP,cIAP,survival [1]
ATP,Cyt_c,MOMP,SMAC,TNFR,apoptosis,apoptosome [2]
-- 3
Cyt_c,MOMP,MPT,NonACD,RIP1,RIP1k,SMAC,TNFR [0]
ATP,IKK,NFkB,RIP1,RIP1k,RIP1ub,TNFR,cFLIP,cIAP,survival [1]
ATP,Cyt_c,MOMP,SMAC,TNFR,apoptosis,apoptosome [2]
-- 4
Cyt_c,DISC_FAS,MOMP,MPT,NonACD,SMAC [0]
ATP,DISC_FAS,IKK,NFkB,RIP1,RIP1k,RIP1ub,cFLIP,cIAP,survival [1]
ATP,Cyt_c,DISC_FAS,MOMP,SMAC,apoptosis,apoptosome [2]
-- 5
ATP,cIAP [0]
ATP,Cyt_c,MOMP,SMAC,apoptosis,apoptosome [1]
ATP [2]
Cyt_c,MOMP,MPT,NonACD,SMAC [3]
-- 6
Cyt_c,MOMP,MPT,NonACD,S

Intersections described in the thesis.

In [None]:
survival_intersection = behaviour_classes[0].attractors[2].go_terms_set
survival_intersection = survival_intersection.intersection(behaviour_classes[1].attractors[2].go_terms_set)
survival_intersection = survival_intersection.intersection(behaviour_classes[2].attractors[1].go_terms_set)
survival_intersection = survival_intersection.intersection(behaviour_classes[3].attractors[1].go_terms_set)
survival_intersection = survival_intersection.intersection(behaviour_classes[4].attractors[1].go_terms_set)

In [None]:
behaviour_classes[0].attractors[2].get_goterms_by_set(survival_intersection)

[+homeostasis of number of cells within a tissue,
 +extrinsic apoptotic signaling pathway via death domain receptors]

In [None]:
nonACD_intersection = behaviour_classes[0].attractors[0].go_terms_set
nonACD_intersection = nonACD_intersection.intersection(behaviour_classes[1].attractors[0].go_terms_set)
nonACD_intersection = nonACD_intersection.intersection(behaviour_classes[2].attractors[0].go_terms_set)
nonACD_intersection = nonACD_intersection.intersection(behaviour_classes[4].attractors[0].go_terms_set)

In [None]:
behaviour_classes[0].attractors[0].get_goterms_by_set(nonACD_intersection)

[+necroptotic process,
 +programmed necrotic cell death,
 +necroptotic signaling pathway]

### Pipeline2 ###

Pipeline2 works in the same manner as the first one.

In [None]:
def pipeline2(behaviour_classes):

  ctx = SymbolicSpaceContext(network)
  stg = AsynchronousGraph(network, ctx)

  classification = Classification.classify_stable_phenotypes(ctx, stg)

  attractors = Attractors.attractors(stg)

  attractorClassifs = Classification.classify_attractor_bifurcation(stg, attractors)

  attractors_types = list(attractorClassifs)[0].feature_list()


  enrichment_input_a3 = get_stable_nodes(attractorClassifs, attractors, stg, 0, 100)
  enrichment_input_a3 = list(enrichment_input_a3.values())[0]
  behaviour_class_a3 = EnrichmentBehaviourClass()

  to_enrich = prepare_list_for_enrichment(enrichment_input_a3)
  enrichment = get_enrichment(to_enrich, "9606", "BP")
  enrichment_result = prepare_enrichment_result(enrichment)
  calculated_attractor = EnrichmentAttractor("CombinedAttractor", enrichment_result, 0.05)
  behaviour_class_a3.add_attractor(calculated_attractor)

  write_combination = str(combination[0])[0] + str(combination[1])[0] + str(combination[2])[0]
  append_column_to_xlsx("deathReceptorPipeline2.xlsx", calculated_attractor.goterms, column_name=("["+ str(i) +"]" + "[" + write_combination + "]"))


  behaviour_classes.append(behaviour_class_a3)

In [None]:
combinations = list(itertools.product([True, False], repeat=3))
vysledky = dict()
i = 0
behaviour_classes2 = []

for combination in combinations:
  print(i, "|", combination[0], combination[1], combination[2])

  network.set_update_function("TNF", str(combination[0]).lower())
  network.set_update_function("FADD", str(combination[1]).lower())
  network.set_update_function("FASL", str(combination[2]).lower())

  pipeline2(behaviour_classes2)
  i += 1


0 | True True True
["stability", "stability", "stability"]
  |DISC_FAS: 100|FADD: 100|FASL: 100|DISC_TNF: 100|TNF: 100|TNFR: 100|RIP1k: 33.33|IKK: 33.33|Cyt_c: 66.67|RIP1ub: 33.33|cIAP: 33.33|SMAC: 66.67|MOMP: 66.67|NFkB: 33.33|CASP3: 33.33|BAX: 66.67|ATP: 66.67|BCL2: 33.33|RIP1: 33.33|cFLIP: 33.33|NonACD: 33.33|apoptosis: 33.33|apoptosome: 33.33|MPT: 33.33|ROS: 33.33|CASP8: 66.67|survival: 33.33|XIAP: 33.33|
1 | True True False
["stability", "stability", "stability"]
  |TNF: 100|TNFR: 100|DISC_TNF: 100|DISC_FAS: 0|FADD: 100|FASL: 0|RIP1k: 33.33|IKK: 33.33|Cyt_c: 66.67|RIP1ub: 33.33|cIAP: 33.33|SMAC: 66.67|MOMP: 66.67|NFkB: 33.33|CASP3: 33.33|BAX: 66.67|ATP: 66.67|BCL2: 33.33|RIP1: 33.33|cFLIP: 33.33|NonACD: 33.33|apoptosis: 33.33|apoptosome: 33.33|MPT: 33.33|ROS: 33.33|CASP8: 66.67|survival: 33.33|XIAP: 33.33|
2 | True False True
["stability", "stability", "stability"]
  |DISC_FAS: 0|FADD: 0|FASL: 100|TNF: 100|TNFR: 100|DISC_TNF: 0|RIP1k: 66.67|IKK: 33.33|Cyt_c: 66.67|RIP1ub: 33.33|cI