In [57]:
import os
import shutil
import random
import json
import yaml
import pandas as pd
from pathlib import Path
from PIL import Image

project_root = Path.cwd()
print(f"directory: {project_root}")

dirs_to_create = [
    project_root / "Images",
    project_root / "Annotation_CSV_Files",
    project_root / "labels",
    project_root / "dataset" / "images" / "train",
    project_root / "dataset" / "images" / "val",
    project_root / "dataset" / "labels" / "train",
    project_root / "dataset" / "labels" / "val",
    project_root / "test_results"
]

for directory in dirs_to_create:
    directory.mkdir(parents=True, exist_ok=True)
print("structure created")


directory: /Users/larasabha/Desktop/Candy-object-detection
structure created


In [58]:
def organize_dataset_files(root_dir=project_root):
    """
    Images -> Images/
    CSV annotation files -> Annotation_CSV_Files/
    """
    print("organizing dataset files")

    archive_folders = [d for d in root_dir.iterdir() if d.is_dir() and 'candy' in d.name.lower()]
    if archive_folders:
        archive_folder = archive_folders[0]
        print(f"found archive folder: {archive_folder}")
        source_img_folder = archive_folder / "Images"
        source_csv_folder = archive_folder / "Annotation CSV Files"
    else:
        print("no archive folder found")
        source_img_folder = root_dir / "Images"
        source_csv_folder = root_dir / "Annotation CSV Files"

    dest_img_folder = root_dir / "Images"
    dest_csv_folder = root_dir / "Annotation_CSV_Files"
    
    moved_images = 0
    moved_csvs = 0

    if source_img_folder.exists():
        for file in source_img_folder.iterdir():
            if file.suffix.lower() in ['.jpg', '.jpeg', '.JPG', '.JPEG', '.png', '.PNG']:
                dst = dest_img_folder / file.name
                if not dst.exists():
                    file.rename(dst)  # move file
                    moved_images += 1
        print(f"moved {moved_images} images to {dest_img_folder}")
    else:
        print(f"images folder not found at {source_img_folder}")

    if source_csv_folder.exists():
        for file in source_csv_folder.iterdir():
            if file.suffix.lower() == '.csv':
                dst = dest_csv_folder / file.name
                if not dst.exists():
                    file.rename(dst)
                    moved_csvs += 1
        print(f"moved {moved_csvs} CSV files to {dest_csv_folder}")
    else:
        print(f"CSV folder not found at {source_csv_folder}")

    final_images = len(list(dest_img_folder.glob("*.[jJ][pP][gG]"))) + \
                   len(list(dest_img_folder.glob("*.[jJ][pP][eE][gG]"))) + \
                   len(list(dest_img_folder.glob("*.png")))

    final_csvs = len(list(dest_csv_folder.glob("*.csv")))

    print(f"final counts: {final_images} images, {final_csvs} CSV files")
    return final_images > 0 and final_csvs > 0

organize_dataset_files(project_root)

organizing dataset files
no archive folder found
moved 0 images to /Users/larasabha/Desktop/Candy-object-detection/Images
moved 11 CSV files to /Users/larasabha/Desktop/Candy-object-detection/Annotation_CSV_Files
final counts: 528 images, 11 CSV files


True

