### **NEW**

In [None]:
pip install tensorflow pandas numpy scikit-learn matplotlib opencv-python


In [None]:
import os
import json
import numpy as np
import pandas as pd
import cv2
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor
import random
import time

# Make the Drive-hosted dataset reachable from the Colab runtime
from google.colab import drive
drive.mount('/content/drive')

# Dataset locations on the mounted Drive; edit these to match your own layout
LEFT_EYE_DIR = '/content/drive/MyDrive/AllGalaxy/allgalaxy-webgazer/Data collection/Data/UnityEyes_Windows/UnityEyes_Windows/left_eye'
RIGHT_EYE_DIR = '/content/drive/MyDrive/AllGalaxy/allgalaxy-webgazer/Data collection/Data/UnityEyes_Windows/UnityEyes_Windows/right_eye'

# Module-level accumulators that process_in_batches appends to
images, json_features, labels = [], [], []

# Function to parse JSON features
def parse_json(json_path):
    """Read one sample's JSON file and return its numeric features.

    The result is the head pose components followed by the pupil size and
    the iris size, all as floats. Keys that are absent fall back to zero.
    If the file does not exist or is not valid JSON, the problem is printed
    and a list of five zeros is returned instead.
    """
    try:
        with open(json_path, 'r') as handle:
            payload = json.load(handle)
    except (json.JSONDecodeError, FileNotFoundError) as e:
        print(f"Error loading JSON file {json_path}: {e}")
        return [0.0, 0.0, 0.0, 0.0, 0.0]

    # head_pose is stored as a string such as "(a,b,c)"
    raw_pose = payload.get('head_pose', "(0,0,0)")
    pose_angles = [float(part) for part in raw_pose.strip("()").split(",")]

    details = payload.get('eye_details', {})
    sizes = [float(details.get(key, 0.0)) for key in ('pupil_size', 'iris_size')]

    return pose_angles + sizes

# Function to process a single image file and corresponding JSON file
def process_file(img_path, json_path):
    """Load one eye image together with the features from its JSON file.

    Args:
        img_path: Path of the image file to read with OpenCV.
        json_path: Path of the JSON file handed to ``parse_json``.

    Returns:
        ``(img, features, label)`` where ``img`` is a 224x224 RGB float32
        array scaled to [0, 1], ``features`` is whatever ``parse_json``
        returns and ``label`` is the placeholder ``[0, 0]``. When the image
        cannot be read, or anything raises along the way, the problem is
        printed and ``(None, None, None)`` is returned.
    """
    try:
        # Load and preprocess the image
        img = cv2.imread(img_path)
        if img is None:
            print(f"Error loading image {img_path}")
            return None, None, None
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (224, 224))
        # Normalize to [0, 1] and keep the result as float32: dividing a uint8
        # image by 255.0 yields float64, which doubles the memory held by the
        # accumulated dataset without adding usable precision.
        img = (img / 255.0).astype(np.float32)

        # Load and parse JSON features
        features = parse_json(json_path)

        # Placeholder label (replace with actual labels if available)
        label = [0, 0]

        return img, features, label
    except Exception as e:
        print(f"Error processing file {img_path}: {e}")
        return None, None, None

# Function to process random samples in batches to avoid memory overload and I/O issues
def process_in_batches(directory, batch_size=100, num_samples=1000):
    """Load a random sample of the .jpg files in ``directory``, batch by batch.

    Up to ``num_samples`` file names are drawn at random. Each one is passed
    to ``process_file`` together with the same-named .json path, and every
    successfully loaded sample is appended to the module-level ``images``,
    ``json_features`` and ``labels`` lists. After each batch of ``batch_size``
    files the function sleeps for one second. An ``OSError`` is printed
    rather than propagated.
    """
    try:
        jpg_names = [name for name in os.listdir(directory) if name.endswith('.jpg')]
        chosen = random.sample(jpg_names, min(num_samples, len(jpg_names)))

        batch_starts = range(0, len(chosen), batch_size)
        for batch_number, start in enumerate(batch_starts, start=1):
            print(f"Processing batch {batch_number} from {directory}...")

            for name in chosen[start:start + batch_size]:
                img, features, label = process_file(
                    os.path.join(directory, name),
                    os.path.join(directory, name.replace('.jpg', '.json')),
                )
                if img is None:
                    # process_file already reported the failure
                    continue
                images.append(img)
                json_features.append(features)
                labels.append(label)

            # Brief pause between batches to ease pressure on the file system
            time.sleep(1)

    except OSError as e:
        print(f"Error processing files in directory {directory}: {e}")

