# core

> Base classes, functions and other objects used across the package.

In [1]:
#| default_exp core

In [2]:
#| hide
from eccore.ipython import nb_setup
from eccore.core import path_to_parent_dir
from fastcore.test import test_fail
from nbdev import nbdev_export, show_doc

In [3]:
#| hide
nb_setup()
NBS_ROOT = path_to_parent_dir('nbs')
assert NBS_ROOT.is_dir()

Set autoreload mode


In [4]:
#| export
import json
import os
import pandas as pd
import re
import sqlite3
import sys
import torch
import warnings
from configparser import ConfigParser
from eccore.core import validate_path, safe_path
from IPython.display import display, Markdown, HTML
from pathlib import Path
from pprint import pprint
from sqlite3 import Connection, Cursor
from typing import Any, Optional, Literal

try: from google.colab import drive
except: pass

In [5]:
#| export
# Retrieve the package root
# NOTE: this import rebinds the name `__file__` in the current namespace to the
# `__file__` attribute of the `metagentorch` package (presumably the path to the
# package `__init__.py` -- TODO confirm), not the path of this module.
from metagentorch import __file__
# Directory containing the file referenced by the package `__file__`
CODE_ROOT = Path(__file__).parents[0]
# Parent directory of CODE_ROOT
PACKAGE_ROOT = Path(__file__).parents[1]

This module includes all base classes, functions and other objects that are used across the package. It is imported by all other modules in the package.

`core` includes utility classes and functions to make it easier to work with the complex file systems adopted for the project, as well as base classes such as a file reader with additional functionality.

# Utility Classes and Functions

## Handling files and file structure

Utility classes to represent the project file system and to locate its key directories.

In [6]:
#| export
class ProjectFileSystem:
    """Represent a project file system, return paths to key directories, provide methods to manage the file system.

    - Paths to key directories are based on whether the code is running locally or in the cloud.
    - First time it is used on a local computer, it must be registered as local and a project root path must be set.
    - A user configuration file is created in the user's home directory to store the project root path and whether the machine is local or not.

    > Technical note: `ProjectFileSystem` is a simpleton class
    """

    _instance = None
    _config_dir = '.metagentorch'
    _config_fname = 'metagentorch.cfg'
    _shared_project_dir = 'Metagenomics'
    
    def __new__(cls, *args, **kwargs):
        # Create instance if it does not exist yet
        if cls._instance is None:
            cls.home = Path.home().resolve()
            if kwargs.get('config_fname', None) is not None:
                cls._p2config = kwargs['config_fname']
            else:
                cls._p2config = cls.home / cls._config_dir / cls._config_fname
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(
        self, 
        mount_gdrive:bool=True,       # True to mount Google Drive if running on Colab
        project_file:Path|None=None,  # Path to the project file. If None, use the one saved in the config file
        config_fname:Path|None=None,  # Path to a configuration file. If None, use the default one in the user's home directory
        ) -> None:

        # Discover where the script is being run
        self.is_colab = 'google.colab' in sys.modules       
        if self.is_colab and mount_gdrive:
            drive.mount('/content/gdrive')
            self.gdrive = Path('/content/gdrive/MyDrive')
        self.is_kaggle = 'kaggle_web_client' in sys.modules
        if self.is_kaggle:
            raise NotImplementedError(f"ProjectFileSystem is not implemented for Kaggle yet")
        if not self.is_colab and not self.is_kaggle and not self.is_local:
            msg = """
                    Code does not seem to run on the cloud but computer is not registered as local
                    If you are running on a local computer, you must register it as local by running
                    `ProjectFileSystem().register_as_local()`
                    before you can use the ProjectFileSystem class.
                    """
            warnings.warn(msg, UserWarning)

        # Set key directory paths
        self._project_root = Path()
        data_dir = 'data'
        if self.is_local or os.getenv("GITHUB_ACTIONS") == "true":
            cfg = self.read_config()
            path_str = cfg.get('Infra', 'project_root', fallback=None)
            data_dir = cfg.get('Infra', 'data_dir', fallback='data')
            if path_str is None: 
                msg = """
                Project root is not yet set in config file.
                To set it, use `ProjectFileSystem().set_project_root()`.
                """
                warnings.warn(msg)
            else:
                self._project_root = Path(path_str)
        self._data = self.project_root / data_dir
        self._nbs = self.project_root / 'nbs'

    def __call__(self) -> bool: return self.is_local

    def info(self) -> None:
        """Print basic info on the file system and the device"""
        print(f"Running {self.os} on {self.running_on}")
        print(f"Device's home directory: {self.home}")
        print(f"Project file structure:")
        print(f" - Root ........ {self.project_root} \n - Data Dir .... {self.data} \n - Notebooks ... {self.nbs}")
    
    def read_config(self) -> ConfigParser:
        """Read config from the configuration file if it exists and return an empty config if does not"""
        cfg = ConfigParser()
        if self.p2config.is_file(): 
            cfg.read(self.p2config)
        else:
            cfg.add_section('Infra')
        return cfg

    def register_as_local(self) -> ConfigParser:
        """Update the configuration file to register the machine as local machine"""
        cfg = self.read_config()
        os.makedirs(self.home/self._config_dir, exist_ok=True)
        cfg['Infra']['registered_as_local'] = 'True'
        with open(self.p2config, 'w') as fp:
            cfg.write(fp)
        return cfg

    def set_project_root(
        self, 
        p2project: str|Path,      # string or Path to the project directory. Can be absolute or relative to home
        data_dir: str = 'data'    # Directory name for data under project root
        ) -> ConfigParser:
        """Update the configuration file to set the project root"""
        # Build and validate the path to the project root
        if isinstance(p2project, str): 
            p2project = Path(p2project)
            if not p2project.is_absolute():
                p2project = self.home / p2project
        if not p2project.is_dir(): raise FileNotFoundError(f"{p2project.absolute()} does not exist")
        
        # Update the configuration file        
        cfg = self.read_config()
        os.makedirs(self.home/self._config_dir, exist_ok=True)
        cfg['Infra']['project_root'] = str(p2project.absolute())
        cfg['Infra']['data_dir'] = str(data_dir)
        with open(self.p2config, 'w') as fp:
            cfg.write(fp)
        self._project_root = p2project
        self._data = self.project_root / data_dir
        print(f"Project Root set to:   {self._project_root.absolute()}")
        print(f"Data directory set to: {self._data.absolute()}")
        return cfg

    @property
    def os(self) -> str: return sys.platform

    @property
    def project_root(self) -> Path:
        if self.is_local:
            return self._project_root
        elif self.is_colab:
            return self.gdrive / self._shared_project_dir
        elif os.getenv("GITHUB_ACTIONS") == "true":
            return self._project_root
        elif self.is_kaggle:
            raise NotImplementedError(f"ProjectFileSystem is not implemented for Kaggle yet")
        else:
            msg = """
                Not running locally, on Colab or on Kaggle. If running locally,
                register the machine as local by running `ProjectFileSystem().register_as_local()`
                """
            warnings.warn(msg)
            # raise ValueError('Not running locally, on Colab or on Kaggle')
            return self._project_root

    @property
    def data(self) -> Path: return self._data

    @data.setter
    def data(self, folder_name) -> None: self._data = self.project_root / folder_name
    
    @property
    def nbs(self) -> Path: return self.project_root / 'nbs'        

    @nbs.setter
    def nbs(self,folder_name) -> Path: return self.project_root / folder_name

    @property
    def p2config(self) -> Path: 
        # return self.home / self._config_dir / self._config_fname
        return self._p2config
        
    @property
    def is_local(self) -> bool:
        """Return `True` if the current machine was registered as a local machine"""
        cfg = self.read_config()
        return cfg['Infra'].getboolean('registered_as_local', False)

    @property
    def running_on(self) -> str:
        """Return the device on which this is run: local, colab, kaggle, ..."""
        if self.is_local: device = 'local computer'
        elif self.is_colab: device = 'colab'
        elif self.is_kaggle: device = 'kaggle'
        else: device = 'unknown cloud server'
        return device
    
    def readme(
        self, 
        dir_path:Path|None=None, # Path to the directory to inquire. If None, display readme file from project_root.
        ) -> None:
        """Display `readme.md` file or any other `.md` file in `dir_path`. 

        This provides a convenient way to get information on each direcotry content
        """
        if dir_path is None: 
            path = self.data
        elif validate_path(dir_path, path_type='dir'):
            path = dir_path
        else:
            raise ValueError(f"'dir_path' is not a directory: {dir_path.absolute()}")

        
        if path.is_relative_to(self.project_root):
            path_to_display = path.relative_to(self.project_root)
        else:
            path_to_display = path.absolute()
        display(HTML('<hr>'))
        # display(Markdown(f"ReadMe file for directory `{path.relative_to(self.project_root)}`:"))
        display(Markdown(f"ReadMe file for directory `{path_to_display}`:"))
        mdfiles = {p.stem: p for p in path.glob('*.md')}
        if mdfiles:
            mdfile = mdfiles.get('readme', None)
            if mdfile is None:
                mdfile = mdfiles.get(list(mdfiles.keys())[0])
            display(HTML('<hr>'))
            display(Markdown(filename=mdfile))
            display(HTML('<hr>'))
        else:
            print('No markdown file in this folder')
        

