In [1]:
import torch
import numpy as np


# Data Loading

In [2]:
# Read the training CSV as raw text lines. The context manager closes the file
# handle deterministically instead of leaving it to the garbage collector.
with open('./CSVs/play_style_train.csv') as csv_file:
    df = csv_file.read().splitlines()

# Every line is split on its first two commas only, so the last field keeps
# the whole comma-separated move list and the field before it is the style.
games = []
game_styles = []
for line in df:
    fields = line.split(',', 2)
    games.append(fields[-1])
    game_styles.append(int(fields[-2]))

In [3]:
# The 19 letters used to write board coordinates.
chars = 'abcdefghijklmnopqrs'
# Letter -> zero-based index ('a' -> 0, ..., 's' -> 18).
coordinates = dict(zip(chars, range(len(chars))))
# Zero-based index -> letter (the inverse mapping of `coordinates`).
chartonumbers = dict(enumerate(chars))
coordinates

{'a': 0,
 'b': 1,
 'c': 2,
 'd': 3,
 'e': 4,
 'f': 5,
 'g': 6,
 'h': 7,
 'i': 8,
 'j': 9,
 'k': 10,
 'l': 11,
 'm': 12,
 'n': 13,
 'o': 14,
 'p': 15,
 'q': 16,
 'r': 17,
 's': 18}

In [4]:
def prepare_input(moves):
    """Encode a sequence of moves as a ``(19, 19, 4)`` ``torch.short`` tensor.

    Only characters 0, 2 and 3 of each move string are read: the colour
    (``'B'`` or ``'W'``), the column letter and the row letter
    (presumably SGF-like strings such as ``'B[dp]'`` -- confirm against the CSV).

    Feature planes (last axis):
        0 -- points where a black move was played
        1 -- points where a white move was played
        2 -- points where no move was played
        3 -- the point of the final move in ``moves``

    Stones are never cleared once placed.
    """
    board = torch.zeros((19, 19, 4), dtype=torch.short)
    stone_plane = {'B': 0, 'W': 1}

    for move in moves:
        colour = move[0]
        col = coordinates[move[2]]
        row = coordinates[move[3]]
        plane = stone_plane.get(colour)
        if plane is not None:
            board[row, col, plane] = 1
            # Plane 2 temporarily marks occupied points; it is inverted below.
            board[row, col, 2] = 1

    if moves:
        # `row` / `col` still hold the coordinates of the last move processed.
        board[row, col, 3] = 1

    # Flip the occupancy marker so plane 2 is 1 exactly on untouched points.
    board[:, :, 2] = (board[:, :, 2] == 0).to(board.dtype)
    return board

In [5]:
# Check how many samples can be obtained: `games` holds one entry per CSV
# line, so its length is the game count.
n_games = len(games)
print(f"Total Games: {n_games}")

Total Games: 26615


In [6]:
# Encode each game's comma-separated move list as a board tensor and stack the
# results along a new leading (game) axis.
x = torch.stack([prepare_input(record.split(',')) for record in games])
# Shift the style values down by one to get the class labels used for training.
y = torch.from_numpy(np.array(game_styles) - 1)

In [7]:
# Stacked inputs: one (19, 19, 4) board encoding per game.
x.shape

torch.Size([26615, 19, 19, 4])

In [8]:
# Labels: one entry per game.
y.shape

torch.Size([26615])

In [9]:
# Class balance: number of games for each label value in y.
np.bincount(y)

array([8184, 9403, 9028])

In [10]:
# Hold out 20% of the games for validation.
# The split is seeded so the same games end up in the validation set after
# every kernel restart; with an unseeded split, validation numbers from
# different runs are not comparable.
SPLIT_SEED = 0

