# After sales text clustering using Transformers
## Sentence Transformers - BERT embeddings

In [1]:
import pandas as pd
from sentence_transformers import SentenceTransformer

import os

# Turn off pandas' chained-assignment warning for the whole notebook.
pd.options.mode.chained_assignment = None

# Snapshot folder name and base directory; together they locate the input CSV.
data_version = "2024-05-14"
data_base_path = "../DATA/processed"

In [2]:
# Preprocess text
def preprocess_text(text, words_to_remove=None):
    """Lowercase *text*, collapse whitespace and drop unwanted words.

    Args:
        text: Raw text. ``None`` and NaN (what pandas yields for empty CSV
            cells) are treated as empty text; any other non-string value is
            converted with ``str()``.
        words_to_remove: Optional iterable of words to drop. Matching is
            case-insensitive because the text is lowercased before comparing.

    Returns:
        The remaining lowercase words joined by single spaces ("" when
        nothing is left).
    """
    # NaN is the only float that is not equal to itself.
    if text is None or (isinstance(text, float) and text != text):
        return ""

    # A set gives O(1) membership tests; lowercasing the removal words keeps
    # them comparable with the lowercased tokens.
    blocked = {word.lower() for word in words_to_remove or ()}
    kept = [word for word in str(text).lower().split() if word not in blocked]
    return " ".join(kept)


# Load data: read the cleaned text export of the configured data version
data_path = os.path.join(
    data_base_path, str(data_version), "text_to_analyse_clean.csv"
)
text_to_analyse = pd.read_csv(data_path, sep="¬", engine="python")

# Generic defect/failure words.
# NOTE(review): this list is never handed to preprocess_text in this cell, so no
# word is actually removed from the texts -- confirm whether that is intended.
words_to_remove = [
    "averia",
    "averías",
    "avería",
    "defecto",
    "defectos",
    "error",
    "errores",
    "fallo",
    "fallos",
    "falla",
    "motivo",
    "motivos",
    "proble",
    "problema",
    "problemas",
]

# Lowercase / whitespace-normalise every text and keep the result in a new column
text_to_analyse["processed_text"] = text_to_analyse["text_to_analyse"].map(
    preprocess_text
)

In [3]:
# Preview the first rows, including the new processed_text column.
text_to_analyse.head()

In [4]:
# Load model
# NOTE(review): the stop-word list earlier in the notebook suggests Spanish text,
# while "all-mpnet-base-v2" is presumably an English-centric checkpoint -- confirm
# whether a multilingual model would suit this data better.
model = SentenceTransformer("all-mpnet-base-v2")

# Compute embeddings
# The encode() output is converted to plain lists so that every row of the
# DataFrame holds its own vector.
embeddings = model.encode(text_to_analyse["processed_text"].tolist())
text_to_analyse["embeddings"] = embeddings.tolist()

In [5]:
# Load Errors
# Read the error catalogue, keep only the columns used by the notebook and give
# the two differently named headers the identifiers used from here on.
errors = pd.read_csv("../DATA/TablaTipoErrorPostventa.csv", sep=";", header=0)
errors = errors[
    ["Código", "CODCAR3", "CODCAR2", "DESCFAM", "Motivo General", "DESCRIPCION"]
].rename(columns={"Código": "ID_ERROR", "Motivo General": "MOTIVO"})

# Text that is embedded for each error: currently MOTIVO only.
# (Alternative kept for reference: MOTIVO + ' ' + DESCRIPCION.fillna('').)
errors["DESCRIPCION_DETAILED"] = errors["MOTIVO"]

# Clean CODCAR2: "-" is replaced by "0" so the column can be cast to int
errors["CODCAR2"] = errors["CODCAR2"].str.replace("-", "0").astype(int)

In [6]:
# Calculate embeddings for errors
# The error descriptions go through the same preprocess_text / model.encode steps
# as the texts, and the vectors are stored as plain lists, one per row.
errors["description_processed"] = errors["DESCRIPCION_DETAILED"].apply(preprocess_text)
errors_embeddings = model.encode(errors["description_processed"].tolist())
errors["embeddings"] = errors_embeddings.tolist()

In [7]:
# Calculate similarity
from sklearn.metrics.pairwise import cosine_similarity


def calculate_similarity(embeddings, error_embeddings):
    """Return the cosine similarity between two single embedding vectors.

    Each argument is one vector; both are wrapped in a list to form the
    one-row matrices passed to ``cosine_similarity``.
    """
    similarity_matrix = cosine_similarity([embeddings], [error_embeddings])
    # One row against one row yields a 1x1 matrix; return its only entry.
    return similarity_matrix[0][0]

