In [None]:
%load_ext autoreload
%autoreload 2

# download_ref

> Download reference genomes and liftOver chain files from the UCSC goldenPath server.

In [None]:
#| default_exp download_ref

In [None]:
#| hide
from nbdev.showdoc import *

  import pkg_resources,importlib


# Download reference genomes

In [None]:
#| export

from bs_cpg.setup import *
from pathlib import Path
import os, json
import requests
from tqdm.auto import tqdm
import pysam
import subprocess



In [None]:
#| export 
def download_file(url: str, filename: str, sub_dir: Path = None, verbose: bool = True):
    """
    A general utility to download a file, with optional status messages and a progress bar.

    The payload is streamed into a temporary ``<filename>.part`` file next to the
    destination and renamed to its final name only once the transfer has finished.
    An interrupted or failed download therefore never leaves a truncated file that
    a later call would mistake for a complete one.

    Args:
        url (str): The URL of the file to download.
        filename (str): The name for the saved file.
        sub_dir (Path, optional): A subdirectory under the main data path. Defaults to None.
        verbose (bool, optional): If True, prints status messages and shows a progress bar.
                                  Defaults to True.

    Returns:
        Path: The full path to the downloaded (or already existing) file,
              or None if the HTTP request failed.
    """
    base_dir = get_base_data_path()
    dest_dir = base_dir / sub_dir if sub_dir else base_dir
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest_path = dest_dir / filename

    if dest_path.exists():
        if verbose:
            print(f"✅ File '{dest_path}' already exists. Skipping.")
        return dest_path

    if verbose:
        print(f"⬇️  Downloading '{filename}' from {url}...")

    # Write to a side file first; only a fully received payload gets the final name.
    partial_path = dest_path.with_name(dest_path.name + ".part")

    try:
        # The context manager releases the connection even if streaming is aborted.
        with requests.get(url, stream=True, timeout=15) as response:
            response.raise_for_status()  # Raise an exception for bad status codes

            total_size = int(response.headers.get('content-length', 0))

            # The 'disable' parameter controls the tqdm progress bar.
            # An unknown size is passed as None so tqdm shows a plain counter.
            with open(partial_path, 'wb') as f, tqdm(
                desc=filename,
                total=total_size or None,
                unit='iB',
                unit_scale=True,
                unit_divisor=1024,
                disable=not verbose
            ) as bar:
                for chunk in response.iter_content(chunk_size=8192):
                    bar.update(f.write(chunk))

        # Same-directory rename: the destination appears only when complete.
        partial_path.replace(dest_path)

    except requests.exceptions.RequestException as e:
        if verbose:
            print(f"❌ Error downloading file: {e}")
        return None

    finally:
        # Runs for every exit path (request errors, OSError, KeyboardInterrupt);
        # after a successful rename the partial file no longer exists.
        if partial_path.exists():
            partial_path.unlink()

    if verbose:
        print(f"✅ Successfully downloaded to '{dest_path}'")
    return dest_path

In [None]:
#| export
def is_bgzipped(filepath: Path) -> bool:
    """
    Checks if a file is block-gzipped (BGZF) by reading its header.

    Only the first 14 bytes are read. Anything accepted by ``open()`` (a ``Path``
    or a plain string) may be passed.

    Example:
        A plain-gzip file such as the ``hg19ToHg38.over.chain.gz`` chain file
        stored in the base data directory yields ``False``::

            is_bgzipped(get_base_data_path() / "hg19ToHg38.over.chain.gz")

    Args:
        filepath (Path): The path to the file to check.

    Returns:
        bool: True if the file is in BGZF format, False otherwise
              (including files shorter than 14 bytes).
    """
    with open(filepath, 'rb') as f:
        header = f.read(14)

    # A BGZF file must:
    # 1. Be a valid GZIP file (starts with \x1f\x8b).
    # 2. Have the FEXTRA flag set in its header.
    # 3. Contain the BGZF subfield identifier ('BC') at byte 12.
    return (
        len(header) >= 14 and
        header.startswith(b'\x1f\x8b') and
        (header[3] & 0x04) != 0 and
        header[12:14] == b'BC'
    )

False

