In [None]:
import pandas as pd
import torch
import os
import math
import numpy as np
import matplotlib.pyplot as plt
from shapely.wkt import loads
import torch_geometric
import random 
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
import torch_geometric.transforms as T
import imblearn

The data have already been normalized, so the only remaining preprocessing step is to create a second version of each `Data` object that contains self-loops.

In [None]:
# We have already normalized the data
add_selfloops = T.AddSelfLoops()

# NOTE(review): torch.load unpickles the file, which can execute arbitrary code;
# only load objects that were produced by this project.
oil_gas_classification_data = torch.load('pyg_objects/oil+gas/classification_data')
# Apply the transform to a clone: depending on the PyG version a transform may modify
# its input in place (and return that same object), in which case the "self-loop"
# variant and the original would silently be one and the same graph.
oil_gas_classification_data_selfloops = add_selfloops(oil_gas_classification_data.clone())

coal_classification_data = torch.load('pyg_objects/coal/coal_classification_data')
coal_classification_data_selfloops = add_selfloops(coal_classification_data.clone())

We will use a fairly simple GCN for this task to more clearly analyze the effect of varying connectivity on accuracy.

In [None]:
class GCN(torch.nn.Module):
    """Two-layer graph convolutional network with double-precision parameters.

    Parameters:
        in_channels: Size of each input node feature vector.
        hidden_channels: Width of the hidden representation produced by the first layer.
        out_channels: Size of the per-node output (one score per class).

    The forward pass returns the raw output of the second convolution; no
    softmax is applied.
    """

    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        # Seed the global torch RNG before the layers are created so that every
        # instance starts from the same initial weights.
        torch.manual_seed(637)
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, out_channels)
        # Cast all floating-point parameters to float64.
        self.double()

    def forward(self, x, edge_index):
        """Return per-node class scores for features `x` on the graph `edge_index`."""
        hidden = F.relu(self.conv1(x, edge_index))
        return self.conv2(hidden, edge_index)
    


def gcn_train(model, data, optimizer, criterion):
    """Run one full-batch optimisation step of a graph model.

    Parameters:
        model: Module called as ``model(data.x, data.edge_index)``.
        data: Object exposing ``x``, ``edge_index``, ``y`` and ``train_mask``.
        optimizer: Optimizer holding the parameters of ``model``.
        criterion: Callable ``criterion(predictions, targets)`` returning the loss.

    Returns:
        The loss value exactly as produced by ``criterion`` (not detached).
    """
    model.train()
    optimizer.zero_grad()  # discard gradients left over from the previous step

    scores = model(data.x, data.edge_index)
    train_nodes = data.train_mask
    # Only the training nodes contribute to the loss.
    step_loss = criterion(scores[train_nodes], data.y[train_nodes])

    step_loss.backward()
    optimizer.step()
    return step_loss

def gcn_test(model, data):
    """Evaluate a graph model on the test nodes of ``data``.

    Parameters:
        model: Module called as ``model(data.x, data.edge_index)``.
        data: Object exposing ``x``, ``edge_index``, ``y`` and ``test_mask``.

    Returns:
        tuple: ``(test_acc, pred)`` where ``test_acc`` is the fraction (0-1) of
        test nodes predicted correctly and ``pred`` holds the predicted class
        index for every node (not only the test nodes).

    Raises:
        ZeroDivisionError: If ``data.test_mask`` selects no nodes.
    """
    model.eval()
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        out = model(data.x, data.edge_index)
    pred = out.argmax(dim=1)  # Index of the highest score per node.
    test_correct = pred[data.test_mask] == data.y[data.test_mask]  # Check against ground-truth labels.
    test_acc = int(test_correct.sum()) / int(data.test_mask.sum())  # Ratio of correct predictions.
    return test_acc, pred


