In [1]:
# Environment setup: load .env, create the shared OpenWeights client `ow`
# (used by load_test below), and import the libraries the notebook relies on.
import asyncio
from openweights import OpenWeights # type: ignore
from dotenv import load_dotenv # type: ignore
import random
# python-dotenv: load variables from a local .env file into the process environment.
load_dotenv()
# NOTE(review): presumably picks up its credentials/org from the environment
# loaded above — confirm.
ow = OpenWeights()

# NOTE(review): never referenced by name in this notebook; presumably imported
# for its side effects (vLLM deployment support) — confirm before removing.
import openweights.jobs.vllm

from dataclasses import dataclass, asdict
import time
import json
import plotly.express as px
import plotly.figure_factory as ff
import pandas as pd
from typing import List, Dict
import os

@dataclass
class RequestResult:
    """Timing and token usage recorded for one chat-completion request.

    ``load_test`` builds one instance per request and stores it (converted with
    ``asdict``) in ``LoadTestResult.per_request_results``.
    """

    # Seconds from issuing the request until the awaited response returned.
    completion_time: float
    # The three counts below are copied from ``response.usage``.
    total_tokens: int
    prompt_tokens: int
    completion_tokens: int

@dataclass
class LoadTestResult:
    """Client/deployment settings of one load test together with its measured results.

    Round-trips through JSON via :meth:`to_json` / :meth:`from_json`.
    """
    name: str                  # run identifier
    model: str                 # model id that was deployed and queried
    request_timeout: float
    per_token_timeout: float
    max_num_seqs: int
    dataset_size: int
    n_gpus: int
    total_time: float          # seconds
    total_tokens: int          # summed over all requests
    total_requests: int
    throughput_requests: float  # requests per second
    throughput_tokens: float    # tokens per second
    avg_latency: float         # seconds per request (see load_test for how it is derived)
    per_request_results: List[Dict]  # List of RequestResult as dicts

    def to_json(self, filename):
        """Write this result to ``filename`` as indented JSON.

        Missing parent directories are created. A bare filename (no directory
        component) is written relative to the current working directory.
        """
        directory = os.path.dirname(filename)
        # os.makedirs("") raises FileNotFoundError, so only create a directory
        # when the path actually contains one.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(asdict(self), f, indent=2)

    @classmethod
    def from_json(cls, filename):
        """Load a result previously written by :meth:`to_json`."""
        with open(filename, encoding='utf-8') as f:
            data = json.load(f)
        return cls(**data)

def plot_results(results, bin_size: float = 0.1):
    """Create interactive plotly figures summarising a list of load-test results.

    Shows one scatter plot of ``avg_latency`` against ``throughput_requests``
    across all results, then one completion-time distribution per result.

    Args:
        results: Sequence of ``LoadTestResult`` instances.
        bin_size: Histogram bin width, in seconds, for the completion-time
            distributions.

    Returns:
        None. Figures are displayed with ``fig.show()``.
    """
    if not results:
        # Nothing to plot; px.scatter would fail on a frame without these columns.
        return

    # Latency vs Throughput scatter plot. The per-request lists are not plotted
    # here, so drop them instead of carrying them in the summary frame.
    summary = pd.DataFrame([asdict(r) for r in results]).drop(
        columns=["per_request_results"], errors="ignore"
    )
    fig1 = px.scatter(
        summary,
        x="throughput_requests",
        y="avg_latency",
        hover_data=["name", "model", "max_num_seqs", "n_gpus"],
        title="Load Test Results - Latency vs Throughput"
    )
    fig1.show()

    # Distribution of completion times, one figure per run
    for result in results:
        completion_times = [req["completion_time"] for req in result.per_request_results]
        if not completion_times:
            continue  # run recorded no requests; nothing to draw
        # The KDE curve needs at least two distinct values (scipy's gaussian_kde
        # fails on a single or constant sample); fall back to histogram + rug.
        show_curve = len(set(completion_times)) > 1
        fig2 = ff.create_distplot(
            [completion_times],
            [result.name],
            bin_size=bin_size,
            show_curve=show_curve,
        )
        fig2.update_layout(
            title=f"Distribution of Completion Times - {result.name}",
            xaxis_title="Completion Time (s)",
            yaxis_title="Density"
        )
        fig2.show()

DEFAULT_PROMPT = (
    "Please explain in great detail the history of China. Start with a general history, "
    "then add chapters that explain in detail the history of every major city, and then "
    "add chapters that explain the history of every major dynasty. Be very detailed and "
    "resemble the style of wikipedia."
)


def get_dataset(size, prompt=DEFAULT_PROMPT):
    """Generate a synthetic test dataset of ``size`` identical single-turn chats.

    Args:
        size: Number of conversations to generate (``0`` yields an empty list).
        prompt: User message content; defaults to a long-form prompt meant to
            elicit long completions.

    Returns:
        A list of ``size`` independent message lists, each
        ``[{"role": "user", "content": prompt}]``.
    """
    return [[{"role": "user", "content": prompt}] for _ in range(size)]

async def load_test(
    name: str,
    model: str = "unsloth/DeepSeek-R1-Distill-Qwen-1.5B",
    request_timeout: float = 5,
    per_token_timeout: float = 1,
    max_num_seqs: int = 100,
    dataset_size: int = 1000,
    n_gpus: int = 1,
    n_tokens=(600,),
):
    """Deploy ``model`` with the given vLLM/client settings and measure its performance.

    Sends ``dataset_size`` chat requests through ``ow.async_chat``, at most
    ``max_num_seqs`` in flight at once, and records per-request timings and
    token usage. The aggregated result is written to ``results/{name}.json``.

    Args:
        name: Run identifier; also the stem of the JSON output file.
        model: Model id to deploy and query.
        request_timeout: Assigned to ``ow.chat.request_timeout`` (not restored afterwards).
        per_token_timeout: Assigned to ``ow.chat.per_token_timeout`` (not restored afterwards).
        max_num_seqs: Passed to the deployment and also used as the client-side
            concurrency limit.
        dataset_size: Number of requests to send.
        n_gpus: Number of GPUs; the deployment requests ``n_gpus * 65`` GB of VRAM.
        n_tokens: Candidate ``max_tokens`` values; one is picked at random per request.

    Returns:
        The ``LoadTestResult`` that was written to disk.

    Raises:
        ValueError: If ``max_num_seqs`` or ``dataset_size`` is below 1, or
            ``n_tokens`` is empty. These are checked before anything is deployed.
    """
    # Validate before paying for a deployment: Semaphore(0) would block forever,
    # zero requests would make the averages below divide by zero, and an empty
    # n_tokens would make random.choice fail inside every request.
    if max_num_seqs < 1:
        raise ValueError(f"max_num_seqs must be >= 1, got {max_num_seqs}")
    if dataset_size < 1:
        raise ValueError(f"dataset_size must be >= 1, got {dataset_size}")
    n_tokens = list(n_tokens)
    if not n_tokens:
        raise ValueError("n_tokens must contain at least one value")

    deploy_kwargs = dict(
        max_num_seqs=max_num_seqs,
        requires_vram_gb=n_gpus * 65,
    )
    dataset = get_dataset(dataset_size)
    sem = asyncio.Semaphore(max_num_seqs)

    async with ow.api.deploy(model, **deploy_kwargs):
        # NOTE(review): the timeouts are set on ow.chat, but the requests below go
        # through ow.async_chat — confirm these settings also apply there.
        ow.chat.request_timeout = request_timeout
        ow.chat.per_token_timeout = per_token_timeout

        async def timed_request(messages):
            # Timing starts after the semaphore is acquired, so time spent queued
            # behind other in-flight requests is not counted as latency.
            async with sem:
                max_tokens = random.choice(n_tokens)
                req_start = time.perf_counter()
                response = await ow.async_chat.completions.create(
                    model=model, messages=messages, max_tokens=max_tokens
                )
                # Measure once so the printed and recorded values agree.
                completion_time = time.perf_counter() - req_start
                print(f"Completion time: {completion_time:.2f}s")
                return RequestResult(
                    completion_time=completion_time,
                    total_tokens=response.usage.total_tokens,
                    prompt_tokens=response.usage.prompt_tokens,
                    completion_tokens=response.usage.completion_tokens
                )

        # Time only the request phase. Whatever time entering and leaving
        # ow.api.deploy(...) takes (deployment setup/teardown) would otherwise
        # deflate the throughput figures.
        start_time = time.perf_counter()
        responses = await asyncio.gather(
            *[timed_request(messages) for messages in dataset]
        )
        total_time = time.perf_counter() - start_time

    total_tokens = sum(r.total_tokens for r in responses)
    total_requests = len(responses)
    # Mean per-request latency. total_time / total_requests would just be
    # 1 / throughput_requests and says nothing about latency under concurrency.
    avg_latency = sum(r.completion_time for r in responses) / total_requests

    result = LoadTestResult(
        name=name,
        model=model,
        request_timeout=request_timeout,
        per_token_timeout=per_token_timeout,
        max_num_seqs=max_num_seqs,
        dataset_size=dataset_size,
        n_gpus=n_gpus,
        total_time=total_time,
        total_tokens=total_tokens,
        total_requests=total_requests,
        throughput_requests=total_requests / total_time,
        throughput_tokens=total_tokens / total_time,
        avg_latency=avg_latency,
        per_request_results=[asdict(r) for r in responses]
    )

    result.to_json(f"results/{name}.json")
    return result


async def eval_max_num_seqs(*, max_num_seqs_values=(1, 10, 100), name_prefix="", **load_test_kwargs):
    """Evaluate the impact of ``max_num_seqs`` on throughput and latency.

    Runs ``load_test`` once per value in ``max_num_seqs_values``. The runs happen
    sequentially, with a separate deployment for each. All results are then
    plotted together with ``plot_results``.

    Args:
        max_num_seqs_values: The ``max_num_seqs`` settings to sweep.
        name_prefix: Prepended to each run name ``max_num_seqs_{n}``. Each run is
            saved under its name, so sweeps with different ``load_test_kwargs``
            overwrite each other's files unless given distinct prefixes.
        **load_test_kwargs: Forwarded unchanged to every ``load_test`` call
            (must not contain ``name`` or ``max_num_seqs``).
    """
    results = []
    for max_num_seqs in max_num_seqs_values:
        name = f"{name_prefix}max_num_seqs_{max_num_seqs}"
        results.append(await load_test(name, max_num_seqs=max_num_seqs, **load_test_kwargs))
    plot_results(results)


Connected to org:  Plan-B


In [2]:
# Sweep max_num_seqs with max_tokens fixed at 600 and request_timeout raised to 600.
await eval_max_num_seqs(n_tokens=[600], request_timeout=600)

AttributeError: 'OpenWeights' object has no attribute 'deploy'

In [None]:
await eval_max_num_seqs(max_tokens=[1000])