In [12]:
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torch import optim
from PIL import Image
import os
import csv
import numpy as np

# Define path variables
base_path = "C:\\Users\\leo_b\\OneDrive\\Documentos\\Projects\\personal_projects\\CCC38_2023\\"

train_data_folder = os.path.join(base_path, 'train_data')
train_label_path = os.path.join(base_path, 'train_data_labels.csv')  # Assuming CSV has 'x,y' for each image pair
test_data_folder = os.path.join(base_path, 'test_data')

# Combine the two cloudy images into a single clearer image
def combine_images(img_path1, img_path2):
    img1 = np.array(Image.open(img_path1).convert('RGB'))
    img2 = np.array(Image.open(img_path2).convert('RGB'))
    combined_img = np.clip(img1 + img2, 0, 255)
    return Image.fromarray(combined_img.astype('uint8'), 'RGB')

# Load the training data
with open(train_label_path, 'r') as f:
    reader = csv.reader(f)
    labels = [list(map(int, row[0].split(','))) for row in reader]

# Process training images
train_image_filenames = sorted(os.listdir(train_data_folder))
train_image_paths = [(os.path.join(train_data_folder, train_image_filenames[i]), os.path.join(train_data_folder, train_image_filenames[i+1])) for i in range(0, len(train_image_filenames), 2)]
train_images = [combine_images(img_pair[0], img_pair[1]) for img_pair in train_image_paths]

# Data loading
class CustomDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        
        if self.transform:
            image = self.transform(image)

        return image, label

# Data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Split data into training and validation (80-20 split)
split_idx = int(0.8 * len(train_images))
train_dataset = CustomDataset(train_images[:split_idx], labels[:split_idx], transform=transform)
val_dataset = CustomDataset(train_images[split_idx:], labels[split_idx:], transform=transform)

train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=16)

# Model (using MobileNetV2 as a regression model)
model = torch.hub.load('pytorch/vision:v0.6.0', 'mobilenet_v2', pretrained=True)
model.classifier[1] = torch.nn.Linear(model.last_channel, 2)  # Predicting two values, x and y coordinates

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Loss and optimizer
criterion = torch.nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=5e-5)

# Training loop
num_epochs = 60
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f}")

# Evaluation on validation set
model.eval()
all_predictions = []
all_labels = []
for images, labels in val_loader:
    images, labels = images.to(device), labels.to(device)
    with torch.no_grad():
        outputs = model(images)
    all_predictions.append(outputs.cpu().numpy())
    all_labels.append(labels.cpu().numpy())

all_predictions = np.vstack(all_predictions)
all_labels = np.vstack(all_labels)
mse_distance = np.mean(np.sum((all_predictions - all_labels)**2, axis=1))
print(f"Mean Squared Euclidean Distance on validation set: {mse_distance}")

# If this is within acceptable limits, you can then proceed to generate predictions for the test set.

Downloading: "https://github.com/pytorch/vision/zipball/v0.6.0" to C:\Users\leo_b/.cache\torch\hub\v0.6.0.zip
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1/10 - Loss: 12162.2582
Epoch 2/10 - Loss: 11971.6738
Epoch 3/10 - Loss: 11742.2716
Epoch 4/10 - Loss: 11400.5501
Epoch 5/10 - Loss: 10953.1561
Epoch 6/10 - Loss: 10534.6061
Epoch 7/10 - Loss: 10177.7531
Epoch 8/10 - Loss: 9886.1740
Epoch 9/10 - Loss: 9584.3437
Epoch 10/10 - Loss: 9323.1285
Mean Squared Euclidean Distance on validation set: 17721.2734375


In [16]:
# Process test images
test_image_filenames = sorted(os.listdir(test_data_folder))
test_image_paths = [(os.path.join(test_data_folder, test_image_filenames[i]), os.path.join(test_data_folder, test_image_filenames[i+1])) for i in range(0, len(test_image_filenames), 2)]
test_images = [combine_images(img_pair[0], img_pair[1]) for img_pair in test_image_paths]

# Create a DataLoader for test images
test_dataset = CustomDataset(test_images, [0]*len(test_images), transform=transform)  # labels are not used
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=16)

# Predict on test set
model.eval()
test_predictions = []

for images, _ in test_loader:  # we don't need labels for test data
    images = images.to(device)
    with torch.no_grad():
        outputs = model(images)
    test_predictions.append(outputs.cpu().numpy())

test_predictions = np.vstack(test_predictions)

# Save the predictions to a CSV file
with open('test_predictions.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['x', 'y'])
    writer.writerows(test_predictions)

print("Test predictions saved to test_predictions.csv")

Test predictions saved to test_predictions.csv


In [25]:
print(len(labels), len(train_images), len(train_image_paths))

499 500 500


In [26]:
import torch
import torchvision.transforms as transforms
from torchvision.models import mobilenet_v2
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import pandas as pd
import os

# Define path variables
base_path = "C:\\Users\\leo_b\\OneDrive\\Documentos\\Projects\\personal_projects\\CCC38_2023\\"

train_data_folder = os.path.join(base_path, 'train_data')
train_label_path = os.path.join(base_path, 'train_data_labels.csv')  # Assuming CSV has 'x,y' for each image pair
test_data_folder = os.path.join(base_path, 'test_data')



# Load training labels (assuming CSV has 'x', 'y' columns)
df_labels = pd.read_csv(train_label_path)
print(df_labels.columns)
df_labels.columns = ['x', 'y']
labels = df_labels[['x', 'y']].values
labels = df_labels[['x', 'y']].values

# Combine images and pre-process
def combine_images(path1, path2):
    img1 = Image.open(path1).convert('RGB')
    img2 = Image.open(path2).convert('RGB')
    
    combined_img = np.maximum(np.array(img1), np.array(img2))
    return Image.fromarray(combined_img.astype('uint8'), 'RGB')

train_image_filenames = sorted(os.listdir(train_data_folder))
train_image_paths = [(os.path.join(train_data_folder, train_image_filenames[i]), os.path.join(train_data_folder, train_image_filenames[i+1])) for i in range(0, len(train_image_filenames)-1, 2)]
train_images = [combine_images(img_pair[0], img_pair[1]) for img_pair in train_image_paths]

# Custom dataset
class CustomDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx-1]
        label = torch.tensor(self.labels[idx-1], dtype=torch.float32)
        
        if self.transform:
            image = self.transform(image)

        return image, label

