# Sentient Core: Coral TPU Pixel Controller Training

This notebook trains a TensorFlow model that maps 22 normalized sensor features to 12 behavior parameters, which are used to steer a 500,000-particle visualization.

**Training Pipeline:**
1. Generate synthetic sensor → particle behavior dataset
2. Train quantization-aware model
3. Convert to TensorFlow Lite with full int8 quantization
4. Download `.tflite` model for Edge TPU compilation

**Hardware Target:** Google Coral Edge TPU (4 TOPS, USB Accelerator)

## 1. Setup & Dependencies

In [None]:
# Install required packages
!pip install -q tensorflow==2.13.0
!pip install -q tensorflow-model-optimization
!pip install -q matplotlib seaborn

import tensorflow as tf
import tensorflow_model_optimization as tfmot
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from google.colab import files
import json

# Report the runtime environment (TensorFlow build and visible GPUs)
gpu_devices = tf.config.list_physical_devices('GPU')
print(f"TensorFlow version: {tf.__version__}")
print(f"GPU available: {gpu_devices}")

# Seed NumPy and TensorFlow with one shared value for reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

## 2. Dataset Generation: Synthetic Sensor Data

In [None]:
def generate_sensor_data(n_samples=10000):
    """
    Draw random, already-normalized sensor feature vectors.

    Every feature is sampled independently from NumPy's global random state
    (``np.random``), so seed it beforehand for repeatable output. The draws
    happen in column order, one call per column.

    Args:
        n_samples: number of feature vectors (rows) to generate.

    Returns:
        np.ndarray: (n_samples, 22) float32 array of normalized sensor features.
    """
    def uniform(low, high):
        # Continuous reading in raw sensor units
        return np.random.uniform(low, high, n_samples)

    def flag():
        # Boolean reading encoded as 0.0 / 1.0
        return np.random.choice([0.0, 1.0], n_samples)

    def count(low, high):
        # Integer reading in [low, high) -- the upper bound is exclusive
        return np.random.randint(low, high, n_samples)

    # The list is evaluated top to bottom, which fixes the order of RNG calls.
    columns = [
        # Environment - indices 0-7
        uniform(10, 40) / 50.0,             # 0: temperature (°C) / 50
        uniform(20, 80) / 100.0,            # 1: humidity (%) / 100
        uniform(980, 1040) / 1100.0,        # 2: pressure (hPa) / 1100
        uniform(10000, 100000) / 200000.0,  # 3: gas_resistance (Ω) / 200k
        uniform(0, 500) / 1000.0,           # 4: oxidising (ppm) / 1000
        uniform(0, 500) / 1000.0,           # 5: reducing (ppm) / 1000
        uniform(0, 100) / 200.0,            # 6: nh3 (ppm) / 200
        uniform(0, 1000) / 1000.0,          # 7: light_level (lux) / 1000
        # Audio - indices 8-9
        uniform(30, 80) / 100.0,            # 8: ambient_noise (dB) / 100
        uniform(0, 360) / 360.0,            # 9: sound_direction (degrees) / 360
        # Vision - indices 10-12
        flag(),                             # 10: motion_detected (bool)
        count(0, 10) / 10.0,                # 11: detected_objects_count / 10
        count(0, 5) / 5.0,                  # 12: faces_detected_count / 5
        # Location - indices 13-15
        uniform(-90, 90) / 180.0 + 0.5,     # 13: latitude / 180 + 0.5
        uniform(-180, 180) / 360.0 + 0.5,   # 14: longitude / 360 + 0.5
        uniform(0, 500) / 1000.0,           # 15: altitude (m) / 1000
        # Power - indices 16-18
        uniform(20, 100) / 100.0,           # 16: battery_charge (%) / 100
        uniform(3.3, 4.2) / 5.0,            # 17: battery_voltage (V) / 5
        flag(),                             # 18: is_charging (bool)
        # System - indices 19-21
        uniform(0, 86400) / 86400.0,        # 19: uptime (seconds) / 24h
        count(1, 10) / 10.0,                # 20: active_daemons_count / 10
        uniform(40, 70) / 100.0,            # 21: cpu_temp (°C) / 100
    ]

    # Columns are computed in float64 and stored as float32
    return np.column_stack(columns).astype(np.float32)

