In [None]:
import json
import os
import pickle
from math import ceil, floor
from pprint import pprint
from typing import List

# Cache and runtime settings for the DSP/LiteLLM stack, exported as environment
# variables before `dsp` / `dspy` are imported further down.
# NOTE(review): presumably these are read at import time, which would explain why
# they sit between the stdlib and third-party imports — confirm before moving them.
os.environ["DSP_CACHEBOOL"] = "TRUE"
os.environ["DSP_CACHEDIR"] = "./cache/library"
os.environ["DSP_NOTEBOOK_CACHEDIR"] = "./cache/notebook"
os.environ["LITELLM_MODE"] = "PRODUCTION"

import dsp
import dspy
import emoji
import Levenshtein
import numpy as np
import pandas as pd
import phoenix
import pydantic
from dspy.evaluate import Evaluate
from dspy.teleprompt import BootstrapFewShotWithRandomSearch, LabeledFewShot
from dspy.teleprompt.signature_opt_typed import optimize_signature
from openinference.instrumentation.dspy import DSPyInstrumentor
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
    OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

from library.types import *
from library.utils import *

# Launch a local Phoenix app and register an OTLP exporter pointing at it, then
# instrument DSPy so its calls are traced through that exporter.
phoenix.launch_app(host="localhost", port=6006)
tracer_provider = trace_sdk.TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:6006/v1/traces")))
trace_api.set_tracer_provider(tracer_provider)
DSPyInstrumentor().instrument()

