# Real-Time Engagement Classification with Roboflow Workflows
## Experimenting with Hosted Inference to Replace Traditional OpenFace/MARLIN Pipelines

The goal of this project is to build a real-time system capable of estimating
student engagement directly from video. Engagement prediction has applications
in online education, classroom analytics, tutoring systems, and human–computer
interaction. The core challenge lies in accurately interpreting subtle facial
behaviors—such as gaze direction, head pose, and facial expressions—in
uncontrolled environments.

# Previous implementation

## The DAiSEE Dataset
DAiSEE (Dataset for Affective States in E-Environments) is a large-scale video
dataset containing **9,068 short clips** of students in natural learning
environments. Each clip is labeled with one of four engagement levels:

1. **Very Low**
2. **Low**
3. **High**
4. **Very High**

DAiSEE is challenging because:
- Labels are subjective
- Lighting and camera conditions vary widely
- Engagement is a high-level affective state, not directly visible

## Feature Extraction: OpenFace and MARLIN

### OpenFace 2.2
OpenFace is an open-source facial behavior analysis toolkit that extracts:
- Facial Action Units (AUs)  
- Eye gaze direction  
- Head pose  
- Facial landmarks  

These features capture interpretable behavioral signals directly linked to
attention, focus, and affect.

### MARLIN Embeddings
MARLIN is a deep learning model that produces a **768-dimensional embedding**
for every video frame. Unlike OpenFace’s engineered features, MARLIN provides
a rich, high-level representation of facial appearance and expression learned
from large-scale data.

Together, OpenFace and MARLIN provide complementary information:
- **OpenFace:** interpretable, low-level behavioral cues  
- **MARLIN:** abstract, high-level visual features  

## EngageNet: A Multimodal Fusion Model
To combine these two modalities, we extended **EngageNet**, a dual-stream
Transformer-based fusion architecture. EngageNet:

- Accepts **MARLIN embeddings** as a 768-dimensional vector  
- Accepts **OpenFace features** as a short temporal sequence (9 frames)  
- Processes each stream independently  
- Uses Transformers to model temporal dependencies  
- Fuses them into a joint representation  
- Predicts one of the four DAiSEE engagement levels
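
The OpenFace stream expects a fixed-length sequence of 9 frames per clip. This notebook does not show how those frames are chosen, so the following is only a sketch of one possible approach: evenly spaced sampling across the clip. The function name `sample_frame_indices` and the 300-frame clip length are illustrative assumptions, not details from the original pipeline.

```python
def sample_frame_indices(num_frames: int, num_samples: int = 9) -> list[int]:
    """Return `num_samples` evenly spaced frame indices in [0, num_frames - 1].

    If the clip has fewer frames than `num_samples`, some indices repeat.
    """
    if num_frames <= 0 or num_samples <= 0:
        raise ValueError("num_frames and num_samples must be positive")
    if num_samples == 1:
        return [0]
    # Spacing between consecutive samples, so the first and last frames are included
    step = (num_frames - 1) / (num_samples - 1)
    return [round(i * step) for i in range(num_samples)]


# Hypothetical clip length, for illustration only
indices = sample_frame_indices(300)
print(indices)
```

The per-frame OpenFace feature rows at these indices would then be stacked into the `(9, feature_dim)` array the model consumes.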

This notebook demonstrates:
1. The model architecture and validation performance    
2. Why engagement classification is tested using a Roboflow-hosted model

## Import and Setup

In [None]:
import tensorflow as tf  # pyright: ignore[reportMissingImports]
from keras import Input, Model  # pyright: ignore[reportMissingImports]
from keras.layers import (  # pyright: ignore[reportMissingImports]
    Dense, Dropout, LayerNormalization,
    GlobalAveragePooling1D, MultiHeadAttention, Concatenate
)

## Fusion Model Architecture

This function builds a dual-stream neural network that fuses MARLIN and OpenFace features using a Transformer layer to predict one of four engagement levels.

In [None]:
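def build_fusion_model(hidden_dim=128, dropout_rate=0.4, num_heads=2, num_layers=1,
                       openface_dim=64):
    # NOTE: the OpenFace input stream was missing from this cell and is
    # reconstructed here. `openface_dim` (features per frame) is an assumed
    # placeholder; set it to the number of OpenFace columns you actually use.

    # MARLIN Input Stream: one 768-d embedding
    marlin_input = Input(shape=(1, 768), name="marlin_input")
    x1 = LayerNormalization()(marlin_input)
    x1 = GlobalAveragePooling1D()(x1)
    x1 = Dense(256, activation="relu")(x1)
    x1 = Dropout(dropout_rate)(x1)
    x1 = Dense(hidden_dim, activation="relu")(x1)

    # OpenFace Input Stream: 9 frames of per-frame features
    openface_input = Input(shape=(9, openface_dim), name="openface_input")
    x2 = LayerNormalization()(openface_input)

    # Transformer layers: self-attention over the 9 frames with a residual connection
    for _ in range(num_layers):
        attn_out = MultiHeadAttention(num_heads=num_heads, key_dim=64)(x2, x2)
        x2 = LayerNormalization()(x2 + attn_out)

    # Pool over time, then project to the shared hidden size
    x2 = GlobalAveragePooling1D()(x2)
    x2 = Dense(256, activation="relu")(x2)
    x2 = Dropout(dropout_rate)(x2)
    x2 = Dense(hidden_dim, activation="relu")(x2)

    # Fusion: concatenate both streams and classify into 4 engagement levels
    fused = Concatenate()([x1, x2])
    fused = Dense(hidden_dim, activation="relu")(fused)
    fused = Dropout(dropout_rate)(fused)
    output = Dense(4, activation="softmax")(fused)

    return Model(inputs=[marlin_input, openface_input], outputs=output)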

## Model Summary

In [None]:
fusion_model = build_fusion_model()
fusion_model.summary()
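
As a sanity check on the summary, the parameter counts of the `Dense` layers can be worked out by hand: a dense layer with `n_in` inputs and `n_out` units has `n_in * n_out` weights plus `n_out` biases. The sketch below applies this to the MARLIN stream and the fusion head, assuming the default `hidden_dim=128`; the helper name `dense_params` is illustrative.

```python
def dense_params(n_in: int, n_out: int) -> int:
    """Trainable parameters of a fully connected layer: weights plus biases."""
    return n_in * n_out + n_out


hidden_dim = 128  # default used by build_fusion_model

marlin_stream = {
    "Dense(256) on the 768-d MARLIN vector": dense_params(768, 256),
    "Dense(hidden_dim) in the MARLIN stream": dense_params(256, hidden_dim),
}
fusion_head = {
    # Concatenating two hidden_dim-sized streams gives 2 * hidden_dim inputs
    "Dense(hidden_dim) on the fused vector": dense_params(2 * hidden_dim, hidden_dim),
    "Dense(4) softmax output": dense_params(hidden_dim, 4),
}

for name, count in {**marlin_stream, **fusion_head}.items():
    print(f"{name}: {count:,}")
```

These numbers should match the corresponding rows printed by `fusion_model.summary()`; a mismatch usually points to an input shape that differs from what was intended.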

## Training Setup

This cell compiles the fusion model and trains it for 200 epochs with a cosine-annealing learning-rate schedule, saving the checkpoint with the best validation accuracy.
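
The `cosine_annealing` function in the cell below is based on the cosine schedule. In its usual closed form, with peak rate $\eta_{\max}$, floor $\eta_{\min}$, and $T_{\max}$ total epochs, the learning rate at epoch $t$ is:

$$
\eta_t = \eta_{\min} + \tfrac{1}{2}\,(\eta_{\max} - \eta_{\min})\left(1 + \cos\frac{\pi t}{T_{\max}}\right)
$$

so $\eta_0 = \eta_{\max}$ and $\eta_{T_{\max}} = \eta_{\min}$. Keras's `LearningRateScheduler` passes the current learning rate as the second argument of the schedule function, so $\eta_{\max}$ has to be held as a separate constant for the code to match this formula.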

In [None]:
fusion_model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metrics=["accuracy"],
)

checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath="fusion_best.keras",
    monitor="val_accuracy",
    save_best_only=True,
    verbose=1,
)

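def cosine_annealing(epoch, lr, T_max=200, eta_max=1e-3, eta_min=1e-6):
    # Keras passes the *current* learning rate as `lr`. Decaying from it would
    # compound the decay every epoch, so the schedule is anchored to the fixed
    # peak rate `eta_max` (the Adam learning rate set above) instead.
    import math
    return eta_min + (eta_max - eta_min) * (1 + math.cos(math.pi * epoch / T_max)) / 2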

lr_schedule_cb = tf.keras.callbacks.LearningRateScheduler(
    cosine_annealing,
    verbose=0,
)

history = fusion_model.fit(
    x=[train_x1, train_x2],  # pyright: ignore[reportUndefinedVariable]
    y=train_y,  # pyright: ignore[reportUndefinedVariable]
    validation_data=([val_x1, val_x2], val_y),  # pyright: ignore[reportUndefinedVariable]
    epochs=200,
    batch_size=32,
    class_weight=class_weights_dict,  # pyright: ignore[reportUndefinedVariable]
    callbacks=[checkpoint_cb, lr_schedule_cb],
    verbose=1,
)
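
The `class_weights_dict` passed to `fit` is not defined in this notebook. A minimal sketch of how such a dictionary might be built follows, assuming the common "balanced" heuristic `n_samples / (n_classes * class_count)`. The function name and the toy labels are illustrative; they are not DAiSEE statistics and may not match the weighting actually used.

```python
from collections import Counter


def balanced_class_weights(labels):
    """Map each class to n_samples / (n_classes * count), so rare classes weigh more."""
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    return {cls: n_samples / (n_classes * count) for cls, count in sorted(counts.items())}


# Toy, deliberately imbalanced labels (12 samples across 4 engagement levels)
toy_labels = [0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3]
toy_weights = balanced_class_weights(toy_labels)
print(toy_weights)
```

Keras expects the keys to be the integer class indices used in `train_y`, which is why the dictionary is keyed by label value.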


## Validation Performance Snippet

This block evaluates the saved fusion model on the validation set, producing accuracy, a classification report, and a confusion matrix.

In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix  # pyright: ignore[reportMissingImports]

# Load best checkpoint if you want the best-val model
best_model = tf.keras.models.load_model("fusion_best.keras", compile=False)

val_probs = best_model.predict([val_x1, val_x2])  # pyright: ignore[reportUndefinedVariable]
val_preds = np.argmax(val_probs, axis=1)

print("Validation accuracy:",
      np.mean(val_preds == val_y))  # pyright: ignore[reportUndefinedVariable]

print("\nClassification report:")
print(classification_report(val_y, val_preds, digits=3))  # pyright: ignore[reportUndefinedVariable]

print("\nConfusion matrix:")
print(confusion_matrix(val_y, val_preds))  # pyright: ignore[reportUndefinedVariable]
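
Because overall accuracy can hide poor performance on rare classes, it helps to read per-class recall off the confusion matrix (rows are true labels and columns are predictions, as in scikit-learn). The sketch below shows the calculation; the matrix is made-up illustrative data, not a result from this model, and `per_class_recall` is a hypothetical helper name.

```python
def per_class_recall(cm):
    """Recall for each class: diagonal entry divided by the row (true-label) total."""
    recalls = []
    for i, row in enumerate(cm):
        total = sum(row)
        recalls.append(row[i] / total if total else 0.0)
    return recalls


# Illustrative 4x4 matrix only (Very Low, Low, High, Very High)
example_cm = [
    [2, 3, 4, 1],
    [1, 10, 25, 4],
    [0, 8, 60, 12],
    [0, 2, 20, 18],
]

recalls = per_class_recall(example_cm)
for level, recall in enumerate(recalls):
    print(f"class {level}: recall = {recall:.2f}")
```

With the real `confusion_matrix(val_y, val_preds)` output, `cm.tolist()` can be passed to the same function.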

## Visual overview of performance

In [None]:
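import os

from PIL import Image, ImageDraw

# Make sure the output folder exists before the summary image is saved
os.makedirs("training_plots", exist_ok=True)

best_val_acc = np.max(history.history["val_accuracy"])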
final_train_acc = history.history["accuracy"][-1]
final_val_acc = history.history["val_accuracy"][-1]
final_train_loss = history.history["loss"][-1]
final_val_loss = history.history["val_loss"][-1]
epochs = len(history.history["accuracy"])

summary_text = (
    "EngageNet Fusion — Final Training Summary\n\n"
    f"Epochs Trained:        {epochs}\n"
    f"Final Train Accuracy:  {final_train_acc:.4f}\n"
    f"Final Val Accuracy:    {final_val_acc:.4f}\n"
    f"Best Val Accuracy:     {best_val_acc:.4f}\n"
    f"Final Train Loss:      {final_train_loss:.6f}\n"
    f"Final Val Loss:        {final_val_loss:.4f}\n"
)

summary_img = Image.new("RGB", (800, 350), color=(245, 245, 245))  # pyright: ignore[reportUndefinedVariable]
draw = ImageDraw.Draw(summary_img)  # pyright: ignore[reportUndefinedVariable]
draw.text((25, 25), summary_text, fill=(0, 0, 0))

summary_img.save("training_plots/training_summary.png")

![Training Summary](assets/summary.png)

## Training and Validation curves

In [None]:
import os

import matplotlib.pyplot as plt

# Reuse the `history` object returned by `fusion_model.fit(...)` in the
# Training Setup cell instead of training a second time.

# Create output folder
os.makedirs("training_plots", exist_ok=True)

# Save Accuracy Plot
plt.figure(figsize=(8, 5))  # pyright: ignore[reportUndefinedVariable]
plt.plot(history.history["accuracy"], label="Train Accuracy")  # pyright: ignore[reportUndefinedVariable]
plt.plot(history.history["val_accuracy"], label="Validation Accuracy")  # pyright: ignore[reportUndefinedVariable]
plt.xlabel("Epoch")  # pyright: ignore[reportUndefinedVariable]
plt.ylabel("Accuracy")  # pyright: ignore[reportUndefinedVariable]
plt.title("EngageNet Fusion: Training vs Validation Accuracy")  # pyright: ignore[reportUndefinedVariable]
plt.legend()  # pyright: ignore[reportUndefinedVariable]
plt.grid(True, alpha=0.3)  # pyright: ignore[reportUndefinedVariable]
plt.savefig("training_plots/fusion_training_accuracy.png", dpi=300, bbox_inches="tight")   # pyright: ignore[reportUndefinedVariable]
plt.close()   # pyright: ignore[reportUndefinedVariable]

# Save Loss Plot
plt.figure(figsize=(8, 5))  # pyright: ignore[reportUndefinedVariable]
plt.plot(history.history["loss"], label="Train Loss")  # pyright: ignore[reportUndefinedVariable]
plt.plot(history.history["val_loss"], label="Validation Loss")  # pyright: ignore[reportUndefinedVariable]
plt.xlabel("Epoch")  # pyright: ignore[reportUndefinedVariable]
plt.ylabel("Loss")  # pyright: ignore[reportUndefinedVariable]
plt.title("EngageNet Fusion: Training vs Validation Loss")  # pyright: ignore[reportUndefinedVariable]
plt.legend()  # pyright: ignore[reportUndefinedVariable]
plt.grid(True, alpha=0.3)  # pyright: ignore[reportUndefinedVariable]
plt.savefig("training_plots/fusion_training_loss.png", dpi=300, bbox_inches="tight")   # pyright: ignore[reportUndefinedVariable]
plt.close()  # pyright: ignore[reportUndefinedVariable]

![Training Accuracy](assets/fusion_training_accuracy.png)

![Loss Curve](assets/fusion_training_loss.png)


# Roboflow implementation

One major limitation of my offline model was that it was hard to reproduce and could not run real-time inference in a portable way. Docker initially seemed like a solution, since it can bundle dependencies and weights into a shareable environment, but it proved impractical: the image remained tied to my local repo, and containers get no webcam or display access by default, so device passthrough has to be configured separately on each platform.

To address this, I began exploring more lightweight, fully hosted alternatives. This led me to experiment with Roboflow’s single-label image classification workflow—uploading representative frames, training a hosted model, and testing whether it could reliably separate different engagement levels.

## Example Predictions from the Roboflow Classifier


![Attentive](assets/Attentive.png)

![Confused](assets/Confused.png)

![Non-Attentive](assets/Non-Attentive.png)

These sample predictions show that the Roboflow classifier learned coarse engagement cues such as gaze direction, head orientation, and posture. However, the model is limited by the homogeneity of the dataset (same subject, environment, lighting, and clothing), so generalizing further would require additional, more diverse training data.

## Custom Roboflow Workflow for API-Based Inference

![Custom-Workflow](assets/Custom-Workflow.png)

## Connecting to Single-Label Classification Model

In [None]:
import cv2
import os
from inference_sdk import InferenceHTTPClient
from dotenv import load_dotenv

load_dotenv()  # The API URL and key are read from the .env file

API_URL = os.getenv("ROBOFLOW_API_URL")
API_KEY = os.getenv("ROBOFLOW_API_KEY")

client = InferenceHTTPClient(
    api_url=API_URL,
    api_key=API_KEY,
)
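
`os.getenv` returns `None` when a variable is missing, which can lead to a less obvious error further downstream. A small guard can surface the problem as soon as the notebook starts. This is a sketch; `require_env` is a hypothetical helper, not part of `python-dotenv` or the Roboflow SDK.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing environment variable: {name}")
    return value


# Demonstration with a throwaway variable (not a real credential)
os.environ["EXAMPLE_SETTING"] = "demo-value"
print(require_env("EXAMPLE_SETTING"))
```

In the cell above, `API_KEY = require_env("ROBOFLOW_API_KEY")` could stand in for the plain `os.getenv` call.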

## Capturing one frame

In [None]:
# Capture one frame. Index 1 usually selects a second camera;
# use 0 for the default webcam.
cap = cv2.VideoCapture(1)
ret, frame = cap.read()
cap.release()

## Run workflow to receive JSON response

In [None]:
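result = None  # stays None if no frame was captured

if ret:
    # Save the frame temporarily so it can be passed by path
    cv2.imwrite("frame.jpg", frame)

    # Run the hosted workflow on the saved frame
    result = client.run_workflow(
        workspace_name="testing-qqggh",
        workflow_id="custom-workflow",
        images={"image": "frame.jpg"}
    )
else:
    print("Could not read a frame from the camera.")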
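
The cells here classify a single frame. For continuous, real-time use, the same steps could be wrapped in a loop that sends only every Nth frame to the hosted workflow, since each call is a network round trip. The sketch below keeps camera and API access behind two caller-supplied functions so the loop logic can run without a webcam. `classify_stream`, `read_frame`, `classify`, and `every_n` are all hypothetical names, not part of the Roboflow SDK.

```python
from typing import Any, Callable, Iterator, Optional, Tuple


def classify_stream(
    read_frame: Callable[[], Optional[Any]],
    classify: Callable[[Any], Any],
    every_n: int = 10,
    max_frames: Optional[int] = None,
) -> Iterator[Tuple[int, Any]]:
    """Yield (frame_index, result) for every `every_n`-th frame.

    Stops when `read_frame` returns None or after `max_frames` frames.
    """
    if every_n < 1:
        raise ValueError("every_n must be at least 1")
    index = 0
    while max_frames is None or index < max_frames:
        frame = read_frame()
        if frame is None:  # camera closed or stream ended
            break
        if index % every_n == 0:  # throttle calls to the hosted model
            yield index, classify(frame)
        index += 1


# Stand-ins for the camera and the hosted classifier, for illustration only
fake_frames = iter(range(25))
results = list(
    classify_stream(
        read_frame=lambda: next(fake_frames, None),
        classify=lambda frame: f"label-for-{frame}",
        every_n=10,
    )
)
print(results)
```

With OpenCV, `read_frame` could wrap `cap.read()` and return `None` when `ret` is false, and `classify` could save the frame and call `client.run_workflow` as in the cell above.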

## Create annotated engagement frame

In [None]:
# Parse JSON response to extract class name and confidence
try:
    predictions = result[0]['predictions']['predictions']
    if predictions:
        class_name = predictions[0]['class']
        confidence = predictions[0]['confidence']
    else:
        # Fallback to top-level values if predictions array is empty
        class_name = result[0]['predictions'].get('top', 'unknown')
        confidence = result[0]['predictions'].get('confidence', 0.0)
except (KeyError, IndexError, TypeError) as e:
    print(f"Error parsing result: {e}")
    print(f"Result structure: {result}")
    class_name = "unknown"
    confidence = 0.0

# Format class name: capitalize each hyphen-separated word
formatted_class = '-'.join(word.capitalize() for word in class_name.split('-'))

confidence_pct = confidence * 100
confidence_str = f"{confidence_pct:.2f}%"

# Load the original frame.jpg image
annotated_frame = cv2.imread("frame.jpg")

if annotated_frame is not None:
    # Draw box overlay in top-left corner 
    box_x, box_y = 0, 0
    text = f"{formatted_class} {confidence_str}"
    
    # Get text size to determine box dimensions
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.85 
    thickness = 2
    (text_width, text_height), baseline = cv2.getTextSize(text, font, font_scale, thickness)
    
    # Box dimensions with padding (adjusted for better proportions)
    padding_x = 14
    padding_y = 10
    box_width = text_width + padding_x * 2
    box_height = text_height + baseline + padding_y * 2
    
    # Draw filled purple rectangle
    purple_color = (128, 0, 128)  
    cv2.rectangle(annotated_frame, 
                    (box_x, box_y), 
                    (box_x + box_width, box_y + box_height), 
                    purple_color, 
                    -1)
    
    # Add white text overlay
    text_x = box_x + padding_x
    text_y = box_y + text_height + padding_y
    white_color = (255, 255, 255)
    cv2.putText(annotated_frame, text, (text_x, text_y), 
                font, font_scale, white_color, thickness)
    
    # Display the annotated image
    cv2.imshow("Roboflow Test - Annotated", annotated_frame)
    print(f"Class: {formatted_class}, Confidence: {confidence_str}")
    print("Press any key to close the window...")
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    
    # Save annotated image
    output_filename = "frame_annotated.jpg"
    cv2.imwrite(output_filename, annotated_frame)
    print(f"Annotated image saved as {output_filename}")
else:
    print("Could not load frame.jpg for annotation.")
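
The parsing logic at the top of this cell can also be factored into a function, which makes it easy to check against sample responses without a camera or network call. The sketch below assumes the same response keys the cell reads (`predictions`, `class`, `confidence`, `top`); the sample payload is hand-written for illustration and is not captured API output. Unlike the cell, it picks the highest-confidence entry rather than assuming the list is sorted.

```python
def extract_top_prediction(result):
    """Return (class_name, confidence) from a workflow result, or ("unknown", 0.0)."""
    try:
        block = result[0]["predictions"]
        candidates = block.get("predictions") or []
        if candidates:
            # Choose the most confident class explicitly
            best = max(candidates, key=lambda p: p["confidence"])
            return best["class"], float(best["confidence"])
        # Fall back to the top-level summary fields
        return block.get("top", "unknown"), float(block.get("confidence", 0.0))
    except (KeyError, IndexError, TypeError, AttributeError):
        return "unknown", 0.0


# Hand-written sample with the structure the notebook's parsing code expects
sample_result = [
    {
        "predictions": {
            "predictions": [
                {"class": "confused", "confidence": 0.07},
                {"class": "attentive", "confidence": 0.91},
            ],
            "top": "attentive",
            "confidence": 0.91,
        }
    }
]

print(extract_top_prediction(sample_result))
print(extract_top_prediction(None))
```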

## Output

![Annotated-Attentive](assets/frame_annotated_attentive.jpg)

![Annotated-Non-Attentive](assets/frame_annotated_non_attentive.jpg)

![Annotated-Confused](assets/frame_annotated_confused.jpg)

## Results and Conclusions

For this project, offline validation mattered less than real-time accuracy: my earlier MARLIN + MediaPipe pipeline was slow and correct only about 45–50% of the time. The Roboflow model, trained on far less data, returned predictions with no noticeable delay and was consistently accurate in my tests (aside from occasional “confused” cases). Tests under different lighting behaved similarly, making this a far more practical option for real-time engagement classification.
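
Speed comparisons between pipelines are easier to make when latency is measured rather than judged by eye. The sketch below times repeated calls to any function; `time_calls` is a hypothetical helper, and the workload here is a short `time.sleep` stand-in, not a real inference call.

```python
import statistics
import time


def time_calls(fn, n_runs=5):
    """Call `fn` repeatedly and report median and worst-case wall-clock time in seconds."""
    durations = []
    for _ in range(n_runs):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return {"median_s": statistics.median(durations), "max_s": max(durations)}


# Stand-in workload; replace with a function that runs one classification
stats = time_calls(lambda: time.sleep(0.01), n_runs=3)
print(f"median: {stats['median_s'] * 1000:.1f} ms, max: {stats['max_s'] * 1000:.1f} ms")
```

Passing a function that wraps one `client.run_workflow` call would give a per-frame round-trip time for the hosted model.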