In [54]:
def convert_csv_to_yolo(csv_folder=project_root / "Annotation_CSV_Files", 
                       image_folder=project_root / "Images", 
                       output_labels=project_root / "labels"):
    
    print("converting CSV annotations to YOLO format")
    import random

    # make sure output directory exists
    output_labels.mkdir(parents=True, exist_ok=True)
    
    if not csv_folder.exists():
        print(f"CSV folder not found: {csv_folder}")
        return False
    
    csv_files = list(csv_folder.glob("*.csv"))
    if not csv_files:
        print("no CSV files found")
        return False
    
    print(f"found {len(csv_files)} CSV files")
    
    df_list = []
    for csv_file in csv_files:
        try:
            df = pd.read_csv(csv_file)
            # check if the dataframe is empty or has no columns
            if df.empty or df.columns.size == 0:
                print(f"skipping empty file: {csv_file.name}")
                continue
            df_list.append(df)
            print(f"loaded {csv_file.name}")
        except pd.errors.EmptyDataError:
            print(f"skipping empty file: {csv_file.name}")
            continue
        except Exception as e:
            print(f"error loading {csv_file.name}: {e}")
    
    if not df_list:
        print("no valid CSV files loaded")
        return False
        
    # combine all annotation dataframes into one    
    df = pd.concat(df_list, ignore_index=True)
    print(f"total annotations: {len(df)}")
    
    try:
        # extract class names from JSON in region_attributes column
        df['class'] = df['region_attributes'].apply(lambda x: json.loads(x)['candy_type'])
        class_names = sorted(df['class'].unique())
        if "Unknown" not in class_names:
            class_names.append("Unknown")
            print("added 'Unknown' class manually")

        class_to_id = {name: i for i, name in enumerate(class_names)}
        
        print(f"found {len(class_names)} classes: {class_names}")
        
        with open(project_root / "classes.txt", "w") as f:
            for name in class_names:
                f.write(name + "\n")
        
        print("created classes.txt")
        
    except Exception as e:
        print(f"error extracting classes: {e}")
        return False

    all_images = [f for f in image_folder.glob("*") if f.suffix.lower() in [".jpg", ".jpeg", ".png", ".JPG", ".JPEG", ".PNG"]]
    random.shuffle(all_images)
    unknown_images = set(f.name for f in all_images[:5])
    
    converted_count = 0
    for filename in df['filename'].unique():
        img_path = image_folder / filename
        label_path = output_labels / (img_path.stem + ".txt")
        
        if not img_path.exists():
            print(f"image not found: {img_path}")
            continue
        
        try:
            img = Image.open(img_path)
            img_w, img_h = img.size  # get image width and height
        except Exception as e:
            print(f"failed to open image {filename}: {e}")
            continue

        # save class names to classes.txt for YOLO training
        with open(label_path, "w") as f:
            rows = df[df['filename'] == filename]
            for _, row in rows.iterrows():
                try:
                    # parse bounding box and label info from JSON
                    shape = json.loads(row['region_shape_attributes'])
                    label = json.loads(row['region_attributes'])['candy_type']
                    class_id = class_to_id[label]

                    # get bounding box coordinates and size
                    x, y, w, h = shape['x'], shape['y'], shape['width'], shape['height']
                    # convert to YOLO format (normalized center x, center y, width, height)
                    # YOLO format requires relative coordinates [0,1] normalized by image size
                    x_center = (x + w / 2) / img_w
                    y_center = (y + h / 2) / img_h
                    w_norm = w / img_w
                    h_norm = h / img_h
                    
                    f.write(f"{class_id} {x_center:.6f} {y_center:.6f} {w_norm:.6f} {h_norm:.6f}\n")
                except Exception as e:
                    print(f"skipping annotation in {filename}: {e}")
                    continue
        
        converted_count += 1

        if filename in unknown_images:
            unknown_id = class_to_id["Unknown"]
            # add a fixed dummy box (centered 20% box)
            x_center, y_center, w_norm, h_norm = 0.5, 0.5, 0.2, 0.2
            with open(label_path, "a") as f:
                f.write(f"{unknown_id} {x_center:.6f} {y_center:.6f} {w_norm:.6f} {h_norm:.6f}\n")
            print(f"'Unknown' label added into {filename}")
    
    print(f"converted {converted_count} image-label pairs")
    return True

# run
convert_csv_to_yolo()

converting CSV annotations to YOLO format
found 11 CSV files
skipping empty file: Candy_Project_1930-1981_csv.csv
skipping empty file: Candy_Project_2039-2092_csv.csv
skipping empty file: Candy_Project_2359-2415_csv.csv
skipping empty file: Candy_Project_1982-2038_csv.csv
skipping empty file: Candy_Project_2147-2197_csv.csv
skipping empty file: Candy_Project_2468-2496_csv.csv
skipping empty file: Candy_Project_2093-2146_csv.csv
skipping empty file: Candy_Project_2304-2358_csv.csv
skipping empty file: Candy_Project_2253-2303_csv.csv
skipping empty file: Candy_Project_2416-2467_csv.csv
skipping empty file: Candy_Project_2198-2252_csv.csv
no valid CSV files loaded


False

In [48]:
def create_train_val_split(train_ratio=0.9, root_dir=project_root):
    """train/validation split for YOLO training"""
    
    print("creating train/validation split")
    
    # paths
    images_dir = root_dir / "Images"
    labels_dir = root_dir / "labels"
    train_img_dir = root_dir / "dataset/images/train"
    val_img_dir = root_dir / "dataset/images/val"
    train_label_dir = root_dir / "dataset/labels/train"
    val_label_dir = root_dir / "dataset/labels/val"
    
    # check if source directories exist
    if not images_dir.exists() or not labels_dir.exists():
        print("images or labels directory not found")
        return False

    # collect image files
    image_files = [f.name for f in images_dir.glob("*") if f.suffix.lower() in [".jpg", ".jpeg", ".JPG", ".JPEG", ".png", ".PNG"]]

    # collect label files
    label_files = [f.name for f in labels_dir.glob("*.txt")]

    print(f"found {len(image_files)} images and {len(label_files)} labels")

    # match image label pairs by filename
    matched_pairs = []
    for img_file in image_files:
        base_name = Path(img_file).stem
        label_file = base_name + ".txt"
        if label_file in label_files:
            matched_pairs.append((img_file, label_file))

    print(f"found {len(matched_pairs)} matching image-label pairs")

    if len(matched_pairs) == 0:
        print("no matching pairs found")
        return False

    # shuffle and split
    random.seed(42)
    random.shuffle(matched_pairs)

    split_idx = int(len(matched_pairs) * train_ratio)
    train_pairs = matched_pairs[:split_idx]
    val_pairs = matched_pairs[split_idx:]

    print(f"split: {len(train_pairs)} training, {len(val_pairs)} validation")

    # copy training files
    train_success = 0
    val_success = 0

    for img_file, label_file in train_pairs:
        try:
            shutil.copy2(images_dir / img_file, train_img_dir / img_file)
            shutil.copy2(labels_dir / label_file, train_label_dir / label_file)
            train_success += 1
        except Exception as e:
            print(f"failed to copy training pair {img_file}: {e}")

    # copy validation files
    for img_file, label_file in val_pairs:
        try:
            shutil.copy2(images_dir / img_file, val_img_dir / img_file)
            shutil.copy2(labels_dir / label_file, val_label_dir / label_file)
            val_success += 1
        except Exception as e:
            print(f"failed to copy validation pair {img_file}: {e}")

    print(f"successfully created {train_success} training and {val_success} validation pairs")
    return True

# run the split
create_train_val_split()

creating train/validation split
found 528 images and 528 labels
found 528 matching image-label pairs
split: 475 training, 53 validation
successfully created 475 training and 53 validation pairs


True

In [50]:
def create_data_yaml(root_dir):
    """create data.yaml file"""
    
    print("creating data.yaml")
    
    # path to the file that contains class names
    classes_file = root_dir / "classes.txt"
    
    # try reading class names from classes.txt
    try:
        with open(classes_file, "r") as f:
            class_names = [line.strip() for line in f.readlines() if line.strip()]
    except FileNotFoundError:
        print(f"{classes_file} not found!")
        return False

    # prepare dictionary with dataset info
    data_content = {
        'path': str(root_dir / 'dataset'),       # path for YOLO training
        'train': 'images/train',
        'val': 'images/val',
        'nc': len(class_names),                  # # of classes
        'names': class_names                     # list of class names
    }

    # write the YAML
    output_yaml = root_dir / 'data.yaml'
    with open(output_yaml, 'w') as f:
        yaml.dump(data_content, f, default_flow_style=False, sort_keys=False)

    print(f"created data.yaml with {len(class_names)} classes")
    print(f"classes: {class_names}")
    return True

# run the function
create_data_yaml(project_root)

