In [10]:
import sys
import json
import numpy as np
import pandas as pd

sys.path.append('..')
import ultrametric_distance

In [4]:
def convert_nodes_to_json_tree(node_list):
    """
    Convert a list of hierarchical node names to a nested-dict (JSON-like) tree.

    Each node name is a ';'-separated path such as "p__node1;c__node2".
    Path components that do not contain '__' are ignored, and empty or
    whitespace-only names are skipped.

    Args:
        node_list: List of node names like ["p__node1", "p__node1;c__node2", ...]

    Returns:
        {"root": tree}. Every key inside ``tree`` is a single path component
        (e.g. "c__node2", not the ';'-joined full name) and every value is the
        dict of that component's children. Leaf nodes have {} as value.
    """

    def parse_node_path(node_name):
        """Split a node name on ';' and keep only the components containing '__'."""
        return [part for part in node_name.split(';') if '__' in part]

    tree = {}

    for node_name in node_list:
        # Skip blank entries.
        if not node_name.strip():
            continue

        # Walk down from the root, creating each missing level on the way.
        # A node that never receives children simply stays as {}, so no
        # separate leaf-detection pass is needed.
        current = tree
        for component in parse_node_path(node_name):
            current = current.setdefault(component, {})

    return {"root": tree}

In [8]:
# Load the taxa table, indexed by sample_id; its column names are the node names.
taxa_matrix = pd.read_csv("../data/do1200_microbiome_taxa.csv",  index_col='sample_id')
sample_nodes = taxa_matrix.columns.to_list()

# Tree built from every column name (nothing filtered out)
tree_all = convert_nodes_to_json_tree(sample_nodes)

# Tree for a single sample, keeping only the nodes whose value is above 0.2
sample_values = taxa_matrix.loc["DO0561"]
nodes_above_cutoff = sample_values.index[sample_values > 0.2].to_list()
tree_subset = convert_nodes_to_json_tree(nodes_above_cutoff)

print("\nSubset of tree:")
print(json.dumps(tree_subset, indent=2))


Subset of tree:
{
  "root": {
    "p__Firmicutes": {},
    "p__Bacteroidota": {
      "c__Bacteroidia": {
        "o__Bacteroidales": {
          "f__Muribaculaceae": {}
        }
      }
    },
    "p__Actinobacteriota": {
      "c__Actinobacteria": {
        "o__Bifidobacteriales": {
          "f__Bifidobacteriaceae": {
            "g__Bifidobacterium": {}
          }
        }
      }
    }
  }
}


In [9]:
# test tree distance
# NOTE(review): this compares tree_subset with itself, yet the recorded output
# of this cell is non-zero (~0.062). Confirm whether get_ultrametric_distance
# is expected to return 0 for identical trees, since the KNN cells further
# down feed its values to KNeighborsClassifier as a precomputed metric.
ultrametric_distance.get_ultrametric_distance(tree_subset, tree_subset)

0.06196510982275294

### KNeighborsClassifier using tree distance as metric

- Each sample has an array of node values; set a cutoff to remove nodes whose values are below this threshold.
- Test different numbers of training samples (n).
- Test different cutoffs.

In [11]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score


def knn_classification(x_train, x_test, y_train, y_test, n_neighbors=5):
    """
    Fit a K-Nearest Neighbors classifier and predict the test set.

    Args:
        x_train: Training features passed to ``KNeighborsClassifier.fit``.
        x_test: Test features passed to ``KNeighborsClassifier.predict``.
        y_train: Training labels.
        y_test: Test labels; must provide ``.to_numpy()``.
        n_neighbors: Number of neighbors used by the classifier.

    Returns:
        Dict with the model name, the true test labels as a NumPy array
        ('y_test') and the predicted labels ('y_predict').
    """
    classifier = KNeighborsClassifier(n_neighbors=n_neighbors)
    classifier.fit(x_train, y_train)

    predicted_labels = classifier.predict(x_test)
    true_labels = y_test.to_numpy()

    result = {'model_name': 'knn_classification'}
    result['y_test'] = true_labels
    result['y_predict'] = predicted_labels
    return result


