# Filter SNPs Whose Minor Allele Count is Low

2023-11-15

This notebook is an example of how to filter SNPs outputed by cellsnp-lite in a post-hoc manner. Specifically, in the following part, we will filter SNPs whose minor allele count (here we mean AD) is low.

In [1]:
data_dir = "../../data/csp_mode1/"     # cellsnp dir
out_dir = "./subset_with_minAD_issue108"

In [2]:
# whether `--genotype` is specified when calling cellsnp-lite
# i.e., whether there is a file named "cellSNP.cells.vcf" or "cellSNP.cells.vcf.gz" in the cellsnp dir.
is_genotype = False

In [3]:
# whether the output VCF file is gzipped.
is_gzip = True

In [4]:
min_AD = 3    # minimal count of minor allele

In [5]:
import anndata as ad
import numpy as np
import pandas as pd
import pysam
import scipy as sp
from scipy import io
from scipy import sparse

In [6]:
print(ad.__version__)
print(np.__version__)
print(pd.__version__)
print(pysam.__version__)
print(sp.__version__)

0.9.2
1.22.4
2.0.3
0.16.0.1
1.8.1


In [7]:
import gzip
import os

## Load data

In [8]:
def load_vcf(vcf_file, comment_char = "#"):
    # load comment
    fp = None
    if vcf_file.endswith(".gz") or vcf_file.endswith(".GZ"):
        fp = gzip.open(vcf_file, "rt")
    else:
        fp = open(vcf_file, "r")
        
    comment = ""
    pre_line = None
    for line in fp:
        if not line or line[0] != comment_char:
            break
        pre_line = line
        comment += line
    
    fp.close()
    
    if not pre_line:
        raise IOError()
    assert len(pre_line) > 6
    assert pre_line[:6] == "#CHROM"
    
    # load content
    content = pd.read_csv(vcf_file, sep = "\t", header = None, comment = "#")
    content.columns = pre_line.strip()[1:].split("\t")
    
    return((comment, content))

In [9]:
def load_samples(sample_file):
    df = pd.read_csv(sample_file, header = None)
    df.columns = ["cell"]
    return(df)

In [10]:
def load_matrix(matrix_file):
    mtx = None
    try:
        mtx = sp.io.mmread(matrix_file)
    except:
        mtx = io.mmread(matrix_file)
    mtx = mtx.toarray()    # convert from sparse matrix to ndarray to support slicing.
    return(mtx)

In [11]:
def load_cellsnp_data(csp_dir, is_genotype = False, is_gzip = True):
    vcf_suffix = ".gz" if is_gzip else ""
    base_vcf_comment, base_vcf = load_vcf(
        vcf_file = os.path.join(csp_dir, "cellSNP.base.vcf" + vcf_suffix),
        comment_char = "#")
    cell_vcf, cell_vcf_comment = None, None
    if is_genotype:
        cell_vcf_comment, cell_vcf = load_vcf(
            vcf_file = os.path.join(csp_dir, "cellSNP.cells.vcf" + vcf_suffix),
            comment_char = "#")
    samples = load_samples(os.path.join(csp_dir, "cellSNP.samples.tsv"))
    AD_mtx = load_matrix(os.path.join(csp_dir, "cellSNP.tag.AD.mtx"))
    DP_mtx = load_matrix(os.path.join(csp_dir, "cellSNP.tag.DP.mtx"))
    OTH_mtx = load_matrix(os.path.join(csp_dir, "cellSNP.tag.OTH.mtx"))
    
    adata = ad.AnnData(
        X = AD_mtx, 
        obs = base_vcf, 
        var = samples)
    
    adata.uns["is_gzip"] = is_gzip
    adata.uns["is_genotype"] = is_genotype
    
    adata.uns["base_vcf_comment"] = base_vcf_comment
    
    if cell_vcf is None:
        adata.uns["cell_vcf_comment"] = None
    else:
        adata.obsm["cell_vcf"] = cell_vcf
        adata.uns["cell_vcf_comment"] = cell_vcf_comment
        
    adata.layers["DP_mtx"] = DP_mtx
    adata.layers["OTH_mtx"] = OTH_mtx
    
    return(adata)

