diff --git a/.github/workflows/nf-test.yml b/.github/workflows/nf-test.yml index 9d7642e..e7b5844 100755 --- a/.github/workflows/nf-test.yml +++ b/.github/workflows/nf-test.yml @@ -118,7 +118,7 @@ jobs: confirm-pass: needs: [nf-test] if: always() - runs-on: # use self-hosted runners + runs-on: # use self-hosted runners - runs-on=${{ github.run_id }}-confirm-pass - runner=2cpu-linux-x64 steps: diff --git a/CITATIONS.md b/CITATIONS.md index 647ec02..4972da7 100755 --- a/CITATIONS.md +++ b/CITATIONS.md @@ -10,8 +10,6 @@ ## Pipeline tools - - - [MultiQC](https://pubmed.ncbi.nlm.nih.gov/27312411/) > Ewels P, Magnusson M, Lundin S, Käller M. MultiQC: summarize analysis results for multiple tools and samples in a single report. Bioinformatics. 2016 Oct 1;32(19):3047-8. doi: 10.1093/bioinformatics/btw354. Epub 2016 Jun 16. PubMed PMID: 27312411; PubMed Central PMCID: PMC5039924. diff --git a/assets/schema_input.json b/assets/schema_input.json index 7fedd10..3d45c92 100755 --- a/assets/schema_input.json +++ b/assets/schema_input.json @@ -7,27 +7,30 @@ "items": { "type": "object", "properties": { - "sample": { + "group_id": { "type": "string", "pattern": "^\\S+$", - "errorMessage": "Sample name must be provided and cannot contain spaces", + "errorMessage": "Group ID must be provided and cannot contain spaces", "meta": ["id"] }, - "fastq_1": { + "sample_id": { "type": "string", - "format": "file-path", - "exists": true, - "pattern": "^([\\S\\s]*\\/)?[^\\s\\/]+\\.f(ast)?q\\.gz$", - "errorMessage": "FastQ file for reads 1 must be provided, cannot contain spaces and must have extension '.fq.gz' or '.fastq.gz'" + "pattern": "^\\S+$", + "errorMessage": "Sample ID must be provided and cannot contain spaces" + }, + "sample_type": { + "type": "string", + "enum": ["case", "control"], + "errorMessage": "Sample type must be either 'case' or 'control'" }, - "fastq_2": { + "bam_path": { "type": "string", "format": "file-path", "exists": true, - "pattern": 
"^([\\S\\s]*\\/)?[^\\s\\/]+\\.f(ast)?q\\.gz$", - "errorMessage": "FastQ file for reads 2 cannot contain spaces and must have extension '.fq.gz' or '.fastq.gz'" + "pattern": "^([\\S\\s]*\\/)?[^\\s\\/]+\\.bam$", + "errorMessage": "BAM file path must be provided, cannot contain spaces and must have extension '.bam'" } }, - "required": ["sample", "fastq_1"] + "required": ["group_id", "sample_id", "sample_type", "bam_path"] } } diff --git a/conf/base.config b/conf/base.config index 62162c3..4c08a1f 100755 --- a/conf/base.config +++ b/conf/base.config @@ -13,10 +13,10 @@ process { // TODO nf-core: Check the defaults for all processes cpus = { 1 * task.attempt } memory = { 6.GB * task.attempt } - // time = { 4.h * task.attempt } + time = { 4.h * task.attempt } errorStrategy = { task.exitStatus in ((130..145) + 104 + 175) ? 'retry' : 'finish' } - maxRetries = 1 + maxRetries = 2 maxErrors = '-1' // Process-specific resource requirements @@ -29,22 +29,22 @@ process { withLabel:process_single { cpus = { 1 } memory = { 6.GB * task.attempt } - time = { 4.h * task.attempt } + // time = { 4.h * task.attempt } } withLabel:process_low { cpus = { 2 * task.attempt } memory = { 12.GB * task.attempt } - time = { 4.h * task.attempt } + // time = { 4.h * task.attempt } } withLabel:process_medium { cpus = { 6 * task.attempt } memory = { 36.GB * task.attempt } - time = { 8.h * task.attempt } + // time = { 8.h * task.attempt } } withLabel:process_high { cpus = { 12 * task.attempt } memory = { 72.GB * task.attempt } - time = { 16.h * task.attempt } + // time = { 16.h * task.attempt } } withLabel:process_long { time = { 20.h * task.attempt } diff --git a/conf/modules.config b/conf/modules.config index 2dab2f9..2e9471c 100755 --- a/conf/modules.config +++ b/conf/modules.config @@ -32,8 +32,9 @@ process { // Raw SV caller outputs (don't publish - intermediate) withName: 'SNIFFLES' { - cpus = 3 - memory = 3.GB + cpus = 12 + memory = { 24.GB * task.attempt } + ext.prefix = { "${meta.sample ?: 
meta.id}_sniffles" } ext.args = { def base_args = "--mapq ${params.mapq} --minsvlen ${params.minsvlen} --minsupport ${params.minsupport} --cluster-merge-pos ${params.cluster_merge_pos}" def phase_arg = params.phase ? "--phase" : "" @@ -46,8 +47,9 @@ process { } withName: 'CUTESV' { - cpus = 4 - memory = 12.GB + cpus = 8 + memory = { 16.GB * task.attempt } + ext.prefix = { "${meta.sample ?: meta.id}_cutesv" } ext.args = { def base_args = "--min_support ${params.cutesv_min_support} --min_size ${params.min_size} --min_mapq ${params.cutesv_min_mapq}" base_args += " --max_cluster_bias_INS ${params.max_cluster_bias_INS} --diff_ratio_merging_INS ${params.diff_ratio_merging_INS}" @@ -62,8 +64,9 @@ process { } withName: 'SEVERUS_WITH_CONTROL' { - cpus = 4 - memory = 5.GB + cpus = 10 + memory = { 26.GB * task.attempt } + ext.prefix = { "${meta.sample ?: meta.id}_tn_severus" } ext.args = { def base_args = "--min-sv-size ${params.min_sv_size} --min-mapq ${params.severus_min_mapq} --min-support ${params.severus_min_support}" def user_args = params.severus_args ? params.severus_args : "" @@ -75,8 +78,9 @@ process { } withName: 'SEVERUS_NO_CONTROL' { - cpus = 4 - memory = 5.GB + cpus = 10 + memory = { 26.GB * task.attempt } + ext.prefix = { "${meta.sample ?: meta.id}_to_severus" } ext.args = { def base_args = "--min-sv-size ${params.min_sv_size} --PON \"${params.PON}\" --min-mapq ${params.severus_min_mapq} --min-support ${params.severus_min_support}" def user_args = params.severus_args ? 
params.severus_args : "" @@ -90,7 +94,7 @@ process { // Header-renamed VCFs (these are the actual processed caller outputs) withName: 'RENAME_VCF_HEADERS_SNIFFLES' { cpus = 1 - memory = 1.GB + memory = { 1.GB * task.attempt } ext.args = "" publishDir = [ path: { "${params.outdir}/case/01_raw_calls/sniffles" }, @@ -101,7 +105,7 @@ process { withName: 'RENAME_VCF_HEADERS_CUTESV' { cpus = 1 - memory = 1.GB + memory = { 1.GB * task.attempt } ext.args = "" publishDir = [ path: { "${params.outdir}/case/01_raw_calls/cutesv" }, @@ -112,7 +116,7 @@ process { withName: 'RENAME_VCF_HEADERS_SEVERUS' { cpus = 1 - memory = 1.GB + memory = { 1.GB * task.attempt } ext.args = "" publishDir = [ enabled: false // Don't publish severus outputs @@ -122,20 +126,20 @@ process { // RENAME_VCF - This is the final Severus output step withName: 'RENAME_VCF' { cpus = 1 - memory = 1.GB + memory = { 1.GB * task.attempt } ext.args = "" ext.when = { task.ext.meta = meta; true } // Pass meta to task.ext for use in publishDir publishDir = [ path: { "${params.outdir}/case/01_raw_calls/severus" }, mode: params.publish_dir_mode, pattern: '*.{vcf,vcf.gz}', - saveAs: { filename -> + saveAs: { filename -> // Get meta from task.ext def meta = task.ext.meta ?: [:] - + // Determine if this is tumor/normal or tumor-only def has_control = meta.has_control ?: false - + // Organize into subdirectories def subdir = has_control ? 
"tumor_normal" : "tumor_only" "${subdir}/${filename}" @@ -148,8 +152,8 @@ process { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ withName: 'JASMINESV_SAMPLE' { - cpus = 3 - memory = 3.GB + cpus = 12 + memory = { 14.GB * task.attempt } ext.args = { def base_args = "--min_overlap ${params.min_overlap} --spec_len ${params.spec_len} --min_seq_id ${params.min_seq_id}" base_args += " --max_dist_linear ${params.max_dist_linear} --max_dist ${params.max_dist} --min_dist ${params.min_dist}" @@ -170,6 +174,8 @@ process { // Sample consensus filtering and processing withName: 'JASMINE_HEADER_FIX' { + cpus = 1 + memory = { 3.GB * task.attempt } publishDir = [ enabled: false // Intermediate step ] @@ -184,12 +190,16 @@ process { } withName: 'FILTER_CHR' { + cpus = 1 + memory = { 3.GB * task.attempt } publishDir = [ enabled: false // Intermediate step ] } - withName: 'BCFTOOLS_SORT_SAMPLE' { + withName: 'SORT_VCF' { + cpus = 1 + memory = { 3.GB * task.attempt } publishDir = [ enabled: false // Intermediate step ] @@ -200,8 +210,8 @@ process { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ withName: 'CALLER_SUPPORT_FILTER' { - cpus = 2 - memory = 2.GB + cpus = 4 + memory = { 3.GB * task.attempt } ext.args = { // Build dynamic expression using min_caller_support parameter def support_expr = "INFO/SUPP>=${params.min_caller_support}" @@ -220,8 +230,8 @@ process { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ withName: 'SVDB_QUERY_SAMPLE' { - cpus = 4 - memory = 3.GB + cpus = 12 + memory = { 16.GB * task.attempt } ext.args = { def base_args = "--bnd_distance ${params.bnd_distance} --overlap ${params.overlap}" def user_args = params.svdb_query_args ? 
params.svdb_query_args : "" @@ -234,10 +244,10 @@ process { } withName: 'ANNOTSV_PER_SAMPLE_RAW' { - cpus = 6 - memory = 8.GB + cpus = 3 + memory = { 8.GB * task.attempt } ext.args = { - def base_args = "-genomeBuild ${params.genome_build} -vcf ${params.output_vcf} -SVminSize ${params.min_sv_size}" + def base_args = "-genomeBuild ${params.genome_build} -vcf ${params.output_vcf} -SVminSize ${params.annotsv_min_sv_size}" def user_args = params.annotsv_args ? params.annotsv_args : "" return "${base_args} ${user_args}".trim() } @@ -246,10 +256,10 @@ process { path: { "${params.outdir}/case/02_caller_merged" }, mode: params.publish_dir_mode, pattern: '*.{tsv,vcf}', - saveAs: { filename -> + saveAs: { filename -> def meta = task.ext.meta ?: [:] - def sample = meta.sample ?: meta.id ?: "unknown" - filename.replaceAll(/^[^.]+/, "${sample}") + def group_id = meta.sample ?: meta.id ?: "unknown" + filename.replaceAll(/^[^.]+/, "${group_id}") } ] } @@ -259,8 +269,8 @@ process { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ withName: 'AF_FILTER' { - cpus = 2 - memory = 2.GB + cpus = 4 + memory = { 3.GB * task.attempt } ext.args = { // Build dynamic AF filter expression using threshold parameters def af_expr = "(INFO/AFgnomAD=\".\" || INFO/AFgnomAD<${params.max_gnomad_af}) && (INFO/AFneedLR=\".\" || INFO/AFneedLR<${params.max_needlr_af})" @@ -275,10 +285,10 @@ process { } withName: 'ANNOTSV_PER_SAMPLE' { - cpus = 6 - memory = 8.GB + cpus = 3 + memory = { 8.GB * task.attempt } ext.args = { - def base_args = "-genomeBuild ${params.genome_build} -vcf ${params.output_vcf} -SVminSize ${params.min_sv_size}" + def base_args = "-genomeBuild ${params.genome_build} -vcf ${params.output_vcf} -SVminSize ${params.annotsv_min_sv_size}" def user_args = params.annotsv_args ? 
params.annotsv_args : "" return "${base_args} ${user_args}".trim() } @@ -287,10 +297,10 @@ process { path: { "${params.outdir}/case/03_caller_merged_filtered" }, mode: params.publish_dir_mode, pattern: '*.{tsv,vcf}', - saveAs: { filename -> + saveAs: { filename -> def meta = task.ext.meta ?: [:] - def sample = meta.sample ?: meta.id ?: "unknown" - filename.replaceAll(/^[^.]+/, "${sample}") + def group_id = meta.sample ?: meta.id ?: "unknown" + filename.replaceAll(/^[^.]+/, "${group_id}") } ] } @@ -300,8 +310,8 @@ process { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ withName: 'JASMINESV_COHORT' { - cpus = 3 - memory = 3.GB + cpus = 12 + memory = { 14.GB * task.attempt } ext.args = { def base_args = "--min_overlap ${params.min_overlap} --spec_len ${params.spec_len} --min_seq_id ${params.min_seq_id}" base_args += " --max_dist_linear ${params.max_dist_linear} --max_dist ${params.max_dist} --min_dist ${params.min_dist}" @@ -319,10 +329,28 @@ process { enabled: false // Intermediate step ] } - + + withName: 'ANNOTSV_COHORT_RAW' { + cpus = 6 + memory = 8.GB + ext.args = { + def base_args = "-genomeBuild ${params.genome_build} -vcf ${params.output_vcf} -SVminSize ${params.annotsv_min_sv_size}" + def user_args = params.annotsv_args ? params.annotsv_args : "" + return "${base_args} ${user_args}".trim() + } + publishDir = [ + path: { "${params.outdir}/cohort" }, + mode: params.publish_dir_mode, + pattern: '*.{tsv,vcf}', + saveAs: { filename -> + filename.replaceAll(/^[^.]+/, 'cohort_annotated') + } + ] + } + withName: 'SVDB_QUERY_COHORT' { - cpus = 4 - memory = 3.GB + cpus = 12 + memory = { 16.GB * task.attempt } ext.args = { def base_args = "--bnd_distance ${params.bnd_distance} --overlap ${params.overlap}" def user_args = params.svdb_query_args ? 
params.svdb_query_args : "" @@ -352,7 +380,7 @@ process { cpus = 6 memory = 8.GB ext.args = { - def base_args = "-genomeBuild ${params.genome_build} -vcf ${params.output_vcf} -SVminSize ${params.min_sv_size}" + def base_args = "-genomeBuild ${params.genome_build} -vcf ${params.output_vcf} -SVminSize ${params.annotsv_min_sv_size}" def user_args = params.annotsv_args ? params.annotsv_args : "" return "${base_args} ${user_args}".trim() } @@ -360,8 +388,8 @@ process { path: { "${params.outdir}/cohort" }, mode: params.publish_dir_mode, pattern: '*.{tsv,vcf}', - saveAs: { filename -> - filename.replaceAll(/^[^.]+/, 'cohort') + saveAs: { filename -> + filename.replaceAll(/^[^.]+/, 'cohort_filtered') } ] } @@ -369,10 +397,10 @@ process { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // SUPPORTING FILES AND UTILITIES // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - + withName: 'ANNOTSV_INSTALLANNOTATIONS' { cpus = 1 - memory = 4.GB + memory = { 4.GB * task.attempt } time = '1h' publishDir = [ path: { "${params.outdir ?: './results'}" }, @@ -390,7 +418,7 @@ process { // Raw calls summary (in case/01-raw-calls/) withName: 'SUMMARIZE_CALLERS' { cpus = 1 - memory = 1.GB + memory = { 3.GB * task.attempt } publishDir = [ path: { "${params.outdir}/case/01_raw_calls" }, mode: params.publish_dir_mode, @@ -401,7 +429,7 @@ process { // Caller merged summary (in case/02-caller-merged/) withName: 'SUMMARIZE_CALLER_MERGED' { cpus = 1 - memory = 1.GB + memory = { 3.GB * task.attempt } publishDir = [ path: { "${params.outdir}/case/02_caller_merged" }, mode: params.publish_dir_mode, @@ -412,7 +440,7 @@ process { // Filtered merged summary (in case/04-caller-merged-filtered/) withName: 'SUMMARIZE_CALLER_MERGED_FILTERED' { cpus = 1 - memory = 1.GB + memory = { 3.GB * task.attempt } publishDir = [ path: { "${params.outdir}/case/03_caller_merged_filtered" }, mode: params.publish_dir_mode, @@ -421,9 +449,20 @@ process { } // Cohort summary (in 
cohort/) - withName: 'SUMMARIZE_COHORT' { + withName: 'SUMMARIZE_COHORT_ANNOTATED' { + cpus = 1 + memory = { 3.GB * task.attempt } + publishDir = [ + path: { "${params.outdir}/cohort" }, + mode: params.publish_dir_mode, + pattern: '*_summary.json' + ] + } + + // Cohort filtered summary (in cohort/) + withName: 'SUMMARIZE_COHORT_FILTERED' { cpus = 1 - memory = 1.GB + memory = { 3.GB * task.attempt } publishDir = [ path: { "${params.outdir}/cohort" }, mode: params.publish_dir_mode, @@ -434,7 +473,7 @@ process { // Utility modules that don't need to be published withName: 'UNTAR.*' { cpus = 1 - memory = 1.GB + memory = { 1.GB * task.attempt } ext.args = "" publishDir = [ enabled: false // Don't publish intermediate untar files diff --git a/conf/modules_params.config b/conf/modules_params.config deleted file mode 100644 index f4f9c32..0000000 --- a/conf/modules_params.config +++ /dev/null @@ -1,124 +0,0 @@ -/* -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - Module-specific parameter configurations -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -*/ - -params { - - // SV callers - sniffles = [ - tandem_repeats: "${projectDir}/assets/vntr/hg38.p14.trf.bed", - sniffles_args: "--mapq 20 \ - --minsvlen 50 \ - --minsupport 5 \ - --cluster-merge-pos 100" - ] - cutesv = [ - cutesv_args: "--genotype \ - --min_support 5 \ - --min_size 50 \ - --min_mapq 20 \ - --max_cluster_bias_INS 100 \ - --diff_ratio_merging_INS 0.3 \ - --max_cluster_bias_DEL 100 \ - --diff_ratio_merging_DEL 0.3" - ] - severus_with_control = [ - vntr_bed: "${projectDir}/assets/vntr/hg38.p14.trf.bed", - severus_with_control_args: "--min-sv-size 50 \ - --min-mapq 20 \ - --min-support 5" - ] - severus_no_control = [ - vntr_bed: "${projectDir}/assets/vntr/hg38.p14.trf.bed", - severus_no_control_args: "--min-sv-size 50 \ - --PON \"${projectDir}/assets/pon/PoN_1000G_hg38_extended.tsv.gz\" \ - --min-mapq 20 \ - --min-support 5" - ] 
- - // Jasmine sample and cohort consensus - jasminesv_sample = [ - fasta: null, - fasta_fai: null, - chr_norm: null, - jasminesv_sample_args: """--min_overlap 0.7 \ - --spec_len 50 \ - --min_seq_id 0.7 \ - --max_dist_linear 0.5 \ - --max_dist 3000 \ - --min_dist 100 \ - --pre_normalize \ - --normalize_type \ - --non_mutual_distance \ - --spec_reads 5""" - ] - jasminesv_cohort = [ - fasta: null, - fasta_fai: null, - chr_norm: null, - jasminesv_cohort_args: """--min_overlap 0.7 \ - --spec_len 50 \ - --min_seq_id 0.7 \ - --max_dist_linear 0.5 \ - --max_dist 3000 \ - --min_dist 100 \ - --pre_normalize \ - --normalize_type \ - --non_mutual_distance \ - --spec_reads 5""" - ] - - // AF annotation with SVDB - svdb_query = [ - vcf_dbs: [ - "${projectDir}/assets/svdb_databases/gnomad.v4.1.sv.sites.rmMissing.vcf.gz", - "${projectDir}/assets/svdb_databases/UWONT_450_AF.vcf.gz" - ], - in_frq: ["AF", "PFneedLR"], - out_frq: [ "AFgnomAD", "AFneedLR" ], - in_occ: [ "AN", "AN" ], - out_occ: [ "ANgnomAD", "ANneedLR" ], - prefix: "cohort", - svdb_query_args: "--bnd_distance 2500 \ - --overlap 0.8" - ] - // svdb_query_sample = [ - // // uses params.svdb_query by default - // svdb_query_args: params.svdb_query.svdb_query_args - // ] - - // bcftools filtering - bcftools_view_sample = [ - regions: null, - targets: null, - samples: null, - bcftools_view_sample_args: """--include 'INFO/SUPP>=1'""", - ] - - bcftools_view_cohort = [ - regions: null, - targets: null, - samples: null, - bcftools_view_cohort_args: """--include '(INFO/AFgnomAD="." || INFO/AFgnomAD<0.1) && (INFO/AFneedLR="." || INFO/AFneedLR<0.006)'""", - ] - // bcftools_view_per_sample = [ - // regions: null, - // targets: null, - // samples: null, - // bcftools_view_per_sample_args: params.bcftools_view_cohort?.bcftools_view_cohort_args ?: """--include '(INFO/AFgnomAD="." || INFO/AFgnomAD<0.1) && (INFO/AFneedLR="." 
|| INFO/AFneedLR<0.006)'""", - // ] - - // AnnotSV - annotsv = [ - genome_build: "GRCh38", - annotations: null, - candidate_genes: null, - false_positive_snv: null, - gene_transcripts: null, - annotsv_args: """-vcf 1 \ - -SVminSize 0""" - ] - -} \ No newline at end of file diff --git a/docs/output.md b/docs/output.md index ef3340b..048256c 100755 --- a/docs/output.md +++ b/docs/output.md @@ -12,12 +12,9 @@ The directories listed below will be created in the results directory after the The pipeline is built using [Nextflow](https://www.nextflow.io/) and processes data using the following steps: - - [MultiQC](#multiqc) - Aggregate report describing results and QC from the whole pipeline - [Pipeline information](#pipeline-information) - Report metrics generated during the workflow execution - - ### MultiQC
diff --git a/modules/local/filter_chr/environment.yml b/modules/local/filter_chr/environment.yml index d945fdc..0a84f41 100644 --- a/modules/local/filter_chr/environment.yml +++ b/modules/local/filter_chr/environment.yml @@ -3,5 +3,5 @@ channels: - bioconda - conda-forge dependencies: - - htslib # for bgzip and zcat - - awk \ No newline at end of file + - htslib # for bgzip and zcat + - awk diff --git a/modules/local/filter_chr/main.nf b/modules/local/filter_chr/main.nf index 829b24b..266da5b 100644 --- a/modules/local/filter_chr/main.nf +++ b/modules/local/filter_chr/main.nf @@ -25,4 +25,4 @@ process FILTER_CHR { awk 'BEGIN{OFS="\\t"} /^#/ {print} /^chr([1-9]|1[0-9]|2[0-2]|X|Y|M)\\t/ {print}' ${vcf} | bgzip > ${vcf.baseName}.filtered.vcf.gz fi """ -} \ No newline at end of file +} diff --git a/modules/local/filter_chr/meta.yml b/modules/local/filter_chr/meta.yml index bd6592b..0d54fb2 100644 --- a/modules/local/filter_chr/meta.yml +++ b/modules/local/filter_chr/meta.yml @@ -28,4 +28,4 @@ output: description: | The gzipped VCF file containing only chr1-22, chrX, chrY, and chrM entries. 
authors: - - github.com/manascripts \ No newline at end of file + - github.com/manascripts diff --git a/modules/local/jasmine_header_fix/environment.yml b/modules/local/jasmine_header_fix/environment.yml index 1661bc9..9ca8738 100644 --- a/modules/local/jasmine_header_fix/environment.yml +++ b/modules/local/jasmine_header_fix/environment.yml @@ -1,10 +1,10 @@ name: jasmine_header_fix channels: -- conda-forge -- bioconda + - conda-forge + - bioconda dependencies: -- bioconda::bcftools=1.22 -- bioconda::coreutils=8.25 -- bioconda::htslib=1.22.1 -- conda-forge::awk=20250116 -- conda-forge::bash=5.2.37 \ No newline at end of file + - bioconda::bcftools=1.22 + - bioconda::coreutils=8.25 + - bioconda::htslib=1.22.1 + - conda-forge::awk=20250116 + - conda-forge::bash=5.2.37 diff --git a/modules/local/jasmine_header_fix/main.nf b/modules/local/jasmine_header_fix/main.nf index d182591..32090e9 100644 --- a/modules/local/jasmine_header_fix/main.nf +++ b/modules/local/jasmine_header_fix/main.nf @@ -23,7 +23,7 @@ process JASMINE_HEADER_FIX { when: task.ext.when == null || task.ext.when - + script: """ set -euo pipefail @@ -33,7 +33,7 @@ process JASMINE_HEADER_FIX { export TMP=\$PWD export TEMP=\$PWD export BCFTOOLS_PLUGINS="" - + # Create a local temp directory mkdir -p \$PWD/tmp export TMPDIR=\$PWD/tmp diff --git a/modules/local/merge_vcf_headers/environment.yml b/modules/local/merge_vcf_headers/environment.yml deleted file mode 100644 index e69de29..0000000 diff --git a/modules/local/merge_vcf_headers/main.nf b/modules/local/merge_vcf_headers/main.nf deleted file mode 100644 index e69de29..0000000 diff --git a/modules/local/merge_vcf_headers/meta.yml b/modules/local/merge_vcf_headers/meta.yml deleted file mode 100644 index e69de29..0000000 diff --git a/modules/local/rename_vcf/environment.yml b/modules/local/rename_vcf/environment.yml index b0a213c..ae08d43 100755 --- a/modules/local/rename_vcf/environment.yml +++ b/modules/local/rename_vcf/environment.yml @@ -2,4 +2,4 @@ 
name: rename_vcf channels: - defaults dependencies: - - coreutils \ No newline at end of file + - coreutils diff --git a/modules/local/rename_vcf/main.nf b/modules/local/rename_vcf/main.nf index 5a23b5e..100b08a 100755 --- a/modules/local/rename_vcf/main.nf +++ b/modules/local/rename_vcf/main.nf @@ -1,13 +1,13 @@ process RENAME_VCF { - tag "${meta.id}_${suffix}" + tag "${meta.id}" label 'process_single' input: tuple val(meta), path(vcf), val(suffix) - + output: tuple val(meta), path("${meta.id}_${suffix}.vcf") - + when: task.ext.when == null || task.ext.when @@ -15,4 +15,4 @@ process RENAME_VCF { """ cp ${vcf} ${meta.id}_${suffix}.vcf """ -} \ No newline at end of file +} diff --git a/modules/local/rename_vcf/meta.yml b/modules/local/rename_vcf/meta.yml index ed1142e..e94bcc4 100755 --- a/modules/local/rename_vcf/meta.yml +++ b/modules/local/rename_vcf/meta.yml @@ -29,4 +29,4 @@ output: description: | The symlinked or renamed VCF file, named as _suffix.vcf. authors: - - github.com/manascripts \ No newline at end of file + - github.com/manascripts diff --git a/modules/local/rename_vcf_headers/README.md b/modules/local/rename_vcf_headers/README.md index 7c26f96..c39fe83 100644 --- a/modules/local/rename_vcf_headers/README.md +++ b/modules/local/rename_vcf_headers/README.md @@ -1,20 +1,26 @@ # rename_vcf_headers ## Description + This module renames the headers in VCF files to include the correct sample names. It ensures compatibility with downstream tools that require properly formatted VCF headers. ## Inputs + - **meta**: Metadata containing the sample name. - **vcf**: Input VCF file. ## Outputs + - **renamed_vcf**: VCF file with updated headers. ## Usage + Include this module in your Nextflow workflow and provide the required inputs. The module uses `awk` to modify the VCF headers. 
## Authors + - Manas Sehgal ## Requirements + - `awk` diff --git a/modules/local/rename_vcf_headers/environment.yml b/modules/local/rename_vcf_headers/environment.yml index 40a2916..25122bf 100755 --- a/modules/local/rename_vcf_headers/environment.yml +++ b/modules/local/rename_vcf_headers/environment.yml @@ -3,4 +3,4 @@ channels: - defaults dependencies: - coreutils - - htslib \ No newline at end of file + - htslib diff --git a/modules/local/rename_vcf_headers/main.nf b/modules/local/rename_vcf_headers/main.nf index c6fef4c..2287331 100644 --- a/modules/local/rename_vcf_headers/main.nf +++ b/modules/local/rename_vcf_headers/main.nf @@ -11,7 +11,7 @@ process RENAME_VCF_HEADERS { input: tuple val(meta), path(vcf) - + output: tuple val(meta), path(vcf) @@ -27,4 +27,3 @@ process RENAME_VCF_HEADERS { fi """ } - diff --git a/modules/local/summarize_sv_counts/README.md b/modules/local/summarize_sv_counts/README.md index 7985b5b..7ded004 100755 --- a/modules/local/summarize_sv_counts/README.md +++ b/modules/local/summarize_sv_counts/README.md @@ -1,2 +1,3 @@ # summarize_sv_counts + This module parses structural variant VCFs and generates a per-sample TSV summarizing SV counts by type. 
diff --git a/modules/local/summarize_sv_counts/environment.yml b/modules/local/summarize_sv_counts/environment.yml index 6e52ba2..bd40380 100755 --- a/modules/local/summarize_sv_counts/environment.yml +++ b/modules/local/summarize_sv_counts/environment.yml @@ -2,4 +2,4 @@ name: summarize_sv_counts channels: - defaults dependencies: - - coreutils \ No newline at end of file + - coreutils diff --git a/modules/local/summarize_sv_counts/main.nf b/modules/local/summarize_sv_counts/main.nf index 3385718..0c8c836 100755 --- a/modules/local/summarize_sv_counts/main.nf +++ b/modules/local/summarize_sv_counts/main.nf @@ -3,7 +3,7 @@ process SUMMARIZE_SV_COUNTS { label 'process_low' conda "${moduleDir}/environment.yml" - container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? 'https://community-cr-prod.seqera.io/docker/registry/v2/blobs/sha256/19/198b15b0581f19cfa29c5ff506b138aeb1bedb226d2ca308c46705bab133c1f0/data' : 'community.wave.seqera.io/library/bcftools_coreutils_gawk_gzip_pruned:e1a91ca0c5f22302' }" @@ -16,334 +16,230 @@ process SUMMARIZE_SV_COUNTS { path "versions.yml", emit: versions script: - // Smart detection of input type - def is_list = vcf_input instanceof List || (vcf_input instanceof String && vcf_input.contains(' ')) - def vcf_files = is_list ? (vcf_input instanceof List ? vcf_input.join(' ') : vcf_input) : vcf_input.toString() + def is_list = vcf_input instanceof List + def vcf_files = is_list ? 
vcf_input.join(' ') : vcf_input.toString() """ python3 << 'EOF' import sys, json, statistics, subprocess, os from collections import defaultdict, OrderedDict def analyze_vcf(vcf_file): - if not os.path.exists(vcf_file) or os.path.getsize(vcf_file) == 0: - print(f"Warning: VCF file {vcf_file} is empty or does not exist") - return {"total_variants": 0, "sv_types": {}} - - # Check VCF header for available fields - header_cmd = f"bcftools view -h {vcf_file}" - try: - header_result = subprocess.run(header_cmd, shell=True, capture_output=True, text=True, timeout=30) - if header_result.returncode != 0: - print(f"Warning: Could not read header from {vcf_file}") - return {"total_variants": 0, "sv_types": {}} - except Exception as e: - print(f"Warning: Error reading {vcf_file}: {e}") - return {"total_variants": 0, "sv_types": {}} - - # Determine available INFO fields - has_chr2 = '##INFO= 0: - svlen_dict[svtype].append(svlen_val) - except (ValueError, TypeError): - pass - - # Build result structure - sv_types = {} - for svtype in sorted(sv_counts.keys()): # Sort for consistent ordering - entry = {"count": sv_counts[svtype]} - svlens = svlen_dict.get(svtype, []) - if svlens: - entry["svlen_min"] = min(svlens) - entry["svlen_max"] = max(svlens) - entry["svlen_mean"] = round(statistics.mean(svlens), 2) - entry["svlen_median"] = statistics.median(svlens) - entry["svlen_stdev"] = round(statistics.stdev(svlens) if len(svlens) > 1 else 0, 2) - else: - entry.update({ - "svlen_min": None, "svlen_max": None, "svlen_mean": None, - "svlen_median": None, "svlen_stdev": None - }) - sv_types[svtype] = entry - - total_variants = sum(sv_counts.values()) - print(f"Processed {os.path.basename(vcf_file)}: {total_lines} raw variants -> {total_variants} counted variants") - if deduplicated_count > 0: - print(f" - Deduplicated {deduplicated_count} breakend/translocation events") - - return { - "total_variants": total_variants, - "sv_types": sv_types - } + print(f"Error processing {vcf_file}: {e}") 
-# Smart processing based on input type + # Compute statistics + for svtype, data in result["sv_types"].items(): + if data["svlen_data"]: + lengths = data["svlen_data"] + data["svlen_min"] = min(lengths) + data["svlen_max"] = max(lengths) + data["svlen_mean"] = round(statistics.mean(lengths), 2) + data["svlen_median"] = round(statistics.median(lengths), 1) + data["svlen_stdev"] = round(statistics.stdev(lengths) if len(lengths) > 1 else 0, 2) + del data["svlen_data"] + + return result + +# Get inputs vcf_files_str = "${vcf_files}" stage = "${stage_name}" - -# Determine if this is multi-caller or single VCF analysis vcf_list = [f.strip() for f in vcf_files_str.split() if f.strip()] -is_multi_caller = len(vcf_list) > 1 -print(f"Processing stage: {stage}") -print(f"Analysis mode: {'multi_caller' if is_multi_caller else 'single_vcf'}") -print(f"VCF files to process: {len(vcf_list)}") +# Detect if we should group by sample based on filenames +sample_data = {} +for vcf_file in vcf_list: + # Extract sample ID from filename + basename = os.path.basename(vcf_file) + if '_' in basename: + sample_id = basename.split('_')[0] # e.g., SAMPLE_caller.vcf + else: + sample_id = basename.split('.')[0] # fallback to full basename without extension -if is_multi_caller: - # Multi-caller analysis (for raw calls) + if sample_id not in sample_data: + sample_data[sample_id] = [] + sample_data[sample_id].append(vcf_file) + +# Only use sample grouping if we have multiple samples or multiple VCFs per sample +use_sample_grouping = len(sample_data) > 1 or any(len(vcfs) > 1 for vcfs in sample_data.values()) + +if use_sample_grouping: + # Multi-sample nested analysis result = OrderedDict([ ("stage", stage), - ("analysis_type", "multi_caller"), - ("callers", {}), - ("combined_stats", { - "total_variants": 0, - "sv_types": defaultdict(lambda: {"count": 0, "svlen_data": []}) - }) + ("analysis_type", "multi_sample"), + ("samples", {}) ]) - - # Analyze each VCF file (caller) - for vcf_file in vcf_list: 
- if not vcf_file or not os.path.exists(vcf_file): - print(f"Warning: VCF file {vcf_file} not found") - continue - - # Extract caller name from filename - basename = os.path.basename(vcf_file) - if 'sniffles' in basename.lower(): - caller = 'sniffles' - elif 'cutesv' in basename.lower(): - caller = 'cutesv' - elif 'severus' in basename.lower(): - caller = 'severus' - elif 'jasmine' in basename.lower() or 'consensus' in basename.lower(): - caller = 'consensus' - else: - # Fallback: use filename without extension - caller = basename.split('.')[0].split('_')[-1] - - print(f"\\nProcessing {caller}: {basename}") - - # Analyze this VCF with enhanced deduplication - stats = analyze_vcf(vcf_file) - result["callers"][caller] = stats - - # Add to combined stats - result["combined_stats"]["total_variants"] += stats["total_variants"] - for svtype, data in stats["sv_types"].items(): - result["combined_stats"]["sv_types"][svtype]["count"] += data["count"] - if data.get("svlen_min") is not None: - # Collect all SVLEN statistics for later aggregation - svlen_values = [] - if data.get("svlen_min") is not None: - svlen_values.append(data["svlen_min"]) - if data.get("svlen_max") is not None: - svlen_values.append(data["svlen_max"]) - if data.get("svlen_mean") is not None: - svlen_values.append(data["svlen_mean"]) - if data.get("svlen_median") is not None: - svlen_values.append(data["svlen_median"]) - result["combined_stats"]["sv_types"][svtype]["svlen_data"].extend(svlen_values) - - # Finalize combined stats - final_combined = {"total_variants": result["combined_stats"]["total_variants"], "sv_types": {}} - for svtype in sorted(result["combined_stats"]["sv_types"].keys()): - data = result["combined_stats"]["sv_types"][svtype] - entry = {"count": data["count"]} - svlen_data = [x for x in data["svlen_data"] if x is not None and x > 0] - if svlen_data: - entry.update({ - "svlen_min": min(svlen_data), - "svlen_max": max(svlen_data), - "svlen_mean": round(statistics.mean(svlen_data), 2), - 
"svlen_median": statistics.median(svlen_data), - "svlen_stdev": round(statistics.stdev(svlen_data) if len(svlen_data) > 1 else 0, 2) - }) + + for sample_id, sample_vcfs in sample_data.items(): + print(f"Processing sample: {sample_id}") + sample_vcfs_list = sample_vcfs if isinstance(sample_vcfs, list) else [sample_vcfs] + + if len(sample_vcfs_list) > 1: + # Multi-caller for this sample + sample_result = OrderedDict([ + ("analysis_type", "multi_caller"), + ("callers", {}), + ("combined_stats", { + "total_variants": 0, + "sv_types": defaultdict(lambda: {"count": 0, "svlen_data": []}) + }) + ]) + + caller_map = {} + for vcf_file in sample_vcfs_list: + vcf_name = os.path.basename(str(vcf_file)).lower() + if 'sniffles' in vcf_name: + caller_map[vcf_file] = 'sniffles' + elif 'cutesv' in vcf_name: + caller_map[vcf_file] = 'cutesv' + elif 'severus' in vcf_name: + caller_map[vcf_file] = 'severus' + else: + caller_map[vcf_file] = 'unknown' + + for vcf_file, caller in caller_map.items(): + analyzed_data = analyze_vcf(str(vcf_file)) + sample_result["callers"][caller] = analyzed_data + + sample_result["combined_stats"]["total_variants"] += analyzed_data["total_variants"] + for svtype, data in analyzed_data["sv_types"].items(): + sample_result["combined_stats"]["sv_types"][svtype]["count"] += data["count"] + if "svlen_data" in data: + sample_result["combined_stats"]["sv_types"][svtype]["svlen_data"].extend(data["svlen_data"]) + + # Compute combined stats + for svtype, data in sample_result["combined_stats"]["sv_types"].items(): + if data["svlen_data"]: + lengths = data["svlen_data"] + data["svlen_min"] = min(lengths) + data["svlen_max"] = max(lengths) + data["svlen_mean"] = round(statistics.mean(lengths), 2) + data["svlen_median"] = round(statistics.median(lengths), 1) + data["svlen_stdev"] = round(statistics.stdev(lengths) if len(lengths) > 1 else 0, 2) + del data["svlen_data"] + else: - entry.update({ - "svlen_min": None, "svlen_max": None, "svlen_mean": None, - "svlen_median": 
None, "svlen_stdev": None - }) - final_combined["sv_types"][svtype] = entry - - result["combined_stats"] = final_combined + # Single VCF for this sample + analyzed_data = analyze_vcf(str(sample_vcfs_list[0])) + sample_result = OrderedDict([ + ("analysis_type", "single_vcf"), + ("total_variants", analyzed_data["total_variants"]), + ("sv_types", analyzed_data["sv_types"]) + ]) + + result["samples"][sample_id] = sample_result else: - # Single VCF analysis (for consensus stages) - CONSISTENT ORDERING - vcf_file = vcf_list[0] if vcf_list else "" - print(f"\\nProcessing single VCF: {os.path.basename(vcf_file) if vcf_file else 'None'}") - - analyzed_data = analyze_vcf(vcf_file) - - # Build result with consistent field ordering using OrderedDict - result = OrderedDict([ - ("stage", stage), - ("analysis_type", "single_vcf"), - ("total_variants", analyzed_data["total_variants"]), - ("sv_types", analyzed_data["sv_types"]) - ]) + # Regular analysis (cohort-level or legacy) + if len(vcf_list) > 1: + # Multi-caller analysis + result = OrderedDict([ + ("stage", stage), + ("analysis_type", "multi_caller"), + ("callers", {}), + ("combined_stats", { + "total_variants": 0, + "sv_types": defaultdict(lambda: {"count": 0, "svlen_data": []}) + }) + ]) + + caller_map = {} + for vcf_file in vcf_list: + vcf_name = os.path.basename(vcf_file).lower() + if 'sniffles' in vcf_name: + caller_map[vcf_file] = 'sniffles' + elif 'cutesv' in vcf_name: + caller_map[vcf_file] = 'cutesv' + elif 'severus' in vcf_name: + caller_map[vcf_file] = 'severus' + else: + caller_map[vcf_file] = 'unknown' + + for vcf_file, caller in caller_map.items(): + analyzed_data = analyze_vcf(vcf_file) + result["callers"][caller] = analyzed_data + + result["combined_stats"]["total_variants"] += analyzed_data["total_variants"] + for svtype, data in analyzed_data["sv_types"].items(): + result["combined_stats"]["sv_types"][svtype]["count"] += data["count"] + if "svlen_data" in data: + 
result["combined_stats"]["sv_types"][svtype]["svlen_data"].extend(data["svlen_data"]) -# Write output with consistent formatting + # Compute combined statistics + for svtype, data in result["combined_stats"]["sv_types"].items(): + if data["svlen_data"]: + lengths = data["svlen_data"] + data["svlen_min"] = min(lengths) + data["svlen_max"] = max(lengths) + data["svlen_mean"] = round(statistics.mean(lengths), 2) + data["svlen_median"] = round(statistics.median(lengths), 1) + data["svlen_stdev"] = round(statistics.stdev(lengths) if len(lengths) > 1 else 0, 2) + del data["svlen_data"] + + else: + # Single VCF analysis + vcf_file = vcf_list[0] if vcf_list else "" + analyzed_data = analyze_vcf(vcf_file) + + result = OrderedDict([ + ("stage", stage), + ("analysis_type", "single_vcf"), + ("total_variants", analyzed_data["total_variants"]), + ("sv_types", analyzed_data["sv_types"]) + ]) + +# Write output with open("${stage_name}_summary.json", "w") as f: json.dump(result, f, indent=2) - -print(f"\\n=== SUMMARY ===") -print(f"Generated summary for stage: {stage}") -print(f"Analysis type: {'multi_caller' if is_multi_caller else 'single_vcf'}") -print(f"Total variants: {result.get('total_variants', 0)}") -if is_multi_caller: - print("Per-caller breakdown:") - for caller, stats in result.get("callers", {}).items(): - print(f" {caller}: {stats.get('total_variants', 0)} variants") +print(f"Generated summary for stage: {stage}") +if 'samples' in result: + print(f"Samples processed: {len(result['samples'])}") + for sample_id, sample_data in result['samples'].items(): + if 'total_variants' in sample_data: + print(f" {sample_id}: {sample_data['total_variants']} variants") + elif 'combined_stats' in sample_data: + print(f" {sample_id}: {sample_data['combined_stats']['total_variants']} variants") else: - print("SV type breakdown:") - for svtype, data in result.get("sv_types", {}).items(): - print(f" {svtype}: {data.get('count', 0)} variants") - -print("===============") + print(f"Total 
variants: {result['combined_stats']['total_variants'] if 'combined_stats' in result else result.get('total_variants', 0)}") EOF cat <<-END_VERSIONS > versions.yml @@ -352,4 +248,4 @@ EOF python: \$(python --version | sed 's/Python //') END_VERSIONS """ -} \ No newline at end of file +} diff --git a/modules/local/summarize_sv_counts/meta.yml b/modules/local/summarize_sv_counts/meta.yml index c24a203..44dda40 100755 --- a/modules/local/summarize_sv_counts/meta.yml +++ b/modules/local/summarize_sv_counts/meta.yml @@ -34,4 +34,4 @@ output: TSV file summarizing SV type counts for the sample, named as _sv_summary.tsv. authors: - - github.com/manascripts \ No newline at end of file + - github.com/manascripts diff --git a/modules/nf-core/severus/environment.yml b/modules/nf-core/severus/environment.yml index 74eb305..e824160 100755 --- a/modules/nf-core/severus/environment.yml +++ b/modules/nf-core/severus/environment.yml @@ -1,12 +1,12 @@ --- # yaml-language-server: $schema=https://raw.githubusercontent.com/nf-core/modules/master/modules/environment-schema.json channels: -- conda-forge -- bioconda + - conda-forge + - bioconda dependencies: -- bioconda::bcftools=1.22 -- bioconda::coreutils=8.25 -- conda-forge::gawk=5.3.1 -- conda-forge::gzip=1.14 -- conda-forge::jq=1.8.1 -- conda-forge::python=3.13.0 + - bioconda::bcftools=1.22 + - bioconda::coreutils=8.25 + - conda-forge::gawk=5.3.1 + - conda-forge::gzip=1.14 + - conda-forge::jq=1.8.1 + - conda-forge::python=3.13.0 diff --git a/nextflow.config b/nextflow.config index 4ca716c..7c2296e 100755 --- a/nextflow.config +++ b/nextflow.config @@ -20,16 +20,15 @@ params { // File paths tandem_repeats = "${projectDir}/assets/vntr/hg38.p14.trf.bed" vntr_bed = "${projectDir}/assets/vntr/hg38.p14.trf.bed" - pon_file = "${projectDir}/assets/pon/PoN_1000G_hg38_extended.tsv.gz" PON = "${projectDir}/assets/pon/PoN_1000G_hg38_extended.tsv.gz" - + // SVDB databases svdb_databases = [ "${projectDir}/assets/svdb_databases/gnomad.v4.1.sv.sites.rmMissing.vcf.gz", "${projectDir}/assets/svdb_databases/UWONT_450_AF.vcf.gz" ]
svdb_in_frq = ['AF', 'PFneedLR'] - svdb_out_frq = ['AFgnomAD', 'AFneedLR'] + svdb_out_frq = ['AFgnomAD', 'AFneedLR'] svdb_in_occ = ['AN', 'AN'] svdb_out_occ = ['ANgnomAD', 'ANneedLR'] @@ -41,7 +40,7 @@ params { cluster_merge_pos = 100 sniffles_args = "" - // CuteSV individual parameters + // CuteSV individual parameters cutesv_min_support = 5 min_size = 50 cutesv_min_mapq = 20 @@ -80,7 +79,7 @@ params { svdb_query_args = "" // Filtering parameters - min_caller_support = 1 + min_caller_support = 2 max_gnomad_af = 0.1 max_needlr_af = 0.006 caller_support_filter_args = "" @@ -89,7 +88,7 @@ params { // AnnotSV individual parameters genome_build = "GRCh38" output_vcf = 1 - min_sv_size = 0 + annotsv_min_sv_size = 0 annotsv_args = "" // References @@ -183,21 +182,60 @@ profiles { shifter.enabled = false charliecloud.enabled = false apptainer.enabled = false - process { - executor = 'lsf' - // scratch = '/omics/odcf/analysis/OE0415_projects/nb_lrs/LRS/2025-02-01_NB_LRS/05-analysis_tmp/nf-core-ontvar-test/run_amt/nextflow.config' - scratch = '/omics/odcf/analysis/OE0415_projects/nb_lrs/LRS/2025-02-01_NB_LRS/05-analysis_tmp/nf-core-ontvar-test/tmp' - memory = { 25.GB * task.attempt } - queue = { task.attempt <= 2 ? 
'medium-debian' : 'long-debian' } - cpus = { Math.min(15 * task.attempt, 48) } - errorStrategy = 'retry' - maxRetries = 2 + + process { + executor = 'lsf' + scratch = '/omics/odcf/analysis/OE0415_projects/nb_lrs/LRS/2025-02-01_NB_LRS/05-analysis_tmp/nf-core-ontvar-test/tmp' + + // Queue selection based on your cluster configuration + queue = { + def hours = (task.time ?: 6.h).toMinutes() / 60.0 // fractional hours; toHours() truncates sub-hour requests to 0 + + if (hours <= 10.0/60.0) { + 'short' + } else if (hours <= 1.0) { // 1 hour + 'medium' + } else if (hours <= 10.0) { // 10 hours + 'long' + } else if ((task.memory ?: 8.GB).toGiga() > 200) { // More than 200GB memory (read from task.memory; no local 'mem' exists in this closure) + 'highmem' + } else { // Unlimited time, up to 200GB memory + 'verylong' + } + } + + // Resource limits based on your cluster specs + memory = { + def mem = (task.memory ?: 8.GB).toGiga() + if (mem > 200) { + // highmem queue - up to 4TB available but no hard limit + return Math.min(mem, 4000) + '.GB' + } else { + // Other queues - 200GB limit + return Math.min(mem, 200) + '.GB' + } } - executor { - name = 'lsf' - perTaskReserve = false - perJobMemLimit = true + cpus = { Math.min(task.cpus ?: 2, 10) } // Max 10 cores per your cluster config + time = { + def hours = (task.ext.time ?: 10.h).toMinutes() / 60.0 // Use task.ext.time or a fixed default; fractional hours so sub-hour requests are not truncated to 0 + if (hours <= 10.0/60.0) { + '10.m' // short queue + } else if (hours <= 1.0) { + '1.h' // medium queue + } else if (hours <= 10.0) { + '10.h' // long queue + } else { + '240.h' // verylong - set reasonable max + } } + } + + executor { + name = 'lsf' + perTaskReserve = false + perJobMemLimit = true + queueSize = 100 + } } podman { podman.enabled = true @@ -388,5 +426,4 @@ validation { } // Load modules.config for DSL2 module specific options -// includeConfig 'conf/modules_params.config' includeConfig 'conf/modules.config' diff --git a/nextflow_schema.json b/nextflow_schema.json index ff3b2fb..0005ccd 100755 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -110,18 +110,11 @@ "description": "Path to VNTR BED file for Severus.", "fa_icon": "fas
fa-file-alt" }, - "pon_file": { - "type": "string", - "format": "file-path", - "default": "${projectDir}/assets/pon/PoN_1000G_hg38_extended.tsv.gz", - "description": "Panel of Normals file.", - "fa_icon": "fas fa-database" - }, "PON": { "type": "string", "format": "file-path", "default": "${projectDir}/assets/pon/PoN_1000G_hg38_extended.tsv.gz", - "description": "Panel of Normals file (alias for pon_file).", + "description": "Panel of Normals file for filtering.", "fa_icon": "fas fa-database" } } @@ -134,7 +127,7 @@ "properties": { "svdb_databases": { "type": "array", - "items": {"type": "string"}, + "items": { "type": "string" }, "default": [ "${projectDir}/assets/svdb_databases/gnomad.v4.1.sv.sites.rmMissing.vcf.gz", "${projectDir}/assets/svdb_databases/UWONT_450_AF.vcf.gz" @@ -144,28 +137,28 @@ }, "svdb_in_frq": { "type": "array", - "items": {"type": "string"}, + "items": { "type": "string" }, "default": ["AF", "PFneedLR"], "description": "The frequency count tags in the database INFO columns.", "fa_icon": "fas fa-tags" }, "svdb_out_frq": { "type": "array", - "items": {"type": "string"}, + "items": { "type": "string" }, "default": ["AFgnomAD", "AFneedLR"], "description": "The frequency count tags to be included in the output.", "fa_icon": "fas fa-tags" }, "svdb_in_occ": { "type": "array", - "items": {"type": "string"}, + "items": { "type": "string" }, "default": ["AN", "AN"], "description": "The allele count tags in the database INFO columns.", "fa_icon": "fas fa-tags" }, "svdb_out_occ": { "type": "array", - "items": {"type": "string"}, + "items": { "type": "string" }, "default": ["ANgnomAD", "ANneedLR"], "description": "The allele count tags to be included in the output.", "fa_icon": "fas fa-tags" @@ -530,7 +523,7 @@ }, "false_positive_snv": { "type": ["string", "null"], - "format": "file-path", + "format": "file-path", "default": null, "description": "Path to false positive SNV file for AnnotSV.", "fa_icon": "fas fa-file-alt" @@ -549,7 +542,7 @@ "description": 
"Output VCF format in addition to TSV (1=yes, 0=no).", "fa_icon": "fas fa-file-code" }, - "min_sv_size": { + "annotsv_min_sv_size": { "type": "integer", "default": 0, "minimum": 0, @@ -775,4 +768,4 @@ "$ref": "#/$defs/generic_options" } ] -} \ No newline at end of file +} diff --git a/subworkflows/local/utils_nfcore_ontvar_pipeline/main.nf b/subworkflows/local/utils_nfcore_ontvar_pipeline/main.nf index 54a5028..f2c8e57 100755 --- a/subworkflows/local/utils_nfcore_ontvar_pipeline/main.nf +++ b/subworkflows/local/utils_nfcore_ontvar_pipeline/main.nf @@ -73,24 +73,13 @@ workflow PIPELINE_INITIALISATION { // Channel - .fromList(samplesheetToList(params.input, "${projectDir}/assets/schema_input.json")) - .map { - meta, fastq_1, fastq_2 -> - if (!fastq_2) { - return [ meta.id, meta + [ single_end:true ], [ fastq_1 ] ] - } else { - return [ meta.id, meta + [ single_end:false ], [ fastq_1, fastq_2 ] ] - } - } - .groupTuple() - .map { samplesheet -> - validateInputSamplesheet(samplesheet) - } - .map { - meta, fastqs -> - return [ meta, fastqs.flatten() ] - } - .set { ch_samplesheet } + .fromList(samplesheetToList(params.input, "${projectDir}/assets/schema_input.json")) + .map { + meta, sample_id, sample_type, bam_path -> + // Return: [group_id, sample_id, sample_type, bam_path] + return [meta.id, sample_id, sample_type, bam_path] + } + .set { ch_samplesheet } emit: samplesheet = ch_samplesheet diff --git a/workflows/ontvar.nf b/workflows/ontvar.nf index 71095b7..fa2c475 100755 --- a/workflows/ontvar.nf +++ b/workflows/ontvar.nf @@ -20,21 +20,23 @@ include { JASMINESV as JASMINESV_SAMPLE } from '../modules/nf-core/jasminesv/mai include { JASMINE_HEADER_FIX } from '../modules/local/jasmine_header_fix/main' include { JASMINESV as JASMINESV_COHORT } from '../modules/nf-core/jasminesv/main' include { FILTER_CHR } from '../modules/local/filter_chr/main' +include { ANNOTSV_ANNOTSV as ANNOTSV_COHORT_RAW } from '../modules/nf-core/annotsv/annotsv/main' include { ANNOTSV_ANNOTSV as 
ANNOTSV_COHORT } from '../modules/nf-core/annotsv/annotsv/main' -include { ANNOTSV_ANNOTSV as ANNOTSV_PER_SAMPLE_RAW } from '../modules/nf-core/annotsv/annotsv/main' -include { ANNOTSV_ANNOTSV as ANNOTSV_PER_SAMPLE } from '../modules/nf-core/annotsv/annotsv/main' +include { ANNOTSV_ANNOTSV as ANNOTSV_PER_SAMPLE_RAW } from '../modules/nf-core/annotsv/annotsv/main' +include { ANNOTSV_ANNOTSV as ANNOTSV_PER_SAMPLE } from '../modules/nf-core/annotsv/annotsv/main' include { ANNOTSV_INSTALLANNOTATIONS } from '../modules/nf-core/annotsv/installannotations/main' include { UNTAR as UNTAR_ANNOTSV } from '../modules/nf-core/untar/main' include { SUMMARIZE_SV_COUNTS as SUMMARIZE_CALLERS } from '../modules/local/summarize_sv_counts/main' include { SUMMARIZE_SV_COUNTS as SUMMARIZE_CALLER_MERGED } from '../modules/local/summarize_sv_counts/main' include { SUMMARIZE_SV_COUNTS as SUMMARIZE_CALLER_MERGED_FILTERED } from '../modules/local/summarize_sv_counts/main' -include { SUMMARIZE_SV_COUNTS as SUMMARIZE_COHORT } from '../modules/local/summarize_sv_counts/main' +include { SUMMARIZE_SV_COUNTS as SUMMARIZE_COHORT_ANNOTATED } from '../modules/local/summarize_sv_counts/main' +include { SUMMARIZE_SV_COUNTS as SUMMARIZE_COHORT_FILTERED } from '../modules/local/summarize_sv_counts/main' include { SVDB_QUERY as SVDB_QUERY_SAMPLE } from '../modules/nf-core/svdb/query/main' include { SVDB_QUERY as SVDB_QUERY_COHORT } from '../modules/nf-core/svdb/query/main' include { BCFTOOLS_VIEW as CALLER_SUPPORT_FILTER } from '../modules/nf-core/bcftools/view/main' include { BCFTOOLS_VIEW as AF_FILTER } from '../modules/nf-core/bcftools/view/main' include { BCFTOOLS_VIEW as AF_FILTER_COHORT } from '../modules/nf-core/bcftools/view/main' -include { BCFTOOLS_SORT as BCFTOOLS_SORT_SAMPLE } from '../modules/nf-core/bcftools/sort/main' +include { BCFTOOLS_SORT as SORT_VCF } from '../modules/nf-core/bcftools/sort/main' /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
@@ -56,22 +58,24 @@ workflow ONTVAR { ch_multiqc_files = Channel.empty() ch_sample_info = ch_samplesheet + // ch_sample_info now contains: [group_id, sample_id, sample_type, bam_path] + // [ 0 , 1 , 2 , 3 ] cases = ch_sample_info - .filter { it[2] == 'case' } + .filter { it[2] == 'case' } // sample_type is index 2 controls = ch_sample_info - .filter { it[2] == 'control' } + .filter { it[2] == 'control' } // sample_type is index 2 controls_by_match = controls - .map { c -> tuple(c[3], c[1]) } + .map { c -> tuple(c[0], c[3]) } // [group_id, bam_path] sv_input = cases - .map { c -> tuple(c[3], c[0], c[1]) } // map to [match_id, sample_id, bam_path] - .join(controls_by_match, remainder: true) // join on match_id - .map { it -> - def sample_id = it[1] + .map { c -> tuple(c[0], c[0], c[3]) } // map to [group_id, group_id, bam_path] - using group_id as sample name + .join(controls_by_match, remainder: true) // join on group_id + .map { it -> + def group_id = it[0] // Use group_id as sample identifier def case_bam = it[2] def control_bam = it[3] // control_bam comes from join - tuple(sample_id, case_bam, control_bam ?: null) + tuple(group_id, case_bam, control_bam ?: null) } // ────────────────────────────────────────────────────────────────────── @@ -81,8 +85,8 @@ workflow ONTVAR { // SNIFFLES sniffles_input = sv_input .map { it -> - def sample_id = it[0] - tuple([id: "${sample_id}_sniffles", sample: sample_id], it[1], file("${it[1]}.bai")) + def group_id = it[0] + tuple([id: "${group_id}", sample: group_id], it[1], file("${it[1]}.bai")) } SNIFFLES( @@ -92,14 +96,14 @@ workflow ONTVAR { Channel.value(true), // Input 4: vcf_output Channel.value(false) // Input 5: snf_output ) - + // CUTESV cutesv_input = sv_input .map { it -> - def sample_id = it[0] - tuple([id: "${sample_id}_cutesv", sample: sample_id], it[1], file("${it[1]}.bai")) + def group_id = it[0] + tuple([id: "${group_id}", sample: group_id], it[1], file("${it[1]}.bai")) } - + CUTESV( cutesv_input, 
Channel.value(tuple([id: "reference"], file(reference))) @@ -108,24 +112,24 @@ workflow ONTVAR { // SEVERUS severus_with_control_input = sv_input.filter { it[2] } .map { it -> - def sample_id = it[0] + def group_id = it[0] def case_bam = it[1] def control_bam = it[2] - tuple([id: sample_id, sample: sample_id, has_control: true, control_bam: control_bam], case_bam, file("${case_bam}.bai"), control_bam, file("${control_bam}.bai"), []) + tuple([id: group_id, sample: group_id, has_control: true, control_bam: control_bam], case_bam, file("${case_bam}.bai"), control_bam, file("${control_bam}.bai"), []) } - + severus_no_control_input = sv_input.filter { !it[2] } .map { it -> - def sample_id = it[0] + def group_id = it[0] def case_bam = it[1] - tuple([id: sample_id, sample: sample_id, has_control: false], case_bam, file("${case_bam}.bai"), [], [], []) + tuple([id: group_id, sample: group_id, has_control: false], case_bam, file("${case_bam}.bai"), [], [], []) } - + SEVERUS_WITH_CONTROL( severus_with_control_input, // Input 1: [meta, target_bam, target_bai, control_bam, control_bai, vcf] Channel.value(tuple([id: "vntr"], file(params.vntr_bed))) // Input 2: [meta, vntr_bed] ) - + SEVERUS_NO_CONTROL( severus_no_control_input, // Input 1: [meta, target_bam, target_bai, control_bam, control_bai, vcf] Channel.value(tuple([id: "vntr"], file(params.vntr_bed))) // Input 2: [meta, vntr_bed] @@ -133,8 +137,8 @@ workflow ONTVAR { severus_vcfs = SEVERUS_WITH_CONTROL.out.somatic_vcf .mix(SEVERUS_NO_CONTROL.out.somatic_vcf) - .map { meta, vcf -> - tuple(meta, vcf, 'severus') + .map { meta, vcf -> + tuple(meta, vcf, 'severus') } | RENAME_VCF // ────────────────────────────────────────────────────────────────────── @@ -153,12 +157,15 @@ workflow ONTVAR { all_caller_vcfs = sniffles_renamed_vcfs.mix(cutesv_renamed_vcfs, severus_renamed_vcfs) .map { meta, vcf -> tuple(meta, vcf) } - // Raw caller summaries (multi-caller summary for 01-raw-calls directory) + // Raw caller summaries + 
all_caller_vcfs_for_summary = all_caller_vcfs + .map { meta, vcf -> vcf } + .collect() + .map { vcf_list -> tuple([id: "raw_caller_summary"], vcf_list) } + SUMMARIZE_CALLERS( - all_caller_vcfs - .collect { meta, vcf -> vcf } - .map { vcf_list -> tuple([id: "01_raw_calls"], vcf_list) }, - Channel.value("01_raw_calls") + all_caller_vcfs_for_summary, + Channel.value("raw_calls") ) // ────────────────────────────────────────────────────────────────────── @@ -166,21 +173,21 @@ workflow ONTVAR { // ────────────────────────────────────────────────────────────────────── sv_calls_by_sample = all_caller_vcfs - .map { meta, vcf -> - def sample_name = meta.containsKey('sample') ? meta.sample : meta.id.replaceAll('_(sniffles|cutesv|severus)$', '') - tuple(sample_name, vcf) + .map { meta, vcf -> + def group_id = meta.sample ?: meta.id // Use sample field first, fallback to id + tuple(group_id, vcf) } .groupTuple(by: 0) // ────────────────────────────────────────────────────────────────────── - // Run Jasmine to merge SVs from callers per sample + // Run Jasmine to merge SVs from callers per sample // ────────────────────────────────────────────────────────────────────── jasminesv_sample_input = sv_calls_by_sample .filter { meta, vcf_list -> vcf_list.size() > 0 } - .map { meta, vcf_list -> - def sample_id = meta - tuple([id: "${sample_id}_jasmine", sample: sample_id], vcf_list, [], []) + .map { meta, vcf_list -> + def group_id = meta + tuple([id: group_id, sample: group_id, step: "consensus"], vcf_list, [], []) } // Prepare Jasmine input channels (per-sample) @@ -213,23 +220,23 @@ workflow ONTVAR { sample_filtered = ch_jasmine_sample_vcfs | FILTER_CHR - sample_filtered | BCFTOOLS_SORT_SAMPLE - sample_sorted = BCFTOOLS_SORT_SAMPLE.out.vcf + sample_filtered | SORT_VCF + sample_sorted = SORT_VCF.out.vcf // ────────────────────────────────────────────────────────────────────── // Filter SVs supported by ≥2 callers // 
────────────────────────────────────────────────────────────────────── bcftools_sample_input = sample_sorted - .map { meta, vcf -> - def v = vcf.toString() - def updated_meta = meta + [sample: meta.sample ?: meta.id?.replaceAll('_jasmine.*', '')] - def idx = file(v + '.csi') - if( !idx.exists() ) idx = file(v + '.tbi') - def idx_out = idx.exists() ? idx : [] - tuple(updated_meta, file(v), idx_out) - } - + .map { meta, vcf -> + def v = vcf.toString() + def updated_meta = [id: meta.sample ?: meta.id, sample: meta.sample ?: meta.id, step: "caller_support"] + def idx = file(v + '.csi') + if( !idx.exists() ) idx = file(v + '.tbi') + def idx_out = idx.exists() ? idx : [] + tuple(updated_meta, file(v), idx_out) + } + CALLER_SUPPORT_FILTER( bcftools_sample_input, Channel.value([]), // regions @@ -237,17 +244,21 @@ workflow ONTVAR { Channel.value([]) // samples ) + // Consensus summary - simple approach + consensus_summary_input = CALLER_SUPPORT_FILTER.out.vcf + .map { meta, vcf -> vcf } + .collect() + .map { vcf_list -> tuple([id: "consensus_summary"], vcf_list) } + SUMMARIZE_CALLER_MERGED( - CALLER_SUPPORT_FILTER.out.vcf - .first() - .map { meta, vcf -> tuple([id: "02_caller_merged"], vcf) }, - Channel.value("02_caller_merged") + consensus_summary_input, + Channel.value("consensus") ) // ────────────────────────────────────────────────────────────────────── // SAMPLE LEVEL AF ANNOTATION + FILTERING + ANNOTSV ANNOTATION // ────────────────────────────────────────────────────────────────────── - + ch_per_sample_input = CALLER_SUPPORT_FILTER.out.vcf .map { meta, vcf -> tuple(meta, vcf) } @@ -269,13 +280,14 @@ workflow ONTVAR { ) ch_per_sample_bcftools_input = SVDB_QUERY_SAMPLE.out.vcf - .map { meta, annotated_vcf -> - def idx = file(annotated_vcf.toString() + '.csi') - if( !idx.exists() ) idx = file(annotated_vcf.toString() + '.tbi') - def idx_out = idx.exists() ? 
idx : [] - tuple(meta, file(annotated_vcf), idx_out) - } - + .map { meta, annotated_vcf -> + def updated_meta = [id: meta.sample ?: meta.id, sample: meta.sample ?: meta.id, step: "af_filter"] + def idx = file(annotated_vcf.toString() + '.csi') + if( !idx.exists() ) idx = file(annotated_vcf.toString() + '.tbi') + def idx_out = idx.exists() ? idx : [] + tuple(updated_meta, file(annotated_vcf), idx_out) + } + ch_bcftools_regions = Channel.value([]) ch_bcftools_targets = Channel.value([]) ch_bcftools_samples = Channel.value([]) @@ -287,12 +299,15 @@ workflow ONTVAR { ch_bcftools_samples ) - // Filtered merged summary + // Filtered summary - simple approach + filtered_summary_input = AF_FILTER.out.vcf + .map { meta, vcf -> vcf } + .collect() + .map { vcf_list -> tuple([id: "filtered_summary"], vcf_list) } + SUMMARIZE_CALLER_MERGED_FILTERED( - AF_FILTER.out.vcf - .first() - .map { meta, vcf -> tuple([id: "03_caller_merged_filtered"], vcf) }, - Channel.value("03_caller_merged_filtered") + filtered_summary_input, + Channel.value("filtered") ) // ────────────────────────────────────────────────────────────────────── @@ -332,9 +347,10 @@ workflow ONTVAR { ANNOTSV_PER_SAMPLE( AF_FILTER.out.vcf - .map { meta, vcf -> - meta.id = meta.id + "_annotated" - tuple(meta, vcf, [], []) }, + .map { meta, vcf -> + def updated_meta = [id: meta.sample ?: meta.id, sample: meta.sample ?: meta.id, step: "final_annotation"] + tuple(updated_meta, vcf, [], []) + }, ch_annotsv_annotations, ch_candidate_genes, ch_false_positive_snv, @@ -350,7 +366,7 @@ workflow ONTVAR { .collect() jasminesv_cohort_input = sample_consensus_vcfs - .map { vcf_list -> + .map { vcf_list -> tuple([id: "cohort"], vcf_list, [], []) } @@ -391,6 +407,26 @@ workflow ONTVAR { ch_svdb_cohort_bedpe ) + ch_candidate_genes_cohort = Channel.value(tuple([id: "candidate_genes"], [])) + ch_false_positive_snv_cohort = Channel.value(tuple([id: "false_positive_snv"], [])) + ch_gene_transcripts_cohort = Channel.value(tuple([id: 
"gene_transcripts"], [])) + + ANNOTSV_COHORT_RAW( + SVDB_QUERY_COHORT.out.vcf + .map { meta, vcf -> tuple(meta, vcf, [], []) }, + ch_annotsv_annotations, + ch_candidate_genes_cohort, + ch_false_positive_snv_cohort, + ch_gene_transcripts_cohort + ) + + // Cohort summaries - one per cohort directory + SUMMARIZE_COHORT_ANNOTATED( + SVDB_QUERY_COHORT.out.vcf + .map { meta, vcf -> tuple([id: "cohort_annotated_summary"], vcf) }, + Channel.value("cohort_annotated") + ) + // ────────────────────────────────────────────────────────────────────── // Filter annotated SVs based on AF // ────────────────────────────────────────────────────────────────────── @@ -407,10 +443,10 @@ workflow ONTVAR { ) // Cohort summaries - one per cohort directory - SUMMARIZE_COHORT( + SUMMARIZE_COHORT_FILTERED( AF_FILTER_COHORT.out.vcf - .map { meta, vcf -> tuple(meta, vcf) }, - Channel.value("cohort") + .map { meta, vcf -> tuple([id: "cohort_filtered_summary"], vcf) }, + Channel.value("cohort_filtered") ) // ────────────────────────────────────────────────────────────────────── @@ -422,10 +458,6 @@ workflow ONTVAR { tuple(meta, filtered_vcf, [], []) } - ch_candidate_genes_cohort = Channel.value(tuple([id: "candidate_genes"], [])) - ch_false_positive_snv_cohort = Channel.value(tuple([id: "false_positive_snv"], [])) - ch_gene_transcripts_cohort = Channel.value(tuple([id: "gene_transcripts"], [])) - ANNOTSV_COHORT( annotsv_input, ch_annotsv_annotations, @@ -437,14 +469,14 @@ workflow ONTVAR { // ────────────────────────────────────────────────────────────────────── // Collate and save software versions // ────────────────────────────────────────────────────────────────────── - + ch_versions = ch_versions.mix(SNIFFLES.out.versions) ch_versions = ch_versions.mix(CUTESV.out.versions) ch_versions = ch_versions.mix(SEVERUS_WITH_CONTROL.out.versions) ch_versions = ch_versions.mix(SEVERUS_NO_CONTROL.out.versions) ch_versions = ch_versions.mix(JASMINESV_SAMPLE.out.versions) ch_versions = 
ch_versions.mix(JASMINESV_COHORT.out.versions) - ch_versions = ch_versions.mix(BCFTOOLS_SORT_SAMPLE.out.versions) + ch_versions = ch_versions.mix(SORT_VCF.out.versions) ch_versions = ch_versions.mix(SVDB_QUERY_SAMPLE.out.versions) ch_versions = ch_versions.mix(SVDB_QUERY_COHORT.out.versions) ch_versions = ch_versions.mix(CALLER_SUPPORT_FILTER.out.versions) @@ -513,4 +545,3 @@ workflow ONTVAR { THE END ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ -