# Modeling

In [5]:
import cv2
import os
from collections import defaultdict
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, BatchNormalization, Flatten
from tensorflow.keras.layers import Activation, MaxPooling2D, Concatenate, UpSampling2D

from sklearn.model_selection import train_test_split

In [6]:
# Build a {building uid -> polygon vertex array} lookup from WKT features
def get_polygon_annotations(feature) :
    """Map each feature's unique id to the integer vertices of its polygon.

    Args:
        feature: iterable of dict-like records; each one is read through
            its 'wkt' key and its ['properties']['uid'] key.

    Returns:
        dict: uid -> np.int32 array built from the first coordinate
        sequence of the parsed geometry.
    """
    annotations_by_uid = {}

    for record in feature :
        # Parse the WKT text into a geometry object
        geometry = wkt.loads(record['wkt'])

        # First coordinate sequence of the geometry mapping
        ring = mapping(geometry)['coordinates'][0]
        vertices = np.array(list(ring), np.int32)

        # Key the integer vertex array by the building's unique id
        uid = record['properties']['uid']
        annotations_by_uid[uid] = vertices

    return annotations_by_uid

# Look up the array shape of an image stored on disk
def get_image_dimensions(image_folder, filename) :
    """Return the shape of the image located at image_folder/filename.

    The file is read with io.imread and converted to a NumPy array; the
    array's .shape tuple is returned.
    """
    pixels = np.array(io.imread(os.path.join(image_folder, filename)))
    return pixels.shape

# Function to locate buildings using polygon annotations
def mask_polygons(size, poly_annotations) :
    """Rasterize building polygons into a single uint8 mask.

    Output pixel values:
        255 - interior of exactly one polygon
        127 - border (2 px polyline) of exactly one polygon
        0   - background, or any pixel touched by more than one polygon

    Interior and border hits are counted separately. Summing the 1/2 codes
    in one array would make two overlapping interiors (1 + 1) look like a
    border (2) and would also wrap around in uint8.

    Args:
        size: shape of the mask to create (passed to np.zeros); presumably
            the source image shape, e.g. (H, W, 3) - the 3-tuple colours
            below are written with that in mind.
        poly_annotations: dict mapping a building id to an integer array of
            polygon vertices.

    Returns:
        np.ndarray of dtype uint8 with shape `size`.
    """
    mask_img = np.zeros(size, dtype=np.uint8)

    # Per-pixel counts of how many polygons hit the pixel as interior/border
    interior_hits = np.zeros(size, dtype=np.uint16)
    border_hits = np.zeros(size, dtype=np.uint16)

    for poly in poly_annotations.values() :
        # Scratch layer holding a single polygon: 1 = interior, 2 = border
        layer = np.zeros(size, dtype=np.uint8)
        cv2.fillPoly(layer, [poly], (1, 1, 1))
        cv2.polylines(layer, [poly], isClosed=True, color=(2, 2, 2), thickness=2)

        interior_hits += (layer == 1)
        border_hits += (layer == 2)

    # Keep only pixels covered by exactly one polygon; overlaps stay 0
    covered_once = (interior_hits + border_hits) == 1
    mask_img[covered_once & (interior_hits == 1)] = 255
    mask_img[covered_once & (border_hits == 1)] = 127

    return mask_img