# Process 1000 random files from each directory in batches of 100
for banner, eye_dir in (
    ("Processing samples from left_eye directory", LEFT_EYE_DIR),
    ("Processing samples from right_eye directory", RIGHT_EYE_DIR),
):
    print(banner)
    process_in_batches(eye_dir, batch_size=100, num_samples=1000)

# Freeze the accumulated Python lists into numpy arrays
images = np.array(images)
json_features = np.array(json_features)
labels = np.array(labels)

# Report the resulting shapes as a sanity check
print(f'Images shape: {images.shape}')
print(f'JSON features shape: {json_features.shape}')
print(f'Labels shape: {labels.shape}')

# Example of how to use Matplotlib for visualization
def visualize_data():
    """Show up to the first nine loaded images in a 3x3 grid.

    Reads the module-level ``images`` and ``labels`` and titles each panel
    with the sample's label. Only samples that exist are drawn, so having
    fewer than nine loaded images is not an error.
    """
    plt.figure(figsize=(10, 10))
    for i in range(min(9, len(images))):
        plt.subplot(3, 3, i+1)
        plt.imshow(images[i])
        plt.title(f"Label: {labels[i]}")
        plt.axis('off')
    plt.show()

# Call the visualization function if needed
visualize_data()


In [None]:
# Split into training and temporary (validation + test) sets first, so the
# scaler below never sees validation or test samples.
X_img_train, X_img_temp, X_json_train, X_json_temp, y_train, y_temp = train_test_split(
    images, json_features, labels, test_size=0.3, random_state=42
)

# Further split temp into validation and testing
X_img_val, X_img_test, X_json_val, X_json_test, y_val, y_test = train_test_split(
    X_img_temp, X_json_temp, y_temp, test_size=0.5, random_state=42
)

# Standardize JSON features: fit on the training split only, then apply the
# same transform to the other splits to avoid leaking their statistics.
scaler = StandardScaler()
X_json_train = scaler.fit_transform(X_json_train)
X_json_val = scaler.transform(X_json_val)
X_json_test = scaler.transform(X_json_test)
# Keep the module-level array standardized as well, using the same scaler
json_features = scaler.transform(json_features)

print(f'Training images: {X_img_train.shape}')
print(f'Validation images: {X_img_val.shape}')
print(f'Testing images: {X_img_test.shape}')
# Two-input regression model: an image branch (frozen ResNet50) and a small
# dense branch for the JSON features, concatenated and mapped to 2 outputs.

# Image input
image_input = layers.Input(shape=(224, 224, 3), name='image_input')

# Pre-trained CNN for image feature extraction
# NOTE(review): the images loaded earlier in this file are scaled to [0, 1];
# confirm whether the ImageNet ResNet50 weights expect
# tf.keras.applications.resnet50.preprocess_input instead.
base_model = tf.keras.applications.ResNet50(
    include_top=False, weights='imagenet', input_tensor=image_input
)
base_model.trainable = False  # Freeze the base model

# Add global pooling
x = layers.GlobalAveragePooling2D()(base_model.output)
x = layers.Dense(512, activation='relu')(x)
x = layers.Dropout(0.5)(x)
image_features = layers.Dense(256, activation='relu')(x)