# Use half of the available cores, but never fewer than one thread:
# `os.cpu_count()` may return None (-> TypeError on `// 2`) and a single-core
# machine would otherwise end up with `1 // 2 == 0` threads.
EVALUATION_THREADS = max(1, (os.cpu_count() or 1) // 2)

evaluate = Evaluate(devset=None, metric=None, num_threads=EVALUATION_THREADS, display_progress=True, display_table=10)

In [2]:
# TODO: Check and play with STOP sequences
params = {"max_tokens": 1024, "temperature": 0.7}

# Keyword arguments every backend shares; only the model id and API key differ.
shared_backend_kwargs = dict(params=params, attempts=3, system_prompt=SYSTEM_PROMPT)

# OpenAI
gpt35 = dspy.ChatBackend(
    model="openai/gpt-3.5-turbo-instruct",
    api_key=os.environ["OPENAI_API_KEY"],
    **shared_backend_kwargs,
)
gpt4o = dspy.ChatBackend(
    model="openai/gpt-4o",
    api_key=os.environ["OPENAI_API_KEY"],
    **shared_backend_kwargs,
)

# Groq
gqmix = dspy.ChatBackend(
    model="groq/mixtral-8x7b-32768",
    api_key=os.environ["GROQ_API_KEY"],
    **shared_backend_kwargs,
)
gqll3 = dspy.ChatBackend(
    model="groq/llama-3.2-11b-text-preview",
    api_key=os.environ["GROQ_API_KEY"],
    **shared_backend_kwargs,
)

# Anyscale
asmix = dspy.ChatBackend(
    model="anyscale/mistralai/Mixtral-8x7B-Instruct-v0.1",
    api_key=os.environ["ANYSCALE_API_KEY"],
    **shared_backend_kwargs,
)
asll3 = dspy.ChatBackend(
    model="anyscale/meta-llama/Meta-Llama-3-8B-Instruct",
    api_key=os.environ["ANYSCALE_API_KEY"],
    **shared_backend_kwargs,
)

# Fireworks
fwl31 = dspy.ChatBackend(
    model="fireworks_ai/accounts/fireworks/models/llama-v3p1-8b-instruct",
    api_key=os.environ["FIREWORKS_API_KEY"],
    **shared_backend_kwargs,
)

# trace=[] needed to run assertions and suggestions!
dspy.configure(backend=gqll3, trace=[], cache=True)

In [None]:
# TODO: The current sample has a majority of english feedbacks,
# this is ok for now but enhance in future iterations

# Decode explicitly as UTF-8 so non-ASCII feedback text is read the same way on
# every platform instead of depending on the locale's default encoding.
with open("artifacts/feedbacks/labeled.json", "r", encoding="utf-8") as file:
    # Build the frame directly so `feedbacks` is only ever bound to a DataFrame.
    feedbacks = pd.DataFrame(json.load(file))

display(feedbacks.head())

# Average feedback size, in characters and in tokens (as counted by `tokenizer`).
feedback_contents = feedbacks["content"]
average_characters = ceil(feedback_contents.apply(len).mean())
average_tokens = ceil(feedback_contents.apply(tokenizer).apply(len).mean())
print(f"{average_characters} average feedback length ~ {average_tokens} tokens")

# Extract Issues

## Pipeline

In [4]:
class IssueGenerator(dspy.Module):
    """Generate the issues a customer reports in a feedback about a product.

    Runs a single chain-of-thought call over the `GenerateIssues` signature and
    re-packs the generated issues into the module-level `Output` model.
    """

    # Input accepted by `forward`.
    class Input(pydantic.BaseModel):
        context: str
        feedback: str

    # Output returned by `forward` (inside `dspy.Prediction.output`): the same
    # fields as the signature's output, without descriptions or constraints.
    class Output(pydantic.BaseModel):
        class Issue(pydantic.BaseModel):
            title: str
            description: str
            steps: List[str]

        issues: List[Issue]

    # NOTE(review): the docstring and the `Field(description=...)` texts below look
    # like prompt text consumed by DSPy — treat them as runtime strings and do not
    # reword them casually.
    class GenerateIssues(dspy.Signature):
        """
List valid issues, that a customer has with a product (context is provided), from the customer's feedback.
- Issues that the customer did not explicitly state are invalid issues.
- If the customer is uncertain of an issue it is an invalid issue.
- Issues without steps to reproduce them are still valid issues.
- Suggestions, reviews, opinions or preferences are invalid issues.
- Lexicographic, syntactic, spelling, grammar or any other language mistakes of the feedback's text are invalid issues.
- Again, an issue cannot be supposed to be valid if the customer did not explicitly state it.
        """

        class Input(pydantic.BaseModel):
            context: str
            feedback: str

        class Output(pydantic.BaseModel):
            class Issue(pydantic.BaseModel):
                title: str = pydantic.Field(description="4 to 10 words, which cannot contain the words `issue` (or synonyms), `customer` (or synonyms) or the product's name.", max_length=100)
                description: str = pydantic.Field(description="Long, complete explanation, but without redundant information, using the feedback's original words. Must focus solely on the issue by depersonalizing the sentences.")
                # `max_length` caps the number of steps; pydantic v2 deprecates the
                # older `max_items` spelling of the same constraint.
                steps: List[str] = pydantic.Field(description="Precise steps, but very concise, if any, to be able to reproduce the issue, else `[]`.", max_length=5)

            issues: List[Issue] = pydantic.Field(description="If any, else `[]`.")

        input: Input = dspy.InputField()
        output: Output = dspy.OutputField()

    def __init__(self) -> None:
        """Create the chain-of-thought predictor and activate assertion backtracking."""
        super().__init__()

        self.generate_issues = ChainOfThought(self.GenerateIssues, max_retries=3, explain_errors=False)

        self.activate_assertions(handler=dspy.backtrack_handler, max_backtracks=3)

    def forward(self, input: Input) -> dspy.Prediction:
        """Generate issues for `input` and return them as `Prediction.output` (an `Output`)."""
        generated_issues = self.generate_issues(input=self.GenerateIssues.Input(
            context=input.context,
            feedback=input.feedback,
        )).output.issues

        # Copy into the unconstrained module-level model.
        return dspy.Prediction(output=self.Output(
            issues=[self.Output.Issue(
                title=issue.title,
                description=issue.description,
                steps=issue.steps,
            ) for issue in generated_issues],
        ))

In [5]:
class InfoInferrer(dspy.Module):
    """Infer the severity and the category of one issue extracted from a feedback.

    Both values are chosen by the language model from closed lists. An invalid
    severity fails a `dspy.Assert`; an invalid category only triggers a
    `dspy.Suggest` and is then replaced by `UNKNOWN_OPTION`.

    NOTE: a later cell of this notebook defines another class with this same name
    (for suggestions), which rebinds the global `InfoInferrer`.
    """

    # Input accepted by `forward`.
    class Input(pydantic.BaseModel):
        context: str
        feedback: str
        issue: str
        categories: List[str]

    # Output returned by `forward` (inside `dspy.Prediction.output`).
    class Output(pydantic.BaseModel):
        severity: str
        category: str

    # NOTE(review): the docstring and the `Field(description=...)` texts below look
    # like prompt text consumed by DSPy — treat them as runtime strings.
    class InferInfo(dspy.Signature):
        """
Infer the following information from an issue that an LLM extracted from the feedback of a customer.
- Discern the severity, valid options (`severities`) are provided.
- Discern the category, valid options (`categories`) are provided.
        """

        class Input(pydantic.BaseModel):
            context: str
            feedback: str
            issue: str
            severities: List[str]
            categories: List[str]

        class Output(pydantic.BaseModel):
            severity: str = pydantic.Field(description="The valid option that best fits.")
            category: str = pydantic.Field(description=f"The valid option that best fits, if any, else `{UNKNOWN_OPTION}`.")

        input: Input = dspy.InputField()
        output: Output = dspy.OutputField()

    def __init__(self) -> None:
        """Create the chain-of-thought predictor and activate assertion backtracking."""
        super().__init__()

        self.infer_info = ChainOfThought(self.InferInfo, max_retries=3, explain_errors=False)

        self.activate_assertions(handler=dspy.backtrack_handler, max_backtracks=3)

    def forward(self, input: Input) -> dspy.Prediction:
        """Infer severity and category for `input.issue`.

        Returns a `dspy.Prediction` whose `output` is an `Output`; `category` is
        `UNKNOWN_OPTION` whenever the model's answer is not in `input.categories`.
        """
        # Options are shown with spaces instead of underscores. Categories are
        # de-duplicated with `dict.fromkeys`, which keeps first-seen order: going
        # through a `set` would reorder the strings differently on every
        # interpreter run, changing the prompt from one run to the next.
        category_options = list(dict.fromkeys(
            category.replace("_", " ") for category in input.categories + [UNKNOWN_OPTION]
        ))

        info = self.infer_info(input=self.InferInfo.Input(
            context=input.context,
            feedback=input.feedback,
            issue=input.issue,
            severities=[severity.replace("_", " ") for severity in Severity.list()],
            categories=category_options,
        )).output

        # Convert the answer back to the underscore/upper-case form; surrounding
        # whitespace is dropped so it does not cause a spurious mismatch.
        severity = info.severity.strip().upper().replace(" ", "_")

        dspy.Assert(
            severity in Severity.list(),
            f'Severity must be {self.InferInfo.Output.model_fields["severity"].description}! `{severity}` is NOT a valid option. Valid options are:\n' + "".join([f"- {option}\n" for option in Severity.list()])
        )

        category = info.category.strip().upper().replace(" ", "_")
        if not input.categories:
            # Nothing to choose from: the category is unknown by definition.
            category = UNKNOWN_OPTION

        dspy.Suggest(
            category in input.categories or category == UNKNOWN_OPTION,
            f'Category must be {self.InferInfo.Output.model_fields["category"].description}! `{category}` is NOT a valid option. Valid options are:\n' + "".join([f"- {option}\n" for option in input.categories])
        )

        # A suggestion does not have to be satisfied, so fall back explicitly.
        if category not in input.categories:
            category = UNKNOWN_OPTION

        return dspy.Prediction(output=self.Output(
            severity=severity,
            category=category,
        ))

In [6]:
class IssueExtractor(dspy.Module):
    """Extract issues from a feedback and annotate each with severity and category.

    Chains `IssueGenerator` (one call per feedback) with `InfoInferrer` (one call
    per generated issue). Both sub-modules load their saved state from
    `artifacts/issue_extractor/`.
    """

    # Collaborator classes are bound when this class is defined instead of being
    # looked up as globals every time an instance is created: a later cell of this
    # notebook defines a second, suggestion-specific `InfoInferrer` under the same
    # global name, and resolving the name at instantiation time would silently pick
    # that one up when cells are re-run out of order.
    IssueGenerator = IssueGenerator
    InfoInferrer = InfoInferrer

    # Input accepted by `forward`.
    class Input(pydantic.BaseModel):
        context: str
        categories: List[str]
        feedback: str

    # Output returned by `forward` (inside `dspy.Prediction.output`).
    class Output(pydantic.BaseModel):
        class Issue(pydantic.BaseModel):
            title: str
            description: str
            steps: List[str]
            severity: str
            category: str

        issues: List[Issue]

    def __init__(self) -> None:
        """Instantiate both sub-modules and load their saved state from disk."""
        super().__init__()

        self.generate_issues = self.IssueGenerator()
        self.generate_issues.load("artifacts/issue_extractor/issue_generator/labeled_few_shot.json")
        self.infer_info = self.InfoInferrer()
        self.infer_info.load("artifacts/issue_extractor/info_inferrer/labeled_few_shot.json")

    def forward(self, input: Input) -> dspy.Prediction:
        """Generate the issues of `input.feedback`, then infer severity/category for each."""
        generated_issues = self.generate_issues(input=self.IssueGenerator.Input(
            context=input.context,
            feedback=input.feedback,
        )).output.issues

        issues = []
        for issue in generated_issues:
            # The inferrer is given the issue's description (not its title).
            info = self.infer_info(input=self.InfoInferrer.Input(
                context=input.context,
                feedback=input.feedback,
                issue=issue.description,
                categories=input.categories,
            )).output

            issues.append(self.Output.Issue(
                title=issue.title,
                description=issue.description,
                steps=issue.steps,
                severity=info.severity,
                category=info.category,
            ))

        return dspy.Prediction(output=self.Output(
            issues=issues,
        ))

## Evaluation

TODO: Issue labels are needed to evaluate the pipeline

In [None]:
from random import random

# Manual spot check: run the issue pipeline on one randomly drawn feedback.
m = int(random() * len(feedbacks))
fed = feedbacks.iloc[m]

print(fed["translation"], end="\n\n")
print(fed["categories"])
print(f"{'=' * 40} {m} {'=' * 40}")

issue_extractor = IssueExtractor()
issues = issue_extractor(input=IssueExtractor.Input(
    context=fed["context"],
    categories=fed["categories"],
    feedback=fed["translation"],
)).output.issues

for issue in issues:
    print(f"[{issue.severity}] {issue.title}")
    print(f"Category: {issue.category or 'None'}")
    print()
    print(issue.description)
    if issue.steps:
        print("Steps to reproduce:")
        for number, step in enumerate(issue.steps, start=1):
            print(f"{number}. {step}")

    print("-" * 80)

print(f"{'^' * 40} {len(issues)} {'^' * 40}")

# Extract Suggestions

## Pipeline

In [15]:
class SuggestionGenerator(dspy.Module):
    """Generate the suggestions a customer makes in a feedback about a product.

    Runs a single chain-of-thought call over the `GenerateSuggestions` signature
    and re-packs the result into the module-level `Output` model, blanking the
    reason when the model answered with `UNKNOWN_OPTION`.
    """

    # Input accepted by `forward`.
    class Input(pydantic.BaseModel):
        context: str
        feedback: str

    # Output returned by `forward` (inside `dspy.Prediction.output`).
    class Output(pydantic.BaseModel):
        class Suggestion(pydantic.BaseModel):
            title: str
            description: str
            reason: str

        suggestions: List[Suggestion]

    # NOTE(review): the docstring and the `Field(description=...)` texts below look
    # like prompt text consumed by DSPy — treat them as runtime strings.
    class GenerateSuggestions(dspy.Signature):
        """
List valid improvement proposals, feature requests and ideas, that a customer has about a product (context is provided), from the customer's feedback.
- Suggestions that the customer did not explicitly state are invalid suggestions.
- If the customer is uncertain of a suggestion it is an invalid suggestion.
- Suggestions without reasons behind the proposals are still valid suggestions.
- Issues, concerns, complaints, reviews, opinions or preferences are invalid suggestions.
- Suggestions that come from an issue are invalid suggestions.
- Lexicographic, syntactic, spelling, grammar or any other language mistakes of the feedback's text are invalid suggestions.
- Again, a suggestion cannot be supposed to be valid if the customer did not explicitly state it.
        """

        class Input(pydantic.BaseModel):
            context: str
            feedback: str

        class Output(pydantic.BaseModel):
            class Suggestion(pydantic.BaseModel):
                title: str = pydantic.Field(description="4 to 10 words, which cannot contain the words `suggestion` (or synonyms), `customer` (or synonyms) or the product's name.", max_length=100)
                description: str = pydantic.Field(description="Long, complete explanation, but without redundant information, using the feedback's original words. Must focus solely on the suggestion by depersonalizing the sentences.")
                reason: str = pydantic.Field(description=f'The customer\'s motivation behind the proposal of the suggestion, if any must always start with `This will`, else `{UNKNOWN_OPTION}`.')

            suggestions: List[Suggestion] = pydantic.Field(description="If any, else `[]`.")

        input: Input = dspy.InputField()
        output: Output = dspy.OutputField()

    def __init__(self) -> None:
        """Create the chain-of-thought predictor and activate assertion backtracking."""
        super().__init__()

        self.generate_suggestions = ChainOfThought(self.GenerateSuggestions, max_retries=3, explain_errors=False)

        self.activate_assertions(handler=dspy.backtrack_handler, max_backtracks=3)

    def forward(self, input: Input) -> dspy.Prediction:
        """Generate suggestions for `input` and return them as `Prediction.output`."""
        signature_input = self.GenerateSuggestions.Input(
            context=input.context,
            feedback=input.feedback,
        )
        generated_suggestions = self.generate_suggestions(input=signature_input).output.suggestions

        suggestions = []
        for suggestion in generated_suggestions:
            # The reason is upper-cased before being compared with the unknown
            # marker; a match means no reason was given, reported as "".
            reason_is_known = suggestion.reason.upper() != UNKNOWN_OPTION
            suggestions.append(self.Output.Suggestion(
                title=suggestion.title,
                description=suggestion.description,
                reason=suggestion.reason if reason_is_known else "",
            ))

        return dspy.Prediction(output=self.Output(suggestions=suggestions))

In [16]:
class InfoInferrer(dspy.Module):
    """Infer the importance and the category of one suggestion extracted from a feedback.

    Both values are chosen by the language model from closed lists. An invalid
    importance fails a `dspy.Assert`; an invalid category only triggers a
    `dspy.Suggest` and is then replaced by `UNKNOWN_OPTION`.

    NOTE: an earlier cell of this notebook defines another class with this same
    name (for issues); running this cell rebinds the global `InfoInferrer`.
    """

    # Input accepted by `forward`.
    class Input(pydantic.BaseModel):
        context: str
        feedback: str
        suggestion: str
        categories: List[str]

    # Output returned by `forward` (inside `dspy.Prediction.output`).
    class Output(pydantic.BaseModel):
        importance: str
        category: str

    # NOTE(review): the docstring and the `Field(description=...)` texts below look
    # like prompt text consumed by DSPy — treat them as runtime strings.
    class InferInfo(dspy.Signature):
        """
Infer the following information from a suggestion that an LLM extracted from the feedback of a customer.
- Discern the importance, valid options (`importances`) are provided.
- Discern the category, valid options (`categories`) are provided.
        """

        class Input(pydantic.BaseModel):
            context: str
            feedback: str
            suggestion: str
            importances: List[str]
            categories: List[str]

        class Output(pydantic.BaseModel):
            importance: str = pydantic.Field(description="The valid option that best fits.")
            category: str = pydantic.Field(description=f"The valid option that best fits, if any, else `{UNKNOWN_OPTION}`.")

        input: Input = dspy.InputField()
        output: Output = dspy.OutputField()

    def __init__(self) -> None:
        """Create the chain-of-thought predictor and activate assertion backtracking."""
        super().__init__()

        self.infer_info = ChainOfThought(self.InferInfo, max_retries=3, explain_errors=False)

        self.activate_assertions(handler=dspy.backtrack_handler, max_backtracks=3)

    def forward(self, input: Input) -> dspy.Prediction:
        """Infer importance and category for `input.suggestion`.

        Returns a `dspy.Prediction` whose `output` is an `Output`; `category` is
        `UNKNOWN_OPTION` whenever the model's answer is not in `input.categories`.
        """
        # Options are shown with spaces instead of underscores. Categories are
        # de-duplicated with `dict.fromkeys`, which keeps first-seen order: going
        # through a `set` would reorder the strings differently on every
        # interpreter run, changing the prompt from one run to the next.
        category_options = list(dict.fromkeys(
            category.replace("_", " ") for category in input.categories + [UNKNOWN_OPTION]
        ))

        info = self.infer_info(input=self.InferInfo.Input(
            context=input.context,
            feedback=input.feedback,
            suggestion=input.suggestion,
            importances=[importance.replace("_", " ") for importance in Importance.list()],
            categories=category_options,
        )).output

        # Convert the answer back to the underscore/upper-case form; surrounding
        # whitespace is dropped so it does not cause a spurious mismatch.
        importance = info.importance.strip().upper().replace(" ", "_")

        dspy.Assert(
            importance in Importance.list(),
            f'Importance must be {self.InferInfo.Output.model_fields["importance"].description}! `{importance}` is NOT a valid option. Valid options are:\n' + "".join([f"- {option}\n" for option in Importance.list()])
        )

        category = info.category.strip().upper().replace(" ", "_")
        if not input.categories:
            # Nothing to choose from: the category is unknown by definition.
            category = UNKNOWN_OPTION

        dspy.Suggest(
            category in input.categories or category == UNKNOWN_OPTION,
            f'Category must be {self.InferInfo.Output.model_fields["category"].description}! `{category}` is NOT a valid option. Valid options are:\n' + "".join([f"- {option}\n" for option in input.categories])
        )

        # A suggestion does not have to be satisfied, so fall back explicitly.
        if category not in input.categories:
            category = UNKNOWN_OPTION

        return dspy.Prediction(output=self.Output(
            importance=importance,
            category=category,
        ))

In [17]:
class SuggestionExtractor(dspy.Module):
    """Extract suggestions from a feedback and annotate each with importance and category.

    Chains `SuggestionGenerator` (one call per feedback) with `InfoInferrer` (one
    call per generated suggestion). Both sub-modules load their saved state from
    `artifacts/suggestion_extractor/`.
    """

    # Collaborator classes are bound when this class is defined instead of being
    # looked up as globals every time an instance is created: an earlier cell of
    # this notebook defines a different, issue-specific `InfoInferrer` under the
    # same global name, and re-running that cell would otherwise make new
    # instances of this extractor silently pick up the wrong class.
    SuggestionGenerator = SuggestionGenerator
    InfoInferrer = InfoInferrer

    # Input accepted by `forward`.
    class Input(pydantic.BaseModel):
        context: str
        categories: List[str]
        feedback: str

    # Output returned by `forward` (inside `dspy.Prediction.output`).
    class Output(pydantic.BaseModel):
        class Suggestion(pydantic.BaseModel):
            title: str
            description: str
            reason: str
            importance: str
            category: str

        suggestions: List[Suggestion]

    def __init__(self) -> None:
        """Instantiate both sub-modules and load their saved state from disk."""
        super().__init__()

        self.generate_suggestions = self.SuggestionGenerator()
        self.generate_suggestions.load("artifacts/suggestion_extractor/suggestion_generator/labeled_few_shot.json")
        self.infer_info = self.InfoInferrer()
        self.infer_info.load("artifacts/suggestion_extractor/info_inferrer/labeled_few_shot.json")

    def forward(self, input: Input) -> dspy.Prediction:
        """Generate the suggestions of `input.feedback`, then infer importance/category for each."""
        generated_suggestions = self.generate_suggestions(input=self.SuggestionGenerator.Input(
            context=input.context,
            feedback=input.feedback,
        )).output.suggestions

        suggestions = []
        for suggestion in generated_suggestions:
            # The inferrer is given the suggestion's description (not its title).
            info = self.infer_info(input=self.InfoInferrer.Input(
                context=input.context,
                feedback=input.feedback,
                suggestion=suggestion.description,
                categories=input.categories,
            )).output

            suggestions.append(self.Output.Suggestion(
                title=suggestion.title,
                description=suggestion.description,
                reason=suggestion.reason,
                importance=info.importance,
                category=info.category,
            ))

        return dspy.Prediction(output=self.Output(
            suggestions=suggestions,
        ))

## Evaluation

TODO: Suggestion labels are needed to evaluate the pipeline

In [None]:
from random import random

# Manual spot check: run the suggestion pipeline on one randomly drawn feedback.
m = int(random() * len(feedbacks))
fed = feedbacks.iloc[m]

print(fed["translation"], end="\n\n")
print(fed["categories"])
print(f"{'=' * 40} {m} {'=' * 40}")

suggestion_extractor = SuggestionExtractor()
suggestions = suggestion_extractor(input=SuggestionExtractor.Input(
    context=fed["context"],
    categories=fed["categories"],
    feedback=fed["translation"],
)).output.suggestions

for suggestion in suggestions:
    print(f"[{suggestion.importance}] {suggestion.title}")
    print(f"Category: {suggestion.category or 'None'}")
    print()
    print(suggestion.description)
    if suggestion.reason:
        print("Reason:", suggestion.reason, sep="\n")

    print("-" * 80)

print(f"{'^' * 40} {len(suggestions)} {'^' * 40}")

# Extract Review

## Pipeline

In [19]:
class ReviewExtractor(dspy.Module):
    """Infer keywords, sentiment, emotions, intention and category of a feedback.

    Runs one chain-of-thought call over `InferInfo`, then validates the answer:
    sentiment and emotions are enforced with `dspy.Assert`; keywords, intention
    and category only trigger `dspy.Suggest` and are filtered or replaced by
    `UNKNOWN_OPTION` when invalid. Saved state is loaded from
    `artifacts/review_extractor/` on construction.
    """

    # Input accepted by `forward`.
    class Input(pydantic.BaseModel):
        context: str
        categories: List[str]
        feedback: str

    # Output returned by `forward` (inside `dspy.Prediction.output`).
    class Output(pydantic.BaseModel):
        class Review(pydantic.BaseModel):
            content: str
            keywords: List[str]
            sentiment: str
            emotions: List[str]
            intention: str
            category: str

        review: Review

    # NOTE(review): the docstring and the `Field(description=...)` texts below look
    # like prompt text consumed by DSPy — treat them as runtime strings.
    class InferInfo(dspy.Signature):
        """
Infer the following information from the customer's feedback of a product (context is provided).
- List the most important keywords, following the following rules:
    - Limit each keyword to 3 words maximum.
    - Only include keywords the customer explicitly stated.
    - Do not include emojis in the keywords.
    - Do not include the name of the product in the keywords.
- Discern the sentiment, valid options (`sentiments`) are provided.
- Discern the emotions, valid options (`emotions`) are provided.
- Discern the intention, valid options (`intentions`) are provided, following the following rules:
    - To `retain` means to have the intention to buy again, renew and/or recommend the product.
    - To `churn` means to have the intention to return, refund, cancel and/or discourage the product. Critical issues also cause customer churn.
    - To `recommend` cannot be assumed if the customer did not explicitly state it (except if synonyms were used).
    - To `discourage` is very likely if the customer has the intention to churn.
- Discern the category, valid options (`categories`) are provided.
        """

        class Input(pydantic.BaseModel):
            context: str
            feedback: str
            sentiments: List[str]
            emotions: List[str]
            intentions: List[str]
            categories: List[str]

        class Output(pydantic.BaseModel):
            # `max_length` caps the list sizes; pydantic v2 deprecates the older
            # `max_items` spelling of the same constraint.
            keywords: List[str] = pydantic.Field(description="If any, else `[]`.", max_length=10)
            sentiment: str = pydantic.Field(description="The valid option that best fits.")
            emotions: List[str] = pydantic.Field(description="The valid options that best fit, if any, else `[]`.", max_length=4)
            intention: str = pydantic.Field(description=f"The valid option that best fits, if any, else `{UNKNOWN_OPTION}`.")
            category: str = pydantic.Field(description=f"The valid option that best fits, if any, else `{UNKNOWN_OPTION}`.")

        input: Input = dspy.InputField()
        output: Output = dspy.OutputField()

    def __init__(self) -> None:
        """Create the predictor, activate assertion backtracking and load saved state."""
        super().__init__()

        self.infer_info = ChainOfThought(self.InferInfo, max_retries=3, explain_errors=False)

        self.activate_assertions(handler=dspy.backtrack_handler, max_backtracks=3)
        self.load("artifacts/review_extractor/labeled_few_shot.json")

    def forward(self, input: Input) -> dspy.Prediction:
        """Infer and validate the review information of `input.feedback`.

        Returns a `dspy.Prediction` whose `output.review` holds the original
        feedback plus the validated keywords, sentiment, emotions, intention and
        category. Keywords and emotions are de-duplicated in the order the model
        produced them.
        """
        # Options are shown with spaces instead of underscores. Categories are
        # de-duplicated with `dict.fromkeys`, which keeps first-seen order: going
        # through a `set` would reorder the strings differently on every
        # interpreter run, changing the prompt from one run to the next.
        category_options = list(dict.fromkeys(
            category.replace("_", " ") for category in input.categories + [UNKNOWN_OPTION]
        ))

        info = self.infer_info(input=self.InferInfo.Input(
            context=input.context,
            feedback=input.feedback,
            sentiments=[sentiment.replace("_", " ") for sentiment in Sentiment.list()],
            emotions=[emotion.replace("_", " ") for emotion in Emotion.list()],
            intentions=[intention.replace("_", " ") for intention in Intention.list() + [UNKNOWN_OPTION]],
            categories=category_options,
        )).output

        # --- Keywords: lower-case, drop empty entries, then filter in three passes.
        keywords = [keyword.lower() for keyword in info.keywords if keyword]

        # Lower-case the feedback once instead of once per keyword.
        feedback_text = input.feedback.lower()
        inexistent_keywords = [keyword for keyword in keywords if keyword not in feedback_text]
        dspy.Suggest(
            not inexistent_keywords,
            "All keywords must be included in the customer's feedback! Keywords not included:\n" + "".join([f"- {keyword}\n" for keyword in inexistent_keywords]),
        )

        # Remove rejected keywords and de-duplicate while keeping the model's
        # order (a set difference would shuffle them between runs).
        rejected = set(inexistent_keywords)
        keywords = [keyword for keyword in dict.fromkeys(keywords) if keyword not in rejected]

        long_keywords = [keyword for keyword in keywords if len(keyword.split()) > 3]
        dspy.Suggest(
            not long_keywords,
            "Each keyword must be 3 words maximum! Keywords too long:\n" + "".join([f"- {keyword}\n" for keyword in long_keywords]),
        )

        rejected = set(long_keywords)
        keywords = [keyword for keyword in keywords if keyword not in rejected]

        emoji_keywords = [keyword for keyword in keywords if emoji.emoji_count(keyword) > 0]
        dspy.Suggest(
            not emoji_keywords,
            "Keywords cannot include emojis! Keywords with emojis:\n" + "".join([f"- {keyword}\n" for keyword in emoji_keywords]),
        )

        rejected = set(emoji_keywords)
        keywords = [keyword for keyword in keywords if keyword not in rejected]

        # --- Sentiment: must be one of the valid options.
        # Answers are converted back to the underscore/upper-case form; surrounding
        # whitespace is dropped so it does not cause a spurious mismatch.
        sentiment = info.sentiment.strip().upper().replace(" ", "_")

        dspy.Assert(
            sentiment in Sentiment.list(),
            f'Sentiment must be {self.InferInfo.Output.model_fields["sentiment"].description}! `{sentiment}` is NOT a valid option. Valid options are:\n' + "".join([f"- {option}\n" for option in Sentiment.list()])
        )

        # --- Emotions: every entry must be one of the valid options.
        emotions = [emotion.strip().upper().replace(" ", "_") for emotion in info.emotions]

        invalid_emotions = [emotion for emotion in emotions if emotion not in Emotion.list()]
        dspy.Assert(
            not invalid_emotions,
            f'Emotions must be {self.InferInfo.Output.model_fields["emotions"].description}! Invalid options:\n' + "".join([f"- {emotion}\n" for emotion in invalid_emotions]) + 'Valid options are:\n' + "".join([f"- {option}\n" for option in Emotion.list()]),
        )

        # De-duplicate, keeping the model's order.
        emotions = list(dict.fromkeys(emotions))

        # --- Intention: invalid answers fall back to the unknown option.
        intention = info.intention.strip().upper().replace(" ", "_")

        dspy.Suggest(
            intention in Intention.list() or intention == UNKNOWN_OPTION,
            f'Intention must be {self.InferInfo.Output.model_fields["intention"].description}! `{intention}` is NOT a valid option. Valid options are:\n' + "".join([f"- {option}\n" for option in Intention.list()])
        )

        if intention not in Intention.list():
            intention = UNKNOWN_OPTION

        # --- Category: invalid answers fall back to the unknown option.
        category = info.category.strip().upper().replace(" ", "_")
        if not input.categories:
            # Nothing to choose from: the category is unknown by definition.
            category = UNKNOWN_OPTION

        dspy.Suggest(
            category in input.categories or category == UNKNOWN_OPTION,
            f'Category must be {self.InferInfo.Output.model_fields["category"].description}! `{category}` is NOT a valid option. Valid options are:\n' + "".join([f"- {option}\n" for option in input.categories])
        )

        if category not in input.categories:
            category = UNKNOWN_OPTION

        return dspy.Prediction(output=self.Output(
            review=self.Output.Review(
                content=input.feedback,
                keywords=keywords,
                sentiment=sentiment,
                emotions=emotions,
                intention=intention,
                category=category,
            ),
        ))

## Evaluation

TODO: Review labels are needed to evaluate the pipeline

In [None]:
from random import random

# Manual spot check: run the review pipeline on one randomly drawn feedback.
m = int(random() * len(feedbacks))
fed = feedbacks.iloc[m]

print(fed["translation"], end="\n\n")
print(fed["categories"])
print(f"{'=' * 40} {m} {'=' * 40}")

review_extractor = ReviewExtractor()
review = review_extractor(input=ReviewExtractor.Input(
    context=fed["context"],
    categories=fed["categories"],
    feedback=fed["translation"],
)).output.review

# One bullet line per keyword under the heading.
print("Keywords:", *[f"  - {keyword}" for keyword in review.keywords], sep="\n")
print("Sentiment:", review.sentiment)
print("Emotions:", ", ".join(review.emotions))
print("Intention:", review.intention)
print("Category:", review.category)