In [39]:
import numpy as np
import tensorflow as tf
import os
from os import path, walk
import json

In [41]:
def read_json(json_path):
    """
    Load a JSON file and return its parsed content.

    :param json_path: path to load json from
    :returns: a python dictionary of json features
    :raises OSError: if the file cannot be opened
    :raises json.JSONDecodeError: if the file content is not valid JSON
    """
    # The context manager closes the file handle even when parsing fails;
    # a bare open() would leak one handle per label file.
    with open(json_path) as json_file:
        annotations = json.load(json_file)
    return annotations

In [2]:
def get_disaster_info(feature):
    """
    Extract the disaster name and disaster type from a parsed label json.

    :param feature: dictionary read from a label json; the values are looked up
        at feature['metadata']['disaster'] and feature['metadata']['disaster_type']
    :returns: tuple (disaster, disaster_type); both are 'unclassified' when the
        metadata is missing or incomplete
    """
    try:
        metadata = feature['metadata']
        disaster = metadata['disaster']
        disaster_type = metadata['disaster_type']
    except (KeyError, TypeError, IndexError):
        disaster = 'unclassified'
        disaster_type = 'unclassified'
        # The image name can be missing as well (e.g. no 'metadata' at all), so
        # look it up defensively instead of raising from inside the handler.
        try:
            img_name = str(feature['metadata']['img_name'])
        except (KeyError, TypeError, IndexError):
            img_name = '<unknown image>'
        print("no disaster information in " + img_name)

    return disaster, disaster_type

In [35]:
# This function uses the labels only for showcasing purposes: we want to identify the name and the type of each disaster.
# For real new images that information will not be available, and only the images directory is needed.
def create_image_tensors(images_path, json_path=None):
    """Load matching pre-/post-disaster PNG chips as uint8 RGB tensors.

    Pairs are discovered by file name: every file whose name contains '_pre'
    is matched with the name obtained by replacing '_pre' with '_post'.
    Files are processed in sorted name order so repeated runs give the same
    ordering regardless of the filesystem.

    Args:
        images_path: directory containing the PNG chips.
        json_path: optional directory with the per-chip label json files.
            When it is an existing directory, the pre-disaster json files
            drive the discovery and the disaster metadata is returned too.
            Otherwise (None, empty, or not a directory) the PNG files in
            ``images_path`` drive the discovery.

    Returns:
        With a valid ``json_path``, six parallel lists:
        ``(pre_tensors, post_tensors, pre_ids, post_ids, disasters,
        disaster_types)``.
        Otherwise two parallel lists: ``(pre_tensors, post_tensors)``.
    """
    arr_img_pre_list = []
    arr_img_post_list = []

    if json_path and os.path.isdir(json_path):
        id_pre_list = []
        id_post_list = []
        dis_list = []
        dis_type_list = []

        for json_name in _list_pre_files(json_path, 'json'):
            # Chip ids are the file names without extension
            id_pre = path.splitext(json_name)[0]
            id_post = id_pre.replace('_pre', '_post')

            # Only the pre-disaster label is parsed: the disaster metadata is
            # read from it and nothing from the post-disaster json is used here.
            chip_json_pre = read_json(path.join(json_path, json_name))

            # Our chips start off in life as PNGs
            array_pre, array_post = _load_png_pair(
                images_path, id_pre + '.png', id_post + '.png'
            )
            dis, dis_type = get_disaster_info(chip_json_pre)

            arr_img_pre_list.append(array_pre)
            arr_img_post_list.append(array_post)
            # ids and disaster info identify the arrays at the same index
            id_pre_list.append(id_pre)
            id_post_list.append(id_post)
            dis_list.append(dis)
            dis_type_list.append(dis_type)
        return arr_img_pre_list, arr_img_post_list, id_pre_list, id_post_list, dis_list, dis_type_list

    # No usable label directory: discover the pairs from the PNG files themselves
    for png_name in _list_pre_files(images_path, 'png'):
        array_pre, array_post = _load_png_pair(
            images_path, png_name, png_name.replace('_pre', '_post')
        )
        arr_img_pre_list.append(array_pre)
        arr_img_post_list.append(array_post)

    return arr_img_pre_list, arr_img_post_list


def _list_pre_files(directory, extension):
    """Return the sorted names of files directly inside ``directory`` that contain '_pre' and end with ``extension``."""
    file_names = next(walk(directory))[2]
    return sorted(
        name for name in file_names
        if '_pre' in name and name.endswith(extension)
    )


def _load_png_pair(images_path, pre_name, post_name):
    """Read and decode the pre/post PNG files of one chip as 3-channel uint8 tensors."""
    img_pre = tf.io.read_file(path.join(images_path, pre_name))
    img_post = tf.io.read_file(path.join(images_path, post_name))
    array_pre = tf.image.decode_png(img_pre, channels=3, dtype=tf.uint8)
    array_post = tf.image.decode_png(img_post, channels=3, dtype=tf.uint8)
    return array_pre, array_post
        

     
    

