In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
import cv2
import os
import time

In [2]:
def load_fer2013(csv_path='fer2013.csv'):
    """Load a FER2013-style CSV and convert it into image tensors.

    The first line of the file is skipped and the remaining columns are read
    as ``emotion`` (integer label), ``pixels`` (whitespace-separated pixel
    values, 48*48 per row) and ``Usage`` (split name).

    Args:
        csv_path: Path of the CSV file to read. Defaults to 'fer2013.csv'
            relative to the current working directory.

    Returns:
        A tuple ``(X, y, usage)``:
          * ``X``: float32 array of shape (n_samples, 48, 48, 1) holding the
            pixel values divided by 255.
          * ``y``: array with the ``emotion`` column.
          * ``usage``: array with the ``Usage`` column.
        If anything goes wrong (missing file, malformed rows, ...) the error
        is printed and ``(None, None, None)`` is returned instead of raising.
    """
    try:
        df = pd.read_csv(csv_path, names=['emotion', 'pixels', 'Usage'], skiprows=1)
        print("Dataset loaded successfully!")
        print(f"Total samples: {len(df)}")

        # str.split() without an argument tolerates repeated, leading and
        # trailing whitespace, which would otherwise produce empty tokens.
        # Converting row by row keeps only one row of token strings alive.
        X = np.stack([np.array(row.split(), dtype='float32') for row in df['pixels']])

        # Fail loudly on a wrong image size; reshape(-1, ...) could otherwise
        # silently regroup pixels across rows.
        if X.shape[1] != 48 * 48:
            raise ValueError(
                f"Expected {48 * 48} pixel values per row, found {X.shape[1]}"
            )

        X = X.reshape(-1, 48, 48, 1) / 255.0
        y = df['emotion'].values

        print(f"Input shape: {X.shape}")
        print(f"Number of classes: {len(np.unique(y))}")
        print("\nClass distribution:")
        for emotion_id, count in enumerate(np.bincount(y)):
            print(f"Emotion {emotion_id}: {count} samples")
        return X, y, df['Usage'].values

    except Exception as e:
        # Callers test for ``X is None``, so report the problem and keep the
        # sentinel return value rather than propagating the exception.
        print(f"Error loading dataset: {str(e)}")
        return None, None, None

In [3]:
def create_model():
    """Build the uncompiled CNN used for facial emotion recognition.

    The network takes 48x48 single-channel images and consists of three
    convolutional stages (96, 192 and 384 filters) followed by two fully
    connected stages (512 and 256 units) and a 7-way softmax output.

    Returns:
        A Keras ``Sequential`` model; the caller is responsible for compiling it.
    """
    def conv_stage(filters, **first_conv_kwargs):
        # Two 3x3 conv + batch-norm pairs, then 2x2 pooling and light dropout.
        # Extra keyword arguments go to the first convolution only (used to
        # declare the input shape on the very first layer).
        return [
            Conv2D(filters, (3, 3), padding='same', activation='relu', **first_conv_kwargs),
            BatchNormalization(),
            Conv2D(filters, (3, 3), padding='same', activation='relu'),
            BatchNormalization(),
            MaxPooling2D(pool_size=(2, 2)),
            Dropout(0.25),
        ]

    def dense_stage(units):
        # Fully connected layer with batch-norm and heavier dropout.
        return [
            Dense(units, activation='relu'),
            BatchNormalization(),
            Dropout(0.5),
        ]

    stack = []

    # Convolutional feature extractor
    stack.extend(conv_stage(96, input_shape=(48, 48, 1)))
    stack.extend(conv_stage(192))
    stack.extend(conv_stage(384))

    # Classifier head
    stack.append(Flatten())
    stack.extend(dense_stage(512))
    stack.extend(dense_stage(256))
    stack.append(Dense(7, activation='softmax'))

    return Sequential(stack)

