In [19]:
# |default_exp pipeline_runner
# You need this at the top of every notebook you want turned into a module, the name your provide will determine the module name

# Libraries


In [7]:
# |export
# That export there, it makes sure this code goes into the module.

# standard libs
import os
import re

# Common to template
# add into settings.ini, requirements, package name is python-dotenv, for conda build ensure `conda config --add channels conda-forge`
import dotenv  # for loading config from .env files, https://pypi.org/project/python-dotenv/
import envyaml  # Allows to loads env vars into a yaml file, https://github.com/thesimj/envyaml
import fastcore  # To add functionality related to nbdev development, https://github.com/fastai/fastcore/
from fastcore import (
    test,
)
from fastcore.script import (
    call_parse,
)  # for @call_parse, https://fastcore.fast.ai/script
import json  # for nicely printing json and yaml
from fastcore import test
from ssi_analysis_utility import (
    core,
    sample_manager,
    analysis_utility,
)
from fastcore.script import call_parse
from pathlib import Path #to be able write :Path in cli function
import snakemake 

# Project specific libraries
import pandas as pd
import numpy as np
import subprocess

In [21]:
# To change working directory when running notebook only. DO NOT EXPORT
os.chdir(core.PROJECT_DIR)

# Functions


#### Check for required arguments

The function will check if any of the required arguments are missing. If yes, the function will report which arguments are missing, and exit.

In [22]:
# |export

def arguments_present_check(input_config):
    #Check if an input-file is provided, either as argument or unit-test. If not, exit with error
    if input_config["assembly_file"] is None and input_config["Illumina_read_files"] is None and input_config["Nanopore_read_file"] is None and input_config["samplesheet"] is None and input_config["input_folder"] is None:
        print(f"Input data must be given in the form of sequence data (assembly_file, Illumina_read_files and/or Nanopore_read_files), an input_dir with sequencing data, or a metadatas tsv with paths to those files")
        return(False)
    elif input_config["load_from_folder"] and input_config["input_folder"] is None:
        print(f"When running --load_from_folder an input directory must be provided with --input_folder")
    elif input_config["load_from_samplesheet"] and (not input_config["samplesheet"] or input_config["samplesheet"] is None):
        print(f"When running --load_from_samplesheet a samplesheet must be provided with --samplesheet")
    else:
        return(True)


In [8]:
def run_pipeline(snakefile: Path, workdir: Path = Path("./output"), cores: int = 1):
    """
    Execute a Snakemake pipeline programmatically.

    Args:
        snakefile (Path): Path to the Snakefile defining the workflow.
        workdir (Path): Working directory for the pipeline execution. Defaults to the current directory.
        cores (int): Number of cores to use for the pipeline. Defaults to 1.
    """
    success = snakemake(
        snakefile=str(snakefile),
        cores=cores,
        workdir=str(workdir),
        use_conda=True,  # Enable Conda if needed
        force_incomplete=True,  # Rerun incomplete jobs
        printreason=True,  # Show reasons for executing rules
        dryrun=True,  # Set to True to preview the pipeline execution
    )
    if not success:
        raise RuntimeError("Snakemake pipeline failed!")

### Spyogenes_analysis

The species specific Spyogenes_analysis class inherits attributes and methods from sample_manager and analysis_utility.

This includes attributes sample_name, assembly_file, Illumina_read_file, Nanopore_read_file, analysis_folder (output folder to store analyses for that specific sample).

And numerous methods with generic analyses and functions to set up folders and logging in a standardized manner.


In [23]:
# |export