In [10]:
# without label info
# img_pre,img_post = create_image_tensors("/Users/gmeneses/DScourse/00_capstone/xView2_baseline_fork/data4inference/images")

# images_pre = np.stack(img_pre)
# images_post = np.stack(img_post)

In [42]:
# With label info: pass both images_path and json_path (the labels are only used
# to get the disaster name/type, for showcase purposes).
(img_pre, img_post, id_pre, id_post, dis, dis_type) = create_image_tensors(
    "/Users/gmeneses/DScourse/00_capstone/xView2_baseline_fork/data4inference/images",
    "/Users/gmeneses/DScourse/00_capstone/xView2_baseline_fork/data4inference/labels",
)

# Stack the per-image tensors into one array with a leading image axis
images_pre = np.stack(img_pre)
images_post = np.stack(img_post)

In [71]:
#type(images_post)
#images_post.shape

(182, 1024, 1024, 3)

In [45]:
# Distinct disaster names among the labels returned by create_image_tensors
np.unique(dis)

array(['joplin-tornado', 'lower-puna-volcano', 'moore-tornado',
       'nepal-flooding', 'pinery-bushfire', 'portugal-wildfire',
       'sunda-tsunami', 'tuscaloosa-tornado', 'woolsey-fire'],
      dtype='<U18')

In [93]:
# n = 0
# image_showcase_post = images_post[n,:,:,:]
# print(image_showcase_post.shape)
# print(type(image_showcase_post))

# disaster = dis[n]
# disaster_type=dis_type[n]
# print(disaster,disaster_type)

In [3]:

# LOADING TENSORS
#to recover images and mask arrays:
# loaded_arrays_pre = np.load('/Users/gmeneses/DScourse/00_capstone/xView2_baseline_fork/tensors_pre_subset_tensors_pre_xBD_mini.npz')

# loaded_arrays_post = np.load('/Users/gmeneses/DScourse/00_capstone/xView2_baseline_fork/tensors_post_subset_tensors_post_xBD_mini.npz')

In [4]:
# images_pre = loaded_arrays_pre['images']
# id = loaded_arrays_pre['id'] # file names for each image-mask pair

In [73]:
# `loaded_arrays_post` is only created by the optional "LOADING TENSORS" cell, which
# is commented out by default. Use the cached arrays only when they were actually
# loaded; otherwise keep the `images_post` stacked from the PNG files instead of
# failing with a NameError on a fresh kernel.
if "loaded_arrays_post" in globals():
    images_post = loaded_arrays_post['images']


In [94]:
# Slice the stacked post-disaster array along its first axis: one element per image.
# Pre-disaster equivalent:
#pre_dataset = tf.data.Dataset.from_tensor_slices(images_pre)
post_dataset = tf.data.Dataset.from_tensor_slices(images_post)
# Single-image alternative: add a leading axis first so slicing yields that one image
#image_showcase_post_expanded = tf.expand_dims(image_showcase_post, axis=0)
#post_dataset = tf.data.Dataset.from_tensor_slices(image_showcase_post_expanded)


In [48]:
import matplotlib.pyplot as plt

def convert_mask_to_3_channels(mask_1_channel):
    """Convert a 1-channel class mask into a 3-channel RGB mask.

    Every class index listed in ``category_colors`` is painted with its colour.
    Any other value is left black, i.e. treated like class 0
    (no building / un-classified).

    Args:
        mask_1_channel (~numpy.ndarray): mask of shape (height, width, 1)
            holding one class index per pixel.

    Returns:
        ~numpy.ndarray: uint8 array of shape (height, width, 3).
    """
    # Assuming mask_1_channel has shape (height, width, 1)
    height, width, _ = mask_1_channel.shape

    # Define the colors representing each category (RGB values)
    category_colors = {
        0: (0, 0, 0),        # Class 0 - Black (no building) or un-classified
        1: (255, 255, 255),  # Class 1 - White (no-damage)
        2: (255, 255, 0),    # Class 2 - Yellow (minor damage)
        3: (255, 165, 0),    # Class 3 - Orange (major damage)
        4: (255, 0, 0),      # Class 4 - Red (destroyed)
    }

    class_ids = np.asarray(mask_1_channel)[:, :, 0]

    # Start from an all-black (height, width, 3) image so unknown classes stay
    # black; a negative sentinel cannot be stored in uint8 (it wraps to white on
    # older NumPy and raises OverflowError on NumPy 2).
    deep_mask = np.zeros((height, width, 3), dtype=np.uint8)

    # One vectorized assignment per class instead of a Python loop per pixel
    for class_id, color in category_colors.items():
        deep_mask[class_ids == class_id] = color

    return deep_mask

def display(display_list):
    """Show the given images side by side in a single row and render the figure.

    Args:
        display_list: sequence of image arrays/tensors, each converted with
            ``tf.keras.utils.array_to_img``. The first two are titled
            "Input Image" and "Predicted Mask"; any further image gets a
            generic "Image N" title instead of raising an IndexError.
    """
    plt.figure(figsize=(15, 15))

    title = ["Input Image", "Predicted Mask"]
    n_images = len(display_list)

    for i, image in enumerate(display_list):
        plt.subplot(1, n_images, i + 1)
        plt.title(title[i] if i < len(title) else "Image " + str(i + 1))
        plt.imshow(tf.keras.utils.array_to_img(image))
        plt.axis("off")
    plt.show()


