In [2]:
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, TensorDataset, DataLoader, random_split

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Building models

In [4]:
class PositionWiseFFN(nn.Module):
    """Two-layer feed-forward network applied to the last dimension of its input.

    The same ``Linear -> ReLU -> Linear`` stack is used for every position, so the
    output has the same shape as the input (``emb_dim`` in, ``emb_dim`` out).

    Args:
        emb_dim: size of the input and output feature dimension.
        ffn_dim: size of the hidden layer between the two linear maps.
    """

    def __init__(self, emb_dim, ffn_dim):
        super().__init__()
        self.fc1 = nn.Linear(emb_dim, ffn_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(ffn_dim, emb_dim)

    def forward(self, x):
        """Expand to ``ffn_dim``, apply ReLU, then project back to ``emb_dim``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)

class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to a batch of sequences.

    Even feature columns hold ``sin(position * freq)`` and odd columns hold
    ``cos(position * freq)``. The table is precomputed once for ``max_seq_len``
    positions and stored as a non-trainable buffer named ``pe`` with shape
    ``(1, max_seq_len, emb_dim)``.

    Args:
        emb_dim: feature dimension of the inputs; may be odd or even.
        max_seq_len: number of positions to precompute.
    """

    def __init__(self, emb_dim, max_seq_len):
        super().__init__()

        pe = torch.zeros(max_seq_len, emb_dim)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        # One frequency per even column: ceil(emb_dim / 2) entries
        div_term = torch.exp(torch.arange(0, emb_dim, 2).float() * -(math.log(10000.0) / emb_dim))
        angles = position * div_term

        pe[:, 0::2] = torch.sin(angles)
        # There are only floor(emb_dim / 2) odd columns, so for an odd emb_dim the last
        # frequency has no cosine slot and must be dropped to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(angles[:, :emb_dim // 2])

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the encoding for the first ``x.size(1)`` positions to ``x``.

        ``x`` is indexed as ``(batch, seq_len, emb_dim)``; the buffer is sliced on
        its position axis and broadcast over the batch axis.
        """
        return x + self.pe[:, :x.size(1)]

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned Q/K/V/output projections.

    The ``emb_dim`` features are partitioned evenly across ``num_heads`` heads, each
    head attending over ``emb_dim // num_heads`` features.

    Args:
        emb_dim: feature dimension of the inputs and of the output.
        num_heads: number of attention heads; must divide ``emb_dim``.
    """

    def __init__(self, emb_dim, num_heads):
        super().__init__()
        assert emb_dim % num_heads == 0, "Embedding dimension must be divided by number of heads"

        # Sizes
        self.emb_dim = emb_dim
        self.num_heads = num_heads
        # Each head owns an equal slice of the feature dimension
        self.head_emb_dim = self.emb_dim // self.num_heads

        # Learned projections for queries, keys, values and the merged output
        self.W_q = nn.Linear(emb_dim, emb_dim)
        self.W_k = nn.Linear(emb_dim, emb_dim)
        self.W_v = nn.Linear(emb_dim, emb_dim)
        self.W_o = nn.Linear(emb_dim, emb_dim)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return ``softmax(Q K^T / sqrt(head_emb_dim)) V``.

        When ``mask`` is given, every score whose mask entry equals 0 is replaced by
        ``-1e9`` before the softmax, so those keys receive (numerically) zero weight.
        """
        scale = math.sqrt(self.head_emb_dim)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / scale

        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = torch.softmax(scores, dim=-1)
        return torch.matmul(weights, V)

    def split(self, x):
        """Reshape ``(batch, seq_len, emb_dim)`` to ``(batch, num_heads, seq_len, head_emb_dim)``."""
        batch_size, seq_len, _ = x.size()
        per_head = x.view(batch_size, seq_len, self.num_heads, self.head_emb_dim)
        # Move the head axis next to the batch axis so the matmuls run per head
        return per_head.transpose(1, 2)

    def combine(self, x):
        """Inverse of :meth:`split`: merge the heads back into a single ``emb_dim`` axis."""
        batch_size, _, seq_len, _ = x.size()
        # The transpose yields a non-contiguous tensor; view() needs contiguous memory
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch_size, seq_len, self.emb_dim)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head, merge the heads and apply ``W_o``."""
        q_heads = self.split(self.W_q(Q))
        k_heads = self.split(self.W_k(K))
        v_heads = self.split(self.W_v(V))

        attended = self.scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)

        return self.W_o(self.combine(attended))
    
class EncoderLayer(nn.Module):
    """One post-norm Transformer encoder layer: self-attention then a position-wise FFN.

    Each sub-layer output goes through dropout, is added to its input (residual
    connection) and is then layer-normalised.

    Args:
        emb_dim: feature dimension of the tokens.
        num_heads: number of attention heads.
        ffn_dim: hidden size of the feed-forward sub-layer.
        dropout: dropout probability applied to both sub-layer outputs.
    """

    def __init__(self, emb_dim, num_heads, ffn_dim, dropout):
        super().__init__()
        self.self_atten = MultiHeadAttention(emb_dim, num_heads)
        self.ffn = PositionWiseFFN(emb_dim, ffn_dim)
        self.norm1 = nn.LayerNorm(emb_dim)
        self.norm2 = nn.LayerNorm(emb_dim)
        self.dropout = nn.Dropout(dropout)

    def _add_and_norm(self, residual, sublayer_out, norm):
        """Apply dropout to ``sublayer_out``, add the residual and normalise."""
        return norm(residual + self.dropout(sublayer_out))

    def forward(self, x, mask):
        """Run self-attention (queries, keys and values are all ``x``) followed by the FFN."""
        x = self._add_and_norm(x, self.self_atten(x, x, x, mask), self.norm1)
        x = self._add_and_norm(x, self.ffn(x), self.norm2)
        return x

