In [7]:
!pip install litellm
!pip install mermaid-magic


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.1.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.1.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [21]:
# Load the mermaid_magic IPython extension. Presumably this is what provides the
# %%mermaid cell magic used by the diagram cell in this notebook - TODO confirm.
%load_ext mermaid_magic

The mermaid_magic extension is already loaded. To reload it, use:
  %reload_ext mermaid_magic


In [41]:
%%mermaid
graph TD
    %% Main pipeline: configuration, benchmark data, model selection, then a
    %% per-snippet loop of prompt, request, response, evaluation and metric storage.
    A[Start Jupyter Notebook] --> B{Load Configuration};
    B --> C{Load Benchmark Data};
    C --> D{Select LLM Model via Ollama Endpoint};
    D --> E{Iterate Through Log Snippets Scenarios};
    E --> F{For Each Log Snippet};
    F --> G{Format Prompt for LLM};
    G --> H{Send Request to Ollama Endpoint};
    H --> I{Receive LLM Response};
    I --> J{Evaluate LLM Response};
    J --> K{Store Evaluation Metrics};
    %% Loop back to the iteration node after each snippet's metrics are stored.
    K --> E;
    %% Exit edge of the loop: taken once every snippet has been processed.
    E -- All Snippets Processed --> L{Aggregate and Analyze Results};
    L --> M{Visualize Results};
    M --> N[End Report Findings];

    %% Interchangeable model interfaces that the model-selection step can point at.
    subgraph "Model Abstraction Layer Pluggable"
        O[Ollama Model 1 Interface]
        P[Ollama Model 2 Interface]
        Q[More Models...]
    end

    %% Dotted edges: model selection targets one of the pluggable interfaces.
    D -.-> O;
    D -.-> P;
    D -.-> Q;

    %% Benchmark inputs: each log snippet is paired with its expected analysis.
    subgraph "Benchmark Data Store"
        R[Log Snippet 1 example Error Log]
        S[Expected Analysis 1 Ground Truth]
        T[Log Snippet 2 example Security Event]
        U[Expected Analysis 2 Ground Truth]
        V[More Snippets...]
    end

    %% The data-loading step reads every snippet and every ground-truth entry.
    C --> R;
    C --> S;
    C --> T;
    C --> U;
    C --> V;

    %% What the evaluation step relies on: criteria plus a scoring mechanism.
    subgraph "Evaluation Logic"
        W[Define Evaluation Criteria example Accuracy Relevance Completeness]
        X[Scoring Mechanism example Keyword Match Semantic Similarity LLM as a judge]
    end

    J --> W;
    J --> X;

In [44]:
import os

