In [2]:
import numpy as np
import os
import requests

In [5]:
import cv2

In [None]:
img = cv2.imread('image_path')

In [3]:
def do_otsu(img: np.ndarray) -> np.ndarray:
        """
        :param img: ndarray (height,width,channels)
        :return: ndarray (height,width), Boolean
        """
        #
        # бинаризация отсу
        #
        global_thresh = filters.threshold_otsu(img)
        binary_global = img > global_thresh

        return binary_global.astype('uint8')
    
def image_preprocess(image: np.ndarray) -> np.ndarray:
        """
        :param image: ndarray (height,width,channels)
        :return: ndarray (height,width)
        """
        #
        # комбинация медианного фильтра, биноризации и гражиента
        # у зерен значение пикселя - 0, у регионов связ. в-ва - 127,а у их границы - 254
        #
        unsigned_image = util.img_as_ubyte(image)
        denoised = filters.rank.median(unsigned_image, ball(3))
        binary = cls.do_otsu(denoised)
        grad = abs(filters.rank.gradient(binary, ball(1)))
        bin_grad = (1 - binary + grad) * 127

        return bin_grad.astype(np.uint8)    

In [4]:
class grainPreprocess():

    @classmethod
    def imdivide(cls, image: np.ndarray, h: int, side: str) -> np.ndarray:
        """
        :param image: ndarray (height,width,channels)
        :param h: int scalar
        :param side: str 'left'
        :return: ndarray (height,width/2,channels)
        """
        #
        # возвращает левую или правую часть полученного изображения
        #
        height, width = image.shape
        sides = {'left': 0, 'right': 1}
        shapes = [(0, height - h, 0, width // 2), (0, height - h, width // 2, width)]
        shape = shapes[sides[side]]

        return image[shape[0]:shape[1], shape[2]:shape[3]]

    @classmethod
    def combine(cls, image: np.ndarray, h: int, k=0.5) -> np.ndarray:
        """
        :param image: ndarray (height,width,channels)
        :param h: int scalar
        :param k: float scalar
        :return: ndarray (height,width/2,channels)
        """
        #
        #  накладывает левую и правые части изображения
        #  если k=1, то на выходе будет левая часть изображения, если k=0, то будет правая часть
        #
        left_img = cls.imdivide(image, h, 'left')
        right_img = cls.imdivide(image, h, 'right')

        l = k
        r = 1 - l
        gray = np.array(left_img) * l
        gray += np.array(right_img) * r

        return gray.astype('uint8')

    @classmethod
    def do_otsu(cls, img: np.ndarray) -> np.ndarray:
        """
        :param img: ndarray (height,width,channels)
        :return: ndarray (height,width), Boolean
        """
        #
        # бинаризация отсу
        #
        global_thresh = filters.threshold_otsu(img)
        binary_global = img > global_thresh

        return binary_global.astype('uint8')

    @classmethod
    def image_preprocess(cls, image: np.ndarray) -> np.ndarray:
        """
        :param image: ndarray (height,width,channels)
        :return: ndarray (height,width)
        """
        #
        # комбинация медианного фильтра, биноризации и гражиента
        # у зерен значение пикселя - 0, у регионов связ. в-ва - 127,а у их границы - 254
        #
        unsigned_image = util.img_as_ubyte(image)
        denoised = filters.rank.median(unsigned_image, ball(3))
        binary = cls.do_otsu(denoised)
        grad = abs(filters.rank.gradient(binary, ball(1)))
        bin_grad = (1 - binary + grad) * 127

        return bin_grad.astype(np.uint8)

    @classmethod
    def image_preprocess_kmeans(cls, image: np.ndarray, h=135, k=1, n_clusters=3, pos=1) -> np.ndarray:
        """
        :param image: array (height,width,channels)
        :param h: int scalar
        :param k: float scalar
        :param n_clusters: int scalar
        :param pos: int scalar, cluster index
        :return: ndarray (height,width)
        """
        #
        # выделение границ при помощи кластеризации 
        # и выравнивание шума медианным фильтром
        # pos отвечает за выбор кластера, который будет отображен на возвращенном изображении
        #
        combined = cls.combine(image, h, k)

        clustered, colors = grainMorphology.kmeans_image(combined, n_clusters)
        cluster = clustered == colors[pos]
        cluster = np.array(cluster * 255, dtype='uint8')

        new_image = filters.median(cluster, disk(2))
        return new_image

    @classmethod
    def read_preprocess_data(cls, images_dir, max_images_num_per_class=100, preprocess=False, save=False,
                             crop_bottom=False,
                             h=135, resize=True, resize_shape=None,
                             save_name='all_images.npy'):

        folders_names = glob.glob(images_dir + '*')
        images_paths = [glob.glob(folder_name + '/*')[:max_images_num_per_class] for folder_name in folders_names]

        l = np.array(images_paths).flatten().shape[0]

        # Initial call to print 0% progress
        GrainLogs.printProgressBar(0, l, prefix='Progress:', suffix='Complete', length=50)

        preproc_images = []

        start_time = time.time()
        step = 0
        for i, images_list_paths in enumerate(images_paths):
            preproc_images.append([])
            for image_path in images_list_paths:
                step += 1
                image = io.imread(image_path).astype(np.uint8)
                # вырезает нижнюю полоску фотографии с линекой и тд
                if crop_bottom:
                    image = grainPreprocess.combine(image, h)

                # ресайзит изображения
                if resize:
                    if resize_shape is not None:
                        image = transform.resize(image, resize_shape)
                    else:
                        print('No resize shape')

                # последовательно применяет фильтры (медианный, отсу, собель и тд)
                if preprocess:
                    image = grainPreprocess.image_preprocess(image)
                end_time = time.time()
                eta = round((end_time - start_time) * (l - step), 1)
                GrainLogs.printProgressBar(step, l, eta=eta, prefix='Progress:', suffix='Complete', length=50)
                start_time = time.time()
                preproc_images[i].append(image)

        if save:
            np.save(save_name, preproc_images)
        return preproc_images

    @classmethod
    def tiff2jpg(cls, folder_path, start_name=0, stop_name=-4, new_folder_path='resized'):
        #
        # переводит из tiff 2^16 в jpg 2^8 бит
        #
        folders = os.listdir(folder_path)

        if not os.path.exists(new_folder_path):
            os.mkdir(new_folder_path)

        for folder in folders:
            if not os.path.exists(new_folder_path + '/' + folder):
                os.mkdir(new_folder_path + '/' + folder)

        for i, folder in enumerate(folders):
            images_names = os.listdir(folder_path + '/' + folder)
            for i, name in enumerate(images_names):
                if 'hdr' not in name:
                    img = io.imread(folder_path + '/' + folder + '/' + name)
                    img = (img / 255).astype('uint8')

                    io.imsave(new_folder_path + '/' + folder + '/' + name[start_name:stop_name] + '.jpg', img)

    @classmethod
    def get_example_images(cls):
        '''
        :return: ndarray [[img1],[img2]..]
        '''
        #
        # скачивает из контейнера s3 по 1 снимку каждого образца
        #

        urls = CfgDataset.images_urls
        images = []

        for url in urls:
            logger.warning(f'downloading {url}')
            file = requests.get(url, stream=True).raw
            img = np.asarray(Image.open(file))
            images.append([img])

        return np.array(images)