datset = torch.utils.data.TensorDataset(x, y)
train_size = int(len(datset) * .8)
valid_size = len(datset) - train_size
train_set, valid_set = torch.utils.data.random_split(
    datset,
    [train_size, valid_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)


In [11]:
import torch.nn as nn

import math
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch-first sequence.

    The encoding table is stored in the ``pe`` buffer with shape
    ``[max_len, 1, d_model]``: even feature columns hold sines and odd
    feature columns hold cosines of the position scaled by geometrically
    spaced frequencies.

    Args:
        d_model: Size of the feature (embedding) dimension. May be odd.
        dropout: Dropout probability applied after the encoding is added.
        max_len: Number of positions in the precomputed table.
    """

    def __init__(self, d_model: int, dropout: float = 0.0, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # For an odd d_model there is one fewer cosine column than sine
        # columns, so the frequencies are truncated to fit. For an even
        # d_model the slice is a no-op.
        pe[:, 0, 1::2] = torch.cos(position * div_term[: d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Arguments:
            x: Tensor, shape ``[batch_size, seq_len, embedding_dim]``

        Returns:
            Tensor of the same shape with the encoding for positions
            ``0 .. seq_len - 1`` added, followed by dropout.
        """
        # The table is laid out sequence-first, so switch to
        # [seq_len, batch_size, embedding_dim] for the broadcasted add and
        # switch back before returning.
        x = x.transpose(0, 1)
        x = x + self.pe[:x.size(0)]
        return self.dropout(x).transpose(0, 1)
    
class AttentionBlock(nn.Module):
    """Attention layer followed by a feed-forward layer, each with a post-norm residual.

    Queries come from ``target`` while keys and values come from ``memory``.
    Both inputs are batch-first with ``d_model`` features; ``d_model`` must
    be divisible by the 8 attention heads.
    """

    def __init__(self, d_model):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(
            d_model,
            num_heads=8,
            dropout=0,
            batch_first=True,
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        # Position-wise MLP that widens to twice the model width and back.
        widened = d_model * 2
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, widened),
            nn.GELU(),
            nn.Linear(widened, d_model),
        )

    def forward(self , target , memory , **kwargs):
        """Return ``target`` updated by attending over ``memory``.

        Extra keyword arguments are accepted and ignored.
        """
        attended, _ = self.self_attn(target, memory, memory, need_weights=False)
        target = self.norm1(target + attended)
        refined = self.feed_forward(target)
        return self.norm2(target + refined)

class EmbeddingModel(nn.Module):
    """Encode a board tensor into one 384-dimensional embedding per board point.

    Pipeline: per-point linear projection + GELU, flatten the two spatial
    axes into a sequence, add positional encodings, then run a 5-layer
    transformer encoder. No output head is applied; the encoder output is
    returned as is.
    """

    def __init__(self):
        super().__init__()
        self.hidden_size = 4
        self.atten_size = 384

        # Lift the 4 input features of every board point to the model width.
        self.input_layer = nn.Sequential(
            nn.Linear(4, self.atten_size),
            nn.GELU(),
        )

        # The table covers 361 positions, so the flattened board may not be longer.
        self.position_encoding = PositionalEncoding(self.atten_size, dropout=0, max_len=361)

        encoder_config = dict(
            d_model=self.atten_size,
            nhead=6,
            dim_feedforward=1024,
            batch_first=True,
            activation="gelu",
        )
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**encoder_config),
            num_layers=5,
        )

    def forward(self , x):
        """Map ``[batch, w, h, 4]`` inputs to ``[batch, w * h, 384]`` embeddings."""
        # [batch, w, h, 4] -> [batch, w, h, dim]
        projected = self.input_layer(x)
        # [batch, w, h, dim] -> [batch, w * h, dim]
        tokens = projected.flatten(1, 2)
        tokens = self.position_encoding(tokens)
        return self.transformer_encoder(tokens)

class MyModel(nn.Module):
    """Three-way classifier over a board tensor.

    The board is embedded by ``EmbeddingModel``. A single learned query
    vector then attends over the per-point embeddings through three stacked
    ``AttentionBlock`` layers, and the resulting vector is projected to
    three unnormalised class scores (no softmax is applied).
    """

    def __init__(self):
        super().__init__()
        self.hidden_size = 32
        self.atten_size = 384

        self.embedding_model = EmbeddingModel()

        # nn.TransformerDecoder is used only as a container that stacks
        # copies of the block and calls each one as layer(target, memory, ...).
        pooling_block = AttentionBlock(self.atten_size)
        self.atten_layer = nn.TransformerDecoder(pooling_block, num_layers=3)
        # One learned query shared by every sample in the batch.
        self.query = nn.Parameter(torch.rand(1, self.atten_size))

        self.output_layer = nn.Sequential(
            nn.Linear(self.atten_size, 3),
        )

    def forward(self , x):
        """Return class logits of shape ``[batch, 3]`` for a batch of boards."""
        board_tokens = self.embedding_model(x)

        # [1, dim] -> [batch, 1, dim]
        batch_size = board_tokens.shape[0]
        query = self.query.repeat(batch_size, 1, 1)

        pooled = self.atten_layer(query, board_tokens)
        return self.output_layer(pooled.squeeze(1))
    

