In [None]:
from IPython import get_ipython
if get_ipython():
    get_ipython().run_line_magic("load_ext", "autoreload")
    get_ipython().run_line_magic("autoreload", "2")
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd

import scanpy as sc

import pickle

In [None]:
import polyptich as pp
pp.setup_ipython()
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd
import seaborn as sns

import scanpy as sc

import polyptich as pp
import latenta as la
import os
import jax
import jax.numpy as jnp
import optax

import tqdm.auto as tqdm

import scipy

# Linear synthetic data

## generating data

In [None]:
genes = la.Dim(500, name="gene")
cells = la.Dim(2000, name="cell")

In [None]:
generator = np.random.default_rng(1)
x1 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Linear1")
x2 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Sigmoid2")
x3 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Linear3")

b_value = generator.normal(1, scale=0.5, size=len(genes))
b = la.Fixed(b_value, definition=[genes], label="a")

In [None]:
X = np.array([x1.loader.value, x2.loader.value, x3.loader.value])

In [None]:
plt.hist(X.T)

In [None]:
def create_a(key, genes, odds_ratio=1):
    a = la.distributions.NormalDualMixture(
    key=key,
    loc1=0.0,
    scale1=0.1,
    loc2=0.0,
    scale2=1,
    logit=odds_ratio,
    definition=[genes],)
    return a

In [None]:
def create_a_spline(key, genes,knots, odds_ratio=1):
    a = la.distributions.NormalDualMixture(
    key=key,
    loc1=0.0,
    scale1=0.1,
    loc2=0.0,
    scale2=1,
    logit=odds_ratio,
    definition=[genes, knots],)
    return a

In [None]:
generator = np.random.default_rng(1)

In [None]:
def data_adata_converter (model, complete=False, size=1000):
    model.rootify()
    observation = model

    k = la.utils.key.StatefulKey(0)
    minibatcher_validation = la.train.minibatching.Minibatcher(model.clean[0], k(), size=size,permute=False, complete=complete)
    minibatcher_validation.initialize()
    program = la.programs.Inference(root = model, minibatcher=minibatcher_validation)
    observation.value(program)

    
    outputs = program.run_all(n=1)
    arrays = np.concat([output[("root", "value")] for output in outputs])
    # Stack arrays along a new axis and compute the mean
    observation_value = arrays
    adata = sc.AnnData(
    observation_value,
    )

    sc.pp.pca(adata)
    sc.pp.neighbors(adata)
    sc.tl.umap(adata)
    return adata

In [None]:
def create_data(genes, logit, x_fixed_list):
    generator = np.random.default_rng(1)
    x1 = x_fixed_list[0]
    x2 = x_fixed_list[1]
    x3 = x_fixed_list[2]
    # x1 effect
    knots = la.Dim(5, name="knot")
    knot_positions = la.Fixed(
        np.linspace(0.0, 1.0, len(knots)), definition=[knots], label="knot_positions"
    )
    circular_knot_positions = la.Fixed(
        np.linspace(0.0, 2 * np.pi, len(knots)),
        definition=[knots],
        label="circular_knot_positions",
    )
    key=generator.integers(0, 10000)
    a = create_a(key, genes, logit)

    import math
    x1_effect = la.links.scalar.Linear(x=x1, a=a, label="x1_effect")

    oi = la.Fixed(1)

    # x2 effect
    key=generator.integers(0, 10000)
    a = create_a(key, genes)
    shift = la.Fixed(generator.normal(0.5, 0.3, size=len(genes)), definition=[genes], label="shift")
    a_real = la.links.scalar.Linear(x=oi, a=a)
    x2_effect = la.links.scalar.Sigmoid(x=x2, a=a_real, label="x2_effect")

    # x3 effect
    key=generator.integers(0, 10000)
    a = create_a(key, genes)

    import math
    x3_effect = la.links.scalar.Linear(
        x=x3, a=a,  label="x3_effect")
    y = la.modular.Additive(definition=[cells, genes], transforms=[la.transforms.Exp()])
    y.x1_effect = x1_effect
    y.x2_effect = x2_effect
    y.x3_effect = x3_effect
    y.b = b
    dispersion_value = np.clip(generator.gamma(1.0, 1.0, size=len(genes)), 2, 1)
    dispersion = la.Fixed(dispersion_value, definition=[genes], label="dispersion")
    distribution = la.distributions.NegativeBinomial2(key=1, mu=y, dispersion=dispersion)
    distribution.rootify()
    program = la.Program()
    distribution.value(program)
    y.value(program)
    y.find("Linear1").value(program)
    y.find("Sigmoid2").value(program)
    y.find("Linear3").value(program)
    y.x1_effect.value(program)
    y.x2_effect.value(program)
    y.x3_effect.value(program)
    observation_value = np.asarray(program.run()[("root", "value")])
    y_value = np.asarray(program.run()[("root.mu", "value")])
    x1_value = np.asarray(program.run()[(y.find("Linear1").uuid, "value")])
    x2_value = np.asarray(program.run()[(y.find("Sigmoid2").uuid, "value")])
    x3_value = np.asarray(program.run()[(y.find("Linear3").uuid, "value")])
    x1_effect_value = np.asarray(program.run()[(y.x1_effect.uuid, "value")])
    x2_effect_value = np.asarray(program.run()[(y.x2_effect.uuid, "value")])
    x3_effect_value = np.asarray(program.run()[(y.x3_effect.uuid, "value")])
    import scanpy as sc

    adata = sc.AnnData(
        observation_value,
        obs=pd.DataFrame(
            {
                "Linear1": x1.prior_pd(),
                "Sigmoid2": x2.prior_pd(),
                "Linear3": x3.prior_pd(),
            }
        ),
    )
    sc.pp.pca(adata)
    sc.pp.neighbors(adata)
    sc.tl.umap(adata)

    
    return adata, distribution


