In [1]:
!pip install elasticsearch

Collecting elasticsearch
  Downloading elasticsearch-8.15.0-py3-none-any.whl.metadata (8.7 kB)
Collecting elastic-transport<9,>=8.13 (from elasticsearch)
  Downloading elastic_transport-8.15.0-py3-none-any.whl.metadata (3.6 kB)
Downloading elasticsearch-8.15.0-py3-none-any.whl (523 kB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m523.3/523.3 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m
[?25hDownloading elastic_transport-8.15.0-py3-none-any.whl (64 kB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.4/64.4 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: elastic-transport, elasticsearch
Successfully installed elastic-transport-8.15.0 elasticsearch-8.15.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m

In [2]:
# Project whose traces are clustered in this notebook.
# Other projects tried during experimentation (swap one in to re-run):
#   "EEisP6epvj-no_veGiHTQ"
#   "project_iCTt0LSMXYbv5jZNSdtEr"
#   "project_FPGv6-1y-Sr4nqP1EoEQW"
project_id = "KAXYxPR8MUgTcP8CF193y"

In [5]:
from dotenv import load_dotenv

load_dotenv()

import os
from elasticsearch import Elasticsearch
import pandas as pd

# Upper bound on the number of traces fetched in this single request.
max_traces = 2000
# Local cache of the fetched traces.
traces_csv_path = f"./data/traces_for_topics_{project_id}.csv"

es = Elasticsearch(
    os.environ["ELASTICSEARCH_NODE_URL"], api_key=os.environ["ELASTICSEARCH_API_KEY"]
)

# Last month of this project's traces that already have input embeddings, oldest
# first; trace_id breaks timestamp ties so the ordering is stable.
response = es.search(
    index="search-traces-alias",
    query={
        "bool": {
            "must": [
                {"term": {"project_id": project_id}},
                {"exists": {"field": "input.embeddings.embeddings"}},
                {
                    "range": {
                        "timestamps.inserted_at": {
                            "gte": "now-1M",
                            "lt": "now",
                        }
                    }
                },
            ],
        }
    },
    source=["trace_id", "input"],  # type: ignore
    sort=[{"timestamps.inserted_at": "asc"}, {"trace_id": "asc"}],
    size=max_traces,
)

hits = response["hits"]["hits"]
# hits.total counts matching documents, not just the returned page, so a larger
# value means the sample below was truncated at `max_traces`.
total_hits = response["hits"].get("total", {}).get("value", len(hits))
if total_hits > len(hits):
    print(
        f"Warning: only {len(hits)} of {total_hits} matching traces were fetched; "
        "raise `max_traces` or paginate with `search_after` to use them all."
    )

df = pd.DataFrame(
    [
        {
            "trace_id": hit["_source"]["trace_id"],
            "input": hit["_source"]["input"]["value"],
            "embeddings": hit["_source"]["input"]["embeddings"]["embeddings"],
        }
        for hit in hits
    ]
)
# to_csv does not create missing directories.
os.makedirs(os.path.dirname(traces_csv_path), exist_ok=True)
df.to_csv(traces_csv_path, index=False)
df.head()

Unnamed: 0,trace_id,input,embeddings
0,trace_kbsOG-U46Br8jv9TaE6Zq,hi,"[-0.0037493026, -0.019173345, 0.0121757155, 0...."
1,trace_aWlIO_1Pq8eW3R17BoiUK,hii,"[0.038733196, -0.037454136, 0.025052842, 0.017..."
2,trace_--palpjWmJs_23f1y4D1u,aaaa,"[0.02986709, -0.016567731, -0.008210792, 0.013..."
3,trace_TaHab1z-GJXOvzVWXWu57,hi,"[-0.0037493026, -0.019173345, 0.0121757155, 0...."
4,trace_3y_a8PMk-5xrSh0a8wz0u,hi,"[-0.0037493026, -0.019173345, 0.0121757155, 0...."


In [None]:
from typing import TypedDict
import pandas as pd


import os


def _parse_embedding(text: str) -> list[float]:
    """Parse an embedding serialized by ``DataFrame.to_csv`` (e.g. "[0.1, -0.2]") back into floats."""
    inner = text.strip().removeprefix("[").removesuffix("]").strip()
    # An empty vector serializes as "[]"; splitting "" would yield [""] and break float().
    if not inner:
        return []
    # float() ignores surrounding whitespace, so both ", " and "," separators work.
    return [float(value) for value in inner.split(",")]


# Read from the same ./data location the fetch cell writes to; fall back to the
# notebooks/data location this cell previously read from.
traces_csv_path = f"./data/traces_for_topics_{project_id}.csv"
if not os.path.exists(traces_csv_path):
    traces_csv_path = f"notebooks/data/traces_for_topics_{project_id}.csv"

df = pd.read_csv(traces_csv_path)
df["embeddings"] = df["embeddings"].apply(_parse_embedding)


class Trace(TypedDict):
    """One trace as consumed by the clustering code in this notebook."""

    trace_id: str
    input: str
    embeddings: list[float]


traces: list[Trace] = [
    Trace(
        trace_id=row["trace_id"],
        input=row["input"],
        embeddings=row["embeddings"],
    )
    for row in df.to_dict("records")
]
# Show a few traces rather than dumping every embedding vector.
traces[:3]

In [None]:
from scipy.cluster.hierarchy import dendrogram, linkage
from matplotlib import pyplot as plt
import numpy as np

# Ward linkage over the raw embedding vectors; the dendrogram helps choose the
# distance cut-offs used in the next cell.
embeddings = np.array(df["embeddings"].tolist())
Z = linkage(embeddings, "ward")

fig, ax = plt.subplots(figsize=(25, 10))
dn = dendrogram(Z, ax=ax)
ax.set_title(f"Ward linkage of trace input embeddings ({project_id})")
ax.set_xlabel("Trace (leaf index)")
ax.set_ylabel("Linkage distance")
plt.show()

In [None]:
from scipy.cluster.hierarchy import fcluster

# Distance cut-offs on the Ward tree: a looser one for topics, and a tighter one
# that build_hierarchy applies inside each topic to form subtopics.
cophenetic_distances_for_subtopics = 2
cophenetic_distances_for_topics = 4

topic_ids = fcluster(Z, t=cophenetic_distances_for_topics, criterion="distance")
set(topic_ids)

In [None]:
from pprint import pprint
from scipy.spatial.distance import cdist


def calculate_centroid_and_distance(samples) -> tuple[list[float], float]:
    """Return the mean embedding of ``samples`` and the 95th-percentile cosine
    distance of the samples from that mean.

    Args:
        samples: non-empty sequence of mappings whose ``"embeddings"`` entry is a
            numeric vector; all vectors must have the same length.

    Returns:
        ``(centroid, p95_distance)``: the centroid as a plain list of floats and the
        distance as a Python float.

    Raises:
        ValueError: if ``samples`` is empty (there is no centroid to compute).
    """
    if len(samples) == 0:
        raise ValueError("cannot compute a centroid of zero samples")

    # Build the (n_samples, dim) matrix once and reuse it for both steps.
    vectors = np.array([item["embeddings"] for item in samples], dtype=float)
    centroid = vectors.mean(axis=0)
    distances = cdist(vectors, centroid[np.newaxis, :], "cosine").ravel()
    p95_distance = float(np.percentile(distances, 95))

    # A plain list matches the list[float] annotation of Topic.centroid and stays
    # JSON-serialisable.
    return centroid.tolist(), p95_distance


# Clusters (topics and subtopics) with fewer distinct inputs than this are dropped.
minimum_traces_per_topic = 5
# A topic is kept only if at least this many of its subtopics survive.
minimum_subtopics_per_topic = 1


from typing import Optional


def build_hierarchy(
    traces: list[Trace],
    cophenetic_distance: float,
    with_embeddings=True,
    maximum_p95_distance: float = 1,
    with_subtopics=True,
    subtopic_cophenetic_distance: Optional[float] = None,
) -> dict[str, dict[str, list[Trace]]]:
    """Cluster traces into topics (and, one level down, subtopics) with Ward linkage.

    The linkage tree is cut at ``cophenetic_distance``. A cluster is kept only if it
    has at least ``minimum_traces_per_topic`` distinct inputs and the 95th-percentile
    cosine distance of its traces from their centroid is at most
    ``maximum_p95_distance``. When ``with_subtopics`` is set, a cluster must also
    yield at least ``minimum_subtopics_per_topic`` subtopics.

    Args:
        traces: traces to cluster.
        cophenetic_distance: distance threshold passed to ``fcluster``.
        with_embeddings: if False, leaf traces carry the placeholder string
            ``"[embeddings]"`` instead of their vector (useful for printing).
        maximum_p95_distance: clusters looser than this are rejected.
        with_subtopics: if True, recurse once into every kept cluster.
        subtopic_cophenetic_distance: threshold for that recursion; defaults to the
            notebook-level ``cophenetic_distances_for_subtopics``.

    Returns:
        ``{"Topic <id>": {"Subtopic <id>": [Trace, ...]}}`` when ``with_subtopics``,
        otherwise ``{"Subtopic <id>": [Trace, ...]}``. Empty when fewer than two
        traces are given.
    """
    if len(traces) < 2:
        # linkage() needs at least two observations; with minimum_traces_per_topic = 5
        # a lone trace would be filtered out anyway.
        return {}

    if subtopic_cophenetic_distance is None:
        subtopic_cophenetic_distance = cophenetic_distances_for_subtopics

    Z = linkage([t["embeddings"] for t in traces], "ward")
    cluster_ids = fcluster(Z, cophenetic_distance, criterion="distance")

    hierarchy = {}

    # np.unique yields the cluster ids in ascending order, so keys are built deterministically.
    for cluster_id in np.unique(cluster_ids):
        traces_in_cluster = [
            traces[i] for i, assigned in enumerate(cluster_ids) if assigned == cluster_id
        ]

        # The size threshold counts *distinct* inputs, so many copies of the same
        # message (e.g. "hi") cannot form a cluster on their own.
        unique_inputs = {t["input"] for t in traces_in_cluster}
        if len(unique_inputs) < minimum_traces_per_topic:
            continue

        _, p95_distance = calculate_centroid_and_distance(traces_in_cluster)
        if p95_distance > maximum_p95_distance:
            continue

        if with_subtopics:
            subtopics = build_hierarchy(
                traces_in_cluster,
                subtopic_cophenetic_distance,
                with_embeddings,
                maximum_p95_distance,
                False,
            )
            if len(subtopics) < minimum_subtopics_per_topic:
                continue
            hierarchy[f"Topic {cluster_id}"] = subtopics
        else:
            hierarchy[f"Subtopic {cluster_id}"] = [
                Trace(
                    trace_id=t["trace_id"],
                    input=t["input"],
                    embeddings=t["embeddings"] if with_embeddings else "[embeddings]",  # type: ignore
                )
                for t in traces_in_cluster
            ]

    return hierarchy


hierarchy = build_hierarchy(traces, cophenetic_distances_for_topics)

# Print the same hierarchy with embeddings elided, instead of re-clustering every
# trace a second time with with_embeddings=False.
pprint(
    {
        topic: {
            subtopic: [{**trace, "embeddings": "[embeddings]"} for trace in samples]
            for subtopic, samples in subtopics.items()
        }
        for topic, subtopics in hierarchy.items()
    }
)

In [None]:
import numpy as np

# Shape of the hierarchy: topics, subtopics per topic, and traces per subtopic.
topics_count = len(hierarchy)
subtopics_counts = [len(subtopics) for subtopics in hierarchy.values()]
samples_counts = [
    len(samples) for subtopics in hierarchy.values() for samples in subtopics.values()
]

print(f"Number of topics: {topics_count}")
print(f"Number of subtopics: {subtopics_counts}")
if samples_counts:
    print(
        "Stats of samples:",
        "Mean",
        np.mean(samples_counts),
        "Median",
        np.median(samples_counts),
        "Max",
        np.max(samples_counts),
        "Min",
        np.min(samples_counts),
    )
else:
    # np.max/np.min raise on an empty list, which happens when every cluster was filtered out.
    print("Stats of samples: no subtopics were kept")

In [None]:
from typing import Optional
from openai import OpenAI
import os
import json

openai = OpenAI(api_key=os.environ["OPENAI_API_KEY"])


def generate_topic_names(
    topic_examples: list[list[str]], existing: Optional[list[str]] = None
) -> list[str]:
    """Ask the LLM for one descriptive name per group of example inputs.

    Args:
        topic_examples: one list of sample strings per topic; group ``i`` is shown
            to the model under the header "# Topic i Samples".
        existing: names already in use; the model is told to avoid overlapping them.

    Returns:
        Names aligned with ``topic_examples``: ``result[i]`` names ``topic_examples[i]``.

    Raises:
        ValueError: if the model's tool call omits a name for some topic.
    """
    if not topic_examples:
        # Nothing to name; skip the API round-trip.
        return []

    example_count = sum(len(examples) for examples in topic_examples)
    print(
        f"Generating names for {len(topic_examples)} topics with {example_count} examples total."
    )
    topic_examples_str = "\n\n\n".join(
        [
            f"# Topic {index} Samples\n\n" + "\n".join(samples)
            for index, samples in enumerate(topic_examples)
        ]
    )

    existing_str = "\n".join(existing) if existing else ""
    existing_message = (
        f"\n\nThose are the topics that already exist, avoid using any names that may overlap \
        in meaning with them, think how the current sample data is unique and different from those instead:\n\n\
            {existing_str}"
        if existing_str
        else ""
    )

    # One required string property per topic, keyed to match the "# Topic {index}"
    # headers above (previously sized by the prompt's character count).
    topic_keys = [f"topic_{index}" for index in range(len(topic_examples))]

    response = openai.chat.completions.create(
        model="gpt-4-1106-preview",
        temperature=0.1,
        messages=[
            {
                "role": "system",
                "content": f"You are a highly knowledgeable assistant tasked with taxonomy for naming topics \
                    based on a list of examples. Provide a single, descriptive name for each topic. \
                    Topic names should not be similar to each other, as the data is already organized, \
                    the disambiguation between two similar topics should be clear from the name alone.\
                        {existing_message}",
            },
            {"role": "user", "content": f"{topic_examples_str}"},
        ],
        tools=[
            {
                "type": "function",
                "function": {
                    "name": "topicNames",
                    "parameters": {
                        "type": "object",
                        "properties": {key: {"type": "string"} for key in topic_keys},
                        "required": topic_keys,
                    },
                    "description": "use this function to name all the topics based on the examples provided",
                },
            }
        ],
        tool_choice={"type": "function", "function": {"name": "topicNames"}},
    )

    arguments = json.loads(response.choices[0].message.tool_calls[0].function.arguments)  # type: ignore
    missing = [key for key in topic_keys if key not in arguments]
    if missing:
        raise ValueError(f"Model did not return names for: {', '.join(missing)}")
    # Read by key so names stay aligned with topics regardless of JSON key order.
    return [arguments[key] for key in topic_keys]


topic_names = generate_topic_names(
    [
        ["example1", "example2"],
        ["foo", "bar"],
    ]
)

print(topic_names)

In [None]:
from random import random
from typing import Iterable, TypeVar, Optional

T = TypeVar("T")


def shuffled(x: Iterable[T]) -> list[T]:
    """Return a new list holding the items of ``x`` in random order.

    Each item is paired with one ``random()`` draw (taken in input order) and the
    items are stably ordered by those draws; ``x`` itself is not modified.
    """
    items = list(x)
    draws = [random() for _ in items]
    order = sorted(range(len(items)), key=draws.__getitem__)
    return [items[position] for position in order]


def get_subtopic_samples(samples: list[Trace], n=5):
    """Pick up to ``n`` distinct inputs from ``samples`` at random, each clipped to 140 characters."""
    distinct_inputs = list({item["input"] for item in samples})
    picked = shuffled(distinct_inputs)[:n]
    return [text[:140] for text in picked]


def generate_topic_and_subtopic_names(
    hierarchy: dict[str, dict[str, list[Trace]]],
    existing: Optional[list[str]] = None,
    skip_topic_names: bool = False,
):
    """Name every topic and subtopic of ``hierarchy`` with generate_topic_names.

    Args:
        hierarchy: ``{topic_key: {subtopic_key: [Trace, ...]}}``.
        existing: names the model should avoid overlapping (used for both levels).
        skip_topic_names: reuse the hierarchy keys as topic names instead of asking
            the model.

    Returns:
        ``(topic_names, subtopic_names)``, where ``topic_names[i]`` names the i-th topic
        and ``subtopic_names[i][j]`` its j-th subtopic, in ``hierarchy`` order.
    """
    subtopic_groups = list(hierarchy.values())

    # Per topic: up to 5 samples from each subtopic, mixed and capped at 30.
    topic_examples = []
    for subtopics in subtopic_groups:
        pooled = []
        for samples in subtopics.values():
            pooled.extend(get_subtopic_samples(samples, n=5))
        topic_examples.append(shuffled(pooled)[:30])

    if skip_topic_names:
        topic_names = list(hierarchy.keys())
    else:
        topic_names = generate_topic_names(topic_examples, existing=existing)

    # One naming request per topic, with up to 10 samples for each of its subtopics.
    subtopic_names = []
    for subtopics in subtopic_groups:
        per_subtopic_examples = [
            get_subtopic_samples(samples, n=10) for samples in subtopics.values()
        ]
        subtopic_names.append(
            generate_topic_names(per_subtopic_examples, existing=existing)
        )

    return topic_names, subtopic_names


topic_names, subtopic_names = generate_topic_and_subtopic_names(hierarchy)

(topic_names, subtopic_names)

In [None]:
from typing import Optional, TypedDict
from scipy.spatial.distance import cdist
import nanoid


class Topic(TypedDict):
    """A kept cluster of traces, summarised by its embedding centroid."""

    id: str  # "topic_<nanoid>" from build_response; subtopics use "subtopic_<nanoid>"
    name: str  # from generate_topic_names, or the hierarchy key when naming is skipped
    centroid: list[float]  # mean embedding of the cluster's traces
    # 95th-percentile cosine distance of member traces from the centroid;
    # get_matching_topic accepts new traces within 1.1x this radius.
    p95_distance: float


class Subtopic(Topic):
    """A cluster nested under a Topic."""

    parent_id: str  # id of the owning Topic


class TraceTopicMap(TypedDict):
    """Assignment of one trace to a topic and subtopic."""

    trace_id: str
    topic_id: Optional[str]  # None when the trace's topic was not kept / no topic matched
    subtopic_id: Optional[str]  # None when no subtopic was kept / matched


def build_response(
    hierarchy: dict[str, dict[str, list[Trace]]],
    topic_names: list[str],
    subtopic_names: list[list[str]],
) -> tuple[list[Topic], list[Subtopic], list[TraceTopicMap]]:
    """Flatten a named hierarchy into Topic/Subtopic records plus a trace mapping.

    A topic gets a record only if it has at least ``minimum_subtopics_per_topic``
    subtopics and ``minimum_traces_per_topic`` distinct inputs. A subtopic gets one
    only if its topic did and it has ``minimum_traces_per_topic`` distinct inputs.
    Every trace in the hierarchy appears in the mapping, with ``None`` ids where its
    cluster was not kept.

    Args:
        hierarchy: ``{topic_key: {subtopic_key: [Trace, ...]}}``.
        topic_names: one name per topic, in ``hierarchy`` order.
        subtopic_names: ``subtopic_names[i][j]`` names subtopic j of topic i.

    Returns:
        ``(topics, subtopics, trace_mappings)``.
    """
    topic_records: list[Topic] = []
    subtopic_records: list[Subtopic] = []
    trace_maps: list[TraceTopicMap] = []

    for topic_idx, subtopic_groups in enumerate(hierarchy.values()):
        members = [trace for group in subtopic_groups.values() for trace in group]
        distinct_inputs = {trace["input"] for trace in members}

        topic_id = None
        keep_topic = (
            len(subtopic_groups) >= minimum_subtopics_per_topic
            and len(distinct_inputs) >= minimum_traces_per_topic
        )
        if keep_topic:
            topic_id = f"topic_{nanoid.generate()}"
            name = topic_names[topic_idx]
            centroid, p95 = calculate_centroid_and_distance(members)
            topic_records.append(
                Topic(id=topic_id, name=name, centroid=centroid, p95_distance=p95)
            )

        for subtopic_idx, group in enumerate(subtopic_groups.values()):
            subtopic_id = None
            group_inputs = {trace["input"] for trace in group}
            if topic_id and len(group_inputs) >= minimum_traces_per_topic:
                subtopic_id = f"subtopic_{nanoid.generate()}"
                name = subtopic_names[topic_idx][subtopic_idx]
                centroid, p95 = calculate_centroid_and_distance(group)
                subtopic_records.append(
                    Subtopic(
                        id=subtopic_id,
                        name=name,
                        centroid=centroid,
                        p95_distance=p95,
                        parent_id=topic_id,
                    )
                )

            trace_maps.extend(
                TraceTopicMap(
                    trace_id=trace["trace_id"],
                    topic_id=topic_id,
                    subtopic_id=subtopic_id,
                )
                for trace in group
            )

    return topic_records, subtopic_records, trace_maps


topics, subtopics, traces_ = build_response(hierarchy, topic_names, subtopic_names)

(topics, subtopics, traces_)

# Incremental Clustering

In [None]:
# Constrained TypeVar: get_matching_topic returns the same record type (Topic or Subtopic) it is given.
U = TypeVar("U", Topic, Subtopic)


def get_matching_topic(
    trace: "Trace", topics: "list[U]", tolerance: float = 1.1
) -> "Optional[U]":
    """Return the topic whose centroid is closest (cosine) to ``trace``, if close enough.

    The closest topic matches only when the trace's cosine distance to its centroid
    is at most ``tolerance`` times that topic's ``p95_distance``.

    Args:
        trace: mapping with an ``"embeddings"`` vector.
        topics: candidate Topic/Subtopic records (``"centroid"``, ``"p95_distance"``).
        tolerance: slack factor on each topic's p95 radius (1.1 = 10% beyond it).

    Returns:
        The matching record, or ``None`` if ``topics`` is empty or the closest topic
        is too far away.
    """
    if not topics:
        # cdist rejects an empty centroid matrix; there is nothing to match against.
        return None

    centroid_distances = cdist(
        np.array([trace["embeddings"]], dtype=float),
        np.array([t["centroid"] for t in topics], dtype=float),
        "cosine",
    ).ravel()

    # argmin returns the first of any tied minima, i.e. the earliest topic in the list.
    closest = int(np.argmin(centroid_distances))
    closest_topic = topics[closest]
    if centroid_distances[closest] <= closest_topic["p95_distance"] * tolerance:
        return closest_topic
    return None


def assign_trace_to_topic(
    trace: Trace, topics: list[Topic], subtopics: list[Subtopic]
) -> TraceTopicMap:
    """Incrementally assign ``trace`` to its nearest topic, then to one of that topic's subtopics.

    Args:
        trace: the trace to place (needs ``"trace_id"`` and ``"embeddings"``).
        topics: candidate topics.
        subtopics: all subtopics; only those whose ``parent_id`` is the matched
            topic are considered.

    Returns:
        A TraceTopicMap whose ids are ``None`` at any level where nothing matched.
    """
    topic_id = None
    subtopic_id = None

    # Guard empty candidate lists before calling get_matching_topic, at both levels.
    matching_topic = get_matching_topic(trace, topics) if topics else None
    if matching_topic:
        topic_id = matching_topic["id"]
        candidate_subtopics = [s for s in subtopics if s["parent_id"] == topic_id]

        matching_subtopic = (
            get_matching_topic(trace, candidate_subtopics)
            if candidate_subtopics
            else None
        )
        if matching_subtopic:
            subtopic_id = matching_subtopic["id"]

    return TraceTopicMap(
        trace_id=trace["trace_id"],
        topic_id=topic_id,
        subtopic_id=subtopic_id,
    )


def _precision_recall_f1(
    matches: int, mismatches: int, no_matches: int
) -> tuple[float, float, float]:
    """Precision over traces that received an assignment, recall over all evaluated
    traces, and their harmonic mean; each is 0.0 when its denominator is zero."""
    attempted = matches + mismatches
    evaluated = attempted + no_matches
    precision = matches / attempted if attempted else 0.0
    recall = matches / evaluated if evaluated else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


# Re-assign every clustered trace incrementally and compare the result with its
# batch assignment in `traces_` (first mapping per trace id).
batch_assignments = {}
for trace_map in traces_:
    batch_assignments.setdefault(trace_map["trace_id"], trace_map)

topic_matches = 0
topic_mismatches = 0
topic_no_matches = 0
subtopic_matches = 0
subtopic_mismatches = 0
subtopic_no_matches = 0

for new_trace_to_add in traces:
    assigned_trace = batch_assignments.get(new_trace_to_add["trace_id"])
    if assigned_trace is None:
        # Dropped by build_hierarchy, so there is no batch assignment to compare with.
        continue

    trace_topic_map = assign_trace_to_topic(new_trace_to_add, topics, subtopics)

    if trace_topic_map["topic_id"] == assigned_trace["topic_id"]:
        topic_matches += 1
    elif trace_topic_map["topic_id"] is None:
        topic_no_matches += 1
    else:
        topic_mismatches += 1

    if trace_topic_map["subtopic_id"] == assigned_trace["subtopic_id"]:
        subtopic_matches += 1
    elif trace_topic_map["subtopic_id"] is None:
        subtopic_no_matches += 1
    else:
        subtopic_mismatches += 1

topics_precision, topics_recall, topics_f1 = _precision_recall_f1(
    topic_matches, topic_mismatches, topic_no_matches
)
print(
    f"Topics: Precision: {topics_precision:.3f}, Recall: {topics_recall:.3f}, F1: {topics_f1:.3f}"
)

subtopics_precision, subtopics_recall, subtopics_f1 = _precision_recall_f1(
    subtopic_matches, subtopic_mismatches, subtopic_no_matches
)
print(
    f"Subtopics: Precision: {subtopics_precision:.3f}, Recall: {subtopics_recall:.3f}, F1: {subtopics_f1:.3f}"
)

In [None]:
import numpy as np
from scipy.cluster.hierarchy import dendrogram, linkage
from matplotlib import pyplot as plt
import numpy as np

new_traces_to_assign = [
    trace
    for trace in traces
    if not assign_trace_to_topic(trace, topics, subtopics)["topic_id"]
]


def maybe_create_new_topics(
    traces: list[Trace],
    topics: list[U],
    cophenetic_distances: float,
    with_subtopics=True,
    verbose: bool = True,
) -> tuple[list[Topic], list[Subtopic], list[TraceTopicMap]]:
    """Cluster traces that matched nothing into new topics (or subtopics).

    New clusters are only kept if their p95 cosine radius is at most the average
    ``p95_distance`` of ``topics``, so they are no looser than the existing ones.

    Args:
        traces: traces that did not match any existing topic/subtopic.
        topics: existing sibling records (names are avoided, radii set the limit).
        cophenetic_distances: Ward distance cut-off for build_hierarchy.
        with_subtopics: build topics with subtopics (True) or bare subtopics (False).
        verbose: print diagnostics and plot the dendrogram of ``traces``.

    Returns:
        ``(topics, subtopics, trace_mappings)`` from build_response; all empty when
        nothing new could be formed. With ``with_subtopics=False`` the subtopics are
        parented to a placeholder "New Sub Topics" topic.
    """
    if len(traces) < 2:
        # Ward linkage needs at least two observations.
        return [], [], []

    # With no reference topics the mean would be NaN (silently disabling the filter);
    # fall back to build_hierarchy's default limit instead.
    average_p95_distance = (
        float(np.mean([t["p95_distance"] for t in topics])) if topics else 1.0
    )

    if verbose:
        print("average_p95_distance", average_p95_distance)
        print("number of new traces to assign", len(traces))
        print([x["input"] for x in traces])
        _, new_traces_p95 = calculate_centroid_and_distance(traces)
        print("new traces p95 distance", new_traces_p95)

        embeddings = np.array([x["embeddings"] for x in traces])
        fig, ax = plt.subplots(figsize=(25, 10))
        dendrogram(linkage(embeddings, "ward"), ax=ax)
        ax.set_title("Ward linkage of unassigned traces")
        ax.set_ylabel("Linkage distance")
        plt.show()

    new_hierarchy = build_hierarchy(
        traces,
        cophenetic_distances,
        True,
        maximum_p95_distance=average_p95_distance,
        with_subtopics=with_subtopics,
    )

    if not new_hierarchy:
        return [], [], []

    existing = [t["name"] for t in topics]
    if with_subtopics:
        topic_names, subtopic_names = generate_topic_and_subtopic_names(
            new_hierarchy, existing=existing
        )
        return build_response(new_hierarchy, topic_names, subtopic_names)

    # build_hierarchy returned bare subtopics; wrap them in a placeholder topic so
    # build_response can process them (the caller is expected to re-parent them).
    wrapped: dict[str, dict[str, list[Trace]]] = {"New Sub Topics": new_hierarchy}  # type: ignore
    topic_names, subtopic_names = generate_topic_and_subtopic_names(
        wrapped, existing=existing, skip_topic_names=True
    )
    return build_response(wrapped, topic_names, subtopic_names)


new_topics, new_subtopics, new_traces_to_assign = maybe_create_new_topics(
    new_traces_to_assign, topics, cophenetic_distances_for_topics
)

(new_topics, new_subtopics, new_traces_to_assign)

In [None]:
# Traces that match an existing topic but none of its subtopics, grouped by topic id.
new_traces_to_assign_to_subtopics_map: dict[str, list[Trace]] = {}
for candidate_trace in traces:
    candidate_map = assign_trace_to_topic(candidate_trace, topics, subtopics)
    matched_topic_id = candidate_map["topic_id"]
    if matched_topic_id and not candidate_map["subtopic_id"]:
        new_traces_to_assign_to_subtopics_map.setdefault(matched_topic_id, []).append(
            candidate_trace
        )

# Loop names are chosen so they do not clobber notebook globals such as `traces_`
# (the batch trace-to-topic mapping read by the evaluation cell).
for parent_topic_id, unassigned_traces in new_traces_to_assign_to_subtopics_map.items():
    sibling_subtopics = [s for s in subtopics if s["parent_id"] == parent_topic_id]
    _, created_subtopics, created_trace_maps = maybe_create_new_topics(
        unassigned_traces,
        sibling_subtopics,
        cophenetic_distances_for_subtopics,
        with_subtopics=False,
    )

    # With with_subtopics=False the new subtopics (and their trace mappings) hang off
    # a throwaway placeholder topic; point both back at the real parent topic.
    new_subtopics += [
        Subtopic(
            id=s["id"],
            name=s["name"],
            centroid=s["centroid"],
            p95_distance=s["p95_distance"],
            parent_id=parent_topic_id,
        )
        for s in created_subtopics
    ]
    new_traces_to_assign += [
        TraceTopicMap(
            trace_id=m["trace_id"],
            topic_id=parent_topic_id,
            subtopic_id=m["subtopic_id"],
        )
        for m in created_trace_maps
    ]

(new_topics, new_subtopics, new_traces_to_assign)