class LogProcessor:
    """
    Loads log lines, truncates over-long lines, and serves them as
    sliding-window chunks for LLM analysis.

    Attributes:
        config (dict): The validated configuration dictionary passed to the
            constructor (stored by reference, not copied).
        raw_log_lines (list): Lines from the most recent ``load_logs`` call,
            with trailing CR/LF characters removed.
        processed_log_lines (list): ``raw_log_lines`` with every line cut to
            at most ``config["max_line_length"]`` characters.
    """

    # Every one of these keys must be present and map to a positive integer.
    _REQUIRED_CONFIG_KEYS = ("window_size_lines", "max_line_length", "slide_step")

    def __init__(self, config: dict):
        """
        Initializes the LogProcessor.

        Args:
            config (dict): Configuration dictionary. Expected keys:
                - "window_size_lines" (int): Number of log lines per chunk.
                - "max_line_length" (int): Maximum characters allowed per line.
                                           Lines exceeding this will be truncated.
                - "slide_step" (int): Number of lines the window slides forward
                                      for the next chunk.

        Raises:
            TypeError: If config is not a dictionary.
            ValueError: If a required key is missing or its value is not a
                positive integer.
        """
        if not isinstance(config, dict):
            raise TypeError("Config must be a dictionary.")

        self.config = config
        self._validate_config()

        self.raw_log_lines = []
        self.processed_log_lines = []

    def _validate_config(self) -> None:
        """
        Validates the provided configuration.

        Missing keys are reported first (in declaration order); only then are
        the values checked.

        Raises:
            ValueError: If a required key is missing or its value is not a
                positive integer.
        """
        for key in self._REQUIRED_CONFIG_KEYS:
            if key not in self.config:
                raise ValueError(f"Missing required config key: {key}")

        for key in self._REQUIRED_CONFIG_KEYS:
            value = self.config[key]
            # bool is a subclass of int, so True would otherwise slip through
            # as a "positive integer" and silently behave like 1.
            if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
                raise ValueError(f"Config '{key}' must be a positive integer.")

    def _truncate_line(self, line: str) -> str:
        """
        Truncates a single line to the configured max_line_length.

        Lines that are already short enough are returned unchanged.
        """
        # Slicing past the end of a string is a no-op, so no length check is needed.
        return line[:self.config["max_line_length"]]

    def load_logs(self, log_source) -> None:
        """
        Loads log lines from a specified source (file path or list of strings).
        Each loaded line is immediately processed (truncated). Any previously
        loaded lines are discarded.

        Args:
            log_source (str or list): The source of the log data.
                                      If str, it's treated as a file path (read as UTF-8).
                                      If list, it's treated as a list of log line strings.

        Raises:
            FileNotFoundError: If log_source is a path and the file doesn't exist.
            IOError: If there's an error reading the file; the underlying
                exception is attached as ``__cause__``.
            TypeError: If log_source is not a str or list, or if list elements are not strings.
        """
        # Reset first so a failed load never leaves stale lines behind.
        self.raw_log_lines = []
        self.processed_log_lines = []

        if isinstance(log_source, str):
            if not os.path.exists(log_source):
                raise FileNotFoundError(f"Log file not found: {log_source}")
            try:
                with open(log_source, 'r', encoding='utf-8') as f:
                    # rstrip removes any run of trailing CR/LF characters.
                    loaded_lines = [line.rstrip('\r\n') for line in f]
            except Exception as e:
                # Chain explicitly so the root cause stays visible in tracebacks.
                raise IOError(f"Error reading log file {log_source}: {e}") from e
        elif isinstance(log_source, list):
            if not all(isinstance(line, str) for line in log_source):
                raise TypeError("If log_source is a list, all its elements must be strings.")
            # Strip trailing newlines from list input too, for consistency with file input.
            loaded_lines = [line.rstrip('\r\n') for line in log_source]
        else:
            raise TypeError("log_source must be a file path (str) or a list of strings.")

        self.raw_log_lines = loaded_lines
        # Process (truncate) lines immediately after loading.
        self.processed_log_lines = [self._truncate_line(line) for line in loaded_lines]

    def get_chunks(self):
        """
        Generates log chunks based on the configured window size and slide step.
        Each chunk is a list of processed (truncated) log lines.
        This method is a generator.

        Windows start at offsets 0, slide_step, 2 * slide_step, ... for as long
        as the offset lies inside the loaded lines. Windows that reach past the
        end are therefore shorter than window_size_lines; with an overlapping
        step this means the tail of the log is yielded again in shrinking
        chunks (e.g. window 2, step 1 over four lines ends with a one-line chunk).

        Yields:
            list: A chunk of log lines (list of strings).
                  Nothing is yielded if no logs are loaded or logs are empty.
        """
        # Snapshot the list so a load_logs() call made while a consumer is
        # still iterating cannot mix old offsets with new lines.
        lines = self.processed_log_lines
        window_size = self.config["window_size_lines"]
        step = self.config["slide_step"]

        # Every start offset is < len(lines) and window_size >= 1, so each
        # slice holds at least one line; an empty list produces no offsets.
        for start in range(0, len(lines), step):
            yield lines[start:start + window_size]

In [46]:
import unittest
import os
import tempfile

# Assuming the LogProcessor class definition is above or imported