# JSON features input
# (width taken from the training split, so it follows the feature count)
json_input = layers.Input(shape=(X_json_train.shape[1],), name='json_input')
y = layers.Dense(128, activation='relu')(json_input)
y = layers.Dropout(0.3)(y)
json_features_dense = layers.Dense(64, activation='relu')(y)

# Combine image and JSON features
combined = layers.concatenate([image_features, json_features_dense])

# Add fully connected layers
z = layers.Dense(256, activation='relu')(combined)
z = layers.Dropout(0.5)(z)
z = layers.Dense(128, activation='relu')(z)

# Output layer
# For regression (e.g., gaze x and y coordinates)
output = layers.Dense(2, activation='linear', name='output')(z)

# Define the model
# (input order is [image, json]; positional predict calls rely on it)
model = models.Model(inputs=[image_input, json_input], outputs=output)

# Compile the model
# (MSE loss with MAE reported as an additional metric)
model.compile(
    optimizer=optimizers.Adam(learning_rate=1e-4),
    loss='mean_squared_error',
    metrics=['mae']
)

model.summary()
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define ImageDataGenerator for augmentation
# (random rotation, zoom, horizontal flip and brightness changes)
# NOTE(review): a horizontal flip mirrors the eye image; if the labels are
# gaze coordinates the x label would presumably need mirroring too - confirm.
datagen = ImageDataGenerator(
    rotation_range=10,
    zoom_range=0.1,
    horizontal_flip=True,
    brightness_range=[0.8,1.2]
)

# Example of applying augmentation
# Note: When using multiple inputs, custom generators might be needed
# NOTE(review): `datagen` is only constructed here; the visible model.fit call
# is given the arrays directly, so no augmentation is applied during training.

In [None]:
# Named-input dictionaries for each split, keyed by the model's input layer names
train_inputs = {'image_input': X_img_train, 'json_input': X_json_train}
val_inputs = {'image_input': X_img_val, 'json_input': X_json_val}
test_inputs = {'image_input': X_img_test, 'json_input': X_json_test}

# Callbacks: keep the best checkpoint on disk and stop once val_loss stalls
checkpoint = callbacks.ModelCheckpoint(
    'second.h5', monitor='val_loss', save_best_only=True, mode='min'
)
early_stop = callbacks.EarlyStopping(
    monitor='val_loss', patience=10, restore_best_weights=True
)

# Fit on the training split while monitoring the validation split
history = model.fit(
    train_inputs,
    y_train,
    validation_data=(val_inputs, y_val),
    epochs=10,
    batch_size=32,
    callbacks=[checkpoint, early_stop],
)

# Restore the best checkpoint written during training
model.load_weights('second.h5')

# Score the restored model on the held-out test split
test_loss, test_mae = model.evaluate(test_inputs, y_test)

print(f'Test Loss: {test_loss}')
print(f'Test MAE: {test_mae}')

In [None]:
# Plot training & validation curves side by side: loss on the left, MAE on the right
plt.figure(figsize=(12, 4))

panels = (
    ('loss', 'val_loss', 'Train Loss', 'Validation Loss', 'Model Loss', 'MSE Loss'),
    ('mae', 'val_mae', 'Train MAE', 'Validation MAE', 'Model MAE', 'Mean Absolute Error'),
)
for position, (train_key, val_key, train_label, val_label, title, y_label) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key], label=train_label)
    plt.plot(history.history[val_key], label=val_label)
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(loc='upper right')

plt.tight_layout()
plt.show()


In [None]:
import os
import random
import cv2
import numpy as np
import json

# Paths to your left and right eye data in Google Drive
# (directories are scanned for .jpg images with same-named .json files;
# only LEFT_EYE_DIR is used by the prediction code visible in this cell)
LEFT_EYE_DIR = '/content/drive/MyDrive/AllGalaxy/allgalaxy-webgazer/Data collection/Data/UnityEyes_Windows/UnityEyes_Windows/left_eye'
RIGHT_EYE_DIR = '/content/drive/MyDrive/AllGalaxy/allgalaxy-webgazer/Data collection/Data/UnityEyes_Windows/UnityEyes_Windows/right_eye'