In [12]:
from tqdm import tqdm
from sklearn.metrics import f1_score , accuracy_score

NUM_EPOCHS = 300

# Use the GPU when one is present; mixed precision (autocast + GradScaler) is
# only enabled on CUDA and degrades to plain fp32 training on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
use_amp = device.type == "cuda"

model = MyModel()

# Warm start: load the full classifier first, then overwrite its embedding
# sub-model from a separate checkpoint.
# - map_location="cpu": the model is still on the CPU at this point, and this
#   also lets GPU-saved checkpoints load on a machine without CUDA.
# - strict=False: keys that do not match between checkpoint and model are
#   silently skipped, so a wrong checkpoint will not raise here.
# NOTE(review): torch.load unpickles the file -- only load trusted checkpoints.
model.load_state_dict(torch.load("model/play_style_backup2.pt" , map_location="cpu") , strict=False)
model.embedding_model.load_state_dict(torch.load("model/play_kyu_large_backup.pt" , map_location="cpu") , strict=False)
model.to(device)

# Layer-wise learning rates: the input projection and the first three encoder
# layers get the smallest rate, the remaining encoder layers a larger one, and
# the attention pooling, query and output head the largest.
optimizer = torch.optim.AdamW([
    {"params": model.embedding_model.input_layer.parameters(), "lr": 1e-6},
    {"params": model.embedding_model.transformer_encoder.layers[:3].parameters(), "lr": 1e-6},
    {"params": model.embedding_model.transformer_encoder.layers[3:].parameters(), "lr": 1e-5},
    {"params": model.atten_layer.parameters(), "lr": 2e-5},
    {"params": model.query, "lr": 2e-5},
    {"params": model.output_layer.parameters(), "lr": 2e-5},
])
# Every learning rate is multiplied by 0.97 after each epoch.
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.97)
scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

train_dataloader = torch.utils.data.DataLoader(train_set , batch_size=64 , shuffle=True)
valid_dataloader = torch.utils.data.DataLoader(valid_set , batch_size=64)

loss_fn = torch.nn.CrossEntropyLoss()

pbar = tqdm(range(NUM_EPOCHS))
best_loss = float("inf")
for epoch in pbar:
    train_loss = 0
    valid_loss = 0

    model.train()
    # The batch variables are deliberately NOT called x / y: at notebook top
    # level that would rebind the full dataset tensors defined in an earlier
    # cell, and re-running those cells afterwards would use a single batch.
    for batch_x , batch_y in train_dataloader:
        optimizer.zero_grad()
        batch_x = batch_x.to(device).float()
        batch_y = batch_y.to(device)
        with torch.cuda.amp.autocast(enabled=use_amp , dtype=torch.float16):
            output = model(batch_x)
            loss = loss_fn(output , batch_y)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        train_loss += loss.item()
    scheduler.step()

    # Validation runs in full precision (no autocast), as before.
    metrics_true = []
    metrics_pred = []
    model.eval()
    with torch.no_grad():
        for batch_x , batch_y in valid_dataloader:
            batch_x = batch_x.to(device).float()
            batch_y = batch_y.to(device)

            output = model(batch_x)
            loss = loss_fn(output , batch_y)
            valid_loss += loss.item()

            metrics_true.extend(batch_y.cpu().flatten().tolist())
            metrics_pred.extend(torch.argmax(output , dim=-1).cpu().flatten().tolist())

    # Average the per-batch losses over the number of batches.
    train_loss /= len(train_dataloader)
    valid_loss /= len(valid_dataloader)

    valid_f_score = f1_score(metrics_true , metrics_pred , average='macro')
    valid_accuracy = accuracy_score(metrics_true , metrics_pred)

    pbar.set_postfix({"train_loss":train_loss , "valid_loss":valid_loss , "valid_f1": valid_f_score , "valid_accuracy":valid_accuracy} )
    pbar.refresh()

    # Keep only the checkpoint with the lowest validation loss so far.
    if best_loss > valid_loss:
        best_loss = valid_loss
        torch.save(model.state_dict() , "model/play_style.pt")

100%|██████████| 300/300 [5:06:00<00:00, 61.20s/it, train_loss=0.628, valid_loss=0.727, valid_f1=0.708, valid_accuracy=0.709]  


In [13]:
# Best validation accuracy observed: 0.744 (the final epoch of the run logged above finished at 0.709).