In [None]:
!pip install ultralytics

In [None]:
!pip install numpy scipy scikit-learn matplotlib pandas tqdm opencv-python

In [None]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

In [None]:
!pip install torch-geometric

In [18]:
import torch
import torch.nn.functional as F
import numpy as np
import os
import shutil
from sklearn.metrics import pairwise_distances
from ultralytics import YOLO
from torch_geometric.nn import GATConv, GCNConv
from torch_geometric.data import Data
from torchvision import transforms
from sklearn.neighbors import kneighbors_graph
from torch_geometric.utils import add_self_loops, remove_self_loops
from PIL import Image
import torch.nn as nn

In [None]:
# Baseline detector: start from the "yolo11s.pt" weights and train on the
# dataset described by yaml-files/dataset.yaml.
model = YOLO("yolo11s.pt")

model.train(
    # --- data / run setup ---
    data="yaml-files/dataset.yaml",
    imgsz=512,
    batch=16,
    epochs=500,
    device="cuda",
    verbose=True,
    # --- optimizer and learning-rate schedule ---
    optimizer="RAdam",
    lr0=0.0002,
    lrf=0.002,
    momentum=0.98,
    weight_decay=0.0001,
    warmup_epochs=30,
    cos_lr=True,
    # --- remaining options, passed through unchanged ---
    iou=0.5,
    dfl=2.0,
    mosaic=0.0,
    augment=True,
)