In [49]:
def create_mask_single(pred_mask):
    """Collapse prediction scores into class indices.

    Takes the argmax of ``pred_mask`` along axis 1 and appends a trailing axis
    of size 1 to the result.
    NOTE(review): assumes axis 1 of ``pred_mask`` is the class axis - confirm
    against the model's logits layout.
    """
    class_ids = tf.math.argmax(pred_mask, axis=1)
    return tf.expand_dims(class_ids, -1)

In [50]:
def map_fn(image):
    """Wrap a dataset element in a dict so it can be accessed by the key "image"."""
    return dict(image=image)

#named_pre_dataset = pre_dataset.map(map_fn)


In [95]:
# Wrap every element of the post-disaster dataset in a {"image": ...} dict (see map_fn)
named_post_dataset = post_dataset.map(map_fn)
# Single-file alternative:
#named_post_dataset = map_fn(post_dataset)


In [52]:
from tensorflow.keras import backend

# Side length (in pixels) that load_image resizes every image to
image_size = 512
# Per-channel offset and scale applied in normalize().
# NOTE(review): presumably the ImageNet RGB mean/std - confirm against the training pipeline.
mean = tf.constant([0.485, 0.456, 0.406])
std = tf.constant([0.229, 0.224, 0.225])


def normalize(input_image):
    """Convert an image to float32 and standardize it with the module-level ``mean`` and ``std``.

    The divisor is clamped from below with ``backend.epsilon()`` so it can
    never be zero.
    NOTE(review): ``convert_image_dtype`` presumably does not rescale input that
    is already floating point (as it is after ``tf.image.resize``) - confirm the
    value range matches what the model was trained with.
    """
    as_float = tf.image.convert_image_dtype(input_image, tf.float32)
    safe_std = tf.maximum(std, backend.epsilon())
    return (as_float - mean) / safe_std


def load_image(datapoint):
    """Prepare one dataset element for the model.

    Resizes ``datapoint["image"]`` to (image_size, image_size), normalizes it
    and moves the last axis to the front with a (2, 0, 1) transpose.

    Returns:
        dict with the single key "pixel_values" holding the prepared tensor.
    """
    target_shape = (image_size, image_size)
    resized = tf.image.resize(datapoint["image"], target_shape)
    channels_first = tf.transpose(normalize(resized), (2, 0, 1))
    return {"pixel_values": channels_first}

In [96]:

# Resize, normalize and transpose every post-disaster image (see load_image)
#pre = named_pre_dataset.map(load_image)
post = named_post_dataset.map(load_image)
# Single-file alternative:
#post = load_image(named_post_dataset)

In [1]:
from transformers import TFSegformerForSemanticSegmentation
#pre
#name_model = "segmentation_model_test1_last_subset_no_augm"
#images = pre

#post
name_model="classification_model_test1"
images = post

seg_model =  TFSegformerForSemanticSegmentation.from_pretrained(name_model, local_files_only=True)

# Disaster labels and ids only exist when the images were loaded together with
# their label jsons; without them the loop must still run.
has_labels = "dis" in globals() and "id_post" in globals()

for i, elem in enumerate(images):
    # Skip the label printout when there is no label for this index
    if has_labels and i < min(len(dis), len(id_post)):
        print(dis[i], id_post[i])
    # get image tensor
    image = elem["pixel_values"]
    # add batch dimension, the model needs it
    image_batched = np.expand_dims(image, axis=0)
    # Use the loaded model to make predictions
    pred_seg = seg_model.predict(image_batched).logits
    print("info prediction", pred_seg.shape,type(pred_seg))
    # collapse the predicted logits into one class index per pixel
    final_pred_seg = create_mask_single(pred_seg)
    print("info pred after create mask", final_pred_seg.shape,type(final_pred_seg))
    # resize the mask to the size of the model input (image_size, shared with
    # load_image); nearest-neighbour keeps the values valid class indices
    resized_pred_seg = tf.image.resize(final_pred_seg, (image_size, image_size), method="nearest")
    print("info resized pred", resized_pred_seg.shape)
    # checking if the classes predicted are in the correct range
    predicted_flat = np.array(final_pred_seg).astype(int).flatten()
    print("predicted classes: ",np.unique(predicted_flat))
    # to plot: undo the (2, 0, 1) transpose applied in load_image
    image_plot = tf.transpose(image, (1, 2, 0))
    squeezed_predictions = tf.squeeze(resized_pred_seg, axis=0)  # Remove the batch dimension

    pred_mask_create_3ch = convert_mask_to_3_channels(np.array(squeezed_predictions))
    display([image_plot, pred_mask_create_3ch])


  from .autonotebook import tqdm as notebook_tqdm


NameError: name 'post' is not defined