In [1]:
import re
from dataclasses import dataclass, field
from typing import List, Tuple

import tango

# Regex patterns
INCOMING_COMMAND_CALL_REGEX_PATTERN = r"-> (\w+\.\w+)\(\)"
RETURN_COMMAND_CALL_REGEX_PATTERN = r"^(.*) <- (\w+\.\w+)\(\)"
LRC_RETURN_VAL_REGEX_PATTERN = r"\(\[(.*)\], \['(.*)'\]\)"
LRC_TUPLE_REGEX_PATTERN = r"'([0-9a-zA-Z._]*)', '([^']*)'"
LOG_REGEX_PATTERN = r"([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|(.*)"
EVENT_REGEX_PATTERN = r"([^\t]*)\t([^\t]*)\t([^\t]*)\t(.*)"

### PlantUML helper class

In [2]:
class PlantUMLSequenceDiagram:
    def __init__(self):
        self.diagram_code = ""

    def start_diagram(self, title, actor):
        self.diagram_code = "@startuml SequenceDiagram\n"
        self.diagram_code += f"title {title}\n"
        self.diagram_code += f"actor {actor}\n"

    def add_participant(self, participant):
        self.diagram_code += f"participant {self.clean_text(participant)}\n"

    def end_diagram(self):
        self.diagram_code += "@enduml"

    def clean_text(self, text):
        return text.replace("-", "_")

    def wrap_text(self, text, max_width=50, max_length=500):
        if max_width <= 0:
            raise ValueError("max_width must be a positive integer.")

        if len(text) > max_length:
            truncated_length = max_length - 3
            text = text[:truncated_length] + "..."

        lines = []
        for i in range(0, len(text), max_width):
            end_index = i + max_width
            segment = text[i:end_index]
            lines.append(segment)

        result = "\n".join(lines).encode("unicode_escape").decode("utf-8")
        return result

    def wrap_text_on_spaces(self, text, max_width=50):
        if max_width <= 0:
            raise ValueError("max_width must be a positive integer.")

        words = text.split(" ")
        line = ""
        lines = []

        for word in words:
            if len(line) + len(word) < max_width:
                line += " " + word
            else:
                lines.append(line)
                line = word
        lines.append(line)

        result = "\n".join(lines).encode("unicode_escape").decode("utf-8")
        return result

    def add_note_over(self, device, note, color="lightgreen"):
        device = self.clean_text(device)
        # note = self.wrap_text(note)

        self.diagram_code += f"rnote over {device} #{color}: {note}\n"

    def add_hexagon_note_over(self, device, note, color="lightgrey"):
        device = self.clean_text(device)
        # note = self.wrap_text(note)

        self.diagram_code += f"hnote over {device} #{color}: {note}\n"

    def add_command_call(self, from_device, to_device, note):
        from_device = self.clean_text(from_device)
        to_device = self.clean_text(to_device)
        note = self.wrap_text(note)

        self.diagram_code += f"{from_device} -> {to_device}: {note}\n"

    def add_command_response(self, from_device, to_device, note):
        from_device = self.clean_text(from_device)
        to_device = self.clean_text(to_device)
        note = self.wrap_text(note)

        self.diagram_code += f"{from_device} --> {to_device}: {note}\n"

### Log Parser

In [18]:
class LogParser:
    def __init__(self):
        self.log_pattern_callbacks: list[tuple[str, callable]] = []

    def _parse_log_line(self, log_line):
        for pattern, pattern_cb in self.log_pattern_callbacks:
            match = re.search(pattern, log_line)
            if match:
                group_values = match.groups()
                pattern_cb(*group_values)
                break

    def parse_file(self, file_path):
        with open(file_path, "r", encoding="utf-8") as file:
            logs = file.readlines()

        for log in logs:
            self._parse_log_line(log)

### Your custom log parser

