### 问题描述

1、背景：车端会采集整车数据到云端做分析，采集的动作是车端计算引擎实施，车端计算引擎的输入是JSON配置文件<u>（JSON配置文件由算子、信号等构成一系列触发规则）</u>，为确保车端计算引擎可支持正确按配置文件执行，通常会做模拟测试：

2、模拟测试说明：模拟JSON配置文件策略（策略需支持28+算子，信号1000+），模拟时需保策略的完备性、尽可能覆盖所有场景，充分保证车型可按照配置文件正确上报；

JSON策略例如：
- 1、A信号由非2跳变到2且A连续三帧=2同时B由非2跳变到2且B连续三帧=2
- 2、A=1持续6帧且该信号告警标志位置于FALSE，触发告警
- 3、A=5，且持续时长>15s，且当前告警标志位为FALSE

车端算子举例：Filter，Downsample，Dynamize，Splitter，Expand，Project，StreamAggregate，Hopping Window，Pattern Window

![车端算子举例](WechatIMG40.jpg)

3、要求：通过模型，自动生成配置文件的数据采集策略，以及一段时间的测试数据，期望在较低代价的情况下（非穷举方法），保证生成的采集策略尽可能完备；

### 一、开始建模分析

#### 1.1、算子类、信号类定义

In [None]:
# 车端计算引擎的输入是JSON配置文件（JSON配置文件由算子、信号等构成一系列触发规则），为确保车端计算引擎可支持正确按配置文件执行，通常会做模拟测试：

# 定义算子类
class Operator:
    def __init__(self, name, description):
        self.name = name
        self.description = description

# 定义信号类
class Signal:
    def __init__(self, name, value_range):
        self.name = name
        self.value_range = value_range

# 定义算子
operators = [
    Operator('Filter', '基于条件过滤信号'),
    Operator('Downsample', '消减信号的采样'),
    Operator('Dynamize', '算子将数据中含空值的行去除，可以指定检查列'),
    Operator('Splitter', '将信号拆分为多个流'),
    Operator('Expand', ''),
    Operator('Project', ''),
    Operator('StreamAggregate', ''),
    Operator('HoppingWindow', '跳跃滑动窗口'),
    Operator('PatternWindow', '模式匹配窗口')
]

# 定义信号, 后续信号可扩展到1000+，示例为3种不同类型的信号
signals = [
    Signal('A', (1, 10)),  # 整型信号
    Signal('B', (0.0, 10.0)),  # 浮点型信号
    Signal('C', (True, False)),  # 布尔型信号
]

#### 1.2、构造数据输入（车端计算引擎的输入是JSON配置文件）
生成的策略需要覆盖以下方面：
- **信号触发条件**：每个信号的触发条件（如某值持续时间、跳变等）。
- **算子组合**：对信号的算子处理（如Filter, Downsample等）。
- **触发规则**：多信号组合条件下的复杂触发规则。

In [None]:
import json
import random

# 生成随机整数
def generate_random_int(value_range):
    return random.randint(value_range[0], value_range[1])

# 生成随机浮点数
def generate_random_float(value_range):
    return random.uniform(value_range[0], value_range[1])

# 生成随机布尔值
def generate_random_bool():
    return random.choice([True, False])

# 生成随机测试数据
def generate_test_data(signal, length):
    data = []
    for _ in range(length):
        if type(signal.value_range[0]) == int:
            data.append(generate_random_int(signal.value_range))
        elif type(signal.value_range[0]) == float:
            data.append(generate_random_float(signal.value_range))
        elif type(signal.value_range[0]) == bool:
            data.append(generate_random_bool())
        else:
            pass
    return data

def generate_signal_sequence(signal, length, jump_probability=0.2):
    # 生成一个特定信号的时间序列，包含信号随机跳变
    sequence = []
    current_value = generate_test_data(signal, length)
    # print(generate_test_data(signal, length))
    for _ in range(length):
        if random.random() < jump_probability:    # 信号随机跳变
            current_value = generate_test_data(signal, length)
        sequence.append(current_value)
    return sequence

# 测试模拟数据
def simulate_test_data(num_samples=100, sequence_length=10):
    test_data = []
    signal_list = [random.choice(signals) for _ in range(num_samples)]

    for _ in range(num_samples):
        data_point = {}
        for signal in signal_list:
            sequence = generate_signal_sequence(signal, sequence_length)
            # print(sequence)
            data_point[signal.name] = sequence
        test_data.append(data_point)
    
    return test_data

# 单测：输出构造的信号
simulate_test_data(4, 3)

### 二、模型的构建及运行

#### 2.1、 模型构建（强化学习生成策略）
采用强化学习的方法，模型可以通过不断尝试生成不同的策略来优化其有效性和完备性。
- **状态空间**：当前配置的状态（如信号的状态，算子配置等）。
- **动作空间**：生成或修改一条新的策略规则。
- **奖励机制**：对每个生成的策略进行模拟测试，评价其完备性和有效性，给予对应的奖励。