def tree_distance(x_index, y_index, value_cutoff, taxa_matrix):
    """
    Compute the pairwise tree distance between two groups of samples.

    Args:
        x_index: Iterable of row labels of ``taxa_matrix``; one output row each.
        y_index: Iterable of row labels of ``taxa_matrix``; one output column each.
        value_cutoff: A column (node) is kept in a sample's tree only when the
            sample's value for it is strictly greater than this cutoff.
        taxa_matrix: DataFrame indexed by sample label whose column names are
            the node names given to ``convert_nodes_to_json_tree``.

    Returns:
        np.ndarray whose entry [i, j] is
        ``ultrametric_distance.get_ultrametric_distance`` applied to the tree
        of ``x_index[i]`` and the tree of ``y_index[j]``.
    """
    # Build each sample's tree once and reuse it, rather than rebuilding both
    # trees for every (x, y) pair.
    # NOTE(review): assumes get_ultrametric_distance does not mutate the trees
    # it receives - confirm against the ultrametric_distance module.
    tree_cache = {}

    def sample_tree(sample_id):
        """Return the (cached) thresholded tree for one sample label."""
        if sample_id not in tree_cache:
            sample_values = taxa_matrix.loc[sample_id]
            kept_nodes = sample_values.index[sample_values > value_cutoff].to_list()
            tree_cache[sample_id] = convert_nodes_to_json_tree(kept_nodes)
        return tree_cache[sample_id]

    dist_matrix = []

    for x in x_index:
        tree_x = sample_tree(x)
        dist_matrix.append([
            ultrametric_distance.get_ultrametric_distance(tree_x, sample_tree(y))
            for y in y_index
        ])

    return np.array(dist_matrix)


def knn_classification_tree_distance(x_train, x_test, y_train, y_test, n_neighbors=5):
    """
    Fit a K-Nearest Neighbors classifier on precomputed distances and predict.

    The classifier is created with ``metric="precomputed"``, so ``x_train``
    and ``x_test`` are passed as distance matrices rather than raw features.

    Args:
        x_train: Distances passed to ``KNeighborsClassifier.fit``.
        x_test: Distances passed to ``KNeighborsClassifier.predict``.
        y_train: Training labels.
        y_test: Test labels; must provide ``.to_numpy()``.
        n_neighbors: Number of neighbors used by the classifier.

    Returns:
        Dict with the model name, the true test labels as a NumPy array
        ('y_test') and the predicted labels ('y_predict').
    """
    classifier = KNeighborsClassifier(n_neighbors=n_neighbors, metric="precomputed")
    classifier.fit(x_train, y_train)

    predicted_labels = classifier.predict(x_test)
    true_labels = y_test.to_numpy()

    result = {'model_name': 'knn_classification_tree_distance'}
    result['y_test'] = true_labels
    result['y_predict'] = predicted_labels
    return result

In [None]:
# set training and testing space
data_df = pd.read_csv('../data/do1200_microbiome_taxa.csv', index_col='sample_id')
meta_df = pd.read_csv('../data/do1200_metadata.csv', index_col='sample_id')

# Binary target: 1 when Diet is 'HC/LF', 0 otherwise
meta_df['label'] = (meta_df['Diet'] == 'HC/LF')*1

random_state=42
n_testing = 200
n_training_list = [10, 30, 50, 100]

labels = meta_df['label'].unique()


def _sample_half_per_label(frame, n_total):
    """Draw int(n_total/2) sample ids from ``frame`` for each value in ``labels``."""
    picked = []
    for label in labels:
        rows_for_label = frame[frame['label'] == label]
        drawn = rows_for_label.sample(n=int(n_total/2), random_state=random_state)
        picked.extend(drawn.index.to_list())
    return picked


testing_index = _sample_half_per_label(meta_df, n_testing)

# select examples from remaining samples for training
meta_df_filtered = meta_df[~meta_df.index.isin(testing_index)]

# one list of training sample ids per training-set size
training_indexs = {}
for n_training in n_training_list:
    training_indexs[n_training] = _sample_half_per_label(meta_df_filtered, n_training)