In [5]:
class PolygonEncoder(nn.Module):
    """Transformer encoder over polygon point sequences with a learned class token.

    A learned class embedding is prepended to every sequence, a learned positional
    embedding is added, and the result is passed through ``num_layers`` encoder layers.

    Args:
        emb_dim: feature dimension of each input point vector.
        num_heads: attention heads per encoder layer.
        num_layers: number of stacked encoder layers.
        ffn_dim: hidden size of each layer's feed-forward network.
        max_seq_len: longest supported input sequence (excluding the class token).
        dropout: dropout probability used on the embeddings and inside the layers.
    """

    def __init__(self, emb_dim, num_heads,
                num_layers, ffn_dim, max_seq_len, dropout):
        super().__init__()
        self.encoder_layers = nn.ModuleList([EncoderLayer(emb_dim, num_heads, ffn_dim, dropout) for _ in range(num_layers)])
        self.class_embedding = nn.Parameter(torch.randn(1, 1, emb_dim))
        # One extra position for the class token
        self.pos_embedding = nn.Parameter(torch.randn(1, 1 + max_seq_len, emb_dim))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Encode a batch of sequences.

        Args:
            x: tensor indexed as ``(batch, seq_len, emb_dim)``.
            mask: optional tensor whose last axis has one entry per input position
                (0 = ignore); it is passed on to every encoder layer after a slot for
                the class token has been prepended on that axis.

        Returns:
            Tensor of shape ``(batch, seq_len + 1, emb_dim)``; index 0 on the sequence
            axis is the class token.
        """
        batch_size, seq_len, _ = x.shape
        # expand() broadcasts the single class token over the batch without copying it
        class_embedding = self.class_embedding.expand(batch_size, -1, -1)
        x = torch.cat([class_embedding, x], dim=1)
        x = x + self.pos_embedding[:, :seq_len + 1]
        x = self.dropout(x)

        if mask is not None:
            # The class token is always attendable. Build its mask column from the
            # incoming mask so it shares that mask's device, dtype and leading shape
            # instead of depending on a notebook-global `device`.
            cls_mask = mask.new_ones((*mask.shape[:-1], 1))
            mask = torch.cat((cls_mask, mask), dim=-1)

        for enc_layer in self.encoder_layers:
            x = enc_layer(x, mask)

        return x
    
class PolygonTransformer(nn.Module):
    """Polygon classifier: a ``PolygonEncoder`` followed by an MLP on the class token.

    Args:
        num_types: number of output classes (size of the logit vector).
        emb_dim, num_heads, num_layers, ffn_dim, max_seq_len, dropout: forwarded
            unchanged to ``PolygonEncoder``; ``ffn_dim`` is also the hidden size of
            the classification head.
    """

    def __init__(self, num_types, emb_dim, num_heads, num_layers, ffn_dim, max_seq_len, dropout):
        super().__init__()
        self.encoder = PolygonEncoder(emb_dim, num_heads, num_layers, ffn_dim, max_seq_len, dropout)
        self.mlp_head = nn.Sequential(
            nn.Linear(emb_dim, ffn_dim),
            nn.ReLU(),
            nn.Linear(ffn_dim, num_types),
        )

    def forward(self, x, mask=None):
        """Return unnormalised class logits computed from the encoded class token."""
        encoded = self.encoder(x, mask)
        # Position 0 on the sequence axis is the class embedding
        cls_repr = encoded[:, 0, :]
        return self.mlp_head(cls_repr)

In [6]:
import torch
import torch.nn as nn

class CompareModel(nn.Module):
    """1-D convolutional baseline classifier for geometry vector sequences.

    Pipeline: Conv1d(5) -> ReLU -> MaxPool1d(3) -> Conv1d(5) -> ReLU -> global average
    pooling -> Linear -> ReLU -> Dropout -> Linear. The output is raw logits.

    Args:
        emb_dim: length of each geometry vector; used as the number of input channels.
        dense_size: width of the hidden fully connected layer.
        dropout: dropout probability before the output layer.
        output_size: number of classes.
    """

    def __init__(self, emb_dim, dense_size, dropout, output_size):
        super().__init__()

        # Convolutional feature extractor; the geometry-vector axis acts as channels
        self.conv1 = nn.Conv1d(emb_dim, 32, kernel_size=5, padding=2)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
        self.maxpool = nn.MaxPool1d(kernel_size=3)
        # Collapses the sequence axis to length 1
        self.global_avgpool = nn.AdaptiveAvgPool1d(1)
        # Classification head
        self.dense1 = nn.Linear(64, dense_size)
        self.dropout = nn.Dropout(dropout)
        self.dense2 = nn.Linear(dense_size, output_size)
        self.relu = nn.ReLU()
        # Kept as an attribute but not applied in forward()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Map ``(batch_size, seq_len, geom_vector_len)`` inputs to ``(batch_size, output_size)`` logits."""
        # Conv1d expects (batch_size, channels, seq_len)
        features = x.permute(0, 2, 1)
        features = self.maxpool(self.relu(self.conv1(features)))
        features = self.relu(self.conv2(features))
        pooled = self.global_avgpool(features)

        # (batch_size, 64, 1) -> (batch_size, 64)
        flat = pooled.view(pooled.size(0), -1)

        hidden = self.dropout(self.relu(self.dense1(flat)))

        # No softmax here: nn.CrossEntropyLoss applies log-softmax itself, so adding
        # one would apply it twice and slow down convergence.
        return self.dense2(hidden)

# Prepare Dataset

In [7]:
import pandas as pd
import numpy as np
from deep_geometry import vectorizer as gv
from deep_geometry import GeomScaler


# Settings shared by the cells below
max_seq_len = 64  # upper bound on points per polygon
batch_size = 32

# Scaler for the vectorised geometries
gs = GeomScaler()

# Feature-type code -> integer class label
types_dict = {
    'PK': 0,
    'MR': 1,
    'KL': 2,
    'NV': 3,
    'WA': 4,
    'LG': 5,
    'HO': 6,
    'GR': 7,
    'REC': 8,
    'PGK': 9,
}

# Load the table, derive the integer label column, then drop every row containing a
# missing value (codes absent from types_dict map to NaN and are removed here too).
df = (
    pd.read_csv("archaeology.csv")
    .assign(type=lambda frame: frame['Aardspoor'].map(types_dict))
    .dropna()
    .reset_index(drop=True)
)

def count_points(wkt):
    """Return the number of points in a WKT geometry string.

    Args:
        wkt: the WKT text, passed unchanged to ``gv.num_points_from_wkt``.

    Returns:
        The point count, or ``np.inf`` when the string cannot be processed. Infinity
        compares greater than any length limit, so such rows fail a ``<= limit`` test.
    """
    try:
        return gv.num_points_from_wkt(wkt)
    except Exception:
        # Only ordinary errors are treated as "invalid geometry"; a bare `except:` would
        # also swallow KeyboardInterrupt/SystemExit and make the cell impossible to stop.
        print("Invalid wkt string, skip it")
        return np.inf