# Data augmentation and normalization
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

dataset = CustomDataset(train_images, labels, transform=transform)

# Splitting data into train and validation subsets
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, shuffle=True, batch_size=16)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=16)

# Model: Using MobileNetV2 and adapting last layer for regression
model = mobilenet_v2(pretrained=True)
model.classifier[1] = torch.nn.Linear(model.last_channel, 2)  # 2 output neurons for x, y coordinates

# Training setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
criterion = torch.nn.MSELoss()  # Regression task, hence MSE
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
num_epochs = 10

# Training the model
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        running_loss = 0.0
        
        for images, coords in train_loader:
            images, coords = images.to(device), coords.to(device)
            
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward
            outputs = model(images)
            loss = criterion(outputs, coords)
            
            # Backward and optimize
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * images.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)
        
        # Validate on the validation set
        model.eval()  # Set model to evaluation mode
        val_loss = 0.0
        with torch.no_grad():
            for images, coords in val_loader:
                images, coords = images.to(device), coords.to(device)
                outputs = model(images)
                loss = criterion(outputs, coords)
                val_loss += loss.item() * images.size(0)

        val_epoch_loss = val_loss / len(val_loader.dataset)

        print(f"Epoch {epoch+1}/{num_epochs} - Training Loss: {epoch_loss:.4f}, Validation Loss: {val_epoch_loss:.4f}")

    print("Finished Training")

train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=num_epochs)

# Evaluate the performance
def compute_mean_squared_euclidean_distance(pred, true):
    return ((pred - true) ** 2).sum(1).mean().item()

model.eval()
total_msed = 0.0
with torch.no_grad():
    for images, coords in val_loader:
        images, coords = images.to(device), coords.to(device)
        outputs = model(images)
        msed = compute_mean_squared_euclidean_distance(outputs, coords)
        total_msed += msed * images.size(0)

avg_msed = total_msed / len(val_loader.dataset)
print(f"Mean Squared Euclidean Distance on Validation set: {avg_msed:.4f}")



Index(['67', '140'], dtype='object')




Epoch 1/10 - Training Loss: 11932.4604, Validation Loss: 12302.6286
Epoch 2/10 - Training Loss: 11762.5370, Validation Loss: 12133.9271
Epoch 3/10 - Training Loss: 11590.9665, Validation Loss: 12011.3538
Epoch 4/10 - Training Loss: 11423.7551, Validation Loss: 11825.2613
Epoch 5/10 - Training Loss: 11236.3523, Validation Loss: 11574.1909
Epoch 6/10 - Training Loss: 11038.6578, Validation Loss: 11507.8681
Epoch 7/10 - Training Loss: 10828.7107, Validation Loss: 11203.0730
Epoch 8/10 - Training Loss: 10606.0601, Validation Loss: 11066.4394
Epoch 9/10 - Training Loss: 10388.6463, Validation Loss: 11142.0090
Epoch 10/10 - Training Loss: 10170.5149, Validation Loss: 10598.9123
Finished Training
Mean Squared Euclidean Distance on Validation set: 21197.6589


IndexError: index 499 is out of bounds for axis 0 with size 499

In [29]:
# Combine and preprocess test images
test_image_filenames = sorted(os.listdir(test_data_folder))
test_image_paths = [(os.path.join(test_data_folder, test_image_filenames[i]), os.path.join(test_data_folder, test_image_filenames[i+1])) for i in range(0, len(test_image_filenames)-1, 2)]
test_images = [combine_images(img_pair[0], img_pair[1]) for img_pair in test_image_paths]

# Using the same custom dataset but without labels for test images
class TestDataset(Dataset):
    def __init__(self, images, transform=None):
        self.images = images
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        if self.transform:
            image = self.transform(image)
        return image

test_dataset = TestDataset(test_images, transform=transform)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=16)

# Get predictions on the test set
model.eval()  # Set model to evaluation mode
test_predictions = []

with torch.no_grad():
    for images in test_loader:
        images = images.to(device)
        outputs = model(images)
        test_predictions.append(outputs.cpu().numpy())

# Flatten list of arrays into one array
test_predictions = np.concatenate(test_predictions, axis=0)

# Pass test_predictions to int with no decimal cases
test_predictions = test_predictions.astype(int)

# Pass negatives to positives
test_predictions[test_predictions < 0] = 1

# Optionally, save predictions to a CSV file
save_path = os.path.join(base_path, 'test_predictions.csv')
pd.DataFrame(test_predictions, columns=['x', 'y']).to_csv(save_path, index=False)

print("Test predictions saved to test_predictions.csv")


Test predictions saved to test_predictions.csv
