In [None]:
# SETUP
!pip install -q tensorflow scikit-learn scipy h5py

import tensorflow as tf
import numpy as np
import h5py
from scipy import signal
from sklearn.model_selection import train_test_split
import os

print(f'TensorFlow: {tf.__version__}')
print(f'GPU: {", ".join([d.name for d in tf.config.list_physical_devices("GPU")]) or "None"}')

os.makedirs('data', exist_ok=True)
os.makedirs('models', exist_ok=True)

In [None]:
# DATA GENERATION
print('\n' + '='*70)
print('GENERATING 50,000 SAMPLES')
print('='*70)

CLASSES = ['blast', 'gunshot', 'artillery', 'vehicle_crash', 'fall', 'normal']
FS = 200
SEQ_LEN = 200
SAMPLES_PER_CLASS = 8333

def generate_impact(impact_type, fs=200, seq_len=200):
    """Generate physics-based impact signature"""
    t = np.linspace(0, seq_len/fs, seq_len)
    
    # Impact parameters
    params = {
        'blast': {'amp': (150, 300), 'freq': (50, 150), 'decay': 15, 'sev': (0.6, 1.0)},
        'gunshot': {'amp': (40, 100), 'freq': (100, 300), 'decay': 25, 'sev': (0.5, 0.9)},
        'artillery': {'amp': (200, 400), 'freq': (30, 100), 'decay': 12, 'sev': (0.6, 1.0)},
        'vehicle_crash': {'amp': (20, 80), 'freq': (5, 30), 'decay': 5, 'sev': (0.3, 0.7)},
        'fall': {'amp': (15, 60), 'freq': (3, 20), 'decay': 8, 'sev': (0.3, 0.7)},
        'normal': {'amp': (0.5, 3), 'freq': (0.1, 5), 'decay': 2, 'sev': (0.0, 0.2)}
    }
    
    p = params[impact_type]
    
    # Generate signal
    amp = np.random.uniform(*p['amp'])
    freq = np.random.uniform(*p['freq'])
    
    # Impact at random position
    start = np.random.randint(10, seq_len - 50)
    duration = np.random.randint(20, 80)
    
    signal_wave = np.zeros(seq_len)
    impact_t = t[start:start+duration] - t[start]
    signal_wave[start:start+duration] = amp * np.sin(2*np.pi*freq*impact_t) * np.exp(-p['decay']*impact_t)
    
    # Add noise
    signal_wave += np.random.normal(0, amp*0.1, seq_len)
    
    # 3-axis accel (z primary, x,y coupled)
    accel_z = signal_wave
    accel_x = np.roll(signal_wave, 3) * np.random.uniform(0.2, 0.5) + np.random.normal(0, 0.5, seq_len)
    accel_y = np.roll(signal_wave, 5) * np.random.uniform(0.2, 0.5) + np.random.normal(0, 0.5, seq_len)
    
    # Gyro (derivative of accel)
    gyro_x = np.gradient(accel_y) * 10 + np.random.normal(0, 2, seq_len)
    gyro_y = np.gradient(accel_x) * 10 + np.random.normal(0, 2, seq_len)
    gyro_z = np.gradient(accel_z) * 10 + np.random.normal(0, 2, seq_len)
    
    # Magnetometer (mostly stable)
    mag = np.array([30, 20, -40]) + np.random.normal(0, 3, (seq_len, 3))
    
    # Vitals (physiological response)
    severity = np.random.uniform(*p['sev'])
    
    hr_base = np.random.uniform(60, 80)
    hr = hr_base + severity * 80 * (1 - np.exp(-np.arange(seq_len)/100)) + np.random.normal(0, 2, seq_len)
    hr = np.clip(hr, 40, 180)
    
    spo2_base = np.random.uniform(96, 99)
    spo2 = spo2_base - severity * 15 * (1 - np.exp(-np.arange(seq_len)/150)) + np.random.normal(0, 0.5, seq_len)
    spo2 = np.clip(spo2, 70, 100)
    
    br_base = np.random.uniform(12, 16)
    br = br_base + severity * 12 * (1 - np.exp(-np.arange(seq_len)/100)) + np.random.normal(0, 1, seq_len)
    br = np.clip(br, 8, 35)
    
    temp_base = np.random.uniform(36.3, 37.0)
    temp = temp_base + severity * 0.5 * (1 - np.exp(-np.arange(seq_len)/200)) + np.random.normal(0, 0.1, seq_len)
    temp = np.clip(temp, 34, 39)
    
    # Combine: (200, 13) [accel(3) + gyro(3) + mag(3) + vitals(4)]
    sample = np.column_stack([
        accel_x, accel_y, accel_z,
        gyro_x, gyro_y, gyro_z,
        mag,
        hr, spo2, br, temp
    ])
    
    return sample.astype(np.float32), severity

# Generate dataset
X_all = []
y_type_all = []
y_sev_all = []

for i, cls in enumerate(CLASSES):
    print(f'Generating {cls}... ', end='')
    for _ in range(SAMPLES_PER_CLASS):
        sample, sev = generate_impact(cls)
        X_all.append(sample)
        
        label = np.zeros(len(CLASSES))
        label[i] = 1
        y_type_all.append(label)
        y_sev_all.append(sev)
    print('Done')

X = np.array(X_all, dtype=np.float32)
y_type = np.array(y_type_all, dtype=np.float32)
y_sev = np.array(y_sev_all, dtype=np.float32)