# Keep only the rows whose geometry has at most max_seq_len points
fits_in_sequence = df['WKT'].apply(lambda wkt: count_points(wkt) <= max_seq_len)
filtered_df = df[fits_in_sequence]

# Continue with the first 1000 remaining rows only
df = filtered_df[:1000]

Invalid wkt string, skip it


In [8]:
def dataset_split(df, val_split_ratio, test_split_ratio, seed=None):
    """Randomly split the ``WKT`` and ``type`` columns of ``df`` into train/val/test arrays.

    Args:
        df: DataFrame with a ``WKT`` column (inputs) and a ``type`` column (labels).
        val_split_ratio: fraction of rows assigned to the validation split.
        test_split_ratio: fraction of rows assigned to the test split.
        seed: optional seed for a reproducible shuffle. When ``None`` (the default)
            the global NumPy random state is used, exactly as before this parameter
            existed.

    Returns:
        ``(train_data, train_labels, val_data, val_labels, test_data, test_labels)``
        as NumPy arrays. Split sizes are ``int(ratio * len(df))`` for val and test;
        the training split receives all remaining rows.
    """
    data, labels = np.array(df['WKT'].tolist()), np.array(df['type'].tolist())

    num_val = int(val_split_ratio * len(df))
    num_test = int(test_split_ratio * len(df))

    indices = np.arange(len(df))
    if seed is None:
        np.random.shuffle(indices)
    else:
        # A local generator keeps the split reproducible without touching global state
        np.random.default_rng(seed).shuffle(indices)

    # Layout of the shuffled indices: [ val | test | train ]
    val_indices = indices[:num_val]
    test_indices = indices[num_val:num_val + num_test]
    train_indices = indices[num_val + num_test:]

    train_data, train_labels = data[train_indices], labels[train_indices]
    val_data, val_labels = data[val_indices], labels[val_indices]
    test_data, test_labels = data[test_indices], labels[test_indices]

    return train_data, train_labels, val_data, val_labels, test_data, test_labels

# 10% validation, 20% test, remaining rows for training
(
    ori_train_data, ori_train_labels,
    ori_val_data, ori_val_labels,
    ori_test_data, ori_test_labels,
) = dataset_split(df, 0.1, 0.2)

In [9]:
def prepare_polygon_dataset(wkts, types, max_seq_len, fit_scaler=True):
    """Vectorise WKT polygons into fixed-size, scaled tensors with a padding mask.

    Geometries with more than ``max_seq_len`` points are skipped together with their
    labels; the rest are vectorised to exactly ``max_seq_len`` rows each.

    Args:
        wkts: iterable of WKT strings.
        types: iterable of class labels, aligned with ``wkts``.
        max_seq_len: fixed sequence length of the output.
        fit_scaler: when True (default, the previous behaviour) the module-level
            scaler ``gs`` is refit on this data before transforming it. Pass False to
            reuse the statistics of an earlier fit, e.g. to scale validation/test data
            with the training statistics.

    Returns:
        ``(tokens, labels, mask)`` where ``tokens`` is a float32 tensor stacked along a
        new leading axis, ``labels`` is an int64 tensor, and ``mask`` is a boolean
        tensor of shape ``(num_kept, 1, 1, max_seq_len)`` that is True for the first
        ``num_points`` positions of each geometry.

    Raises:
        ValueError: if no geometry remains after the length filter.
    """
    geoms, labels, point_counts = [], [], []
    for wkt, label in zip(wkts, types):
        num_point = gv.num_points_from_wkt(wkt)
        if num_point > max_seq_len:
            continue
        geoms.append(gv.vectorize_wkt(wkt, max_points=max_seq_len, fixed_size=True))
        labels.append(label)
        point_counts.append(num_point)

    if not geoms:
        raise ValueError("No geometry with at most max_seq_len points; nothing to prepare")

    # Position j of sample i is real data iff j < point_counts[i]
    lengths = torch.tensor(point_counts).unsqueeze(1)
    positions = torch.arange(max_seq_len).unsqueeze(0)
    mask = (positions < lengths).unsqueeze(1).unsqueeze(2)

    tokens = np.stack(geoms, axis=0)
    if fit_scaler:
        gs.fit(tokens)
    tokens = gs.transform(tokens)
    tokens = torch.tensor(tokens, dtype=torch.float32)
    labels = torch.tensor(labels, dtype=torch.long)

    return tokens, labels, mask