# Build the feature matrix, then report its size and overall value range
print("Generating sensor data...")
X = generate_sensor_data(10000)
sample_count, feature_count = X.shape
print(f"Generated {sample_count} samples with {feature_count} features")
print(f"Feature range: [{X.min():.3f}, {X.max():.3f}]")

## 3. Expert Labeling: Heuristic Rules

In [None]:
def expert_label_particle_params(sensor_data):
    """
    Turn normalized sensor features into target particle parameters via fixed rules.

    Two outputs (turbulence and symmetry) include random jitter drawn from
    NumPy's global random state, so seed ``np.random`` for repeatable labels.

    Args:
        sensor_data: (n_samples, 22) normalized sensor features

    Returns:
        np.ndarray: (n_samples, 12) float32 particle behavior parameters, in the
        column order swarm_cohesion, flow_speed, turbulence, color_hue_shift,
        brightness, pulse_frequency, symmetry, vertical_bias,
        horizontal_spread, depth_layering, particle_size, glow_intensity.
        All columns are clipped to [0, 1] except vertical_bias ([-1, 1]).
    """
    n_samples = sensor_data.shape[0]

    # Undo the feature scaling so the rules below read in physical units
    temp_c = sensor_data[:, 0] * 50.0
    humidity_pct = sensor_data[:, 1] * 100.0
    lux = sensor_data[:, 7] * 1000.0
    noise_db = sensor_data[:, 8] * 100.0
    motion = sensor_data[:, 10]  # 0 or 1
    object_count = sensor_data[:, 11] * 10.0
    battery_pct = sensor_data[:, 16] * 100.0
    charging = sensor_data[:, 18]  # 0 or 1

    # Random draws are taken up front, turbulence jitter first and symmetry
    # jitter second, which fixes how the random stream is consumed.
    turbulence_jitter = np.random.uniform(0, 0.2, n_samples)
    symmetry_jitter = np.random.uniform(-0.2, 0.2, n_samples)

    labels = np.zeros((n_samples, 12), dtype=np.float32)

    # swarm_cohesion: higher humidity -> tighter clusters
    labels[:, 0] = np.clip(humidity_pct / 100.0, 0, 1)
    # flow_speed: motion adds 0.7, each detected object adds 0.03
    labels[:, 1] = motion * 0.7 + object_count * 0.03
    # turbulence: motion adds 0.8 on top of a small random floor
    labels[:, 2] = motion * 0.8 + turbulence_jitter
    # color_hue_shift: 15 °C maps to 0 (cold/blue), 45 °C maps to 1 (hot/red)
    labels[:, 3] = np.clip((temp_c - 15) / 30.0, 0, 1)
    # brightness: follows light level, never below 0.2
    labels[:, 4] = np.clip(lux / 1000.0, 0.2, 1.0)
    # pulse_frequency: 30 dB maps to 0, 80 dB maps to 1
    labels[:, 5] = np.clip((noise_db - 30) / 50.0, 0, 1)
    # symmetry: 0.5 plus random jitter (no sensor input is used)
    labels[:, 6] = 0.5 + symmetry_jitter
    # vertical_bias: +0.25 while charging, -0.25 otherwise
    labels[:, 7] = charging * 0.5 - 0.25
    # horizontal_spread: shrinks as the stored cohesion value grows
    labels[:, 8] = 1.0 - labels[:, 0] * 0.5
    # depth_layering: more objects -> more depth, kept within [0.2, 0.8]
    labels[:, 9] = np.clip(object_count / 10.0, 0.2, 0.8)
    # particle_size / glow_intensity: shrink with low battery, with floors
    labels[:, 10] = np.clip(battery_pct / 100.0, 0.3, 1.0)
    labels[:, 11] = np.clip(battery_pct / 100.0, 0.2, 1.0)

    # Final per-column range enforcement: [0, 1] everywhere except
    # vertical_bias (column 7), which is allowed [-1, 1].
    lower = np.zeros(12, dtype=np.float32)
    upper = np.ones(12, dtype=np.float32)
    lower[7] = -1.0
    np.clip(labels, lower, upper, out=labels)

    return labels

