In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up before each cell runs, without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
from pathlib import Path
import requests
from tqdm import tqdm


def download_if_missing(url: str, dest: Path | str, chunk_size: int = 8192) -> Path:
    """
    Download *url* to *dest* unless the file is already present.

    The payload is first streamed into a sibling ``<name>.part`` file and only
    renamed to *dest* once the transfer has finished.  A failed or interrupted
    download therefore never leaves a truncated file at *dest* that a later
    call would mistake for a complete one.

    Parameters
    ----------
    url : str
        HTTP or HTTPS address of the remote file.
    dest : pathlib.Path or str
        Target filename (absolute or relative, ``~`` is expanded).  Parent
        directories will be created automatically.
    chunk_size : int, optional
        Number of bytes to read per network iteration.  Defaults to 8 KiB.

    Returns
    -------
    pathlib.Path
        Resolved path to the existing or newly downloaded file.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status (via
        ``Response.raise_for_status``).
    Exception
        Any other error raised while connecting, streaming or writing is
        re-raised after the partial file has been removed.

    Notes
    -----
    * Uses :py:meth:`pathlib.Path.exists` to avoid unnecessary downloads.
    * Streams the response with ``requests.get(..., stream=True)`` so large
      weight files are never held in memory as a whole.
    * Displays a live progress bar powered by *tqdm*.

    Examples
    --------
    >>> from pathlib import Path
    >>> ckpt = download_if_missing(  # doctest: +SKIP
    ...     "https://huggingface.co/epigen/cellwhisperer/resolve/main/cellwhisperer_clip_v1.ckpt",
    ...     Path("~/models/cellwhisperer_clip_v1.ckpt"))
    >>> ckpt  # doctest: +SKIP
    PosixPath('/home/you/models/cellwhisperer_clip_v1.ckpt')
    """
    dest = Path(dest).expanduser().resolve()
    if dest.exists():
        return dest  # nothing to do

    dest.parent.mkdir(parents=True, exist_ok=True)

    # Stream into a temporary sibling so *dest* only ever holds complete data.
    partial = dest.with_name(dest.name + ".part")

    try:
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            total = int(r.headers.get("Content-Length", 0))
            with (
                tqdm(total=total, unit="B", unit_scale=True, desc=dest.name) as bar,
                open(partial, "wb") as f,
            ):
                for chunk in r.iter_content(chunk_size):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
                        bar.update(len(chunk))
    except BaseException:
        # Also covers KeyboardInterrupt (e.g. interrupting the kernel): drop
        # the incomplete file so the next call starts from scratch.
        partial.unlink(missing_ok=True)
        raise

    # Same directory, hence same filesystem: the rename cannot expose a
    # half-written file under the final name.
    partial.replace(dest)
    return dest

In [13]:
# Store the checkpoint in the current working directory instead of hard-coding
# one user's absolute path, so the cell runs unchanged on other machines.
# Kept as ``str`` because the value is later passed to
# ``InitialEmbedder(cw_model_path=...)``.
# NOTE(review): assumes the kernel's working directory is the notebook folder;
# the local file name ("jointemb") also differs from the remote one ("clip") —
# confirm this is intended.
CKPT = str(Path.cwd() / "cellwhisperer_jointemb_v1.ckpt")
URL = "https://medical-epigenomics.org/papers/schaefer2024/data/models/cellwhisperer_clip_v1.ckpt"
download_if_missing(URL, CKPT)

PosixPath('/Users/mengerj/repos/adata_hf_datasets/notebooks/cellwhisperer_jointemb_v1.ckpt')

In [14]:
import anndata

# Load the AnnData object and expose the "counts" layer as the main matrix.
adata_path = "HIHA_pp.h5ad"
adata = anndata.read_h5ad(adata_path)
# Assign a copy: without it ``adata.X`` and ``adata.layers["counts"]`` would be
# the very same object, so any later in-place change to X would also alter the
# stored counts layer.
adata.X = adata.layers["counts"].copy()

In [22]:
from adata_hf_datasets import InitialEmbedder

# Set up the embedder.  ``cw_model_path`` is the directory or file passed to
# GeneformerModel.from_pretrained(...).
# Optional extras noted by the original author (currently not used):
#   processor_kwargs={"nproc": 6, "emb_label": ["sample_name"]}
#   model_config={"emb_mode": "cell", "emb_layer": -1, "forward_batch_size": 16}
embedder_options = {
    "method": "cw-geneformer",
    "embedding_dim": 512,
    "cw_model_path": CKPT,
}
ie = InitialEmbedder(**embedder_options)

In [23]:
# Load the processor and the CW Geneformer model before embedding.
ie.prepare(adata)

# Arguments of the embed call:
#   obsm_key   - key under which the embedding is stored (name suggests
#                ``adata.obsm``; TODO confirm against InitialEmbedder)
#   batch_size - optional; overrides the model config's forward_batch_size
#   chunk_size - forwarded to the processor (tokenization)
#   target_sum - forwarded to the processor
# NOTE(review): the recorded run of this cell ended with
# "IndexError: index out of range in self".  Presumably an embedding lookup
# received token or position ids outside the range the checkpoint supports —
# TODO confirm that the tokenizer vocabulary / sequence length match the model.
X = ie.embed(adata=adata, obsm_key="X_cw_geneformer", batch_size=16, chunk_size=512, target_sum=10_000)

IndexError: index out of range in self