# Pong Behavioral Cloning - Google Colab Training

This notebook contains everything needed to train the Pong CNN model on Google Colab.

## Setup Instructions:
1. Upload `pong_dataset.npz` to Google Drive
2. Mount your Google Drive
3. Update the `DATA_PATH` below to point to your dataset
4. Run all cells

## 1. Install Dependencies

In [None]:
# Install required packages (most are pre-installed on Colab)
!pip install -q tensorflow numpy scikit-learn matplotlib

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import os
import time

print(f"TensorFlow version: {tf.__version__}")
print(f"GPU available: {len(tf.config.list_physical_devices('GPU')) > 0}")

## 2. Mount Google Drive (Optional - only needed if the dataset is stored on Drive)

In [None]:
# Make Google Drive visible to this runtime so the dataset can be read from it
from google.colab import drive
drive.mount(mountpoint='/content/drive')

# Dataset stored on Google Drive -> use a path under the mount point:
# DATA_PATH = '/content/drive/MyDrive/pong_dataset.npz'

# Dataset uploaded straight into the Colab session -> use a session path:
# DATA_PATH = '/content/pong_dataset.npz'

## 3. Configuration

In [None]:
# ===== CONFIGURATION =====
# Update this path to your dataset location
DATA_PATH = '/content/drive/MyDrive/pong_dataset.npz'  # Change this!

# Training parameters
BATCH_SIZE = 64
EPOCHS = 20
LEARNING_RATE = 1e-4
NUM_ACTIONS = 6

# Smoke test mode (for quick testing)
SMOKE_TEST = False  # Set to True to use subset of data
SUBSET_SIZE = 1000  # Only used if SMOKE_TEST=True

# Reproducibility: seed Python, NumPy and TensorFlow in one call so that
# weight initialisation, shuffling and the np.random.choice sampling used
# elsewhere in this notebook repeat across Restart & Run All.
SEED = 42
keras.utils.set_random_seed(SEED)

# Checkpoint directory
# NOTE: this path is not under the Drive mount point, so anything written
# here has to be downloaded or copied if it should outlive the session.
CHECKPOINT_DIR = '/content/checkpoints'

print("Configuration:")
print(f"  Data path: {DATA_PATH}")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Epochs: {EPOCHS}")
print(f"  Learning rate: {LEARNING_RATE}")
print(f"  Smoke test: {SMOKE_TEST}")
print(f"  Seed: {SEED}")

## 4. Model Architecture

In [None]:
def create_pong_cnn(input_shape=(84, 84, 1), num_actions=6):
    """
    Build the CNN that maps a Pong frame to per-action logits.

    The layout follows the DQN/Atari family: three ReLU convolutions,
    a flatten, one 512-unit ReLU dense layer and a linear output layer.

    Args:
        input_shape: Shape of one input image as (height, width, channels)
        num_actions: Size of the output layer, one logit per action (default: 6)

    Returns:
        An uncompiled Keras Model named 'PongCNN'
    """
    # (layer name, filters, kernel size, stride) for each convolution, in order
    conv_specs = (
        ('conv1', 32, 8, 4),
        ('conv2', 64, 4, 2),
        ('conv3', 64, 3, 1),
    )

    inputs = keras.Input(shape=input_shape, name='input_image')

    # Chain the convolutions, each consuming the previous layer's output
    features = inputs
    for layer_name, filters, kernel, stride in conv_specs:
        features = layers.Conv2D(
            filters,
            kernel_size=kernel,
            strides=stride,
            activation='relu',
            name=layer_name,
        )(features)

    # Collapse the feature maps and pass them through the hidden dense layer
    features = layers.Flatten(name='flatten')(features)
    features = layers.Dense(512, activation='relu', name='fc1')(features)

    # No activation on the last layer: the model emits raw logits
    logits = layers.Dense(num_actions, activation=None, name='output')(features)

    return keras.Model(inputs=inputs, outputs=logits, name='PongCNN')

print("✓ Model architecture defined")

## 5. Load Dataset