In [36]:
def extract_features(image_paths,device="cuda"):
    """Embed images with the first layers of the notebook-level YOLO ``model``.

    Every image is resized to 512x512, normalised with the mean/std constants
    below and passed through layers ``[0:5]`` and then ``[5:10]`` of
    ``model.model.model`` (the second stage consumes the output of the first,
    which is equivalent to running ``[0:10]`` on the image but avoids
    recomputing the first five layers).  Each of the two feature maps is
    reduced to a vector by concatenating its spatial mean and spatial max,
    and the two vectors are concatenated.

    Args:
        image_paths: iterable of image file paths that PIL can open.
        device: torch device used for the layers and the input tensors.

    Returns:
        2-D numpy array with one row per image.  Columns are standardised
        (zero mean, unit std up to a 1e-6 guard) using the statistics of the
        images passed to *this call* only.

    NOTE(review): because standardisation is per call, feature sets produced
    by separate calls (e.g. labeled vs. unlabeled) are scaled with different
    statistics before they are stacked -- confirm this is intended.
    """
    preprocess = transforms.Compose([
        transforms.Resize((512, 512)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # Inputs are sent to `device`, so the layers must live there too; eval()
    # makes normalisation layers use their stored statistics instead of
    # updating them from single-image batches.
    layers = model.model.model.to(device).eval()
    early_layers, later_layers = layers[0:5], layers[5:10]

    def _mean_and_max(feature_map):
        # (1, C, H, W) -> (1, 2C): spatial mean followed by spatial max.
        return torch.cat([
            torch.mean(feature_map, dim=[2, 3]),
            torch.max(feature_map, dim=3)[0].max(dim=2)[0]
        ], dim=1)

    features = []
    for img_path in image_paths:
        # Context manager closes the underlying file handle right away.
        with Image.open(img_path) as raw_image:
            img = preprocess(raw_image.convert("RGB")).unsqueeze(0).to(device)

        with torch.no_grad():
            feat1 = early_layers(img)
            feat2 = later_layers(feat1)
            combined_feat = torch.cat([_mean_and_max(feat1), _mean_and_max(feat2)], dim=1)

        features.append(combined_feat.view(-1).cpu().numpy())

    features = np.array(features)
    features = (features - features.mean(axis=0)) / (features.std(axis=0) + 1e-6)
    return features

def _list_files(directory):
    """Return the sorted paths of the regular files directly inside ``directory``.

    Sub-directories are skipped: "dataset/unlabeled" later receives a
    "labels" sub-directory (written by the pseudo-labelling step), which must
    not be treated as an image when this cell is re-run.  Sorting makes the
    node order reproducible across runs.
    """
    candidates = (os.path.join(directory, name) for name in sorted(os.listdir(directory)))
    return [path for path in candidates if os.path.isfile(path)]


labeled_image_paths = _list_files("dataset/labeled/images")
unlabeled_image_paths = _list_files("dataset/unlabeled")

# One feature row per image, in the same order as the path lists above.
labeled_features = extract_features(labeled_image_paths)
unlabeled_features = extract_features(unlabeled_image_paths)

In [37]:
def construct_knn_graph(features, k=15):
    """Build a directed k-nearest-neighbour graph with Gaussian edge weights.

    Each node is connected to its ``k`` nearest neighbours (self excluded).
    An edge between i and j gets weight ``exp(-d_ij**2 / (2 * sigma**2))``
    where ``sigma`` is the mean distance from every node to its ``k`` nearest
    neighbours.  Self-loops are appended at the end via ``add_self_loops``.

    Args:
        features: 2-D array-like, one row per node.
        k: requested number of neighbours; clamped to ``n_nodes - 1`` so that
            small inputs do not make ``kneighbors_graph`` raise.

    Returns:
        ``(edge_index, edge_weights)``: a ``(2, E)`` long tensor and a
        ``(E,)`` float32 tensor, both including the self-loop entries.
    """
    n_nodes = len(features)
    k = min(k, n_nodes - 1)
    if k < 1:
        # Zero or one node: there are no neighbours, only self-loops.
        edge_index = torch.empty((2, 0), dtype=torch.long)
        edge_weights = torch.empty((0,), dtype=torch.float32)
        edge_index, edge_weights = add_self_loops(edge_index, edge_weights, num_nodes=n_nodes)
        return edge_index, edge_weights

    distances = pairwise_distances(features)
    # Column 0 of each sorted row is the node's distance to itself, so the
    # k nearest neighbours are columns 1..k.
    sigma = float(np.mean(np.sort(distances, axis=1)[:, 1:k + 1]))
    # All-identical neighbours would give sigma == 0 and NaN (0/0) weights.
    sigma = max(sigma, 1e-12)
    weights = np.exp(-distances**2 / (2 * sigma**2))

    adjacency_matrix = kneighbors_graph(features, k, mode="connectivity", include_self=False)
    rows, cols = adjacency_matrix.nonzero()
    edge_index = torch.tensor(np.array([rows, cols]), dtype=torch.long)
    edge_weights = torch.tensor(weights[rows, cols], dtype=torch.float32)

    edge_index, edge_weights = add_self_loops(edge_index, edge_weights, num_nodes=n_nodes)
    return edge_index, edge_weights

# One graph over all images: labeled rows first, unlabeled rows after them.
num_nodes = len(labeled_features) + len(unlabeled_features)
edge_index, edge_weights = construct_knn_graph(
    np.vstack((labeled_features, unlabeled_features))
)


In [38]:
def load_labels(label_paths):
    """Derive one image-level class id from each label file.

    The first whitespace-separated field of every non-blank line is parsed as
    an integer class id.  The image-level label is the largest class id found
    in the file, or 0 when the file contains no entries.

    NOTE(review): "largest id present" is what the original code computed
    (``max(set(ids))``); if the most frequent class was intended, this needs
    ``key=class_ids.count`` -- confirm with the author.

    Args:
        label_paths: iterable of paths to text label files.

    Returns:
        1-D numpy array with one label per path, in input order.

    Raises:
        FileNotFoundError: if a label file does not exist.
        ValueError: if a first field is not an integer.
    """
    labels = []
    for label_path in label_paths:
        with open(label_path, "r") as f:
            # Blank / whitespace-only lines (e.g. a trailing newline at the end
            # of the file) split into an empty list and are skipped.
            class_ids = [int(fields[0]) for fields in map(str.split, f) if fields]
        labels.append(max(class_ids) if class_ids else 0)
    return np.array(labels)

# Each labeled image "<stem>.<ext>" is paired with
# dataset/labeled/labels/<stem>.txt.
labeled_label_paths = []
for labeled_path in labeled_image_paths:
    stem = os.path.basename(labeled_path).rsplit('.', 1)[0]
    labeled_label_paths.append(f"dataset/labeled/labels/{stem}.txt")
actual_labels = load_labels(labeled_label_paths)

# Node targets: the loaded labels for labeled nodes, -1 for unlabeled nodes.
known_targets = torch.tensor(actual_labels, dtype=torch.long)
unknown_targets = torch.full((len(unlabeled_features),), -1)

graph_data = Data(
    x=torch.tensor(np.vstack([labeled_features, unlabeled_features]), dtype=torch.float32),
    edge_index=edge_index,
    y=torch.cat([known_targets, unknown_targets]),
)

In [None]:
# Count how many nodes carry each target value (-1 marks unlabeled nodes).
unique_labels, counts = np.unique(graph_data.y.numpy(), return_counts=True)
label_histogram = {value: count for value, count in zip(unique_labels, counts)}
print("✅ Fixed Graph Data Labels:", label_histogram)

In [40]:
class GNN(nn.Module):
    """Node classifier: two 8-head GAT layers, one GCN layer, linear head.

    Every graph layer is followed by batch normalisation and ELU, and is
    preceded by dropout with p=0.5.  ``forward`` returns per-node
    log-probabilities (log-softmax over the class dimension).
    """

    def __init__(self, in_dim, hidden_dim=128, num_classes=2):
        super().__init__()
        attention_heads = 8
        wide_dim = hidden_dim * attention_heads  # GAT outputs concatenate the heads

        # Sub-modules are created in the same order and under the same
        # attribute names as before, so saved state_dicts stay loadable.
        self.conv1 = GATConv(in_dim, hidden_dim, heads=attention_heads, dropout=0.5)
        self.conv2 = GATConv(wide_dim, hidden_dim, heads=attention_heads, dropout=0.5)
        self.conv3 = GCNConv(wide_dim, hidden_dim)
        self.lin = nn.Linear(hidden_dim, num_classes)
        self.batch_norm1 = nn.BatchNorm1d(wide_dim)
        self.batch_norm2 = nn.BatchNorm1d(wide_dim)
        self.batch_norm3 = nn.BatchNorm1d(hidden_dim)

    def forward(self, x, edge_index):
        """Return log-softmax class scores for every node in the graph."""
        stages = (
            (self.conv1, self.batch_norm1),
            (self.conv2, self.batch_norm2),
            (self.conv3, self.batch_norm3),
        )
        hidden = x
        for graph_layer, norm_layer in stages:
            hidden = F.dropout(hidden, p=0.5, training=self.training)
            hidden = F.elu(norm_layer(graph_layer(hidden, edge_index)))

        return F.log_softmax(self.lin(hidden), dim=1)
# Input width of the GNN is the node-feature dimension of the graph.
gnn_model = GNN(in_dim=graph_data.x.shape[1])

optimizer = torch.optim.AdamW(
    gnn_model.parameters(),
    lr=0.0005,
    weight_decay=0.01,
    amsgrad=True,
)

# Halve the learning rate after 10 scheduler steps without a lower loss.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=10,
)


In [42]:
def calculate_loss(out, graph_data, pseudo_labels, pseudo_mask, training_step=None):
    """Combine supervised, pseudo-label and graph-smoothness losses.

    total = supervised + w * pseudo + 0.1 * smoothness, where
    ``w = min(0.3, training_step / 100)`` ramps the pseudo-label term up over
    the first training steps.

    Args:
        out: per-node log-probabilities produced by the GNN.
        graph_data: object with ``y`` (targets, -1 for unlabeled nodes) and
            ``edge_index``.
        pseudo_labels: per-node predicted class ids.
        pseudo_mask: boolean per-node mask selecting the pseudo-labels to use.
        training_step: current step used for the ramp-up.  When omitted, the
            value is read from the notebook-level ``gnn_model.training_step``
            (0 if that attribute has not been set yet), which keeps existing
            callers working.

    Returns:
        ``(total_loss, components)`` where ``components`` is a dict of Python
        floats with keys 'supervised', 'pseudo', 'smoothness' and 'total'.
    """
    labeled_mask = graph_data.y != -1
    supervised_loss = F.nll_loss(out[labeled_mask], graph_data.y[labeled_mask])

    # Stays a plain float when no pseudo-label is selected.
    pseudo_loss = 0.0
    if pseudo_mask.any():
        pseudo_loss = F.nll_loss(out[pseudo_mask], pseudo_labels[pseudo_mask])

    # Mean squared difference between the outputs at both ends of every
    # non-self edge.
    edge_index, _ = remove_self_loops(graph_data.edge_index)
    if edge_index.shape[1] > 0:
        src, dst = edge_index
        smoothness_loss = torch.mean(torch.pow(out[src] - out[dst], 2))
    else:
        # The mean over an empty edge set would be NaN and poison the total.
        smoothness_loss = out.new_zeros(())

    if training_step is None:
        training_step = getattr(gnn_model, "training_step", 0)
    pseudo_weight = min(0.3, training_step / 100)
    total_loss = supervised_loss + pseudo_weight * pseudo_loss + 0.1 * smoothness_loss

    return total_loss, {
        'supervised': supervised_loss.item(),
        'pseudo': pseudo_loss.item() if isinstance(pseudo_loss, torch.Tensor) else pseudo_loss,
        'smoothness': smoothness_loss.item(),
        'total': total_loss.item()
    }

In [43]:
def generate_pseudo_labels(model, graph_data, confidence_threshold=0.9):
    """Predict a class for every node and select confident unlabeled nodes.

    The model is run once in eval mode without gradients; its previous
    train/eval mode is restored afterwards so the caller's state is not
    silently changed.

    Selection rule:
      * threshold = min(confidence_threshold, mean top-probability of the
        unlabeled nodes + 0.1);
      * an unlabeled node is selected when its top probability exceeds the
        threshold;
      * if the selected nodes are imbalanced (largest class count more than
        3x the smallest, or some class not selected at all), nodes outside the
        least-represented class must additionally exceed threshold + 0.1.

    NOTE(review): when the threshold is already 0.9 the stricter bound is 1.0,
    which a probability cannot exceed, so every selected node outside the
    least-represented class is dropped -- confirm this is intended.

    Args:
        model: module called as ``model(x, edge_index)`` returning per-node
            log-probabilities.
        graph_data: object with ``x``, ``edge_index`` and ``y`` (-1 marks
            unlabeled nodes).
        confidence_threshold: upper bound for the adaptive threshold.

    Returns:
        ``(pseudo_labels, confidence_mask)``: predicted class id per node and
        a boolean mask of the selected unlabeled nodes.
    """
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            out = model(graph_data.x, graph_data.edge_index)
    finally:
        model.train(was_training)

    probs = torch.exp(out)
    max_probs, pseudo_labels = probs.max(dim=1)

    unlabeled_mask = graph_data.y == -1
    mean_confidence = max_probs[unlabeled_mask].mean()
    adaptive_threshold = min(confidence_threshold, mean_confidence + 0.1)

    confidence_mask = (max_probs > adaptive_threshold) & unlabeled_mask

    if confidence_mask.any():
        # minlength makes classes with zero selected nodes show up as 0 no
        # matter which class id is missing; without it the balancing step was
        # skipped only when the missing class had the highest id.
        class_dist = torch.bincount(pseudo_labels[confidence_mask], minlength=probs.shape[1])
        if len(class_dist) > 1:
            largest, smallest = int(class_dist.max()), int(class_dist.min())
            if smallest == 0 or largest / smallest > 3:
                minor_class = class_dist.argmin()
                confidence_mask &= ((pseudo_labels == minor_class) |
                                    (max_probs > (adaptive_threshold + 0.1)))

    return pseudo_labels, confidence_mask

In [None]:
# Semi-supervised training of the GNN on the whole graph (250 full-batch steps).
# The step counter lives on the model object because calculate_loss reads
# gnn_model.training_step to ramp up the pseudo-label weight.
gnn_model.training_step = 0
best_loss = float('inf')

for epoch in range(250):
    gnn_model.train()
    optimizer.zero_grad()
    # Forward pass in train mode; this is the output that gets back-propagated.
    out = gnn_model(graph_data.x, graph_data.edge_index)
    
    # Pseudo-labels come from a second forward pass that generate_pseudo_labels
    # runs in eval mode under torch.no_grad(), using the current weights.
    pseudo_labels, confidence_mask = generate_pseudo_labels(gnn_model, graph_data)
    loss, loss_components = calculate_loss(out, graph_data, pseudo_labels, confidence_mask)
    
    loss.backward()
    # Clip the global gradient norm to 1.0 before the parameter update.
    torch.nn.utils.clip_grad_norm_(gnn_model.parameters(), max_norm=1.0)
    optimizer.step()
    # ReduceLROnPlateau is driven by the total training loss.
    scheduler.step(loss_components['total'])
    
    gnn_model.training_step += 1
    
    # Checkpoint whenever the total training loss reaches a new minimum.
    # NOTE(review): no validation split is used for model selection here.
    if loss_components['total'] < best_loss:
        best_loss = loss_components['total']
        torch.save(gnn_model.state_dict(), 'WEED.pt')
    
    # Progress report every 10 epochs.
    if epoch % 10 == 0:
        print(f"\nEpoch {epoch}:")
        for k, v in loss_components.items():
            print(f"{k.capitalize()} Loss: {v:.4f}")
        print(f"Confident Pseudo Labels: {confidence_mask.sum().item()}")
        if confidence_mask.any():
            print(f"Class distribution: {torch.bincount(pseudo_labels[confidence_mask])}")

# Reload the best checkpoint and predict for every node in the graph.
gnn_model.load_state_dict(torch.load('WEED.pt'))
gnn_model.eval()

with torch.no_grad():
    final_out = gnn_model(graph_data.x, graph_data.edge_index)
    # Overwrites the loop variables: predicted class per node, and a mask of
    # nodes (labeled and unlabeled alike) whose top probability exceeds 0.9.
    pseudo_labels = final_out.argmax(dim=1)
    confidence_mask = torch.exp(final_out).max(dim=1)[0] > 0.9


In [None]:
# Histogram of the predicted class over all nodes (labeled and unlabeled).
unique_preds, counts_preds = np.unique(pseudo_labels, return_counts=True)
prediction_histogram = {cls: count for cls, count in zip(unique_preds, counts_preds)}
print("✅ Predicted class distribution:", prediction_histogram)

In [None]:
# Sanity check: the largest node id referenced by an edge should be smaller
# than the number of feature rows.
feature_shape = graph_data.x.shape
largest_node_id = edge_index.max().item()
print("graph_data.x shape:", feature_shape)
print("Max index in edge_index:", largest_node_id)

In [None]:
def save_pseudo_labels(pseudo_labels, confidence_mask, unlabeled_image_paths, output_dir,
                       conf_threshold=0.6, detector=None):
    """Write label files for the confidently pseudo-labelled images.

    For every unlabeled image whose node is marked confident, the detector is
    run on the image and each detected box with confidence above
    ``conf_threshold`` is written as ``"<class> <xc> <yc> <w> <h>"`` using the
    box's normalised xywh coordinates.  The class written is the image-level
    GNN pseudo-label, not the detector's own class prediction.  A (possibly
    empty) file is created for every confident image.

    Args:
        pseudo_labels: per-node class ids; the entries for the unlabeled
            images are the last ``len(unlabeled_image_paths)`` elements.
        confidence_mask: per-node boolean mask aligned with ``pseudo_labels``.
        unlabeled_image_paths: image paths in graph node order.
        output_dir: directory receiving ``<image stem>.txt`` files; created
            if missing.
        conf_threshold: minimum box confidence for a box to be written.
        detector: callable returning detection results for an image path;
            defaults to the notebook-level YOLO ``model``.

    Raises:
        ValueError: if there are fewer node predictions than image paths.
    """
    if detector is None:
        detector = model
    os.makedirs(output_dir, exist_ok=True)

    # Unlabeled nodes sit at the end of the node ordering, so the offset can
    # be derived from the arguments instead of a notebook-level variable.
    offset = len(pseudo_labels) - len(unlabeled_image_paths)
    if offset < 0:
        raise ValueError("pseudo_labels has fewer entries than unlabeled_image_paths")

    per_image = zip(unlabeled_image_paths, pseudo_labels[offset:], confidence_mask[offset:])
    for img_path, label, is_confident in per_image:
        if not is_confident:
            continue

        class_id = int(label.item())
        img_name = os.path.basename(img_path).rsplit('.', 1)[0]
        label_path = os.path.join(output_dir, f"{img_name}.txt")
        results = detector(img_path)

        with open(label_path, "w") as f:
            for result in results:
                for box in result.boxes:
                    if float(box.conf) > conf_threshold:
                        x_center, y_center, width, height = box.xywhn.cpu().numpy().flatten()
                        f.write(f"{class_id} {x_center} {y_center} {width} {height}\n")

# Write pseudo-label files for the confident unlabeled images.
save_pseudo_labels(
    pseudo_labels=pseudo_labels,
    confidence_mask=confidence_mask,
    unlabeled_image_paths=unlabeled_image_paths,
    output_dir="dataset/unlabeled/labels/",
)

In [None]:
def _copy_files(src_dir, dst_dir):
    """Copy every regular file directly inside ``src_dir`` into ``dst_dir``."""
    for name in os.listdir(src_dir):
        src = os.path.join(src_dir, name)
        if os.path.isfile(src):
            shutil.copy(src, os.path.join(dst_dir, name))


os.makedirs("dataset_updated/images", exist_ok=True)
os.makedirs("dataset_updated/labels", exist_ok=True)

# Images: the labeled set plus every unlabeled image (sub-directories such as
# dataset/unlabeled/labels are skipped by the isfile check).
# NOTE(review): unlabeled images that received no pseudo-label file are copied
# as well, so they enter the updated dataset without labels -- confirm this is
# intended.
_copy_files("dataset/labeled/images", "dataset_updated/images")
_copy_files("dataset/unlabeled", "dataset_updated/images")

# Labels: the pseudo-labels are read from the directory they were written to
# ("dataset/unlabeled/labels/", not "unlabeled/labels/").  Ground-truth labels
# are copied last so they win on a file-name clash.
shutil.copytree("dataset/unlabeled/labels/", "dataset_updated/labels/", dirs_exist_ok=True)
shutil.copytree("dataset/labeled/labels/", "dataset_updated/labels/", dirs_exist_ok=True)

In [None]:
# Second training round on the dataset described by
# yaml-files/dataset_updated.yaml, continuing from the current `model` object.
model.train(
    # --- data / run setup ---
    data="yaml-files/dataset_updated.yaml",
    imgsz=512,
    batch=64,
    epochs=500,
    device="cuda",
    # --- optimizer and learning-rate schedule ---
    optimizer="RAdam",
    lr0=0.0002,
    lrf=0.0025,
    momentum=0.98,
    weight_decay=0.0001,
    warmup_epochs=30,
    cos_lr=True,
    # --- remaining options, passed through unchanged ---
    iou=0.5,
    dfl=1.5,
    mosaic=0.0,
    augment=True,
)

In [None]:
# Evaluate the weights stored at train_final/best.pt on the split described by
# yaml-files/test.yaml.
model = YOLO("train_final/best.pt")
metrics = model.val(data="yaml-files/test.yaml", device="cuda")

box_metrics = metrics.box
precision, recall = box_metrics.p.mean(), box_metrics.r.mean()  # means of box.p / box.r
map_50, map_50_95 = box_metrics.map50, box_metrics.map

# F1 is computed from the averaged precision and recall; 1e-6 avoids 0/0.
f1_score = (2 * precision * recall) / (precision + recall + 1e-6)
# Final score: equal-weight blend of that F1 and mAP@50-95.
final_score = 0.5*f1_score + 0.5*map_50_95

print(f"✅ Mean Precision : {precision:.4f}")
print(f"✅ Mean Recall : {recall:.4f}")
print(f"✅ mAP@50 : {map_50:.4f}")
print(f"✅ mAP@50-95 : {map_50_95:.4f}")
print(f"✅ Mean F1-Score : {f1_score:.4f}")
print(f"✅ Final Score : {final_score:.4f}")