# ResNet-Transformer for Human Activity Recognition (Sensor Data) - Google Colab Ready

This notebook implements a hybrid 1D ResNet + Transformer encoder architecture tailored for time-series sensor data classification on the same dataset and pipeline as your other notebooks.

## Instructions for Google Colab:
1. Upload your HAR_synthetic_full.csv file using the file upload cell below
2. Run all cells sequentially
3. Results will be saved and can be downloaded


In [None]:
%pip install -q tensorflow scikit-learn pandas numpy matplotlib seaborn joblib


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.0.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
# Imports
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.metrics import TopKCategoricalAccuracy
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import json
import time
import joblib
from google.colab import files
warnings.filterwarnings('ignore')

# Reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print("TensorFlow version:", tf.__version__)
print("GPU available:", tf.config.list_physical_devices('GPU'))


In [None]:
# File Upload for Google Colab
print("Please upload your HAR_synthetic_full.csv file:")
uploaded = files.upload()

# Get the uploaded file name
csv_filename = list(uploaded.keys())[0]
print(f"Uploaded file: {csv_filename}")


In [None]:
# Mixed precision and memory growth
from tensorflow.keras import mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        print(e)


In [None]:
# Data loading and preprocessing (reuse same pipeline)

def load_sensor_data(csv_path):
    print("Loading sensor data...")
    df = pd.read_csv(csv_path)
    df = df.dropna(subset=['label'])
    print(f"Data shape: {df.shape}")
    print(f"Columns: {list(df.columns)}")
    print(f"Unique labels: {df['label'].unique()}")
    print("Label distribution:")
    print(df['label'].value_counts())
    return df


def create_sequences(df, sequence_length=128, overlap=0.5):
    print(f"Creating sequences with length {sequence_length} and overlap {overlap}...")
    feature_cols = ['acc_x', 'acc_y', 'acc_z', 'gyro_x', 'gyro_y', 'gyro_z', 'heart_rate_bpm']
    sequences, labels = [], []
    step_size = int(sequence_length * (1 - overlap))

    for label in df['label'].unique():
        label_data = df[df['label'] == label].copy()
        label_data = label_data.sort_values('timestamp_ms')
        features = label_data[feature_cols].values
        for i in range(0, len(features) - sequence_length + 1, step_size):
            seq = features[i:i + sequence_length]
            if len(seq) == sequence_length:
                sequences.append(seq)
                labels.append(label)

    sequences = np.array(sequences, dtype=np.float32)
    labels = np.array(labels)
    print(f"Created {len(sequences)} sequences with shape {sequences.shape}")
    return sequences, labels


def preprocess_data(sequences, labels):
    print("Preprocessing data...")
    scaler = StandardScaler()
    original_shape = sequences.shape
    flat = sequences.reshape(-1, sequences.shape[-1])
    norm = scaler.fit_transform(flat)
    sequences = norm.reshape(original_shape)

    le = LabelEncoder()
    labels_idx = le.fit_transform(labels)
    labels_oh = to_categorical(labels_idx)

    print(f"Number of classes: {len(le.classes_)}")
    print(f"Classes: {le.classes_}")
    return sequences, labels_oh, le, scaler


In [None]:
# Load dataset and split
# Use uploaded file for Colab
csv_path = csv_filename

df = load_sensor_data(csv_path)
sequences, labels = create_sequences(df, sequence_length=128, overlap=0.5)
X, y, label_encoder, scaler = preprocess_data(sequences, labels)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=np.argmax(y, axis=1)
)

print(f"\nTraining set: {X_train.shape}")
print(f"Test set: {X_test.shape}")
print(f"Number of features: {X_train.shape[2]}")
print(f"Number of classes: {y_train.shape[1]}")


In [None]:
# Data augmentation (same as Transformer)

def augment_sensor_sequence(sequence, noise_factor=0.1, time_shift=0.1):
    augmented = sequence.copy()
    if np.random.random() < 0.5:
        noise = np.random.normal(0, noise_factor, sequence.shape)
        augmented = augmented + noise
    if np.random.random() < 0.3:
        shift = int(sequence.shape[0] * time_shift * np.random.uniform(-1, 1))
        augmented = np.roll(augmented, shift, axis=0)
    if np.random.random() < 0.3:
        scale_factor = np.random.uniform(0.9, 1.1)
        augmented = augmented * scale_factor
    return augmented


def create_augmented_data(X_train, y_train, augmentation_factor=1):
    print(f"Creating augmented data with factor {augmentation_factor}...")
    X_aug, y_aug = [], []
    for i in range(len(X_train)):
        X_aug.append(X_train[i]); y_aug.append(y_train[i])
        for _ in range(augmentation_factor):
            X_aug.append(augment_sensor_sequence(X_train[i]))
            y_aug.append(y_train[i])
    return np.array(X_aug), np.array(y_aug)

