In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report, f1_score, precision_score, recall_score
import os
import zipfile
import urllib.request
from io import BytesIO
from scipy import stats

In [5]:

# Inertial-signal channels, in the order they are stacked along the last axis.
_UCI_HAR_SIGNAL_NAMES = (
    'body_acc_x', 'body_acc_y', 'body_acc_z',
    'body_gyro_x', 'body_gyro_y', 'body_gyro_z',
    'total_acc_x', 'total_acc_y', 'total_acc_z',
)


def _load_uci_har_signals(data_path, split):
    """Read every '<signal>_<split>.txt' file and stack them as (samples, timestamps, channels)."""
    channels = [
        # sep=r'\s+' is the documented replacement for the deprecated delim_whitespace=True
        pd.read_csv(os.path.join(data_path, f'{name}_{split}.txt'),
                    sep=r'\s+', header=None).values
        for name in _UCI_HAR_SIGNAL_NAMES
    ]
    # Each file yields one (samples, timestamps) matrix; the channel axis goes last.
    return np.stack(channels, axis=-1).astype('float32')


def _load_uci_har_labels(data_path, split):
    """Read 'y_<split>.txt' and return the labels as a zero-indexed int32 vector."""
    labels = pd.read_csv(os.path.join(data_path, f'y_{split}.txt'), header=None)
    return labels.values.flatten().astype('int32') - 1


def load_uci_har_data(data_path):
    """Load the UCI HAR inertial signals, standardize them and encode the labels.

    All files are expected directly inside ``data_path``: ``activity_labels.txt``,
    ``y_train.txt`` / ``y_test.txt`` and the nine ``<signal>_<train|test>.txt``
    inertial-signal files.

    Args:
        data_path: Directory containing the files listed above.

    Returns:
        Tuple ``(x_train, y_train, y_train_onehot, x_test, y_test, y_test_onehot,
        activity_labels)`` where the ``x_*`` arrays have shape
        (samples, timestamps, 9) and are standardized per channel, the ``y_*``
        vectors hold zero-indexed class ids, the ``*_onehot`` arrays are their
        6-way one-hot encodings, and ``activity_labels`` is a DataFrame with the
        columns ``class_index`` and ``class_name``.
    """
    activity_labels = pd.read_csv(os.path.join(data_path, 'activity_labels.txt'),
                                  sep=r'\s+', header=None,
                                  names=['class_index', 'class_name'])

    y_train = _load_uci_har_labels(data_path, 'train')
    y_test = _load_uci_har_labels(data_path, 'test')

    x_train = _load_uci_har_signals(data_path, 'train')
    x_test = _load_uci_har_signals(data_path, 'test')

    print(f"Loaded inertial signals data with shape: {x_train.shape} (train), {x_test.shape} (test)")

    # Standardize each channel independently: collapse (samples, timestamps) into
    # rows so the scaler sees one column per channel. Statistics come from the
    # training split only and are then applied to the test split.
    n_features = x_train.shape[-1]
    scaler = StandardScaler()
    x_train = scaler.fit_transform(x_train.reshape(-1, n_features)).reshape(x_train.shape)
    x_test = scaler.transform(x_test.reshape(-1, n_features)).reshape(x_test.shape)

    # One-hot encode labels (six activity classes)
    y_train_onehot = tf.keras.utils.to_categorical(y_train, num_classes=6)
    y_test_onehot = tf.keras.utils.to_categorical(y_test, num_classes=6)

    return x_train, y_train, y_train_onehot, x_test, y_test, y_test_onehot, activity_labels

# Location of the flattened UCI HAR files, relative to the notebook
data_path = "../UCI_HAR_Dataset/for_colab/"

(x_train, y_train, y_train_onehot,
 x_test, y_test, y_test_onehot,
 activity_labels) = load_uci_har_data(data_path)

