In [2]:
import os,sys,glob, warnings
from collections import defaultdict
from tqdm import tqdm
import numpy as np # Can't install NumPy 2.2.2 which is what the pkls were saved with
import pandas as pd # 'v2.2.3'
import anndata as ad
from pyexeggutor import (
    build_logger,
    reset_logger,
    read_pickle,
    write_pickle,
)

# Niche
import optuna
from joblib import Parallel, delayed

# PyData
from scipy.spatial.distance import (
    pdist, 
    squareform,
)
from sklearn.neighbors import (
    kneighbors_graph, 
    KNeighborsTransformer,
)
from sklearn.metrics import (
    pairwise_distances,
    silhouette_score, 
    
    # silhouette_samples, 
    # completeness_score,
)
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt

# Metabolic Niche Space
from metabolic_niche_space.neighbors import KNeighborsKernel
# from metabolic_niche_space.manifold import DiffusionMaps # Shortcut: from datafold.dynfold import DiffusionMaps
from datafold.dynfold import DiffusionMaps

# from clairvoyance.utils import ( 
#     compile_parameter_space,
# )
# from sklearn.cluster import (
#     HDBSCAN, # Not included in sklearn <1.3
# )

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def fast_groupby_sum(X:pd.DataFrame, y:pd.Series):
    if not np.all(X.shape[0] == y.size):
        raise IndexError("X.shape[0] must equal y.size")
    if not np.all(X.index == y.index):
        raise IndexError("X.index must equal y.index")
    if not isinstance(y, pd.CategoricalDtype):
        y = y.astype("category")
    # Convert y to numeric indices
    unique_classes, y_indices = np.unique(y, return_inverse=True)

    # Use np.bincount to sum efficiently for each feature
    arrays = list()
    for col in tqdm(X.columns, "Summing rows by groups", unit=" column"):
        summed_values = np.bincount(y_indices, weights=X[col].values, minlength=len(unique_classes))
        arrays.append(summed_values)
    X_grouped = np.vstack(arrays).T

    # Convert to DataFrame
    return pd.DataFrame(X_grouped, index=unique_classes, columns=X.columns)

def compile_parameter_space(trial, param_space):
    params = dict()
    for k, v in param_space.items():
        if isinstance(v, list):
            suggestion_type = v[0]
            if isinstance(suggestion_type, type):
                suggestion_type = suggestion_type.__name__
            suggest = getattr(trial, f"suggest_{suggestion_type}")
            suggestion = suggest(k, v[1], v[2])
        else:
            suggestion = v
        params[k] = suggestion
    return params

# @numba.njit(parallel=True)
# def pairwise_jaccard(X, redundant_form=False):
#     n_samples = X.shape[0]
#     D = np.zeros((n_samples, n_samples), dtype=np.float32)
    
#     for i in numba.prange(n_samples):
#         for j in range(i + 1, n_samples):
#             intersection = np.logical_and(X[i], X[j]).sum()
#             union = np.logical_or(X[i], X[j]).sum()
#             D[i, j] = 1 - (intersection / union if union > 0 else 1)
#             D[j, i] = D[i, j]  # Distance matrix is symmetric
#     if redundant_form:
#         return D
#     else:
#         return squareform(D)

In [4]:
%%time
# Data
output_directory="../data/training"
X_genomic_traits = pd.read_csv(os.path.join(output_directory, "global.genomic_traits.kofam.bool-int.pathway_subset.tsv.gz"), sep="\t", index_col=0).astype(bool)
X_genomic_traits = X_genomic_traits.loc[X_genomic_traits.sum(axis=1)[lambda x: x > 0].index]
genome_to_clusterani = pd.read_csv(os.path.join(output_directory, "genome_to_ani-cluster.tsv.gz"), sep="\t", index_col=0).iloc[:,0].astype("category").loc[X_genomic_traits.index]
X_genomic_traits_clusterani = pd.read_csv(os.path.join(output_directory, "global.genomic_traits.kofam.bool-int.cluster-ani.pathway_subset.tsv.gz"), index_col=0, sep="\t").astype(bool)
X_genomic_traits_clusterani = X_genomic_traits_clusterani.loc[X_genomic_traits_clusterani.sum(axis=1)[lambda x: x > 0].index]


