In [2]:
import cv2 as cv
import numpy as np
import os
import pandas as pd
import random
from sklearn.model_selection import train_test_split
from joblib import dump, load
from sklearn.neural_network import MLPClassifier
from matplotlib import pyplot as plt
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
from joblib import dump, load
from PIL import Image, ImageFilter, ImageChops
from skimage import feature
from sklearn import svm
import sklearn.model_selection as model_selection
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from skimage.transform import resize
from skimage.feature import hog

### Read images

In [None]:
def read_images(input_path):
    """
    Read the images stored one directory level below input_path.

    input_path is expected to hold one sub-directory per class; every file
    inside a class directory is loaded as a grayscale image. Entries of
    input_path that are not directories are ignored, and files that OpenCV
    cannot decode are skipped.

    Params:
    input_path = path to the original images (one sub-directory per class)

    Return:
    images = list of all images (grayscale arrays returned by cv.imread)
    labels = list with the class (sub-directory name) of each image
    names = list with an identifier for each image, built as
            '<file name without extension>_<first letter of the class>'
    """

    # Lists to save images, labels and image identifiers
    images = []
    labels = []
    names = []

    # os.listdir gives no ordering guarantee; sorting keeps the output
    # order reproducible between runs and machines.
    for class_dir in sorted(os.listdir(input_path)):
        class_path = os.path.join(input_path, class_dir)

        # Only class directories are browsed
        if not os.path.isdir(class_path):
            continue

        for image_file in sorted(os.listdir(class_path)):
            image_path = os.path.join(class_path, image_file)

            image = cv.imread(image_path, cv.IMREAD_GRAYSCALE)

            # cv.imread signals an unreadable file by returning None
            # instead of raising; such entries would break every later step.
            if image is None:
                continue

            # splitext handles extensions of any length ('.jpg', '.jpeg', ...)
            stem = os.path.splitext(image_file)[0]
            image_name = f'{stem}_{class_dir[0]}'

            # Append image, class and identifier to the lists
            images.append(image)
            labels.append(class_dir)
            names.append(image_name)

    return (images, labels, names)

In [None]:
%%bash
# Download the dataset archive (quiet mode) into the current working directory.
wget -q https://www.inf.ufpr.br/vsa20/dataset.tar.gz
#curl -0 https://www.inf.ufpr.br/vsa20/dataset.tar.gz
# Extract the archive. The absolute path assumes the download above ran with
# /content as the working directory -- TODO confirm when running outside Colab.
tar -xf /content/dataset.tar.gz

### Preprocessing

In [None]:
def preprocessing(images_data):
    """
    Normalize and smooth every image of images_data.

    Each image is min-max normalized to the [0, 1] range and then blurred
    with a Gaussian filter (sigma = 2, kernel size derived from sigma).

    Params:
    images_data = dict mapping image name -> (image, label)

    Return:
    images_data = the same dict object, with every image replaced by its
                  preprocessed version (labels are kept untouched)
    """

    # Only values of existing keys are replaced, so iterating over the dict
    # while assigning is safe (its size never changes).
    for key, value in images_data.items():

        image = value[0]
        label = value[1]

        # The output must be floating point: with an integer output type the
        # [0, 1] range is rounded to the two values 0 and 1, which discards
        # almost all of the intensity information.
        norm_image = cv.normalize(image, None, 0, 1.0, cv.NORM_MINMAX, dtype=cv.CV_32F)
        blur_image = cv.GaussianBlur(norm_image, (0, 0), 2)

        images_data[key] = (blur_image, label)

    return images_data

In [None]:
# Load the grayscale images, their class labels and their identifiers for
# the train and test splits (one sub-directory per class in each split).
x_train, y_train, images_names_train = read_images("/content/dataset/train")
x_test, y_test, images_names_test = read_images("/content/dataset/test")

In [None]:
def assemble_data(x, y, names):
  """
  Group images and labels under their image name.

  Params:
  x = list of images
  y = list of labels, aligned with x
  names = list of image names, aligned with x

  Return:
  dict mapping each name to its (image, label) pair; when a name is
  repeated, the last occurrence wins
  """
  return {name: (x[pos], y[pos]) for pos, name in enumerate(names)}