# Function to parse JSON features
def parse_json(json_path):
    """Parse one sample's JSON file into a flat feature list.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        A list of five floats - the three head pose values followed by the
        pupil size and the iris size - or ``None`` when the file is missing,
        unreadable, not valid JSON, or holds values that cannot be turned
        into exactly three head pose numbers plus the two sizes. Absent keys
        default to zero. Every failure is printed before ``None`` is returned.
    """
    try:
        with open(json_path, 'r') as f:
            data = json.load(f)

        # head_pose is stored as a string such as "(a,b,c)"
        head_pose = [
            float(angle)
            for angle in data.get('head_pose', "(0,0,0)").strip("()").split(",")
        ]
        if len(head_pose) != 3:
            # A different length would produce a feature vector the scaler
            # and the model cannot accept, so treat it as a parse failure.
            raise ValueError(f"expected 3 head pose values, got {len(head_pose)}")

        eye_details = data.get('eye_details', {})
        pupil_size = float(eye_details.get('pupil_size', 0.0))
        iris_size = float(eye_details.get('iris_size', 0.0))
        return head_pose + [pupil_size, iris_size]
    except (OSError, ValueError, TypeError, AttributeError) as e:
        # OSError covers FileNotFoundError; ValueError covers JSONDecodeError
        # as well as non-numeric values; TypeError/AttributeError cover JSON
        # whose structure is not the expected mapping of strings/numbers.
        print(f"Error loading JSON file {json_path}: {e}")
        return None