In [None]:
x1 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Linear1")
x2 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Sigmoid2")
x3 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Linear3")
adata, model = create_data(genes, 1,[x1,x2,x3])

model.plot()

In [None]:
adata.X.max()

In [None]:
import eyck
eyck.m.t.plot_umap(adata, color=["Linear1", "Sigmoid2", "Linear3"], cmap = "magma").display()

In [None]:
import time

In [None]:
tic0 = time.time()
sc.pp.pca(adata)
toc0 = time.time()

In [None]:
factors = 5
# Import necessary libraries
from sklearn.decomposition import FastICA
tic1 = time.time()
ica = FastICA(n_components=factors, random_state=42)
X_transformed = ica.fit_transform(adata.X)
toc1 = time.time()
components = ica.components_.T
adata.obsm["X_ica"] = X_transformed
# NMF
from sklearn.decomposition import NMF
tic2 = time.time()
nmf = NMF(n_components=factors, random_state=42)
X_transformed = nmf.fit_transform(adata.X)
toc2 = time.time()
components = nmf.components_.T
adata.obsm["X_nmf"] = X_transformed
# SparsePCA
from sklearn.decomposition import SparsePCA
tic3 = time.time()
pca = SparsePCA(n_components=factors, random_state=42, alpha = 2)
X_transformed = pca.fit_transform(adata.X)
toc3 = time.time()

components = pca.components_.T
adata.obsm["X_sparsepca_5"] = X_transformed

# Factor analysis
from sklearn.decomposition import FactorAnalysis
tic4 = time.time()
fa = FactorAnalysis(n_components=factors, random_state=42)
X_transformed = fa.fit_transform(adata.X)
toc4 = time.time()

components = fa.components_.T
adata.obsm["X_fa"] = X_transformed
# Isomap
from sklearn.manifold import Isomap
tic5 = time.time()
isomap = Isomap(n_components=factors)
X_transformed = isomap.fit_transform(adata.X)
toc5 = time.time()

components = isomap.embedding_.T
adata.obsm["X_isomap"] = X_transformed
# Spectral
from sklearn.manifold import SpectralEmbedding
tic6 = time.time()
spectral = SpectralEmbedding(n_components=factors)
X_transformed = spectral.fit_transform(adata.X)
toc6 = time.time()

components = spectral.embedding_.T
adata.obsm["X_spectral"] = X_transformed

In [None]:
def tictoc (tic, toc):
    return toc - tic
names = ["pca", 'X_ica','X_nmf', 'X_sparsepca_5', 'X_fa', 'X_isomap', 'X_spectral']
tictoc_list = [(tic0,toc0),(tic1,toc1),(tic2,toc2),(tic3,toc3),(tic4,toc4),(tic5,toc5),(tic6,toc6)]
for i,(tic, toc) in enumerate(tictoc_list):
    print(names[i])
    print(tictoc(tic, toc))