CPU times: user 1min 15s, sys: 7.25 s, total: 1min 22s
Wall time: 1min 25s


In [9]:
quality_files = glob.glob("/home/ec2-user/SageMaker/s3/newatlantis-genomics-db-prod/SourceDatabases/*/Metadata/quality.tsv")

In [24]:
df_quality = pd.read_csv("../data/quality.tsv.gz", sep="\t", index_col=0)
df = df_quality.query("completeness>=50").query("contamination<10")
df

Unnamed: 0_level_0,completeness,contamination,method,notes
id_genome,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
clavispora_lusitaniae_gca_001673695,98.60,0.00,BUSCO_v5.4.3,saccharomycetes_odb10
aspergillus_flavus_nrrl3357_gca_014117465,97.40,0.30,BUSCO_v5.4.3,eurotiales_odb10
aureobasidium_pullulans_gca_004917135,96.70,0.10,BUSCO_v5.4.3,dothideomycetes_odb10
saccharomyces_cerevisiae_yjm682_gca_000976275,98.70,2.10,BUSCO_v5.4.3,saccharomycetes_odb10
trichophyton_tonsurans_cbs_112818_gca_000151455,96.20,0.00,BUSCO_v5.4.3,onygenales_odb10
...,...,...,...,...
YM.44,76.63,2.04,CheckM2_v1.0.1,Neural Network (Specific Model)
YM.49,68.32,3.94,CheckM2_v1.0.1,Gradient Boost (General Model)
YM.5,85.21,3.23,CheckM2_v1.0.1,Neural Network (Specific Model)
YM.52,58.89,1.59,CheckM2_v1.0.1,Neural Network (Specific Model)


In [37]:
eukaryotes = set(df.index[df["method"].map(lambda x: x.startswith("BUSCO"))]) & set(X_genomic_traits.index)
prokaryotes = set(df.index[df["method"].map(lambda x: not x.startswith("BUSCO"))] ) & set(X_genomic_traits.index)

len(eukaryotes), len(prokaryotes)

(4508, 62634)

In [38]:
with open("../data/cluster/ani/prokaryotic/organisms.completeness_gte50.contamination_lt10.list", "w") as f:
    for id in prokaryotes:
        print(id, file=f)
with open("../data/cluster/ani/eukaryotic/organisms.completeness_gte50.contamination_lt10.list", "w") as f:
    for id in eukaryotes:
        print(id, file=f)

In [34]:
# len(eukaryotes) + len(prokaryotes)

(set(eukaryotes) | set(prokaryotes)) - set(X_genomic_traits.index)

{'Amaapr1',
 'Anasp1',
 'Annbov1',
 'Aspcib1',
 'Aspoc2036_1',
 'Asppsf1',
 'Aspuva1',
 'Astsub1',
 'CadmalM34_1',
 'Cante1',
 'Cersp395_1',
 'Cersp423_1',
 'Cha5317_1',
 'Chleu1',
 'Chrpa1',
 'Chrvin1',
 'Clawha1',
 'Coere1',
 'Corarc1',
 'Enche1',
 'Encin1',
 'Encro1',
 'Entgas1',
 'Entmai1',
 'FoxII5',
 'Graco1',
 'Ilysp1',
 'Krisp1',
 'LacJLM2183_1',
 'Laccon1',
 'Lactsubd1',
 'Lerce1',
 'Lerce2',
 'Lerer1',
 'Lerer2',
 'Leucr1',
 'LobpulSw1',
 'Lorju1',
 'Mordis1',
 'Morpra1',
 'Mycden1',
 'Persub1',
 'Phchr4_2',
 'Phosp1',
 'Pilbys1',
 'PleosDSM11191_1',
 'Porspa1',
 'Praco1',
 'Pseneu1',
 'Psisu1',
 'Rambr1',
 'Rapsub1',
 'Rhives1',
 'Rhivul1',
 'RusSA2_1',
 'Ruseme113_1',
 'Schco2071_1',
 'Schoc1',
 'Sclcihr1',
 'Sclsan1',
 'Semro1',
 'Stobe1',
 'Strrug1',
 'Suigla1',
 'TARA_SOC_28_MAG_00049',
 'TOPAZ_NPS1_E012',
 'Tralj1',
 'Tricec1',
 'Trihar1',
 'Trima3',
 'Tripop1',
 'Ulvmu1',
 'Undpi1',
 'Ustsp1',
 'XyFL1272_2'}