# Report the loaded shapes and the class table, one item per line
print(
    "/content/activity_labels.txt",
    f"Training data shape: {x_train.shape}",
    f"Training labels shape: {y_train_onehot.shape}",
    f"Test data shape: {x_test.shape}",
    f"Test labels shape: {y_test_onehot.shape}",
    "\nActivity labels:",
    activity_labels,
    sep="\n",
)


  activity_labels = pd.read_csv(os.path.join(data_path, 'activity_labels.txt'),
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, header=None).values
  signal = pd.read_csv(path, delim_whitespace=True, head

Loaded inertial signals data with shape: (7352, 128, 9) (train), (2947, 128, 9) (test)
/content/activity_labels.txt
Training data shape: (7352, 128, 9)
Training labels shape: (7352, 6)
Test data shape: (2947, 128, 9)
Test labels shape: (2947, 6)

Activity labels:
   class_index          class_name
0            1             WALKING
1            2    WALKING_UPSTAIRS
2            3  WALKING_DOWNSTAIRS
3            4             SITTING
4            5            STANDING
5            6              LAYING


In [6]:
def split_sequences(data, window_size=128, step=64):
    """
    Split a 2-D signal into fixed-length, possibly overlapping windows.

    Windows start at rows 0, step, 2*step, ... and only full windows are kept;
    trailing rows that do not fill a whole window are dropped.

    Args:
        data: Input data with shape (n_samples, n_features)
        window_size: Number of time steps in each sequence
        step: Step size between consecutive sequences

    Returns:
        Array with shape (n_sequences, window_size, n_features). When the input
        is shorter than one window the result is an empty array of shape
        (0, window_size, n_features) with the input's dtype, so callers can rely
        on a 3-D result in every case.
    """
    data = np.asarray(data)
    n_samples, n_features = data.shape
    starts = range(0, n_samples - window_size + 1, step)

    if len(starts) == 0:
        # np.array([]) would collapse to shape (0,) and lose the dtype
        return np.empty((0, window_size, n_features), dtype=data.dtype)

    return np.stack([data[start:start + window_size] for start in starts])

# -----
# The inertial signals are already shaped (samples, timestamps, features), so
# split_sequences is only needed when the window length or the overlap changes.

# Check the current shape of the data
print(f"Original training data shape: {x_train.shape}")
print(f"Original test data shape: {x_test.shape}")

# The data already has 128 timestamps per sample, so we can use it directly
# If we need to adjust window size or create overlapping windows:
window_size = 128  # This is already the size in the dataset
step = 128  # Non-overlapping by default - set to smaller value for overlap


def _resegment(samples, labels, window_size, step):
    """Concatenate (n, t, c) samples along time and cut new windows from the stream.

    Each new window takes the label of the original sample that contains the
    window's first time step (no majority vote is taken).
    NOTE(review): this treats consecutive samples as one continuous recording;
    confirm that holds for the dataset before using step != window_size.
    """
    steps_per_sample = samples.shape[1]
    flat = samples.reshape(samples.shape[0] * steps_per_sample, samples.shape[2])
    windows = split_sequences(flat, window_size=window_size, step=step)
    # Same start offsets split_sequences iterates over, mapped back to sample ids
    starts = np.arange(0, len(flat) - window_size + 1, step)
    return windows, labels[starts // steps_per_sample]


# Check if we need to reshape the data
if x_train.shape[1] != window_size or step != window_size:
    print("Reshaping data to adjust window size or create overlapping samples...")
    x_train_seq, y_train_seq = _resegment(x_train, y_train, window_size, step)
    x_test_seq, y_test_seq = _resegment(x_test, y_test, window_size, step)
else:
    # If no reshaping is needed, use the data directly
    x_train_seq = x_train
    x_test_seq = x_test
    y_train_seq = y_train
    y_test_seq = y_test

# One-hot encode labels
y_train_seq_onehot = tf.keras.utils.to_categorical(y_train_seq, num_classes=6)
y_test_seq_onehot = tf.keras.utils.to_categorical(y_test_seq, num_classes=6)

print(f"Final training data shape: {x_train_seq.shape}")
print(f"Final training labels shape: {y_train_seq_onehot.shape}")
print(f"Final test data shape: {x_test_seq.shape}")
print(f"Final test labels shape: {y_test_seq_onehot.shape}")


Original training data shape: (7352, 128, 9)
Original test data shape: (2947, 128, 9)
Final training data shape: (7352, 128, 9)
Final training labels shape: (7352, 6)
Final test data shape: (2947, 128, 9)
Final test labels shape: (2947, 6)


In [7]:
# Independent copies for modelling, so later edits cannot touch the loaded arrays
X_train_seq, X_test_seq = x_train_seq.copy(), x_test_seq.copy()

In [8]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers # Import optimizers here

# -------------------------
# ABFA module definition
# -------------------------
class ABFA(tf.keras.layers.Layer):
    """Prototype-attention feature augmentation for (batch, time, dim) inputs.

    The layer keeps one trainable prototype vector per activity class. Every
    time step is projected, compared with all prototypes, and the
    softmax-weighted mix of prototypes is projected again and added back to the
    input as a residual, followed by batch norm, dropout and layer norm. The
    output has the same shape as the input.

    Args:
        filters: Stored and serialized for reference; the computation itself
            always uses the last dimension of the input as its width.
        activity_classes: Number of prototype vectors.
        dropout_rate: Dropout rate applied after the batch normalization.
    """

    def __init__(self, filters, activity_classes=6, dropout_rate=0.2, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.activity_classes = activity_classes
        self.dropout_rate = dropout_rate

    def build(self, input_shape):
        """Create the prototype matrix and the sub-layers, sized to the input width."""
        self.action_prototypes = self.add_weight(
            name='action_prototypes',
            shape=(self.activity_classes, input_shape[-1]),
            initializer='glorot_uniform',
            trainable=True
        )
        self.embedding_proj = layers.Dense(input_shape[-1])
        self.augment_proj = layers.Dense(input_shape[-1])
        self.bn = layers.BatchNormalization()
        self.dropout = layers.Dropout(self.dropout_rate)
        self.layer_norm = layers.LayerNormalization(epsilon=1e-6)
        super().build(input_shape)

    def call(self, inputs, training=None):
        """Augment ``inputs`` with its attention-weighted prototype mixture.

        Args:
            inputs: Rank-3 tensor (batch, time, dim).
            training: Forwarded to batch normalization and dropout so they
                switch between training and inference behaviour explicitly.
        """
        x_proj = self.embedding_proj(inputs)
        # Similarity of every time step to every class prototype: (batch, time, classes)
        proto_similarity = tf.einsum('btd,cd->btc', x_proj, self.action_prototypes)
        proto_attention = tf.nn.softmax(proto_similarity, axis=-1)
        # Attention-weighted prototype mixture, back in feature space: (batch, time, dim)
        enhanced = tf.einsum('btc,cd->btd', proto_attention, self.action_prototypes)
        augmented = self.augment_proj(enhanced)
        x = self.bn(inputs + augmented, training=training)
        x = self.dropout(x, training=training)
        return self.layer_norm(x)

    def get_config(self):
        """Return the constructor arguments so a saved model can rebuild the layer."""
        config = super().get_config()
        config.update({
            'filters': self.filters,
            'activity_classes': self.activity_classes,
            'dropout_rate': self.dropout_rate,
        })
        return config

# -------------------------
# MST Block definition
# -------------------------
class MST_Block(layers.Layer):
    """Multi-scale temporal block built from parallel depthwise convolutions.

    Each kernel size gets its own branch: a depthwise 1-D convolution ('same'
    padding, stride 1) followed by a 1x1 convolution that sets the branch
    width. The branches are concatenated along the channel axis and projected
    to ``filters`` channels by a final 1x1 convolution.

    Args:
        filters: Number of output channels.
        kernel_sizes: One depthwise kernel size per branch. Three sizes get
            widths filters/4, filters/4, filters/2; one or two sizes get
            filters/2 each; more than three share ``filters`` evenly, with the
            remainder added to the last branch.

    Raises:
        ValueError: If ``kernel_sizes`` is empty.
    """

    def __init__(self, filters, kernel_sizes=(3, 5, 7), **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_sizes = kernel_sizes

        n_branches = len(kernel_sizes)
        if n_branches == 0:
            raise ValueError("kernel_sizes must contain at least one kernel size")

        if n_branches == 3:
            self.filter_dims = [filters // 4, filters // 4, filters // 2]
        elif n_branches <= 2:
            self.filter_dims = [filters // 2, filters // 2]
        else:
            # Even split for any other branch count (previously an IndexError)
            base = max(1, filters // n_branches)
            self.filter_dims = [base] * n_branches
            self.filter_dims[-1] += max(0, filters - base * n_branches)

        self.conv_layers = [
            layers.DepthwiseConv1D(
                kernel_size=k_size,
                strides=1,
                padding='same',
                depth_multiplier=1
            )
            for k_size in kernel_sizes
        ]
        self.proj_layers = [
            layers.Conv1D(self.filter_dims[i], kernel_size=1, padding='same')
            for i in range(n_branches)
        ]
        # Created once here rather than on every call()
        self.concat = layers.Concatenate()
        self.output_proj = layers.Conv1D(filters, kernel_size=1)

    def call(self, inputs):
        """Run every branch on ``inputs``, concatenate the results and project them."""
        branch_outputs = [
            proj(conv(inputs))
            for conv, proj in zip(self.conv_layers, self.proj_layers)
        ]
        return self.output_proj(self.concat(branch_outputs))

    def get_config(self):
        """Return the constructor arguments so a saved model can rebuild the layer."""
        config = super().get_config()
        config.update({
            'filters': self.filters,
            'kernel_sizes': list(self.kernel_sizes),
        })
        return config

# -------------------------
# Model definition
# -------------------------
def build_model3(input_shape, num_classes=6):
    """Assemble the CNN + ABFA + MST + single-layer Transformer classifier.

    Args:
        input_shape: Shape of one sample without the batch axis, passed to
            ``layers.Input``.
        num_classes: Number of softmax outputs; also the number of ABFA prototypes.

    Returns:
        An uncompiled ``models.Model`` mapping the input to class probabilities.
    """

    def pointwise_bn_relu(tensor, width):
        # 1x1 convolution -> batch norm -> ReLU
        tensor = layers.Conv1D(width, kernel_size=1, activation=None)(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.ReLU()(tensor)

    inputs = layers.Input(shape=input_shape)

    # Stem: widen the channel axis in two pointwise stages
    features = pointwise_bn_relu(inputs, 64)
    features = pointwise_bn_relu(features, 128)

    # Parallel convolutions over three receptive fields, fused by a 1x1 stage
    branch_kernels = [3, 7, 11]
    branch_width = 128 // len(branch_kernels)
    branches = []
    for width_k in branch_kernels:
        conv_out = layers.Conv1D(
            filters=branch_width,
            kernel_size=width_k,
            padding='same',
            activation='relu'
        )(features)
        branches.append(layers.BatchNormalization()(conv_out))
    features = layers.Concatenate()(branches)
    features = pointwise_bn_relu(features, 128)

    # Prototype-attention augmentation, then multi-scale depthwise block
    features = ABFA(filters=128, activity_classes=num_classes)(features)
    features = MST_Block(filters=128, kernel_sizes=(3, 5, 7))(features)

    # One post-norm Transformer encoder layer: self-attention, then feed-forward
    attended = layers.MultiHeadAttention(num_heads=4, key_dim=64)(features, features)
    features = layers.Add()([features, attended])
    features = layers.LayerNormalization()(features)
    feed_forward = layers.Dense(128, activation='relu')(features)
    feed_forward = layers.Dense(128)(feed_forward)
    features = layers.Add()([features, feed_forward])
    features = layers.LayerNormalization()(features)

    # Average over time, then the dense classification head
    pooled = layers.GlobalAveragePooling1D()(features)
    hidden = layers.Dense(128, activation='relu')(pooled)
    hidden = layers.Dropout(0.2)(hidden)
    hidden = layers.Dense(64, activation='relu')(hidden)
    outputs = layers.Dense(num_classes, activation='softmax')(hidden)

    return models.Model(inputs=inputs, outputs=outputs)


# Build the network for (window length, channel count) inputs and print its layer table
model3 = build_model3(
    input_shape=(window_size, X_train_seq.shape[2]),
)
model3.summary()

# Adam with a small fixed learning rate; targets are one-hot, hence categorical cross-entropy
model3.compile(
    loss='categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=optimizers.Adam(learning_rate=1e-4),
)

In [9]:
# import os
# import numpy as np
# import pandas as pd
# import tensorflow as tf
# from tensorflow.keras import layers, models, optimizers, callbacks
# from sklearn.model_selection import train_test_split # Import train_test_split


# Hold out 20% of the training windows for validation; the fixed seed keeps the split repeatable
(X_train_split, X_val_split,
 y_train_split, y_val_split) = train_test_split(
    X_train_seq,
    y_train_seq_onehot,
    random_state=42,
    test_size=0.2,
)

# early_stopping = callbacks.EarlyStopping(
#     monitor='val_loss',
#     patience=10,
#     restore_best_weights=True
# )

# reduce_lr = callbacks.ReduceLROnPlateau(
#     monitor='val_loss',
#     factor=0.5,
#     patience=5,
#     min_lr=1e-5
# )

# history = model3.fit(
#     X_train_split,
#     y_train_split,
#     epochs=100,
#     batch_size=32,
#     validation_data=(X_val_split, y_val_split),
#     callbacks=[early_stopping, reduce_lr],
#     verbose=1
# )

In [10]:
# import matplotlib.pyplot as plt

# plt.figure(figsize=(12, 5))

# plt.subplot(1, 2, 1)
# plt.plot(history.history['accuracy'], label='Train Accuracy')
# plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
# plt.title('Model Accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.legend()

# plt.subplot(1, 2, 2)
# plt.plot(history.history['loss'], label='Train Loss')
# plt.plot(history.history['val_loss'], label='Validation Loss')
# plt.title('Model Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.legend()

# plt.tight_layout()
# plt.show()

In [11]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers, callbacks


# # Assuming your activity labels are in a CSV file, load them
# # Replace 'activity_labels.csv' with the actual file path
# try:
#     activity_labels = pd.read_csv('activity_labels.csv')
# except FileNotFoundError:
#     # If the file doesn't exist, create a DataFrame manually
#     activity_labels = pd.DataFrame({
#         'class_name': ['WALKING', 'WALKING_UPSTAIRS', 'WALKING_DOWNSTAIRS', 'SITTING', 'STANDING', 'LAYING'],
#         'label': [0, 1, 2, 3, 4, 5]  # Assuming labels are 0-indexed
#     })

# y_pred_probs = model3.predict(X_test_seq)  # Get predicted probabilities
# y_pred = np.argmax(y_pred_probs, axis=1)  # Convert probabilities to class labels
# y_true = np.argmax(y_test_seq_onehot, axis=1)  # Get true labels from one-hot encoded data


# precision = precision_score(y_true, y_pred, average='weighted')
# recall = recall_score(y_true, y_pred, average='weighted')
# f1 = f1_score(y_true, y_pred, average='weighted')
# # Calculate specificity for each class
# def specificity_score(y_true, y_pred, num_classes=6):
#     specificities = []

#     for i in range(num_classes):
#         true_negative = np.sum((y_true != i) & (y_pred != i))
#         false_positive = np.sum((y_true != i) & (y_pred == i))

#         if true_negative + false_positive == 0:
#             specificities.append(1.0)
#         else:
#             specificities.append(true_negative / (true_negative + false_positive))

#     return specificities

# specificities = specificity_score(y_true, y_pred)
# avg_specificity = np.mean(specificities)

# test_loss, test_acc = model3.evaluate(X_test_seq, y_test_seq_onehot, verbose=0)

# print("\nEvaluation Metrics:")
# print(f"Accuracy: {test_acc:.4f}")
# print(f"Precision: {precision:.4f}")
# print(f"Recall: {recall:.4f}")
# print(f"F1 Score: {f1:.4f}")
# print(f"Average Specificity: {avg_specificity:.4f}")

# print("\nClass-wise Specificities:")
# for i, spec in enumerate(specificities):
#     print(f"Class {i} ({activity_labels.iloc[i, 1]}): {spec:.4f}") # 'class_name' is the second column (index 1); column 0 is 'class_index'

# cm = confusion_matrix(y_true, y_pred)
# plt.figure(figsize=(10, 8))
# sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
#            xticklabels=activity_labels['class_name'].values,
#            yticklabels=activity_labels['class_name'].values)
# plt.xlabel('Predicted')
# plt.ylabel('True')
# plt.title('Confusion Matrix - PatchFormer (UCI-HAR)')
# plt.tight_layout()
# plt.show()

In [12]:
'''

Input (batch_size, time_steps, channels)
    ↓
[Initial Projection]
    Conv1D(kernel=1, filters=64) → BatchNorm → ReLU
    ↓
    Conv1D(kernel=1, filters=128) → BatchNorm → ReLU
    ↓
[Multi-Scale CNN Path]
    ┌─ Conv1D(kernel=3,  filters=128/3≈42) → BatchNorm
    ├─ Conv1D(kernel=7,  filters=128/3≈42) → BatchNorm
    └─ Conv1D(kernel=11, filters=128/3≈42) → BatchNorm
        ↓
    Concatenate(branches) → Conv1D(kernel=1, filters=128) → BatchNorm → ReLU
    ↓
[ABFA Block]
    • 입력 차원: (time_steps, 128)
    • action_prototypes: (num_classes, 128)
    • 내부:
        – Dense→프로젝트 → proto 유사도 계산 → softmax attention
        – attention ⊗ prototypes → augment_proj → residual + BN → Dropout → LayerNorm
    ↓
[MST_Block]
    • DepthwiseConv1D(kernel=3) → Conv1D(filters=32)
    • DepthwiseConv1D(kernel=5) → Conv1D(filters=32)
    • DepthwiseConv1D(kernel=7) → Conv1D(filters=64)
        ↓
    Concatenate → Conv1D(kernel=1, filters=128)
    ↓
[Transformer Encoder Block]
    MultiHeadAttention(num_heads=4, key_dim=64)(x, x)
      → Add(x) → LayerNormalization
    ↓
    Feed-Forward: Dense(128, ReLU) → Dense(128)
      → Add(x) → LayerNormalization
    ↓
GlobalAveragePooling1D
    ↓
Dense(128, ReLU) → Dropout(0.2) → Dense(64, ReLU)
    ↓
Dense(num_classes, Softmax)
초기 투영(Initial Projection): 1×1 컨볼루션으로 채널 차원 확장

다중 스케일 CNN(Multi-Scale CNN Path): 서로 다른 커널 크기로 병렬 특징 추출 후 통합

ABFA Block: 학습된 행동 프로토타입과의 attention 기반 특징 증강

MST_Block: 깊이별(depthwise) 컨볼루션으로 다중 스케일 시간 특징 캡처

Transformer Encoder: 자기-어텐션과 FFN으로 시퀀스 전역 관계 학습

분류기(Classifier): 전역 풀링 후 완전 연결 계층으로 최종 클래스 예측

'''

'\n\nInput (batch_size, time_steps, channels)\n    ↓\n[Initial Projection]\n    Conv1D(kernel=1, filters=64) → BatchNorm → ReLU\n    ↓\n    Conv1D(kernel=1, filters=128) → BatchNorm → ReLU\n    ↓\n[Multi-Scale CNN Path]\n    ┌─ Conv1D(kernel=3,  filters=128/3≈42) → BatchNorm\n    ├─ Conv1D(kernel=7,  filters=128/3≈42) → BatchNorm\n    └─ Conv1D(kernel=11, filters=128/3≈42) → BatchNorm\n        ↓\n    Concatenate(branches) → Conv1D(kernel=1, filters=128) → BatchNorm → ReLU\n    ↓\n[ABFA Block]\n    • 입력 차원: (time_steps, 128)\n    • action_prototypes: (num_classes, 128)\n    • 내부:\n        – Dense→프로젝트 → proto 유사도 계산 → softmax attention\n        – attention ⊗ prototypes → augment_proj → residual + BN → Dropout → LayerNorm\n    ↓\n[MST_Block]\n    • DepthwiseConv1D(kernel=3) → Conv1D(filters=32)\n    • DepthwiseConv1D(kernel=5) → Conv1D(filters=32)\n    • DepthwiseConv1D(kernel=7) → Conv1D(filters=64)\n        ↓\n    Concatenate → Conv1D(kernel=1, filters=128)\n    ↓\n[Transformer Encoder

---

In [14]:
# Write model3 to ./ABFA_MST.h5 in the current working directory.
# NOTE(review): the model3.fit(...) cell in this notebook is commented out, so this
# presumably stores untrained weights unless training was run elsewhere - confirm.
# NOTE(review): model3 contains the custom ABFA and MST_Block layers; reloading the
# file presumably requires passing them via custom_objects - confirm.
model3.save("./ABFA_MST.h5")