In [7]:
show_doc(ProjectFileSystem)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L36){target="_blank" style="float:right; font-size:smaller"}

### ProjectFileSystem

>      ProjectFileSystem (*args, **kwargs)

*Represent a project file system, return paths to key directories, provide methods to manage the file system.

- Paths to key directories are based on whether the code is running locally or in the cloud.
- First time it is used on a local computer, it must be registered as local and a project root path must be set.
- A user configuration file is created in the user's home directory to store the project root path and whether the machine is local or not.

> Technical note: `ProjectFileSystem` is a singleton class*

**Reference Project File System**:

This project adopts a unified file structure to make coding and collaboration easier. In addition, we can run the code locally (from a `project-root` directory) or in the cloud (colab, kaggle, others).

The unified file structure when running locally is:
```text
    project-root   
        |--- data
        |      |--- CNN_Virus_data  (all data from CNN Virus original paper)
        |      |--- saved           (trained and finetuned models, saved preprocessed datasets)
        |      |--- ....            (raw or pre-processed data from various sources, results, ... )  
        |      
        |--- nbs  (all reference and work notebooks)
        |      |--- cnn_virus
        |      |        |--- notebooks.ipynb
```

When running on *google colab*, it is assumed that a google drive is mounted on the colab server instance, and that this google drive root includes a shortcut named `Metagenomics` pointing to the project shared directory. The project shared directory is accessible [here](https://drive.google.com/drive/folders/134uei5fmt08TpzhmjG4sW0FQ06kn2ZfZ) if you are an authorized project member.

**`ProjectFileSystem` at work**:

If you use this class for the first time on a local computer, read the two **Important Notes** below.

In [8]:
#|eval: false
pfs = ProjectFileSystem()

Once created, the instance of `ProjectFileSystem` gives access to key directories' paths:

- `project root`: `Path` to the project root directory
- `data`: `Path` to the data directory
- `nbs`: `Path` to the notebooks directory

It also provides additional information regarding the computer on which the code is running:

- `os`: a string providing the name of the operating system the code is running on
- `is_colab`: True if the code is running on google colab
- `is_kaggle`: True if the code is running on kaggle server (NOT IMPLEMENTED YET)
- `is_local`: True if the code is running on a computer registered as local

In [9]:
#|eval: false
for p in [pfs.project_root, pfs.data, pfs.nbs]:
    print(p)

/home/vtec/projects/bio/metagentorch
/home/vtec/projects/bio/metagentorch/data
/home/vtec/projects/bio/metagentorch/nbs


In [10]:
#|eval: false
print(f"Operating System: {pfs.os}")
print(f"Local Computer: {pfs.is_local}, Colab: {pfs.is_colab}, Kaggle: {pfs.is_kaggle}")

Operating System: linux
Local Computer: True, Colab: False, Kaggle: False


In [11]:
show_doc(ProjectFileSystem.info)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L106){target="_blank" style="float:right; font-size:smaller"}

### ProjectFileSystem.info

>      ProjectFileSystem.info ()

*Print basic info on the file system and the device*

In [12]:
#|eval: false
pfs.info()

Running linux on local computer
Device's home directory: /home/vtec
Project file structure:
 - Root ........ /home/vtec/projects/bio/metagentorch 
 - Data Dir .... /home/vtec/projects/bio/metagentorch/data 
 - Notebooks ... /home/vtec/projects/bio/metagentorch/nbs


In [13]:
show_doc(ProjectFileSystem.readme)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L211){target="_blank" style="float:right; font-size:smaller"}

### ProjectFileSystem.readme

>      ProjectFileSystem.readme (dir_path:pathlib.Path|None=None)

*Display `readme.md` file or any other `.md` file in `dir_path`. 

This provides a convenient way to get information on the content of each directory*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| dir_path | pathlib.Path \| None | None | Path to the directory to inquire. If None, display readme file from the data directory. |
| **Returns** | **None** |  |  |

In [14]:
#|eval: false
pfs.readme(Path('data_dev'))

ReadMe file for directory `/home/vtec/projects/bio/metagentorch/nbs-dev/data_dev`:

### Data directory for this package development 
This directory includes all  data required to validate and test this package code.

