<a href="https://colab.research.google.com/github/robjshore/colab_notebook/blob/main/ML_Abstract_Base_Classes_with_Type_Comments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import abc
from typing import Any, Dict, List, Optional, Union, Sequence

class BaseMLOperation(abc.ABC):
    """
    A base class for all ML operations, providing a common configuration
    initialization and validation pattern.
    """
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initializes the operation with an optional configuration dictionary.

        Args:
            config (Optional[Dict[str, Any]]):
                Input Type: A dictionary where keys are configuration parameter names (strings)
                            and values can be of any type, depending on the specific needs
                            of the concrete operation. If None, an empty dictionary is used.
                Purpose: To provide settings and parameters that control the behavior
                         of the operation (e.g., model paths, API keys, thresholds).
        """
        self.config: Dict[str, Any] = config if config is not None else {}
        # Specific validation logic is deferred to subclasses and typically called
        # within their `configure` or `run` methods.

    @abc.abstractmethod
    def configure(self) -> None:
        """
        Configures the operation. This method is intended for setup tasks that
        might rely on the initial configuration, such as loading models,
        allocating resources, or setting up connections.
        Implementations should make this method idempotent if possible.

        Input Type: None (implicitly `self`, which carries the `config`).
        Output Type: None. This method typically modifies the internal state of
                     the object (e.g., loads models into `self.model`).
        """
        pass

    @abc.abstractmethod
    def validate_config(self) -> None:
        """
        Validates the provided configuration to ensure the operation can run.

        Input Type: None (implicitly `self`, which carries the `config`).
        Output Type: None. This method is expected to raise an exception
                     (e.g., ValueError) if the configuration is invalid.
        Raises:
            ValueError: If the configuration is invalid or missing required parameters.
        """
        pass

# --- Document Ingestion ---
class BaseDocumentIngestor(BaseMLOperation):
    """Abstract base class for document ingestion from various sources."""

    @abc.abstractmethod
    def ingest(self, source: Any, **kwargs) -> Any:
        """
        Ingests content from a given source.

        Args:
            source (Any):
                Input Type: The type is `Any` to allow maximum flexibility. This could be:
                            - A string representing a file path (e.g., "/path/to/doc.pdf").
                            - A string representing a URL (e.g., "http://example.com/page").
                            - A database connection object or query string.
                            - An API client or parameters for an API call.
                            - Raw text content itself.
                Purpose: Specifies where or what to ingest.
            **kwargs: Additional keyword arguments specific to the ingestion method.

        Returns:
            Any:
                Output Type: The type is `Any` to accommodate diverse output formats.
                             This could be:
                             - A single string containing the entire document content.
                             - A list of strings, where each string is a page or section.
                             - A list of structured objects (e.g., custom Document objects).
                             - A dictionary representing structured data (e.g., from JSON/CSV).
                Purpose: To provide the ingested content in a usable format for downstream tasks.
        """
        pass

# --- Text Chunking ---
class BaseTextChunker(BaseMLOperation):
    """Abstract base class for dividing text into smaller chunks."""

    @abc.abstractmethod
    def chunk(self, document: Any, **kwargs) -> List[Any]:
        """
        Divides a document into smaller, manageable chunks.

        Args:
            document (Any):
                Input Type: The type is `Any` as it depends on the output of an ingestor.
                            This could be:
                            - A single large string.
                            - A list of strings (e.g., pages of a document).
                            - A custom document object.
                Purpose: The content that needs to be divided into smaller pieces.
            **kwargs: Additional keyword arguments for chunking strategy customization
                      (e.g., chunk_size, overlap, delimiters).

        Returns:
            List[Any]:
                Output Type: A list where each element is a chunk. The type of each
                             chunk (`Any`) can vary:
                             - Typically, a list of strings.
                             - Could also be a list of custom Chunk objects containing
                               text and metadata.
                Purpose: To provide the text broken down into manageable units.
        """
        pass

# --- Tokenization ---
class BaseTokenizer(BaseMLOperation):
    """Abstract base class for breaking down text into tokens."""

    @abc.abstractmethod
    def tokenize(self, text_input: Union[str, Sequence[str]], **kwargs) -> Union[List[str], List[List[str]]]:
        """
        Converts text input into a sequence of tokens.

        Args:
            text_input (Union[str, Sequence[str]]):
                Input Type: Can be either:
                            - A single string (e.g., one document or chunk).
                            - A sequence (e.g., list, tuple) of strings (e.g., multiple chunks).
                Purpose: The text data to be broken down into individual tokens.
            **kwargs: Additional keyword arguments for tokenizer customization.

        Returns:
            Union[List[str], List[List[str]]]:
                Output Type:
                            - If input is a single string: A list of strings, where each
                              string is a token (e.g., `['hello', 'world']`).
                            - If input is a sequence of strings: A list of lists of strings,
                              where each inner list contains tokens for the corresponding
                              input string (e.g., `[['hello', 'world'], ['another', 'sentence']]`).
                Purpose: To provide a tokenized representation of the input text.
        """
        pass

# --- Creation of Embeddings ---
class BaseEmbedder(BaseMLOperation):
    """Abstract base class for converting text or tokens into numerical embeddings."""

    @abc.abstractmethod
    def embed(
        self,
        data_to_embed: Union[str, Sequence[str], Sequence[Sequence[str]]],
        **kwargs
    ) -> List[List[float]]:
        """
        Generates numerical vector embeddings for the given input.

        Args:
            data_to_embed (Union[str, Sequence[str], Sequence[Sequence[str]]]):
                Input Type: Can be:
                            - A single string (e.g., a sentence or document).
                            - A sequence of strings (e.g., multiple sentences, chunks).
                            - A sequence of sequences of strings (e.g., pre-tokenized text,
                              where each inner sequence is a list of tokens for one item).
                Purpose: The textual data to be converted into numerical vectors.
            **kwargs: Additional keyword arguments for embedding model customization.

        Returns:
            List[List[float]]:
                Output Type: A list of embedding vectors. Each inner list is a vector
                             of floats representing an input item.
                             (e.g., `[[0.1, 0.2, ...], [0.3, 0.4, ...]]`).
                Purpose: To provide numerical representations of the input data suitable
                         for similarity comparisons or as input to ML models.
        """
        pass

# --- Weight Assignment / Scoring ---
class BaseScorer(BaseMLOperation):
    """Abstract base class for assigning weights or scores to items."""

    @abc.abstractmethod
    def score(
        self,
        items: Sequence[Any],
        context: Optional[Any] = None,
        **kwargs
    ) -> List[Union[float, int]]:
        """
        Assigns a numerical score or weight to each item in a sequence.

        Args:
            items (Sequence[Any]):
                Input Type: A sequence (e.g., list) of items to be scored. The type of
                            each item (`Any`) can vary:
                            - Documents, chunks of text (strings).
                            - Tokens (strings).
                            - Feature vectors (lists of numbers).
                Purpose: The collection of items that need scoring.
            context (Optional[Any]):
                Input Type: `Any` type to allow flexibility. This could be:
                            - A query string for relevance scoring.
                            - A global document collection for TF-IDF.
                            - A set of rules or a model for heuristic scoring.
                Purpose: Provides additional information that might be needed to calculate scores.
            **kwargs: Additional keyword arguments for scoring algorithm customization.

        Returns:
            List[Union[float, int]]:
                Output Type: A list of numerical scores (floats or integers),
                             corresponding one-to-one with the input `items`.
                Purpose: To provide a quantitative measure of importance, relevance, or
                         some other characteristic for each item.
        """
        pass

# --- Clustering ---
class BaseClusterer(BaseMLOperation):
    """Abstract base class for grouping similar items together."""

    @abc.abstractmethod
    def cluster(self, data_points: Sequence[List[float]], **kwargs) -> List[int]:
        """
        Assigns items to clusters based on their features.

        Args:
            data_points (Sequence[List[float]]):
                Input Type: A sequence (e.g., list) of data points. Each data point is
                            a list of floats, typically representing a feature vector
                            or an embedding.
                            (e.g., `[[0.1, 0.2], [0.9, 0.8], [0.15, 0.22]]`).
                Purpose: The numerical data to be grouped into clusters.
            **kwargs: Additional keyword arguments for clustering algorithm customization
                      (e.g., number of clusters for K-Means, epsilon for DBSCAN).

        Returns:
            List[int]:
                Output Type: A list of integers, where each integer is a cluster label
                             assigned to the corresponding input data point.
                             (e.g., `[0, 1, 0]`, indicating the first and third points
                             are in cluster 0, and the second is in cluster 1).
                             A value like -1 might indicate noise for some algorithms.
                Purpose: To provide group assignments for the input data.
        """
        pass

# --- Anomaly Detection ---
class BaseAnomalyDetector(BaseMLOperation):
    """Abstract base class for identifying items that deviate from the norm."""

    @abc.abstractmethod
    def detect(self, data_points: Sequence[List[float]], **kwargs) -> List[Union[int, bool]]:
        """
        Identifies anomalies within a set of data points.

        Args:
            data_points (Sequence[List[float]]):
                Input Type: A sequence (e.g., list) of data points. Each data point is
                            a list of floats, typically a feature vector or embedding.
                Purpose: The numerical data to be analyzed for anomalies.
            **kwargs: Additional keyword arguments for anomaly detection algorithm customization.

        Returns:
            List[Union[int, bool]]:
                Output Type: A list of indicators, one for each input data point.
                             The indicator can be:
                             - An integer (e.g., 1 for normal, -1 for anomaly, or 0/1).
                             - A boolean (e.g., True for anomaly, False for normal).
                Purpose: To flag which data points are considered outliers.
        """
        pass

# --- Feature Extraction ---
class BaseFeatureExtractor(BaseMLOperation):
    """Abstract base class for deriving features from raw data."""

    @abc.abstractmethod
    def extract(self, raw_data: Any, **kwargs) -> Any:
        """
        Extracts meaningful features from raw input data.

        Args:
            raw_data (Any):
                Input Type: The type is `Any` due to the wide variety of possible inputs:
                            - Text (string or list of strings).
                            - Image data.
                            - Structured data (e.g., dictionaries, pandas DataFrames).
                            - Time series data.
                Purpose: The input from which features need to be derived.
            **kwargs: Additional keyword arguments for feature extraction customization.

        Returns:
            Any:
                Output Type: The type is `Any` as the structure of extracted features
                             can vary greatly:
                             - A list or array of numerical features.
                             - A dictionary of feature names to values.
                             - A sequence of complex objects (e.g., POS tags, named entities).
                             - A sparse matrix.
                Purpose: To provide a transformed representation of the data that is more
                         suitable for machine learning models or analysis.
        """
        pass

# --- Data Transformation / Preprocessing ---
class BaseDataPreprocessor(BaseMLOperation):
    """Abstract base class for cleaning, normalizing, or transforming data."""

    @abc.abstractmethod
    def preprocess(self, data: Any, **kwargs) -> Any:
        """
        Applies preprocessing steps to the input data.

        Args:
            data (Any):
                Input Type: The type is `Any` because preprocessing can apply to various
                            data types:
                            - Text (string, list of strings).
                            - Numerical data (lists, arrays).
                            - Structured data (dictionaries, DataFrames).
                Purpose: The data that needs cleaning, normalization, or transformation.
            **kwargs: Additional keyword arguments for preprocessing step customization
                      (e.g., stopwords list, stemming algorithm, normalization range).

        Returns:
            Any:
                Output Type: The type is `Any`, and it typically matches or is closely
                             related to the input data type, but in its processed form.
                             (e.g., input string -> output string after lowercasing).
                Purpose: To provide data that is cleaner, more consistent, or in a more
                         appropriate format for subsequent operations.
        """
        pass

In [8]:
import os
from typing import List, Dict, Any, Optional
from abc import ABC, abstractmethod

# Assuming BaseMLOperation and BaseDocumentIngestor are defined above this code block
# as in the user's provided context.
# from your_module import BaseMLOperation, BaseDocumentIngestor

## Custom Object to Hold Ingested Directory Data

class IngestedDirectory:
    """
    A custom object to hold information about files ingested from a directory.
    """
    def __init__(self, directory_path: str, file_paths: List[str]):
        """
        Initializes the IngestedDirectory object.

        Args:
            directory_path (str): The path to the directory that was ingested.
            file_paths (List[str]): A list of full paths to the files found in the directory.
        """
        self.directory_path = directory_path
        self.file_paths = file_paths

    def __str__(self) -> str:
        """String representation for easy printing."""
        return f"IngestedDirectory(path='{self.directory_path}', files={len(self.file_paths)})"

    def __repr__(self) -> str:
        """Representation for debugging."""
        return f"IngestedDirectory(directory_path='{self.directory_path}', file_paths={self.file_paths})"

    # You could add methods here to facilitate access for a chunker,
    # e.g., get_file_contents(self, file_path) -> str or get_all_contents(self) -> Dict[str, str]
    # but for this example, the chunker can access the self.file_paths list directly.

## Directory Ingestor Class

class DirectoryIngestor(BaseDocumentIngestor):
    """
    Ingests a directory by listing its contents.
    Returns an IngestedDirectory object containing file paths.
    """
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initializes the DirectoryIngestor with configuration.

        Config should contain:
        - 'directory_path' (str): The path to the directory to ingest.
        """
        super().__init__(config)
        self.directory_path: Optional[str] = None

    def validate_config(self) -> None:
        """
        Validates the configuration for the DirectoryIngestor.

        Checks if 'directory_path' is provided and is a valid directory.
        """
        if 'directory_path' not in self.config or not isinstance(self.config['directory_path'], str):
            raise ValueError("Configuration must contain a 'directory_path' key with a string value.")

        self.directory_path = self.config['directory_path']

        if not os.path.isdir(self.directory_path):
             raise ValueError(f"The provided directory_path '{self.directory_path}' is not a valid directory.")

    def configure(self) -> None:
        """
        Configures the DirectoryIngestor.
        Validation is done here based on the BaseMLOperation pattern.
        """
        # In this simple case, configure mainly involves validating the config
        # and storing the validated directory path.
        self.validate_config()


    def ingest(self, source: Any = None, **kwargs) -> IngestedDirectory:
        """
        Lists files in the configured directory.

        Args:
            source (Any): Ignored in this implementation as the directory is
                          specified in the config.
            **kwargs: Additional keyword arguments (ignored).

        Returns:
            IngestedDirectory: An object containing the directory path and a list
                               of file paths within that directory.
        """
        if self.directory_path is None:
             # Ensure configuration and validation has happened
             self.configure()

        file_list = []
        # Walk through the directory to get all file paths
        for root, _, files in os.walk(self.directory_path):
            for file in files:
                file_list.append(os.path.join(root, file))

        return IngestedDirectory(directory_path=self.directory_path, file_paths=file_list)