In [None]:
def dist_sweep(model, edges_path, data, optimizer, criterion, distances,
               num_epochs=499, reset_between_distances=False):
    """Train and evaluate ``model`` once per connectivity threshold.

    For every entry of ``distances`` the edge tensor stored at
    ``edges_path + str(distance) + "km"`` is loaded and assigned to
    ``data.edge_index``; the model is then trained for ``num_epochs`` full-batch
    steps and evaluated on the test nodes.

    Caveats:
        * ``data`` is modified in place: its ``edge_index`` is replaced on every
          iteration, so any edges it carried beforehand are not used.
        * By default the same ``model`` and ``optimizer`` objects are trained
          further at every distance, so later thresholds start from weights
          fitted on earlier ones. Pass ``reset_between_distances=True`` to
          restore the state they had on entry before each threshold.

    Parameters:
        model: Graph model accepted by ``gcn_train`` / ``gcn_test``.
        edges_path: Path prefix of the saved edge tensors.
        data: Graph data object whose ``edge_index`` is swapped per distance.
        optimizer: Optimizer holding the parameters of ``model``.
        criterion: Loss callable forwarded to ``gcn_train``.
        distances: Iterable of thresholds identifying the edge files.
        num_epochs: Training steps per distance (default 499, the count the
            original ``range(1, 500)`` loop performed).
        reset_between_distances: Restore the entry-time model and optimizer
            state before each distance.

    Returns:
        tuple: ``(preds, accs, losses)`` — per distance, the predictions for all
        nodes, the test accuracy as a 0-1 fraction, and the detached loss of
        the final training step (``None`` when ``num_epochs`` is 0).
    """
    import copy  # local import so the function does not rely on other cells

    preds = []
    accs = []
    losses = []

    if reset_between_distances:
        initial_model_state = copy.deepcopy(model.state_dict())
        initial_optimizer_state = copy.deepcopy(optimizer.state_dict())

    for distance in distances:
        if reset_between_distances:
            model.load_state_dict(initial_model_state)
            # Hand over a fresh copy each time so training can never alter the snapshot.
            optimizer.load_state_dict(copy.deepcopy(initial_optimizer_state))

        filepath = edges_path + str(distance) + "km"
        # NOTE(review): torch.load unpickles the file; only load trusted edge files.
        edges = torch.load(filepath)
        data.edge_index = edges
        print(len(edges[0]), 'edges')

        # gcn_train clears the gradients itself, so no extra zero_grad() is needed here.
        loss = None
        for _ in range(num_epochs):
            loss = gcn_train(model=model,
                             data=data,
                             optimizer=optimizer,
                             criterion=criterion)

        test_acc, pred = gcn_test(model=model, data=data)
        preds.append(pred)
        accs.append(test_acc)
        # Store a detached value so the list does not hold on to autograd state.
        losses.append(loss.detach() if loss is not None else None)
        # test_acc is a fraction; format it as a real percentage.
        print(f'{distance} km: test accuracy {test_acc:.2%}')

    return preds, accs, losses

We will test the accuracy of this model over a range of different connectivity thresholds:

In [None]:
# GCN sized from the coal data: one input per node feature, one output per distinct label.
coal_model = GCN(
    in_channels=coal_classification_data.num_features,
    hidden_channels=16,
    out_channels=len(coal_classification_data.y.unique()),
)
coal_optimizer = torch.optim.NAdam(
    coal_model.parameters(),
    lr=0.001,
    weight_decay=5e-4,
)
coal_criterion = torch.nn.CrossEntropyLoss()

# Connectivity thresholds to sweep; each one selects the edge file "<prefix><distance>km".
distances = [1, 2, 3, 4, 5, 25, 50, 100, 200]
coal_preds, coal_accs, coal_losses = dist_sweep(
    model=coal_model,
    edges_path='pyg_objects/coal/coal-ch4-edges-',
    data=coal_classification_data,
    optimizer=coal_optimizer,
    criterion=coal_criterion,
    distances=distances,
)

Carrying out the same test for the graph with self-loops:

In [None]:
# Start from a freshly initialised model and optimizer. Reusing `coal_model` and
# `coal_optimizer` here would continue training the network already fitted by the
# previous sweep, so the two accuracy curves would not be comparable.
coal_model_selfloops = GCN(in_channels=coal_classification_data_selfloops.num_features,
                hidden_channels=16,
                out_channels=len(coal_classification_data_selfloops.y.unique()))
coal_optimizer_selfloops = torch.optim.NAdam(coal_model_selfloops.parameters(), lr=0.001, weight_decay=5e-4)

# NOTE(review): dist_sweep assigns the loaded edges to data.edge_index, replacing
# whatever edges this object carried before the call — confirm that the self-loops
# added earlier are actually part of the graph being evaluated.
distances = [1, 2, 3, 4, 5, 25, 50, 100, 200, 500, 1000]
coal_preds_selfloops, coal_accs_selfloops, coal_losses_selfloops = dist_sweep(
    coal_model_selfloops,
    'pyg_objects/coal/coal-ch4-edges-',
    coal_classification_data_selfloops,
    coal_optimizer_selfloops,
    coal_criterion,
    distances,
)


### MLP baseline

In [None]:
from torch.nn import Linear

class MLP(torch.nn.Module):
    """Two-layer perceptron baseline with double-precision parameters.

    Parameters:
        input_dim: Size of each input feature vector.
        hidden_channels: Width of the hidden layer.
        output_dim: Number of output scores per sample.

    The forward pass returns the raw output of the second linear layer; no
    softmax is applied.
    """

    def __init__(self, input_dim, hidden_channels, output_dim):
        super().__init__()
        # Seed the global torch RNG before the layers are created so that every
        # instance starts from the same initial weights.
        torch.manual_seed(637)
        self.lin1 = Linear(input_dim, hidden_channels)
        self.lin2 = Linear(hidden_channels, output_dim)
        # Cast all floating-point parameters to float64.
        self.double()

    def forward(self, x):
        """Return the output scores for the feature rows in `x`."""
        hidden = torch.relu(self.lin1(x))
        return self.lin2(hidden)

# Baseline that ignores the graph structure and sees only the node features.
# NOTE(review): output_dim is hard-coded to 4, whereas the GCN derives its output
# size from the labels (len(y.unique())) — confirm the coal data has four classes.
model = MLP(
    input_dim=coal_classification_data.num_features,
    hidden_channels=16,
    output_dim=4,
)
optimizer = torch.optim.NAdam(
    model.parameters(),
    lr=0.001,
    weight_decay=5e-4,
)
criterion = torch.nn.CrossEntropyLoss()
print(model)

In [None]:
def train(model, data, optimizer, criterion):
    """Run one full-batch optimisation step of a feature-only model.

    Parameters:
        model: Module called as ``model(data.x)``.
        data: Object exposing ``x``, ``y`` and ``train_mask``.
        optimizer: Optimizer holding the parameters of ``model``.
        criterion: Callable ``criterion(predictions, targets)`` returning the loss.

    Returns:
        The loss value exactly as produced by ``criterion`` (not detached).
    """
    model.train()
    optimizer.zero_grad()  # discard gradients left over from the previous step

    scores = model(data.x)
    train_rows = data.train_mask
    # Only the training rows contribute to the loss.
    step_loss = criterion(scores[train_rows], data.y[train_rows])

    step_loss.backward()
    optimizer.step()
    return step_loss

def test(model, data):
      model.eval()
      out = model(data.x)
      pred = out.argmax(dim=1)  # Use the class with highest probability.
      test_correct = pred[data.test_mask] == data.y[data.test_mask]  # Check against ground-truth labels.
      test_acc = int(test_correct.sum()) / int(data.test_mask.sum())  # Derive ratio of correct predictions.
      return test_acc


# Fit the MLP baseline on the coal data (epochs 1..1999). The loss is reported on the
# first epoch, every 100th epoch and the last one instead of on all 1999 iterations,
# which would bury the result under a wall of output.
for epoch in range(1, 2000):
    loss = train(model, coal_classification_data, optimizer, criterion)
    if epoch == 1 or epoch % 100 == 0 or epoch == 1999:
        print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

test_acc = test(model, coal_classification_data)
print(f'Test Accuracy: {test_acc:.4f}')

In [None]:
import matplotlib.pyplot as plt

# The sweeps are plotted at evenly spaced positions (one per threshold) because the
# distances are far from uniform; the tick labels carry the actual km values.
threshold_labels = [1, 2, 3, 4, 5, 25, 50, 100, 200, 500, 1000]

plt.plot(coal_accs, label="GCN accuracies")
plt.plot(coal_accs_selfloops, label="GCN w/ selfloops")
plt.xlabel('Edge Connectivity Threshold (km)')
# The plotted accuracies are fractions in [0, 1] (correct / total), not percentages.
plt.ylabel('Accuracy (fraction of test nodes)')
# NOTE(review): 0.2904 is hard-coded — presumably copied from the MLP baseline cell's
# printed test accuracy; confirm it still matches after any re-run.
plt.axhline(y=0.2904, color='r', linestyle='--', label='MLP accuracy')
# Derive the tick positions from the labels so the two can never drift apart.
plt.xticks(list(range(len(threshold_labels))), threshold_labels, fontsize=8)
plt.title("Classification Accuracy on Coal Data")
plt.legend()
plt.show()