In [36]:
ls /home/ec2-user/SageMaker/s3/newatlantis-genomics-db-prod/SourceDatabases/JGI-PhycoCosm/Genomes/Eukaryotic/ | grep "Anasp1"


In [8]:
import joblib
class NicheSpace(object):
    def __init__(
        self, 
        # General
        name:str=None,
        observation_type:str=None,
        feature_type:str=None,
        class_type:str=None,
        minimum_nfeatures:int=100,

        # Diffusion Maps
        kernel_distance_metric:str="jaccard",
        # scoring_method:str="silhouette", # or IICR
        scoring_distance_metric:str="euclidean",
        n_neighbors:int="auto",
        n_eigenpairs:int="auto",
        alpha:float="auto",
        niche_prefix="n",

        # Optuna
        n_trials=50,
        n_jobs:int=1,
        n_concurrent_trials:int=1,
        parallel_type:str="threads",
        study_timeout=None,
        # study_callbacks=None,
        random_state=0,
        verbose=1,
        stream=sys.stdout,

        ):
        # General
        self.name = name
        self.observation_type = observation_type
        self.feature_type = feature_type
        self.class_type = class_type
        self.minimum_nfeatures = minimum_nfeatures
        
        # Diffusion Maps
        self.kernel_distance_metric = kernel_distance_metric
        # self.scoring_method = scoring_method
        self.scoring_distance_metric = scoring_distance_metric
        self.niche_prefix = niche_prefix
        
        # Optuna
        self.n_jobs = n_jobs
        self.n_trials = n_trials
        self.n_concurrent_trials = n_concurrent_trials
        self.parallel_type = parallel_type
        self.random_state = random_state
        self.study_timeout = study_timeout
        # self.study_callbacks = study_callbacks
        
        # Hyperparameters
        self.is_tuned = True
        if n_neighbors == "auto":
            n_neighbors = [int, 10, 500]
            self.is_tuned = False
        self.n_neighbors = n_neighbors
        if n_eigenpairs == "auto":
            n_eigenpairs = [int, 10, 100]
            self.is_tuned = False
        self.n_eigenpairs = n_eigenpairs
        if alpha == "auto":
            alpha = [float, 0.0, 1.0]
            self.is_tuned = False
        self.alpha = alpha
        
        self._param_space = dict(
            n_neighbors = self.n_neighbors,
            n_eigenpairs = self.n_eigenpairs,
            alpha = self.alpha,
        )
        
        self.logger = build_logger(self.name, stream=stream)
        self.verbose = verbose
        self.is_fitted = False
        
    def tune(
        self,
        X:pd.DataFrame,
        y:pd.Series,
        X_grouped:pd.DataFrame,
        distance_matrix:np.array,
        sampler, 
        **study_kws,
        ):
        def _save_checkpoint(study, trial):
            joblib.dump(study, "optuna_checkpoint.pkl")
        def _objective(trial):

            # Compile parameters
            params = compile_parameter_space(
                trial, 
                self._param_space,
            )

            # Parameters
            n_neighbors = params["n_neighbors"]
            n_eigenpairs = params["n_eigenpairs"]
            alpha = params["alpha"]
            
            if n_neighbors >= X_grouped.shape[0]:
                return -1 #np.nan
            else:
                # Build kernel
                # if self.verbose > 1: print(f"[Trial {trial.number}] Creating kernel: n_neighbors={n_neighbors}, n_eigenpairs={n_eigenpairs}, alpha={alpha}", file=sys.stderr)
                kernel = KNeighborsKernel( 
                    metric=self.kernel_distance_metric, 
                    n_neighbors=n_neighbors, 
                    distance_matrix=distance_matrix, 
                    copy_distance_matrix=False,
                )

                # Calculate Diffusion Maps using KNeighbors
                model = DiffusionMaps(kernel=kernel, n_eigenpairs=n_eigenpairs, alpha=alpha)
                if self.verbose > 1: self.logger.info(f"[Trial {trial.number}] Fitting Diffision Map: n_neighbors={n_neighbors}, n_eigenpairs={n_eigenpairs}, alpha={alpha}")
                dmap_X_grouped = model.fit_transform(X_grouped)
                if self.verbose > 1: self.logger.info(f"[Trial {trial.number}] Transforming observations: n_neighbors={n_neighbors}, n_eigenpairs={n_eigenpairs}, alpha={alpha}")
                # dmap_X = model.transform(X)
                dmap_X = self._parallel_transform(X, model, msg=f"[Trial {trial.number}] [Parallel Transformation] Initial data")


                # Score
                if self.verbose > 1: self.logger.info(f"[Trial {trial.number}] Calculating silhouette score: n_neighbors={n_neighbors}, n_eigenpairs={n_eigenpairs}, alpha={alpha}")
                score = silhouette_score(dmap_X, y.values, metric=self.scoring_distance_metric, sample_size=None, random_state=None)
                return score
            
        if sampler is None:
            sampler = optuna.samplers.TPESampler(seed=self.random_state)
            
        direction = "maximize"
        study = optuna.create_study(
            direction=direction, 
            study_name=self.name, 
            sampler=sampler, 
            storage="sqlite:///optuna_study.db", 
            load_if_exists=True,
            **study_kws,
        )
        study.optimize(
            _objective, 
            n_trials=self.n_trials, 
            n_jobs=self.n_concurrent_trials,
            timeout=self.study_timeout, 
            show_progress_bar=self.verbose >= 2, 
            callbacks=[_save_checkpoint], 
            gc_after_trial=True,
        )
        return study
        
        
    def fit(
        self,
        X:pd.DataFrame,
        y:pd.Series,
        X_grouped:pd.DataFrame=None,
        distance_matrix:np.array=None,
        sampler=None,
        copy=True,
        **study_kws,
        ):
        # Check inputs
        if not np.all(X.shape[0] == y.size):
            raise IndexError("X.shape[0] must equal y.size")
        if not np.all(X.index == y.index):
            raise IndexError("X.index must equal y.index")
        if not isinstance(y, pd.CategoricalDtype):
            y = y.astype("category")
        self.X_ = X.copy()
        self.y_ = y.copy()
        
        # Group values
        if X_grouped is None:
            X_grouped = fast_groupby_sum(X, y)

            
        if not set(X_grouped.index) <= set(y.unique()):
            raise IndexError("X_grouped.index must be ≤ y categories")
            
        # Minimum number of features
        if self.minimum_nfeatures > 0:
            if self.verbose > 0:
                self.logger.info(f"[Start] Filtering observations and classes below feature threshold: {self.minimum_nfeatures}")

            number_of_features_per_class = (X_grouped > 0).sum(axis=1)
            index_classes = number_of_features_per_class.index[number_of_features_per_class > self.minimum_nfeatures]

            mask = y.map(lambda x: x not in index_classes)
            y = y.loc[~mask]
            X = X.loc[y.index]
            if self.verbose > 0:
                self.logger.info(f"[Dropping] N = {X_grouped.shape[0] - len(index_classes)} classes")
                self.logger.info(f"[Dropping] N = {sum(mask)} observations")
                self.logger.info(f"[Remaining] N = {X_grouped.shape[0]} classes")
                self.logger.info(f"[Remaining] N = {X.shape[0]} observations")
                self.logger.info(f"[Remaining] N = {X.shape[1]} features")
                self.logger.info(f"[End] Filtering observations and classes below feature threshold")
            X_grouped = X_grouped.loc[index_classes]
            
        # Dtype
        if self.kernel_distance_metric == "jaccard":
            X = X.astype(bool)
            X_grouped = X_grouped.astype(bool)
            
        # Distance matrix
        if distance_matrix is None:
            if self.verbose > 0:
                self.logger.info("[Start] Processing distance matrix")
            if self.kernel_distance_metric == "euclidean":
                distance_matrix = squareform(pdist(X_grouped, metric=self.kernel_distance_metric))
            else:
                distance_matrix = pairwise_distances(X=X_grouped.values, metric=self.kernel_distance_metric, n_jobs=self.n_jobs)
            
        if len(distance_matrix.shape) == 1:
            distance_matrix = squareform(distance_matrix)
        if self.verbose > 0:
            self.logger.info("[End] Processing distance matrix")

        # Store
        self.classes_ = y.cat.categories
        if copy:
            self.X_ = X.copy()
            self.y_ = y.copy()
            self.X_grouped_ = X_grouped.copy()
        
        # Tune
        if not self.is_tuned:
            if self.verbose > 0:
                self.logger.info("[Begin] Hyperparameter Tuning")
            self.study_ = self.tune(
                X=X,
                y=y,
                X_grouped=X_grouped,
                distance_matrix=distance_matrix,
                sampler=sampler, 
                **study_kws,
                )
            for k, v in self.study_.best_params.items():
                setattr(self,k,v)
            if self.verbose > 0:
                self.logger.info(f"Tuned parameters (Score={self.study_.best_value}): {self.study_.best_params}")
                self.logger.info("[End] Hyperparameter Tuning")
            self.is_tuned = True
            
        # Build kernel
        self.kernel_ = KNeighborsKernel( 
            metric=self.kernel_distance_metric, 
            n_neighbors=self.n_neighbors, 
            distance_matrix=distance_matrix, 
            copy_distance_matrix=True,
        )

        # Calculate Diffusion Maps using KNeighbors
        self.model_ = DiffusionMaps(kernel=self.kernel_, n_eigenpairs=self.n_eigenpairs, alpha=self.alpha)
        
        # Fit
        A = self.model_.fit(X_grouped)

        # Grouped
        # A = self.model_.fit_transform(X_grouped)
        A = self._parallel_transform(X_grouped, self.model_, msg=f"[Parallel Transformation] Grouped data")
        self.dmap_grouped_embeddings_ = pd.DataFrame(A, index=X_grouped.index)
        self.dmap_grouped_embeddings_.columns = self.dmap_grouped_embeddings_.columns.map(lambda i: f"{self.niche_prefix}{i+1}")
        self.dmap_grouped_embeddings_.index.name = self.class_type
        self.dmap_grouped_embeddings_.columns.name = self.feature_type

        # Complete
        # A = self.model_.transform(X)
        A = self._parallel_transform(X, self.model_, msg=f"[Parallel Transformation] Initial data")
        self.dmap_embeddings_ = pd.DataFrame(A, index=X.index)
        self.dmap_embeddings_.columns = self.dmap_embeddings_.columns.map(lambda i: f"{self.niche_prefix}{i+1}")
        self.dmap_embeddings_.index.name = self.observation_type
        self.dmap_embeddings_.columns.name = self.feature_type

        # Score
        self.score_ = silhouette_score(self.dmap_embeddings_.values, y.values, metric=self.scoring_distance_metric, sample_size=None, random_state=None)
        self.is_fitted = True

        return self
    


    def _process_row(self, model, row):
        """Helper function to apply model.transform to a single row"""
        return model.transform(row.reshape(1, -1))

    def _parallel_transform(self, X, model, msg=None):
        """Parallelizes the transformation using joblib"""
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names")
            output = Parallel(n_jobs=self.n_jobs, prefer=self.parallel_type)(
                delayed(self._process_row)(model, row.values) for id, row in tqdm(X.iterrows(), desc=msg, total=X.shape[0], position=0, leave=True)
            )
            return np.vstack(output)


    # Example usage:
    # df_transformed = parallel_transform(X_test, model, n_jobs=8)


    #     def fit_transform(
    #         self,A = self._parallel_transform(X_grouped, self.model, m

    #         return self.embeddings_

    # def visualize(pacmap)

