In [None]:
import gevent.monkey
gevent.monkey.patch_all()

from locust import HttpUser, task
from locust.env import Environment
from locust.stats import stats_printer
from locust.runners import STATE_STOPPED, STATE_RUNNING
from locust.log import setup_logging

import gevent
import time
import random
import csv
import os
import json
import requests
from datetime import datetime
from collections import defaultdict
from gevent.lock import Semaphore
from gevent.event import Event

# ==== Global configuration ====

# Base URLs of the entry nodes. NOTE(review): not read by the request path at
# the moment -- FaaSUser.call_function posts to a hard-coded yl-01 URL.
ENTRY_NODES = [
    "http://yl-01.lab.uvalight.net:31113",
    "http://yl-02.lab.uvalight.net:31113",
    "http://yl-03.lab.uvalight.net:31113",
    "http://yl-06.lab.uvalight.net:31113",
]

# Architecture names that can be requested from the nodes' /reload endpoint.
ARCHITECTURES = ["centralized", "federated", "decentralized", "dynamic"]
# FUNC_TYPE environment value -> function name sent as "fn_name" in each request.
FUNCTION_MAP = {
    "basic": "matrix-multiplication",
    "data_local": "image-resize"
}

# Number of requests issued per experiment run.
TOTAL_TASKS = 2000
# Per-request result records appended by FaaSUser; reset by run_experiment().
entry_logs = []
# Task ids handed out so far in the current run; reset by run_experiment().
task_counter = 0
# gevent Semaphore (initial value 1) used as a mutex around task_counter / entry_logs.
task_lock = Semaphore()
# Set when the current run should end; cleared by run_experiment().
experiment_complete = Event()

