In [1]:
!pip install torch_geometric

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torch_geometric
  Downloading torch_geometric-2.3.0.tar.gz (616 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m616.2/616.2 KB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: torch_geometric
  Building wheel for torch_geometric (pyproject.toml) ... [?25l[?25hdone
  Created wheel for torch_geometric: filename=torch_geometric-2.3.0-py3-none-any.whl size=909897 sha256=e2ec20d81a9ba16e8540d99777c10e0185f2901a08679eca443a5a88671dc32d
  Stored in directory: /root/.cache/pip/wheels/cd/7d/6b/17150450b80b4a3656a84330e22709ccd8dc0f8f4773ba4133
Successfully built torch_geometric
Installing collected packages: torch_geometric
Successfully installed torch_geomet

#Zad 1

In [2]:
# Funkcja do rysowania grafu oryginalnego
import networkx as nx

def draw_true_graph(G):
    pos = nx.spring_layout(G, seed=42)
    colors = ['#1f78b4' if data.y[i] == 0 else '#33a02c' for i in range(G.number_of_nodes())]
    plt.subplot(1, 2, 1)
    nx.draw(G, pos=pos, with_labels=True, node_color=colors, edgecolors='gray')
    plt.title('True Labels')

# Wyświetl predykcje
def draw_graph_predict(G, pred):
    pos = nx.spring_layout(G, seed=42)
    colors = ['#1f78b4' if p == 0 else '#33a02c' for p in pred]
    plt.subplot(1, 2, 2)
    nx.draw(G, pos=pos, with_labels=True, node_color=colors, edgecolors='gray')
    plt.title('Predicted Labels')
    plt.show()


In [4]:
import torch
import torch.nn.functional as F
from torch_geometric.datasets import KarateClub
from torch_geometric.nn import GCNConv
from torch_geometric.data import DataLoader

# Załaduj zbiór danych Karate Club
dataset = KarateClub()

# Wczytaj pierwszy graf z zestawu danych
data = dataset[0]

# Wyznacz losowo 10 węzłów uczących się
import random
data.train_mask = torch.zeros(data.num_nodes, dtype=torch.bool)
data.train_mask[random.sample(range(data.num_nodes), 10)] = 1

# Zdefiniuj model GNN
class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = GCNConv(dataset.num_features, 16)
        self.conv2 = GCNConv(16, dataset.num_classes)

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)  # pierwsza warstwa konwolucyjna na wejściu x i krawędziach edge_index
        x = F.relu(x)  # nieliniowa funkcja aktywacji - ReLU
        x = F.dropout(x, training=self.training)  # regularyzacja - dropout
        x = self.conv2(x, edge_index)  # druga warstwa konwolucyjna na wyjściu z poprzedniej warstwy i krawędziach edge_index
        return F.log_softmax(x, dim=1)  # funkcja softmax zwracająca prawdopodobieństwa przynależności do klas, zwracana wartość jest zlogarytmowana (log_softmax)


# Zainicjuj model i funkcję kosztu
model = Net()
criterion = torch.nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


import matplotlib.pyplot as plt
from torch_geometric.utils import to_networkx

# Konwertuj dane na graf
G = to_networkx(data)

from torch_geometric.utils.convert import to_networkx

# Pętla ucząca z wyświetlaniem klasyfikacji za każdą epoką
for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    out = model(data.x, data.edge_index)
    loss = criterion(out[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    model.eval()
    pred = model(data.x, data.edge_index).argmax(dim=1)
    acc = (pred[data.train_mask] == data.y[data.train_mask]).float().mean()
    print(f'Epoch {epoch}: Train Accuracy: {acc:.4f}')
    
    draw_true_graph(to_networkx(data))
    draw_graph_predict(to_networkx(data), pred)


# Ustaw model w tryb ewaluacji i wyświetl wyniki
model.eval()
pred = model(data.x, data.edge_index).argmax(dim=1)
acc = (pred[data.train_mask] == data.y[data.train_mask]).float().mean()
print(f'Final Training Accuracy: {acc:.4f}')

# Wyświetl wyniki klasyfikacji dla wszystkich węzłów
pred = model(data.x, data.edge_index).argmax(dim=1)
print(f'Predicted Class Labels: {pred}')


Output hidden; open in https://colab.research.google.com to view.

#Zad2

In [6]:
import matplotlib.pyplot as plt
import networkx as nx

def draw_true_graph_texas(G):
    plt.figure(figsize=(12, 8))
    pos = nx.spring_layout(G, seed=42)
    nx.draw_networkx_nodes(G, pos, node_color=data.y, cmap='Set1', node_size=500)
    nx.draw_networkx_edges(G, pos)
    plt.title("True Graph")
    plt.show()

def draw_graph_predict_texas(G, pred):
    plt.figure(figsize=(12, 8))
    pos = nx.spring_layout(G, seed=42)
    nx.draw_networkx_nodes(G, pos, node_color=pred, cmap='Set1', node_size=500)
    nx.draw_networkx_edges(G, pos)
    plt.title("Predicted Graph")
    plt.show()


In [7]:
import torch
import torch.nn.functional as F
from torch_geometric.datasets import WebKB
from torch_geometric.nn import GCNConv
from torch_geometric.data import DataLoader

# Load the WebKB dataset
dataset = WebKB(root='.', name='texas')

# Load the first graph from the dataset
data = dataset[0]

# Randomly select 10 nodes as training nodes
import random
data.train_mask = torch.zeros(data.num_nodes, dtype=torch.bool)
data.train_mask[random.sample(range(data.num_nodes), 10)] = 1

# Define the GNN model
class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = GCNConv(dataset.num_node_features, 16)
        self.conv2 = GCNConv(16, dataset.num_classes)

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

# Initialize the model and loss function
model = Net()
criterion = torch.nn.NLLLoss()

# Define the optimizer and train for 200 epochs
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

import matplotlib.pyplot as plt
from torch_geometric.utils import to_networkx

# Convert the data to a networkx graph
G = to_networkx(data)

# Training loop with classification output for each epoch
for epoch in range(30):
    print(f"---------------------------- EPOCH: {epoch}---------------------------------------")
    model.train()
    optimizer.zero_grad()
    out = model(data.x, data.edge_index)
    loss = criterion(out[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    model.eval()
    pred = model(data.x, data.edge_index).argmax(dim=1)
    acc = (pred[data.train_mask] == data.y[data.train_mask]).float().mean()
    print(f'Epoch {epoch}: Train Accuracy: {acc:.4f}')

    # Draw the true graph and the predicted graph
    draw_true_graph_texas(to_networkx(data))
    draw_graph_predict_texas(to_networkx(data), pred)

    print("-------------------------------------------------------------------")


# Set the model to evaluation mode and display the results
model.eval()
pred = model(data.x, data.edge_index).argmax(dim=1)
acc = (pred[data.train_mask] == data.y[data.train_mask]).float().mean()
print(f'Final Training Accuracy: {acc:.4f}')

# Display the classification results for all nodes
pred = model(data.x, data.edge_index).argmax(dim=1)
print(f'Predicted Class Labels: {pred}')


Output hidden; open in https://colab.research.google.com to view.

#Zad3

In [8]:
import matplotlib.pyplot as plt

import matplotlib.pyplot as plt
import networkx as nx

def visualize_graph_classification(graph, true_label, predicted_label):
    G = nx.Graph()
    G.add_nodes_from(range(graph.num_nodes))
    G.add_edges_from(graph.edge_index.t().tolist())

    node_colors = []
    if len(true_label) == graph.num_nodes:
        for i in range(graph.num_nodes):
            if true_label[i] == 1:
                node_colors.append('red')
            else:
                node_colors.append('blue')
    else:
        node_colors = ['blue'] * graph.num_nodes

    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, node_color=node_colors)
    plt.title(f"Predicted label: {predicted_label}")
    plt.show()



In [None]:
# Import biblioteki PyTorch oraz klasy TUDataset z biblioteki torch_geometric.datasets
import torch
from torch_geometric.datasets import TUDataset

# Pobierz zbiór danych MUTAG z bazy TUDataset
dataset = TUDataset(root='data/TUDataset', name='MUTAG')

# Wyświetl informacje o zbiorze danych MUTAG
print()
print(f'Dataset: {dataset}:')
print('====================')
print(f'Number of graphs: {len(dataset)}') # Liczba grafów w zbiorze danych
print(f'Number of features: {dataset.num_features}') # Liczba cech w każdym grafie
print(f'Number of classes: {dataset.num_classes}') # Liczba klas, na które można podzielić grafy

# Pobierz pierwszy graf z dataset
data = dataset[0]

# Wyświetl informacje o pierwszym grafie
print()
print(data) # Wyświetl informacje o pierwszym grafie
print('=============================================================')

# Oblicz niektóre statystyki dla pierwszego grafu
print(f'Number of nodes: {data.num_nodes}') # Liczba węzłów w grafie
print(f'Number of edges: {data.num_edges}') # Liczba krawędzi w grafie
print(f'Average node degree: {data.num_edges / data.num_nodes:.2f}') # Średni stopień węzła
print(f'Has isolated nodes: {data.has_isolated_nodes()}') # Czy graf zawiera izolowane węzły?
print(f'Has self-loops: {data.has_self_loops()}') # Czy graf zawiera pętle?
print(f'Is undirected: {data.is_undirected()}') # Czy graf jest nieskierowany?

# Ustaw ziarno losowości dla biblioteki PyTorch
torch.manual_seed(12345)

# Przetasuj zbiór danych MUTAG
dataset = dataset.shuffle()

# Podziel zbiór danych na zbiór treningowy i testowy
train_dataset = dataset[:150]
test_dataset = dataset[150:]

# Wyświetl informacje o podziale zbioru danych
print(f'Number of training graphs: {len(train_dataset)}') # Liczba grafów w zbiorze treningowym
print(f'Number of test graphs: {len(test_dataset)}') # Liczba grafów w zbiorze testowym

# Załaduj dane z podzielonego zbioru danych do DataLoader
from torch_geometric.loader import DataLoader
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True) # DataLoader dla zbioru treningowego
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False) # DataLoader dla zbioru testowego

# Zdefiniuj model GCN
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool


class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GCNConv(dataset.num_node_features, hidden_channels)  # pierwsza warstwa konwolucyjna
        self.conv2 = GCNConv(hidden_channels, hidden_channels)  # druga warstwa konwolucyjna
        self.conv3 = GCNConv(hidden_channels, hidden_channels)  # trzecia warstwa konwolucyjna
        self.lin = Linear(hidden_channels, dataset.num_classes)  # warstwa liniowa klasyfikatora

    def forward(self, x, edge_index, batch):
        # 1. Obliczenie osadzeń węzłów
        x = self.conv1(x, edge_index)
        x = x.relu()
        x = self.conv2(x, edge_index)
        x = x.relu()
        x = self.conv3(x, edge_index)

        # 2. Warstwa agregacji
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Zastosowanie klasyfikatora
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

model = GCN(hidden_channels=64)  # Tworzenie obiektu reprezentującego GCN o rozmiarze warstwy ukrytej 64
print(model)


model = GCN(hidden_channels=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)  # Definicja optymalizatora Adam
criterion = torch.nn.CrossEntropyLoss()  # Definicja funkcji kosztu CrossEntropyLoss

def train():
    model.train()  # Ustawienie modelu w trybie treningowym

    for data in train_loader:  # Iteracja po danych treningowych w paczkach
         out = model(data.x, data.edge_index, data.batch)  # Wykonanie jednego przejścia sieci
         loss = criterion(out, data.y)  # Obliczenie funkcji kosztu
         loss.backward()  # Obliczenie gradientów
         optimizer.step()  # Aktualizacja parametrów na podstawie gradientów
         optimizer.zero_grad()  # Wyczyszczenie gradientów

def test(loader):
     model.eval()  # Ustawienie modelu w trybie testowym

     correct = 0
     for data in loader:  # Iteracja po danych testowych w paczkach
         out = model(data.x, data.edge_index, data.batch)  
         pred = out.argmax(dim=1)  # Użycie klasy z najwyższym prawdopodobieństwem
         correct += int((pred == data.y).sum())  # Sprawdzenie z etykietami prawdziwymi
     return correct / len(loader.dataset)  # Obliczenie wskaźnika poprawności predykcji


for epoch in range(1, 171):
    print(f"------------------------------ START {epoch}------------------------------")
    train()
    train_acc = test(train_loader)
    test_acc = test(test_loader)
    
    # Add visualization code here
    for data in train_loader:
        out = model(data.x, data.edge_index, data.batch)
        pred = out.argmax(dim=1)
        visualize_graph_classification(data, data.y.tolist(), pred.tolist())

    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

    print(f"------------------------------ END {epoch}------------------------------")
