In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os



class FCN(nn.Module):
    def __init__(self, num_classes):
        super(FCN, self).__init__()

        # Encoder (downsampling)
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # Add more convolutional layers as needed
        )

        # Bottleneck (no pooling or downsampling)
        self.bottleneck = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        )

        # Decoder (upsampling)
        self.decoder = nn.Sequential(
            # Add more layers based on the encoder
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
            nn.Conv2d(64, num_classes, kernel_size=1)
        )

    def forward(self, x):
        # Forward pass through the encoder
        x1 = self.encoder(x)

        # Forward pass through the bottleneck
        x2 = self.bottleneck(x1)

        # Forward pass through the decoder
        x3 = self.decoder(x2)

        return x3

# Instantiate the model
num_classes = 1  # Assuming binary segmentation (flood or non-flood)
fcn_model = FCN(num_classes)

# Print the model architecture
print(fcn_model)

import torch

# Check if GPU is available
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU is not available, using CPU")

fcn_model.to(device)

transform=transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    # transforms.Normalize(mean=[0.485,0.456,0.406],std=[0.229,0.224,0.225])
])

class FloodDataset(torch.utils.data.Dataset):
  def __init__(self,root_dir,transform=None):
    self.root_dir=root_dir
    self.image_folder=os.path.join(root_dir,"Image")
    self.mask_folder=os.path.join(root_dir,"Mask")
    self.image_paths=sorted([os.path.join(self.image_folder,img) for img in os.listdir(self.image_folder)])
    self.mask_paths = sorted([os.path.join(self.mask_folder, mask) for mask in os.listdir(self.mask_folder)])
    self.transform=transform

  def __len__(self):
    return(len(self.image_paths))

  def __getitem__(self,idx):
    image_path=self.image_paths[idx]
    mask_path=self.mask_paths[idx]

    image=Image.open(image_path).convert("RGB")
    mask=Image.open(mask_path).convert("L")
    if self.transform:
      image=self.transform(image)
      mask=self.transform(mask)

    return image,mask
class FloodDataset(torch.utils.data.Dataset):
  def __init__(self,root_dir,transform=None):
    self.root_dir=root_dir
    self.image_folder=os.path.join(root_dir,"Image")
    self.mask_folder=os.path.join(root_dir,"Mask")
    self.image_paths=sorted([os.path.join(self.image_folder,img) for img in os.listdir(self.image_folder)])
    self.mask_paths = sorted([os.path.join(self.mask_folder, mask) for mask in os.listdir(self.mask_folder)])
    self.transform=transform

  def __len__(self):
    return(len(self.image_paths))

  def __getitem__(self,idx):
    image_path=self.image_paths[idx]
    mask_path=self.mask_paths[idx]

    image=Image.open(image_path).convert("RGB")
    mask=Image.open(mask_path).convert("L")
    if self.transform:
      image=self.transform(image)
      mask=self.transform(mask)

    return image,mask

flood_dataset = FloodDataset(root_dir=r"/content/drive/MyDrive/deeplearnig/dataset", transform=transform)

train_loader = DataLoader(flood_dataset, batch_size=4, shuffle=True)

# Instantiate the FCN model
num_classes = 1  # Assuming binary segmentation (flood or non-flood)
fcn_model = FCN(num_classes)

# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(fcn_model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    fcn_model.train()
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)  # Move data to GPU
        optimizer.zero_grad()
        outputs = fcn_model(images)

        # BCEWithLogitsLoss handles sigmoid activation internally, no need to normalize masks
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}')

FCN(
  (encoder): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (bottleneck): Sequential(
    (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
  )
  (decoder): Sequential(
    (0): Conv2d(128, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Upsample(scale_factor=2.0, mode='bilinear')
    (3): Conv2d(64, 1, kernel_size=(1, 1), stride=(1, 1))
  )
)
GPU is available


RuntimeError: ignored

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os

class FCN(nn.Module):
    def __init__(self, num_classes):
        super(FCN, self).__init__()

        # Encoder (downsampling)
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),

            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),

            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),

            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),

            nn.MaxPool2d(kernel_size=2, stride=2),

            # Add more convolutional layers as needed
        )

        # Bottleneck (no pooling or downsampling)
        self.bottleneck = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),

            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
        )

        # Decoder (upsampling)
        self.decoder = nn.Sequential(
            # Add more layers based on the encoder
            nn.Conv2d(256, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),

            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),

            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),

            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),

            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),

            nn.Conv2d(64, num_classes, kernel_size=1)
        )

    def forward(self, x):
        # Forward pass through the encoder
        x1 = self.encoder(x)

        # Forward pass through the bottleneck
        x2 = self.bottleneck(x1)

        # Forward pass through the decoder
        x3 = self.decoder(x2)

        return x3

# Instantiate the model
num_classes = 1  # Assuming binary segmentation (flood or non-flood)
fcn_model = FCN(num_classes)

