# In [None]:
import os
import pandas as pd
import numpy as np
import scanpy as sc

from euclid_msi.preprocessing import Preprocessing
from euclid_msi.embedding import Embedding
from euclid_msi.clustering import Clustering
from euclid_msi.postprocessing import Postprocessing

def _standardize_columns(frame):
    """Return a copy of *frame* with every column centred and scaled.

    Each column has its mean subtracted and is divided by its population
    standard deviation (``ddof=0``). Columns with zero standard deviation
    are only centred (their divisor is set to 1) so no division by zero
    occurs. The index and column labels of *frame* are preserved.
    """
    values = frame.to_numpy(dtype=float)
    centered = values - values.mean(axis=0)
    scale = values.std(axis=0)  # numpy default ddof=0 -> population std
    scale[scale == 0.0] = 1.0  # constant columns: centre only, avoid 0/0
    return pd.DataFrame(centered / scale, index=frame.index, columns=frame.columns)


def test_workflow():
    """
    Full pipeline test for EUCLID.

    Runs, in order, the preprocessing, embedding, clustering, and
    postprocessing steps, printing a short progress line after each call.
    The input Zarr path and the acquisition names are hard-coded below;
    adjust file paths and parameters as needed.

    Several inputs (batch labels, coordinates, acronyms, parcellations,
    gene/lipid tables, reference and annotation volumes) are simulated with
    unseeded ``np.random`` draws, so results differ between runs.

    Side effects: writes ``factor_to_lipid.npy`` to the working directory.
    The other output file and folder names are handed to the pipeline
    methods; presumably those write them as well (verify in ``euclid_msi``).
    """
    # Shared inputs for both preprocessing calls.
    path_data = "/data/luca/lipidatlas/uMAIA_allbrains/021124_ALLBRAINS_normalised.zarr"
    acquisitions = (
        'BrainAtlas/BRAIN2/20211201_MouseBrain2_S11_306x248_Att30_25um',
        'BrainAtlas/BRAIN2/20211202_MouseBrain2_S12_332x246_Att30_25um',
    )

    # ---------------------------
    # Preprocessing
    # ---------------------------
    prep = Preprocessing()
    morans_df = prep.calculate_moran(
        path_data=path_data,
        acquisitions=list(acquisitions),  # fresh list per call, as in the original
        log_file="iterations_log.txt",
        morans_csv="morans_by_sec.csv"
    )
    print("Moran's I computed:", morans_df.shape)

    adata = prep.store_exp_data_metadata(
        path_data=path_data,
        acquisitions=list(acquisitions),
        metadata_csv="acquisitions_metadata.csv",
        output_anndata="msi_preprocessed.h5ad"
    )
    print("Preprocessed AnnData shape:", adata.shape)

    # ---------------------------
    # Embedding
    # ---------------------------
    emb = Embedding()
    # Missing values are replaced by a small positive constant before NMF.
    data_df = pd.DataFrame(adata.X, index=adata.obs_names, columns=adata.var_names).fillna(0.0001)
    nmf_embeddings, factor_to_lipid, _n_factors, nmf_model = emb.learn_seeded_nmf_embeddings(data_df)
    print("Seeded NMF embeddings shape:", nmf_embeddings.shape)

    nmf_applied = emb.apply_nmf_embeddings(data_df, nmf_model, adata=adata)
    print("Applied NMF embeddings shape:", nmf_applied.shape)

    # Simulated batch labels: alternate "A"/"B" by row position.
    batch_meta = pd.DataFrame({
        "batch": ["A" if i % 2 == 0 else "B" for i in range(nmf_applied.shape[0])]
    }, index=nmf_applied.index)
    corrected_nmf = emb.harmonize_nmf_batches(nmf_applied, batch_meta, vars_use=["batch"])
    print("Corrected NMF embeddings shape:", corrected_nmf.shape)

    reconstructed_df = emb.approximate_dataset_harmonmf(corrected_nmf, factor_to_lipid, list(data_df.columns))
    print("Reconstructed dataset shape:", reconstructed_df.shape)

    tsne_coords = emb.tsne(corrected_nmf, perplexity=30, init_indices=(0,1))
    print("tSNE coordinates shape:", tsne_coords.shape)
    adata.obsm["X_TSNE"] = tsne_coords.values
    np.save("factor_to_lipid.npy", factor_to_lipid)
    print("Saved factor_to_lipid.npy")

    # ---------------------------
    # Clustering
    # ---------------------------
    # Column-wise standardisation done with numpy/pandas: the original
    # referenced StandardScaler, which is never imported in this file.
    standardized_embeddings_GLOBAL = _standardize_columns(corrected_nmf)
    metadata = adata.obs.copy()
    # Simulated spatial coordinates and section numbers (1..32).
    coordinates = pd.DataFrame({
        "zccf": np.random.uniform(0, 100, size=adata.n_obs),
        "yccf": np.random.uniform(0, 50, size=adata.n_obs),
        "Section": np.random.randint(1, 33, size=adata.n_obs),
        "xccf": np.random.uniform(0, 80, size=adata.n_obs)
    }, index=adata.obs_names)
    clust = Clustering(adata, coordinates, reconstructed_df, standardized_embeddings_GLOBAL, metadata)
    clust.leiden_nmf(resolution=1.0, key_added="leiden_nmf_test")
    print("Conventional Leiden clustering done.")
    _root_node, clusteringLOG = clust.learn_euclid_clustering(max_depth=5, ds_factor=1)
    print("Euclid clustering tree learned.")
    # Dummy tree: reuse the first column of the clustering log as both labels.
    tree_dummy = clusteringLOG.copy()
    tree_dummy["cluster"] = tree_dummy.iloc[:, 0].astype(str)
    tree_dummy["class"] = tree_dummy.iloc[:, 0].astype(str)
    # NOTE(review): pdf_output is given an ".h5ad" file name -- confirm this is intended.
    lipizone_colors = clust.assign_cluster_colors(tree_dummy, coordinates, pdf_output="colorzones.h5ad")
    print("Cluster colors assigned.")
    df_paths = clust.apply_euclid_clustering()
    print("Applied Euclid clustering; hierarchy shape:", df_paths.shape)
    # Simulated anatomical acronyms and lipizone labels, one per observation.
    acronyms = pd.Series(np.random.choice(["CTX", "HIP", "STR", "THA"], size=adata.n_obs), index=adata.obs_names)
    lipizones = pd.Series(np.random.choice(["1", "2", "3"], size=adata.n_obs), index=adata.obs_names)
    clust.name_lipizones_anatomy(acronyms, lipizones)
    print("Anatomical mapping computed.")
    clust.clusters_to_pdf(lipizone_colors, output_folder="lipizones_output", pdf_filename="clusters_combined.pdf")
    print("Cluster PDFs generated.")

    # ---------------------------
    # Postprocessing
    # ---------------------------
    # For postprocessing we need to supply: adata, embeddings (e.g. corrected_nmf),
    # morans (from preprocessing), alldata (e.g. from preprocessing), pixels (from preprocessing),
    # and optionally reference and annotation images.
    # For testing, we simulate alldata and pixels from adata.
    alldata = pd.DataFrame(adata.X, index=adata.obs_names, columns=adata.var_names)
    pixels = adata.obs.copy()  # assume it contains spatial info
    # For reference_image and annotation_image, we simulate dummy 3D arrays.
    reference_image = np.random.uniform(0, 255, size=(100, 100, 100))
    annotation_image = np.random.uniform(-2, 10, size=(100, 100, 100))

    postproc = Postprocessing(adata, corrected_nmf, morans_df, alldata, pixels, reference_image, annotation_image)

    # Block 1: XGBoost Feature Restoration
    postproc.xgboost_feature_restoration()
    print("XGBoost feature restoration completed.")

    # Block 2: Anatomical Interpolation
    # For testing, use a list of a few lipid names from alldata columns
    lipid_list = list(alldata.columns[:5])
    postproc.anatomical_interpolation(lipid_list, output_dir="3d_interpolated_native", w=5)
    print("Anatomical interpolation completed.")

    # Block 3: Train Lipimap (placeholder)
    postproc.train_lipimap()

    # Block 4: Add modality – simulate a dummy modality adata
    dummy_modality = sc.AnnData(X=np.random.rand(adata.n_obs, 10))
    adata_updated = postproc.add_modality(dummy_modality, "gene_expression", modality_type="continuous")
    print("Added new modality; modalities now:", list(adata_updated.obsm.keys()))

    # Block 5: Compare Parcellations – simulate two parcellations
    parcellation1 = pd.Series(np.random.choice(["A", "B", "C"], size=adata.n_obs), index=adata.obs_names)
    parcellation2 = pd.Series(np.random.choice(["X", "Y", "Z"], size=adata.n_obs), index=adata.obs_names)
    postproc.compare_parcellations(parcellation1, parcellation2, substrings=["omit"], M=200)
    print("Parcellation comparison completed; heatmap saved.")

    # Block 6: Run MOFA – simulate gene and lipid data
    genes = pd.DataFrame(np.random.rand(adata.n_obs, 50), index=adata.obs_names,
                         columns=[f"gene_{i}" for i in range(50)])
    lipids = pd.DataFrame(np.random.rand(adata.n_obs, 30), index=adata.obs_names,
                          columns=[f"lipid_{i}" for i in range(30)])
    mofa_results = postproc.run_mofa(genes, lipids, factors=10, train_iter=5)
    print("MOFA run completed; factors shape:", mofa_results["factors"].shape)

    # Block 7: Neighborhood Analysis – simulate metadata with necessary columns
    meta = pd.DataFrame({
        "SectionID": np.random.randint(1, 10, size=adata.n_obs),
        "x": np.random.randint(0, 100, size=adata.n_obs),
        "y": np.random.randint(0, 100, size=adata.n_obs),
        "lipizone_names": np.random.choice(["1", "2", "3"], size=adata.n_obs)
    }, index=adata.obs_names)
    postproc.neighborhood(meta)
    print("Neighborhood analysis completed.")

    # Block 8a: UMAP of Molecules – simulate centroidsmolecules (5 random rows, one column per observation)
    centroidsmolecules = pd.DataFrame(np.random.rand(5, adata.n_obs),
                                       index=[f"lipid_{i}" for i in range(5)],
                                       columns=adata.obs_names)
    postproc.umap_molecules(centroidsmolecules, output_pdf="umap_molecules.pdf")
    print("UMAP of molecules completed.")

    # Block 8b: UMAP of Lipizones – simulate centroids as a DataFrame
    centroids = pd.DataFrame(np.random.rand(10, 5))
    postproc.umap_lipizones(centroids)
    print("UMAP of lipizones completed.")

    # Block 9: Spatial Modules – simulate a selection of sections
    postproc.spatial_modules(selected_sections=[1, 2, 3])
    print("Spatial modules analysis completed.")

# Script entry point: run the full pipeline test when this file is executed directly.
if __name__ == "__main__":
    test_workflow()
