In [2]:
# Pinned to the versions recorded in this cell's output so a fresh kernel
# reproduces the same environment; %pip installs into the running kernel.
%pip install -q thop==0.1.1.post2209072238 ptflops==0.7.5

Collecting thop
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl.metadata (2.7 kB)
Collecting ptflops
  Downloading ptflops-0.7.5-py3-none-any.whl.metadata (9.4 kB)
Downloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Downloading ptflops-0.7.5-py3-none-any.whl (19 kB)
Installing collected packages: thop, ptflops
Successfully installed ptflops-0.7.5 thop-0.1.1.post2209072238


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
import glob
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from thop import profile


class FallDataset(Dataset):
    """In-memory dataset pairing feature windows with integer class labels.

    Both inputs are converted to tensors once, at construction time: the
    features to 32-bit floats (``self.X``) and the labels to 64-bit
    integers (``self.y``).
    """

    def __init__(self, X, y):
        features = torch.FloatTensor(X)
        targets = torch.LongTensor(y)
        self.X, self.y = features, targets

    def __len__(self):
        # The sample count is the size of the leading feature dimension.
        return self.X.size(0)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target


class SisFallLoader:
    """Build balanced, windowed train/val/test arrays from SisFall text files.

    Recordings are looked up as ``<data_path>/<subject>/<activity>*.txt``.
    Every sample of a file inherits the file's class (``D01``-``D19`` -> 0,
    ``F01``-``F15`` -> 1). Pre-fall (2) and post-fall (3) labels are then
    derived from transitions in the concatenated per-subject label stream,
    which is finally cut into fixed-size windows.

    NOTE(review): ``load_subject_data`` labels whole files, and
    ``balance_per_subject_data`` lists every ADL activity before every fall
    activity, so the concatenated stream is a run of 0s followed by a run of
    1s. That stream contains no fall -> non-fall transition, hence class 3 is
    never produced, and class 2 marks the tail of the ADL data rather than
    samples preceding an impact. Windows can also straddle two recordings.
    Per-recording fall onset/offset annotations would be needed to obtain
    meaningful pre-/post-fall classes.
    """

    def __init__(self, data_path, window_size=256, stride=128, use_rate=1.0,
                 sr=200, pre_sec=2.0, post_sec=2.0):
        """Store the windowing parameters and build the activity -> label map.

        Args:
            data_path: Directory that contains one sub-directory per subject.
            window_size: Number of samples per window.
            stride: Step, in samples, between consecutive window starts.
            use_rate: Fraction of the training subjects to keep (1.0 = all).
            sr: Sampling rate used to convert ``pre_sec``/``post_sec`` into
                sample counts.
            pre_sec: Seconds before a fall onset that are labelled pre-fall.
            post_sec: Seconds after a fall end that are labelled post-fall.
        """
        self.data_path = data_path
        self.window_size = window_size
        self.stride = stride
        self.use_rate = use_rate
        self.sr = sr
        self.pre_len = int(sr * pre_sec)
        self.post_len = int(sr * post_sec)

        self.adl_activities = [f'D{str(i).zfill(2)}' for i in range(1, 20)]
        self.fall_activities = [f'F{str(i).zfill(2)}' for i in range(1, 16)]
        self.label2id = {act: 0 for act in self.adl_activities}
        self.label2id.update({act: 1 for act in self.fall_activities})

    def read_sensor_file(self, filepath):
        """Parse one recording into an array with one row per parsed line.

        Each line is expected to hold comma-separated numbers terminated by
        ``;``. Lines that cannot be parsed as floats are skipped.
        """
        with open(filepath, 'r') as file:
            content = file.read()

        rows = []
        for line in content.replace(' ', '').split('\n'):
            # Strip the record terminator per line so that a final record
            # without a trailing newline is not discarded.
            line = line.strip().rstrip(';')
            if not line:
                continue
            try:
                rows.append([float(x) for x in line.split(',')])
            except ValueError:
                continue

        return np.array(rows)

    def generate_pre_post_labels(self, raw_labels):
        """Derive pre-fall (2) and post-fall (3) labels around fall segments.

        Args:
            raw_labels: 1-D array in which 1 marks fall samples.

        Returns:
            A copy of ``raw_labels`` in which up to ``post_len`` samples after
            each fall segment are set to 3 and up to ``pre_len`` samples before
            each fall segment are set to 2. Where both apply, pre-fall wins.
            Samples whose raw label is 1 are never relabelled.
        """
        raw = np.asarray(raw_labels)
        labels = raw.copy()
        n = len(raw)
        if n == 0:
            return labels

        is_fall = raw == 1
        prev_is_fall = np.concatenate(([False], is_fall[:-1]))
        # Treat the position past the end as "fall" so that a segment ending
        # on the very last sample produces no post-fall region.
        next_is_fall = np.concatenate((is_fall[1:], [True]))
        fall_starts = np.flatnonzero(is_fall & ~prev_is_fall)
        fall_ends = np.flatnonzero(is_fall & ~next_is_fall)

        for end in fall_ends:
            region = slice(end + 1, min(end + 1 + self.post_len, n))
            # Keep neighbouring fall samples intact when segments are close.
            labels[region] = np.where(is_fall[region], labels[region], 3)

        for start in fall_starts:
            region = slice(max(0, start - self.pre_len), start)
            labels[region] = np.where(is_fall[region], labels[region], 2)

        return labels

    def sliding_window(self, data, labels):
        """Cut ``data`` into windows and assign one label per window.

        A window takes the first class, in the priority order fall (1),
        post-fall (3), pre-fall (2), that covers at least 10% of its samples
        (and at least one sample); otherwise it is labelled 0.

        Returns:
            ``(windows, window_labels)`` as arrays. Both are empty when
            ``data`` is shorter than ``window_size``.
        """
        labels = np.asarray(labels)
        # At least one sample is required: a zero threshold would otherwise
        # label every window as a fall when window_size < 10.
        threshold = max(1, int(self.window_size * 0.1))

        windows = []
        window_labels = []

        for start in range(0, len(data) - self.window_size + 1, self.stride):
            stop = start + self.window_size
            segment = labels[start:stop]

            final_label = 0
            for candidate in (1, 3, 2):
                if np.count_nonzero(segment == candidate) >= threshold:
                    final_label = candidate
                    break

            windows.append(data[start:stop])
            window_labels.append(final_label)

        return np.array(windows), np.array(window_labels)

    def load_subject_data(self, subject, activities):
        """Load and concatenate every 9-column recording of one subject.

        Files are read per activity in the order given, in sorted filename
        order so the concatenation is reproducible. Files that fail to parse
        are reported and skipped; files that do not yield a 2-D array with
        nine columns are skipped.

        Returns:
            ``(data, labels)``, or ``(None, None)`` when nothing was loaded.
        """
        subject_dir = os.path.join(self.data_path, subject)
        all_data = []
        all_labels = []

        for activity in activities:
            activity_pattern = os.path.join(subject_dir, f"{activity}*.txt")

            for file_path in sorted(glob.glob(activity_pattern)):
                try:
                    raw_data = self.read_sensor_file(file_path)
                    # ndim guard: an empty file yields a 1-D empty array.
                    if raw_data is None or raw_data.ndim != 2 or raw_data.shape[1] != 9:
                        continue
                    all_data.append(raw_data)
                    all_labels.append(np.full(len(raw_data), self.label2id[activity]))
                except Exception as e:
                    print(f"Error processing {file_path}: {e}")

        if len(all_data) == 0:
            return None, None

        return np.concatenate(all_data, axis=0), np.concatenate(all_labels)

    def balance_per_subject_data(self, subjects, samples_per_class):
        """Return windows with ``samples_per_class`` windows per class and subject.

        For each subject the data is standardised per column, windowed, and
        every class present is sampled to exactly ``samples_per_class``
        windows (with replacement when fewer are available). Classes a
        subject does not have are simply absent for that subject.

        Raises:
            ValueError: If no windows could be produced for any subject.
        """
        balanced_X_list = []
        balanced_y_list = []

        for subject in subjects:
            # Subjects whose id starts with "SE" only get ADL activities,
            # except SE06, which also gets the fall activities.
            is_elderly = subject.startswith('SE')
            if is_elderly and subject != 'SE06':
                activities = self.adl_activities
            else:
                activities = self.adl_activities + self.fall_activities

            data, labels = self.load_subject_data(subject, activities)
            if data is None:
                continue

            labels = self.generate_pre_post_labels(labels)

            # Per-subject z-score; the epsilon avoids division by zero on
            # constant columns.
            mean = data.mean(axis=0)
            std = data.std(axis=0) + 1e-8
            data = (data - mean) / std

            subject_X, subject_y = self.sliding_window(data, labels)

            for class_id in range(4):
                class_indices = np.flatnonzero(subject_y == class_id)
                if len(class_indices) == 0:
                    continue

                oversample = len(class_indices) < samples_per_class
                selected = np.random.choice(class_indices, samples_per_class,
                                            replace=oversample)

                balanced_X_list.append(subject_X[selected])
                balanced_y_list.append(subject_y[selected])

        if not balanced_X_list:
            raise ValueError(
                f"No usable recordings found under {self.data_path!r} "
                f"for subjects {list(subjects)}"
            )

        return np.concatenate(balanced_X_list, axis=0), np.concatenate(balanced_y_list)

    def prepare_dataset(self, train_subjects, val_subjects, test_subjects,
                        train_samples_per_class=100, eval_samples_per_class=50):
        """Load, balance and report the train, validation and test splits.

        Seeds NumPy's global RNG with 42, optionally sub-samples the training
        subjects according to ``use_rate``, and balances each split.

        Args:
            train_subjects, val_subjects, test_subjects: Subject ids per split.
            train_samples_per_class: Windows per class and subject for training.
            eval_samples_per_class: Windows per class and subject for the
                validation and test splits.

        Returns:
            ``(train_X, train_y, val_X, val_y, test_X, test_y)``.
        """
        np.random.seed(42)
        if self.use_rate < 1.0:
            n_train = int(len(train_subjects) * self.use_rate)
            train_subjects = np.random.choice(train_subjects, size=n_train, replace=False)

        splits = (
            ("Training", "Train: ", train_subjects, train_samples_per_class),
            ("Validation", "Val:   ", val_subjects, eval_samples_per_class),
            ("Test", "Test:  ", test_subjects, eval_samples_per_class),
        )

        arrays = []
        for title, tag, subjects, per_class in splits:
            print("\n" + "="*80)
            print(f"Loading and Balancing {title} Data")
            print("="*80)
            split_X, split_y = self.balance_per_subject_data(subjects, samples_per_class=per_class)
            print(f"{tag}{split_X.shape}, Labels: {np.bincount(split_y, minlength=4)}")
            arrays.extend((split_X, split_y))

        return tuple(arrays)