In [4]:
def train_model(continue_training=False, epochs=100, batch_size=32, learning_rate=0.0005):
    """Train the emotion detection model on the FER2013 data.

    Rows whose usage is 'Training' are used (with augmentation) for fitting;
    rows whose usage is 'PrivateTest' are used as validation data.

    NOTE(review): the 'PrivateTest' rows drive checkpointing, early stopping
    and learning-rate scheduling, so accuracy measured on them is not an
    unbiased test estimate. Consider a separate hold-out split.

    Args:
        continue_training: If True and 'checkpoints/best_model.h5' exists,
            resume from that checkpoint instead of building a new model.
        epochs: Maximum number of epochs (early stopping may end sooner).
        batch_size: Mini-batch size used by the augmenting generator.
        learning_rate: Initial Adam learning rate. The model is re-compiled
            with a fresh optimizer even when resuming from a checkpoint.

    Returns:
        ``(model, history)`` on success, or ``(None, None)`` when the dataset
        could not be loaded or one of the required splits is empty.
    """
    # Load and preprocess data
    print("Loading dataset...")
    X, y, usage = load_fer2013()

    if X is None:
        return None, None

    # Split data
    train_mask = usage == 'Training'
    val_mask = usage == 'PrivateTest'
    X_train, y_train = X[train_mask], y[train_mask]
    X_test, y_test = X[val_mask], y[val_mask]

    print(f"Training samples: {len(X_train)}")
    print(f"Test samples: {len(X_test)}")

    # Without this guard an empty split only fails later, inside fit().
    if len(X_train) == 0 or len(X_test) == 0:
        print("Error: dataset has no 'Training' or no 'PrivateTest' samples")
        return None, None

    # Data augmentation
    datagen = ImageDataGenerator(
        rotation_range=30,
        width_shift_range=0.2,
        height_shift_range=0.2,
        horizontal_flip=True,
        zoom_range=0.2,
        shear_range=0.2,
        fill_mode='nearest'
    )

    # Single source of truth for the checkpoint location.
    checkpoint_dir = 'checkpoints'
    checkpoint_path = os.path.join(checkpoint_dir, 'best_model.h5')
    os.makedirs(checkpoint_dir, exist_ok=True)

    resumed = continue_training and os.path.exists(checkpoint_path)
    if resumed:
        print("\nLoading existing model for continued training...")
        model = load_model(checkpoint_path)
    else:
        print("\nCreating new model...")
        model = create_model()

    # Compile model
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    model.summary()

    # When resuming, ModelCheckpoint would otherwise start with no baseline
    # and overwrite the existing best checkpoint after the first epoch, even
    # if that epoch is worse. Seed it with the checkpoint's current score.
    best_val_accuracy = None
    if resumed:
        best_val_accuracy = model.evaluate(
            X_test, y_test, verbose=0, return_dict=True
        )['accuracy']
        print(f"Existing checkpoint val_accuracy: {best_val_accuracy:.5f}")

    # Callbacks
    callbacks = [
        ModelCheckpoint(
            filepath=checkpoint_path,
            monitor='val_accuracy',
            mode='max',
            save_best_only=True,
            initial_value_threshold=best_val_accuracy,
            verbose=1
        ),
        EarlyStopping(
            monitor='val_loss',
            mode='min',
            patience=15,
            verbose=1,
            restore_best_weights=True
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.2,
            patience=8,
            min_lr=1e-7,
            verbose=1
        )
    ]

    # Train
    print("\nTraining model...")
    history = model.fit(
        datagen.flow(X_train, y_train, batch_size=batch_size),
        validation_data=(X_test, y_test),
        # At least one step, even if there are fewer samples than one batch.
        steps_per_epoch=max(1, len(X_train) // batch_size),
        epochs=epochs,
        callbacks=callbacks
    )
    return model, history

In [5]:
def process_video(input_path, output_path):
    """Annotate a video file with per-face emotion predictions.

    Every frame is converted to grayscale, faces are located with the frontal
    face Haar cascade, and each face crop is resized to 48x48 and classified
    by the model stored at 'checkpoints/best_model.h5'. The two most probable
    emotions are drawn above each face. Frames are shown in a window and
    written to ``output_path``; pressing 'q' in the window stops early.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the annotated video to write ('mp4v' codec).
            Its parent directory is created if it does not exist.

    Returns:
        None. Errors opening the input or the output are reported with a
        printed message.
    """
    # Load model
    print("Loading model...")
    model = load_model('checkpoints/best_model.h5')

    # Load face cascade
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    # Emotion labels, indexed by the model's output position
    emotions = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
    font = cv2.FONT_HERSHEY_SIMPLEX

    def draw_label(frame, text, x, bottom, scale, thickness, color):
        # Black text on a filled box whose bottom edge sits at `bottom`.
        (text_w, text_h), _ = cv2.getTextSize(text, font, scale, thickness)
        cv2.rectangle(frame, (x, bottom - text_h - 10), (x + text_w, bottom), color, cv2.FILLED)
        cv2.putText(frame, text, (x, bottom - 5), font, scale, (0, 0, 0), thickness)

    def annotate(frame):
        # Detect faces and draw the top-2 emotions on `frame` in place.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=4,
            minSize=(30, 30)
        )
        if len(faces) == 0:
            return

        # Classify all faces of the frame in one predict() call instead of
        # paying the call overhead once per face.
        crops = np.stack([
            cv2.resize(gray[y:y+h, x:x+w], (48, 48)) for (x, y, w, h) in faces
        ])
        batch = crops.astype('float32')[..., np.newaxis] / 255.0
        batch_predictions = model.predict(batch, verbose=0)

        for (x, y, w, h), predictions in zip(faces, batch_predictions):
            # Indices of the two highest probabilities, best first
            top_2_idx = np.argsort(predictions)[-2:][::-1]
            first, second = top_2_idx[0], top_2_idx[1]

            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            draw_label(frame, f"{emotions[first]}: {predictions[first]:.2f}",
                       x, y, 0.8, 2, (0, 255, 0))
            # Second label's box is anchored 20 px above the first one's.
            draw_label(frame, f"{emotions[second]}: {predictions[second]:.2f}",
                       x, y - 20, 0.6, 1, (255, 255, 0))

    # Open video
    print("Opening video file...")
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("Error: Could not open video file")
        cap.release()
        return

    out = None
    try:
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the fractional rate (e.g. 29.97) so the output does not drift;
        # fall back to 30 when the container reports no usable value.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # The writer cannot open a file in a directory that does not exist.
        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            print("Error: Could not open output video for writing")
            return

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            annotate(frame)

            # Display and write frame
            cv2.imshow('Emotion Detection', frame)
            out.write(frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Clean up even when processing fails part-way through
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

    print(f"\nProcessed video saved as: {output_path}")

In [7]:
def start_webcam_detection():
    """Run live emotion detection on webcam device 0 until 'q' is pressed.

    Each captured frame is converted to grayscale, faces are located with the
    frontal face Haar cascade, and each face crop is resized to 48x48 and
    classified by the model stored at 'checkpoints/best_model.h5'. The two
    most probable emotions are drawn above each face, together with an FPS
    counter that is refreshed every 30 frames.

    Returns:
        None. The webcam and the display window are always released, also
        when the loop is interrupted or an error occurs.
    """
    # Load model
    print("Loading model...")
    model = load_model('checkpoints/best_model.h5')

    # Load face cascade
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    # Emotion labels, indexed by the model's output position
    emotions = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
    font = cv2.FONT_HERSHEY_SIMPLEX

    def draw_label(frame, text, x, bottom, scale, thickness, color):
        # Black text on a filled box whose bottom edge sits at `bottom`.
        (text_w, text_h), _ = cv2.getTextSize(text, font, scale, thickness)
        cv2.rectangle(frame, (x, bottom - text_h - 10), (x + text_w, bottom), color, cv2.FILLED)
        cv2.putText(frame, text, (x, bottom - 5), font, scale, (0, 0, 0), thickness)

    def annotate(frame):
        # Detect faces and draw the top-2 emotions on `frame` in place.
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=4,
            minSize=(30, 30)
        )
        if len(faces) == 0:
            return

        # Classify all faces of the frame in one predict() call instead of
        # paying the call overhead once per face.
        crops = np.stack([
            cv2.resize(gray[y:y+h, x:x+w], (48, 48)) for (x, y, w, h) in faces
        ])
        batch = crops.astype('float32')[..., np.newaxis] / 255.0
        batch_predictions = model.predict(batch, verbose=0)

        for (x, y, w, h), predictions in zip(faces, batch_predictions):
            # Indices of the two highest probabilities, best first
            top_2_idx = np.argsort(predictions)[-2:][::-1]
            first, second = top_2_idx[0], top_2_idx[1]

            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            draw_label(frame, f"{emotions[first]}: {predictions[first]:.2f}",
                       x, y, 0.8, 2, (0, 255, 0))
            # Second label's box is anchored 20 px above the first one's.
            draw_label(frame, f"{emotions[second]}: {predictions[second]:.2f}",
                       x, y - 20, 0.6, 1, (255, 255, 0))

    # Start webcam
    print("Starting webcam...")
    cap = cv2.VideoCapture(0)

    try:
        if not cap.isOpened():
            print("Error: Could not open webcam")
            return

        # Set webcam properties
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

        # FPS calculation variables
        fps_start_time = time.time()
        fps = 0
        frame_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error: Can't receive frame from webcam")
                break

            # Recompute the FPS estimate once every 30 frames
            frame_count += 1
            if frame_count >= 30:
                now = time.time()
                elapsed = now - fps_start_time
                if elapsed > 0:
                    fps = frame_count / elapsed
                fps_start_time = now
                frame_count = 0

            annotate(frame)

            # Add FPS counter
            cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                        font, 1, (0, 255, 0), 2)

            # Show frame
            cv2.imshow('Emotion Detection (Press Q to quit)', frame)

            # Break loop on 'q' press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera, including on errors and kernel interrupts;
        # otherwise the device stays locked until the kernel is restarted.
        cap.release()
        cv2.destroyAllWindows()
        print("Webcam closed")

