In [12]:
pip install tensorflow


Note: you may need to restart the kernel to use updated packages.


In [13]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import os

# ✅ 文件路径
file_paths = {
    "Freestyle": "/Users/zhuoshangming/Desktop/freestyle--swim_data.csv",
    "Breaststroke": "/Users/zhuoshangming/Desktop/breakstroke--swim_data.csv",
    "Backstroke": "/Users/zhuoshangming/Desktop/Backstroke--swim_data.csv",
}

# ✅ 读取 & 归一化数据
dataframes = []
scaler = StandardScaler()

for swim_type, path in file_paths.items():
    df = pd.read_csv(path)
    df["Stroke"] = swim_type
    df = df.sort_values(by=[df.columns[0]])  # 按时间戳升序排序
    df.reset_index(drop=True, inplace=True)
    dataframes.append(df)

df_all = pd.concat(dataframes, ignore_index=True)

# 归一化 X, Y, Z
df_all[["X", "Y", "Z"]] = scaler.fit_transform(df_all[["X", "Y", "Z"]])

# ✅ 保存归一化模型
scaler_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/scaler.pkl"
os.makedirs(os.path.dirname(scaler_path), exist_ok=True)
import joblib
joblib.dump(scaler, scaler_path)

# ✅ 按 15 个数一组
GROUP_SIZE = 15
groups = [df_all.iloc[i:i+GROUP_SIZE] for i in range(0, len(df_all), GROUP_SIZE)]
valid_groups = [g for g in groups if len(g["Stroke"].unique()) == 1]

# ✅ 划分训练集 & 测试集（**不打乱顺序**）
test_data = valid_groups[:24]  # 前 24 组
train_data = valid_groups[24:]  # 剩余部分

# ✅ 保存数据
output_dir = "/Users/zhuoshangming/Desktop/训练数据原始处理/"
os.makedirs(output_dir, exist_ok=True)

pd.concat(train_data, ignore_index=True).to_csv(os.path.join(output_dir, "train_data.csv"), index=False)
pd.concat(test_data, ignore_index=True).to_csv(os.path.join(output_dir, "test_data.csv"), index=False)

print("✅ 训练数据和测试数据已保存！")


✅ 训练数据和测试数据已保存！


In [14]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import LabelEncoder
import joblib

# ✅ 读取数据
train_data_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/train_data.csv"
df_train = pd.read_csv(train_data_path)

# ✅ 加载归一化模型
scaler_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/scaler.pkl"
scaler = joblib.load(scaler_path)
df_train[["X", "Y", "Z"]] = scaler.transform(df_train[["X", "Y", "Z"]])

# ✅ 特征提取
def extract_sequences(df):
    X, y = [], []
    for i in range(0, len(df), 15):
        subset = df.iloc[i:i + 15]
        if len(subset) == 15 and len(subset["Stroke"].unique()) == 1:
            X.append(subset[["X", "Y", "Z"]].values)
            y.append(subset["Stroke"].values[0])
    return np.array(X), np.array(y)

X_train, y_train = extract_sequences(df_train)

# ✅ 标签编码
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(y_train)

# ✅ LSTM 模型
model = Sequential([
    LSTM(64, return_sequences=True, input_shape=(15, 3)),
    Dropout(0.2),
    LSTM(32),
    Dropout(0.2),
    Dense(3, activation="softmax")
])

model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

# ✅ 训练模型
model.fit(X_train, y_train, epochs=30, batch_size=16, validation_split=0.2)

# ✅ 保存模型
model_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/swim_lstm_model.h5"
model.save(model_path)
joblib.dump(label_encoder, "/Users/zhuoshangming/Desktop/训练数据原始处理/label_encoder.pkl")

print("✅ LSTM 训练完成并保存！")


Epoch 1/30


  super().__init__(**kwargs)


