<a href="https://colab.research.google.com/github/miinkang/PyTorch/blob/main/CORA_Node_Classification_with_GCN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Switch for Colab-specific setup; it must be a plain boolean value.
use_colab = True
assert use_colab in (True, False)

In [None]:
# Mount Google Drive only when running on Colab. `google.colab` does not exist
# in other environments, so an unconditional import would fail there.
if use_colab:
    from google.colab import drive

    drive.mount('/content/drive')

In [None]:
import numpy as np
import scipy.sparse as sp
import torch
import torch.nn as nn
import torch.optim as optim


def encode_onehot(labels):
    """Return a one-hot ``int32`` matrix with one row per entry of ``labels``.

    Each distinct label gets one column. Columns are assigned in sorted label
    order so the encoding is reproducible across interpreter runs (plain
    ``set`` iteration order depends on string hash randomization). If the
    labels cannot be ordered, columns follow first appearance instead.

    Args:
        labels: Iterable of hashable class labels.

    Returns:
        ``numpy.ndarray`` of shape ``(len(labels), n_classes)`` and dtype
        ``int32`` with exactly one ``1`` per row.
    """
    labels = list(labels)
    try:
        classes = sorted(set(labels))
    except TypeError:
        # Mixed, unorderable label types: fall back to first-seen order,
        # which is still deterministic for a given input.
        classes = list(dict.fromkeys(labels))

    column_of = {c: i for i, c in enumerate(classes)}
    # Build the identity once; each output row is a row of this matrix.
    identity = np.identity(len(classes), dtype=np.int32)
    rows = np.fromiter((column_of[label] for label in labels),
                       dtype=np.intp, count=len(labels))
    return identity[rows]

In [None]:
# !pip install stellargraph

In [None]:
from stellargraph import datasets

In [None]:
# Instantiate the stellargraph Cora dataset helper and load it.
# NOTE(review): stellargraph documents `Cora.load()` as returning
# `(graph, node_subjects)`; the names `content` / `cites` suggest raw tables
# instead -- confirm what these two values actually are before using them.
dataset = datasets.Cora()
content, cites = dataset.load()

In [None]:
# Inspect the first value returned by `dataset.load()`.
print(content)

In [None]:
def load_data(G, node_subjects):
    """Build adjacency, feature, label and split-index tensors for training.

    `G` is indexed as a 2-D table: column 0 is read as the integer node id,
    columns ``1:-1`` as the feature values and the last column as the label.

    NOTE(review): `node_subjects` is accepted but never read in this body.
    NOTE(review): `path`, `normalize` and `sparse_mx_to_torch_sparse_tensor`
    are not defined in the visible part of this file, and `dataset` here
    resolves to a module-level name rather than a parameter -- confirm they
    exist with the intended meaning before relying on this function.

    Returns:
        Tuple ``(adj, features, labels, idx_train, idx_val, idx_test)``:
        ``adj`` is the result of ``sparse_mx_to_torch_sparse_tensor``,
        ``features`` a dense ``FloatTensor``, ``labels`` a ``LongTensor`` of
        class indices, and the three ``idx_*`` values ``LongTensor`` row
        indices for the fixed splits below.
    """

    idx_features_labels = G
    features = sp.csr_matrix(idx_features_labels[:, 1:-1], dtype=np.float32)
    labels = encode_onehot(idx_features_labels[:, -1])

    # build graph
    idx = np.array(idx_features_labels[:, 0], dtype=np.int32)
    # Map each node id to its row position in the table.
    idx_map = {j: i for i, j in enumerate(idx)}
    # NOTE(review): reads the edge list from a "<path><dataset>.cites" file;
    # `path` is not defined in the visible code -- confirm the edge source.
    edges_unordered = np.genfromtxt("{}{}.cites".format(path, dataset),
                                    dtype=np.int32)
    # Translate both endpoints of every edge from node ids to row positions.
    edges = np.array(list(map(idx_map.get, edges_unordered.flatten())),
                     dtype=np.int32).reshape(edges_unordered.shape)
    # One entry of weight 1 per listed edge, in an N x N matrix (N = rows).
    adj = sp.coo_matrix((np.ones(edges.shape[0]), (edges[:, 0], edges[:, 1])),
                        shape=(labels.shape[0], labels.shape[0]),
                        dtype=np.float32)

    # build symmetric adjacency matrix
    # Wherever adj.T exceeds adj, replace adj's entry with adj.T's entry,
    # i.e. take the element-wise maximum of adj and its transpose.
    adj = adj + adj.T.multiply(adj.T > adj) - adj.multiply(adj.T > adj)

    features = normalize(features)
    # Add the identity (self-loops) before normalizing the adjacency.
    adj = normalize(adj + sp.eye(adj.shape[0]))

    # Fixed train / validation / test splits, expressed as row positions.
    idx_train = range(140)
    idx_val = range(200, 500)
    idx_test = range(500, 1500)

    features = torch.FloatTensor(np.array(features.todense()))
    # Column index of the non-zero entry in each one-hot row = class index.
    labels = torch.LongTensor(np.where(labels)[1])
    adj = sparse_mx_to_torch_sparse_tensor(adj)

    idx_train = torch.LongTensor(idx_train)
    idx_val = torch.LongTensor(idx_val)
    idx_test = torch.LongTensor(idx_test)

    return adj, features, labels, idx_train, idx_val, idx_test

In [None]:
# `load_data` takes two positional arguments; pass the two values produced by
# `dataset.load()` in the order they were returned.
A, features, labels, idx_train, idx_val, idx_test = load_data(content, cites)

