In [1]:
import dotenv
import json
from twon_lss.simulations.wp3_simulation import agent_parameter_estimation
from twon_lss.simulations.wp3_simulation import (
    Simulation,
    SimulationArgs,
    WP3Agent,
    AgentInstructions,
    WP3LLM,
    agent_parameter_estimation,
    simulation_load_estimator,
)

from twon_lss.utility import LLM, Message, Chat, Noise
from twon_lss.schemas import Feed, User, Post

import multiprocessing
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

In [2]:
ENV = dotenv.dotenv_values("../" * 3 + ".env")

In [3]:
AGENTS_INSTRUCTIONS_CFG = json.load(open("../data/agents.instructions.json"))
AGENTS_PERSONAS_CFG = json.load(open("../data/agents.personas_dummy.json"))

In [None]:
AGENT_LLM = WP3LLM(
    api_key=ENV["RUNPOD_TOKEN"],
    url="https://api.runpod.ai/v2/4rest17vcta7ga/runsync"
)

In [5]:
def stress_test_llm(num_iterations: int = 0, num_agents = 10):
    
    users = [WP3Agent(
            llm=AGENT_LLM,
            instructions=AgentInstructions(**AGENTS_INSTRUCTIONS_CFG["actions"]),
            cognition=persona["cognition"],
            bio=persona["bio"],
            memory=persona["history"],
            memory_length=10,
            activation_probability=agent_parameter_estimation(posts_per_day=persona["posts_per_day"], seed=i)["activation_probability"],
            posting_probability=agent_parameter_estimation(posts_per_day=persona["posts_per_day"], seed=i)["posting_probability"],
            read_amount=agent_parameter_estimation(posts_per_day=persona["posts_per_day"], seed=i)["read_amount"],
        ) for i, persona in enumerate(AGENTS_PERSONAS_CFG[:num_agents])]

    feed: Feed = Feed()

    for _ in range(num_iterations):
        print(f"Starting iteration {_ + 1} of {num_iterations}")
        max_workers = min(num_agents, multiprocessing.cpu_count() * 2)
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(run_agent, user, feed[:50]) for user in users]

            responses = []
            for future in as_completed(futures):
                responses.append(future.result())

            feed: Feed = Feed()
            feed.extend(responses)

def run_agent(user: WP3Agent, feed:Feed):
    user.consume_feed(posts=feed, user = User())  # Dummy user id for feed consumption
    post = user.post()
    return Post(content=post, user=User())

In [6]:
# test
start_time = time.time()
stress_test_llm(num_iterations=10, num_agents=100)
end_time = time.time()
print(f"Stress test completed in {end_time - start_time:.2f} seconds")

Starting iteration 1 of 10
Starting iteration 2 of 10
Starting iteration 3 of 10
Starting iteration 4 of 10
Starting iteration 5 of 10
Starting iteration 6 of 10
Starting iteration 7 of 10
Starting iteration 8 of 10
Starting iteration 9 of 10
Starting iteration 10 of 10
Stress test completed in 258.88 seconds
