In [None]:
import numpy as np
import pandas as pd
import torch 
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import model_selection, metrics
import tensorflow as tf
from tensorflow.keras.layers import Layer, Multiply,GlobalAveragePooling1D,MultiHeadAttention,Embedding,Lambda,Dense,Flatten,Conv2D,Dropout, Conv2DTranspose, MaxPooling2D, Input, Activation, Concatenate, UpSampling2D, Resizing,Reshape,Add,LayerNormalization,BatchNormalization
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam,SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, CSVLogger, EarlyStopping
from tensorflow.keras.saving import register_keras_serializable
import cv2
from PIL import Image
from tensorflow import keras
import pickle
from tensorflow.keras.models import load_model
from sklearn.metrics import precision_score, recall_score, f1_score, jaccard_score

In [None]:
# Root directory of the dataset, passed as `directory` to
# image_dataset_from_directory in the next cell.
# NOTE(review): absolute Kaggle path; the notebook only runs where this mount exists.
path = '/kaggle/input/satellite-images-of-water-bodies/Water Bodies Dataset'

In [None]:
# Build a tf.data dataset from `path`, resizing every file to 128x128.
# shuffle=False keeps the directory order; batch_size=6000 is meant to return
# everything in one batch.
# NOTE(review): presumably labels are inferred from the sub-directory names
# (library default) — confirm which folder maps to label 0 and which to label 1.
data = tf.keras.utils.image_dataset_from_directory(directory = path, image_size = (128, 128), batch_size = 6000, shuffle = False)

In [None]:
# Drain the dataset batch by batch and stack the results. Assigning X/y inside
# the loop kept only the last batch, which silently drops data as soon as the
# directory holds more files than one batch.
image_batches, label_batches = [], []
for batch_images, batch_labels in data:
    image_batches.append(batch_images.numpy().astype("uint8"))
    label_batches.append(batch_labels.numpy().astype("uint8"))

# X: every loaded picture; y: the integer label the loader attached to each one.
X = np.concatenate(image_batches, axis=0)
y = np.concatenate(label_batches, axis=0)

print(X.shape, y.shape)

In [None]:
# Split the loaded pictures by label: label 0 is treated as the satellite
# images, label 1 as the masks.
# NOTE(review): assumes label 0/1 correspond to the image/mask folders and that
# both folders list their files in matching order (shuffle was disabled) —
# verify, otherwise images and masks are not paired index-for-index.
images = X[y == 0]
masks = X[y == 1]

print(images.shape, masks.shape)

In [None]:
# Hold out 20% of the (image, mask) pairs for testing; random_state fixes the
# shuffle so the split is reproducible.
X_train, X_test, y_train, y_test = model_selection.train_test_split(images, masks, test_size = 0.2, random_state = 3)

print(X_train.shape, X_test.shape)

In [None]:
# Convert masks to a single binary channel: a pixel is 1 when channel 0 of the
# mask is non-zero, else 0.
# NOTE(review): assumes channel 0 alone carries the mask — confirm for this dataset.
y_train = (y_train[..., 0] > 0).astype("uint8")  # Assuming the first channel represents the mask
y_test = (y_test[..., 0] > 0).astype("uint8")
print(y_train.shape, y_test.shape)
# Add back a trailing channel axis so the targets are (N, 128, 128, 1).
y_train = y_train.reshape((-1, 128, 128, 1))
y_test = y_test.reshape((-1, 128, 128, 1))

print(y_train.shape, y_test.shape)

In [None]:
# Hyperparameters shared by the patching, embedding and transformer cells.
config = {}

config["image_size"] = 128
config["num_channels"] = 3
config["patch_size"] = 16
config["num_patches"] = (config["image_size"]**2) // (config["patch_size"]**2) # 128x128/16x16 = 64
config["flat_patches_shape"] = (config["num_patches"], config["patch_size"],config["patch_size"],config["num_channels"]) # 64 x 16 x 16 x 3
config["input_shape"] = (config["num_patches"], config["patch_size"]*config["patch_size"]*config["num_channels"]) # 64 patches, 768 elements each
config["classes"] = ["water","no water" ]
config["window_size"] = 8
config["num_stages"] = 4