[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - accuracy: 0.7353 - loss: 0.8798 - val_accuracy: 1.0000 - val_loss: 0.3784
Epoch 2/30
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 1.0000 - loss: 0.5025 - val_accuracy: 1.0000 - val_loss: 0.1821
Epoch 3/30
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 1.0000 - loss: 0.2446 - val_accuracy: 1.0000 - val_loss: 0.0825
Epoch 4/30
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 1.0000 - loss: 0.1137 - val_accuracy: 1.0000 - val_loss: 0.0456
Epoch 5/30
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - accuracy: 1.0000 - loss: 0.0564 - val_accuracy: 1.0000 - val_loss: 0.0313
Epoch 6/30
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - accuracy: 1.0000 - loss: 0.0368 - val_accuracy: 1.0000 - val_loss: 0.0237
Epoch 7/30
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 



✅ LSTM 训练完成并保存！


In [15]:
import pandas as pd
import numpy as np
import tensorflow as tf
import joblib

# ✅ 读取测试数据
test_data_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/test_data.csv"
df_test = pd.read_csv(test_data_path)

# ✅ 加载模型
model_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/swim_lstm_model.h5"
scaler_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/scaler.pkl"
label_encoder_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/label_encoder.pkl"

model = tf.keras.models.load_model(model_path)
scaler = joblib.load(scaler_path)
label_encoder = joblib.load(label_encoder_path)

df_test[["X", "Y", "Z"]] = scaler.transform(df_test[["X", "Y", "Z"]])

# ✅ 提取测试集特征
def extract_sequences(df):
    X, y = [], []
    for i in range(0, len(df), 15):
        subset = df.iloc[i:i + 15]
        if len(subset) == 15 and len(subset["Stroke"].unique()) == 1:
            X.append(subset[["X", "Y", "Z"]].values)
            y.append(subset["Stroke"].values[0])
    return np.array(X), np.array(y)

X_test, y_test = extract_sequences(df_test)
y_test_encoded = label_encoder.transform(y_test)

# ✅ 进行预测
y_pred = model.predict(X_test)
y_pred_labels = label_encoder.inverse_transform(np.argmax(y_pred, axis=1))

# ✅ 计算准确率
from sklearn.metrics import accuracy_score
accuracy = accuracy_score(y_test_encoded, np.argmax(y_pred, axis=1))
print(f"🎯 LSTM 测试准确率: {accuracy * 100:.2f}%")




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 104ms/step
🎯 LSTM 测试准确率: 100.00%


In [24]:
import pandas as pd
import numpy as np
import tensorflow as tf
import joblib

# ✅ 读取未知数据
test_unknown_path = "/Users/zhuoshangming/Desktop/Test.csv"
df_unknown = pd.read_csv(test_unknown_path)

# ✅ 加载模型和标准化
model_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/swim_lstm_model.h5"
scaler_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/scaler.pkl"
label_encoder_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/label_encoder.pkl"

model = tf.keras.models.load_model(model_path)
scaler = joblib.load(scaler_path)
label_encoder = joblib.load(label_encoder_path)

# ✅ 归一化 `X, Y, Z`
df_unknown[["X", "Y", "Z"]] = scaler.transform(df_unknown[["X", "Y", "Z"]])

# ✅ 提取 `15` 个数的特征
def extract_sequences(df):
    X = []
    indices = []
    for i in range(0, len(df) - 14, 15):  # 确保每组15个
        subset = df.iloc[i:i + 15]
        if len(subset) == 15:
            X.append(subset[["X", "Y", "Z"]].values)
            indices.append((i, i + 15))  # 记录原始索引
    return np.array(X), indices

X_unknown, indices = extract_sequences(df_unknown)

# ✅ 进行预测
y_unknown_pred = model.predict(X_unknown)
y_unknown_labels = label_encoder.inverse_transform(np.argmax(y_unknown_pred, axis=1))

# ✅ 处理预测结果，填充 `df_unknown`
predicted_stroke_column = [""] * len(df_unknown)

for (start_idx, end_idx), label in zip(indices, y_unknown_labels):
    predicted_stroke_column[start_idx:end_idx] = [label] * 15  # 填充 15 个数

df_unknown["Predicted Stroke"] = predicted_stroke_column

# ✅ 保存预测结果
output_unknown_path = "/Users/zhuoshangming/Desktop/训练数据原始处理/Test_Predictions.csv"
df_unknown.to_csv(output_unknown_path, index=False)

print(f"✅ 未知数据预测完成，结果已保存至 {output_unknown_path}")




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 115ms/step
✅ 未知数据预测完成，结果已保存至 /Users/zhuoshangming/Desktop/训练数据原始处理/Test_Predictions.csv