X_train_aug, y_train_aug = create_augmented_data(X_train, y_train, augmentation_factor=1)
print(f"Original training data: {X_train.shape}")
print(f"Augmented training data: {X_train_aug.shape}")


In [None]:
# ResNet-Transformer model

class ResidualBlock1D(Layer):
    def __init__(self, filters, kernel_size=3, stride=1, use_projection=False, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.stride = stride
        self.use_projection = use_projection
        self.dropout_rate = dropout_rate

        self.conv1 = Conv1D(filters, kernel_size, padding='same', strides=stride)
        self.bn1 = LayerNormalization()
        self.act1 = Activation('relu')
        self.conv2 = Conv1D(filters, kernel_size, padding='same', strides=1)
        self.bn2 = LayerNormalization()
        self.dropout = Dropout(dropout_rate)
        self.proj = Conv1D(filters, 1, strides=stride, padding='same') if use_projection else None

    def call(self, inputs, training=None):
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.act1(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = self.dropout(x, training=training)
        shortcut = self.proj(inputs) if self.proj is not None else inputs
        x = Add()([x, shortcut])
        return Activation('relu')(x)


class PositionalEncoding(Layer):
    def __init__(self, max_len, d_model, **kwargs):
        super().__init__(**kwargs)
        self.max_len = max_len
        self.d_model = d_model
        pos_encoding = np.zeros((max_len, d_model))
        position = np.arange(0, max_len, dtype=np.float32)[:, np.newaxis]
        div_term = np.exp(np.arange(0, d_model, 2, dtype=np.float32) * -(np.log(10000.0) / d_model))
        pos_encoding[:, 0::2] = np.sin(position * div_term)
        pos_encoding[:, 1::2] = np.cos(position * div_term)
        self.pos_encoding_np = pos_encoding[np.newaxis, :, :]

    def call(self, inputs):
        seq_len = tf.shape(inputs)[1]
        pos_encoding = tf.cast(self.pos_encoding_np, dtype=inputs.dtype)
        return inputs + pos_encoding[:, :seq_len, :]


class TransformerEncoderBlock(Layer):
    def __init__(self, d_model=64, num_heads=8, dff=128, dropout_rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.mha = MultiHeadAttention(num_heads=num_heads, key_dim=d_model, dropout=dropout_rate)
        self.ffn = Sequential([Dense(dff, activation='relu'), Dense(d_model)])
        self.ln1 = LayerNormalization(epsilon=1e-6)
        self.ln2 = LayerNormalization(epsilon=1e-6)
        self.do1 = Dropout(dropout_rate)
        self.do2 = Dropout(dropout_rate)

    def call(self, inputs, training=None):
        attn = self.mha(inputs, inputs, training=training)
        attn = self.do1(attn, training=training)
        out1 = self.ln1(inputs + attn)
        ffn = self.ffn(out1)
        ffn = self.do2(ffn, training=training)
        out2 = self.ln2(out1 + ffn)
        return out2


def create_resnet_transformer(input_shape, num_classes,
                               resnet_filters=[32, 64],
                               res_blocks_per_stage=2,
                               d_model=64, num_heads=8, num_layers=2, dff=128,
                               dropout_rate=0.1):
    inputs = Input(shape=input_shape)

    # Stem
    x = Conv1D(resnet_filters[0], 7, strides=1, padding='same')(inputs)
    x = LayerNormalization()(x)
    x = Activation('relu')(x)

    # ResNet stages
    for stage_idx, filters in enumerate(resnet_filters):
        for block_idx in range(res_blocks_per_stage):
            use_proj = (block_idx == 0 and stage_idx > 0)
            stride = 2 if use_proj else 1
            x = ResidualBlock1D(filters, kernel_size=3, stride=stride, use_projection=use_proj, dropout_rate=dropout_rate)(x)

    # Project to transformer dim
    x = Conv1D(d_model, 1, padding='same')(x)

    # Positional encoding
    x = PositionalEncoding(max_len=input_shape[0], d_model=d_model)(x)

    # Transformer encoder stack
    for _ in range(num_layers):
        x = TransformerEncoderBlock(d_model=d_model, num_heads=num_heads, dff=dff, dropout_rate=dropout_rate)(x)

    # Head
    x = GlobalAveragePooling1D()(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(64, activation='relu')(x)
    x = Dropout(0.2)(x)
    outputs = Dense(num_classes, activation='softmax', dtype='float32')(x)

    model = Model(inputs, outputs)
    return model

# Shapes and model
input_shape = (X_train.shape[1], X_train.shape[2])
num_classes = y_train.shape[1]
model = create_resnet_transformer(input_shape, num_classes)

# Summary
total_params = model.count_params()
print(f"Total parameters: {total_params:,}")
model.summary()


In [None]:
# Compile, callbacks, and data generators

# Loss with label smoothing
def smooth_categorical_crossentropy(y_true, y_pred, alpha=0.1):
    num_classes = tf.cast(tf.shape(y_true)[-1], tf.float32)
    y_true_smooth = y_true * (1.0 - alpha) + alpha / num_classes
    return tf.keras.losses.categorical_crossentropy(y_true_smooth, y_pred)

optimizer = AdamW(learning_rate=0.001, weight_decay=1e-4)

# Metrics
top_3_accuracy = TopKCategoricalAccuracy(k=3, name='top_3_accuracy')
top_5_accuracy = TopKCategoricalAccuracy(k=5, name='top_5_accuracy')

model.compile(optimizer=optimizer,
              loss=smooth_categorical_crossentropy,
              metrics=['accuracy', top_3_accuracy, top_5_accuracy])

callbacks = [
    EarlyStopping(monitor='val_accuracy', patience=20, restore_best_weights=True, verbose=1, min_delta=0.001),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=10, min_lr=1e-7, verbose=1, cooldown=3),
    ModelCheckpoint('best_resnet_transformer_har_model.keras', monitor='val_accuracy', save_best_only=True, verbose=1)
]

class SensorDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, X, y, batch_size=32, shuffle=True, augment=True):
        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.augment = augment
        self.indices = np.arange(len(X))
        self.on_epoch_end()

    def __len__(self):
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, index):
        indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        X_batch = self.X[indices].copy()
        y_batch = self.y[indices]
        if self.augment:
            for i in range(len(X_batch)):
                if np.random.random() < 0.3:
                    X_batch[i] = augment_sensor_sequence(X_batch[i])
        return X_batch, y_batch

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

train_gen = SensorDataGenerator(X_train_aug, y_train_aug, batch_size=32, augment=True)
val_gen = SensorDataGenerator(X_test, y_test, batch_size=32, augment=False, shuffle=False)

print(f"Training batches: {len(train_gen)}")
print(f"Validation batches: {len(val_gen)}")


In [None]:
# Train and evaluate
print("Starting training ResNet-Transformer...")
print(f"Total parameters: {total_params:,}")

history = model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=50,
    callbacks=callbacks,
    verbose=1
)