In [6]:
y = genome_to_clusterani.value_counts()[lambda x: x > 20]
clusters = y.index[np.random.RandomState(0).choice(np.arange(y.size), size=100)]

y_test = genome_to_clusterani[genome_to_clusterani.map(lambda x: x in clusters)]
X_test = X_genomic_traits.loc[y_test.index]
X_test.shape

(3707, 2124)

In [9]:
%%time
mns_test = NicheSpace(
    observation_type="genome",
    feature_type="ko",
    class_type="genome_cluster",
    name="test",
    n_trials=50,
    n_jobs=-1,
    n_concurrent_trials=1,
    parallel_type="threads",
    # verbose=,
)
reset_logger(mns_test.logger)
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")
    mns_test.fit(X_test, y_test)


Summing rows by groups: 100%|██████████| 2124/2124 [00:00<00:00, 4369.52 column/s]

2025-02-06 08:09:50,027 - test - INFO - [Start] Filtering observations and classes below feature threshold: 100





2025-02-06 08:09:50,177 - test - INFO - [Dropping] N = 1 classes
2025-02-06 08:09:50,178 - test - INFO - [Dropping] N = 56 observations
2025-02-06 08:09:50,179 - test - INFO - [Remaining] N = 94 classes
2025-02-06 08:09:50,179 - test - INFO - [Remaining] N = 3651 observations
2025-02-06 08:09:50,179 - test - INFO - [Remaining] N = 2124 features
2025-02-06 08:09:50,180 - test - INFO - [End] Filtering observations and classes below feature threshold
2025-02-06 08:09:50,183 - test - INFO - [Start] Processing distance matrix
2025-02-06 08:09:50,901 - test - INFO - [End] Processing distance matrix
2025-02-06 08:09:50,904 - test - INFO - [Begin] Hyperparameter Tuning