In [None]:
class GCN_layer(nn.Module):
    """Graph convolution: multiply the input by ``A``, then apply a linear map.

    Args:
        in_features: Size of each input node vector.
        out_features: Size of each output node vector.
        A: Sparse matrix that left-multiplies the node features in ``forward``.
    """

    def __init__(self, in_features, out_features, A):
        super().__init__()
        self.in_features, self.out_features = in_features, out_features
        self.A = A
        self.fc = nn.Linear(in_features, out_features)

    def forward(self, X):
        """Return ``fc(A @ X)`` for the node feature matrix ``X``."""
        # Aggregate neighbor information through the sparse product A @ X.
        aggregated = torch.spmm(self.A, X)
        return self.fc(aggregated)

class GCN(nn.Module):
    """Two-layer graph network: ``GCN_layer -> ReLU -> GCN_layer``.

    Args:
        num_feature: Size of each input node vector.
        num_class: Number of output scores per node.
        A: Matrix handed to both ``GCN_layer`` instances.
    """

    def __init__(self, num_feature, num_class, A):
        # The class and layer names must match the definitions in this file
        # (`GCN` / `GCN_layer`); the previous `GNN` / `GNN_layer` references
        # raised NameError on instantiation.
        super().__init__()

        self.feature_extractor = nn.Sequential(
            GCN_layer(num_feature, 16, A),
            nn.ReLU(),
            GCN_layer(16, num_class, A),
        )

    def forward(self, X):
        """Return the per-node class scores for the feature matrix ``X``."""
        return self.feature_extractor(X)

In [None]:
def train(model, Loss, optimizer, num_epochs):
    """Train ``model`` full-batch and report test accuracy at the best epoch.

    Reads the module-level ``features``, ``labels``, ``idx_train``,
    ``idx_val`` and ``idx_test`` tensors and calls a module-level
    ``accuracy(output, labels)`` helper, all of which must be defined before
    this function is called.

    Every 10th epoch the model is evaluated. Whenever validation accuracy
    improves, the matching test accuracy is remembered and the line is marked
    with ``*``. Training stops early after 10 evaluations in a row without an
    improvement.

    Args:
        model: Module called as ``model(features)``.
        Loss: Callable ``Loss(predictions, targets)`` returning a scalar tensor.
        optimizer: Optimizer over ``model``'s parameters.
        num_epochs: Maximum number of epochs to run.

    Returns:
        Tuple ``(train_loss_arr, test_loss_arr)`` of Python floats: the
        training loss of every epoch and the test loss of every evaluation.
    """
    train_loss_arr = []
    test_loss_arr = []

    # Start below any possible accuracy so the first evaluation always counts.
    best_ACC = float("-inf")
    final_ACC = None
    early_stop, early_stop_max = 0, 10

    for epoch in range(num_epochs):

        # Forward pass
        model.train()
        # Clear gradients from the previous step; otherwise they accumulate.
        optimizer.zero_grad()
        output = model(features)
        train_loss = Loss(output[idx_train], labels[idx_train])

        # Backward and optimize
        train_loss.backward()
        optimizer.step()

        train_loss_value = train_loss.item()
        train_loss_arr.append(train_loss_value)

        if epoch % 10 == 0:
            model.eval()

            # No gradients are needed for evaluation; this also keeps the
            # stored losses from holding on to the autograd graph.
            with torch.no_grad():
                output = model(features)
                test_loss = Loss(output[idx_test], labels[idx_test]).item()

                val_acc = accuracy(output[idx_val], labels[idx_val])
                test_acc = accuracy(output[idx_test], labels[idx_test])

            test_loss_arr.append(test_loss)

            if best_ACC < val_acc:
                best_ACC = val_acc
                early_stop = 0
                final_ACC = test_acc
                print('Epoch [{}/{}], Train Loss: {:.4f}, Test Loss: {:.4f}, Test ACC: {:.4f} *'.format(epoch, num_epochs, train_loss_value, test_loss, test_acc))
            else:
                early_stop += 1

                print('Epoch [{}/{}], Train Loss: {:.4f}, Test Loss: {:.4f}, Test ACC: {:.4f}'.format(epoch, num_epochs, train_loss_value, test_loss, test_acc))

        if early_stop >= early_stop_max:
            break

    print("Final Accuracy::", final_ACC)
    return train_loss_arr, test_loss_arr

In [None]:
class FCN(nn.Module):
    """Fully connected baseline: ``Linear -> ReLU -> Linear`` with 16 hidden units.

    Args:
        num_feature: Size of each input vector.
        num_class: Number of output scores per input.
    """

    def __init__(self, num_feature, num_class):
        super().__init__()

        hidden_units = 16
        stages = [
            nn.Linear(num_feature, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, num_class),
        ]
        self.feature_extractor = nn.Sequential(*stages)

    def forward(self, x):
        """Return the class scores for the input batch ``x``."""
        scores = self.feature_extractor(x)
        return scores

In [None]:
# Train the fully connected baseline and watch the loss per epoch.
model = FCN(num_feature=features.size(1), num_class=labels.unique().size(0))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.1, weight_decay=1e-4)

train(model, criterion, optimizer, num_epochs=1000)


# Train the GCN and watch the loss per epoch.
model = GCN(num_feature=features.size(1), num_class=labels.unique().size(0), A=A)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.1, weight_decay=1e-4)

train(model, criterion, optimizer, num_epochs=1000)