In [None]:
# Install YOLOv8 and its dependencies
!pip install ultralytics --upgrade

# Install OpenCV
!pip install opencv-python-headless




In [None]:
import os
import cv2
from pathlib import Path
from ultralytics import YOLO
from ultralytics.trackers import register_tracker
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import threading
import torch
import torchvision.transforms as transforms
from PIL import Image
import torchvision.models as models

# Step 1: Define architectures
resnet_model = models.resnet50(pretrained=False)
resnet_model_neck = models.resnet50(pretrained=False)

num_features = resnet_model.fc.in_features
resnet_model.fc = torch.nn.Linear(num_features, 3)        # 3 classes
resnet_model_neck.fc = torch.nn.Linear(num_features, 3)   # 3 classes for neck

# Step 2: Load weights
resnet_model.load_state_dict(torch.load("/content/drive/MyDrive/FYP models/models/resnet50(3_fullchicken)/resnet50_3class_best_full.pth", map_location="cuda" if torch.cuda.is_available() else "cpu"))
resnet_model_neck.load_state_dict(torch.load("/content/drive/MyDrive/FYP models/models/resnet50(3_neck)/resnet50_3class_best_neck.pth", map_location="cuda" if torch.cuda.is_available() else "cpu"))

# Step 3: Set device and eval
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet_model = resnet_model.to(device)
resnet_model_neck = resnet_model_neck.to(device)

resnet_model.eval()
resnet_model_neck.eval()

# Transform (must match your training)
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),  # or whatever your input size is
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Class labels full body
CLASS_LABELS = {
    0: "Marek",
    1: "Disease",
    2: "Healthy"
}
# Class labels neck area
CLASS_LABELS1 = {
    0: "infectious_coryza",
    1: "fowl_pox",
    2: "Healthy"
}
# Constants
DETECTION_MODEL_PATH1 = "/content/drive/MyDrive/FYP models/models/yolo_detection/best .pt"
DETECTION_MODEL_PATH2 = "/content/drive/MyDrive/FYP models/models/neck_detection/best.pt"
# Load YOLO models
model = YOLO(DETECTION_MODEL_PATH1)
model_neck = YOLO(DETECTION_MODEL_PATH2)
QUALITY_THRESHOLDS1 = {
    "blur": 23.0,             # Adjust based on how sharp your objects are
}
QUALITY_THRESHOLDS_Neck = {
    "blur": 19.0,             # Adjust based on how sharp your objects are
}
#object_predictions = {}  # {track_id: {"quality": quality, "prediction": "Healthy"}}
max_threads = 22
thread_lock = threading.Lock()
executor = ThreadPoolExecutor(max_workers=max_threads)



In [None]:

def calculate_quality(frame, bbox,label1="body"):
    x1, y1, x2, y2 = bbox
    crop = frame[y1:y2, x1:x2]
    if label1=="body":
        QUALITY_THRESHOLDS = QUALITY_THRESHOLDS1
    else:
        QUALITY_THRESHOLDS = QUALITY_THRESHOLDS_Neck
    if crop.size == 0:
        return 0.0, False  # avoid crash

    # Convert to grayscale for blur & brightness
    gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)

    blur = cv2.Laplacian(gray, cv2.CV_64F).var()
    brightness = np.mean(gray)



    # Check thresholds
    quality_ok = (
        blur > QUALITY_THRESHOLDS["blur"]
    )

    # Combine into one score if needed (optional)
    combined_score = (blur + brightness) / 2

    return combined_score, quality_ok


In [None]:
def ensure_output_dir(input_path):
    input_path = Path(input_path)
    if input_path.is_file():
        out_dir = input_path.parent / f"{input_path.stem}_output_yolo"
    else:
        out_dir = input_path / "output_yolo"
    os.makedirs(out_dir, exist_ok=True)
    return out_dir

