In [1]:
from pathlib import Path
import logging
import sys

import matplotlib
import scanpy as sc
import numpy as np

#from ..utils.const import FIGURE_PATH, OUTPUT_PATH, SEED_DICT

from pathlib import Path
import sys

# The repository is mounted at a different absolute path on each machine.
# Instead of commenting lines in and out by hand, use the first candidate that
# exists on this host; if none exists, fall back to the first entry (helix),
# which is the path this notebook used before.
_repo_root_candidates = (
    Path("/mnt/sds-hd/sd22b002/projects/ParTIpy_paper"),  # helix
    Path("/home/pschaefer/sds-hd/sd22b002/projects/ParTIpy_paper"),  # beast
)
repo_root = next(
    (candidate for candidate in _repo_root_candidates if candidate.exists()),
    _repo_root_candidates[0],
)

# Put the repository first on sys.path so its packages win over same-named
# modules found later on the path.
if str(repo_root) not in sys.path:
    sys.path.insert(0, str(repo_root))

# The imports below come from a package called `code`, which is also the name
# of a standard-library module. Dropping any already-imported `code` entry
# forces the import system to look it up again (presumably resolving to the
# package under `repo_root` -- verify if the imports below fail).
sys.modules.pop("code", None)
from code.utils.data_utils import load_ms_data
from code.utils.const import FIGURE_PATH, OUTPUT_PATH, SEED_DICT
def get_minimal_value_key(dict_input):
    """Return, as an ``int``, the key of ``dict_input`` with the smallest value.

    Keys and values are each converted to a NumPy array in the dictionary's
    iteration order; the key at the position reported by ``np.argmin`` over
    the values is then cast to ``int``.

    Args:
        dict_input: Mapping whose keys can be converted with ``int(...)``
            and whose values can be compared by ``np.argmin``.

    Returns:
        The selected key converted to a built-in ``int``.
    """
    keys = np.array(list(dict_input.keys()))
    values = np.array(list(dict_input.values()))
    best_position = np.argmin(values)
    return int(keys[best_position])

# Base directory for relative input paths: the current working directory.
project_path = Path(".")

# Select the matplotlib backend before any figure is created.
# Backend overview: https://matplotlib.org/stable/users/explain/figure/backends.html
matplotlib.use("Agg")

# Directory that receives the figures of this benchmark (created if missing).
figure_dir = Path(FIGURE_PATH, "ms_bench")
figure_dir.mkdir(parents=True, exist_ok=True)

# Directory that receives logs and other outputs (created if missing).
output_dir = Path(OUTPUT_PATH, "ms_bench")
output_dir.mkdir(parents=True, exist_ok=True)


def setup_logger(log_path, level=logging.INFO):
    """Return the ``"k562_memory_bench"`` logger, attaching handlers on first use.

    The logger's level is set to ``level`` and propagation to ancestor loggers
    is switched off on every call. Handlers are only attached when the logger
    has none yet, so calling this again (e.g. when a notebook cell is re-run)
    does not duplicate output; in that case ``log_path`` is not used.

    Args:
        log_path: File that the file handler writes to.
        level: Logging level applied to the logger.

    Returns:
        The configured ``logging.Logger`` instance.
    """
    bench_logger = logging.getLogger("k562_memory_bench")
    bench_logger.setLevel(level)
    bench_logger.propagate = False

    if not bench_logger.handlers:
        line_format = logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s: %(message)s"
        )
        # Factories are invoked one at a time so that the stdout handler is
        # already attached before the file handler is constructed.
        handler_factories = (
            lambda: logging.StreamHandler(sys.stdout),
            lambda: logging.FileHandler(log_path),
        )
        for make_handler in handler_factories:
            handler = make_handler()
            handler.setFormatter(line_format)
            bench_logger.addHandler(handler)

    return bench_logger


# Configure logging to stdout and to a log file inside the output directory.
logger = setup_logger(output_dir / "k562_memory_bench.log")

logger.info("Starting K562 memory benchmark")
logger.info("Output dir: %s", output_dir.resolve())
logger.info("Figure dir: %s", figure_dir.resolve())

# Location of the input .h5ad file, relative to `project_path`.
data_path = (
    project_path
    / ".."
    / "gpp_bench"
    / "data"
    / "prc"
    / "replogle2022_k562_gwps"
    / "replogle2022_k562_gwps_adata.h5ad"
)
# Fail early, and say which file is missing, instead of a bare AssertionError.
assert data_path.exists(), f"Input data not found: {data_path.resolve()}"
logger.info("Reading data: %s", data_path)
adata = sc.read_h5ad(data_path)
logger.info("Loaded data: n_obs=%s n_vars=%s", adata.n_obs, adata.n_vars)


  from anndata import __version__ as anndata_version
  if Version(anndata.__version__) >= Version("0.11.0rc2"):
  if Version(anndata.__version__) >= Version("0.11.0rc2"):


2026-01-13 16:23:01,334 INFO k562_memory_bench: Starting K562 memory benchmark
2026-01-13 16:23:01,334 INFO k562_memory_bench: Output dir: /mnt/sds-hd/sd22b002/projects/ParTIpy_paper/output/ms_bench
2026-01-13 16:23:01,335 INFO k562_memory_bench: Figure dir: /mnt/sds-hd/sd22b002/projects/ParTIpy_paper/figures/ms_bench
2026-01-13 16:23:01,335 INFO k562_memory_bench: Reading data: ../gpp_bench/data/prc/replogle2022_k562_gwps/replogle2022_k562_gwps_adata.h5ad
2026-01-13 16:23:39,498 INFO k562_memory_bench: Loaded data: n_obs=1814089 n_vars=8248


In [4]:
import os
import tarfile
import zipfile
import hashlib
from pathlib import Path
import logging

import requests
import scanpy as sc
import numpy as np
import pandas as pd

import anndata as ad
from scipy.sparse import issparse

# NOTE(review): this rebinds the notebook-level name `logger`, which the first
# cell assigned to the logger returned by `setup_logger` ("k562_memory_bench").
# From here on `logger` is the logger named after `__name__`; the handlers that
# `setup_logger` attached belong to the other logger, so messages logged through
# this one presumably do not reach stdout or the log file -- confirm intent.
logger = logging.getLogger(__name__)




def guess_is_lognorm(
    adata: ad.AnnData,
    epsilon: float = 1e-3,
    max_threshold: float = 15.0,
    validate: bool = True,
) -> bool:
    """
    Guess whether `adata.X` contains raw integer counts (False) or log1p-normalized values (True).

    Heuristic:
      1) Detect fractional entries: |frac(x)| > epsilon. The magnitude is used
         so that negative non-integer values are detected as well.
      2) If none -> assume raw counts and return False (no range check is run).
      3) If present -> require min >= 0; optionally enforce max < max_threshold.

    Args:
      adata: Object whose `.X` matrix (dense array-like or SciPy sparse) is inspected.
      epsilon: Tolerance below which a fractional part is treated as zero.
      max_threshold: Exclusive upper bound accepted for log1p-normalized values.
      validate: If True, raise when the maximum reaches `max_threshold`.

    Returns:
      True if the data looks log1p-normalized, False if it looks like integer counts.

    Raises:
      ValueError if `adata.X` is None; if fractional values are present and
      min < 0; or if fractional values are present, validate=True and
      max >= max_threshold.
    """
    X = adata.X
    if X is None:
        raise ValueError("adata.X is None")

    sparse_input = issparse(X)
    # For sparse input only the explicitly stored entries are inspected, which
    # avoids densifying the matrix (implicit zeros have no fractional part).
    values = X.data if sparse_input else np.asarray(X)

    # np.modf keeps the sign of its input, so compare the magnitude of the
    # fractional part; otherwise values such as -0.5 would go unnoticed.
    frac = np.modf(values)[0]
    has_decimals = bool(np.any(np.abs(frac) > epsilon))

    if not has_decimals:
        logger.info("Data appears to be integer counts (no decimal values detected)")
        return False

    # Range check. Sparse matrices use their own min/max so that implicit
    # zeros are taken into account.
    if sparse_input:
        min_val = float(X.min())
        max_val = float(X.max())
    else:
        min_val = float(np.min(values))
        max_val = float(np.max(values))

    if min_val < 0:
        raise ValueError(
            f"Invalid scale: min value {min_val:.2f} is negative. "
            "Both natural counts and log1p-normalized data must have all values >= 0."
        )

    if validate and max_val >= max_threshold:
        raise ValueError(
            f"Invalid scale: max value {max_val:.2f} exceeds log1p threshold of {max_threshold}. "
            f"Expected log1p normalized values in range [0, {max_threshold}), but found values suggesting "
            "raw counts or incorrect normalization. Values above the threshold can indicate mixed scales."
        )

    logger.info(
        "Data appears to be log1p normalized (decimals detected, range [%.2f, %.2f])",
        min_val,
        max_val,
    )
    return True


In [5]:
# Heuristic check of `adata.X`: False -> integer counts, True -> log1p-normalized.
guess_is_lognorm(adata)

False