In [24]:
import networkx
import obonet
from enum import Enum
import json
from os.path import join
import pandas as pd

In [25]:
IMAGE_ID = "Repeat_Test-2_Scan1"
#IMAGE_ID = "DON218-ND-52yM-T1A"

In [26]:
COL_SUFFIX = "OME-TIFF" if IMAGE_ID == "DON218-ND-52yM-T1A" else "QPTIFF"

In [27]:
CL_ROOT_ID = 'CL:0000000'

In [28]:
df = pd.read_csv(join("data", "Pancreas OMAP Markers.csv"), skiprows=1)
df = df.loc[pd.notnull(df["Marker name"]) & (df["Any OMAP?"] == "Y") & (df[f"Channel # ({COL_SUFFIX})"] != "-")]
df.head()

Unnamed: 0,Channel name (OME-TIFF),Channel # (OME-TIFF),Channel name (QPTIFF),Channel # (QPTIFF),Marker name,UniProt ID,OMAP-6?,In-progress OMAP?,Any OMAP?,Cell type(s),CT ID(s),Unnamed: 11,CT/1,CT/1/ID,CT/2,CT/2/ID,CT/3,CT/3/ID
0,CHRA,15,CHGA,28.0,Chromogranin A,P10645,Y,Y,Y,Endocrine,CL:0000169; CL:0000171; CL:0000173; CL:0002275...,,Pancreas exocrine glandular cell,CL:1001599,Pancreatic acinar cell,CL:0002064,,
1,GCG,6,GCG,18.0,Glucagon,P01275,Y,Y,Y,Endocrine (α),CL:0000171,,Pancreatic endocrine cell,CL:0008024,Type B pancreatic cell,CL:0000169,,
2,CPEP,13,C-Pep,22.0,C-peptide,P01308,Y,Y,Y,Endocrine (β),CL:0000169,,Pancreatic endocrine cell,CL:0008024,Pancreatic A cell,CL:0000171,,
3,PPY,20,PPY,20.0,Pancreatic polypeptide,P01298,Y,Y,Y,Endocrine (γ),CL:0002275,,Pancreatic endocrine cell,CL:0008024,Pancreatic D cell,CL:0000173,,
4,SST,29,SST,5.0,Somatostatin,P61278,Y,Y,Y,Endocrine (δ),CL:0000173,,Pancreatic endocrine cell,CL:0008024,Pancreatic PP cell,CL:0002275,,


In [29]:
cl_id_col = df["CT ID(s)"]
cl_ids_in_omap_col = cl_id_col.values.tolist()

In [30]:
cl_ids_in_omap_col

['CL:0000169; CL:0000171; CL:0000173; CL:0002275; CL:0005019',
 'CL:0000171',
 'CL:0000169',
 'CL:0002275',
 'CL:0000173',
 'CL:0000071; CL:0002144',
 'CL:0000669; CL:0002410; CL:0000359',
 'CL:0008019',
 nan,
 nan,
 'CL:0002079; CL:1001433',
 nan,
 'CL:0000169',
 'CL:0002064; CL:0000169',
 nan,
 'CL:0008024; CL:0000169; CL:0000171; CL:0000173; CL:0002275; CL:0005019',
 'CL:0005019',
 'CL:0000169; CL:0000171; CL:0000173; CL:0002275; CL:0005019',
 'CL:0002079; CL:1001433',
 nan,
 'CL:0002079; CL:1001433; CL:0002574; CL:0000057',
 'CL:0000542; CL:0000236; CL:0000576',
 'CL:0000235; CL:0000451',
 'CL:0000542; CL:0000084',
 'CL:0000542; CL:0000084',
 'CL:0000542; CL:0000236',
 'CL:0000542',
 'CL:0000669; CL:0002574']

In [31]:
def flatten(l):
    return [item for sublist in l for item in sublist]

In [32]:
all_cl_ids_in_omap = flatten([ cl_row.split("; ") for cl_row in cl_ids_in_omap_col if pd.notnull(cl_row) ])
unique_cl_ids_in_omap = list(set(all_cl_ids_in_omap))