# ==== User class ====
class FaaSUser(HttpUser):
    """Locust user that invokes a FaaS function through an entry node.

    All users share the module-level ``task_counter``, which hands out unique
    task ids 1..TOTAL_TASKS. They also share ``entry_logs``, which receives
    exactly one record per finished task, whatever its outcome.

    The run is marked complete only once all TOTAL_TASKS records have been
    collected. Requests still in flight when the last id is handed out
    therefore finish and are recorded, instead of being killed by an early
    ``runner.quit()``.
    """

    # Minimum pause in seconds between two tasks of the same user.
    wait_time_between_tasks = 0.1

    def wait_time(self):
        """Return the pause (seconds) before this user's next task.

        Locust only consults ``wait_time``. The attribute above was never read
        on its own, so users previously fired their tasks back-to-back.
        """
        return self.wait_time_between_tasks

    def on_start(self):
        self.should_stop = False

    def on_stop(self):
        self.should_stop = True

    @staticmethod
    def _extract_architecture(res, default):
        """Return ``(architecture, body_is_json)`` for a response.

        ``architecture`` is the ``"architecture"`` field of a JSON-object body,
        or ``default`` when that field is missing.

        When the body is not a JSON object (e.g. empty, or an HTML error page),
        this returns ``(default, False)`` instead of raising. The HTTP status
        is then still recorded rather than being masked by a JSONDecodeError.
        """
        try:
            body = res.json()
        except ValueError:  # requests' JSONDecodeError subclasses ValueError
            return default, False
        if not isinstance(body, dict):
            return default, False
        return body.get("architecture", default), True

    @staticmethod
    def _make_record(task_id, duration, status, timestamp, entry_node, real_arch):
        """Build one ``entry_logs`` row.

        Run metadata is read from the ARCH / FUNC_TYPE / CONCURRENCY
        environment variables.
        """
        return {
            "architecture": os.getenv("ARCH", "dynamic"),
            "fn_type": os.getenv("FUNC_TYPE", "basic"),
            "concurrency": os.getenv("CONCURRENCY", "unknown"),
            "task_id": task_id,
            "response_time": round(duration, 3),
            "status": status,
            "timestamp": timestamp,
            "entry_node": entry_node,
            "real_arch": real_arch,  # None is written as an empty CSV cell
        }

    def _finish_experiment(self, message):
        """Mark the run complete (only once) and stop the Locust runner."""
        if experiment_complete.is_set():
            return
        # Set before printing so no other greenlet can slip in between the
        # check and the set.
        experiment_complete.set()
        print(message)
        self.environment.runner.quit()

    @task
    def call_function(self):
        """Reserve the next task id, invoke the function once and record the outcome."""
        global task_counter

        # Leave as soon as the run is over or Locust is stopping this user.
        if self.should_stop or experiment_complete.is_set():
            self.environment.runner.quit()
            return

        # Atomically reserve the next task id (None once all ids are handed out).
        with task_lock:
            if task_counter < TOTAL_TASKS:
                task_counter += 1
                current_task = task_counter
            else:
                current_task = None

        if current_task is None:
            # Every id has been handed out, but other users may still be
            # waiting on responses. Finish only when every record is in (this
            # also covers TOTAL_TASKS == 0); otherwise idle briefly and retry.
            with task_lock:
                all_recorded = len(entry_logs) >= TOTAL_TASKS
            if all_recorded:
                self._finish_experiment(
                    f"✅ Task limit reached ({TOTAL_TASKS}), marking experiment complete")
            else:
                experiment_complete.wait(timeout=1.0)
            return

        if current_task == TOTAL_TASKS:
            print(f"🎯 Submitting final task ({TOTAL_TASKS}/{TOTAL_TASKS})")

        fallback_arch = os.getenv("ARCH", "dynamic")
        payload = {
            "fn_name": FUNCTION_MAP.get(os.getenv("FUNC_TYPE", "basic"), "matrix-multiplication"),
            "payload": "http://yl-01.lab.uvalight.net:9000/images/image.jpg",
        }
        # All requests currently go to yl-01. Use
        # random.choice(ENTRY_NODES) + "/entry" to spread load instead.
        entry_url = "http://yl-01.lab.uvalight.net:31113/entry"
        selected_node = entry_url.split("//")[1].split(":")[0]  # host name only
        start = time.time()

        try:
            with self.client.post(entry_url, json=payload, timeout=30,
                                  catch_response=True) as res:
                duration = time.time() - start
                timestamp = datetime.now().isoformat()
                real_arch, body_is_json = self._extract_architecture(res, fallback_arch)
                if res.status_code == 200 and body_is_json:
                    status = "success"
                    res.success()
                elif res.status_code == 200:
                    status = "invalid_json"
                    res.failure("HTTP 200 with non-JSON body")
                    print(f"[!] Task {current_task} got a non-JSON 200 response "
                          f"after {duration:.3f}s")
                else:
                    status = f"http_{res.status_code}"
                    res.failure(f"HTTP {res.status_code}")
                    print(f"[!] Task {current_task} got HTTP {res.status_code} "
                          f"after {duration:.3f}s")
        except Exception as e:
            # res.failure() is deliberately not called here. Once the
            # with-block has exited it is too late to mark the response.
            duration = time.time() - start
            timestamp = datetime.now().isoformat()
            status = f"exception_{type(e).__name__}"
            real_arch = None
            print(f"[!] Task {current_task} failed after {duration:.3f}s: {e}")

        record = self._make_record(current_task, duration, status, timestamp,
                                   selected_node, real_arch)
        with task_lock:
            entry_logs.append(record)
            all_recorded = len(entry_logs) >= TOTAL_TASKS

        # Complete the run when the LAST record lands, not when task id
        # TOTAL_TASKS finishes (other ids may still be in flight then).
        if all_recorded:
            self._finish_experiment("🎉 Final task completed! Marking experiment as done.")


# ==== Remote architecture reload ====
# Nodes that must switch architecture; includes yl-04, which is not in ENTRY_NODES.
_DEFAULT_RELOAD_NODES = (
    "http://yl-01.lab.uvalight.net:31113",
    "http://yl-02.lab.uvalight.net:31113",
    "http://yl-03.lab.uvalight.net:31113",
    "http://yl-04.lab.uvalight.net:31113",
    "http://yl-06.lab.uvalight.net:31113",
)


