In [10]:
%reset -f

In [1]:
import numpy as np
import os
import math
import skimage
from skimage import io  # for reading TIFF images
from tqdm import tqdm  # for progress bars
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score
from mpl_toolkits.mplot3d import Axes3D
from tensorflow.keras import layers
import tensorflow.keras as keras
import tensorflow as tf
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split
from osgeo import gdal
import os




In [None]:
# Set the desired number of patches
desired_num_patches = 3480

# Your data directories
optic_images_dir = r'F:\Sent2'
sar_images_dir = r'F:\Sent1'
agb_maps_dir = r'F:\AGB'

# Get the list of file names in the directories without sorting
agb_map_files = os.listdir(agb_maps_dir)
optic_image_files = os.listdir(optic_images_dir)
sar_image_files = os.listdir(sar_images_dir)

In [4]:
# Initialize empty lists to store data
x_train_list = []
y_train_list = []
agb_values_list = []

# Loop through each patch with tqdm for progress bars
# Loop through each patch with tqdm for progress bars
for i in tqdm(range(desired_num_patches), desc='Loading Patches'):
    # Get file names for the current index
    optic_file = optic_image_files[i]
    sar_file = sar_image_files[i]
    agb_file = agb_map_files[i]
    # Load images
    optic_image = io.imread(os.path.join(optic_images_dir, optic_file))
    sar_image = io.imread(os.path.join(sar_images_dir, sar_file))
    agb_map = io.imread(os.path.join(agb_maps_dir, agb_file))

    # Save AGB values as a NumPy array
    agb_values_list.append(agb_map.flatten())

    # Ensure optic image has 13 bands
    if optic_image.shape[-1] == 13:
        x_train = optic_image
    else:
        raise ValueError("Optic image should have 13 bands. Found: {}".format(optic_image.shape[-1]))

    # Check if SAR image has 2 bands
    if len(sar_image.shape) == 2:
        # Expand dimensions to make it 3D
        sar_image = np.stack([sar_image, sar_image], axis=-1)

    # Normalize the data using min-max scaling
    optic_image_normalized = (optic_image - np.min(optic_image)) / (np.max(optic_image) - np.min(optic_image))
    sar_image_normalized = (sar_image - np.min(sar_image)) / (np.max(sar_image) - np.min(sar_image))

    # Concatenate optic and SAR images
    x_train = np.concatenate([optic_image_normalized, sar_image_normalized], axis=-1)

    x_train_list.append(x_train)

    # Break the loop when desired_num_patches is reached
    if len(x_train_list) >= desired_num_patches:
        break

# Combine all patches into single arrays
x_data = np.stack(x_train_list)
agb_values = np.concatenate(agb_values_list)

# Normalize AGB values using min-max scaling
min_agb = np.min(agb_values)
max_agb = np.max(agb_values)
agb_scaled = (agb_values - min_agb) / (max_agb - min_agb)

# Reshape scaled AGB values for compatibility with the model
agb_normalized = np.expand_dims(agb_scaled, axis=-1)

# Split the data into training, validation, and test sets

x_train1, x_test, y_train1, y_test, _ , test_index = train_test_split(x_data, agb_normalized,range(len(x_data)), test_size=0.20, random_state=42)
x_train, x_val, y_train, y_val, train_index, val_index = train_test_split(x_train1, y_train1, range(len(x_train1)), test_size=0.20, random_state=42)

print('test_index:', test_index)
print('train_index:', train_index)
print('val_index:', test_index)
print('x_train:', x_train.shape)
print('y_train:', y_train.shape)
print('x_val shape:', x_val.shape)
print('y_val shape:', y_val.shape)
print('x_test shape:', x_test.shape)
print('y_test shape:', y_test.shape)
input_shape = x_train.shape[1:]

Loading Patches:   0%|          | 0/3480 [00:00<?, ?it/s]

Loading Patches: 100%|█████████▉| 3479/3480 [04:24<00:00, 13.16it/s]


