# Bulk RNA-seq counts normalization



## Description





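The normalization step follows the GTEx eQTL pipeline (`eqtl_prepare_expression.py`). Genes are first filtered: a gene is kept only if its TPM is at least 0.1 in at least 20% of the samples **and** its read count is at least 6 in at least 20% of the samples (set by `--tpm-threshold`, `--count-threshold` and `--sample-frac-threshold`). The counts are then normalized with the Trimmed Mean of M-values (TMM) method and converted to counts per million (CPM), by default with a voom transformation (`--normalization-method tmm_cpm_voom`; the alternatives are `tmm_cpm_edger` and `qn`, quantile normalization of TPM). For the TMM-based methods, normalization is computed on all genes and the filter is applied afterwards. Finally, unless `quantile_normalize` is set to false, each gene is inverse normal transformed across samples.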
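
The gene filter can be written in a few lines of pandas. This is a sketch that mirrors the mask built in `prepare_expression` (genes as rows, samples as columns); the function name and the toy matrices are hypothetical and only serve to show that both thresholds must pass, each in at least 20% of the samples.

```python
import pandas as pd

def expression_mask(tpm_df, counts_df, tpm_threshold=0.1, count_threshold=6,
                    sample_frac_threshold=0.2):
    """True for genes (rows) that pass BOTH the TPM and the read-count filter."""
    min_samples = sample_frac_threshold * tpm_df.shape[1]
    tpm_ok = (tpm_df >= tpm_threshold).sum(axis=1) >= min_samples
    count_ok = (counts_df >= count_threshold).sum(axis=1) >= min_samples
    return (tpm_ok & count_ok).values

# Hypothetical toy data: 4 genes x 5 samples, so 20% of samples = 1 sample
genes = ['A', 'B', 'C', 'D']
tpm = pd.DataFrame([[1.0] * 5,               # expressed everywhere
                    [0.05] * 5,              # TPM always below 0.1
                    [2.0] * 5,               # high TPM, but ...
                    [0.5, 0, 0, 0, 0]],      # expressed in one sample only
                   index=genes)
counts = pd.DataFrame([[10] * 5,
                       [10] * 5,
                       [2] * 5,              # ... too few reads
                       [8, 0, 0, 0, 0]],
                      index=genes)

print(expression_mask(tpm, counts).tolist())  # [True, False, False, True]
```

Gene D survives because one sample out of five is exactly the 20% minimum; gene C fails only on read counts.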



## Input

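1. TPM matrix and read count matrix in GCT format, as produced by RNA-SeQC
    - the first two lines are header lines (in standard GCT, the `#1.2` version line and a line with the matrix dimensions) and are skipped when the matrix is read
    - the matrix should be tab-delimited
    - both matrices must contain the same samples in the same column order; the workflow stops with an error otherwise
    - the file names should end with the `.gct` suffix (gzip-compressed `.gct.gz` files, as in the example below, also work)
    - these requirements are met if the inputs are outputs of the [`bulk_expression_QC` pipeline](https://statfungen.github.io/xqtl-protocol/code/molecular_phenotypes/QC/bulk_expression_QC.html).
2. GTF for the collapsed gene model
    - gene IDs must match those in the GCT matrices exactly, including version suffixes (e.g. `ENSG00000000003` will not match `ENSG00000000003.1`)
    - chromosome names must have the `chr` prefix (the pipeline currently assumes this convention, although it could be made an option)
    - only genes on `chr1` to `chr22`, `chrX`, `chrY` and `chrMT` are written to the output
3. Sample lookup table matching sample names in the expression data to those in the genotype files
    - required input
    - tab-delimited, with a header row naming the columns `sample_id` and `participant_id`
    - exactly two columns: the first is the sample name in the expression data, the second is the sample name in the genotype data
    - **must contain every sample name in the expression matrices, even samples that are absent from the genotype data**
    - each `participant_id` may appear only once; the workflow stops with an error if duplicates are found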
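
A small pandas sketch of these input checks: reading a GCT matrix (two header lines skipped, gene IDs as index, `Description` column dropped, similar to `qtl.io.read_gct(..., load_description=False)`) and validating the lookup table before sample IDs are renamed to participant IDs. The helper names and toy inputs are hypothetical; the workflow itself performs similar checks inside the `normalize` step.

```python
import io
import pandas as pd

def read_gct(source):
    """Read a GCT matrix: skip the two header lines, index by gene ID, drop Description."""
    df = pd.read_csv(source, sep='\t', skiprows=2, index_col=0)
    return df.drop(columns='Description')

def sample_to_participant(expr_df, lookup_df):
    """Validate the lookup table against the expression samples; return sample -> participant."""
    missing = ~expr_df.columns.isin(lookup_df['sample_id'])
    if missing.any():
        raise ValueError(f'{missing.sum()} expression sample(s) missing from the lookup table')
    if lookup_df['participant_id'].duplicated().any():
        raise ValueError('duplicated participant_id values in the lookup table')
    return lookup_df.set_index('sample_id')['participant_id']

# Hypothetical toy inputs
gct_text = '#1.2\n2\t3\nName\tDescription\tS1\tS2\tS3\nG1\tgeneA\t10\t0\t5\nG2\tgeneB\t1\t2\t3\n'
lookup_text = 'sample_id\tparticipant_id\nS1\tP1\nS2\tP2\nS3\tP3\n'

counts = read_gct(io.StringIO(gct_text))
lookup = pd.read_csv(io.StringIO(lookup_text), sep='\t', dtype=str)
counts = counts.rename(columns=sample_to_participant(counts, lookup))
print(list(counts.columns))  # ['P1', 'P2', 'P3']
```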

## Output

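Normalized expression matrix in BED format (`*.expression.bed.gz`), written to `--cwd`. Each row is a gene, positioned at its TSS taken from the GTF, followed by one column per sample; sample IDs are replaced by the corresponding `participant_id` from the lookup table. The file name is built from the TPM input name and the normalization method, e.g. `<prefix>.tmm_cpm_voom.expression.bed.gz`, with `no_qnorm.` inserted before `expression` when `quantile_normalize` is false.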
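
The assembly of the output can be sketched in pandas: rename samples to participant IDs, keep genes on the listed chromosomes, join TSS coordinates and expression on the gene ID, and sort by chromosome and position. The workflow does this with `qtl.io.gtf_to_tss_bed`, `pd.merge` and `qtl.io.sort_bed`; in this sketch the TSS template is a hypothetical hand-made table and `sort_bed` is replaced by a plain pandas sort in chromosome-list order.

```python
import pandas as pd

def assemble_bed(tss_bed, norm_df, sample_to_participant, chrs):
    """Join TSS coordinates with normalized expression and sort by chromosome and start."""
    norm_df = norm_df.rename(columns=sample_to_participant)            # sample ID -> participant ID
    tss_bed = tss_bed[tss_bed['chr'].isin(chrs)]                         # keep listed chromosomes only
    bed = pd.merge(tss_bed, norm_df, left_index=True, right_index=True)  # genes present in both
    bed['chr'] = pd.Categorical(bed['chr'], categories=chrs, ordered=True)
    bed = bed.sort_values(['chr', 'start'])                              # chr1 < chr2 < ... < chr10
    bed['chr'] = bed['chr'].astype(str)
    return bed

chrs = [f'chr{i}' for i in range(1, 23)] + ['chrX', 'chrY', 'chrMT']
# Hypothetical TSS template indexed by gene ID
tss_bed = pd.DataFrame({'chr': ['chr2', 'chr10', 'chr1', 'chrUn'],
                        'start': [500, 50, 100, 10],
                        'end': [501, 51, 101, 11]},
                       index=['G1', 'G2', 'G3', 'G4'])
tss_bed['gene_id'] = tss_bed.index
norm_df = pd.DataFrame({'S1': [0.1, -0.2, 1.3, 0.0, 0.5],
                        'S2': [-0.1, 0.2, -1.3, 0.0, -0.5]},
                       index=['G1', 'G2', 'G3', 'G4', 'G5'])

bed = assemble_bed(tss_bed, norm_df, {'S1': 'P1', 'S2': 'P2'}, chrs)
print(bed.index.tolist())  # ['G3', 'G1', 'G2']
```

G4 is dropped because its chromosome is not in the list and G5 because it has no TSS entry, which is why the workflow reports the number of genes remaining after selecting chromosomes.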

## Minimal Working Example Steps

### vii. Multi-sample read count normalization

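Timing: <10 min

Filtering of lowly expressed genes followed by TMM-based normalization of read counts. Note that this example lowers the read count threshold to 1 (`--count-threshold 1`; the default is 6).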
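
To make the TMM step concrete, here is a NumPy sketch of TMM factors as described for edgeR's `calcNormFactors` (trim 30% of the M values and 5% of the A values at each end, then take a precision-weighted mean of the remaining M values), followed by CPM on TMM-adjusted library sizes, which is the quantity behind the `tmm_cpm_edger` option. It is an illustration under stated assumptions, not the `qtl.norm` code: edgeR's edge cases are omitted, and the reference-sample rule shown (upper quartile closest to the mean) is the classic edgeR default that other versions may replace. The default `tmm_cpm_voom` mode applies a voom (log-CPM) transformation instead, which is not shown.

```python
import numpy as np
from scipy.stats import rankdata

def tmm_factor(obs, ref, logratio_trim=0.3, sum_trim=0.05):
    """Un-scaled TMM factor of sample `obs` relative to sample `ref` (1-D count vectors)."""
    n_o, n_r = obs.sum(), ref.sum()
    keep = (obs > 0) & (ref > 0)                      # genes with finite M and A values
    o, r = obs[keep], ref[keep]
    m = np.log2((o / n_o) / (r / n_r))                # M: log fold change of proportions
    a = 0.5 * (np.log2(o / n_o) + np.log2(r / n_r))   # A: average log proportion
    var = (n_o - o) / n_o / o + (n_r - r) / n_r / r   # approximate variance of M
    n = m.size
    lo_m = np.floor(n * logratio_trim) + 1
    lo_a = np.floor(n * sum_trim) + 1
    rm, ra = rankdata(m), rankdata(a)
    # trim 30% of genes at each end of M and 5% at each end of A
    trimmed = (rm >= lo_m) & (rm <= n + 1 - lo_m) & (ra >= lo_a) & (ra <= n + 1 - lo_a)
    # precision-weighted mean of the remaining M values
    return 2 ** (np.sum(m[trimmed] / var[trimmed]) / np.sum(1 / var[trimmed]))

def tmm_cpm(counts):
    """counts: genes x samples. Returns (TMM factors, CPM using TMM-adjusted library sizes)."""
    counts = np.asarray(counts, dtype=float)
    lib = counts.sum(axis=0)
    # reference: sample whose upper quartile of proportions is closest to the mean upper quartile
    f75 = np.percentile(counts / lib, 75, axis=0)
    ref = counts[:, np.argmin(np.abs(f75 - f75.mean()))]
    f = np.array([tmm_factor(counts[:, j], ref) for j in range(counts.shape[1])])
    f = f / np.exp(np.mean(np.log(f)))               # rescale so the factors multiply to 1
    return f, counts / (lib * f) * 1e6

# Toy data: sample 2 equals sample 1 except that gene 0 is 100x higher
base = np.arange(1, 11) * 100.0
s2 = base.copy()
s2[0] *= 100
f, cpm = tmm_cpm(np.column_stack([base, s2]))
print(np.allclose(cpm[1:, 0], cpm[1:, 1]))  # True
```

Gene 0 inflates the library size of sample 2, so plain CPM would make every other gene look about 2.8-fold lower there; after TMM the CPM values of genes 1 to 9 agree across the two samples.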

In [None]:
sos run pipeline/bulk_expression_normalization.ipynb normalize \
    --cwd output/rnaseq \
    --tpm-gct output/rnaseq/bulk_rnaseq_tmp_matrix.low_expression_filtered.outlier_removed.tpm.gct.gz \
    --counts-gct output/rnaseq/bulk_rnaseq_tmp_matrix.low_expression_filtered.outlier_removed.geneCount.gct.gz \
    --annotation-gtf reference_data/Homo_sapiens.GRCh38.103.chr.reformatted.collapse_only.gene.ERCC.gtf  \
    --count-threshold 1 --sample-participant-lookup data/rnaseq/sample_participant_lookup.txt

```
INFO: Running normalize: 
INFO: normalize is completed.
INFO: normalize output:   /restricted/projectnb/xqtl/xqtl_protocol/toy_xqtl_protocol/output/rnaseq/bulk_rnaseq_tmp_matrix.low_expression_filtered.outlier_removed.tmm.expression.bed.gz
INFO: Workflow normalize (ID=wfac0c64bd10a9bed) is executed successfully with 1 completed step.
```

## Troubleshooting

| Step | Substep | Problem | Possible Reason | Solution |
|------|---------|---------|------------------|---------|
|  |  |  |  |  |




## Command interface

In [4]:
!sos run bulk_expression_normalization.ipynb -h

usage: sos run bulk_expression_normalization.ipynb
               [workflow_name | -t targets] [options] [workflow_options]
  workflow_name:        Single or combined workflows defined in this script
  targets:              One or more targets to generate
  options:              Single-hyphen sos parameters (see "sos run -h" for details)
  workflow_options:     Double-hyphen workflow-specific parameters

Workflows:
  normalize

Global Workflow Options:
  --cwd output (as path)
                        Work directory & output directory
  --counts-gct VAL (as path, required)
                        gene count table
  --tpm-gct VAL (as path, required)
                        gene TPM table
  --annotation-gtf VAL (as path, required)
                        gene gtf annotation table
  --sample-participant-lookup VAL (as path, required)
                        A file to map sample ID from expression to genotype,must
                        contain two columns, sample_id

In [9]:
[global]
# Work directory & output directory
parameter: cwd = path("output")
#  gene count table
parameter: counts_gct = path
#  gene TPM table
parameter: tpm_gct = path
#  gene gtf annotation table
parameter: annotation_gtf = path
# A file mapping sample IDs in the expression files to IDs in the genotype data (the two can be the same). It must have a header and two columns, sample_id and participant_id.
parameter: sample_participant_lookup = path
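# Minimum TPM for a gene to count as expressed in a sample
parameter: tpm_threshold = 0.1
# Minimum read count for a gene to count as expressed in a sample
parameter: count_threshold = 6
# Minimum fraction of samples that must pass each of the two thresholds above
parameter: sample_frac_threshold = 0.2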
# Normalization method: TMM + CPM(voom) (tmm_cpm_voom), TMM + CPM(edgeR) (tmm_cpm_edger), or quantile normalization (qn)
parameter: normalization_method = 'tmm_cpm_voom'
# Apply an inverse normal transformation to each gene after normalization
parameter: quantile_normalize = True
# For cluster jobs, number of commands to run per job
parameter: job_size = 1
# Wall clock time expected
parameter: walltime = "5h"
# Memory expected
parameter: mem = "16G"
# Number of threads
parameter: numThreads = 20
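# Container image to run the step in; leave empty to run without a container
parameter: container = ""
import re
# micromamba environment name, taken from the container file name without its directory and the `_apptainer:latest`, `_docker:latest` or `.sif` suffix
parameter: entrypoint= ('micromamba run -a "" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])) if container else ""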
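
The `entrypoint` expression above assumes that the micromamba environment inside the image is named after the container file. The function below restates that expression so its effect can be checked for a given container; the container paths used here are hypothetical examples.

```python
import re

def entrypoint_for(container):
    """Restate the [global] `entrypoint` expression for a container path or URI."""
    if not container:
        return ""                                  # no container: no entrypoint
    image = container.split('/')[-1]               # drop directories / registry path
    env = re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', image)
    return 'micromamba run -a "" -n' + ' ' + env

print(entrypoint_for('containers/rna_quantification.sif'))
# micromamba run -a "" -n rna_quantification
```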

In [None]:
[normalize]
# Inputs: TPM matrix (GCT), read count matrix (GCT), collapsed gene model GTF, and sample-participant lookup table
input: tpm_gct, counts_gct, annotation_gtf, sample_participant_lookup
output: f'{cwd:a}/{_input[0]:bnnn}.{normalization_method}.{"no_qnorm." if not quantile_normalize else ""}expression.bed.gz'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output[0]:bn}'  
python: expand = "${ }", stderr = f'{_output[0]:nn}.stderr', stdout = f'{_output[0]:nn}.stdout',container = container, entrypoint = entrypoint
    import numpy as np
    import pandas as pd
    import os
    import qtl.io
    import qtl.norm
    
    # ============= Function definition (packaged from eqtl_prepare_expression.py) =============
    # This function is adapted from the GTEx pipeline's eqtl_prepare_expression.py script
    # Source: https://github.com/broadinstitute/gtex-pipeline/blob/master/qtl/src/eqtl_prepare_expression.py
    # 
    # Modifications from the original script:
    # 1. Converted command-line interface (argparse) to function parameters for direct Python calling
    # 2. Encapsulated the main script logic into a callable function
    # 3. Made all parameters explicit function arguments with appropriate defaults
    # 4. Preserved all core processing logic and algorithms from the original implementation
    # 5. Supports three normalization methods:
    #    - TMM + CPM with voom transformation (tmm_cpm_voom) [default]
    #    - TMM + CPM with edgeR (tmm_cpm_edger)
    #    - Quantile normalization (qn)
    # 6. Added optional inverse normal transformation via the 'qnorm' parameter:
    #    - 'qnorm': Apply inverse normal transformation after primary normalization
    #    - 'none': Skip inverse normal transformation [default]
    #
    # This implementation allows direct function calls within the workflow without external script execution,
    # maintaining compatibility with the original GTEx pipeline while providing better integration.
    # =========================================================================================
    
    def eqtl_prepare_expression(tpm_gct, counts_gct, annotation_gtf, sample_to_participant,
                                prefix, output_dir='.', sample_ids=None, chrs=None,
                                convert_tpm=False, tpm_threshold=0.1, count_threshold=6,
                                sample_frac_threshold=0.2, normalization_method='tmm_cpm_voom',
                                qnorm='none', parquet=False):
        """
        Generate normalized expression BED files for eQTL analyses
        
        This function packages the main logic from eqtl_prepare_expression.py
        """
        
        # First, define the prepare_expression function (from the original script)
        def prepare_expression(counts_df, tpm_df, sample_frac_threshold=0.2,
                             count_threshold=6, tpm_threshold=0.1, mode='tmm_cpm_voom', qnorm='none'):
            """
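            Genes are filtered using the following expression thresholds:
              TPM >= tpm_threshold in >= sample_frac_threshold * samples
              read counts >= count_threshold in >= sample_frac_threshold * samples

            Expression is then normalized using one of:
              TMM + CPM with voom transformation (mode='tmm_cpm_voom'; default)
              TMM + CPM from edgeR (mode='tmm_cpm_edger')
              quantile normalization of the filtered TPM matrix (mode='qn')
            and, if qnorm='qnorm', inverse normal transformed per gene.
            For the TMM modes, normalization uses all genes and the filter is applied afterwards.
            """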
            # expression thresholds
            ns = tpm_df.shape[1]
            mask = (
                (np.sum(tpm_df >= tpm_threshold, axis=1) >= sample_frac_threshold * ns) &
                (np.sum(counts_df >= count_threshold, axis=1) >= sample_frac_threshold * ns)
            ).values

            # apply normalization
            if mode.lower() == 'tmm_cpm_edger':
                norm_counts_df = qtl.norm.edger_cpm(counts_df, normalized_lib_sizes=True)
                norm_counts_df = norm_counts_df[mask]
            elif mode.lower() == 'tmm_cpm_voom':
                norm_counts_df = qtl.norm.voom_transform(counts_df)
                norm_counts_df = norm_counts_df[mask]
            elif mode.lower() == 'qn':
                norm_counts_df = qtl.norm.normalize_quantiles(tpm_df.loc[mask])
            else:
                raise ValueError(f'Unsupported mode {mode}')

            # optionally apply the inverse normal transformation to each gene
            if qnorm.lower() == 'qnorm':
                result_df = qtl.norm.inverse_normal_transform(norm_counts_df)
            elif qnorm.lower() == 'none':
                result_df = norm_counts_df
            else:
                raise ValueError(f'Unsupported qnorm mode {qnorm}')
                
            return result_df
        
        # Main logic (from the original __main__ block)
        print('Loading expression data', flush=True)
        
        # Handle sample_ids if provided
        if sample_ids is not None:
            if isinstance(sample_ids, str):  # If it's a file path
                with open(sample_ids) as f:
                    sample_ids = f.read().strip().split('\n')
                    print(f'  * Loading {len(sample_ids)} samples', flush=True)
        
        # Load expression data
        counts_df = qtl.io.read_gct(counts_gct, sample_ids=sample_ids, load_description=False)
        tpm_df = qtl.io.read_gct(tpm_gct, sample_ids=sample_ids, load_description=False)
        
        # Load sample to participant mapping (the lookup table has a header row)
        sample_to_participant_s = pd.read_csv(sample_to_participant, sep='\t', index_col=0,
                                              header=0, dtype=str).squeeze('columns')
        
        # Check inputs
        if not counts_df.columns.equals(tpm_df.columns):
            raise ValueError('Sample IDs in the TPM and read counts files must match.')
        missing_ids = ~counts_df.columns.isin(sample_to_participant_s.index)
        if missing_ids.any():
            raise ValueError(f"Sample IDs in expression files and participant lookup table must match ({missing_ids.sum()} sample IDs missing from {os.path.basename(sample_to_participant)}).")
        
        # Convert to TPM if requested
        if convert_tpm:
            print('  * Converting to TPM', flush=True)
            tpm_df = tpm_df / tpm_df.sum(0) * 1e6
        
        # Normalize data
        print(f'Normalizing data ({normalization_method})', flush=True)
        norm_df = prepare_expression(counts_df, tpm_df,
                                     sample_frac_threshold=sample_frac_threshold,
                                     count_threshold=count_threshold,
                                     tpm_threshold=tpm_threshold,
                                     mode=normalization_method,
                                     qnorm=qnorm)
        print(f'  * {counts_df.shape[0]} genes in input tables', flush=True)
        print(f'  * {norm_df.shape[0]} genes remain after thresholding', flush=True)
        
        # Change sample IDs to participant IDs
        norm_df.rename(columns=sample_to_participant_s, inplace=True)
        
        # Handle chromosome list
        if chrs is not None:
            if isinstance(chrs, str):  # If it's a file path
                with open(chrs) as f:
                    chrs = f.read().strip().split('\n')
        else:
            chrs = [f'chr{i}' for i in range(1,23)] + ['chrX']
        
        # Prepare BED
        bed_template_df = qtl.io.gtf_to_tss_bed(annotation_gtf, feature='transcript')
        bed_template_df = bed_template_df[bed_template_df['chr'].isin(chrs)]
        bed_df = pd.merge(bed_template_df, norm_df, left_index=True, right_index=True)
        qtl.io.sort_bed(bed_df, inplace=True)
        print(f'  * {bed_df.shape[0]} genes remain after selecting chromosomes', flush=True)
        
        # Write output
        print('Writing BED file', flush=True)
        output_file = os.path.join(output_dir, f'{prefix}.expression.bed.gz')
        if not parquet:
            qtl.io.write_bed(bed_df, output_file)
        else:
            output_file = os.path.join(output_dir, f'{prefix}.expression.bed.parquet')
            bed_df.to_parquet(output_file)
        
        return output_file
    
    # ============= Now use the function in your workflow =============
    
    # Check for duplicates first
    sample_map = pd.read_table("${_input[3]}")
    duplicated = sample_map.loc[sample_map.duplicated(subset=['participant_id'])]
    
    if duplicated.shape[0] > 0:
        print("Duplicate participant IDs found. Please remove duplicates from ${_input[3]} before normalizing.")
        print("Duplicates:")
        print(duplicated)
        raise ValueError("Duplicate participant_id entries found in ${_input[3]}")
    else:
        print("No duplicates found. Proceeding with normalization...")
    
    # Create chromosome list file
    chr_file = "${_output[0]:bnnn}.vcf_chr_list"
    chrs = [f'chr{i}' for i in range(1,23)] + ['chrX', 'chrY', 'chrMT']
    with open(chr_file, 'w') as f:
        for chrom in chrs:
            f.write(chrom + '\n')
    
    # Set qnorm parameter based on quantile_normalize flag
    qnorm_param = 'qnorm' if ${quantile_normalize} else 'none'
    
    # Call the eqtl_prepare_expression function
    output_file = eqtl_prepare_expression(
        tpm_gct="${_input[0]}",
        counts_gct="${_input[1]}",
        annotation_gtf="${_input[2]}",
        sample_to_participant="${_input[3]}",
        prefix="${_output[0]:nnn}",
        output_dir=".",
        sample_ids=None,
        chrs=chr_file,
        convert_tpm=False,
        tpm_threshold=${tpm_threshold},
        count_threshold=${count_threshold},
        sample_frac_threshold=${sample_frac_threshold},
        normalization_method="${normalization_method}",
        qnorm=qnorm_param,
        parquet=False
    )
    
    # Clean up temporary file
    if os.path.exists(chr_file):
        os.remove(chr_file)
    
    print(f"Preparing expression completed. Output saved to: {output_file}")
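
Two rank-based transforms appear in this step: quantile normalization of the filtered TPM matrix when `normalization_method='qn'` (`qtl.norm.normalize_quantiles`), and the per-gene inverse normal transformation applied when `quantile_normalize` is true (`qtl.norm.inverse_normal_transform`). The sketch below shows both with pandas and SciPy; the rank/(n + 1) offset and the tie handling are assumptions of this sketch and may not match `qtl.norm` in every detail.

```python
import numpy as np
import pandas as pd
from scipy.stats import norm, rankdata

def inverse_normal_transform(df):
    """Replace each gene's values (row) by standard-normal quantiles of their ranks."""
    n = df.shape[1]
    ranks = np.apply_along_axis(rankdata, 1, df.values)   # average ranks for ties
    return pd.DataFrame(norm.ppf(ranks / (n + 1)), index=df.index, columns=df.columns)

def quantile_normalize(df):
    """Give every sample (column) the same distribution: the mean of the sorted columns."""
    reference = np.sort(df.values, axis=0).mean(axis=1)
    order = df.rank(axis=0, method='first').values.astype(int) - 1  # ties broken by position
    return pd.DataFrame(reference[order], index=df.index, columns=df.columns)

x = pd.DataFrame({'a': [5.0, 2.0, 3.0], 'b': [4.0, 1.0, 4.5]}, index=['g1', 'g2', 'g3'])
print(quantile_normalize(x)['a'].tolist())  # [4.75, 1.5, 3.5]
print(np.round(inverse_normal_transform(pd.DataFrame([[3.0, 1.0, 2.0]])).values, 3).tolist())
# [[0.674, -0.674, 0.0]]
```

After quantile normalization both samples share the values 1.5, 3.5 and 4.75, each placed at the rank the gene had in its own sample.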