In [9]:
import abc
from typing import Any, Dict, List, Optional, Union, Sequence
import os

# Assuming BaseMLOperation and BaseTextChunker are defined from the previous code block
# from your_module import BaseMLOperation, BaseTextChunker

## Custom Object to Hold Ingested Directory Data (from previous code)
class IngestedDirectory:
    """
    A custom object to hold information about files ingested from a directory.
    """
    def __init__(self, directory_path: str, file_paths: List[str]):
        self.directory_path = directory_path
        self.file_paths = file_paths

    def __str__(self) -> str:
        return f"IngestedDirectory(path='{self.directory_path}', files={len(self.file_paths)})"

    def __repr__(self) -> str:
        return f"IngestedDirectory(directory_path='{self.directory_path}', file_paths={self.file_paths})"

    # Methods to access contents would go here in a real implementation
    # For demonstration, a chunker will access self.file_paths directly.


## A Dummy Chunker Class for Demonstration
class DummyDirectoryChunker(BaseTextChunker):
    """
    A dummy chunker that accepts an IngestedDirectory object.
    This class is for demonstration purposes and does not perform actual chunking.
    """
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initializes the dummy chunker."""
        super().__init__(config)

    def validate_config(self) -> None:
        """
        Dummy config validation.
        In a real chunker, this would validate chunking parameters.
        """
        print("DummyDirectoryChunker: Validating config (no-op).")
        # Add real config validation here if needed for the dummy

    def configure(self) -> None:
        """
        Dummy configuration step.
        In a real chunker, this might load resources or set up parameters.
        """
        print("DummyDirectoryChunker: Configuring (no-op).")
        self.validate_config() # Call validation as per BaseMLOperation pattern

    def chunk(self, document: Any, **kwargs) -> List[Any]:
        """
        Dummy chunk method that expects an IngestedDirectory object.
        Does not perform actual chunking.

        Args:
            document (Any): Expected to be an instance of IngestedDirectory.
            **kwargs: Additional keyword arguments (ignored).

        Returns:
            List[Any]: An empty list as this is a dummy implementation.
        """
        print("DummyDirectoryChunker: Received document for chunking.")

        if not isinstance(document, IngestedDirectory):
            print(f"Warning: Expected IngestedDirectory object but received {type(document)}")
            return []

        print(f"DummyDirectoryChunker: Accessing ingested directory: {document.directory_path}")
        print(f"DummyDirectoryChunker: Found {len(document.file_paths)} files to process.")

        # In a real chunker, you would iterate through document.file_paths,
        # read the content of each file, and then apply chunking logic.
        # For example:
        # all_content = ""
        # for file_path in document.file_paths:
        #     try:
        #         with open(file_path, 'r', encoding='utf-8') as f:
        #             all_content += f.read() + "\n\n" # Concatenate content, maybe add separators
        #     except Exception as e:
        #         print(f"Error reading file {file_path}: {e}")
        #
        # # Now apply your chunking logic to all_content or process files individually
        # actual_chunks = ["dummy chunk 1", "dummy chunk 2"] # Replace with real chunking result

        print("DummyDirectoryChunker: (Skipping actual chunking in dummy class)")

        # Return a placeholder or empty list
        return [] # In a real scenario, return the list of chunks

# --- Demonstration Code ---

## Create a dummy IngestedDirectory object
# For this demonstration, let's create a temporary dummy directory and file
dummy_dir = "./dummy_ingest_dir"
dummy_file1 = os.path.join(dummy_dir, "file1.txt")
dummy_file2 = os.path.join(dummy_dir, "subdir", "file2.txt")

# Create the dummy directory and a subdirectory
os.makedirs(os.path.join(dummy_dir, "subdir"), exist_ok=True)

# Create dummy files
with open(dummy_file1, "w") as f:
    f.write("This is the content of dummy file 1.")
with open(dummy_file2, "w") as f:
    f.write("This is the content of dummy file 2 in a subdirectory.")

# Manually create the list of dummy file paths
dummy_file_paths = [dummy_file1, dummy_file2]

# Create an instance of the custom IngestedDirectory object
# In a real pipeline, this object would come from the DirectoryIngestor.ingest() call
print("Creating a dummy IngestedDirectory object...")
ingested_directory_object = IngestedDirectory(directory_path=dummy_dir, file_paths=dummy_file_paths)
print(f"Created object: {ingested_directory_object}")
print("-" * 20)


## Create an instance of the Dummy Chunker class
print("Creating an instance of DummyDirectoryChunker...")
chunker_instance = DummyDirectoryChunker()
print("Chunker instance created.")
print("-" * 20)

## Pass the IngestedDirectory object to the chunker's chunk method
print("Passing the IngestedDirectory object to the chunker's chunk method...")
try:
    # The chunk method will be called with the ingested object
    resulting_chunks = chunker_instance.chunk(ingested_directory_object)
    print("Chunking method called.")
    print(f"Resulting chunks (from dummy): {resulting_chunks}")
except Exception as e:
    print(f"An error occurred during dummy chunking: {e}")

print("-" * 20)
print("Demonstration complete.")

# Clean up the dummy directory and files
print("Cleaning up dummy directory...")
os.remove(dummy_file1)
os.remove(dummy_file2)
os.rmdir(os.path.join(dummy_dir, "subdir"))
os.rmdir(dummy_dir)
print("Cleanup complete.")

Creating a dummy IngestedDirectory object...
Created object: IngestedDirectory(path='./dummy_ingest_dir', files=2)
--------------------
Creating an instance of DummyDirectoryChunker...
Chunker instance created.
--------------------
Passing the IngestedDirectory object to the chunker's chunk method...
DummyDirectoryChunker: Received document for chunking.
DummyDirectoryChunker: Accessing ingested directory: ./dummy_ingest_dir
DummyDirectoryChunker: Found 2 files to process.
DummyDirectoryChunker: (Skipping actual chunking in dummy class)
Chunking method called.
Resulting chunks (from dummy): []
--------------------
Demonstration complete.
Cleaning up dummy directory...
Cleanup complete.


In [None]:
import abc
from typing import Any, Dict, List, Optional, Union, Sequence
import json
import os

# Assuming BaseMLOperation and BaseTextChunker are defined
# from your_module import BaseMLOperation, BaseTextChunker

# Import necessary libraries from transformers and torch
try:
    from transformers import AutoTokenizer, AutoModel
    import torch
except ImportError:
    print("Please install transformers and torch: pip install transformers torch")
    # Exit or handle the error appropriately
    AutoTokenizer = None
    AutoModel = None
    torch = None

# Import sentence tokenizer from NLTK
try:
    import nltk
    # Download the punkt tokenizer if you haven't already
    try:
        nltk.data.find('tokenizers/punkt')
    except nltk.downloader.DownloadError:
        nltk.download('punkt')
    from nltk.tokenize import sent_tokenize, PunktSentenceTokenizer
except ImportError:
    print("Please install nltk: pip install nltk")
    # Exit or handle the error appropriately
    sent_tokenize = None
    PunktSentenceTokenizer = None


## LegalBERT Semantic Chunker

class LegalBertSemanticChunker(BaseTextChunker):
    """
    A chunker that uses a LegalBERT model to find potential semantic boundaries
    in text and outputs offsets to a JSON file.

    This implementation uses sentence boundaries as a proxy for semantic boundaries
    and calculates their end offsets. A more advanced implementation might
    process text in windows and analyze model outputs (like embeddings or attention)
    to detect changes in topic or meaning.
    """
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initializes the LegalBertSemanticChunker.

        Config can optionally include:
        - 'model_name' (str): The Hugging Face model name (default: 'legalbert-base-uncased').
        - 'output_json_path' (str): The path to save the output JSON file. Required.
        - 'device' (str): 'cuda' or 'cpu' (default: 'cpu').
        """
        super().__init__(config)
        self.tokenizer = None
        self.model = None
        self.output_json_path: Optional[str] = None
        self.model_name: str = self.config.get('model_name', 'legalbert-base-uncased')
        self.device: str = self.config.get('device', 'cpu')

    def validate_config(self) -> None:
        """
        Validates the configuration.
        Checks for 'output_json_path'.
        """
        if 'output_json_path' not in self.config or not isinstance(self.config['output_json_path'], str):
            raise ValueError("Configuration must contain an 'output_json_path' key with a string value.")
        self.output_json_path = self.config['output_json_path']

        # Check if model name and device are valid (basic check)
        if not isinstance(self.model_name, str) or len(self.model_name) == 0:
             raise ValueError("Config 'model_name' must be a non-empty string.")
        if self.device not in ['cpu', 'cuda']:
             raise ValueError("Config 'device' must be 'cpu' or 'cuda'.")

        print("LegalBertSemanticChunker: Config validated successfully.")

    def configure(self) -> None:
        """
        Configures the chunker by loading the model and tokenizer.
        """
        self.validate_config()

        if AutoTokenizer is None or AutoModel is None or torch is None or sent_tokenize is None:
             raise RuntimeError("Required libraries (transformers, torch, nltk) not loaded. Please install them.")

        print(f"LegalBertSemanticChunker: Loading model and tokenizer: {self.model_name}")
        try:
            # Load tokenizer and model
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModel.from_pretrained(self.model_name)
            self.model.to(self.device) # Move model to the specified device
            self.model.eval() # Set model to evaluation mode
            print("LegalBertSemanticChunker: Model and tokenizer loaded successfully.")
        except Exception as e:
            raise RuntimeError(f"Failed to load model or tokenizer '{self.model_name}': {e}")

    def chunk(self, document: Any, **kwargs) -> List[Any]:
        """
        Finds semantic boundaries (sentence endings) in the input text
        and saves their offsets to a JSON file.

        Args:
            document (Any): The input text content, expected as a single string.
                            If the input is from IngestedDirectory, you need to
                            read file content first.
            **kwargs: Additional keyword arguments (ignored).

        Returns:
            List[Any]: In this implementation, returns an empty list as the primary
                       output is the JSON file. In a real chunker, this would
                       return the text chunks themselves.
        """
        if self.tokenizer is None or self.model is None:
            # Configure if not already done
            self.configure()

        if not isinstance(document, str):
             print(f"Warning: LegalBertSemanticChunker expects string input, but received {type(document)}. Attempting to convert.")
             try:
                 text = str(document)
             except Exception:
                 print("Error: Could not convert input document to string.")
                 return [] # Return empty list if input is not a string

        else:
            text = document

        if not text.strip():
            print("Warning: Input text is empty or only whitespace. Skipping chunking.")
            return []

        print(f"LegalBertSemanticChunker: Processing text (length: {len(text)})...")

        # Use NLTK's sentence tokenizer to find potential boundaries
        # This is a simplification; a more advanced method would use LegalBERT's
        # properties to find boundaries directly or analyze embeddings.
        sentences = sent_tokenize(text)

        # Calculate the end offsets of each sentence in the original text
        offsets: List[int] = []
        current_offset = 0
        for sentence in sentences:
            # Find the position of the sentence within the remaining text
            # We add 1 for the space/newline that might separate sentences,
            # though this can be tricky with inconsistent spacing.
            # A more robust approach might use string finding with original text and track indices.
            try:
                 # Find the *exact* end position of the sentence in the original text
                 # This requires careful index tracking
                 end_pos = text.find(sentence, current_offset) + len(sentence)
                 if text.find(sentence, current_offset) != -1: # Ensure sentence was found
                     offsets.append(end_pos)
                     current_offset = end_pos # Update offset for the next search
                 else:
                      print(f"Warning: Could not find sentence '{sentence[:50]}...' at expected offset {current_offset}. Offset calculation may be inaccurate.")
                      # Fallback: just add length, less accurate
                      current_offset += len(sentence)
                      offsets.append(current_offset)

            except Exception as e:
                 print(f"Error calculating offset for sentence '{sentence[:50]}...': {e}")
                 # Attempt to recover by just advancing the offset
                 current_offset += len(sentence)


        print(f"LegalBertSemanticChunker: Found {len(offsets)} potential semantic boundaries.")

        # Save the offsets to a JSON file
        try:
            with open(self.output_json_path, 'w', encoding='utf-8') as f:
                json.dump(offsets, f, indent=4)
            print(f"LegalBertSemanticChunker: Offsets saved to {self.output_json_path}")
        except Exception as e:
            print(f"Error saving offsets to JSON file {self.output_json_path}: {e}")

        # According to the BaseTextChunker definition, chunk() should return the chunks.
        # However, in this specific request, the primary output is the JSON file of offsets.
        # We will return an empty list as a placeholder or could potentially return
        # the text segments based on these offsets if needed for a downstream task.
        # For this specific request's output format (JSON offsets), an empty list is acceptable
        # as the main result is the side effect of writing the file.
        return [] # Returning an empty list of chunks