In [33]:
unique_cl_ids_in_omap

['CL:0000173',
 'CL:0000084',
 'CL:0008019',
 'CL:0000359',
 'CL:1001433',
 'CL:0002574',
 'CL:0002079',
 'CL:0000071',
 'CL:0008024',
 'CL:0000451',
 'CL:0002275',
 'CL:0002410',
 'CL:0000576',
 'CL:0000057',
 'CL:0000236',
 'CL:0000669',
 'CL:0002064',
 'CL:0000235',
 'CL:0002144',
 'CL:0005019',
 'CL:0000169',
 'CL:0000542',
 'CL:0000171']

In [34]:
CL_OBO_URL = "http://purl.obolibrary.org/obo/cl/cl-basic.obo"

In [35]:
graph = obonet.read_obo(CL_OBO_URL)

In [36]:
def load_cl_obo_graph(cl_obo_file):
    graph = obonet.read_obo(cl_obo_file)

    # Make sure there are no cycles.
    assert networkx.is_directed_acyclic_graph(graph)

    id_to_name = {
        id_: data.get('name')
        for id_, data in graph.nodes(data=True)
    }
    name_to_id = {
        data['name']: id_
        for id_, data in graph.nodes(data=True) if 'name' in data
    }

    return graph, id_to_name, name_to_id

In [37]:
# Recursively convert a nested dict to a level zero node
# of the cell-set hierarchy schema.
def dict_to_tree(name_or_cl_id, value, id_to_name, parent_term_id):
    '''
    >>> h_dict = {
    ...     "hematopoietic cell": {
    ...         "leukocyte": [4, 5, 6],
    ...         "hematopoietic precursor cell": [7, 8, 9]
    ...     },
    ...     "epithelial cell": [1, 2, 3]
    ... }
    >>> h_tree = dict_to_tree("test", h_dict)
    >>> h_tree["name"]
    'test'
    >>> [x["name"] for x in h_tree["children"]]
    ['hematopoietic cell', 'epithelial cell']
    '''
    try:
        name = id_to_name[name_or_cl_id]
        cl_id = name_or_cl_id
    except KeyError:
        name = name_or_cl_id
        cl_id = None
    if isinstance(value, dict):
        children = [
            dict_to_tree(child_name, child_value, id_to_name, cl_id)
            for child_name, child_value in value.items()
        ]
        filtered_children = []
        for child in children:
            if child["name"] != "any":
                filtered_children.append(child)
        
        return {
            "name": name,
            "term": cl_id,
            "children": filtered_children,
        }
    else:
        return {
            "name": name,
            "term": parent_term_id if name == "any" else cl_id,
            "set": value,
        }

In [38]:
# Try removing the extra hierarchy level for the "any" set,
# if it is an "only-child"
def remove_any_from_dict_levels(v):
    '''
    >>> h = {'type b': {'type c': {'any': ['cell 1', 'cell 2', 'cell 3']}}}
    >>> new_h = remove_any_from_dict_levels(h)
    >>> new_h
    {'type b': {'type c': ['cell 1', 'cell 2', 'cell 3']}}
    '''
    if type(v) is dict:
        keys = list(v.keys())
        if len(keys) == 1 and keys[0] == "any":
            # Return the value associated with the "any" property,
            # since "any" has no siblings.
            return v["any"]
        else:
            # This is a dict with multiple values, so recursively
            # try this function on all of its values.
            return dict(
                zip(
                    keys,
                    list(map(remove_any_from_dict_levels, v.values()))
                )
            )
    else:
        # This is not a dict, so just return as-is.
        return v