# Label every generated sample with the expert rules
print("Applying expert rules to generate labels...")
y = expert_label_particle_params(X)
print(f"Generated {y.shape[0]} labels with {y.shape[1]} parameters")
print(f"Label range: [{y.min():.3f}, {y.max():.3f}]")

# Names of the 12 label columns, in column order
param_names = [
    'swarm_cohesion', 'flow_speed', 'turbulence', 'color_hue_shift',
    'brightness', 'pulse_frequency', 'symmetry', 'vertical_bias',
    'horizontal_spread', 'depth_layering', 'particle_size', 'glow_intensity'
]

# One histogram per label column on a 3x4 grid
fig, axes = plt.subplots(3, 4, figsize=(16, 10))
for ax, name, column in zip(axes.flat, param_names, y.T):
    ax.hist(column, bins=50, alpha=0.7, edgecolor='black')
    ax.set_title(name)
    ax.set_xlabel('Value')
    ax.set_ylabel('Frequency')
plt.tight_layout()
plt.show()

## 4. Train/Val/Test Split

In [None]:
# Split dataset: 70% train, 20% validation, 10% test.
# The first split holds out 30% of the data; the second split sends one third
# of that hold-out to the test set, so validation/test end up at 20% / 10%
# of the full dataset (a test_size of 0.33 would give roughly 20.1% / 9.9%).
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=1 / 3, random_state=42)

print(f"Training set: {X_train.shape[0]} samples")
print(f"Validation set: {X_val.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")

## 5. Model Architecture: Dense Neural Network

In [None]:
def create_pixel_controller_model():
    """
    Create compact dense neural network for Coral Edge TPU.

    Input: 22 sensor features
    Output: 12 particle behavior parameters, concatenated in label column order.

    Outputs 0-6 and 8-11 use sigmoid (range 0 to 1). Output 7 (vertical_bias)
    uses tanh (range -1 to 1), because its training labels can be negative
    and a sigmoid output could never reach them.

    NOTE(review): the output is built from three Dense heads joined by a
    Concatenate layer. Confirm that the installed tensorflow-model-optimization
    version quantizes this layout and that edgetpu_compiler maps every op to
    the Edge TPU before deploying.

    Returns:
        tf.keras.Model: uncompiled model named 'SentientPixelController'.
    """
    layers = tf.keras.layers

    sensor_input = tf.keras.Input(shape=(22,), name='sensor_input')

    # Hidden layers with decreasing size
    hidden = layers.Dense(128, activation='relu', name='dense_128')(sensor_input)
    hidden = layers.Dropout(0.2, name='dropout_1')(hidden)
    hidden = layers.Dense(64, activation='relu', name='dense_64')(hidden)
    hidden = layers.Dropout(0.15, name='dropout_2')(hidden)
    hidden = layers.Dense(32, activation='relu', name='dense_32')(hidden)

    # Output heads, split so each range gets a matching activation.
    # Together they hold the same number of weights as one Dense(12) layer.
    params_0_to_6 = layers.Dense(7, activation='sigmoid', name='params_0_to_6')(hidden)
    vertical_bias = layers.Dense(1, activation='tanh', name='vertical_bias')(hidden)
    params_8_to_11 = layers.Dense(4, activation='sigmoid', name='params_8_to_11')(hidden)

    # Re-assemble the heads into a single (batch, 12) tensor in label order
    particle_params = layers.Concatenate(axis=-1, name='particle_params')(
        [params_0_to_6, vertical_bias, params_8_to_11]
    )

    return tf.keras.Model(
        inputs=sensor_input,
        outputs=particle_params,
        name='SentientPixelController',
    )