test_results = model.evaluate(val_gen, verbose=0)

test_loss = test_results[0]
test_acc = test_results[1]
test_top3 = test_results[2] if len(test_results) > 2 else 0
test_top5 = test_results[3] if len(test_results) > 3 else 0

print(f"\n=== RESNET-TRANSFORMER RESULTS ===")
print(f"Parameters: {total_params:,}")
print(f"Test Accuracy: {test_acc*100:.2f}%")
print(f"Top-3 Accuracy: {test_top3*100:.2f}%")
print(f"Top-5 Accuracy: {test_top5*100:.2f}%")


In [None]:
# Visualization
plt.figure(figsize=(15, 10))

plt.subplot(2, 3, 1)
plt.plot(history.history['accuracy'], label='Train Acc')
plt.plot(history.history['val_accuracy'], label='Val Acc')
plt.title('Accuracy')
plt.legend(); plt.grid(True, alpha=0.3)

plt.subplot(2, 3, 2)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.title('Loss')
plt.legend(); plt.grid(True, alpha=0.3)

plt.subplot(2, 3, 3)
if 'top_3_accuracy' in history.history:
    plt.plot(history.history['top_3_accuracy'], label='Train Top-3')
    plt.plot(history.history['val_top_3_accuracy'], label='Val Top-3')
    plt.title('Top-3 Accuracy')
    plt.legend(); plt.grid(True, alpha=0.3)

plt.subplot(2, 3, 5)
plt.text(0.1, 0.8, 'Model: ResNet-Transformer', fontsize=12, fontweight='bold')
plt.text(0.1, 0.7, f'Parameters: {total_params:,}', fontsize=10)
plt.text(0.1, 0.6, f'Final Accuracy: {test_acc*100:.2f}%', fontsize=10)
plt.text(0.1, 0.4, f'Input Shape: {input_shape}', fontsize=9)
plt.text(0.1, 0.3, f'Classes: {num_classes}', fontsize=9)
plt.axis('off')

plt.subplot(2, 3, 6)
y_pred = model.predict(X_test, verbose=0)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)
cm = confusion_matrix(y_true_classes, y_pred_classes)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title('Confusion Matrix')
plt.xlabel('Predicted'); plt.ylabel('Actual')

plt.tight_layout(); plt.show()


In [None]:
# Classification report and saving artifacts
print("\n=== RESNET-TRANSFORMER CLASSIFICATION REPORT ===")
print(classification_report(y_true_classes, y_pred_classes, target_names=label_encoder.classes_))