class TestLogProcessor(unittest.TestCase):
    """
    Unit tests for LogProcessor, grouped into three areas: constructor/config
    validation, log loading with line truncation, and chunk generation.
    """

    # ------------------------------------------------------------------ fixtures
    def setUp(self):
        # Every test gets its own scratch file; delete=False so it can be
        # reopened by path and is removed explicitly in tearDown.
        self.temp_file = tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8")
        self.temp_file_path = self.temp_file.name

    def tearDown(self):
        self.temp_file.close()
        os.remove(self.temp_file_path)

    def _write_to_temp_file(self, lines):
        """Replace the scratch file's content with `lines`, one per row."""
        handle = self.temp_file
        handle.seek(0)
        handle.truncate()
        handle.write("".join(f"{entry}\n" for entry in lines))
        handle.flush()

    @staticmethod
    def _config(window, max_len, step):
        """Build a complete LogProcessor config dictionary."""
        return {"window_size_lines": window, "max_line_length": max_len, "slide_step": step}

    def _processor(self, window, max_len, step):
        """Build a LogProcessor from the three config values."""
        return LogProcessor(self._config(window, max_len, step))

    def _chunks_for(self, window, max_len, step, lines):
        """Load `lines` into a fresh processor and return all of its chunks."""
        subject = self._processor(window, max_len, step)
        subject.load_logs(lines)
        return list(subject.get_chunks())

    # 1. Initialization Tests
    def test_valid_initialization(self):
        settings = self._config(3, 10, 1)
        self.assertEqual(LogProcessor(settings).config, settings)

    def test_init_missing_config_key(self):
        incomplete = {"window_size_lines": 3, "slide_step": 1}
        with self.assertRaisesRegex(ValueError, "Missing required config key: max_line_length"):
            LogProcessor(incomplete)

    def test_init_invalid_config_value_type(self):
        with self.assertRaisesRegex(ValueError, "must be a positive integer"):
            self._processor("3", 10, 1)

    def test_init_invalid_config_value_non_positive(self):
        with self.assertRaisesRegex(ValueError, "must be a positive integer"):
            self._processor(0, 10, 1)

    def test_init_non_dict_config(self):
        with self.assertRaisesRegex(TypeError, "Config must be a dictionary."):
            LogProcessor("not_a_dict")

    # 2. Log Loading and Line Processing Tests
    def test_load_from_list_and_truncate(self):
        subject = self._processor(1, 5, 1)
        subject.load_logs(["AAAAA", "BBBBBBBBB", "CCC\n", "DD"])
        self.assertEqual(subject.processed_log_lines, ["AAAAA", "BBBBB", "CCC", "DD"])

    def test_load_from_file_and_truncate(self):
        subject = self._processor(1, 3, 1)
        self._write_to_temp_file(["apple", "banana", "pi"])
        subject.load_logs(self.temp_file_path)
        self.assertEqual(subject.processed_log_lines, ["app", "ban", "pi"])

    def test_load_empty_list(self):
        subject = self._processor(1, 10, 1)
        subject.load_logs([])
        self.assertEqual(subject.processed_log_lines, [])

    def test_load_empty_file(self):
        subject = self._processor(1, 10, 1)
        self._write_to_temp_file([])
        subject.load_logs(self.temp_file_path)
        self.assertEqual(subject.processed_log_lines, [])

    def test_load_non_existent_file(self):
        subject = self._processor(1, 10, 1)
        with self.assertRaises(FileNotFoundError):
            subject.load_logs("non_existent_file.log")

    def test_load_invalid_source_type(self):
        subject = self._processor(1, 10, 1)
        with self.assertRaisesRegex(TypeError, "log_source must be a file path .* or a list"):
            subject.load_logs(123)

    def test_load_list_with_non_string_elements(self):
        subject = self._processor(1, 10, 1)
        with self.assertRaisesRegex(TypeError, "all its elements must be strings"):
            subject.load_logs(["line1", 123, "line3"])

    def test_load_strips_trailing_newlines_from_list_input(self):
        # Covers LF, CRLF, bare CR and no terminator at all.
        subject = self._processor(1, 10, 1)
        subject.load_logs(["line1\n", "line2\r\n", "line3\r", "line4"])
        self.assertEqual(subject.processed_log_lines, ["line1", "line2", "line3", "line4"])

    # 3. Chunk Generation Tests
    def test_get_chunks_basic_sliding_window(self):
        produced = self._chunks_for(2, 10, 1, ["L1", "L2", "L3", "L4"])
        self.assertEqual(produced, [["L1", "L2"], ["L2", "L3"], ["L3", "L4"], ["L4"]])

    def test_get_chunks_non_overlapping(self):
        produced = self._chunks_for(2, 10, 2, ["L1", "L2", "L3", "L4", "L5"])
        self.assertEqual(produced, [["L1", "L2"], ["L3", "L4"], ["L5"]])

    def test_get_chunks_step_larger_than_window(self):
        # A step of 3 with a window of 2 skips L3 and L6 entirely.
        produced = self._chunks_for(2, 10, 3, ["L1", "L2", "L3", "L4", "L5", "L6"])
        self.assertEqual(produced, [["L1", "L2"], ["L4", "L5"]])

    def test_get_chunks_total_lines_less_than_window(self):
        # With a step of 1 the window keeps sliding until only the last line is left.
        produced = self._chunks_for(5, 10, 1, ["L1", "L2", "L3"])
        self.assertEqual(produced, [["L1", "L2", "L3"], ["L2", "L3"], ["L3"]])

    def test_get_chunks_total_lines_equals_window(self):
        produced = self._chunks_for(3, 10, 1, ["L1", "L2", "L3"])
        self.assertEqual(produced, [["L1", "L2", "L3"], ["L2", "L3"], ["L3"]])

    def test_get_chunks_empty_logs(self):
        self.assertEqual(self._chunks_for(3, 10, 1, []), [])

    def test_get_chunks_with_line_truncation(self):
        produced = self._chunks_for(1, 3, 1, ["AAAAA", "BB", "CCCCC"])
        self.assertEqual(produced, [["AAA"], ["BB"], ["CCC"]])

    def test_reloading_logs_clears_previous_and_chunks_new(self):
        subject = self._processor(1, 10, 1)

        # First load, then a second load on the same instance: the second
        # batch must completely replace the first.
        batches = (["A", "B"], ["X", "Y", "Z"])
        for batch in batches:
            subject.load_logs(batch)
            produced = list(subject.get_chunks())
            self.assertEqual(produced, [[entry] for entry in batch])
            self.assertEqual(subject.processed_log_lines, batch)