In [7]:
def create_masked_images(masked_dir, hurricane_pre_df, images_dir):
    """Write one building mask PNG per DataFrame row and list the masks.

    For every row, the polygons in row['xy'] are rasterized at the size of
    the image images_dir/row['img_name'] and saved as
    "<name before the first dot>_mask.png" inside masked_dir.

    Args:
        masked_dir: output directory (created if missing).
        hurricane_pre_df: DataFrame with 'xy' and 'img_name' columns.
        images_dir: directory containing the source images.

    Returns:
        list[str]: every ".png" path found in masked_dir, sorted so the
        order is deterministic (glob order is otherwise arbitrary).

    Raises:
        OSError: if OpenCV reports that a mask could not be written.
    """
    os.makedirs(masked_dir, exist_ok=True)
    #TODO SK: if the length inside masked_dir is == len(images) , then dont run the below code
    for _, row in hurricane_pre_df.iterrows() :
        poly_annotations = get_polygon_annotations(row['xy'])
        image_size = get_image_dimensions(images_dir, row['img_name'])
        mask_image = mask_polygons(image_size, poly_annotations)

        # Save the mask image to the output folder
        filename = row['img_name'].split('.')[0]
        masked_dirpath = os.path.join(masked_dir, f"{filename}_mask.png")
        # cv2.imwrite signals failure through its return value; do not let
        # a missing mask go unnoticed.
        if not cv2.imwrite(masked_dirpath, mask_image):
            raise OSError(f"Failed to write mask image: {masked_dirpath}")

    print(f"Completed saving mask images to the {masked_dir} directory")

    pre_hurricane_mask_images = sorted(
        path for path in glob.iglob(f'{masked_dir}/*') if path.endswith(".png")
    )

    print(f"Total pre-disaster mask images: {len(pre_hurricane_mask_images)}")
    return pre_hurricane_mask_images

In [8]:
# Function to resize images to a standard scale
def resize_images(image_paths, masked=False, target_size=(256, 256)):
    """Load images from disk and resize them to a common size.

    Args:
        image_paths: iterable of file paths readable by cv2.imread.
        masked: False -> return float32 images scaled to [0, 1];
            True  -> return binary uint8 masks (any non-zero pixel becomes
            1) collapsed to a single trailing channel.
        target_size: size passed to cv2.resize as dsize (width, height).

    Returns:
        list of np.ndarray, one per input path, in input order.

    Raises:
        FileNotFoundError: if a path cannot be read. cv2.imread returns
            None instead of raising, which would otherwise surface as an
            opaque error inside cv2.resize.
    """
    resized_images = []

    for path in image_paths:
        image = cv2.imread(path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {path}")

        # Resize the image to the target dimensions
        image = cv2.resize(image, target_size, interpolation=cv2.INTER_AREA)

        if not masked:
            # Normalize pixel values to the range [0, 1]
            image = image.astype(np.float32) / 255.0
        else:
            # Binarize: every non-zero (border or interior) pixel becomes 1
            image = (image > 0).astype(np.uint8)
            if image.shape[2] == 3:
                # Convert from 3 channels to 1 channel
                image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
                # Expand dimensions to single channel shape
                image = np.expand_dims(image, axis=-1)
        resized_images.append(image)

    return resized_images

In [9]:
def preprocess_data(hurricane_pre_df, images_dir):
    """Generate masks, then load resized images and masks as aligned lists.

    Image paths are built from the DataFrame's 'img_name' column and
    images_dir (rather than from a notebook-level global), and each image
    is paired with its mask by filename so that element i of both returned
    lists always refers to the same scene.

    Args:
        hurricane_pre_df: DataFrame with 'xy' and 'img_name' columns.
        images_dir: directory containing the source images.

    Returns:
        (images, masks): two lists of arrays in DataFrame row order, as
        produced by resize_images (masks with masked=True).

    Raises:
        FileNotFoundError: if no mask file exists for a DataFrame row.
    """
    # Output folder to save mask images
    masked_dir = os.path.join(os.pardir, "masks")
    mask_paths = create_masked_images(masked_dir, hurricane_pre_df, images_dir)
    masks_by_name = {os.path.basename(path): path for path in mask_paths}

    image_paths = []
    paired_mask_paths = []
    for img_name in hurricane_pre_df['img_name']:
        # Same naming rule create_masked_images uses when writing masks
        mask_name = f"{img_name.split('.')[0]}_mask.png"
        if mask_name not in masks_by_name:
            raise FileNotFoundError(
                f"No mask named {mask_name} found in {masked_dir} for image {img_name}"
            )
        image_paths.append(os.path.join(images_dir, img_name))
        paired_mask_paths.append(masks_by_name[mask_name])

    pre_resized_img = resize_images(image_paths)
    pre_mask_resized_img = resize_images(paired_mask_paths, masked=True)

    return pre_resized_img, pre_mask_resized_img

In [10]:
def split_data(pre_resized_img, pre_mask_resized_img):
    """Stack images and masks into arrays and split them 80/20.

    Prints the shape of the stacked image array and then of the stacked
    mask array before splitting with a fixed random_state of 42.

    Returns:
        (X_train, X_valid, y_train, y_valid)
    """
    # Stack the per-image arrays into single NumPy arrays
    features = np.array(pre_resized_img)
    targets = np.array(pre_mask_resized_img)
    for stacked in (features, targets):
        print(stacked.shape)

    # Hold out 20% of the samples for validation
    return tuple(train_test_split(features, targets, test_size=0.2, random_state=42))

In [11]:
def build_fcn_model(input_shape):
    """Build a small fully convolutional encoder/decoder for binary masks.

    Encoder: four 3x3 ReLU convolutions (64, 64, 128, 128 filters), the
    third one with stride 2. Decoder: one stride-2 transposed convolution
    back to 64 filters, one 3x3 convolution, then a 1x1 sigmoid
    convolution producing a single-channel output.

    Args:
        input_shape: shape of one input sample, passed to tf.keras.Input.

    Returns:
        tf.keras.Model (not compiled).
    """
    inputs = tf.keras.Input(shape=input_shape)

    # Encoder: (filters, stride) for each 3x3 convolution, applied in order
    encoder_plan = ((64, 1), (64, 1), (128, 2), (128, 1))
    features = inputs
    for filters, stride in encoder_plan:
        features = Conv2D(
            filters, (3, 3), activation='relu', padding='same', strides=(stride, stride)
        )(features)

    # Decoder: upsample by 2, then refine
    features = Conv2DTranspose(
        64, (3, 3), strides=(2, 2), padding='same', activation='relu'
    )(features)
    features = Conv2D(64, (3, 3), activation='relu', padding='same')(features)

    # 1-channel sigmoid output for the binary mask
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(features)

    return tf.keras.Model(inputs, outputs)

# Build the FCN for 256x256 three-channel inputs
fcn_model = build_fcn_model(input_shape=(256, 256, 3))
# Configure training: Adam optimizer, binary cross-entropy loss, accuracy metric
fcn_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [12]:
def Unet_conv_block(inputs, num_filters) :
    """Apply two consecutive Conv(3x3, same) -> BatchNorm -> ReLU stages.

    Both stages use num_filters filters; the resulting tensor is returned.
    """
    features = inputs
    for _ in range(2) :
        features = Conv2D(num_filters, (3, 3), padding="same")(features)
        features = BatchNormalization()(features)
        features = Activation("relu")(features)

    return features
    
# Quick check: run the conv block on a 256x256 three-channel input
inputs = tf.keras.Input(shape=(256, 256, 3))
x = Unet_conv_block(inputs=inputs, num_filters=32)

def Unet_encoder_block(inputs, num_filters) :
    """Run a conv block, then downsample with 2x2 max pooling.

    Only the pooled tensor is returned.
    """
    features = Unet_conv_block(inputs, num_filters)
    return MaxPooling2D(pool_size=(2, 2))(features)

# Quick check: run the encoder block on a 256x256 three-channel input
inputs = tf.keras.Input(shape=(256, 256, 3))
x = Unet_encoder_block(inputs=inputs, num_filters=32)

def Unet_decoder_block(inputs, num_filters, skip) :
    """Upsample `inputs` by 2, merge with `skip`, and apply a conv block.

    `inputs` goes through a 2x2 stride-2 transposed convolution. If the
    result's height or width differs from `skip`, `skip` is enlarged by a
    factor of 2 with UpSampling2D before the two are concatenated.
    """
    upsampled = Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding="same")(inputs)

    # Compare the spatial dimensions of the upsampled tensor and the skip
    same_height = upsampled.shape[1] == skip.shape[1]
    same_width = upsampled.shape[2] == skip.shape[2]
    if not (same_height and same_width):
        skip = UpSampling2D((2, 2))(skip)

    merged = Concatenate()([upsampled, skip])
    return Unet_conv_block(merged, num_filters)