# Function to load and preprocess an image
def load_and_preprocess_image(image_path):
    """Read an image file and shape it into a single-sample model input.

    The image is converted from BGR to RGB, resized to 224x224, scaled to
    [0, 1] and given a leading batch axis. If OpenCV cannot read the file,
    an error is printed and ``None`` is returned.
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        print(f"Error loading image at {image_path}")
        return None

    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    scaled = cv2.resize(rgb, (224, 224)) / 255.0

    # Prepend the batch axis expected by model.predict
    return scaled[np.newaxis, ...]

# Pick a random image from a directory together with its JSON file
def get_random_image_and_json(directory):
    """Choose a random .jpg in ``directory`` and pair it with its .json file.

    Returns ``(image_path, json_path)`` on success. If the directory has no
    .jpg files, the matching .json file does not exist, or any exception is
    raised, a message is printed and ``(None, None)`` is returned.
    """
    try:
        candidates = [name for name in os.listdir(directory) if name.endswith('.jpg')]
        if len(candidates) == 0:
            print(f"No images found in directory: {directory}")
            return None, None

        chosen = random.choice(candidates)
        image_path = os.path.join(directory, chosen)
        json_path = os.path.join(directory, chosen.replace('.jpg', '.json'))

        # Only hand back the pair when the JSON side is actually present
        if os.path.exists(json_path):
            return image_path, json_path

        print(f"Error: Corresponding JSON file {json_path} not found.")
        return None, None
    except Exception as e:
        print(f"Error in get_random_image_and_json: {e}")
        return None, None

# Pick one random left-eye sample and run the trained model on it
image_path, json_path = get_random_image_and_json(LEFT_EYE_DIR)

if not (image_path and json_path):
    print("Error: Could not retrieve a random image and its corresponding JSON.")
else:
    # Load both inputs before checking either of them
    new_image = load_and_preprocess_image(image_path)
    new_json = parse_json(json_path)

    if new_image is None or new_json is None:
        print("Error: Could not load image or JSON data.")
    else:
        # Apply the already-fitted `scaler` to the single feature row
        new_json = scaler.transform([new_json])

        # Inputs are passed by the model's input layer names
        prediction = model.predict({'image_input': new_image, 'json_input': new_json})
        print(f'Predicted Gaze Coordinates: {prediction[0]}')


In [None]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
import base64
import cv2
import numpy as np
import tensorflow as tf
import time
import math

# JavaScript to handle the video stream and snapshot
def video_stream():
    """Inject the webcam widget into the notebook output.

    The script starts the webcam, redraws the video onto a 224x224 canvas on
    every animation frame, creates the ``imgElement`` and ``labelElement``
    page elements, and defines ``window.takeSnapshot()``, which returns the
    canvas content as a JPEG data URL.
    """
    stream_script = Javascript('''
    var video;
    var div = null;
    var stream;

    async function streamVideo() {
        div = document.createElement('div');
        document.body.appendChild(div);
        div.style.textAlign = 'center';

        video = document.createElement('video');
        video.style.display = 'block';
        div.appendChild(video);

        stream = await navigator.mediaDevices.getUserMedia({video: true});
        video.srcObject = stream;
        await video.play();

        window.imgElement = document.createElement('img');
        div.appendChild(window.imgElement);

        window.captureCanvas = document.createElement('canvas');
        captureCanvas.width = 224;
        captureCanvas.height = 224;
        captureCanvas.style.display = 'block';
        div.appendChild(captureCanvas);

        window.labelElement = document.createElement('div');
        labelElement.innerText = 'Model output will appear here';
        div.appendChild(labelElement);

        var shutdown = false;
        var pendingResolve = null;

        function removeDom() {
            stream.getVideoTracks()[0].stop();
            if (div !== null) {
                div.remove();
                div = null;
                video = null;
                captureCanvas = null;
                imgElement = null;
                labelElement = null;
            }
        }

        function onAnimationFrame() {
            if (!shutdown) {
                captureCanvas.getContext('2d').drawImage(video, 0, 0, 224, 224);
                requestAnimationFrame(onAnimationFrame);
            }
        }

        requestAnimationFrame(onAnimationFrame);

        // Define takeSnapshot as a method of window
        window.takeSnapshot = async function() {
            return captureCanvas.toDataURL('image/jpeg', 0.8);
        };

        await new Promise((resolve) => {
            pendingResolve = resolve;
        });
        shutdown = true;
        removeDom();
    }

    streamVideo();
    ''')
    display(stream_script)

def get_frame():
    """Fetch the browser's current snapshot and decode it into an OpenCV image.

    Calls the page's ``takeSnapshot()``, strips the data-URL header, and
    decodes the base64 JPEG payload as a colour image.
    """
    data_url = eval_js('takeSnapshot()')
    encoded = data_url.split(',')[1]
    jpeg_bytes = np.frombuffer(base64.b64decode(encoded), dtype=np.uint8)
    return cv2.imdecode(jpeg_bytes, flags=cv2.IMREAD_COLOR)

# Load your pre-trained model
model = tf.keras.models.load_model('/content/second.h5')

# Inject the webcam widget, then give the browser a moment to set it up
video_stream()
time.sleep(2)

# Tracking state for the speed calculation: last drawn point and its timestamp
prev_x = prev_y = None
prev_time = time.time()

# Amplification factor for movement sensitivity (tunable)
movement_amplification_factor = 2.0

def classify_speed(speed):
    """Map a speed value to a coarse label using fixed upper bounds.

    Bounds are exclusive: below 10 is "very slow", below 30 "slow", below 70
    "normal", below 150 "fast", and everything else is "very fast".
    """
    # (exclusive upper bound, label) pairs in ascending order
    bands = (
        (10, "very slow"),
        (30, "slow"),
        (70, "normal"),
        (150, "fast"),
    )
    for upper_bound, label in bands:
        if speed < upper_bound:
            return label
    return "very fast"

# Perform a continuous loop to process the camera frames:
# grab a snapshot, predict a point, report its speed, and push the annotated
# frame back to the browser. Stop it by interrupting the cell.
try:
    while True:
        frame = get_frame()
        if frame is None:
            # The snapshot could not be decoded; wait and try again
            time.sleep(0.1)
            continue

        # Match the preprocessing used when the training images were loaded:
        # RGB channel order, 224x224, pixel values scaled to [0, 1].
        input_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        input_frame = cv2.resize(input_frame, (224, 224)) / 255.0
        input_frame = np.expand_dims(input_frame, axis=0)

        # Generate dummy input (if required by the model)
        json_input = np.zeros((1, 5))

        # Predict eye coordinates
        prediction = model.predict([input_frame, json_input])

        # Extract predicted x, y coordinates (assuming model outputs values scaled between [0, 1])
        pred_x = prediction[0][0]
        pred_y = prediction[0][1]

        # Amplify movement for more noticeable changes
        raw_x = int(pred_x * frame.shape[1] * movement_amplification_factor)
        raw_y = int(pred_y * frame.shape[0] * movement_amplification_factor)

        # Clamp coordinates to frame size, keeping them as plain Python ints
        # for the drawing call and the label text below
        pred_x = min(max(raw_x, 0), frame.shape[1] - 1)
        pred_y = min(max(raw_y, 0), frame.shape[0] - 1)

        # Calculate speed of saccade (eye movement)
        curr_time = time.time()
        if prev_x is not None and prev_y is not None:
            time_diff = curr_time - prev_time
            distance = math.sqrt((pred_x - prev_x) ** 2 + (pred_y - prev_y) ** 2)
            speed = distance / time_diff if time_diff > 0 else 0

            # Classify the speed of the saccade
            speed_classification = classify_speed(speed)
            print(f"Saccade speed: {speed:.2f} pixels/second, classified as: {speed_classification}")

        # Record position AND time for every frame, including the first one,
        # so the next speed is measured over exactly one frame interval
        prev_x, prev_y, prev_time = pred_x, pred_y, curr_time

        # Draw the yellow point representing eye movement
        cv2.circle(frame, (pred_x, pred_y), 10, (0, 255, 255), -1)

        # Convert frame to JPEG format and display it in the live stream
        _, jpeg_image = cv2.imencode('.jpg', frame)
        data_url_image = 'data:image/jpeg;base64,' + base64.b64encode(jpeg_image).decode('utf-8')
        eval_js(f'imgElement.src="{data_url_image}"; labelElement.innerText="Predicted: ({pred_x}, {pred_y})";')

        # Add a short delay to control frame rate
        time.sleep(0.1)

except KeyboardInterrupt:
    pass


PyTorch trained model

In [None]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
import base64
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
import time
import math

# Step 1: Define JavaScript code to capture video stream and snapshots
def video_stream():
    """Inject the webcam capture script into the notebook output.

    The script appends a video element fed by the webcam and a 224x224
    canvas to the page, and defines ``window.takeSnapshot()``, which draws
    the current video frame onto the canvas and returns it as a JPEG data URL.
    """
    capture_script = Javascript('''
    async function videoCapture() {
        const video = document.createElement('video');
        document.body.appendChild(video);

        const stream = await navigator.mediaDevices.getUserMedia({ video: true });
        video.srcObject = stream;
        await video.play();

        const canvas = document.createElement('canvas');
        canvas.width = 224;
        canvas.height = 224;
        document.body.appendChild(canvas);
        const ctx = canvas.getContext('2d');

        window.takeSnapshot = function() {
            ctx.drawImage(video, 0, 0, canvas.width, canvas.height);
            return canvas.toDataURL('image/jpeg', 0.8);
        };
    }
    videoCapture();
    ''')
    display(capture_script)

# Step 2: Function to capture a frame from the video stream
def get_frame():
    """Fetch one snapshot from the page and decode it into an OpenCV image."""
    # takeSnapshot() is defined by the injected JS and returns a data URL
    data_url = eval_js('takeSnapshot()')
    # Everything after the comma is the base64-encoded JPEG payload
    payload = data_url.split(',')[1]
    jpeg_bytes = np.frombuffer(base64.b64decode(payload), dtype=np.uint8)
    return cv2.imdecode(jpeg_bytes, cv2.IMREAD_COLOR)

# Step 3: Function to extract the eye region using OpenCV's face detection
_HAAR_CASCADES = {}


def _get_haar_cascade(filename):
    """Return the Haar cascade for ``filename``, loading it from cv2's bundled data only once."""
    if filename not in _HAAR_CASCADES:
        _HAAR_CASCADES[filename] = cv2.CascadeClassifier(cv2.data.haarcascades + filename)
    return _HAAR_CASCADES[filename]


