In [1]:
import numpy as np
import cv2
import imageio
import networkx as nx
from collections import deque
import math
import os

### Read image

In [22]:
def show_image(img, title_name = 'image'):
    cv2.imshow(title_name, img.astype('uint8'))
    key = cv2.waitKey(0)
    cv2.destroyAllWindows()

In [3]:
image_dir = './images/'
image_names = ['balls','brain_mri_small', 'coins_small', 'house_small', 'Karyl_small', 'leaves_small',
              'pets_small', 'example']

images = []
for img_name in image_names:
    images.append(cv2.imread(image_dir + img_name + '.jpg'))

In [4]:
for im in images:
    show_image(im)

### util function

In [5]:
def image_negative(image):
    return 255-image

In [6]:
class Filter:
    def __init__(self, array = None, is_statistics = False, statistics = '', size = None):
        self.array = array                                          # kernal array
        self.is_statistics = is_statistics                          # is the kernal a statistics filter
        self.statistics = statistics                                # statistics filter name
        self.shape = self.array.shape if size == None else size     # kernal size
        
    def __str__(self): 
        if not self.is_statistics: print(self.array)
        return str(self.shape) + ( ' Spatial Filter' if not self.is_statistics else 'Statistic Filter' )


In [7]:
Identity_Filter = Filter(np.array([[0, 0, 0], [0, 1, 0], [0, 0, 0]]))
Top_Sobel_Filter = Filter(np.array([[1, 2, 1], [0, 0, 0], [-1, -2, -1]]))
Left_Sobel_Filter = Filter(np.array([[1, 0, -1], [2, 0, -2], [1, 0, -1]]))
Bottom_Sobel_Filter = Filter(np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]]))
Right_Sobel_Filter = Filter(np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]]))
Laplacian_Filter = Filter(np.array([[0, -1, 0], [-1, 4, -1], [0, -1, 0]]))
Sharpen_Filter = Filter(np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]]))
Outline_Filter = Filter(np.array([[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]]))

def Averaging_Filter(size = (3,3)):
    return Filter(np.full(size,1)/(size[0]*size[1]))

def Gaussian_Smooting_Filter(size = (3,3), sigma = 1):
    output = np.zeros(size)
    h = (size[0]-1)/2
    w = (size[1]-1)/2
    for i in range(size[0]):
        for j in range(size[1]):
            output[i,j] = (1/(2*math.pi*(sigma**2)))*math.exp(-((i-h)**2+(j-w)**2)/(2*sigma**2))
    return Filter(output)

def Max_Filter(size = (3,3)):
    return Filter(is_statistics = True, statistics = 'Max', size = size)

def Min_Filter(size = (3,3)):
    return Filter(is_statistics = True, statistics = 'Min', size = size)

def Mean_Filter(size = (3,3)):
    return Filter(is_statistics = True, statistics = 'Mean', size = size)

def Medium_Filter(size = (3,3)):
    return Filter(is_statistics = True, statistics = 'Medium', size = size)

def Midpoint_Filter(size = (3,3)):
    return Filter(is_statistics = True, statistics = 'Midpoint', size = size)

def Adaptive_Filter(size = (3,3)):
    return Filter(is_statistics = True, statistics = 'Adaptive', size = size)

def Adaptive_Medium_Filter(size = (3,3)):
    return Filter(is_statistics = True, statistics = 'Adaptive Medium', size = size)