In [10]:
# Define your custom dataset
class MyDataset(Dataset):
    """Minimal map-style dataset pairing ``data[idx]`` with ``labels[idx]``.

    Args:
        data: indexable collection of samples; its length defines the dataset length.
        labels: indexable collection of targets, aligned with ``data``.
    """

    def __init__(self, data, labels):
        self.data, self.labels = data, labels

    def __len__(self):
        """Number of samples, taken from ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at ``idx``."""
        sample = self.data[idx]
        target = self.labels[idx]
        return sample, target

In [11]:
# Vectorise each split into (tokens, labels, padding mask) tensors for the transformer.
# NOTE(review): prepare_polygon_dataset calls gs.fit(...) on the tokens it is given, so each
# call below refits the shared scaler `gs` on its own split. Validation and test data are
# therefore scaled with their own statistics rather than the training statistics, and
# `gs` is left fitted on the test split afterwards. Confirm this is intended.
train_tokens, train_labels, train_mask = prepare_polygon_dataset(ori_train_data, ori_train_labels, max_seq_len)
val_tokens, val_labels, val_mask = prepare_polygon_dataset(ori_val_data, ori_val_labels, max_seq_len)
test_tokens, test_labels, test_mask = prepare_polygon_dataset(ori_test_data, ori_test_labels, max_seq_len)

In [12]:
# Batch each split's (tokens, labels, mask) tensors; only the training split is shuffled
train_loader = DataLoader(
    TensorDataset(train_tokens, train_labels, train_mask),
    batch_size=batch_size,
    shuffle=True,
)
val_loader = DataLoader(
    TensorDataset(val_tokens, val_labels, val_mask),
    batch_size=batch_size,
)
test_loader = DataLoader(
    TensorDataset(test_tokens, test_labels, test_mask),
    batch_size=batch_size,
)

# Model transformer

In [13]:
def _evaluate_masked(model, loader, criterion, device):
    """Return ``(mean batch loss, accuracy)`` of ``model`` over ``loader`` without gradients.

    ``loader`` must yield ``(inputs, labels, mask)`` batches; the loss is averaged over
    batches and the accuracy over samples.
    """
    model.eval()
    loss_sum, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for batch_x, batch_y, batch_mask in loader:
            batch_x, batch_y, batch_mask = batch_x.to(device), batch_y.to(device), batch_mask.to(device)
            outputs = model(batch_x, batch_mask)
            loss_sum += criterion(outputs, batch_y).item()
            predicted = outputs.argmax(dim=1)
            total += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
    return loss_sum / len(loader), correct / total


# The model must live on the same device as the batches moved there below; without
# .to(device) the forward pass fails with a device mismatch whenever CUDA is available.
pot = PolygonTransformer(num_types=10,
                        emb_dim=7,
                        num_heads=1,
                        num_layers=3,
                        ffn_dim=64,
                        max_seq_len=max_seq_len,
                        dropout=0.5).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(pot.parameters(), lr=0.001, betas=(0.9, 0.98), eps=1e-9)

num_epochs = 20

for epoch in range(num_epochs):
    # Training pass
    pot.train()
    train_loss = 0.0
    for batch_x, batch_y, batch_mask in train_loader:
        batch_x, batch_y, batch_mask = batch_x.to(device), batch_y.to(device), batch_mask.to(device)
        optimizer.zero_grad()
        outputs = pot(batch_x, batch_mask)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Validation pass
    val_loss, val_acc = _evaluate_masked(pot, val_loader, criterion, device)
    print(f"Epoch: {epoch+1}, Train Loss: {train_loss/len(train_loader)}, Val Loss: {val_loss}, Val Acc: {val_acc}")


# Test
test_loss, test_acc = _evaluate_masked(pot, test_loader, criterion, device)
print(f"Test Loss: {test_loss}, Test Acc: {test_acc}")

    