test_index: [2457, 1114, 1298, 1442, 1665, 51, 322, 2653, 2164, 676, 1192, 1864, 2815, 2742, 134, 2004, 1623, 3107, 1320, 1842, 2037, 693, 184, 2658, 192, 1904, 3271, 450, 1146, 621, 2832, 538, 2266, 1537, 2801, 26, 999, 1718, 179, 32, 1468, 1490, 921, 2804, 2383, 2932, 1937, 893, 70, 1068, 2248, 691, 443, 2835, 1584, 298, 1429, 1878, 1736, 678, 1786, 670, 969, 183, 978, 1553, 2381, 3442, 257, 3269, 1234, 1258, 2910, 3019, 2543, 1615, 578, 2603, 1666, 3330, 557, 139, 1966, 2473, 1432, 2115, 295, 1738, 1784, 196, 2484, 2967, 1556, 925, 3229, 942, 2739, 3204, 52, 3374, 1037, 3167, 1091, 495, 3124, 170, 1662, 3310, 478, 309, 2836, 402, 76, 2740, 965, 2059, 17, 3255, 144, 3055, 602, 551, 2023, 2943, 3361, 2189, 3315, 1173, 2717, 3394, 1765, 2957, 1034, 1501, 194, 1871, 1228, 1569, 2154, 3148, 1897, 1832, 1746, 1818, 1600, 30, 879, 1626, 2367, 2643, 985, 869, 2551, 203, 501, 2534, 897, 1313, 3206, 1397, 3344, 2891, 2587, 1674, 1374, 33, 1023, 948, 665, 2830, 3111, 1018, 949, 3197, 3358, 180

In [5]:

print(min_agb)
print(max_agb)

0
234600


In [6]:
"""
## Configure the hyperparameters
"""
num_classes = 1
learning_rate = 0.00001
weight_decay = 0.0001
batch_size = 8
num_epochs = 150  # For real training, use num_epochs=100. 10 is a test value
image_size = 96  # We'll resize input images to this size
patch_size = 6  # Size of the patches to be extract from the input images
num_patches = (image_size // patch_size) ** 2
projection_dim = 64
num_heads = 8
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8
mlp_head_units = [
    2048,
    1024,
]  # Size of the dense layers of the final classifier
"""
## Use data augmentation
"""

data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(height_factor=0.2, width_factor=0.2),
    ],
    name="data_augmentation",
)
# Compute the mean and the variance of the training data for normalization.
data_augmentation.layers[0].adapt(x_train)
"""
## Implement multilayer perceptron (MLP)
"""


def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation=keras.activations.relu)(x)
        x = layers.Dropout(dropout_rate)(x)
    return x
"""
## Implement patch creation as a layer
"""

class Patches(layers.Layer):
    def __init__(self, patch_size):
        super(Patches, self).__init__()
        self.patch_size = patch_size

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding='VALID'
        )
        patches = tf.reshape(
            patches,
            (batch_size, -1, self.patch_size * self.patch_size * 15)
        )
        return patches

class PatchEncoder(layers.Layer):
    def __init__(self, num_patches, projection_dim):
        super(PatchEncoder, self).__init__()
        self.num_patches = num_patches
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        positions = tf.expand_dims(positions, axis=0)
        projected_patches = self.projection(patch)
        encoded = projected_patches + self.position_embedding(positions)
        return encoded

def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation=tf.keras.activations.gelu)(x)
        x = layers.Dropout(dropout_rate)(x)
    return x
"""
## Build the ViT model

The ViT model consists of multiple Transformer blocks,
which use the `layers.MultiHeadAttention` layer as a self-attention mechanism
applied to the sequence of patches. The Transformer blocks produce a
`[batch_size, num_patches, projection_dim]` tensor, which is processed via an
classifier head with softmax to produce the final class probabilities output.

Unlike the technique described in the [paper](https://arxiv.org/abs/2010.11929),
which prepends a learnable embedding to the sequence of encoded patches to serve
as the image representation, all the outputs of the final Transformer block are
reshaped with `layers.Flatten()` and used as the image
representation input to the classifier head.
Note that the `layers.GlobalAveragePooling1D` layer
could also be used instead to aggregate the outputs of the Transformer block,
especially when the number of patches and the projection dimensions are large.
"""


def create_vit_classifier():
    inputs = keras.Input(shape=input_shape)
    # Augment data.
    augmented = data_augmentation(inputs)
    # Create patches.
    patches = Patches(patch_size)(augmented)
    # Encode patches.
    encoded_patches = PatchEncoder(num_patches, projection_dim)(patches)

    # Create multiple layers of the Transformer block.
    for _ in range(transformer_layers):
        # Layer normalization 1.
        x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
        # Create a multi-head attention layer.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(x1, x1)
        # Skip connection 1.
        x2 = layers.Add()([attention_output, encoded_patches])
        # Layer normalization 2.
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # MLP.
        x3 = mlp(x3, hidden_units=transformer_units, dropout_rate=0.1)
        # Skip connection 2.
        encoded_patches = layers.Add()([x3, x2])

    # Create a [batch_size, projection_dim] tensor.
    representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0.5)(representation)
    # Add MLP.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.5)
    print(features.shape)
    # Classify outputs.
    logits = layers.Dense(num_classes,
                    activation='linear',
                    )(features)
    # Create the Keras model.
    model = keras.Model(inputs=inputs, outputs=logits)
    return model
vit_classifier = create_vit_classifier()



(None, 1024)


In [7]:
# Create the model using your custom architecture
vit_classifier = create_vit_classifier()

# Load the weights into the model
vit_classifier.load_weights('weights_ViT.h5')

(None, 1024)


In [8]:
vit_classifier.save('complete_model.h5')



  saving_api.save_model(


In [9]:
from tensorflow.keras.models import load_model

# Assuming Patches and PatchEncoder are already defined in your code
custom_objects = {
    'Patches': Patches,
    'PatchEncoder': PatchEncoder
}

# Now load the model
model = load_model('complete_model.h5', custom_objects=custom_objects)



In [10]:
# Predictions on training data
y_pred = model.predict(x_data)
y_pred = y_pred * ((max_agb - min_agb) + min_agb)  # Denormalize predictions
y_denormalized = agb_normalized * ((max_agb - min_agb) + min_agb)  # Denormalize true values



In [None]:
from osgeo import gdal
import numpy as np
import os

for i in range(3480):

  ff = agb_map_files [i].split(".")
  #print(ff[0])
  input_file = gdal.Open(os.path.join(agb_maps_dir, f"{ff[0]}.tif"))
  #print(input_file.RasterXSize)
  # Create the output GeoTIFF file
  output_file = gdal.GetDriverByName('GTiff').Create(r"F:\VIT" 
                                                     + f"{ff[0]}" + '.tif', input_file.RasterXSize, 
                                                     input_file.RasterYSize, 1, gdal.GDT_Float32)

  # Set the spatial reference and geotransform of the output file
  output_file.SetProjection(input_file.GetProjection())
  output_file.SetGeoTransform(input_file.GetGeoTransform())

  # Get the output band of the output file
  output_band = output_file.GetRasterBand(1)
  output_band.WriteArray(y_pred[i].reshape(input_file.RasterXSize,input_file.RasterYSize))

  # Compute the output band's statistics
  output_band.ComputeStatistics(False)

  # Close the input and output files
  input_file = None
  output_file = None

In [12]:
import os
import shutil

def copy_images(source_folder, destination_folder, image_list):
    if not os.path.exists(destination_folder):
        os.makedirs(destination_folder)

    for filename in image_list:
      filename = str(int(filename) + 1).zfill(5) + ".tif"
      print(filename)
      source_path = os.path.join(source_folder, filename)
      if os.path.exists(source_path):
            destination_path = os.path.join(destination_folder, filename)
            shutil.copyfile(source_path, destination_path)
            print(f"Copied {filename} to {destination_folder}")
      else:
            print(f"File {filename} not found in {source_folder}")

In [None]:
# Define source folder containing TIFF images
source_folder = "F:\output"

# Define destination folder where images will be copied
destination_folder_Test = "F:\Test_OutPut"
destination_folder_Train = "F:\Train_OutPut"
destination_folder_Val = "F:\Val_OutPut"

# List of filenames to be copied
#image_list = ["image1.tif", "image2.tif", "image3.tif"]  # Add your filenames here

copy_images(source_folder, destination_folder_Test, test_index)
copy_images(source_folder, destination_folder_Train, train_index)
copy_images(source_folder, destination_folder_Val, val_index)