In [2]:
import os
import shutil
from PIL import Image
import torch
import torch.nn as nn
from torchvision import models, transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from tqdm import tqdm  # For progress bars
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, accuracy_score
import numpy as np

import torch
import torch.nn as nn
from torchvision import datasets, transforms, models  # ✅ This line is critical
from torch.utils.data import DataLoader

In [3]:
class ImageClassifier:
    """Fine-tune a pretrained ResNet-18 on an image folder and use it to sort images.

    Typical call order: ``load_data`` -> ``define_model`` -> ``train_model``
    (or ``load_model``) -> ``load_unclassified_images`` -> ``organize_images``.

    Attributes:
        device: ``torch.device`` the model and batches are moved to.
        model: The network, or ``None`` until ``define_model`` is called.
        class_names: Class labels taken from the training ``ImageFolder``.
        transform: Preprocessing pipeline last used for unclassified images.
        dataloader: Training ``DataLoader``, or ``None`` until ``load_data``.
        unclassified: Preprocessed image tensors awaiting prediction.
        image_paths: Source file paths, parallel to ``unclassified``.
    """

    # Channel statistics passed to Normalize in every preprocessing pipeline.
    _NORM_MEAN = [0.485, 0.456, 0.406]
    _NORM_STD = [0.229, 0.224, 0.225]
    # Side length (pixels) of the square input fed to the network.
    _IMAGE_SIZE = 224
    # File extensions (lower-case) picked up by load_unclassified_images.
    _IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg")

    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")
        self.model = None
        self.class_names = []
        self.transform = None
        # Initialised here so methods can give a clear error instead of an
        # AttributeError when called out of order.
        self.dataloader = None
        self.unclassified = []
        self.image_paths = []

    @classmethod
    def _eval_transform(cls):
        """Build the deterministic resize + normalize pipeline used for inference."""
        return transforms.Compose([
            transforms.Resize((cls._IMAGE_SIZE, cls._IMAGE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(cls._NORM_MEAN, cls._NORM_STD),
        ])

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the directory containing ``path`` if it has one.

        ``os.path.dirname`` returns ``""`` for a bare filename, and
        ``os.makedirs("")`` raises, so that case is skipped.
        """
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def load_data(self, data_path):
        """Load a training set laid out as one sub-folder per class.

        Sets ``self.class_names`` and ``self.dataloader`` (batch size 32,
        shuffled, with random-crop and horizontal-flip augmentation).

        Args:
            data_path: Root directory passed to ``datasets.ImageFolder``.
        """
        transform = transforms.Compose([
            transforms.RandomResizedCrop(self._IMAGE_SIZE),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(self._NORM_MEAN, self._NORM_STD),
        ])
        dataset = datasets.ImageFolder(data_path, transform=transform)
        self.class_names = dataset.classes
        self.dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
        print(f"Loaded {len(dataset)} images from '{data_path}' with classes: {self.class_names}")

    def define_model(self, model_type="resnet18"):
        """Create the network and move it to ``self.device``.

        The final fully connected layer of the pretrained ResNet-18 is
        replaced by a Linear(256) -> ReLU -> Dropout(0.4) -> Linear head.

        Args:
            model_type: Architecture name; only ``"resnet18"`` is supported.

        Raises:
            ValueError: If ``model_type`` is not supported.
        """
        print(f"Using pretrained {model_type}...")
        if model_type == "resnet18":
            self.model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
            num_ftrs = self.model.fc.in_features
            # Falls back to 10 outputs when no data has been loaded yet, so
            # call load_data() first to size the head from the real classes.
            num_classes = len(self.class_names) if self.class_names else 10
            self.model.fc = nn.Sequential(
                nn.Linear(num_ftrs, 256),
                nn.ReLU(),
                nn.Dropout(0.4),
                nn.Linear(256, num_classes)
            )
        else:
            raise ValueError(f"Model type '{model_type}' not supported.")
        self.model.to(self.device)

    def train_model(self, epochs=5, save_path=None):
        """Train the model on the loaded data and optionally save the weights.

        Uses cross-entropy loss, Adam (lr=0.0005) and a StepLR schedule that
        halves the learning rate every 5 epochs. Prints the summed batch loss
        and the training accuracy after every epoch.

        Args:
            epochs: Number of passes over the training data.
            save_path: If truthy, the state dict is saved there after training.

        Raises:
            RuntimeError: If the model or the training data is missing.
        """
        if self.model is None:
            raise RuntimeError("Model is not defined. Call define_model() first.")
        dataloader = getattr(self, "dataloader", None)
        if dataloader is None:
            raise RuntimeError("Training data is not loaded. Call load_data() first.")

        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=0.0005)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

        self.model.train()
        for epoch in range(epochs):
            running_loss = 0.0
            correct = 0
            total = 0
            for images, labels in dataloader:
                images, labels = images.to(self.device), labels.to(self.device)
                optimizer.zero_grad()
                outputs = self.model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

            scheduler.step()
            # Guard against a loader that yielded no samples.
            acc = 100 * correct / total if total else 0.0
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {running_loss:.4f}, Accuracy: {acc:.2f}%")

        if save_path:
            self.save_model(save_path)

    def save_model(self, path):
        """Save the model's state dict to ``path``, creating parent folders.

        Raises:
            RuntimeError: If no model has been defined.
        """
        if self.model is None:
            raise RuntimeError("Model is not defined. Call define_model() first.")
        self._ensure_parent_dir(path)
        torch.save(self.model.state_dict(), path)
        print(f"Model saved to {path}")

    def load_model(self, path):
        """Load a state dict from ``path`` into the model and switch to eval mode.

        Raises:
            RuntimeError: If ``define_model`` has not been called first.
        """
        if self.model is None:
            raise RuntimeError("Model must be defined before loading weights.")
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from trusted sources.
        self.model.load_state_dict(torch.load(path, map_location=self.device))
        self.model.to(self.device)
        self.model.eval()
        print(f"Model loaded from {path}")

    def load_unclassified_images(self, path):
        """Read and preprocess every .png/.jpg/.jpeg file directly under ``path``.

        Fills ``self.unclassified`` with tensors and ``self.image_paths`` with
        the matching file paths, replacing any previously loaded images.
        """
        self.unclassified = []
        self.image_paths = []
        self.transform = self._eval_transform()

        # Sorted so the processing order does not depend on the filesystem.
        for filename in sorted(os.listdir(path)):
            if not filename.lower().endswith(self._IMAGE_EXTENSIONS):
                continue
            img_path = os.path.join(path, filename)
            # Context manager closes the file handle once the image is decoded.
            with Image.open(img_path) as raw_image:
                tensor = self.transform(raw_image.convert("RGB"))
            self.unclassified.append(tensor)
            self.image_paths.append(img_path)

        print(f"Loaded {len(self.unclassified)} images from '{path}' for classification.")

    def organize_images(self, output_dir):
        """Copy each loaded image into ``output_dir/<predicted class>/``.

        Raises:
            RuntimeError: If no model is available, or if a prediction has no
                matching entry in ``self.class_names``.
        """
        if self.model is None:
            raise RuntimeError("Model not loaded.")

        os.makedirs(output_dir, exist_ok=True)

        # Without this, calling right after train_model() would predict with
        # dropout active and batch-norm in training mode.
        self.model.eval()

        with torch.no_grad():
            for img_path, image in zip(self.image_paths, self.unclassified):
                img_tensor = image.unsqueeze(0).to(self.device)
                outputs = self.model(img_tensor)
                _, predicted = torch.max(outputs, 1)
                class_index = predicted.item()
                if class_index >= len(self.class_names):
                    raise RuntimeError(
                        f"Predicted class index {class_index} has no name; "
                        "call load_data() before define_model() so the class names are known."
                    )
                class_dir = os.path.join(output_dir, self.class_names[class_index])
                os.makedirs(class_dir, exist_ok=True)
                shutil.copy(img_path, class_dir)

        print(f"Organized images into folders under '{output_dir}'")

    def _predict_folder(self, data_path):
        """Run the model over a labelled image folder.

        Returns:
            A ``(targets, preds, classes)`` tuple: true label indices,
            predicted label indices, and the folder's class names.

        Raises:
            RuntimeError: If no model is available.
        """
        if self.model is None:
            raise RuntimeError("Model not loaded.")

        dataset = datasets.ImageFolder(data_path, transform=self._eval_transform())
        if self.class_names and list(dataset.classes) != list(self.class_names):
            # Label indices come from sorted folder names, so differing class
            # lists mean the indices no longer refer to the same classes.
            print(f"Warning: classes in '{data_path}' {dataset.classes} differ from "
                  f"the training classes {self.class_names}.")
        loader = DataLoader(dataset, batch_size=32, shuffle=False)

        self.model.eval()
        preds = []
        targets = []
        with torch.no_grad():
            for images, labels in loader:
                outputs = self.model(images.to(self.device))
                _, predicted = torch.max(outputs, 1)
                preds.extend(predicted.cpu().tolist())
                targets.extend(labels.tolist())
        return targets, preds, dataset.classes

    def evaluate_model(self, data_path):
        """Print and return the model's accuracy (in percent) on ``data_path``.

        Args:
            data_path: Root of a labelled image folder (one sub-folder per class).

        Returns:
            Accuracy as a float percentage.
        """
        print(f"Evaluating model on: {data_path}")
        targets, preds, _ = self._predict_folder(data_path)

        acc = accuracy_score(targets, preds) * 100
        print(f"Accuracy: {acc:.2f}%")
        return acc

    def show_confusion_matrix(self, data_path, output_path="reports/confusion_matrix.png"):
        """Plot, save and display the confusion matrix for ``data_path``.

        Args:
            data_path: Root of a labelled image folder (one sub-folder per class).
            output_path: Where the figure is written; parent folders are created.
        """
        print(f"Generating confusion matrix on: {data_path}")
        targets, preds, dataset_classes = self._predict_folder(data_path)

        tick_labels = list(self.class_names) or list(dataset_classes)
        # Passing labels keeps one row/column per class even when a class is
        # missing from the data, so the tick labels stay aligned.
        cm = confusion_matrix(targets, preds, labels=list(range(len(tick_labels))))

        fig, ax = plt.subplots(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                    xticklabels=tick_labels,
                    yticklabels=tick_labels,
                    ax=ax)
        ax.set_xlabel("Predicted")
        ax.set_ylabel("Actual")
        ax.set_title("Confusion Matrix")
        fig.tight_layout()
        self._ensure_parent_dir(output_path)
        fig.savefig(output_path)
        plt.show()
        plt.close(fig)
        print(f"Confusion matrix saved to '{output_path}'")

In [4]:
class DSLCompiler:
    """Interpret a line-oriented DSL script by driving an ImageClassifier.

    Supported commands (paths must be wrapped in double quotes):
        load_images from "<dir>"
        define_model [<model_type>]
        train_model for <epochs> [epochs] [save_model_to "<file>"]
        load_model from "<file>"
        predict_images from "<dir>"
        organize_images into "<dir>"
        evaluate_model on "<dir>"
        show_confusion_matrix on "<dir>"

    Blank lines and lines starting with ``#`` are ignored.
    """

    def __init__(self, dsl_path):
        self.dsl_path = dsl_path
        self.classifier = ImageClassifier()

    def compile_and_run(self):
        """Read the DSL file and execute its commands in order.

        Raises:
            FileNotFoundError: If the DSL file does not exist.
            ValueError: If a command is missing its double-quoted path.
        """
        if not os.path.exists(self.dsl_path):
            raise FileNotFoundError(f"DSL file not found: {self.dsl_path}")

        with open(self.dsl_path, 'r') as file:
            stripped = (raw.strip() for raw in file)
            lines = [line for line in stripped if line and not line.startswith('#')]

        for line in lines:
            if line.startswith("load_images from"):
                path = self._extract_path(line)
                self.classifier.load_data(path)

            elif line.startswith("define_model"):
                # A bare "define_model" uses the classifier's own default
                # instead of crashing with an IndexError.
                args = line.split()[1:]
                if args:
                    self.classifier.define_model(args[0])
                else:
                    self.classifier.define_model()

            elif line.startswith("train_model for"):
                self._run_training(line)

            elif line.startswith("load_model from"):
                model_path = self._extract_path(line)
                if not model_path.startswith("models/"):
                    model_path = os.path.join("models", model_path)
                self.classifier.load_model(model_path)

            elif line.startswith("predict_images from"):
                path = self._extract_path(line)
                self.classifier.load_unclassified_images(path)

            elif line.startswith("organize_images into"):
                path = self._extract_path(line)
                self.classifier.organize_images(path)

            elif line.startswith("evaluate_model on"):
                path = self._extract_path(line)
                self.classifier.evaluate_model(path)

            elif line.startswith("show_confusion_matrix on"):
                path = self._extract_path(line)
                self.classifier.show_confusion_matrix(path)

            else:
                print(f"Unknown DSL command: {line}")

    def _run_training(self, line):
        """Parse a ``train_model for <epochs> ...`` line and run the training.

        Only the epoch parsing is guarded, so a ValueError raised by the save
        path parsing or by training itself is not misreported as a bad epoch.
        """
        tokens = line.split()
        try:
            epochs = int(tokens[2])
        except (IndexError, ValueError):
            print("Invalid epoch value in DSL script.")
            return

        save_path = "models/model.pth"
        if "save_model_to" in line:
            save_path = self._extract_custom_save_path(line)
        self.classifier.train_model(epochs, save_path=save_path)

    def _extract_path(self, line):
        """Return the first double-quoted string in ``line``.

        Raises:
            ValueError: If the line contains no double quote.
        """
        try:
            return line.split('"')[1]
        except IndexError:
            raise ValueError(f"Expected a path in double quotes: {line}")

    def _extract_custom_save_path(self, line):
        """Return the double-quoted string that follows ``save_model_to``.

        Raises:
            ValueError: If no quoted path follows ``save_model_to``.
        """
        try:
            return line.split('save_model_to')[1].strip().split('"')[1]
        except IndexError:
            raise ValueError(f"Expected a custom path in double quotes after 'save_model_to': {line}")

In [4]:
# Run the DSL script "program.dsl": DSLCompiler reads the file and executes
# its commands one by one against a fresh ImageClassifier.
if __name__ == "__main__":
    compiler = DSLCompiler("program.dsl")
    compiler.compile_and_run()

Using device: cpu
Loaded 1969 images from 'dataset/' with classes: ['butterfly', 'cat', 'cow', 'dog', 'elephant', 'horse', 'penguin', 'rat', 'sheep', 'squirrel']
Using pretrained resnet18...
Epoch [1/5], Loss: 58.4792, Accuracy: 70.24%
Epoch [2/5], Loss: 46.9264, Accuracy: 74.96%
Epoch [3/5], Loss: 38.6532, Accuracy: 79.48%
Epoch [4/5], Loss: 34.6293, Accuracy: 82.22%
Epoch [5/5], Loss: 32.9162, Accuracy: 82.83%
Model saved to models/model.pth


In [6]:
# Run the DSL script "program_load.dsl" with a new DSLCompiler (and therefore
# a new ImageClassifier); this rebinds the module-level name `compiler`.
if __name__ == "__main__":
    compiler = DSLCompiler("program_load.dsl")
    compiler.compile_and_run()

Using device: cpu
Loaded 1969 images from 'dataset/' with classes: ['butterfly', 'cat', 'cow', 'dog', 'elephant', 'horse', 'penguin', 'rat', 'sheep', 'squirrel']
Using pretrained resnet18...
Model loaded from models/model.pth
Loaded 7 images from 'unclassified/' for classification.
Organized images into folders under 'organized_output/'
