In [2]:
import os
import re
from PIL import Image

import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

# text models
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from xgboost import XGBClassifier
from sklearn.neighbors import KNeighborsClassifier
from lightgbm import LGBMClassifier

# for handling imbalanced data
from collections import Counter
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import NearMiss
from imblearn.pipeline import make_pipeline
from sklearn.utils import class_weight

import pickle

# for load data
# from glob import glob
import os # for file handling
import glob # for file handling
import shutil # for moving files

# scientific computing library
from sklearn.linear_model import LogisticRegression

# for data preprocessing
from collections import Counter
from imblearn.under_sampling import RandomUnderSampler
from sklearn.utils import class_weight

# for image preprocessing
# from keras.preprocessing.image import ImageDataGenerator
# for model building
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
# from tensorflow.keras import mixed_precision
from tensorflow.keras.layers import Layer, DepthwiseConv2D, Conv2D, Activation, BatchNormalization
from tensorflow.keras.layers import Add, GlobalAveragePooling2D, Dense, Input, MaxPooling2D, Flatten, Dropout
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.applications.imagenet_utils import preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from keras import metrics
import tensorflow_hub as hub
from segment_anything import sam_model_registry, SamPredictor

import joblib

import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

# Set Up

In [3]:
data_path = 'G:\\HSUHK\\COM6003\\project\\archive'

In [4]:
os.environ['KERAS_BACKEND'] = 'tensorflow'
INPUT_SHAPE = (256, 256, 3)
IMG_SIZE = (256, 256)
BATCH_SIZE = 8
EPOCHS = 50
SEED = 11
keras.utils.set_random_seed(SEED)
AUTO = tf.data.experimental.AUTOTUNE

# Preprocess the Text Data

In [17]:
class TextPreprocessor:
    def __init__(self, data):
        self.data = pd.DataFrame(data)
        
    def clean_text(self):
        self.data.drop(columns=['patient_id', 'lesion_id', 'biopsed'], inplace=True)
        self.data['label'] = self.data['diagnostic']
        self.data.drop(columns='diagnostic', inplace=True)
        self.data.replace({'UNK':-1, 'NaN':-1, np.nan:-1, 'FALSE': 'False', 'TRUE': 'True'}, inplace=True)

    def encode_text(self):
        binary_column_name = ['smoke', 'drink', 'pesticide', 'gender', 'skin_cancer_history', 
                              'cancer_history', 'has_piped_water', 'has_sewage_system', 
                              'itch', 'grew', 'hurt', 'changed', 'bleed', 'elevation']
        self.convert_to_binary(binary_column_name)
        self.convert_to_integer('background_father', {'AUSTRIA':0, 'BRASIL':1, 'BRAZIL':2, 'CZECH':3,
                                                             'GERMANY':4, 'ISRAEL':5, 'ITALY':6, 'NETHERLANDS':7,
                                                                'POLAND':8, 'POMERANIA':9, 'PORTUGAL':10, 'SPAIN':11})
        self.convert_to_integer('background_mother', {'BRAZIL':0, 'FRANCE':1, 'GERMANY':2, 'ITALY':3,
                                                                'NETHERLANDS':4, 'NORWAY':5, 'POLAND':6, 'POMERANIA':7,
                                                                'PORTUGAL':8, 'SPAIN':9})
        self.convert_to_integer('region', {'ABDOMEN':0, 'ARM':1, 'BACK':2, 'CHEST':3, 
                                                                     'EAR':4, 'FACE':5, 'FOOT':6, 'FOREARM':7, 
                                                                     'HAND':8, 'LIP':9, 'NECK':10, 'NOSE':11, 
                                                                     'SCALP':12, 'THIGH':13})
        self.convert_to_integer('label', {'ACK':0, 'BCC':1, 'MEL':2, 'NEV':3, 'SCC':4, 'SEK':5})
    
    def convert_to_binary(self, column_name):
        for i in column_name:
            if i =='gender':
                self.data[i] = self.data[i].replace({'FEMALE':0, 'MALE':1}).astype(int)
            else:
                self.data[i] = self.data[i].replace({'False': 0, 'True': 1}).astype(int)

    def convert_to_integer(self, column_name, mapping):
        self.data[column_name] = self.data[column_name].replace(mapping).astype(int)


    def text_preprocess(self):
        self.clean_text()
        self.encode_text()
        self.convert_image_path()
        return self.data
    
    def get_image_path(self):
        image_path = []
        for root, dirs, files in os.walk(data_path):
            for file in files:
                if file.endswith('.png'):
                    file_path = os.path.join(root, file)
                    image_path.append(os.path.abspath(file_path))
        return image_path
    
    def convert_image_path(self):
        pattern = {}
        for path in self.get_image_path():
            if os.path.basename(path) in self.data['img_id'].values:
                pattern[os.path.basename(path)] = path
        self.data['img_id'] = self.data['img_id'].replace(pattern)

    
class ImagePreprocessor:
    def __init__(self, data, image_size=(256, 256), batch_size=32):
        self.data = pd.DataFrame(data)
        self.image_path = self.data['img_id']
        self.labels = self.data['label']
        self.image_size = image_size
        self.batch_size = batch_size

    def load_image(self):
        image = Image.open(self.image_path)
        image = image.resize(IMG_SIZE)
        image = np.array(image)
        return image

In [19]:
metadata = pd.read_csv(data_path+'\\metadata.csv')
df = TextPreprocessor(metadata).text_preprocess()
df['img_id']