In [27]:
class EventsFileParser(LogParser):
    def __init__(self, show_events=False, device_hierarchy=[]):
        super().__init__()

        self.show_events = show_events

        self.sequence_diagram = PlantUMLSequenceDiagram()
        self.device_hierarchy = device_hierarchy
        self.running_lrc_status_updates = {}

        self.log_pattern_callbacks = [
            [EVENT_REGEX_PATTERN, self.event_callback],
        ]

    def get_likely_caller_from_hierarchy(self, device) -> str:
        if device not in self.device_hierarchy or device == self.device_hierarchy[0]:
            return "unknown"
        device_index = self.device_hierarchy.index(device)
        return self.device_hierarchy[device_index - 1]

    def get_method_from_lrc_id(self, lrc_id) -> str:
        return "_".join(lrc_id.split("_")[2:])

    def parse(self, file_path, output_file_path, actor="pytest"):
        log_file_name = file_path.split("/")[-1]

        cleaned_log_file_name = self.sequence_diagram.clean_text(log_file_name)
        title = f"Sequence diagram generated from\n{cleaned_log_file_name}".encode(
            "unicode_escape"
        ).decode("utf-8")

        self.test_started = False
        self.running_lrc_status_updates = {}
        self.sequence_diagram.start_diagram(title, actor)

        # Add participants to ensure order of swimlanes
        for device in self.device_hierarchy[1:]:
            self.sequence_diagram.add_participant(device)

        self.parse_file(file_path)

        self.sequence_diagram.end_diagram()

        # Save the PlantUML diagram code to a file
        with open(output_file_path, "w", encoding="utf-8") as f:
            f.write(self.sequence_diagram.diagram_code)

    def event_callback(self, prefix, device, event_attr, val):
        # 1724660914.761 - Event - 2024-08-26 08:28:34.761448	DishManager(mid-dish/dish-manager
        # /ska001)	longrunningcommandstatus	('1724660914.663982_241979260268973_SetStowMode',
        # 'COMPLETED')
        device = device.split("/")[1]  # dish-manager
        caller = self.get_likely_caller_from_hierarchy(device)

        if "longrunningcommand" in event_attr:
            self.handle_lrc_event_log(device, caller, event_attr, val)
        elif self.show_events:
            self.sequence_diagram.add_note_over(
                device,
                f'Event\n""{event_attr} = {val.strip()}""'.encode("unicode_escape").decode(
                    "utf-8"
                ),
            )

    def handle_lrc_event_log(self, device, caller, event_attr, val):
        if "longrunningcommandstatus" in event_attr:
            lrc_statuses = re.findall(LRC_TUPLE_REGEX_PATTERN, val)
            for index, (lrc_id, status) in enumerate(lrc_statuses):
                # If there are any newer updates for this lrc in the LRC statuses then skip this
                newer_status_found = False
                if index + 1 < len(lrc_statuses):
                    for i in range(index + 1, len(lrc_statuses)):
                        if lrc_statuses[i][0] == lrc_id:
                            newer_status_found = True
                            break

                if newer_status_found:
                    break

                method_name = self.get_method_from_lrc_id(lrc_id)

                if status == "STAGING":
                    # Only track methods which are called in the scope of the file
                    # This avoids some noise left over in LRC attributes from previous test / setup
                    self.running_lrc_status_updates[lrc_id] = []

                # Only update if its a method called in the scope of this file and its a new status
                if (
                    lrc_id in self.running_lrc_status_updates
                    and status not in self.running_lrc_status_updates[lrc_id]
                ):
                    self.running_lrc_status_updates[lrc_id].append(status)
                    self.sequence_diagram.add_command_response(
                        device, caller, f'""{method_name}"" -> {status}'
                    )
        elif "longrunningcommandprogress" in event_attr:
            lrc_progresses = re.findall(LRC_TUPLE_REGEX_PATTERN, val)
            for lrc_id, progress in lrc_progresses:
                # Only show progress updates for methods which have been staged
                if lrc_id in self.running_lrc_status_updates:
                    method_name = self.get_method_from_lrc_id(lrc_id)
                    self.sequence_diagram.add_command_call(
                        device, device, f'""{method_name}"" -> {progress}'
                    )
        elif event_attr == "longrunningcommandresult":
            pass

### Setup your tracked devices

In [5]:
tracked_device_trls = ["tango://localhost:45678/mid-dish/ds-manager/ska001#dbase=no"]

In [11]:
@dataclass
class TrackedDevice:
    """Class to group tracked device information"""

    device_proxy: tango.DeviceProxy
    attribute_names: Tuple[str]
    subscription_ids: List[int] = field(default_factory=list)


tracked_devices = [
    TrackedDevice(
        tango.DeviceProxy(device_trl),
        (
            "longrunningcommandstatus",
            "longrunningcommandresult",
            "longrunningcommandprogress",
        ),
    )
    for device_trl in tracked_device_trls
]

### Setup the event printer

In [7]:
class EventPrinter:
    """Class that writes attribute changes to a file"""

    def __init__(self, filename: str, tracked_devices: Tuple[TrackedDevice] = ()) -> None:
        self.tracked_devices = tracked_devices
        self.filename = filename
        self.events = []

    def __enter__(self):
        for tracked_device in self.tracked_devices:
            dp = tracked_device.device_proxy
            for attr_name in tracked_device.attribute_names:
                sub_id = dp.subscribe_event(attr_name, tango.EventType.CHANGE_EVENT, self)
                tracked_device.subscription_ids.append(sub_id)

    def __exit__(self, exc_type, exc_value, exc_tb):
        for tracked_device in self.tracked_devices:
            try:
                dp = tracked_device.device_proxy
                for sub_id in tracked_device.subscription_ids:
                    dp.unsubscribe_event(sub_id)
            except tango.DevError:
                pass

    def add_event(self, timestamp, message):
        self.events.append((timestamp, message))
        with open(self.filename, "a") as open_file:
            open_file.write("\n" + message)

    def push_event(self, ev: tango.EventData):
        event_string = ""
        if ev.err:
            err = ev.errors[0]
            event_string = f"\nEvent Error {err.desc} {err.origin} {err.reason}"
        else:
            attr_name = ev.attr_name.split("/")[-1]
            attr_value = ev.attr_value.value
            if ev.attr_value.type == tango.CmdArgType.DevEnum:
                attr_value = ev.device.get_attribute_config(attr_name).enum_labels[attr_value]

            event_string = f"Event - {ev.reception_date}\t{ev.device}\t{attr_name}\t{attr_value}"

        self.add_event(ev.reception_date.totime(), event_string)

In [12]:
events_file_name = "output_file.txt"
event_printer = EventPrinter(events_file_name, tracked_devices)

### Start the event printer monitoring before running commands on your monitored device

In [13]:
event_printer.__enter__()

### Exit the printer to unsubscribe from attributes

In [15]:
event_printer.__exit__(None, None, None)

### Parse the events file

In [29]:
sequence_diagram_file_name = "sequence-diagram.puml"

file_parser = EventsFileParser(show_events=False, device_hierarchy=["pytest", "ds-manager"])

file_parser.parse(events_file_name, sequence_diagram_file_name, actor="pytest")