The following details the steps taken to update the `dict.txt` file per case [Update dict.txt - remove xylazine terms](https://github.com/suzytamang/clever-rockies/issues/19).

Each modification is treated as a separate, distinct series of actions (here, deletes); these will in turn be used to inform case [Support Feature: Renumbering and Validation Script](https://github.com/suzytamang/clever-rockies/issues/22).

In [None]:
# Imports, pandas display options, and a small helper for printing frames

from abc import ABC
from dataclasses import dataclass
from typing import Callable, List
import pandas as pd
from math import floor
from tabulate import tabulate
from typing import Self
import static_frame as sf
from typing import Literal
from typing import cast


# Notebook display options: show at most 20 columns and allow output up to
# 200 characters wide. A preceding `display.max_columns = None` call was
# immediately overridden by the value below, so it had no effect and has
# been removed.
pd.set_option("display.max_columns", 20)
pd.set_option("display.width", 200)


class FrameHelper:
    """Helpers for displaying frames in the notebook output."""

    @staticmethod
    def show(df: pd.DataFrame):
        """Print `df` as a psql-style table, using the column names as
        headers and omitting the index column.

        Args:
            df (pd.DataFrame): frame to render
        """
        rendered_table = tabulate(
            df,
            headers="keys",
            tablefmt="psql",
            showindex=False,
        )
        print(rendered_table)

In [None]:
# Constants


class ColumnLabels(ABC):
    """Base class for groups of column-label constants."""

    @classmethod
    def all_values(cls):
        """Return the values of every attribute defined directly on `cls`
        whose name does not start with an underscore.

        Only the class's own namespace is inspected; attributes inherited
        from base classes are not included.
        """
        public_values = []
        for attr_name, attr_value in vars(cls).items():
            if attr_name.startswith("_"):
                continue
            public_values.append(attr_value)
        return public_values


class TermLabels(ColumnLabels):
    """Column labels for the four fields of a terminology entry."""

    # Numeric identifier of the entry
    ID_LABEL = "id"
    # The term text itself
    TERM_LABEL = "term"
    # Subclass the term belongs to
    SUBCLASS_LABEL = "subclass_name"
    # Class the term belongs to
    CLASS_LABEL = "class_name"


class NumberingDetails(ColumnLabels):
    """Labels for helper columns used while renumbering ids."""

    # Working column holding the id block a row belongs to
    # (written by TerminologyRenumberStrategy)
    ID_CHUNK_LABEL = "id_chunk"


class ValidationFlags(ColumnLabels):
    """Labels for the boolean flag columns written by TerminologyValidator."""

    # Result of running the per-term validator on the row
    IS_VALID_TERM_LABEL = "is_valid_term"
    # Result of the duplicate check on the id column
    HAS_DUPLICATE_ID_LABEL = "has_duplicate_id"
    # Result of the duplicate check on the (term, class, subclass) columns
    DUPLICATE_TERM_CLASS_SUBCLASS_LABEL = "duplicate_term_class_subclass"

In [None]:
class IResetChanged(ABC):
    """Interface for objects whose changed state can be cleared."""

    def reset(self):
        """Clear the changed state; the base implementation does nothing."""
        pass


class ISetChanged(ABC):
    """Interface for objects that can be marked as changed."""

    def set_changed(self):
        """Mark as changed; the base implementation does nothing."""
        pass


class ChangedFlag(ISetChanged, IResetChanged):
    """Class to manage changed flag; controls access to only set as changed"""

    def __init__(self) -> None:
        # A new flag starts out unchanged
        self._is_changed = False

    def set_changed(self) -> None:
        """Mark the flag as changed."""
        self._is_changed = True

    def reset(self) -> None:
        """Return the flag to the unchanged state."""
        self._is_changed = False

    @property
    def is_changed(self) -> bool:
        """True if set_changed() has been called since the last reset()."""
        return self._is_changed


In [None]:


@dataclass
class Term:
    """A single terminology entry: id, term text, subclass, and class."""

    id: int
    term: str
    subclass_name: str
    class_name: str

    @staticmethod
    def deserialize(term_string: str) -> "Term":
        """Parse one pipe-delimited line, `id|term|subclass|class`.

        Trailing line terminators are removed so a line read straight from
        a file does not leave a newline inside `class_name`.

        Args:
            term_string (str): the serialized entry

        Returns:
            Term: the parsed entry

        Raises:
            ValueError: if the line does not have exactly four fields, or
                the id field is not an integer
        """
        term_pieces = term_string.rstrip("\r\n").split("|")
        if len(term_pieces) != 4:
            raise ValueError(
                "Expected 4 pipe-delimited fields (id|term|subclass|class), "
                f"got {len(term_pieces)}: {term_string!r}"
            )
        raw_id, term, subclass_name, class_name = term_pieces
        return Term(
            id=int(raw_id),
            term=term,
            subclass_name=subclass_name,
            class_name=class_name,
        )

    @staticmethod
    def deserialize_terms(term_strings: List[str]) -> List["Term"]:
        """Parse each serialized entry in order; see `deserialize`."""
        return [Term.deserialize(term_string) for term_string in term_strings]

In [None]:
# Term Validator (rules applied to a single term)
class TermValidator:
    """Callable that checks a single term against a set of rules."""

    def __init__(self, *rules: Callable[[Term], bool]) -> None:
        """Store the rules to apply.

        Args:
            *rules: callables taking a term and returning whether it passes;
                when none are given, a single always-true rule is used
        """
        self._rules = list(rules) if rules else [TermValidator.noop]

    def __call__(self, term: Term) -> bool:
        """Return True only if every rule accepts `term`.

        Rules are evaluated in order and evaluation stops at the first
        rule that rejects the term.
        """
        for rule in self._rules:
            if not rule(term):
                return False
        return True

    @staticmethod
    def noop(x) -> bool:
        """Default rule: accept anything."""
        return True

In [None]:
# Terminology Validator

class TerminologyValidator:
    """Validate a terminology dataframe and flag problem rows.

    Calling the validator on a dataframe adds the `ValidationFlags` columns
    to that dataframe in place and remembers it, so the duplicate checks and
    `does_term_exist` operate on the most recently validated frame.
    """

    def __init__(self, term_validator: TermValidator) -> None:
        self._term_validator: TermValidator = term_validator
        # Set by __call__; the helper methods read from it.
        self._terms_df: pd.DataFrame | None = None

    def __call__(self, termd_df: pd.DataFrame) -> bool:
        """Run all checks against `termd_df`, adding flag columns in place.

        Args:
            termd_df (pd.DataFrame): terminology to validate

        Returns:
            bool: True if every row passes the term validator and no
                duplicate ids or duplicate term/class/subclass
                combinations were found
        """
        # Remember the frame so the duplicate checks below have data to
        # work on.
        self._terms_df = termd_df

        # Built row by row (rather than DataFrame.apply) so that an empty
        # frame yields an empty flag column instead of failing.
        termd_df[ValidationFlags.IS_VALID_TERM_LABEL] = [
            bool(self._term_validator(row)) for _, row in termd_df.iterrows()
        ]

        duplicate_ids_found = self.check_for_duplicate_ids()
        duplicate_term_class_subclass_found = (
            self.check_for_duplicate_term_class_subclass()
        )

        if duplicate_ids_found:
            print("Duplicate IDs found")

        if duplicate_term_class_subclass_found:
            print("Duplicate term/class/subclass combinations found")

        return (
            bool(termd_df[ValidationFlags.IS_VALID_TERM_LABEL].all())
            and not duplicate_ids_found
            and not duplicate_term_class_subclass_found
        )

    def _require_terms_df(self) -> pd.DataFrame:
        """Return the last validated frame, or fail with a clear message."""
        if self._terms_df is None:
            raise RuntimeError(
                "No terminology has been validated yet; call the validator "
                "with a dataframe first"
            )
        return self._terms_df

    def duplicate_check(
        self, labels_to_check: str | List[str], flag_label
    ) -> bool:
        """Check to see if duplicates exist for a vector of columns

        The flag column marks every occurrence after the first of a
        repeated value (the default behavior of `DataFrame.duplicated`).

        Args:
            labels_to_check (str | List[str]): vector of columns to check
            flag_label (str): label of the flag column to write to the
                dataframe

        Returns:
            bool: True if at least one duplicate was found

        Raises:
            RuntimeError: if no dataframe has been validated yet
        """
        terms_df = self._require_terms_df()
        if isinstance(labels_to_check, str):
            labels_to_check = [labels_to_check]
        duplicate_mask: pd.Series = terms_df[labels_to_check].duplicated()
        terms_df[flag_label] = duplicate_mask
        return bool(duplicate_mask.any())

    def check_for_duplicate_ids(self) -> bool:
        """Check to see if ids are duplicated

        Returns:
            bool: True if duplicate ids exist
        """
        return self.duplicate_check(
            TermLabels.ID_LABEL, ValidationFlags.HAS_DUPLICATE_ID_LABEL
        )

    def check_for_duplicate_term_class_subclass(self) -> bool:
        """Check to see if the combination of term, class, and subclass
        occurs more than once

        Returns:
            bool: True if duplicate combinations exist
        """
        return self.duplicate_check(
            [
                TermLabels.TERM_LABEL,
                TermLabels.CLASS_LABEL,
                TermLabels.SUBCLASS_LABEL,
            ],
            ValidationFlags.DUPLICATE_TERM_CLASS_SUBCLASS_LABEL,
        )

    def does_term_exist(self, term: Term) -> bool:
        """Check to see if the term string exists exactly once

        Only the term text is compared; id, class, and subclass are not.

        Args:
            term (Term): entry whose `term` text is looked up

        Returns:
            bool: True if exactly one row has this term text; False if it
                is missing or occurs more than once (a message is printed
                in the latter case)

        Raises:
            RuntimeError: if no dataframe has been validated yet
        """
        terms_df = self._require_terms_df()
        term_count = int((terms_df[TermLabels.TERM_LABEL] == term.term).sum())
        if term_count > 1:
            print(f"More than one instance of term found: {term}")

        return term_count == 1

In [None]:
class TerminologyRenumberStrategy:
    """This strategy assumes the following:

    - Terms ids are in 1k chunks
    - Deleting a term drops the id and renumbers
        all following terms up to the next 1k boundary

    """

    ID_BLOCK_SIZE = 1000

    def __init__(self) -> None:
        # Optional change monitor, reset after each renumbering
        self._monitor: IResetChanged | None = None

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        """Iterates over each 1k block and compacts ids
        to create a monotonically increasing id starting at
        each 1k boundary. The first block starts at 1 rather than 0.

        Example:

        | Original ID | Updated ID |
        |-------------|------------|
        | 1           | 1          |
        | 2           | 2          |
        | 10          | 3          |
        | 100         | 4          |
        | 1001        | 1000       |
        | 1002        | 1001       |
        | 1010        | 1002       |

        The input frame is not modified. Rows are returned grouped by
        block (ascending), keeping their original relative order within
        each block, with a fresh 0..n-1 index. The working chunk column
        and any validation flag columns are removed from the result.

        Args:
            df (pd.DataFrame): current copy of the terminology

        Returns:
            pd.DataFrame: terminology with updated ids
        """
        block_size = TerminologyRenumberStrategy.ID_BLOCK_SIZE
        id_label = TermLabels.ID_LABEL
        chunk_label = NumberingDetails.ID_CHUNK_LABEL

        # Work on a copy so the caller's frame does not gain helper columns.
        working_df: pd.DataFrame = df.copy()

        # Block number of each row: floor(id / block size).
        working_df[chunk_label] = (working_df[id_label] // block_size).astype(
            int
        )

        # Stable sort keeps the existing row order inside each block.
        working_df = working_df.sort_values(
            by=chunk_label, kind="stable"
        ).reset_index(drop=True)

        # 0-based position of each row within its block.
        position_in_block = working_df.groupby(chunk_label).cumcount()

        # Block 0 starts numbering at 1; every other block starts at its
        # boundary (block * block size).
        block_start = (working_df[chunk_label] * block_size).where(
            working_df[chunk_label] != 0, 1
        )
        # NOTE(review): a block holding more rows than it has room for will
        # run into the next block's id range; this is not checked here.
        working_df[id_label] = block_start + position_in_block

        # Flag columns may already be absent (e.g. on a second renumbering),
        # so missing labels are ignored rather than raising KeyError.
        working_df = working_df.drop(
            columns=[chunk_label] + ValidationFlags.all_values(),
            errors="ignore",
        )

        if self._monitor is not None:
            self._monitor.reset()

        return working_df

    def set_monitor(self, monitor: IResetChanged):
        """Register the change monitor to reset after each renumbering."""
        self._monitor = monitor

In [None]:
# Primary management class for terminology actions


class TerminologyManager:
    """Manage additions, deletions, and updates to terminology

    Keeps an untouched copy of the terminology as loaded alongside a
    working copy that edits are applied to. Ids in the working copy are
    renumbered lazily: an edit only marks the terminology as changed, and
    the renumber strategy runs the next time the updated terminology is
    requested (or when leaving a `with` block).
    """

    @staticmethod
    def as_readonly(df: pd.DataFrame):
        """Convert a pandas frame to a static_frame Frame for callers."""
        return sf.Frame.from_pandas(df)

    def __init__(
        self,
        terms: pd.DataFrame | str,
        terminology_validator: TerminologyValidator,
        renumber_strategy: TerminologyRenumberStrategy,
    ) -> None:
        """Load the terminology and validate it.

        Args:
            terms (pd.DataFrame | str): the terminology as a dataframe, or
                the path of a headerless pipe-delimited file with the
                columns id, term, subclass, class
            terminology_validator (TerminologyValidator): validator run
                against the working copy
            renumber_strategy (TerminologyRenumberStrategy): strategy used
                to renumber ids after changes

        Raises:
            ValueError: if the terminology fails validation
        """
        self._change_monitor: ChangedFlag = ChangedFlag()

        if isinstance(terms, str):
            # NOTE(review): read_csv's default NA handling turns values
            # such as "NA" or "null" into NaN; confirm no term relies on
            # those strings.
            terms_df: pd.DataFrame = pd.read_csv(
                terms,
                delimiter="|",
                names=[
                    TermLabels.ID_LABEL,
                    TermLabels.TERM_LABEL,
                    TermLabels.SUBCLASS_LABEL,
                    TermLabels.CLASS_LABEL,
                ],
            )
        else:
            terms_df = terms

        self._updated_terminology_df: pd.DataFrame = terms_df.copy()
        self._original_terminology_df: pd.DataFrame = terms_df.copy()
        self._terminology_validator: TerminologyValidator = (
            terminology_validator
        )
        self._renumber_strategy: TerminologyRenumberStrategy = (
            renumber_strategy
        )
        # The strategy resets the flag once it has renumbered.
        self._renumber_strategy.set_monitor(self._change_monitor)

        # Validate the working copy (the validator needs the frame to check).
        if not self._terminology_validator(self._updated_terminology_df):
            raise ValueError("Term validation failed")

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Renumber the working copy on leaving the `with` block.

        Exceptions raised inside the block are not suppressed.
        """
        self._updated_terminology_df = self._renumber_strategy(
            self._updated_terminology_df
        )

    def _current_updated_df(self) -> pd.DataFrame:
        """Return the working copy, renumbering (and storing the result)
        first if a change is pending."""
        if self._change_monitor.is_changed:
            self._updated_terminology_df = self._renumber_strategy(
                self._updated_terminology_df
            )
        return self._updated_terminology_df

    def find(
        self, ids: List[int], source: Literal["original"] | Literal["updated"]
    ):
        """Search for entries in the original or updated terminology

        Args:
            ids (List[int]): ids of the entries to return
            source (Literal["original"] | Literal["updated"]): which copy
                to search; "updated" is renumbered first if changes are
                pending, so ids refer to the renumbered terminology

        Returns:
            A read-only frame (see `as_readonly`) holding the rows whose
            id is in `ids`
        """
        df: pd.DataFrame
        if source == "original":
            df = self._original_terminology_df
        else:
            df = self._current_updated_df()

        # Select the matching rows; the boolean mask itself is not the
        # result.
        return TerminologyManager.as_readonly(
            df[df[TermLabels.ID_LABEL].isin(ids)]
        )

    def get_original_terminology(self):
        """Return a read-only copy of the original terminology

        Returns:
            A read-only frame (see `as_readonly`) of the terminology as
            loaded
        """
        return TerminologyManager.as_readonly(self._original_terminology_df)

    def get_updated_terminology(self) -> "sf.Frame":
        """Return the updated terminology, renumbering if a change was made

        Returns:
            A read-only frame (see `as_readonly`) of the working copy
        """
        return TerminologyManager.as_readonly(self._current_updated_df())

    def delete_term(self, term: Term) -> Self | None:
        """Remove a term from the terminology

        The row is matched on id, term, class, and subclass together.

        Args:
            term (Term): Delete a term from the terminology

        Returns:
            Self | None: this manager (for chaining), or None if the
                validator reports that the term does not exist

        Raises:
            ValueError: if no row, or more than one row, matches the term
        """
        if not self._terminology_validator.does_term_exist(term):
            print(f"Term {term} does not exist")
            return None

        df: pd.DataFrame = self._updated_terminology_df
        # Element-wise `&` is required here; `and` is not defined for Series.
        term_mask: pd.Series = (
            (df[TermLabels.ID_LABEL] == term.id)
            & (df[TermLabels.TERM_LABEL] == term.term)
            & (df[TermLabels.CLASS_LABEL] == term.class_name)
            & (df[TermLabels.SUBCLASS_LABEL] == term.subclass_name)
        )

        match_count = int(term_mask.sum())
        if match_count == 0:
            raise ValueError(f"Could not locate term in dict: {term}")

        if match_count > 1:
            raise ValueError(f"Found more than one instance of term: {term}")

        # Keep everything but the selected term, and store the result so
        # the deletion actually takes effect.
        self._updated_terminology_df = df[~term_mask].copy()
        self._change_monitor.set_changed()
        return self

    def add_term(self, term: Term) -> Self | None:
        """Not implemented yet.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError("add_term")

    def update_term(self, term: Term) -> Self | None:
        """Not implemented yet.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError("update_term")

Load terminology

In [None]:
# Build the manager from the pipe-delimited dictionary file, using a term
# validator with no custom rules and the 1k-block renumber strategy.
# NOTE(review): the path below is an absolute, machine-specific location.
terminology_manager = TerminologyManager(
    r"D:\DaxWorkspace\PERC\clever-rockies\res\dicts\dict.txt",
    terminology_validator=TerminologyValidator(term_validator=TermValidator()),
    renumber_strategy=TerminologyRenumberStrategy(),
)

# Show the entries with ids 3000-3004 as loaded
FrameHelper.show(terminology_manager.find(ids=list(range(3000,3005)), source="original"))

Remove entries

In [None]:
# Entries to remove, serialized as id|term|subclass|class
terms_to_delete: List[str] = [
    "3001|tranq|XYLA|drug",
    "3002|tranq dope|XYLA|drug",
    "3003|zylazine|XYLA|drug",
]

terms: List[Term] = Term.deserialize_terms(terms_to_delete)

# Delete each entry in turn
for term in terms:
    terminology_manager.delete_term(term)
    
# Compare ids 3000-3004 in the original terminology ...
FrameHelper.show(terminology_manager.find(ids=list(range(3000,3005)), source="original"))

# ... against the same id range in the updated terminology
FrameHelper.show(terminology_manager.find(ids=list(range(3000,3005)), source="updated"))

Get renumbered terminology

In [None]:
# Fetch the updated terminology from the manager.
# NOTE(review): the manager wraps its result with as_readonly
# (sf.Frame.from_pandas), so the pd.DataFrame annotation here may not match
# the runtime type - confirm.
updated_terminology_df: pd.DataFrame = terminology_manager.get_updated_terminology()

In [None]:
# Display the first 20 rows of the updated terminology
FrameHelper.show(updated_terminology_df.head(20))