class Spyogenes_analysis(analysis_utility.analysis_utility):
    """
    Class to perform analysis for Streptococcus pyogenes.
    """
    
    species = "Streptococcus pyogenes"
    
    def __init__(self, 
                 attributes, 
                 input_folder, 
                 output_folder, 
                 analysis_config):
        """
        Initialize the Spyogenes_analysis class with sample attributes, input folder, output folder, 
        and analysis configuration.

        Parameters:
        -------------
        attributes : dict
            Dictionary containing sample information such as sample name, input files, etc.
        input_folder : str
            Path to the input folder containing required files for the analysis.
        output_folder : str
            Path to the output folder where analysis results will be saved.
        analysis_config : dict
            Dictionary containing configuration settings for the analyses to be performed.
        """
        attributes = attributes.copy()
        analysis_config = analysis_config.copy()
        super().__init__(attributes, input_folder, output_folder, analysis_config)
        self.run_analyses(analysis_config)
    
    def run_analyses(self, 
                     analysis_config):
        """
        Runs the specified analyses on the sample based on the provided configuration.

        Parameters:
        -------------
        analysis_config : dict
            Configuration dictionary specifying which analyses to run and their settings.
        """
        self.sample_setup()  # Setup sample before starting analyses
        analyses_to_run = analysis_config["analyses_to_run"]
        
        # Run lineage determination if specified
        if "assembly_lineage_determination" in analyses_to_run:
            lineage_determination_config = analysis_config["assembly_lineage_determination"]
            self._assembly_lineage_determination_(lineage_determination_config)
        
        # Run emm typing if specified
        if "emm_typing" in analyses_to_run:
            emm_typing_config = analysis_config["emm_typing"]
            self._emm_typing_(emm_typing_config)
        
        # Run resistance gene detection if specified
        if "resistance_gene_detection" in analyses_to_run:
            AMR_blast_config = analysis_config["resistance_gene_detection"]
            self._blast_presence_absence_(AMR_blast_config)
        
        # Run virulence gene detection if specified
        if "virulence_gene_detection" in analyses_to_run:
            VFDB_blast_config = analysis_config["virulence_gene_detection"]
            VFDB_blast_config["results_format"] = "string"
            self._blast_presence_absence_(VFDB_blast_config)
        
        #self.sample_cleanup()  # Cleanup after analyses are done
        self.write_to_tsv()  # Write results to a TSV file


class Spyogenes_manager(analysis_utility.analysis_manager):
    """
    A class to manage the analysis of multiple Streptococcus pyogenes samples.
    """
    
    def __init__(self, 
                 input_config, 
                 analysis_settings_config):
        """
        Initializes the Spyogenes_manager with input configuration and analysis settings.

        Parameters:
        -------------
        input_config : dict
            Configuration for input files and sample handling.
        analysis_settings_config : dict
            Configuration for the specific analyses to be run on the samples.
        """
        super().__init__(input_config, 
                         analysis_settings_config)

    def init_sample(self, 
                    attributes):
        """
        Initializes a sample for analysis by creating an instance of `Spyogenes_analysis`.

        Parameters:
        -------------
        attributes : dict
            Dictionary containing sample-specific information, such as file paths and sample name.

        Returns:
        -------------
        Spyogenes_analysis
            An instance of `Spyogenes_analysis` initialized with the given sample attributes.
        """
        try:
            output_folder = os.path.abspath(attributes["output_folder"])
        except KeyError:
            output_folder = os.path.join(self.base_output_folder, attributes["sample_name"])
        
        sample = Spyogenes_analysis(attributes, self.base_input_folder, output_folder, self.analysis_settings_config)
        return sample


def print_fasta_from_dict(fasta_dict, output_file):
    """
    Prints a dictionary of sequences in FASTA format to an output file.

    Parameters:
    -------------
    fasta_dict : dict
        Dictionary where keys are sequence headers and values are sequence strings.
    output_file : str
        Path to the output file where the FASTA data will be written.
    """
    printlines = ""
    for header, sequence in fasta_dict.items():
        printlines += ">" + header + "\n" + sequence + "\n"
    
    with open(output_file, 'w') as o:
        o.write(printlines)


### Testing

The log parser unit test parses an rclone log found in tests/rclone_log_parser_test_log.txt. It verifies basic information like number of succesful and unsuccesful transfers in the log, and prints a tsv-formatted summary table of each transfer instance.

Should there be some try/except statements here, so the user get some kind of direct feedback like "all tests passed" (or not, if thats the case)? Right now, I dont think its very obvious whether the test was successful or not, from the user-side.

In [24]:
# |export