```text
data_dev
 |--- CNN_Virus_data
 |     |--- 50mer_ds_100_seq
 |     |--- 150mer_ds_100_seq
 |     |--- train_short
 |     |--- val_short
 |     |--- weight_of_classes
 |--- ncbi
 |     |--- infer_results
 |     |     |--- cnn_virus
 |     |     |--- csv
 |     |     |--- xlsx
 |     |     |--- testdb.db
 |     |--- refsequences
 |     |     |--- cov
 |     |     |     |--cov_virus_sequence_one_metadata.json
 |     |     |     |--sequences_two_no_matching_rule.fa
 |     |     |     |--another_sequence.fa
 |     |     |     |--cov_virus_sequences_two.fa
 |     |     |     |--cov_virus_sequences_two_metadata.json
 |     |     |     |--cov_virus_sequence_one.fa
 |     |     |     |--single_1seq_150bp
 |     |     |     |    |--single_1seq_150bp.fq
 |     |     |     |    |--single_1seq_150bp.aln
 |     |     |     |--paired_1seq_150bp
 |     |     |     |    |--paired_1seq_150bp2.aln
 |     |     |     |    |--paired_1seq_150bp2.fq
 |     |     |     |    |--paired_1seq_150bp1.fq 
 |     |     |     |    |--paired_1seq_150bp1.aln 
 |     |--- simreads
 |     |     |--- cov
 |     |     |     |--- paired_1seq_50bp
 |     |     |     |      |--- paired_1seq_50bp_1.aln
 |     |     |     |      |--- paired_1seq_50bp_1.fq
 |     |     |     |--- single_1seq_50bp
 |     |     |     |      |--- single_1seq_50bp_1.aln
 |     |     |     |      |--- single_1seq_50bp_1.fq
 |     |     |--- cov
 |     |     |     |--single_1seq_50bp
 |     |     |     |    |--single_1seq_50bp.aln
 |     |     |     |    |--single_1seq_50bp.fq
 |     |     |     |--single_1seq_150bp
 |     |     |     |    |--single_1seq_150bp.fq
 |     |     |     |    |--single_1seq_150bp.aln
 |     |     |     |--paired_1seq_150bp
 |     |     |     |    |--paired_1seq_150bp2.aln
 |     |     |     |    |--paired_1seq_150bp2.fq
 |     |     |     |    |--paired_1seq_150bp1.fq
 |     |     |     |    |--paired_1seq_150bp1.aln
 |--- saved           
 |--- readme.md               
```

>**Important Note 1**:
>
>When using the package on a local computer for the **first time**, you must register the computer as a local computer. Otherwise, `ProjectFileSystem` will issue a warning and the project paths will not be set correctly. Once registered, the configuration file is updated and `ProjectFileSystem` will detect it and run without that warning.

In [15]:
show_doc(ProjectFileSystem.register_as_local)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L122){target="_blank" style="float:right; font-size:smaller"}

### ProjectFileSystem.register_as_local

>      ProjectFileSystem.register_as_local ()

*Update the configuration file to register the machine as local machine*

In [16]:
#|eval: false
cfg = pfs.register_as_local()

>**Important Note 2**:
>
>When using the package on a local computer for the **first time**, it is also required to *set the project root directory*. This is necessary to allow users to locate their local project folder anywhere they want. Once set, the path to the project root will be saved in the configuration file.

In [17]:
show_doc(ProjectFileSystem.set_project_root)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L131){target="_blank" style="float:right; font-size:smaller"}

### ProjectFileSystem.set_project_root

>      ProjectFileSystem.set_project_root (p2project:str|pathlib.Path,
>                                          data_dir:str='data')

*Update the configuration file to set the project root*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| p2project | str \| pathlib.Path |  | string or Path to the project directory. Can be absolute or relative to home |
| data_dir | str | data | Directory name for data under project root |
| **Returns** | **ConfigParser** |  |  |

In [18]:
#|eval: false
pfs.set_project_root('/home/vtec/projects/bio/metagentorch/');

Project Root set to:   /home/vtec/projects/bio/metagentorch
Data directory set to: /home/vtec/projects/bio/metagentorch/data


In [19]:
show_doc(ProjectFileSystem.read_config)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L113){target="_blank" style="float:right; font-size:smaller"}

### ProjectFileSystem.read_config

>      ProjectFileSystem.read_config ()

*Read config from the configuration file if it exists and return an empty config if does not*

In [20]:
#|eval: false
cfg = pfs.read_config()
cfg['Infra']['registered_as_local']

'True'

In [21]:
#|eval: false
cfg['Infra']['project_root']

'/home/vtec/projects/bio/metagentorch'

In [22]:
#|eval: false
cfg['Infra']['data_dir']

'data'

**Technical Note for Developers**

The current notebook and all other development notebooks use a minimum set of data that comes with the repository under `nbs-dev/data_dev` instead of the standard `data` directory which is much too large for testing and developing.

Therefore, when creating the instance of `ProjectFileSystem`, use the parameter `config_fname` to pass a specific development configuration file, which also comes with the repository.

In [23]:
#| hide
from metagentorch.cnn_virus.utils import update_dev_cfg_file
ProjectFileSystem._instance = None # Required because ProjectFileSystem is a singleton
update_dev_cfg_file()

In [24]:
p2dev_cfg = PACKAGE_ROOT / 'nbs-dev/metagentorch-dev.cfg'
pfs = ProjectFileSystem(config_fname=p2dev_cfg)
pfs.info()

Running linux on local computer
Device's home directory: /home/vtec
Project file structure:
 - Root ........ /home/vtec/projects/bio/metagentorch 
 - Data Dir .... /home/vtec/projects/bio/metagentorch/nbs-dev/data_dev 
 - Notebooks ... /home/vtec/projects/bio/metagentorch/nbs


## SQLite Database Helper Class