In [None]:
methods = ["X_pca","X_sparsepca_5"]
names = ["PCA", "X_sparsepca_5"]
for index,method in enumerate(methods):
    fig = pp.grid.Figure(pp.grid.Grid(padding_height=0, padding_width=0))

    grid = fig.main[1, 1] = pp.grid.Grid(padding_height=0.01, padding_width=0.01)
    s = 0.5

    w = 0.5
    h = 0.5

    max_n = 1000

    rows = pd.DataFrame({
        "name":["Linear1", "Sigmoid2", "Linear3"],
    })
    rows["ix"] = range(len(rows))
    rows = rows.set_index("name")


    columns = pd.DataFrame({
        "name":[f"IC{i+1}" for i in range(min(adata.obsm[method].shape[1], 5))],
    })
    columns["ix"] = range(len(columns))
    columns = columns.set_index("name")

    for row_name, (i, ) in rows.iterrows():
        panel, ax = grid[i + 1, 0] = pp.grid.Panel((w, h))
        ax.text(0.4, 0.5, row_name, ha="center", va="center")
        ax.axis("off")

    for i in columns["ix"]:
        panel, ax = grid[0, i + 1] = pp.grid.Panel((w, h))
        ax.text(0.5, 0.5, f"x{i+1}", ha="center", va="center")
        ax.axis("off")  

    for row_name, (i, ) in rows.iterrows():
        for column_name, (j, ) in columns.iterrows():
            x = adata.obs[row_name]
            y = adata.obsm[method][:, j]
            panel, ax = grid[i+1, j+1] = pp.grid.Panel((w, h))
            ax.scatter(x, y, s=0.5)
            cor = np.corrcoef(x, y)[0, 1]
            text = ax.text(0.5, 0.5, f"{cor:.2f}", transform=ax.transAxes, ha="center", va="center")
            text.set_path_effects(
                [
                    mpl.patheffects.Stroke(linewidth=3, foreground="white"),
                    mpl.patheffects.Normal(),
                ]
            )
            ax.axis("off")
    fig.main.align()

    horizontal = fig.main[0, 1] = pp.grid.Panel((grid.width, 0.2))
    horizontal.axis("off")
    horizontal.text(0.5, 0.5, names[index], va="center", ha="center")

    vertical = fig.main[1, 0] = pp.grid.Panel((0.2, grid.height))
    vertical.axis("off")
    vertical.text(-0.5, 0.5, "Real", rotation=90, va="center", ha="center")
    # Add a top-left title
    top_left = fig.main[0, 0] = pp.grid.Panel((0.4, 0.2))  # Adjust width and height as needed
    top_left.axis("off")
    top_left.text(0.5, 0.5, "Simple model", va="center", ha="center", fontsize=12, fontweight="bold")
    fig.display()

In [None]:
path = os.path.join("..", "synthetic_data", "no_dependencies_simple_synthetic_data.pkl")
os.makedirs(os.path.dirname(path), exist_ok=True)

with open(path, "wb") as f:
    pickle.dump(adata, f)

## dependencies

In [None]:
import latenta.links.vector.binary_multidimensional_spline as bms
import latenta.distributions.binary_multidimensional_spline as bms_dist

expdim = 4
n_dimensions = 3
a_zooms, a_dimensions = bms.create_zooms(expdim, n_dimensions, expdim_min=1)
a_split = bms.create_a_values(a_zooms, a_dimensions, n_dimensions)

a_values = dict(zip(zip(a_zooms, a_dimensions), a_split))

# a_values: default at 0
# 
a_values[(2, (0,))] = a_values[(2, (0,))].at[2].set(2.0)
a_values[(3, (0, 2))] = a_values[(3, (0, 2))].at[0, :, 2].set(2.0)
a_values[(2, (0, 1, 2))] = a_values[(2, (0, 1, 2))].at[1, 5, 2].set(1.0)
a_split = tuple(a_values.values())

a_concatenated, splits, shapes = bms.concatenate_a(a_split)
a = la.Fixed(a_concatenated, definition=[la.Dim(len(a_concatenated), name="a")], label="a")
dist = bms_dist.BinaryMultidimensionalSplineDistribution(
    a,
    zooms=a_zooms,
    dimensions=a_dimensions,
    n_dimensions=n_dimensions,
    zoom=4,
    kernel="normal",
    key=10
)
X = dist.prior(shape=(len(cells), n_dimensions))
plt.hist(X)
x1 = la.Fixed(X[:, 0], definition=[cells], label="Linear1")
x2 = la.Fixed(X[:, 1], definition=[cells], label="Sigmoid2")
x3 = la.Fixed(X[:, 2], definition=[cells], label="Linear3")

generator = np.random.default_rng(1)
b_value = generator.normal(1, scale=0.5, size=len(genes))
b = la.Fixed(b_value, definition=[genes], label="a")

In [None]:
adata,model = create_data(genes, 0.5,[x1,x2,x3])

In [None]:
adata.X.max()

In [None]:
import eyck
eyck.m.t.plot_umap(adata, color=["Linear1", "Sigmoid2", "Linear3"], cmap = "magma").display()

In [None]:
def factor_analysis(factors, adata):
    # Import necessary libraries
    from sklearn.decomposition import FastICA
    ica = FastICA(n_components=factors, random_state=42)
    X_transformed = ica.fit_transform(adata.X)
    components = ica.components_.T
    adata.obsm["X_ica"] = X_transformed
    # NMF
    from sklearn.decomposition import NMF
    nmf = NMF(n_components=factors, random_state=42)
    X_transformed = nmf.fit_transform(adata.X)
    components = nmf.components_.T
    adata.obsm["X_nmf"] = X_transformed
    # SparsePCA
    from sklearn.decomposition import SparsePCA
    pca = SparsePCA(n_components=factors, random_state=42, alpha = 2)
    X_transformed = pca.fit_transform(adata.X)
    components = pca.components_.T
    adata.obsm["X_sparsepca_5"] = X_transformed

    # Factor analysis
    from sklearn.decomposition import FactorAnalysis
    fa = FactorAnalysis(n_components=factors, random_state=42)
    X_transformed = fa.fit_transform(adata.X)
    components = fa.components_.T
    adata.obsm["X_fa"] = X_transformed
    # Isomap
    from sklearn.manifold import Isomap
    isomap = Isomap(n_components=factors)
    X_transformed = isomap.fit_transform(adata.X)
    components = isomap.embedding_.T
    adata.obsm["X_isomap"] = X_transformed
    # Spectral
    from sklearn.manifold import SpectralEmbedding
    spectral = SpectralEmbedding(n_components=factors)
    X_transformed = spectral.fit_transform(adata.X)
    components = spectral.embedding_.T
    adata.obsm["X_spectral"] = X_transformed
factor_analysis(5, adata)

In [None]:
methods = ["X_pca","X_sparsepca_5"]
names = ["PCA", "X_sparsepca_5"]
for index,method in enumerate(methods):
    fig = pp.grid.Figure(pp.grid.Grid(padding_height=0, padding_width=0))

    grid = fig.main[1, 1] = pp.grid.Grid(padding_height=0.01, padding_width=0.01)
    s = 0.5

    w = 0.5
    h = 0.5

    max_n = 1000

    rows = pd.DataFrame({
        "name":["Linear1", "Sigmoid2", "Linear3"],
    })
    rows["ix"] = range(len(rows))
    rows = rows.set_index("name")


    columns = pd.DataFrame({
        "name":[f"IC{i+1}" for i in range(min(adata.obsm[method].shape[1], 5))],
    })
    columns["ix"] = range(len(columns))
    columns = columns.set_index("name")

    for row_name, (i, ) in rows.iterrows():
        panel, ax = grid[i + 1, 0] = pp.grid.Panel((w, h))
        ax.text(0.4, 0.5, row_name, ha="center", va="center")
        ax.axis("off")

    for i in columns["ix"]:
        panel, ax = grid[0, i + 1] = pp.grid.Panel((w, h))
        ax.text(0.5, 0.5, f"x{i+1}", ha="center", va="center")
        ax.axis("off")  

    for row_name, (i, ) in rows.iterrows():
        for column_name, (j, ) in columns.iterrows():
            x = adata.obs[row_name]
            y = adata.obsm[method][:, j]
            panel, ax = grid[i+1, j+1] = pp.grid.Panel((w, h))
            ax.scatter(x, y, s=0.5)
            cor = np.corrcoef(x, y)[0, 1]
            text = ax.text(0.5, 0.5, f"{cor:.2f}", transform=ax.transAxes, ha="center", va="center")
            text.set_path_effects(
                [
                    mpl.patheffects.Stroke(linewidth=3, foreground="white"),
                    mpl.patheffects.Normal(),
                ]
            )
            ax.axis("off")
    fig.main.align()

    horizontal = fig.main[0, 1] = pp.grid.Panel((grid.width, 0.2))
    horizontal.axis("off")
    horizontal.text(0.5, 0.5, names[index], va="center", ha="center")

    vertical = fig.main[1, 0] = pp.grid.Panel((0.2, grid.height))
    vertical.axis("off")
    vertical.text(-0.5, 0.5, "Real", rotation=90, va="center", ha="center")
    # Add a top-left title
    top_left = fig.main[0, 0] = pp.grid.Panel((0.4, 0.2))  # Adjust width and height as needed
    top_left.axis("off")
    top_left.text(0.5, 0.5, "Simple model", va="center", ha="center", fontsize=12, fontweight="bold")
    fig.display()

