In [None]:
import numpy as np
import matplotlib.pyplot as plt 
import tensorflow as tf 
from tensorflow import keras 
from keras import Model 
from keras.layers import (
    Add , Dense,Dropout , Embedding ,GlobalAveragePooling1D ,
    Input, Layer, LayerNormalization, MultiHeadAttention,
    Softmax 
)

from keras.initializers import TruncatedNormal

In [None]:
class PatchPartition(Layer):
    def __init__(self, window_size =4 , channels=3, **kwargs):
        super() .__init__(PatchPartition,self, **kwargs)
        self.window_size = window_size

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images = images, 
            sizes = [ 1, self.window_size, self.window_size ,1],
            strides = [1 , self.window_size, self.window_size , 1],
            rates = [1 , 1 ,1, 1],
            padding = 'VALID',

        )
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size , -1, patch_dims])
        return patches 

In [None]:
!curl -s -o flower.jpeg https://images.unsplash.com/photo-1604085572504-a392ddf0d86a?ixlib=rb-1.2.1&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=224&q=224 

In [None]:
image = plt.imread('flower.jpeg')
image = tf.image.resize(tf.convert_to_tensor(image) , size=(244,244))
plt.imshow(image.numpy().astype('uint8'))
plt.axis('off')

In [None]:
batch = tf.expand_dims(image, axis=0)
patches = PatchPartition()(batch)
patches.shape

In [None]:
n = int(np.sqrt(patches.shape[1]))
for i, patch in enumerate(patches[0]):
    ax = plt.subplot(n, n, i + 1)
    patch_img = tf.reshape(patch, (4, 4, 3))
    ax.imshow(patch_img.numpy().astype("uint8"))
    ax.axis("off")

In [None]:
class LinearEmbedding(Layer):
    def __init__(self, num_patches , projection_dim , **kwargs):
        super(LinearEmbedding, self).__init__(**kwargs)
        self.num_pacthes = num_patches 
        self.projection = Dense(projection_dim)
        self.position_embedding = Embedding(input_dim=num_patches, output_dim=projection_dim)
    
    def call (self, patch):
        # tính toán bản vá nhúng 
        patches_embed = self.projection(patch)
        # tính toán vị trí nhúng từ 
        positions = tf.range(start=0 , limit=self.num_pacthes , delta=1)
        return patches_embed + self.position_embedding(positions) 

In [None]:
embeddings = LinearEmbedding(3136, 96)(patches)
embeddings.shape

In [None]:
class PatchMerging(Layer):
    def __init__(self, input_resolution , channels):
        super(PatchMerging, self).__init__()
        self.input_resolution = input_resolution
        self.channels = channels 
        self.linear_trans = Dense(2 *channels , use_bias=False)

    def call (self, x):
        height , width = self.input_resolution
        _ , _ , C = x.get_shape().as_list()
        x = tf.reshape(x, shape=(-1 , height , width, C))
        x0 = x[:, 0::2 , 0::2 , :]
        x1 = x[:, 1::2 , 0::2 , :]
        x2 = x[:, 0::2 , 1::2 , :]
        x3 = x[:, 1::2 , 1::2 , :]

        x = tf.concat((x0 , x1 , x2 , x3), axis=-1)
        x = tf.reshape(x, shape=(-1, (height // 2) * (width // 2) , 4*C))
        return self.linear_trans

In [None]:
channels = 96
num_patch_x = 224 // 4
num_patch_y = 224 // 4
out_patches = PatchMerging((num_patch_x, num_patch_y), channels)(patches)
print(f'Input shape (B,   H * W,  C) = {patches.shape}')
print(f'Ouput shape (B, H/2*W/2, 4C) = {out_patches.shape}')

In [None]:
class MLP(Layer):
    def __init__(self, hidden_features , out_features, dropout_rate=0.1):
        super(MLP, self).__init__()
        self.dense1 = Dense(hidden_features , activation=tf.nn.gelu)
        self.dense2 = Dense(out_features)
        self.dropout = Dropout(dropout_rate)
    
    def call(self, x):
        x = self.dense1(x)
        x = self.dropout(x)
        x = self.dense2(x)
        y = self.dropout(x)
        return y

In [None]:
mlp = MLP(768 * 2, 768)
y = mlp(tf.zeros((1, 197, 768)))
y.shape

In [None]:
class WindowAttention(Layer):
    def __init__(
            self, dim , window_size , num_heads,
            qkv_bias=True, 
            dropout_rate = 0.0 , 
            **kwargs
    ):
        super().__init__(**kwargs)
        self.dim = dim 
        self.window_size = window_size
        self.num_heads = num_heads
        self.scale = (dim // num_heads) ** -0.5
        self.qkv = Dense(dim* 3 , use_bias=qkv_bias)
        self.dropout = Dropout(dropout_rate)
        self.proj = Dense(dim)

    def buil(self, input_shape):
        num_window_elements = (2 * self.window_size[0] - 1) *(
            2 * self.window_size[1] - 1
        )
        self.relative_position_bias_table = self.add_weight(
            shape = (num_window_elements , self.num_heads),
            initializer = tf.initializers.Zeros(),
            trainable= True,
        )
         # get pair-wise relative position index for each token inside the window
#Đoạn mã này thực hiện chức năng tính toán chỉ số vị trí tương đối giữa các cửa sổ trong mô hình Swin Transformer12. 
# Đoạn mã này thực hiện các bước sau:
# Tạo ra một ma trận tọa độ cho mỗi điểm ảnh trong ảnh, với hai chiều là chiều cao và chiều rộng.
# Nối hai ma trận tọa độ lại với nhau và làm phẳng chúng thành một vector hai chiều, mỗi hàng là một cặp tọa độ (y, x) cho một điểm ảnh.
# Tính toán sự khác biệt tọa độ giữa mỗi cặp điểm ảnh bằng cách trừ vector tọa độ với chính nó theo hai chiều khác nhau.
# Chuyển vị ma trận khác biệt tọa độ để có kích thước (size, size, 2), trong đó size là kích thước của một cửa sổ.
# Cộng thêm self.window_size[0] - 1 và self.window_size1 - 1 vào ma trận khác biệt tọa độ để dịch chuyển các giá trị từ âm sang dương.
# Nhân ma trận khác biệt tọa độ theo chiều ngang với 2 * self.window_size1 - 1 để biến đổi các giá trị theo chiều ngang của ma trận.
# Tính tổng ma trận khác biệt tọa độ theo chiều cuối cùng để thu được relative_position_index, là ma trận chỉ số vị trí tương đối giữa các cửa sổ.
# Đoạn mã này giúp cho mô hình Swin Transformer có thể học được các quan hệ không gian giữa các cửa sổ và áp dụng bias vị trí tương đối khi tính toán self-attention1.
        coords_h = tf.range(self.window_size[0])
        coords_w = tf.range(self.window_size[1])
        coords_matrix = np.meshgrid(coords_h , coords_w, indexing='ij')
        coords = np.stack(coords_matrix)
        coords_flatten = coords.reshape(2 , -1)
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]
        relative_coords = relative_coords.transpose([1 , 2 , 0])
        relative_coords[:, :, 0] += self.window_size[0] - 1
        relative_coords[:, :, 1] += self.window_size[1] - 1
        relative_coords[:, :, 0] *= 2 * self.window_size[1] - 1
        relative_position_index = relative_coords.sum(-1)


        self.relative_position_index = tf.Variable(
            initial_value = tf.convert_to_tensor(
                relative_position_index
            ),
            trainable = False
        )

        
    def call(self, x , mask=None):
        _ ,  size , channels = x.shape 
        head_dim = channels // self.num_heads
        x_qkv = self.qkv(x)
        x_qkv = tf.reshape(x_qkv , shape=(-1 , size , 3  , self.num_heads , head_dim))
        x_qkv = tf.transpose(x_qkv , perm=(2,0,3,1,4))
        q , k ,v = x_qkv[0] , x_qkv[1] , x_qkv[2]
        q = q * self.scale 
        k = tf. transpose(k , perm=(0, 1, 2, 3))
        attn = q@ k 
        
        # Trèn tham số bias vào SW - MSA 
        num_window_elements = self.window_q[0] * self.window_size[1] # tính số phần tử trong một cửa sổ 
        relative_position_index_flat = tf.reshape( # định hình lại ma trận anyf thành vector 1 chiều 
            self.relative_position_index, shape=(-1,)
        )
        # Lấy các giá trị bias vị trí tương đối từ bảng self.relative_position_bias_table theo chỉ số trong vector trên.
        relavtive_position_bias = tf.gather(
            self.relative_position_bias_table , relative_position_index_flat
        )
        # Reshape lại ma trận bias vị trí tương đối thành kích thước (num_window_elements, num_window_elements, -1)
        # tức là một ma trận cho mỗi head attention.
        relative_position_bias = tf.reshape(
            relative_position_bias, shape=(num_window_elements, num_window_elements, -1)
        )
        # Chuyển vị ma trận bias vị trí tương đối theo thứ tự (2, 0, 1) và thêm một chiều ở đầu 
        # để có kích thước (1, -1, num_window_elements, num_window_elements).
        relative_position_bias = tf.transpose(relative_position_bias, perm=(2, 0, 1))
        # Cộng ma trận bias vị trí tương đối vào ma trận attention.
        attn = attn + tf.expand_dims(relative_position_bias, axis=0)

        if mask is not None:
            nW = mask.get_shape()[0]
            mask_float = tf.cast(
                tf.expand_dims(tf.expand_dims(mask , axis=1), axis=0), tf.float32
            )
            attn=  tf.reshape(attn, shape=(-1 , nW , self.num_heads, size , size))
            attn = keras.activations.softmax(attn , axis=-1)
        else: 
            attn = keras.activations.softmax(attn , axis= -1)
        attn = self.dropout(attn)

        x_qkv = attn @ v 
        x_qkv = tf.transpose(x_qkv, perm=(0 ,1 , 2, 3 ))
        x_qkv = tf.reshape(x_qkv, shape=(-1, size, channels))
        x_qkv = self.proj(x_qkv)
        x_qkv = self.dropout(x_qkv)
        return x_qkv

In [None]:
attn = WindowAttention(96, window_size=(4, 4), num_heads=8, qkv_bias=True, qk_scale=None, attn_drop=0.0, proj_drop=0.0)
y = attn(tf.zeros((1, 196, 16, 96)))
y.shape

In [None]:
def window_partition(x , window_size):
    _, H , W, C = x.shape
    num_patch_y = H // window_size
    num_patch_x = W // window_size
    # định hinhf ảnh kích thước  -1 là số lượng lô  x và y  là số lượng cửa sổ theo chiều dọc và ngang 
    x = tf.reshape(x , [-1 , num_patch_y , window_size , num_patch_x, window_size, C])
    x = tf.transpose(x , perm=[0, 1, 2, 3, 4, 5])
    #  -1 là số lượng ảnh trong batch và num_patch_x * num_patch_y là tổng số lượng cửa sổ trong một ảnh.
    windows = tf.reshape(x, [-1, num_patch_x * num_patch_y, window_size, window_size, C])
    return windows

In [None]:
windows = window_partition(batch, 4)
print(f'Input shape (B,   H,  W,  C) = {batch.shape}')
print(f'Ouput shape (num_windows*B, window_size, window_size, C) = {windows.shape}')

In [None]:
def window_reverse(windows, window_size, height , width , channels):
    num_patch_y = height // window_size
    num_patch_x = width // window_size 
    x = tf.reshape(
        windows, 
        shape = (-1 , num_patch_y, num_patch_x , window_size , window_size , channels)
    )
    x = tf.transpose(x, perm=(0 ,1 ,2 ,3 ,4 ,5))
    x = tf.reshape(x , shape=(-1 , height , width , channels))
    return x 
  

In [None]:
y = window_reverse(windows, 4, 224, 224)
print(f'Input shape (B, num_windows*B, window_size, window_size, C) = {windows.shape}')
print(f'Ouput shape (B,   H,  W,  C) = {y.shape}')

In [None]:
class DropPath(Layer):
    def __init__(self, drop_prob =None , **kwargs):
        super() .__init__(**kwargs)
        self.drop_prob = drop_prob
    
    def call(self, x ):
        input_shape = tf.shape(x)
        batch_size = input_shape[0]
        rank = x.shape.rank
        shape = (batch_size,) +(1,) *(rank-1)
        random_tensor = (1 - self.drop_prob) + tf.random.uniform(shape , dtype = x.dtype)
        path_mask = tf.floor(random_tensor)
        output = tf.math.divide(x, 1 - self.drop_prob) * path_mask
        return output 

In [None]:
class SwinTransformerBlock(Layer):

    def __init__(self, dim, input_resolution, num_heads, window_size=7, shift_size=0,
                 mlp_ratio=4., qkv_bias=True, qk_scale=None, drop=0., attn_drop=0., drop_path=0.):
        super().__init__()
        self.dim = dim
        self.input_resolution = input_resolution
        self.num_heads = num_heads
        self.window_size = window_size
        self.shift_size = shift_size
        self.mlp_ratio = mlp_ratio
        if min(self.input_resolution) <= self.window_size:
            # if window size is larger than input resolution, we don't partition windows
            self.shift_size = 0
            self.window_size = min(self.input_resolution)
        assert 0 <= self.shift_size < self.window_size, "shift_size must in 0-window_size"

        self.norm1 = LayerNormalization(epsilon=1e-5)
        self.attn = WindowAttention(
            dim, window_size=(self.window_size, self.window_size), num_heads=num_heads,
            qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, proj_drop=drop)

        self.drop_path = DropPath(drop_path) if drop_path > 0. else tf.identity
        self.norm2 = LayerNormalization(epsilon=1e-5)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = MLP(mlp_hidden_dim, dim, dropout_rate=drop)

        if self.shift_size > 0:
            # calculate attention mask for SW-MSA
            H, W = self.input_resolution
            img_mask = np.zeros([1, H, W, 1])  # 1 H W 1
            h_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            w_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            cnt = 0
            for h in h_slices:
                for w in w_slices:
                    img_mask[:, h, w, :] = cnt
                    cnt += 1
            img_mask = tf.constant(img_mask)
            mask_windows = window_partition(img_mask, self.window_size)  # nW, window_size, window_size, 1
            mask_windows = tf.reshape(mask_windows, [-1, self.window_size * self.window_size])
            attn_mask = mask_windows[:, None, :] - mask_windows[:, :, None]
            self.attn_mask = tf.where(attn_mask==0, -100., 0.)
        else:
            self.attn_mask = None

    def call(self, x):
        H, W = self.input_resolution
        B, L, C = x.shape
        assert L == H * W, "input feature has wrong size"

        shortcut = x
        x = self.norm1(x)
        x = tf.reshape(x, [-1, H, W, C])

        # cyclic shift
        if self.shift_size > 0:
            shifted_x = tf.roll(x, shift=[-self.shift_size, -self.shift_size], axis=(1, 2))
        else:
            shifted_x = x

        # partition windows
        x_windows = window_partition(shifted_x, self.window_size)  # nW*B, window_size, window_size, C
        x_windows = tf.reshape(x_windows, [-1, x_windows.shape[1], self.window_size * self.window_size, C])  # nW*B, window_size*window_size, C

        # W-MSA/SW-MSA
        attn_windows = self.attn(x_windows, mask=self.attn_mask)  # nW*B, window_size*window_size, C

        # merge windows
        attn_windows = tf.reshape(attn_windows, [-1, x_windows.shape[1], self.window_size, self.window_size, C])
        shifted_x = window_reverse(attn_windows, self.window_size, H, W)  # B H' W' C

        # reverse cyclic shift
        if self.shift_size > 0:
            x = tf.roll(shifted_x, shift=[self.shift_size, self.shift_size], axis=(1, 2))
        else:
            x = shifted_x
        x = tf.reshape(x, [-1, H * W, C])

        # FFN
        x = shortcut + self.drop_path(x)
        x = x + self.drop_path(self.mlp(self.norm2(x)))

        return x


In [None]:
block = SwinTransformerBlock(96, (56, 56), 8, window_size=4)
y = block(embeddings)
y.shape

In [None]:
def create_SwinTransformer(num_classes, input_shape=(224, 224, 3), window_size=4, embed_dim=96, num_heads=8):
    num_patch_x = input_shape[0] // window_size
    num_patch_y = input_shape[1] // window_size
    inputs = Input(shape=input_shape)
    # Patch extractor
    patches = PatchPartition(window_size)(inputs)
    patches_embed = LinearEmbedding(num_patch_x * num_patch_y, embed_dim)(patches)
    # first Swin Transformer block
    out_stage_1 = SwinTransformerBlock(
        dim=embed_dim,
        input_resolution=(num_patch_x, num_patch_y),
        num_heads=num_heads,
        window_size=window_size,
        shift_size=0
    )(patches_embed)
    # second Swin Transformer block
    out_stage_1 = SwinTransformerBlock(
        dim=embed_dim,
        input_resolution=(num_patch_x, num_patch_y),
        num_heads=num_heads,
        window_size=window_size,
        shift_size=1
    )(out_stage_1)
    # patch merging
    representation = PatchMerging((num_patch_x, num_patch_y), channels=embed_dim)(out_stage_1)
    # pooling
    representation = GlobalAveragePooling1D()(representation)
    # logits
    output = Dense(num_classes, activation="softmax")(representation)
    # Create model
    model = Model(inputs=inputs, outputs=output)
    return model

In [None]:
model = create_SwinTransformer(2)
model.summary()