Load the cellsnp output into an AnnData object *adata*:
- **adata.X** is the SNP x Cell AD mtx (in "numpy.ndarray" format)
- **adata.obs** is the annotation of the SNPs (a pandas DataFrame)
- **adata.var** is the annotation of the cells (a pandas DataFrame)
- **adata.uns** contains four keys: *is_gzip*, *is_genotype*, *base_vcf_comment* (the comment string of the file cellSNP.base.vcf[.gz]), and *cell_vcf_comment* (the comment string of the file cellSNP.cells.vcf[.gz]; None if this file does not exist).
- **adata.layers** contains two keys: *DP_mtx*, and *OTH_mtx*, both are in "numpy.ndarray" format.
- if file cellSNP.cells.vcf[.gz] exists, **adata.obsm["cell_vcf"]** will store the content of the file (in pandas DataFrame format).

In [12]:
adata = load_cellsnp_data(csp_dir = data_dir, is_genotype = is_genotype, is_gzip = is_gzip)
adata



AnnData object with n_obs × n_vars = 798 × 400
    obs: 'CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO'
    var: 'cell'
    uns: 'is_gzip', 'is_genotype', 'base_vcf_comment', 'cell_vcf_comment'
    layers: 'DP_mtx', 'OTH_mtx'

In [13]:
type(adata.layers["DP_mtx"])

numpy.ndarray

## Subset data

Here, as an example, we will filter SNPs whose AD is less than `min_AD`.

In [14]:
# aggregate AD of all cells
agg_AD = adata.X.sum(axis = 1)
agg_AD = np.squeeze(np.asarray(agg_AD))
agg_AD.shape

(798,)

In [15]:
# filter SNPs whose AD is less than `min_AD`.
adata_s = adata[agg_AD >= min_AD, ]
adata_s

View of AnnData object with n_obs × n_vars = 609 × 400
    obs: 'CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO'
    var: 'cell'
    uns: 'is_gzip', 'is_genotype', 'base_vcf_comment', 'cell_vcf_comment'
    layers: 'DP_mtx', 'OTH_mtx'

## Save data

In [16]:
def save_vcf(vcf_df, vcf_comment, vcf_file, is_gzip = True):
    df_file = vcf_file + ".df"
    vcf_df.to_csv(df_file, sep = "\t", header = False, index = False)
    
    fp = pysam.BGZFile(vcf_file, "w") if is_gzip else open(vcf_file, "w")
    fp.write(vcf_comment.encode())
    with open(df_file, "r") as df_fp:
        for line in df_fp:
            fp.write(line.encode())
    fp.close()
    
    os.remove(df_file)

In [17]:
def save_matrix(mtx, matrix_file):
    mtx = sparse.csr_matrix(mtx)   # convert from ndarray to sparse matrix to be fully compatible with .mtx format.
    io.mmwrite(matrix_file, mtx)

In [18]:
def save_samples(sample_df, sample_file):
    sample_df.to_csv(sample_file, header = False, index = False)

In [19]:
def save_cellsnp_data(adata, out_dir):
    if not os.path.exists(out_dir):
        os.mkdir(out_dir)
        
    is_gzip = adata.uns["is_gzip"]
    is_genotype = adata.uns["is_genotype"]
    
    vcf_suffix = ".gz" if is_gzip else ""
    
    save_vcf(adata.obs, adata.uns["base_vcf_comment"], 
             vcf_file = os.path.join(out_dir, "cellSNP.base.vcf" + vcf_suffix),
             is_gzip = is_gzip)

    if is_genotype:
        save_vcf(adata.obsm["cell_vcf"], adata.uns["cell_vcf_comment"],
                 vcf_file = os.path.join(out_dir, "cellSNP.cells.vcf" + vcf_suffix),
                 is_gzip = is_gzip)
    
    save_samples(adata.var, os.path.join(out_dir, "cellSNP.samples.tsv"))
    
    save_matrix(adata.X, os.path.join(out_dir, "cellSNP.tag.AD.mtx"))
    save_matrix(adata.layers["DP_mtx"], os.path.join(out_dir, "cellSNP.tag.DP.mtx"))
    save_matrix(adata.layers["OTH_mtx"], os.path.join(out_dir, "cellSNP.tag.OTH.mtx"))

In [20]:
save_cellsnp_data(adata_s, out_dir)

In [21]:
os.listdir(out_dir)

['cellSNP.base.vcf.gz',
 'cellSNP.tag.AD.mtx',
 'cellSNP.samples.tsv',
 'cellSNP.tag.OTH.mtx',
 'cellSNP.tag.DP.mtx']