In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
import pandas as pd
import os
from PIL import Image

RuntimeError: module compiled against API version 0x10 but this version of numpy is 0xe

ImportError: numpy.core.multiarray failed to import

In [None]:
# Custom EfficientNetB7-like Implementation
class MBConvBlock(nn.Module):
    """Inverted-bottleneck convolution block with optional squeeze-and-excitation.

    Pipeline: 1x1 expansion conv -> BN -> ReLU -> depthwise conv -> BN -> ReLU
    -> (optional) squeeze-and-excitation gating -> 1x1 projection conv -> BN.
    The input is added back to the output when the block keeps both the
    spatial resolution (``stride == 1``) and the channel count.

    Note that the 1x1 expansion conv is applied even when
    ``expansion_factor == 1``.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the projection conv.
        kernel_size: Kernel size of the depthwise conv (padding is
            ``kernel_size // 2``).
        stride: Stride of the depthwise conv.
        expansion_factor: Multiplier applied to ``in_channels`` to obtain the
            width of the expanded (hidden) representation.
        se_ratio: Fraction of the expanded width used for the
            squeeze-and-excitation bottleneck. A falsy value (``0``/``None``)
            disables squeeze-and-excitation entirely.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, expansion_factor, se_ratio=0.25):
        super().__init__()
        self.expanded_channels = in_channels * expansion_factor
        self.se_ratio = se_ratio

        # Pointwise expansion to the hidden width.
        self.expand = nn.Conv2d(in_channels, self.expanded_channels, kernel_size=1, bias=False)
        self.expand_bn = nn.BatchNorm2d(self.expanded_channels)
        self.expand_relu = nn.ReLU(inplace=True)

        # Depthwise conv: groups == channels, so each channel is filtered independently.
        self.depthwise = nn.Conv2d(
            self.expanded_channels,
            self.expanded_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=kernel_size // 2,
            groups=self.expanded_channels,
            bias=False,
        )
        self.depthwise_bn = nn.BatchNorm2d(self.expanded_channels)
        self.depthwise_relu = nn.ReLU(inplace=True)

        if se_ratio:
            # Clamp to at least one channel: for narrow blocks the truncated
            # product can be 0, which would create a zero-width bottleneck.
            squeezed_channels = max(1, int(self.expanded_channels * se_ratio))
            self.se = nn.Sequential(
                nn.AdaptiveAvgPool2d(1),
                nn.Conv2d(self.expanded_channels, squeezed_channels, kernel_size=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(squeezed_channels, self.expanded_channels, kernel_size=1),
                nn.Sigmoid(),
            )
        else:
            self.se = None

        # Pointwise projection back down to the output width (no activation).
        self.project = nn.Conv2d(self.expanded_channels, out_channels, kernel_size=1, bias=False)
        self.project_bn = nn.BatchNorm2d(out_channels)

        # A residual connection is only shape-compatible when nothing changes size.
        self.skip_connection = stride == 1 and in_channels == out_channels

    def forward(self, x):
        """Apply the block to ``x`` and return the resulting feature map."""
        identity = x

        x = self.expand_relu(self.expand_bn(self.expand(x)))
        x = self.depthwise_relu(self.depthwise_bn(self.depthwise(x)))

        # Explicit None check: truthiness of an nn.Sequential depends on its length.
        if self.se is not None:
            x = x * self.se(x)

        x = self.project_bn(self.project(x))

        if self.skip_connection:
            # Out-of-place add keeps the BN output tensor untouched.
            x = x + identity

        return x

class EfficientNetB7(nn.Module):
    """Simplified EfficientNetB7-like classifier built from ``MBConvBlock`` stages.

    The network is a stride-2 stem followed by eight MBConv blocks, global
    average pooling and a linear classification head. The stem and four of
    the blocks use stride 2, so the feature map returned by
    :meth:`extract_features` has 1024 channels at 1/32 of the input
    resolution.

    Args:
        num_classes: Number of outputs of the final linear layer.
    """

    def __init__(self, num_classes=1000):
        super().__init__()

        self.stem = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
        )

        self.blocks = nn.Sequential(
            MBConvBlock(64, 64, kernel_size=3, stride=1, expansion_factor=1),
            MBConvBlock(64, 128, kernel_size=3, stride=2, expansion_factor=6),
            MBConvBlock(128, 128, kernel_size=3, stride=1, expansion_factor=6),
            MBConvBlock(128, 256, kernel_size=3, stride=2, expansion_factor=6),
            MBConvBlock(256, 256, kernel_size=3, stride=1, expansion_factor=6),
            MBConvBlock(256, 512, kernel_size=3, stride=2, expansion_factor=6),
            MBConvBlock(512, 512, kernel_size=3, stride=1, expansion_factor=6),
            MBConvBlock(512, 1024, kernel_size=3, stride=2, expansion_factor=6),
        )

        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(1024, num_classes)

    def extract_features(self, x):
        """Return the spatial feature map produced by the stem and MBConv blocks.

        This is the pre-pooling representation (1024 channels), intended for
        dense-prediction heads that need spatial features rather than class
        logits.
        """
        return self.blocks(self.stem(x))

    def forward(self, x):
        """Return classification logits of shape ``(batch, num_classes)``."""
        x = self.extract_features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return self.fc(x)


In [None]:
# Transformations
# Augmentation pipeline: resize, random flips/rotation, channel-wise
# normalization of the image, then conversion to torch tensors.
# Shape checking between image and mask is switched off for this pipeline.
transform = A.Compose(
    transforms=[
        A.Resize(height=256, width=256),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
        A.Normalize(
            mean=(0.485, 0.456, 0.406),
            std=(0.229, 0.224, 0.225),
        ),
        ToTensorV2(),
    ],
    is_check_shapes=False,
)

In [None]:
# Custom Dataset
class FloodDataset(Dataset):
    """Image/mask pairs listed in a CSV file.

    The first CSV column holds the image file name (relative to
    ``image_dir``) and the second column the mask file name (relative to
    ``mask_dir``).

    Args:
        image_dir: Directory containing the input images.
        mask_dir: Directory containing the mask images.
        csv_file: Path to the CSV file pairing images and masks.
        transform: Optional callable invoked as
            ``transform(image=image, mask=mask)`` that returns a mapping with
            ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, image_dir, mask_dir, csv_file, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.metadata = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the metadata CSV."""
        return len(self.metadata)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, mask)``.

        The image is read as RGB and the mask as single-channel greyscale.
        The mask is divided by 255 so its values lie in ``[0, 1]``.
        """
        row = self.metadata.iloc[idx]
        img_name = os.path.join(self.image_dir, row.iloc[0])
        mask_name = os.path.join(self.mask_dir, row.iloc[1])

        # Context managers close the underlying files deterministically
        # instead of leaving that to garbage collection.
        with Image.open(img_name) as img_file:
            image = np.array(img_file.convert("RGB"))
        with Image.open(mask_name) as mask_file:
            mask = np.array(mask_file.convert("L"))

        # Explicit None check: do not rely on the truthiness of the transform object.
        if self.transform is not None:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        mask = mask / 255.0  # Normalize mask to [0, 1]
        return image, mask

In [None]:
# Model
class SegmentationModel(nn.Module):
    """Single-channel segmentation head on top of a convolutional backbone.

    The backbone's spatial feature map is reduced to one logit channel with a
    1x1 convolution and bilinearly upsampled by a factor of 32.

    Args:
        backbone: Feature extractor. It must either expose
            ``extract_features(x)`` or ``stem`` and ``blocks`` sub-modules,
            and produce a 1024-channel feature map (the classifier's expected
            input width).
    """

    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone
        self.classifier = nn.Conv2d(1024, 1, kernel_size=1)  # Output channels adjusted
        self.upsample = nn.Upsample(scale_factor=32, mode='bilinear', align_corners=True)

    def _backbone_features(self, x):
        """Return the backbone's spatial features for ``x``."""
        extract = getattr(self.backbone, "extract_features", None)
        if callable(extract):
            return extract(x)
        # Backbones without ``extract_features`` (such as a plain stem/blocks
        # classifier) are run up to, but not including, pooling and the
        # classification layer.
        return self.backbone.blocks(self.backbone.stem(x))

    def forward(self, x):
        """Return per-pixel logits with a single output channel."""
        features = self._backbone_features(x)
        out = self.classifier(features)
        return self.upsample(out)

In [None]:
# Data Loader
# Locations of the images, the masks and the CSV that pairs them.
image_dir, mask_dir, csv_file = 'Image', 'Mask', 'metadata.csv'

# Wrap the files in a dataset and serve shuffled mini-batches of 8 samples.
dataset = FloodDataset(
    image_dir=image_dir,
    mask_dir=mask_dir,
    csv_file=csv_file,
    transform=transform,
)
dataloader = DataLoader(dataset=dataset, batch_size=8, shuffle=True)

In [None]:
# Using custom EfficientNetB7 model
# Segmentation network built on the custom EfficientNetB7 implementation.
backbone = EfficientNetB7(num_classes=1000)

# Prefer the GPU when one is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model = SegmentationModel(backbone).to(device)

# Binary segmentation: the loss takes raw logits; Adam with a small step size.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

# Training schedule and checkpoint location.
num_epochs = 20
checkpoint_dir = './checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

In [None]:
def calculate_accuracy(outputs, masks, threshold=0.5):
    """Return the fraction of pixels whose predicted class matches the mask.

    Args:
        outputs: Raw model logits; a sigmoid is applied before thresholding.
        masks: Target mask with values in ``[0, 1]``, broadcastable against
            ``outputs``.
        threshold: Cut-off applied to both the predicted probabilities and
            the mask to obtain binary labels.

    Returns:
        Pixel accuracy as a Python float in ``[0, 1]``; ``0.0`` when there
        are no pixels to compare.
    """
    with torch.no_grad():
        preds = torch.sigmoid(outputs) > threshold
        # Binarize the target as well: comparing boolean predictions with a
        # raw float mask would count every non-exact 0/1 pixel (e.g. 0.9) as
        # wrong regardless of the prediction.
        targets = masks > threshold
        correct = (preds == targets).float()
        if correct.numel() == 0:
            return 0.0
        return (correct.sum() / correct.numel()).item()

def save_checkpoint(epoch, model, optimizer, loss, checkpoint_dir):
    """Write a training checkpoint to ``checkpoint_dir``.

    The file is named ``checkpoint_epoch_<epoch>.pth`` and stores the epoch
    number, the model and optimizer state dicts, and the loss value.

    Args:
        epoch: Epoch number, used in the file name and stored in the checkpoint.
        model: Module whose ``state_dict()`` is saved.
        optimizer: Optimizer whose ``state_dict()`` is saved.
        loss: Loss value to record alongside the weights.
        checkpoint_dir: Destination directory; created if it does not exist.
    """
    # Make the function usable on its own instead of relying on the caller
    # having created the directory beforehand.
    os.makedirs(checkpoint_dir, exist_ok=True)
    checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch}.pth')
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
    }, checkpoint_path)

In [None]:
# Fail early with a clear message instead of a ZeroDivisionError when the
# per-epoch averages are computed.
num_batches = len(dataloader)
if num_batches == 0:
    raise ValueError("dataloader yields no batches; check the dataset and metadata CSV")

for epoch in range(1, num_epochs + 1):
    model.train()
    running_loss = 0.0
    running_accuracy = 0.0

    for images, masks in dataloader:
        images = images.to(device)
        # Add the channel dimension so the target matches the model output.
        masks = masks.to(device).unsqueeze(1).float()

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # The metric needs no gradients; detach to keep it off the autograd graph.
        running_accuracy += calculate_accuracy(outputs.detach(), masks)

    epoch_loss = running_loss / num_batches
    epoch_accuracy = running_accuracy / num_batches
    print(f"Epoch {epoch}/{num_epochs}, Loss: {epoch_loss}, Accuracy: {epoch_accuracy}")

    # Save checkpoint every 5 epochs
    if epoch % 5 == 0:
        save_checkpoint(epoch, model, optimizer, epoch_loss, checkpoint_dir)
        # Ask whether to continue, except after the final epoch where there
        # is nothing left to continue with.
        if epoch < num_epochs:
            continue_training = input(f"Training reached epoch {epoch}. Do you want to continue? (yes/no): ")
            if continue_training.strip().lower() != 'yes':
                print("Training stopped by user.")
                break
else:
    # Only reached when the loop was not interrupted by the user.
    print("Training Complete!")