In [None]:
import sys
sys.path.append('/home/nick/projects')
# from PointGPT import segmentation
from fle_3d.fle_3d import FLEBasis3D
import numpy as np
import pandas as pd
import os
from glob2 import glob
import plotly.express as px
import plotly.graph_objects as go
import torch
from src.utilities.functions import path_leaf
from src.utilities.fin_class_def import FinData
from tqdm import tqdm
from sklearn.neighbors import KernelDensity

### Notebook to explore the use of spherical harmonics to read-out fin shape
As a first pass, I will perform warps on a single fin dataset, so that I have a solid sense for what to expect. 

I also hope to test whether we can generate an "average" fin. In this case, I will construct the warps such that the average is just the original fin.

### Load test dataset

In [None]:
root = "/media/nick/hdd02/Cole Trapnell's Lab Dropbox/Nick Lammers/Nick/pecfin_dynamics/"

fin_object_path = os.path.join(root, "point_cloud_data", "fin_objects", "")
fin_object_list = sorted(glob(fin_object_path + "*.pkl"))

In [None]:
file_ind01 = 46
seg_type = "tissue_only_best_model_tissue"

fp01 = fin_object_list[file_ind01]
point_prefix01 = path_leaf(fp01).replace("_fin_object.pkl", "")
print(point_prefix01)

fin_data = FinData(data_root=root, name=point_prefix01, tissue_seg_model=seg_type)
fin_df = fin_data.full_point_data
fin_df = fin_df.loc[fin_df["fin_label_curr"]==1, :]
fin_points = fin_df[["X", "Y", "Z"]].to_numpy()

fin_axis_df = fin_data.axis_fin
fin_axes = fin_data.calculate_axis_array(fin_axis_df)
fin_points_pca = np.matmul(fin_points - np.mean(fin_points, axis=0), fin_axes.T)
fin_df.loc[:, ["XP", "YP", "ZP"]] = fin_points_pca

### Apply simple warps

In [None]:
def apply_random_warps(points, mean_log=0, sigma_log=0.1):
    """
    Apply random warps to a 3D point cloud using a multivariate lognormal distribution.
    
    Parameters:
    points (numpy.ndarray): Nx3 array of points.
    mean_log (float): Mean of the lognormal distribution (default is 0, which corresponds to warps centered around 1).
    sigma_log (float): Standard deviation of the lognormal distribution (controls the variability of the warps).
    
    Returns:
    numpy.ndarray: Warped point cloud.
    """
    # Number of points in the point cloud
    N, dim = points.shape
    
    if dim != 3:
        raise ValueError("Input points should be an Nx3 numpy array representing a 3D point cloud.")
    
    # Generate random scaling factors using a lognormal distribution centered around 1
    scale_factors = np.random.lognormal(mean=mean_log, sigma=sigma_log, size=(1, 3))
    
    # Apply the scaling factors to the point cloud
    warped_points = np.multiply(points,  scale_factors)
    
    return warped_points, scale_factors

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# test out the warp function
raw_points = fin_df[["ZP", "YP", "XP"]].to_numpy() # note that Ii permute the axes for convenience

# Apply random warps to the point cloud
warped_points, warp = apply_random_warps(raw_points, sigma_log=.5)
print(warp)
fig = go.Figure() #make_subplots(rows=1, cols=2, specs=[[{'type': 'scatter3d'}, {'type': 'scatter3d'}]])


# Add the first plot to the first subplot
fig.add_trace(
    go.Scatter3d(x=raw_points[:, 0], y=raw_points[:, 1], z=raw_points[:, 2], mode='markers')),
    # row=1, col=1
# )

# Add the second plot to the second subplot
fig.add_trace(
    go.Scatter3d(x=warped_points[:, 0], y=warped_points[:, 1], z=warped_points[:, 2], mode='markers')),
#     row=1, col=2
# )

fig.update_layout(
             scene=dict(
                 aspectmode='data'
                 ))

fig.show()

### Generat synthetic dataset of warped fin-variants

In [None]:
warp_sigma = 0.1 # log sigma for lognormal warp distribution
n_samples = 100 # number of synthetic fins to generate

# test out the warp function
raw_points = fin_df[["ZP", "YP", "XP"]].to_numpy()

warp_list = []
warped_points_list = []
np.random.seed(61)
max_dim = int(np.ceil(np.max(np.abs(raw_points)) / 5) * 5)

for n in tqdm(range(n_samples)):
    # Apply random warps to the point cloud	
    warped_points, warp = apply_random_warps(raw_points, sigma_log=warp_sigma)

    # save 
    warp_list.append(warp)
    warped_points_list.append(warped_points)

    max_dim = np.max([int(np.ceil(np.max(np.abs(warped_points)) / 5) * 5), max_dim])