# --- Demonstration Code ---

# Define a dummy text input (replace with reading a file if needed)
dummy_text_content = """
This is the first sentence about contracts. It discusses section 1.
A second sentence about legal proceedings follows. This sentence is longer and might contain more complex terms.
Finally, a third sentence concludes this short document. It talks about compliance requirements.
"""

# Define where to save the output JSON file
output_json_file = "./legalbert_offsets.json"

# Create an instance of the LegalBertSemanticChunker
# Provide the output JSON path and optionally model name and device
print("Creating an instance of LegalBertSemanticChunker...")
try:
    chunker_config = {
        'output_json_path': output_json_file,
        'model_name': 'legalbert-base-uncased', # Or another compatible model
        'device': 'cuda' if torch.cuda.is_available() else 'cpu' # Use GPU if available
    }
    legalbert_chunker = LegalBertSemanticChunker(config=chunker_config)
    print("Chunker instance created.")
    print("-" * 20)

    # Configure the chunker (loads the model and tokenizer)
    # This is called automatically by chunk if not called explicitly
    # print("Configuring the chunker (loading model)...")
    # legalbert_chunker.configure()
    # print("Chunker configured.")
    # print("-" * 20)

    # Pass the text content to the chunk method
    print("Passing the text content to the chunker's chunk method...")
    resulting_chunks = legalbert_chunker.chunk(dummy_text_content)
    print("Chunking method called.")
    print(f"Resulting chunks (this chunker outputs offsets to JSON): {resulting_chunks}")
    print("-" * 20)

    # Verify the output file was created
    if os.path.exists(output_json_file):
        print(f"Output JSON file created at: {output_json_file}")
        with open(output_json_file, 'r', encoding='utf-8') as f:
             loaded_offsets = json.load(f)
        print(f"Offsets written to JSON: {loaded_offsets}")

        # Clean up the dummy output file
        print("Cleaning up dummy output file...")
        os.remove(output_json_file)
        print("Cleanup complete.")
    else:
        print(f"Output JSON file was NOT created at: {output_json_file}")


