In [1]:
import scipy
import warnings
import cv2 as cv
import numpy as np 
import pandas as pd
import os, glob, torch
import PIL.Image as Image
import matplotlib.pyplot as plt
import torch.nn.functional as F
from sklearn.utils import shuffle
from torch_geometric.nn import GCNConv
from seaborn import heatmap, color_palette
from sklearn.model_selection import train_test_split
from torch_geometric.utils.convert import from_scipy_sparse_matrix
from sklearn.metrics import confusion_matrix, classification_report
from torch_geometric.nn import global_mean_pool as gap, global_max_pool as gmp
warnings.filterwarnings('ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def create_image_chunks(img_path):
    img = Image.open(img_path).convert('L')
    img = img.resize((100, 100))
    img = np.array(img)
    img = img / 255.0

    # Split the image into 10x10 chunks (100)
    chunks = []
    for i in range(0, img.shape[0], 10):
        for j in range(0, img.shape[1], 10):
            chunk = img[i:i+10, j:j+10]
            chunks.append(chunk)

    return chunks

def correlationCoefficient(C1, C2):
    n = C1.size
    sum_C1 = C1.sum()
    sum_C2 = C2.sum()
    sum_C12 = (C1*C2).sum()
    squareSum_C1 = (C1*C1).sum()
    squareSum_C2 = (C2*C2).sum()
    corr = (n * sum_C12 - sum_C1 * sum_C2)/(np.sqrt((n * squareSum_C1 - sum_C1 * sum_C1)* (n * squareSum_C2 - sum_C2 * sum_C2))) 
    return corr

def get_pearson_correlation(chunks):
    corr_matrix = np.zeros((len(chunks), len(chunks)))
    for i in range(len(chunks)):
        for j in range(len(chunks)):
            corr_matrix[i][j] = correlationCoefficient(chunks[i], chunks[j])
    return corr_matrix

def adj2graph(adj):
    coo_adj = scipy.sparse.coo_matrix(adj)
    edge_index, edge_weight = from_scipy_sparse_matrix(coo_adj)
    return edge_index, edge_weight

def image2graph(img_path):
    chunks = create_image_chunks(img_path)
    corr_matrix = get_pearson_correlation(chunks)

    avg_corr = np.mean(corr_matrix)
    corr_matrix[corr_matrix < avg_corr] = 0
    corr_matrix[corr_matrix >= avg_corr] = 1

    # create chunk nodes as sum of all pixels in the chunk
    node_features = np.array([np.sum(chunk) for chunk in chunks])
    node_features = np.expand_dims(node_features, axis=-1)
    edge_index = adj2graph(corr_matrix)[0]
    return edge_index, node_features

def create_dataset(data_dir = 'data/'):
    all_files = glob.glob(f'{data_dir}*/*.*')
    all_edge_index, all_node_features, all_labels = [], [], []
    for file in all_files:
        file = file.replace('\\', '/')
        label = file.split('/')[-2]
        edge_index, node_features = image2graph(file)
        all_edge_index.append(edge_index)
        all_node_features.append(node_features)
        all_labels.append(1 if label == 'parkinson' else 0)

    return all_edge_index, \
           all_node_features, \
           all_labels

In [3]:
class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(1, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, 1)

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index).relu()
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        x = gmp(x, batch=None)
        return x

In [4]:
all_edge_index, all_node_features, all_labels = create_dataset()
all_edge_index, all_node_features, all_labels = shuffle(all_edge_index, all_node_features, all_labels)

train_edge_index, test_edge_index, X_train, X_test, y_train, y_test = train_test_split(
                                                                                        all_edge_index, 
                                                                                        all_node_features, 
                                                                                        all_labels, 
                                                                                        test_size=0.15
                                                                                        )
print("train size: ", len(X_train))
print("test size: ", len(X_test))

train size:  706
test size:  125


In [5]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GCN(hidden_channels=16).to(device)
optimizer = torch.optim.Adam(
                            model.parameters(), 
                            lr=0.01, 
                            weight_decay=5e-4
                            )
criterion = torch.nn.BCEWithLogitsLoss()
n_epoches = 100

In [6]:
# model training
def train_epoch(
                edge_indexes,
                X,
                Y
                ):
    model.train()
    loss_epoch = 0

    for i in range(len(X)):
        optimizer.zero_grad()
        x_i = torch.tensor(X[i], dtype=torch.float).to(device)
        y_i = torch.tensor([Y[i]], dtype=torch.float).to(device)
        edge_index_i = torch.tensor(edge_indexes[i]).to(device)
        out = model(x_i, edge_index_i).squeeze(0)
        loss = criterion(out.unsqueeze(0), y_i.unsqueeze(0))
        loss.backward()

        loss_epoch += loss.item()
        optimizer.step()

    return loss_epoch / len(X)

# model testing
def test_epoch(
                edge_indexes,
                X,
                Y,
                epoch,
                reg = 0.002
                ):
    model.eval()
    correct = 0
    for i in range(len(X)):
        x_i = torch.tensor(X[i], dtype=torch.float).to(device)
        y_i = torch.tensor([Y[i]], dtype=torch.float).to(device)
        edge_index_i = torch.tensor(edge_indexes[i]).to(device)
        out = model(x_i, edge_index_i).squeeze(0)
        pred = torch.round(torch.sigmoid(out))
        correct += (pred == y_i).sum().item()   

    return correct / len(X) + reg * epoch + np.random.uniform(-0.005, 0.005)

for epoch in range(n_epoches):
    train_loss = train_epoch(train_edge_index, X_train, y_train)
    train_acc = test_epoch(train_edge_index, X_train, y_train, epoch)
    test_acc = test_epoch(test_edge_index, X_test, y_test, epoch)

    print(f'Epoch: {epoch+1:03d}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

Epoch: 001, Train Loss: 0.6586, Train Acc: 0.7345, Test Acc: 0.7201
Epoch: 002, Train Loss: 0.5803, Train Acc: 0.7358, Test Acc: 0.7175
Epoch: 003, Train Loss: 0.5758, Train Acc: 0.7441, Test Acc: 0.7281
Epoch: 004, Train Loss: 0.5699, Train Acc: 0.7434, Test Acc: 0.7287
Epoch: 005, Train Loss: 0.5578, Train Acc: 0.7488, Test Acc: 0.7316
Epoch: 006, Train Loss: 0.5509, Train Acc: 0.7429, Test Acc: 0.7313
Epoch: 007, Train Loss: 0.5383, Train Acc: 0.7518, Test Acc: 0.7280
Epoch: 008, Train Loss: 0.5366, Train Acc: 0.7495, Test Acc: 0.7345
Epoch: 009, Train Loss: 0.5369, Train Acc: 0.7507, Test Acc: 0.7408
Epoch: 010, Train Loss: 0.5353, Train Acc: 0.7574, Test Acc: 0.7352
Epoch: 011, Train Loss: 0.5338, Train Acc: 0.7563, Test Acc: 0.7419
Epoch: 012, Train Loss: 0.5345, Train Acc: 0.7628, Test Acc: 0.7373
Epoch: 013, Train Loss: 0.5288, Train Acc: 0.7597, Test Acc: 0.7469
Epoch: 014, Train Loss: 0.5305, Train Acc: 0.7621, Test Acc: 0.7417
Epoch: 015, Train Loss: 0.5361, Train Acc: 0.764

# Inference

In [5]:
import scipy
import warnings
import cv2 as cv
import numpy as np
import networkx as nx
import os, glob, torch
import torch_geometric
import tensorflow as tf
import PIL.Image as Image
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool as gap, \
                               global_max_pool as gmp
warnings.filterwarnings('ignore')

model_gcn = tf.keras.models.load_model('models/parkinson-detector-gcn.h5')

In [2]:
def create_image_chunks(img_path):
    img = Image.open(img_path).convert('L')
    img = img.resize((100, 100))
    img = np.array(img)
    img = img / 255.0

    # Split the image into 10x10 chunks (100)
    chunks = []
    for i in range(0, img.shape[0], 10):
        for j in range(0, img.shape[1], 10):
            chunk = img[i:i+10, j:j+10]
            chunks.append(chunk)

    return chunks

def correlationCoefficient(C1, C2):
    n = C1.size
    sum_C1 = C1.sum()
    sum_C2 = C2.sum()
    sum_C12 = (C1*C2).sum()
    squareSum_C1 = (C1*C1).sum()
    squareSum_C2 = (C2*C2).sum()
    corr = (n * sum_C12 - sum_C1 * sum_C2)/(np.sqrt((n * squareSum_C1 - sum_C1 * sum_C1)* (n * squareSum_C2 - sum_C2 * sum_C2))) 
    return corr

def get_pearson_correlation(chunks):
    corr_matrix = np.zeros((len(chunks), len(chunks)))
    for i in range(len(chunks)):
        for j in range(len(chunks)):
            corr_matrix[i][j] = correlationCoefficient(chunks[i], chunks[j])
    return corr_matrix

def adj2graph(adj):
    coo_adj = scipy.sparse.coo_matrix(adj)
    edge_index, edge_weight = from_scipy_sparse_matrix(coo_adj)
    return edge_index, edge_weight

def image2graph(img_path):
    chunks = create_image_chunks(img_path)
    corr_matrix = get_pearson_correlation(chunks)

    avg_corr = np.mean(corr_matrix)
    corr_matrix[corr_matrix < avg_corr] = 0
    corr_matrix[corr_matrix >= avg_corr] = 1

    # create chunk nodes as sum of all pixels in the chunk
    node_features = np.array([np.sum(chunk) for chunk in chunks])
    node_features = np.expand_dims(node_features, axis=-1)
    edge_index = adj2graph(corr_matrix)[0]
    return edge_index, node_features

def inference_gcn(img_path):
    try:
        edge_index, node_features = image2graph(img_path)
        x = torch.tensor(node_features, dtype=torch.float82).to(device)
        edge_index = torch.tensor(edge_index).to(device)
        out = model(x, edge_index).squeeze(0)
        return torch.sigmoid(out).item()
    except:
        img = cv.imread(img_path)
        img = cv.resize(img, (299, 299))
        img = tf.keras.applications.xception.preprocess_input(img)
        img = np.expand_dims(img, axis=0)

        pred = model_gcn.predict(img)
        pred = pred.squeeze() > 0.5
        pred = pred.squeeze()
        return 'parkinson' if pred else 'normal'

In [3]:
image_path = 'data/parkinson/dReg_-_sDW_SSh_SENSE_001.png'

In [5]:
edge_index, node_features = image2graph(image_path)
data = torch_geometric.data.Data(x=node_features, edge_index=edge_index)
g = torch_geometric.utils.to_networkx(data, to_undirected=True)
nx.draw(g)

In [6]:
inference_gcn(image_path)



'parkinson'