Epoch: 1, Train Loss: 1.9073412743481724, Val Loss: 1.6210010647773743, Val Acc: 0.57
Epoch: 2, Train Loss: 1.5043791749260642, Val Loss: 1.4942034482955933, Val Acc: 0.57
Epoch: 3, Train Loss: 1.3994573734023354, Val Loss: 1.4798990488052368, Val Acc: 0.57
Epoch: 4, Train Loss: 1.3711142431605945, Val Loss: 1.457416981458664, Val Acc: 0.57
Epoch: 5, Train Loss: 1.362732307477431, Val Loss: 1.453051209449768, Val Acc: 0.57
Epoch: 6, Train Loss: 1.357334630055861, Val Loss: 1.4269547760486603, Val Acc: 0.57
Epoch: 7, Train Loss: 1.3497822826558894, Val Loss: 1.4270367920398712, Val Acc: 0.57
Epoch: 8, Train Loss: 1.3507517522031611, Val Loss: 1.425373613834381, Val Acc: 0.57
Epoch: 9, Train Loss: 1.3483836271546104, Val Loss: 1.4310104548931122, Val Acc: 0.57
Epoch: 10, Train Loss: 1.3431221138347278, Val Loss: 1.4185477495193481, Val Acc: 0.57
Epoch: 11, Train Loss: 1.3429289785298435, Val Loss: 1.4106917083263397, Val Acc: 0.57
Epoch: 12, Train Loss: 1.3369650840759277, Val Loss: 1.40

# Model Conv
##### refer to https://arxiv.org/pdf/1806.03857.pdf

In [14]:
def prepare_dataset(wkts, types):
    """Vectorise WKT strings and bucket them, with their labels, by sequence length.

    Args:
        wkts: iterable of WKT strings, each passed to ``gv.vectorize_wkt``.
        types: iterable of labels aligned with ``wkts``.

    Returns:
        Two dicts keyed by sequence length (``geom.shape[0]``): the first maps to the
        list of vectorised geometries of that length, the second to the matching
        labels. Keys are inserted from the longest sequence to the shortest.
    """
    vectorized = [gv.vectorize_wkt(wkt) for wkt in wkts]

    # Longest geometries first; sorted() is stable, so ties keep their input order
    by_length_desc = sorted(zip(vectorized, types), key=lambda pair: len(pair[0]), reverse=True)

    inputs_by_len = {}
    labels_by_len = {}
    for geom, label in by_length_desc:
        length = geom.shape[0]
        inputs_by_len.setdefault(length, []).append(geom)
        labels_by_len.setdefault(length, []).append(label)

    return inputs_by_len, labels_by_len

In [15]:
# Bucket every split by sequence length for the convolutional model
_ori_splits = (
    (ori_train_data, ori_train_labels),
    (ori_val_data, ori_val_labels),
    (ori_test_data, ori_test_labels),
)
(
    (train_input_sorted, train_labels_sorted),
    (val_input_sorted, val_labels_sorted),
    (test_input_sorted, test_labels_sorted),
) = [prepare_dataset(split_wkts, split_types) for split_wkts, split_types in _ori_splits]

In [16]:
# Hyper-parameters for the convolutional baseline
sequence_length = 10  # NOTE(review): not referenced by any code visible in this notebook
geom_vector_len = 7  # Assuming geom_vector_len is known
dense_size = 64  # Size of the dense layer
dropout = 0.5  # Dropout rate
num_classes = 10  # Number of output classes
batch_size = 32


def _bucket_loaders(inputs_by_len, labels_by_len, batch_size, shuffle):
    """Build one DataLoader per sequence-length bucket.

    Geometries within a bucket share a length, so they are stacked into one array
    before conversion; converting a Python list of ndarrays directly is the slow path
    PyTorch warns about.
    """
    loaders = []
    for seq_len, geoms in inputs_by_len.items():
        inputs = torch.tensor(np.stack(geoms), dtype=torch.float32)
        labels = torch.tensor(labels_by_len[seq_len], dtype=torch.long)
        loaders.append(DataLoader(TensorDataset(inputs, labels), batch_size=batch_size, shuffle=shuffle))
    return loaders


