In [1]:
from dotenv import load_dotenv

load_dotenv()  # Load variables from a local .env file into os.environ (returns True when one was loaded).

True

In [2]:
from htcdaskgateway import HTCGateway
from dask_gateway.auth import BasicAuth
import os
from dotenv import load_dotenv

load_dotenv()  # Load variables from .env (DASK_PASSWORD is read from the environment below).

# Directory holding the HTCondor command-line tools; the submission log shows
# condor_submit being invoked from this location.
os.environ['CONDOR_BIN_DIR'] = "/home/bengal1/.conda/envs/omics/bin"
# Connect to the Dask gateway. The password comes from the environment so it is
# never hard-coded in the notebook.
# NOTE(review): username=None presumably falls back to the current OS user — confirm.
gateway = HTCGateway(address="https://dask.software-dev.ncsa.illinois.edu",
                     proxy_address=8786,
                     auth = BasicAuth(
                         username=None, 
                         password=os.environ['DASK_PASSWORD'])
                    )
# Request a cluster whose workers run the Cell Ranger ARC image. Keep cpus/memory
# in sync with the --localcores/--localmem values used by run_cellranger_count
# (12 cores / 64 GB).
cluster = gateway.new_cluster(image="ncsa/cell-ranger-arc:2.0.2", 
                              cpus=12, 
                              memory="64GB",
                              container_image="/u/bengal1/condor/cell-ranger.sif")
# Ask for a single HTCondor worker, then obtain a client bound to this cluster.
cluster.scale(1)
client = cluster.get_client()

# Last expression of the cell: renders the client summary (dashboard link etc.).
client


INFO:htcdaskgateway.HTCGateway: Creating HTCGatewayCluster 
INFO:htcdaskgateway.GatewayCluster: Scaling: 1 HTCondor workers
INFO:htcdaskgateway.GatewayCluster: Sandbox : /u/bengal1/htcdask/dask.a26241c9c591444f8e7f229ca089ffab
INFO:htcdaskgateway.GatewayCluster: Submitting HTCondor job(s) for 1 workers with command: . ~/.profile && /home/bengal1/.conda/envs/omics/bin/condor_submit htcdask_submitfile.jdl | grep -oP '(?<=cluster )[^ ]*'
INFO:htcdaskgateway.GatewayCluster: Success! submitted HTCondor jobs to htc-login1.campuscluster.illinois.edu with  ClusterId 1908


0,1
Connection method: Cluster object,Cluster type: htcdaskgateway.HTCGatewayCluster
Dashboard: https://dask.software-dev.ncsa.illinois.edu/clusters/dask.a26241c9c591444f8e7f229ca089ffab/status,


In [None]:
# Close the cluster created above once the work is finished.
# NOTE(review): presumably this also releases the HTCondor worker job(s) — confirm
# against the htcdaskgateway documentation.
cluster.close()

In [7]:
from typing import Tuple, Optional, List, Dict, Any
import subprocess
import os
import sys
from pathlib import Path