warp_df = pd.DataFrame(np.squeeze(np.asarray(warp_list)), columns=["x_warp", "y_warp", "z_warp"])

# set first entry to be origina
warp_df.loc[0, :] = 1
warped_points_list[0] = raw_points
print(max_dim)
warp_df.head()

In [None]:
#############
# calculate density-based representations for each point cloud
kde_bw = 4
res = 5 # in um
N = int(np.ceil(2*max_dim / res)) + 1

# make ref grids
x_axis = np.linspace(-max_dim, max_dim, N)
y_axis = np.linspace(-max_dim, max_dim, N)
z_axis = np.linspace(-max_dim, max_dim, N)
x_grid, y_grid, z_grid = np.meshgrid(x_axis, y_axis, z_axis)
xyz_array = np.c_[x_grid.ravel(), y_grid.ravel(), z_grid.ravel()]

# Go even simpler and just calculate 3D histogram
x_bins = np.linspace(-max_dim, max_dim, N+1)
y_bins = np.linspace(-max_dim, max_dim, N+1)
z_bins = np.linspace(-max_dim, max_dim, N+1)


density_list = []

for i in tqdm(range(n_samples)):
    points = warped_points_list[i]
    kde = KernelDensity(bandwidth=kde_bw, kernel="gaussian").fit(points) 

    probs = np.exp(kde.score_samples(xyz_array))
    # counts, _ = np.histogramdd(points, (x_bins, y_bins, z_bins))
    probs = probs / np.max(probs)

    density_list.append(probs)

The idea is to use these warped fins as a point of reference for assessing the size of noise impact

#### Let's check out a sample dataset just to make sure things look reasonable

In [None]:
fig = go.Figure(data=go.Volume(
    x=x_grid.flatten(), y=y_grid.flatten(), z=z_grid.flatten(),
    value=density_list[10].flatten(),
    opacity=0.1,
    isomin=0.1,
    surface_count=25,
    ))
fig.update_layout(scene_xaxis_showticklabels=False,
                  scene_yaxis_showticklabels=False,
                  scene_zaxis_showticklabels=False)
fig.show()

## Obtain Ball harmonics decompositions for each fin density

In [None]:
bw_harmonic = 25 # Let's try to keep things relatively compact...
eps = 1e-6     #desired accuracy
N = int(np.ceil(2*max_dim / res)) + 1

fle_vec = []
coeffs_vec = []
vol_vec = []
loss_vec = []


for n, probs in enumerate(tqdm(density_list)):

    prob_grid = np.reshape(probs, (N, N, N))
    
    fle = FLEBasis3D(N, bw_harmonic, eps, force_real=True)
    coeffs = fle.evaluate_t(prob_grid)
    volume = fle.evaluate(coeffs)

    # record values
    fle_vec.append(fle)
    coeffs_vec.append(coeffs)
    vol_vec.append(volume)

    # calculate MSE
    loss = np.linalg.norm(probs.flatten() - volume.flatten() )
    loss_vec.append(loss)

#### Now let's experiment with using UMAP and PCA to capture differences between fins

In [None]:
from sklearn.decomposition import PCA

coeff_array = np.absolute(np.asarray(coeffs_vec))**2

n_components = 50
pca = PCA(n_components=n_components)
pca.fit(coeff_array)
fin_pca_array = pca.transform(coeff_array)
fin_pca_array.shape

# get cumulative explained variance
var_cumu = np.cumsum(pca.explained_variance_ratio_) * 100

px.scatter(x=range(n_components), y=var_cumu)

In [None]:
# Plot first two PCAs. Color according to x warp strength

fig = px.scatter_3d(x=fin_pca_array[:, 0], y=fin_pca_array[:, 1], z=fin_pca_array[:, 2], color=warp_df.loc[:, "z_warp"])
fig.add_trace(go.Scatter3d(x=[fin_pca_array[0, 0]], y=[fin_pca_array[0, 1]], z=[fin_pca_array[0, 2]]))
fig.show()

In [None]:
fig = px.scatter(x=fin_pca_array[:, 0], y=fin_pca_array[:, 1], color=warp_df.loc[:, "z_warp"])
fig.add_trace(go.Scatter3d(x=[fin_pca_array[0, 0]], y=[fin_pca_array[0, 1]], mode="markers"))
fig.update_traces(marker=dict(size=8))
fig.show()

In [None]:
warp_df.loc[fin_pca_array[:, 0]<5, :]