In [None]:
def process_single_image(img_path, output_dir):
    frame = cv2.imread(str(img_path))
    results = model.predict(frame, conf=0.5, iou=0.7)[0]  # Only detection, no tracking

    temp_id = 0

    if results.boxes is not None and len(results.boxes) > 0:
        for box in results.boxes.xyxy:
            x1, y1, x2, y2 = map(int, box)
            track_id = temp_id
            temp_id += 1

            try:
                quality, quality_ok = calculate_quality(frame, (x1, y1, x2, y2))
                crop = frame[y1:y2, x1:x2]

                if quality_ok:
                    pred_label = predict_with_resnet(crop)

                    # Assign color based on prediction
                    if pred_label == "Healthy":
                        box_color = (0, 255, 0)
                    elif pred_label == "Marek":
                        box_color = (0, 165, 255)
                    elif pred_label == "Disease":
                        box_color = (0, 0, 255)

                    # 🧠 Neck classification without checking containment
                    neck_results = model_neck.predict(frame, conf=0.5, iou=0.5)[0]
                    if neck_results.boxes is not None and len(neck_results.boxes) > 0:
                        for neck_box in neck_results.boxes.xyxy:
                            nx1, ny1, nx2, ny2 = map(int, neck_box)
                            try:
                                neck_crop = frame[ny1:ny2, nx1:nx2]
                                neck_quality, neck_quality_ok = calculate_quality(frame, (nx1, ny1, nx2, ny2),"neck")
                                if neck_quality_ok:
                                    neck_pred = predict_with_resnet_neck(neck_crop)

                                    if neck_pred != "Healthy":
                                        neck_color = (0, 165, 255) if neck_pred == "infectious_coryza" else (0, 0, 255)
                                        cv2.rectangle(frame, (nx1, ny1), (nx2, ny2), neck_color, 2)
                                        cv2.putText(frame, f"Neck: {neck_pred}", (nx1, ny2 + 20),
                                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, neck_color, 2)
                            except Exception as e:
                                print(f"❌ Neck classification error: {e}")
                else:
                    pred_label = "LowQuality"
                    box_color = (255, 255, 255)

                # 🟩 Draw full-body bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)

                # 🏷️ Label
                label = f"{pred_label} (ID:{track_id})"
                label_y = y1 - 10 if y1 - 10 > 10 else y1 + 20
                cv2.putText(frame, label, (x1, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_color, 2)

                # 📏 Quality score
                q_color = (0, 255, 255) if quality_ok else (0, 0, 255)
                cv2.putText(frame, f"Q:{quality:.2f}", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, q_color, 2)

            except Exception as e:
                print(f"❌ Error in {Path(img_path).name}: {e}")

    else:
        print(f"🚫 No objects detected in image: {Path(img_path).name}")

    out_path = os.path.join(output_dir, f"{Path(img_path).stem}_classified.jpg")
    cv2.imwrite(out_path, frame)
    print(f"🖼️ Saved: {out_path}")


In [None]:
def process_image_directory(input_dir, output_dir):
    image_files = [f for f in Path(input_dir).glob("*") if f.suffix.lower() in [".jpg", ".png", ".jpeg"]]
    for img_path in image_files:
        process_single_image(img_path, output_dir)

In [None]:
def quality_and_classify(track_id, frame, box):
    x1, y1, x2, y2 = box
    crop = frame[y1:y2, x1:x2]

    try:
        # ✅ Validate crop size to prevent model errors
        if crop.size == 0 or crop.shape[0] < 10 or crop.shape[1] < 10:
            print(f"[Crop Error] ID {track_id}: Invalid crop")
            return

        # ✅ Assess quality
        quality, quality_ok = calculate_quality(frame, box)

        # ✅ Proceed only if quality is sufficient and better than previous
        if quality_ok:
            with thread_lock:
                last = object_predictions.get(track_id)
                if last is None or quality > last["quality"]:
                    print(f"[Classify Start] ID {track_id}")
                    prediction = predict_with_resnet(crop)
                    print(f"[Classify Done] ID {track_id}")
                    object_predictions[track_id] = {
                        "quality": quality,
                        "prediction": prediction
                    }

    except Exception as e:
        print(f"[Thread error] ID {track_id}: {e}")


In [None]:
def classify_neck_area(frame, full_body_box):
    try:
        neck_results = model_neck.predict(frame, conf=0.5, iou=0.5)[0]

        if neck_results.boxes is not None and len(neck_results.boxes) > 0:
            for neck_box in neck_results.boxes.xyxy:
                nx1, ny1, nx2, ny2 = map(int, neck_box)

                # ✅ Only consider neck boxes inside full-body box
                fx1, fy1, fx2, fy2 = full_body_box
                if not (fx1 <= nx1 <= fx2 and fx1 <= nx2 <= fx2 and
                        fy1 <= ny1 <= fy2 and fy1 <= ny2 <= fy2):
                    continue  # skip neck boxes outside tracked chicken

                neck_crop = frame[ny1:ny2, nx1:nx2]

                try:
                    neck_quality, neck_quality_ok = calculate_quality(frame, (nx1, ny1, nx2, ny2), "neck")
                    if neck_quality_ok:
                        neck_pred = predict_with_resnet_neck(neck_crop)

                        # ✅ Only draw if not healthy
                        if neck_pred != "Healthy":
                            neck_color = (0, 165, 255) if neck_pred == "infectious_coryza" else (0, 0, 255)
                            cv2.rectangle(frame, (nx1, ny1), (nx2, ny2), neck_color, 2)
                            cv2.putText(frame, f"Neck: {neck_pred}", (nx1, ny2 + 20),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, neck_color, 2)

                except Exception as e:
                    print(f"[Neck Detection Error] {e}")
    except Exception as e:
        print(f"[Neck Detection Outer Error] {e}")


In [None]:
def process_video(video_path, output_dir):
    cap = cv2.VideoCapture(str(video_path))
    out_video_path = os.path.join(output_dir, f"{Path(video_path).stem}_tracked.mp4")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    writer = cv2.VideoWriter(out_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    frame_buffer = []  # [(frame, [(box, track_id)])]

    print("🔄 Starting Phase 1: Detection, Tracking, Classification")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model.track(frame, persist=True, conf=0.5, iou=0.5, tracker="botsort.yaml")[0]
        detections = []

        if results.boxes.id is not None and len(results.boxes.id) > 0:
            for box, track_id in zip(results.boxes.xyxy, results.boxes.id):
                x1, y1, x2, y2 = map(int, box)
                track_id = int(track_id)

                # Launch classification threads
                executor.submit(quality_and_classify, track_id, frame.copy(), (x1, y1, x2, y2))
                executor.submit(classify_neck_area, frame.copy(), (x1, y1, x2, y2))

                detections.append(((x1, y1, x2, y2), track_id))
        frame_buffer.append((frame.copy(), detections))

    cap.release()

    print("⏳ Waiting for all threads to finish...")
    executor.shutdown(wait=True)
    print("✅ All classifications completed")

    print("🖊️ Starting Phase 2: Annotating and Writing Video")

    for frame, detections in frame_buffer:
        for (x1, y1, x2, y2), track_id in detections:
            pred_label = "Unknown"
            pred_color = (128, 128, 128)
            quality = 0.0

            prediction_info = object_predictions.get(track_id)
            if prediction_info:
                pred_label = prediction_info["prediction"]
                quality = prediction_info["quality"]

                if pred_label == "Healthy":
                    pred_color = (0, 255, 0)
                elif pred_label == "Marek":
                    pred_color = (0, 165, 255)
                elif pred_label == "Disease":
                    pred_color = (0, 0, 255)
            else:
                pred_color=(255,255,255)
                pred_label = "LowQuality"

            # Draw final predictions
            cv2.rectangle(frame, (x1, y1), (x2, y2), pred_color, 2)
            cv2.putText(frame, f"ID: {track_id}", (x1, y1 - 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            cv2.putText(frame, pred_label, (x1, y2 + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, pred_color, 2)
            cv2.putText(frame, f"Q:{quality:.2f}", (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

        writer.write(frame)

    writer.release()
    print(f"✅ Tracked video saved to: {out_video_path}")


In [None]:
def predict_with_resnet(crop):
    image = transform(crop).unsqueeze(0).to(device)  # [1, 3, 224, 224]
    with torch.no_grad():
        outputs = resnet_model(image)
        _, predicted = torch.max(outputs, 1)
        label = CLASS_LABELS[predicted.item()]
    return label



In [None]:
def predict_with_resnet_neck(crop):
    image = transform(crop).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = resnet_model_neck(image)
        _, predicted = torch.max(outputs, 1)
        label = CLASS_LABELS1[predicted.item()]
    return label


In [None]:
def run_detection_and_tracking(input_path):
    input_path = Path(input_path)
    output_dir = ensure_output_dir(input_path)

    if input_path.is_file() and input_path.suffix.lower() in [".jpg", ".png", ".jpeg"]:
        process_single_image(input_path, output_dir)

    elif input_path.is_file() and input_path.suffix.lower() in [".mp4", ".avi", ".mov"]:
        # 🔄 Reset object_predictions to avoid contamination between videos
        global object_predictions
        object_predictions = {}  # {track_id: {"quality": quality, "prediction": "Healthy"}}
        if hasattr(model, "predictor") and model.predictor is not None and hasattr(model.predictor, "tracker"):
            model.predictor.tracker = None

        process_video(input_path, output_dir)

    elif input_path.is_dir():
        process_image_directory(input_path, output_dir)

    else:
        print("Unsupported input format.")



In [None]:
#run_detection_and_tracking("/content/drive/MyDrive/FYP models/test_models/Test_images/chicken-4392787.jpg")
#run_detection_and_tracking("/content/drive/MyDrive/FYP models/test_models/Test_images")
run_detection_and_tracking("/content/drive/MyDrive/FYP models/test_models/test_videos/165148-832090850.mp4")
#/content/drive/MyDrive/FYP models/test_models/test_videos/MXClip_N_C_D.mp4
#/content/drive/MyDrive/FYP models/test_models/test_videos/130614-747879918_large.mp4
#/content/drive/MyDrive/FYP models/test_models/test_videos/165148-832090850.mp4
#/content/drive/MyDrive/FYP models/test_models/test_videos/190142-887464235.mp4
#/content/drive/MyDrive/FYP models/test_models/test_videos/MXClip_CHICKEN FARMING, NEWCASTLE DISEASE Symptoms & Signs in Broiler, Poultry Diseases_2025_03_20_11_59_03_output_yolo

🔄 Starting Phase 1: Detection, Tracking, Classification

0: 384x640 1 chicken, 161.0ms
Speed: 1.4ms preprocess, 161.0ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)
[Classify Start] ID 1

[Classify Done] ID 1
0: 384x640 1 chicken, 176.7ms
Speed: 1.6ms preprocess, 176.7ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

Ultralytics 8.3.135 🚀 Python-3.11.12 torch-2.6.0+cpu CPU (AMD EPYC 9B14)

0: 384x640 1 chicken, 304.1ms
Speed: 1.8ms preprocess, 304.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

Ultralytics 8.3.135 🚀 Python-3.11.12 torch-2.6.0+cpu CPU (AMD EPYC 9B14)


0: 384x640 1 chicken, 224.8ms
Speed: 1.8ms preprocess, 224.8ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)


0: 384x640 1 neck, 303.9ms
Speed: 1.9ms preprocess, 303.9ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)
0: 384x640 1 chicken, 175.5ms
Speed: 1.3ms preprocess, 175.5ms inference, 0.6ms postprocess per image at shap

KeyboardInterrupt: 