In [25]:
import os
import pandas as pd 
import matplotlib.pyplot as plt 
from PIL import Image 
import csv
import cv2
import data_aug
import importlib

from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
import shutil


Run with trunk kernel

### PREPROCESSING DATA
Trim the single-image dataset to one (x3, y3, z3) tip position per image, keeping the last row recorded for each image. The resulting CSV still covers the entire dataset.

In [19]:
def make_tip_pos_csv(input_csv, output_csv, append=False):
    """
    Write one tip position (x3, y3, z3) per image to ``output_csv``.

    The input CSV may contain several consecutive rows that share the same
    'img_filename'. Only the last row of each consecutive run is kept.

    Parameters:
    input_csv (str): Path to a CSV with 'ID', 'x3', 'y3', 'z3' and 'img_filename' columns.
    output_csv (str): Path of the CSV to write.
    append (bool): If True and ``output_csv`` already exists, add the rows to it
        without writing a second header. The default rewrites the file, so that
        re-running the cell does not duplicate every row.

    Raises:
    KeyError: If a required column is missing from ``input_csv``.
    """
    columns = ['ID', 'x3', 'y3', 'z3', 'img_filename']

    # Read the input CSV
    positions_df = pd.read_csv(input_csv)

    # A row is kept when it is the final row or the next row belongs to another image
    names = positions_df['img_filename'].tolist()
    last_position = len(names) - 1
    is_last_of_run = [
        position == last_position or names[position] != names[position + 1]
        for position in range(len(names))
    ]
    keep_mask = pd.Series(is_last_of_run, index=positions_df.index, dtype=bool)
    tip_df = positions_df.loc[keep_mask, columns]

    # Open the output once; only skip the header when extending an existing file
    extend_existing = append and os.path.exists(output_csv)
    tip_df.to_csv(
        output_csv,
        mode='a' if extend_existing else 'w',
        header=not extend_existing,
        index=False,
    )



In [20]:
# Build the one-row-per-image tip-position CSV for the dataset under data_dir.
data_dir = 'data/single'
input_csv = os.path.join(data_dir, 'single_img_regression.csv')  # replace with your actual CSV filename
# The output is written next to the input, inside data_dir.
output_csv = os.path.join(data_dir, 'single_img_regression_single_tip_pos.csv')
make_tip_pos_csv(input_csv, output_csv)

Plot tip position overlaid on images to verify dataset quality

In [61]:
def convert_to_pillow_coords(df, img_width, img_height):
    """
    Convert robot coordinates to Pillow image coordinates.

    The observed x and z ranges (padded by a small buffer on both sides) are
    mapped linearly onto the image width and height, and both axes are then
    flipped. The input DataFrame is left unmodified.

    Parameters:
    df (pd.DataFrame): DataFrame containing 'x' and 'z' columns in robot coordinates.
    img_width, img_height (int): Dimensions of the Pillow image.

    Returns:
    pd.DataFrame: New DataFrame, sharing ``df``'s index, with 'img_x' and
    'img_y' columns for Pillow image coordinates.
    """
    buffer = .01 #m

    x, z = df['x'], df['z']

    # Calculate the min and max values for x and z
    x_min, x_max = x.min() - buffer, x.max() + buffer
    z_min, z_max = z.min() - buffer, z.max() + buffer

    # Calculate the scaling factors; the buffer keeps both ranges non-zero
    x_scale = img_width / (x_max - x_min)
    z_scale = img_height / (z_max - z_min)

    # Midpoints of the padded ranges, mapped to the image center
    x_shift = (x_max + x_min) / 2
    z_shift = (z_max + z_min) / 2

    # Convert robot coordinates to Pillow image coordinates
    img_x = (x - x_shift) * x_scale + img_width / 2
    img_y = (z - z_shift) * z_scale + img_height / 2

    # Invert the y-axis and x-axis to match Pillow's coordinate system (where (0, 0) is top-left).
    # Build a fresh frame instead of adding columns to the caller's DataFrame.
    return pd.DataFrame(
        {'img_x': img_width - img_x, 'img_y': img_height - img_y},
        index=df.index,
    )



In [69]:
def plot_tip_positions_on_images(data_dir):
    """
    Plot tip positions on images based on coordinates from a CSV file.

    Reads 'single_img_regression_single_tip_pos.csv' from ``data_dir``, converts
    its 'x3'/'z3' columns to pixel coordinates with convert_to_pillow_coords
    (scaled to the size of the first listed image), and saves one annotated copy
    of every listed image under ``data_dir``/verification/.

    Parameters:
    data_dir (str): Directory containing the CSV file and the 'images' folder.
    """

    # Load the CSV file
    csv_file = os.path.join(data_dir, 'single_img_regression_single_tip_pos.csv')
    positions_df = pd.read_csv(csv_file)

    # Nothing to plot; also avoids an IndexError on iloc[0] below
    if positions_df.empty:
        return

    # Folder containing images
    image_folder = os.path.join(data_dir, 'images')

    # savefig does not create missing directories, so make sure the target exists
    os.makedirs(os.path.join(data_dir, 'verification'), exist_ok=True)

    # Convert entire DataFrame coordinates to Pillow image coordinates once
    coords_df = positions_df[['x3', 'z3']].rename(columns={'x3': 'x', 'z3': 'z'})

    # Get image dimensions from the first image
    sample_image_filename = positions_df['img_filename'].iloc[0]
    sample_image_path = os.path.join(image_folder, sample_image_filename)
    with Image.open(sample_image_path) as img:
        img_width, img_height = img.size

    # Convert all coordinates to image coordinates
    img_coords_df = convert_to_pillow_coords(coords_df, img_width, img_height)

    # Add converted coordinates to the original DataFrame
    positions_df['img_x'] = img_coords_df['img_x']
    positions_df['img_y'] = img_coords_df['img_y']

    # Iterate through each row to plot points on the corresponding images
    for index, row in positions_df.iterrows():
        image_filename = row['img_filename']
        image_path = os.path.join(image_folder, image_filename)

        # Open the image
        with Image.open(image_path) as img:
            img_width, img_height = img.size

            # Create a figure with the same dimensions as the image
            fig, ax = plt.subplots(figsize=(img_width / 100, img_height / 100), dpi=100)

            # Plot the image
            ax.imshow(img)

            # Plot the tip position for this specific row
            ax.scatter([row['img_x']], [row['img_y']], color='blue', s=200)  # s is the size of the point

            # Remove axes for a cleaner output
            ax.axis('off')

            # Save the annotated image, then close the figure to free its memory
            output_filename = os.path.join(data_dir, f"verification/output_{image_filename}")
            plt.savefig(output_filename, bbox_inches='tight', pad_inches=0)
            plt.close(fig)

            print(f"Plotted tip position on {image_filename} and saved as {output_filename}.")


In [70]:
# Render one verification image per CSV row for the dataset in 'data/single'.
plot_tip_positions_on_images('data/single')

Plotted tip position on sample_0.jpg and saved as data/single/verification/output_sample_0.jpg.
Plotted tip position on sample_1.jpg and saved as data/single/verification/output_sample_1.jpg.
Plotted tip position on sample_2.jpg and saved as data/single/verification/output_sample_2.jpg.
Plotted tip position on sample_3.jpg and saved as data/single/verification/output_sample_3.jpg.
Plotted tip position on sample_4.jpg and saved as data/single/verification/output_sample_4.jpg.
Plotted tip position on sample_5.jpg and saved as data/single/verification/output_sample_5.jpg.
Plotted tip position on sample_6.jpg and saved as data/single/verification/output_sample_6.jpg.
Plotted tip position on sample_7.jpg and saved as data/single/verification/output_sample_7.jpg.
Plotted tip position on sample_8.jpg and saved as data/single/verification/output_sample_8.jpg.
Plotted tip position on sample_9.jpg and saved as data/single/verification/output_sample_9.jpg.
Plotted tip position on sample_10.jpg an

Train-val-test split

Create new csvs for train, val, and test

In [29]:
input_dir = 'data/single'
output_dir = 'data/single/split'
os.makedirs(output_dir, exist_ok=True)
csv_file = os.path.join(input_dir, 'single_img_regression_single_tip_pos.csv')
image_dir = os.path.join(input_dir, 'raw/images')

# Load the original CSV file
df = pd.read_csv(csv_file)

# Split on *unique* filenames: the DataFrames below are selected with isin(), so a
# filename listed twice could otherwise end up in two splits at once (leakage).
# When the CSV has no duplicates this yields exactly the same split as before.
unique_filenames = df['img_filename'].unique().tolist()

# Split the data (roughly 80-10-10)
train_filenames, test_filenames = train_test_split(unique_filenames, test_size=0.1, random_state=42)
train_filenames, val_filenames = train_test_split(train_filenames, test_size=0.12, random_state=42) # 0.12 * 0.9 = 0.108 of the full set

# Filter the original DataFrame to create train, validation, and test CSVs
train_df = df[df['img_filename'].isin(train_filenames)]
val_df = df[df['img_filename'].isin(val_filenames)]
test_df = df[df['img_filename'].isin(test_filenames)]

# Every row must belong to exactly one split
assert len(train_df) + len(val_df) + len(test_df) == len(df), "splits do not partition the dataset"

# Save the new CSVs
train_csv_path = os.path.join(output_dir, 'train.csv')
val_csv_path = os.path.join(output_dir, 'val.csv')
test_csv_path = os.path.join(output_dir, 'test.csv')

train_df.to_csv(train_csv_path, index=False)
val_df.to_csv(val_csv_path, index=False)
test_df.to_csv(test_csv_path, index=False)

Augment images and update train.csv

In [34]:
# Per-split image folders under output_dir; create them up front
test_dir = os.path.join(output_dir, 'images/test')
val_dir = os.path.join(output_dir, 'images/val')
train_dir = os.path.join(output_dir, "images/train")
for split_dir in (test_dir, val_dir, train_dir):
    os.makedirs(split_dir, exist_ok=True)

# Pick up any edits made to data_aug.py since it was first imported
importlib.reload(data_aug)

# Augment every training image and collect a label row for each augmented file
augmented_rows = []
for filename in train_filenames:
    image_path = os.path.join(image_dir, filename)
    augmented_filename = data_aug.augment_image(image_path, train_dir)

    # Same labels as the source image, but pointing at the augmented file
    source_rows = train_df[train_df['img_filename'] == filename]
    augmented_rows.append(source_rows.assign(img_filename=augmented_filename))

# Original rows first, augmented rows after them
augmented_df = pd.concat(augmented_rows)
final_train_df = pd.concat([train_df, augmented_df])

# Overwrite train.csv with the extended label table
final_train_df.to_csv(train_csv_path, index=False)

# NOTE(review): train.csv still lists the original filenames; confirm that
# data_aug.augment_image also writes the un-augmented image into train_dir.

# Validation and test images are only cropped and resized, never augmented
for split_filenames, split_dir in ((val_filenames, val_dir), (test_filenames, test_dir)):
    for filename in split_filenames:
        src_path = os.path.join(image_dir, filename)
        data_aug.crop_and_resize(src_path, split_dir)


Augment images

In [26]:
input_dir = 'data/single/raw'
output_dir = 'data/single/augmented'
# Reload so edits to data_aug.py take effect without restarting the kernel
importlib.reload(data_aug)

raw_images_dir = input_dir+"/images"
image_suffixes = ('png', 'jpg', 'jpeg')

# Augment every image file found in the raw images folder
for filename in os.listdir(raw_images_dir):
    if not filename.lower().endswith(image_suffixes):
        continue  # skip anything that does not look like an image
    image_path = os.path.join(raw_images_dir+"/", filename)
    data_aug.augment_image(image_path, output_dir)

# after cropping, the size of the images are (1080, 1671, 3)


### TRAINING THE MODEL

1. Load the Dataset

In [12]:
class ImageDataset(Dataset):
    """
    Dataset pairing each image listed in a CSV file with a 3-value label.

    Columns are addressed by position: columns 1-3 hold the label values and
    column 4 holds the image filename, relative to ``root_dir``.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        """
        Parameters:
        csv_file (str): Path to the CSV file holding labels and filenames.
        root_dir (str): Directory that the filenames are relative to.
        transform (callable, optional): Applied to each loaded image.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.data_frame = pd.read_csv(csv_file)

    def __len__(self):
        """Return the number of rows in the CSV."""
        n_rows = len(self.data_frame)
        return n_rows

    def __getitem__(self, idx):
        """Return ``(image, labels)`` for row ``idx``; labels is a 3-element tensor."""
        frame = self.data_frame

        # Column 4: filename of the image, loaded as RGB
        image_file = os.path.join(self.root_dir, frame.iloc[idx, 4])
        image = Image.open(image_file).convert("RGB")

        # Columns 1..3: label values as plain Python floats
        target = frame.iloc[idx, 1:4].to_numpy().astype('float').tolist()

        image = self.transform(image) if self.transform else image
        return image, torch.tensor(target)

# Dataset parameters
# NOTE(review): this is the un-split CSV, not split/train.csv, so the model sees
# every labelled image — confirm this is intended for the memorization check.
csv_file = 'data/single/single_img_regression_single_tip_pos.csv'
# NOTE(review): filenames from the CSV are resolved against this folder — confirm
# that data_aug.augment_image writes its output here under the same filenames.
root_dir = 'data/single/augmented/images/'

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)), # image input size to model (224,224)
    # out of memory error with 1080x1080, batch size 32
    transforms.ToTensor(),
    # presumably the ImageNet channel statistics matching the pretrained ResNet — confirm
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load dataset
dataset = ImageDataset(csv_file=csv_file, root_dir=root_dir, transform=transform)
# Batches of 32 samples, reshuffled every epoch
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


2. Define the Model

In [13]:
# Prefer the GPU when CUDA is usable, otherwise fall back to the CPU
cuda_ok = torch.cuda.is_available()
device = torch.device("cuda" if cuda_ok else "cpu")
print("CUDA is available. Using GPU." if cuda_ok else "CUDA is not available. Using CPU.")

CUDA is available. Using GPU.


In [14]:
# Load a pre-trained ResNet model.
# `pretrained=True` is deprecated in torchvision >= 0.13; the explicit weights enum
# below selects the same ImageNet checkpoint without the deprecation warning.
model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

# Replace the last fully connected layer to output 3 values (x3, y3, z3)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 3) #output layers is 3-vector

# Move model to GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
print(f"Running on device: {device}")



Running on device: cuda:0


3. Train the model

In [15]:
# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


# Training loop
num_epochs = 20
for epoch in range(num_epochs):
    # Re-enter training mode every epoch: the evaluation cell calls model.eval(),
    # and re-running this cell afterwards would otherwise train with BatchNorm
    # layers still in inference mode.
    model.train()

    running_loss = 0.0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Accumulate the batch loss; .item() detaches it from the graph
        running_loss += loss.item()

    # Report the mean of the per-batch losses for this epoch
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader):.4f}')

print('Finished Training')
# for 224x224 took 5min on mac, took 2min on ceres cpu, took 30s

OutOfMemoryError: CUDA out of memory. Tried to allocate 286.00 MiB. GPU 0 has a total capacity of 11.90 GiB of which 163.12 MiB is free. Including non-PyTorch memory, this process has 11.53 GiB memory in use. Of the allocated memory 11.14 GiB is allocated by PyTorch, and 222.15 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

4. Evaluate the model

In [11]:
# Spot-check predictions on individual samples
model.eval()  # inference mode


# Testing on the training set (the model should be able to memorize it as a first step)
with torch.no_grad():
    for sample_idx in range(5):  # first 5 samples only
        image, target = dataset[sample_idx]
        batch = image.unsqueeze(0).to(device)  # add the batch dimension, move to the model's device
        prediction = model(batch)
        print(f'Predicted: {prediction.cpu().numpy()}, Ground Truth: {target.numpy()}')

Predicted: [[ 0.07911618 -0.04242505  0.11051759]], Ground Truth: [-0.1637994  -0.22002804 -0.01108289]
Predicted: [[ 0.10839201 -0.05397307  0.09346988]], Ground Truth: [-0.1637994  -0.22002804 -0.01108289]
Predicted: [[ 0.4467522  -0.03234733  0.18067892]], Ground Truth: [ 0.1755904  -0.21333098  0.01334643]
Predicted: [[ 0.41552413 -0.06970471  0.10188471]], Ground Truth: [ 0.1755904  -0.21333098  0.01334643]
Predicted: [[ 0.24142988 -0.02351029 -0.03656781]], Ground Truth: [-0.00449705 -0.18612087 -0.16756463]


In [None]:
# TODO:
# - save predicted tip positions
# - save model outputs
# - display predicted vs. actual tip positions