def extract_eye_region(frame):
    """Crop the first detected eye out of ``frame``.

    Faces are detected on a grayscale copy of the frame and eyes are then
    searched for inside each face. The crop of the first eye found is
    returned; if no eye is detected the original frame is returned unchanged.

    Args:
        frame: Colour image as decoded by OpenCV.

    Returns:
        The eye crop taken from ``frame``, or ``frame`` itself as a fallback.
    """
    # Convert to grayscale for face/eye detection
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # This function runs once per captured frame, so the classifiers are
    # cached instead of being re-read from their XML files on every call.
    face_cascade = _get_haar_cascade('haarcascade_frontalface_default.xml')
    eye_cascade = _get_haar_cascade('haarcascade_eye.xml')

    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    for (x, y, w, h) in faces:
        face_roi = gray[y:y+h, x:x+w]
        eyes = eye_cascade.detectMultiScale(face_roi)
        for (ex, ey, ew, eh) in eyes:
            # Eye boxes are relative to the face crop; offset them by (x, y)
            return frame[y+ey:y+ey+eh, x+ex:x+ex+ew]  # Return eye region
    return frame  # Fallback to original frame if no eye detected

# Step 4: Define the PyTorch model (using ResNet18 as an example)
class GazeNetwork(torch.nn.Module):
    """ResNet18 from torch.hub whose final layer is replaced by a 2-output linear layer."""

    def __init__(self):
        super().__init__()
        backbone = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False)
        # Swap the classification head for a 2-value output (x, y coordinates)
        backbone.fc = torch.nn.Linear(backbone.fc.in_features, 2)
        self.backbone = backbone

    def forward(self, x):
        """Pass ``x`` through the backbone and return its output."""
        return self.backbone(x)