In [None]:
fig = go.Figure(data=go.Volume(
    x=x_grid.flatten(), y=y_grid.flatten(), z=z_grid.flatten(),
    value=density_list[1].flatten(),
    opacity=0.1,
    isomin=0.01,
    surface_count=25,
    ))
fig.update_layout(scene_xaxis_showticklabels=False,
                  scene_yaxis_showticklabels=False,
                  scene_zaxis_showticklabels=False)
fig.show()

The second PCA captures z-axis extent. This "makes sense", because this is the largest spatial axis

In [None]:
fig = px.scatter(x=fin_pca_array[:, 0], y=fin_pca_array[:, 1], color=np.prod(warp_df.loc[:, :], axis=1))
fig.add_trace(go.Scatter3d(x=[fin_pca_array[0, 0]], y=[fin_pca_array[0, 1]], mode="markers"))
fig.update_traces(marker=dict(size=8))
fig.show()

First PCA is predominantly fin volume, which also makes sense

#### Look at UMAP

In [None]:
import umap.umap_ as umap

reducer = umap.UMAP(n_components=3)
embedding = reducer.fit_transform(coeff_array)

In [None]:
fig = px.scatter_3d(x=embedding[:, 0], y=embedding[:, 1], z=embedding[:, 2], color=warp_df["y_warp"])
fig.update_traces(marker=dict(size=8))
fig.show()

### Add noise to see how it interacts with SH

In [None]:
n_out_modes = 15
n_points = fin_df.shape[0]
n_outlier_vec = np.floor(np.logspace(0, np.log10(n_points), n_out_modes)).astype(int)
noise_scale_vec = np.linspace(0.1, 10, n_out_modes)

noise_list = []
noisy_points_list = []
np.random.seed(61)


for n, n_out in enumerate(tqdm(n_outlier_vec)):
    for m, mag_out in enumerate(noise_scale_vec):
        out_indices = np.random.choice(range(n_points), n_out, replace=False)
        noise_array = np.random.normal(loc=0, scale=mag_out, size=(n_out, 3))
        points = raw_points.copy()
        points[out_indices, :] += noise_array
        noisy_points_list.append(points)
        noise_list.append([n_out, mag_out])


noise_df = pd.DataFrame(np.squeeze(np.asarray(noise_list)), columns=["n_outliers", "sigma"])

noise_df.head()

### Calculate densities and harmonics for noised point clouds

In [None]:
density_list_noise = []

for i in tqdm(range(noise_df.shape[0])):
    points =noisy_points_list[i]
    kde = KernelDensity(bandwidth=kde_bw, kernel="gaussian").fit(points) 

    probs = np.exp(kde.score_samples(xyz_array))
    # counts, _ = np.histogramdd(points, (x_bins, y_bins, z_bins))
    probs = probs / np.max(probs)

    density_list_noise.append(probs)
    
    # points = warped_points_list[i]
    # counts, _ = np.histogramdd(points, (x_bins, y_bins, z_bins))
    # probs = counts / np.max(counts)

    # density_list_noise.append(probs)

fle_vec_noise = []
coeffs_vec_noise = []
vol_vec_noise = []
loss_vec_noise = []


for n, probs in enumerate(tqdm(density_list_noise)):

    prob_grid = np.reshape(probs, (N, N, N))
    
    fle = FLEBasis3D(N, bw_harmonic, eps, force_real=True)
    coeffs = fle.evaluate_t(prob_grid)
    volume = fle.evaluate(coeffs)

    # record values
    fle_vec_noise.append(fle)
    coeffs_vec_noise.append(coeffs)
    vol_vec_noise.append(volume)

    # calculate MSE
    loss = np.linalg.norm(probs.flatten() - volume.flatten() )
    loss_vec_noise.append(loss)

Spot-check...how different do noised versions look?

In [None]:
coeff_array = np.absolute(np.asarray(coeffs_vec + coeffs_vec_noise))**2

n_components = 50
pca = PCA(n_components=n_components)
pca.fit(coeff_array)
fin_pca_array = pca.transform(coeff_array)
fin_pca_array.shape

# get cumulative explained variance
var_cumu = np.cumsum(pca.explained_variance_ratio_) * 100

px.scatter(x=range(n_components), y=var_cumu)

In [None]:
fig = px.scatter(x=fin_pca_array[100:, 0], y=fin_pca_array[100:, 1], color=noise_df["n_outliers"], size=noise_df["sigma"])
fig.add_trace(go.Scatter(x=fin_pca_array[:100, 0], y=fin_pca_array[:100, 1], mode="markers", opacity=0.2))

fig.show()