def _evaluate_buckets(model, loaders, criterion, device):
    """Return ``(mean batch loss, accuracy)`` of ``model`` over all bucket loaders.

    All counters are local, so nothing carries over between calls.
    """
    model.eval()
    loss_sum, num_batches, correct, total = 0.0, 0, 0, 0
    with torch.no_grad():
        for loader in loaders:
            for batch_x, batch_y in loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                outputs = model(batch_x)
                loss_sum += criterion(outputs, batch_y).item()
                predicted = outputs.argmax(dim=1)
                total += batch_y.size(0)
                num_batches += 1
                correct += (predicted == batch_y).sum().item()
    return loss_sum / num_batches, correct / total


# Tensors and loaders are built once instead of once per epoch. Only the training
# buckets are shuffled; evaluation order does not need to be random.
train_loaders = _bucket_loaders(train_input_sorted, train_labels_sorted, batch_size, shuffle=True)
val_loaders = _bucket_loaders(val_input_sorted, val_labels_sorted, batch_size, shuffle=False)
test_loaders = _bucket_loaders(test_input_sorted, test_labels_sorted, batch_size, shuffle=False)

# Define the model, loss function, and optimizer.
# The model is moved to `device` because every batch is moved there before the forward pass.
conv_model = CompareModel(emb_dim=geom_vector_len, dense_size=dense_size, dropout=dropout, output_size=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(conv_model.parameters(), lr=0.001, betas=(0.9, 0.98), eps=1e-9)

# Training process
num_epochs = 20

for epoch in range(num_epochs):
    conv_model.train()
    train_loss = 0.0
    total_batch_train = 0
    for loader in train_loaders:
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = conv_model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            total_batch_train += 1

    val_loss, val_acc = _evaluate_buckets(conv_model, val_loaders, criterion, device)
    print(f"Epoch: {epoch+1}, Train Loss: {train_loss/total_batch_train}, Val Loss: {val_loss}, Val Acc: {val_acc}")

# Test
# The sample counter starts from zero here. Previously `total` still held the last
# validation count, so the reported test accuracy was divided by val + test samples.
test_loss, test_acc = _evaluate_buckets(conv_model, test_loaders, criterion, device)
print(f"Test Loss: {test_loss}, Test Acc: {test_acc}")

  inputs = torch.tensor(train_input_sorted[seq_len], dtype=torch.float32)


Epoch: 1, Train Loss: 1.7568874460245882, Val Loss: 1.7431108048983983, Val Acc: 0.57
Epoch: 2, Train Loss: 1.8315633440774584, Val Loss: 1.480326327255794, Val Acc: 0.57
Epoch: 3, Train Loss: 1.6594277071574377, Val Loss: 1.4110818326473236, Val Acc: 0.57
Epoch: 4, Train Loss: 1.592458915142786, Val Loss: 1.3925866969994136, Val Acc: 0.57
Epoch: 5, Train Loss: 1.4769629656322418, Val Loss: 1.3637228731598172, Val Acc: 0.57
Epoch: 6, Train Loss: 1.4584554254062592, Val Loss: 1.3256951293775014, Val Acc: 0.57
Epoch: 7, Train Loss: 1.409654146149045, Val Loss: 1.32930477474417, Val Acc: 0.57
Epoch: 8, Train Loss: 1.329162864931046, Val Loss: 1.3992993329252517, Val Acc: 0.57
Epoch: 9, Train Loss: 1.3306236579304649, Val Loss: 1.2731047076838358, Val Acc: 0.63
Epoch: 10, Train Loss: 1.2663685021892426, Val Loss: 1.2467984991414207, Val Acc: 0.63
Epoch: 11, Train Loss: 1.268877767381214, Val Loss: 1.2466001553194863, Val Acc: 0.63
Epoch: 12, Train Loss: 1.2206146262940907, Val Loss: 1.2363

In [17]:
def _count_parameters(model):
    """Total number of scalar entries across all parameters of ``model``."""
    return sum(param.numel() for param in model.parameters())


# Compare the sizes of the two models
total_params_conv_model = _count_parameters(conv_model)
print(f"Total number of parameters in the Conv model: {total_params_conv_model}")

total_params_pot_model = _count_parameters(pot)
print(f"Total number of parameters in the Transformer model: {total_params_pot_model}")

Total number of parameters in the Conv model: 16266
Total number of parameters in the Transformer model: 5281
