In [18]:
import ast
import tempfile
import importlib.util
import uuid
import traceback
import pandas as pd

def is_valid_strategy_code(code: str):
    # ---------- 静态分析 ----------
    try:
        tree = ast.parse(code)
        class_nodes = [n for n in tree.body if isinstance(n, ast.ClassDef)]
        if not class_nodes:
            return False, "❌ 未定义类"

        for cls in class_nodes:
            method_names = [n.name for n in cls.body if isinstance(n, ast.FunctionDef)]
            if "__init__" not in method_names:
                return False, f"❌ 类 {cls.name} 缺少 __init__ 方法"
            if "generate_signal" not in method_names:
                return False, f"❌ 类 {cls.name} 缺少 generate_signal 方法"

        class_name = class_nodes[0].name

    except Exception as e:
        return False, f"❌ 静态检测失败：{str(e)}"

    # ---------- 动态执行并测试 ----------
    try:
        # 创建临时模块文件
        temp_path = tempfile.NamedTemporaryFile(delete=False, suffix=".py")
        temp_path.write(code.encode())
        temp_path.close()

        # 加载临时模块
        module_name = f"user_strategy_{uuid.uuid4().hex}"
        spec = importlib.util.spec_from_file_location(module_name, temp_path.name)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        StrategyClass = getattr(module, class_name)

        # 构造最小测试输入
        df = pd.DataFrame({
            "timestamp": pd.date_range("2024-01-01", periods=50, freq="5min"),
            "open": [100]*50,
            "high": [101]*50,
            "low": [99]*50,
            "close": [100]*50
        })

        # 初始化策略（自动探测可用参数）
        import inspect
        init_args = inspect.signature(StrategyClass.__init__).parameters
        init_kwargs = {k: 10 for k in init_args if k not in ['self', 'df']}
        init_kwargs['df'] = df
        strategy = StrategyClass(**init_kwargs)

        # 尝试调用 generate_signal
        result = strategy.generate_signal(index=30, current_balance=10000, leverage=1.0, current_position=0)
        print("🧪 生成信号结果：", result)
        print("🧪 tp类型 =", type(tp), "值 =", tp)
        print("🧪 sl类型 =", type(sl), "值 =", sl)
        print("🧪 exit_signal类型 =", type(exit_signal), "值 =", exit_signal)

        # ---------- 格式检查 ----------
        if not isinstance(result, tuple) or len(result) != 5:
            return False, f"❌ 返回值必须是长度为5的tuple，而不是：{result}"

        direction, tp, sl, size, exit_signal = result

        if direction not in [-1, 0, 1]:
            return False, f"❌ direction 取值必须是 -1, 0, 1：当前是 {direction}"
        if not isinstance(size, (int, float)) or size < 0:
            return False, f"❌ position_size 应该是非负数：当前是 {size}"
        if not isinstance(exit_signal, bool):
            return False, f"❌ exit_signal 应为 bool 类型：当前是 {exit_signal}"

        # ---------- 类型判断修复 ----------
        has_tp = tp is not None
        has_sl = sl is not None
        is_exit_signal_true = bool(exit_signal) is True

        if not has_tp and not has_sl:
            # 类型二：无止盈止损策略（exit_signal 可为 True/False）
            return True, f"✅ 检测通过（无止盈止损策略），类名：{class_name}"
        elif has_tp and has_sl and not is_exit_signal_true:
            # 类型一：止盈止损策略（exit_signal 必须是 False）
            return True, f"✅ 检测通过（止盈止损策略），类名：{class_name}"
        else:
            return False, f"❌ 返回格式不合法：止盈/止损/exit_signal 不符合任一策略类型"


    except Exception as e:
        tb = traceback.format_exc()
        return False, f"❌ 动态执行失败：\n{str(e)}\n{tb}"

In [13]:
# A valid example of a "no take-profit / stop-loss" strategy, kept as source
# text so it can be passed to is_valid_strategy_code(). The class signals on
# the bar's low/high crossing a rolling mean of close and always returns
# None for both take-profit and stop-loss.
valid_strategy_code = '''
import pandas as pd

class Ma20Strategy:
    def __init__(self, df: pd.DataFrame, ma_length: int = 20, position_ratio: float = 0.5):
        self.df = df.copy()
        self.ma_length = ma_length
        self.warmup_period = ma_length
        self.position_ratio = position_ratio
        self.df['ma'] = self.df['close'].rolling(self.ma_length).mean()

    def generate_signal(self, index: int, current_balance: float, leverage: float = 1.0, current_position: int = 0):
        if index < self.ma_length:
            return (0, None, None, 0, False)
        row = self.df.iloc[index]
        prev = self.df.iloc[index - 1]
        if pd.isna(row['ma']) or pd.isna(prev['ma']):
            return (0, None, None, 0, False)
        long_condition = (prev['low'] <= prev['ma']) and (row['low'] > row['ma'])
        short_condition = (prev['high'] >= prev['ma']) and (row['high'] < row['ma'])
        if long_condition:
            direction = 1
            exit_signal = True
        elif short_condition:
            direction = -1
            exit_signal = True
        else:
            return (0, None, None, 0, False)
        if current_position == direction:
            return (0, None, None, 0, False)
        entry_price = row['close']
        nominal_value = current_balance * self.position_ratio * leverage
        position_size = nominal_value / entry_price
        return (direction, None, None, position_size, exit_signal)
'''