# Step 5: Initialize the model
model = GazeNetwork()

# Step 6: Load the checkpoint
checkpoint = torch.load('/content/epoch_24_ckpt.pth.tar', map_location=torch.device('cpu'))
state_dict = checkpoint['model_state']  # Adjust this based on how the checkpoint was saved

# strict=False tolerates key mismatches, so report them explicitly: otherwise
# a checkpoint whose parameter names do not line up with GazeNetwork would
# silently leave the network with its randomly initialized weights.
load_result = model.load_state_dict(state_dict, strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print(
        "Warning: checkpoint did not fully match the model "
        f"({len(load_result.missing_keys)} missing keys, "
        f"{len(load_result.unexpected_keys)} unexpected keys)."
    )

# Set the model to evaluation mode
model.eval()

# Step 7: Preprocess the image for model inference
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),  # Resize the frame to 224x224, assuming the model expects this size
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize using ImageNet stats
])

# Start the video stream
video_stream()

# Wait for JavaScript to initialize
time.sleep(2)

# Step 8: Define the function to draw a yellow point and calculate saccade speed
prev_x, prev_y = None, None  # Store previous position
prev_time = time.time()  # Store previous timestamp

def draw_eye_movement(pred_x, pred_y):
    """Draw the predicted point on a blank 224x224 frame and report its speed.

    The inputs are treated as lying in [-1, 1]; they are mapped to pixel
    coordinates and clamped to the frame. From the second call onwards the
    distance to the previous point divided by the elapsed time is printed
    together with its ``classify_speed`` label. The module-level ``prev_x``,
    ``prev_y`` and ``prev_time`` are updated on every call.

    Args:
        pred_x: Predicted horizontal coordinate.
        pred_y: Predicted vertical coordinate.

    Returns:
        A black 224x224x3 uint8 image with a filled circle at the point.
    """
    global prev_x, prev_y, prev_time

    # Create a blank black image to display the yellow point
    frame = np.zeros((224, 224, 3), dtype=np.uint8)

    # Map [-1, 1] to [0, 1], scale to pixels, and clamp to the frame; the
    # results are kept as plain Python ints for the drawing call below
    pred_x = min(max(int((pred_x + 1) / 2 * 224), 0), 223)
    pred_y = min(max(int((pred_y + 1) / 2 * 224), 0), 223)

    # Calculate saccade speed
    curr_time = time.time()
    if prev_x is not None and prev_y is not None:
        time_diff = curr_time - prev_time
        distance = math.sqrt((pred_x - prev_x) ** 2 + (pred_y - prev_y) ** 2)
        speed = distance / time_diff if time_diff > 0 else 0

        # Classify speed
        speed_classification = classify_speed(speed)
        print(f"Saccade speed: {speed:.2f} pixels/second, classified as: {speed_classification}")

    # Record position AND time on every call, including the first one, so the
    # next speed is measured from this point's own timestamp rather than from
    # whenever prev_time was initialized
    prev_x, prev_y, prev_time = pred_x, pred_y, curr_time

    # Draw the yellow point representing eye movement
    cv2.circle(frame, (pred_x, pred_y), 5, (0, 255, 255), -1)  # Yellow point

    return frame