creating data.yaml
created data.yaml with 9 classes
classes: ['100_Grand', '3_Musketeers', 'Baby_Ruth', 'Butterfingers', 'Crunch', 'Midnight_Milky_Way', 'Milky_Way', 'Snickers', 'Twix']


True

In [None]:
def train_model(epochs=100, img_size=640, batch_size=16, root_dir=project_root):
    """train YOLOv8 model"""
    
    print("starting model training")
    
    # path to data.yaml
    data_yaml = root_dir / "data.yaml"
    
    # check if data.yaml exists
    if not data_yaml.exists():
        print(f"{data_yaml} not found")
        return False

    try:
        # load base model
        model = YOLO('yolov8n.pt')  # will auto download

        # train the model with provided parameters
        results = model.train(
            data=str(data_yaml),        # path to data.yaml
            epochs=epochs,              # # of training epochs
            imgsz=img_size,             # image input size
            batch=batch_size,           # batch size
            name='candy_detection',     # training run name
            patience=20,                # early stopping patience
            save=True,                  # save final weights
            plots=True                  # save training plots
        )

        print("training completed")
        print(f"results saved in: runs/detect/candy_detection/")

        return True

    except Exception as e:
        print(f"training failed: {e}")
        return False

train_model(epochs=20)  # reduced epochs for faster training -> change to 100 for complete training

In [18]:
# check for existing trained model
def find_best_model():
    """find the best trained model"""
    
   # list of common paths
    possible_paths = [
        "runs/detect/candy_detection/weights/best.pt",
        "runs/detect/train/weights/best.pt",
        "runs/detect/train2/weights/best.pt",
        "best.pt",
        "my_model.pt"
    ]
    
    # search for the first valid path
    for path in possible_paths:
        if os.path.exists(path):
            print(f"Found model: {path}")
            return path  # return the first match

    print("no trained model found")
    return None

# path for best model
model_path = find_best_model()

found model: runs/detect/train2/weights/best.pt


In [44]:
def test_single_image(image_path, model_path=None, conf_threshold=0.5):
    """test detection on a single image"""
    
    print(f"testing detection on: {image_path}")
    
    # find model if not provided
    if model_path is None:
        model_path = find_best_model()
        if model_path is None:
            return False
    
    # check image exists
    if not os.path.exists(image_path):
        print(f"image not found: {image_path}")
        return False
    
    # load model
    try:
        model = YOLO(model_path)
        print(f"model loaded: {model_path}")
    except Exception as e:
        print(f"error loading model: {e}")
        return False
    
    # load image
    try:
        image = cv2.imread(image_path)
        if image is None:
            print("could not load image")
            return False
        
        height, width = image.shape[:2]
        print(f"image size: {width}x{height}")
        
    except Exception as e:
        print(f"error loading image: {e}")
        return False
    
    # run detection
    try:
        results = model(image, conf=conf_threshold)
        
        detections = []
        result_image = image.copy()
        
        for result in results:
            if hasattr(result, 'boxes') and result.boxes is not None:
                for box in result.boxes:
                    if box.xyxy is None or box.conf is None or box.cls is None:
                        continue
                    
                    # get detection
                    x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
                    conf = float(box.conf[0].cpu().numpy())
                    class_id = int(box.cls[0].cpu().numpy())
                    class_name = model.names.get(class_id, f"Class_{class_id}")
                    
                    detections.append({
                        'class': class_name,
                        'confidence': conf,
                        'box': [x1, y1, x2, y2]
                    })
                    
                    # draw on image
                    color = (0, 255, 0)  # Green
                    cv2.rectangle(result_image, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(result_image, f"{class_name} {conf:.2f}", 
                              (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 
                              0.6, color, 2)
        
        print(f"found {len(detections)} detections:")
        for i, det in enumerate(detections):
            print(f"  {i+1}. {det['class']} ({det['confidence']:.2f})")

        os.makedirs("test_results", exist_ok=True)

        # save result
        output_path = Path("test_results") / "detection_result.jpg"
        cv2.imwrite(str(output_path), result_image)
        print(f"result saved to: {output_path.resolve()}")
        
        return True, detections, result_image
        
    except Exception as e:
        print(f"error during detection: {e}")
        return False, [], None

# test with desktop image (change path as needed)
desktop_image = str(Path.home() / "Desktop" / "image.jpg")
if os.path.exists(desktop_image):
    test_single_image(desktop_image, model_path)
else:
    print(f"no test image found at {desktop_image}")
    print("place an image named 'image.jpg' on your Desktop to test")

testing detection on: /Users/larasabha/Desktop/image.jpg
model loaded: runs/detect/train2/weights/best.pt
image size: 194x259

0: 640x480 2 Twixs, 50.1ms
Speed: 1.5ms preprocess, 50.1ms inference, 0.7ms postprocess per image at shape (1, 3, 640, 480)
found 2 detections:
  1. Twix (0.93)
  2. Twix (0.87)
result saved to: /Users/larasabha/Desktop/Candy-object-detection/test_results/detection_result.jpg


In [20]:
def test_multiple_images(image_folder="Images", model_path=None, conf_threshold=0.5):
    """test detection on multiple images"""
    
    image_folder = Path(image_folder)
    
    print(f"testing detection on images in: {image_folder}")
    
    if model_path is None:
        model_path = find_best_model()
        if model_path is None:
            return False
    
    # load model
    try:
        model = YOLO(model_path)
        print(f"model loaded: {model_path}")
    except Exception as e:
        print(f"error loading model: {e}")
        return False
    
    if not image_folder.exists() or not image_folder.is_dir():
        print(f"image folder does not exist or is not a directory: {image_folder}")
        return False
    
    # get all image files (case insensitive)
    image_files = []
    for ext in ['.jpg', '.jpeg']:
        image_files.extend(image_folder.glob(f"*{ext}"))
        image_files.extend(image_folder.glob(f"*{ext.upper()}"))
    
    if not image_files:
        print(f"no images found in {image_folder}")
        return False
    
    print(f"found {len(image_files)} images")
    
    total_detections = 0
    processed_count = 0
    
    for img_path in image_files[:10]:  # test first 10 images
        try:
            image = cv2.imread(str(img_path))
            if image is None:
                print(f"could not load image {img_path}")
                continue
            
            results = model(image, conf=conf_threshold)
            
            detections = 0
            for result in results:
                if hasattr(result, 'boxes') and result.boxes is not None:
                    detections += len(result.boxes)
            
            total_detections += detections
            processed_count += 1
            
            if detections > 0:
                print(f"{img_path.name}: {detections} detections")
        
        except Exception as e:
            print(f"error processing {img_path.name}: {e}")
    
    print(f"summary: {total_detections} total detections in {processed_count} images")
    return True

# test on multiple images
if Path("Images").exists():
    test_multiple_images("Images", model_path)

testing detection on images in: Images
model loaded: runs/detect/train2/weights/best.pt
found 528 images

0: 480x640 1 Butterfingers, 1 Midnight_Milky_Way, 1 Snickers, 1 Twix, 55.6ms
Speed: 5.6ms preprocess, 55.6ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)
IMG_2165.JPG: 4 detections

0: 480x640 1 3_Musketeers, 1 Butterfingers, 1 Midnight_Milky_Way, 1 Snickers, 46.5ms
Speed: 2.8ms preprocess, 46.5ms inference, 0.7ms postprocess per image at shape (1, 3, 480, 640)
IMG_2171.JPG: 4 detections

0: 480x640 1 Butterfingers, 1 Milky_Way, 1 Snickers, 1 Twix, 50.2ms
Speed: 2.7ms preprocess, 50.2ms inference, 0.6ms postprocess per image at shape (1, 3, 480, 640)
IMG_2159.JPG: 4 detections

0: 480x640 1 Crunch, 1 Milky_Way, 1 Snickers, 1 Twix, 45.6ms
Speed: 2.2ms preprocess, 45.6ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)
IMG_2398.JPG: 4 detections

0: 480x640 1 Crunch, 1 Midnight_Milky_Way, 1 Snickers, 1 Twix, 50.7ms
Speed: 2.4ms preprocess, 50.7ms i