In [None]:
import pandas as pd
import numpy as np
import os
from PIL import Image
import requests
from io import BytesIO
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
# Entry point of the notebook: read the annotation table that every later
# cell works from. The CSV path is resolved against the working directory.
final_df = pd.read_csv(filepath_or_buffer='nrcan_dataset.csv')

In [None]:
import os
import glob
import pandas as pd
import random
from sklearn.model_selection import train_test_split

def find_image_file_improved(image_id, base_dirs=None):
    """Return the path of the first image file matching ``image_id``.

    Any extension on ``image_id`` is dropped, then each root directory is
    searched recursively for ``<id>.jpg``, ``<id>.jpeg``, ``<id>.png``,
    ``<id>.tif`` and ``<id>.tiff`` (in that order). Directories are tried in
    the order given; the first match wins.

    Args:
        image_id: Image identifier, with or without a file extension.
            Non-string values are converted with ``str()`` before matching.
        base_dirs: Optional list of root directories to search. When omitted,
            three dataset sub-directories under a hard-coded absolute path
            are used.

    Returns:
        The matching file path as a string, or ``None`` when no file is found
        in any directory.
    """
    if base_dirs is None:
        # NOTE(review): machine-specific absolute path; pass ``base_dirs``
        # explicitly when running anywhere else.
        current_dir = '/Users/naeemujeeb/developement/MartimeHabitatPrediction/nrcan-20'
        base_dirs = [
            os.path.join(current_dir, 'nrcan-2000047'),
            os.path.join(current_dir, 'nrcan-2013002PGC'),
            os.path.join(current_dir, 'nrcan-2003054')
        ]

    # Strip any file extension from the image_id if it exists
    image_id_base = os.path.splitext(str(image_id))[0]

    # Glob treats "*", "?" and "[" as wildcards. Escape the literal parts so
    # an id such as "frame[7]" matches the file of that exact name instead of
    # being read as a character class.
    escaped_id = glob.escape(image_id_base)

    # Try multiple potential file extensions
    extensions = ['.jpg', '.jpeg', '.png', '.tif', '.tiff']

    for base_dir in base_dirs:
        escaped_dir = glob.escape(os.fspath(base_dir))
        for ext in extensions:
            # "**" with recursive=True matches files directly in base_dir
            # as well as at any depth of subdirectory.
            pattern = os.path.join(escaped_dir, '**', f"{escaped_id}{ext}")
            matches = glob.glob(pattern, recursive=True)

            # If found, return the first match
            if matches:
                return matches[0]

    # If the image is not found in any directory
    return None

In [None]:
# Number of rows per value of the `substrate_class` column.
substrate_counts = final_df['substrate_class'].value_counts()
print(substrate_counts)

In [None]:
# Filter for just the two classes of interest.
# - Raw strings keep the regex escapes `\(` / `\)` without relying on Python's
#   deprecated handling of invalid string escapes.
# - na=False makes rows with a missing annotation count as "no match" instead
#   of putting NaN into the boolean mask (which pandas refuses to index with).
# - .copy() gives an independent frame, so adding the `label` column below
#   does not write into a view of `final_df` (SettingWithCopyWarning).
substrate_pattern = r'Gravel \(2-10mm\)|Consolidated \(hard\)'
binary_df = final_df[final_df['catami_substrate'].str.contains(substrate_pattern, na=False)].copy()

# Create binary labels: 1 = gravel, 0 = consolidated (hard)
binary_df['label'] = binary_df['catami_substrate'].str.contains(r'Gravel \(2-10mm\)', na=False).astype(int)

# Balance the dataset
gravel_samples = binary_df[binary_df['label'] == 1]
consolidated_samples = binary_df[binary_df['label'] == 0]

# Downsample both classes to the size of the smaller one
sample_size = min(len(gravel_samples), len(consolidated_samples))
balanced_gravel = gravel_samples.sample(sample_size, random_state=42)
balanced_consolidated = consolidated_samples.sample(sample_size, random_state=42)

# Combine and shuffle (fixed seed keeps the row order reproducible)
balanced_df = pd.concat([balanced_gravel, balanced_consolidated])
balanced_df = balanced_df.sample(frac=1, random_state=42).reset_index(drop=True)



In [None]:
from sklearn.model_selection import train_test_split

# Two-stage stratified split with a fixed seed:
#   stage 1 keeps 70% for training and holds out 30%;
#   stage 2 halves the held-out part into validation and test (15% each).
train_df, temp_df = train_test_split(
    balanced_df,
    test_size=0.3,
    random_state=42,
    stratify=balanced_df['label'],
)
val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    random_state=42,
    stratify=temp_df['label'],
)

In [None]:
import pandas as pd
import numpy as np
import os
from PIL import Image
import requests
from io import BytesIO
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
class SubstrateDataset(Dataset):
    """Dataset yielding ``(image, label)`` pairs from a labelled DataFrame.

    Each row of ``dataframe`` must provide an ``'image'`` column (the image
    identifier handed to ``find_image_file_improved``) and a ``'label'``
    column. Images are opened lazily in ``__getitem__`` and converted to RGB.

    Args:
        dataframe: Table with ``'image'`` and ``'label'`` columns.
        transform: Optional callable applied to the RGB image before it is
            returned.
    """

    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform
        # Resolved file paths keyed by image name. Locating a file runs a
        # recursive glob per directory and extension, so each name is only
        # resolved once instead of on every access in every epoch.
        self._path_cache = {}

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return the transformed image and label for positional row ``idx``.

        Raises:
            FileNotFoundError: If no file can be located for the row's image.
        """
        row = self.dataframe.iloc[idx]
        img_name = row['image']

        img_path = self._path_cache.get(img_name)
        if img_path is None:
            img_path = find_image_file_improved(img_name)
            if img_path is None:
                raise FileNotFoundError(f"Could not find image file for {img_name}")
            self._path_cache[img_name] = img_path

        # convert() returns a new, fully loaded image, so the underlying file
        # can be closed as soon as the conversion is done.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        label = row['label']

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Training pipeline: resize to 224x224, apply light augmentation (random
# horizontal/vertical flips and mild colour jitter), convert to a tensor and
# normalise each channel.
_train_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
train_transform = transforms.Compose(_train_steps)

# Validation/test pipeline: same resize and normalisation, no augmentation.
_eval_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
val_test_transform = transforms.Compose(_eval_steps)

# One dataset per split; only the training split is augmented.
train_dataset, val_dataset, test_dataset = (
    SubstrateDataset(train_df, transform=train_transform),
    SubstrateDataset(val_df, transform=val_test_transform),
    SubstrateDataset(test_df, transform=val_test_transform),
)

# Batches of 16; only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=16)
test_loader = DataLoader(dataset=test_dataset, batch_size=16)



In [None]:
import torch.nn as nn
import torchvision.models as models

# Pick the compute device in order of preference: Apple MPS, then CUDA,
# falling back to the CPU when neither accelerator is available.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Model factory
def create_model(num_classes=2):
    """Build a pretrained ResNet-50 with a fresh classification head.

    All pretrained parameter tensors except the last 20 are frozen, then the
    final fully connected layer is swapped for a new ``nn.Linear`` with
    ``num_classes`` outputs.

    Args:
        num_classes: Number of output classes of the new head.

    Returns:
        The configured model (not yet moved to any device).
    """
    net = models.resnet50(weights='IMAGENET1K_V2')

    # Freeze everything but the trailing 20 parameter tensors.
    pretrained_params = list(net.parameters())
    for tensor in pretrained_params[:-20]:
        tensor.requires_grad = False

    # New head sized to the backbone's feature width.
    net.fc = nn.Linear(net.fc.in_features, num_classes)

    return net

# Instantiate the network, then place it on the selected device
model = create_model()
model = model.to(device)


In [None]:
import torch.optim as optim
from tqdm import tqdm

# Set up training components
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# The scheduler halves the learning rate after 3 epochs without improvement in
# validation loss. Its `verbose` flag is deprecated in recent PyTorch
# releases, so it is not passed; learning-rate changes are reported manually
# after each scheduler step instead.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

# Training loop
num_epochs = 10
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Train
    model.train()
    train_loss = 0
    correct = 0
    total = 0

    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Statistics: weight the batch-mean loss by batch size so the epoch
        # average is per sample even when the last batch is smaller.
        train_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_loss = train_loss / total
    train_acc = 100 * correct / total

    # Validate
    model.eval()
    val_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(val_loader, desc="Validating"):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss = val_loss / total
    val_acc = 100 * correct / total

    # Print statistics
    print(f"Epoch {epoch+1}/{num_epochs}:")
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

    # Learning rate scheduler: step on validation loss and report any change.
    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(val_loss)
    lr_after = optimizer.param_groups[0]['lr']
    if lr_after != lr_before:
        print(f"Learning rate reduced from {lr_before:.2e} to {lr_after:.2e}")

    # Save best model (lowest validation loss seen so far)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_substrate_model.pth')
        print("Model saved!")



In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Load best model. map_location places the saved tensors on the current
# device, so the checkpoint also loads on a machine whose accelerator differs
# from the one it was saved on.
model.load_state_dict(torch.load('best_substrate_model.pth', map_location=device))

# Evaluate on test set
model.eval()
y_true = []
y_pred = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Convert to numpy arrays
y_true = np.array(y_true)
y_pred = np.array(y_pred)

# Label ids and their display names, in matching order (0 -> consolidated,
# 1 -> gravel). Passing the ids explicitly keeps names and rows/columns
# aligned even if one class never appears in y_true or y_pred.
label_ids = [0, 1]
target_names = ['Consolidated (hard)', 'Gravel (2-10mm)']

# Print classification report
print("\nClassification Report:")
print(classification_report(y_true, y_pred, labels=label_ids, target_names=target_names))

# Create confusion matrix (rows = true class, columns = predicted class)
cm = confusion_matrix(y_true, y_pred, labels=label_ids)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
           xticklabels=target_names,
           yticklabels=target_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.show()




In [None]:
def visualize_predictions(model, test_loader, num_samples=5):
    """
    Visualize example test images with their true and predicted classifications.

    Takes the first batch from ``test_loader``, runs the model on it and plots
    up to ``num_samples`` images in a single row. Each title shows the true
    and predicted class, green when they agree and red otherwise. The figure
    is saved to ``substrate_predictions.png`` and then shown.

    Args:
        model: Classifier whose outputs are indexed as
            0 = 'Consolidated (hard)', 1 = 'Gravel (2-10mm)'.
        test_loader: Iterable yielding ``(images, labels)`` batches.
        num_samples: Maximum number of images to plot; capped at the size of
            the first batch.

    Raises:
        ValueError: If there is nothing to plot (``num_samples`` < 1 or an
            empty batch).
    """
    # Get batch of test images
    images, labels = next(iter(test_loader))

    # Run on whatever device the model lives on rather than a global.
    model_device = next(model.parameters()).device

    # Get predictions
    model.eval()
    with torch.no_grad():
        outputs = model(images.to(model_device))
        _, preds = torch.max(outputs, 1)

    # Convert to numpy for visualization
    images = images.cpu().numpy()
    labels = labels.cpu().numpy()
    preds = preds.cpu().numpy()

    # Class names for readability
    class_names = ['Consolidated (hard)', 'Gravel (2-10mm)']

    # Never index past the end of the batch.
    n_show = min(num_samples, len(images))
    if n_show < 1:
        raise ValueError("Nothing to plot: num_samples must be >= 1 and the batch must not be empty")

    # squeeze=False always yields a 2-D axes array, so a single-image plot
    # is indexed the same way as a multi-image one.
    fig, axes = plt.subplots(1, n_show, figsize=(15, 5), squeeze=False)

    # Constants used to undo the per-channel normalisation for display.
    std = np.array([0.229, 0.224, 0.225])
    mean = np.array([0.485, 0.456, 0.406])

    for ax, image, true_idx, pred_idx in zip(axes[0], images, labels, preds):
        # Convert image from channel-first tensor layout to displayable format
        img = np.transpose(image, (1, 2, 0))
        img = img * std + mean
        img = np.clip(img, 0, 1)

        # Display image and labels
        ax.imshow(img)
        title_color = 'green' if pred_idx == true_idx else 'red'
        ax.set_title(f"True: {class_names[true_idx]}\nPred: {class_names[pred_idx]}",
                     color=title_color)
        ax.axis('off')

    plt.tight_layout()
    plt.savefig("substrate_predictions.png")
    plt.show()

# Call the function to visualize example predictions
visualize_predictions(model, test_loader)


In [None]:
def show_class_examples(model, test_loader, class_names=('Consolidated (hard)', 'Gravel (2-10mm)')):
    """
    Show examples of correct and incorrect predictions for each class.

    Iterates over ``test_loader`` and keeps, per true class, the first two
    correctly and the first two incorrectly classified images. They are drawn
    in a grid with one row per class: columns 0-1 hold correct predictions
    (green titles), columns 2-3 incorrect ones (red titles). Cells with no
    example stay blank. The figure is saved to
    ``substrate_class_examples.png`` and then shown.

    Args:
        model: Classifier; inputs are moved to the device of its parameters.
        test_loader: Iterable yielding ``(images, labels)`` batches, where
            labels index into ``class_names``.
        class_names: Display name for each class index.
    """
    examples_per_group = 2

    # Get the device that the model is on
    device = next(model.parameters()).device

    # Collect examples: one bucket per true class
    n_classes = len(class_names)
    correct_examples = [[] for _ in range(n_classes)]
    incorrect_examples = [[] for _ in range(n_classes)]

    model.eval()
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images.to(device))
            _, preds = torch.max(outputs, 1)

            # Move back to CPU for numpy conversion
            images_np = images.cpu().numpy()
            labels_np = labels.cpu().numpy()
            preds_np = preds.cpu().numpy()

            for img, true_class, pred_class in zip(images_np, labels_np, preds_np):
                if pred_class == true_class:
                    bucket = correct_examples[true_class]
                else:
                    bucket = incorrect_examples[true_class]
                # Only the first few per bucket are ever drawn, so there is
                # no need to keep the rest of the test set in memory.
                if len(bucket) < examples_per_group:
                    bucket.append((img, true_class, pred_class))

            # Stop reading batches once every bucket is full.
            if all(len(bucket) >= examples_per_group
                   for bucket in correct_examples + incorrect_examples):
                break

    # Create visualization; squeeze=False keeps 2-D indexing valid even for a
    # single class.
    fig, axes = plt.subplots(n_classes, 2 * examples_per_group, figsize=(16, 8), squeeze=False)

    # Hide every axis up front so cells without an example render blank.
    for ax in axes.ravel():
        ax.axis('off')

    # Constants used to undo the per-channel normalisation for display.
    std = np.array([0.229, 0.224, 0.225])
    mean = np.array([0.485, 0.456, 0.406])

    for class_idx in range(n_classes):
        groups = (
            (correct_examples[class_idx], 0, 'green'),
            (incorrect_examples[class_idx], examples_per_group, 'red'),
        )
        for examples, column_offset, color in groups:
            for i, (img, true_class, pred_class) in enumerate(examples):
                img = np.transpose(img, (1, 2, 0))
                img = img * std + mean
                img = np.clip(img, 0, 1)

                ax = axes[class_idx, column_offset + i]
                ax.imshow(img)
                ax.set_title(f"True: {class_names[true_class]}\nPred: {class_names[pred_class]}",
                             color=color)

    plt.tight_layout()
    plt.savefig("substrate_class_examples.png")
    plt.show()

# Build a model, load trained weights and visualize examples for each class
def run_class_examples(checkpoint_paths=('final_substrate_model.pth', 'best_substrate_model.pth')):
    """Rebuild the classifier, load trained weights and plot class examples.

    A ResNet-50 with a 2-output head is created on the preferred device
    (MPS, then CUDA, then CPU). The checkpoint files in ``checkpoint_paths``
    are tried in order and the first one that exists is loaded. If none
    exists, the model is used with an untrained head and a message is
    printed. Only a missing file is tolerated: any other loading error (for
    example a state dict that does not fit the model) is raised, so a broken
    checkpoint is not silently replaced by an untrained model.

    Args:
        checkpoint_paths: Candidate state-dict files, tried in order. The
            default also covers ``best_substrate_model.pth``, the file the
            training loop in this notebook writes.
    """
    device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    model = models.resnet50(weights='IMAGENET1K_V2')

    # Replace fc layer for binary classification, then move the complete
    # model (including the new head) to the device once.
    num_features = model.fc.in_features
    model.fc = torch.nn.Linear(num_features, 2)
    model = model.to(device)

    # Load trained weights from the first checkpoint that exists
    for checkpoint_path in checkpoint_paths:
        try:
            state_dict = torch.load(checkpoint_path, map_location=device)
        except FileNotFoundError:
            continue
        model.load_state_dict(state_dict)
        print("Loaded trained model weights")
        print(f"Checkpoint: {checkpoint_path}")
        break
    else:
        print("No trained weights found, using untrained model")

    show_class_examples(model, test_loader)
run_class_examples()



In [None]:
def train_with_params(learning_rate, batch_size, freeze_layers, num_epochs=5):
    """Train a fresh model with the given hyperparameters.

    Args:
        learning_rate: Learning rate for the Adam optimizer.
        batch_size: Batch size for the training and validation loaders.
        freeze_layers: Despite the name, the number of trailing pretrained
            parameter tensors that stay trainable; every tensor before them
            is frozen. ``0`` freezes the whole pretrained network, so only
            the new head is trained. Values larger than the number of
            parameter tensors freeze nothing.
        num_epochs: Number of training epochs (default 5).

    Returns:
        Validation accuracy in percent after the last epoch (``0.0`` if no
        epoch ran or the validation loader produced no samples).

    Raises:
        ValueError: If ``freeze_layers`` is negative.
    """
    if freeze_layers < 0:
        raise ValueError("freeze_layers must be non-negative")

    # Create new model
    model = models.resnet50(weights='IMAGENET1K_V2')

    # Freeze layers according to parameter. The cut point is computed
    # explicitly because `params[:-0]` is an empty slice and would freeze
    # nothing when freeze_layers is 0.
    pretrained_params = list(model.parameters())
    n_frozen = max(len(pretrained_params) - freeze_layers, 0)
    for param in pretrained_params[:n_frozen]:
        param.requires_grad = False

    # Replace final layer (created after freezing, so it is always trainable)
    num_features = model.fc.in_features
    model.fc = nn.Linear(num_features, 2)
    model = model.to(device)

    # Create dataloaders with specified batch size
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Set up optimizer with specified learning rate
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    val_acc = 0.0
    for epoch in range(num_epochs):
        # Train
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # Validate
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Guard against an empty validation set.
        val_acc = 100 * correct / total if total else 0.0

    return val_acc

# Exhaustive grid search: one training run per combination of learning rate,
# batch size and `freeze_layers` value (3 x 3 x 3 = 27 runs).
learning_rates = [0.01, 0.001, 0.0001]
batch_sizes = [8, 16, 32]
freeze_layers = [10, 20, 50]
results = []

# Same visiting order as three nested loops: learning rate outermost,
# freeze setting innermost.
search_grid = [
    (rate, size, freeze)
    for rate in learning_rates
    for size in batch_sizes
    for freeze in freeze_layers
]

for lr, bs, fl in search_grid:
    print(f"Testing: LR={lr}, BS={bs}, Freeze={fl}")
    val_acc = train_with_params(lr, bs, fl)
    results.append(dict(
        learning_rate=lr,
        batch_size=bs,
        freeze_layers=fl,
        val_accuracy=val_acc,
    ))
    print(f"Validation Accuracy: {val_acc:.2f}%")

# Tabulate the runs and show the five best by validation accuracy
results_df = pd.DataFrame(results)
top_runs = results_df.sort_values('val_accuracy', ascending=False).head()
print(top_runs)

# One bar chart per hyperparameter, side by side. Each bar aggregates the
# validation accuracy over all runs sharing that hyperparameter value.
plt.figure(figsize=(15, 5))

# (results_df column, panel title, x-axis label) for each panel, left to right
panels = [
    ('learning_rate', 'Effect of Learning Rate', 'Learning Rate'),
    ('batch_size', 'Effect of Batch Size', 'Batch Size'),
    ('freeze_layers', 'Effect of Frozen Layers', 'Number of Frozen Layers'),
]

for position, (column, title, x_label) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    sns.barplot(x=column, y='val_accuracy', data=results_df)
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel('Validation Accuracy (%)')

plt.tight_layout()
plt.savefig('hyperparameter_effects.png')
plt.show()

# commentgit statu