In [2]:
!pip install opencv-python opencv-python-headless



In [3]:
import os
import zipfile

# Define dataset path
dataset_zip_path = '/content/dataset.zip'  # Adjust the file name if needed
dataset_extract_path = '/content/dataset'

# Unzip the dataset
with zipfile.ZipFile(dataset_zip_path, 'r') as zip_ref:
    zip_ref.extractall(dataset_extract_path)

print("Dataset extracted successfully!")

Dataset extracted successfully!


In [4]:
import cv2

def extract_frames_at_intervals(video_path, output_dir, interval=10, resize_shape=(224, 224)):
    """
    Extract frames from a video at a fixed interval and save them in a directory.

    :param video_path: Path to the video file.
    :param output_dir: Directory to save the frames.
    :param interval: Number of frames to skip between each extraction.
    :param resize_shape: Tuple (width, height) to resize the frames.
    """
    try:
        os.makedirs(output_dir, exist_ok=True)
        video = cv2.VideoCapture(video_path)
        if not video.isOpened():
            print(f"Error opening video file: {video_path}")
            return

        frame_count = 0
        saved_frame_count = 0

        while True:
            ret, frame = video.read()
            if not ret:
                break

            # Save frame at the specified interval
            if frame_count % interval == 0:
                frame = cv2.resize(frame, resize_shape)
                frame_path = os.path.join(output_dir, f"frame_{saved_frame_count:04d}.jpg")
                cv2.imwrite(frame_path, frame)
                saved_frame_count += 1

            frame_count += 1

        video.release()
        print(f"Extracted {saved_frame_count} frames from {video_path} at interval {interval} and saved to {output_dir}")
    except Exception as e:
        print(f"Error extracting frames: {e}")

# Use the correct video path
video_path = '/content/dataset/dataset/003_anonymized.mp4'
output_dir = '/content/frames'  # Directory to save extracted frames

# Extract frames at intervals of 10
extract_frames_at_intervals(video_path, output_dir, interval=10)


Extracted 5911 frames from /content/dataset/dataset/003_anonymized.mp4 at interval 10 and saved to /content/frames


In [5]:
import numpy as np
import shutil

def compare_and_classify_frames_with_motion_detection(input_dir, output_dir, motion_threshold=500):
    """
    Use motion detection to classify frames as robot_off or robot_working.

    :param input_dir: Directory containing the extracted frames.
    :param output_dir: Base directory for saving classified frames.
    :param motion_threshold: Minimum number of changed pixels to classify as working.
    """
    try:
        # Create directories for robot_off and robot_working
        off_dir = os.path.join(output_dir, "robot_off")
        working_dir = os.path.join(output_dir, "robot_working")
        os.makedirs(off_dir, exist_ok=True)
        os.makedirs(working_dir, exist_ok=True)

        # Sort frame files to ensure proper sequence
        frame_files = sorted(os.listdir(input_dir))
        frame_paths = [os.path.join(input_dir, file) for file in frame_files]

        for i in range(1, len(frame_paths)):
            prev_frame = cv2.imread(frame_paths[i - 1], cv2.IMREAD_GRAYSCALE)
            curr_frame = cv2.imread(frame_paths[i], cv2.IMREAD_GRAYSCALE)

            # Skip if frames cannot be read
            if prev_frame is None or curr_frame is None:
                print(f"Skipping frame {frame_files[i]} due to read error.")
                continue

            # Compute absolute difference
            diff = cv2.absdiff(curr_frame, prev_frame)

            # Apply a binary threshold
            _, diff_thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)

            # Count non-zero pixels
            motion_pixels = cv2.countNonZero(diff_thresh)

            # Classify based on motion detection
            if motion_pixels > motion_threshold:
                # Robot is working
                shutil.copy(frame_paths[i], os.path.join(working_dir, frame_files[i]))
            else:
                # Robot is off
                shutil.copy(frame_paths[i], os.path.join(off_dir, frame_files[i]))

        print(f"Frames classified and saved to {output_dir}")

    except Exception as e:
        print(f"Error classifying frames: {e}")