def reload_architecture(arch, nodes=None, settle_seconds=5, timeout=10):
    """Ask each node to switch to ``arch`` via ``POST <node>/reload``.

    Args:
        arch: architecture name, sent as ``{"architecture": arch}``.
        nodes: base URLs to reload. Defaults to ``_DEFAULT_RELOAD_NODES``.
        settle_seconds: pause after the reload requests, giving the switch
            time to take effect.
        timeout: per-request timeout in seconds.

    Returns:
        Number of nodes that answered HTTP 200.

    Raises:
        RuntimeError: if no node could be reloaded.
    """
    nodes = _DEFAULT_RELOAD_NODES if nodes is None else list(nodes)
    failed = []
    for base in nodes:
        try:
            res = requests.post(base + "/reload", json={"architecture": arch}, timeout=timeout)
        except Exception as e:  # best effort: one unreachable node must not abort the rest
            print(f"[x] Reload error on {base}: {e}")
            failed.append(base)
            continue
        if res.status_code == 200:
            print(f"[✓] Reloaded {base} to {arch}")
        else:
            print(f"[!] Reload failed on {base}: HTTP {res.status_code}")
            failed.append(base)

    reloaded = len(nodes) - len(failed)
    if reloaded == 0:
        raise RuntimeError("Failed to reload architecture on all nodes")
    if failed:
        # Partial success is still accepted, but the cluster may now run mixed
        # architectures, so make that visible in the log.
        print(f"⚠️ {len(failed)} node(s) not reloaded to {arch}: {', '.join(failed)}")

    # Wait for the architecture switch to complete.
    time.sleep(settle_seconds)
    return reloaded


# ==== Robust experiment runner ====
_DETAIL_FIELDS = [
    "architecture", "fn_type", "concurrency", "task_id", "response_time",
    "status", "timestamp", "entry_node", "real_arch",
]
_SUMMARY_FIELDS = [
    "architecture", "fn_type", "concurrency", "total_time",
    "successful_requests", "success_rate", "avg_response_time",
    "min_response_time", "max_response_time", "p50", "p95", "p99",
]


def _response_time_stats(response_times):
    """Summarise response times as avg/min/max and P50/P95/P99.

    Percentiles use the index rule ``sorted[int(n * q)]``. Every value is 0
    for an empty input.
    """
    if not response_times:
        return dict.fromkeys(
            ("avg_response_time", "min_response_time", "max_response_time",
             "p50", "p95", "p99"), 0)
    ordered = sorted(response_times)
    n = len(ordered)
    return {
        "avg_response_time": sum(response_times) / n,
        "min_response_time": ordered[0],
        "max_response_time": ordered[-1],
        # int(n * q) < n for q < 1, so these indices are always valid.
        "p50": ordered[int(n * 0.5)],
        "p95": ordered[int(n * 0.95)],
        "p99": ordered[int(n * 0.99)],
    }


def _append_csv(path, fieldnames, rows):
    """Append dict ``rows`` to ``path``, writing the header only when the file is empty."""
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if f.tell() == 0:
            writer.writeheader()
        writer.writerows(rows)