# Print the model architecture
print(fcn_model)

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Move the model to the specified device
fcn_model.to(device)

# Define transformations for images and masks
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

FCN(
  (encoder): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (9): ReLU(inplace=True)
    (10): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (12): ReLU(inplace=True)
    (13): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (bottleneck): Sequen

In [None]:

class FloodDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.image_folder = os.path.join(root_dir, "Image")
        self.mask_folder = os.path.join(root_dir, "Mask")
        self.image_paths = sorted([os.path.join(self.image_folder, img) for img in os.listdir(self.image_folder)])
        self.mask_paths = sorted([os.path.join(self.mask_folder, mask) for mask in os.listdir(self.mask_folder)])
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        mask_path = self.mask_paths[idx]

        image = Image.open(image_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")

        if self.transform:
            image = self.transform(image)
            mask = self.transform(mask)

        return image, mask

# Create the dataset and dataloader
flood_dataset = FloodDataset(root_dir="/content/drive/MyDrive/deeplearnig/dataset", transform=transform)
train_loader = DataLoader(flood_dataset, batch_size=4, shuffle=True)



In [None]:
train_size = int(0.8 * len(flood_dataset))
test_size = len(flood_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(flood_dataset, [train_size, test_size])

In [None]:
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(fcn_model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    fcn_model.train()
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)  # Move data to GPU
        optimizer.zero_grad()
        outputs = fcn_model(images)

        # BCEWithLogitsLoss handles sigmoid activation internally, no need to normalize masks
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}')



Epoch 1/10, Loss: 0.3581
Epoch 2/10, Loss: 0.3651
Epoch 3/10, Loss: 0.3919
Epoch 4/10, Loss: 0.4163
Epoch 5/10, Loss: 0.3016
Epoch 6/10, Loss: 0.3317
Epoch 7/10, Loss: 0.6133
Epoch 8/10, Loss: 0.2332
Epoch 9/10, Loss: 0.5030
Epoch 10/10, Loss: 0.3008


In [None]:
import torch

def calculate_accuracy(predictions, targets, threshold=0.5):
    """
    Calculate accuracy based on predictions and ground truth masks.

    Args:
    - predictions (torch.Tensor): Model predictions (logits)
    - targets (torch.Tensor): Ground truth masks
    - threshold (float): Threshold for converting logits to binary predictions

    Returns:
    - accuracy (float): Accuracy value
    """
    with torch.no_grad():
        binary_predictions = (torch.sigmoid(predictions) > threshold).float()
        correct_predictions = (binary_predictions == targets).float()

        accuracy = correct_predictions.sum() / (targets.numel() + 1e-10)  # Add a small value to avoid division by zero

    return accuracy.item()

# Example of how to use the accuracy function in your training loop
for epoch in range(num_epochs):
    fcn_model.train()
    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device)  # Move data to GPU
        optimizer.zero_grad()
        outputs = fcn_model(images)

        # BCEWithLogitsLoss handles sigmoid activation internally, no need to normalize masks
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        accuracy = calculate_accuracy(outputs, masks)
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}, Accuracy: {accuracy:.4f}')


Epoch 1/10, Loss: 0.3667, Accuracy: 0.7867
Epoch 2/10, Loss: 0.2842, Accuracy: 0.8462
Epoch 3/10, Loss: 0.3269, Accuracy: 0.7880
Epoch 4/10, Loss: 0.4751, Accuracy: 0.7496
Epoch 5/10, Loss: 0.3508, Accuracy: 0.8100
Epoch 6/10, Loss: 0.3477, Accuracy: 0.8098
Epoch 7/10, Loss: 0.3087, Accuracy: 0.8474
Epoch 8/10, Loss: 0.5528, Accuracy: 0.7589
Epoch 9/10, Loss: 0.3434, Accuracy: 0.8042
Epoch 10/10, Loss: 0.2231, Accuracy: 0.8870


In [None]:
import matplotlib.pyplot as plt

In [None]:
# Visualize some results
fcn_model.eval()
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = fcn_model(inputs)
        predictions = (outputs > 0.5).float()  # Apply threshold (adjust as needed)

        # Visualize the input image, ground truth mask, and predicted mask
        plt.figure(figsize=(12, 4))

        plt.subplot(1, 3, 1)
        plt.title("Input Image")
        plt.imshow(inputs[0].cpu().permute(1, 2, 0))
        plt.axis("off")

        plt.subplot(1, 3, 2)
        plt.title("Ground Truth Mask")
        plt.imshow(targets[0].cpu().squeeze(), cmap="gray")
        plt.axis("off")

        plt.subplot(1, 3, 3)
        plt.title("Predicted Mask")
        plt.imshow(predictions[0].cpu().squeeze(), cmap="gray")
        plt.axis("off")

        plt.show()

Output hidden; open in https://colab.research.google.com to view.

In [None]:
# Save the trained model
torch.save(fcn_model.state_dict(), 'fcn_model.pth')
