In [1]:
from pathlib import Path
import rich
from rich.tree import Tree
from rich import print as rprint
from rich.console import Group
from rich.panel import Panel
from typing import Optional, Any
from collections import defaultdict
from datetime import datetime

from pydantic import BaseModel, Field

from llmtrace.trace import TraceNode


In [2]:
tree = Tree("Rich Tree")


tree.add("foo")
tree.add("bar").add(Group("Test", Panel("test")))

baz_tree = tree.add("baz")
baz_tree.add("[red]Red").add("[green]Green").add("[blue]Blue")
rprint(tree)

In [3]:
LOG_DIR = Path('./logs')


In [4]:
dirs = list(LOG_DIR.iterdir())
print(f"{len(dirs)} directories found")

log_files = list(dirs[0].glob('*.log.json'))
print(f"{len(log_files)} log files found")


logged_traces = []
for log_file in log_files:
    with log_file.open('r') as f:
        logged_trace = TraceNode.parse_raw(f.read())
        logged_traces.append(logged_trace)

print(f"{len(logged_traces)} logged traces found")
print(logged_traces)

1 directories found
14 log files found
14 logged traces found
[(name=log id=f5cc nb_children=0 parent=(name=do_a id=7bcc nb_children=2 parent=(name=do_b id=7567 nb_children=2 parent=(name=do_c id=29cd nb_children=2 parent=(name=root id=d16a nb_children=1))))), (name=log id=7a13 nb_children=0 parent=(name=do_b id=7567 nb_children=4 parent=(name=do_c id=29cd nb_children=2 parent=(name=root id=d16a nb_children=1)))), (name=log id=eacf nb_children=0 parent=(name=do_a id=7275 nb_children=1 parent=(name=do_b id=66ae nb_children=1 parent=(name=do_c id=29cd nb_children=1 parent=(name=root id=d16a nb_children=1))))), (name=log id=5880 nb_children=0 parent=(name=do_a id=b914 nb_children=2 parent=(name=do_c id=29cd nb_children=3 parent=(name=root id=d16a nb_children=1)))), (name=log id=eb63 nb_children=0 parent=(name=do_a id=00db nb_children=2 parent=(name=do_b id=7567 nb_children=1 parent=(name=do_c id=29cd nb_children=2 parent=(name=root id=d16a nb_children=1))))), (name=log id=91a0 nb_children

In [5]:
# List of traced logs
# [(name=log id=36e9 nb_children=0 parent=(name=do_a id=008e nb_children=0 parent=(name=do_b id=49d2 nb_children=1 parent=(name=do_c id=5ae8 nb_children=1 parent=(name=root id=098b nb_children=1))))), (name=log id=17e9 nb_children=0 parent=(name=do_a id=2676 nb_children=0 parent=(name=do_b id=afc9 nb_children=1 parent=(name=do_c id=5ae8 nb_children=2 parent=(name=root id=098b nb_children=1))))), (name=log id=d271 nb_children=0 parent=(name=do_a id=11f5 nb_children=0 parent=(name=do_c id=5ae8 nb_children=3 parent=(name=root id=098b nb_children=1)))), (name=log id=b8b2 nb_children=0 parent=(name=do_a id=b708 nb_children=0 parent=(name=do_b id=49d2 nb_children=2 parent=(name=do_c id=5ae8 nb_children=1 parent=(name=root id=098b nb_children=1))))), (name=log id=8ed6 nb_children=0 parent=(name=do_a id=26e3 nb_children=0 parent=(name=do_b id=afc9 nb_children=2 parent=(name=do_c id=5ae8 nb_children=2 parent=(name=root id=098b nb_children=1)))))]

# class TraceNode(BaseModel):
#     name: str
#     id: str = Field(default_factory=get_random_hash)
#     parent: Optional["TraceNode"] = None
#     nb_subnodes: int = 0
#     timestamp_init_utc: float = Field(default_factory=get_utc_timestamp)
#     metadata: Optional[dict] = None

class TreeNode(BaseModel):
    name: str
    id: str
    subtrace_idx: int
    children: dict[str, "TreeNode"] = Field(default_factory=dict)
    timestamp_utc: float
    metadata: Optional[Any] = None


def create_adjacency_list(traces: list[TraceNode]) -> dict[str, list[TreeNode]]:
    """
    Create an adjacency list from the traces.
    """
    adjacency_dct: dict[str, dict[str, TreeNode]] = defaultdict(dict)
    for trace in traces:
        while trace.parent is not None:
            child_id = trace.id
            parent_id = trace.parent.id
            adjacency_dct[parent_id][child_id] = TreeNode(
                name=trace.name,
                id=trace.id,
                subtrace_idx=trace.parent.nb_subnodes,
                timestamp_utc=trace.timestamp_init_utc,
                metadata=trace.metadata,
            )
            trace = trace.parent
        else:
            adjacency_dct[""][trace.id] = TreeNode(
                name=trace.name,
                id=trace.id,
                subtrace_idx=0,
                timestamp_utc=trace.timestamp_init_utc,
                metadata=trace.metadata,
            )
    return adjacency_dct


def create_tree_recursive(node: TreeNode, adjacency_dct: dict[str, dict[str, TreeNode]]):
    for child_id, child in adjacency_dct[node.id].items():
        node.children[child_id] = child
        create_tree_recursive(child, adjacency_dct)


def sort_tree_recursive(node: TreeNode):
    node.children = dict(sorted(node.children.items(), key=lambda x: x[1].subtrace_idx))
    for child in node.children.values():
        sort_tree_recursive(child)

def create_tree(logged_traces) -> TreeNode:
    adjacency_dct = create_adjacency_list(logged_traces)
    if len(adjacency_dct[""].values()) > 1:
        root = TreeNode(
            name="temp_root",
            id="",
            subtrace_idx=0,
            timestamp_utc=0,
            metadata="TreeWarning: More than one root node found.",
        )
    else:
        root = next(iter(adjacency_dct[""].values()))
    create_tree_recursive(node=root, adjacency_dct=adjacency_dct)
    sort_tree_recursive(root)
    return root


trace_tree = create_tree(logged_traces)

In [6]:
def create_rich_tree_recursive(node: TreeNode, tree: Tree):
    for _, child in node.children.items():
        time_formatted = datetime.fromtimestamp(child.timestamp_utc).strftime("%Y-%m-%d %H:%M:%S")
        tree_node_repr = f"{child.name} - {time_formatted} [{child.id[:8]}] {child.subtrace_idx}"
        if child.metadata is not None:
            tree_node_repr = Group(tree_node_repr, Panel(str(child.metadata)))
        tree_node = tree.add(tree_node_repr)
        create_rich_tree_recursive(child, tree_node)


def create_rich_tree(node: TreeNode) -> Tree:
    time_formatted = datetime.fromtimestamp(node.timestamp_utc).strftime("%Y-%m-%d %H:%M:%S")
    tree = Tree(f"{node.name} - {time_formatted} [{node.id[:8]}] {node.subtrace_idx}")
    create_rich_tree_recursive(node, tree)
    return tree




rich_tree = create_rich_tree(trace_tree)
rprint(rich_tree)

In [7]:
@trace
def do_a():
    print(TRACE_CONTEXT.get())
    log_trace("Hello world 123!")

@trace
def do_b():
    do_a()
    do_a()

@trace
def do_c():
    do_b()
    do_b()
    do_a()


do_c()
print(TRACE_CONTEXT.get())

NameError: name 'trace' is not defined

In [None]:

# def reverse_trace(trace: TraceNode, child: Optional[TreeNode] = None) -> dict:
#     """
#     Reverse the order of the traces.
#     """
#     children = {}
#     if child is not None:
#         children[child.id] = child
#     tree = TreeNode(
#         name=trace.name,
#         id=trace.id,
#         timestamp_utc=trace.timestamp_init_utc,
#         metadata=trace.metadata,
#         children=children,
#     )
#     if trace.parent is not None:
#         tree = reverse_trace(trace.parent, tree)
#     return tree


# print(logged_traces[0])
# print(reverse_trace(logged_traces[0]))


# def merge_tree(tree1: TreeNode, tree2: TreeNode) -> TreeNode:
#     """
#     Merge two trees into one tree.
#     """
#     if tree1.id == tree2.id:
#         tree1.children.update(tree2.children)
#         return tree1
#     else:
#         for child in tree1.children.values():
#             tree1.children[child.id] = merge_tree(child, tree2)
#         return tree1


# def merge_trees(trees: list[TreeNode]) -> TreeNode:
#     """
#     Merge the trees into one tree.
#     """
#     merged_tree = trees[0]
#     for tree in trees[1:]:
#         merged_tree = merge_tree(merged_tree, tree)
#     return merged_tree

# # def create_tree(traced_logs) -> dict:
# #     """
# #     Create a tree from the logged traces, taking into account the parent-child relationships and ids.
# #     The traced_logs need to be reversed.
# #     """
# #     tree = {}
# #     for traced_log in traced_logs:
# #         trace = traced_log.trace
# #         if trace.parent is None:
# #             tree[trace.id] = trace
# #         else:
# #             parent_id = trace.parent.id
# #             parent = tree[parent_id]
# #             parent.nb_children += 1
# #             tree[trace.id] = trace
# #     return tree


# # create_tree(logged_traces)