In [None]:
# |default_exp plasmidfinder

In [None]:
# |hide
# See above? this hides these blocks, meaning these blocks aren't in the module and aren't in the documentation
import nbdev
from nbdev.showdoc import *  # ignore this Pylance warning in favor of following nbdev docs

In [None]:
# |export
# That export there, it makes sure this code goes into the module.

# standard libs
import os
import re

# Common to template
# add into settings.ini, requirements, package name is python-dotenv, for conda build ensure `conda config --add channels conda-forge`
import dotenv  # for loading config from .env files, https://pypi.org/project/python-dotenv/
import envyaml  # Allows to loads env vars into a yaml file, https://github.com/thesimj/envyaml
import fastcore  # To add functionality related to nbdev development, https://github.com/fastai/fastcore/
from fastcore import (
    test,
)
from fastcore.script import (
    call_parse,
)  # for @call_parse, https://fastcore.fast.ai/script
import json  # for nicely printing json and yaml
from fastcore import test
from bifrost_bridge import core


Because the notebooks are now located in the `nbs` folder, we need to change Python's working directory (`wd`) for the notebook to the project folder. Keep this cell in all notebooks, but do not export it to the package.

In [None]:
# This cell must never be exported to the package (it carries no export directive).
# It switches the working directory to core.PROJECT_DIR so the rest of the notebook runs
# from the project folder instead of the `nbs` folder.
# NOTE(review): the previous comment also said this cell "initiates the package using pip";
# nothing here does that - confirm whether that step is still needed elsewhere.
os.chdir(core.PROJECT_DIR)

##################################################CODE_SEGMENT###########################################

In [None]:
# |export

def _concatenate_vector(values, sep=','):
    """Join every item of `values` (converted with `str`) into one `sep`-separated string."""
    return sep.join(str(item) for item in values)


def _process_coverage(val):
    """
    Turn comma-separated "a / b" entries into percentages.

    Example: "152 / 152,682 / 682" -> "100.0,100.0".
    Entries without a '/' and entries that cannot be converted (non-numeric text,
    zero denominator, more than one '/') are passed through unchanged.
    """
    results = []
    for part in (piece.strip() for piece in str(val).split(',')):
        if '/' not in part:
            results.append(part)
            continue
        try:
            # Unpacking lives inside the try so that "a / b / c" falls back to the
            # raw text instead of raising "too many values to unpack".
            num, denom = part.split('/')
            ratio = float(num.strip()) / float(denom.strip()) * 100
        except (ValueError, ArithmeticError):
            results.append(part)
        else:
            results.append(str(ratio))
    return ','.join(results)


def _extract_contig(val):
    """
    Reduce each comma-separated entry to its first word that ends in a digit.

    Example: "contig0001 more text,contig0002 more text" -> "contig0001,contig0002".
    Entries containing no such word are passed through unchanged.
    """
    results = []
    for part in (piece.strip() for piece in str(val).split(',')):
        match = re.search(r'\b(\w+)\d+\b', part)
        results.append(match.group(0) if match else part)
    return ','.join(results)


def process_plasmidfinder_data(
    input_path:str,
    output_path:str = './output.tsv',
    replace_header:str = None,
    filter_columns:str = None,
    add_header:str = None,
    convert_coverage:bool = False,
    filter_contig:bool = False):

    """
    Process a plasmidfinder TSV file into a single-row TSV file.

    The input is loaded through `core.DataFrame`, every column is collapsed into one
    comma-separated value (so the result has exactly one row), the optional
    conversions are applied, and the result is written to `output_path`.

    The optional steps run in this order: coverage conversion, contig extraction,
    column filtering, header renaming. `convert_coverage` and `filter_contig`
    therefore look up the columns by their names in the loaded data
    ('Query / Template length' and 'Contig').

    Arguments:
        input_path (str): Path to the input file.
        output_path (str): Path to the output file (default: './output.tsv').
        replace_header (str): Header to replace the existing header; forwarded to
            `core.DataFrame.rename_header` (default: None).
        filter_columns (str): Columns to filter from the header; forwarded to
            `core.DataFrame.filter_columns` (default: None).
        add_header (str): Header to add if the header does not exist in the input file;
            forwarded to `core.DataFrame.import_data` (default: None).
        convert_coverage (bool): If True, converts coverage values in the
            'Query / Template length' column to percentages (default: False).
        filter_contig (bool): If True, reduces the 'Contig' column to just the contig
            identifiers (default: False).

    Raises:
        FileNotFoundError: If `input_path` does not exist.
        KeyError: If a requested conversion targets a column that is not in the data.
    """

    # Fail fast on a bad path before any processing object is created.
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"The input file {input_path} does not exist.")

    df = core.DataFrame()
    df.import_data(input_path, file_type='tsv', add_header=add_header)

    # Collapse all rows: each column becomes one comma-separated string, then the
    # resulting Series is turned back into a one-row frame.
    df_agg = df.df.apply(_concatenate_vector, axis=0)
    df.df = df_agg.to_frame().T

    if convert_coverage:
        df.df['Query / Template length'] = df.df['Query / Template length'].apply(_process_coverage)

    if filter_contig:
        df.df['Contig'] = df.df['Contig'].apply(_extract_contig)

    if filter_columns:
        df.filter_columns(filter_columns)

    if replace_header:
        df.rename_header(replace_header)

    df.export_data(output_path, file_type='tsv')

@call_parse
def process_plasmidfinder_data_from_cli(
    input_path:str,  # Path to the plasmidfinder TSV input file
    output_path:str = './output.tsv',  # Path the processed TSV file is written to
    replace_header:str = None,  # Header to replace the existing header
    filter_columns:str = None,  # Columns to filter from the header
    add_header:str = None,  # Header to add if the input file has none
    convert_coverage:bool = False,  # Convert 'Query / Template length' values to percentages
    filter_contig:bool = False,  # Reduce the 'Contig' column to the contig identifiers
):
    """Command-line entry point: process a plasmidfinder TSV file via `process_plasmidfinder_data`."""
    # Forward by keyword so a reordering of the target signature cannot silently
    # swap arguments.
    process_plasmidfinder_data(
        input_path=input_path,
        output_path=output_path,
        replace_header=replace_header,
        filter_columns=filter_columns,
        add_header=add_header,
        convert_coverage=convert_coverage,
        filter_contig=filter_contig,
    )

In [None]:
#|hide
#Example usage of the function
#process_plasmidfinder_data(
#   input_path='test_data/plasmidfinder.tsv', 
#   output_path='test_data/plasmidfinder_testout.tsv',
#   convert_coverage=True,
#   filter_contig=True
   #filter_columns="Query / Template length"
#)

##################################################CODE_SEGMENT###########################################

In [None]:
#| hide
# Keep this as the last cell: when the notebook is run from top to bottom, the code in
# the exported cells is then also written to the associated Python package.

nbdev.nbdev_export()