except RuntimeError as e:
     print(f"Setup Error: {e}")
except ValueError as e:
    print(f"Configuration Error: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

In [10]:
# Install required libraries
print("Installing required libraries...")
!pip install transformers torch nltk
print("Installation complete.")

# Download NLTK data (punkt tokenizer)
import nltk
try:
    nltk.data.find('tokenizers/punkt')
except (nltk.downloader.DownloadError, LookupError):
    print("Downloading NLTK punkt tokenizer...")
    nltk.download('punkt')
    print("Download complete.")

Installing required libraries...
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Down

AttributeError: module 'nltk.downloader' has no attribute 'DownloadError'

In [15]:
# file ipython-input-0-6ab65fdc8fcf
import abc
from typing import Any, Dict, List, Optional, Union, Sequence
import json
import os

# Assuming BaseMLOperation and BaseTextChunker are defined
# from your_module import BaseMLOperation, BaseTextChunker

# Import necessary libraries from transformers and torch
try:
    from transformers import AutoTokenizer, AutoModel
    import torch
except ImportError:
    print("Please install transformers and torch: pip install transformers torch")
    # Exit or handle the error appropriately
    AutoTokenizer = None
    AutoModel = None
    torch = None

# Import sentence tokenizer from NLTK
try:
    import nltk
    # Download the punkt tokenizer if you haven't already
    try:
        nltk.data.find('tokenizers/punkt')
    except LookupError: # Catch only LookupError here
        print("NLTK 'punkt' tokenizer not found. Downloading...")
        nltk.download('punkt')
        print("NLTK 'punkt' tokenizer download complete.")
    from nltk.tokenize import sent_tokenize, PunktSentenceTokenizer
except ImportError:
    print("Please install nltk: pip install nltk")
    # Exit or handle the error appropriately
    sent_tokenize = None
    PunktSentenceTokenizer = None


## LegalBERT Semantic Chunker

class LegalBertSemanticChunker(BaseTextChunker):
    """
    A chunker that uses a LegalBERT model to find potential semantic boundaries
    in text and outputs offsets to a JSON file.

    This implementation uses sentence boundaries as a proxy for semantic boundaries
    and calculates their end offsets. A more advanced implementation might
    process text in windows and analyze model outputs (like embeddings or attention)
    to detect changes in topic or meaning.
    """
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initializes the LegalBertSemanticChunker.

        Config can optionally include:
        - 'model_name' (str): The Hugging Face model name (default: 'legalbert-base-uncased').
        - 'output_json_path' (str): The path to save the output JSON file. Required.
        - 'device' (str): 'cuda' or 'cpu' (default: 'cpu').
        """
        super().__init__(config)
        self.tokenizer = None
        self.model = None
        self.output_json_path: Optional[str] = None
        # CORRECTED: Use the correct model name from Hugging Face Hub
        self.model_name: str = self.config.get('model_name', 'nlpaueb/legal-bert-base-uncased')
        self.device: str = self.config.get('device', 'cpu')

    def validate_config(self) -> None:
        """
        Validates the configuration.
        Checks for 'output_json_path'.
        """
        if 'output_json_path' not in self.config or not isinstance(self.config['output_json_path'], str):
            raise ValueError("Configuration must contain an 'output_json_path' key with a string value.")
        self.output_json_path = self.config['output_json_path']

        # Check if model name and device are valid (basic check)
        if not isinstance(self.model_name, str) or len(self.model_name) == 0:
             raise ValueError("Config 'model_name' must be a non-empty string.")
        if self.device not in ['cpu', 'cuda']:
             raise ValueError("Config 'device' must be 'cpu' or 'cuda'.")

        print("LegalBertSemanticChunker: Config validated successfully.")

    def configure(self) -> None:
        """
        Configures the chunker by loading the model and tokenizer.
        """
        self.validate_config()

        if AutoTokenizer is None or AutoModel is None or torch is None or sent_tokenize is None:
             raise RuntimeError("Required libraries (transformers, torch, nltk) not loaded. Please install them.")

        print(f"LegalBertSemanticChunker: Loading model and tokenizer: {self.model_name}")
        try:
            # Load tokenizer and model
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModel.from_pretrained(self.model_name)
            self.model.to(self.device) # Move model to the specified device
            self.model.eval() # Set model to evaluation mode
            print("LegalBertSemanticChunker: Model and tokenizer loaded successfully.")
        except Exception as e:
            raise RuntimeError(f"Failed to load model or tokenizer '{self.model_name}': {e}")

    def chunk(self, document: Any, **kwargs) -> List[Any]:
        """
        Finds semantic boundaries (sentence endings) in the input text
        and saves their offsets to a JSON file.

        Args:
            document (Any): The input text content, expected as a single string.
                            If the input is from IngestedDirectory, you need to
                            read file content first.
            **kwargs: Additional keyword arguments (ignored).

        Returns:
            List[Any]: In this implementation, returns an empty list as the primary
                       output is the JSON file. In a real chunker, this would
                       return the text chunks themselves.
        """
        if self.tokenizer is None or self.model is None:
            # Configure if not already done
            self.configure()

        if not isinstance(document, str):
             print(f"Warning: LegalBertSemanticChunker expects string input, but received {type(document)}. Attempting to convert.")
             try:
                 text = str(document)
             except Exception:
                 print("Error: Could not convert input document to string.")
                 return [] # Return empty list if input is not a string

        else:
            text = document

        if not text.strip():
            print("Warning: Input text is empty or only whitespace. Skipping chunking.")
            return []

        print(f"LegalBertSemanticChunker: Processing text (length: {len(text)})...")

        # Use NLTK's sentence tokenizer to find potential boundaries
        # This is a simplification; a more advanced method would use LegalBERT's
        # properties to find boundaries directly or analyze embeddings.
        sentences = sent_tokenize(text)

        # Calculate the end offsets of each sentence in the original text
        offsets: List[int] = []
        current_offset = 0
        for sentence in sentences:
            # Find the position of the sentence within the remaining text
            # We add 1 for the space/newline that might separate sentences,
            # though this can be tricky with inconsistent spacing.
            # A more robust approach might use string finding with original text and track indices.
            try:
                 # Find the *exact* end position of the sentence in the original text
                 # This requires careful index tracking
                 end_pos = text.find(sentence, current_offset) + len(sentence)
                 if text.find(sentence, current_offset) != -1: # Ensure sentence was found
                     offsets.append(end_pos)
                     current_offset = end_pos # Update offset for the next search
                 else:
                      print(f"Warning: Could not find sentence '{sentence[:50]}...' at expected offset {current_offset}. Offset calculation may be inaccurate.")
                      # Fallback: just add length, less accurate
                      current_offset += len(sentence)
                      offsets.append(current_offset)

            except Exception as e:
                 print(f"Error calculating offset for sentence '{sentence[:50]}...': {e}")
                 # Attempt to recover by just advancing the offset
                 current_offset += len(sentence)


        print(f"LegalBertSemanticChunker: Found {len(offsets)} potential semantic boundaries.")

        # Save the offsets to a JSON file
        try:
            with open(self.output_json_path, 'w', encoding='utf-8') as f:
                json.dump(offsets, f, indent=4)
            print(f"LegalBertSemanticChunker: Offsets saved to {self.output_json_path}")
        except Exception as e:
            print(f"Error saving offsets to JSON file {self.output_json_path}: {e}")

        # According to the BaseTextChunker definition, chunk() should return the chunks.
        # However, in this specific request, the primary output is the JSON file of offsets.
        # We will return an empty list as a placeholder or could potentially return
        # the text segments based on these offsets if needed for a downstream task.
        # For this specific request's output format (JSON offsets), an empty list is acceptable
        # as the main result is the side effect of writing the file.
        return [] # Returning an empty list of chunks


# --- Demonstration Code ---

# Define a dummy text input (replace with reading a file if needed)
dummy_text_content = """
This is the first sentence about contracts. It discusses section 1.
A second sentence about legal proceedings follows. This sentence is longer and might contain more complex terms.
Finally, a third sentence concludes this short document. It talks about compliance requirements.
"""

# Define where to save the output JSON file
output_json_file = "./legalbert_offsets.json"

# Create an instance of the LegalBertSemanticChunker
# Provide the output JSON path and optionally model name and device
print("Creating an instance of LegalBertSemanticChunker...")
try:
    chunker_config = {
        'output_json_path': output_json_file,
        # CORRECTED model name
        'model_name': 'nlpaueb/legal-bert-base-uncased', # Or another compatible model
        'device': 'cuda' if torch.cuda.is_available() else 'cpu' # Use GPU if available
    }
    legalbert_chunker = LegalBertSemanticChunker(config=chunker_config)
    print("Chunker instance created.")
    print("-" * 20)

    # Configure the chunker (loads the model and tokenizer)
    # This is called automatically by chunk if not called explicitly
    # print("Configuring the chunker (loading model)...")
    # legalbert_chunker.configure()
    # print("Chunker configured.")
    # print("-" * 20)

    # Pass the text content to the chunk method
    print("Passing the text content to the chunker's chunk method...")
    resulting_chunks = legalbert_chunker.chunk(dummy_text_content)
    print("Chunking method called.")
    print(f"Resulting chunks (this chunker outputs offsets to JSON): {resulting_chunks}")
    print("-" * 20)

    # Verify the output file was created
    if os.path.exists(output_json_file):
        print(f"Output JSON file created at: {output_json_file}")
        with open(output_json_file, 'r', encoding='utf-8') as f:
             loaded_offsets = json.load(f)
        print(f"Offsets written to JSON: {loaded_offsets}")

        # Clean up the dummy output file
        print("Cleaning up dummy output file...")
        os.remove(output_json_file)
        print("Cleanup complete.")
    else:
        print(f"Output JSON file was NOT created at: {output_json_file}")


except RuntimeError as e:
     print(f"Setup Error: {e}")
except ValueError as e:
    print(f"Configuration Error: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

Creating an instance of LegalBertSemanticChunker...
Chunker instance created.
--------------------
Passing the text content to the chunker's chunk method...
LegalBertSemanticChunker: Config validated successfully.
LegalBertSemanticChunker: Loading model and tokenizer: nlpaueb/legal-bert-base-uncased
LegalBertSemanticChunker: Model and tokenizer loaded successfully.
LegalBertSemanticChunker: Processing text (length: 279)...
An unexpected error occurred: 
**********************************************************************
  Resource [93mpunkt_tab[0m not found.
  Please use the NLTK Downloader to obtain the resource:

  [31m>>> import nltk
  >>> nltk.download('punkt_tab')
  [0m
  For more information see: https://www.nltk.org/data.html

  Attempted to load [93mtokenizers/punkt_tab/english/[0m

  Searched in:
    - '/root/nltk_data'
    - '/usr/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '

In [16]:
import nltk

try:
    # Check if the 'punkt_tab' resource is already available
    nltk.data.find('tokenizers/punkt_tab')
    print("'punkt_tab' resource is already downloaded.")
except LookupError:
    # If 'punkt_tab' is not found, download it
    print("Downloading NLTK 'punkt_tab' resource...")
    try:
        nltk.download('punkt_tab')
        print("'punkt_tab' resource download complete.")
    except Exception as e:
        print(f"Error downloading 'punkt_tab' resource: {e}")
except Exception as e:
    print(f"An unexpected error occurred while checking for 'punkt_tab': {e}")

# You might also want to ensure the standard 'punkt' is downloaded
try:
    nltk.data.find('tokenizers/punkt')
    print("'punkt' resource is already downloaded.")
except LookupError:
    print("Downloading NLTK 'punkt' resource...")
    try:
        nltk.download('punkt')
        print("'punkt' resource download complete.")
    except Exception as e:
        print(f"Error downloading 'punkt' resource: {e}")
except Exception as e:
    print(f"An unexpected error occurred while checking for 'punkt': {e}")

Downloading NLTK 'punkt_tab' resource...


[nltk_data] Downloading package punkt_tab to /root/nltk_data...


'punkt_tab' resource download complete.
'punkt' resource is already downloaded.


[nltk_data]   Unzipping tokenizers/punkt_tab.zip.
