<a href="https://colab.research.google.com/github/waihongthong/RaspberryPi-object-measurement/blob/main/DrowningYolo8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install ultralytics opencv-python torch seaborn torchvision numpy pandas seaborn tqdm matplotlib

In [None]:
import os
import numpy as np
import torch
import cv2
import yaml
from ultralytics import YOLO
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

In [None]:
# Mount Google Drive (for Colab)
from google.colab import drive
drive.mount('/content/drive')

In [None]:
dataset_path = '/content/drive/MyDrive/Final Dataset'
train_path = os.path.join(dataset_path, 'train')
val_path = os.path.join(dataset_path, 'valid')
test_path = os.path.join(dataset_path, 'test')
video_path = os.path.join(dataset_path, 'videos')

In [None]:
# Check if dataset exists
print(f"Train path exists: {os.path.exists(train_path)}")
print(f"Val path exists: {os.path.exists(val_path)}")
print(f"Test path exists: {os.path.exists(test_path)}")
print(f"Video path exists: {os.path.exists(test_path)}")

In [None]:
print("Processing videos for training data...")
extracted_frames_path = os.path.join(dataset_path, 'extracted_frames')
os.makedirs(extracted_frames_path, exist_ok=True)

if os.path.exists(video_path) and os.path.isdir(video_path):
    video_files = [f for f in os.listdir(video_path) if f.endswith(('.mp4', '.avi', '.mov'))]

    # Process each video file
    for video_file in video_files:
        video_filepath = os.path.join(video_path, video_file)
        print(f"Processing video: {video_file}")

        # Determine if video contains drowning (based on filename or your own logic)
        is_drowning = 'drowning' in video_file.lower()
        label_class = 1 if is_drowning else 0  # 1 for drowning, 0 for normal

        # Extract frames
        cap = cv2.VideoCapture(video_filepath)
        frame_count = 0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Extract 1 frame per second
        frame_interval = int(fps)

        video_name = os.path.splitext(video_file)[0]

        # Create directory for this video's frames
        video_frames_dir = os.path.join(extracted_frames_path, video_name)
        os.makedirs(video_frames_dir, exist_ok=True)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Process every nth frame (based on frame_interval)
            if frame_count % frame_interval == 0:
                frame_filename = f"{video_name}_frame_{frame_count:05d}.jpg"
                frame_path = os.path.join(video_frames_dir, frame_filename)
                cv2.imwrite(frame_path, frame)

                # Create label file (YOLO format)
                # Format: <class> <x_center> <y_center> <width> <height>
                # We're assuming the full frame may contain a drowning/normal person
                h, w, _ = frame.shape
                label_filename = f"{video_name}_frame_{frame_count:05d}.txt"
                label_path = os.path.join(video_frames_dir, label_filename)

                # Create a basic label file - this is just an example
                # In real use, you would need precise annotations
                with open(label_path, 'w') as f:
                    # Center of the image with 80% width and height
                    f.write(f"{label_class} 0.5 0.5 0.8 0.8\n")

                print(f"  Extracted frame {frame_count//frame_interval} ({frame_count}/{total_frames})")

            frame_count += 1

        cap.release()
        print(f"  Extracted {frame_count//frame_interval} frames from {video_file}")

    # Distribute the extracted frames to train/val/test directories
    print("\nDistributing extracted frames to train/val/test sets...")
    video_dirs = [d for d in os.listdir(extracted_frames_path) if os.path.isdir(os.path.join(extracted_frames_path, d))]

    for video_dir in video_dirs:
        frames_dir = os.path.join(extracted_frames_path, video_dir)
        frame_files = [f for f in os.listdir(frames_dir) if f.endswith('.jpg')]

        # Split frames into train (70%), val (15%), test (15%)
        num_frames = len(frame_files)
        train_split = int(0.7 * num_frames)
        val_split = int(0.15 * num_frames)

        # Shuffle frame files for better distribution
        np.random.shuffle(frame_files)

        for i, frame_file in enumerate(frame_files):
            frame_path = os.path.join(frames_dir, frame_file)
            label_file = frame_file.replace('.jpg', '.txt')
            label_path = os.path.join(frames_dir, label_file)

            # Determine destination directory
            if i < train_split:
                dest_dir = train_path
            elif i < train_split + val_split:
                dest_dir = val_path
            else:
                dest_dir = test_path

            # Copy frame and label files
            shutil.copy(frame_path, os.path.join(dest_dir, frame_file))
            if os.path.exists(label_path):
                shutil.copy(label_path, os.path.join(dest_dir, label_file))
else:
    print("Video directory not found. Skipping video processing.")

In [None]:
# Load pre-trained YOLOv8 model
model = YOLO('yolov8n.pt')

In [None]:
# Create a YAML file for dataset configuration
yaml_content = "path: " + dataset_path + "\n"
yaml_content += "train: " + train_path + "\n"
yaml_content += "val: " + val_path + "\n"
yaml_content += "test: " + test_path + "\n\n"
yaml_content += "# Classes\n"
yaml_content += "names:\n"
yaml_content += "  0: person\n"
yaml_content += "  1: drowning_person\n"