In [8]:
# Calculate the cosine similarity between the text_to_analyse and the errors.
# For every error only the texts of the same family (CAR3) and -- when the error
# defines one -- the same system (CAR2) are scored; all other rows stay NaN in
# that error's "cosine_similarity_<ID_ERROR>" column.
for index, row in errors.iterrows():
    # Create a condition for filtering
    condition = text_to_analyse["CAR3"] == row["CODCAR3"]
    if row["CODCAR2"] != 0:  # 0 is what "-" was turned into when loading errors
        condition &= text_to_analyse["CAR2"] == row["CODCAR2"]

    if condition.any():
        # Score every matching text against this error in one batched call
        # instead of one cosine_similarity call per row; the values are the
        # same up to floating-point rounding.
        matching_embeddings = text_to_analyse.loc[condition, "embeddings"].tolist()
        text_to_analyse.loc[condition, f'cosine_similarity_{row["ID_ERROR"]}'] = (
            cosine_similarity(matching_embeddings, [row["embeddings"]])[:, 0]
        )

    print(f"Error {row['ID_ERROR']} calculated")

In [9]:
# Show 10 random rows to spot-check the per-error similarity columns.
text_to_analyse.sample(10)

In [10]:
# Pick, for every text, the applicable error with the highest cosine similarity.
score_prefix = "cosine_similarity_"
cosine_columns = [
    col for col in text_to_analyse.columns if col.startswith(score_prefix)
]

# Snapshot of the scores while non-applicable errors are still NaN.
scores = text_to_analyse[cosine_columns]

# max() skips NaN, so the best score is always the score of an error that was
# really evaluated for the row; rows without any score fall back to 0.
text_to_analyse.loc[:, "highest_score"] = scores.max(axis=1).fillna(0)

# Rank with NaN replaced by -inf instead of 0: a never-scored error can then not
# outrank a scored one whose similarity happens to be negative. The known
# prefix is stripped (rather than splitting on "_") so that IDs containing an
# underscore are preserved in full.
# NOTE(review): rows with no applicable error still receive the first error's
# ID (with highest_score 0), as before -- consider labelling them explicitly.
text_to_analyse.loc[:, "highest_score_error"] = (
    scores.fillna(float("-inf"))
    .idxmax(axis=1)
    .apply(lambda column: column[len(score_prefix):])
)

text_to_analyse[cosine_columns] = scores.fillna(0)  # Fill NA with 0

In [11]:
# Attach the MOTIVO of the best-matching error to every text.
# ID_ERROR is cast to str first because highest_score_error holds the ID as text.
errors["ID_ERROR"] = errors["ID_ERROR"].astype(str)
text_to_analyse = pd.merge(
    text_to_analyse,
    errors[["ID_ERROR", "MOTIVO"]],
    how="left",
    left_on="highest_score_error",
    right_on="ID_ERROR",
)

In [12]:
# Show 10 random rows to spot-check the assigned error and its MOTIVO.
text_to_analyse.sample(10)

In [13]:
# For every predicted error keep its 10 highest-scoring texts.
top10_per_error = (
    text_to_analyse[
        ["codigo", "text_to_analyse", "highest_score", "highest_score_error"]
    ]
    .groupby("highest_score_error", group_keys=False)
    .apply(lambda x: x.nlargest(10, "highest_score"))
    .reset_index(drop=True)
)

# Display at most the first 500 rows of that selection.
top10_per_error.head(500)

In [14]:
# Inspect the prediction for one specific codigo.
text_to_analyse[text_to_analyse["codigo"] == "MMHSNG1V2C"][
    ["codigo", "text_to_analyse", "highest_score", "highest_score_error"]
]

In [15]:
"""import csv
top50_per_error = text_to_analyse[['codigo','text_to_analyse', 'highest_score', 'highest_score_error']] \
    .groupby('highest_score_error', group_keys=False) \
    .apply(lambda x: x.nlargest(50, 'highest_score')) \
    .reset_index(drop=True)

top50_per_error.to_csv("../DATA/processed/2024-05-14/top50_per_error.csv", index=False, encoding="utf-8", quoting=csv.QUOTE_ALL)"""

# Save results in Qdrant database

In [16]:
from qdrant_client import QdrantClient
from qdrant_client.http import models as qmodels
from qdrant_client.http.models import Filter, FieldCondition, MatchValue