In [39]:
# Using a path through the DAG for a particular cell set,
# recursively fill in a nested dict.
def fill_in_dict_from_path(d, keys, child):
    '''
    >>> h = dict()
    >>> path_set_tuple = (
    ...     ["type a", "type b", "type c"],
    ...     ["cell 1", "cell 2", "cell 3"]
    ... )
    >>> fill_in_dict_from_path(h, path_set_tuple[0], path_set_tuple[1])
    {'any': ['cell 1', 'cell 2', 'cell 3']}
    >>> h
    {'type a': {'type b': {'type c': {'any': ['cell 1', 'cell 2', 'cell 3']}}}}
    '''
    """
    d : dict The resulting dictionary so far.
    keys : A list of keys representing the path down the cell ontology DAG.
    child : A set value.
    """
    key = keys[0]

    if key in d and isinstance(d[key], dict):
        result = d[key]
    else:
        result = d[key] = dict()

    if len(keys) == 1:
        result["any"] = child
        return result
    else:
        new_keys = keys.copy()
        new_keys.pop(0)
        return fill_in_dict_from_path(result, new_keys, child)

In [40]:
# Given a list of multiple paths up the DAG,
# sort the list according to a heuristic.
def sort_paths_up_cell_ontology(paths_up, name_to_id):
    '''
    >>> ex_paths_up = [
    ...     ['b', 'motile cell', 'native cell', 'cell'],
    ...     ['a', 'b', 'c', 'somatic cell', 'native cell', 'cell'],
    ...     ['a', 'somatic cell', 'native cell', 'cell']
    ... ]
    >>> sorted_paths_up = sort_paths_up_cell_ontology(ex_paths_up)
    >>> sorted_paths_up[0]
    ['a', 'somatic cell', 'native cell', 'cell']
    >>> sorted_paths_up[1]
    ['a', 'b', 'c', 'somatic cell', 'native cell', 'cell']
    '''
    PREFERENCE_NAMES = [
        ['animal cell', 'eukaryotic cell', 'native cell', 'cell'],
        ['somatic cell', 'native cell', 'cell'],
        ['nucleate cell', 'native cell', 'cell'],
        ['precursor cell', 'native cell', 'cell'],
    ]
    PREFERENCES = [
        [name_to_id[name] for name in name_group]
        for name_group in PREFERENCE_NAMES
    ]
    # Prefer all of the above before "functional" categories like
    # [..., 'motile cell', 'native cell', 'cell']
    WORST_PREFERENCE_INDEX = len(PREFERENCES)

    def get_first_preference_index_and_path_length(path_up):
        path_preference_match_index = WORST_PREFERENCE_INDEX
        for preference_index, preference in enumerate(PREFERENCES):
            if path_up[-len(preference):] == preference:
                path_preference_match_index = preference_index
                break
        # Return a tuple of the first matching preference "path ending"
        # and the path length (to use shorter paths if multiple paths
        # match the same top path ending).
        return (path_preference_match_index, len(path_up))
    return sorted(paths_up, key=get_first_preference_index_and_path_length)


In [41]:
# Using the cell ontology DAG,
# get all possible paths up to the root from
# a given node_id value.
def get_paths_up_cell_ontology(graph, node_id):
    if node_id == CL_ROOT_ID:
        return [
            [node_id]
        ]

    # Get ancestors of the cell type
    # (counterintuitive that the function is called descendants).
    ancestor_term_set = networkx.descendants(graph, node_id)

    # Make sure the cell type has an ancestor
    # with the 'cell' root ID.
    assert CL_ROOT_ID in ancestor_term_set

    # Get the parents of the current node.
    node_parents = graph.out_edges(node_id, keys=True)

    up_dag_paths = []
    for node_parent in node_parents:
        _, curr_parent_id, relationship = node_parent
        if relationship == "is_a":
            parent_paths = get_paths_up_cell_ontology(graph, curr_parent_id)
            for parent_path in parent_paths:
                up_dag_paths.append([node_id] + parent_path)
    return up_dag_paths


In [42]:
# Construct the tree, according to the following schema:
# https://github.com/hubmapconsortium/vitessce/blob/d5f63aa1d08aa61f6b20f6ad6bbfba5fceb6b5ef/src/schemas/cell_sets.schema.json
def init_cell_sets_tree():
    return {
        "datatype": "obs",
        "version": "0.1.3",
        "tree": []
    }