[I 2025-02-06 08:09:51,222] Using an existing study with name 'test' instead of creating a new one.
[I 2025-02-06 08:09:51,937] Trial 1 finished with value: -1.0 and parameters: {'n_neighbors': 279, 'n_eigenpairs': 75, 'alpha': 0.6027633760716439}. Best is trial 0 with value: -1.0.
[I 2025-02-06 08:09:53,771] Trial 2 finished with value: -1.0 and parameters: {'n_neighbors': 277, 'n_eigenpairs': 48, 'alpha': 0.6458941130666561}. Best is trial 0 with value: -1.0.
[I 2025-02-06 08:09:55,458] Trial 3 finished with value: -1.0 and parameters: {'n_neighbors': 224, 'n_eigenpairs': 91, 'alpha': 0.9636627605010293}. Best is trial 0 with value: -1.0.
[I 2025-02-06 08:09:56,982] Trial 4 finished with value: -1.0 and parameters: {'n_neighbors': 198, 'n_eigenpairs': 82, 'alpha': 0.5288949197529045}. Best is trial 0 with value: -1.0.
[I 2025-02-06 08:09:58,342] Trial 5 finished with value: -1.0 and parameters: {'n_neighbors': 288, 'n_eigenpairs': 94, 'alpha': 0.07103605819788694}. Best is trial 0 wi

ValueError: Input X contains NaN.

In [24]:
# write_pickle(mns_test, "test.pkl")
mns_test2 = read_pickle("test.pkl")
mns_test2

<__main__.NicheSpace at 0x7f87b414fe20>

In [7]:
%%time
mns_test = NicheSpace(
    observation_type="genome",
    feature_type="ko",
    class_type="genome_cluster",
    name="test",
    n_trials=3,
    n_jobs=48,
    n_concurrent_trials=1,
    parallel_type="threads",
    verbose=0,
)
mns_test.fit(X_test, y_test)

Summing rows by groups: 100%|██████████| 2124/2124 [00:00<00:00, 16001.09 column/s]
[I 2025-02-05 23:52:39,864] A new study created in memory with name: test
[I 2025-02-05 23:52:39,867] Trial 0 finished with value: -1.0 and parameters: {'n_neighbors': 279, 'n_eigenpairs': 75, 'alpha': 0.6027633760716439}. Best is trial 0 with value: -1.0.
[I 2025-02-05 23:52:40,111] Trial 1 finished with value: -1.0 and parameters: {'n_neighbors': 277, 'n_eigenpairs': 48, 'alpha': 0.6458941130666561}. Best is trial 0 with value: -1.0.
[I 2025-02-05 23:52:40,261] Trial 2 finished with value: -1.0 and parameters: {'n_neighbors': 224, 'n_eigenpairs': 91, 'alpha': 0.9636627605010293}. Best is trial 0 with value: -1.0.