In [25]:
# | export
class SqliteDatabase:
    """Manage a SQLite db file, execute SQL queries, return results, provide context manager functionality.

    Example usage as a context manager
    
    ```python
    db_path = Path('your_database.db')
    db = SqliteDb(db_path)

    with db as database:
        result = database.get_result("SELECT * FROM your_table")
        print(result)
    ```
    """

    def __init__(self, p2db:Path) -> None:
        self.p2db = p2db
        self.conn = None

    def connect(self) -> Connection:
        """Connect to the SQLite database"""
        self.conn = sqlite3.connect(self.p2db)
        return self.conn

    def execute(self, sql:str) -> Cursor:
        """Execute an SQL query and return the cursor after execution"""
        if self.conn is None: self.conn = self.connect()
        cursor = self.conn.cursor()
        cursor.execute(sql)
        return cursor

    def get_result(self, sql:str) -> list[Any]:
        """Execute an SQL query and return the result"""
        cursor = self.execute(sql)
        result = cursor.fetchall()
        return result

    def get_dataframe(self, sql:str) -> pd.DataFrame:
        """Wraps pandas.read_sql_query"""
        if self.conn is None: self.connect()
        df = pd.read_sql_query(sql, self.conn)
        return df
    
    def dataframe_to_table(
        self, 
        df:pd.DataFrame,         # DataFrame to write to the database
        table_name:str,          # Name of the database table to write to
        if_exists:Literal['fail','replace','append']='append',  # One of 'fail', 'replace', 'append'
        index:bool=False         # If True, write the DataFrame index as a column
        )-> None:
        """Wraps pandas.DataFrame.to_sql"""
        if self.conn is None: self.connect()
        df.to_sql(table_name, self.conn, if_exists=if_exists, index=index)
            
    def close(self) -> None:
        """Close the connection to the SQLite database"""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def __enter__(self):
        """Enter the runtime context related to this object."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit the runtime context related to this object."""
        self.close()

    def list_columns(self,
                     name: str # name of a table or a view
                    ) -> list[str]:
        """Returns the list of columns in the table or view `name`"""
        query = f"PRAGMA table_info({name})"
        cursor = self.execute(query)
        cols = [row[1] for row in cursor.fetchall()]
        return cols

    def list_indexes(self) -> None:
        """List indexes in the database and the indexed columns"""
        print(f"List of indexes in database '{self.p2db.name}' and indexed columns:")
        if self.conn is None: self.connect()

        query = f"SELECT name, tbl_name FROM sqlite_master WHERE type='index' ;"
        indexes = self.get_result(sql=query)

        for index in indexes:
            print(f"- {index[0]} for table '{index[1]}':")
            cursor = self.execute(f"PRAGMA index_info({index[0]}) ;")
            for col_info in cursor.fetchall():
                print(f"    - {col_info[2]}")

    def print_schema(self) -> None:
        # 'type', 'name', 'tbl_name', 'rootpage', 'sql'
        # type can be 'table', 'view', 'index' or 'trigger'
        query = """
        SELECT type, name
        FROM sqlite_master 
        WHERE type IN ('table', 'view', 'trigger') 
        """
        cursor = self.execute(query)
        for t, name in cursor.fetchall():
            print(f"{name} ({t})")
            if t == 'table':
                print(' columns:',', '.join(self.list_columns(name)))
                indexes = self.get_result(f"PRAGMA index_list({name})")
                for index in indexes:
                    index_name = index[1]
                    print(f" index: {index_name}")
                    print('   indexed columns:',', '.join([row[2] for row in self.get_result(f"PRAGMA index_info({index_name})")]))
            if t =='view':
                print(' columns:', ','.join(self.list_columns(name)))
            print()


In [26]:
p2db = pfs.data / 'ncbi/infer_results/cov-ncbi/testdb.db'
db = SqliteDatabase(p2db)

In [27]:
#| hide
# Creating the table to test the class
top_n = 5        # number of `top_{top_n}_lbl_pred_{i}` columns in table `predictions`
nb_probs = 187   # number of `prob_XXX` columns in table `label_probabilities`

db.connect()
# Create table for predictions and its index
pred_cols_str = 'readid refseqid refsource refseq_strand taxonomyid'.split(' ')
pred_cols_int = 'lbl_true lbl_pred pos_true pos_pred'.split(' ')
top_pred_cols = [f"top_{top_n}_lbl_pred_{i}" for i in range(top_n)]
query = """
CREATE TABLE IF NOT EXISTS predictions (
    id INTEGER PRIMARY KEY,
"""
for col in pred_cols_str:
    query += f"{col} TEXT, "
for col in pred_cols_int:
    query += f"{col} INTEGER, "
for col in top_pred_cols:
    query += f"{col} INTEGER, "
# Drop the trailing ', ' left by the last column before closing the statement
query = query[:-2]+')'
print(query)
db.execute(query)

query = "CREATE INDEX IF NOT EXISTS idx_preds ON predictions (readid, refseqid, pos_true);"
print(query)
db.execute(query)
print('Table `predictions` created with index.')

# Create table for probabilities (one per 50-mer in order to keep small nb or columns in table)
query = """
CREATE TABLE IF NOT EXISTS label_probabilities (
    id INTEGER PRIMARY KEY,
    read_kmer_id TEXT,
    read_50mer_nb INTEGER,
"""
query += ' '.join([f"prob_{i:03d} REAL, " for i in range(nb_probs)])
query += "FOREIGN KEY (read_kmer_id) REFERENCES predictions(readid)"
query += ')'
print(query)
db.execute(query)

query = "CREATE INDEX IF NOT EXISTS idx_probs ON label_probabilities (read_kmer_id, read_50mer_nb);"
print(query)
db.execute(query)
print('Table `label_probabilities` created with index.')

# Create view joining predictions and label_probabilities
view_name = 'preds_probs'

# top prediction columns from table predictions (same names as `top_pred_cols`, prefixed with the table alias)
top_lbl_pred_n = ','.join([f"p.{col}" for col in top_pred_cols])

# probabilities columns from table label_probabilities 
probs_n = ','.join([f"lp.prob_{i:03d}" for i in range(nb_probs)])

query = f"""
CREATE VIEW IF NOT EXISTS {view_name} AS
SELECT 
    lp.id,
    p.refseqid,
    p.lbl_true, p.lbl_pred,
    p.pos_true, p.pos_pred,
    {top_lbl_pred_n},
    lp.read_kmer_id, lp.read_50mer_nb,
    {probs_n}
FROM 
    label_probabilities lp
INNER JOIN 
    predictions p
ON 
    lp.read_kmer_id = p.readid
"""
print(query)
db.execute(query)
print(f'View `{view_name}` created.')


CREATE TABLE IF NOT EXISTS predictions (
    id INTEGER PRIMARY KEY,
readid TEXT, refseqid TEXT, refsource TEXT, refseq_strand TEXT, taxonomyid TEXT, lbl_true INTEGER, lbl_pred INTEGER, pos_true INTEGER, pos_pred INTEGER, top_5_lbl_pred_0 INTEGER, top_5_lbl_pred_1 INTEGER, top_5_lbl_pred_2 INTEGER, top_5_lbl_pred_3 INTEGER, top_5_lbl_pred_4 INTEGER)
CREATE INDEX IF NOT EXISTS idx_preds ON predictions (readid, refseqid, pos_true);
Table `predictions` created with index.