In [None]:
path = os.path.join("..", "synthetic_data", "dependencies_simple_synthetic_data.pkl")
os.makedirs(os.path.dirname(path), exist_ok=True)

with open(path, "wb") as f:
    pickle.dump(adata, f)

# Non_linear synthetic data

## generating data

In [None]:
genes = la.Dim(500, name="gene")
cells = la.Dim(2000, name="cell")

In [None]:
def create_a(key, genes, odds_ratio=1):
    a = la.distributions.NormalDualMixture(
    key=key,
    loc1=0.0,
    scale1=0.1,
    loc2=0.0,
    scale2=1,
    logit=odds_ratio,
    definition=[genes],)
    return a

In [None]:
def create_a_spline(key, genes,knots, odds_ratio=1):
    a = la.distributions.NormalDualMixture(
    key=key,
    loc1=0.0,
    scale1=0.1,
    loc2=0.0,
    scale2=1,
    logit=odds_ratio,
    definition=[genes, knots],)
    return a

In [None]:
def create_data_complex(genes, logit, x_fixed_list = None):
    x_effects = {}
    generator = np.random.default_rng(1)
    if not x_fixed_list:
        x1 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Spline1")
        x2 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="Logistic2")
        x3 = la.Fixed(generator.uniform(size=len(cells)), definition=[cells], label="CircularSpline3")
    else:
        x1 = x_fixed_list[0]
        x2 = x_fixed_list[1]
        x3 = x_fixed_list[2]


    # x1 effect
    knots = la.Dim(10, name="knot")
    knot_positions = la.Fixed(
        np.linspace(0.0, 1.0, len(knots)), definition=[knots], label="knot_positions"
    )
    circular_knot_positions = la.Fixed(
        np.linspace(0.0, 2 * np.pi, len(knots)),
        definition=[knots],
        label="circular_knot_positions",
    )
    key=generator.integers(0, 10000)
    a = create_a_spline(key, genes,knots, logit)

    import math
    x1_effect = la.links.scalar.Spline(x=x1, a=a, knot=knot_positions, label="x1_effect")
    x_effects["x1_effect"] = (x1_effect) 

    oi = la.Fixed(1)

    # x2 effect
    key=generator.integers(0, 10000)
    a = create_a(key, genes)
    shift = la.Fixed(generator.normal(0.5, 0.3, size=len(genes)), definition=[genes], label="shift")
    a_real = la.links.scalar.Linear(x=oi, a=a)
    x2_effect = la.links.scalar.Logistic(x=x2, a=a_real, shift = shift, skew = 20., label="x2_effect")
    x_effects["x2_effect"] = (x2_effect) 
    # x3 effect
    key=generator.integers(0, 10000)
    a = create_a_spline(key, genes,knots, logit)

    import math
    x3_effect = la.links.scalar.CircularSpline(
    x=la.links.scalar.Linear(x3, a = 2*math.pi), a=a, knot=circular_knot_positions, label="x3_effect", smoothness=2
)
    x_effects["x3_effect"] = (x3_effect) 
    y = la.modular.Additive(definition=[cells, genes], transforms=[la.transforms.Exp()])
    y.x1_effect = x1_effect
    y.x2_effect = x2_effect
    y.x3_effect = x3_effect
    y.b = b
    dispersion_value = np.clip(generator.gamma(1.0, 1.0, size=len(genes)), 2, 1)
    dispersion = la.Fixed(dispersion_value, definition=[genes], label="dispersion")
    distribution = la.distributions.NegativeBinomial2(key=1, mu=y, dispersion=dispersion)
    distribution.rootify()
    program = la.Program()
    distribution.value(program)
    y.value(program)
    y.find("Spline1").value(program)
    y.find("Logistic2").value(program)
    y.find("CircularSpline3").value(program)
    y.x1_effect.value(program)
    y.x2_effect.value(program)
    y.x3_effect.value(program)

    observation_value = np.asarray(program.run()[("root", "value")])
    y_value = np.asarray(program.run()[("root.mu", "value")])
    x1_value = np.asarray(program.run()[(y.find("Spline1").uuid, "value")])
    x2_value = np.asarray(program.run()[(y.find("Logistic2").uuid, "value")])
    x3_value = np.asarray(program.run()[(y.find("CircularSpline3").uuid, "value")])
    x1_effect_value = np.asarray(program.run()[(y.x1_effect.uuid, "value")])
    x2_effect_value = np.asarray(program.run()[(y.x2_effect.uuid, "value")])
    x3_effect_value = np.asarray(program.run()[(y.x3_effect.uuid, "value")])
    import scanpy as sc

    adata = sc.AnnData(
        observation_value,
        obs=pd.DataFrame(
            {
                "Spline1": x1.prior_pd(),
                "Logistic2": x2.prior_pd(),
                "CircularSpline3": x3.prior_pd(),
            }
        ),
    )[:5000]

    sc.pp.pca(adata)
    sc.pp.neighbors(adata)
    sc.tl.umap(adata)
    return adata, distribution


