# Using Weights & Biases `Weave` with AWS `Bedrock`

In this notebook, you will learn to use Weave, Weights & Biases' newly released tool for LLM practitioners, to trace and evaluate Claude models served through AWS Bedrock.

You can use [Weave](https://wandb.github.io/weave/) to:

- Log and debug language model inputs, outputs, and traces
- Build rigorous, apples-to-apples evaluations for language model use cases
- Organize all the information generated across the LLM workflow, from experimentation and evaluations to production

## Setup

In [None]:
# aws sso login

In [1]:
# !pip install -U weave boto3

In [11]:
import json
import boto3
from pprint import pprint
from botocore.exceptions import ClientError

from utils import mprint

## Create a Weights & Biases `Weave` project to store your traces

In [3]:
import weave
weave.init('aws-genai')

Logged in as Weights & Biases user: capecape.
View Weave data at https://wandb.ai/capecape/aws-genai/weave


<weave.weave_client.WeaveClient at 0x11e4c3150>

Decorate your function with `weave.op()` and every call is traced. That's it!

In [4]:
bedrock_client = boto3.client(service_name='bedrock-runtime')

@weave.op() # <- just add this 😎
def call_model(
    model_id: str, 
    messages: list[dict],
    max_tokens: int=400,
    ) -> dict:

    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "messages": messages,
        "max_tokens": max_tokens})
        
    response = bedrock_client.invoke_model(body=body,modelId=model_id)

    response_body = json.loads(response.get('body').read())
    return response_body
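To make the request format easier to inspect, here is a minimal sketch that builds the same JSON body without calling AWS. `build_request_body` and `example_messages` are hypothetical names introduced for illustration; the fields mirror the ones used in `call_model` above.

```python
import json


def build_request_body(messages: list[dict], max_tokens: int = 400) -> str:
    """Serialize a Messages-style request body like the one call_model sends."""
    return json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "messages": messages,
        "max_tokens": max_tokens,
    })


# Hypothetical single-turn conversation, only for illustration
example_messages = [
    {"role": "user", "content": [{"type": "text", "text": "Hello"}]}
]

body = build_request_body(example_messages)
print(body)
```

Keeping serialization in its own function makes it possible to check the payload in isolation before spending tokens on a real call.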

Let's first try `Claude 3.5 Sonnet`:

In [6]:
model_id = 'anthropic.claude-3-5-sonnet-20240620-v1:0'

messages = [{"role": "user", 
             "content": [
                 {"type": "text", 
                  "text": (
                        "In Bash, how do I list all text files in the current directory "
                        "(excluding subdirectories) that have been modified in the last month?")
                  }
                 ]
            }
            ]

outputs = call_model(model_id, messages)

🍩 https://wandb.ai/capecape/aws-genai/r/call/01913663-5a1d-74b3-a8af-bd6e5a7520bf


In [7]:
pprint(outputs)

{'content': [{'text': 'To list all text files in the current directory '
                      '(excluding subdirectories) that have been modified in '
                      'the last month using Bash, you can use the `find` '
                      "command combined with the `file` command. Here's how "
                      'you can do it:\n'
                      '\n'
                      '```bash\n'
                      'find . -maxdepth 1 -type f -mtime -30 -exec file {} \\; '
                      '| grep text | cut -d: -f1\n'
                      '```\n'
                      '\n'
                      "Let's break down this command:\n"
                      '\n'
                      '1. `find .`: Start searching in the current directory.\n'
                      '\n'
                      '2. `-maxdepth 1`: Limit the search to the current '
                      "directory only (don't search subdirectories).\n"
                      '\n'
                      '3. `-type f`: 

The generated text sits under `content[0]["text"]` in the response body. Knowing this, we can refactor the code to be more concise:

In [12]:
@weave.op
def format_prompt(prompt: str) -> list[dict]:
    messages = [{"role": "user", 
                "content": [
                    {"type": "text", 
                    "text": prompt}]}]
    return messages


@weave.op
def claude(model_id: str, prompt: str, max_tokens: int=400) -> str:
    messages = format_prompt(prompt)
    response_body = call_model(model_id, messages, max_tokens)
    return response_body["content"][0]["text"]
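The `claude` op above reads only the first content block. As a slightly more defensive sketch, assuming a response may carry several text blocks, the helper below joins all of them. `extract_text` and `sample_response` are hypothetical and not part of the notebook's code.

```python
def extract_text(response_body: dict) -> str:
    """Concatenate the text of every text block in a Messages-style response."""
    blocks = response_body.get("content", [])
    return "".join(
        block.get("text", "") for block in blocks if block.get("type") == "text"
    )


# Hypothetical response body shaped like the output printed earlier
sample_response = {
    "content": [
        {"type": "text", "text": "Hello, "},
        {"type": "text", "text": "world"},
    ]
}

print(extract_text(sample_response))
```

An empty or missing `content` list yields an empty string rather than an `IndexError`.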

In [13]:
prompt = ("Give me a super simple starting code in PyTorch for training of a diffusion model. " 
          "Use a minimal dataset like CIFAR10")
response = claude(model_id, prompt, max_tokens=2000)

🍩 https://wandb.ai/capecape/aws-genai/r/call/01913664-f8e7-7582-897c-c385c34bea0d


In [14]:
mprint(response)

Here's a simple starting code for training a diffusion model using PyTorch on the CIFAR10 dataset. This example uses a basic U-Net architecture for the model and implements a simplified diffusion process:

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

# Hyperparameters
n_epochs = 100
batch_size = 64
image_size = 32
channels = 3
time_steps = 1000
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load CIFAR10 dataset
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Simple U-Net model
class SimpleUNet(nn.Module):
    def __init__(self):
        super(SimpleUNet, self).__init__()
        self.down1 = nn.Conv2d(channels, 64, 3, padding=1)
        self.down2 = nn.Conv2d(64, 128, 3, padding=1)
        self.up1 = nn.ConvTranspose2d(128, 64, 3, padding=1)
        self.up2 = nn.ConvTranspose2d(64, channels, 3, padding=1)
        self.time_mlp = nn.Linear(1, 64)

    def forward(self, x, t):
        t = self.time_mlp(t.unsqueeze(-1)).view(-1, 64, 1, 1)
        x1 = self.down1(x)
        x2 = self.down2(nn.functional.relu(x1))
        x3 = self.up1(nn.functional.relu(x2))
        x3 = x3 + t
        return self.up2(nn.functional.relu(x3 + x1))

# Diffusion model
class DiffusionModel:
    def __init__(self, model, beta_start=1e-4, beta_end=0.02):
        self.model = model
        self.betas = torch.linspace(beta_start, beta_end, time_steps)
        self.alphas = 1 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)

    def noise_images(self, x, t):
        sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod[t])
        sqrt_one_minus_alphas_cumprod = torch.sqrt(1 - self.alphas_cumprod[t])
        epsilon = torch.randn_like(x)
        return sqrt_alphas_cumprod.view(-1, 1, 1, 1) * x + sqrt_one_minus_alphas_cumprod.view(-1, 1, 1, 1) * epsilon, epsilon

    def train_step(self, x):
        t = torch.randint(0, time_steps, (x.shape[0],)).to(device)
        noisy_x, noise = self.noise_images(x, t)
        predicted_noise = self.model(noisy_x, t.float() / time_steps)
        return nn.functional.mse_loss(predicted_noise, noise)

# Initialize model and optimizer
model = SimpleUNet().to(device)
diffusion = DiffusionModel(model)
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Training loop
for epoch in range(n_epochs):
    for batch in dataloader:
        optimizer.zero_grad()
        x = batch[0].to(device)
        loss = diffusion.train_step(x)
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}/{n_epochs}, Loss: {loss.item():.4f}")

# Save the model
torch.save(model.state_dict(), "diffusion_model.pth")
```

This code provides a basic implementation of a diffusion model using PyTorch. It includes:

1. Loading the CIFAR10 dataset
2. A simple U-Net architecture
3. A basic diffusion process
4. A training loop

Note that this is a minimal example and may not produce high-quality results. Real-world diffusion models typically use more complex architectures, longer training times, and additional techniques to improve performance.

To run this code, you'll need to have PyTorch and torchvision installed. You can improve the model by:

1. Using a more sophisticated U-Net architecture
2. Implementing techniques like attention mechanisms
3. Using a learning rate scheduler
4. Implementing sampling for image generation
5. Adding more layers and increasing the model's capacity

Remember that training diffusion models can be computationally expensive and may require a GPU for reasonable training times.

## Using the `anthropic` Python SDK with Bedrock

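The `anthropic` Python SDK offers a more convenient way to call Claude on Bedrock: it returns typed response objects, so you don't have to build and parse JSON by hand.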

In [None]:
# !pip install -U "anthropic[bedrock]" instructor

In [15]:
from anthropic import AnthropicBedrock

client = AnthropicBedrock(
    aws_region="us-east-1",
)

output_message = client.messages.create(
    model="anthropic.claude-3-5-sonnet-20240620-v1:0",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello, world"}]
)

🍩 https://wandb.ai/capecape/aws-genai/r/call/01913665-d022-7bb3-8694-0ce7b014821b


In [19]:
mprint(output_message.content[0].text)

Hello! How can I assist you today? I'm here to help with any questions you might have or tasks you need help with. Feel free to ask me about a wide range of topics or let me know if you need any specific information or guidance.

We should still wrap this call in a higher-level function for calling Claude:

In [20]:
@weave.op
def call_claude_bis(prompt: str, model_id: str, max_tokens: int=400) -> str:
    "Call Bedrock Claude using the anthropic Python SDK"
    messages = format_prompt(prompt)
    # pass the function arguments through instead of hard-coded values
    response = client.messages.create(
        model=model_id,
        max_tokens=max_tokens,
        messages=messages)
    return response.content[0].text

In [21]:
response = call_claude_bis("How do I say: This is really handy, in french?", model_id)
mprint(response)

🍩 https://wandb.ai/capecape/aws-genai/r/call/01913668-51ac-7f13-99ce-7a90ae606371


In French, you can say:

"C'est vraiment pratique."

Pronunciation guide:
"Say vray-mahn pra-teek"

This phrase translates directly to "This is really practical" or "This is really handy" in English. It's a common expression used to describe something that is very useful or convenient.

Alternatively, you could also say:

"C'est très utile." (This is very useful)
Pronunciation: "Say tray oo-teel"

Both expressions convey the same meaning of something being really handy or useful.

## Evaluation driven development

When working with LLMs, it is important to evaluate the quality of the model's responses.

We can use Weave [`Evaluation`](https://wandb.github.io/weave/tutorial-eval) to build rigorous, apples-to-apples evaluations for language model use cases.

Let's evaluate the models on the challenging [Factual Inconsistency Benchmark](https://arxiv.org/abs/2211.08412v1) (FIB) dataset, which tests how well a model detects hallucinations by identifying inconsistencies between a document and its "summary".

In [22]:
import json
import random
import instructor
from pydantic import BaseModel
from pathlib import Path

DATA_PATH = Path("./data")
NUM_SAMPLES = 20

def read_jsonl(path):
    "returns a list of dictionaries"
    with open(path, 'r') as file:
        return [json.loads(line) for line in file]

fib_ds = random.sample(read_jsonl(DATA_PATH / "fib-val.jsonl"), NUM_SAMPLES)
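Note that `random.sample` draws a different subset on every run unless the random state is fixed. If you want the same samples across runs so that model comparisons stay apples-to-apples, one option is a dedicated seeded generator. This is a sketch; `sample_reproducibly`, the seed value, and `toy_records` are assumptions for illustration.

```python
import random


def sample_reproducibly(records: list[dict], k: int, seed: int = 0) -> list[dict]:
    """Draw k records with a private, seeded RNG so reruns pick the same subset."""
    rng = random.Random(seed)
    return rng.sample(records, k)


# Toy records with the same keys as the FIB rows (made-up content)
toy_records = [
    {"premise": f"doc {i}", "hypothesis": f"summary {i}", "target": i % 2}
    for i in range(100)
]

first = sample_reproducibly(toy_records, k=20)
second = sample_reproducibly(toy_records, k=20)
print(first == second)  # True: same seed, same subset
```

Using `random.Random(seed)` instead of `random.seed(seed)` avoids changing the global random state used elsewhere in the notebook.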

In [23]:
fib_ds[3]

{'premise': 'Police were called to an address in Holland Street at about 16:15 on Wednesday where they found the body of James Chadwick, who was from the city.\nHe had a number of unexplained injuries.\nFollowing the results of a post-mortem examination, his death is now being treated as murder.\nOfficers have been carrying out door-to-door inquiries and reviewing CCTV images from the area.\nDet Ch Insp Iain Smith, of Police Scotland, said: "This investigation is at an early stage and it is important we establish what has happened which led to James Chadwick losing his life.\n"Our inquiries so far have established that Mr Chadwick was last seen on Monday 31 August and we\'re appealing to anyone who has any knowledge of his movements since 31 August to contact police as a matter of urgency."',
 'hypothesis': 'A murder investigation has been launched following the death of a man in Holland Street.',
 'target': 1}

In [24]:
fib_prompt = """You are an expert to detect factual inconsistencies and hallucinations. 
You will be given a document and a summary.
- Carefully read the full document and the provided summary.
- Identify Factual Inconsistencies: any statements in the summary that are not supported by or contradict the information in the document.
Factually Inconsistent: If any statement in the summary is not supported by or contradicts the document, label it as 0
Factually Consistent: If all statements in the summary are supported by the document, label it as 1

Highlight or list the specific statements in the summary that are inconsistent.
Provide a brief explanation of why each highlighted statement is inconsistent with the document.

Return in JSON format with `consistency` and a `reason` for the given choice. Encode special chars properly.

Document: 
{premise}
Summary: 
{hypothesis}
"""

We will use [`instructor`](https://github.com/jxnl/instructor) to get consistent structured output from Claude.

In [25]:
class ModelOutput(BaseModel):
    consistency: int
    reason: str

# we need to patch the client with instructor to get structured output
inst_client = instructor.from_anthropic(client)

class ClaudeJudge(weave.Model):
    model_id: str
    max_tokens: int=1000
    system_message: str = "You are a helpful assistant and expert on extracting information in JSON format. Encode special chars properly."
    prompt_template: str = fib_prompt

    @weave.op
    def apply_prompt_template(self, premise:str, hypothesis:str) -> str:
        return self.prompt_template.format(premise=premise, hypothesis=hypothesis)

    @weave.op
    def predict(self, premise:str, hypothesis:str, **kwargs) -> dict:
        prompt = self.apply_prompt_template(premise, hypothesis)
        messages = format_prompt(prompt)
        structured_output = inst_client.messages.create(
            model=self.model_id,
            max_tokens=self.max_tokens,
            system=self.system_message,
            messages=messages,
            response_model=ModelOutput)
        return structured_output.model_dump()  # .dict() is deprecated in Pydantic v2
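`apply_prompt_template` relies on `str.format`, which treats every `{...}` in the template as a placeholder. The prompt used here contains no literal braces, but if you extend it with an inline JSON example, the braces need to be doubled. A small sketch with a hypothetical template:

```python
# Hypothetical template: literal JSON braces are escaped as {{ and }}
template = (
    'Return JSON like {{"consistency": 0 or 1, "reason": "..."}}.\n'
    "Document:\n{premise}\n"
    "Summary:\n{hypothesis}\n"
)

filled = template.format(
    premise="The sky is blue.",
    hypothesis="The sky is green.",
)
print(filled)
```

Without the doubled braces, `format` would raise a `KeyError` when it tries to look up the JSON key as a placeholder name.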

Let's try the model on a single sample

In [26]:
haiku = ClaudeJudge(model_id="anthropic.claude-3-haiku-20240307-v1:0")
response = haiku.predict(**fib_ds[3])
pprint(response)

🍩 https://wandb.ai/capecape/aws-genai/r/call/01913668-da21-7d42-bd3f-8d08ffa3890b
{'consistency': 1,
 'reason': 'The summary is factually consistent with the information provided '
           'in the document. The summary accurately states that a murder '
           'investigation has been launched following the death of a man in '
           'Holland Street, which is supported by the details in the document.'}


### Running the Evaluation

To do an evaluation, we will need:
- A dataset
- A model to evaluate
- A scorer function

In [27]:
def accuracy(model_output, target):
    class_model_output = model_output.get('consistency') if model_output else None
    return {"accuracy": class_model_output == target}
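To see what this scorer produces before running a full evaluation, here is a plain-Python sketch that applies it to a few made-up predictions and aggregates the results. `summarize` only mimics the `true_count` / `true_fraction` summary for illustration; it is not Weave's implementation, and the toy data is hypothetical.

```python
def accuracy(model_output, target):
    """Same scorer as above: compare the predicted label with the target."""
    predicted = model_output.get("consistency") if model_output else None
    return {"accuracy": predicted == target}


def summarize(scores: list[dict]) -> dict:
    """Count correct predictions and compute their fraction (illustrative only)."""
    true_count = sum(1 for score in scores if score["accuracy"])
    return {"true_count": true_count, "true_fraction": true_count / len(scores)}


# Made-up predictions; None stands in for a call that returned nothing
toy_outputs = [{"consistency": 1}, {"consistency": 0}, None, {"consistency": 1}]
toy_targets = [1, 1, 0, 1]

scores = [accuracy(out, tgt) for out, tgt in zip(toy_outputs, toy_targets)]
print(summarize(scores))
```

A missing output is scored as incorrect rather than raising, which keeps one failed call from aborting the whole run.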

In [28]:
evaluation = weave.Evaluation(dataset=fib_ds, scorers=[accuracy])

In [29]:
await evaluation.evaluate(haiku)

{'model_output': {'consistency': {'mean': 0.6}},
 'accuracy': {'accuracy': {'true_count': 12, 'true_fraction': 0.6}},
 'model_latency': {'mean': 3.588719141483307}}

Let's try its bigger sibling, `Claude 3.5 Sonnet`:

In [30]:
sonnet = ClaudeJudge(model_id="anthropic.claude-3-5-sonnet-20240620-v1:0")
await evaluation.evaluate(sonnet)

{'model_output': {'consistency': {'mean': 0.6}},
 'accuracy': {'accuracy': {'true_count': 14, 'true_fraction': 0.7}},
 'model_latency': {'mean': 7.258974945545196}}