In [1]:
# Time Synchronization Simulation for IEEE 802.1AS in IEC/IEEE 60802
# Based on analysis of McCall et al. documents (2021-2022)

import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
import random
import os
import pandas as pd
import seaborn as sns
from matplotlib.ticker import PercentFormatter

@dataclass
class SimulationParameters:
    """时间同步仿真的参数"""
    # 网络配置
    num_hops: int = 100  # 链中的跳数
    num_runs: int = 10000  # 蒙特卡洛运行次数
    
    # 时钟特性
    gm_clock_drift_max: float = 1.5  # 最大GM时钟漂移（ppm/s）
    gm_clock_drift_min: float = -1.5  # 最小GM时钟漂移（ppm/s）
    gm_clock_drift_fraction: float = 0.8  # 具有漂移的GM节点比例
    
    clock_drift_max: float = 1.5  # 最大非GM时钟漂移（ppm/s）
    clock_drift_min: float = -1.5  # 最小非GM时钟漂移（ppm/s）
    clock_drift_fraction: float = 0.8  # 具有漂移的非GM节点比例
    
    # 时间戳误差特性
    tsge_tx: float = 4.0  # TX时间戳粒度误差（±ns）
    tsge_rx: float = 4.0  # RX时间戳粒度误差（±ns）
    dtse_tx: float = 4.0  # TX动态时间戳误差（±ns）
    dtse_rx: float = 4.0  # RX动态时间戳误差（±ns）
    
    # 消息间隔
    pdelay_interval: float = 125.0  # pDelay消息间隔（ms）
    sync_interval: float = 125.0  # 同步消息间隔（ms）
    pdelay_turnaround: float = 10.0  # pDelay响应时间（ms）
    residence_time: float = 10.0  # 节点内驻留时间（ms）
    
    # 校正因子
    mean_link_delay_correction: float = 0.98  # 平均链路延迟平均的有效性
    nrr_drift_correction: float = 0.90  # NRR漂移校正有效性
    rr_drift_correction: float = 0.90  # RR漂移校正有效性
    pdelayresp_sync_correction: float = 0.0  # pDelay响应到同步的对齐因子
    
    # NRR平滑参数
    mnrr_smoothing_n: int = 3  # 使用的先前pDelayResp数量
    mnrr_smoothing_m: int = 1  # 用于中值计算（在推荐设置中未使用）

@dataclass
class NodeState:
    """链中节点的状态"""
    # 时钟相关状态
    clock_drift: float = 0.0  # 时钟漂移率（ppm/s）
    
    # 时间戳误差
    t1_pderror: float = 0.0  # pDelay请求的TX时间戳误差
    t2_pderror: float = 0.0  # pDelay请求的RX时间戳误差
    t3_pderror: float = 0.0  # pDelay响应的TX时间戳误差
    t4_pderror: float = 0.0  # pDelay响应的RX时间戳误差
    t3_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t3误差
    t4_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t4误差
    
    t2_sinerror: float = 0.0  # 同步的RX时间戳误差
    t1_souterror: float = 0.0  # 同步的TX时间戳误差
    
    # 误差累积
    mnrr_error: float = 0.0  # 邻居速率比误差
    mnrr_error_ts: float = 0.0  # 由时间戳误差导致的NRR误差
    mnrr_error_cd: float = 0.0  # 由时钟漂移导致的NRR误差
    
    rr_error: float = 0.0  # 速率比误差
    rr_error_sum: float = 0.0  # 累积的RR误差
    
    mean_link_delay_error: float = 0.0  # 链路延迟测量误差
    residence_time_error: float = 0.0  # 驻留时间测量误差
    
    dte: float = 0.0  # 该节点的动态时间误差