# Build the network and configure it for MSE regression with Adam
base_model = create_pixel_controller_model()
base_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
base_model.compile(optimizer=base_optimizer, loss='mse', metrics=['mae', 'mse'])

base_model.summary()

# Parameter count and rough storage estimates:
# 4 bytes per parameter as float32, 1 byte per parameter as int8
total_params = base_model.count_params()
float32_size_kb = total_params * 4 / 1024
int8_size_kb = total_params / 1024
print(f"\nTotal parameters: {total_params:,}")
print(f"Estimated model size (float32): {float32_size_kb:.2f} KB")
print(f"Estimated model size (int8): {int8_size_kb:.2f} KB")

## 6. Train Base Model

In [None]:
# Fit the float model first, before any quantization is applied
print("Training base model...")

# Stop early when validation loss stalls and halve the learning rate on plateaus
training_callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5, min_lr=1e-6),
]
history = base_model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=32,
    callbacks=training_callbacks,
    verbose=1,
)

# Side-by-side curves: loss on the left, MAE on the right
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
panels = [
    (ax1, 'loss', 'val_loss', 'Train Loss', 'Val Loss',
     'MSE Loss', 'Training & Validation Loss'),
    (ax2, 'mae', 'val_mae', 'Train MAE', 'Val MAE',
     'Mean Absolute Error', 'Training & Validation MAE'),
]
for axis, train_key, val_key, train_label, val_label, y_label, title in panels:
    axis.plot(history.history[train_key], label=train_label)
    axis.plot(history.history[val_key], label=val_label)
    axis.set_xlabel('Epoch')
    axis.set_ylabel(y_label)
    axis.set_title(title)
    axis.legend()
    axis.grid(True)

plt.tight_layout()
plt.show()

# Held-out test metrics; evaluate() returns the loss followed by the compiled metrics
test_loss, test_mae, test_mse = base_model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest Set Performance:")
print(f"  MSE Loss: {test_mse:.6f}")
print(f"  MAE: {test_mae:.6f}")

## 7. Quantization-Aware Training

In [None]:
# Wrap the trained float model for quantization-aware training
quantize_model = tfmot.quantization.keras.quantize_model
q_aware_model = quantize_model(base_model)

# Fine-tune with a lower learning rate than the base training run
qat_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
q_aware_model.compile(optimizer=qat_optimizer, loss='mse', metrics=['mae', 'mse'])

print("Fine-tuning with quantization-aware training...")
qat_early_stopping = tf.keras.callbacks.EarlyStopping(patience=8, restore_best_weights=True)
q_history = q_aware_model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=30,
    batch_size=32,
    callbacks=[qat_early_stopping],
    verbose=1,
)

# Compare the quantization-aware model with the float baseline on the test set
q_test_loss, q_test_mae, q_test_mse = q_aware_model.evaluate(X_test, y_test, verbose=0)
mae_change_percent = (q_test_mae - test_mae) / test_mae * 100
print(f"\nQuantized Model Test Performance:")
print(f"  MSE Loss: {q_test_mse:.6f}")
print(f"  MAE: {q_test_mae:.6f}")
print(f"\nAccuracy degradation from quantization: {mae_change_percent:.2f}%")

## 8. Convert to TensorFlow Lite

In [None]:
# Representative dataset for full integer quantization
def representative_dataset(num_samples=100):
    """
    Yield calibration batches for full-integer quantization.

    Takes the first ``num_samples`` rows of the module-level ``X_train`` and
    yields each one as a single-row float32 batch wrapped in a list. If
    ``X_train`` holds fewer rows, only the available rows are yielded, so no
    empty batch is ever produced.

    Args:
        num_samples: maximum number of training rows to yield (default 100).

    Yields:
        list[np.ndarray]: one-element list holding a (1, n_features) float32 array.
    """
    # Slicing stops at the end of X_train instead of running past it
    for row in X_train[:num_samples]:
        yield [row[np.newaxis, :].astype(np.float32)]