# Step 9: Speed classification with adjusted thresholds
def classify_speed(speed):
    """Map a speed value to a coarse label using fixed upper bounds.

    Bounds are exclusive: below 5 is "very slow", below 25 "slow", below 50
    "normal", below 100 "high", and everything else is "very high".
    """
    # (exclusive upper bound, label) pairs in ascending order
    bands = (
        (5, "very slow"),
        (25, "slow"),
        (50, "normal"),
        (100, "high"),
    )
    for upper_bound, label in bands:
        if speed < upper_bound:
            return label
    return "very high"

# Step 10: Loop to capture, process, and show the frame with the yellow point.
# Stop it by interrupting the cell.
try:
    frame_count = 0  # Number of frames captured so far
    while True:
        frame = get_frame()  # Capture a frame from the webcam
        if frame is None:
            # The snapshot could not be decoded; wait and try again
            time.sleep(0.1)
            continue
        frame_count += 1

        # Extract the eye region from the frame
        eye_region = extract_eye_region(frame)

        # OpenCV decodes images in BGR order, while ToPILImage treats a
        # 3-channel array as RGB and the Normalize mean/std are listed in RGB
        # order, so convert before preprocessing.
        # NOTE(review): assumes the checkpoint was trained on RGB input - confirm.
        eye_region = cv2.cvtColor(eye_region, cv2.COLOR_BGR2RGB)

        # Preprocess the frame for the model
        input_image = preprocess(eye_region)
        input_image = input_image.unsqueeze(0)  # Add batch dimension

        # Run inference on the frame
        with torch.no_grad():
            prediction = model(input_image)

        # Extract x, y coordinates from the model's output
        pred_x = prediction[0][0].item()  # X coordinate (assuming [-1, 1] output)
        pred_y = prediction[0][1].item()  # Y coordinate (assuming [-1, 1] output)

        # Generate a smaller frame that shows only the yellow point's movement
        yellow_point_frame = draw_eye_movement(pred_x, pred_y)

        # Display the frame with the yellow point in Colab by converting to base64
        _, img_encoded = cv2.imencode('.jpg', yellow_point_frame)
        img_base64 = base64.b64encode(img_encoded).decode('utf-8')
        display(Javascript(f"""
            var img = document.getElementById('point-img');
            if (!img) {{
                img = new Image();
                img.id = 'point-img';
                document.body.appendChild(img);
            }}
            img.src = "data:image/jpeg;base64,{img_base64}";
        """))

        # Add delay to control the frame rate
        time.sleep(0.1)

except KeyboardInterrupt:
    pass