In [14]:
# KNN with default settings (Euclidean distance)
y = meta_df['label']
X = data_df.reindex(meta_df.index)

n_testing = 200
n_training_list = [10, 30, 50, 100]
classifier_list = [knn_classification]

# metric column name -> scoring function, in the column order of the result table
scorers = {
    'accuracy'  : accuracy_score,
    'precision' : precision_score,
    'recall'    : recall_score,
    'f1'        : f1_score,
}

output_df = []

for n_training in n_training_list:
    train_ids = training_indexs[n_training]

    X_train, y_train = X.loc[train_ids], y.loc[train_ids]
    X_test, y_test = X.loc[testing_index], y.loc[testing_index]

    for classifier in classifier_list:
        prediction = classifier(X_train, X_test, y_train, y_test)
        truth, predicted = prediction['y_test'], prediction['y_predict']

        row = {'model': prediction['model_name'], 'n_example': n_training}
        for metric_name, scorer in scorers.items():
            row[metric_name] = scorer(truth, predicted)
        output_df.append(row)

pd.DataFrame.from_dict(output_df)

Unnamed: 0,model,n_example,accuracy,precision,recall,f1
0,knn_classification,10,0.745,0.720721,0.8,0.758294
1,knn_classification,30,0.675,0.673267,0.68,0.676617
2,knn_classification,50,0.74,0.730769,0.76,0.745098
3,knn_classification,100,0.755,0.752475,0.76,0.756219


In [19]:
# KNN using tree distance, evaluated for every cutoff / training-size combination
y = meta_df['label']
X = data_df.reindex(meta_df.index)

n_testing = 200
n_training_list = [10, 30, 50, 100]
classifier_list = [knn_classification_tree_distance]

# metric column name -> scoring function, in the column order of the result table
scorers = {
    'accuracy'  : accuracy_score,
    'precision' : precision_score,
    'recall'    : recall_score,
    'f1'        : f1_score,
}

output_df = []
value_cutoff_list = [0.1, 0.05, 0.02, 0.01]

for value_cutoff in value_cutoff_list:
    for n_training in n_training_list:
        train_ids = training_indexs[n_training]
        test_ids = testing_index[:n_testing]

        X_train, X_test = X.loc[train_ids], X.loc[test_ids]
        y_train, y_test = y.loc[train_ids], y.loc[test_ids]

        for classifier in classifier_list:
            # rows: samples to fit / predict, columns: training samples
            train_distances = tree_distance(train_ids, train_ids, value_cutoff, X)
            test_distances = tree_distance(test_ids, train_ids, value_cutoff, X)

            prediction = classifier(train_distances, test_distances, y_train, y_test)
            truth, predicted = prediction['y_test'], prediction['y_predict']

            row = {
                'model'     : prediction['model_name'],
                'n_example' : n_training,
                'cutoff'    : value_cutoff,
            }
            for metric_name, scorer in scorers.items():
                row[metric_name] = scorer(truth, predicted)
            output_df.append(row)

pd.DataFrame.from_dict(output_df)

Unnamed: 0,model,n_example,cutoff,accuracy,precision,recall,f1
0,knn_classification_tree_distance,10,0.1,0.54,0.54878,0.45,0.494505
1,knn_classification_tree_distance,30,0.1,0.46,0.466102,0.55,0.504587
2,knn_classification_tree_distance,50,0.1,0.515,0.518519,0.42,0.464088
3,knn_classification_tree_distance,100,0.1,0.49,0.461538,0.12,0.190476
4,knn_classification_tree_distance,10,0.05,0.595,0.56129,0.87,0.682353
5,knn_classification_tree_distance,30,0.05,0.585,0.550296,0.93,0.69145
6,knn_classification_tree_distance,50,0.05,0.59,0.55625,0.89,0.684615
7,knn_classification_tree_distance,100,0.05,0.545,0.525424,0.93,0.67148
8,knn_classification_tree_distance,10,0.02,0.605,0.56213,0.95,0.70632
9,knn_classification_tree_distance,30,0.02,0.525,0.522124,0.59,0.553991
