In [6]:
# Make the OpenAI API key available through the OPENAI_API_KEY environment variable.
# You can also pass the token to dspy.LM()
import getpass
import os

from dotenv import load_dotenv

# Load environment variables (e.g. OPENAI_API_KEY) from a local .env file
load_dotenv()

# Read the OpenAI API key from the environment; os.getenv returns None when it is unset.
# NOTE(review): this value is not passed to dspy.LM() below, and `getpass` is imported
# but never used in this cell -- confirm whether either is still needed.
openai_api_key = os.getenv("OPENAI_API_KEY")
import dspy

# Define your model. We will use OpenAI for simplicity
model_name = "gpt-4o-mini"

# No credentials are passed explicitly here, so an OPENAI_API_KEY environment variable
# must be present. You can also pass the token to dspy.LM()
lm = dspy.LM(
    model=f"openai/{model_name}",
    max_tokens=500,
    temperature=0.1,
)
# Register `lm` in DSPy's global settings so later cells do not have to pass it around
dspy.settings.configure(lm=lm)


In [8]:
import mlflow

# Make "DSPy Quickstart" the active MLflow experiment for this notebook
mlflow.set_experiment("DSPy Quickstart")


<Experiment: artifact_location='file:///Users/saeedanwar/Desktop/mlflow/mlruns/331215673284591059', creation_time=1734550550669, experiment_id='331215673284591059', last_update_time=1734550550669, lifecycle_stage='active', name='DSPy Quickstart', tags={}>

In [9]:
# Turn on MLflow's automatic logging integration for DSPy
mlflow.dspy.autolog()


Set Up Data
Next, we will download the Reuters-21578 dataset from Hugging Face. We also write a utility that samples the same number of examples per label for both the train and test splits.

In [42]:
import numpy as np
import pandas as pd
from dspy.datasets.dataset import Dataset