0       G:\HSUHK\COM6003\project\archive\Index6\val\PA...
1       G:\HSUHK\COM6003\project\archive\imgs_part_1\i...
2       G:\HSUHK\COM6003\project\archive\imgs_part_3\i...
3       G:\HSUHK\COM6003\project\archive\Index2\train\...
4       G:\HSUHK\COM6003\project\archive\Index6\val\PA...
                              ...                        
2293    G:\HSUHK\COM6003\project\archive\imgs_part_3\i...
2294    G:\HSUHK\COM6003\project\archive\imgs_part_1\i...
2295    G:\HSUHK\COM6003\project\archive\imgs_part_3\i...
2296    G:\HSUHK\COM6003\project\archive\Index6\train\...
2297    G:\HSUHK\COM6003\project\archive\imgs_part_3\i...
Name: img_id, Length: 2298, dtype: object

In [26]:
        def split_data(self):
        self.train, self.test = train_test_split(self.metadata, test_size=0.2, random_state=42)
        self.train, self.val = train_test_split(self.train, test_size=0.2, random_state=42)
        
    
    def balance_data(self):
        class_count = Counter(self.train['label'])
        min_class_count = min(class_count.values())

        over_sample_strategy = {label: 2*min_class_count for label in class_count.keys() if class_count[label] <= (2 * min_class_count)}
        under_sample_strategy = {label: 2* min_class_count for label in class_count.keys() if class_count[label] > (2 * min_class_count)}

        pipe = make_pipeline(
        SMOTE(sampling_strategy=over_sample_strategy),
        NearMiss(sampling_strategy=under_sample_strategy)
    )
        
        self.x_train, self.y_train = pipe.fit_resample(self.x_train, self.y_train)

    def create_datagen(self, img_size=(256, 256), batch_size=16, channels=3):
        def img_preprocessing(image, label):
            image = tf.io.read_file(image)
            image = tf.image.decode_png(image, channels=channels)
            image = tf.image.resize(image, img_size)
            image = tf.cast(image, tf.float32) / 255.0
            return image, label
        
        def augmentation(image, label):
            image = tf.image.random_flip_left_right(image)
            image = tf.image.random_flip_up_down(image)
            image = tf.image.random_brightness(image, 0.2)
            image = tf.image.random_contrast(image, 0.8, 1.2)
            image = tf.image.rot90(image, tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32))
            return image, label
        
        def create_dataset(data, augment=True):
            loader = tf.data.Dataset.from_tensor_slices((data))
            if augment:
                dataset = (loader.map(img_preprocessing, num_parallel_calls=AUTO)
                                .map(augmentation, num_parallel_calls=AUTO)
                                .batch(batch_size)
                                .shuffle(batch_size * 10)
                                .prefetch(AUTO).repeat())
            else:
                dataset = (loader.map(img_preprocessing, num_parallel_calls=AUTO)
                                .batch(batch_size)
                                .prefetch(AUTO))
            
            return dataset

        train_dataset = create_dataset(self.train, self.train['label'], augment=True)
        val_dataset = create_dataset(self.val, self.val['label'])
        test_dataset = create_dataset(self.test, self.test['label'])
        
        return train_dataset, val_dataset, test_dataset

    def preprocess(self):
        self.clean_data()
        self.impute_data()
        # self.split_data()
        # self.balance_data()
        return self.train, self.val, self.test
        


# Preprocess the Image Data

In [None]:
def create_datagen(x_train, y_train, x_val, y_val, x_test, y_test, img_size=(256, 256), batch_size=16, channels=3):
    """ Create data generators for training, validation, and testing datasets. """
    def img_preprocessing(img, label):
        """ Image preprocessing function """
        img = tf.io.read_file(img)  # Read the image file
        img = tf.image.decode_png(img, channels=channels)  # Decode the PNG image
        img = tf.image.resize(img, img_size)  # Resize the image
        img = tf.cast(img, tf.float32) / 255.0  # Normalize pixel values to [0, 1] range
        return img, label

    def augmentation(image, label):
        """ Data augmentation function """
        image = tf.image.random_flip_left_right(image)  # Randomly flip the image horizontally
        image = tf.image.random_flip_up_down(image)  # Randomly flip the image vertically
        image = tf.image.rot90(image, tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)) # Randomly rotate the image
        image = tf.image.random_brightness(image, max_delta=0.2)  # Randomly adjust brightness
        image = tf.image.random_contrast(image, lower=0.8, upper=1.2)  # Randomly adjust contrast
        return image, label

    def create_dataset(x, y, augment=False):
    # Create dataset loaders and tf.datasets
        loader = tf.data.Dataset.from_tensor_slices((x.iloc[:,0], y))
        if augment:
            dataset = (loader.map(img_preprocessing, num_parallel_calls=AUTO)  # Apply image preprocessing function)
                            .map(augmentation, num_parallel_calls=AUTO)  # Apply data augmentation function
                            .batch(batch_size)  # Batch the data
                            .shuffle(batch_size * 10)  # Shuffle the dataset
                            .prefetch(AUTO).repeat())  # Prefetch data for performance
        else:
            dataset = (loader.map(img_preprocessing, num_parallel_calls=AUTO)
                            .batch(batch_size) # Batch the data
                            .prefetch(AUTO)) # Prefetch data for performance
            
        return dataset

    train_dataset = create_dataset(x_train, y_train, augment=True)
    val_dataset = create_dataset(x_val, y_val)
    test_dataset = create_dataset(x_test, y_test)
    

    return train_dataset, val_dataset, test_dataset

In [28]:
dataprocessing = DataProcessing()
train, val, test = dataprocessing.preprocess()
print(train)

None