# Build a TFLite converter from the quantization-aware Keras model
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)

# Full integer quantization (REQUIRED for Edge TPU):
# int8 builtin ops only, int8 input and output tensors,
# calibrated with the representative dataset
converter.representative_dataset = representative_dataset
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# Run the conversion; the result is the serialized model as bytes
print("Converting to TensorFlow Lite (int8)...")
tflite_model = converter.convert()

# Write the serialized model to disk
tflite_filename = 'sentient_pixel_controller.tflite'
with open(tflite_filename, 'wb') as model_file:
    model_file.write(tflite_model)

model_size_kb = len(tflite_model) / 1024
print(f"\nTFLite model saved: {tflite_filename}")
print(f"Model size: {model_size_kb:.2f} KB")

## 9. Test TFLite Model Inference

In [None]:
# Load the saved model into a TFLite interpreter and allocate its tensors
interpreter = tf.lite.Interpreter(model_path=tflite_filename)
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Only the first input and first output tensor are reported
first_input = input_details[0]
first_output = output_details[0]

print("TFLite Model Details:")
print(f"Input shape: {first_input['shape']}")
print(f"Input dtype: {first_input['dtype']}")
print(f"Input quantization: {first_input['quantization']}")
print(f"\nOutput shape: {first_output['shape']}")
print(f"Output dtype: {first_output['dtype']}")
print(f"Output quantization: {first_output['quantization']}")

# Test inference on sample
def run_tflite_inference(interpreter, input_data):
    """
    Run one inference on a TFLite interpreter, quantizing input and de-quantizing output.

    Input values are converted with q = round(x / scale + zero_point) and
    clamped to the range of the model's input dtype, so out-of-range values
    saturate instead of wrapping around. A scale of 0 is treated as "not
    quantized" and the data is only cast.

    Args:
        interpreter: object providing get_input_details(), get_output_details(),
            set_tensor(), invoke() and get_tensor() (e.g. a tf.lite.Interpreter
            whose tensors are already allocated). Only the first input and the
            first output tensor are used.
        input_data: float array shaped like the model input.

    Returns:
        np.ndarray: float32 model output, de-quantized with the output
        tensor's scale and zero point.
    """
    input_info = interpreter.get_input_details()[0]
    output_info = interpreter.get_output_details()[0]

    # Quantize input
    input_scale, input_zero_point = input_info['quantization']
    # Fall back to int8 if the details do not report a dtype
    input_dtype = np.dtype(input_info.get('dtype', np.int8))
    values = np.asarray(input_data)
    if input_scale and np.issubdtype(input_dtype, np.integer):
        limits = np.iinfo(input_dtype)
        # Round to nearest (plain astype would truncate toward zero), then
        # clamp so values outside the representable range cannot wrap
        quantized = np.round(values / input_scale + input_zero_point)
        model_input = np.clip(quantized, limits.min, limits.max).astype(input_dtype)
    else:
        # Scale of 0 (or a float input tensor): nothing to quantize
        model_input = values.astype(input_dtype)

    # Run inference
    interpreter.set_tensor(input_info['index'], model_input)
    interpreter.invoke()

    # Dequantize output
    raw_output = interpreter.get_tensor(output_info['index'])
    output_scale, output_zero_point = output_info['quantization']
    if not output_scale:
        # Multiplying by a zero scale would zero out the result
        return raw_output.astype(np.float32)
    return (raw_output.astype(np.float32) - output_zero_point) * output_scale

# Run the first five test rows through both the TFLite model and the Keras model
test_sample = X_test[:5]
tflite_predictions = np.array([
    run_tflite_inference(interpreter, row.reshape(1, -1))
    for row in test_sample
])
original_predictions = q_aware_model.predict(test_sample, verbose=0)