def read_data_and_subset_to_categories() -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Read the reuters-21578 train and test splits and decode their labels.

    Docs can be found in the url below:
    https://huggingface.co/datasets/yangwang825/reuters-21578

    Returns:
        A ``(train, test)`` pair of DataFrames. In each frame the ``label``
        column is mapped through ``label_map``, turning the integer class id
        into its category name. Ids that are not keys of ``label_map`` are not
        carried over as names (``Series.map`` yields a missing value for them).
    """

    # Read train/test split
    file_path = "hf://datasets/yangwang825/reuters-21578/{}.json"
    train = pd.read_json(file_path.format("train"))
    test = pd.read_json(file_path.format("test"))

    # Integer class id -> human-readable category name
    label_map = {
        0: "acq",
        1: "crude",
        2: "earn",
        3: "grain",
        4: "interest",
        5: "money-fx",
        6: "ship",
        7: "trade",
    }

    train["label"] = train["label"].map(label_map)
    test["label"] = test["label"].map(label_map)

    return train, test


class CSVDataset(Dataset):
    """
    DSPy dataset holding a fixed number of reuters-21578 examples per label.

    Args:
        n_train_per_label: Number of rows sampled for each label of the train split.
        n_test_per_label: Number of rows sampled for each label of the test split.
        *args: Forwarded unchanged to ``Dataset.__init__``.
        random_state: Optional seed handed to ``DataFrame.sample``. ``None``
            (the default) keeps the sampling unseeded, so the selected rows
            differ from run to run; pass an int to make them reproducible.
        **kwargs: Forwarded unchanged to ``Dataset.__init__``.
    """

    def __init__(
        self,
        n_train_per_label: int = 20,
        n_test_per_label: int = 10,
        *args,
        random_state=None,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.n_train_per_label = n_train_per_label
        self.n_test_per_label = n_test_per_label
        self._sample_random_state = random_state

        self._create_train_test_split_and_ensure_labels()

    def _sample_per_label(self, df: pd.DataFrame, n_per_label: int) -> pd.DataFrame:
        """Sample ``n_per_label`` rows from every ``label`` group of ``df`` and stack them."""
        # NOTE: pandas raises if a label group has fewer than n_per_label rows
        return pd.concat(
            [
                group.sample(n=n_per_label, random_state=self._sample_random_state)
                for _, group in df.groupby("label")
            ]
        )

    def _create_train_test_split_and_ensure_labels(self) -> None:
        """
        Populate ``_train`` and ``_dev`` with per-label samples of the two splits.

        Each split is sampled independently, label by label, so every label
        present in a split contributes the same number of examples to it.
        """
        # Read the data
        train_df, test_df = read_data_and_subset_to_categories()

        # Sample for each label
        train_samples_df = self._sample_per_label(train_df, self.n_train_per_label)
        test_samples_df = self._sample_per_label(test_df, self.n_test_per_label)

        # Set DSPy class variables
        self._train = train_samples_df.to_dict(orient="records")
        self._dev = test_samples_df.to_dict(orient="records")


# Limit to a small dataset to showcase the value of bootstrapping.
# A fixed random_state keeps the sampled examples the same on every run.
dataset = CSVDataset(n_train_per_label=3, n_test_per_label=1, random_state=42)



In [43]:
# Build the train and test sets as lists of DSPy examples.
# Every example has to be told which of its fields is the model input ("text").
train_dataset = [record.with_inputs("text") for record in dataset.train]
test_dataset = [record.with_inputs("text") for record in dataset.dev]
unique_train_labels = set(record.label for record in dataset.train)

# Quick sanity check: split sizes, the label set, and one sample example
print(len(train_dataset), len(test_dataset))
print(f"Train labels: {unique_train_labels}")
print(train_dataset[0])

24 8
Train labels: {'ship', 'interest', 'grain', 'crude', 'money-fx', 'trade', 'acq', 'earn'}
Example({'label': 'interest', 'text': 'volcker says fed policy not linked to rate rise federal reserve board chairman paul volcker said that he did not believe there was a connection between the fed s policies and the recent rise in the prime interest rate by most major u s banks asked by reporters following testimony before the senate banking committee whether the fed had anything to do with the rise to pct in the prime he replied not that i was able to detect reuter'}) (input_keys={'text'})


In [44]:
import os

# Make sure the output directory is there before anything is written into it
os.makedirs("data", exist_ok=True)

# Read the full train/test splits again and write each one out as JSON records
train_df, test_df = read_data_and_subset_to_categories()
train_df.to_json(path_or_buf="data/train.json", orient="records")
test_df.to_json(path_or_buf="data/test.json", orient="records")
print("Files saved successfully to data/train.json and data/test.json")


Files saved successfully to data/train.json and data/test.json


Note that we simply provide the expected labels in the description of the output field of the `TextClassificationSignature` class. From this initial state, we'll use DSPy to improve our classifier's accuracy.

In [45]:
# DSPy signature for the classification task: one input field (`text`) and one
# output field (`label`).
# NOTE(review): documented with `#` comments rather than a class docstring on purpose --
# DSPy appears to use a Signature's docstring as the prompt instructions, so adding one
# could change model behaviour. Confirm before converting this into a docstring.
class TextClassificationSignature(dspy.Signature):
    text = dspy.InputField()
    label = dspy.OutputField(
        # The description embeds the set of labels found in the sampled training data
        desc=f"Label of predicted class. Possible labels are {unique_train_labels}"
    )
    
    def __str__(self):
        # Two-line rendering built from the `text` and `label` attributes
        return f"Input text: {self.text}\nPredicted label: {self.label}"
class TextClassifier(dspy.Module):
    """DSPy module that labels a text using a single `dspy.Predict` step."""

    def __init__(self):
        super().__init__()
        # Predictor built from the classification signature
        self.generate_classification = dspy.Predict(TextClassificationSignature)

    def forward(self, text: str):
        """Run the predictor on `text` and hand back whatever it returns."""
        prediction = self.generate_classification(text=text)
        return prediction


In [46]:
# In Python, a module is a file containing Python definitions and statements.
# Lowercase `dspy.predict` is such a module; it is a different object from the
# capitalised `dspy.Predict` that TextClassifier calls to build its predictor.
# Let's check its type
type(dspy.predict)

module

In [47]:
from copy import copy

# Instantiate our TextClassifier (not optimized yet)
text_classifier = copy(TextClassifier())

# Try it on a couple of sentences that do not belong to any of the news categories
for message in ("I am interested in space", "I hate ice skating"):
    print(text_classifier(text=message))


Prediction(
    label='interest'
)
Prediction(
    label='interest'
)


Training
To train, we will leverage BootstrapFewShotWithRandomSearch, an optimizer that will take bootstrap samples from our training set and leverage a random search strategy to optimize our predictive accuracy.

Note that in the example below we use a simple exact-match metric, defined in `validate_classification`, but DSPy metrics can contain more complex, LM-based logic to evaluate accuracy.

In [48]:
from dspy.teleprompt import BootstrapFewShotWithRandomSearch


def validate_classification(example, prediction, trace=None) -> bool:
    """
    Exact-match metric: is the predicted label equal to the example's label?

    `trace` is accepted but not used by this metric.
    """
    expected, predicted = example.label, prediction.label
    return expected == predicted


# Set up the optimizer; `validate_classification` decides whether a prediction counts as correct
optimizer = BootstrapFewShotWithRandomSearch(
    num_threads=1,
    max_bootstrapped_demos=2,
    num_candidate_programs=5,
    metric=validate_classification,
)

# Optimize a fresh classifier against the training examples
compiled_pe = optimizer.compile(
    copy(TextClassifier()),
    trainset=train_dataset,
)


Going to sample between 1 and 2 traces per predictor.
Will attempt to bootstrap 5 candidate sets.
Average Metric: 21.00 / 24 (87.5%): 100%|██████████| 24/24 [00:22<00:00,  1.07it/s]

2024/12/19 00:24:10 INFO dspy.evaluate.evaluate: Average Metric: 21 / 24 (87.5%)



New best score: 87.5 for seed -3
Scores so far: [87.5]
Best score so far: 87.5
Average Metric: 23.00 / 24 (95.8%): 100%|██████████| 24/24 [00:27<00:00,  1.15s/it]

2024/12/19 00:24:37 INFO dspy.evaluate.evaluate: Average Metric: 23 / 24 (95.8%)



New best score: 95.83 for seed -2
Scores so far: [87.5, 95.83]
Best score so far: 95.83


  8%|▊         | 2/24 [00:01<00:13,  1.67it/s]


Bootstrapped 2 full traces after 2 examples for up to 1 rounds, amounting to 2 attempts.
Average Metric: 6.00 / 24 (25.0%):  25%|██▌       | 6/24 [00:07<00:21,  1.20s/it]

KeyboardInterrupt: 

In [37]:
def check_accuracy(classifier, test_data: list = test_dataset) -> tuple[list[int], list]:
    """
    Run `classifier` over `test_data` and score every prediction.

    Args:
        classifier: Callable invoked as ``classifier(text=...)``. Its result is
            passed to ``validate_classification`` together with the example.
        test_data: Iterable of examples that support ``example["text"]``.
            Defaults to ``test_dataset`` as it was when this function was defined.

    Returns:
        ``(residuals, predictions)``. ``residuals`` holds 1 for each prediction
        accepted by ``validate_classification`` and 0 otherwise, so its mean is
        the accuracy. ``predictions`` holds the classifier outputs in the same order.
    """
    residuals = []
    predictions = []
    for example in test_data:
        prediction = classifier(text=example["text"])
        # int(True) == 1, int(False) == 0
        residuals.append(int(validate_classification(example, prediction)))
        predictions.append(prediction)
    return residuals, predictions

# Baseline: a freshly constructed classifier that has not been optimized
uncompiled_residuals, uncompiled_predictions = check_accuracy(copy(TextClassifier()))
print(f"Uncompiled accuracy: {np.mean(uncompiled_residuals)}")

# Optimized program. `compiled_pe` only exists once the `optimizer.compile(...)` cell
# has run to completion; if that cell was interrupted, this line raises NameError.
compiled_residuals, compiled_predictions = check_accuracy(compiled_pe)
print(f"Compiled accuracy: {np.mean(compiled_residuals)}")


Uncompiled accuracy: 0.625


NameError: name 'compiled_pe' is not defined