In [None]:
import numpy as np
import xir
import vitis_ai_library
import os
import re
import time
import sys

from functools import partial

from skimage import io
from torchvision import transforms as T
from torchvision.transforms import functional as F

In [None]:
dataset_dir = "./images_folder"
results_path = "./predicted/"
model_path = "./build_test/comp_model/UNet2D_compiled.xmodel"
save_images = True

In [None]:
image_folder_path = os.path.join(dataset_dir, 'images')

In [None]:
def correct_dims(*images):
    corr_images = []
    for img in images:
        if len(img.shape) == 2:
            corr_images.append(np.expand_dims(img, axis=2))
        else:
            corr_images.append(img)
    if len(corr_images) == 1:
        return corr_images[0]
    else:
        return corr_images

In [None]:
def preprocess_fn(image_path):
    '''
    Image pre-processing.
    Opens image as grayscale then normalizes to range 0:1
    input arg: path of image file
    return: numpy array
    '''
    image = io.imread(image_path)
    image= correct_dims(image)
    image = F.to_pil_image(image)
    #random crop
    i, j, h, w = T.RandomCrop.get_params(image, (256, 256))
    image = F.crop(image, i, j, h, w)
    
    data = np.asarray(image, dtype=np.float32) / 255 #maybe only divide by 128 as the scaling is in signed integer, probably it is also necessary to substract by 1 after that
    return data

In [None]:
def reshape_inputs(image_list, inputTensors):
    for i in range(0, len(image_list)):
        image_list[i] = image_list[i].reshape(inputTensors.dims)

In [None]:
def run_inference(runner, image_list):
    inputTensors = runner.get_input_tensors()
    output_tensor_buffers = runner.get_outputs() 

    #list to buffer outputs
    outputs = []
    
    reshape_inputs(image_list[:0], inputTensors)
    
    
    #DPU execution
    print("run DPU")
    time1 = time.time()
    
    for image, name in image_list:
        #prepare inputData
        inputData = []
        for inputTensor in inputTensors:
            inputData.append(image)
        
        job_id = runner.execute_async(inputData, output_tensor_buffers)
        runner.wait(job_id)
        outputs.append( (np.array(output_tensor_buffers[0]), name) )
    
    time2 = time.time()
    timetotal = time2 - time1
    fps = float(runTotal / timetotal)
    print(" ")
    print("FPS=%.2f, total frames = %.0f , time=%.4f seconds" %(fps,len(image_list), timetotal))
    print(" ")
    
    return outputs

In [None]:
#get a list of subgraphs from the compiled model file
g = xir.Graph.deserialize(model_path)
runner = vitis_ai_library.GraphRunner.create_graph_runner(g)

In [None]:
#Pre Processing images    
images_list=[f for f in os.listdir(image_folder_path) if re.match(r'.*\.png', f)]
runTotal = len(images_list)           
print('Found',len(images_list),'images - processing',runTotal,'of them')
img = []
for i in range(runTotal):
    path = os.path.join(image_folder_path,images_list[i])
    img.append((preprocess_fn(path), images_list[i]))

In [None]:
outputs = run_inference(runner, img)

In [None]:
if save_images:
    for mask, name in outputs:
        mask = np.squeeze(mask)
        first_channel = mask[:,:,0] * 255
        second_channel = mask[:,:,1] * 255
        io.imsave(os.path.join(results_path, name +'_channel_0.png'), first_channel.astype(np.uint8))
        io.imsave(os.path.join(results_path, name +'_channel_1.png'), second_channel.astype(np.uint8))