In [3]:
import os


In [4]:
os.chdir('../')
%pwd

'c:\\Users\\User\\Python\\pool_management_system'

In [23]:
from src.constants import CONFIG_FILE_PATH
from src.utils.utils import  read_yaml, write_yaml
from src.utils.preprocess_utils import  create_yolo_data_yaml

# Root folder of the dataset, relative to the current working directory.
DATA_PATH = './data'

# Parse the project configuration file once; the lookups below read from it.
config = read_yaml(CONFIG_FILE_PATH)

# Path of the class-list file (top-level config key).
CLASSES_TXT_PATH = config["CLASSES_TXT_PATH"]
# Path of the dataset YAML (nested under the MODEL section of the config).
DATA_YAML_PATH = config["MODEL"]["DATA_YAML_PATH"]

# Hand the three paths to the project helper.
# NOTE(review): presumably this generates the YOLO data.yaml at DATA_YAML_PATH
# from the class list -- confirm against src/utils/preprocess_utils.py.
create_yolo_data_yaml(
    DATA_PATH,
    CLASSES_TXT_PATH,
    DATA_YAML_PATH,
)



File not found: ./artifact/original_data/classes.txt


## Check Bias

In [13]:
from collections import Counter


def count_label_classes(label_dir):
    """Count annotations per class ID across the label files in a directory.

    Every non-blank line of a ``.txt`` file is treated as one annotation whose
    first whitespace-separated token is the class ID.

    Args:
        label_dir: Directory that holds the label files.

    Returns:
        collections.Counter mapping class-ID strings (e.g. ``"0"``) to the
        number of annotation lines carrying that ID.

    Raises:
        FileNotFoundError: If ``label_dir`` does not exist.
    """
    counts = Counter()
    for label_file in sorted(os.listdir(label_dir)):
        label_path = os.path.join(label_dir, label_file)
        # Sub-directories and stray non-text files (caches, OS metadata) would
        # make open()/decoding fail, so only regular .txt files are read.
        if not (label_file.endswith(".txt") and os.path.isfile(label_path)):
            continue
        with open(label_path, "r") as f:
            for line in f:
                fields = line.split()
                # A blank or whitespace-only line has no tokens; indexing it
                # with [0] would raise IndexError.
                if fields:
                    counts[fields[0]] += 1  # First token is the class ID
    return counts


label_dir = "data/labels"

class_counts = count_label_classes(label_dir)
empty_count = class_counts["0"]
occupied_count = class_counts["1"]

print(f"Empty Chairs: {empty_count}")
print(f"Occupied Chairs: {occupied_count}")


Empty Chairs: 105
Occupied Chairs: 71


## Test Image

In [16]:
# Imported here so the cell runs on a fresh kernel; previously these names were
# only imported by a later cell.
import cv2
from ultralytics import YOLO

# Load your trained YOLOv8 model
model = YOLO("model/my_model.pt")  # Update path if needed

# Load a test image
image_path = "test_images/side_3.jpg"
image = cv2.imread(image_path)
if image is None:
    # cv2.imread reports an unreadable/missing file by returning None, which
    # would otherwise surface later as an AttributeError on image.shape.
    raise FileNotFoundError(f"Could not read image: {image_path}")

# Run inference at the image's native (height, width) resolution
results = model(image, imgsz=image.shape[:2])