class TemporalPatternAttention(nn.Module):
    """Multi-head self-attention with an additive distance-decay bias.

    Every head works at the full ``filter_size`` width. The attention logits
    receive ``bias_scale * exp(-|i - j| / tau)`` for query position ``i`` and
    key position ``j`` before the softmax.
    """

    def __init__(self, filter_size, num_heads, tau, bias_scale):
        super().__init__()
        self.filter_size = filter_size
        self.num_heads = num_heads
        self.tau = tau
        self.bias_scale = bias_scale

        wide = filter_size * num_heads
        # Created in a fixed order (query, key, value, output) so parameter
        # registration and initialisation order stay stable.
        for name in ('W_q', 'W_k', 'W_v'):
            setattr(self, name, nn.Linear(filter_size, wide))
        self.out_proj = nn.Linear(wide, filter_size)

    def _heads(self, projection, x, batch, steps):
        """Project ``x`` and reshape to (batch, heads, steps, filter_size)."""
        projected = projection(x).view(batch, steps, self.num_heads, self.filter_size)
        return projected.transpose(1, 2)

    def forward(self, x):
        """Apply attention to ``x`` of shape (batch, steps, filter_size)."""
        batch, steps, _ = x.shape
        width = self.filter_size

        queries = self._heads(self.W_q, x, batch, steps)
        keys = self._heads(self.W_k, x, batch, steps)
        values = self._heads(self.W_v, x, batch, steps)

        # (steps, steps) matrix of signed position offsets i - j.
        index = torch.arange(steps, device=x.device).float()
        offsets = index[:, None] - index[None, :]
        decay = self.bias_scale * torch.exp(-torch.abs(offsets) / self.tau)

        scores = torch.matmul(queries, keys.transpose(-2, -1)) / (width ** 0.5)
        scores = scores + decay[None, None]
        weights = torch.softmax(scores, dim=-1)

        mixed = torch.matmul(weights, values)
        merged = mixed.transpose(1, 2).contiguous().view(batch, steps, self.num_heads * width)
        return self.out_proj(merged)