### Synthetic Upsampling

In [None]:
# Synthetic data upsampling to fix class imbalance

from imblearn.combine import SMOTEENN

# Split features and labels with the existing masks.
# NOTE(review): this cell overwrites x, y and both masks on `coal_classification_data`,
# so running it a second time resamples the already-resampled data.
train_rows = coal_classification_data.train_mask
test_rows = coal_classification_data.test_mask
X_train, y_train = coal_classification_data.x[train_rows], coal_classification_data.y[train_rows]
X_test, y_test = coal_classification_data.x[test_rows], coal_classification_data.y[test_rows]

# Resample the training rows only; the test rows are left as they are.
smote_enn = SMOTEENN(random_state=42)
X_resampled, y_resampled = (
    torch.tensor(resampled) for resampled in smote_enn.fit_resample(X_train, y_train)
)

# New layout: all resampled training rows first, followed by the original test rows.
coal_classification_data.x = torch.cat([X_resampled, X_test], dim=0)
coal_classification_data.y = torch.cat([y_resampled, y_test], dim=0)

# Train and test masks matching that layout
train_len, test_len = len(X_resampled), len(X_test)

train_mask = [position < train_len for position in range(train_len + test_len)]
test_mask = [not is_train for is_train in train_mask]

coal_classification_data.train_mask = torch.tensor(train_mask)
coal_classification_data.test_mask = torch.tensor(test_mask)

coal_classification_data_selfloops = add_selfloops(coal_classification_data)

# Fresh GCN for the resampled coal data.
# NOTE(review): this sweep uses Adam with lr=0.01, whereas the earlier coal sweep used
# NAdam with lr=0.001 — confirm the change is intentional before comparing the results.
coal_model = GCN(
    in_channels=coal_classification_data.num_features,
    hidden_channels=16,
    out_channels=len(coal_classification_data.y.unique()),
)
coal_optimizer = torch.optim.Adam(
    coal_model.parameters(),
    lr=0.01,
    weight_decay=5e-4,
)
coal_criterion = torch.nn.CrossEntropyLoss()

# NOTE(review): the node rows were just rebuilt as [resampled train rows, test rows],
# but dist_sweep loads the same edge files as before; presumably those edges index the
# original node order — verify that they still refer to the right nodes.
distances = [1, 2, 3, 4, 5, 25, 50, 100, 200]
coal_preds, coal_accs, coal_losses = dist_sweep(
    model=coal_model,
    edges_path='pyg_objects/coal/coal-ch4-edges-',
    data=coal_classification_data,
    optimizer=coal_optimizer,
    criterion=coal_criterion,
    distances=distances,
)

In [None]:
# MLP baseline re-trained on the coal data as it stands when this cell runs.
# NOTE(review): output_dim is hard-coded to 4, whereas the GCN derives its output size
# from the labels (len(y.unique())) — confirm the data has four classes.
model = MLP(input_dim=coal_classification_data.num_features,
            hidden_channels=16,
            output_dim=4)