def _wait_for_completion(env, start_time, max_experiment_time, check_interval=5):
    """Poll until the run completes, the runner stops, or the time budget runs out."""
    elapsed = 0
    all_submitted_logged = False  # print the "all submitted" notice only once
    while (not experiment_complete.is_set()
           and env.runner.state != STATE_STOPPED
           and elapsed < max_experiment_time):
        time.sleep(check_interval)
        elapsed = time.time() - start_time

        # Snapshot the shared counters under the lock; print outside it.
        with task_lock:
            submitted = task_counter
            completed = len(entry_logs)
        progress = submitted / TOTAL_TASKS * 100 if TOTAL_TASKS else 100.0
        print(f"📊 Progress: {submitted}/{TOTAL_TASKS} ({progress:.1f}%) | "
              f"Completed: {completed} | Time: {elapsed:.1f}s | State: {env.runner.state}")

        if submitted >= TOTAL_TASKS and not all_submitted_logged:
            print("🎯 All tasks submitted, waiting for completion...")
            all_submitted_logged = True

    if not experiment_complete.is_set() and elapsed >= max_experiment_time:
        print(f"⏰ Time budget of {max_experiment_time}s exhausted before all tasks finished")


def _stop_runner(env, stop_timeout=30):
    """Stop the Locust runner and wait (at most ``stop_timeout`` s) for STATE_STOPPED."""
    print("🛑 Stopping experiment...")
    if env.runner.state != STATE_STOPPED:
        env.runner.stop()
    stop_start = time.time()
    while env.runner.state != STATE_STOPPED and time.time() - stop_start < stop_timeout:
        print(f"⏳ Waiting for clean stop... ({time.time() - stop_start:.1f}s)")
        time.sleep(1)


def run_experiment(host, func_type, arch, concurrency, output_csv, detailed_csv):
    """Run one load-test configuration and append its results to CSV files.

    Steps:
      1. Reset the shared counters and reload ``arch`` on the nodes.
      2. Drive ``concurrency`` Locust users until TOTAL_TASKS requests are
         recorded, the runner stops, or the time budget expires.
      3. Append every per-request record to ``detailed_csv`` and one summary
         row to ``output_csv``.

    Only records with status ``"success"`` count towards
    ``successful_requests`` / ``success_rate`` and the response-time
    statistics.

    Args:
        host: Locust base host.
        func_type: key of FUNCTION_MAP; exported as FUNC_TYPE.
        arch: architecture name; exported as ARCH and sent to /reload.
        concurrency: number of users; exported as CONCURRENCY.
        output_csv: path of the summary CSV (appended).
        detailed_csv: path of the per-request CSV (appended).

    Returns:
        True if the run finished and results were written. False if any step
        raised; the error is printed.
    """
    global entry_logs, task_counter

    # Reset shared state for this run.
    entry_logs = []
    task_counter = 0
    experiment_complete.clear()

    os.environ["FUNC_TYPE"] = func_type
    os.environ["ARCH"] = arch
    os.environ["CONCURRENCY"] = str(concurrency)  # stored in every detailed record

    print(f"\n🚀 Starting experiment: {arch} | {func_type} | concurrency={concurrency}")

    env = None
    stats_greenlet = None
    try:
        reload_architecture(arch)
        setup_logging("INFO", None)

        env = Environment(user_classes=[FaaSUser])
        env.create_local_runner()
        env.host = host

        stats_greenlet = gevent.spawn(stats_printer, env.stats)
        start_time = time.time()

        env.runner.start(user_count=concurrency, spawn_rate=concurrency)
        print(f"✅ Started {concurrency} concurrent users")

        # Budget: at least 10 minutes, or 2 s per task for larger runs.
        _wait_for_completion(env, start_time, max(600, TOTAL_TASKS * 2))
        _stop_runner(env)

        if not stats_greenlet.dead:
            stats_greenlet.kill()

        total_duration = time.time() - start_time

        records = list(entry_logs)
        # entry_logs also holds failed and exception records; don't count
        # those as successes.
        successes = [r for r in records if r.get("status") == "success"]
        successful_requests = len(successes)
        success_rate = (successful_requests / TOTAL_TASKS) * 100 if TOTAL_TASKS > 0 else 0

        print(f"📈 Results: {successful_requests}/{TOTAL_TASKS} successful ({success_rate:.1f}%)")
        print(f"   Recorded: {len(records)} | Failed: {len(records) - successful_requests}")
        print(f"⏱️  Total time: {total_duration:.2f}s")

        stats = _response_time_stats([r["response_time"] for r in successes])
        if successes:
            print(f"📊 Response time stats: avg={stats['avg_response_time']:.2f}s, "
                  f"min={stats['min_response_time']:.2f}s, max={stats['max_response_time']:.2f}s")
            print(f"📊 Percentiles: P50={stats['p50']:.2f}s, P95={stats['p95']:.2f}s, "
                  f"P99={stats['p99']:.2f}s")
        else:
            print("⚠️ No successful requests recorded!")

        print(f"💾 Saving {len(records)} request records to {detailed_csv}")
        if records:
            _append_csv(detailed_csv, _DETAIL_FIELDS, records)
            print(f"✅ Successfully saved {len(records)} detailed records")
        else:
            print("⚠️ No request data to save!")

        summary = {
            "architecture": arch,
            "fn_type": func_type,
            "concurrency": concurrency,
            "total_time": round(total_duration, 2),
            "successful_requests": successful_requests,
            "success_rate": round(success_rate, 2),
        }
        summary.update({key: round(value, 2) for key, value in stats.items()})
        _append_csv(output_csv, _SUMMARY_FIELDS, [summary])

        print(f"✅ Experiment completed: {func_type} | {arch} | "
              f"concurrency={concurrency} | duration={total_duration:.2f}s")
        return True

    except Exception as e:
        print(f"❌ Experiment failed: {e}")
        return False

    finally:
        # Always release background work, even when the run failed midway.
        if stats_greenlet is not None and not stats_greenlet.dead:
            stats_greenlet.kill()
        runner = getattr(env, "runner", None)
        if runner is not None:
            try:
                if runner.state != STATE_STOPPED:
                    runner.stop()
            except Exception as cleanup_error:
                print(f"⚠️ Runner cleanup failed: {cleanup_error}")