def single_test():
    config = core.get_config()
    example_sample = Spyogenes_analysis({#"sample_name":"GAS-2022-1029",
                                       "assembly_file":"examples/GAS-2022-1029.fasta",
                                       "Illumina_read_files":["examples/GAS-2022-1029_S42_L555_R1_001.fastq.gz","examples/GAS-2022-1029_S42_L555_R2_001.fastq.gz"]},
                                       input_folder = False,
                                       output_folder = "output/GAS-2022-1029",
                                       analysis_config = config["analysis_settings"]["Spyogenes"])
    #assert(example_sample.sample_name == "GAS-2022-1029")
    assert(len(example_sample.Illumina_read_files) == 2)
    assert(not example_sample.Nanopore_read_file)
    assert(example_sample.analysis_results["emm_typing"]["EMM_type"] == "1.0")
    assert(example_sample.analysis_results["assembly_lineage_determination"]["Lineage"] == "M1DK")

def single_test_input_folder():
    config = core.get_config()
    example_sample = Spyogenes_analysis({"sample_name":"GAS-2024-0773",
                                       "assembly_file":"GAS-2024-0773.fasta",
                                       "Illumina_read_files":"GAS-2024-0773_S35_L555_R1_001.fastq.gz,GAS-2024-0773_S35_L555_R2_001.fastq.gz"},
                                       input_folder = "examples",
                                       output_folder = "output/GAS-2024-0773",
                                       analysis_config = config["analysis_settings"]["Spyogenes"])
    assert(example_sample.sample_name == "GAS-2024-0773")
    assert(len(example_sample.Illumina_read_files) == 2)
    assert(not example_sample.Nanopore_read_file)
    assert(example_sample.analysis_results["emm_typing"]["EMM_type"] == "4.0")
    assert(example_sample.analysis_results["emm_typing"]["ENN_type"] == "203.3*")
    assert(example_sample.analysis_results["emm_typing"]["MRP_type"] == "156.0")
    assert(example_sample.analysis_results["assembly_lineage_determination"]["Lineage"] == "-")


def folder_test():
    config = core.get_config()
    input_config =  config["input_manager"]
    input_config["load_from_folder"] = True
    input_config["input_folder"] = "examples/"
    #input_config["samplesheet"] = "samplesheet.tsv"
    input_config["output_folder"] = "output_from_folder/"
    input_config["analysis_config"] = config["analysis_settings"]["Spyogenes"]
    test = Spyogenes_manager(input_config,config["analysis_settings"]["Spyogenes"])


def folder_test_with_metadata():
    config = core.get_config()
    input_config =  config["input_manager"]
    input_config["load_from_folder"] = True
    input_config["input_folder"] = "examples/"
    input_config["samplesheet"] = "examples/samplesheet.tsv"
    input_config["output_folder"] = "output_from_folder/"
    input_config["analysis_config"] = config["analysis_settings"]["Spyogenes"]
    test = Spyogenes_manager(input_config,config["analysis_settings"]["Spyogenes"])

def samplesheet_test():
    config = core.get_config()
    input_config =  config["input_manager"]
    input_config["load_from_samplesheet"] = True
    input_config["samplesheet"] = "examples/samplesheet.tsv"
    input_config["output_folder"] = "output_from_samplesheet/"
    input_config["analysis_config"] = config["analysis_settings"]["Spyogenes"]
    test = Spyogenes_manager(input_config,config["analysis_settings"]["Spyogenes"])
    print(test.__dict__)
    for x in test:
        print(x.__dict__)




In [25]:
single_test()

In [26]:
single_test_input_folder()

In [27]:
folder_test()

In [28]:
folder_test_with_metadata()

In [29]:
samplesheet_test()