In [14]:
# Run the validator on the sample strategy and show the verdict and message.
is_valid, msg = is_valid_strategy_code(valid_strategy_code)
for caption, shown in (("校验结果：", is_valid), ("信息：", msg)):
    print(caption, shown)


校验结果： True
信息： ✅ 检测通过（无止盈止损策略），类名：Ma20Strategy


In [15]:
# Fixture: source text of a strategy that never sets take-profit or stop-loss
# (both tuple slots are always None) and sets exit_signal=True when it emits
# a long or short signal.
no_tp_sl_strategy_code = """
import pandas as pd

class Ma20Strategy:
    def __init__(self, df: pd.DataFrame, ma_length: int = 20, position_ratio: float = 0.5):
        self.df = df.copy()
        self.ma_length = ma_length
        self.warmup_period = ma_length
        self.position_ratio = position_ratio
        self.df['ma'] = self.df['close'].rolling(self.ma_length).mean()

    def generate_signal(self, index: int, current_balance: float, leverage: float = 1.0, current_position: int = 0):
        if index < self.ma_length:
            return (0, None, None, 0, False)

        row = self.df.iloc[index]
        prev = self.df.iloc[index - 1]

        if pd.isna(row['ma']) or pd.isna(prev['ma']):
            return (0, None, None, 0, False)

        long_condition = (prev['low'] <= prev['ma']) and (row['low'] > row['ma'])
        short_condition = (prev['high'] >= prev['ma']) and (row['high'] < row['ma'])

        if long_condition:
            direction = 1
            exit_signal = True
        elif short_condition:
            direction = -1
            exit_signal = True
        else:
            return (0, None, None, 0, False)

        if current_position == direction:
            return (0, None, None, 0, False)

        entry_price = row['close']
        nominal_value = current_balance * self.position_ratio * leverage
        position_size = nominal_value / entry_price
        return (direction, None, None, position_size, exit_signal)
"""


In [16]:
# Fixture: source text of a fast/slow moving-average crossover strategy that
# returns concrete take-profit and stop-loss prices with exit_signal=False
# whenever it emits a long or short signal.
with_tp_sl_strategy_code = """
import pandas as pd

class DualMaStrategy:
    def __init__(self, df: pd.DataFrame, fast_ma: int = 5, slow_ma: int = 20, position_ratio: float = 0.6, tp_rate: float = 0.02, sl_rate: float = 0.01):
        self.df = df.copy()
        self.fast_ma = fast_ma
        self.slow_ma = slow_ma
        self.position_ratio = position_ratio
        self.tp_rate = tp_rate
        self.sl_rate = sl_rate
        self.df['fast_ma'] = self.df['close'].rolling(self.fast_ma).mean()
        self.df['slow_ma'] = self.df['close'].rolling(self.slow_ma).mean()
        self.warmup_period = max(self.fast_ma, self.slow_ma)

    def generate_signal(self, index: int, current_balance: float, leverage: float = 1.0, current_position: int = 0):
        if index < self.slow_ma:
            return (0, None, None, 0, False)

        row = self.df.iloc[index]
        prev = self.df.iloc[index - 1]

        if pd.isna(row['fast_ma']) or pd.isna(row['slow_ma']) or pd.isna(prev['fast_ma']) or pd.isna(prev['slow_ma']):
            return (0, None, None, 0, False)

        long_condition = prev['fast_ma'] <= prev['slow_ma'] and row['fast_ma'] > row['slow_ma']
        short_condition = prev['fast_ma'] >= prev['slow_ma'] and row['fast_ma'] < row['slow_ma']

        if long_condition:
            direction = 1
        elif short_condition:
            direction = -1
        else:
            return (0, None, None, 0, False)

        if current_position == direction:
            return (0, None, None, 0, False)

        entry_price = row['close']
        nominal_value = current_balance * self.position_ratio * leverage
        position_size = nominal_value / entry_price

        if direction == 1:
            take_profit = entry_price * (1 + self.tp_rate)
            stop_loss = entry_price * (1 - self.sl_rate)
        else:
            take_profit = entry_price * (1 - self.tp_rate)
            stop_loss = entry_price * (1 + self.sl_rate)

        return (direction, take_profit, stop_loss, position_size, False)
"""


