In [1]:
!pip install torch torchvision



In [3]:
import boto3
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

In [4]:
import os
from sklearn.model_selection import train_test_split

In [6]:
# Define the dataset class
class LegoDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert('RGB')
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

In [7]:
# Define the CNN model
class LegoCNN(nn.Module):
    def __init__(self):
        super(LegoCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 8 * 8, 64)
        self.fc2 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = self.pool(torch.relu(self.conv3(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [8]:
def get_file_hash(filepath):
    with open(filepath, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()

def load_data(good_dir, defective_dir):
    good_images = [os.path.join(good_dir, f) for f in os.listdir(good_dir) if f.endswith(('.png', '.jpg', '.jpeg'))]
    defective_images = [os.path.join(defective_dir, f) for f in os.listdir(defective_dir) if f.endswith(('.png', '.jpg', '.jpeg'))]
    
    # Remove duplicates
    hash_dict = {}
    unique_good_images = []
    unique_defective_images = []
    
    unique_images = good_images + defective_images #unique_defective_images
    unique_labels = [0] * len(good_images) + [1] * len(defective_images) #unique_defective_images)
    
    print(f"Total good images: {len(good_images)}")
    print(f"Unique good images: {len(unique_good_images)}")
    print(f"Total defective images: {len(defective_images)}")
    print(f"Unique defective images: {len(unique_defective_images)}")
    print(f"Total images: {len(good_images) + len(defective_images)}")
    print(f"Total unique images: {len(unique_images)}")
    
    return train_test_split(unique_images, unique_labels, test_size=0.2, random_state=42)

In [9]:
import hashlib

# Set up data transforms
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load and prepare the data
good_dir = 'Images/Good'
defective_dir = 'Images/Defective'
train_images, test_images, train_labels, test_labels = load_data(good_dir, defective_dir)

# Print breakdown of train and test sets
print("\nTraining set:")
print(f"Total images: {len(train_images)}")
print(f"Good images: {sum(1 for label in train_labels if label == 0)}")
print(f"Defective images: {sum(1 for label in train_labels if label == 1)}")

print("\nTest set:")
print(f"Total images: {len(test_images)}")
print(f"Good images: {sum(1 for label in test_labels if label == 0)}")
print(f"Defective images: {sum(1 for label in test_labels if label == 1)}")

train_dataset = LegoDataset(train_images, train_labels, transform=transform)
test_dataset = LegoDataset(test_images, test_labels, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

Total good images: 10
Unique good images: 0
Total defective images: 10
Unique defective images: 0
Total images: 20
Total unique images: 20

Training set:
Total images: 16
Good images: 8
Defective images: 8

Test set:
Total images: 4
Good images: 2
Defective images: 2


In [12]:
# Initialize the model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LegoCNN().to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 15
for epoch in range(num_epochs):
    model.train()
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device).float().unsqueeze(1)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    
    # Evaluate on test set
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            predicted = (torch.sigmoid(outputs) > 0.5).float()
            total += labels.size(0)
            correct += (predicted.squeeze() == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Test Accuracy: {accuracy:.2f}%')

Epoch [1/15], Loss: 0.6962, Test Accuracy: 50.00%
Epoch [2/15], Loss: 0.6856, Test Accuracy: 50.00%
Epoch [3/15], Loss: 0.6759, Test Accuracy: 50.00%
Epoch [4/15], Loss: 0.6368, Test Accuracy: 50.00%
Epoch [5/15], Loss: 0.6486, Test Accuracy: 50.00%
Epoch [6/15], Loss: 0.6639, Test Accuracy: 50.00%
Epoch [7/15], Loss: 0.5936, Test Accuracy: 100.00%
Epoch [8/15], Loss: 0.5929, Test Accuracy: 75.00%
Epoch [9/15], Loss: 0.3908, Test Accuracy: 75.00%
Epoch [10/15], Loss: 0.4122, Test Accuracy: 75.00%
Epoch [11/15], Loss: 0.2654, Test Accuracy: 100.00%
Epoch [12/15], Loss: 0.1984, Test Accuracy: 100.00%
Epoch [13/15], Loss: 0.1133, Test Accuracy: 100.00%
Epoch [14/15], Loss: 0.1834, Test Accuracy: 100.00%
Epoch [15/15], Loss: 0.1762, Test Accuracy: 100.00%


In [13]:
def preprocess_image(image_path):
    transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    image = Image.open(image_path).convert('RGB')
    return transform(image).unsqueeze(0)  

def predict_image(image_path, model):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        image = preprocess_image(image_path).to(device)
        output = model(image)
        probability = torch.sigmoid(output).item()
        prediction = "Good" if probability < 0.5 else "Defective"
        # normalized_prob = probability*100
        # return_prob =  100-normalized_prob
        return prediction, probability, output

In [15]:
test_images = [  
    'Images/Good/image_0d06e4a3-c6a6-4130-9e6f-363e44831dcc.jpg',    
    'Images/Good/image_23120a2b-0ead-408a-824a-bc909e9de2b9.jpg',
    'Images/Defective/0-change lego block to purple.png',
    'Images/Defective/1-change lego block to light green.png'
]

for img_path in test_images:
    prediction, probability, output = predict_image(img_path, model)
    print(f"Image: {img_path}")
    print(f"Prediction: {prediction}")
    print(f"Output: {output}")
    print(f"Probability of being defective: {probability:.2f}")
    print("---")

Image: Images/Good/image_0d06e4a3-c6a6-4130-9e6f-363e44831dcc.jpg
Prediction: Good
Output: tensor([[-5.7247]])
Probability of being defective: 0.00
---
Image: Images/Good/image_23120a2b-0ead-408a-824a-bc909e9de2b9.jpg
Prediction: Good
Output: tensor([[-4.3019]])
Probability of being defective: 0.01
---
Image: Images/Defective/0-change lego block to purple.png
Prediction: Defective
Output: tensor([[5.7365]])
Probability of being defective: 1.00
---
Image: Images/Defective/1-change lego block to light green.png
Prediction: Defective
Output: tensor([[2.2022]])
Probability of being defective: 0.90
---


In [16]:
import mlflow
import mlflow.pytorch
from mlflow import log_metric, log_param, log_artifacts
import tempfile

2024-12-19 03:51:53.907765: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [17]:
# Initialize MLflow

arn = 'arn:aws:sagemaker:us-east-1:316413003582:mlflow-tracking-server/sample-mlflow-tracking'

mlflow.set_tracking_uri(arn)
mlflow.set_experiment("Lego-Quality-Classification")

2024/12/19 03:52:43 INFO mlflow.tracking.fluent: Experiment with name 'Lego-Quality-Classification' does not exist. Creating a new experiment.


<Experiment: artifact_location='s3://sm-mlflow-vsnak-iad-316413003582/34', creation_time=1734580363554, experiment_id='34', last_update_time=1734580363554, lifecycle_stage='active', name='Lego-Quality-Classification', tags={}>

In [18]:
def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs, device):
    with mlflow.start_run():
        # Log model parameters
        mlflow.log_param("learning_rate", optimizer.param_groups[0]['lr'])
        mlflow.log_param("num_epochs", num_epochs)
        mlflow.log_param("batch_size", train_loader.batch_size)
        mlflow.log_param("optimizer", type(optimizer).__name__)

        best_accuracy = 0.0
        for epoch in range(num_epochs):
            model.train()
            running_loss = 0.0
            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device).float().unsqueeze(1)
                
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
            
            # Calculate average loss for the epoch
            epoch_loss = running_loss / len(train_loader)
            
            # Evaluate on test set
            model.eval()
            correct = 0
            total = 0
            test_loss = 0.0
            
            with torch.no_grad():
                for images, labels in test_loader:
                    images, labels = images.to(device), labels.to(device)
                    outputs = model(images)
                    test_loss += criterion(outputs, labels.float().unsqueeze(1)).item()
                    predicted = (torch.sigmoid(outputs) > 0.5).float()
                    total += labels.size(0)
                    correct += (predicted.squeeze() == labels).sum().item()
            
            accuracy = 100 * correct / total
            test_loss = test_loss / len(test_loader)

            # Log metrics to MLflow
            mlflow.log_metric("train_loss", epoch_loss, step=epoch)
            mlflow.log_metric("test_loss", test_loss, step=epoch)
            mlflow.log_metric("test_accuracy", accuracy, step=epoch)

            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Test Accuracy: {accuracy:.2f}%')

            # Save best model
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                # Save model to MLflow
                mlflow.pytorch.log_model(model, "best_model")

        # Log the final model
        mlflow.pytorch.log_model(model, "final_model")

        # Log additional artifacts if needed
        with tempfile.TemporaryDirectory() as temp_dir:
            # Save model architecture diagram or other artifacts
            model_summary_path = os.path.join(temp_dir, "model_summary.txt")
            with open(model_summary_path, "w") as f:
                f.write(str(model))
            mlflow.log_artifact(model_summary_path)

In [19]:
model = LegoCNN().to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model with MLflow tracking
train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs=15, device=device)

Epoch [1/15], Loss: 0.6912, Test Accuracy: 50.00%




Epoch [2/15], Loss: 0.6905, Test Accuracy: 50.00%
Epoch [3/15], Loss: 0.6782, Test Accuracy: 100.00%




Epoch [4/15], Loss: 0.6575, Test Accuracy: 100.00%
Epoch [5/15], Loss: 0.6348, Test Accuracy: 100.00%
Epoch [6/15], Loss: 0.5604, Test Accuracy: 100.00%
Epoch [7/15], Loss: 0.4991, Test Accuracy: 100.00%
Epoch [8/15], Loss: 0.4241, Test Accuracy: 100.00%
Epoch [9/15], Loss: 0.3245, Test Accuracy: 75.00%
Epoch [10/15], Loss: 0.2429, Test Accuracy: 100.00%
Epoch [11/15], Loss: 0.1758, Test Accuracy: 75.00%
Epoch [12/15], Loss: 0.1626, Test Accuracy: 100.00%
Epoch [13/15], Loss: 0.1510, Test Accuracy: 100.00%
Epoch [14/15], Loss: 0.0371, Test Accuracy: 100.00%
Epoch [15/15], Loss: 0.2122, Test Accuracy: 100.00%




In [22]:
def predict_with_mlflow(image_path, run_id=None):
    with mlflow.start_run(run_name="prediction"):
        if run_id:
            # Load the model from MLflow
            loaded_model = mlflow.pytorch.load_model(f"runs:/{run_id}/best_model")
            loaded_model.to(device)
            prediction, probability, output = predict_image(image_path, loaded_model)
        else:
            prediction, probability, output = predict_image(image_path, model)
        
        # Log prediction results
        mlflow.log_param("image_path", image_path)
        mlflow.log_metric("prediction_probability", probability)
        mlflow.log_param("predicted_class", prediction)
        
        # Optional: Log the input image as an artifact
        mlflow.log_artifact(image_path)
        
        # Optional: Create and log a prediction results summary
        results_path = "prediction_results.txt"
        with open(results_path, "w") as f:
            f.write(f"Image: {image_path}\n")
            f.write(f"Prediction: {prediction}\n")
            f.write(f"Probability: {probability}\n")
            f.write(f"Raw Output: {output}\n")
        mlflow.log_artifact(results_path)
        
        return prediction, probability, output


In [23]:
# Test predictions
for img_path in test_images:
    prediction, probability, output = predict_with_mlflow(img_path)
    print(f"Image: {img_path}")
    print(f"Prediction: {prediction}")
    print(f"Output: {output}")
    print(f"Probability of being defective: {probability:.2f}")
    print("---")

Image: Images/Good/image_0d06e4a3-c6a6-4130-9e6f-363e44831dcc.jpg
Prediction: Good
Output: tensor([[-4.7222]])
Probability of being defective: 0.01
---
Image: Images/Good/image_23120a2b-0ead-408a-824a-bc909e9de2b9.jpg
Prediction: Good
Output: tensor([[-4.7580]])
Probability of being defective: 0.01
---
Image: Images/Defective/0-change lego block to purple.png
Prediction: Defective
Output: tensor([[11.0956]])
Probability of being defective: 1.00
---
Image: Images/Defective/1-change lego block to light green.png
Prediction: Defective
Output: tensor([[3.1716]])
Probability of being defective: 0.96
---