# Quick check: decode a 256x256 input against a 512x512 skip tensor
inputs = tf.keras.Input(shape=(256, 256, 3))
skip = tf.keras.Input(shape=(512, 512, 3))
x = Unet_decoder_block(inputs=inputs, num_filters=32, skip=skip)
print(x.shape)

def build_unet_model(input_shape) :
    """Build a 4-level U-Net that outputs a single-channel sigmoid mask.

    Each encoder level applies a conv block and keeps that full-resolution
    output as the skip connection before 2x2 max pooling. Taking the skip
    after pooling would hand the decoder a half-resolution tensor that has
    to be upsampled again, discarding the fine detail the skip is meant to
    carry. Filter counts and weight shapes are the same either way.

    Args:
        input_shape: shape of one input sample, e.g. (256, 256, 3). Height
            and width are halved four times, so they should be divisible
            by 16 for the decoder shapes to line up.

    Returns:
        tf.keras.Model named "U-Net" (not compiled).
    """
    inputs = tf.keras.Input(input_shape)

    # Encoders: remember the pre-pooling features of every level
    skips = []
    x = inputs
    for num_filters in (64, 128, 256, 512) :
        features = Unet_conv_block(x, num_filters)
        skips.append(features)
        x = MaxPooling2D((2, 2))(features)

    # Bottleneck
    x = Unet_conv_block(x, 1024)

    # Decoders: deepest skip first
    for num_filters, skip in zip((512, 256, 128, 64), reversed(skips)) :
        x = Unet_decoder_block(x, num_filters, skip)

    # Output
    outputs = Conv2D(1, (1, 1), padding="same", activation="sigmoid")(x)

    unet_model = tf.keras.Model(inputs, outputs, name="U-Net")

    return unet_model

# Build the U-Net for 256x256 three-channel inputs
unet_model = build_unet_model(input_shape=(256, 256, 3))
# Configure training: Adam optimizer, binary cross-entropy loss, accuracy metric
unet_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

(None, 512, 512, 32)


In [13]:
def train_FCN_model(batch_size=16, epochs=50):
    """Train the module-level `fcn_model` and save it to disk.

    Reads the notebook-level names fcn_model, X_train, y_train, X_valid
    and y_valid; they must exist before this is called.

    Args:
        batch_size: mini-batch size passed to fit().
        epochs: number of training epochs passed to fit().

    Returns:
        The object returned by fcn_model.fit().
    """
    # Training the model
    fcn_model.summary()
    history = fcn_model.fit(X_train, y_train,
                            validation_data=(X_valid, y_valid),
                            epochs=epochs,
                            batch_size=batch_size)

    save_path = 'model/FCN/FCN_model.keras'
    # Make sure the target folder exists so a long training run is not
    # lost to a missing directory at save time.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    fcn_model.save(save_path)

    return history

def train_Unet_model(batch_size=16, epochs=50):
    """Train the module-level `unet_model` and save it to disk.

    Reads the notebook-level names unet_model, X_train, y_train, X_valid
    and y_valid; they must exist before this is called.

    Args:
        batch_size: mini-batch size passed to fit().
        epochs: number of training epochs passed to fit().

    Returns:
        The object returned by unet_model.fit().
    """
    unet_model.summary()
    history = unet_model.fit(X_train, y_train,
                             validation_data=(X_valid, y_valid),
                             epochs=epochs,
                             batch_size=batch_size)

    save_path = 'model/Unet/Unet_model.keras'
    # Make sure the target folder exists so a long training run is not
    # lost to a missing directory at save time.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    unet_model.save(save_path)

    return history