In [None]:
adata, model = create_data_complex(genes, 0.5)
model.plot()

In [None]:
import eyck
eyck.m.t.plot_umap(adata, color=["Spline1", "Logistic2", "CircularSpline3"], cmap = "magma").display()

In [None]:
factors = 5
# Import necessary libraries
from sklearn.decomposition import FastICA
ica = FastICA(n_components=factors, random_state=42)
X_transformed = ica.fit_transform(adata.X)
components = ica.components_.T
adata.obsm["X_ica"] = X_transformed
# NMF
from sklearn.decomposition import NMF
nmf = NMF(n_components=factors, random_state=42)
X_transformed = nmf.fit_transform(adata.X)
components = nmf.components_.T
adata.obsm["X_nmf"] = X_transformed
# SparsePCA
from sklearn.decomposition import SparsePCA
pca = SparsePCA(n_components=factors, random_state=42, alpha = 2)
X_transformed = pca.fit_transform(adata.X)
components = pca.components_.T
adata.obsm["X_sparsepca_5"] = X_transformed

# Factor analysis
from sklearn.decomposition import FactorAnalysis
fa = FactorAnalysis(n_components=factors, random_state=42)
X_transformed = fa.fit_transform(adata.X)
components = fa.components_.T
adata.obsm["X_fa"] = X_transformed
# Isomap
from sklearn.manifold import Isomap
isomap = Isomap(n_components=factors)
X_transformed = isomap.fit_transform(adata.X)
components = isomap.embedding_.T
adata.obsm["X_isomap"] = X_transformed
# Spectral
from sklearn.manifold import SpectralEmbedding
spectral = SpectralEmbedding(n_components=factors)
X_transformed = spectral.fit_transform(adata.X)
components = spectral.embedding_.T
adata.obsm["X_spectral"] = X_transformed