# Transformer sizing: hidden_dim is the token width, mlp_dim the width of the
# feed-forward expansion inside mlp().
config["num_layers"] = 12
config["hidden_dim"] = 64
config["mlp_dim"] = 128
config["num_heads"] = 8
config["dropout_rate"] = 0.1

In [None]:
def create_patches(images, patch_size):
    """Cut a batch of images into flattened, non-overlapping square patches.

    Args:
        images: NumPy array of shape (batch, height, width, channels). Height
            and width must be multiples of `patch_size`, otherwise the reshape
            raises ValueError.
        patch_size: Side length of each square patch, in pixels.

    Returns:
        Array of shape (batch, height // patch_size, width // patch_size,
        patch_size * patch_size * channels). Entry [b, i, j] is the patch at
        grid row i, grid column j, flattened in (row, column, channel) order,
        so `.reshape(patch_size, patch_size, channels)` restores the tile.
    """
    batch, height, width, channels = images.shape
    H_patches = height // patch_size
    W_patches = width // patch_size

    # Split each spatial axis into (grid index, offset inside the patch):
    # (batch, H_patches, patch_size, W_patches, patch_size, channels)
    grid = images.reshape(batch, H_patches, patch_size, W_patches, patch_size, channels)

    # Put the two grid axes next to each other so one patch's pixels become
    # contiguous. The previous identity permutation left the axes untouched,
    # so each "patch" was a run of consecutive pixels, not a square tile.
    grid = grid.transpose(0, 1, 3, 2, 4, 5)

    return grid.reshape(batch, H_patches, W_patches, patch_size * patch_size * channels)

In [None]:
# Patchify both splits with the configured patch size.
X_train_patches = create_patches(X_train, config["patch_size"])
X_test_patches = create_patches(X_test, config["patch_size"])

In [None]:
# Summary of the patching step. create_patches returns
# (batch, H_patches, W_patches, patch_dim), so the number of patches per image
# is the product of the two grid axes, not shape[1] alone.
print(f"Train Patches: {X_train_patches.shape} \nTest Patches: {X_test_patches.shape}")
print(f"Image size: {config['image_size']} X {config['image_size']}")
print(f"Patch Size: {config['patch_size']} X {config['patch_size']}")
print(f"Patch per Image: {X_train_patches.shape[1] * X_train_patches.shape[2]} \nPatch Dimension: {X_train_patches.shape[-1]}")

In [None]:
# Take the first training image and patchify it on its own; create_patches
# expects a batch axis, so one is added and then stripped again with [0].
sample_image = X_train[0]
sample_patches = create_patches(np.expand_dims(sample_image, axis=0), config["patch_size"])[0]
print(f"Patch Shape{sample_patches.shape}")

# Show the untouched image for comparison with the patch grid plotted later.
plt.figure(figsize=(10,5))
plt.subplot(1, 2, 1)
plt.imshow(sample_image.astype("uint8"))
plt.title("Original Image")
plt.axis("off")

In [None]:
# Total patches of the sample image = grid rows * grid columns.
num_patches = sample_patches.shape[0] * sample_patches.shape[1]
print(num_patches)

In [None]:
# Side length of the subplot grid; only exact when the patch grid is square.
n = int(np.sqrt(num_patches)) # n should be square root of number of patches
print(f"Total Patch of a image: {num_patches} \nNumber of patches in 1 dimension: {n}")

plt.figure(figsize = (4,4))
# Collapse the 2-D patch grid into a flat list: (num_patches, patch_dim).
sample_patch = sample_patches.reshape(num_patches,sample_patches.shape[2])
print(sample_patch.shape)

# Iterate through the patches, not elements of a patch
for i, patch in enumerate(sample_patch):
    ax = plt.subplot(n, n, i + 1)
    # Un-flatten the patch vector back to (patch_size, patch_size, channels) for display.
    patch_image = patch.reshape(config['patch_size'], config['patch_size'], sample_image.shape[-1])
    plt.imshow(patch_image.astype('uint8'))
    plt.axis("off")

plt.show()

In [None]:
def linear_embedding(inputs, cf):
    """Project the last axis of `inputs` to cf['hidden_dim'] features.

    A new Dense layer is created on every call, so two calls do not share
    weights. The resulting shape is printed before the tensor is returned.
    """
    projection = Dense(cf['hidden_dim'])
    embedded = projection(inputs)
    print(embedded.shape)

    return embedded