In [43]:
def generate_cell_type_cell_sets(cl_ids, cl_obo_file):
    """
    Generate a tree of cell sets
    for hierarchical cell type annotations.
    """
    tree = init_cell_sets_tree()
    # Load the cell ontology DAG
    graph, id_to_name, name_to_id = load_cl_obo_graph(cl_obo_file)

    ancestors_and_sets = []

    for cl_id in cl_ids:
        try:
            node_id = cl_id
            node_name = id_to_name[cl_id]
        except KeyError:
            print(
                f"ERROR: annotation '{cl_id}' does "
                "not match any node in the cell ontology."
            )
            continue

        # Get all of the possible paths up to the root
        # from the current node.
        paths_up = get_paths_up_cell_ontology(graph, node_id)
        # Get the names of each node in each path.
        named_paths_up = [
            [id_to_name[n_id] for n_id in path_nodes]
            for path_nodes in paths_up
        ]
        print(
            f"WARNING: {id_to_name[node_id]} has {len(paths_up)} paths"
            f" up to {CL_ROOT_ID} ({id_to_name[CL_ROOT_ID]})."
        )

        # Sort potential paths "up the hierarchy" by our preferences,
        # to avoid "functional" parent nodes like "motile cell"
        sorted_named_paths_up = sort_paths_up_cell_ontology(paths_up, name_to_id)

        named_ancestors = sorted_named_paths_up[0]
        named_ancestors_reversed = list(reversed(named_ancestors))
        # Get a list of (cell_id, prediction_score) tuples for the set.
        set_value = []

        ancestors_and_sets.append((
            named_ancestors_reversed,
            set_value
        ))

    # Pop off all ancestors that are the same for all cell types.
    # e.g. 'cell', 'native cell', ...
    ancestor_list_lens = [len(x[0]) for x in ancestors_and_sets]
    min_ancestor_list_len = min(ancestor_list_lens)
    assert min_ancestor_list_len >= 1
    for level in range(min_ancestor_list_len - 1):
        unique_level_cell_types = set()
        for ancestors, cell_set in ancestors_and_sets:
            unique_level_cell_types.add(ancestors[0])

        if len(unique_level_cell_types) == 1:
            for ancestors, cell_set in ancestors_and_sets:
                ancestors.pop(0)
        else:
            break

    # Create the hierarchy as a dict.
    h = dict()
    for ancestors, cell_set in ancestors_and_sets:
        fill_in_dict_from_path(h, ancestors, cell_set)

    # Try removing all of the single-child "any" levels
    # now that the hierarchy has been created as a dict.
    h = remove_any_from_dict_levels(h)

    # Transform the dict into an object matching the JSON schema.
    tree["tree"] = [
        dict_to_tree("Cell Type Annotations", h, id_to_name, None)
    ]
    return tree

In [44]:
cell_sets = generate_cell_type_cell_sets(unique_cl_ids_in_omap, CL_OBO_URL)



In [45]:
cell_sets

{'datatype': 'obs',
 'version': '0.1.3',
 'tree': [{'name': 'Cell Type Annotations',
   'term': None,
   'children': [{'name': 'endocrine cell',
     'term': 'CL:0000163',
     'children': [{'name': 'enteroendocrine cell',
       'term': 'CL:0000164',
       'children': [{'name': 'type D enteroendocrine cell',
         'term': 'CL:0000502',
         'children': [{'name': 'pancreatic D cell',
           'term': 'CL:0000173',
           'set': []}]},
        {'name': 'PP cell',
         'term': 'CL:0000696',
         'children': [{'name': 'pancreatic PP cell',
           'term': 'CL:0002275',
           'set': []}]},
        {'name': 'type A enteroendocrine cell',
         'term': 'CL:0002067',
         'children': [{'name': 'pancreatic A cell',
           'term': 'CL:0000171',
           'set': []}]}]}]},
    {'name': 'somatic cell',
     'term': 'CL:0002371',
     'children': [{'name': 'hematopoietic cell',
       'term': 'CL:0000988',
       'children': [{'name': 'leukocyte',
        

In [46]:
with open(join("data", f"{IMAGE_ID}.cell_sets.json"), "w") as f:
    json.dump(cell_sets, f)