# ":memory:" selects qdrant-client's local in-memory mode (no server involved).
qdrant_client = QdrantClient(":memory:")

# Take the vector size from the loaded model instead of hard-coding 768, so the
# collection keeps matching the embeddings if the model is ever swapped.
qdrant_client.create_collection(
    collection_name="MyZone-DefectClassification",
    vectors_config=qmodels.VectorParams(
        size=model.get_sentence_embedding_dimension(),
        distance=qmodels.Distance.COSINE,
    ),
)

In [17]:
# Insert vectors into Qdrant
# One point per text: the DataFrame index is the point id, the embedding is the
# vector, and the predicted error plus a few descriptive fields form the payload.
points = [
    qmodels.PointStruct(
        id=point_id,
        vector=record["embeddings"],
        payload=dict(
            error=record["highest_score_error"],
            codigo=record["codigo"],
            error_description=record["MOTIVO"],
            text=record["text_to_analyse"],
            family=record["CAR3"],
            sistema=record["CAR2"],
        ),
    )
    for point_id, record in text_to_analyse.iterrows()
]

qdrant_client.upsert(collection_name="MyZone-DefectClassification", points=points)

In [18]:
# Define a function to search for errors
def define_error(text, family):
    """Return the best similarity score per error description for a new text.

    The text is embedded with the notebook's model and compared against the
    20 nearest stored points whose ``family`` payload equals ``int(family)``.

    Args:
        text: Free-text description to look up.
        family: Product family code; anything ``int()`` accepts.

    Returns:
        Dict mapping every error description found among the hits to the
        highest score obtained for it.
    """
    query_vector = model.encode(text).tolist()

    # Restrict the search to points of the requested product family
    family_filter = Filter(
        must=[FieldCondition(key="family", match=MatchValue(value=int(family)))]
    )
    hits = qdrant_client.search(
        collection_name="MyZone-DefectClassification",
        query_vector=query_vector,
        limit=20,
        query_filter=family_filter,
    )

    scored = pd.DataFrame(
        [(hit.payload["error_description"], hit.score) for hit in hits],
        columns=["Error", "Score"],
    )

    # Several hits may share an error description: keep its best score only,
    # ordered from most to least similar.
    best_per_error = scored.groupby("Error")["Score"].max()
    return best_per_error.sort_values(ascending=False).to_dict()

In [19]:
# Example lookup: a description and a product family passed as a string.
define_error("Fallo de la pantalla", "91")

In [20]:
import gradio as gr

# Create the Gradio interface
# Two single-line text boxes feed define_error(text, family); both are pre-filled
# with an example. The returned {description: score} dict is rendered by a Label
# component limited to the 5 top classes.
iface = gr.Interface(
    fn=define_error,
    inputs=[
        gr.Textbox(
            lines=1,
            placeholder="Enter error description here",
            label="Error Description",
            value="Fallo de la pantalla",
        ),
        gr.Textbox(
            lines=1, placeholder="Product Family", label="Product Family", value="91"
        ),
    ],
    outputs=gr.Label(num_top_classes=5),
    title="Error probability calculator",
    description="Enter error description and the product family to get the most probable error.",
)

# Launch the interface
iface.launch()

## Evaluate the model

In [21]:
# Select the evaluation subset: the rows whose codigo is listed in
# test_dataset_ids.csv (no separate train split is built here).
test_dataset_ids = pd.read_csv("../DATA/processed/test_dataset_ids.csv")
test_dataset = text_to_analyse[
    text_to_analyse["codigo"].isin(test_dataset_ids["codigo"])
]

In [22]:
# Join the columns of test_dataset_ids onto the selected rows (matched on codigo)
# and cast both label columns to str so they can be compared directly.
test_dataset = test_dataset.merge(test_dataset_ids, on="codigo", how="inner").astype(
    {"ERROR_POSTVENTA": str, "highest_score_error": str}
)

In [26]:
# Predicted error next to the ERROR_POSTVENTA label it is evaluated against.
test_dataset[["highest_score_error", "ERROR_POSTVENTA"]]

In [24]:
from sklearn.metrics import classification_report

# Per-class precision / recall / F1: ERROR_POSTVENTA is the reference label,
# highest_score_error the prediction.
print(
    classification_report(
        y_true=test_dataset["ERROR_POSTVENTA"],
        y_pred=test_dataset["highest_score_error"],
    )
)

In [25]:
# Number of non-null values per column of the evaluation subset.
test_dataset.count()