In [None]:
# Embed the flattened patches of both splits.
# NOTE(review): linear_embedding builds a fresh Dense layer per call, so the
# train and test projections use independently initialised weights.
X_train_embedded = linear_embedding(X_train_patches, config)
X_test_embedded = linear_embedding(X_test_patches, config)

In [None]:
def window_partition(x, window_size):
    """Split a feature map into non-overlapping square windows.

    Args:
        x: Tensor of shape (B, H, W, C). H and W must be multiples of
            `window_size`. B may be unknown (None) because it is never used
            as an explicit reshape dimension.
        window_size: Side length of each window.

    Returns:
        Tensor of shape (B * H//window_size * W//window_size,
        window_size, window_size, C). Windows are ordered row-major over the
        window grid, which is the layout merge_windows / window_reverse undo.
    """
    _, H, W, C = x.shape
    # Split each spatial axis into (window index, offset inside the window).
    x = tf.reshape(x, [-1, H // window_size, window_size, W // window_size, window_size, C])
    # Bring the two window-index axes together. Without this transpose the
    # final reshape slices consecutive rows instead of square tiles.
    x = tf.transpose(x, [0, 1, 3, 2, 4, 5])
    windows = tf.reshape(x, [-1, window_size, window_size, C])

    return windows

In [None]:
def shifted_window_partition(x, window_size):
    """Cyclically shift a feature map by half a window, then split it into windows.

    Args:
        x: Tensor of shape (B, H, W, C). H and W must be multiples of
            `window_size`. B may be unknown (None).
        window_size: Side length of each window; the shift is window_size // 2
            along both spatial axes (towards the top-left, wrapping around).

    Returns:
        Tensor of shape (B * H//window_size * W//window_size,
        window_size, window_size, C) holding the windows of the shifted map in
        row-major window-grid order.
    """
    shift_amount = window_size // 2
    shifted_x = tf.roll(x, shift=[-shift_amount, -shift_amount], axis=[1, 2])

    _, H, W, C = shifted_x.shape
    # Split each spatial axis into (window index, offset inside the window).
    x = tf.reshape(shifted_x, [-1, H // window_size, window_size, W // window_size, window_size, C])
    # Group the two window-index axes so every window is a contiguous square
    # tile. This transpose was missing, so windows were not spatial tiles.
    x = tf.transpose(x, [0, 1, 3, 2, 4, 5])
    windows = tf.reshape(x, [-1, window_size, window_size, C])

    return windows

In [None]:
def merge_windows(windows, window_size, original_shape):
    """Reassemble a stack of windows into a (B, H, W, C) feature map.

    `windows` is read as a row-major grid of H//window_size by W//window_size
    windows per batch item, each window_size x window_size x C.
    `original_shape` supplies the target (B, H, W, C).
    """
    batch, height, width, channels = original_shape
    grid_rows = height // window_size
    grid_cols = width // window_size

    # (batch, window row, window col, row in window, col in window, channels)
    grid = tf.reshape(
        windows,
        [batch, grid_rows, grid_cols, window_size, window_size, channels],
    )
    # Interleave window rows with in-window rows (and likewise for columns).
    interleaved = tf.transpose(grid, [0, 1, 3, 2, 4, 5])
    return tf.reshape(interleaved, [batch, height, width, channels])

In [None]:
def create_attention_mask(window_size):
    """Build a (window_size**2, window_size**2) float32 mask of zeros and -inf.

    All entries are 0 except positions (i * window_size + j, j * window_size + i)
    for every i in [0, window_size) and every j in
    [window_size // 2, window_size), which are set to -inf.
    """
    token_count = window_size ** 2

    # (query index, key index) pairs that receive -inf.
    blocked_pairs = [
        [row * window_size + col, col * window_size + row]
        for row in range(window_size)
        for col in range(window_size // 2, window_size)
    ]
    positions = tf.convert_to_tensor(blocked_pairs, dtype=tf.int32)

    # One -inf per blocked position.
    fill_values = tf.fill([len(positions)], float('-inf'))

    open_mask = tf.zeros((token_count, token_count), dtype=tf.float32)
    return tf.tensor_scatter_nd_update(open_mask, positions, fill_values)

In [None]:
def mlp(x, cf):
    """Feed-forward block: Dense(mlp_dim, gelu) -> Dropout -> Dense(hidden_dim) -> Dropout.

    New layers are created on every call; sizes and the dropout rate come from `cf`.
    """
    expanded = Dense(cf["mlp_dim"], activation = "gelu")(x)
    expanded = Dropout(cf["dropout_rate"])(expanded)
    projected = Dense(cf["hidden_dim"])(expanded)
    return Dropout(cf["dropout_rate"])(projected)

In [None]:
def patch_merging(x, cf):
    """Halve the spatial resolution by concatenating each 2x2 neighbourhood.

    The four neighbours are stacked along the channel axis (4 * C), projected
    with a new Dense layer to cf['hidden_dim'] * 2 features, then
    layer-normalised.
    """
    height, width, channels = x.shape[1], x.shape[2], x.shape[3]
    half_h, half_w = height // 2, width // 2

    # (batch, half_h, 2, half_w, 2, C) -> group the two "2" axes together.
    blocks = tf.reshape(x, [-1, half_h, 2, half_w, 2, channels])
    neighbours = tf.transpose(blocks, [0, 1, 3, 2, 4, 5])
    merged = tf.reshape(neighbours, [-1, half_h, half_w, 4 * channels])

    # NOTE(review): output width is tied to cf['hidden_dim'], not to the
    # incoming channel count — confirm this is intended for stacked stages.
    reduced = Dense(cf['hidden_dim'] * 2)(merged)
    return LayerNormalization()(reduced)

In [None]:
class SwinEncoderBlock(tf.keras.layers.Layer):
    """Encoder block: windowed attention, shifted-window attention, then an MLP.

    Args:
        embed_dim: Nominal embedding width of the stage (stored, not read by call()).
        num_heads: Number of attention heads forwarded to multihead_attention.
        window_size: Side length of the attention windows.
        mask: Flag stored on the instance (not read by call()).
        cf: Configuration dict; must contain "hidden_dim" and the keys mlp() reads.
    """

    def __init__(self, embed_dim, num_heads, window_size, mask, cf):
        super(SwinEncoderBlock, self).__init__()
        self.layer_norm = LayerNormalization()
        self.dropout = Dropout(0.1)
        # call() reads self.num_heads; it was never assigned before, so any
        # forward pass raised AttributeError.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.window_size = window_size
        self.mask = mask
        # Keep the config so call() can be invoked with the tensor alone.
        self.cf = cf
        self.hidden_dim = cf["hidden_dim"]

    def call(self, x, cf=None):
        """Run the block on `x`.

        `cf` is optional; when omitted the config given to __init__ is used,
        so both `block(x)` and `block(x, cf)` work.
        """
        if cf is None:
            cf = self.cf

        # NOTE(review): multihead_attention is not defined in the visible part
        # of this notebook — it must be provided before this layer is called.

        # Window Self-Attention (WSA)
        res1 = self.dropout(self.layer_norm(x) + x)
        windows = window_partition(res1, self.window_size)  # Partition into windows
        attention_output = multihead_attention(
            windows, self.num_heads, self.hidden_dim, self.window_size
        )

        # Shifted Window Self-Attention (SWSA)
        res2 = self.dropout(self.layer_norm(attention_output) + attention_output)
        shifted_windows = shifted_window_partition(res2, self.window_size)  # Shift windows
        attention_output2 = multihead_attention(
            shifted_windows, self.num_heads, self.hidden_dim, self.window_size
        )

        # Apply MLP after shifted window self-attention
        x = self.layer_norm(attention_output2)
        x = mlp(x, cf)
        return self.dropout(x + res2)

In [None]:
class AlternatingEncoderBlock(tf.keras.layers.Layer):
    """Two SwinEncoderBlocks applied back to back (WSA first, then SWSA).

    Args:
        embed_dim: Embedding width forwarded to both sub-blocks.
        num_heads: Attention head count forwarded to both sub-blocks.
        window_size: Window side length forwarded to both sub-blocks.
        cf: Configuration dict forwarded to both sub-blocks. SwinEncoderBlock
            indexes it in its constructor, so the None default fails there.
    """

    def __init__(self, embed_dim, num_heads, window_size, cf = None):
        super(AlternatingEncoderBlock, self).__init__()
        # Remember the config so call() can hand it to the sub-blocks.
        self.cf = cf
        self.WSA = SwinEncoderBlock(
            embed_dim=embed_dim,
            num_heads=num_heads,
            window_size=window_size,
            mask=False,
            cf = cf
        )
        self.SWSA = SwinEncoderBlock(
            embed_dim=embed_dim,
            num_heads=num_heads,
            window_size=window_size,
            mask=True,
            cf = cf
        )

    def call(self, x, cf=None):
        """Apply WSA then SWSA to `x`.

        `cf` is optional and defaults to the constructor's config. Accepting
        it lets callers use `block(x, cf)`, and it is always forwarded because
        SwinEncoderBlock.call takes the config as its second argument.
        """
        if cf is None:
            cf = self.cf
        x = self.WSA(x, cf)  # Apply Window Self-Attention first
        return self.SWSA(x, cf)

In [None]:
class SwinTransformer(tf.keras.Model):
    """Four-stage Swin-style encoder assembled from AlternatingEncoderBlocks.

    Pipeline: embedding -> Stage1 -> merge -> Stage2 -> merge ->
    Stage3 (three blocks) -> merge -> Stage4.

    Args:
        cf: Configuration dict; "num_heads" and "window_size" are read here and
            the whole dict is forwarded to every stage.
    """

    def __init__(self, cf):
        super(SwinTransformer, self).__init__()
        # Keep the config so call() may be invoked with the tensor alone.
        self.cf = cf
        self.Embedding = linear_embedding  # You can define your custom embedding layer
        self.PatchMerge1 = patch_merging
        self.PatchMerge2 = patch_merging
        self.PatchMerge3 = patch_merging

        self.Stage1 = AlternatingEncoderBlock(96, cf['num_heads'], cf['window_size'], cf)
        self.Stage2 = AlternatingEncoderBlock(192, cf['num_heads'], cf['window_size'], cf)
        self.Stage3_1 = AlternatingEncoderBlock(384, cf['num_heads'], cf['window_size'], cf)
        self.Stage3_2 = AlternatingEncoderBlock(384, cf['num_heads'], cf['window_size'], cf)
        self.Stage3_3 = AlternatingEncoderBlock(384, cf['num_heads'], cf['window_size'], cf)
        self.Stage4 = AlternatingEncoderBlock(768, cf['num_heads'], cf['window_size'], cf)

    def call(self, x, cf=None):
        """Run the full encoder on `x`.

        `cf` is optional and defaults to the constructor's config. It is
        passed to the embedding and patch-merging helpers, which require it.
        The stages already received their config at construction, so they are
        called with the tensor only.
        """
        if cf is None:
            cf = self.cf

        x = self.Embedding(x, cf)
        # patch_merging takes (x, cf); the first two merges previously omitted
        # cf and raised TypeError.
        x = self.PatchMerge1(self.Stage1(x), cf)
        x = self.PatchMerge2(self.Stage2(x), cf)
        x = self.Stage3_1(x)
        x = self.Stage3_2(x)
        x = self.Stage3_3(x)
        x = self.PatchMerge3(x, cf)
        x = self.Stage4(x)
        return x

In [None]:
# Symbolic 128x128 RGB input used to trace the network.
inputs = tf.keras.Input(shape=(128, 128, 3))  # Example input shape
swin_transformer_model = SwinTransformer(config)

# Pass the input tensor through the model
x = swin_transformer_model(inputs, config)

# Wrap the traced graph as a functional Keras model.
# NOTE(review): `model` is reassigned in a later cell; only one of the two
# definitions survives a top-to-bottom run.
model = tf.keras.Model(inputs=inputs, outputs=x)

In [None]:
def transformer_encoder(x, cf):
    """Apply four residual sub-blocks to `x` and return the result.

    Order: attention, MLP, shifted-window attention with a mask, MLP. Each
    sub-block is preceded by LayerNormalization and followed by a skip
    connection. Every layer is created fresh on each call.

    Args:
        x: Input tensor. NOTE(review): the window helpers unpack a 4-D shape,
            so this presumably expects (batch, H, W, C) — confirm with callers.
        cf: Config dict; reads "num_heads", "hidden_dim", "window_size" and
            the keys used by mlp().
    """
    # Sub-block 1: self-attention over x, with residual.
    skip1 = x
    x = LayerNormalization()(x)
    x = MultiHeadAttention(num_heads = cf["num_heads"], key_dim = cf["hidden_dim"])(x,x)
    x = Add()([x, skip1])
    
    # Sub-block 2: feed-forward MLP, with residual.
    skip2 = x
    x = LayerNormalization()(x)
    x = mlp(x,cf)
    x = Add()([x, skip2])

    # Sub-block 3: attention over shifted windows.
    skip3 = x
    x = LayerNormalization()(x)
    x = shifted_window_partition(x, cf["window_size"])
    attention_mask = create_attention_mask(cf["window_size"])
    # Flatten each window's spatial grid into a token sequence.
    x = tf.reshape(x, (x.shape[0], -1, x.shape[-1]))
    # NOTE(review): attention_mask holds 0 / -inf floats — confirm this is the
    # mask format MultiHeadAttention expects.
    x = MultiHeadAttention(num_heads = cf["num_heads"], key_dim = cf["hidden_dim"])(x,x, attention_mask=attention_mask)
    # Restore the layout of the original input (skip1's shape).
    # NOTE(review): the roll applied by shifted_window_partition is not undone
    # here before the residual add — confirm this is intended.
    x = merge_windows(x, cf["window_size"], skip1.shape)
    x = Add()([x, skip3])

    # Sub-block 4: second feed-forward MLP, with residual.
    skip4 = x
    x = LayerNormalization()(x)
    x = mlp(x,cf)
    x = Add()([x, skip4])
    
    return x

In [None]:
# Smoke test: partition the embedded training patches into windows and push
# them through one transformer_encoder pass.
# NOTE: this rebinds the global `x` that an earlier cell also assigns.
x = window_partition(X_train_embedded, config["window_size"])
x = transformer_encoder(x, config)

In [None]:
# NOTE(review): swin_transformer is only defined in a later cell, so this
# raises NameError on a fresh top-to-bottom run; as written there it has no
# return statement, which makes `outputs` None.
outputs = swin_transformer(X_train, config)

In [None]:
# Inspect the shape of the result from the previous cell (fails if it is None).
print(outputs.shape)

In [None]:
# NOTE(review): `outputs` was computed from X_train, not from this new Input,
# so the two are not connected in one graph; this also overwrites the `model`
# built in an earlier cell.
model = Model(inputs= Input(shape = (128, 128, 3)), outputs=outputs)

In [None]:
def window_reverse(windows, window_size, H, W):
    """Reassemble windows into a (B, H, W, C) feature map.

    Args:
        windows: Tensor of shape (B * H//window_size * W//window_size,
            window_size, window_size, C), windows in row-major grid order.
        window_size: Side length of each window.
        H: Height of the target feature map; must be a multiple of window_size.
        W: Width of the target feature map; must be a multiple of window_size.

    Returns:
        Tensor of shape (B, H, W, C).

    Raises:
        AssertionError: if H or W is not divisible by window_size.
    """
    # Validate before dividing: with H or W smaller than window_size the
    # window count is 0 and the division below used to raise
    # ZeroDivisionError before this check could run.
    assert (H % window_size == 0) and (W % window_size == 0), "H and W must be divisible by window_size"

    # Explicit parentheses; the old expression relied on left-to-right
    # evaluation of mixed // and *.
    windows_per_image = (H // window_size) * (W // window_size)
    B = windows.shape[0] // windows_per_image

    x = tf.reshape(windows, [B, H // window_size, W // window_size, window_size, window_size, -1])
    x = tf.transpose(x, perm=[0, 1, 3, 2, 4, 5])  # Transpose back to original spatial positions
    x = tf.reshape(x, [B, H, W, -1])
    return x


In [None]:
def swin_transformer(x, cf):
    """Partition `x` into 4x4 windows, print the resulting shape and return it.

    Args:
        x: 4-D tensor accepted by window_partition.
        cf: Configuration dict. Currently unused: the window size is fixed at
            4 here rather than read from cf["window_size"].

    Returns:
        The windowed tensor. Previously nothing was returned, so callers that
        assigned the result received None.
    """
    windows = window_partition(x, 4)
    print(windows.shape)
    return windows

In [None]:
# Try the window partition on the embedded training patches; the function
# prints the resulting shape.
swin_transformer(X_train_embedded, config)