In [None]:
# Write YAML file
yaml_path = os.path.join(dataset_path, 'drowning_dataset.yaml')
with open(yaml_path, 'w') as f:
    f.write(yaml_content)

print("Dataset configuration YAML file created.")

# Train the model
print("Starting model training...")
model.train(
    data=yaml_path,
    epochs=10,
    imgsz=412,
    batch=64,
    workers=8,
    patience=5,
    save=True,
    project='drowning_detection',
    name='yolov8_drowning',
    amp=True,
    cache=True
)

In [None]:
# Load the best trained model
trained_model = YOLO(os.path.join('drowning_detection', 'yolov8_drowning', 'weights', 'best.pt'))

In [None]:
print("\nEvaluating model on test videos...")
test_video_path = os.path.join(dataset_path, 'test_videos')  # Path to test videos

if os.path.exists(test_video_path) and os.path.isdir(test_video_path):
    # Create directory for results
    results_dir = "video_results"
    os.makedirs(results_dir, exist_ok=True)

    video_files = [f for f in os.listdir(test_video_path) if f.endswith(('.mp4', '.avi', '.mov'))]

    # Process each test video
    for video_file in video_files:
        print(f"Processing test video: {video_file}")
        video_path = os.path.join(test_video_path, video_file)

        # Run tracking on video
        results = trained_model.track(source=video_path, save=True, tracker="botsort.yaml",
                                     project=results_dir, name=os.path.splitext(video_file)[0])

        print(f"Processed {video_file} - results saved to {results_dir}/{os.path.splitext(video_file)[0]}")
else:
    print("Test video directory not found. Skipping video evaluation.")

In [None]:
# Initialize variables for metrics calculation
y_true = []
y_pred = []
id_tracker = {}

In [None]:
print("Evaluating model on test dataset...")
test_files = [os.path.join(test_path, f) for f in os.listdir(test_path) if f.endswith(('.jpg', '.jpeg', '.png', ))]

for file in tqdm(test_files):
    # Extract ground truth from filename or corresponding label file
    # This depends on your dataset structure, modify as needed
    label_file = file.replace('.jpg', '.txt').replace('.jpeg', '.txt').replace('.png', '.txt')

    if os.path.exists(label_file):
        with open(label_file, 'r') as f:
            lines = f.readlines()
            gt_classes = [int(line.split()[0]) for line in lines]
            # If any class is 1 (drowning_person), mark as drowning
            is_drowning_gt = 1 in gt_classes
            y_true.append(int(is_drowning_gt))
    else:
        # No label file, assume no drowning
        y_true.append(0)

    # Run detection on the image
    results = trained_model.track(source=file, persist=True, tracker="botsort.yaml")

    # Process the results
    is_drowning_pred = False
    unique_ids = set()

    for r in results:
        if r.boxes.id is not None:  # Check if IDs are available
            boxes = r.boxes.xyxy.cpu().numpy()
            classes = r.boxes.cls.cpu().numpy()
            ids = r.boxes.id.int().cpu().numpy()

            for box, cls, id in zip(boxes, classes, ids):
                # Track unique IDs across frames
                unique_ids.add(id)

                # Update tracker
                if id not in id_tracker:
                    id_tracker[id] = {"frames": 1, "drowning_frames": 1 if cls == 1 else 0}
                else:
                    id_tracker[id]["frames"] += 1
                    if cls == 1:
                        id_tracker[id]["drowning_frames"] += 1

                # If class is 1 (drowning_person), mark as drowning
                if cls == 1:
                    is_drowning_pred = True

    y_pred.append(int(is_drowning_pred))

In [None]:
# Calculate metrics
print("\nPerformance Metrics:")
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")

In [None]:
# Detailed classification report
print("\nClassification Report:")
target_names = ['Normal', 'Drowning']
report = classification_report(y_true, y_pred, target_names=target_names)
print(report)

In [None]:
print("\nUnique ID Statistics:")
for id, stats in id_tracker.items():
    drowning_percentage = (stats["drowning_frames"] / stats["frames"]) * 100
    status = "POTENTIAL DROWNING" if drowning_percentage > 50 else "NORMAL"
    print(f"ID {id:02d}: {status} ({drowning_percentage:.1f}% drowning frames)")

In [None]:
# Visualize results with a confusion matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

In [None]:
# Save some example detections from test set
os.makedirs('detection_results', exist_ok=True)
sample_test_files = test_files[:5]  # Take first 5 test images

In [None]:
print("\nGenerating sample detection visualizations...")
for i, file in enumerate(sample_test_files):
    results = trained_model.track(source=file, persist=True, tracker="botsort.yaml")

    for r in results:
        im_array = r.plot(line_width=2, font_size=1)  # Plot with detections and tracking IDs
        img = cv2.cvtColor(im_array, cv2.COLOR_BGR2RGB)
        output_path = f"detection_results/sample_{i+1}.jpg"
        cv2.imwrite(output_path, img)
        print(f"Saved detection result to {output_path}")

print("\nDrowning detection evaluation complete!")