ValueError: Found array with 70 sample(s) (shape=(70, 2124)) while a minimum of 75 is required by DiffusionMaps.

In [7]:
%%time
mns_test = NicheSpace(
    observation_type="genome",
    feature_type="ko",
    class_type="genome_cluster",
    name="test",
    n_trials=3,
    n_jobs=48,
    n_concurrent_trials=1,
    parallel_type="processes",
    verbose=0,
)
mns_test.fit(X_test, y_test)

Summing rows by groups: 100%|██████████| 2124/2124 [00:00<00:00, 18838.29 column/s]
[I 2025-02-05 23:18:48,139] A new study created in memory with name: test
[Trial 0] [Parallel Transformation] Initial data:   5%|▍         | 48/998 [00:02<00:40, 23.44it/s][W 2025-02-05 23:19:03,587] Trial 0 failed with parameters: {'n_neighbors': 279, 'n_eigenpairs': 75, 'alpha': 0.6027633760716439} because of the following error: PicklingError('Could not pickle the task to send it to the workers.').
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/ec2-user/SageMaker/environments/mns/lib/python3.9/site-packages/joblib/externals/loky/backend/queues.py", line 159, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "/home/ec2-user/SageMaker/environments/mns/lib/python3.9/site-packages/joblib/externals/loky/backend/reduction.py", line 215, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/home/ec2-user/SageMaker/en

PicklingError: Could not pickle the task to send it to the workers.

In [8]:
%%time
mns_test = NicheSpace(
    observation_type="genome",
    feature_type="ko",
    class_type="genome_cluster",
    name="test",
    n_trials=3,
    n_jobs=48*1.5,
    n_concurrent_trials=1,
    parallel_type="processes",
    verbose=0,
)
mns_test.fit(X_test, y_test)