CREATE TABLE IF NOT EXISTS label_probabilities (
    id INTEGER PRIMARY KEY,
    read_kmer_id TEXT,
    read_50mer_nb INTEGER,
prob_000 REAL,  prob_001 REAL,  prob_002 REAL,  prob_003 REAL,  prob_004 REAL,  prob_005 REAL,  prob_006 REAL,  prob_007 REAL,  prob_008 REAL,  prob_009 REAL,  prob_010 REAL,  prob_011 REAL,  prob_012 REAL,  prob_013 REAL,  prob_014 REAL,  prob_015 REAL,  prob_016 REAL,  prob_017 REAL,  prob_018 REAL,  prob_019 REAL,  prob_020 REAL,  prob_021 REAL,  prob_022 REAL,  prob_023 REAL,  prob_024 REAL

In [28]:
db.print_schema()

predictions (table)
 columns: id, readid, refseqid, refsource, refseq_strand, taxonomyid, lbl_true, lbl_pred, pos_true, pos_pred, top_5_lbl_pred_0, top_5_lbl_pred_1, top_5_lbl_pred_2, top_5_lbl_pred_3, top_5_lbl_pred_4
 index: idx_preds
   indexed columns: readid, refseqid, pos_true

label_probabilities (table)
 columns: id, read_kmer_id, read_50mer_nb, prob_000, prob_001, prob_002, prob_003, prob_004, prob_005, prob_006, prob_007, prob_008, prob_009, prob_010, prob_011, prob_012, prob_013, prob_014, prob_015, prob_016, prob_017, prob_018, prob_019, prob_020, prob_021, prob_022, prob_023, prob_024, prob_025, prob_026, prob_027, prob_028, prob_029, prob_030, prob_031, prob_032, prob_033, prob_034, prob_035, prob_036, prob_037, prob_038, prob_039, prob_040, prob_041, prob_042, prob_043, prob_044, prob_045, prob_046, prob_047, prob_048, prob_049, prob_050, prob_051, prob_052, prob_053, prob_054, prob_055, prob_056, prob_057, prob_058, prob_059, prob_060, prob_061, prob_062, prob_063, prob

## Other utility classes

In [29]:
# | export
class JsonDict(dict):
    """Dictionary whose current value is mirrored in a json file and can be initated from a json file
    
    `JsonDict` requires a path to json file at creation. An optional dict can be passed as argument.

    Behavior at creation:
    
    - `JsonDict(p2json, dict)` will create a `JsonDict` with key-values from `dict`, and mirrored in `p2json`
    - `JsonDict(p2json)` will create a `JsonDict` with empty dictionary and load json content if file exists

    Once created, `JsonDict` instances behave exactly as a dictionary
    """
    def __init__(
        self, 
        p2json: str|Path,               # path to the json file to mirror with the dictionary 
        dictionary: dict|None = None    # optional dictionary to initialize the JsonDict
        ):
        """Create dict from a passed dict or from json. Create the json file if required"""
        self.p2json = Path(p2json) if isinstance(p2json, str) else p2json
        if dictionary is None:
            if self.p2json.is_file():
                dictionary = self.load()
                self.initial_dict_from_json = True
            else:
                dictionary = dict()
                self.initial_dict_from_json = False
        super().__init__(dictionary)
        self.save()
    
    def __setitem__(self, __k:Any, v:Any) -> None:
        super().__setitem__(__k, v)
        self.save()

    def __delitem__(self, k:Any):
        super().__delitem__(k)
        self.save()

    def __repr__(self):
        txt1 = f"dict mirrored in {self.p2json.absolute()}\n"
        txt2 = super().__repr__()
        return txt1 + txt2

    def load(self)->dict:
        with open(self.p2json, 'r') as fp:
            return json.load(fp)

    def save(self):
        with open(self.p2json, 'w') as fp:
            json.dump(self, fp, indent=4)



Create a new dictionary mirrored to a JSON file:

In [30]:
d = {'a': 1, 'b': 2, 'c': 3}
p2json = pfs.data / 'jsondict-test.json'
jsondict = JsonDict(p2json, d)
jsondict

dict mirrored in /home/vtec/projects/bio/metagentorch/nbs-dev/data_dev/jsondict-test.json
{'a': 1, 'b': 2, 'c': 3}

Once created, the `JsonDict` instance behaves exactly like a dictionary, with the added benefit that any change to the dictionary is automatically saved to the JSON file.

In [31]:
jsondict['a'], jsondict['b'], jsondict['c']

(1, 2, 3)

In [32]:
for k, v in jsondict.items():
    print(f"key: {k}; value: {v}")

key: a; value: 1
key: b; value: 2
key: c; value: 3


Adding or removing a value from the dictionary works in the same way as for a normal dictionary. But the json file is automatically updated.

In [33]:
jsondict['d'] = 4
jsondict

dict mirrored in /home/vtec/projects/bio/metagentorch/nbs-dev/data_dev/jsondict-test.json
{'a': 1, 'b': 2, 'c': 3, 'd': 4}

In [34]:
with open(p2json, 'r') as fp:
    print(fp.read())

{
    "a": 1,
    "b": 2,
    "c": 3,
    "d": 4
}


In [35]:
del jsondict['a']
jsondict

dict mirrored in /home/vtec/projects/bio/metagentorch/nbs-dev/data_dev/jsondict-test.json
{'b': 2, 'c': 3, 'd': 4}

In [36]:
with open(p2json, 'r') as fp:
    print(fp.read())

{
    "b": 2,
    "c": 3,
    "d": 4
}


In [37]:
#| export
class JsonFileReader:
    """Mirror a JSON file and a dictionary"""
    def __init__(self, 
                 path:str|Path # path to the json file
                ):
        self.path = safe_path(path)
        with open(path, 'r') as fp:
            self.d = json.load(fp)
    
    def add_item(self, 
                 key:str,  # key for the new item
                 item:dict # new item to add to the json as a dict
                ):
        self.d[key] = item
        return self.d

    def save_to_file(self, path=None):
        if path is None: 
            path = self.path
        else:
            path = safe_path(path)

        with open(path, 'w') as fp:
            json.dump(self.d, fp, indent=4)

In [38]:
show_doc(JsonFileReader)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L416){target="_blank" style="float:right; font-size:smaller"}

### JsonFileReader

>      JsonFileReader (path:str|pathlib.Path)

*Mirror a JSON file and a dictionary*

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| path | str \| pathlib.Path | path to the json file |

In [39]:
jd = JsonFileReader(pfs.data / 'test.json')
pprint(jd.d)

{'item 1': {'keys': 'key key key key', 'pattern': 'pattern 1'},
 'item 2': {'keys': 'key key key key', 'pattern': 'pattern 2'},
 'item 3': {'keys': 'key key key key', 'pattern': 'pattern 3'}}


Now we can add an item to the dictionary/json 

In [40]:
new_item = {'keys': 'key key key key', 'pattern': 'another pattern'}
jd.add_item(key='another item', item=new_item)

{'item 1': {'keys': 'key key key key', 'pattern': 'pattern 1'},
 'item 2': {'keys': 'key key key key', 'pattern': 'pattern 2'},
 'item 3': {'keys': 'key key key key', 'pattern': 'pattern 3'},
 'another item': {'keys': 'key key key key', 'pattern': 'another pattern'}}

After saving the updated JSON file, we can load it again and see the changes.

In [41]:
jd.save_to_file()

In [42]:
jd = JsonFileReader(pfs.data / 'test.json')
pprint(jd.d)

{'another item': {'keys': 'key key key key', 'pattern': 'another pattern'},
 'item 1': {'keys': 'key key key key', 'pattern': 'pattern 1'},
 'item 2': {'keys': 'key key key key', 'pattern': 'pattern 2'},
 'item 3': {'keys': 'key key key key', 'pattern': 'pattern 3'}}


In [43]:
# | hide
# Overwrite `test.json` with its initial three items, undoing the item added in the cells above,
# so that the notebook can be run again from the same starting point.
initial_json = {
    'item 1': {'keys': 'key key key key', 'pattern': 'pattern 1'},
    'item 2': {'keys': 'key key key key', 'pattern': 'pattern 2'},
    'item 3': {'keys': 'key key key key', 'pattern': 'pattern 3'}
    }
with open(pfs.data / 'test.json', 'w') as fp:
    json.dump(initial_json, fp, indent=4)

## Other utility functions

In [44]:
#| export
def list_available_devices():
    """Print whether CUDA is available, the name of each CUDA device, and the CPU device."""
    has_cuda = torch.cuda.is_available()
    print(f"CUDA available: {has_cuda}")

    if has_cuda:
        # One line per CUDA device, preceded by the device count
        device_count = torch.cuda.device_count()
        print(f"Number of CUDA devices: {device_count}")
        for idx in range(device_count):
            device_name = torch.cuda.get_device_name(idx)
            print(f"CUDA Device {idx}: {device_name}")

    # The value printed here is the `torch.device('cpu')` object, not a boolean
    cpu_device = torch.device('cpu')
    print(f"CPU available: {cpu_device}")

# Base Classes

## File Readers

Base classes to be extended in order to create readers for specific file formats.

In [92]:
#| export
class TextFileBaseReader:
    """Iterator going through a text file by chunks of `nlines` lines. Iterator can be reset to file start.
    
    The class is mainly intented to be extended, as it is for handling sequence files of various formats such as `FastaFileReader`.
    """

    def __init__(
        self,
        path: str|Path,  # path to the file
        nlines: int=1,   # number of lines on one chunk
    ):
        self.path = safe_path(path)
        self.nlines = nlines
        self.fp = None
        self.reset_iterator()
        
        # Attributes related to metadata parsing
        # Currently assumes the iterator generates a dictionary with key/values
        # TODO: extend to iterator output as simple string.
        self.text_to_parse_key = None
        self.parsing_rules_json = ProjectFileSystem().project_root / 'default_parsing_rules.json'
        self.re_rule_name = None
        self.re_pattern = None       # regex pattern to use to parse text
        self.re_keys = None          # keys (re group names) to parse text

    def reset_iterator(self) -> None:
        """Reset the iterator to point to the first line in the file."""
        if self.fp is not None:
            self.fp.close()
        self.fp = open(self.path, 'r')
        self._chunk_nb = 0
        
    def __iter__(self):
        return self

    def _safe_readline(self) -> str:
        """Read a new line and handle end of file tasks."""
        if self.fp is None: raise RuntimeError(f"File {self.path} is not opened")
        line = self.fp.readline()
        if line == '':
            self.fp.close()
            raise StopIteration()
        return line

    def __next__(self) -> str:
        """Return one chunk of `nlines` text lines at the time"""
        lines = []
        for i in range(self.nlines):
            lines.append(self._safe_readline())
        self._chunk_nb = self._chunk_nb + 1
        return ''.join(lines)
    
    def print_first_chunks(
        self, 
        nchunks:int=3,  # number of chunks to print
    ) -> None:
        """Print the first `nchunk` chunks of text from the file.

        After printing, the iterator is reset again to its first line.
        """
        self.reset_iterator()
        for i, chunk in enumerate(self.__iter__()):
            if i > nchunks-1: break
            print(f"{self.nlines}-line chunk {i+1}")
            print(chunk)
        self.reset_iterator()
            
    def _parse_text_fn(
        self,
        txt:str,         # text to parse
        pattern:str,     # regex pattern to apply to parse the text, must include groups
    )-> dict:            # parsed metadata in key/value format
        """Parsing metadata from string, using regex pattern. Return a metadata dictionary."""
        p = re.compile(pattern)
        keys = list(p.groupindex.keys())
        if len(keys)< 1: 
            raise ValueError(f"Pattern must include at least one group")

        match = p.search(txt)
        if match is not None:
            metadata = match.groupdict()
        else:
            metadata = {k:'' for k in keys}
        return metadata

    def parse_text(
        self,
        txt:str,                    # text to parse
        pattern:str|None=None,      # If None, uses standard regex pattern to extract metadata, otherwise, uses passed regex
        # keys:list[str]|None=None,   # If None, uses standard regex list of keys, otherwise, uses passed list of keys (str)
    )-> dict:                       # parsed metadata in key/value format
        """Parse text using regex pattern with groups. Return a metadata dictionary."""
        if pattern is None:
            if self.re_pattern is not None:
                return self._parse_text_fn(txt, self.re_pattern)
            else:
                raise ValueError('attribute re_pattern and re_keys are still None')
        else:
            return self._parse_text_fn(txt, pattern)        
        
    def set_parsing_rules(
        self,
        pattern: str|None=None,     # regex pattern to apply to parse the text, search in parsing rules json if None
        verbose: bool=False         # when True, provides information on each rule
    )-> None:
        """Set the standard regex parsing rule for the file.
        
        Rules can be set:
        
        1. manually by passing specific custom values for `pattern` and `keys`
        2. automatically, by testing all parsing rules saved in `parsing_rule.json` 
        
        Automatic selection of parsing rules works by testing each rule saved in `parsing_rule.json` on the first 
        definition line of the fasta file, and selecting the one rule that generates the most metadata matches.
        
        Rules consists of two parameters:
        
        - The regex pattern including one `group` for each metadata item, e.g `(?P<group_name>regex_code)`
        - The list of keys, i.e. the list with the name of each regex groups, used as key in the metadata dictionary
        
        This method updates the three following class attributes: `re_rule_name`, `re_pattern`, `re_keys`
      
        """
        # get the first definition line in the file to test the pattern
        # in base class, text_to_parse_key is not defined and automatic rule selection cannot be used
        # this must be handled in children classes
        if self.text_to_parse_key is None:
            msg = """
            `text_to_parse_key` is not defined in this class. 
            It is not possible to set a parsing rule. Must be define, e.g. 'definition line'
            """
            warnings.warn(msg, category=UserWarning)
            return

        self.reset_iterator()
        first_output = next(self)
        text_to_parse = first_output[self.text_to_parse_key]
        divider_line = f"{'-'*80}"

        if pattern is not None:
            re_keys = list(re.compile(pattern).groupindex.keys())
            if len(re_keys) < 1: raise ValueError(f"Pattern must include at least one group")
            try:
                metadata_dict = self.parse_text(text_to_parse, pattern)
                self.re_rule_name = 'Custom Rule'
                self.re_pattern = pattern
                self.re_keys = re_keys
                if verbose:
                    print(divider_line)
                    print(f"Custom rule was set for this instance.")
                    print(f"{self.re_rule_name}: {self.re_pattern}")
            except Exception as err: 
                raise ValueError(f"The pattern generates the following error:\n{err}")      
        else:
            # Load all existing rules from json file
            with open(self.parsing_rules_json, 'r') as fp:
                parsing_rules = json.load(fp)
                
            # test all existing rules and keep the one with highest number of matches
            max_nbr_matches = 0
            for k, v in parsing_rules.items():
                re_pattern:str = v['pattern']
                re_keys = list(re.compile(re_pattern).groupindex.keys())
                try:
                    metadata_dict = self.parse_text(text_to_parse, re_pattern)
                    nbr_matches = len([k for k,v in metadata_dict.items() if v is not None and v !=''])
                    if verbose:
                        print(divider_line)
                        # print(f"Rule <{k}> generated {nbr_matches:,d} matches")
                        print(f"{nbr_matches:,d} matches generated with rule <{k}> ")
                        print(divider_line)
                        print(re_pattern)
                        print(re_keys)
                        print(metadata_dict)

                    if nbr_matches > max_nbr_matches:
                        self.re_pattern = re_pattern
                        self.re_keys = re_keys
                        self.re_rule_name = k   
                except Exception as err:
                    if verbose:
                        print(divider_line)
                        print(f"Rule <{k}> generated an error")
                        print(err)
                    else:
                        pass
            if self.re_rule_name is None:
                msg = """
        None of the saved parsing rules were able to extract metadata from the first line in this file.
        You must set a custom rule (pattern + keys) before parsing text, by using:
            `self.set_parsing_rules(custom_pattern)`
                """
                warnings.warn(msg, category=UserWarning)
            
            if verbose:
                print(divider_line)
                print(f"Selected rule with most matches: {self.re_rule_name}")

            # We used the iterator, now we need to reset it to make all lines available
            self.reset_iterator()