# ==== Main entry point ====
def _print_overall_statistics(detailed_file, func_types):
    """Print overall success rate and mean latency from ``detailed_file``.

    Best effort: requires pandas, and any read/parse error is printed rather
    than raised.
    """
    if not os.path.exists(detailed_file):
        return
    try:
        import pandas as pd
    except ImportError:
        print("\n💡 Install pandas to see overall statistics: pip install pandas")
        return

    try:
        df = pd.read_csv(detailed_file)
        total_requests = len(df)
        success_df = df[df['status'] == 'success']
        successful_requests = len(success_df)
        overall_success_rate = (successful_requests / total_requests) * 100 if total_requests > 0 else 0

        print("\n📈 Overall Statistics:")
        print(f"   Total requests: {total_requests}")
        print(f"   Successful requests: {successful_requests}")
        print(f"   Overall success rate: {overall_success_rate:.1f}%")

        if successful_requests > 0:
            avg_time = success_df['response_time'].mean()
            print(f"   Average response time: {avg_time:.3f}s")

            print("\n📊 Statistics by function type:")
            for func_type in func_types:
                func_df = success_df[success_df['fn_type'] == func_type]
                if len(func_df) > 0:
                    func_avg = func_df['response_time'].mean()
                    print(f"   {func_type}: {len(func_df)} requests, avg={func_avg:.3f}s")
    except Exception as e:
        print(f"\n⚠️ Could not calculate statistics: {e}")