optimizer = torch.optim.NAdam(model.parameters(), lr=0.001, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()

# Report the loss on the first epoch, every 100th epoch and the last one instead of
# on all 1999 iterations, which would bury the result under a wall of output.
for epoch in range(1, 2000):
    loss = train(model, coal_classification_data, optimizer, criterion)
    if epoch == 1 or epoch % 100 == 0 or epoch == 1999:
        print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

test_acc = test(model, coal_classification_data)
print(f'Test Accuracy: {test_acc:.4f}')

## Oil/Gas Data 

In [None]:
# GCN sized from the oil/gas data: one input per node feature, one output per distinct label.
oil_gas_classification_model = GCN(
    in_channels=oil_gas_classification_data.num_features,
    hidden_channels=16,
    out_channels=len(oil_gas_classification_data.y.unique()),
)
oil_gas_classification_optimizer = torch.optim.NAdam(
    oil_gas_classification_model.parameters(),
    lr=0.001,
    weight_decay=5e-4,
)
oil_gas_classification_criterion = torch.nn.CrossEntropyLoss()
print(oil_gas_classification_model)

In [None]:
# Connectivity thresholds for the oil/gas sweep; each one selects the edge file
# "<prefix><distance>km".
distances = [1, 2, 3, 4, 5, 25, 50, 100, 200, 500, 1000, 2000, 5000, 10000]
oil_preds, oil_accs, oil_losses = dist_sweep(
    model=oil_gas_classification_model,
    edges_path='pyg_objects/oil+gas/co2+ch4-',
    data=oil_gas_classification_data,
    optimizer=oil_gas_classification_optimizer,
    criterion=oil_gas_classification_criterion,
    distances=distances,
)

In [None]:
# Start from a freshly initialised model and optimizer. Reusing
# `oil_gas_classification_model` and its optimizer would continue training the network
# already fitted by the previous sweep, so the two accuracy curves would not be comparable.
oil_gas_selfloops_model = GCN(in_channels=oil_gas_classification_data_selfloops.num_features,
                hidden_channels=16,
                out_channels=len(oil_gas_classification_data_selfloops.y.unique()))
oil_gas_selfloops_optimizer = torch.optim.NAdam(oil_gas_selfloops_model.parameters(), lr=0.001, weight_decay=5e-4)

# NOTE(review): dist_sweep assigns the loaded edges to data.edge_index, replacing
# whatever edges this object carried before the call — confirm that the self-loops
# added earlier are actually part of the graph being evaluated.
preds_selfloops, accs_selfloops, losses_selfloops = dist_sweep(oil_gas_selfloops_model, 'pyg_objects/oil+gas/co2+ch4-',
                                             oil_gas_classification_data_selfloops, oil_gas_selfloops_optimizer,
                                             oil_gas_classification_criterion, distances)

In [None]:
# MLP baseline for the oil/gas data (node features only, no graph structure).
# NOTE(review): output_dim is hard-coded to 4, whereas the oil/gas GCN derives its output
# size from the labels (len(y.unique())) — confirm the oil/gas data has four classes.
model = MLP(input_dim=oil_gas_classification_data.num_features,
            hidden_channels=16,
            output_dim=4)
optimizer = torch.optim.NAdam(model.parameters(), lr=0.001, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()

# Report the loss on the first epoch, every 100th epoch and the last one instead of
# on all 1999 iterations, which would bury the result under a wall of output.
for epoch in range(1, 2000):
    loss = train(model, oil_gas_classification_data, optimizer, criterion)
    if epoch == 1 or epoch % 100 == 0 or epoch == 1999:
        print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

test_acc = test(model, oil_gas_classification_data)
print(f'Test Accuracy: {test_acc:.4f}')

In [None]:
# The sweeps are plotted at evenly spaced positions (one per threshold) because the
# distances are far from uniform; the tick labels carry the actual km values.
threshold_labels = [1, 2, 3, 4, 5, 25, 50, 100, 200, 500, 1000, 2000, 5000, 10000]

plt.plot(oil_accs, label="GCN accuracies")
plt.plot(accs_selfloops, label="GCN w/ selfloops")
# NOTE(review): 0.6579 is hard-coded — presumably copied from the oil/gas MLP cell's
# printed test accuracy; confirm it still matches after any re-run.
plt.axhline(y=0.6579, color='r', linestyle='--', label='MLP accuracy')
plt.xlabel('Edge Connectivity Threshold (km)')
# The plotted accuracies are fractions in [0, 1] (correct / total), not percentages.
plt.ylabel('Accuracy (fraction of test nodes)')
# Derive the tick positions from the labels so the two can never drift apart.
plt.xticks(list(range(len(threshold_labels))), threshold_labels, fontsize=8)
plt.title("Classification Accuracy on Oil/Gas Data")
plt.legend()
plt.show()

In [None]:
def count_occurrences(tensor, target_number):
    """
    Return how many elements of a PyTorch tensor equal a given value.

    Parameters:
        tensor (torch.Tensor): Tensor to search.
        target_number: Value each element is compared against.

    Returns:
        int: Number of elements of `tensor` equal to `target_number`.
    """
    # Element-wise equality gives a boolean tensor; summing it counts the matches.
    matches = torch.eq(tensor, target_number)
    return int(matches.sum())