In [9]:
if __name__ == "__main__":
    # Ask TensorFlow to allocate GPU memory on demand rather than all at once.
    # Memory growth can only be changed before the GPUs are initialized, so
    # re-running this cell in a live kernel raises RuntimeError; report it
    # and carry on instead of aborting the training run.
    physical_devices = tf.config.list_physical_devices('GPU')
    for device in physical_devices:
        try:
            tf.config.experimental.set_memory_growth(device, True)
        except RuntimeError as err:
            print(f"Could not enable GPU memory growth: {err}")

    # Train the model, resuming from the saved checkpoint when one exists
    continue_training = True
    model, history = train_model(continue_training)

    # train_model returns (None, None) when the dataset could not be loaded
    if model is not None:
        model.save('final_model.h5')
        print("\nTraining completed!")
        print("Models saved in 'checkpoints/best_model.h5' and 'final_model.h5'")

Loading dataset...
Dataset loaded successfully!
Total samples: 26510




Input shape: (26510, 48, 48, 1)
Number of classes: 7

Class distribution:
Emotion 0: 3640 samples
Emotion 1: 400 samples
Emotion 2: 3763 samples
Emotion 3: 6604 samples
Emotion 4: 4515 samples
Emotion 5: 2994 samples
Emotion 6: 4594 samples
Training samples: 19332
Test samples: 3589