# Example usage
input_frames_dir = '/content/frames'  # Directory where reduced frames are stored
classified_output_dir = '/content/classified_frames'  # Directory to save classified frames

# Classify the frames
compare_and_classify_frames_with_motion_detection(input_frames_dir, classified_output_dir, motion_threshold=500)

Frames classified and saved to /content/classified_frames


In [6]:
for folder in ["robot_off", "robot_working"]:
    folder_path = os.path.join(classified_output_dir, folder)
    if os.path.exists(folder_path):
        print(f"Found {folder}: {len(os.listdir(folder_path))} frames")
    else:
        print(f"{folder} not found")

Found robot_off: 3985 frames
Found robot_working: 1925 frames


In [7]:
import os
import cv2
import numpy as np
from tensorflow.keras.utils import Sequence

class FrameDataGenerator(Sequence):
    """
    Data generator for loading frames in batches during training.
    """
    def __init__(self, base_dir, label_map, batch_size=32, resize_shape=(224, 224), shuffle=True):
        self.base_dir = base_dir
        self.label_map = label_map
        self.batch_size = batch_size
        self.resize_shape = resize_shape
        self.shuffle = shuffle
        self.file_paths, self.labels = self._load_file_paths()
        self.on_epoch_end()

    def _load_file_paths(self):
        """
        Load file paths and corresponding labels from the base directory.
        """
        file_paths = []
        labels = []
        for folder, label in self.label_map.items():
            folder_path = os.path.join(self.base_dir, folder)
            if not os.path.exists(folder_path):
                print(f"Warning: Folder {folder} does not exist.")
                continue
            for file in os.listdir(folder_path):
                if file.endswith(".jpg"):
                    file_paths.append(os.path.join(folder_path, file))
                    labels.append(label)
        return file_paths, labels

    def __len__(self):
        """
        Number of batches per epoch.
        """
        return int(np.ceil(len(self.file_paths) / self.batch_size))

    def __getitem__(self, index):
        """
        Generate one batch of data.
        """
        start = index * self.batch_size
        end = (index + 1) * self.batch_size
        batch_paths = self.file_paths[start:end]
        batch_labels = self.labels[start:end]

        images = []
        for file_path in batch_paths:
            img = cv2.imread(file_path)
            if img is None:
                continue
            img = cv2.resize(img, self.resize_shape) / 255.0  # Normalize
            images.append(img)
        return np.array(images), np.array(batch_labels)

    def on_epoch_end(self):
        """
        Shuffle data at the end of each epoch.
        """
        if self.shuffle:
            combined = list(zip(self.file_paths, self.labels))
            np.random.shuffle(combined)
            self.file_paths, self.labels = zip(*combined)

In [8]:
# Define label map
label_map = {"robot_off": 0, "robot_working": 1}

# Define paths
classified_data_dir = '/content/classified_frames'

# Create generators
batch_size = 32
train_generator = FrameDataGenerator(base_dir=classified_data_dir, label_map=label_map, batch_size=batch_size, resize_shape=(224, 224), shuffle=True)
test_generator = FrameDataGenerator(base_dir=classified_data_dir, label_map=label_map, batch_size=batch_size, resize_shape=(224, 224), shuffle=False)

In [9]:
# Define the CNN model (same as before)
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout

def create_cnn_model(input_shape):
    """
    Create a CNN model for binary classification.
    """
    model = Sequential([
        Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')  # Binary classification
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

input_shape = (224, 224, 3)
model = create_cnn_model(input_shape)

# Train the model using the generator
model.fit(train_generator, validation_data=test_generator, epochs=10)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/10


  self._warn_if_super_not_called()


[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m774s[0m 4s/step - accuracy: 0.7416 - loss: 1.0353 - val_accuracy: 0.8794 - val_loss: 0.3050
Epoch 2/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m785s[0m 4s/step - accuracy: 0.8879 - loss: 0.3012 - val_accuracy: 0.9115 - val_loss: 0.2815
Epoch 3/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m802s[0m 4s/step - accuracy: 0.8882 - loss: 0.2644 - val_accuracy: 0.9140 - val_loss: 0.1964
Epoch 4/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m822s[0m 4s/step - accuracy: 0.9089 - loss: 0.2223 - val_accuracy: 0.9267 - val_loss: 0.1706
Epoch 5/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m794s[0m 4s/step - accuracy: 0.9224 - loss: 0.1929 - val_accuracy: 0.9315 - val_loss: 0.1600
Epoch 6/10
[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m754s[0m 4s/step - accuracy: 0.9194 - loss: 0.1877 - val_accuracy: 0.9360 - val_loss: 0.1385
Epoch 7/10
[1m185/185[0m [32m━

<keras.src.callbacks.history.History at 0x7e0cbbc4bdc0>

In [10]:
# Evaluate the model
loss, accuracy = model.evaluate(test_generator)
print(f"Test Loss: {loss}")
print(f"Test Accuracy: {accuracy}")

[1m185/185[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m177s[0m 956ms/step - accuracy: 0.9642 - loss: 0.0840
Test Loss: 0.11431056261062622
Test Accuracy: 0.9543147087097168


In [14]:
import cv2
import os
import pandas as pd

def detect_start_stop_timings(input_dir, threshold=500, fps=30):
    """
    Detect start and stop timings of a cobot based on motion in frames.

    :param input_dir: Directory containing the extracted frames.
    :param threshold: Minimum number of changed pixels to detect motion.
    :param fps: Frames per second of the video, used to calculate time.
    :return: DataFrame with start and stop timings.
    """
    # Sort frame files to ensure proper sequence
    frame_files = sorted(os.listdir(input_dir))
    frame_paths = [os.path.join(input_dir, file) for file in frame_files]

    start_time = None
    stop_time = None
    timings = []

    for i in range(1, len(frame_paths)):
        prev_frame = cv2.imread(frame_paths[i - 1], cv2.IMREAD_GRAYSCALE)
        curr_frame = cv2.imread(frame_paths[i], cv2.IMREAD_GRAYSCALE)

        # Skip if frames cannot be read
        if prev_frame is None or curr_frame is None:
            print(f"Skipping frame {frame_files[i]} due to read error.")
            continue

        # Compute absolute difference
        diff = cv2.absdiff(curr_frame, prev_frame)

        # Apply a binary threshold
        _, diff_thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)

        # Count non-zero pixels
        motion_pixels = cv2.countNonZero(diff_thresh)

        # Detect start and stop times
        if motion_pixels > threshold:
            if start_time is None:
                # Start detected
                start_time = i / fps  # Convert frame number to time
        else:
            if start_time is not None:
                # Stop detected
                stop_time = (i - 1) / fps  # Convert frame number to time
                timings.append({"Start Time (s)": start_time, "Stop Time (s)": stop_time})
                start_time = None  # Reset start time for the next motion segment

    # Handle case where video ends during motion
    if start_time is not None:
        timings.append({"Start Time (s)": start_time, "Stop Time (s)": len(frame_paths) / fps})

    # Convert to DataFrame
    timing_df = pd.DataFrame(timings)
    return timing_df

# Example usage
input_frames_dir = '/content/frames'
fps = 30  # Frames per second of the video
timing_df = detect_start_stop_timings(input_frames_dir, threshold=500, fps=fps)

# Display the table
print("Cobot Start and Stop Timings:")
print(timing_df)

# Optionally save the table to a CSV file
timing_df.to_csv('/content/cobot_timing.csv', index=False)
print("Timings saved to cobot_timing.csv")

Cobot Start and Stop Timings:
     Start Time (s)  Stop Time (s)
0          0.100000       0.100000
1          0.166667       0.233333
2          0.333333       0.566667
3          0.700000       1.166667
4          1.333333       1.400000
..              ...            ...
226      185.033333     185.233333
227      189.366667     189.633333
228      190.700000     190.966667
229      193.766667     193.866667
230      193.933333     195.200000

[231 rows x 2 columns]
Timings saved to cobot_timing.csv


In [16]:
model.save('/content/robot_classification_model.h5')
print("Model saved successfully!")



Model saved successfully!


In [17]:
from tensorflow.keras.models import load_model

loaded_model = load_model('/content/robot_classification_model.h5')
print("Model loaded successfully!")



Model loaded successfully!