Summing rows by groups:   0%|          | 0/2124 [00:00<?, ? column/s][A
Summing rows by groups: 100%|██████████| 2124/2124 [00:00<00:00, 19458.83 column/s][A


TypeError: 'float' object cannot be interpreted as an integer

In [13]:
X_grouped = fast_groupby_sum(X_test, y_test).astype(bool)

distance_matrix = pairwise_distances(X_grouped.values, metric="jaccard", n_jobs=-1)

# Parameters
n_neighbors = 224 #params["n_neighbors"]
n_eigenpairs = 91 #params["n_eigenpairs"]
alpha = 0.9636627605010293 #params["alpha"]

# Build kernel
# if self.verbose > 1: print(f"[Trial {trial.number}] Creating kernel: n_neighbors={n_neighbors}, n_eigenpairs={n_eigenpairs}, alpha={alpha}", file=sys.stderr)
kernel = KNeighborsKernel( 
    metric="jaccard", 
    n_neighbors=n_neighbors, 
    distance_matrix=distance_matrix, 
    copy_distance_matrix=False,
)

# Calculate Diffusion Maps using KNeighbors

model = DiffusionMaps(kernel=kernel, n_eigenpairs=n_eigenpairs, alpha=alpha)
dmap_X_grouped = model.fit_transform(X_grouped)


Summing rows by groups: 100%|██████████| 2124/2124 [00:00<00:00, 99560.81 column/s]


In [33]:
import warnings



    
def f():
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names")
        output = list()
        for id, row in tqdm(X_test.iterrows(), total=X_test.shape[0]):
            row = row.values.reshape(1,-1)
            output.append(model.transform(row))

        return pd.DataFrame(np.vstack(output), index=X_test.index)
        
# # %timeit dmap_X = model.transform(X_test)
# f()

# %timeit model.transform(X_test.values)
# %timeit f()
f()


100%|██████████| 1000/1000 [00:07<00:00, 140.38it/s]


Unnamed: 0_level_0,0,1,2,3,4,5,6,7,8,9,...,81,82,83,84,85,86,87,88,89,90
id_genome,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
fusarium_verticillioides_gca_003316975,0.049752,-0.054707,0.006878,-0.023523,0.090399,-0.092728,-0.043287,-0.047209,0.033100,0.052264,...,-0.071029,0.012871,0.081674,0.061861,-0.046340,-0.075263,0.001921,-0.134172,-0.051013,0.014474
fusarium_oxysporum_nrrl_32931_gca_000271745,0.049752,-0.054734,0.007085,-0.009211,0.071841,-0.093091,-0.025714,-0.057438,-0.000467,0.046115,...,0.090255,0.146425,0.037952,-0.088785,0.005634,-0.070943,-0.174516,-0.029518,-0.161928,0.032184
thermothelomyces_thermophilus_atcc_42464_gca_000226095,0.049752,-0.054843,0.006351,-0.023803,0.067951,-0.075482,-0.041262,-0.057784,-0.016519,0.016696,...,-0.062754,0.120516,0.112300,0.020578,-0.037880,0.026246,0.032648,-0.101277,-0.037033,0.009976
fusarium_oxysporum_f_sp_cubense_gca_007994515,0.049752,-0.054755,0.006789,-0.021073,0.084491,-0.087831,-0.043509,-0.051007,0.018231,0.035294,...,-0.022874,0.009583,-0.018063,0.083173,-0.086636,-0.008317,-0.046026,-0.121648,0.090355,-0.069778
fusarium_coffeatum_gca_003316985,0.049752,-0.054600,0.006848,-0.016711,0.092110,-0.094159,-0.035044,-0.059960,0.017959,0.060754,...,0.036994,0.056290,0.088017,0.040396,-0.083534,-0.098031,-0.077144,0.016278,-0.170502,0.055225
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
Kermadec_Trench_FDZ130_5861_4-6cm_bin.31,0.049752,0.034613,0.092420,0.032911,-0.059148,-0.054667,0.060806,0.020360,0.054410,0.039389,...,0.150791,0.097830,0.170205,0.050420,-0.176387,-0.015720,-0.048847,0.225047,0.099457,0.212960
Kermadec_Trench_FDZ130_5861_8-10cm_bin.58,0.049752,0.036806,0.094458,0.075366,0.020698,-0.042706,-0.004093,0.037368,0.039943,-0.011360,...,0.054862,-0.006464,-0.024793,-0.026550,0.015904,-0.152350,-0.047993,-0.244356,-0.002085,0.097821
Kermadec_Trench_FDZ127_7600_16-18cm_bin.34,0.049752,0.038766,0.092018,0.062179,0.060315,0.008210,-0.010332,0.013704,0.019641,-0.029957,...,0.002307,0.163637,-0.051293,0.029739,0.035771,-0.055522,-0.085417,-0.123376,0.121737,0.028578
Diamantina_Trench_FDZ171_5321.5_0-2cm_bin.38,0.049752,0.034945,0.092670,0.082812,0.014776,-0.069765,0.017049,0.012057,-0.003911,-0.065883,...,-0.024191,0.217812,0.083435,0.067256,-0.028768,0.018097,-0.057788,-0.083379,0.172413,-0.064357


In [31]:
import warnings

with warnings.catch_warnings():
    warnings.filterwarnings('ignore')
    output = list()
    for id, row in tqdm(X_test.iterrows(), total=X_test.shape[0]):
        row = row.values.reshape(1,-1)
        output.append(model.transform(row))
        
    df = pd.DataFrame(np.vstack(output), index=X_test.index)


100%|██████████| 1000/1000 [00:07<00:00, 140.23it/s]


array([[ 0.04975186, -0.05470668,  0.00687802, ..., -0.13417182,
        -0.05101278,  0.01447413],
       [ 0.04975186, -0.05473446,  0.00708505, ..., -0.02951793,
        -0.16192849,  0.03218404],
       [ 0.04975186, -0.054843  ,  0.00635086, ..., -0.10127658,
        -0.03703257,  0.00997642],
       ...,
       [ 0.04975186,  0.03876587,  0.09201765, ..., -0.12337625,
         0.12173668,  0.02857764],
       [ 0.04975186,  0.03494509,  0.09266998, ..., -0.08337854,
         0.1724128 , -0.06435694],
       [ 0.04975186,  0.04037729,  0.08273706, ...,  0.0036449 ,
         0.07484179, -0.08343118]])