# Draw detections on the image (manually, no cropping)
for r in results:
    for box in r.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])  # Bounding box coordinates
        class_id = int(box.cls[0])  # Class index
        confidence = float(box.conf[0])  # Confidence score

        # Take the class name from the model itself. A hard-coded list can
        # disagree with the trained label order and silently swap the labels.
        label = f"{r.names[class_id]} {confidence:.2f}"

        # Draw bounding box and label; clamp the text baseline so the label
        # stays visible when the box touches the top edge of the image.
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(image, label, (x1, max(y1 - 10, 15)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

# Save and display result
output_path = "output/detected_pool_test.jpg"
# Create the output folder first so a missing directory cannot make the
# write fail.
os.makedirs(os.path.dirname(output_path), exist_ok=True)
if not cv2.imwrite(output_path, image):
    raise IOError(f"Could not write image: {output_path}")

# Opens a native window and blocks until a key is pressed (needs a display).
cv2.imshow("YOLOv8 Detection", image)
cv2.waitKey(0)
cv2.destroyAllWindows()



0: 2752x4160 1 occupied-chair, 10 unoccupied-chairs, 51172.3ms
Speed: 225.4ms preprocess, 51172.3ms inference, 33.2ms postprocess per image at shape (1, 3, 2752, 4160)


## Test Video

In [None]:
from ultralytics import YOLO
import cv2

# Load your fine-tuned model
model = YOLO("runs/train/exp/weights/best.pt")  # Update path if needed

# Load the test video
video_path = "test_videos/pool_video.mp4"
output_path = "output/pool_output.mp4"
cap = cv2.VideoCapture(video_path)
out = None

try:
    if not cap.isOpened():
        # Without this check a bad path just skips the loop and leaves an
        # empty output file behind.
        raise FileNotFoundError(f"Could not open video: {video_path}")

    # Get video properties. FPS is kept as a float: truncating e.g. 29.97 to 29
    # changes the playback speed of the written file. Some containers report 0,
    # which is not a usable writer rate, so fall back to a nominal 30 FPS.
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define video writer to save output (create the folder if it is missing)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    out = cv2.VideoWriter(output_path,
                          cv2.VideoWriter_fourcc(*'mp4v'), fps,
                          (frame_width, frame_height))
    if not out.isOpened():
        raise IOError(f"Could not open video writer for: {output_path}")

    while True:
        ret, frame = cap.read()
        if not ret:  # End of stream or read error
            break

        # Run YOLOv8 inference on the frame
        results = model(frame)

        # Draw detections on the frame
        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])  # Bounding box coordinates
                class_id = int(box.cls[0])  # Class index
                confidence = float(box.conf[0])  # Confidence score

                # Use the names stored with the model so the label always
                # matches the trained class order.
                label = f"{r.names[class_id]} {confidence:.2f}"

                # Draw bounding box and label; clamp the text baseline so the
                # label is not drawn above the top edge of the frame.
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, label, (x1, max(y1 - 10, 15)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Write frame to output video
        out.write(frame)

        # Show frame (optional, remove for headless execution)
        cv2.imshow("YOLOv8 Pool Chair Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Cleanup runs even if inference raises, so the capture handle is released
    # and the output file is finalised.
    cap.release()
    if out is not None:
        out.release()
    cv2.destroyAllWindows()


In [None]:
import os
import logging
from pathlib import Path
from fastapi import FastAPI, File, UploadFile, Request, Response, Query
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from fastapi.templating import Jinja2Templates
import tempfile
from src.scripts.inference import load_model, read_image, run_inference_on_video
import cv2
import numpy as np
import base64

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Load environment variables (must run before MODEL_PATH is read below)
from dotenv import load_dotenv
load_dotenv()

# FastAPI app setup
app = FastAPI(title="YOLOv8 Inference API", version="1.0")

# Enable CORS for frontend access
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    # The CORS specification forbids combining a wildcard origin with
    # credentialed requests, so credentials are enabled only once `origins`
    # is narrowed to an explicit list.
    allow_credentials="*" not in origins,
    allow_methods=["*"],
    allow_headers=["*"]
)

# Set up templates
templates = Jinja2Templates(directory="templates")

# Model path: taken from the MODEL_PATH environment variable when set,
# otherwise the default weights location; resolved to an absolute path.
MODEL_PATH = Path(os.getenv("MODEL_PATH", "final_model/train/weights/best.pt")).resolve()
logger.info(f"Using YOLO model from: {MODEL_PATH}")

# Load YOLO model at startup
try:
    model = load_model(str(MODEL_PATH))
    logger.info("YOLO model loaded successfully")
except Exception as e:
    # logger.exception logs at ERROR level and keeps the traceback, which the
    # plain message alone does not provide.
    logger.exception(f"Model loading failed: {e}")
    model = None  # Endpoints check for None and report the model as unavailable

@app.get("/", tags=["root"])
async def index():
    """Send requests for the root URL to the API docs at ``/docs``."""
    docs_redirect = RedirectResponse(url="/docs")
    return docs_redirect

@app.post("/predict/")
async def predict(
    request: Request,
    file: UploadFile = File(...),
    conf_threshold: float = 0.5 
):
    """
    Accepts an image file, runs YOLO inference, and returns the processed image.

    Query Parameters:
    - `conf_threshold` (float, default=0.5): Minimum confidence score to display detections.

    Returns:
    - On success: the image produced by `read_image`, encoded as JPEG (`image/jpeg`).
    - On failure: a JSON body `{"error": "..."}` with status 503 when the model
      is not loaded, or 500 when inference or encoding fails.
    """
    # Local import keeps this handler self-contained; starlette is already a
    # dependency of this module.
    from starlette.responses import JSONResponse

    if model is None:
        # Report unavailability with a real error status instead of HTTP 200.
        return JSONResponse(
            status_code=503,
            content={"error": "🚨 Model is not loaded. Check logs for issues."},
        )

    try:
        # Read and process image
        image_bytes = await file.read()
        processed_img = read_image(model, image_bytes, conf_threshold=conf_threshold)

        # Encode the processed image as JPEG; imencode signals failure through
        # its first return value rather than by raising.
        encoded_ok, encoded_image = cv2.imencode('.jpg', processed_img)
        if not encoded_ok:
            raise RuntimeError("Failed to encode processed image as JPEG")

        return Response(content=encoded_image.tobytes(), media_type="image/jpeg")

    except Exception as e:
        logger.error(f"Inference failed: {e}")
        return JSONResponse(status_code=500, content={"error": str(e)})

@app.post("/predict/video/")
async def predict_video(
    file: UploadFile = File(...),
    conf_threshold: float = Query(0.5, description="Confidence threshold for detections")
):
    """
    Accepts a video file, runs YOLO inference on it via `run_inference_on_video`,
    and returns the processed video as a response.

    Query Parameters:
    - `conf_threshold` (float, default=0.5): Minimum confidence to display detections.

    Returns:
    - On success: the processed video bytes (`video/mp4`).
    - On failure: a JSON body `{"error": "..."}` with status 503 when the model
      is not loaded, or 500 when processing fails or produces no output.
    """
    # Local import keeps this handler self-contained; starlette is already a
    # dependency of this module.
    from starlette.responses import JSONResponse

    if model is None:
        return JSONResponse(
            status_code=503,
            content={"error": "Model is not loaded. Check logs for issues."},
        )

    # Every temp file created below is recorded here so it can be removed in
    # `finally`; with delete=False nothing else ever cleans them up.
    temp_paths = []
    try:
        # Save the uploaded video to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_input:
            temp_input_path = temp_input.name
            temp_paths.append(temp_input_path)
            temp_input.write(await file.read())
        logger.info(f"Video saved to temp file: {temp_input_path}")

        # Create a temporary file for the processed output video
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_output:
            temp_output_path = temp_output.name
            temp_paths.append(temp_output_path)

        # Run inference on the video.
        # NOTE(review): temp_output_path is never passed to the helper, so it
        # is unclear how the processed frames could end up in that file --
        # confirm run_inference_on_video's signature and pass the output path.
        run_inference_on_video(model, temp_input_path, conf_threshold=conf_threshold, imgsz=640)

        # Refuse to return an empty body labelled as a valid video.
        if os.path.getsize(temp_output_path) == 0:
            raise RuntimeError(
                "Processed video is empty: run_inference_on_video did not write "
                "to the expected output path"
            )

        # Read the processed video and return it as the response
        with open(temp_output_path, "rb") as processed_video:
            video_bytes = processed_video.read()

        return Response(content=video_bytes, media_type="video/mp4")

    except Exception as e:
        logger.error(f"Video inference failed: {e}")
        return JSONResponse(status_code=500, content={"error": str(e)})

    finally:
        # Best-effort cleanup: a failed delete is logged, not raised, so it
        # cannot mask the actual response.
        for temp_path in temp_paths:
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temp file {temp_path}: {cleanup_error}")

# Entry point: start a local uvicorn server on 127.0.0.1:8000 when this code is
# run as the main module.
if "__main__" == __name__:
    import uvicorn

    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )
