In [9]:
import os, glob, random, csv, shutil, time
from pathlib import Path
import cv2, torch, torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torch import nn
from PIL import Image
from tqdm.auto import tqdm
from ultralytics import YOLO

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print('✅ Torch', torch.__version__, '| device:', DEVICE)

✅ Torch 2.6.0 | device: cpu


In [12]:

ROOT         = Path().resolve()             # notebook folder
RAW_DIR      = ROOT/'filtered_cattle_dataset'               # ≡ cattle_1/ … cattle_20/
DET_WEIGHTS  = '/Users/saidheeraj/Desktop/Yolo_cutom/muzzle_detector_v3.pt'    # muzzle detector you already trained
CROP_DIR     = ROOT/'crops'                 # will be created
REC_WT       = ROOT/'weights'/'rec_model.pth'

DET_CONF   = 0.35
BATCH      = 32
EPOCHS     = 20
LR         = 1e-4

CROP_DIR.mkdir(exist_ok=True, parents=True)
print('RAW images  :', RAW_DIR)
print('Crop output :', CROP_DIR)

RAW images  : /Users/saidheeraj/Desktop/Bot/filtered_cattle_dataset
Crop output : /Users/saidheeraj/Desktop/Bot/crops


In [13]:
print('Loading YOLOv8 detector ...')
detector = YOLO(str(DET_WEIGHTS))  # ultralytics YOLO loader

# loop over every cattle folder
for cow_dir in tqdm(sorted(RAW_DIR.iterdir()), desc='Folders'):
    if not cow_dir.is_dir():
        continue
    cow_id = cow_dir.name
    out_dir = CROP_DIR / cow_id
    out_dir.mkdir(exist_ok=True, parents=True)

    for img_path in cow_dir.glob('*'):
        img = cv2.imread(str(img_path))
        if img is None:
            print('❌ cannot read', img_path)
            continue

        # BGR ➔ RGB
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Run detection
        results = detector.predict(img_rgb, device=DEVICE, conf=DET_CONF, verbose=False)

        # results[0].boxes.xyxy → bounding boxes
        boxes = results[0].boxes.xyxy.cpu().numpy()  # (x1, y1, x2, y2)

        for i, (x1, y1, x2, y2) in enumerate(boxes):
            x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
            crop = img[y1:y2, x1:x2]
            if crop.size == 0 or min(crop.shape[:2]) < 20:
                continue
            cv2.imwrite(str(out_dir / f'{img_path.stem}_{i}.jpg'), crop)

print('✅ Cropping done – crops stored in', CROP_DIR)

Loading YOLOv8 detector ...


Folders:   0%|          | 0/22 [00:00<?, ?it/s]

✅ Cropping done – crops stored in /Users/saidheeraj/Desktop/Bot/crops


In [36]:
from torchvision.models import resnet50, ResNet50_Weights

# Define transforms
train_tf = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])
val_tf = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Dataset class
class CropDS(torch.utils.data.Dataset):
    def __init__(self, root, tfm):
        self.samples = []
        for cls_name in sorted(os.listdir(root)):
            for p in glob.glob(f'{root}/{cls_name}/*.jpg'):
                self.samples.append((p, cls_name))
        random.shuffle(self.samples)
        self.tfm = tfm
        self.labels = sorted({l for _, l in self.samples})
        self.l2i = {l:i for i,l in enumerate(self.labels)}
        
    def __len__(self): 
        return len(self.samples)

    def __getitem__(self, idx):
        p, lbl = self.samples[idx]
        img = Image.open(p).convert('RGB')
        return self.tfm(img), self.l2i[lbl]

# Load dataset
full_ds = CropDS(CROP_DIR, train_tf)
val_len = int(0.1 * len(full_ds))
train_ds, val_ds = torch.utils.data.random_split(full_ds, [len(full_ds) - val_len, val_len])

train_dl = DataLoader(train_ds, batch_size=BATCH, shuffle=True, num_workers=0)
val_dl = DataLoader(val_ds, batch_size=BATCH, shuffle=False, num_workers=0)

# Define model
model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
model.fc = nn.Linear(model.fc.in_features, len(full_ds.labels))
model = model.to(DEVICE)

crit = nn.CrossEntropyLoss()
opt  = torch.optim.Adam(model.parameters(), lr=LR)

# Train
best = 0
for ep in range(EPOCHS):
    model.train()
    for x, y in train_dl:
        x, y = x.to(DEVICE), y.to(DEVICE)
        opt.zero_grad()
        loss = crit(model(x), y)
        loss.backward()
        opt.step()

    # Validate
    model.eval()
    corr = tot = 0
    with torch.no_grad():
        for x, y in val_dl:
            x, y = x.to(DEVICE), y.to(DEVICE)
            pred = model(x).argmax(1)
            corr += (pred == y).sum().item()
            tot += y.size(0)
    acc = corr / tot
    print(f'Epoch {ep+1:02d}/{EPOCHS}  val_acc={acc:.3f}')

    if acc > best:
        best = acc
        REC_WT.parent.mkdir(exist_ok=True, parents=True)
        torch.save({'model': model.state_dict(), 'classes': full_ds.labels}, REC_WT)

print('🏁 Training complete • Best validation accuracy:', best)


Epoch 01/20  val_acc=0.100
Epoch 02/20  val_acc=0.100
Epoch 03/20  val_acc=0.500
Epoch 04/20  val_acc=0.600
Epoch 05/20  val_acc=0.600
Epoch 06/20  val_acc=0.900
Epoch 07/20  val_acc=1.000
Epoch 08/20  val_acc=0.900
Epoch 09/20  val_acc=1.000
Epoch 10/20  val_acc=1.000
Epoch 11/20  val_acc=1.000
Epoch 12/20  val_acc=1.000
Epoch 13/20  val_acc=1.000
Epoch 14/20  val_acc=1.000
Epoch 15/20  val_acc=1.000
Epoch 16/20  val_acc=1.000
Epoch 17/20  val_acc=1.000
Epoch 18/20  val_acc=1.000
Epoch 19/20  val_acc=1.000
Epoch 20/20  val_acc=1.000
🏁 Training complete • Best validation accuracy: 1.0


In [52]:
# Load recognition model
ckpt = torch.load(REC_WT, map_location=DEVICE)
num_cls = len(ckpt['classes'])

reid = torchvision.models.resnet50(weights=None)
reid.fc = nn.Linear(reid.fc.in_features, num_cls)
reid.load_state_dict(ckpt['model'])
reid = reid.eval().to(DEVICE)

class_names = ckpt['classes']

# Transform for recognition
reid_tf = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Recognition function
def recognise(frame_bgr):
    """Return list of (x1, y1, x2, y2, class_name, confidence)"""
    results = detector.predict(frame_bgr[..., ::-1], verbose=False)  # predict expects RGB
    out = []
    boxes = results[0].boxes.xyxy.cpu().numpy()   # <--- Correct way
    scores = results[0].boxes.conf.cpu().numpy()  # detection confidences

    for box, score in zip(boxes, scores):
        x1, y1, x2, y2 = map(int, box)
        crop = frame_bgr[y1:y2, x1:x2]
        if crop.size == 0:
            continue
        with torch.no_grad():
            t = reid_tf(Image.fromarray(crop[:, :, ::-1])).unsqueeze(0).to(DEVICE)
            p = reid(t).softmax(1)
            idx = int(p.argmax())
            out.append((x1, y1, x2, y2, class_names[idx], float(p[0, idx])))
    return out


# Run a prediction
TEST_IMG = random.choice(glob.glob(str(RAW_DIR/'cattle_*/*.jpg')))
print('Demo image:', TEST_IMG)

img = cv2.imread(TEST_IMG)
for x1, y1, x2, y2, cname, conf in recognise(img):
    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
    cv2.putText(img, f'{cname}:{conf:.2f}', (x1, y1 - 4),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

out_path = ROOT/'prediction.jpg'
cv2.imwrite(str(out_path), img)
print('🖼️ Annotated image saved at →', out_path)


Demo image: /Users/saidheeraj/Desktop/Bot/filtered_cattle_dataset/cattle_9/IMG_20241108_094253.jpg
🖼️ Annotated image saved at → /Users/saidheeraj/Desktop/Bot/prediction.jpg


In [18]:
import os, glob
from pathlib import Path
import torch, torchvision
from torch import nn
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
from tqdm.auto import tqdm

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
WEIGHTS = Path('feature_extractor.pth')   # path to the file you saved
print('loading weights from', WEIGHTS)

# ---------- 1. Define the *same* architecture -----------------
class FeatureExtractor(nn.Module):
    def __init__(self, backbone_name='resnet50'):
        super().__init__()
        backbone = torchvision.models.__dict__[backbone_name](weights=None)
        self.features = nn.Sequential(*list(backbone.children())[:-1])  # drop FC layer

    @torch.inference_mode()
    def forward(self, x):
        x = self.features(x)            # (B, 2048, 1, 1)
        x = x.flatten(1)                # (B, 2048)
        return x

# instantiate & load weights
feat_extractor = FeatureExtractor()
feat_extractor.load_state_dict(torch.load(WEIGHTS, map_location=DEVICE))
feat_extractor.eval().to(DEVICE)
print('✅ feature extractor ready')

# ---------- 2. Image transform (must be the *validation* tfm) --
val_tf = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# ---------- 3. Helper to get one embedding --------------------
@torch.inference_mode()
def get_embedding(img_path, tfm=val_tf):
    img = Image.open(img_path).convert('RGB')
    x   = tfm(img).unsqueeze(0).to(DEVICE)          # (1,3,224,224)
    emb = feat_extractor(x)                         # (1, 2048)
    emb = F.normalize(emb, p=2, dim=1)              # L2-norm (optional but recommended)
    return emb.squeeze(0).cpu()                     # → torch tensor of shape (2048,)

# ---------- 4. Similarity functions ---------------------------
def cosine_similarity(e1, e2):
    """Return cos-sim in [-1,1]; higher = more similar"""
    return float(torch.dot(e1, e2))

def euclidean_dist(e1, e2):
    return float(torch.norm(e1 - e2))

# ---------- 5. Example: compare two images --------------------
img1 = '/Users/saidheeraj/Desktop/Bot/filtered_cattle_dataset/cattle_3/IMG_20241108_092618.jpg'
img2 = '/Users/saidheeraj/Desktop/Bot/filtered_cattle_dataset/cattle_3/IMG_20241108_092620.jpg'

emb1 = get_embedding(img1)
emb2 = get_embedding(img2)

print('cosine similarity:', cosine_similarity(emb1, emb2))
print('euclidean distance:', euclidean_dist(emb1, emb2))

# ---------- 6. Example: find top-k most similar in a folder ---
def topk_similar(query_img, gallery_dir, k=5):
    q_emb = get_embedding(query_img)
    scores = []
    for p in Path(gallery_dir).glob('*.jpg'):
        g_emb  = get_embedding(p)
        score  = cosine_similarity(q_emb, g_emb)   # use euclidean_dist if you prefer
        scores.append((score, str(p)))

    # larger cosine similarity = more alike
    scores.sort(reverse=True)
    return scores[:k]


loading weights from feature_extractor.pth
✅ feature extractor ready
cosine similarity: 0.8959433436393738
euclidean distance: 0.4561946988105774