def main(func_types=("data_local",), architectures=("dynamic",),
         concurrencies=(40,), cooldown=30):
    """Run the experiment sweep and print an overall summary.

    Every (func_type, architecture, concurrency) combination is run once via
    run_experiment(). Results are appended to experiment_summary.csv and
    experiment_details.csv in the working directory. Any existing copies are
    first renamed to ``<name>.backup.<unix-time>``.

    The defaults reproduce the sweep the script was last configured for. The
    full sweep is::

        main(func_types=("basic", "data_local"),
             architectures=ARCHITECTURES,
             concurrencies=(2, 4, 6, 8, 10, 12, 15, 20, 30, 40))

    Args:
        func_types: keys of FUNCTION_MAP to test.
        architectures: architecture names passed to /reload.
        concurrencies: numbers of concurrent Locust users per run.
        cooldown: seconds to sleep after each run.
    """
    output_file = "experiment_summary.csv"    # one summary row per run
    detailed_file = "experiment_details.csv"  # one row per request
    # Placeholder base host: FaaSUser posts to an absolute URL, so this host
    # is not expected to be contacted.
    host = "http://dummy-host"

    # Back up previous result files so each sweep starts with fresh CSVs.
    backup_suffix = int(time.time())
    for file_path in (output_file, detailed_file):
        if os.path.exists(file_path):
            backup_name = f"{file_path}.backup.{backup_suffix}"
            os.rename(file_path, backup_name)
            print(f"📁 Previous results backed up: {file_path} -> {backup_name}")

    failed_experiments = []

    for func_type in func_types:
        print(f"\n🔥 Starting experiments for function type: {func_type}")
        print(f"Function: {FUNCTION_MAP.get(func_type, 'unknown')}")

        for arch in architectures:
            for concurrency in concurrencies:
                print(f"\n{'='*60}")
                print(f"🔬 Running: {func_type} | {arch} | concurrency={concurrency}")
                print(f"{'='*60}")

                success = run_experiment(host, func_type, arch, concurrency, output_file, detailed_file)

                if not success:
                    failed_experiments.append((func_type, arch, concurrency))
                    print(f"❌ Failed: {func_type} | {arch} | concurrency={concurrency}")

                print(f"😴 Cooling down for {cooldown}s...")
                time.sleep(cooldown)

    print(f"\n{'='*60}")
    print("🎊 ALL EXPERIMENTS COMPLETED!")
    print(f"{'='*60}")

    if failed_experiments:
        print("❌ Failed experiments:")
        for func_type, arch, conc in failed_experiments:
            print(f"   - {func_type} | {arch} | concurrency={conc}")
    else:
        print("✅ All experiments completed successfully!")

    print(f"📊 Summary results saved to: {output_file}")
    print(f"📊 Detailed results saved to: {detailed_file}")

    _print_overall_statistics(detailed_file, func_types)


if __name__ == "__main__":
    # Start the sweep when run as a script (or executed as a notebook cell).
    main()


🔥 Starting experiments for function type: data_local
Function: image-resize

🔬 Running: data_local | dynamic | concurrency=40

🚀 Starting experiment: dynamic | data_local | concurrency=40
[✓] Reloaded http://yl-01.lab.uvalight.net:31113 to dynamic
[✓] Reloaded http://yl-02.lab.uvalight.net:31113 to dynamic
[✓] Reloaded http://yl-03.lab.uvalight.net:31113 to dynamic
[✓] Reloaded http://yl-04.lab.uvalight.net:31113 to dynamic
[✓] Reloaded http://yl-06.lab.uvalight.net:31113 to dynamic


[2025-06-25 06:38:52,471] Lyt2023/INFO/locust.runners: Ramping to 40 users at a rate of 20.00 per second


✅ Started 40 concurrent users


[2025-06-25 06:38:53,491] Lyt2023/INFO/locust.runners: All users spawned: {"FaaSUser": 40} (40 total users)


📊 Progress: 40/2000 (2.0%) | Time: 5.0s | State: running
[!] Task 39 failed after 8.945s: Expecting value: line 1 column 1 (char 0)
[!] Task 36 failed after 8.948s: Expecting value: line 1 column 1 (char 0)
[!] Task 37 failed after 8.948s: Expecting value: line 1 column 1 (char 0)
[!] Task 34 failed after 8.948s: Expecting value: line 1 column 1 (char 0)
[!] Task 40 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 38 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 33 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 28 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 35 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 21 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 29 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 30 failed after 8.949s: Expecting value: line 1 column 1 (char 0)
[!] Task 26 failed after 8.950s: Expecting 