In [None]:
fig = px.scatter(x=fin_pca_array[100:, 0], y=fin_pca_array[100:, 1], color=noise_df["n_outliers"], opacity=0.1)
fig.add_trace(go.Scatter(x=[fin_pca_array[0, 0]], y=[fin_pca_array[0, 1]], mode="markers", opacity=0.6))
fig.update_traces(marker=dict(size=8))

fig.show()

### This looks promising! More work needs to be dne, but this indicates a significant level of noise-robustness 

In [None]:
bw = 25
l_filter = fle.ls <= bw
total_power = np.sqrt(np.sum(coeff_array[0, l_filter]))

shape_dist_vec = np.sqrt(np.sum((np.sqrt(coeff_array[:, l_filter])-np.sqrt(coeff_array[0, l_filter]))**2, axis=1)) / total_power
fig = px.scatter(noise_df, x="sigma", y="n_outliers", color=shape_dist_vec[100:], opacity=1)
fig.update_traces(marker=dict(size=16))

fig.update_layout(
    yaxis_type="log")

fig.show()

In [None]:
shape_dist_vec = np.sqrt(np.sum((np.sqrt(coeff_array[:, l_filter])-np.sqrt(coeff_array[0, l_filter]))**2, axis=1)) / total_power
warp_df["x_warp_log"] = np.log10(warp_df["x_warp"])
warp_df["y_warp_log"] = np.log10(warp_df["y_warp"])
warp_df["z_warp_log"] = np.log10(warp_df["z_warp"])

fig = px.scatter_3d(warp_df, x="x_warp_log", y="y_warp_log", z="z_warp_log", color=shape_dist_vec[:100], opacity=1)
fig.update_traces(marker=dict(size=6))

# fig.update_layout(
#     yaxis_type="log",
#     xaxis_type="log")

fig.show()

In [None]:
ind = np.where((np.round(noise_df["sigma"],1)==8.6) & (noise_df["n_outliers"]==3))[0][0]
print(ind)
fig = px.scatter_3d(x=noisy_points_list[ind][:, 0], y=noisy_points_list[ind][:, 1], z=noisy_points_list[ind][:, 2])
fig.add_trace(go.Scatter3d(x=warped_points_list[0][:, 0], y=warped_points_list[0][:, 1], z=warped_points_list[0][:, 2], mode="markers"))
fig.show()

In [None]:
noisy_points_list[0]

#### What does the average across all warps look like?

In [None]:
from plotly.subplots import make_subplots

fle = FLEBasis3D(N, bw_harmonic, eps, force_real=True)

coeff_array_full = np.asarray(coeffs_vec)
coeffs_mean = np.median(coeff_array_full, axis=0)
vol_mean = fle.evaluate(coeffs_mean)

fig = make_subplots(rows=1, cols=2)

fig.add_trace(go.Contour(z=vol_mean[:, 37, :], contours=dict(start=0.05, end=0.75), showscale=False), 1, 1)
fig.add_trace(go.Contour(z=vol_vec[0][:, 37, :], contours=dict(start=0.05, end=0.75), showscale=False), 1, 2)

fig.show()

In [None]:
fig = make_subplots(rows=1, cols=2, specs=[[{'type': 'volume'}, {'type': 'volume'}]])


fig.add_trace(go.Volume(
    x=x_grid.flatten(), y=y_grid.flatten(), z=z_grid.flatten(),
    value=vol_mean.flatten(),
    opacity=0.1,
    isomin=0.05,
    isomax=0.7,
    surface_count=25,
    ), row=1, col=1)

fig.add_trace(go.Volume(
    x=x_grid.flatten(), y=y_grid.flatten(), z=z_grid.flatten(),
    value=vol_vec[0].flatten(),
    opacity=0.1,
    isomin=0.05,
    isomax=0.7,
    surface_count=25,
    ), row=1, col=2)

fig.update_layout(scene_xaxis_showticklabels=False,
                  scene_yaxis_showticklabels=False,
                  scene_zaxis_showticklabels=False,
                 scene=dict( aspectmode='data'
                 ))

fig.show()

In [None]:
fig = go.Figure()

fig.add_trace(go.Isosurface(
    x=x_grid.flatten(), y=y_grid.flatten(), z=z_grid.flatten(),
    value=vol_mean.flatten(),
    opacity=0.5,
    isomin=0.05,
    isomax=0.7,
    surface_count=5,
    ))

fig.update_layout(scene_xaxis_showticklabels=False,
                  scene_yaxis_showticklabels=False,
                  scene_zaxis_showticklabels=False,
                 scene=dict( aspectmode='data'
                 ))

fig.show()

This reconstruction actually looks quite "good" for the H-R sampling, even though it has a bw of just 10. Clearly, MSE is not the ideal metric for measuring how "close" a reconstruction is. 

Also, feeding a more information-rich distribution in may be fine, so long as I don't expect to obtain a perfect reconstruction. The harmonics porivde a natural way to filter for larger-scale features. 

In [None]:
# calculate loading arrays for each fin
l_vec = np.asarray(fle01.ls)
l_index = np.unique(l_vec)
m_vec = np.asarray(fle01.ms)
m_index = np.unique(m_vec)
k_vec = np.asarray(fle01.ks)
k_index = np.unique(k_vec)

power_array01 = np.empty((len(l_index), len(k_index)))
power_array01_rot = np.empty((len(l_index), len(k_index)))
power_array02 = np.empty((len(l_index), len(k_index)))

mags01 = np.absolute(coeff01)**2
mags01_rot = np.absolute(coeff01_rot)**2
mags02 = np.absolute(coeff02)**2
for li, l in enumerate(l_index):
    for ki, k in enumerate(k_index):
        indices = (l_vec==l) & (k_vec==k)
        power_array01[li, ki] = np.sum(mags01[indices])
        power_array01_rot[li, ki] = np.sum(mags01_rot[indices])
        power_array02[li, ki] = np.sum(mags02[indices])

# for li, l in enumerate(l_index):
#     for mi, m in enumerate(k_index):
#         indices = (l_vec==l) & (m_vec==m)
#         m_array[li, mi] = np.sum(mags[indices])
#         m_array2[li, mi] = np.sum(mags2[indices])

In [None]:
px.imshow(np.log10(power_array01+1e-10))
# np.all(fle.ms == fle2.ms)

In [None]:
print(np.sqrt(np.sum((power_array01)**2)))
print(np.sqrt(np.sum((power_array01-power_array01_rot)**2)))
print(np.sqrt(np.sum((power_array01-power_array02)**2)))

In [None]:
# calculate loading arrays for each fin
l_vec = np.asarray(fle0.ls)
l_index = np.unique(l_vec)
k_vec = np.asarray(fle0.ks)
k_index = np.unique(k_vec)
power_array_list = []
power_vec_list = []
for i in tqdm(range(len(sph_coeff_list))):
    power_array = np.empty((len(l_index), len(k_index)))
    coeffs = sph_coeff_list[i]
    mags = np.absolute(coeffs)**2
    for li, l in enumerate(l_index):
        for ki, k in enumerate(k_index):
            indices = (l_vec==l) & (k_vec==k)
            power_array[li, ki] = np.sum(mags[indices])

    power_array_list.append(power_array)
    power_vec_list.append(power_array.ravel())

In [None]:
power_array.shape

In [None]:
recon_bw = 25

file_ind = 1
fp = fin_object_list[file_ind]
point_prefix = path_leaf(fp).replace("_fin_object.pkl", "")
print(point_prefix)

fin_data = FinData(data_root=root, name=point_prefix, tissue_seg_model=seg_type)

# get fin dataset
fin_df = fin_data.full_point_data
fin_df = fin_df.loc[fin_df["fin_label_curr"]==1, :]

# fit density function # fit density
X = fin_df[["X", "Y", "Z"]].to_numpy()
X = X - np.mean(X, axis=0)
kde = KernelDensity(bandwidth=kde_bandwidth, kernel="gaussian").fit(X)

# get density grid
score_array = kde.score_samples(test_array)
score_array_thresh = score_array.copy()
prob_array[prob_array < -5] = -np.inf
prob_array = np.exp(score_array_thresh)
prob_grid = np.reshape(prob_array, x_grid.shape)
prob_grid = prob_grid / np.max(np.abs(prob_grid))

# fit transform
N = prob_grid.shape[0]  #replace this by the side-length of your volume array
# bandlimit = 25   #maximum number of basis functions to use

fle0 = FLEBasis3D(N, recon_bw, eps, force_real=True)
coeff = fle0.evaluate_t(prob_grid)
volume = fle0.evaluate(coeff)
weigths = fle0.weights


print(weight)

In [None]:
bandlimit = 25   #maximum number of basis functions to use
eps = 1e-7      #desired accuracy
fle = FLEBasis3D(N, bandlimit, eps, force_real=True)
      FLEBasis3D(N, recon_bw, eps, force_real=True)
# fle.force_real=True
coeff = fle.evaluate_t(prob_grid)
volume = fle.evaluate(coeff)
weights = fle.weights

print(weights)

In [None]:
test = fle.create_denseB()

In [None]:
test.shape

### 