# Build the name -> (image, label) dictionaries for both splits.
images_data_train = assemble_data(x_train, y_train, images_names_train)
images_data_test = assemble_data(x_test, y_test, images_names_test)

In [None]:
# Run preprocessing
# Run preprocessing on both splits.
# preprocessing() replaces the images inside the dict it receives, so
# re-running this cell applies the normalization and blur a second time.
images_data_train = preprocessing(images_data_train)
images_data_test = preprocessing(images_data_test)

In [None]:
def augmentation(data, label):
    """
    Create one randomly transformed copy of every image in data.

    Each image goes through three random steps, in this order:
    a flip (flip code drawn from {0, 1}), a rotation (90 degrees
    counter-clockwise, 90 degrees clockwise or 180 degrees) and a shrink
    with independent horizontal (0.3-0.6) and vertical (0.3-0.8) scale
    factors.

    Params:
    data = iterable of images
    label = label stored alongside every augmented image

    Return:
    dict mapping 'augmented<i>' to (augmented image, label), where <i> is
    the position of the source image in data
    """
    rotations = [cv.ROTATE_90_COUNTERCLOCKWISE, cv.ROTATE_90_CLOCKWISE, cv.ROTATE_180]

    images_aug = {}
    for idx, original in enumerate(data):

        # Random flip: code 0 or 1, chosen with equal probability
        flipped = cv.flip(original, random.randint(0, 1))

        # Random rotation picked among the three supported angles
        rotated = cv.rotate(flipped, random.sample(rotations, 1)[0])

        # Random, non-uniform downscale (width and height drawn separately)
        scale_x = random.uniform(0.3, 0.6)
        scale_y = random.uniform(0.3, 0.8)
        shrunk = cv.resize(rotated, (0, 0), fx=scale_x, fy=scale_y)

        images_aug[f'augmented{idx}'] = (shrunk, label)

    return images_aug

In [None]:
# Data augmentation for class 2 - no_tumor
# Data augmentation for class 2 - no_tumor
# The class is recognised through the '_n' suffix that read_images appends to
# every image name (first letter of the class directory). This keeps the cell
# runnable on a fresh kernel: it no longer reads df_train, which is only built
# further down and whose row order is not tied to the order of x_train.
# NOTE(review): x_train holds the raw images, so the augmented copies are not
# normalized/blurred like the other entries of images_data_train -- confirm
# whether that is intended.
class2_imgs_train = [image for image, name in zip(x_train, images_names_train)
                     if name.endswith('_n')]

images_train_aug = augmentation(class2_imgs_train, 2)

In [None]:
# Add the augmented images (keys 'augmented<i>') to the training dictionary.
images_data_train.update(images_train_aug)

### Extract features

