In [None]:
import pip
def import_or_install(package):
    """Import ``package``; if it cannot be imported, install it with pip.

    Args:
        package: Name used both for the import attempt and for ``pip install``.

    Raises:
        subprocess.CalledProcessError: If the ``pip install`` command exits with
            a non-zero status.
    """
    # Function-scope imports keep this helper self-contained in its cell.
    import importlib
    import subprocess
    import sys

    try:
        importlib.import_module(package)
    except ImportError:
        # `pip.main` is no longer part of pip's public API (removed in pip 10).
        # Running pip as a module of the current interpreter installs into the
        # same environment the kernel is using.
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        # Make the freshly installed package visible to later import statements.
        importlib.invalidate_caches()

# Ensure the `datasets` package is importable before a later cell imports it.
import_or_install('datasets')

In [None]:
import adalflow as adal
import re
import os
import json
# Development of the Adal Component
from typing import Any
from adalflow.eval import AnswerMatchAcc
from adalflow.datasets import Example

# Dataset Loading

In [None]:
from datasets import load_dataset
# Load the "main" configuration of the GSM8K dataset and show what was loaded.
ds = load_dataset(path="openai/gsm8k", name="main")
print(ds)

In [None]:
# Split the original training rows (shuffled, seed 42): with test_size=0.4,
# 60% stay in `train_data` and the remaining 40% become `test_data`.
train_test_split = ds["train"].train_test_split(
    test_size=0.4,
    seed=42,
    shuffle=True,
)
train_data, test_data = train_test_split["train"], train_test_split["test"]

# The dataset's own "test" split is used as the validation data.
val_data = ds["test"]

# Print the sizes in the order: train, validation, test.
for split_rows in (train_data, val_data, test_data):
    print(len(split_rows))