In [None]:
def load_data(data_path, smoke_test=False, subset_size=1000):
    """
    Load the processed Pong dataset from an .npz archive.

    The archive must contain the arrays 'X_train', 'y_train', 'X_test'
    and 'y_test'.

    Args:
        data_path: Path to the .npz file
        smoke_test: If True, return only a random subset of each split
        subset_size: Number of training samples to keep in smoke-test mode;
            the test split keeps a quarter of that (at least one sample).
            Both are capped at the size of the respective split.

    Returns:
        (X_train, y_train, X_test, y_test)

    Raises:
        KeyError: If one of the four expected arrays is missing.
        OSError: If the file cannot be opened (propagated from np.load).
    """
    print("\nLoading datasets...")

    # np.load returns an NpzFile that keeps the archive open; the context
    # manager closes it once the arrays have been read into memory.
    with np.load(data_path) as data:
        X_train = data['X_train']
        y_train = data['y_train']
        X_test = data['X_test']
        y_test = data['y_test']

    print(f"Loaded train set: {X_train.shape[0]} samples")
    print(f"  Image shape: {X_train.shape[1:]}")
    print(f"  Label shape: {y_train.shape}")
    print(f"  Unique actions: {np.unique(y_train)}")

    print(f"Loaded test set: {X_test.shape[0]} samples")

    # Smoke test: use subset
    if smoke_test:
        print(f"\nSMOKE TEST MODE: Using {subset_size} samples")

        n_train = min(subset_size, len(X_train))
        # Keep at least one test sample so a tiny subset_size does not
        # produce an empty validation set.
        n_test = min(max(1, subset_size // 4), len(X_test))

        # Sample without replacement so no row appears twice in a subset
        train_indices = np.random.choice(len(X_train), n_train, replace=False)
        test_indices = np.random.choice(len(X_test), n_test, replace=False)

        X_train = X_train[train_indices]
        y_train = y_train[train_indices]
        X_test = X_test[test_indices]
        y_test = y_test[test_indices]
        print(f"  Train subset: {len(X_train)} samples")
        print(f"  Test subset: {len(X_test)} samples")

    return X_train, y_train, X_test, y_test

# Read the dataset from DATA_PATH (subsampled when SMOKE_TEST is enabled)
X_train, y_train, X_test, y_test = load_data(
    DATA_PATH,
    smoke_test=SMOKE_TEST,
    subset_size=SUBSET_SIZE,
)

print("\nData loaded successfully")

## 6. Create and Compile Model

In [None]:
# Check for GPU
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"Using GPU: {gpus}")
    try:
        # Allocate GPU memory on demand instead of reserving it all up front
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        # Memory growth can only be set before the GPU is initialised.
        # Re-running this cell after TensorFlow has already used the GPU
        # raises RuntimeError; keep the existing setting and carry on.
        print(f"Could not change GPU memory growth: {err}")
else:
    print("Using CPU")

# Create checkpoint directory (no error if it already exists)
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# Create model
print("\nCreating model...")
model = create_pong_cnn(input_shape=(84, 84, 1), num_actions=NUM_ACTIONS)

# Print model summary
model.summary()

# Compile model.
# The network outputs raw logits, so the loss is told to apply the softmax
# itself (from_logits=True); labels are integer class ids (sparse).
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

print("\nModel compiled")

## 7. Setup Callbacks

In [None]:
# Output locations, all inside CHECKPOINT_DIR
epoch_weights_path = os.path.join(CHECKPOINT_DIR, 'checkpoint_epoch_{epoch:02d}.weights.h5')
best_weights_path = os.path.join(CHECKPOINT_DIR, 'best_model.weights.h5')
csv_log_path = os.path.join(CHECKPOINT_DIR, 'training_log.csv')
tensorboard_dir = os.path.join(CHECKPOINT_DIR, 'logs')

# Weights snapshot at the end of every epoch (epoch number is part of the file name)
epoch_checkpoint = keras.callbacks.ModelCheckpoint(
    filepath=epoch_weights_path,
    save_weights_only=True,
    save_freq='epoch',
    verbose=1,
)

# Single file that is overwritten only when val_accuracy reaches a new maximum
best_checkpoint = keras.callbacks.ModelCheckpoint(
    filepath=best_weights_path,
    save_weights_only=True,
    save_best_only=True,
    monitor='val_accuracy',
    mode='max',
    verbose=1,
)

# Per-epoch metrics as CSV; append=False starts a fresh file on each run
csv_logger = keras.callbacks.CSVLogger(
    csv_log_path,
    separator=',',
    append=False,
)

# TensorBoard event files (graph written, histograms disabled)
tensorboard = keras.callbacks.TensorBoard(
    log_dir=tensorboard_dir,
    histogram_freq=0,
    write_graph=True,
    update_freq='epoch',
)

callbacks = [epoch_checkpoint, best_checkpoint, csv_logger, tensorboard]

print("Callbacks configured")

## 8. Train Model

In [None]:
# Fit the model and measure wall-clock duration of the whole run
banner = "=" * 70
print(f"\nStarting training for {EPOCHS} epochs...")
print(banner)

# The test split doubles as validation data during training
fit_options = dict(
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_data=(X_test, y_test),
    callbacks=callbacks,
    verbose=1,
)

start_time = time.time()
history = model.fit(X_train, y_train, **fit_options)
total_time = time.time() - start_time

print("\n" + banner)
print("Training completed!")
print(f"Total training time: {total_time:.2f}s")

## 9. Evaluate Model

In [None]:
# Score the end-of-training weights on the test split
print("\nFinal evaluation on test set:")
final_metrics = model.evaluate(X_test, y_test, verbose=0)
test_loss, test_acc = final_metrics
print(f"  Test Loss: {test_loss:.4f}")
print(f"  Test Accuracy: {test_acc * 100:.2f}%")

# Persist the weights as they are after the last epoch
final_model_path = os.path.join(CHECKPOINT_DIR, 'final_model.weights.h5')
model.save_weights(final_model_path)
print(f"\nFinal model saved to {final_model_path}")

# Locate the epoch with the highest validation accuracy (first one on ties);
# epochs are reported 1-based, hence the +1
val_acc_per_epoch = history.history['val_accuracy']
best_epoch_idx, best_val_acc = max(
    enumerate(val_acc_per_epoch),
    key=lambda pair: pair[1],
)
best_epoch = best_epoch_idx + 1
print(f"\nBest validation accuracy: {best_val_acc * 100:.2f}% (Epoch {best_epoch})")

print("\nTraining complete!")

## 10. Plot Training History

In [None]:
import matplotlib.pyplot as plt

# Two side-by-side panels: accuracy on the left, loss on the right
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# (axes, history key, human-readable metric name) for each panel
panels = (
    (ax1, 'accuracy', 'Accuracy'),
    (ax2, 'loss', 'Loss'),
)

for ax, metric_key, metric_label in panels:
    # Training curve and the matching 'val_'-prefixed validation curve
    ax.plot(history.history[metric_key], label=f'Train {metric_label}')
    ax.plot(history.history[f'val_{metric_key}'], label=f'Val {metric_label}')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(metric_label)
    ax.set_title(f'Model {metric_label}')
    ax.legend()
    ax.grid(True)

plt.tight_layout()

# Save before show() so the figure is written with its content intact
history_plot_path = os.path.join(CHECKPOINT_DIR, 'training_history.png')
plt.savefig(history_plot_path, dpi=150, bbox_inches='tight')
plt.show()

print("Training history plot saved!")

## 11. Download Trained Model (Optional)

In [None]:
# Send the training artifacts from the Colab session to your local machine
from google.colab import files

# Files inside CHECKPOINT_DIR, in download order:
# best weights, training log, training history plot
artifact_names = (
    'best_model.weights.h5',
    'training_log.csv',
    'training_history.png',
)

for artifact_name in artifact_names:
    files.download(os.path.join(CHECKPOINT_DIR, artifact_name))

print("Files ready for download!")

## 12. Test Model Predictions (Optional)

In [None]:
# Test predictions on a few random samples
num_samples = 5

# NOTE(review): assumes integer class labels stored as (N,) or (N, 1) - confirm
# against the dataset. Flattening keeps the element-wise comparison below from
# broadcasting an (N, 1) label array against the (N,) predictions.
true_actions = np.asarray(y_test).reshape(-1)

# One batched forward pass over the whole test set; the per-sample printout
# and the overall accuracy both reuse it instead of calling predict() in a loop.
test_predictions = model.predict(X_test, verbose=0)
test_pred_actions = np.argmax(test_predictions, axis=1)

# Sampling without replacement fails if more samples are requested than exist
# (possible in smoke-test mode), so cap the request at the test-set size.
num_shown = min(num_samples, len(X_test))
random_indices = np.random.choice(len(X_test), num_shown, replace=False)

print("Sample Predictions:")
print("=" * 50)

for idx in random_indices:
    true_action = true_actions[idx]
    predicted_action = test_pred_actions[idx]

    match = "✓" if predicted_action == true_action else "✗"
    print(f"{match} True: {true_action}, Predicted: {predicted_action}")

# Calculate overall accuracy on test set
accuracy = np.mean(test_pred_actions == true_actions)
print(f"\nOverall Test Accuracy: {accuracy * 100:.2f}%")