In [None]:
#| export
def convert_to_bgzip(input_path: Path, output_path: Path) -> bool:
    """
    Converts a standard gzip file to a bgzip file using command-line tools.
    This function replicates the command: `gunzip -c <input> | bgzip > <output>`.

    Both ends of the pipeline are checked: a failing `gunzip` (e.g. missing or
    corrupt input) is reported as a failure even when `bgzip` itself exits cleanly.
    On any failure the partially written output file is removed.

    Args:
        input_path (Path): The path to the input gzip file.
        output_path (Path): The path for the output bgzip file.

    Returns:
        bool: True if conversion was successful, False otherwise.
    """
    output_path = Path(output_path)
    gunzip_proc = None
    bgzip_proc = None
    output_created = False
    succeeded = False

    try:
        # The output handle must stay open for as long as bgzip is writing to it.
        with open(output_path, 'wb') as f_out:
            output_created = True

            # Start the gunzip process, piping its stdout
            gunzip_proc = subprocess.Popen(
                ['gunzip', '-c', str(input_path)], stdout=subprocess.PIPE
            )

            # Start the bgzip process, taking stdin from gunzip and writing to our file handle
            bgzip_proc = subprocess.Popen(
                ['bgzip'], stdin=gunzip_proc.stdout, stdout=f_out
            )

            # This allows gunzip to receive a SIGPIPE if bgzip exits before gunzip is done.
            gunzip_proc.stdout.close()

            bgzip_code = bgzip_proc.wait()
            # Reap gunzip as well so that its exit status can be inspected.
            gunzip_code = gunzip_proc.wait()

        if bgzip_code != 0:
            print(f"❌ Error: bgzip process failed with exit code {bgzip_code}")
        elif gunzip_code not in (0, 2):
            # gzip documents exit status 2 as "warning" (output was still produced).
            print(f"❌ Error: gunzip process failed with exit code {gunzip_code}")
        else:
            succeeded = True

    except FileNotFoundError as e:
        print(f"❌ Error: '{e.filename}' not found. Is bgzip (part of htslib) installed and in your PATH?")

    finally:
        # If bgzip could not be started, gunzip may still be running: stop and reap it.
        for proc in (gunzip_proc, bgzip_proc):
            if proc is not None and proc.poll() is None:
                proc.kill()
                proc.wait()
        if gunzip_proc is not None and gunzip_proc.stdout is not None:
            gunzip_proc.stdout.close()
        # Never leave a truncated or empty result behind.
        if output_created and not succeeded and output_path.exists():
            output_path.unlink()

    return succeeded

In [None]:
#| export 
def get_ref_genome(name: str, **kwargs):
    """
    Fetch the UCSC reference genome `name` and make sure the local copy is
    bgzip-compressed so that pysam can work with it.

    Keyword arguments are forwarded to `download_file()`; `verbose`
    (default True) additionally controls the status messages printed here.

    Returns the path of the prepared `<name>.fa.bgz` file as a string, or
    None when the download or the conversion failed.
    """
    chatty = kwargs.get('verbose', True)
    final_path = get_base_data_path() / f"{name}.fa.bgz"

    # Fast path: the prepared genome is already on disk.
    if final_path.exists():
        if chatty:
            print(f"✅ Final file '{final_path}' already exists.")
        return str(final_path)

    gz_filename = f"{name}.fa.gz"
    downloaded = download_file(
        url=f"https://hgdownload.soe.ucsc.edu/goldenPath/{name}/bigZips/{gz_filename}",
        filename=gz_filename,
        **kwargs
    )
    if downloaded is None:
        return None

    if not is_bgzipped(downloaded):
        # Plain gzip: recompress into BGZF, then drop the original archive.
        if chatty:
            print(f"⚙️ Converting standard gzip to bgzip format using command-line tools...")
        if not convert_to_bgzip(downloaded, final_path):
            print(f"❌ Conversion failed. Please check the errors above.")
            return None
        downloaded.unlink()
    else:
        # Already BGZF: moving it to the final name is all that is needed.
        if chatty:
            print(f"👍 Downloaded file is already bgzipped. Renaming...")
        downloaded.rename(final_path)

    if chatty:
        print(f"✅ Successfully prepared '{final_path}'")
    return str(final_path)
# Example: get_ref_genome('hg38')

In [None]:
get_ref_genome("hg38")

✅ Final file '/mnt/idms/home/magyary/.bs-cpg/hg38.fa.bgz' already exists.


'/mnt/idms/home/magyary/.bs-cpg/hg38.fa.bgz'

In [None]:
"hg38".capitalize()

'Hg38'

In [None]:
#| export 
def get_liftover_chain(genome_from: str, genome_to: str, **kwargs):
    """
    Download liftover chain file between genome versions from UCSC goldenPath liftOver.

    The file name is built as ``<genome_from>To<GenomeTo>.over.chain.gz``, where
    only the first letter of `genome_to` is upper-cased and the rest of the name
    is kept as given (e.g. 'hg38' -> 'Hg38', 'panTro6' -> 'PanTro6').

    The path is returned regardless of `verbose`; pass ``verbose=False`` to
    suppress the status messages and the progress bar.

    Args:
        genome_from (str): The original reference genome name (e.g., 'hg19', 'hg38', 'mm10').
        genome_to (str): The new reference genome name (e.g., 'hg19', 'hg38', 'mm10').
        **kwargs: Additional keyword arguments to be passed to download_file()
                  (e.g., verbose=False)

    Returns:
        Path: The path to the downloaded file, or None if an error occurred.
    """
    # str.capitalize() would also lowercase everything after the first character
    # ('panTro6' -> 'Pantro6'), producing a file name that does not exist.
    genome_to_camel = genome_to[:1].upper() + genome_to[1:]
    file_name = f"{genome_from}To{genome_to_camel}.over.chain.gz"
    url = f"https://hgdownload.soe.ucsc.edu/goldenPath/{genome_from}/liftOver/{file_name}"

    return download_file(url=url, filename=file_name, **kwargs)
get_liftover_chain("hg19", "hg38", verbose = False)

Path('/mnt/idms/home/magyary/.bs-cpg/hg19ToHg38.over.chain.gz')

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()