## 整体思路

1.  **数据源 (Data Source)**: 按照50Hz的频率，逐个“吐出”数据点，模拟传感器实时产生数据。
2.  **边缘节点 (Edge Node)**: 接收数据点，维护一个滑动窗口，并按设定的时间间隔进行模型推理，生成特征和实时判断，并将特征分批保存。

#### 关键设计决策：

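  * **数据流的重建**：数据 `(9491, 200, 11)` 是一个已经切分好的序列集合。为了模拟真实的连续数据流，我们需要将其“展开”成一条长的连续时间序列（每个时间点包含 11 个通道）。假设这些序列是步长为1生成的，那么原始数据流的长度大约是 `9491 - 1 + 200 = 9690` 个时间点。注意：下文 `create_sequences` 实际使用的步长是 `STEP = 50`，并且这些序列来自多个相互独立的试验，因此这一假设并不严格成立，重建出的数据流只是对连续流的近似。我们的模拟器会从这个重建的流中逐个读取数据。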
  * **处理间隔（步长）**：我们之前讨论过，滑动窗口的步长是关键。一个 **0.5秒（25个样本点）** 的步长是一个很好的起点，它在实时性和计算负载之间取得了很好的平衡。这意味着边缘节点**每秒会进行2次推理**。
  * **特征文件保存**：为了模拟“实时生成特征文件”，我们不会等所有数据处理完再保存。而是每当边缘节点生成了一定数量（例如100个）的特征后，就将其保存为一个独立的文件（`features_batch_0.npy`, `features_batch_1.npy`, ...），这更贴近真实场景。


**第一批100个特征总共对应消耗了 `200 + 99 × 25 = 2675` 条原始数据（首个窗口需要先收集200个点，之后每25个点产生一个特征）；此后每一批100个特征只需再消耗 `100 × 25 = 2500` 条新数据。**



In [1]:
import time

import os
import numpy as np
import pandas as pd
from collections import defaultdict, deque
import re
import io
import joblib

# PyTorch
import torch
import torch.nn as nn

## 定义模型和节点、数据加载模拟器

In [2]:
class FeatureModel1DCNN(nn.Module):
    """1D-CNN that maps a window of sensor samples to class logits.

    The network has two parts:

    * ``feature_extractor``: three Conv1d -> ReLU -> BatchNorm1d -> MaxPool1d
      stages (64, 128, 256 channels). Every pooling stage halves the time axis.
    * ``classifier``: Flatten -> Linear(512) -> ReLU -> Dropout -> Linear.

    Args:
        input_channels: Number of channels per time step (last axis of the input).
        num_classes: Number of output logits.
        seq_length: Number of time steps per input window. It determines the
            flattened size fed to the first linear layer. The default of 200
            yields ``256 * 25`` inputs, the size that was previously hard-coded.

    Raises:
        ValueError: If ``seq_length`` is too short to survive three 2x poolings.
    """

    def __init__(self, input_channels=11, num_classes=1, seq_length=200):
        super().__init__()
        # NOTE: the order of the layers inside each nn.Sequential defines the
        # state_dict keys ("feature_extractor.0.weight", ...); do not reorder
        # them or previously saved checkpoints will no longer load.
        self.feature_extractor = nn.Sequential(
            nn.Conv1d(in_channels=input_channels, out_channels=64, kernel_size=3, padding='same'), nn.ReLU(), nn.BatchNorm1d(64),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding='same'), nn.ReLU(), nn.BatchNorm1d(128),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding='same'), nn.ReLU(), nn.BatchNorm1d(256),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        # 'same' padding keeps the length through each convolution, so only the
        # three MaxPool1d(2, 2) layers shrink the time axis (floor division).
        pooled_length = seq_length // 2 // 2 // 2
        if pooled_length < 1:
            raise ValueError(
                f"seq_length must be at least 8 to survive three poolings, got {seq_length}"
            )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * pooled_length, 512), nn.ReLU(), nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for x of shape (batch, time, channels)."""
        # Reuse extract_features so both entry points share a single code path.
        return self.classifier(self.extract_features(x))

    def extract_features(self, x):
        """Return the convolutional feature map for x of shape (batch, time, channels).

        The output has shape (batch, 256, time // 8).
        """
        # Conv1d expects (batch, channels, time), so swap the last two axes.
        x = x.permute(0, 2, 1)
        features = self.feature_extractor(x)
        return features

# --- 模拟器核心代码 ---

class DataSourceSimulator:
    """Simulate a real-time data source that emits one sample at a time.

    A continuous stream is rebuilt from an array of overlapping sliding-window
    sequences of shape (N, window_length, channels), then replayed point by
    point through the iterator protocol.

    Args:
        sequences_array: Array of shape (N, window_length, channels), N >= 1.
        sampling_rate_hz: Nominal sampling rate; only used to derive the
            timestamp attached to every emitted point.
        sequence_step: Stride (in samples) with which consecutive sequences were
            cut from the original stream. Each sequence after the first
            contributes its last ``sequence_step`` samples. The default of 1
            reproduces the original behaviour (one new point per sequence).
            If the sequences were generated with a larger stride, pass that
            stride here; otherwise the rebuilt stream skips samples.

    Raises:
        ValueError: If the input is not a non-empty 3-D array, if
            ``sequence_step`` is outside ``[1, window_length]`` or if
            ``sampling_rate_hz`` is not positive.
    """

    def __init__(self, sequences_array, sampling_rate_hz=50, sequence_step=1):
        print("Data Source: Reconstructing continuous data stream from sequences...")

        sequences_array = np.asarray(sequences_array)
        if sequences_array.ndim != 3 or sequences_array.shape[0] == 0:
            raise ValueError(
                "sequences_array must be a non-empty array of shape "
                f"(N, window_length, channels), got shape {sequences_array.shape}"
            )
        window_length, n_channels = sequences_array.shape[1], sequences_array.shape[2]
        if not 1 <= sequence_step <= window_length:
            raise ValueError(
                f"sequence_step must be in [1, {window_length}], got {sequence_step}"
            )
        if sampling_rate_hz <= 0:
            raise ValueError(f"sampling_rate_hz must be positive, got {sampling_rate_hz}")

        # With stride s the windows are [P0..P(L-1)], [Ps..P(s+L-1)], ... so the
        # stream is the first full window followed by the last s samples of
        # every later window.
        new_points = sequences_array[1:, -sequence_step:, :].reshape(-1, n_channels)
        self.continuous_data = np.vstack((sequences_array[0], new_points))

        self.sampling_period_s = 1.0 / sampling_rate_hz
        self.total_points = len(self.continuous_data)
        self._current_index = 0
        print(f"Data Source: Stream reconstructed. Total points: {self.total_points}.")

    def __iter__(self):
        # The object is its own (single-pass) iterator.
        return self

    def __next__(self):
        """Return the next ``(timestamp_seconds, data_point)`` pair."""
        if self._current_index < self.total_points:
            data_point = self.continuous_data[self._current_index]
            # Timestamps are synthetic: sample index times the sampling period.
            timestamp = self._current_index * self.sampling_period_s
            self._current_index += 1
            return timestamp, data_point
        else:
            raise StopIteration

class EdgeNodeSimulator:
    """Simulate an edge node that consumes a sample stream with a sliding window.

    Each incoming point is appended to a fixed-size buffer. Once the buffer is
    full, an inference is run every ``step_size`` points: the window is scaled,
    passed through the model, a prediction line is printed and the extracted
    features are collected. Features are written to ``output_dir`` in batches
    named ``features_batch_<k>.npy``.

    Args:
        model: Object exposing ``extract_features(tensor)`` and ``__call__(tensor)``.
            The call must return a single-element tensor of logits, because
            the result is converted with ``.item()``.
        scaler: Object exposing ``transform(window)`` for a 2-D window array.
        device: Torch device the window tensor is moved to.
        window_size: Number of samples per inference window (>= 1).
        step_size: Number of new samples between two inferences (>= 1).
        feature_batch_size: Number of feature vectors per saved file.
        output_dir: Directory receiving the ``.npy`` files; created if missing.

    Raises:
        ValueError: If ``window_size`` or ``step_size`` is smaller than 1.
    """

    def __init__(self, model, scaler, device, window_size, step_size, feature_batch_size=100, output_dir="features"):
        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")
        if step_size < 1:
            raise ValueError(f"step_size must be >= 1, got {step_size}")

        self.model = model
        self.scaler = scaler
        self.device = device
        self.WINDOW_SIZE = window_size
        self.STEP_SIZE = step_size
        self.FEATURE_BATCH_SIZE = feature_batch_size
        self.OUTPUT_DIR = output_dir

        # deque(maxlen=...) drops the oldest sample automatically once full,
        # which gives the sliding-window behaviour for free.
        self.buffer = deque(maxlen=self.WINDOW_SIZE)
        self.points_since_last_inference = 0

        self.feature_batch = []
        self.feature_files_saved = 0

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)

    def _preprocess(self, window_data):
        """Scale one window and return it as a float32 tensor with a batch axis."""
        scaled_window = self.scaler.transform(window_data)
        return torch.tensor(scaled_window, dtype=torch.float32).unsqueeze(0).to(self.device)

    def _save_feature_batch(self):
        """Write the pending feature vectors to the next batch file, if any."""
        if not self.feature_batch:
            return

        save_path = os.path.join(self.OUTPUT_DIR, f"features_batch_{self.feature_files_saved}.npy")
        np.save(save_path, np.vstack(self.feature_batch))
        print(f"\n--- Saved {len(self.feature_batch)} features to {save_path} ---\n")

        self.feature_batch = []
        self.feature_files_saved += 1

    def process_data_point(self, timestamp, data_point):
        """Consume one sample and run an inference when a step boundary is reached.

        Args:
            timestamp: Time of the sample in seconds (only used for logging).
            data_point: One sample; it is stacked with the buffered samples
                into the window handed to the scaler.
        """
        self.buffer.append(data_point)
        self.points_since_last_inference += 1

        # 1. Keep collecting until the first full window is available.
        if len(self.buffer) < self.WINDOW_SIZE:
            return

        # 2. Only infer once enough new points arrived since the last inference.
        #    The counter also runs during warm-up, so the first full window
        #    triggers an inference immediately.
        if self.points_since_last_inference >= self.STEP_SIZE:
            self.points_since_last_inference = 0

            current_window = np.array(self.buffer)
            window_tensor = self._preprocess(current_window)

            # NOTE(review): if the model's __call__ internally recomputes the
            # same features, the extractor runs twice per window here.
            with torch.no_grad():
                features = self.model.extract_features(window_tensor)
                logits = self.model(window_tensor)

            confidence = torch.sigmoid(logits).item()
            prediction = "FALL DETECTED!" if confidence > 0.5 else "No Fall"

            # Real-time decision log.
            print(f"Timestamp: {timestamp:7.2f}s | Confidence: {confidence:.4f} | Prediction: {prediction}")

            # Collect the flattened features and flush once a batch is complete.
            self.feature_batch.append(features.cpu().numpy().flatten())
            if len(self.feature_batch) >= self.FEATURE_BATCH_SIZE:
                self._save_feature_batch()

    def finalize(self):
        """Save the features still pending after the stream has ended."""
        self._save_feature_batch()
        print("Simulation finished. All remaining features saved.")



## 获取并清理数据

In [3]:
# Root folder handed to load_data_from_structured_folders.
DATASET_PATH = 'MobiFall_Dataset'

# Resampling grid and sliding-window geometry, all derived from the target rate.
TARGET_SAMPLING_RATE_HZ = 50.0
TARGET_SAMPLING_PERIOD = f"{int(1000 / TARGET_SAMPLING_RATE_HZ)}ms"  # "20ms"
SEQUENCE_LENGTH = int(4 * TARGET_SAMPLING_RATE_HZ)  # 4 s window -> 200 samples
STEP = int(1 * TARGET_SAMPLING_RATE_HZ)             # 1 s stride -> 50 samples

# Raw value columns stored in each sensor file, keyed by sensor code.
EXPECTED_COLUMNS = {
    "acc": ["acc_x", "acc_y", "acc_z"],
    "gyro": ["gyro_x", "gyro_y", "gyro_z"],
    "ori": ["ori_azimuth", "ori_pitch", "ori_roll"],
}

# Sensor codes in processing order (the insertion order of the mapping above).
SENSOR_CODES = list(EXPECTED_COLUMNS)

# Column layout of a combined trial: raw axes plus the derived signal
# magnitude (smv) columns for the accelerometer and the gyroscope.
ALL_FEATURE_COLUMNS = [
    *EXPECTED_COLUMNS["acc"], "acc_smv",
    *EXPECTED_COLUMNS["gyro"], "gyro_smv",
    *EXPECTED_COLUMNS["ori"],
]


def load_and_resample_sensor_file(filepath, sensor_code):
    """Load one sensor file and resample it onto the uniform target grid.

    The file is expected to contain a line reading ``@DATA`` (case-insensitive)
    followed by comma-separated rows whose first four columns are a nanosecond
    timestamp and three sensor values.

    Args:
        filepath: Path of the sensor text file.
        sensor_code: Key into EXPECTED_COLUMNS ('acc', 'gyro' or 'ori').

    Returns:
        A DataFrame indexed by timestamp at TARGET_SAMPLING_PERIOD spacing with
        the sensor's value columns (plus ``<sensor>_smv`` for 'acc' and
        'gyro'), or None when the file has no usable data or cannot be parsed.
    """
    try:
        with open(filepath, 'r') as handle:
            raw_lines = handle.readlines()

        # Position of the first "@DATA" marker line, or None when absent.
        marker_pos = next(
            (pos for pos, text in enumerate(raw_lines) if text.strip().upper() == "@DATA"),
            None,
        )
        if marker_pos is None:
            return None

        # The payload starts on the line right after the marker.
        first_data_pos = marker_pos + 1
        if first_data_pos >= len(raw_lines):
            return None

        payload = "".join(raw_lines[first_data_pos:])
        if not payload.strip():
            return None

        frame = pd.read_csv(io.StringIO(payload), header=None, usecols=[0, 1, 2, 3])
        if frame.empty:
            return None

        # Name the columns, then replace the raw nanosecond column by a sorted
        # datetime index.
        frame.columns = ['timestamp_ns'] + EXPECTED_COLUMNS[sensor_code]
        frame['timestamp'] = pd.to_datetime(frame['timestamp_ns'], unit='ns')
        frame = frame.set_index('timestamp').drop(columns=['timestamp_ns']).sort_index()

        # Average the samples falling into each target-period bin, then fill
        # empty bins (in both directions) by linear interpolation.
        binned = frame.resample(TARGET_SAMPLING_PERIOD).mean()
        resampled = binned.interpolate(method='linear', limit_direction='both')

        # Accelerometer and gyroscope additionally get the signal magnitude
        # vector: the Euclidean norm of their three axes.
        if sensor_code in ('acc', 'gyro'):
            axis_names = [f"{sensor_code}_{axis}" for axis in ("x", "y", "z")]
            if all(name in resampled.columns for name in axis_names):
                x_name, y_name, z_name = axis_names
                resampled[f"{sensor_code}_smv"] = np.sqrt(
                    resampled[x_name]**2 + resampled[y_name]**2 + resampled[z_name]**2
                )

        return resampled

    except (pd.errors.EmptyDataError, ValueError):
        # Empty or malformed payload: treat the file as unusable.
        return None
    except Exception as e:
        print(f"Error processing file {filepath}: {e}. Skipping.")
        return None

def load_data_from_structured_folders(dataset_root_path):
    """Walk the dataset tree and build one aligned multi-sensor array per trial.

    Only files located at
    ``<root>/sub<N>/<CATEGORY>/<ACTIVITY>/<a>_<sensor>_<b>_<trial>.txt`` are
    considered, where ``<sensor>`` is one of SENSOR_CODES. A trial is kept only
    if all sensors are present, load successfully, overlap in time and yield at
    least SEQUENCE_LENGTH aligned samples.

    Args:
        dataset_root_path: Root directory of the dataset.

    Returns:
        ``(processed_trials_data, labels)``: a list of 2-D arrays (one row per
        resampled time step, one column per entry of ALL_FEATURE_COLUMNS) and a
        parallel list of int labels (1 when the category folder is "FALLS",
        otherwise 0). Both lists are empty when the root path does not exist.
    """
    print(f"Scanning for data in: {dataset_root_path}")
    if not os.path.isdir(dataset_root_path):
        print(f"ERROR: Dataset root path '{dataset_root_path}' not found.")
        return [], []

    # trial_key -> {sensor_code: filepath}
    trial_sensor_files_map = defaultdict(dict)
    # trial_key -> {"category": ..., "activity_code": ...}
    trial_metadata_map = {}

    for dirpath, _, filenames in os.walk(dataset_root_path):
        # Data files live exactly three levels below the root:
        # subject / category / activity.
        relative_path = os.path.relpath(dirpath, dataset_root_path)
        path_parts = relative_path.split(os.sep)
        if len(path_parts) != 3:
            continue

        # The folder-derived metadata is identical for every file in this
        # directory, so parse it once instead of once per file.
        subject_match = re.fullmatch(r'sub(\d+)', path_parts[0], re.IGNORECASE)
        if not subject_match:
            continue
        subject_id = int(subject_match.group(1))
        category = path_parts[1].upper()
        activity_code = path_parts[2].upper()

        for filename in filenames:
            if not filename.endswith(".txt"):
                continue

            # Expected file name: four underscore-separated fields, of which
            # the second is the sensor code and the fourth the trial number.
            fname_parts = filename.replace('.txt', '').split('_')
            if len(fname_parts) != 4:
                continue

            _, sensor_code, _, trial_no_str = fname_parts
            sensor_code = sensor_code.lower()
            if sensor_code not in SENSOR_CODES:
                continue

            try:
                trial_no = int(trial_no_str)
            except ValueError:
                # Non-numeric trial number: not a file this loader understands.
                continue

            trial_key = (subject_id, activity_code, trial_no)
            trial_sensor_files_map[trial_key][sensor_code] = os.path.join(dirpath, filename)
            # Keep the metadata of the first file seen for this trial.
            trial_metadata_map.setdefault(
                trial_key, {"category": category, "activity_code": activity_code}
            )

    processed_trials_data, labels = [], []
    print(f"\nProcessing and combining {len(trial_sensor_files_map)} unique trials...")

    for trial_key, sensor_files in trial_sensor_files_map.items():
        # A trial is only usable when every sensor contributed a file.
        if not all(s_code in sensor_files for s_code in SENSOR_CODES):
            continue

        resampled_dfs = {
            s_code: load_and_resample_sensor_file(sensor_files[s_code], s_code)
            for s_code in SENSOR_CODES
        }
        if any(df is None or df.empty for df in resampled_dfs.values()):
            continue

        try:
            # --- Time alignment ---
            # Restrict all sensors to the interval every one of them covers:
            # latest start to earliest end.
            common_start = max(df.index.min() for df in resampled_dfs.values())
            common_end = min(df.index.max() for df in resampled_dfs.values())
            if common_start >= common_end:
                continue

            aligned_dfs = [
                resampled_dfs[s_code][common_start:common_end].reset_index(drop=True)
                for s_code in SENSOR_CODES
            ]
            # Column-wise concatenation below needs equally long, non-empty tables.
            if not all(len(df) > 0 and len(df) == len(aligned_dfs[0]) for df in aligned_dfs):
                continue

            # --- Merge into one wide table ---
            combined_df = pd.concat(aligned_dfs, axis=1)
            if len(combined_df.columns) != len(ALL_FEATURE_COLUMNS):
                continue
            combined_df.columns = ALL_FEATURE_COLUMNS

            # Trials shorter than one sequence window cannot produce any sequence.
            if len(combined_df) < SEQUENCE_LENGTH:
                continue

            processed_trials_data.append(combined_df.values)
            labels.append(1 if trial_metadata_map[trial_key]["category"] == "FALLS" else 0)

        except Exception as exc:
            # Best effort: one broken trial must not abort the whole load, but
            # report it instead of dropping it silently.
            print(f"Warning: skipping trial {trial_key} due to error: {exc}")
            continue

    print(f"Successfully processed and combined sensor data for {len(processed_trials_data)} trials.")
    return processed_trials_data, labels

def create_sequences(data_list, label_list, seq_length, step):
    """Cut fixed-length sliding windows out of every trial.

    Args:
        data_list: Per-trial arrays, sliced along their first axis.
        label_list: Label of each trial, indexed in parallel to ``data_list``.
        seq_length: Number of samples per window.
        step: Offset between the starts of two consecutive windows.

    Returns:
        ``(X, y)`` as NumPy arrays: the stacked windows and one label per
        window (the label of the trial it came from). Two empty arrays are
        returned when no trial is long enough to yield a window.
    """
    windows, window_labels = [], []

    for trial_idx, trial in enumerate(data_list):
        label = label_list[trial_idx]
        # Every start offset that still leaves room for a full window.
        last_start = len(trial) - seq_length
        starts = range(0, last_start + 1, step)
        windows.extend(trial[start:start + seq_length] for start in starts)
        window_labels.extend([label] * len(starts))

    if len(windows) == 0:
        return np.array([]), np.array([])
    return np.array(windows), np.array(window_labels)


# 1. Load the trials and cut them into sequences.
# trial_arrays / trial_labels hold one entry per successfully combined trial.
trial_arrays, trial_labels = load_data_from_structured_folders(DATASET_PATH)
# The per-sequence labels are discarded here; only the windows are kept.
# X_sequences is read again by the simulation cell further below.
X_sequences, _ = create_sequences(trial_arrays, trial_labels, SEQUENCE_LENGTH, STEP)
print(f"Created {X_sequences.shape} sequences.")

Scanning for data in: MobiFall_Dataset

Processing and combining 627 unique trials...
Successfully processed and combined sensor data for 627 trials.
Created (9491, 200, 11) sequences.


## 模拟过程运行

In [4]:
# --- 1. Configuration ---
SAMPLING_RATE_HZ = 50
WINDOW_SECONDS = 4
STEP_SECONDS = 0.5  # run one inference every 0.5 s

WINDOW_SIZE = SAMPLING_RATE_HZ * WINDOW_SECONDS   # 200 data points
STEP_SIZE = int(SAMPLING_RATE_HZ * STEP_SECONDS)  # 25 data points

MODEL_PATH = "feature_model_1dcnn.pth"
SCALER_PATH = "scaler_50hz_torch.gz"
FEATURE_OUTPUT_DIR = "simulated_features"
FEATURE_BATCH_SIZE = 100  # write one feature file per 100 feature vectors

# --- 2. Load resources ---
print("Loading model and scaler...")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model: fall back to randomly initialised weights when no checkpoint exists,
# so the simulation can still run end to end.
# NOTE(security): torch.load and joblib.load are pickle-based; only load
# checkpoint/scaler files that come from a trusted source.
model = FeatureModel1DCNN(input_channels=11, num_classes=1).to(device)
if os.path.exists(MODEL_PATH):
    model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
    print(f"Model loaded from {MODEL_PATH}")
else:
    print(f"Warning: Model file not found at {MODEL_PATH}. Using a randomly initialized model.")
model.eval()

# Scaler: a freshly constructed StandardScaler cannot transform anything until
# it has been fitted, so remember that the fallback still needs a fit.
scaler_needs_fit = False
if os.path.exists(SCALER_PATH):
    scaler = joblib.load(SCALER_PATH)
    print(f"Scaler loaded from {SCALER_PATH}")
else:
    print(f"Warning: Scaler file not found at {SCALER_PATH}. Using a dummy scaler.")
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    scaler_needs_fit = True

# --- 3. Load data ---
print("Loading sequence data...")
try:
    # Use the sequences produced by the data-loading cell when it has been run.
    all_sequences = X_sequences
except NameError:
    all_sequences = None

# Fall back to random data both when the variable is missing and when the
# loader produced no sequences (e.g. the dataset folder was not found).
if all_sequences is None or np.size(all_sequences) == 0:
    print("Sequence data file not found. Generating random data for simulation.")
    all_sequences = np.random.rand(9491, 200, 11)

if scaler_needs_fit:
    # Fit the placeholder scaler on the data about to be streamed (one row per
    # time step) so that scaler.transform() works inside the edge node.
    scaler.fit(all_sequences.reshape(-1, all_sequences.shape[-1]))

# --- 4. Run the simulation ---
print("\nStarting real-time simulation...")
data_source = DataSourceSimulator(all_sequences, sampling_rate_hz=SAMPLING_RATE_HZ)
edge_node = EdgeNodeSimulator(
    model, scaler, device,
    window_size=WINDOW_SIZE,
    step_size=STEP_SIZE,
    feature_batch_size=FEATURE_BATCH_SIZE,
    output_dir=FEATURE_OUTPUT_DIR
)

start_time = time.time()

for timestamp, data_point in data_source:
    edge_node.process_data_point(timestamp, data_point)
    # A small time.sleep(...) could be inserted here to pace the replay like a
    # real sensor; it is left out so the pure simulation runs at full speed.

# Flush the last, possibly incomplete, feature batch.
edge_node.finalize()

end_time = time.time()
print(f"\nTotal simulation time: {end_time - start_time:.2f} seconds.")

Loading model and scaler...
Model loaded from feature_model_1dcnn.pth
Scaler loaded from scaler_50hz_torch.gz
Loading sequence data...

Starting real-time simulation...
Data Source: Reconstructing continuous data stream from sequences...
Data Source: Stream reconstructed. Total points: 9690.
Timestamp:    3.98s | Confidence: 1.0000 | Prediction: FALL DETECTED!
Timestamp:    4.48s | Confidence: 1.0000 | Prediction: FALL DETECTED!
Timestamp:    4.98s | Confidence: 1.0000 | Prediction: FALL DETECTED!
Timestamp:    5.48s | Confidence: 1.0000 | Prediction: FALL DETECTED!
Timestamp:    5.98s | Confidence: 0.0001 | Prediction: No Fall
Timestamp:    6.48s | Confidence: 0.8275 | Prediction: FALL DETECTED!
Timestamp:    6.98s | Confidence: 0.0000 | Prediction: No Fall
Timestamp:    7.48s | Confidence: 0.0000 | Prediction: No Fall
Timestamp:    7.98s | Confidence: 0.0000 | Prediction: No Fall
Timestamp:    8.48s | Confidence: 0.0000 | Prediction: No Fall
Timestamp:    8.98s | Confidence: 0.0000 |