In [None]:
# Extract features: HOG
def extract_hog(images_data, max_len = None):
    """
    Compute one fixed-length HOG descriptor per image.

    Every image is downscaled to one fifth of its height and width before
    the HOG descriptor is computed. Because the images may differ in size,
    the descriptors may differ in length; they are brought to max_len by
    zero-padding the short ones and truncating the long ones.

    Params:
    images_data = dict mapping image name -> (image, label)
    max_len = target descriptor length; when None, the length of the longest
              descriptor found in images_data is used (0 for empty input)

    Return:
    padded = list with one descriptor of length max_len per image, in the
             iteration order of images_data
    max_len = the descriptor length that was applied
    """

    hogs = []

    for value in images_data.values():

        image = value[0]

        image = resize(image, (image.shape[0]//5, image.shape[1]//5))
        # No channel argument: a 2-D input is treated as grayscale by default,
        # and the former `multichannel` keyword is not accepted by recent
        # scikit-image releases.
        fd = hog(image, orientations=9, pixels_per_cell=(8, 8),
                        cells_per_block=(2, 2))

        hogs.append(np.asarray(fd))

    if max_len is None:
        max_len = max((len(desc) for desc in hogs), default=0)

    padded = []
    for h in hogs:
        missing = max_len - len(h)

        if missing > 0:
            # Right-pad with zeros up to the target length
            padded.append(np.pad(h, (0, missing)))
        else:
            # A descriptor longer than the target (e.g. a test image larger
            # than every training image) is cut; np.pad rejects negative widths.
            padded.append(h[:max_len])

    return padded, max_len

In [None]:
# HOG descriptors of the training images; max_len is the descriptor length
# that extract_hog derived from this set.
ft_hog_train, max_len = extract_hog(images_data_train)

  fd = hog(image, orientations=9, pixels_per_cell=(8, 8),


In [None]:
# HOG descriptors of the test images, forced to the training descriptor length.
ft_hog_test, _ = extract_hog(images_data_test, max_len)

  fd = hog(image, orientations=9, pixels_per_cell=(8, 8),


In [None]:
# One column name per HOG component: 'hog0', 'hog1', ...
names_hog = [f'hog{i}' for i in range(len(ft_hog_train[0]))]

In [None]:
# Extract features: LBP
def extract_lbp(images_data, eps=1e-7, points=24, radius=8):
    """
    Compute a normalized Local Binary Pattern histogram for every image.

    Params:
    images_data = dict mapping image name -> (image, label)
    eps = small constant added to the histogram total to avoid a division by zero
    points = number of neighbour points of the LBP operator
    radius = radius of the LBP operator

    Return:
    list with one histogram (points + 2 bins, normalized by its total) per
    image, in the iteration order of images_data
    """
    # Integer bin edges 0, 1, ..., points + 2 -> points + 2 bins
    bin_edges = np.arange(0, points + 3)
    value_range = (0, points + 2)

    histograms = []

    for entry in images_data.values():

        codes = feature.local_binary_pattern(entry[0], points, radius,
                                             method="uniform")

        counts, _ = np.histogram(codes.ravel(), bins=bin_edges, range=value_range)

        # Turn the counts into fractions of the total
        normalized = counts.astype("float")
        normalized /= (normalized.sum() + eps)

        histograms.append(normalized)

    return histograms

In [None]:
# LBP histograms of the training images (default operator parameters).
ft_lbp_train = extract_lbp(images_data_train)

In [None]:
# LBP histograms of the test images (default operator parameters).
ft_lbp_test = extract_lbp(images_data_test)

In [None]:
# One column name per LBP histogram bin: 'lbp0', 'lbp1', ...
names_lbp = [f'lbp{i}' for i in range(len(ft_lbp_train[0]))]

Save all features and feature names

In [None]:
def save_features(features, images_names, out_path):
    """
    Write one text file of feature values per image.

    Params:
    features = list of feature vectors; features[i] belongs to images_names[i]
    images_names = list of image names (entries of features beyond the
                   length of this list are not written)
    out_path = output directory, created when missing

    Each file is named '<image name>.txt' and holds one value per line.
    """
    per_image = {}
    for pos, name in enumerate(images_names):
        per_image[name] = features[pos]

    os.makedirs(out_path, exist_ok=True)

    for name, vector in per_image.items():
        target = os.path.join(out_path, f'{name}.txt')

        with open(target, 'w') as handle:
            handle.writelines(f'{elem}\n' for elem in vector)

In [None]:
# Save features as .txt
# Save features as .txt
# The feature lists follow the key order of the images_data_* dictionaries
# they were extracted from, so the names are taken from those dictionaries.
# images_names_train does not contain the 'augmented<i>' keys, which made
# save_features silently drop the features of every augmented image.
save_features(ft_lbp_train, list(images_data_train), 'ft_lbp_train_aug')
save_features(ft_lbp_test, list(images_data_test), 'ft_lbp_test')

save_features(ft_hog_train, list(images_data_train), 'ft_hog_train_aug')
save_features(ft_hog_test, list(images_data_test), 'ft_hog_test')

In [None]:
# Download features
# Download features
# Compress each feature directory and download the archives through the
# Colab file helper.
# NOTE(review): the save cell writes the training features to
# 'ft_lbp_train_aug' / 'ft_hog_train_aug', while the commands below zip
# '/content/ft_lbp_train' / '/content/ft_hog_train' -- confirm which
# directories are meant to be archived.
!zip -r /content/ft_lbp_train.zip /content/ft_lbp_train
!zip -r /content/ft_lbp_test.zip /content/ft_lbp_test

!zip -r /content/ft_hog_train.zip /content/ft_hog_train
!zip -r /content/ft_hog_test.zip /content/ft_hog_test

# Colab-only helper used to send files to the browser
from google.colab import files
files.download("/content/ft_lbp_train.zip")
files.download("/content/ft_lbp_test.zip")

files.download("/content/ft_hog_train.zip")
files.download("/content/ft_hog_test.zip")

updating: content/ft_hog_test/ (stored 0%)
updating: content/ft_hog_test/image(104)_n.txt (deflated 97%)
updating: content/ft_hog_test/image(15)_m.txt (deflated 92%)
updating: content/ft_hog_test/image(29)_g.txt (deflated 92%)
updating: content/ft_hog_test/image(13)_g.txt (deflated 92%)
updating: content/ft_hog_test/image(80)_g.txt (deflated 93%)
updating: content/ft_hog_test/image(42)_m.txt (deflated 96%)
updating: content/ft_hog_test/image(78)_p.txt (deflated 95%)
updating: content/ft_hog_test/image(91)_p.txt (deflated 83%)
updating: content/ft_hog_test/image(5)_n.txt (deflated 96%)
updating: content/ft_hog_test/image(84)_n.txt (deflated 97%)
updating: content/ft_hog_test/image(113)_m.txt (deflated 91%)
updating: content/ft_hog_test/image(88)_g.txt (deflated 89%)
updating: content/ft_hog_test/image(30)_g.txt (deflated 96%)
updating: content/ft_hog_test/image(32)_g.txt (deflated 95%)
updating: content/ft_hog_test/image(17)_n.txt (deflated 97%)
updating: content/ft_hog_test/image(66)_m

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# Save feature names
# Save feature names, one per line.
# Every line, including the last one, is newline-terminated so that a reader
# which strips one trailing character per line gets the complete last name.
with open('ft_hog_names.txt', 'w') as f:
    f.write('\n'.join(names_hog) + '\n')

with open('ft_lbp_names.txt', 'w') as f:
    f.write('\n'.join(names_lbp) + '\n')

Read features to create dfs

In [None]:
# Extract previously saved feature archives into the working directory.
# Only the HOG training archive is enabled; uncomment the others as needed.
# !unzip -q ft_hog_test.zip
# !unzip -q ft_lbp_test.zip
! unzip -q ft_hog_train.zip
# ! unzip -q ft_lbp_train.zip

In [None]:
def read_features(ft_path):
    """
    Read the per-image feature files stored in a directory.

    Each file holds one feature value per line and is named
    '<image name>.txt'. The label is derived from the file name: the last
    character before the extension selects the class ('g' -> 0, 'm' -> 1,
    'n' -> 2, 'p' -> 3); files whose name contains 'augmented' are class 2.

    Params:
    ft_path: path to the directory holding the feature .txt files

    Returns:
    features: list with one list of float feature values per file
    names: list of file names without extension
    labels: list of integer labels, aligned with features and names

    Raises:
    ValueError: when no label can be derived from a file name (previously
                such a file was kept without a label, leaving the three
                returned lists misaligned)
    """

    class_codes = {'g': 0, 'm': 1, 'n': 2, 'p': 3}

    # Lists to return
    features = []
    names = []
    labels = []

    # Sorted so that the row order does not depend on the file system
    for ft_file in sorted(os.listdir(ft_path)):

        stem = os.path.splitext(ft_file)[0]
        class_letter = stem[-1:]

        if class_letter in class_codes:
            label = class_codes[class_letter]
        elif 'augmented' in ft_file:
            label = 2
        else:
            raise ValueError(f'Cannot derive a label from feature file name: {ft_file!r}')

        with open(os.path.join(ft_path, ft_file), 'r') as f:
            # float() ignores the surrounding whitespace/linebreak, so the
            # last value is read correctly even without a final newline.
            ft_o = [float(line) for line in f if line.strip()]

        features.append(ft_o)
        names.append(stem)
        labels.append(label)

    return features, names, labels

In [None]:
def read_files(ft_names_path):
    """
    Read feature names from a text file holding one name per line.

    Params:
    ft_names_path: path to the feature names .txt file

    Returns:
    ft_names: list of feature names, in file order
    """

    # Only the linebreak is removed: slicing off the last character
    # unconditionally truncated the final name whenever the file did not end
    # with a newline (e.g. 'hog12' was read back as 'hog1').
    with open(ft_names_path, 'r') as f:
        ft_names = [line.rstrip('\n') for line in f]

    return ft_names

In [None]:
# Reload the LBP column names from disk (replaces the in-memory list, if any).
names_lbp = read_files('/content/ft_lbp_names.txt')

In [None]:
# Reload the HOG column names from disk (replaces the in-memory list, if any).
names_hog = read_files('/content/ft_hog_names.txt')

In [None]:
# Load feature files to create dataframes
# Load feature files to create dataframes
# Each call returns (features, names, labels) for one feature directory.
# NOTE(review): the first path is absolute ('/content/...') while the other
# three are relative ('content/...') and therefore depend on the working
# directory -- confirm that all four point at the intended directories.
hog_train, hog_train_names, hog_train_labels = read_features('/content/ft_hog_train')
hog_test, hog_test_names, hog_test_labels = read_features('content/ft_hog_test')

lbp_train, lbp_train_names, lbp_train_labels = read_features('content/ft_lbp_train')
lbp_test, lbp_test_names, lbp_test_labels = read_features('content/ft_lbp_test')


Creating dataframes and saving as .csv

In [None]:
# One dataframe per feature family and split: rows are indexed by image name,
# columns are the feature names, plus a 'label' column.
df_hog_train = pd.DataFrame(hog_train, columns = names_hog, index = hog_train_names)
df_hog_train['label'] = hog_train_labels
df_lbp_train = pd.DataFrame(lbp_train, columns = names_lbp, index = lbp_train_names)
df_lbp_train['label'] = lbp_train_labels

df_hog_test = pd.DataFrame(hog_test, columns = names_hog, index = hog_test_names)
df_hog_test['label'] = hog_test_labels
df_lbp_test = pd.DataFrame(lbp_test, columns = names_lbp, index = lbp_test_names)
df_lbp_test['label'] = lbp_test_labels

# Join HOG and LBP features side by side (rows aligned on the image name).
# The HOG label column is dropped first: keeping both would leave two 'label'
# columns, so df['label'] would no longer be a single column and one copy of
# the label would sit among the candidate feature columns. The remaining
# label (from the LBP frame) ends up as the last column.
df_train = pd.concat([df_hog_train.drop(columns='label'), df_lbp_train], axis = 1)
df_test = pd.concat([df_hog_test.drop(columns='label'), df_lbp_test], axis = 1)

In [None]:
# Keep only the training columns that have at least one non-zero value,
# then restrict the test frame to the same columns, in the same order.
df_train = df_train.loc[:, (df_train != 0).any(axis=0)]
df_test = df_test[list(df_train)]

In [48]:
# Persist both dataframes as CSV (the image names are written as the index).
# NOTE(review): the training file uses an absolute path and the test file a
# relative one -- confirm both end up in the same directory.
df_train.to_csv("/content/df_train_aug_no0.csv")
df_test.to_csv("df_test.csv")

Reading dataframe from .csv

In [58]:
# Reload the dataframes from CSV, using the first column as the index.
# NOTE(review): this reads 'df_train_noaug_no0.csv', which is not the file
# name written by the saving cell ('df_train_aug_no0.csv') -- confirm which
# training set (with or without augmentation) is intended here.
df_train = pd.read_csv('df_train_noaug_no0.csv', index_col=0)
df_test = pd.read_csv('df_test.csv', index_col=0)

### PSO

In [None]:
# numero total de features disponiveis
max_feature_idx = len(df_train.columns) -1
feature_idxs = list(range(0, max_feature_idx, 1))

# number of dimensions
# i.e. feature number for each particle
n_dimensions = 10
n_particles = 30

# initial particles position
# since we can't use the same feature repeated,
# the initial position of every particle is a n_dimensions array with a random and unique combination of features
initial_pos = []

i = 0
while i < n_particles:
    # sample = random.randrange(0, max_feature_id, 1)
    list_sample = random.sample(feature_idxs, n_dimensions)

    equal = False
    for particle in initial_pos:
        if set(list_sample) == set(particle):
            equal = True

    if not equal:
        initial_pos.append(list_sample)
        i += 1

# min and max values for the features
# 0 is the id for the first feature,
# and max_feature_id is the id for the last feature
bounds = [(0, max_feature_idx-1)]*n_dimensions


In [None]:
def cost_function(features, df_train, df_test):
    """
    Score a feature subset with a polynomial-kernel SVM.

    Params:
    features = positional column indexes of the features to use
    df_train = training dataframe with a 'label' column
    df_test = test dataframe with a 'label' column

    Return:
    weighted F1-score obtained on df_test by an SVC (degree-3 polynomial
    kernel, C=1) fitted on the selected columns of df_train
    """
    train_x, train_y = df_train.iloc[:, features], df_train['label']
    test_x, test_y = df_test.iloc[:, features], df_test['label']

    classifier = svm.SVC(kernel='poly', degree=3, C=1)
    classifier.fit(train_x, train_y)

    predictions = classifier.predict(test_x)

    return f1_score(test_y, predictions, average='weighted')

In [None]:
class Particle:
    """
    One particle of the discrete PSO used for feature selection.

    A position is a list of column indexes (one selected feature per
    dimension); the velocity holds one float per dimension. The fitness of a
    position is the score returned by the cost function, higher being better.
    """

    def __init__(self, initial_pos, i):
        """
        Params:
        initial_pos = list with the starting position of every particle
        i = index of this particle inside initial_pos
        """

        # particle position, i.e. features. The starting position is copied:
        # update_position edits the list in place, and sharing it with
        # initial_pos would make every later run start from the positions
        # left behind by the previous one.
        self.position_i = list(initial_pos[i])
        # particle velocity: one value between -1 and 1 per dimension
        self.velocity_i = [random.uniform(-1, 1) for _ in self.position_i]
        self.pos_best_i = []          # best position individual
        self.f1_best_i = -1           # best score individual (-1 = not evaluated yet)
        self.f1_i = -1                # current score individual
        # not read by any method of this class; kept for compatibility
        self.df_train = []
        self.df_test = []

    # evaluate current fitness
    def evaluate(self, cost_func):
        """
        Score the current position and update the individual best.

        Params:
        cost_func = callable(position, df_train, df_test) returning the score;
                    it receives the module-level df_train and df_test
        """
        self.f1_i = cost_func(self.position_i, df_train, df_test)

        # check to see if the current position is an individual best
        if self.f1_i > self.f1_best_i or self.f1_best_i == -1:
            self.pos_best_i = self.position_i.copy()
            self.f1_best_i = self.f1_i

    # update new particle velocity
    def update_velocity(self, pos_best_g, w, c1, c2, n_dimensions):
        """
        Params:
        pos_best_g = best position found by the whole swarm
        w = inertia weight (how much to weigh the previous velocity)
        c1 = cognitive constant (pull towards the individual best)
        c2 = social constant (pull towards the swarm best)
        n_dimensions = number of dimensions to update
        """

        for i in range(0, n_dimensions):

            # non-deterministic values to prevent particles
            # from getting stuck in local optima
            r1 = random.random()
            r2 = random.random()

            # update cognitive and social
            vel_cognitive = c1 * r1 * (self.pos_best_i[i] - self.position_i[i])
            vel_social = c2 * r2 * (pos_best_g[i] - self.position_i[i])

            self.velocity_i[i] = w * self.velocity_i[i] + vel_cognitive + vel_social

    # update the particle position based off new velocity updates
    def update_position(self, bounds, n_dimensions):
        """
        Params:
        bounds = list of (min, max) inclusive limits, one pair per dimension
        n_dimensions = number of dimensions to update
        """
        for i in range(0, n_dimensions):

            # round value to get discrete position
            position = round(self.position_i[i] + self.velocity_i[i])

            # clamp to the allowed range
            low, high = bounds[i][0], bounds[i][1]
            position = max(low, min(high, position))

            # a move is only accepted when the target feature is not already
            # part of the position; otherwise this dimension stays unchanged
            if position not in self.position_i:
                self.position_i[i] = position

def maximize(cost_function, initial_pos, bounds, n_particles,
             n_dimensions, maxiter, w, c1, c2, verbose=False):
    """
    Run the particle swarm optimization and return the best feature subset.

    Params:
    cost_function = scoring callable handed to Particle.evaluate
    initial_pos = list with the starting position of every particle
    bounds = list of (min, max) inclusive limits, one pair per dimension
    n_particles = number of particles in the swarm
    n_dimensions = number of features held by each particle
    maxiter = number of iterations
    w = inertia weight
    c1 = cognitive constant
    c2 = social constant
    verbose = when True, print the progress and the final solution

    Return:
    f1_best_g = best score found, rounded to 6 decimals
    pos_best_g = names of the df_train columns selected by the best position
                 (df_train is read from the module namespace)
    """

    f1_best_g = -1                    # best f1 score for group
    pos_best_g = []                   # best position for group

    # establish the swarm
    swarm = [Particle(initial_pos, idx) for idx in range(n_particles)]

    # begin optimization loop
    for iteration in range(maxiter):
        if verbose: print(f'iter: {iteration}, best f1-score: {f1_best_g:10.4f}')

        # cycle through particles in swarm and evaluate fitness
        for particle in swarm:
            particle.evaluate(cost_function)

            # determine if current particle is the best (globally)
            if particle.f1_i > f1_best_g or f1_best_g == -1:
                # Snapshot the position: the particle keeps editing its own
                # list in update_position, and a shared reference would let
                # the reported best drift away from the score it earned.
                pos_best_g = list(particle.position_i)
                f1_best_g = float(particle.f1_i)

        # cycle through swarm and update velocities and position
        for particle in swarm:
            particle.update_velocity(pos_best_g, w, c1, c2, n_dimensions)
            particle.update_position(bounds, n_dimensions)

    f1_best_g = round(f1_best_g, 6)
    # translate the column indexes into column names
    pos_best_g = list(df_train.iloc[:,pos_best_g].columns)
    # print final results
    if verbose:
        print('\nFINAL SOLUTION:')
        print(f'Features: {pos_best_g}')
        print(f'Score: {f1_best_g}\n')

    return f1_best_g, pos_best_g

In [None]:
# Short trial run: 2 iterations with inertia, cognitive and social constants all set to 0.5.
f1_best_g, pos_best_g = maximize(cost_function, initial_pos, bounds, n_particles, n_dimensions, 2, 0.5, 0.5, 0.5, verbose=True)

iter: 0, best f1-score:    -1.0000
iter: 1, best f1-score:     0.3454

FINAL SOLUTION:
Features: ['lbp14', 'lbp16', 'lbp18', 'lbp8', 'lbp9', 'lbp20', 'lbp5', 'lbp3', 'lbp10', 'lbp10']
Score: 0.345350435049269



In [None]:
# Grid search over the PSO hyper-parameters: every combination of inertia,
# cognitive and social constant (3 x 5 x 5 = 75 runs) is optimized for
# max_iter iterations, starting from the same initial_pos list.
inertias = [0.1, 0.5, 1]
social = [0, 1, 2, 3, 4]
cognitive = [0, 1, 2, 3, 4]
max_iter = 30

# one row per run: [w, c1, c2, best score, best feature names]
results = []

for w in inertias:
    for c1 in cognitive:
        for c2 in social:
            f1_best_g, pos_best_g = maximize(cost_function, initial_pos, bounds,
                                             n_particles, n_dimensions, max_iter,
                                             w, c1, c2, verbose=True)

            results.append([w, c1, c2, f1_best_g, pos_best_g])



Save the experiment results

In [None]:
# csv is not referenced in this cell; the file is written through pandas.
import csv

# One row per grid-search run, in the order the runs were executed.
columns_names = ["w", "c1", "c2", "f1_best_g", "pos_best_g"]
df = pd.DataFrame(results, columns=columns_names)
# The index is the run number; it is written to the CSV under the name 'iteration'.
df.index.name = 'iteration'

df.to_csv('results.csv', index=True, header=True)