if __name__ == '__main__':
    # In a plain script `unittest.main()` would be enough; in Jupyter or
    # similar the suite is built and run explicitly instead.
    # TestLoader.loadTestsFromTestCase replaces unittest.makeSuite, which was
    # deprecated in Python 3.11 and removed in Python 3.13.
    suite = unittest.TestLoader().loadTestsFromTestCase(TestLogProcessor)
    runner = unittest.TextTestRunner(verbosity=2)  # verbosity=2 prints one line per test
    runner.run(suite)

test_get_chunks_basic_sliding_window (__main__.TestLogProcessor) ... ok
test_get_chunks_empty_logs (__main__.TestLogProcessor) ... ok
test_get_chunks_non_overlapping (__main__.TestLogProcessor) ... ok
test_get_chunks_step_larger_than_window (__main__.TestLogProcessor) ... ok
test_get_chunks_total_lines_equals_window (__main__.TestLogProcessor) ... ok
test_get_chunks_total_lines_less_than_window (__main__.TestLogProcessor) ... ok
test_get_chunks_with_line_truncation (__main__.TestLogProcessor) ... ok
test_init_invalid_config_value_non_positive (__main__.TestLogProcessor) ... ok
test_init_invalid_config_value_type (__main__.TestLogProcessor) ... ok
test_init_missing_config_key (__main__.TestLogProcessor) ... ok
test_init_non_dict_config (__main__.TestLogProcessor) ... ok
test_load_empty_file (__main__.TestLogProcessor) ... ok
test_load_empty_list (__main__.TestLogProcessor) ... ok
test_load_from_file_and_truncate (__main__.TestLogProcessor) ... ok
test_load_from_list_and_truncate (__main