# Directive for creating a script for your notebook

The block here below is required at the top of each notebook that you want to create a script for. You will also need to edit the "settings.ini" file, to create a script (see [Coding in NBdev](https://dksund.sharepoint.com/:fl:/g/contentstorage/CSP_7c761ee7-b577-4e08-8517-bc82392bf65e/ETlSfUyArSNJhX8veMI_JQ8By1aXGHzDJkhotpfpXx4mmw?e=037EwH&nav=cz0lMkZjb250ZW50c3RvcmFnZSUyRkNTUF83Yzc2MWVlNy1iNTc3LTRlMDgtODUxNy1iYzgyMzkyYmY2NWUmZD1iJTIxNXg1MmZIZTFDRTZGRjd5Q09TdjJYblkwVlNiWXFYcE1yaHVrVmZqTVJUVEE4X1VwZjhTd1JxcjRNdmFrSmh2RCZmPTAxVlVLVzVWSlpLSjZVWkFGTkVORVlLN1pQUERCRDZKSVAmYz0lMkYmYT1Mb29wQXBwJnA9JTQwZmx1aWR4JTJGbG9vcC1wYWdlLWNvbnRhaW5lciZ4PSU3QiUyMnclMjIlM0ElMjJUMFJUVUh4a2EzTjFibVF1YzJoaGNtVndiMmx1ZEM1amIyMThZaUUxZURVeVpraGxNVU5GTmtaR04zbERUMU4yTWxodVdUQldVMkpaY1Zod1RYSm9kV3RXWm1wTlVsUlVRVGhmVlhCbU9GTjNVbkZ5TkUxMllXdEthSFpFZkRBeFZsVkxWelZXU1RJMVJsaFBNalkyUlZkQ1FqTTFRVmhKVTBkRFVVcFdXa1klM0QlMjIlMkMlMjJpJTIyJTNBJTIyNzRmNzM1ZmUtYzg4Ny00MjhhLWFkZmYtNTEyZTg2YmNmZmQzJTIyJTdE) 
(**Writing your own notebooks**) on loop for more details). Replace **some_string** with a name that makes sense for your notebook. 

In [None]:
# |default_exp Hinfluenzae_parser


# Libraries
Include all the libraries which should be used in this module. You can also import modules from other notebooks; here, we have imported the functions in the core notebook.

In [None]:
# |export

# standard libs
import os
import re

# Common to template
# add into settings.ini, requirements, package name is python-dotenv, for conda build ensure `conda config --add channels conda-forge`
import dotenv  # for loading config from .env files, https://pypi.org/project/python-dotenv/
import envyaml  # Allows loading env vars into a yaml file, https://github.com/thesimj/envyaml
import fastcore  # To add functionality related to nbdev development, https://github.com/fastai/fastcore/
from fastcore import (
    test,
)
from fastcore.script import (
    call_parse,
)  # for @call_parse, https://fastcore.fast.ai/script
import json  # for nicely printing json and yaml

#import functions from core module (optional, but most likely needed). 
from ssi_analysis_result_parsers import(
    core,
    blast_parser,
)
#from ssi_analysis_result_parsers.blast_parser import extract_presence_absence

# Project specific libraries
from pathlib import Path
import pandas
import numpy
import sys

In [None]:
# This block should never be exported (it deliberately has no export directive).
# It switches the working directory to core.PROJECT_DIR so that python runs in the project dir
# (and not the nbs dir); per the original note this is also needed to initiate the package using pip.
os.chdir(core.PROJECT_DIR)

# Functions

Add your code here below. If your notebook will be used as a console-script, you need to add a "cli"-function, at the end (see [Coding in NBdev](https://dksund.sharepoint.com/:fl:/g/contentstorage/CSP_7c761ee7-b577-4e08-8517-bc82392bf65e/ETlSfUyArSNJhX8veMI_JQ8By1aXGHzDJkhotpfpXx4mmw?e=037EwH&nav=cz0lMkZjb250ZW50c3RvcmFnZSUyRkNTUF83Yzc2MWVlNy1iNTc3LTRlMDgtODUxNy1iYzgyMzkyYmY2NWUmZD1iJTIxNXg1MmZIZTFDRTZGRjd5Q09TdjJYblkwVlNiWXFYcE1yaHVrVmZqTVJUVEE4X1VwZjhTd1JxcjRNdmFrSmh2RCZmPTAxVlVLVzVWSlpLSjZVWkFGTkVORVlLN1pQUERCRDZKSVAmYz0lMkYmYT1Mb29wQXBwJnA9JTQwZmx1aWR4JTJGbG9vcC1wYWdlLWNvbnRhaW5lciZ4PSU3QiUyMnclMjIlM0ElMjJUMFJUVUh4a2EzTjFibVF1YzJoaGNtVndiMmx1ZEM1amIyMThZaUUxZURVeVpraGxNVU5GTmtaR04zbERUMU4yTWxodVdUQldVMkpaY1Zod1RYSm9kV3RXWm1wTlVsUlVRVGhmVlhCbU9GTjNVbkZ5TkUxMllXdEthSFpFZkRBeFZsVkxWelZXU1RJMVJsaFBNalkyUlZkQ1FqTTFRVmhKVTBkRFVVcFdXa1klM0QlMjIlMkMlMjJpJTIyJTNBJTIyNzRmNzM1ZmUtYzg4Ny00MjhhLWFkZmYtNTEyZTg2YmNmZmQzJTIyJTdE) 
(**Code execution** and **Input, output and options**) on loop for more details)

In [None]:
# | export

def get_biotype_from_gene_presence(biotype_gene_presence_dict: dict) -> dict:
    """
    Convert a biotype gene presence dict to a biotype.

    The biotype (roman numeral "I" to "VIII") is decided by which of the three
    markers "indole", "urease" and "ODC" are flagged as present.

    Args:
        biotype_gene_presence_dict: Mapping holding the keys "indole", "urease"
            and "ODC". A marker counts as present only when its value is the
            string "1"; every other value counts as absent.

    Returns:
        A single-entry dict of the form {"biotype": "<roman numeral>"}.

    Raises:
        KeyError: If any of the three marker keys is missing.
    """
    # (indole present, urease present, ODC present) -> biotype
    biotype_by_profile = {
        (True, True, True): "I",
        (True, True, False): "II",
        (False, True, False): "III",
        (False, True, True): "IV",
        (True, False, True): "V",
        (False, False, True): "VI",
        (True, False, False): "VII",
        (False, False, False): "VIII",
    }
    profile = tuple(
        biotype_gene_presence_dict[marker] == "1"
        for marker in ("indole", "urease", "ODC")
    )
    return {"biotype": biotype_by_profile[profile]}


def extract_hicap_results(hicap_tsv: Path):
    """
    Read the serotype call from the first row of a hicap tsv report.

    Args:
        hicap_tsv: Path to the tab-separated hicap report.

    Returns:
        A dict with the keys "serotype", "serotype_attributes" and
        "serotype_genes", taken from the columns "predicted_serotype",
        "attributes" and "genes_identified" of the first data row.
        If the file does not exist, placeholder values are returned
        ("-", "no_capsule_genes_found", "-").
        If the file exists but holds no data rows, a message is written to
        stderr and None is returned.
    """
    # A missing report is not an error here: it is reported as "no capsule genes found".
    if not hicap_tsv.exists():
        return {
            "serotype": "-",
            "serotype_attributes": "no_capsule_genes_found",
            "serotype_genes": "-",
        }

    try:
        hicap_table = pandas.read_csv(hicap_tsv, sep='\t')
    except pandas.errors.EmptyDataError:
        # Completely empty file (not even a header line)
        hicap_table = None

    if hicap_table is None or len(hicap_table.index) == 0:
        print(f"Hicap output file empty at {hicap_tsv}", file=sys.stderr)
        return None

    first_row = hicap_table.iloc[0]
    column_by_key = (
        ("serotype", "predicted_serotype"),
        ("serotype_attributes", "attributes"),
        ("serotype_genes", "genes_identified"),
    )
    return {key: first_row[column] for key, column in column_by_key}


def extract_ariba_ftsI_snps(ariba_output_tsv: Path, ftsI_types_tsv: Path):
    """
    Determine the ftsI type of a sample from an ariba report and an ftsI types table.

    The types table is tab-separated; the first column of each row is a label:
      * "pos"     - one position per column
      * "Ref"     - the reference symbol for each position
      * "Diverse" - variants of interest per position, alternatives separated by "/"
      * any other label - a named ftsI type; every cell that is not blank
        ("" or " ") is a variant that defines the type
    The "pos" and "Ref" rows must come before the rows that use them.
    Variants are spelled <ref><pos><variant> and compared with the value in
    column index 18 of each non-header line of the ariba report.

    Args:
        ariba_output_tsv: Path to the ariba report (tab-separated, header line
            starting with "#ariba_ref_name").
        ftsI_types_tsv: Path to the ftsI types table described above.

    Returns:
        None (with a message on stderr) if either file does not exist. Otherwise a dict with
          * "ftsI_type": the type whose variants were all observed and that has
            the most variants ("-" if no type matches; on ties the type listed
            first in the table wins),
          * "key_ftsI_snps": observed changes that occur in the "Diverse" row,
            in report order,
          * "all_ftsI_snps": all observed changes, in report order.

    Raises:
        IndexError: If a table row has more cells than the "pos"/"Ref" rows, or
            a non-blank report line has fewer than 19 columns.
    """
    if not ftsI_types_tsv.exists():
        print(f"Failed to load ftsI types table at {ftsI_types_tsv}", file=sys.stderr)
        return None
    if not ariba_output_tsv.exists():
        print(f"No ariba report found at {ariba_output_tsv}", file=sys.stderr)
        return None

    positions = []
    refs = []
    ftsI_types = {}
    ftsI_table_snps = set()
    with open(ftsI_types_tsv) as f:
        for raw_line in f:
            row = raw_line.rstrip('\n')
            if not row:
                continue  # blank lines carry no information
            label, *cells = row.split('\t')
            if label == 'pos':
                positions = cells
            elif label == 'Ref':
                refs = cells
            elif label == 'Diverse':
                for n, alternatives in enumerate(cells):
                    for alternative in alternatives.split('/'):
                        ftsI_table_snps.add(refs[n] + positions[n] + alternative)
            else:
                ftsI_types[label] = [
                    refs[n] + positions[n] + cell
                    for n, cell in enumerate(cells)
                    if cell not in ('', ' ')
                ]

    change_list = []
    ftsI_gene_snps = []
    with open(ariba_output_tsv) as f:
        for raw_line in f:
            row = raw_line.rstrip('\n')
            if not row:
                continue  # tolerate blank/trailing empty lines instead of failing on the column lookup
            fields = row.split('\t')
            if fields[0] == '#ariba_ref_name':
                continue  # header line
            change = fields[18]
            ftsI_gene_snps.append(change)
            if change in ftsI_table_snps:
                change_list.append(change)

    # A type matches only when every one of its variants was observed; among the
    # matching types the one defined by the most variants is reported.
    observed_key_snps = set(change_list)
    best_match = 0
    best_type = '-'
    for type_name, type_vars in ftsI_types.items():
        if len(type_vars) > best_match and all(var in observed_key_snps for var in type_vars):
            best_match = len(type_vars)
            best_type = type_name

    return {"ftsI_type": best_type, "key_ftsI_snps": change_list, "all_ftsI_snps": ftsI_gene_snps}



class HinfluenzaeResults(core.PipelineResults):
    """
    Results object for H. influenzae analyses, built on core.PipelineResults.

    NOTE(review): the constructor parameter names (legionella_sbt_results_tsv,
    lag1_blast_tsv) and the path keys ("sbt_results", "lag1_blast_results") look
    inherited from the Legionella parser. They are kept unchanged so existing
    callers keep working, and are forwarded to `summary` as `ftsI_ariba_tsv` and
    `biotype_blast_tsv` respectively - confirm the mapping and rename them
    together with all callers.
    """

    @classmethod
    def from_tool_paths(cls, legionella_sbt_results_tsv: Path, lag1_blast_tsv: Path, sample_name = None):
        """
        Alternative constructor for initializing results for a single sample.

        Args:
            legionella_sbt_results_tsv: Forwarded to `summary` as `ftsI_ariba_tsv`.
            lag1_blast_tsv: Forwarded to `summary` as `biotype_blast_tsv`.
            sample_name: Key under which the sample's results are stored.

        Returns:
            A HinfluenzaeResults instance holding the results of the one sample.
        """
        hinfluenze_results = cls.summary(ftsI_ariba_tsv=legionella_sbt_results_tsv,
                                         biotype_blast_tsv=lag1_blast_tsv)
        return cls({sample_name: hinfluenze_results})

    @classmethod
    def from_tool_paths_dict(cls, file_paths: dict):
        """
        Alternative constructor for initializing results for multiple samples.

        Args:
            file_paths: Mapping of sample name to a dict of tool output paths
                with the keys "sbt_results" (forwarded to `summary` as
                `ftsI_ariba_tsv`) and "lag1_blast_results" (forwarded as
                `biotype_blast_tsv`).

        Returns:
            A HinfluenzaeResults instance holding one result dict per sample.
        """
        results_dict = {
            sample_name: cls.summary(ftsI_ariba_tsv=Path(path_dict["sbt_results"]),
                                     biotype_blast_tsv=Path(path_dict["lag1_blast_results"]))
            for sample_name, path_dict in file_paths.items()
        }
        return cls(results_dict)

    @classmethod
    def from_tool_paths_dataframe(cls, file_paths_df: pandas.DataFrame):
        """
        Alternative constructor for initializing results for multiple samples.

        Args:
            file_paths_df: DataFrame indexed by sample name with the columns
                "sbt_results" and "lag1_blast_results" (see `from_tool_paths_dict`).
                The DataFrame passed in is not modified.

        Returns:
            A HinfluenzaeResults instance holding one result dict per sample.
        """
        # Work on a NaN-free copy rather than editing the caller's DataFrame in place
        file_paths = file_paths_df.replace(numpy.nan, None).to_dict(orient="index")
        return cls.from_tool_paths_dict(file_paths=file_paths)

    @classmethod
    def from_tool_paths_tsv(cls, tool_paths_tsv: Path):
        """
        Alternative constructor for initializing results for multiple samples.

        Args:
            tool_paths_tsv: Tab-separated file with a "sample_name" column plus
                the path columns expected by `from_tool_paths_dataframe`.

        Returns:
            A HinfluenzaeResults instance holding one result dict per sample.
        """
        file_paths_df = pandas.read_csv(tool_paths_tsv, sep='\t')
        file_paths_df = file_paths_df.set_index("sample_name", drop=True)
        return cls.from_tool_paths_dataframe(file_paths_df)

    @staticmethod
    def summary(ftsI_ariba_tsv: Path, biotype_blast_tsv: Path, ) -> dict:
        """
        Build the result dict for one sample.

        Args:
            ftsI_ariba_tsv: Currently unused.
                NOTE(review): presumably meant for extract_ariba_ftsI_snps - confirm.
            biotype_blast_tsv: Blast output handed to
                blast_parser.extract_presence_absence for the genes
                "indole", "urease" and "ODC".

        Returns:
            The gene presence dict combined with the derived biotype through
            core.update_results_dict, or an empty dict if that call returns None.
        """
        biotype_gene_dict = blast_parser.extract_presence_absence(
            blast_output_tsv=biotype_blast_tsv,
            hits_as_string=False,
            include_match_stats=False,
            gene_names=["indole", "urease", "ODC"],
        )
        biotype_dict = get_biotype_from_gene_presence(biotype_gene_presence_dict=biotype_gene_dict)
        results_dict = core.update_results_dict(biotype_gene_dict, biotype_dict, old_duplicate_key_prefix="_")
        return {} if results_dict is None else results_dict

    def __repr__(self):
        """Short description with the number of samples and result variables."""
        return f"< Hinfluenzae analysis results object. {len(self.results_df)} samples with {len(self.results_df.columns)} result variables > "




## TESTING


In [None]:

# Smoke test of the biotype helpers on the bundled sample (path is relative to the working directory)
biotype_blast_tsv = "test_input/Hinfluenzae/biotype/sample1.biotype.blast.tsv"

biotype_gene_dict = blast_parser.extract_presence_absence(blast_output_tsv = biotype_blast_tsv,
                                                                    hits_as_string = False,
                                                                    include_match_stats = False,
                                                                    gene_names = ["indole","urease","ODC"])
biotype_dict = get_biotype_from_gene_presence(biotype_gene_presence_dict=biotype_gene_dict)

print(biotype_gene_dict)
print(biotype_dict)

# Serotype extraction from a hicap report
hicap_results = extract_hicap_results(Path("test_input/Hinfluenzae/hicap/sample1.hicap.tsv"))
print(hicap_results)


# ftsI typing for two samples against the same types table
ftsI_results = extract_ariba_ftsI_snps(ariba_output_tsv=Path("test_input/Hinfluenzae/ariba_ftsI/sample1.ftsI.ariba.tsv"),ftsI_types_tsv=Path("test_input/Hinfluenzae/ariba_ftsI/ftsI_types_table.txt"))
print(ftsI_results)

ftsI_results = extract_ariba_ftsI_snps(ariba_output_tsv=Path("test_input/Hinfluenzae/ariba_ftsI/sample2.ftsI.ariba.tsv"),ftsI_types_tsv=Path("test_input/Hinfluenzae/ariba_ftsI/ftsI_types_table.txt"))
print(ftsI_results)
# NOTE(review): the string literal below holds disabled tests that still point at Legionella
# inputs and keyword names; it is never executed, only echoed as the value of the cell.
# They need to be ported to H. influenzae test data before being re-enabled.
"""
f = HinfluenzaeResults.from_results_tsv("./test_output/test_batch_output.tsv")
assert(len(f) == 2)
assert(f.results_dict["sample_1"]["ST"] == 23)


f = HinfluenzaeResults.from_tool_paths_dict(file_paths=  {"sample_1": {"sbt_results": "test_input/Legionella/test.sbt.tsv", "lag1_blast_results": "test_input/Legionella/lag-1_blast.tsv"},
                                                            "sample_2": {"sbt_results": "test_input/Legionella/test2.sbt.tsv", "lag1_blast_results": "test_input/Legionella/lag-1_blast_2.tsv"}})



f = HinfluenzaeResults.from_tool_paths(legionella_sbt_results_tsv="test_input/Legionella/test.sbt.tsv",
                                      lag1_blast_tsv="test_input/Legionella/lag-1_blast.tsv")



f = HinfluenzaeResults.from_tool_paths_tsv(tool_paths_tsv="test_input/Legionella/batch_parser_file_paths.tsv")

assert(len(f) == 4)
assert(len(f.results_df) == 4)
assert(len(f.results_df.columns) == 10)
"""

{'indole': '0', 'urease': '0', 'ODC': '1'}
{'biotype': 'VI'}
{'serotype': 'type_e', 'serotype_attributes': 'full_gene_complement,fragmented_locus', 'serotype_genes': 'bexA,bexB,bexC,bexD,ecs1,ecs2,ecs3;ecs4,ecs5,ecs6,ecs7,ecs8,hcsA,hcsB'}
['pos', '311', '337', '350', '352', '357', '368', '377', '385', '389', '437', '443', '449', '490', '501', '502', '511', '517', '526', '528', '530', '532', '547', '562', '569', '586']
['Ref', 'S', 'A', 'D', 'S', 'S', 'A', 'M', 'S', 'L', 'A', 'T', 'I', 'G', 'R', 'A', 'V', 'R', 'N', 'Y', 'A', 'T', 'V', 'V', 'N', 'A']
['I', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', 'H', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ']
['IIa', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', 'K', ' ', ' ', ' ', ' ', ' ', ' ', ' ']
['IIb', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', 'V', ' ', ' ', 'K', ' ', ' ', ' ', ' ', ' ', ' ', ' ']
['IIc', ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ', 

'\nf = HinfluenzaeResults.from_results_tsv("./test_output/test_batch_output.tsv")\nassert(len(f) == 2)\nassert(f.results_dict["sample_1"]["ST"] == 23)\n\n\nf = HinfluenzaeResults.from_tool_paths_dict(file_paths=  {"sample_1": {"sbt_results": "test_input/Legionella/test.sbt.tsv", "lag1_blast_results": "test_input/Legionella/lag-1_blast.tsv"},\n                                                            "sample_2": {"sbt_results": "test_input/Legionella/test2.sbt.tsv", "lag1_blast_results": "test_input/Legionella/lag-1_blast_2.tsv"}})\n\n\n\nf = HinfluenzaeResults.from_tool_paths(legionella_sbt_results_tsv="test_input/Legionella/test.sbt.tsv",\n                                      lag1_blast_tsv="test_input/Legionella/lag-1_blast.tsv")\n\n\n\nf = HinfluenzaeResults.from_tool_paths_tsv(tool_paths_tsv="test_input/Legionella/batch_parser_file_paths.tsv")\n\nassert(len(f) == 4)\nassert(len(f.results_df) == 4)\nassert(len(f.results_df.columns) == 10)\n'

In [None]:
# |export



@call_parse
def legionella_parser(
    legionella_sbt_file: Path = None,  # Path "*.sbt.tsv from legionella_sbt program"
    lag_1_blast_output: Path = None, #  Path to output from lag1_blast. Generated with blastn -query lag-1.fasta -subject assembly.fasta -outfmt "6 qseqid sseqid pident length qlen qstart qend sstart send sseq evalue bitscore"
    output_file: Path = None,  # Path to output tsv
    sample_name: str = None,
) -> None:
    """
    Parse the tool outputs of a single sample and write the results to a tsv file.

    The two input paths and the sample name are passed to
    HinfluenzaeResults.from_tool_paths; the resulting object is written to
    `output_file` with its write_tsv method.
    """
    # NOTE(review): the function name, parameter names and help texts refer to Legionella
    # (sbt / lag-1) although the results class is HinfluenzaeResults - they look inherited
    # from the Legionella parser; confirm and rename together with the console-script entry.
    hinfluenze_results = HinfluenzaeResults.from_tool_paths(legionella_sbt_results_tsv=legionella_sbt_file,
                                                           lag1_blast_tsv=lag_1_blast_output,
                                                           sample_name=sample_name)
    hinfluenze_results.write_tsv(output_file=output_file)

@call_parse
def legionella_batch_parser(
    file_path_tsv: Path = None,  # Path to tsv containing file paths to the outputs from tools to be parsed. Must contain headers "sample_name", "sbt_results", and "lag1_blast_results"
    output_file: Path = None,  # Path to output tsv
) -> None:
    """
    Parse the tool outputs of several samples and write the results to a tsv file.

    `file_path_tsv` is passed to HinfluenzaeResults.from_tool_paths_tsv; the
    resulting object is written to `output_file` with its write_tsv method.
    """
    # NOTE(review): the function name and the required headers refer to Legionella although the
    # results class is HinfluenzaeResults - they look inherited from the Legionella parser;
    # confirm and rename together with the console-script entry.
    hinfluenze_results = HinfluenzaeResults.from_tool_paths_tsv(tool_paths_tsv=file_path_tsv)
    hinfluenze_results.write_tsv(output_file)



# Directive for ensuring that the code in your notebook gets executed as a script

The code-block here below is required to ensure that the code in the notebook is also transferred to the module (script), otherwise it will just be a notebook. See [Coding in NBdev](https://dksund.sharepoint.com/:fl:/g/contentstorage/CSP_7c761ee7-b577-4e08-8517-bc82392bf65e/ETlSfUyArSNJhX8veMI_JQ8By1aXGHzDJkhotpfpXx4mmw?e=037EwH&nav=cz0lMkZjb250ZW50c3RvcmFnZSUyRkNTUF83Yzc2MWVlNy1iNTc3LTRlMDgtODUxNy1iYzgyMzkyYmY2NWUmZD1iJTIxNXg1MmZIZTFDRTZGRjd5Q09TdjJYblkwVlNiWXFYcE1yaHVrVmZqTVJUVEE4X1VwZjhTd1JxcjRNdmFrSmh2RCZmPTAxVlVLVzVWSlpLSjZVWkFGTkVORVlLN1pQUERCRDZKSVAmYz0lMkYmYT1Mb29wQXBwJnA9JTQwZmx1aWR4JTJGbG9vcC1wYWdlLWNvbnRhaW5lciZ4PSU3QiUyMnclMjIlM0ElMjJUMFJUVUh4a2EzTjFibVF1YzJoaGNtVndiMmx1ZEM1amIyMThZaUUxZURVeVpraGxNVU5GTmtaR04zbERUMU4yTWxodVdUQldVMkpaY1Zod1RYSm9kV3RXWm1wTlVsUlVRVGhmVlhCbU9GTjNVbkZ5TkUxMllXdEthSFpFZkRBeFZsVkxWelZXU1RJMVJsaFBNalkyUlZkQ1FqTTFRVmhKVTBkRFVVcFdXa1klM0QlMjIlMkMlMjJpJTIyJTNBJTIyNzRmNzM1ZmUtYzg4Ny00MjhhLWFkZmYtNTEyZTg2YmNmZmQzJTIyJTdE) 
(**Writing your own notebooks**) on loop for more details.

In [None]:
# | hide
# This is included at the end to ensure that when you run through your notebook, the exported code
# is also transferred to the module and doesn't just live in the notebook.
import nbdev

nbdev.nbdev_export()