In [None]:
# default_exp metrics
from nbdev.showdoc import *
import numpy as np
import matplotlib.pyplot as plt
import torch
import FRED
# Pick the best available accelerator: CUDA first, then Apple-silicon MPS, then CPU.
# The MPS backend is probed directly instead of matching the '1.13' version string,
# so MPS is still selected on other PyTorch releases that support it. The getattr
# guard keeps this cell working on builds that predate torch.backends.mps.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print("Using device", device)
%load_ext autoreload
%autoreload 2

Using device mps


# Flow Neighbor Metric
Verifies that the arrows in embedded space point towards the same points they do in ambient space. This is a wrapper around our flow neighbor loss, to ensure compatibility with other embedding techniques.

The parameter choices shouldn't impact the outcome too much. Automatic sigma selection works well, and the number of neighbors used is held constant.

In [None]:
# export
import torch
import numpy as np
from FRED.embed import flow_neighbor_loss
from FRED.data_processing import flashlight_affinity_matrix, diffusion_map_from_affinities, flow_neighbors
import torch.nn.functional as F

def flow_neighbor_metric(X, flows, embedded_points, embedded_velocities):
    """Score how well embedded velocities point toward each point's flow neighbors.

    Flow neighbors are found in the ambient space from `X` and `flows`
    (flashlight affinities with automatic sigma and flow strength 5,
    row-normalized to sum to 1, then 5 neighbors per `flow_neighbors`).
    For every (source, neighbor) index pair, the unit direction from the
    source to the neighbor in the embedding is compared against the unit
    embedded velocity at the source.

    Args:
        X: ambient-space points; passed to `flashlight_affinity_matrix`.
        flows: ambient-space flow vectors; passed to `flashlight_affinity_matrix`.
        embedded_points: embedded coordinates, indexable by the neighbor indices
            (presumably a numpy array of shape (n_points, dim) -- confirm with callers).
        embedded_velocities: embedded flow vectors, indexed the same way.

    Returns:
        A scalar tensor: the squared `torch.norm` of the stacked differences
        between unit neighbor directions and unit velocities.
    """
    affinities = flashlight_affinity_matrix(X, flows, sigma="automatic", flow_strength=5)
    # L1-normalize each row so the affinities become transition probabilities.
    transition_probs = F.normalize(affinities, p=1, dim=1)
    source_idx, target_idx = flow_neighbors(
        num_nodes=len(X), P_graph=transition_probs, n_neighbors=5
    )

    # Unit vectors pointing from each source point to its flow neighbor in the embedding.
    displacement = torch.tensor(embedded_points[target_idx] - embedded_points[source_idx])
    unit_displacement = F.normalize(displacement, dim=1)

    # Unit embedded velocity at each source point.
    unit_velocity = F.normalize(torch.tensor(embedded_velocities[source_idx]), dim=1)

    return torch.norm(unit_displacement - unit_velocity) ** 2

# Silhouette Metric
Calculates silhouette score, a measure of the separation between classes which ranges from -1 (bad) to 1 (exceptional). Uses both the raw points, and the points with associated flows concatenated. The latter makes very little difference in practice, but might in theory.

In [None]:
# export
import sklearn
def silhouette_metric(embedded_points, embedded_velocities, labels):
    """Compute silhouette scores for the embedding, with and without flows.

    Args:
        embedded_points: 2-D array of embedded coordinates, one row per point.
        embedded_velocities: 2-D array of embedded flow vectors with the same
            number of rows as `embedded_points`.
        labels: class label for each point.

    Returns:
        Tuple `(silhouette_points, silhouette_points_and_flows)`: the silhouette
        score of the points alone, and of the points with their flow vectors
        concatenated as extra feature columns.
    """
    # Import the function explicitly: a bare `import sklearn` does not guarantee
    # that the `sklearn.metrics` submodule is loaded, so `sklearn.metrics.*`
    # only works if some other import happened to pull it in first.
    from sklearn.metrics import silhouette_score

    points_and_flows = np.concatenate([embedded_points, embedded_velocities], axis=1)
    silhouette_points = silhouette_score(embedded_points, labels)
    silhouette_points_and_flows = silhouette_score(points_and_flows, labels)
    return silhouette_points, silhouette_points_and_flows

# KNN Classifier Score
Fits a knn classifier to the data, as another measure of the separability of the clusters. This uses the appended arrows by default.

In [None]:
# export
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

def nn_classification_metric(embedded_points, embedded_velocities, labels):
    """Score class separability with a 3-nearest-neighbor classifier.

    The embedded points and their velocities are concatenated column-wise into
    one feature matrix, split into train and test sets (33% held out, fixed
    random_state of 42), and a `KNeighborsClassifier` is fit on the training part.

    Args:
        embedded_points: 2-D array of embedded coordinates, one row per point.
        embedded_velocities: 2-D array of embedded flow vectors, same row count.
        labels: class label for each point.

    Returns:
        The classifier's `score` on the held-out test split.
    """
    features = np.concatenate([embedded_points, embedded_velocities], axis=1)
    train_features, test_features, train_labels, test_labels = train_test_split(
        features, labels, test_size=0.33, random_state=42
    )
    classifier = KNeighborsClassifier(n_neighbors=3)
    classifier.fit(train_features, train_labels)
    return classifier.score(test_features, test_labels)

# Monotone Increasing metric
With single cell data, there's an easy way to test if the "flows" between cells in the embedded space make sense biologically: are the cells always moving forward in time? 

Presently, we just test whether the timestamps of cells ever decrease (i.e., are not monotone increasing) as time progresses along the flow. We sum any negative changes from time $t$ to $t+1$ as a measure of the severity of the violation. This is accumulated over a random sample of starting points in the dataset.

In [None]:
# export
from FRED.inference import diffusion_flow_integration
from tqdm.notebook import trange, tqdm
def monotone_increasing_metric(embedded_points, embedded_velocities, time_labels, num_samples = 100, flow_strength=5):
    """Measure how often flow lines in the embedding move backwards in time.

    Random starting points are drawn, a 20-step flow line is integrated from
    each with `diffusion_flow_integration`, and the negative changes in
    `time_labels` between consecutive points on the line are summed.

    Args:
        embedded_points: embedded coordinates; converted with `torch.tensor`.
        embedded_velocities: embedded flow vectors; converted with `torch.tensor`.
        time_labels: per-point timestamps, indexable by an integer array.
        num_samples: number of random starting points to draw (with replacement).
        flow_strength: forwarded to `diffusion_flow_integration`.

    Returns:
        The average, over the sampled starting points, of the summed negative
        time differences along each flow line. The value is <= 0, and 0 means
        no decrease was observed. Returns 0.0 when `num_samples` is 0.
    """
    if num_samples == 0:
        # Nothing to sample; avoid dividing by zero below.
        return 0.0
    # sample random starting points
    idxs = torch.randint(len(embedded_points), size=[num_samples])
    neg_diffs = 0
    for pointA in tqdm(idxs):
        flowline = diffusion_flow_integration(torch.tensor(embedded_points), torch.tensor(embedded_velocities), starting_index = pointA, num_steps = 20, flow_strength=flow_strength)
        flowline = np.array(flowline)
        # take difference between consecutive points along the flow line
        times_at_flowline = time_labels[flowline]
        neighb_diffs = times_at_flowline[1:] - times_at_flowline[:-1]
        # (sum(d) - sum(|d|)) / 2 keeps only the negative entries of d:
        # positive terms cancel and each negative term is counted once.
        neg_diffs += (np.sum(neighb_diffs) - np.sum(np.abs(neighb_diffs)))/2
    # Normalize by the number of samples so the score does not scale with it.
    # The original computed this quotient but discarded the result.
    return neg_diffs / num_samples

# Comprehensive Metrics

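Run all metrics, print a summary, and return the scores. (The `spreadsheet_name` and `unid` arguments are accepted, but this function does not yet write the results to a spreadsheet.)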

In [None]:
# export
import csv
def comprehensive_flow_metrics(X, flows, labels, embedded_points, embedded_velocities, time_labels, spreadsheet_name, unid, flow_strength):
    """Run every metric in this module, print a summary, and return the scores.

    Args:
        X: ambient-space points, forwarded to `flow_neighbor_metric`.
        flows: ambient-space flow vectors, forwarded to `flow_neighbor_metric`.
        labels: class labels used by the silhouette and kNN metrics.
        embedded_points: embedded coordinates.
        embedded_velocities: embedded flow vectors.
        time_labels: per-point timestamps used by `monotone_increasing_metric`.
        spreadsheet_name: currently unused; nothing is written to disk here.
        unid: currently unused.
        flow_strength: forwarded to `monotone_increasing_metric`.

    Returns:
        Tuple `(silhouette_score, silhouette_score_with_flow, knn_score,
        neighbor_score, monotone_score)`.
    """
    neighbor_score = flow_neighbor_metric(X, flows, embedded_points, embedded_velocities)
    silhouette_score, silhouette_score_with_flow = silhouette_metric(embedded_points, embedded_velocities, labels)
    knn_score = nn_classification_metric(embedded_points, embedded_velocities, labels)
    # Forward the caller's flow_strength; previously it was accepted but ignored,
    # so the monotone metric always ran with its default of 5.
    monotone_score = monotone_increasing_metric(embedded_points, embedded_velocities, time_labels, flow_strength=flow_strength)
    print(f"## SCORES ## \n silhouette score w/o flows: {silhouette_score}.\n silhouette score w/ flows:  {silhouette_score_with_flow} \n kNN Classifier {knn_score} \n Flow Neighbor Score {neighbor_score} \n Monotone Increasing Score {monotone_score}")
    return silhouette_score, silhouette_score_with_flow, knn_score, neighbor_score, monotone_score
        