{'analysis_settings_config': {'analyses_to_run': {'emm_typing': 'Typing of M protein gene using blast against CDC curated alleles', 'assembly_lineage_determination': 'Lineage determination based on presence of specified SNPs when mapping genome assembly against reference genome', 'resistance_gene_detection': 'Presence of resistance gens tetM, ermA and ermB', 'virulence_gene_detection': 'Presence of Streptococcal virulence genes from the Virulence Factor Database'}, 'assembly_lineage_determination': {'alias': 'assembly_lineage_determination', 'percent_snp_threshold': 50, 'files_to_clean': ['delta', 'filtered_delta', 'frankenfasta'], 'reference_fasta_file': './resources/lineage_determination/MGAS5005.fasta', 'lineage_variant_file': './resources/lineage_determination/Spyogenes_LOCs.tsv'}, 'emm_typing': {'alias': 'emm_typing', 'emm_allele_file': './resources/emm_typing/emm_alleles.fasta', 'emm_cluster_file': './resources/emm_typing/emm_enn_mrp_subgroups.txt', 'blast_header': 'qseqid sseqid

#### Cli function

This function is responsible for the execution of the current notebook, including parsing arguments.

The current version of the code require specification of an input log-file. It can be specified using the --input argument, or by providing a config-file as argument (with the log-file specified). If neither argument is specified, the script will exit with an error.

It is also possible to run a "unit-test", by specyfying the "unit_test" flag. In that case, the script will run on the test log-file provided with the repo, and exit without doing anything else.

In [30]:
# |export

@call_parse
def cli(
    #Definition of command-line arguments
    assembly_file:str = None, # Path to genome assembly file in fasta format
    Illumina_read_files:str = None, # Path to paired end read files, comma separated.
    Nanopore_read_file:str = None, # Path to single end Nanopore read file
    samplesheet:str = None, # Path to samplesheet containing metadata in tsv-format
    input_folder:str = None, # Path to input folder containing assembly files, Illumina read files or Nanopore read files. Overwrites any provided assembly_file and read_file input
    output_folder:str = None, # Path to output folder
    analyses_to_run: str = "all", # Comma separated list of analyses to perform. F.ex. "emm_typing,virulence_gene_detection"
    keep_files: str = None, # Intermediate outputs to keep. See readme for details on string format
    # load_from_folder: bool = False, # Flag to indicate
    load_from_samplesheet: bool = False, # Run analyses on samples listed in provided samplesheet
    config_file: Path = None, #Config-file containing all required arguments (required, unless using input arg)
    show_analyses: bool = False, # Print available analyses and exit
) -> None:
    #Set env vars and get config variables
    config = core.get_config(config_file)
    input_config = config["input_manager"]
    analysis_config = config["analysis_settings"]["Spyogenes"]
    if show_analyses:
        analysis_utility.print_analysis_options(analysis_config)
    else:
        input_config,analysis_config = analysis_utility.update_cli_input_config(input_config,analysis_config,samplesheet,input_folder,assembly_file,Illumina_read_files,Nanopore_read_file,output_folder,load_from_samplesheet,analyses_to_run,keep_files)
        #Check if any required arguments are missing
        if arguments_present_check(input_config):
            print("Running analyses")
            if input_folder is not None or load_from_samplesheet:
                Spyogenes_manager(input_config,analysis_config)
            else:
                sample_attributes = sample_manager.get_single_sample_attributes(input_config)
                Spyogenes_analysis(sample_attributes,input_folder=False,output_folder=output_folder,analysis_config=analysis_config)
        #Possible next: print some summary information to stdout about transfers found? Not sure if that is necessary in production.
        #Next: write function that adds data into transfer-db

In [31]:
# | hide
# This is included at the end to ensure when you run through your notebook the code is also transferred to the module and isn't just a notebook
import nbdev

nbdev.nbdev_export()

In [32]:
config = core.get_config()
input_config =  config["input_manager"]
input_config["load_from_samplesheet"] = True
input_config["samplesheet"] = "examples/samplesheet.tsv"
input_config["output_folder"] = "output_from_samplesheet/"
input_config["analysis_config"] = config["analysis_settings"]["Spyogenes"]

sample_manager.get_single_sample_attributes({"sample_name":"GAS-2024-0773",
                                       "assembly_file":"GAS-2024-0773.fasta",
                                       "Illumina_read_files":"GAS-2024-0773_S35_L555_R1_001.fastq.gz,GAS-2024-0773_S35_L555_R2_001.fastq.gz",
                                       "samplesheet": None})

{'sample_name': 'GAS-2024-0773',
 'assembly_file': 'GAS-2024-0773.fasta',
 'Illumina_read_files': 'GAS-2024-0773_S35_L555_R1_001.fastq.gz,GAS-2024-0773_S35_L555_R2_001.fastq.gz'}