def run_cellranger_sitecheck(timeout: Optional[int] = 300) -> Tuple[str, str, int]:
    """
    Run ``cellranger sitecheck`` and return its captured output.

    Args:
        timeout (int, optional): Maximum number of seconds to wait for the
            command. ``None`` disables the timeout.

    Returns:
        Tuple of ``(stdout, stderr, returncode)``. A non-zero return code is
        returned to the caller, not raised.

    Raises:
        subprocess.TimeoutExpired: If the command does not finish within ``timeout``.
        FileNotFoundError: If the ``cellranger`` executable is not on the PATH.
        Exception: Any other error from launching the command is logged and re-raised.
    """
    # Function-scope import: nothing else in this notebook imports ``logging``,
    # so without it every error path below failed with NameError. Importing here
    # also keeps the function self-contained when it is passed to client.submit.
    import logging

    try:
        # check=False: a failing sitecheck is reported via the return code.
        result = subprocess.run(
            ['cellranger', 'sitecheck'],
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired:
        logging.error(f"Command timed out after {timeout} seconds")
        raise
    except FileNotFoundError as exc:
        error_msg = "cellranger command not found. Please ensure Cell Ranger is installed and in your PATH."
        logging.error(error_msg)
        # Re-raise with the friendlier message while keeping the original cause.
        raise FileNotFoundError(error_msg) from exc
    except Exception as exc:
        logging.error(f"Error running cellranger sitecheck: {exc}")
        raise

    return result.stdout, result.stderr, result.returncode




In [None]:
# Run the site check on the cluster via the Dask client and block until it finishes.
future = client.submit(run_cellranger_sitecheck)
stdout, stderr, code = future.result()  # Get the result
# Only stdout is shown here; stderr and the return code stay available for inspection.
print(stdout) 


In [6]:
from typing import Tuple, Optional, List, Dict, Any
from pathlib import Path
import subprocess
import csv

def run_cellranger_count(
    id: str,
    transcriptome: str,
    fastqs: Optional[str] = None,
    sample: Optional[str] = None,
    include_introns: bool = False,
    no_bam: bool = False,
    nosecondary: bool = False,
    no_libraries: bool = False,
    no_target_umi_filter: bool = False,
    dry: bool = False,
    disable_ui: bool = False,
    noexit: bool = False,
    nopreflight: bool = False,
    feature_ref: Optional[str] = None,
    target_panel: Optional[str] = None,
    libraries: Optional[List[Dict[str, str]]] = None,
    force_cells: Optional[int] = None,
    expect_cells: Optional[int] = None,
    chemistry: Optional[str] = None,
    r1_length: Optional[int] = None,
    r2_length: Optional[int] = None,
    working_dir: Optional[str] = None,
    capture_output: bool = True,
    verbose: bool = True,
    localmem: int = 64,
    localcores: int = 12,
) -> Dict[str, Any]:
    """
    Build and run a ``cellranger-arc count`` command line.

    NOTE(review): several options below (e.g. --fastqs, --sample,
    --include-introns, --feature-ref) mirror ``cellranger count``; confirm that
    the installed ``cellranger-arc count`` accepts each one before relying on it.

    Args:
        id (str): Sample ID/name for the analysis run (pipeline instance name).
            Must be 1-63 characters of letters, digits, ``_`` or ``-``.
        transcriptome (str): Path to the reference directory (passed as --reference).
        fastqs (str, optional): Path to directory containing FASTQ files.
            Mutually exclusive with ``libraries``.
        sample (str, optional): Sample name to process. Mutually exclusive
            with ``libraries``.
        include_introns (bool): Append --include-introns.
        no_bam (bool): Append --no-bam.
        nosecondary (bool): Append --nosecondary.
        no_libraries (bool): Append --no-libraries.
        no_target_umi_filter (bool): Append --no-target-umi-filter.
        dry (bool): Append --dry.
        disable_ui (bool): Append --disable-ui.
        noexit (bool): Append --noexit.
        nopreflight (bool): Append --nopreflight.
        feature_ref (str, optional): Path to feature reference CSV file.
        target_panel (str, optional): Path to target panel CSV file.
        libraries (list of dict, optional): One dict per library with the keys
            ``fastqs``, ``sample`` and ``library_type``. The rows are written to
            ``<id>-library.csv`` in the working directory and that file is
            passed as --libraries.
        force_cells (int, optional): Value for --force-cells.
        expect_cells (int, optional): Value for --expect-cells.
        chemistry (str, optional): Value for --chemistry.
        r1_length (int, optional): Value for --r1-length.
        r2_length (int, optional): Value for --r2-length.
        working_dir (str, optional): Directory to run the command in; created
            if missing. Defaults to the current working directory.
        capture_output (bool): If True, collect stdout/stderr separately after
            the process ends; if False, stream merged stdout/stderr line by line.
        verbose (bool): Print the command and its output.
        localmem (int): Value for --localmem (default 64, the previous fixed value).
        localcores (int): Value for --localcores (default 12, the previous fixed value).

    Returns:
        Dict containing:
            - success (bool): Whether the process exited with code 0
            - return_code (int): Process return code (-1 if launching failed)
            - stdout (str): Standard output (merged with stderr when streaming)
            - stderr (str): Standard error (empty when streaming)
            - command (str): The executed command
            - output_dir (str): ``<working directory>/<id>``

    Raises:
        ValueError: If ``id`` is invalid or ``libraries`` is combined with
            ``fastqs``/``sample``.
        FileNotFoundError: If a referenced path does not exist or the
            ``cellranger-arc`` executable cannot be found.
    """

    # Validate required parameters
    if not id or len(id) >= 64:
        raise ValueError("ID must be a non-empty string less than 64 characters")

    if not all(c.isalnum() or c in ('_', '-') for c in id):
        raise ValueError("ID can only contain alphanumeric characters, underscores, and dashes")

    if (fastqs or sample) and libraries:
        raise ValueError("""
            When using --libraries, --fastqs and --sample must not be used.
            This argument should not be used when performing gene expression-only analysis; 
            use --fastqs instead.
        """)

    def _require_path(value: str, description: str) -> str:
        # Return the absolute form of an existing path, or raise with a clear message.
        candidate = Path(value)
        if not candidate.exists():
            raise FileNotFoundError(f"{description} not found: {value}")
        return str(candidate.absolute())

    reference = _require_path(transcriptome, "Transcriptome directory")

    # Resolve the working directory with pathlib only, so the function does not
    # depend on an ``os`` import made in some other notebook cell.
    if working_dir:
        run_dir = Path(working_dir).absolute()
        # exist_ok makes a separate existence check unnecessary.
        run_dir.mkdir(parents=True, exist_ok=True)
    else:
        run_dir = Path.cwd()
    cwd = str(run_dir)

    # Required arguments
    cmd = [
        "cellranger-arc", "count",
        "--id", id,
        "--reference", reference,
        f"--localmem={localmem}",
        f"--localcores={localcores}",
    ]

    # Optional arguments (order preserved from the original command layout)
    if sample:
        cmd.extend(["--sample", sample])

    if fastqs:
        cmd.extend(["--fastqs", _require_path(fastqs, "FASTQ directory")])

    if feature_ref:
        cmd.extend(["--feature-ref", _require_path(feature_ref, "Feature reference file")])

    if target_panel:
        cmd.extend(["--target-panel", _require_path(target_panel, "Target panel file")])

    if libraries:
        # Materialise the library descriptions as the CSV expected by --libraries.
        library_path = run_dir / f'{id}-library.csv'
        with open(library_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=['fastqs', 'sample', 'library_type'])
            writer.writeheader()
            writer.writerows(libraries)
        cmd.extend(["--libraries", str(library_path)])

    if force_cells is not None:
        cmd.extend(["--force-cells", str(force_cells)])

    if expect_cells is not None:
        cmd.extend(["--expect-cells", str(expect_cells)])

    if chemistry:
        cmd.extend(["--chemistry", chemistry])

    if r1_length is not None:
        cmd.extend(["--r1-length", str(r1_length)])

    if r2_length is not None:
        cmd.extend(["--r2-length", str(r2_length)])

    # Boolean flags, appended in a fixed order when enabled
    switches = (
        (include_introns, "--include-introns"),
        (no_bam, "--no-bam"),
        (nosecondary, "--nosecondary"),
        (no_libraries, "--no-libraries"),
        (no_target_umi_filter, "--no-target-umi-filter"),
        (dry, "--dry"),
        (disable_ui, "--disable-ui"),
        (noexit, "--noexit"),
        (nopreflight, "--nopreflight"),
    )
    cmd.extend(flag for enabled, flag in switches if enabled)

    # Determine output directory path
    output_dir = str(run_dir / id)
    command_text = " ".join(cmd)

    def _outcome(return_code: int, stdout: str, stderr: str) -> Dict[str, Any]:
        # Single place that shapes the result dict for every exit path.
        return {
            "success": return_code == 0,
            "return_code": return_code,
            "stdout": stdout,
            "stderr": stderr,
            "command": command_text,
            "output_dir": output_dir,
        }

    if verbose:
        print(f"Running cellranger count in: {cwd}")
        print(f"Command: {command_text}")
        print(f"Output will be in: {output_dir}")

    # Run command
    try:
        if capture_output:
            result = subprocess.run(
                cmd,
                cwd=cwd,
                capture_output=True,
                text=True,
                check=False,
            )

            if verbose and result.stdout:
                print("STDOUT:")
                print(result.stdout)

            if verbose and result.stderr:
                print("STDERR:")
                print(result.stderr)

            return _outcome(result.returncode, result.stdout, result.stderr)

        # Stream output in real-time. The context manager guarantees the pipe is
        # closed and the child is waited on even if reading fails part-way.
        output_lines = []
        with subprocess.Popen(
            cmd,
            cwd=cwd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # stderr is merged into stdout
            text=True,
            bufsize=1,  # line-buffered
        ) as process:
            for raw_line in process.stdout:
                line = raw_line.rstrip()
                if verbose:
                    print(line)
                output_lines.append(line)
            return_code = process.wait()

        return _outcome(return_code, "\n".join(output_lines), "")

    except FileNotFoundError as exc:
        # The message names the binary that is actually executed.
        raise FileNotFoundError(
            "cellranger-arc command not found. Please ensure Cell Ranger ARC is installed and in your PATH."
        ) from exc
    except Exception as exc:
        # Deliberate best-effort: report unexpected launch errors in the result
        # dict rather than raising.
        return _outcome(-1, "", str(exc))


In [None]:
# Submit a count run to the cluster. Two libraries (Gene Expression and
# Chromatin Accessibility) are described inline; run_cellranger_count writes
# them to a libraries CSV inside working_dir before invoking the pipeline.
future = client.submit(run_cellranger_count, verbose=True,
                       id="M2XM_210902_221-R_A01",
                       working_dir="/scratch/bengal1/cellranger",
                       libraries=[
                           {"fastqs": "/projects/illinois/eng/bioe/mjjang/s3bucket/biccn/grant/u19_zeng/zeng/multimodal/sncell/10xMultiome_RNAseq/mouse/raw/SQ_MX2023-4_S01_L003",
                            "sample": "SQ_MX2023-4",
                            "library_type": "Gene Expression"
                           },
                           {"fastqs": "/projects/illinois/eng/bioe/mjjang/s3bucket/biccn/grant/u19_zeng/zeng/multimodal/sncell/10xMultiome_ATACseq/mouse/raw/SQ_AT0029-6_S01_L003",
                            "sample": "SQ_AT0029-6",
                            "library_type": "Chromatin Accessibility"
                           }
                       ],

                       transcriptome="/projects/illinois/eng/bioe/mjjang/s3bucket/transcriptome/refdata-cellranger-arc-GRCm39-2024-A"
                      )
# Blocks until the remote call returns; the result is the dict built by
# run_cellranger_count (success, return_code, stdout, stderr, command, output_dir).
results = future.result()  # Get the result
print(results) 


In [14]:
import psutil

def get_available_memory():
    """
    Report how much system memory is currently available.

    Returns:
        float: Available memory, expressed in units of 1024**3 bytes.
    """
    bytes_per_unit = 1024 ** 3  # divisor that turns a byte count into GB
    return psutil.virtual_memory().available / bytes_per_unit


In [15]:
# Measure available memory where the task executes (via the Dask client),
# not in the notebook kernel.
future = client.submit(get_available_memory)
results = future.result()  # Get the result
print(results) 


123.74040603637695