In [8]:
def convolution2D(image_array, kernal, zero_padding = True):
    if(len(image_array.shape)!=2):
        print('this is for 2d image, your input image is ' + str(len(image_array.shape)) + 'd')
        return
    
    image_array = image_array.astype(int)
    if zero_padding:
        h = (kernal.shape[0]-1)//2
        w = (kernal.shape[1]-1)//2
        image_array = np.pad(image_array, ((h,h),(w,w)), 'constant')

    image_height, image_width = image_array.shape
    kernal_height, kernal_width = kernal.shape 
    output_array = np.zeros((image_height-kernal_height+1, image_width-kernal_width+1)).astype(int)
    noice_var = np.var(image_array)
    for x in range(image_height-kernal_height+1):
        for y in range(image_width-kernal_width+1):
            if not kernal.is_statistics:
                output_array[x,y] = np.sum(image_array[x:x+kernal_height,y:y+kernal_width]*kernal.array)
            else:
                if kernal.statistics == 'Max':
                    output_array[x,y] = np.max(image_array[x:x+kernal_height,y:y+kernal_width])
                elif kernal.statistics == 'Min':
                    output_array[x,y] = np.min(image_array[x:x+kernal_height,y:y+kernal_width])
                elif kernal.statistics == 'Medium':
                    output_array[x,y] = np.median(image_array[x:x+kernal_height,y:y+kernal_width])
                elif kernal.statistics == 'Mean':
                    output_array[x,y] = np.mean(image_array[x:x+kernal_height,y:y+kernal_width])
                elif kernal.statistics == 'Midpoint':
                    output_array[x,y] = (np.max(image_array[x:x+kernal_height,y:y+kernal_width]) +
                                         np.min(image_array[x:x+kernal_height,y:y+kernal_width])) / 2
                elif kernal.statistics == 'Adaptive':
                    g_xy = image_array[x+(kernal_height-1)//2,y+(kernal_width-1)//2]
                    local_var = np.var(image_array[x:x+kernal_height,y:y+kernal_width])
                    local_var = max(noice_var, local_var)
                    output_array[x,y] = g_xy-\
                    (noice_var/local_var)*(g_xy-np.mean(image_array[x:x+kernal_height,y:y+kernal_width]))
                elif kernal.statistics == 'Adaptive Medium':
                    kernal_height, kernal_width = kernal.shape
                    Z_xy = image_array[x+(kernal_height-1)//2,y+(kernal_width-1)//2]
                    i = 0 #increase
                    while(x-i>=0 and x+kernal_height+i<image_height \
                          and y-i>=0 and y+kernal_width+i<image_width):
                        z_min = np.min(image_array[x-i:x+kernal_height+i,y-i:y+kernal_width+i])
                        z_max = np.max(image_array[x-i:x+kernal_height+i,y-i:y+kernal_width+i])
                        z_med = np.median(image_array[x-i:x+kernal_height+i,y-i:y+kernal_width+i])
                        
                        if(not(z_min<z_med and z_med<z_max)):
                            i += 1
                        else: #Level b
                            if(z_min<Z_xy and Z_xy<z_max):
                                output_array[x,y] = Z_xy
                            else:
                                output_array[x,y] = z_med
                            break;
                    else:
                        output_array[x,y] = z_med
    output_array = np.clip(output_array, 0, 255)
    return output_array.astype('uint8')

def convolution3D(image_array, kernal, zero_padding = True):
    if(len(image_array.shape)!=3):
        print('this is for 3d image, your input image is ' + str(len(image_array.shape)) + 'd')
        return
    
    output_array = np.zeros_like(image_array)
    
    image_height, image_width, image_channel = image_array.shape
    for i in range(image_channel):
        output_array[:,:,i] = convolution2D(image_array[:,:,i], kernal, zero_padding)
    return output_array

In [9]:
def get_line(start, end):
    # Setup initial conditions
    x1, y1 = start
    x2, y2 = end
    dx = x2 - x1
    dy = y2 - y1
 
    # Determine how steep the line is
    is_steep = abs(dy) > abs(dx)
 
    # Rotate line
    if is_steep:
        x1, y1 = y1, x1
        x2, y2 = y2, x2
 
    # Swap start and end points if necessary and store swap state
    swapped = False
    if x1 > x2:
        x1, x2 = x2, x1
        y1, y2 = y2, y1
        swapped = True
 
    # Recalculate differentials
    dx = x2 - x1
    dy = y2 - y1
 
    # Calculate error
    error = int(dx / 2.0)
    ystep = 1 if y1 < y2 else -1
 
    # Iterate over bounding box generating points between start and end
    y = y1
    points = []
    for x in range(x1, x2 + 1):
        coord = (y, x) if is_steep else (x, y)
        points.append(coord)
        error -= abs(dy)
        if error < 0:
            y += ystep
            error += dx
 
    # Reverse the list if the coordinates were swapped
    if swapped:
        points.reverse()
    return points

### Watershed Transformation

In [10]:
AUTOMATICALLY_MARKER = 0
INTERACTIVE_MARKER = 1
class WatershedTransformation():
    def __init__(self, level_jump = 5, with_marker = False, marker_mode = AUTOMATICALLY_MARKER, marker_threshold = 15):
        self.level_jump = level_jump      # flood each level_jump
        self.neighbor_direction = np.array([[-1,-1,-1, 0,0, 1,1,1],[-1, 0, 1,-1,1,-1,0,1]]) # for find neighbor
        self.with_marker = with_marker
        if self.with_marker:
            self.marker_mode = marker_mode
            self.marker_threshold = marker_threshold
            
    def data_initialization(self, image = np.array([])):
        self.image = image.astype(int)
        self.raw_image = image
        self.RAG = nx.Graph()      # region adjacency graph
        self.maze = np.array([])   # I will treat water flooding as maze searching using DFS
                                   # 0: unseen, -1: wall , -2: edge for watershed, other: seen by player 
        self.processed = np.zeros_like(image)    # used to record which pixel is processed(done in maze)
        
        self.watershed = np.zeros_like(image)    # watershed map, 0: unflood, -2: edge, other:connected component
        self.watersheds = []       # Recorded for generating gif
        
        self.players = 0           # I treat connected components as players for playing maze
        
        self.pixels_val = []       # flood existed pixel value
        uni_image = np.unique(image)
        if self.level_jump == 1:
            self.pixels_val = uni_image
        else:
            self.pixels_val = np.append(uni_image[self.level_jump::self.level_jump], uni_image[-1])            
        
    def get_neightbor_indices(self, index, player):
        neighbor_indices = []      # Record the neighbor of index
        neightbor_all_zero = True  # check the pixel is processed totally
        for d in range(8):
            neighbor_index = self.neighbor_direction[:,d] + index
            
            # skip those neighbor which can't visit
            if np.any(neighbor_index<0) or np.any(neighbor_index>=self.image.shape) or \
                self.maze[neighbor_index[0], neighbor_index[1]] == -2 or \
                self.processed[neighbor_index[0], neighbor_index[1]] == 1 or \
                self.maze[neighbor_index[0], neighbor_index[1]] == player:
                continue
            # neighbor have not been able to processed
            elif self.maze[neighbor_index[0], neighbor_index[1]] == -1:
                neightbor_all_zero = False 
            # collect neighbors which can access
            elif self.maze[neighbor_index[0], neighbor_index[1]] == 0:
                neighbor_indices.append(neighbor_index)
            # touch other's component's area 
            elif self.maze[neighbor_index[0], neighbor_index[1]] != player:
                self.maze[index[0], index[1]] = -2        # record as the edge in image
                self.processed[index[0], index[1]] = 1    # edge can't process, so record as processed
                
                # generate edge in RAG
                if not self.RAG.has_edge(player, self.maze[neighbor_index[0], neighbor_index[1]]):
                    self.RAG.add_edge(player, self.maze[neighbor_index[0], neighbor_index[1]])
                    self.RAG[player][self.maze[neighbor_index[0], neighbor_index[1]]]['bound'] = []
                # record pixel of edge
                self.RAG[player][self.maze[neighbor_index[0], neighbor_index[1]]]['bound'].append(index)
                
                return []
            
        if neightbor_all_zero:
            self.processed[index[0], index[1]] = 1
            
        if len(neighbor_indices) != 0:
            neighbor_indices = np.stack(neighbor_indices,axis = 0)
        return neighbor_indices
    
    def DFS(self, queue, player):
        # iterate depth first search (using queue)
        while queue:
            p_out = queue.popleft()
            if self.maze[p_out[0],p_out[1]] == 0:
                self.maze[p_out[0],p_out[1]] = player
                
                neighbors_p_out = self.get_neightbor_indices(p_out, player)
                for npo in neighbors_p_out:
                    queue.append(npo)
                
    def draw_marker(self):
        print('painting marker...')
        global pt1_x,pt1_y,drawing
        drawing = False      # true if mouse is pressed
        pt1_x , pt1_y = None , None
        painting_point = []
        
        img = np.copy(self.image).astype('uint8')
        
        # mouse callback function
        def line_drawing(event,x,y,flags,param):
            global pt1_x,pt1_y,drawing
            if event==cv2.EVENT_LBUTTONDOWN:
                drawing=True
                pt1_x,pt1_y=x,y
            elif event==cv2.EVENT_MOUSEMOVE:
                if drawing==True:
                    cv2.line(img,(pt1_x,pt1_y),(x,y),color=(255,255,255),thickness=1)
                    line_point = get_line((pt1_x,pt1_y),(x,y))[:-1]
                    painting_point.extend(line_point)    # record the point user have drawn
                    pt1_x,pt1_y=x,y
            elif event==cv2.EVENT_LBUTTONUP:
                drawing=False
                cv2.line(img,(pt1_x,pt1_y),(x,y),color=(255,255,255),thickness=1) 
                line_point = get_line((pt1_x,pt1_y),(x,y))
                painting_point.extend(line_point)        # record the point user have drawn
        
        cv2.namedWindow('(press esc exit) draw markers')
        cv2.setMouseCallback('(press esc exit) draw markers',line_drawing)
        
        # call window
        while(1):
            cv2.imshow('(press esc exit) draw markers',img)
            if cv2.waitKey(1) & 0xFF == 27:    #press esc exit
                break
        cv2.destroyAllWindows()
        
        # delete out-of-bound point
        paint_i = [p[1] for p in painting_point if p[1] < img.shape[0] and p[0] < img.shape[1]\
                  and p[1] >= 0 and p[0] >= 0]
        paint_j = [p[0] for p in painting_point if p[1] < img.shape[0] and p[0] < img.shape[1]\
                  and p[1] >= 0 and p[0] >= 0]
        return (paint_i, paint_j)
        
    def marker_initializatin(self):
        if self.marker_mode == AUTOMATICALLY_MARKER:
            # if this pixel smaller than marker_threshold, this pixel treat as marker
            self.image = np.where(self.image <= self.marker_threshold,0, self.image)
            threshold = np.where(self.image == 0, 1, 0)   
            self.maze = np.where(self.watershed == -2, -2, self.watershed + (threshold - 1))
        elif self.marker_mode == INTERACTIVE_MARKER:
            # get user-choosed marker
            marker_points = self.draw_marker()
            self.image[marker_points] = -3
            threshold = np.where(self.image == -3, 1, 0)
            self.maze = np.where(self.watershed == -2, -2, self.watershed + (threshold - 1))
            image[marker_points] = 0

        print('choosing marker...')
        # make connected component of markers
        while np.any(self.maze == 0):
            self.players += 1
            init_pos = np.argwhere(self.maze == 0)[0]
            queue = deque(init_pos[np.newaxis,:])
            self.DFS(queue, self.players)
            
        self.watershed = np.where(self.maze == -1, self.watershed, self.maze)
                    
    def fit(self, image):
        self.data_initialization(image)  # initial data
        
        if self.with_marker:
            self.marker_initializatin()
        
        print('start watersheding...')
        for water_level in self.pixels_val:
            threshold = np.where(self.image<=water_level, 1, 0)   # flooding
            self.maze = np.where(self.watershed == -2, -2, self.watershed + (threshold - 1)) # generate maze
            for p in range(self.players):                         # for all player(existed component)
                p_indices = np.argwhere(self.maze == p+1)         # get the player's posision
                for p_i in p_indices:
                    if self.processed[p_i[0],p_i[1]] == 0:        # process the unprocessed pixel
                        neighbors = self.get_neightbor_indices(p_i, p+1)
                        queue = deque(neighbors)                  # get the neighbor and run DFS
                        self.DFS(queue, p+1)
        
            # if there are some pixel unflood, create a component to flood it
            if not self.with_marker:
                while np.any(self.maze == 0):
                    self.players += 1
                    init_pos = np.argwhere(self.maze == 0)[0]
                    queue = deque(init_pos[np.newaxis,:])
                    self.DFS(queue, self.players)

            # get watershed from maze
            self.watershed = np.where(self.maze == -1, self.watershed, self.maze) 
            self.watersheds.append(self.watershed)
            print('water level: ', water_level, end = '\r')
        
        print('\nFinished watersheding!')
        
    def region_merging(self, similarity_threshold, apply = True):
        RAG_copy = self.RAG.copy()
        watershed_copy = np.copy(self.watershed)
        
        any_smaller = True    # flag for check whether there are similar component
        while any_smaller:
            any_smaller = False
            for e in RAG_copy.edges:
                # use mean as the metric of similarity
                region1 = self.image[watershed_copy == e[0]]
                region2 = self.image[watershed_copy == e[1]]
                rigion_mean1 = np.mean(region1)
                rigion_mean2 = np.mean(region2)
                if abs(rigion_mean2 - rigion_mean1) < similarity_threshold:
                    any_smaller = True
                    
                    # collect merged edge in watershed, after will delete it
                    bound = RAG_copy[e[0]][e[1]]['bound']
                    bound_merge = []
                    
                    # merged the bound of the component
                    commom_neighbors = sorted(nx.common_neighbors(RAG_copy, e[0], e[1]))
                    for n in commom_neighbors:
                        bound1 = RAG_copy[e[0]][n]['bound']
                        bound2 = RAG_copy[e[1]][n]['bound']
                        bound1.extend(bound2) 
                        bound_merge.append(bound1)
                        
                    # merge node
                    RAG_copy = nx.contracted_nodes(RAG_copy, e[0],e[1]) 
                    RAG_copy.remove_edge(e[0], e[0])
                    watershed_copy[watershed_copy == e[1]] = e[0]
                    
                    # merged the bound of the component
                    for i, n in enumerate(commom_neighbors):
                        RAG_copy[e[0]][n]['bound'] = bound_merge[i]

                    # delete the merged edge
                    for b in bound:
                        watershed_copy[b[0],b[1]] = e[0]
                        
        if apply:
            self.RAG = RAG_copy.copy()
            self.watershed = np.copy(watershed_copy)
            
        return watershed_copy
    
    def random_watershed_color_dictionary(self):
        max_val = np.max(self.watershed)
        color_dict = {0: [0,0,0], -2:[255,255,255]}
        for i in range(1,max_val+1):
            color_dict[i] = list(np.random.choice(range(256), size=3))
        return color_dict
                  
    def generate_watershed_gif(self, dir_pos):
        # random color to every connected component
        color_dict = self.random_watershed_color_dictionary()
            
        watersheds_gif = []    # collect all colored watershed
        watersheds_gif.append(self.image.astype('uint8'))
        for frame in self.watersheds:
            H,W = frame.shape
            frame3D = np.repeat(self.image[:, :, np.newaxis], 3, axis=2)
            for h in range(H):
                for w in range(W):
                    if frame[h,w] != 0:
                        frame3D[h,w,:] = color_dict[frame[h,w]]
            frame3D = frame3D.astype('uint8')
            frame3D = cv2.cvtColor(frame3D, cv2.COLOR_BGR2RGB)
            watersheds_gif.append(frame3D)
        
        # make collected colored watershed to gif image
        imageio.mimsave(dir_pos, watersheds_gif)
        
    def show_segmentation(self):
        img = np.where(self.watershed == -2, 255, 0).astype('uint8') # remember to change type
        show_image(img, 'segmentation')
    
    def save_segmentation(self, dir_pos = ''):
        img = np.where(self.watershed == -2, 255, 0).astype('uint8') # remember to change type
        print('writing to', dir_pos, ':', cv2.imwrite(dir_pos, img))
        
    def generate_color_watershad(self):
        color_dict = self.random_watershed_color_dictionary()
        H,W = self.watershed.shape
        output = np.repeat(self.watershed[:, :, np.newaxis], 3, axis=2)
        for h in range(H):
            for w in range(W):
                output[h,w,:] = color_dict[self.watershed[h,w]]
        return output.astype('uint8')
        
    def show_watershad(self):
        color_watershed = self.generate_color_watershad()
        show_image(color_watershed, 'watershed')
        
    def save_watershad(self, dir_pos = ''):
        color_watershed = self.generate_color_watershad()
        print('writing to', dir_pos, ':', cv2.imwrite(dir_pos, color_watershed))
        
    def show_segmentation_on_image(self, watershed = np.array([])):
        if watershed.size == 0:
            watershed = self.watershed
        img = np.where(watershed == -2, 255, self.raw_image).astype('uint8')
        show_image(img, 'segmentation_on_image')
        
    def save_segmentation_on_image(self, watershed = np.array([]), dir_pos = ''):
        if watershed.size == 0:
            watershed = self.watershed
        img = np.where(watershed == -2, 255, self.raw_image).astype('uint8')
        print('writing to', dir_pos, ':', cv2.imwrite(dir_pos, img))