In [1]:
from collections import Counter
import dspy
from dspy import OpenAI, settings
from dspy.teleprompt import MIPROv2, BootstrapFewShot
from dspy.evaluate.evaluate import Evaluate
from dspy import ColBERTv2
from dsp.utils import print_message, normalize_text
from dotenv import load_dotenv
import os
load_dotenv("/media/uberdev/ddrv/gitFolders/codeai_fusion/.env")

  from .autonotebook import tqdm as notebook_tqdm


True

In [2]:
def f1_score_01(prediction, ground_truth):
    """Calculates the F1 Score of multiple predictions 
    that has been carried out for a single data point"""
    
    prediction_tokens = [normalize_text(elem) for elem in prediction.split("|")]
    ground_truth_tokens = [elem] * len(prediction_tokens)
    
    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    
    if len(prediction_tokens) == len(ground_truth_tokens) == 0:
        print_message("\n#> F1 Metric: Rare edge case of len(prediction_tokens) == len(ground_truth_tokens) == 0.\n")
    
    if num_same == 0:
        return 0
    
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    print(f"precision: {precision}, recall: {recall}, f1: {f1}") 
    return f1

In [2]:
from openinference.instrumentation.dspy import DSPyInstrumentor
# instruments the internal calls in DSPy library
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
# help to get the span of http requests to the APIs
from opentelemetry.sdk import trace as trace_sdk
#processes the data collected from the spans
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from openinference.semconv.resource import ResourceAttributes
import phoenix as px

INFO:phoenix.config:ðŸ“‹ Ensuring phoenix working directory: /home/uberdev/.phoenix
INFO:phoenix.inferences.inferences:Dataset: phoenix_inferences_672e330f-93cb-4250-b12b-6c8f52f18d48 initialized


In [3]:
endpoint = "http://127.0.0.1:6006/v1/traces"

In [4]:
resource = Resource(attributes={
    ResourceAttributes.PROJECT_NAME: 'bswfs-f1-score'
})

In [5]:
tracer_provider = trace_sdk.TracerProvider(resource=resource)
span_otlp_exporter = OTLPSpanExporter(endpoint=endpoint)

In [6]:
tracer_provider.add_span_processor(SimpleSpanProcessor(span_exporter=span_otlp_exporter))

trace_api.set_tracer_provider(tracer_provider=tracer_provider)

DSPyInstrumentor().instrument(skip_dep_check=True) # here where DSPy is instrumented

In [3]:
# Need to check if the dspy is writing logs
import os
llm = OpenAI(model='gpt-4o-mini',
             api_key=os.environ['OPENAI_API_KEY'],
             max_tokens=2000)
settings.configure(lm=llm)

In [9]:
llm("Explain f1-score")

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


['The F1 score is a statistical measure used to evaluate the performance of a binary classification model. It is particularly useful when the classes are imbalanced, meaning that one class is more frequent than the other. The F1 score combines two important metrics: precision and recall.\n\n### Definitions:\n- **Precision**: This is the ratio of true positive predictions to the total predicted positives. It answers the question: "Of all the instances that were predicted as positive, how many were actually positive?"\n  \n  \\[\n  \\text{Precision} = \\frac{\\text{True Positives (TP)}}{\\text{True Positives (TP)} + \\text{False Positives (FP)}}\n  \\]\n\n- **Recall**: This is the ratio of true positive predictions to the total actual positives. It answers the question: "Of all the actual positive instances, how many were correctly predicted as positive?"\n  \n  \\[\n  \\text{Recall} = \\frac{\\text{True Positives (TP)}}{\\text{True Positives (TP)} + \\text{False Negatives (FN)}}\n  \\]\

In [4]:
def F1(prediction, answers_list):
    # list of answers are entering for a single prediction
    assert isinstance(answers_list, list)
    return max(f1_score_01(prediction, ans) for ans in answers_list)

def answer_match(prediction, answers, frac=1.0):
    return F1(prediction, answers) >= frac

def answer_f1_match_01(example, pred, trace=[], frac=0.95):
    assert isinstance(example.answer, (str, list))
    print(f"Looking at the traces, {trace}")
    if isinstance(example.answer, str):
        return answer_match(pred.answer, [example.answer], frac=frac)
    else:
        return answer_match(pred.answer, example.answer, frac=frac)

In [5]:
class NewsCategorization(dspy.Signature):
    news_body = dspy.InputField(desc="The body of the news to be categorized")
    answer = dspy.OutputField(desc="It could be 'fake|real|real|fake|fake'",)

class CoTCombined(dspy.Module):
    def __init__(self):
        super().__init__()
        self.prog = dspy.ChainOfThought(NewsCategorization)
        self.hops = 5 
        # this is arbitrary value to create 5 preps per news_body

    def forward(self, news_body):
        # forward method where the actual predition 
        # is done
        pred_list = []
        for idx in range(self.hops):
            # same news body is going to enter self.hops time
            pred_one = self.prog(news_body=news_body)
            pred_list.append(pred_one)
            answer_piped = "|".join(pred_list)
        return dspy.Prediction(answer=answer_piped)
# https://github.com/stanfordnlp/dspy/issues/577

In [7]:
class NewsCategorization(dspy.Signature):
    news_body = dspy.InputField(desc="The body of the news to be categorized")
    answer = dspy.OutputField(desc="Should be 'fake' or 'real'")

class CoTCombined(dspy.Module):
    def __init__(self):
        super().__init__()
        self.prog = dspy.ChainOfThought(NewsCategorization)
        self.history = []  # This will store the history of operations

    def forward(self, news_body):
        # planning to making multiple predictions later
        # pred_one = self.prog(news_body=news) # << The variable news was wrongly assigned
        pred_one = self.prog(news_body=news_body)
        return dspy.Prediction(answer=pred_one)

In [6]:
prgm_under_test = CoTCombined()

In [7]:
custom_trainset = []
custom_devset = []

with open('../train_fake_real_news.tsv', 'r') as tsv:
    lines = tsv.readlines()
    for line in lines[1:21]:
        news, truth = line.split("\t")
        if truth.strip() == '0':
            custom_trainset.append(dspy.Example(news_body=news, answer='fake').with_inputs("news_body"))
        else:
            custom_trainset.append(dspy.Example(news_body=news, answer='real').with_inputs("news_body"))
    for line in lines[22:43]:
        news, truth = line.split("\t")
        if truth.strip() == '0':
            custom_devset.append(dspy.Example(news_body=news, answer='fake').with_inputs("news_body"))
        else:
            custom_devset.append(dspy.Example(news_body=news, answer='real').with_inputs("news_body"))

In [8]:
custom_trainset[0]

Example({'news_body': ' Courts Decide Conspiracy Nut Alex Jones Is Too Crazy To Raise His Own Kids (DETAILS)', 'answer': 'fake'}) (input_keys={'news_body'})

In [9]:
prgm_under_test(custom_trainset[9])

AssertionError: Need format_handler for news_body of type <class 'dspy.primitives.example.Example'>