class TimeSyncSimulation:
    """IEEE 802.1AS 在 IEC/IEEE 60802 中的时间同步仿真"""
    
    def __init__(self, params: SimulationParameters):
        self.params = params
        self.results = {
            'dte_max': [],            # 所有运行中的最大DTE
            'dte_7sigma': [],         # DTE的7-sigma值
            'dte_per_hop': np.zeros((params.num_runs, params.num_hops))  # 每次运行中每个跳的DTE
        }
        
        # 创建输出目录
        self.output_data_dir = 'output_data'
        self.output_image_dir = 'output_image'
        os.makedirs(self.output_data_dir, exist_ok=True)
        os.makedirs(self.output_image_dir, exist_ok=True)
    
    def generate_timestamp_error(self, is_tx: bool) -> float:
        """根据参数生成随机时间戳误差"""
        if is_tx:
            tsge = np.random.uniform(-self.params.tsge_tx, self.params.tsge_tx)
            dtse = np.random.uniform(-self.params.dtse_tx, self.params.dtse_tx)
        else:
            tsge = np.random.uniform(-self.params.tsge_rx, self.params.tsge_rx)
            dtse = np.random.uniform(-self.params.dtse_rx, self.params.dtse_rx)
        return tsge + dtse
    
    def generate_clock_drift(self, is_gm: bool) -> float:
        """根据参数生成随机时钟漂移"""
        if is_gm:
            if np.random.random() <= self.params.gm_clock_drift_fraction:
                return np.random.uniform(self.params.gm_clock_drift_min, self.params.gm_clock_drift_max)
            return 0.0
        else:
            if np.random.random() <= self.params.clock_drift_fraction:
                return np.random.uniform(self.params.clock_drift_min, self.params.clock_drift_max)
            return 0.0

    def generate_pdelay_interval(self) -> float:
        """在规格范围内生成随机pDelay间隔"""
        return np.random.uniform(0.9 * self.params.pdelay_interval, 
                                1.3 * self.params.pdelay_interval)
        
    def run_simulation(self):
        """运行时间同步仿真"""
        for run in range(self.params.num_runs):
            # 为新的运行重置
            nodes = [NodeState() for _ in range(self.params.num_hops + 1)]  # +1 为GM
            
            # 为所有节点生成时钟漂移
            nodes[0].clock_drift = self.generate_clock_drift(is_gm=True)  # GM
            for i in range(1, self.params.num_hops + 1):
                nodes[i].clock_drift = self.generate_clock_drift(is_gm=False)
            
            # 计算所有跳的误差
            dte = 0.0
            for hop in range(1, self.params.num_hops + 1):
                # 生成时间戳误差
                nodes[hop].t1_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t3_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t4_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t1_souterror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_sinerror = self.generate_timestamp_error(is_tx=False)
                
                # 为NRR计算生成先前的时间戳
                for n in range(1, self.params.mnrr_smoothing_n):
                    nodes[hop].t3_pderror_prev.append(self.generate_timestamp_error(is_tx=True))
                    nodes[hop].t4_pderror_prev.append(self.generate_timestamp_error(is_tx=False))
                
                # 计算NRR误差组件
                self.calculate_mnrr_errors(nodes, hop)
                
                # 计算RR误差
                if hop == 1:
                    nodes[hop].rr_error = nodes[hop].mnrr_error
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                else:
                    # 添加由NRR测量到同步之间的时钟漂移引起的RR误差
                    pdelay_to_sync = np.random.uniform(0, self.params.pdelay_interval) * (1.0 - self.params.pdelayresp_sync_correction)
                    rr_error_cd_nrr2sync = (pdelay_to_sync * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / 1000) * (1.0 - self.params.nrr_drift_correction)
                    
                    # 添加由上游RR计算到同步之间的时钟漂移引起的RR误差
                    rr_error_cd_rr2sync = (self.params.residence_time * (nodes[hop-1].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
                    
                    # 累积RR误差
                    nodes[hop].rr_error = nodes[hop-1].rr_error + nodes[hop].mnrr_error + rr_error_cd_nrr2sync + rr_error_cd_rr2sync
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                
                # 计算平均链路延迟误差
                pdelay_error_ts = (nodes[hop].t4_pderror - nodes[hop].t1_pderror - nodes[hop].t3_pderror + nodes[hop].t2_pderror) / 2
                pdelay_error_ts *= (1.0 - self.params.mean_link_delay_correction)
                
                pdelay_error_nrr = -self.params.pdelay_turnaround * nodes[hop].mnrr_error / 2
                pdelay_error_nrr *= (1.0 - self.params.mean_link_delay_correction)
                
                nodes[hop].mean_link_delay_error = pdelay_error_ts + pdelay_error_nrr
                
                # 计算驻留时间误差或终端站误差
                if hop < self.params.num_hops:  # 不是最后一跳
                    # 驻留时间误差组件
                    rt_error_ts_direct = nodes[hop].t1_souterror - nodes[hop].t2_sinerror
                    rt_error_rr = self.params.residence_time * nodes[hop].rr_error
                    rt_error_cd_direct = (self.params.residence_time**2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / (2 * 1000)) * (1.0 - self.params.rr_drift_correction)
                    
                    nodes[hop].residence_time_error = rt_error_ts_direct + rt_error_rr + rt_error_cd_direct
                    nodes[hop].dte = dte + nodes[hop].mean_link_delay_error + nodes[hop].residence_time_error
                else:  # 最后一跳（终端站）
                    # 终端站误差组件
                    sync_interval = self.params.sync_interval
                    es_error_rr = sync_interval * nodes[hop].rr_error
                    es_error_cd_direct = (sync_interval/2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
                    
                    end_station_error = es_error_rr + es_error_cd_direct
                    nodes[hop].dte = dte + nodes[hop].mean_link_delay_error + end_station_error
                
                # 更新下一跳的累积DTE
                dte = nodes[hop].dte
                
                # 存储结果
                self.results['dte_per_hop'][run, hop-1] = dte
            
            # 计算完所有跳后，存储此次运行的最大DTE
            if run == 0 or abs(dte) > self.results['dte_max'][-1]:
                self.results['dte_max'].append(abs(dte))
        
        # 计算7-sigma DTE（比最大值更具统计代表性）
        for hop in range(self.params.num_hops):
            dte_at_hop = self.results['dte_per_hop'][:, hop]
            self.results['dte_7sigma'].append(np.std(dte_at_hop) * 7)
        
        # 保存数据到CSV文件
        self.save_results_to_csv()
    
    def calculate_mnrr_errors(self, nodes: List[NodeState], hop: int):
        """计算给定跳的mNRR误差组件"""
        # 基于mNRR平滑计算有效pDelay间隔
        tpdelay2pdelay = 0
        for n in range(self.params.mnrr_smoothing_n):
            tpdelay2pdelay += self.generate_pdelay_interval()
        
        # 计算由时间戳引起的mNRR误差
        if self.params.mnrr_smoothing_n > 1 and len(nodes[hop].t3_pderror_prev) >= self.params.mnrr_smoothing_n - 1:
            # 使用先前的时间戳进行NRR计算
            t3pd_diff = nodes[hop].t3_pderror - nodes[hop].t3_pderror_prev[-1]
            t4pd_diff = nodes[hop].t4_pderror - nodes[hop].t4_pderror_prev[-1]
        else:
            # 使用最近的时间戳进行默认计算
            t3pd_diff = nodes[hop].t3_pderror - 0  # 假设先前样本的误差为0（简化）
            t4pd_diff = nodes[hop].t4_pderror - 0
        
        nodes[hop].mnrr_error_ts = (t3pd_diff - t4pd_diff) / tpdelay2pdelay
        
        # 计算由时钟漂移引起的mNRR误差
        nodes[hop].mnrr_error_cd = (tpdelay2pdelay * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / (2 * 1000)) * (1.0 - self.params.nrr_drift_correction)
        
        # 总mNRR误差
        nodes[hop].mnrr_error = nodes[hop].mnrr_error_ts + nodes[hop].mnrr_error_cd
    
    def save_results_to_csv(self):
        """将所有节点的时间误差结果保存到CSV文件中"""
        # 创建一个包含所有跳的DTE数据的DataFrame
        all_dte_data = {}
        for hop in range(1, self.params.num_hops + 1):
            hop_data = self.results['dte_per_hop'][:, hop-1]
            all_dte_data[f'Hop_{hop}'] = hop_data
        
        df = pd.DataFrame(all_dte_data)
        
        # 保存到CSV文件
        df.to_csv(os.path.join(self.output_data_dir, 'dte_all_hops.csv'), index=False)
        
        # 保存7-sigma和统计数据
        stats_data = {
            'Hop': list(range(1, self.params.num_hops + 1)),
            'DTE_7sigma': self.results['dte_7sigma'],
            'DTE_Mean': [np.mean(self.results['dte_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'DTE_Std': [np.std(self.results['dte_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'DTE_Min': [np.min(self.results['dte_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'DTE_Max': [np.max(self.results['dte_per_hop'][:, i]) for i in range(self.params.num_hops)]
        }
        stats_df = pd.DataFrame(stats_data)
        stats_df.to_csv(os.path.join(self.output_data_dir, 'dte_statistics.csv'), index=False)
    
    def plot_results(self):
        """绘制仿真结果并保存图像"""
        # 1. 绘制最终跳的DTE分布
        self.plot_final_hop_distribution()
        
        # 2. 绘制DTE随跳数的增长
        self.plot_dte_growth()
        
        # 3. 绘制1-7跳的时间误差数据
        self.plot_first_seven_hops()
        
        # 4. 绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图
        self.plot_specific_hops_line_and_cdf()
    
    def plot_final_hop_distribution(self):
        """绘制最终跳的DTE分布"""
        plt.figure(figsize=(10, 6))
        final_hop_dte = self.results['dte_per_hop'][:, -1]
        plt.hist(final_hop_dte, bins=50, alpha=0.7)
        plt.axvline(x=self.results['dte_7sigma'][-1], color='r', linestyle='--', 
                   label=f'7σ: {self.results["dte_7sigma"][-1]:.1f} ns')
        plt.axvline(x=-self.results['dte_7sigma'][-1], color='r', linestyle='--')
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Dynamic Time Error (ns)')
        plt.ylabel('Count')
        plt.title(f'DTE Distribution at Hop {self.params.num_hops}')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'final_hop_dte_distribution.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_dte_growth(self):
        """绘制DTE随跳数的增长"""
        plt.figure(figsize=(10, 6))
        hops = np.arange(1, self.params.num_hops + 1)
        plt.plot(hops, self.results['dte_7sigma'], 'b-', label='7σ DTE')
        plt.axhline(y=1000, color='g', linestyle=':', label='±1μs target')
        plt.xlabel('Hop Number')
        plt.ylabel('Dynamic Time Error (ns)')
        plt.title('DTE Growth Across Hops (7σ values)')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'dte_growth_across_hops.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_first_seven_hops(self):
        """绘制1-7跳的时间误差数据"""
        plt.figure(figsize=(12, 8))
        
        # 绘制箱形图
        first_seven_hops_data = [self.results['dte_per_hop'][:, i] for i in range(7)]
        plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])
        plt.xlabel('Hop Number')
        plt.ylabel('Dynamic Time Error (ns)')
        plt.title('DTE Distribution for First 7 Hops')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_dte.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # 绘制小提琴图
        plt.figure(figsize=(12, 8))
        first_seven_df = pd.DataFrame({f'Hop {i+1}': self.results['dte_per_hop'][:, i] for i in range(7)})
        sns.violinplot(data=first_seven_df)
        plt.xlabel('Hop Number')
        plt.ylabel('Dynamic Time Error (ns)')
        plt.title('DTE Distribution for First 7 Hops (Violin Plot)')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_dte_violin.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_specific_hops_line_and_cdf(self):
        """绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图"""
        specific_hops = [10, 25, 50, 75, 100]
        specific_hops = [h for h in specific_hops if h <= self.params.num_hops]
        
        # 折线图 - 每次运行的DTE随时间的变化
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            # 选择100次运行进行可视化，否则图会太乱
            sample_runs = np.random.choice(self.params.num_runs, size=min(100, self.params.num_runs), replace=False)
            for run in sample_runs:
                if hop == specific_hops[0]:  # 只对第一个跳添加标签，避免重复
                    plt.plot(self.results['dte_per_hop'][run, :hop], alpha=0.1, color=f'C{specific_hops.index(hop)}')
            
            # 绘制平均值线
            mean_values = np.mean(self.results['dte_per_hop'][:, :hop], axis=0)
            plt.plot(mean_values, linewidth=2, label=f'Hop {hop} (avg)', color=f'C{specific_hops.index(hop)}')
        
        plt.xlabel('Hop Number')
        plt.ylabel('Dynamic Time Error (ns)')
        plt.title('DTE Development for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_dte_line.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # CDF图 - 最终DTE的累积分布
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            hop_data = self.results['dte_per_hop'][:, hop-1]
            sorted_data = np.sort(hop_data)
            # 计算每个值的CDF
            cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
            plt.plot(sorted_data, cdf, label=f'Hop {hop}')
        
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Dynamic Time Error (ns)')
        plt.ylabel('Cumulative Probability')
        plt.title('CDF of DTE for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_dte_cdf.png'), dpi=300, bbox_inches='tight')
        plt.close()

# 使用推荐参数运行仿真
def main():
    # 使用推荐设置创建参数
    params = SimulationParameters(
        num_hops=100,
        num_runs=1000,  # 为演示减少
        
        # 时钟特性
        gm_clock_drift_max=1,
        gm_clock_drift_min=-1,
        gm_clock_drift_fraction=0.8,
        clock_drift_max=1,
        clock_drift_min=-1,
        clock_drift_fraction=0.8,
        
        # 时间戳误差
        tsge_tx=4.0,
        tsge_rx=4.0,
        dtse_tx=4.0,
        dtse_rx=4.0,
        
        # 消息间隔
        pdelay_interval=125.0,
        sync_interval=125.0,
        pdelay_turnaround=0.0,
        residence_time=1.0,
        
        # 校正因子 - 推荐设置
        mean_link_delay_correction=0.98,
        nrr_drift_correction=0.90,
        rr_drift_correction=0.90,
        pdelayresp_sync_correction=0.0,
        mnrr_smoothing_n=3,
        mnrr_smoothing_m=1
    )
    
    # 创建并运行仿真
    sim = TimeSyncSimulation(params)
    print("Running simulation with recommended parameters...")
    sim.run_simulation()
    
    # 输出结果
    max_dte = max(sim.results['dte_max'])
    final_7sigma = sim.results['dte_7sigma'][-1]
    
    print(f"Simulation complete!")
    print(f"Maximum DTE: {max_dte:.1f} ns")
    print(f"7-sigma DTE at hop {params.num_hops}: {final_7sigma:.1f} ns")
    print(f"Target (<1000 ns): {'PASSED' if final_7sigma < 1000 else 'FAILED'}")
    
    # 绘制结果
    print("Generating plots...")
    sim.plot_results()
    print(f"Results saved to '{sim.output_data_dir}' and '{sim.output_image_dir}' directories.")

if __name__ == "__main__":
    main()

Running simulation with recommended parameters...
Simulation complete!
Maximum DTE: 158.9 ns
7-sigma DTE at hop 100: 380.7 ns
Target (<1000 ns): PASSED
Generating plots...


  plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])


Results saved to 'output_data' and 'output_image' directories.


In [4]:
import numpy as np
import matplotlib.pyplot as plt
import os
import pandas as pd
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
import random
from matplotlib.ticker import PercentFormatter

@dataclass
class SimulationParameters:
    """Parameters for the time synchronization simulation"""
    # Network configuration
    num_hops: int = 100  # Number of hops in the chain
    num_runs: int = 10000  # Number of Monte Carlo runs
    
    # Clock characteristics
    gm_clock_drift_max: float = 1.5  # Maximum GM clock drift in ppm/s
    gm_clock_drift_min: float = -1.5  # Minimum GM clock drift in ppm/s
    gm_clock_drift_fraction: float = 0.8  # Fraction of GM nodes with drift
    
    clock_drift_max: float = 1.5  # Maximum non-GM clock drift in ppm/s
    clock_drift_min: float = -1.5  # Minimum non-GM clock drift in ppm/s
    clock_drift_fraction: float = 0.8  # Fraction of non-GM nodes with drift
    
    # Temperature model parameters
    temp_max: float = 85.0  # Maximum temperature (°C)
    temp_min: float = -40.0  # Minimum temperature (°C)
    temp_ramp_rate: float = 1.0  # Temperature change rate (°C/s)
    temp_hold_period: float = 30.0  # Hold time at temperature extremes (s)
    gm_scaling_factor: float = 1.0  # GM drift scaling factor
    non_gm_scaling_factor: float = 1.0  # Non-GM drift scaling factor
    
    # Timestamp error characteristics
    tsge_tx: float = 4.0  # Timestamp Granularity Error for TX (±ns)
    tsge_rx: float = 4.0  # Timestamp Granularity Error for RX (±ns)
    dtse_tx: float = 4.0  # Dynamic Timestamp Error for TX (±ns)
    dtse_rx: float = 4.0  # Dynamic Timestamp Error for RX (±ns)
    
    # Message intervals
    pdelay_interval: float = 125.0  # pDelay message interval (ms)
    sync_interval: float = 125.0  # Sync message interval (ms)
    pdelay_turnaround: float = 10.0  # pDelay response time (ms)
    residence_time: float = 10.0  # Residence time within a node (ms)
    
    # Correction factors
    mean_link_delay_correction: float = 0.0  # Effectiveness of Mean Link Delay averaging
    nrr_drift_correction: float = 0.0  # NRR drift correction effectiveness
    rr_drift_correction: float = 0.0  # RR drift correction effectiveness
    pdelayresp_sync_correction: float = 0.0  # pDelay response-to-sync alignment factor
    
    # NRR smoothing parameters
    mnrr_smoothing_n: int = 1  # Number of previous pDelayResp to use
    mnrr_smoothing_m: int = 1  # For median calculation (not used in recommended settings)
    
    # Use temperature-based model or simple uniform model
    use_temperature_model: bool = True

@dataclass
class NodeState:
    """State of a node in the chain"""
    # Clock-related state
    clock_drift: float = 0.0  # Clock drift rate in ppm/s
    
    # Timestamp errors
    t1_pderror: float = 0.0  # TX timestamp error for pDelay request
    t2_pderror: float = 0.0  # RX timestamp error for pDelay request
    t3_pderror: float = 0.0  # TX timestamp error for pDelay response
    t4_pderror: float = 0.0  # RX timestamp error for pDelay response
    t3_pderror_prev: List[float] = field(default_factory=list)  # Previous t3 errors for NRR calculation
    t4_pderror_prev: List[float] = field(default_factory=list)  # Previous t4 errors for NRR calculation
    
    t2_sinerror: float = 0.0  # RX timestamp error for Sync
    t1_souterror: float = 0.0  # TX timestamp error for Sync
    
    # Error accumulation
    mnrr_error: float = 0.0  # Neighbor Rate Ratio error
    mnrr_error_ts: float = 0.0  # NRR error due to timestamp errors
    mnrr_error_cd: float = 0.0  # NRR error due to clock drift
    
    rr_error: float = 0.0  # Rate Ratio error
    rr_error_sum: float = 0.0  # Accumulated RR error
    rr_error_components: Dict[str, float] = field(default_factory=dict)  # Components of RR error
    
    mean_link_delay_error: float = 0.0  # Link delay measurement error
    mean_link_delay_components: Dict[str, float] = field(default_factory=dict)  # Components of MLD error
    
    residence_time_error: float = 0.0  # Residence time measurement error
    residence_time_components: Dict[str, float] = field(default_factory=dict)  # Components of RT error
    
    end_station_error: float = 0.0  # End station error (only for last hop)
    end_station_components: Dict[str, float] = field(default_factory=dict)  # Components of ES error
    
    te: float = 0.0  # Dynamic Time Error at this node

class TimeSyncSimulation:
    """Time synchronization simulation for IEEE 802.1AS in IEC/IEEE 60802"""
    
    def __init__(self, params: SimulationParameters):
        self.params = params
        self.results = {
            'te_max': [],            # Maximum te across all runs
            'te_7sigma': [],         # 7-sigma value of te
            'te_per_hop': np.zeros((params.num_runs, params.num_hops))  # DTE at each hop for each run
        }
        
        # Create output directories if they don't exist
        os.makedirs('output_data', exist_ok=True)
        os.makedirs('output_image', exist_ok=True)
    
    def generate_timestamp_error(self, is_tx: bool) -> float:
        """Generate a random timestamp error based on parameters"""
        if is_tx:
            tsge = np.random.uniform(-self.params.tsge_tx, self.params.tsge_tx)
            dtse = np.random.uniform(-self.params.dtse_tx, self.params.dtse_tx)
        else:
            tsge = np.random.uniform(-self.params.tsge_rx, self.params.tsge_rx)
            dtse = np.random.uniform(-self.params.dtse_rx, self.params.dtse_rx)
        return tsge + dtse
    
    def generate_clock_drift(self, is_gm: bool) -> float:
        """Generate random clock drift based on parameters"""
        if self.params.use_temperature_model:
            return self.generate_clock_drift_temperature_based(is_gm)
        else:
            # Simple uniform distribution model
            if is_gm:
                if np.random.random() <= self.params.gm_clock_drift_fraction:
                    return np.random.uniform(self.params.gm_clock_drift_min, self.params.gm_clock_drift_max)
                return 0.0
            else:
                if np.random.random() <= self.params.clock_drift_fraction:
                    return np.random.uniform(self.params.clock_drift_min, self.params.clock_drift_max)
                return 0.0
                
    def generate_clock_drift_temperature_based(self, is_gm: bool) -> float:
        """Generate clock drift based on temperature model"""
        # Determine if this node should have drift
        if (is_gm and np.random.random() > self.params.gm_clock_drift_fraction) or \
           (not is_gm and np.random.random() > self.params.clock_drift_fraction):
            return 0.0
        
        # Calculate temperature cycle parameters
        temp_cycle_period = ((self.params.temp_max - self.params.temp_min) / self.params.temp_ramp_rate) * 2 + \
                            2 * self.params.temp_hold_period
        
        # Random point in temperature cycle
        t = np.random.uniform(0, temp_cycle_period)
        
        # Find section boundaries
        section_a = (self.params.temp_max - self.params.temp_min) / self.params.temp_ramp_rate
        section_b = section_a + self.params.temp_hold_period
        section_c = section_b + section_a
        
        # Calculate temperature and temperature rate of change
        if t < section_a:
            # Ramp up
            temp_xo = self.params.temp_min + self.params.temp_ramp_rate * t
            temp_roc = self.params.temp_ramp_rate
        elif t < section_b:
            # Hold at max
            temp_xo = self.params.temp_max
            temp_roc = 0
        elif t < section_c:
            # Ramp down
            temp_xo = self.params.temp_max - self.params.temp_ramp_rate * (t - section_b)
            temp_roc = -self.params.temp_ramp_rate
        else:
            # Hold at min
            temp_xo = self.params.temp_min
            temp_roc = 0
        
        # Cubic model constants
        a, b, c, d = 0.00012, -0.01005, -0.0305, 5.73845
        
        # Calculate clock drift
        clock_drift = (3 * a * temp_xo**2 + 2 * b * temp_xo + c) * temp_roc
        
        # Apply scaling factor
        if is_gm:
            clock_drift *= self.params.gm_scaling_factor
        else:
            clock_drift *= self.params.non_gm_scaling_factor
            
        return clock_drift

    def generate_pdelay_interval(self) -> float:
        """Generate random pDelay interval within spec"""
        return np.random.uniform(0.9 * self.params.pdelay_interval, 
                                1.3 * self.params.pdelay_interval)
        
    def run_simulation(self):
        """Run the time sync simulation"""
        for run in range(self.params.num_runs):
            if run % 1000 == 0:
                print(f"Running simulation {run}/{self.params.num_runs}...")
                
            # Reset for new run
            nodes = [NodeState() for _ in range(self.params.num_hops + 1)]  # +1 for GM
            
            # Generate clock drifts for all nodes
            nodes[0].clock_drift = self.generate_clock_drift(is_gm=True)  # GM
            for i in range(1, self.params.num_hops + 1):
                nodes[i].clock_drift = self.generate_clock_drift(is_gm=False)
            
            # Calculate errors across all hops
            te = 0.0
            for hop in range(1, self.params.num_hops + 1):
                # Generate timestamp errors
                nodes[hop].t1_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t3_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t4_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t1_souterror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_sinerror = self.generate_timestamp_error(is_tx=False)
                
                # Generate previous timestamps for NRR calculation
                for n in range(1, self.params.mnrr_smoothing_n):
                    nodes[hop].t3_pderror_prev.append(self.generate_timestamp_error(is_tx=True))
                    nodes[hop].t4_pderror_prev.append(self.generate_timestamp_error(is_tx=False))
                
                # Calculate NRR error components
                self.calculate_mnrr_errors(nodes, hop)
                
                # Calculate RR error
                self.calculate_rr_error(nodes, hop)
                
                # Calculate Mean Link Delay error
                self.calculate_mean_link_delay_error(nodes, hop)
                
                # Calculate Residence Time error or End Station error
                if hop < self.params.num_hops:  # Not the last hop
                    self.calculate_residence_time_error(nodes, hop)
                    nodes[hop].te = te + nodes[hop].mean_link_delay_error + nodes[hop].residence_time_error
                else:  # Last hop (End Station)
                    self.calculate_end_station_error(nodes, hop)
                    nodes[hop].te = te + nodes[hop].mean_link_delay_error + nodes[hop].end_station_error
                
                # Update accumulated te for next hop
                te = nodes[hop].te
                
                # Store results
                self.results['te_per_hop'][run, hop-1] = te
            
        # Calculate statistics
        for hop in range(self.params.num_hops):
            te_at_hop = self.results['te_per_hop'][:, hop]
            max_abs_te = np.max(np.abs(te_at_hop))
            self.results['te_max'].append(max_abs_te)
            self.results['te_7sigma'].append(np.std(te_at_hop) * 7)
        
        # Save results to CSV
        self.save_results_to_csv()
    
    def calculate_mnrr_errors(self, nodes: List[NodeState], hop: int):
        """Calculate mNRR error components for a given hop"""
        # Calculate effective pDelay interval based on mNRR smoothing
        tpdelay2pdelay = 0
        for n in range(self.params.mnrr_smoothing_n):
            tpdelay2pdelay += self.generate_pdelay_interval()
        
        # Calculate timestamp-induced mNRR error
        if self.params.mnrr_smoothing_n > 1 and len(nodes[hop].t3_pderror_prev) >= self.params.mnrr_smoothing_n - 1:
            # Use previous timestamps for NRR calculation
            t3pd_diff = nodes[hop].t3_pderror - nodes[hop].t3_pderror_prev[-1]
            t4pd_diff = nodes[hop].t4_pderror - nodes[hop].t4_pderror_prev[-1]
        else:
            # Default calculation with most recent timestamps
            t3pd_diff = nodes[hop].t3_pderror - 0  # Assuming previous sample has 0 error (simplified)
            t4pd_diff = nodes[hop].t4_pderror - 0
        
        nodes[hop].mnrr_error_ts = (t3pd_diff - t4pd_diff) / tpdelay2pdelay
        
        # Calculate clock-drift-induced mNRR error
        nodes[hop].mnrr_error_cd = (tpdelay2pdelay * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / (2 * 1000)) * (1.0 - self.params.nrr_drift_correction)
        
        # Total mNRR error
        nodes[hop].mnrr_error = nodes[hop].mnrr_error_ts + nodes[hop].mnrr_error_cd
    
    def calculate_rr_error(self, nodes: List[NodeState], hop: int):
        """Calculate RR error components with improved model"""
        if hop == 1:
            # First hop RR error is just the NRR error
            nodes[hop].rr_error = nodes[hop].mnrr_error
            nodes[hop].rr_error_components = {
                'mnrr_ts': nodes[hop].mnrr_error_ts,
                'mnrr_cd': nodes[hop].mnrr_error_cd,
                'cd_direct': 0.0,
                'gm_impact': 0.0
            }
        else:
            # Special handling of GM clock drift impact
            gm_impact = 0
            for h in range(1, hop):
                gm_impact += self.params.residence_time * (nodes[0].clock_drift - nodes[h].clock_drift) / 1000 * (1.0 - self.params.rr_drift_correction)
            
            # Clock drift between NRR measurement and Sync
            pdelay_to_sync = np.random.uniform(0, self.params.pdelay_interval) * (1.0 - self.params.pdelayresp_sync_correction)
            cd_direct = (pdelay_to_sync * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / 1000) * (1.0 - self.params.nrr_drift_correction)
            
            # Calculate accumulated RR error
            nodes[hop].rr_error = nodes[hop-1].rr_error + nodes[hop].mnrr_error + cd_direct + gm_impact
            
            # Store components for analysis
            nodes[hop].rr_error_components = {
                'mnrr_ts': nodes[hop].mnrr_error_ts,
                'mnrr_cd': nodes[hop].mnrr_error_cd,
                'cd_direct': cd_direct,
                'gm_impact': gm_impact,
                'upstream_rr': nodes[hop-1].rr_error
            }
    
    def calculate_mean_link_delay_error(self, nodes: List[NodeState], hop: int):
        """Calculate Mean Link Delay error components"""
        # Timestamp error component
        pdelay_error_ts = (nodes[hop].t4_pderror - nodes[hop].t1_pderror - nodes[hop].t3_pderror + nodes[hop].t2_pderror) / 2
        pdelay_error_ts *= (1.0 - self.params.mean_link_delay_correction)
        
        # NRR error component
        pdelay_error_nrr = -self.params.pdelay_turnaround * nodes[hop].mnrr_error / 2
        pdelay_error_nrr *= (1.0 - self.params.mean_link_delay_correction)
        
        # Combined error
        nodes[hop].mean_link_delay_error = pdelay_error_ts + pdelay_error_nrr
        
        # Store components
        nodes[hop].mean_link_delay_components = {
            'ts_direct': pdelay_error_ts,
            'nrr': pdelay_error_nrr
        }
    
    def calculate_residence_time_error(self, nodes: List[NodeState], hop: int):
        """Calculate Residence Time error components"""
        # Direct timestamp error
        rt_error_ts_direct = nodes[hop].t1_souterror - nodes[hop].t2_sinerror
        
        # RR-induced error
        rt_error_rr = self.params.residence_time * nodes[hop].rr_error
        
        # Clock drift direct effect
        rt_error_cd_direct = (self.params.residence_time**2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / (2 * 1000)) * (1.0 - self.params.rr_drift_correction)
        
        # Combined error
        nodes[hop].residence_time_error = rt_error_ts_direct + rt_error_rr + rt_error_cd_direct
        
        # Store components
        nodes[hop].residence_time_components = {
            'ts_direct': rt_error_ts_direct,
            'rr': rt_error_rr,
            'cd_direct': rt_error_cd_direct
        }
    
    def calculate_end_station_error(self, nodes: List[NodeState], hop: int):
        """Calculate End Station error components with improved model"""
        # Use gamma distribution for sync interval
        sync_interval = np.random.gamma(270.5532, self.params.sync_interval/270.5532)
        
        # RR component
        es_error_rr = sync_interval * nodes[hop].rr_error
        
        # Clock drift direct effect
        es_error_cd_direct = (sync_interval/2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
        
        # Frequency offset effect
        a, b, c, d = 0.00012, -0.01005, -0.0305, 5.73845
        temp = np.random.uniform(self.params.temp_min, self.params.temp_max)
        freq_offset = a*temp**3 + b*temp**2 + c*temp + d
        es_error_freq_offset = sync_interval * freq_offset / 1e6
        
        # Combined error
        nodes[hop].end_station_error = es_error_rr + es_error_cd_direct + es_error_freq_offset
        
        # Store components
        nodes[hop].end_station_components = {
            'rr': es_error_rr,
            'cd_direct': es_error_cd_direct,
            'freq_offset': es_error_freq_offset
        }
    
    def save_results_to_csv(self):
        """Save simulation results to CSV files"""
        # Create DataFrame for te at each hop for each run
        te_df = pd.DataFrame(self.results['te_per_hop'])
        te_df.columns = [f'Hop_{i+1}' for i in range(self.params.num_hops)]
        
        # Save to CSV
        te_df.to_csv('output_data/te_all_runs_v2.csv', index=False)
        
        # Create summary statistics DataFrame
        summary_df = pd.DataFrame({
            'Hop': range(1, self.params.num_hops + 1),
            'Max_Abs_TE': self.results['te_max'],
            'TE_7sigma': self.results['te_7sigma'],
            'Mean_TE': [np.mean(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'StdDev_TE': [np.std(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)]
        })
        
        # Save to CSV
        summary_df.to_csv('output_data/te_summary_stats.csv', index=False)
        
        print(f"Results saved to output_data folder.")
    
    def plot_results(self):
        """Plot and save simulation results"""
        self.plot_te_distribution()
        self.plot_te_growth()
        self.plot_early_hops()
        self.plot_selected_hops_line()
        self.plot_selected_hops_cdf()
    
    def plot_te_distribution(self):
        """Plot te distribution at final hop"""
        plt.figure(figsize=(10, 6))
        
        final_hop_te = self.results['te_per_hop'][:, -1]
        plt.hist(final_hop_te, bins=50, alpha=0.7, color='royalblue')
        
        plt.axvline(x=self.results['te_7sigma'][-1], color='crimson', linestyle='--', 
                   label=f'7σ: {self.results["te_7sigma"][-1]:.1f} ns')
        plt.axvline(x=-self.results['te_7sigma'][-1], color='crimson', linestyle='--')
        
        plt.axvline(x=1000, color='forestgreen', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='forestgreen', linestyle=':')
        
        plt.xlabel('Time Error (ns)', fontsize=12)
        plt.ylabel('Count', fontsize=12)
        plt.title(f'Distribution at Hop {self.params.num_hops}', fontsize=14)
        plt.grid(True, alpha=0.3)
        plt.legend(fontsize=11)
        
        plt.tight_layout()
        plt.savefig('output_image/te_distribution_v2.png', dpi=300)
        plt.close()
    
    def plot_te_growth(self):
        """Plot te growth across all hops"""
        plt.figure(figsize=(12, 7))
        
        hops = np.arange(1, self.params.num_hops + 1)
        plt.plot(hops, self.results['te_7sigma'], 'b-', linewidth=2, label='7σ te')
        
        # Add target line
        plt.axhline(y=1000, color='forestgreen', linestyle=':', linewidth=2, label='±1μs target')
        
        # Add max line
        plt.plot(hops, self.results['te_max'], 'r--', alpha=0.6, linewidth=1.5, label='Max Abs TE')
        
        plt.xlabel('Hop Number', fontsize=12)
        plt.ylabel('Time Error (ns)', fontsize=12)
        plt.title('TE Growth Across Hops', fontsize=14)
        plt.grid(True, alpha=0.3)
        plt.legend(fontsize=11)
        
        plt.tight_layout()
        plt.savefig('output_image/te_growth_v2.png', dpi=300)
        plt.close()
    
    def plot_early_hops(self):
        """Plot te for hops 1-7"""
        plt.figure(figsize=(12, 7))
        
        # Get data for hops 1-7
        hops = np.arange(1, 8)
        te_values = self.results['te_7sigma'][:7]
        max_values = self.results['te_max'][:7]
        
        # Create plot
        plt.plot(hops, te_values, 'bo-', linewidth=2, label='7σ te')
        plt.plot(hops, max_values, 'ro--', alpha=0.6, linewidth=1.5, label='Max Abs te')
        
        for i, (te, max_val) in enumerate(zip(te_values, max_values)):
            plt.annotate(f'{te:.1f}', (hops[i], te), textcoords="offset points", 
                        xytext=(0,10), ha='center', fontsize=9)
            plt.annotate(f'{max_val:.1f}', (hops[i], max_val), textcoords="offset points", 
                        xytext=(0,10), ha='center', fontsize=9)
        
        plt.xlabel('Hop Number', fontsize=12)
        plt.ylabel('Time Error (ns)', fontsize=12)
        plt.title('TE for First 7 Hops', fontsize=14)
        plt.grid(True, alpha=0.3)
        plt.legend(fontsize=11)
        
        plt.tight_layout()
        plt.savefig('output_image/te_early_hops_v2.png', dpi=300)
        plt.close()
    
    def plot_selected_hops_line(self):
        """Plot line graph for hops 10, 25, 50, 75, 100"""
        plt.figure(figsize=(12, 7))
        
        selected_hops = [10, 25, 50, 75, 100]
        
        for hop in selected_hops:
            if hop <= self.params.num_hops:
                te_values = self.results['te_per_hop'][:, hop-1]
                plt.plot(np.sort(te_values), label=f'Hop {hop}')
        
        plt.axhline(y=1000, color='forestgreen', linestyle=':', linewidth=2, label='±1μs target')
        plt.axhline(y=-1000, color='forestgreen', linestyle=':', linewidth=2)
        
        plt.xlabel('Sorted Run Index', fontsize=12)
        plt.ylabel('Time Error (ns)', fontsize=12)
        plt.title('TE Values for Selected Hops', fontsize=14)
        plt.grid(True, alpha=0.3)
        plt.legend(fontsize=11)
        
        plt.tight_layout()
        plt.savefig('output_image/te_selected_hops_line_v2.png', dpi=300)
        plt.close()
    
    def plot_selected_hops_cdf(self):
        """Plot CDF for hops 10, 25, 50, 75, 100"""
        plt.figure(figsize=(12, 7))
        
        selected_hops = [10, 25, 50, 75, 100]
        
        for hop in selected_hops:
            if hop <= self.params.num_hops:
                te_values = self.results['te_per_hop'][:, hop-1]
                sorted_data = np.sort(te_values)
                yvals = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
                plt.plot(sorted_data, yvals, label=f'Hop {hop}')
        
        plt.axvline(x=1000, color='forestgreen', linestyle=':', linewidth=2, label='±1μs target')
        plt.axvline(x=-1000, color='forestgreen', linestyle=':', linewidth=2)
        
        plt.xlabel('Time Error (ns)', fontsize=12)
        plt.ylabel('Cumulative Probability', fontsize=12)
        plt.title('CDF of TE for Selected Hops', fontsize=14)
        plt.grid(True, alpha=0.3)
        plt.legend(fontsize=11)
        
        # Format y-axis as percentage
        plt.gca().yaxis.set_major_formatter(PercentFormatter(1))
        
        plt.tight_layout()
        plt.savefig('output_image/te_selected_hops_cdf_v2.png', dpi=300)
        plt.close()

def main():
    # Create parameters with non-optimized settings to match Fig 5
    params = SimulationParameters(
        num_hops=100,
        num_runs=10000,  # Can adjust for faster runs during testing
        
        # Clock characteristics
        gm_clock_drift_max=1.5,
        gm_clock_drift_min=-1.5,
        gm_clock_drift_fraction=0.8,
        clock_drift_max=1.5,
        clock_drift_min=-1.5,
        clock_drift_fraction=0.8,
        
        # Temperature model parameters
        temp_max=85.0,
        temp_min=-40.0,
        temp_ramp_rate=1.0,
        temp_hold_period=30.0,
        gm_scaling_factor=1.0,
        non_gm_scaling_factor=1.0,
        use_temperature_model=True,  # Use temperature-based model
        
        # Timestamp errors
        tsge_tx=4.0,
        tsge_rx=4.0,
        dtse_tx=4.0,
        dtse_rx=4.0,
        
        # Message intervals
        pdelay_interval=125.0,
        sync_interval=125.0,
        pdelay_turnaround=0.5,
        residence_time=0.5,
        
        # Correction factors - all disabled to match Fig 5
        mean_link_delay_correction=0.0,
        nrr_drift_correction=0.0,
        rr_drift_correction=0.0,
        pdelayresp_sync_correction=0.0,
        mnrr_smoothing_n=1,
        mnrr_smoothing_m=1
    )
    
    # Create and run simulation
    sim = TimeSyncSimulation(params)
    print("Running simulation with non-optimized parameters to match Fig 5...")
    sim.run_simulation()
    
    # Output results
    max_te = max(sim.results['te_max'])
    # final_7sigma = sim.results['te_7sigma'][-1]
    
    print(f"Simulation complete!")
    print(f"Maximum TE: {max_te:.1f} ns")
    # print(f"7-sigma TE at hop {params.num_hops}: {final_7sigma:.1f} ns")
    # print(f"Target (<1000 ns): {'PASSED' if final_7sigma < 1000 else 'FAILED'}")
    
    # Plot results
    print("Generating plots...")
    sim.plot_results()
    print("Plots saved to output_image folder.")

if __name__ == "__main__":
    main()

Running simulation with non-optimized parameters to match Fig 5...
Running simulation 0/10000...
Running simulation 1000/10000...
Running simulation 2000/10000...
Running simulation 3000/10000...
Running simulation 4000/10000...
Running simulation 5000/10000...
Running simulation 6000/10000...
Running simulation 7000/10000...
Running simulation 8000/10000...
Running simulation 9000/10000...
Results saved to output_data folder.
Simulation complete!
Maximum TE: 652.6 ns
Generating plots...
Plots saved to output_image folder.


In [11]:
# Time Synchronization Simulation for IEEE 802.1AS in IEC/IEEE 60802
# Based on analysis of McCall et al. documents (2021-2022)

import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
import random
import os
import pandas as pd
import seaborn as sns
from matplotlib.ticker import PercentFormatter

@dataclass
class SimulationParameters:
    """时间同步仿真的参数"""
    # 网络配置
    num_hops: int = 100  # 链中的跳数
    num_runs: int = 10000  # 蒙特卡洛运行次数
    
    # 时钟特性
    gm_clock_drift_max: float = 1.5  # 最大GM时钟漂移（ppm/s）
    gm_clock_drift_min: float = -1.5  # 最小GM时钟漂移（ppm/s）
    gm_clock_drift_fraction: float = 0.8  # 具有漂移的GM节点比例
    
    clock_drift_max: float = 1.5  # 最大非GM时钟漂移（ppm/s）
    clock_drift_min: float = -1.5  # 最小非GM时钟漂移（ppm/s）
    clock_drift_fraction: float = 0.8  # 具有漂移的非GM节点比例
    
    # 时间戳误差特性
    tsge_tx: float = 4.0  # TX时间戳粒度误差（±ns）
    tsge_rx: float = 4.0  # RX时间戳粒度误差（±ns）
    dtse_tx: float = 4.0  # TX动态时间戳误差（±ns）
    dtse_rx: float = 4.0  # RX动态时间戳误差（±ns）
    
    # 消息间隔
    pdelay_interval: float = 125.0  # pDelay消息间隔（ms）
    sync_interval: float = 125.0  # 同步消息间隔（ms）
    pdelay_turnaround: float = 10.0  # pDelay响应时间（ms）
    residence_time: float = 10.0  # 节点内驻留时间（ms）
    
    # 校正因子
    mean_link_delay_correction: float = 0.98  # 平均链路延迟平均的有效性
    nrr_drift_correction: float = 0.90  # NRR漂移校正有效性
    rr_drift_correction: float = 0.90  # RR漂移校正有效性
    pdelayresp_sync_correction: float = 0.0  # pDelay响应到同步的对齐因子
    
    # NRR平滑参数
    mnrr_smoothing_n: int = 3  # 使用的先前pDelayResp数量
    mnrr_smoothing_m: int = 1  # 用于中值计算（在推荐设置中未使用）

@dataclass
class NodeState:
    """链中节点的状态"""
    # 时钟相关状态
    clock_drift: float = 0.0  # 时钟漂移率（ppm/s）
    
    # 时间戳误差
    t1_pderror: float = 0.0  # pDelay请求的TX时间戳误差
    t2_pderror: float = 0.0  # pDelay请求的RX时间戳误差
    t3_pderror: float = 0.0  # pDelay响应的TX时间戳误差
    t4_pderror: float = 0.0  # pDelay响应的RX时间戳误差
    t3_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t3误差
    t4_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t4误差
    
    t2_sinerror: float = 0.0  # 同步的RX时间戳误差
    t1_souterror: float = 0.0  # 同步的TX时间戳误差
    
    # 误差累积
    mnrr_error: float = 0.0  # 邻居速率比误差
    mnrr_error_ts: float = 0.0  # 由时间戳误差导致的NRR误差
    mnrr_error_cd: float = 0.0  # 由时钟漂移导致的NRR误差
    
    rr_error: float = 0.0  # 速率比误差
    rr_error_sum: float = 0.0  # 累积的RR误差
    
    mean_link_delay_error: float = 0.0  # 链路延迟测量误差
    residence_time_error: float = 0.0  # 驻留时间测量误差
    
    te: float = 0.0  # 该节点的动态时间误差

class TimeSyncSimulation:
    """IEEE 802.1AS 在 IEC/IEEE 60802 中的时间同步仿真"""
    
    def __init__(self, params: SimulationParameters):
        self.params = params
        self.results = {
            'te_max': [],            # 所有运行中的最大te
            'te_7sigma': [],         # te的7-sigma值
            'te_per_hop': np.zeros((params.num_runs, params.num_hops))  # 每次运行中每个跳的te
        }
        
        # 创建输出目录
        self.output_data_dir = 'output_data'
        self.output_image_dir = 'output_image'
        os.makedirs(self.output_data_dir, exist_ok=True)
        os.makedirs(self.output_image_dir, exist_ok=True)
    
    def generate_timestamp_error(self, is_tx: bool) -> float:
        """根据参数生成随机时间戳误差"""
        if is_tx:
            tsge = np.random.uniform(-self.params.tsge_tx, self.params.tsge_tx)
            dtse = np.random.uniform(-self.params.dtse_tx, self.params.dtse_tx)
        else:
            tsge = np.random.uniform(-self.params.tsge_rx, self.params.tsge_rx)
            dtse = np.random.uniform(-self.params.dtse_rx, self.params.dtse_rx)
        return tsge + dtse
    
    def generate_clock_drift(self, is_gm: bool) -> float:
        """根据参数生成随机时钟漂移"""
        if is_gm:
            if np.random.random() <= self.params.gm_clock_drift_fraction:
                return np.random.uniform(self.params.gm_clock_drift_min, self.params.gm_clock_drift_max)
            return 0.0
        else:
            if np.random.random() <= self.params.clock_drift_fraction:
                return np.random.uniform(self.params.clock_drift_min, self.params.clock_drift_max)
            return 0.0

    def generate_pdelay_interval(self) -> float:
        """在规格范围内生成随机pDelay间隔"""
        return np.random.uniform(0.9 * self.params.pdelay_interval, 
                                1.3 * self.params.pdelay_interval)
        
    def run_simulation(self):
        """运行时间同步仿真"""
        for run in range(self.params.num_runs):
            # 为新的运行重置
            nodes = [NodeState() for _ in range(self.params.num_hops + 1)]  # +1 为GM
            
            # 为所有节点生成时钟漂移
            nodes[0].clock_drift = self.generate_clock_drift(is_gm=True)  # GM
            for i in range(1, self.params.num_hops + 1):
                nodes[i].clock_drift = self.generate_clock_drift(is_gm=False)
            
            # 计算所有跳的误差
            te = 0.0
            for hop in range(1, self.params.num_hops + 1):
                # 生成时间戳误差
                nodes[hop].t1_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t3_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t4_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t1_souterror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_sinerror = self.generate_timestamp_error(is_tx=False)
                
                # 为NRR计算生成先前的时间戳
                for n in range(1, self.params.mnrr_smoothing_n):
                    nodes[hop].t3_pderror_prev.append(self.generate_timestamp_error(is_tx=True))
                    nodes[hop].t4_pderror_prev.append(self.generate_timestamp_error(is_tx=False))
                
                # 计算NRR误差组件
                self.calculate_mnrr_errors(nodes, hop)
                
                # 计算RR误差
                if hop == 1:
                    nodes[hop].rr_error = nodes[hop].mnrr_error
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                else:
                    # 添加由NRR测量到同步之间的时钟漂移引起的RR误差
                    pdelay_to_sync = np.random.uniform(0, self.params.pdelay_interval) * (1.0 - self.params.pdelayresp_sync_correction)
                    rr_error_cd_nrr2sync = (pdelay_to_sync * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / 1000) * (1.0 - self.params.nrr_drift_correction)
                    
                    # 添加由上游RR计算到同步之间的时钟漂移引起的RR误差
                    rr_error_cd_rr2sync = (self.params.residence_time * (nodes[hop-1].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
                    
                    # 累积RR误差
                    nodes[hop].rr_error = nodes[hop-1].rr_error + nodes[hop].mnrr_error + rr_error_cd_nrr2sync + rr_error_cd_rr2sync
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                
                # 计算平均链路延迟误差
                pdelay_error_ts = (nodes[hop].t4_pderror - nodes[hop].t1_pderror - nodes[hop].t3_pderror + nodes[hop].t2_pderror) / 2
                pdelay_error_ts *= (1.0 - self.params.mean_link_delay_correction)
                
                pdelay_error_nrr = -self.params.pdelay_turnaround * nodes[hop].mnrr_error / 2
                pdelay_error_nrr *= (1.0 - self.params.mean_link_delay_correction)
                
                nodes[hop].mean_link_delay_error = pdelay_error_ts + pdelay_error_nrr
                
                # 计算驻留时间误差或终端站误差
                if hop < self.params.num_hops:  # 不是最后一跳
                    # 驻留时间误差组件
                    rt_error_ts_direct = nodes[hop].t1_souterror - nodes[hop].t2_sinerror
                    rt_error_rr = self.params.residence_time * nodes[hop].rr_error
                    rt_error_cd_direct = (self.params.residence_time**2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / (2 * 1000)) * (1.0 - self.params.rr_drift_correction)
                    
                    nodes[hop].residence_time_error = rt_error_ts_direct + rt_error_rr + rt_error_cd_direct
                    nodes[hop].te = te + nodes[hop].mean_link_delay_error + nodes[hop].residence_time_error
                else:  # 最后一跳（终端站）
                    # 终端站误差组件
                    sync_interval = self.params.sync_interval
                    es_error_rr = sync_interval * nodes[hop].rr_error
                    es_error_cd_direct = (sync_interval/2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
                    
                    end_station_error = es_error_rr + es_error_cd_direct
                    nodes[hop].te = te + nodes[hop].mean_link_delay_error + end_station_error
                
                # 更新下一跳的累积te
                te = nodes[hop].te
                
                # 存储结果
                self.results['te_per_hop'][run, hop-1] = te
            
            # 计算完所有跳后，存储此次运行的最大te
            if run == 0 or abs(te) > self.results['te_max'][-1]:
                self.results['te_max'].append(abs(te))
        
        # 计算7-sigma te（比最大值更具统计代表性）
        for hop in range(self.params.num_hops):
            te_at_hop = self.results['te_per_hop'][:, hop]
            self.results['te_7sigma'].append(np.std(te_at_hop) * 7)
        
        # 保存数据到CSV文件
        self.save_results_to_csv()
    
    def calculate_mnrr_errors(self, nodes: List[NodeState], hop: int):
        """计算给定跳的mNRR误差组件"""
        # 基于mNRR平滑计算有效pDelay间隔
        tpdelay2pdelay = 0
        for n in range(self.params.mnrr_smoothing_n):
            tpdelay2pdelay += self.generate_pdelay_interval()
        
        # 计算由时间戳引起的mNRR误差
        if self.params.mnrr_smoothing_n > 1 and len(nodes[hop].t3_pderror_prev) >= self.params.mnrr_smoothing_n - 1:
            # 使用先前的时间戳进行NRR计算
            t3pd_diff = nodes[hop].t3_pderror - nodes[hop].t3_pderror_prev[-1]
            t4pd_diff = nodes[hop].t4_pderror - nodes[hop].t4_pderror_prev[-1]
        else:
            # 使用最近的时间戳进行默认计算
            t3pd_diff = nodes[hop].t3_pderror - 0  # 假设先前样本的误差为0（简化）
            t4pd_diff = nodes[hop].t4_pderror - 0
        
        nodes[hop].mnrr_error_ts = (t3pd_diff - t4pd_diff) / tpdelay2pdelay
        
        # 计算由时钟漂移引起的mNRR误差
        nodes[hop].mnrr_error_cd = (tpdelay2pdelay * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / (2 * 1000)) * (1.0 - self.params.nrr_drift_correction)
        
        # 总mNRR误差
        nodes[hop].mnrr_error = nodes[hop].mnrr_error_ts + nodes[hop].mnrr_error_cd
    
    def save_results_to_csv(self):
        """将所有节点的时间误差结果保存到CSV文件中"""
        # 创建一个包含所有跳的te数据的DataFrame
        all_te_data = {}
        for hop in range(1, self.params.num_hops + 1):
            hop_data = self.results['te_per_hop'][:, hop-1]
            all_te_data[f'Hop_{hop}'] = hop_data
        
        df = pd.DataFrame(all_te_data)
        
        # 保存到CSV文件
        df.to_csv(os.path.join(self.output_data_dir, 'te_all_hops_v3.csv'), index=False)
        
        # 保存7-sigma和统计数据
        stats_data = {
            'Hop': list(range(1, self.params.num_hops + 1)),
            'te_7sigma': self.results['te_7sigma'],
            'te_Mean': [np.mean(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Std': [np.std(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Min': [np.min(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Max': [np.max(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)]
        }
        stats_df = pd.DataFrame(stats_data)
        stats_df.to_csv(os.path.join(self.output_data_dir, 'te_statistics_v3.csv'), index=False)
    
    def plot_results(self):
        """绘制仿真结果并保存图像"""
        # 1. 绘制最终跳的te分布
        self.plot_final_hop_distribution()
        
        # 2. 绘制te随跳数的增长
        self.plot_te_growth()
        
        # 3. 绘制1-7跳的时间误差数据
        self.plot_first_seven_hops()
        
        # 4. 绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图
        self.plot_specific_hops_line_and_cdf()
    
    def plot_final_hop_distribution(self):
        """绘制最终跳的te分布"""
        plt.figure(figsize=(10, 6))
        final_hop_te = self.results['te_per_hop'][:, -1]
        plt.hist(final_hop_te, bins=50, alpha=0.7)
        # plt.axvline(x=self.results['te_7sigma'][-1], color='r', linestyle='--', 
        #            label=f'7σ: {self.results["te_7sigma"][-1]:.1f} ns')
        # plt.axvline(x=-self.results['te_7sigma'][-1], color='r', linestyle='--')
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Time Error (ns)')
        plt.ylabel('Count')
        plt.title(f'te Distribution at Hop {self.params.num_hops}')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'final_hop_te_distribution_v3.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_te_growth(self):
        """绘制te随跳数的增长"""
        plt.figure(figsize=(10, 6))
        hops = np.arange(1, self.params.num_hops + 1)
        # plt.plot(hops, self.results['te_7sigma'], 'b-', label='7σ te')
        plt.axhline(y=1000, color='g', linestyle=':', label='±1μs target')
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        # plt.title('te Growth Across Hops (7σ values)')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'te_growth_across_hops_v3.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_first_seven_hops(self):
        """绘制1-7跳的时间误差数据"""
        plt.figure(figsize=(12, 8))
        
        # 绘制箱形图
        first_seven_hops_data = [self.results['te_per_hop'][:, i] for i in range(7)]
        plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Distribution for First 7 Hops')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_te_v3.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # 绘制小提琴图
        plt.figure(figsize=(12, 8))
        first_seven_df = pd.DataFrame({f'Hop {i+1}': self.results['te_per_hop'][:, i] for i in range(7)})
        sns.violinplot(data=first_seven_df)
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Distribution for First 7 Hops (Violin Plot)')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_te_violin_v3.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_specific_hops_line_and_cdf(self):
        """绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图"""
        specific_hops = [10, 25, 50, 75, 100]
        specific_hops = [h for h in specific_hops if h <= self.params.num_hops]
        
        # 折线图 - 每次运行的te随时间的变化
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            # 选择100次运行进行可视化，否则图会太乱
            sample_runs = np.random.choice(self.params.num_runs, size=min(100, self.params.num_runs), replace=False)
            for run in sample_runs:
                if hop == specific_hops[0]:  # 只对第一个跳添加标签，避免重复
                    plt.plot(self.results['te_per_hop'][run, :hop], alpha=0.1, color=f'C{specific_hops.index(hop)}')
            
            # 绘制平均值线
            mean_values = np.mean(self.results['te_per_hop'][:, :hop], axis=0)
            plt.plot(mean_values, linewidth=2, label=f'Hop {hop} (avg)', color=f'C{specific_hops.index(hop)}')
        
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Development for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_te_line_v3.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # CDF图 - 最终te的累积分布
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            hop_data = self.results['te_per_hop'][:, hop-1]
            sorted_data = np.sort(hop_data)
            # 计算每个值的CDF
            cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
            plt.plot(sorted_data, cdf, label=f'Hop {hop}')
        
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Time Error (ns)')
        plt.ylabel('Cumulative Probability')
        plt.title('CDF of te for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_te_cdf_v3.png'), dpi=300, bbox_inches='tight')
        plt.close()

# 使用推荐参数运行仿真
def main():
    # 使用推荐设置创建参数
    params = SimulationParameters(
        num_hops=100,
        num_runs=1000,  # 为演示减少
        
        # 时钟特性
        gm_clock_drift_max=1.5,
        gm_clock_drift_min=-1.5,
        gm_clock_drift_fraction=0.8,
        clock_drift_max=1.5,
        clock_drift_min=-1.5,
        clock_drift_fraction=0.8,
        
        # 时间戳误差
        tsge_tx=4.0,
        tsge_rx=4.0,
        dtse_tx=4.0,
        dtse_rx=4.0,
        
        # 消息间隔
        pdelay_interval=1000.0,
        sync_interval=125.0,
        pdelay_turnaround=0.5,
        residence_time=0.5,
        
        # 校正因子 - 推荐设置
        mean_link_delay_correction=0.0,
        nrr_drift_correction=0.0,
        rr_drift_correction=0.,
        pdelayresp_sync_correction=0.0,
        mnrr_smoothing_n=3,
        mnrr_smoothing_m=1
    )
    
    # 创建并运行仿真
    sim = TimeSyncSimulation(params)
    print("Running simulation with recommended parameters...")
    sim.run_simulation()
    
    # 输出结果
    max_te = max(sim.results['te_max'])
    final_7sigma = sim.results['te_7sigma'][-1]
    
    print(f"Simulation complete!")
    print(f"Maximum te: {max_te:.1f} ns")
    print(f"7-sigma te at hop {params.num_hops}: {final_7sigma:.1f} ns")
    print(f"Target (<1000 ns): {'PASSED' if final_7sigma < 1000 else 'FAILED'}")
    
    # 绘制结果
    print("Generating plots...")
    sim.plot_results()
    print(f"Results saved to '{sim.output_data_dir}' and '{sim.output_image_dir}' directories.")

if __name__ == "__main__":
    main()

Running simulation with recommended parameters...
Simulation complete!
Maximum te: 2290.8 ns
7-sigma te at hop 100: 4199.3 ns
Target (<1000 ns): FAILED
Generating plots...


  plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])


Results saved to 'output_data' and 'output_image' directories.


In [12]:
# Time Synchronization Simulation for IEEE 802.1AS in IEC/IEEE 60802
# Based on analysis of McCall et al. documents (2021-2022)

import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
import random
import os
import pandas as pd
import seaborn as sns
from matplotlib.ticker import PercentFormatter

@dataclass
class SimulationParameters:
    """时间同步仿真的参数"""
    # 网络配置
    num_hops: int = 100  # 链中的跳数
    num_runs: int = 10000  # 蒙特卡洛运行次数
    
    # 时钟特性
    gm_clock_drift_max: float = 1.5  # 最大GM时钟漂移（ppm/s）
    gm_clock_drift_min: float = -1.5  # 最小GM时钟漂移（ppm/s）
    gm_clock_drift_fraction: float = 0.8  # 具有漂移的GM节点比例
    
    clock_drift_max: float = 1.5  # 最大非GM时钟漂移（ppm/s）
    clock_drift_min: float = -1.5  # 最小非GM时钟漂移（ppm/s）
    clock_drift_fraction: float = 0.8  # 具有漂移的非GM节点比例
    
    # 时间戳误差特性
    tsge_tx: float = 4.0  # TX时间戳粒度误差（±ns）
    tsge_rx: float = 4.0  # RX时间戳粒度误差（±ns）
    dtse_tx: float = 4.0  # TX动态时间戳误差（±ns）
    dtse_rx: float = 4.0  # RX动态时间戳误差（±ns）
    
    # 消息间隔
    pdelay_interval: float = 125.0  # pDelay消息间隔（ms）
    sync_interval: float = 125.0  # 同步消息间隔（ms）
    pdelay_turnaround: float = 10.0  # pDelay响应时间（ms）
    residence_time: float = 10.0  # 节点内驻留时间（ms）
    
    # 校正因子
    mean_link_delay_correction: float = 0.98  # 平均链路延迟平均的有效性
    nrr_drift_correction: float = 0.90  # NRR漂移校正有效性
    rr_drift_correction: float = 0.90  # RR漂移校正有效性
    pdelayresp_sync_correction: float = 0.0  # pDelay响应到同步的对齐因子
    
    # NRR平滑参数
    mnrr_smoothing_n: int = 3  # 使用的先前pDelayResp数量
    mnrr_smoothing_m: int = 1  # 用于中值计算（在推荐设置中未使用）
    
    # 终端站计算方法使用的特定跳
    end_station_hops: List[int] = field(default_factory=lambda: [10, 25, 50, 75, 100])

@dataclass
class NodeState:
    """链中节点的状态"""
    # 时钟相关状态
    clock_drift: float = 0.0  # 时钟漂移率（ppm/s）
    
    # 时间戳误差
    t1_pderror: float = 0.0  # pDelay请求的TX时间戳误差
    t2_pderror: float = 0.0  # pDelay请求的RX时间戳误差
    t3_pderror: float = 0.0  # pDelay响应的TX时间戳误差
    t4_pderror: float = 0.0  # pDelay响应的RX时间戳误差
    t3_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t3误差
    t4_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t4误差
    
    t2_sinerror: float = 0.0  # 同步的RX时间戳误差
    t1_souterror: float = 0.0  # 同步的TX时间戳误差
    
    # 误差累积
    mnrr_error: float = 0.0  # 邻居速率比误差
    mnrr_error_ts: float = 0.0  # 由时间戳误差导致的NRR误差
    mnrr_error_cd: float = 0.0  # 由时钟漂移导致的NRR误差
    
    rr_error: float = 0.0  # 速率比误差
    rr_error_sum: float = 0.0  # 累积的RR误差
    
    mean_link_delay_error: float = 0.0  # 链路延迟测量误差
    residence_time_error: float = 0.0  # 驻留时间测量误差
    
    te: float = 0.0  # 该节点的动态时间误差

class TimeSyncSimulation:
    """IEEE 802.1AS 在 IEC/IEEE 60802 中的时间同步仿真"""
    
    def __init__(self, params: SimulationParameters):
        self.params = params
        self.results = {
            'te_max': [],            # 所有运行中的最大te
            'te_7sigma': [],         # te的7-sigma值
            'te_per_hop': np.zeros((params.num_runs, params.num_hops))  # 每次运行中每个跳的te
        }
        
        # 创建输出目录
        self.output_data_dir = 'output_data'
        self.output_image_dir = 'output_image'
        os.makedirs(self.output_data_dir, exist_ok=True)
        os.makedirs(self.output_image_dir, exist_ok=True)
    
    def generate_timestamp_error(self, is_tx: bool) -> float:
        """根据参数生成随机时间戳误差"""
        if is_tx:
            tsge = np.random.uniform(-self.params.tsge_tx, self.params.tsge_tx)
            dtse = np.random.uniform(-self.params.dtse_tx, self.params.dtse_tx)
        else:
            tsge = np.random.uniform(-self.params.tsge_rx, self.params.tsge_rx)
            dtse = np.random.uniform(-self.params.dtse_rx, self.params.dtse_rx)
        return tsge + dtse
    
    def generate_clock_drift(self, is_gm: bool) -> float:
        """根据参数生成随机时钟漂移"""
        if is_gm:
            if np.random.random() <= self.params.gm_clock_drift_fraction:
                return np.random.uniform(self.params.gm_clock_drift_min, self.params.gm_clock_drift_max)
            return 0.0
        else:
            if np.random.random() <= self.params.clock_drift_fraction:
                return np.random.uniform(self.params.clock_drift_min, self.params.clock_drift_max)
            return 0.0

    def generate_pdelay_interval(self) -> float:
        """在规格范围内生成随机pDelay间隔"""
        return np.random.uniform(0.9 * self.params.pdelay_interval, 
                                1.3 * self.params.pdelay_interval)
        
    def run_simulation(self):
        """运行时间同步仿真"""
        for run in range(self.params.num_runs):
            # 为新的运行重置
            nodes = [NodeState() for _ in range(self.params.num_hops + 1)]  # +1 为GM
            
            # 为所有节点生成时钟漂移
            nodes[0].clock_drift = self.generate_clock_drift(is_gm=True)  # GM
            for i in range(1, self.params.num_hops + 1):
                nodes[i].clock_drift = self.generate_clock_drift(is_gm=False)
            
            # 计算所有跳的误差
            te = 0.0
            for hop in range(1, self.params.num_hops + 1):
                # 生成时间戳误差
                nodes[hop].t1_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t3_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t4_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t1_souterror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_sinerror = self.generate_timestamp_error(is_tx=False)
                
                # 为NRR计算生成先前的时间戳
                for n in range(1, self.params.mnrr_smoothing_n):
                    nodes[hop].t3_pderror_prev.append(self.generate_timestamp_error(is_tx=True))
                    nodes[hop].t4_pderror_prev.append(self.generate_timestamp_error(is_tx=False))
                
                # 计算NRR误差组件
                self.calculate_mnrr_errors(nodes, hop)
                
                # 计算RR误差
                if hop == 1:
                    nodes[hop].rr_error = nodes[hop].mnrr_error
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                else:
                    # 添加由NRR测量到同步之间的时钟漂移引起的RR误差
                    pdelay_to_sync = np.random.uniform(0, self.params.pdelay_interval) * (1.0 - self.params.pdelayresp_sync_correction)
                    rr_error_cd_nrr2sync = (pdelay_to_sync * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / 1000) * (1.0 - self.params.nrr_drift_correction)
                    
                    # 添加由上游RR计算到同步之间的时钟漂移引起的RR误差
                    rr_error_cd_rr2sync = (self.params.residence_time * (nodes[hop-1].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
                    
                    # 累积RR误差
                    nodes[hop].rr_error = nodes[hop-1].rr_error + nodes[hop].mnrr_error + rr_error_cd_nrr2sync + rr_error_cd_rr2sync
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                
                # 计算平均链路延迟误差
                pdelay_error_ts = (nodes[hop].t4_pderror - nodes[hop].t1_pderror - nodes[hop].t3_pderror + nodes[hop].t2_pderror) / 2
                pdelay_error_ts *= (1.0 - self.params.mean_link_delay_correction)
                
                pdelay_error_nrr = -self.params.pdelay_turnaround * nodes[hop].mnrr_error / 2
                pdelay_error_nrr *= (1.0 - self.params.mean_link_delay_correction)
                
                nodes[hop].mean_link_delay_error = pdelay_error_ts + pdelay_error_nrr
                
                # 修改: 检查当前跳是否是指定的终端站跳或者最后一跳
                if hop == self.params.num_hops or hop in self.params.end_station_hops:
                    # 使用终端站计算方法
                    sync_interval = self.params.sync_interval
                    es_error_rr = sync_interval * nodes[hop].rr_error
                    es_error_cd_direct = (sync_interval/2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
                    
                    end_station_error = es_error_rr + es_error_cd_direct
                    nodes[hop].te = te + nodes[hop].mean_link_delay_error + end_station_error
                else:
                    # 使用普通桥接设备计算方法
                    rt_error_ts_direct = nodes[hop].t1_souterror - nodes[hop].t2_sinerror
                    rt_error_rr = self.params.residence_time * nodes[hop].rr_error
                    rt_error_cd_direct = (self.params.residence_time**2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / (2 * 1000)) * (1.0 - self.params.rr_drift_correction)
                    
                    nodes[hop].residence_time_error = rt_error_ts_direct + rt_error_rr + rt_error_cd_direct
                    nodes[hop].te = te + nodes[hop].mean_link_delay_error + nodes[hop].residence_time_error
                
                # 更新下一跳的累积te
                te = nodes[hop].te
                
                # 存储结果
                self.results['te_per_hop'][run, hop-1] = te
            
            # 计算完所有跳后，存储此次运行的最大te
            if run == 0 or abs(te) > self.results['te_max'][-1]:
                self.results['te_max'].append(abs(te))
        
        # 计算7-sigma te（比最大值更具统计代表性）
        for hop in range(self.params.num_hops):
            te_at_hop = self.results['te_per_hop'][:, hop]
            self.results['te_7sigma'].append(np.std(te_at_hop) * 7)
        
        # 保存数据到CSV文件
        self.save_results_to_csv()
    
    def calculate_mnrr_errors(self, nodes: List[NodeState], hop: int):
        """计算给定跳的mNRR误差组件"""
        # 基于mNRR平滑计算有效pDelay间隔
        tpdelay2pdelay = 0
        for n in range(self.params.mnrr_smoothing_n):
            tpdelay2pdelay += self.generate_pdelay_interval()
        
        # 计算由时间戳引起的mNRR误差
        if self.params.mnrr_smoothing_n > 1 and len(nodes[hop].t3_pderror_prev) >= self.params.mnrr_smoothing_n - 1:
            # 使用先前的时间戳进行NRR计算
            t3pd_diff = nodes[hop].t3_pderror - nodes[hop].t3_pderror_prev[-1]
            t4pd_diff = nodes[hop].t4_pderror - nodes[hop].t4_pderror_prev[-1]
        else:
            # 使用最近的时间戳进行默认计算
            t3pd_diff = nodes[hop].t3_pderror - 0  # 假设先前样本的误差为0（简化）
            t4pd_diff = nodes[hop].t4_pderror - 0
        
        nodes[hop].mnrr_error_ts = (t3pd_diff - t4pd_diff) / tpdelay2pdelay
        
        # 计算由时钟漂移引起的mNRR误差
        nodes[hop].mnrr_error_cd = (tpdelay2pdelay * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / (2 * 1000)) * (1.0 - self.params.nrr_drift_correction)
        
        # 总mNRR误差
        nodes[hop].mnrr_error = nodes[hop].mnrr_error_ts + nodes[hop].mnrr_error_cd
    
    def save_results_to_csv(self):
        """将所有节点的时间误差结果保存到CSV文件中"""
        # 创建一个包含所有跳的te数据的DataFrame
        all_te_data = {}
        for hop in range(1, self.params.num_hops + 1):
            hop_data = self.results['te_per_hop'][:, hop-1]
            all_te_data[f'Hop_{hop}'] = hop_data
        
        df = pd.DataFrame(all_te_data)
        
        # 保存到CSV文件
        df.to_csv(os.path.join(self.output_data_dir, 'te_all_hops_v4.csv'), index=False)
        
        # 保存7-sigma和统计数据
        stats_data = {
            'Hop': list(range(1, self.params.num_hops + 1)),
            'te_7sigma': self.results['te_7sigma'],
            'te_Mean': [np.mean(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Std': [np.std(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Min': [np.min(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Max': [np.max(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)]
        }
        stats_df = pd.DataFrame(stats_data)
        stats_df.to_csv(os.path.join(self.output_data_dir, 'te_statistics_v4.csv'), index=False)
    
    def plot_results(self):
        """绘制仿真结果并保存图像"""
        # 1. 绘制最终跳的te分布
        self.plot_final_hop_distribution()
        
        # 2. 绘制te随跳数的增长
        self.plot_te_growth()
        
        # 3. 绘制1-7跳的时间误差数据
        self.plot_first_seven_hops()
        
        # 4. 绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图
        self.plot_specific_hops_line_and_cdf()
    
    def plot_final_hop_distribution(self):
        """绘制最终跳的te分布"""
        plt.figure(figsize=(10, 6))
        final_hop_te = self.results['te_per_hop'][:, -1]
        plt.hist(final_hop_te, bins=50, alpha=0.7)
        # plt.axvline(x=self.results['te_7sigma'][-1], color='r', linestyle='--', 
        #            label=f'7σ: {self.results["te_7sigma"][-1]:.1f} ns')
        # plt.axvline(x=-self.results['te_7sigma'][-1], color='r', linestyle='--')
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Time Error (ns)')
        plt.ylabel('Count')
        plt.title(f'te Distribution at Hop {self.params.num_hops}')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'final_hop_te_distribution_v4.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_te_growth(self):
        """绘制te随跳数的增长"""
        plt.figure(figsize=(10, 6))
        hops = np.arange(1, self.params.num_hops + 1)
        # plt.plot(hops, self.results['te_7sigma'], 'b-', label='7σ te')
        plt.axhline(y=1000, color='g', linestyle=':', label='±1μs target')
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        # plt.title('te Growth Across Hops (7σ values)')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'te_growth_across_hops_v4.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_first_seven_hops(self):
        """绘制1-7跳的时间误差数据"""
        plt.figure(figsize=(12, 8))
        
        # 绘制箱形图
        first_seven_hops_data = [self.results['te_per_hop'][:, i] for i in range(7)]
        plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Distribution for First 7 Hops')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_te_v4.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # 绘制小提琴图
        plt.figure(figsize=(12, 8))
        first_seven_df = pd.DataFrame({f'Hop {i+1}': self.results['te_per_hop'][:, i] for i in range(7)})
        sns.violinplot(data=first_seven_df)
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Distribution for First 7 Hops (Violin Plot)')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_te_violin_v4.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_specific_hops_line_and_cdf(self):
        """绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图"""
        specific_hops = [10, 25, 50, 75, 100]
        specific_hops = [h for h in specific_hops if h <= self.params.num_hops]
        
        # 折线图 - 每次运行的te随时间的变化
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            # 选择100次运行进行可视化，否则图会太乱
            sample_runs = np.random.choice(self.params.num_runs, size=min(100, self.params.num_runs), replace=False)
            for run in sample_runs:
                if hop == specific_hops[0]:  # 只对第一个跳添加标签，避免重复
                    plt.plot(self.results['te_per_hop'][run, :hop], alpha=0.1, color=f'C{specific_hops.index(hop)}')
            
            # 绘制平均值线
            mean_values = np.mean(self.results['te_per_hop'][:, :hop], axis=0)
            plt.plot(mean_values, linewidth=2, label=f'Hop {hop} (avg)', color=f'C{specific_hops.index(hop)}')
        
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Development for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_te_line_v4.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # CDF图 - 最终te的累积分布
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            hop_data = self.results['te_per_hop'][:, hop-1]
            sorted_data = np.sort(hop_data)
            # 计算每个值的CDF
            cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
            plt.plot(sorted_data, cdf, label=f'Hop {hop}')
        
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Time Error (ns)')
        plt.ylabel('Cumulative Probability')
        plt.title('CDF of te for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_te_cdf_v4.png'), dpi=300, bbox_inches='tight')
        plt.close()

# 使用推荐参数运行仿真
def main():
    # 使用推荐设置创建参数
    params = SimulationParameters(
        num_hops=100,
        num_runs=1000,  # 为演示减少
        
        # 时钟特性
        gm_clock_drift_max=1.5,
        gm_clock_drift_min=-1.5,
        gm_clock_drift_fraction=0.8,
        clock_drift_max=1.5,
        clock_drift_min=-1.5,
        clock_drift_fraction=0.8,
        
        # 时间戳误差
        tsge_tx=4.0,
        tsge_rx=4.0,
        dtse_tx=4.0,
        dtse_rx=4.0,
        
        # 消息间隔
        pdelay_interval=1000.0,
        sync_interval=125.0,
        pdelay_turnaround=0.5,
        residence_time=0.5,
        
        # 校正因子 - 推荐设置
        mean_link_delay_correction=0.0,
        nrr_drift_correction=0.0,
        rr_drift_correction=0.,
        pdelayresp_sync_correction=0.0,
        mnrr_smoothing_n=3,
        mnrr_smoothing_m=1,
        
        # 终端站计算使用的特定跳
        end_station_hops=[10, 25, 50, 75, 100]
    )
    
    # 创建并运行仿真
    sim = TimeSyncSimulation(params)
    print("Running simulation with recommended parameters...")
    sim.run_simulation()
    
    # 输出结果
    max_te = max(sim.results['te_max'])
    final_7sigma = sim.results['te_7sigma'][-1]
    
    print(f"Simulation complete!")
    print(f"Maximum te: {max_te:.1f} ns")
    print(f"7-sigma te at hop {params.num_hops}: {final_7sigma:.1f} ns")
    print(f"Target (<1000 ns): {'PASSED' if final_7sigma < 1000 else 'FAILED'}")
    
    # 绘制结果
    print("Generating plots...")
    sim.plot_results()
    print(f"Results saved to '{sim.output_data_dir}' and '{sim.output_image_dir}' directories.")

if __name__ == "__main__":
    main()

Running simulation with recommended parameters...
Simulation complete!
Maximum te: 6057.8 ns
7-sigma te at hop 100: 11219.0 ns
Target (<1000 ns): FAILED
Generating plots...


  plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])


Results saved to 'output_data' and 'output_image' directories.


In [13]:
# Time Synchronization Simulation for IEEE 802.1AS in IEC/IEEE 60802
# Based on analysis of McCall et al. documents (2021-2022)

import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
import random
import os
import pandas as pd
import seaborn as sns
from matplotlib.ticker import PercentFormatter

@dataclass
class SimulationParameters:
    """时间同步仿真的参数"""
    # 网络配置
    num_hops: int = 100  # 链中的跳数
    num_runs: int = 10000  # 蒙特卡洛运行次数
    
    # 时钟特性
    gm_clock_drift_max: float = 1.5  # 最大GM时钟漂移（ppm/s）
    gm_clock_drift_min: float = -1.5  # 最小GM时钟漂移（ppm/s）
    gm_clock_drift_fraction: float = 0.8  # 具有漂移的GM节点比例
    
    clock_drift_max: float = 1.5  # 最大非GM时钟漂移（ppm/s）
    clock_drift_min: float = -1.5  # 最小非GM时钟漂移（ppm/s）
    clock_drift_fraction: float = 0.8  # 具有漂移的非GM节点比例
    
    # 时间戳误差特性
    tsge_tx: float = 4.0  # TX时间戳粒度误差（±ns）
    tsge_rx: float = 4.0  # RX时间戳粒度误差（±ns）
    dtse_tx: float = 4.0  # TX动态时间戳误差（±ns）
    dtse_rx: float = 4.0  # RX动态时间戳误差（±ns）
    
    # 消息间隔
    pdelay_interval: float = 125.0  # pDelay消息间隔（ms）
    sync_interval: float = 125.0  # 同步消息间隔（ms）
    pdelay_turnaround: float = 10.0  # pDelay响应时间（ms）
    residence_time: float = 10.0  # 节点内驻留时间（ms）
    
    # 校正因子
    mean_link_delay_correction: float = 0.98  # 平均链路延迟平均的有效性
    nrr_drift_correction: float = 0.90  # NRR漂移校正有效性
    rr_drift_correction: float = 0.90  # RR漂移校正有效性
    pdelayresp_sync_correction: float = 0.0  # pDelay响应到同步的对齐因子
    
    # NRR平滑参数
    mnrr_smoothing_n: int = 3  # 使用的先前pDelayResp数量
    mnrr_smoothing_m: int = 1  # 用于中值计算（在推荐设置中未使用）
    
    # 终端站计算方法使用的特定跳
    end_station_hops: List[int] = field(default_factory=lambda: [10, 25, 50, 75, 100])
    
    # 下一次同步消息相关参数
    consider_next_sync: bool = True  # 是否考虑下一次同步消息的影响
    time_to_next_sync: float = None  # 到下一次同步消息的时间(ms), None表示使用sync_interval

@dataclass
class NodeState:
    """链中节点的状态"""
    # 时钟相关状态
    clock_drift: float = 0.0  # 时钟漂移率（ppm/s）
    
    # 时间戳误差
    t1_pderror: float = 0.0  # pDelay请求的TX时间戳误差
    t2_pderror: float = 0.0  # pDelay请求的RX时间戳误差
    t3_pderror: float = 0.0  # pDelay响应的TX时间戳误差
    t4_pderror: float = 0.0  # pDelay响应的RX时间戳误差
    t3_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t3误差
    t4_pderror_prev: List[float] = field(default_factory=list)  # 用于NRR计算的先前t4误差
    
    t2_sinerror: float = 0.0  # 同步的RX时间戳误差
    t1_souterror: float = 0.0  # 同步的TX时间戳误差
    
    # 误差累积
    mnrr_error: float = 0.0  # 邻居速率比误差
    mnrr_error_ts: float = 0.0  # 由时间戳误差导致的NRR误差
    mnrr_error_cd: float = 0.0  # 由时钟漂移导致的NRR误差
    
    rr_error: float = 0.0  # 速率比误差
    rr_error_sum: float = 0.0  # 累积的RR误差
    
    mean_link_delay_error: float = 0.0  # 链路延迟测量误差
    residence_time_error: float = 0.0  # 驻留时间测量误差
    
    te: float = 0.0  # 该节点的动态时间误差

class TimeSyncSimulation:
    """IEEE 802.1AS 在 IEC/IEEE 60802 中的时间同步仿真"""
    
    def __init__(self, params: SimulationParameters):
        self.params = params
        # 设置默认的下一次同步时间，如果未指定
        if self.params.time_to_next_sync is None:
            self.params.time_to_next_sync = self.params.sync_interval
            
        self.results = {
            'te_max': [],            # 所有运行中的最大te
            'te_7sigma': [],         # te的7-sigma值
            'te_per_hop': np.zeros((params.num_runs, params.num_hops))  # 每次运行中每个跳的te
        }
        
        # 创建输出目录
        self.output_data_dir = 'output_data_v2'
        self.output_image_dir = 'output_image_v2'
        os.makedirs(self.output_data_dir, exist_ok=True)
        os.makedirs(self.output_image_dir, exist_ok=True)
    
    def generate_timestamp_error(self, is_tx: bool) -> float:
        """根据参数生成随机时间戳误差"""
        if is_tx:
            tsge = np.random.uniform(-self.params.tsge_tx, self.params.tsge_tx)
            dtse = np.random.uniform(-self.params.dtse_tx, self.params.dtse_tx)
        else:
            tsge = np.random.uniform(-self.params.tsge_rx, self.params.tsge_rx)
            dtse = np.random.uniform(-self.params.dtse_rx, self.params.dtse_rx)
        return tsge + dtse
    
    def generate_clock_drift(self, is_gm: bool) -> float:
        """根据参数生成随机时钟漂移"""
        if is_gm:
            if np.random.random() <= self.params.gm_clock_drift_fraction:
                return np.random.uniform(self.params.gm_clock_drift_min, self.params.gm_clock_drift_max)
            return 0.0
        else:
            if np.random.random() <= self.params.clock_drift_fraction:
                return np.random.uniform(self.params.clock_drift_min, self.params.clock_drift_max)
            return 0.0

    def generate_pdelay_interval(self) -> float:
        """在规格范围内生成随机pDelay间隔"""
        return np.random.uniform(0.9 * self.params.pdelay_interval, 
                                1.3 * self.params.pdelay_interval)
        
    def run_simulation(self):
        """运行时间同步仿真"""
        for run in range(self.params.num_runs):
            # 为新的运行重置
            nodes = [NodeState() for _ in range(self.params.num_hops + 1)]  # +1 为GM
            
            # 为所有节点生成时钟漂移
            nodes[0].clock_drift = self.generate_clock_drift(is_gm=True)  # GM
            for i in range(1, self.params.num_hops + 1):
                nodes[i].clock_drift = self.generate_clock_drift(is_gm=False)
            
            # 计算所有跳的误差
            te = 0.0
            for hop in range(1, self.params.num_hops + 1):
                # 生成时间戳误差
                nodes[hop].t1_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t3_pderror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t4_pderror = self.generate_timestamp_error(is_tx=False)
                nodes[hop].t1_souterror = self.generate_timestamp_error(is_tx=True)
                nodes[hop].t2_sinerror = self.generate_timestamp_error(is_tx=False)
                
                # 为NRR计算生成先前的时间戳
                for n in range(1, self.params.mnrr_smoothing_n):
                    nodes[hop].t3_pderror_prev.append(self.generate_timestamp_error(is_tx=True))
                    nodes[hop].t4_pderror_prev.append(self.generate_timestamp_error(is_tx=False))
                
                # 计算NRR误差组件
                self.calculate_mnrr_errors(nodes, hop)
                
                # 计算RR误差
                if hop == 1:
                    nodes[hop].rr_error = nodes[hop].mnrr_error
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                else:
                    # 添加由NRR测量到同步之间的时钟漂移引起的RR误差
                    pdelay_to_sync = np.random.uniform(0, self.params.pdelay_interval) * (1.0 - self.params.pdelayresp_sync_correction)
                    rr_error_cd_nrr2sync = (pdelay_to_sync * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / 1000) * (1.0 - self.params.nrr_drift_correction)
                    
                    # 添加由上游RR计算到同步之间的时钟漂移引起的RR误差
                    rr_error_cd_rr2sync = (self.params.residence_time * (nodes[hop-1].clock_drift - nodes[0].clock_drift) / 1000) * (1.0 - self.params.rr_drift_correction)
                    
                    # 累积RR误差
                    nodes[hop].rr_error = nodes[hop-1].rr_error + nodes[hop].mnrr_error + rr_error_cd_nrr2sync + rr_error_cd_rr2sync
                    nodes[hop].rr_error_sum = nodes[hop].rr_error
                
                # 计算平均链路延迟误差
                pdelay_error_ts = (nodes[hop].t4_pderror - nodes[hop].t1_pderror - nodes[hop].t3_pderror + nodes[hop].t2_pderror) / 2
                pdelay_error_ts *= (1.0 - self.params.mean_link_delay_correction)
                
                pdelay_error_nrr = -self.params.pdelay_turnaround * nodes[hop].mnrr_error / 2
                pdelay_error_nrr *= (1.0 - self.params.mean_link_delay_correction)
                
                nodes[hop].mean_link_delay_error = pdelay_error_ts + pdelay_error_nrr
                
                # 修改: 对所有节点使用相同的误差计算方法（普通桥接设备方法）
                rt_error_ts_direct = nodes[hop].t1_souterror - nodes[hop].t2_sinerror
                rt_error_rr = self.params.residence_time * nodes[hop].rr_error
                rt_error_cd_direct = (self.params.residence_time**2 * (nodes[hop].clock_drift - nodes[0].clock_drift) / (2 * 1000)) * (1.0 - self.params.rr_drift_correction)
                
                nodes[hop].residence_time_error = rt_error_ts_direct + rt_error_rr + rt_error_cd_direct
                nodes[hop].te = te + nodes[hop].mean_link_delay_error + nodes[hop].residence_time_error
                
                # 如果需要考虑下一次同步消息的影响并且是指定跳或最后一跳，则添加额外的时钟漂移误差
                if self.params.consider_next_sync and (hop == self.params.num_hops or hop in self.params.end_station_hops):
                    # 计算到下一次同步到达之前积累的额外时钟漂移误差
                    additional_drift_error = (self.params.time_to_next_sync * (nodes[hop].clock_drift - nodes[0].clock_drift) / 1000)
                    nodes[hop].te += additional_drift_error
                
                # 更新下一跳的累积te
                te = nodes[hop].te
                
                # 存储结果
                self.results['te_per_hop'][run, hop-1] = te
            
            # 计算完所有跳后，存储此次运行的最大te
            if run == 0 or abs(te) > self.results['te_max'][-1]:
                self.results['te_max'].append(abs(te))
        
        # 计算7-sigma te（比最大值更具统计代表性）
        for hop in range(self.params.num_hops):
            te_at_hop = self.results['te_per_hop'][:, hop]
            self.results['te_7sigma'].append(np.std(te_at_hop) * 7)
        
        # 保存数据到CSV文件
        self.save_results_to_csv()
    
    def calculate_mnrr_errors(self, nodes: List[NodeState], hop: int):
        """计算给定跳的mNRR误差组件"""
        # 基于mNRR平滑计算有效pDelay间隔
        tpdelay2pdelay = 0
        for n in range(self.params.mnrr_smoothing_n):
            tpdelay2pdelay += self.generate_pdelay_interval()
        
        # 计算由时间戳引起的mNRR误差
        if self.params.mnrr_smoothing_n > 1 and len(nodes[hop].t3_pderror_prev) >= self.params.mnrr_smoothing_n - 1:
            # 使用先前的时间戳进行NRR计算
            t3pd_diff = nodes[hop].t3_pderror - nodes[hop].t3_pderror_prev[-1]
            t4pd_diff = nodes[hop].t4_pderror - nodes[hop].t4_pderror_prev[-1]
        else:
            # 使用最近的时间戳进行默认计算
            t3pd_diff = nodes[hop].t3_pderror - 0  # 假设先前样本的误差为0（简化）
            t4pd_diff = nodes[hop].t4_pderror - 0
        
        nodes[hop].mnrr_error_ts = (t3pd_diff - t4pd_diff) / tpdelay2pdelay
        
        # 计算由时钟漂移引起的mNRR误差
        nodes[hop].mnrr_error_cd = (tpdelay2pdelay * (nodes[hop].clock_drift - nodes[hop-1].clock_drift) / (2 * 1000)) * (1.0 - self.params.nrr_drift_correction)
        
        # 总mNRR误差
        nodes[hop].mnrr_error = nodes[hop].mnrr_error_ts + nodes[hop].mnrr_error_cd
    
    def save_results_to_csv(self):
        """将所有节点的时间误差结果保存到CSV文件中"""
        # 创建一个包含所有跳的te数据的DataFrame
        all_te_data = {}
        for hop in range(1, self.params.num_hops + 1):
            hop_data = self.results['te_per_hop'][:, hop-1]
            all_te_data[f'Hop_{hop}'] = hop_data
        
        df = pd.DataFrame(all_te_data)
        
        # 保存到CSV文件
        df.to_csv(os.path.join(self.output_data_dir, 'te_all_hops_v5.csv'), index=False)
        
        # 保存7-sigma和统计数据
        stats_data = {
            'Hop': list(range(1, self.params.num_hops + 1)),
            'te_7sigma': self.results['te_7sigma'],
            'te_Mean': [np.mean(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Std': [np.std(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Min': [np.min(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)],
            'te_Max': [np.max(self.results['te_per_hop'][:, i]) for i in range(self.params.num_hops)]
        }
        stats_df = pd.DataFrame(stats_data)
        stats_df.to_csv(os.path.join(self.output_data_dir, 'te_statistics_v5.csv'), index=False)
    
    def plot_results(self):
        """绘制仿真结果并保存图像"""
        # 1. 绘制最终跳的te分布
        self.plot_final_hop_distribution()
        
        # 2. 绘制te随跳数的增长
        self.plot_te_growth()
        
        # 3. 绘制1-7跳的时间误差数据
        self.plot_first_seven_hops()
        
        # 4. 绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图
        self.plot_specific_hops_line_and_cdf()
    
    def plot_final_hop_distribution(self):
        """绘制最终跳的te分布"""
        plt.figure(figsize=(10, 6))
        final_hop_te = self.results['te_per_hop'][:, -1]
        plt.hist(final_hop_te, bins=50, alpha=0.7)
        # plt.axvline(x=self.results['te_7sigma'][-1], color='r', linestyle='--', 
        #            label=f'7σ: {self.results["te_7sigma"][-1]:.1f} ns')
        # plt.axvline(x=-self.results['te_7sigma'][-1], color='r', linestyle='--')
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Time Error (ns)')
        plt.ylabel('Count')
        plt.title(f'te Distribution at Hop {self.params.num_hops}')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'final_hop_te_distribution_v5.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_te_growth(self):
        """绘制te随跳数的增长"""
        plt.figure(figsize=(10, 6))
        hops = np.arange(1, self.params.num_hops + 1)
        # plt.plot(hops, self.results['te_7sigma'], 'b-', label='7σ te')
        plt.axhline(y=1000, color='g', linestyle=':', label='±1μs target')
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        # plt.title('te Growth Across Hops (7σ values)')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'te_growth_across_hops_v5.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_first_seven_hops(self):
        """绘制1-7跳的时间误差数据"""
        plt.figure(figsize=(12, 8))
        
        # 绘制箱形图
        first_seven_hops_data = [self.results['te_per_hop'][:, i] for i in range(7)]
        plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Distribution for First 7 Hops')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_te_v5.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # 绘制小提琴图
        plt.figure(figsize=(12, 8))
        first_seven_df = pd.DataFrame({f'Hop {i+1}': self.results['te_per_hop'][:, i] for i in range(7)})
        sns.violinplot(data=first_seven_df)
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Distribution for First 7 Hops (Violin Plot)')
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'first_seven_hops_te_violin_v5.png'), dpi=300, bbox_inches='tight')
        plt.close()
    
    def plot_specific_hops_line_and_cdf(self):
        """绘制特定跳数(10,25,50,75,100)的时间误差折线图和CDF图"""
        specific_hops = [10, 25, 50, 75, 100]
        specific_hops = [h for h in specific_hops if h <= self.params.num_hops]
        
        # 折线图 - 每次运行的te随时间的变化
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            # 选择100次运行进行可视化，否则图会太乱
            sample_runs = np.random.choice(self.params.num_runs, size=min(100, self.params.num_runs), replace=False)
            for run in sample_runs:
                if hop == specific_hops[0]:  # 只对第一个跳添加标签，避免重复
                    plt.plot(self.results['te_per_hop'][run, :hop], alpha=0.1, color=f'C{specific_hops.index(hop)}')
            
            # 绘制平均值线
            mean_values = np.mean(self.results['te_per_hop'][:, :hop], axis=0)
            plt.plot(mean_values, linewidth=2, label=f'Hop {hop} (avg)', color=f'C{specific_hops.index(hop)}')
        
        plt.xlabel('Hop Number')
        plt.ylabel('Time Error (ns)')
        plt.title('te Development for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_te_line_v5.png'), dpi=300, bbox_inches='tight')
        plt.close()
        
        # CDF图 - 最终te的累积分布
        plt.figure(figsize=(12, 8))
        for hop in specific_hops:
            hop_data = self.results['te_per_hop'][:, hop-1]
            sorted_data = np.sort(hop_data)
            # 计算每个值的CDF
            cdf = np.arange(1, len(sorted_data) + 1) / len(sorted_data)
            plt.plot(sorted_data, cdf, label=f'Hop {hop}')
        
        plt.axvline(x=1000, color='g', linestyle=':', label='±1μs target')
        plt.axvline(x=-1000, color='g', linestyle=':')
        plt.xlabel('Time Error (ns)')
        plt.ylabel('Cumulative Probability')
        plt.title('CDF of te for Specific Hops')
        plt.legend()
        plt.grid(True, alpha=0.3)
        
        # 保存图像
        plt.savefig(os.path.join(self.output_image_dir, 'specific_hops_te_cdf_v5.png'), dpi=300, bbox_inches='tight')
        plt.close()

# 使用推荐参数运行仿真
def main():
    # 使用推荐设置创建参数
    params = SimulationParameters(
        num_hops=100,
        num_runs=1000,  # 为演示减少
        
        # 时钟特性
        gm_clock_drift_max=1.5,
        gm_clock_drift_min=-1.5,
        gm_clock_drift_fraction=0.8,
        clock_drift_max=1.5,
        clock_drift_min=-1.5,
        clock_drift_fraction=0.8,
        
        # 时间戳误差
        tsge_tx=4.0,
        tsge_rx=4.0,
        dtse_tx=4.0,
        dtse_rx=4.0,
        
        # 消息间隔
        pdelay_interval=1000.0,
        sync_interval=125.0,
        pdelay_turnaround=0.5,
        residence_time=0.5,
        
        # 校正因子 - 推荐设置
        mean_link_delay_correction=0.0,
        nrr_drift_correction=0.0,
        rr_drift_correction=0.0,
        pdelayresp_sync_correction=0.0,
        mnrr_smoothing_n=3,
        mnrr_smoothing_m=1,
        
        # 终端站计算使用的特定跳
        end_station_hops=[10, 25, 50, 75, 100],
        
        # 下一次同步消息相关设置
        consider_next_sync=True,
        time_to_next_sync=125.0   # 默认使用sync_interval
    )
    
    # 创建并运行仿真
    sim = TimeSyncSimulation(params)
    print("Running simulation with recommended parameters...")
    sim.run_simulation()
    
    # 输出结果
    max_te = max(sim.results['te_max'])
    final_7sigma = sim.results['te_7sigma'][-1]
    
    print(f"Simulation complete!")
    print(f"Maximum te: {max_te:.1f} ns")
    print(f"7-sigma te at hop {params.num_hops}: {final_7sigma:.1f} ns")
    print(f"Target (<1000 ns): {'PASSED' if final_7sigma < 1000 else 'FAILED'}")
    
    # 绘制结果
    print("Generating plots...")
    sim.plot_results()
    print(f"Results saved to '{sim.output_data_dir}' and '{sim.output_image_dir}' directories.")

if __name__ == "__main__":
    main()

Running simulation with recommended parameters...
Simulation complete!
Maximum te: 469.9 ns
7-sigma te at hop 100: 931.3 ns
Target (<1000 ns): PASSED
Generating plots...


  plt.boxplot(first_seven_hops_data, labels=[f'Hop {i+1}' for i in range(7)])


Results saved to 'output_data_v2' and 'output_image_v2' directories.


In [14]:
import os
import pandas as pd

# 定义基础路径
base_path = r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428"
input_folder = os.path.join(base_path, "input_data")
output_folder = os.path.join(base_path, "output_data_v2")

# 确保输出文件夹存在
os.makedirs(output_folder, exist_ok=True)

# 初始化一个空的DataFrame来存储所有数据
combined_data = pd.DataFrame()

# 循环读取TSN1到TSN7的文件
for i in range(1, 8):
    # 构造文件名
    filename = f"TSN{i}-20m.txt"
    filepath = os.path.join(input_folder, filename)
    
    try:
        # 读取文件数据，假设是制表符分隔的文本文件
        # 如果分隔符不同，请调整sep参数
        data = pd.read_csv(filepath, sep='\t', header=None, names=[f"TSN{i}"])
        
        # 将数据添加到combined_data中
        combined_data[f"TSN{i}"] = data[f"TSN{i}"]
        
    except Exception as e:
        print(f"读取文件 {filename} 时出错: {e}")

# 保存到CSV文件
output_path = os.path.join(output_folder, "data.csv")
combined_data.to_csv(output_path, index=False)

print(f"数据已成功保存到 {output_path}")


数据已成功保存到 D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\data.csv


In [17]:
import pandas as pd
import matplotlib.pyplot as plt
import os
import numpy as np

# 读取两个CSV文件
te_all_hops = pd.read_csv(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\te_all_hops_v5.csv")
data_csv = pd.read_csv(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\data.csv")

# 提取两个文件的前7列数据
file1_data = [te_all_hops.iloc[:, i] for i in range(7)]
file2_data = [data_csv.iloc[:, i] for i in range(7)]

# 合并数据，相同跳数的相邻排列
combined_data = []
positions = []
for i in range(7):
    combined_data.append(file1_data[i])
    combined_data.append(file2_data[i])
    positions.extend([i*3+1, i*3+2])  # 调整位置使相同跳数的箱线图相邻

# 绘制箱线图
plt.figure(figsize=(14, 8))

# 绘制箱线图
box = plt.boxplot(combined_data, 
                 positions=positions,
                 widths=0.6,
                 patch_artist=True,
                 boxprops=dict(facecolor='lightblue', color='blue', alpha=0.7),
                 whiskerprops=dict(color='gray'),
                 capprops=dict(color='gray'),
                 medianprops=dict(color='red', linewidth=2),
                 flierprops=dict(marker='o', markersize=5, markerfacecolor='none', markeredgecolor='gray'))

# 为第二个文件的数据设置不同颜色
for i in range(1, len(combined_data), 2):
    box['boxes'][i].set_facecolor('lightgreen')
    box['boxes'][i].set_alpha(0.7)

# 设置x轴标签和标题
x_labels = [f'Hop {i+1}' for i in range(7)]
plt.xticks([i*3+1.5 for i in range(7)], x_labels)
plt.xlabel('Hop Number')
plt.ylabel('Time Error (ns)')
plt.title('Comparison of Time Error Distribution for First 7 Hops (v5 vs data)')

# 添加图例
plt.plot([], [], color='lightblue', label='te_all_hops_v5.csv')
plt.plot([], [], color='lightgreen', label='data.csv')
plt.legend()

plt.grid(True, alpha=0.3)

# 保存图像
output_dir = r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2"
os.makedirs(output_dir, exist_ok=True)
plt.savefig(os.path.join(output_dir, 'combined_first_seven_hops_comparison.png'), dpi=300, bbox_inches='tight')
plt.close()

print("箱线图已保存至:", os.path.join(output_dir, 'combined_first_seven_hops_comparison.png'))


箱线图已保存至: D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\combined_first_seven_hops_comparison.png


In [19]:
import pandas as pd
import matplotlib.pyplot as plt
import os
import numpy as np
from matplotlib import rcParams

# 设置中文字体 - 解决中文显示问题
plt.rcParams['font.sans-serif'] = ['SimHei']  # 使用黑体显示中文
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题
rcParams.update({'font.size': 12})  # 设置全局字体大小

# 读取两个CSV文件
te_all_hops = pd.read_csv(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\te_all_hops_v5.csv")
data_csv = pd.read_csv(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\data.csv")

# 提取两个文件的前7列数据
sim_data = [te_all_hops.iloc[:, i] for i in range(7)]  # 仿真数据
real_data = [data_csv.iloc[:, i] for i in range(7)]    # 实测数据

# 合并数据，相同跳数的相邻排列
combined_data = []
positions = []
for i in range(7):
    combined_data.append(sim_data[i])
    combined_data.append(real_data[i])
    positions.extend([i*3+1, i*3+2])  # 调整位置使相同跳数的箱线图相邻

# 创建图形
plt.figure(figsize=(14, 8), dpi=100)

# 绘制箱线图
box = plt.boxplot(combined_data, 
                 positions=positions,
                 widths=0.6,
                 patch_artist=True,
                 boxprops=dict(facecolor='lightblue', color='blue', alpha=0.7),
                 whiskerprops=dict(color='gray'),
                 capprops=dict(color='gray'),
                 medianprops=dict(color='red', linewidth=2),
                 flierprops=dict(marker='o', markersize=5, markerfacecolor='none', markeredgecolor='gray'))

# 为实测数据设置不同颜色
for i in range(1, len(combined_data), 2):
    box['boxes'][i].set_facecolor('lightgreen')
    box['boxes'][i].set_alpha(0.7)

# 设置中文标签
x_labels = [f'第{i+1}跳' for i in range(7)]
plt.xticks([i*3+1.5 for i in range(7)], x_labels)
plt.xlabel('跳数', fontsize=14)
plt.ylabel('时间误差(ns)', fontsize=14)
plt.title('前7跳时间误差分布对比(仿真数据 vs 实测数据)', fontsize=16, pad=20)

# 添加中文图例
from matplotlib.patches import Patch
legend_elements = [
    Patch(facecolor='lightblue', alpha=0.7, label='仿真数据'),
    Patch(facecolor='lightgreen', alpha=0.7, label='实测数据')
]
plt.legend(handles=legend_elements, loc='upper right', fontsize=12)

# 网格线设置
plt.grid(True, alpha=0.3, linestyle='--')

# 调整布局
plt.tight_layout()

# 保存图像（确保文件名不含中文）
output_dir = r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, 'combined_first_seven_hops_comparison.png')

# 使用bbox_inches='tight'防止标签被截断
plt.savefig(output_path, dpi=300, bbox_inches='tight', facecolor='white')
plt.close()

print(f"箱线图已保存至: {output_path}")


箱线图已保存至: D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\combined_first_seven_hops_comparison.png


In [20]:
import pandas as pd
import matplotlib.pyplot as plt
import os
import numpy as np
from matplotlib import rcParams

# 设置中文字体 - 解决中文显示问题
plt.rcParams['font.sans-serif'] = ['SimHei']  # 使用黑体显示中文
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题
rcParams.update({'font.size': 12})  # 设置全局字体大小

# 读取数据
# 前7跳数据
te_all_hops = pd.read_csv(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\te_all_hops_v5.csv")
data_csv = pd.read_csv(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\data.csv")

# 第32跳数据
sim_hop32 = pd.read_csv(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\te_all_hops_v5.csv").iloc[:, 31]  # 第32列
real_hop32 = pd.read_excel(r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\data_hop32.xlsx").iloc[:, 2]  # 第3列

# 提取数据
sim_data = [te_all_hops.iloc[:, i] for i in range(7)]  # 仿真数据前7跳
real_data = [data_csv.iloc[:, i] for i in range(7)]    # 实测数据前7跳

# 添加第32跳数据
sim_data.append(sim_hop32)
real_data.append(real_hop32)

# 合并数据，相同跳数的相邻排列
combined_data = []
positions = []
for i in range(8):  # 现在有8跳数据(前7跳+第32跳)
    combined_data.append(sim_data[i])
    combined_data.append(real_data[i])
    positions.extend([i*3+1, i*3+2])  # 调整位置使相同跳数的箱线图相邻

# 创建图形
plt.figure(figsize=(16, 8), dpi=100)

# 绘制箱线图
box = plt.boxplot(combined_data, 
                 positions=positions,
                 widths=0.6,
                 patch_artist=True,
                 boxprops=dict(facecolor='lightblue', color='blue', alpha=0.7),
                 whiskerprops=dict(color='gray'),
                 capprops=dict(color='gray'),
                 medianprops=dict(color='red', linewidth=2),
                 flierprops=dict(marker='o', markersize=5, markerfacecolor='none', markeredgecolor='gray'))

# 为实测数据设置不同颜色
for i in range(1, len(combined_data), 2):
    box['boxes'][i].set_facecolor('lightgreen')
    box['boxes'][i].set_alpha(0.7)

# 设置中文标签
x_labels = [f'第{i+1}跳' for i in range(7)] + ['第32跳']
plt.xticks([i*3+1.5 for i in range(8)], x_labels)
plt.xlabel('跳数', fontsize=14)
plt.ylabel('时间误差(ns)', fontsize=14)
plt.title('时间误差分布对比(仿真数据 vs 实测数据)', fontsize=16, pad=20)

# 添加中文图例
from matplotlib.patches import Patch
legend_elements = [
    Patch(facecolor='lightblue', alpha=0.7, label='仿真数据'),
    Patch(facecolor='lightgreen', alpha=0.7, label='实测数据')
]
plt.legend(handles=legend_elements, loc='upper right', fontsize=12)

# 网格线设置
plt.grid(True, alpha=0.3, linestyle='--')

# 调整布局
plt.tight_layout()

# 保存图像
output_dir = r"D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, 'all_hops_comparison_with_hop32.png')

# 使用bbox_inches='tight'防止标签被截断
plt.savefig(output_path, dpi=300, bbox_inches='tight', facecolor='white')
plt.close()

print(f"箱线图已保存至: {output_path}")


箱线图已保存至: D:\06_engineering\03_analysis\pj_gptp_simulation\version\20250428\output_data_v2\all_hops_comparison_with_hop32.png