# Save model and preprocessors
model.save('resnet_transformer_model_final.keras')
joblib.dump(scaler, 'resnet_transformer_sensor_scaler.pkl')
joblib.dump(label_encoder, 'resnet_transformer_sensor_label_encoder.pkl')
print("Saved model and preprocessing objects")

# Save metrics
metrics_df = pd.DataFrame({
    'Class': label_encoder.classes_,
    'Precision': pd.Series(dtype=float),
    'Recall': pd.Series(dtype=float),
    'F1-Score': pd.Series(dtype=float)
})
from sklearn.metrics import precision_score, recall_score, f1_score
precision = precision_score(y_true_classes, y_pred_classes, average=None)
recall = recall_score(y_true_classes, y_pred_classes, average=None)
f1 = f1_score(y_true_classes, y_pred_classes, average=None)
metrics_df['Precision'] = precision
metrics_df['Recall'] = recall
metrics_df['F1-Score'] = f1
metrics_df.to_csv('resnet_transformer_classification_report.csv', index=False)
print("Saved metrics to 'resnet_transformer_classification_report.csv'")


In [None]:
# Comprehensive Results Collection for Journal Publication

print("=== COLLECTING COMPREHENSIVE RESULTS ===")

# Get predictions for comprehensive evaluation
y_pred_proba = model.predict(X_test, verbose=0)
y_pred = np.argmax(y_pred_proba, axis=1)
y_true = np.argmax(y_test, axis=1)

# Calculate comprehensive metrics
accuracy = np.mean(y_pred == y_true)
precision_macro = precision_score(y_true, y_pred, average='macro')
recall_macro = recall_score(y_true, y_pred, average='macro')
f1_macro = f1_score(y_true, y_pred, average='macro')

# ROC AUC (multi-class)
try:
    auc = roc_auc_score(y_test, y_pred_proba, multi_class='ovr', average='macro')
except:
    auc = 0.0

# Confusion matrix
cm = confusion_matrix(y_true, y_pred)

# Classification report
class_report = classification_report(y_true, y_pred, target_names=label_encoder.classes_, output_dict=True)

# Store comprehensive results
results = {
    'model_name': 'ResNet-Transformer',
    'accuracy': float(accuracy),
    'precision': float(precision_macro),
    'recall': float(recall_macro),
    'f1_score': float(f1_macro),
    'auc': float(auc),
    'confusion_matrix': cm.tolist(),
    'classification_report': class_report,
    'predictions': y_pred.tolist(),
    'true_labels': y_true.tolist(),
    'prediction_probabilities': y_pred_proba.tolist(),
    'training_time': training_time,
    'total_parameters': int(total_params),
    'sequence_length': 128,
    'classes': label_encoder.classes_.tolist()
}

# Save results as JSON
with open('resnet_transformer_results.json', 'w') as f:
    json.dump(results, f, indent=2)

print("=== RESULTS SUMMARY ===")
print(f"Model: ResNet-Transformer")
print(f"Accuracy: {accuracy*100:.2f}%")
print(f"Precision: {precision_macro*100:.2f}%")
print(f"Recall: {recall_macro*100:.2f}%")
print(f"F1-Score: {f1_macro*100:.2f}%")
print(f"AUC: {auc*100:.2f}%")
print(f"Training Time: {training_time:.2f} seconds")
print(f"Parameters: {total_params:,}")

print("\n=== FILES SAVED ===")
print("- resnet_transformer_model_final.keras")
print("- resnet_transformer_results.json")
print("- resnet_transformer_classification_report.csv")
print("- resnet_transformer_sensor_scaler.pkl")
print("- resnet_transformer_sensor_label_encoder.pkl")
print("- resnet_transformer_results.png (from visualization cell)")

print("\n=== DOWNLOAD FILES ===")
print("Run the next cell to download all results files")


In [None]:
# Download Results Files for Google Colab
print("Downloading all result files...")

# Create a zip file with all results
import zipfile
import os

# List of files to include in the download
files_to_download = [
    'resnet_transformer_model_final.keras',
    'resnet_transformer_results.json', 
    'resnet_transformer_classification_report.csv',
    'resnet_transformer_sensor_scaler.pkl',
    'resnet_transformer_sensor_label_encoder.pkl'
]

# Add PNG file if it exists
if os.path.exists('resnet_transformer_results.png'):
    files_to_download.append('resnet_transformer_results.png')

# Create zip file
with zipfile.ZipFile('resnet_transformer_results.zip', 'w') as zipf:
    for file in files_to_download:
        if os.path.exists(file):
            zipf.write(file)
            print(f"Added {file} to download")

# Download the zip file
files.download('resnet_transformer_results.zip')

print("\n=== RESNET-TRANSFORMER MODEL EXECUTION COMPLETE ===")
print("All results have been saved and downloaded!")
print("You can now use these files for your journal publication.")
