**Data Transformation and PoseNet Training**

In [1]:
import os
import sys

# Make the parent of the notebook's working directory importable so that the
# project's `src` package can be imported below.
# The membership check keeps this cell idempotent: re-running it does not
# append the same entry to sys.path a second time.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.append(project_root)

from src.components.data_ingestion import DataIngestion
from src.exception import CustomException

# Collect the structured data paths through the project's DataIngestion
# component and report how many entries were found.
try:
    ingestion = DataIngestion()
    data_paths = ingestion.get_data_paths()
    print(f"Found {len(data_paths)} data points.")
except Exception as e:
    # Re-raise as the project's CustomException. `from e` records the original
    # error as the explicit cause, so its traceback is shown alongside.
    raise CustomException(e, sys) from e

Found 51288 data points.


In [2]:
from src.components.dataset import YCBVideoDataset
from torch.utils.data import DataLoader # Add this line
from torchvision import transforms


# Preprocessing pipeline applied to every input image, in this order:
#   1. convert the image to a tensor,
#   2. resize it to 224x224 (anti-aliased),
#   3. normalise each of the three channels with a fixed mean / std.
# NOTE(review): the mean/std values look like the usual ImageNet channel
# statistics, matching the pretrained backbone used later - confirm.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

preprocessing_steps = [
    transforms.ToTensor(),
    transforms.Resize((224, 224), antialias=True),
    transforms.Normalize(mean=channel_mean, std=channel_std),
]
image_transform = transforms.Compose(preprocessing_steps)

# Development subset: keep only the leading entries of data_paths so that a
# trial run finishes quickly.
num_samples_for_dev = 5000
development_data_paths = data_paths[:num_samples_for_dev]

# Wrap the subset in the project's dataset class, applying image_transform.
ycb_dataset = YCBVideoDataset(
    data_paths=development_data_paths,
    transform=image_transform,
)

# Batched, shuffled loading with four worker processes.
batch_size = 16
loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=4)
data_loader = DataLoader(ycb_dataset, **loader_options)

# Sanity check: pull a single batch from the loader and unpack the three
# entries of the batch dictionary that the rest of the notebook relies on.
data = next(iter(data_loader))
images, rotations, translations = (
    data[key] for key in ('image', 'rotation', 'translation')
)

# Report the shape of each tensor in the batch.
print(f"Image batch shape: {images.shape}")
print(f"Rotation batch shape: {rotations.shape}")
print(f"Translation batch shape: {translations.shape}")

Image batch shape: torch.Size([16, 3, 224, 224])
Rotation batch shape: torch.Size([16, 6])
Translation batch shape: torch.Size([16, 3])


In [3]:
import torch
import torch.nn as nn
import torchvision.models as models

class PoseNet(nn.Module):
    """ResNet50-based pose regressor with a translation and a rotation output.

    The network consists of a ResNet50 backbone whose classification layer is
    replaced by an identity, a shared fully connected head, and two linear
    output layers: one producing 3 translation values and one producing a
    6-value continuous rotation representation.

    Args:
        pretrained: If True, the backbone starts from the ImageNet weights
            that torchvision's legacy ``pretrained=True`` flag loaded
            (``ResNet50_Weights.IMAGENET1K_V1``). If False, the backbone is
            randomly initialised.
    """

    def __init__(self, pretrained=True):
        super().__init__()

        # 1. Load the ResNet50 backbone.
        # `pretrained=` is deprecated in torchvision in favour of `weights=`.
        # IMAGENET1K_V1 is the checkpoint the old flag selected, so behaviour
        # is unchanged. The legacy call is kept as a fallback for torchvision
        # releases that predate the weights enums.
        weights_enum = getattr(models, "ResNet50_Weights", None)
        if weights_enum is not None:
            weights = weights_enum.IMAGENET1K_V1 if pretrained else None
            self.backbone = models.resnet50(weights=weights)
        else:
            self.backbone = models.resnet50(pretrained=pretrained)

        # 2. Keep only the feature extractor: remember the width of the
        # features feeding the classifier, then replace the classifier with an
        # identity so the backbone outputs those features directly.
        num_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Identity()

        # 3. Shared regression head applied before the two outputs split.
        self.head = nn.Sequential(
            nn.Linear(num_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.ReLU()
        )

        # 4. Output layers.
        # Translation: 3 values for (x, y, z).
        self.fc_translation = nn.Linear(512, 3)

        # Rotation: a 6D continuous representation, chosen because it is more
        # stable for deep learning than quaternions or Euler angles.
        self.fc_rotation = nn.Linear(512, 6)

    def forward(self, x):
        """Predict translation and 6D rotation for a batch of inputs.

        Args:
            x: Input batch accepted by the backbone.

        Returns:
            A ``(translation, rotation_6d)`` tuple; the last dimension of the
            two tensors is 3 and 6 respectively.
        """
        # Backbone features -> shared head -> two independent linear outputs.
        features = self.backbone(x)
        head_output = self.head(features)

        translation = self.fc_translation(head_output)
        rotation_6d = self.fc_rotation(head_output)

        return translation, rotation_6d

# --- Let's test the model with a dummy input ---
# Instantiate the model
model = PoseNet()

# Create a dummy batch of images (like one from our DataLoader)
# Batch size = 16, 3 color channels, 224x224 image size
dummy_images = torch.randn(16, 3, 224, 224)

# Get the model's predictions.
# This is only a shape check, so run it in eval mode and without autograd:
# in training mode the forward pass on random noise would update the
# backbone's BatchNorm running statistics, and gradient tracking would build
# a graph that is never used.
model.eval()
with torch.no_grad():
    pred_translation, pred_rotation = model(dummy_images)
# Restore training mode so the model is left in the state it was created in.
model.train()

# Print the shapes of the predictions
print("Model created successfully!")
print(f"Predicted Translation Shape: {pred_translation.shape}") # Should be [16, 3]
print(f"Predicted Rotation Shape: {pred_rotation.shape}")      # Should be [16, 6]



Model created successfully!
Predicted Translation Shape: torch.Size([16, 3])
Predicted Rotation Shape: torch.Size([16, 6])


In [4]:
# Regression objective: both outputs are continuous values, so the mean
# squared error is used for each of them.
loss_function = nn.MSELoss()

# Adam optimiser over all model parameters with a fixed learning rate.
learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Smoke test of the loss computation on the dummy predictions.
# Random tensors stand in for the ground-truth labels: 16x3 for translation
# and 16x6 for rotation.
true_translation, true_rotation = torch.randn(16, 3), torch.randn(16, 6)

# One loss term per output.
loss_rot = loss_function(pred_rotation, true_rotation)
loss_trans = loss_function(pred_translation, true_translation)

# The two terms are summed, unweighted, to form the total loss.
total_loss = loss_trans + loss_rot

print(f"Loss on dummy data:")
print(f"  Translation Loss: {loss_trans.item()}")
print(f"  Rotation Loss: {loss_rot.item()}")
print(f"  Total Loss: {total_loss.item()}")

Loss on dummy data:
  Translation Loss: 1.3962674140930176
  Rotation Loss: 1.0647740364074707
  Total Loss: 2.4610414505004883


In [7]:
import torch # Ensure torch is imported in this cell

# Pick the compute device: Apple's MPS backend first (as before), then CUDA,
# then the CPU as the universal fallback.
# `torch.backends.mps` is looked up with getattr because PyTorch builds that
# predate the MPS backend do not have that attribute at all.
mps_backend = getattr(torch.backends, "mps", None)
if mps_backend is not None and mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}") # Printed so the chosen device can be verified

# Move the model to the selected device.
model.to(device)

# Set the model to training mode
model.train()

# Define the number of epochs (how many times we'll loop through the entire dataset)
num_epochs = 10

print("Starting model training...")

# Loop over the dataset for the specified number of epochs
for epoch in range(num_epochs):

    # Sum of the per-batch total losses seen in this epoch
    running_loss = 0.0

    # Loop over each batch of data from the DataLoader
    for i, data in enumerate(data_loader):

        # 1. Get the inputs and true labels from the data batch.
        # Cast to float32 on the CPU *before* the transfer: the MPS backend
        # does not support float64, so moving a double tensor first would
        # fail, and casting first never transfers more bytes than needed.
        images = data['image'].float().to(device)
        true_translations = data['translation'].float().to(device)
        true_rotations = data['rotation'].float().to(device)

        # 2. Zero the parameter gradients
        # This is important to do on every batch
        optimizer.zero_grad()

        # 3. Forward pass: get the model's predictions
        pred_translations, pred_rotations = model(images)

        # 4. Calculate the loss (unweighted sum of the two MSE terms)
        loss_trans = loss_function(pred_translations, true_translations)
        loss_rot = loss_function(pred_rotations, true_rotations)
        total_loss = loss_trans + loss_rot

        # 5. Backward pass: compute the gradient of the loss
        total_loss.backward()

        # 6. Update the model's weights
        optimizer.step()

        # 7. Print statistics
        # Read the scalar once; each .item() call copies the value back from
        # the device, so it is reused for both the running sum and the log.
        batch_loss = total_loss.item()
        running_loss += batch_loss
        if (i + 1) % 100 == 0: # Print every 100 batches
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(data_loader)}], Loss: {batch_loss:.4f}')

    # Print the average loss for the epoch
    avg_epoch_loss = running_loss / len(data_loader)
    print(f'--- End of Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_epoch_loss:.4f} ---')


print('Finished Training')
# After training, you would save the model's weights
# torch.save(model.state_dict(), 'posenet_model.pth')

Using device: mps
Starting model training...
Epoch [1/10], Step [100/313], Loss: 0.0207
Epoch [1/10], Step [200/313], Loss: 0.0232
Epoch [1/10], Step [300/313], Loss: 0.0176
--- End of Epoch [1/10], Average Loss: 0.0439 ---
Epoch [2/10], Step [100/313], Loss: 0.0133
Epoch [2/10], Step [200/313], Loss: 0.0134
Epoch [2/10], Step [300/313], Loss: 0.0105
--- End of Epoch [2/10], Average Loss: 0.0142 ---
Epoch [3/10], Step [100/313], Loss: 0.0134
Epoch [3/10], Step [200/313], Loss: 0.0170
Epoch [3/10], Step [300/313], Loss: 0.0087
--- End of Epoch [3/10], Average Loss: 0.0115 ---
Epoch [4/10], Step [100/313], Loss: 0.0084
Epoch [4/10], Step [200/313], Loss: 0.0082
Epoch [4/10], Step [300/313], Loss: 0.0110
--- End of Epoch [4/10], Average Loss: 0.0104 ---
Epoch [5/10], Step [100/313], Loss: 0.0068
Epoch [5/10], Step [200/313], Loss: 0.0124
Epoch [5/10], Step [300/313], Loss: 0.0097
--- End of Epoch [5/10], Average Loss: 0.0096 ---
Epoch [6/10], Step [100/313], Loss: 0.0063
Epoch [6/10], Ste

In [8]:
# Persist the trained weights (the model's state_dict only) to a file whose
# relative path is resolved against the current working directory.
checkpoint_path = 'posenet_model.pth'
torch.save(model.state_dict(), checkpoint_path)