In [54]:
int(True) == int(False)

False

In [57]:
def check_condition(data_point, condition):
    # print(data_point, condition)
    """检查单个数据点是否满足条件，例如 'A=2'"""
    signal, expected_value = condition.split("=")
    try:
        res = 0.0
        total = 0.0
        for point_list in data_point.get(signal):
            total += len(point_list)
            
            for signal_value in point_list:
                if (int(signal_value) == int(expected_value)):
                    res += 1.0
    except:
        print(condition, signal_value, expected_value)
    return res / total

def calculate_coverage(strategy, test_data):
    """计算策略的覆盖率，即策略在测试数据中触发的比例"""
    covered_points = sum(
        all(check_condition(data_point, rule["condition"]) for rule in strategy)
        for data_point in test_data
    )
    return covered_points / len(test_data)

# 定义奖励函数（以策略覆盖率作为奖励）
def reward_function(strategy, test_data):
    coverage = calculate_coverage(strategy, test_data)
    return coverage


### 2.2. 模型运行测试和优化
- **模型测试覆盖率评估**：计算生成策略在测试数据中的触发情况和覆盖率。

- **策略优化**：根据测试结果，通过调整生成策略的模型参数，逐步优化生成的策略，提高其完备性和有效性。

In [58]:
from tqdm import tqdm

# 强化学习策略生成
def generate_strategy(strategy_nums=10):
    strategy = []
    for _ in range(random.randint(1, 10)): # 随机生成1到10条策略规则
        signal = random.choice([random.choice(signals) for _ in range(strategy_nums)])
        operator = random.choice([random.choice(operators) for _ in range(strategy_nums)])
        # print(operators)
        condition = f"{signal.name}={random.randint(1, 5)}" 
        # 假设条件为A~C信号值的简单比较
        if type(signal.value_range[0]) == int:
            condition = f"{signal.name}={generate_random_int(signal.value_range)}" 
        elif type(signal.value_range[0]) == float:
            condition = f"{signal.name}={generate_random_float(signal.value_range)}"
        elif type(signal.value_range[0]) == bool:
            condition = f"{signal.name}={generate_random_bool()}"
        else:
            pass
        strategy.append({
            "operator": operator.name,
            "condition": condition
        })
    return strategy

# 评估策略
def evaluate_strategy(strategy):
    test_data = simulate_test_data(10, 10)
    # print(test_data)
    return reward_function(strategy, test_data)

# main 模型函数入口
best_strategy = {}
best_reward = -float('inf')
for _ in tqdm(range(2)): # 迭代2024次
    strategy = generate_strategy(3)
    reward = evaluate_strategy(strategy)
    # print(f"策略：{strategy} ---> 覆盖率分数（奖励函数输出）{reward}")
    if reward > best_reward:
        best_strategy = strategy
        best_reward = reward

# 输出最佳策略
print("Best Strategy:", json.dumps(best_strategy, indent=4))
print("Best Reward:", best_reward)

100%|██████████| 2/2 [00:00<00:00, 413.29it/s]

B=4.858653588784955 2.506385908016442 4.858653588784955
B=4.858653588784955 9.939266348258881 4.858653588784955
B=4.858653588784955 1.6383849454037813 4.858653588784955
B=4.858653588784955 6.18283963781244 4.858653588784955
B=4.858653588784955 3.758815795454684 4.858653588784955
B=4.858653588784955 3.931437148570006 4.858653588784955
B=4.858653588784955 6.072572818239649 4.858653588784955
B=4.858653588784955 5.542116175016777 4.858653588784955
B=4.858653588784955 6.849884402837309 4.858653588784955
B=4.858653588784955 2.382751023789883 4.858653588784955
C=False False False
C=False False False
C=False True False
C=False True False
C=False False False
C=False False False
C=False True False
C=False True False
C=False False False
C=False False False
Best Strategy: [
    {
        "operator": "PatternWindow",
        "condition": "B=4.858653588784955"
    },
    {
        "operator": "HoppingWindow",
        "condition": "C=True"
    }
]
Best Reward: 0.0





### 三、总结

通过上述强化学习的方式，可以生成并动态优化数据采集策略，确保在各种测试数据场景下的策略有效性和完备性，避免穷举法带来的时间复杂度。</br>
后续优化方向有如下三点：
- 优化数据采集的策略，可以引入遗传算法或更复杂的强化学习模型（如深度Q学习：DQN）自动学习最优的数据采集行为；
- 优化策略生成函数generate_strategy，更符合模拟信号策略的真实数据，确保生成的策略在实际应用中的有效性；
- 优化奖励函数reward_function，更精确地衡量策略的完备性。