class ConvEncoderTPA_FallModel(nn.Module):
    """Four residual Conv1d stages followed by temporal attention and a linear head.

    Each stage computes ``pool(relu(bn(conv(x)))) + shortcut(x)``, where the
    shortcut is a 1x1 convolution followed by the same max-pooling, so every
    stage halves the time axis. The attention output is averaged over time,
    passed through dropout and mapped to ``num_classes`` logits.

    Input is ``(batch, time, in_ch)``; it is transposed to channels-first
    before the convolutions.
    """

    def __init__(self, in_ch, num_classes, heads, tau, bias_scale):
        super().__init__()

        # kernel_size=8 is even, so no symmetric padding preserves the length
        # (padding=3 yields T-1, which after pooling no longer matches the
        # shortcut for even T). Pad 3 left / 4 right so conv1 keeps length T.
        self.pad1 = nn.ConstantPad1d((3, 4), 0.0)
        self.conv1 = nn.Conv1d(in_ch, 64, kernel_size=8, stride=1, padding=0)
        self.bn1 = nn.BatchNorm1d(64)
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.sc1 = nn.Sequential(
            nn.Conv1d(in_ch, 64, kernel_size=1, stride=1),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.conv2 = nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2)
        self.bn2 = nn.BatchNorm1d(128)
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.sc2 = nn.Sequential(
            nn.Conv1d(64, 128, kernel_size=1, stride=1),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm1d(256)
        self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.sc3 = nn.Sequential(
            nn.Conv1d(128, 256, kernel_size=1, stride=1),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.conv4 = nn.Conv1d(256, 512, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm1d(512)
        self.pool4 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.sc4 = nn.Sequential(
            nn.Conv1d(256, 512, kernel_size=1, stride=1),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )

        self.tpa = TemporalPatternAttention(512, heads, tau, bias_scale)

        self.fc = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        # (batch, time, channels) -> (batch, channels, time) for Conv1d.
        x = x.transpose(1, 2)

        x1_in = x
        x1 = self.relu(self.bn1(self.conv1(self.pad1(x1_in))))
        x1 = self.pool1(x1)
        x1 = x1 + self.sc1(x1_in)

        x2_in = x1
        x2 = self.relu(self.bn2(self.conv2(x2_in)))
        x2 = self.pool2(x2)
        x2 = x2 + self.sc2(x2_in)

        x3_in = x2
        x3 = self.relu(self.bn3(self.conv3(x3_in)))
        x3 = self.pool3(x3)
        x3 = x3 + self.sc3(x3_in)

        x4_in = x3
        x4 = self.relu(self.bn4(self.conv4(x4_in)))
        x4 = self.pool4(x4)
        x4 = x4 + self.sc4(x4_in)

        # Back to (batch, time, channels) for the attention block.
        x_tpa_in = x4.transpose(1, 2)

        x_att = self.tpa(x_tpa_in)

        # Average over the time axis, then regularise and classify.
        x_global = x_att.mean(dim=1)
        x_global = self.dropout(x_global)

        out = self.fc(x_global)
        return out


def train_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader``.

    Returns:
        ``(mean_loss, accuracy_percent)``, where the loss is averaged per
        sample by weighting each batch loss with its batch size.
    """
    model.train()
    running_loss, n_correct, n_seen = 0, 0, 0

    for batch_x, batch_y in loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        optimizer.zero_grad()
        logits = model(batch_x)
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item() * batch_x.size(0)
        predicted = logits.max(1).indices
        n_seen += batch_y.size(0)
        n_correct += predicted.eq(batch_y).sum().item()

    mean_loss = running_loss / n_seen
    accuracy = 100. * n_correct / n_seen
    return mean_loss, accuracy


def evaluate(model, loader, criterion, device):
    """Score ``model`` on ``loader`` without tracking gradients.

    Returns:
        ``(mean_loss, accuracy_percent, predictions, labels)``; the last two
        are flat lists collected over all batches in iteration order.
    """
    model.eval()
    running_loss, n_correct, n_seen = 0, 0, 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            logits = model(batch_x)
            batch_loss = criterion(logits, batch_y)
            predicted = logits.max(1).indices

            running_loss += batch_loss.item() * batch_x.size(0)
            n_seen += batch_y.size(0)
            n_correct += predicted.eq(batch_y).sum().item()

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())

    mean_loss = running_loss / n_seen
    accuracy = 100. * n_correct / n_seen
    return mean_loss, accuracy, all_preds, all_labels


def main():
    """Train the ConvEncoder+TPA classifier on SisFall and report test metrics.

    Steps: split subjects into train/val/test, build balanced window
    datasets, train with class-weighted cross-entropy while checkpointing the
    epoch with the best validation accuracy to ``best_model.pth``, evaluate
    that checkpoint on the test split, save a confusion-matrix image, and
    print parameter/FLOP counts plus the mean single-sample inference time.
    """
    import time

    CONFIG = {
        'window_size': 200,
        'stride': 100,
        'batch_size': 512,
        'epochs': 50,
        'hidden_dim': 128,  # not read anywhere in this function
        'sisfall_path': '/content/drive/MyDrive/HAR_Dataset/SISFALL/',
        'tpa_heads': 4,
        'tpa_tau': 1.0,
        'tpa_bias_scale': 1.0,
        'class_weights': [1.0, 5.0, 8.0, 8.0],
        'use_rate': 1.0,
        'sr': 200,
        'pre_sec': 2.0,
        'post_sec': 2.0,
    }

    # Make weight initialisation and batch shuffling repeatable across runs.
    torch.manual_seed(42)

    adult_subjects = [f'SA{str(i).zfill(2)}' for i in range(1, 24)]
    elderly_subjects = [f'SE{str(i).zfill(2)}' for i in range(1, 16)]

    # Subject-wise split, so no subject appears in more than one partition.
    train_sa = adult_subjects[:16]
    val_sa = adult_subjects[16:19]
    test_sa = adult_subjects[19:]

    train_se = elderly_subjects[:10]
    val_se = elderly_subjects[10:12]
    test_se = elderly_subjects[12:]

    train_split = train_sa + train_se
    val_split = val_sa + val_se
    test_split = test_sa + test_se

    loader = SisFallLoader(CONFIG['sisfall_path'],
                           window_size=CONFIG['window_size'],
                           stride=CONFIG['stride'],
                           use_rate=CONFIG['use_rate'],
                           sr=CONFIG['sr'],
                           pre_sec=CONFIG['pre_sec'],
                           post_sec=CONFIG['post_sec'])

    train_X, train_y, val_X, val_y, test_X, test_y = loader.prepare_dataset(
        train_split, val_split, test_split
    )

    train_ds = FallDataset(train_X, train_y)
    val_ds = FallDataset(val_X, val_y)
    test_ds = FallDataset(test_X, test_y)

    train_loader = DataLoader(train_ds, batch_size=CONFIG['batch_size'], shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=CONFIG['batch_size'], shuffle=False)
    test_loader = DataLoader(test_ds, batch_size=CONFIG['batch_size'], shuffle=False)

    print("\n" + "="*80)
    print("Build Model (ConvEncoder + TPA)")
    print("="*80)
    model = ConvEncoderTPA_FallModel(
        in_ch=9,
        num_classes=4,
        heads=CONFIG['tpa_heads'],
        tau=CONFIG['tpa_tau'],
        bias_scale=CONFIG['tpa_bias_scale']
    )
    n_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f" ✓ Parameters: {n_params:,}")
    print(f" ✓ TPA: heads={CONFIG['tpa_heads']}, tau={CONFIG['tpa_tau']}, bias_scale={CONFIG['tpa_bias_scale']}")

    weights = torch.FloatTensor(CONFIG['class_weights'])
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    weights = weights.to(device)

    criterion = nn.CrossEntropyLoss(weight=weights)
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

    print("\n" + "="*80)
    print("Class Weights:")
    print("="*80)
    for name, w in zip(['ADL','Fall','Pre','Post'], CONFIG['class_weights']):
        print(f" {name:7s}: {w:.2f}")

    print("\n" + "="*80)
    print("Training Start")
    print("="*80)

    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; otherwise a run stuck at 0% would later load a missing or
    # stale 'best_model.pth'.
    best_val_acc = float('-inf')
    best_epoch = 0

    for epoch in range(CONFIG['epochs']):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc, _, _ = evaluate(model, val_loader, criterion, device)

        # The scheduler halves the learning rate when validation loss plateaus.
        scheduler.step(val_loss)

        print(f"Epoch {epoch+1:2d}/{CONFIG['epochs']} | "
              f"Train Loss: {train_loss:.4f} Acc: {train_acc:.2f}% | "
              f"Val Loss: {val_loss:.4f} Acc: {val_acc:.2f}%")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_epoch = epoch + 1
            torch.save(model.state_dict(), 'best_model.pth')

    print("\n" + "="*80)
    print(f"Best Validation Accuracy: {best_val_acc:.2f}% at Epoch {best_epoch}")
    print("="*80)

    # Only restore a checkpoint this run actually wrote.
    if best_epoch > 0:
        model.load_state_dict(torch.load('best_model.pth', map_location=device))
    test_loss, test_acc, preds, gts = evaluate(model, test_loader, criterion, device)

    print("\n" + "="*80)
    print("Test Set Performance")
    print("="*80)
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_acc:.2f}%")

    print("\n" + classification_report(gts, preds, labels=[0,1,2,3],
                                target_names=['ADL','Fall','Pre-fall','Post-fall'],
                                digits=4, zero_division=0))

    cm = confusion_matrix(gts, preds, labels=[0,1,2,3])
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['ADL','Fall','Pre','Post'],
                yticklabels=['ADL','Fall','Pre','Post'])
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
    print("\nConfusion matrix saved to 'confusion_matrix.png'")

    dummy_input = torch.randn(1, CONFIG['window_size'], 9).to(device)
    flops, params = profile(model, inputs=(dummy_input,), verbose=False)
    print(f"\nModel Statistics:")
    print(f"  Params (M):  {params / 1e6:.2f}")
    print(f"  FLOPs (M):   {flops / 1e6:.2f}")

    model.eval()
    on_cuda = device.type == 'cuda'
    times = []
    with torch.no_grad():
        for _ in range(100):
            # CUDA kernels run asynchronously: synchronise on both sides so
            # the interval covers exactly one forward pass.
            if on_cuda:
                torch.cuda.synchronize()
            start = time.perf_counter()
            _ = model(dummy_input)
            if on_cuda:
                torch.cuda.synchronize()
            times.append((time.perf_counter() - start) * 1000)
    # The first 10 iterations are treated as warm-up and excluded.
    print(f"  Inference (ms): {np.mean(times[10:]):.2f}")


# Entry point: run the whole pipeline when this code is executed directly
# (the condition is false when the code is imported as a module).
if __name__ == '__main__':
    main()


Loading and Balancing Training Data
Train: (6000, 200, 9), Labels: [2600 1700 1700    0]

Loading and Balancing Validation Data


KeyboardInterrupt: 