In [None]:
# Prepare the dataset for the adalflow format
from adalflow.utils import Dataset
from adalflow.datasets import Example
from adalflow.utils.data import subset_dataset
import uuid
class _GSM8KSplit(Dataset):
    """Adapter that exposes rows of a GSM8K split as adalflow ``Example`` objects.

    Shared implementation for ``GSM8K_Train``, ``GSM8K_Test`` and ``GSM8K_Val``,
    which were three identical copies of the same code.
    """

    def __init__(self, data):
        # `data` needs to support len() and integer indexing, and each row must
        # expose "question" and "answer" keys (see __getitem__).
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Build the ``Example`` for the row at ``index``."""
        row = self.data[index]
        # The row's position is used as the example id. The random UUID that
        # used to be generated here was never used and shadowed the builtin `id`.
        return Example(id=index, question=row["question"], answer=row["answer"])


class GSM8K_Train(_GSM8KSplit):
    """Dataset wrapper used for the training split."""


class GSM8K_Test(_GSM8KSplit):
    """Dataset wrapper used for the test split."""


class GSM8K_Val(_GSM8KSplit):
    """Dataset wrapper used for the validation split."""
    
# Wrap the splits prepared in the earlier split cell. Previously the test and
# validation datasets were both drawn from ds['test'] and training used all of
# ds['train'], so the `train_data` / `test_data` / `val_data` splits went unused
# and validation/test examples overlapped.
train_dataset = GSM8K_Train(train_data)
test_dataset = GSM8K_Test(test_data)
val_dataset = GSM8K_Val(val_data)

# Upper bounds on how many examples of each split are kept for this run.
NUM_TRAIN_SAMPLES = 1000
NUM_TEST_SAMPLES = 1000
NUM_VAL_SAMPLES = 1000

train_dataset = subset_dataset(train_dataset, NUM_TRAIN_SAMPLES)
test_dataset = subset_dataset(test_dataset, NUM_TEST_SAMPLES)
val_dataset = subset_dataset(val_dataset, NUM_VAL_SAMPLES)

print(len(train_dataset))
print(len(test_dataset))
print(len(val_dataset))

#### Dataset Preparation Completed

In [5]:
@adal.func_to_data_component
def parse_integer_answer(answer: str):
    """Return the last integer found in ``answer``, or ``-1`` when there is none.

    A number written with thousands separators (for example ``"1,000"``) is read
    as one value; a plain ``\\d+`` search would split it and return ``0``.
    Signs and decimal points are not interpreted: only runs of digits count.

    Args:
        answer: Text to search, e.g. a model response or a reference solution.

    Returns:
        The last integer in the text, or ``-1`` if no digits are present or the
        digits cannot be converted.
    """
    # First alternative: a well-formed thousands-separated number such as
    # "12,345,678" (not followed by another digit). Second: any run of digits.
    found = re.findall(r"\d{1,3}(?:,\d{3})+(?!\d)|\d+", answer)
    if not found:
        return -1
    try:
        return int(found[-1].replace(",", ""))
    except ValueError:
        # int() can still refuse a match, e.g. a digit run longer than the
        # interpreter's integer-string conversion limit.
        return -1

In [6]:
# Prompt template handed to the generator (written in Jinja-style `{{ }}` / `{% %}` syntax).
# Placeholders used below:
#   - system_prompt:  the task instruction
#   - few_shot_demos: optional examples; that section is emitted only when the value is not none
#   - input_str:      the user input placed between the user tags
FEW_SHOT_TEMPLATE = r"""<START_OF_SYSTEM_PROMPT>
{{system_prompt}}
{# Few shot demos #}
{% if few_shot_demos is not none %}
Here are some examples:
{{few_shot_demos}}
{% endif %}
<END_OF_SYSTEM_PROMPT>
<START_OF_USER>
{{input_str}}
<END_OF_USER>
"""

In [7]:
from adalflow import GoogleGenAIClient
# Take the API key from the GOOGLE_API_KEY environment variable so it does not
# have to be pasted into the notebook; the placeholder is only the fallback used
# when that variable is unset.
main_model_client = GoogleGenAIClient(
    api_key=os.environ.get("GOOGLE_API_KEY", "<your_api_key>")
)
# Model and sampling settings for the task model.
main_model_kwargs = {
    "model": "gemini-1.5-flash-002",
    "temperature": 0.6,
    "top_p": 0.95,
}

In [None]:
# Making the task Pipeline
from adalflow import Parameter, ParameterType

class GSM8K_Task_Pipeline(adal.Component):
    """Task pipeline that answers a GSM8K question with a single generator.

    Both prompt parameters (instruction and few-shot demos) are created with
    ``requires_opt=False``.
    """

    def __init__(self, model_client: adal.ModelClient, model_kwargs: dict):
        super().__init__()

        # Instruction shown to the model in the system prompt.
        instruction_param = adal.Parameter(
            data="You will answer a reasoning question. Think step by step. The last line of your response should be of the following format: 'Answer: $VALUE' where VALUE is a numerical value.",
            role_desc="To give task instruction to the language model in the system prompt",
            requires_opt=False,
            param_type=ParameterType.PROMPT,
        )

        # Few-shot demonstrations; starts out with no demos.
        demos_param = adal.Parameter(
            data=None,
            role_desc="To give few shot examples to the language model in the system prompt",
            requires_opt=False,
            param_type=ParameterType.DEMOS,
        )

        generator_prompt_kwargs = {
            "system_prompt": instruction_param,
            "few_shot_demos": demos_param,
        }
        # Responses are post-processed by `parse_integer_answer`; caching is off.
        self.llm_counter = adal.Generator(
            name="llm_counter",
            model_client=model_client,
            model_kwargs=model_kwargs,
            template=FEW_SHOT_TEMPLATE,
            prompt_kwargs=generator_prompt_kwargs,
            output_processors=parse_integer_answer,
            use_cache=False,
            cache_path=None,
        )

    def call(self, question: str, id: str):
        """Run the generator on ``question`` (passed as ``input_str``) and return its output."""
        return self.llm_counter(prompt_kwargs={"input_str": question}, id=id)
    

# Testing of the task
task = GSM8K_Task_Pipeline(model_client=main_model_client, model_kwargs=main_model_kwargs)

# Smoke test: run the pipeline on the first example of each split
# (validation, train, test, in that order) and report any failure.
try:
    for split_dataset in (val_dataset, train_dataset, test_dataset):
        for sample in split_dataset:
            print(task(question=sample.question, id=sample.id))
            break  # only the first example of the split is needed

    print("Successfully tested the task")
except Exception as err:
    print("Failed to test the task : Error : ", err)



In [9]:

class GSM8K_Counter_Component(adal.AdalComponent):
    """AdalComponent pairing the GSM8K task pipeline with exact-match scoring."""

    def __init__(self, model_client: adal.ModelClient, model_kwargs: dict):
        task = GSM8K_Task_Pipeline(model_client=model_client, model_kwargs=model_kwargs)
        eval_func = AnswerMatchAcc(type="exact_match").compute_single_item
        super().__init__(task=task, eval_fn=eval_func)

    def prepare_task(self, sample: Example):
        """Return the task callable and the keyword arguments for ``sample``."""
        return self.task.call, {"question": sample.question, "id": sample.id}

    def prepare_eval(self, sample: Example, y_pred: adal.GeneratorOutput) -> tuple:
        """Return ``(eval_fn, kwargs)`` for scoring one prediction.

        ``kwargs["y"]`` is the predicted value, or ``-1`` when there is no
        prediction or it carries no data. ``kwargs["y_gt"]`` is the integer
        parsed from ``sample.answer``.

        The return annotation used to be ``float``, which did not match the
        tuple that is actually returned.
        """
        # Explicit `is not None` checks: a truthiness test would throw away a
        # legitimate answer of 0.
        has_answer = y_pred is not None and y_pred.data is not None
        y_label = y_pred.data if has_answer else -1
        y_gt = parse_integer_answer(sample.answer)
        return self.eval_fn, {"y": y_label, "y_gt": y_gt}

In [None]:
from typing import Dict
from adalflow.utils.data import Subset

def diagnose(
    model_client: adal.ModelClient,
    model_kwargs: Dict,
    dataset: Subset,
    split: str,
) -> None:
    """Run the trainer's diagnose step for one dataset split.

    Builds a fresh ``GSM8K_Counter_Component`` and ``adal.Trainer`` and calls
    ``trainer.diagnose`` on ``dataset``.

    Args:
        model_client: Client used by the task pipeline's generator.
        model_kwargs: Model settings forwarded to the generator.
        dataset: Examples to run the diagnosis on.
        split: Label for the split, forwarded to ``trainer.diagnose``.

    Returns:
        None. The function was annotated ``-> Dict`` but never returned a value.
    """
    adal_component = GSM8K_Counter_Component(model_client=model_client, model_kwargs=model_kwargs)
    trainer = adal.Trainer(adaltask=adal_component, debug=False)
    trainer.diagnose(dataset=dataset, split=split)

# Diagnose every split with the same model configuration, in the order
# train, validation, test.
for split_label, split_dataset in (
    ("TRAINING", train_dataset),
    ("VALIDATION", val_dataset),
    ("TESTING", test_dataset),
):
    diagnose(
        model_client=main_model_client,
        model_kwargs=main_model_kwargs,
        dataset=split_dataset,
        split=split_label,
    )

# Backward Training


In [17]:
# Initialize the backward training
from adalflow.core import BackwardEngine
# Resolve the API key once from the GOOGLE_API_KEY environment variable so it
# does not have to be pasted into the notebook in three places; the placeholder
# is only the fallback used when that variable is unset.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "<your_api_key>")

# Engine handed to the text loss and stored on the AdalComponent in a later cell.
backward_engine = BackwardEngine(
    model_client=GoogleGenAIClient(api_key=GOOGLE_API_KEY),
    model_kwargs={
        "model": "gemini-1.5-pro",
        "temperature": 0.6,
        "top_p": 0.95,
    },
)

# Model configuration stored on the AdalComponent as `teacher_model_config`.
teacher_model_config = {
    "model_client": GoogleGenAIClient(api_key=GOOGLE_API_KEY),
    "model_kwargs": {
        "model": "gemini-1.5-pro",
        "temperature": 0.0,
        "top_p": 0.99,
    },
}

# Model configuration stored on the AdalComponent as `text_optimizer_model_config`.
text_optimizer_model_config = {
    "model_client": GoogleGenAIClient(api_key=GOOGLE_API_KEY),
    "model_kwargs": {
        "model": "gemini-1.5-flash-002",
        "temperature": 0.6,
        "top_p": 0.95,
    },
}

In [20]:
class GSM8K_Task_Pipeline(adal.Component):
    """GSM8K task pipeline whose system prompt is marked for optimization.

    This redefines the class of the same name from the earlier cell; here the
    system-prompt parameter is created with ``requires_opt=True`` while the
    few-shot demos parameter keeps ``requires_opt=False``.
    """

    def __init__(self, model_client: adal.ModelClient, model_kwargs: dict):
        super().__init__()

        prompt_params = {
            # Task instruction; flagged as optimizable.
            "system_prompt": adal.Parameter(
                data="You will answer a reasoning question. Think step by step. The last line of your response should be of the following format: 'Answer: $VALUE' where VALUE is a numerical value.",
                role_desc="To give task instruction to the language model in the system prompt",
                requires_opt=True,
                param_type=ParameterType.PROMPT,
            ),
            # Few-shot demonstrations; empty to begin with and not optimized.
            "few_shot_demos": adal.Parameter(
                data=None,
                role_desc="To give few shot examples to the language model in the system prompt",
                requires_opt=False,
                param_type=ParameterType.DEMOS,
            ),
        }

        # Responses are post-processed by `parse_integer_answer`; caching is off.
        self.llm_counter = adal.Generator(
            name="llm_counter",
            template=FEW_SHOT_TEMPLATE,
            prompt_kwargs=prompt_params,
            model_client=model_client,
            model_kwargs=model_kwargs,
            output_processors=parse_integer_answer,
            use_cache=False,
            cache_path=None,
        )

    def call(self, question: str, id: str):
        """Send ``question`` to the generator as ``input_str`` and return the result."""
        generator_output = self.llm_counter(prompt_kwargs={"input_str": question}, id=id)
        return generator_output

In [21]:
from typing import Callable, Dict, Tuple,Any

class GSM8K_AdalComponent(adal.AdalComponent):
    """AdalComponent for GSM8K with an exact-match metric and a text loss.

    Uses the module-level ``backward_engine``, ``teacher_model_config`` and
    ``text_optimizer_model_config`` objects defined in an earlier cell.
    """

    def __init__(
        self,
        model_client: adal.ModelClient,
        model_kwargs: Dict,
    ):
        task = GSM8K_Task_Pipeline(model_client=model_client, model_kwargs=model_kwargs)
        eval_fn = AnswerMatchAcc(type="exact_match").compute_single_item
        # Wrap the metric as a text loss driven by the backward engine.
        loss_fn = adal.EvalFnToTextLoss(
            eval_fn=eval_fn,
            eval_fn_desc="exact_match: 1 if str(y) == str(y_gt) else 0",
            backward_engine=backward_engine
        )
        super().__init__(task=task, eval_fn=eval_fn, loss_fn=loss_fn)
        self.text_optimizer_model_config = text_optimizer_model_config
        self.teacher_model_config = teacher_model_config
        self.backward_engine = backward_engine

    def prepare_task(self, sample: Example):
        """Return the task callable and the keyword arguments for ``sample``."""
        return self.task.call, {"question": sample.question, "id": sample.id}

    def prepare_eval(
        self, sample: Example, y_pred: adal.GeneratorOutput
    ) -> Tuple[Callable, Dict[str, Any]]:
        """Return ``(eval_fn, kwargs)`` for scoring one prediction.

        ``kwargs["y"]`` is the predicted value, or ``-1`` when there is no
        prediction or it carries no data. ``kwargs["y_gt"]`` is the integer
        parsed from ``sample.answer``.

        The return annotation used to be ``float``, which did not match the
        tuple that is actually returned.
        """
        y_label = -1
        # Explicit `is not None` checks: a truthiness test would throw away a
        # legitimate answer of 0.
        if y_pred is not None and y_pred.data is not None:
            y_label = y_pred.data
        y_gt = parse_integer_answer(sample.answer)
        return self.eval_fn, {"y": y_label, "y_gt": y_gt}

    def prepare_loss(
        self, sample: Example, pred: adal.Parameter
    ) -> Tuple[Callable, Dict[str, Any]]:
        """Return ``(loss_fn, kwargs)`` for computing the loss on one prediction.

        The ground truth parsed from ``sample.answer`` is wrapped in a
        non-optimizable ``adal.Parameter``, and ``pred.eval_input`` is set from
        ``pred.data`` before both are handed to the loss function.
        """
        result = parse_integer_answer(sample.answer)
        y_gt = adal.Parameter(
            name="y_gt",
            data=result,
            eval_input=result,
            requires_opt=False,
        )
        pred.eval_input = pred.data
        return self.loss_fn, {"kwargs": {"y": pred, "y_gt": y_gt}}
    


In [22]:
# Build the trainable component around the main task model.
adal_component = GSM8K_AdalComponent(model_client=main_model_client, model_kwargs=main_model_kwargs)


In [None]:
from adalflow import Trainer
# Settings for the optimization run.
train_batch_size = 4
raw_shots = 1
bootstrap_shots = 1
max_steps = 12  # defined here but not forwarded to the Trainer (see below)
num_workers = 4
strategy = "constrained"
debug = False

# Collect the Trainer arguments in one place. `max_steps` is left out, matching
# the commented-out argument: # max_steps=max_steps
trainer_options = dict(
    adaltask=adal_component,
    strategy=strategy,
    train_batch_size=train_batch_size,
    num_workers=num_workers,
    raw_shots=raw_shots,
    bootstrap_shots=bootstrap_shots,
    weighted_sampling=True,
    debug=debug,
)
trainer = Trainer(**trainer_options)

# Train on the training subset, with the validation and test subsets passed alongside.
trainer.fit(
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    test_dataset=test_dataset,
    debug=debug,
)