In [None]:
methods = ["X_pca","X_sparsepca_5"]
names = ["PCA", "X_sparsepca_5"]
for index,method in enumerate(methods):
    fig = pp.grid.Figure(pp.grid.Grid(padding_height=0, padding_width=0))

    grid = fig.main[1, 1] = pp.grid.Grid(padding_height=0.01, padding_width=0.01)
    s = 0.5

    w = 0.5
    h = 0.5

    max_n = 1000

    rows = pd.DataFrame({
        "name":["Spline1", "Logistic2", "CircularSpline3"],
    })
    rows["ix"] = range(len(rows))
    rows = rows.set_index("name")


    columns = pd.DataFrame({
        "name":[f"IC{i+1}" for i in range(min(adata.obsm[method].shape[1], 5))],
    })
    columns["ix"] = range(len(columns))
    columns = columns.set_index("name")

    for row_name, (i, ) in rows.iterrows():
        panel, ax = grid[i + 1, 0] = pp.grid.Panel((w, h))
        ax.text(0.4, 0.5, row_name, ha="center", va="center")
        ax.axis("off")

    for i in columns["ix"]:
        panel, ax = grid[0, i + 1] = pp.grid.Panel((w, h))
        ax.text(0.5, 0.5, f"x{i+1}", ha="center", va="center")
        ax.axis("off")  

    for row_name, (i, ) in rows.iterrows():
        for column_name, (j, ) in columns.iterrows():
            x = adata.obs[row_name]
            y = adata.obsm[method][:, j]
            panel, ax = grid[i+1, j+1] = pp.grid.Panel((w, h))
            ax.scatter(x, y, s=0.5)
            cor = np.corrcoef(x, y)[0, 1]
            text = ax.text(0.5, 0.5, f"{cor:.2f}", transform=ax.transAxes, ha="center", va="center")
            text.set_path_effects(
                [
                    mpl.patheffects.Stroke(linewidth=3, foreground="white"),
                    mpl.patheffects.Normal(),
                ]
            )
            ax.axis("off")
    fig.main.align()

    horizontal = fig.main[0, 1] = pp.grid.Panel((grid.width, 0.2))
    horizontal.axis("off")
    horizontal.text(0.5, 0.5, names[index], va="center", ha="center")

    vertical = fig.main[1, 0] = pp.grid.Panel((0.2, grid.height))
    vertical.axis("off")
    vertical.text(-0.5, 0.5, "Real", rotation=90, va="center", ha="center")
    # Add a top-left title
    top_left = fig.main[0, 0] = pp.grid.Panel((0.4, 0.2))  # Adjust width and height as needed
    top_left.axis("off")
    top_left.text(0.5, 0.5, "Simple model", va="center", ha="center", fontsize=12, fontweight="bold")
    fig.display()

In [None]:
path = os.path.join("..", "synthetic_data", "no_dependencies_complex_synthetic_data.pkl")
os.makedirs(os.path.dirname(path), exist_ok=True)

with open(path, "wb") as f:
    pickle.dump(adata, f)

## dependencies

In [None]:
import latenta.links.vector.binary_multidimensional_spline as bms
import latenta.distributions.binary_multidimensional_spline as bms_dist

expdim = 4
n_dimensions = 3
a_zooms, a_dimensions = bms.create_zooms(expdim, n_dimensions, expdim_min=1)
a_split = bms.create_a_values(a_zooms, a_dimensions, n_dimensions)

a_values = dict(zip(zip(a_zooms, a_dimensions), a_split))

# a_values: default at 0
# 
a_values[(2, (0,))] = a_values[(2, (0,))].at[2].set(2.0)
a_values[(3, (0, 2))] = a_values[(3, (0, 2))].at[0, :, 2].set(2.0)
a_values[(2, (0, 1, 2))] = a_values[(2, (0, 1, 2))].at[1, 5, 2].set(1.0)
a_split = tuple(a_values.values())

a_concatenated, splits, shapes = bms.concatenate_a(a_split)
a = la.Fixed(a_concatenated, definition=[la.Dim(len(a_concatenated), name="a")], label="a")
dist = bms_dist.BinaryMultidimensionalSplineDistribution(
    a,
    zooms=a_zooms,
    dimensions=a_dimensions,
    n_dimensions=n_dimensions,
    zoom=4,
    kernel="normal",
    key=10
)
X = dist.prior(shape=(len(cells), n_dimensions))
plt.hist(X)
x1 = la.Fixed(X[:, 0], definition=[cells], label="Spline1")
x2 = la.Fixed(X[:, 1], definition=[cells], label="Logistic2")
x3 = la.Fixed(X[:, 2], definition=[cells], label="CircularSpline3")

generator = np.random.default_rng(1)
b_value = generator.normal(1, scale=0.5, size=len(genes))
b = la.Fixed(b_value, definition=[genes], label="a")

In [None]:
adata,model = create_data_complex(genes, 0.5,[x1,x2,x3])

In [None]:
adata.X.max()

In [None]:
import eyck
eyck.m.t.plot_umap(adata, color=["Spline1", "Logistic2", "CircularSpline3"], cmap = "magma").display()