print("\nPrediction Comparison (first 5 test samples):")
print(f"Original model shape: {original_predictions.shape}")
print(f"TFLite model shape: {tflite_predictions.shape}")

# squeeze() drops the per-sample batch axis so both arrays line up
abs_difference = np.abs(original_predictions - tflite_predictions.squeeze())
print(f"Mean absolute difference: {np.mean(abs_difference):.6f}")
print(f"Max absolute difference: {np.max(abs_difference):.6f}")

## 10. Save Training Metadata

In [None]:
# Save metadata for deployment
from datetime import date

# Names of the 12 model outputs, in output order
output_param_names = [
    "swarm_cohesion", "flow_speed", "turbulence", "color_hue_shift",
    "brightness", "pulse_frequency", "symmetry", "vertical_bias",
    "horizontal_spread", "depth_layering", "particle_size", "glow_intensity"
]

metadata = {
    "model_name": "SentientPixelController",
    "version": "1.0",
    # Record the day this cell actually ran rather than a fixed date
    "training_date": date.today().isoformat(),
    "input_features": 22,
    # The count has its own key: a dict literal cannot hold "output_params"
    # twice, so the count used to be silently overwritten by the name list.
    "output_param_count": len(output_param_names),
    "total_params": int(total_params),
    "model_size_kb": len(tflite_model) / 1024,
    "test_mae": float(q_test_mae),
    "test_mse": float(q_test_mse),
    "input_normalization": {
        "temperature": "/ 50.0",
        "humidity": "/ 100.0",
        "pressure": "/ 1100.0",
        "light_level": "/ 1000.0",
        "battery_charge": "/ 100.0"
    },
    "output_params": output_param_names,
    # Scale / zero point of the first input and output tensor of the TFLite model
    "quantization": {
        "input_scale": float(input_details[0]['quantization'][0]),
        "input_zero_point": int(input_details[0]['quantization'][1]),
        "output_scale": float(output_details[0]['quantization'][0]),
        "output_zero_point": int(output_details[0]['quantization'][1])
    },
    # edgetpu_compiler is only published for x86-64 Linux, so the model is
    # compiled before it is copied to the Raspberry Pi.
    "next_steps": [
        "1. Download sentient_pixel_controller.tflite",
        "2. Compile on an x86-64 Linux machine with: edgetpu_compiler sentient_pixel_controller.tflite",
        "3. Transfer sentient_pixel_controller_edgetpu.tflite to Raspberry Pi",
        "4. Integrate into sentient_core.py using coral_pixel_engine.py"
    ]
}

with open('model_metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)

print("Metadata saved to model_metadata.json")
print(json.dumps(metadata, indent=2))

## 11. Download Files

In [None]:
# Download TFLite model and metadata
print("Downloading files...")
files.download(tflite_filename)
files.download('model_metadata.json')

print("\n" + "="*70)
print("TRAINING COMPLETE!")
print("="*70)

# The Edge TPU compiler is only published for x86-64 Debian-based Linux, so
# the instructions compile the model first and copy the result to the
# Raspberry Pi afterwards.
print("\nNext steps:")
print("1. Install Edge TPU compiler on an x86-64 Debian/Ubuntu machine")
print("   (it is not available for the Raspberry Pi's ARM CPU):")
print("   curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -")
print("   echo 'deb https://packages.cloud.google.com/apt coral-edgetpu-stable main' | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list")
print("   sudo apt-get update && sudo apt-get install edgetpu-compiler")
print("\n2. Compile for Edge TPU:")
print("   edgetpu_compiler sentient_pixel_controller.tflite")
print("\n3. Copy the compiled model to the Raspberry Pi, then on the Pi move it into place:")
print("   mkdir -p ~/Sentient-Core-v4/models")
print("   mv sentient_pixel_controller_edgetpu.tflite ~/Sentient-Core-v4/models/")
print("\n4. Integrate coral_pixel_engine.py into sentient_core.py")
print("\n5. Launch and watch the pixels come ALIVE!")