In [None]:
import ast
import tempfile
import importlib.util
import uuid
import traceback
import pandas as pd
import inspect

def is_valid_strategy_code(code: str):
    try:
        tree = ast.parse(code)
        class_nodes = [n for n in tree.body if isinstance(n, ast.ClassDef)]
        if not class_nodes:
            return False, "❌ 未定义类"

        for cls in class_nodes:
            method_names = [n.name for n in cls.body if isinstance(n, ast.FunctionDef)]
            if "__init__" not in method_names:
                return False, f"❌ 类 {cls.name} 缺少 __init__ 方法"
            if "generate_signal" not in method_names:
                return False, f"❌ 类 {cls.name} 缺少 generate_signal 方法"

        class_name = class_nodes[0].name

    except Exception as e:
        return False, f"❌ 静态检测失败：{str(e)}"

    try:
        temp_path = tempfile.NamedTemporaryFile(delete=False, suffix=".py")
        temp_path.write(code.encode())
        temp_path.close()

        module_name = f"user_strategy_{uuid.uuid4().hex}"
        spec = importlib.util.spec_from_file_location(module_name, temp_path.name)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        StrategyClass = getattr(module, class_name)

        df = pd.DataFrame({
            "timestamp": pd.date_range("2024-01-01", periods=50, freq="5min"),
            "open": [100] * 50,
            "high": [101] * 50,
            "low": [99] * 50,
            "close": [100] * 50
        })

        init_args = inspect.signature(StrategyClass.__init__).parameters
        init_kwargs = {k: 10 for k in init_args if k not in ['self', 'df']}
        init_kwargs['df'] = df
        strategy = StrategyClass(**init_kwargs)

        # 尝试多次 generate_signal 直到返回有效信号
        result = None
        for i in range(30, 50):
            result = strategy.generate_signal(index=i, current_balance=10000, leverage=1.0, current_position=0)
            if not isinstance(result, tuple) or len(result) != 5:
                return False, f"❌ 返回值必须是长度为5的tuple，而不是：{result}"
            direction, tp, sl, size, exit_signal = result
            if direction != 0:
                break
        else:
            return False, "❌ 在样本数据中未能触发有效交易信号，无法判断策略类型"


        print("🧪 生成信号结果：", result)

        if not isinstance(result, tuple) or len(result) != 5:
            return False, f"❌ 返回值必须是长度为5的tuple，而不是：{result}"

        # 解包
        direction, tp, sl, size, exit_signal = result

        # 基本类型判断
        if direction not in [-1, 0, 1]:
            return False, f"❌ direction 取值必须是 -1, 0, 1：当前是 {direction}"
        if not isinstance(size, (int, float)) or size < 0:
            return False, f"❌ position_size 应该是非负数：当前是 {size}"
        if not isinstance(exit_signal, bool):
            return False, f"❌ exit_signal 应为 bool 类型：当前是 {exit_signal}"

        # 止盈止损类型判断
        is_tp_none = tp is None
        is_sl_none = sl is None

        print(f"🧪 类型判断：tp={tp}, sl={sl}, exit_signal={exit_signal}")

        if is_tp_none and is_sl_none:
            return True, f"✅ 检测通过（无止盈止损策略），类名：{class_name}"
        elif not is_tp_none and not is_sl_none and exit_signal is False:
            return True, f"✅ 检测通过（止盈止损策略），类名：{class_name}"
        else:
            return False, "❌ 返回格式不合法：止盈/止损/exit_signal 不符合任一策略类型"

    except Exception as e:
        tb = traceback.format_exc()
        return False, f"❌ 动态执行失败：\n{str(e)}\n{tb}"


In [21]:
# Validate each sample strategy in turn and print its verdict and message.
for caption, strategy_source in (
    ("[无止盈止损策略] 检测结果：", no_tp_sl_strategy_code),
    ("[含止盈止损策略] 检测结果：", with_tp_sl_strategy_code),
):
    is_valid, msg = is_valid_strategy_code(strategy_source)
    print(caption, is_valid)
    print(msg)

🧪 生成信号结果： (0, None, None, 0, False)
🧪 类型判断：tp=None, sl=None, exit_signal=False
[无止盈止损策略] 检测结果： True
✅ 检测通过（无止盈止损策略），类名：Ma20Strategy
🧪 生成信号结果： (0, None, None, 0, False)
🧪 类型判断：tp=None, sl=None, exit_signal=False
[含止盈止损策略] 检测结果： True
✅ 检测通过（无止盈止损策略），类名：DualMaStrategy
