In [4]:
pip install --upgrade torch torchvision





[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
import os
import pandas as pd
from PIL import Image
from tqdm import tqdm
import torch
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torchvision import transforms, models
import torch.nn as nn
import torch.optim as optim

# --- CONFIG ---
BASE_PATH = r"C:\Users\harkp\Desktop\HV-AI-2025\HV-AI-2025"
CSV_PATH = r"C:\Users\harkp\Desktop\HV-AI-2025\HV-AI-2025\CSV"
BATCH_SIZE = 32
NUM_CLASSES = 10
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- DATASET CLASSES ---
class LabeledDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        self.label_map = {f'class_{i}': i for i in range(NUM_CLASSES)}
    def __len__(self):
        return len(self.data)
    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.data.iloc[idx, 0])
        image = Image.open(img_path).convert('RGB')
        label = self.label_map[self.data.iloc[idx, 1]]
        if self.transform:
            image = self.transform(image)
        return image, label

class UnlabeledDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_paths = [os.path.join(img_dir, fname) for fname in os.listdir(img_dir)]
        self.transform = transform
    def __len__(self):
        return len(self.img_paths)
    def __getitem__(self, idx):
        image = Image.open(self.img_paths[idx]).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, self.img_paths[idx]

class TestDataset(Dataset):
    def __init__(self, img_dir, transform=None):
        self.img_paths = [os.path.join(img_dir, fname) for fname in os.listdir(img_dir)]
        self.transform = transform
    def __len__(self):
        return len(self.img_paths)
    def __getitem__(self, idx):
        image = Image.open(self.img_paths[idx]).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, os.path.basename(self.img_paths[idx])

# --- TRANSFORMS ---
transform_train = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# --- DATA LOADERS ---
labeled_dataset = LabeledDataset(
    csv_file=os.path.join(BASE_PATH, "labeled_data/labeled_data.csv"),
    img_dir=os.path.join(BASE_PATH, "labeled_data/images"),
    transform=transform_train
)
unlabeled_dataset = UnlabeledDataset(
    img_dir=os.path.join(BASE_PATH, "unlabeled_data"),
    transform=transform_test
)
test_dataset = TestDataset(
    img_dir=os.path.join(BASE_PATH, "test_images"),
    transform=transform_test
)

labeled_loader = DataLoader(labeled_dataset, batch_size=BATCH_SIZE, shuffle=True)
unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# --- MODEL ---
def get_model():
    model = models.resnet18(pretrained=True)
    model.fc = nn.Linear(model.fc.in_features, NUM_CLASSES)
    return model.to(DEVICE)

# --- TRAIN FUNCTION ---
def train(model, loader, criterion, optimizer, epochs=5):
    model.train()
    for epoch in range(epochs):
        for images, labels in tqdm(loader):
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

# --- PHASE 2: PSEUDO-LABELING ---
# 1. Train on labeled data
model = get_model()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
train(model, labeled_loader, criterion, optimizer, epochs=10)

# 2. Generate pseudo-labels for unlabeled data
model.eval()
pseudo_images, pseudo_labels = [], []
with torch.no_grad():
    for images, paths in tqdm(unlabeled_loader):
        images = images.to(DEVICE)
        outputs = model(images)
        probs = torch.softmax(outputs, dim=1)
        confs, preds = torch.max(probs, dim=1)
        for i in range(len(images)):
            if confs[i] > 0.9:  # confidence threshold
                pseudo_images.append(paths[i])
                pseudo_labels.append(preds[i].item())

# 3. Create pseudo-labeled dataset
pseudo_df = pd.DataFrame({
    'path': [os.path.basename(p) for p in pseudo_images],
    'label': [f'class_{l}' for l in pseudo_labels]
})
pseudo_df.to_csv(os.path.join(CSV_PATH, "pseudo_labels.csv"), index=False)

pseudo_dataset = LabeledDataset(
    csv_file=os.path.join(CSV_PATH, "pseudo_labels.csv"),
    img_dir=os.path.join(BASE_PATH, "unlabeled_data"),
    transform=transform_train
)

# 4. Combine datasets and retrain
combined_dataset = ConcatDataset([labeled_dataset, pseudo_dataset])
combined_loader = DataLoader(combined_dataset, batch_size=BATCH_SIZE, shuffle=True)

model = get_model()  # Optionally re-init or continue training
optimizer = optim.Adam(model.parameters(), lr=1e-4)
train(model, combined_loader, criterion, optimizer, epochs=10)

# 5. Predict on test set
model.eval()
results = []
with torch.no_grad():
    for images, paths in tqdm(test_loader):
        images = images.to(DEVICE)
        outputs = model(images)
        preds = torch.argmax(outputs, dim=1)
        for i in range(len(images)):
            results.append({
                'path': paths[i],
                'predicted_label': f'class_{preds[i].item()}'
            })

# 6. Save predictions
pred_df = pd.DataFrame(results)
pred_df.to_csv(os.path.join(CSV_PATH, "phase2_predictions.csv"), index=False)
print("Saved:", os.path.join(CSV_PATH, "phase2_predictions.csv"))

  0%|          | 0/25 [00:00<?, ?it/s]


KeyError: 'scoiattolo'