In [None]:
def factor_analysis(factors, adata):
    # Import necessary libraries
    from sklearn.decomposition import FastICA
    ica = FastICA(n_components=factors, random_state=42)
    X_transformed = ica.fit_transform(adata.X)
    components = ica.components_.T
    adata.obsm["X_ica"] = X_transformed
    # NMF
    from sklearn.decomposition import NMF
    nmf = NMF(n_components=factors, random_state=42)
    X_transformed = nmf.fit_transform(adata.X)
    components = nmf.components_.T
    adata.obsm["X_nmf"] = X_transformed
    # SparsePCA
    from sklearn.decomposition import SparsePCA
    pca = SparsePCA(n_components=factors, random_state=42, alpha = 2)
    X_transformed = pca.fit_transform(adata.X)
    components = pca.components_.T
    adata.obsm["X_sparsepca_5"] = X_transformed

    # Factor analysis
    from sklearn.decomposition import FactorAnalysis
    fa = FactorAnalysis(n_components=factors, random_state=42)
    X_transformed = fa.fit_transform(adata.X)
    components = fa.components_.T
    adata.obsm["X_fa"] = X_transformed
    # Isomap
    from sklearn.manifold import Isomap
    isomap = Isomap(n_components=factors)
    X_transformed = isomap.fit_transform(adata.X)
    components = isomap.embedding_.T
    adata.obsm["X_isomap"] = X_transformed
    # Spectral
    from sklearn.manifold import SpectralEmbedding
    spectral = SpectralEmbedding(n_components=factors)
    X_transformed = spectral.fit_transform(adata.X)
    components = spectral.embedding_.T
    adata.obsm["X_spectral"] = X_transformed
factor_analysis(5, adata)

In [None]:
methods = ["X_pca","X_sparsepca_5"]
names = ["PCA", "X_sparsepca_5"]
for index,method in enumerate(methods):
    fig = pp.grid.Figure(pp.grid.Grid(padding_height=0, padding_width=0))

    grid = fig.main[1, 1] = pp.grid.Grid(padding_height=0.01, padding_width=0.01)
    s = 0.5

    w = 0.5
    h = 0.5

    max_n = 1000

    rows = pd.DataFrame({
        "name":["Spline1", "Logistic2", "CircularSpline3"],
    })
    rows["ix"] = range(len(rows))
    rows = rows.set_index("name")


    columns = pd.DataFrame({
        "name":[f"IC{i+1}" for i in range(min(adata.obsm[method].shape[1], 5))],
    })
    columns["ix"] = range(len(columns))
    columns = columns.set_index("name")

    for row_name, (i, ) in rows.iterrows():
        panel, ax = grid[i + 1, 0] = pp.grid.Panel((w, h))
        ax.text(0.4, 0.5, row_name, ha="center", va="center")
        ax.axis("off")

    for i in columns["ix"]:
        panel, ax = grid[0, i + 1] = pp.grid.Panel((w, h))
        ax.text(0.5, 0.5, f"x{i+1}", ha="center", va="center")
        ax.axis("off")  

    for row_name, (i, ) in rows.iterrows():
        for column_name, (j, ) in columns.iterrows():
            x = adata.obs[row_name]
            y = adata.obsm[method][:, j]
            panel, ax = grid[i+1, j+1] = pp.grid.Panel((w, h))
            ax.scatter(x, y, s=0.5)
            cor = np.corrcoef(x, y)[0, 1]
            text = ax.text(0.5, 0.5, f"{cor:.2f}", transform=ax.transAxes, ha="center", va="center")
            text.set_path_effects(
                [
                    mpl.patheffects.Stroke(linewidth=3, foreground="white"),
                    mpl.patheffects.Normal(),
                ]
            )
            ax.axis("off")
    fig.main.align()

    horizontal = fig.main[0, 1] = pp.grid.Panel((grid.width, 0.2))
    horizontal.axis("off")
    horizontal.text(0.5, 0.5, names[index], va="center", ha="center")

    vertical = fig.main[1, 0] = pp.grid.Panel((0.2, grid.height))
    vertical.axis("off")
    vertical.text(-0.5, 0.5, "Real", rotation=90, va="center", ha="center")
    # Add a top-left title
    top_left = fig.main[0, 0] = pp.grid.Panel((0.4, 0.2))  # Adjust width and height as needed
    top_left.axis("off")
    top_left.text(0.5, 0.5, "Simple model", va="center", ha="center", fontsize=12, fontweight="bold")
    fig.display()

In [None]:
path = os.path.join("..", "synthetic_data", "dependencies_complex_synthetic_data.pkl")
os.makedirs(os.path.dirname(path), exist_ok=True)

with open(path, "wb") as f:
    pickle.dump(adata, f)