Once initialized, the iterator runs over each chunk of line(s) in the text file, sequentially.

In [93]:
pfs.data

Path('/home/vtec/projects/bio/metagentorch/nbs-dev/data_dev')

In [94]:
p2textfile = pfs.data / 'CNN_Virus_data/train_short'
it = TextFileBaseReader(path=p2textfile, nlines=3)

one_iteration = next(it)

print(one_iteration)

TCAAAATAATCAGAAATGTTGAACCTAGGGTTGGACACATAATGACCAGC	76	0
ATTGTTTAACAATTTGTGCTCGTCCCGGTCACCCGCATCCAATCTTGATG	4	9
AATCTTGTCCTATCCTACCCGCAGGGGAATTGATGATAGANGTGCTTTTA	181	0



Let's create a new instance of the file reader, and get several iterations.

In [None]:
it = TextFileBaseReader(path=p2textfile, nlines=3)

one_iteration = next(it)
print(one_iteration)

TCAAAATAATCAGAAATGTTGAACCTAGGGTTGGACACATAATGACCAGC	76	0
ATTGTTTAACAATTTGTGCTCGTCCCGGTCACCCGCATCCAATCTTGATG	4	9
AATCTTGTCCTATCCTACCCGCAGGGGAATTGATGATAGANGTGCTTTTA	181	0



In [None]:
another_iteration = next(it)
print(another_iteration)
one_more_iteration = next(it)
print(one_more_iteration)

GGAGCGGAGCCAACCCCTATGCTCACTTGCAACCCAAGGGGCGTTCCAGT	74	3
TGGATCCTGCGCGGGACGTCCTTTGTCTACGTCCCGTCGGCGCATCCCGC	60	3
GAGAGACTTACTAAAAAGCTGGCACTTACCATCAGTGTTTCACCTACATG	44	0

ACACACGACACTAGAGATAATGTGTCAGTGGATTATAAACAAACCAAGTT	43	7
TTGTAGCATAAGAACTGGTCTTCGCTGAAATTCTTGTCTTGATCTCATCT	35	2
TGGCCCTGCGGTCTGGGGCCCAGAAGCATATGTCAAGTCCTTTGAGAAGT	73	4



If we want to access the start of the file again, we need to re-initialize the file handle.

In [None]:
show_doc(TextFileBaseReader.reset_iterator)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L468){target="_blank" style="float:right; font-size:smaller"}

### TextFileBaseReader.reset_iterator

>      TextFileBaseReader.reset_iterator ()

*Reset the iterator to point to the first line in the file.*

In [None]:
it.reset_iterator()
one_iteration = next(it)
print(one_iteration)
another_iteration = next(it)
print(another_iteration)

TCAAAATAATCAGAAATGTTGAACCTAGGGTTGGACACATAATGACCAGC	76	0
ATTGTTTAACAATTTGTGCTCGTCCCGGTCACCCGCATCCAATCTTGATG	4	9
AATCTTGTCCTATCCTACCCGCAGGGGAATTGATGATAGANGTGCTTTTA	181	0

GGAGCGGAGCCAACCCCTATGCTCACTTGCAACCCAAGGGGCGTTCCAGT	74	3
TGGATCCTGCGCGGGACGTCCTTTGTCTACGTCCCGTCGGCGCATCCCGC	60	3
GAGAGACTTACTAAAAAGCTGGCACTTACCATCAGTGTTTCACCTACATG	44	0



In [None]:
show_doc(TextFileBaseReader.print_first_chunks)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L495){target="_blank" style="float:right; font-size:smaller"}

### TextFileBaseReader.print_first_chunks

>      TextFileBaseReader.print_first_chunks (nchunks:int=3)

*Print the first `nchunk` chunks of text from the file.

After printing, the iterator is reset again to its first line.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| nchunks | int | 3 | number of chunks to print |
| **Returns** | **None** |  |  |

In [None]:
it = TextFileBaseReader(path=p2textfile, nlines=3)

it.print_first_chunks(nchunks=3)

3-line chunk 1
TCAAAATAATCAGAAATGTTGAACCTAGGGTTGGACACATAATGACCAGC	76	0
ATTGTTTAACAATTTGTGCTCGTCCCGGTCACCCGCATCCAATCTTGATG	4	9
AATCTTGTCCTATCCTACCCGCAGGGGAATTGATGATAGANGTGCTTTTA	181	0

3-line chunk 2
GGAGCGGAGCCAACCCCTATGCTCACTTGCAACCCAAGGGGCGTTCCAGT	74	3
TGGATCCTGCGCGGGACGTCCTTTGTCTACGTCCCGTCGGCGCATCCCGC	60	3
GAGAGACTTACTAAAAAGCTGGCACTTACCATCAGTGTTTCACCTACATG	44	0