print(f'\nDataset shape: {X.shape}')
print(f'Saving to data/combat_dataset.h5...')

with h5py.File('data/combat_dataset.h5', 'w') as f:
    f.create_dataset('X', data=X, compression='gzip')
    f.create_dataset('y_type', data=y_type, compression='gzip')
    f.create_dataset('y_severity', data=y_sev, compression='gzip')

print('✓ Dataset ready')

In [None]:
# LOAD AND SPLIT DATA
print('\nLoading dataset...')

with h5py.File('data/combat_dataset.h5', 'r') as f:
    X = f['X'][:]
    y_type = f['y_type'][:]
    y_sev = f['y_severity'][:]

# Split: 70% train, 15% val, 15% test
X_train, X_temp, y_type_train, y_type_temp, y_sev_train, y_sev_temp = train_test_split(
    X, y_type, y_sev, test_size=0.3, random_state=42, stratify=y_type.argmax(axis=1)
)

X_val, X_test, y_type_val, y_type_test, y_sev_val, y_sev_test = train_test_split(
    X_temp, y_type_temp, y_sev_temp, test_size=0.5, random_state=42, stratify=y_type_temp.argmax(axis=1)
)

print(f'Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}')

In [None]:
# BUILD MODEL
from tensorflow.keras import layers, Model

def build_model():
    inputs = layers.Input(shape=(200, 13))
    
    # CNN feature extraction
    x = layers.Conv1D(64, 7, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(2)(x)
    
    x = layers.Conv1D(128, 5, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling1D(2)(x)
    
    x = layers.Conv1D(256, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    
    # Bidirectional GRU
    x = layers.Bidirectional(layers.GRU(128, return_sequences=True, dropout=0.2))(x)
    x = layers.Bidirectional(layers.GRU(64, return_sequences=True, dropout=0.2))(x)
    
    # Attention
    attn = layers.MultiHeadAttention(num_heads=8, key_dim=16)(x, x)
    x = layers.Add()([x, attn])
    x = layers.LayerNormalization()(x)
    
    # Global pooling
    avg_pool = layers.GlobalAveragePooling1D()(x)
    max_pool = layers.GlobalMaxPooling1D()(x)
    x = layers.concatenate([avg_pool, max_pool])
    
    # Dense layers
    x = layers.Dense(256, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.4)(x)
    
    x = layers.Dense(128, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)
    
    # Outputs
    impact_type = layers.Dense(6, activation='softmax', name='impact_type')(x)
    severity = layers.Dense(1, activation='sigmoid', name='severity')(x)
    
    model = Model(inputs=inputs, outputs=[impact_type, severity])
    
    model.compile(
        optimizer=tf.keras.optimizers.Adam(0.001, clipnorm=1.0),
        loss={
            'impact_type': tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
            'severity': 'mse'
        },
        loss_weights={'impact_type': 1.0, 'severity': 0.3},
        metrics={
            'impact_type': ['accuracy'],
            'severity': ['mae']
        }
    )
    
    return model

model = build_model()
model.summary()

In [None]:
# TRAIN MODEL
print('\n' + '='*70)
print('TRAINING MODEL')
print('='*70)

callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor='val_impact_type_accuracy',
        patience=25,
        restore_best_weights=True,
        mode='max'
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=10,
        min_lr=1e-7
    ),
    tf.keras.callbacks.ModelCheckpoint(
        'models/best_model.h5',
        monitor='val_impact_type_accuracy',
        save_best_only=True,
        mode='max'
    )
]

history = model.fit(
    X_train,
    {'impact_type': y_type_train, 'severity': y_sev_train},
    validation_data=(X_val, {'impact_type': y_type_val, 'severity': y_sev_val}),
    epochs=150,
    batch_size=32,
    callbacks=callbacks,
    verbose=1
)

print('\n✓ Training complete')

In [None]:
# EVALUATE
print('\n' + '='*70)
print('FINAL EVALUATION')
print('='*70)

results = model.evaluate(
    X_test,
    {'impact_type': y_type_test, 'severity': y_sev_test},
    batch_size=32,
    verbose=1
)

# Extract accuracy
test_acc = results[3]  # impact_type_accuracy

print(f'\n{"="*70}')
print(f'TEST ACCURACY: {test_acc:.4f} ({test_acc*100:.2f}%)')
print(f'TARGET: 0.9500 (95.00%)')

if test_acc >= 0.95:
    print('✓ TARGET ACHIEVED!')
else:
    print(f'✗ Below target by {(0.95-test_acc)*100:.2f}%')
print('='*70)

In [None]:
# CONVERT TO TFLITE
print('\nConverting to TFLite...')

converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]

tflite_model = converter.convert()

with open('models/impact_classifier.tflite', 'wb') as f:
    f.write(tflite_model)

print(f'✓ TFLite model saved ({len(tflite_model)/1024:.1f} KB)')
print(f'  Accuracy: {test_acc:.4f}')

In [None]:
# DOWNLOAD FILES
from google.colab import files

print('\nDownloading files...')
files.download('models/impact_classifier.tflite')
files.download('models/best_model.h5')
print('✓ Download complete')

print('\n' + '='*70)
print('DEPLOYMENT READY')
print('='*70)
print('\nFiles downloaded:')
print('1. impact_classifier.tflite (deploy to Raspberry Pi)')
print('2. best_model.h5 (for fine-tuning)')