Loading existing model for continued training...
Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_3 (Conv2D)           (None, 48, 48, 64)        640       
                                                                 
 batch_normalization_4 (Bat  (None, 48, 48, 64)        256       
 chNormalization)                                                
                                                                 
 max_pooling2d_3 (MaxPoolin  (None, 24, 24, 64)        0         
 g2D)                                                            
                                                  

  saving_api.save_model(


Epoch 2: val_accuracy did not improve from 0.61939
Epoch 3/100
Epoch 3: val_accuracy did not improve from 0.61939
Epoch 4/100
Epoch 4: val_accuracy did not improve from 0.61939
Epoch 5/100
Epoch 5: val_accuracy did not improve from 0.61939
Epoch 6/100
Epoch 6: val_accuracy did not improve from 0.61939
Epoch 7/100
Epoch 7: val_accuracy did not improve from 0.61939
Epoch 8/100
Epoch 8: val_accuracy did not improve from 0.61939
Epoch 9/100
Epoch 9: val_accuracy did not improve from 0.61939

Epoch 9: ReduceLROnPlateau reducing learning rate to 0.00010000000474974513.
Epoch 10/100
Epoch 10: val_accuracy improved from 0.61939 to 0.62023, saving model to checkpoints/best_model.h5
Epoch 11/100
Epoch 11: val_accuracy improved from 0.62023 to 0.62775, saving model to checkpoints/best_model.h5
Epoch 12/100
Epoch 12: val_accuracy did not improve from 0.62775
Epoch 13/100
Epoch 13: val_accuracy did not improve from 0.62775
Epoch 14/100
Epoch 14: val_accuracy did not improve from 0.62775
Epoch 15/10

In [10]:
# Run emotion detection over a recorded video and save the annotated copy.
video_input_path = './videos/emotion1.mp4'
video_output_path = './output/emotion1_processed.mp4'

# Make sure the destination folder exists; the video writer cannot open a
# file inside a missing directory.
os.makedirs(os.path.dirname(video_output_path), exist_ok=True)

process_video(video_input_path, video_output_path)

Loading model...
Opening video file...





Processed video saved as: ./output/emotion1_processed.mp4


In [13]:
# Launch live emotion detection on the default webcam (press Q in the
# preview window to stop).
start_webcam_detection()

Loading model...
Starting webcam...
Webcam closed