3-line chunk 3
ACACACGACACTAGAGATAATGTGTCAGTGGATTATAAACAAACCAAGTT	43	7
TTGTAGCATAAGAACTGGTCTTCGCTGAAATTCTTGTCTTGATCTCATCT	35	2
TGGCCCTGCGGTCTGGGGCCCAGAAGCATATGTCAAGTCCTTTGAGAAGT	73	4



In [None]:
show_doc(TextFileBaseReader.parse_text)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L542){target="_blank" style="float:right; font-size:smaller"}

### TextFileBaseReader.parse_text

>      TextFileBaseReader.parse_text (txt:str, pattern:str|None=None,
>                                     keys:list[str]|None=None)

*Parse text using regex pattern and keys. Return a metadata dictionary.

The passed text is parsed using the regex pattern. The method return a dictionary in the format:

    {
        'key_1': 'metadata 1',
        'key_2': 'metadata 2',
        ...
    }*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| txt | str |  | text to parse |
| pattern | str \| None | None | If None, uses standard regex pattern to extract metadata, otherwise, uses passed regex |
| keys | list[str] \| None | None | If None, uses standard regex list of keys, otherwise, uses passed list of keys (str) |
| **Returns** | **dict** |  | **parsed metadata in key/value format** |

In [None]:
text = '>2591237:ncbi:1'
pattern = r"^>(?P<id>\d+):(?P<source>ncbi):(?P<nb>\d*)"
p = re.compile(pattern)
# names of the groups in the pattern, used as keys in the returned dictionary
keys = p.groupindex.keys()

# `parse_text` derives the keys from the pattern, only the text and the pattern are passed
it.parse_text(text, pattern)

{'id': '2591237', 'nb': '1', 'source': 'ncbi'}

## Extending the base class

`TextFileBaseReader` is a base class, intended to be extended into specific file format readers.

The following methods will typically be extended to match data file and other structured text files formats:

- `__next__` method in order to customize how the iterator parses files into "elements". For instance, in a FASTA file, one element consists of two lines: a *"definition line"* and the *sequence* itself. Extending `TextFileBaseReader` makes it possible to read pairs of lines sequentially and return each element as a dictionary. For instance, `FastaFileReader` iterates over each pair of lines in a FASTA file and returns each pair as a dictionary as follows:

```text
    {
    'definition line': '>2591237:ncbi:1 [MK211378]\t2591237\tncbi\t1 [MK211378] '
                       '2591237\tCoronavirus BtRs-BetaCoV/YN2018D\t\tscientific '
                       'name\n',
    'sequence':        'TATTAGGTTTTCTACCTACCCAGGA'
    }
```
- Methods for parsing metadata from the file. For instance, `parse_file` method will handle how the reader will iterate over the full file and return a dictionary for the entire file. 
- Extended classes will also define specific attributes (`text_to_parse_key`, `re_pattern`, `re_keys`, ...)

In [None]:
show_doc(TextFileBaseReader.set_parsing_rules)

---

[source](https://github.com/vtecftwy/metagentorch/blob/main/metagentorch/core.py#L571){target="_blank" style="float:right; font-size:smaller"}

### TextFileBaseReader.set_parsing_rules

>      TextFileBaseReader.set_parsing_rules (pattern:str|None=None,
>                                            keys:list[str]|None=None,
>                                            verbose:bool=False)

*Set the standard regex parsing rule for the file.

Rules can be set:

1. manually by passing specific custom values for `pattern` and `keys`
2. automatically, by testing all parsing rules saved in `parsing_rule.json` 

Automatic selection of parsing rules works by testing each rule saved in `parsing_rule.json` on the first 
definition line of the fasta file, and selecting the one rule that generates the most metadata matches.

Rules consists of two parameters:

- The regex pattern including one `group` for each metadata item, e.g `(?P<group_name>regex_code)`
- The list of keys, i.e. the list with the name of each regex groups, used as key in the metadata dictionary

This method updates the three following class attributes: `re_rule_name`, `re_pattern`, `re_keys`*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| pattern | str \| None | None | regex pattern to apply to parse the text, search in parsing rules json if None |
| keys | list[str] \| None | None | list of keys/group for regex, search in parsing rules json if None |
| verbose | bool | False | when True, provides information on each rule |
| **Returns** | **None** |  |  |

> **Important Note to Developers**
>
> Method `set_parsing_rules` is there to allow `TextFileBaseReader`'s descendant classes to automatically select parsing rule by applying rules saved in a json file to a string extracted from the first element in the file.
>
> It assumes that the iterator returns its elements as dictionaries `{section_name:section, ...}` and not as a pure string. The key `self.text_to_parse_key` will then be used to extract the text to parse for testing the rules.
> The base class iterator returns a simple string and `self.text_to_parse_key` is set to `None`.
>
> To set up a default parsing rule for the reader instance, the iterator must return a dictionary and `self.text_to_parse_key` must be set to the key in the dictionary corresponding to the text to parse. 
>
> See implementation in `FastaFileReader`.
>
> Calling `set_parsing_rules` on a class that does not satisfy these requirements will do nothing and issue a warning.

In [None]:
it.set_parsing_rules()

            `text_to_parse_key` is not defined in this class. 
            It is not possible to set a parsing rule.
            


# Deprecated Items

When any of the following classes and functions is called, it will raise an exception with an error message indicating how to handle the required code refactoring.

Example:

```python
DeprecationWarning                        Traceback (most recent call last)
Input In [140], in <cell line: 1>()
----> 1 TextFileBaseIterator(p2textfile)

Input In [139], in TextFileBaseIterator.__init__(self, *args, **kwargs)
      4 def __init__(self, *args, **kwargs):
      5     msg = \"\"\"
      6     `TextFileBaseIterator` is deprecated. 
      7     Use `TextFileBaseReader` instead, with same capabilities and more.\"\"\"
----> 8     raise DeprecationWarning(msg)

DeprecationWarning: 
        `TextFileBaseIterator` is deprecated. 
        Use `TextFileBaseReader` instead, with same capabilities and more." 
```

In [None]:
#| export
class TextFileBaseIterator:
    """Deprecated class kept as a placeholder: use `TextFileBaseReader` instead.

    Creating an instance always raises a `DeprecationWarning`, whatever the arguments.
    """
    def __init__(self, *args, **kwargs):
        # Raised as an exception (not issued as a warning) to force refactoring of the calling code
        raise DeprecationWarning("""
        `TextFileBaseIterator` is deprecated. 
        Use `TextFileBaseReader` instead, with same capabilities and more.""")

In [None]:
#| hide
# NOTE(review): `p2textfile` is not passed to `test_fail` below — confirm whether it is still needed.
p2textfile = Path('data_dev/train_short')
# Creating a `TextFileBaseIterator` must fail with the deprecation message
test_fail(TextFileBaseIterator, msg="Should generate DeprecationWarning", contains="`TextFileBaseIterator` is deprecated.")

In [None]:
#| hide
nbdev_export()