In [None]:
import time
from appium import webdriver
from threading import Thread
from appium.options.android import UiAutomator2Options
import subprocess
import os


def load_config(file_path="config.txt"):
    """加载设备配置文件"""
    config = {}
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, value = line.split("=", 1)
            config[key.strip()] = value.strip()
    return config


def get_connected_devices():
    """获取通过 adb devices 连接的设备"""
    result = subprocess.run(["adb", "devices"], stdout=subprocess.PIPE, text=True)
    lines = result.stdout.strip().split("\n")[1:]  # 跳过第一行 "List of devices attached"
    devices = [line.split("\t")[0] for line in lines if "device" in line]
    return devices


def control_device(device_config, script_path):
    """
    控制单个设备运行指定脚本
    使用 subprocess 调用外部 Python 脚本
    """
    try:
        print(f"{device_config['device_name']} 已连接，运行脚本: {script_path}")
        subprocess.run(
            ["python", script_path, "--udid", device_config["udid"], "--name", device_config["device_name"]],
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"设备 {device_config['device_name']} 执行任务失败: {e}")
    except Exception as e:
        print(f"设备 {device_config['device_name']} 发生未知错误: {e}")


if __name__ == "__main__":
    # 加载配置
    connected_devices = get_connected_devices()
    if len(connected_devices) < 2:
        print("需要至少两个设备连接才能运行此脚本")
        exit(1)

    # 设置设备 1 和设备 2 的配置
    device1_config = {
        "platform_name": "Android",
        "device_name": "Device1",
        "udid": connected_devices[0],
    }
    device2_config = {
        "platform_name": "Android",
        "device_name": "Device2",
        "udid": connected_devices[1],
    }

    # momi.py 和 MT.py 路径
    momi_script = "singlesim.py"
    mt_script = "MT.py"

    # 创建线程分别控制两个设备
    thread1 = Thread(target=control_device, args=(device1_config, momi_script))
    thread2 = Thread(target=control_device, args=(device2_config, mt_script))

    # 启动线程
    thread1.start()
    thread2.start()

    # 等待线程完成
    thread1.join()
    thread2.join()

    print("两个设备任务完成！")


In [1]:
import time
from appium import webdriver
from appium.options.android import UiAutomator2Options
import subprocess


def load_config(file_path="call_config.txt"):
    """加载拨号测试配置文件"""
    config = {}
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" in line:
                key, value = line.split("=", 1)
                if value.isdigit():
                    value = int(value)
                elif value.lower() in ("true", "false"):
                    value = value.lower() == "true"
                config[key] = value
            else:
                print(f"跳过无效行: {line}")
    return config


def control_device(device_config, script_path, server_url):
    """
    控制单个设备运行指定脚本
    使用 exec 方法读取并运行脚本内容，并传入 driver
    """
    # 创建 UiAutomator2Options 实例
    options = UiAutomator2Options()
    options.platform_name = device_config["platform_name"]
    options.device_name = device_config["device_name"]
    options.udid = device_config["udid"]

    # 启动 WebDriver
    driver = webdriver.Remote(server_url, options=options)
    try:
        print(f"{device_config['device_name']} 已连接，运行脚本: {script_path}")
        # 读取并执行脚本内容
        with open(script_path, "r", encoding="utf-8") as script_file:
            script_content = script_file.read()

        # 使用 exec 执行脚本内容
        global_namespace = {"driver": driver}
        exec(script_content, global_namespace)

        # 显式调用入口函数
        if "run_call_tests" in global_namespace:
            global_namespace["run_call_tests"](driver)
        else:
            print(f"脚本 {script_path} 中未找到 run_call_tests 函数")
    except Exception as e:
        print(f"设备 {device_config['device_name']} 执行任务失败: {e}")
    finally:
        driver.quit()



if __name__ == "__main__":
    # 单线程测试配置
    connected_devices = subprocess.run(["adb", "devices"], stdout=subprocess.PIPE, text=True).stdout.strip().split("\n")[1:]
    devices = [line.split("\t")[0] for line in connected_devices if "device" in line]
    if not devices:
        print("未检测到任何设备")
        exit(1)

    # 配置设备信息
    device_config = {
        "platform_name": "Android",
        "device_name": "TestDevice",
        "udid": devices[0],  # 使用第一个已连接的设备
    }
    server_url = "http://127.0.0.1:4723"
    script_path = "MT.py"

    # 测试单设备逻辑
    control_device(device_config, script_path, server_url)


TestDevice 已连接，运行脚本: singlesim.py
开始第 1 轮通话...
SIM1 拨打 +351924263008 ...
电话已挂断
已返回两层
第 1 轮通话结束
-----------------------------------
所有通话测试完成
