In [1]:
import numpy as np
import torch
import torch.nn as nn
from IPython.display import Image as im, display
from PIL import Image
import os
import numpy as np
import random
from psutil import cpu_count
from sklearn.model_selection import train_test_split
import torchvision.transforms as transforms

In [None]:
# Mount Google Drive at /content/drive (Colab only). Other cells in this
# notebook read and write files under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install the Kaggle CLI and copy the API token from the mounted Drive into /root/.kaggle.
! pip install kaggle
! mkdir /root/.kaggle
! cp /content/drive/MyDrive/kaggle/kaggle.json /root/.kaggle/
# Restrict the token file to owner read/write.
! chmod 600 /root/.kaggle/kaggle.json
# Download the dataset archive, unpack it into the working directory, then delete the archive.
! kaggle datasets download -d maxwilcoxson/hanzi-writer-dataset-png
! unzip hanzi-writer-dataset-png.zip
! rm hanzi-writer-dataset-png.zip

In [4]:
#random crop covering 80-100% of the area with aspect ratio between 3:4 and 4:3,
#resized to 256x256; used by transformImages
transform = transforms.RandomResizedCrop(
    size=(256, 256),
    scale=(0.8, 1),
    ratio=(3/4, 4/3),
)

#applies the module-level random resized crop to a list of PIL images and
#returns both the untouched originals and the cropped versions
def transformImages(original):
    """Return ``{"or": original, "t1": cropped}`` for a list of PIL images.

    The images are converted to tensors, stacked, and the axis at index 1 is
    squeezed away (presumably the single channel of mode "L" images -- confirm
    against the caller). ``transform`` is then applied once to the whole
    stack, and each resulting slice is turned back into a PIL image.

    NOTE(review): because the crop is applied in a single call to the stacked
    tensor, all images of one call presumably share the same crop -- confirm.
    """
    tensors = [transforms.functional.pil_to_tensor(picture) for picture in original]
    stacked = torch.stack(tensors).squeeze(1)
    cropped = transform(stacked).numpy()
    return {"or": original, "t1": [Image.fromarray(frame) for frame in cropped]}

#loads the images of the character folders os.listdir(path)[start:end], creates
#an original and a randomly cropped version of each image sequence, and resizes
#every image to 140x140 (model inputs) and 28x28 (tiles used to build output grids)
def loadImages(start, end):
    """Load, crop and resize the PNG sequences of a slice of character folders.

    Args:
        start, end: slice bounds applied to ``os.listdir("/content/data/")``.
            The listing order is whatever the OS returns; it is not sorted here.

    Returns:
        ``(inputs, outputs)``: two dicts keyed by ``"<version>-<character>"``
        where version is a key produced by ``transformImages`` ("or" or "t1").
        ``inputs`` holds arrays of 140x140 images, ``outputs`` arrays of 28x28
        images, both as ``255 * (pixel / 255) ** 3``.
    """
    path = "/content/data/"
    inputs = {}
    outputs = {}
    for character in os.listdir(path)[start:end]:
        # Hidden entries are skipped entirely. Previously the transform step ran
        # for them as well, reusing the previous character's images (or failing
        # with a NameError when the hidden entry came first).
        if character.startswith("."):
            continue
        # Keep only PNG files *before* sorting so that an unrelated file cannot
        # break the numeric sort key. The key drops the first character of the
        # file stem and parses the remainder as an integer.
        png_names = [name for name in os.listdir(path + character) if name.endswith(".png")]
        png_names.sort(key=lambda x: int(x.split(".")[0][1:]))
        images = [Image.open(path + character + "/" + filename).convert("L") for filename in png_names]
        if not images:
            # transformImages stacks the images with torch.stack, which rejects an empty list
            continue
        versions = transformImages(images)
        for k, imgSet in versions.items():
            full_size = [np.array(img.resize((140,140))) for img in imgSet]
            small_size = [np.array(img.resize((28,28))) for img in imgSet]
            #cube normalized pixel values to increase contrast
            currentSmall = 255.0*((np.array(small_size)/255.0)**3)
            currentLarge = 255.0*((np.array(full_size)/255.0)**3)
            inputs[k + "-" + character] = currentLarge
            outputs[k + "-" + character] = currentSmall
    return inputs, outputs

#arranges up to 25 tiles of 28x28 pixels row by row into a 5x5 grid;
#unused cells keep the fill value 255 and the result is a 140x140 numpy array
def concatentatePartsIntoGrid(images):
    """Tile ``images`` into a 140x140 float array, five 28x28 tiles per row.

    Only the first 25 entries are used; any further entries are ignored.
    """
    grid = np.full((140, 140), 255.0)
    for position in range(min(len(images), 25)):
        row, col = divmod(position, 5)
        grid[row*28:(row+1)*28, col*28:(col+1)*28] = images[position]
    return grid

#to check the input processing: saves every array in `images` as an RGB PNG
#under /content/outputtest and returns the list of written file paths
def saveImages(images, t):
    """Write each array to ``/content/outputtest/<index><t>.png``.

    Args:
        images: sequence of 2-D arrays accepted by ``Image.fromarray``
            (callers in this notebook pass 140x140 grids).
        t: suffix appended to the index in the file name. Files from an
            earlier call with the same suffix are overwritten.

    Returns:
        List of the file paths, in input order.
    """
    out_dir = "/content/outputtest"
    # Create the folder once up front instead of checking on every iteration.
    os.makedirs(out_dir, exist_ok=True)
    names = []
    for i, image in enumerate(images):
        outputfile = out_dir + "/" + str(i) + t + ".png"
        # img.save creates (or truncates) the file itself, so no placeholder
        # file needs to be written beforehand.
        Image.fromarray(image).convert('RGB').save(outputfile)
        names.append(outputfile)
    return names

#augments the dataset: for a character with n images, every prefix length
#k = 1..n yields one datapoint whose input is the k-th full-size image and
#whose target is the grid built from the first k small images
def augmentDataset(full, small):
    """Build (input, target-grid) pairs for every prefix of every character.

    Args:
        full: dict mapping a key to its sequence of full-size images.
        small: dict with the same keys mapping to the matching small images.

    Returns:
        ``(X_aug, Y_aug)`` as numpy arrays; ``X_aug[m]`` is a full-size image
        and ``Y_aug[m]`` the grid of the small images up to and including it.
    """
    X_aug, Y_aug = [], []
    for character, full_images in full.items():
        small_images = small[character]
        for count, full_image in enumerate(full_images, start=1):
            X_aug.append(full_image)
            Y_aug.append(concatentatePartsIntoGrid(small_images[:count]))
    return np.array(X_aug), np.array(Y_aug)

#cuts every 140x140 array into 25 flattened 28x28 patches (row by row) and
#returns a dictionary "<index>-<key>" -> array of shape (25, 784); these
#patches are the individual elements fed to the transformer
def sliceImages(images, key):
    """Slice each image into 25 flattened patches scaled to [-0.5, 0.5].

    Pixel values are divided by 255 and shifted by -0.5, so inputs in
    0..255 end up centered around 0.
    """
    sliced = {}
    for index in range(images.shape[0]):
        image = images[index]
        patches = []
        for top in range(0, 140, 28):
            for left in range(0, 140, 28):
                patches.append(image[top:top+28, left:left+28].flatten())
        sliced[str(index) + "-" + key] = (np.array(patches)/255.0) - 0.5
    return sliced

#builds the dataset for the character folders start..end: loads and augments
#the images, shuffles the (input, target) pairs, saves preview PNGs, and
#returns the sliced inputs X, sliced targets Y and the preview file lists
def getDataSubset(start, end):
    """Return ``(X, Y, filesX, filesY)`` for the folder slice ``start:end``.

    ``X`` and ``Y`` are dictionaries produced by ``sliceImages`` with keys
    ``"<index>-<start>-<end>"``; ``filesX`` / ``filesY`` are the preview image
    paths written by ``saveImages``. The shuffle uses the global ``random``
    state, so the order is only reproducible if that state is seeded.
    """
    full, small = loadImages(start, end)
    X_aug, Y_aug = augmentDataset(full, small)
    pairs = list(zip(X_aug, Y_aug))
    random.shuffle(pairs)
    X_shuffled, Y_shuffled = zip(*pairs)
    X_arr, Y_arr = np.array(X_shuffled), np.array(Y_shuffled)
    filesY = saveImages(Y_arr, "Y")
    filesX = saveImages(X_arr, "X")
    key = str(start) + "-" + str(end)
    X_sliced = sliceImages(X_arr, key)
    Y_sliced = sliceImages(Y_arr, key)
    return X_sliced, Y_sliced, filesX, filesY

#computes a 75-25 split of the training and validation data
def testTrainSplit(X, Y):
    k_t = list(X.keys())[:int(len(X)*0.75)]
    k_v = list(X.keys())[int(len(X)*0.75):]
    X_t = {}
    Y_t = {}
    X_v = {}
    Y_v = {}
    for k in k_t:
        X_t[k] = X[k]
        Y_t[k] = Y[k]
    for k in k_v:
        X_v[k] = X[k]
        Y_v[k] = Y[k]
    return X_t, X_v, Y_t, Y_v


In [None]:
#loads the data for the first 4000 character folders, splits it 75-25 into
#training and validation parts, and displays some samples
X, Y, filesX, filesY = getDataSubset(0, 4000)
X, X_test, Y, Y_test = testTrainSplit(X, Y)

#preview the first 20 saved input/target image pairs
for pair in zip(filesX[:20], filesY[:20]):
    for preview_path in pair:
        display(im(filename=preview_path))
print(len(X))

In [9]:
#some utility functions to save preprocessed input to persistent disk
import pickle
import shutil
def save_variable_to_disk(var, filename):
    """Pickle ``var`` to ``filename`` in the working directory, then move the
    file to ``/content/drive/MyDrive/<filename>``."""
    with open(filename, 'wb') as handle:
        pickle.dump(var, handle)
    shutil.move(filename, '/content/drive/MyDrive/' + filename)
def load_variable_from_disk(filename):
    """Unpickle and return the object stored at ``/content/drive/MyDrive/<filename>``."""
    # NOTE(review): pickle.load can execute arbitrary code from the file;
    # only load files that were written by this notebook.
    drive_path = '/content/drive/MyDrive/' + filename
    with open(drive_path, 'rb') as handle:
        return pickle.load(handle)
def saveDictionaries(X, Y, Xv, Yv, pre):
    """Save the four dictionaries as ``<pre>X.pkl``, ``<pre>Y.pkl``,
    ``<pre>Xv.pkl`` and ``<pre>Yv.pkl`` via ``save_variable_to_disk``."""
    for suffix, value in (('X.pkl', X), ('Y.pkl', Y), ('Xv.pkl', Xv), ('Yv.pkl', Yv)):
        save_variable_to_disk(value, pre + suffix)
def loadDictionaries(pre):
    """Load and return ``(X, Y, Xv, Yv)`` from the files ``<pre>X.pkl``,
    ``<pre>Y.pkl``, ``<pre>Xv.pkl`` and ``<pre>Yv.pkl``."""
    suffixes = ('X.pkl', 'Y.pkl', 'Xv.pkl', 'Yv.pkl')
    return tuple(load_variable_from_disk(pre + suffix) for suffix in suffixes)



In [31]:
# Reload the pickled training/validation dictionaries from Drive; with the empty
# prefix the files read are X.pkl, Y.pkl, Xv.pkl and Yv.pkl.
X, Y, X_test, Y_test = loadDictionaries("")

In [14]:
import math

# PositionalEncoding class is from https://pytorch.org/tutorials/beginner/transformer_tutorial.html
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a sequence-first tensor.

    The table is registered as the buffer ``pe`` with shape
    ``[max_len, 1, d_model]``: even feature indices hold sines, odd indices
    cosines of ``position * exp(-log(10000) * k / d_model)``.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        positions = torch.arange(max_len).unsqueeze(1)
        frequencies = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = positions * frequencies
        table = torch.zeros(max_len, 1, d_model)
        table[:, 0, 0::2] = torch.sin(angles)
        table[:, 0, 1::2] = torch.cos(angles)
        self.register_buffer('pe', table)

    def forward(self, x):
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``

        Returns:
            ``x`` plus the first ``seq_len`` rows of the table (broadcast over
            the batch axis), passed through dropout.
        """
        encoded = x + self.pe[:x.size(0)]
        return self.dropout(encoded)

#trainable linear projection from n to d features, scaled by sqrt(d)
class Projection(nn.Module):
    """Linear layer ``n -> d`` whose output is multiplied by ``sqrt(d)``."""

    def __init__(self, n, d, device):
        super().__init__()
        self.d = d
        projection_layer = nn.Linear(n, d)
        self.linearProjection = projection_layer.to(device)

    def forward(self, x):
        scale = math.sqrt(self.d)
        return scale * self.linearProjection(x)

In [15]:
class Transformer(nn.Module):
    """Encoder-decoder transformer over sequences of flattened image patches.

    Both source and target patches go through the same ``Projection``
    (``n -> d``), receive positional encodings, and are fed to
    ``nn.Transformer`` with a causal mask on the target. A final linear layer
    maps the decoder output to ``output_size`` features.

    Args:
        n: number of features per input patch.
        d: model (embedding) dimension.
        num_heads: number of attention heads.
        num_en_layers / num_dec_layers: encoder / decoder depth.
        output_size: number of features per predicted patch.
        device: device the sub-modules and the target mask are placed on.
    """

    def __init__(self, n, d, num_heads, num_en_layers, num_dec_layers, output_size, device):
        super(Transformer, self).__init__()
        self.projection = Projection(n, d, device).to(device)
        # moved to `device` like the other sub-modules so that the `pe` buffer
        # lives next to the projected activations
        self.position = PositionalEncoding(d).to(device)
        self.transformer = nn.Transformer(d_model=d,
                                          nhead=num_heads, num_encoder_layers=num_en_layers,
                                          num_decoder_layers=num_dec_layers, dim_feedforward=2048,dropout=0.0).to(device)
        self.out = nn.Linear(d, output_size).to(device)
        self.device = device

    def _embed(self, x):
        """Project a batch-first tensor and return it sequence-first with positions added."""
        x = self.projection(x)
        # collapse any extra middle dimensions into one sequence axis
        x = x.reshape(x.shape[0], -1, x.shape[-1])
        # nn.Transformer (batch_first=False) and PositionalEncoding both expect
        # [seq_len, batch, d]. The permute must happen BEFORE the positional
        # encoding, otherwise `pe[:x.size(0)]` is indexed by batch position and
        # every token of a sequence receives the same encoding.
        x = x.permute(1, 0, 2)
        # PositionalEncoding.forward already returns `x + pe`, so its result is
        # used directly instead of being added to `x` a second time.
        return self.position(x)

    def forward(self, src, tgt):
        """Run the model.

        Args:
            src: tensor ``[batch, src_len, n]``.
            tgt: tensor ``[batch, tgt_len, n]`` (decoder input, start token first).

        Returns:
            Tensor ``[tgt_len, batch, output_size]`` (sequence-first; callers
            permute it back to batch-first).

        NOTE: weights trained while the encoding was indexed by batch position
        will not reproduce their earlier outputs with this forward pass.
        """
        src = self._embed(src)
        tgt = self._embed(tgt)
        # causal mask: position k of the target may only attend to positions <= k
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.shape[0]).to(self.device)
        transformer_output = self.transformer(src, tgt, tgt_mask=tgt_mask)
        return self.out(transformer_output)

In [16]:
import torch.optim as optim
#use the GPU when one is available
device = "cuda" if torch.cuda.is_available() else "cpu"
#784 = 28*28 features per patch on both the input and the output side
model = Transformer(n=784, d=256, num_heads=8, num_en_layers=4, num_dec_layers=4,
                    output_size=784, device=device).to(device)
optimizer = optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.001)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=3000)
loss_fn = nn.MSELoss()
#xavier uniform initialization (relu gain) for every parameter with more than one dimension
relu_gain = nn.init.calculate_gain('relu')
for weight in (param for param in model.parameters() if param.dim() > 1):
    nn.init.xavier_uniform_(weight, gain=relu_gain)

In [17]:


#converts a dictionary of equally shaped numpy arrays into one stacked
#float32 tensor (entries appear in dictionary order)
def convertToTensors(inputs):
    """Stack the dictionary's values along a new first axis as float32."""
    return torch.stack([torch.from_numpy(array).to(torch.float32) for array in inputs.values()])
# Replace each dictionary by its stacked tensor on `device`.
# NOTE: the names are rebound in place, so after this cell X, Y, X_test and
# Y_test are tensors; re-running the cell without reloading the dictionaries
# would pass tensors where convertToTensors expects dictionaries.
X = convertToTensors(X).to(device)
Y = convertToTensors(Y).to(device)
X_test = convertToTensors(X_test).to(device)
Y_test = convertToTensors(Y_test).to(device)

In [18]:
#prepends a start token (a patch of all -1, which cannot occur in the data
#because sliceImages scales pixels to [-0.5, 0.5]) to every sequence
def addStartTokens(inputs, device):
    """Return ``inputs`` with one all ``-1`` patch inserted at sequence position 0.

    Args:
        inputs: tensor ``[batch, seq_len, features]``.
        device: device the token is created on and the result is moved to.

    Returns:
        Tensor ``[batch, seq_len + 1, features]``.

    The token is built for the whole batch in one call, its width follows the
    input's feature size, and an empty batch yields an empty result.
    """
    token = torch.full((inputs.shape[0], 1, inputs.shape[-1]), -1.0, device=device)
    return torch.cat((token, inputs), dim=1).to(device)

#appends an end token (a patch of all 1, which cannot occur in the data
#because sliceImages scales pixels to [-0.5, 0.5]) to every sequence
def addEndTokens(outputs, device):
    """Return ``outputs`` with one all ``1`` patch added after the last position.

    Args:
        outputs: tensor ``[batch, seq_len, features]``.
        device: device the token is created on and the result is moved to.

    Returns:
        Tensor ``[batch, seq_len + 1, features]``.

    The token is built for the whole batch in one call, its width follows the
    input's feature size, and an empty batch yields an empty result.
    """
    token = torch.full((outputs.shape[0], 1, outputs.shape[-1]), 1.0, device=device)
    return torch.cat((outputs, token), dim=1).to(device)

#autoregressive decoding: repeatedly calls the model to predict one token at a time;
#the decoder input starts as garbage (1000) everywhere except the start token,
#which demonstrates that the causal mask is working
def inference(X_part):
    """Decode 25 patches plus the end token for a batch of inputs.

    Args:
        X_part: tensor ``[batch, 25, features]`` without end token.

    Returns:
        Tensor ``[batch, 26, features]``: the predicted patches followed by
        the predicted end token.

    The global ``model`` is switched to eval mode for the duration of the
    call (so the dropout inside the positional encoding is inactive) and
    restored to its previous mode afterwards; no gradients are recorded.
    """
    num_patches = 25
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            Y_part = 1000*torch.ones(X_part.shape[0], num_patches, X_part.shape[2]).to(device)
            X_part = addEndTokens(X_part, device)
            Y_part = addStartTokens(Y_part, device)
            for i in range(1, num_patches + 2):
                Y_part = model(X_part, Y_part)
                Y_part = Y_part.permute(1,0,2)
                if i < num_patches + 1:
                    # output position k predicts token k+1: shift the predictions one
                    # step right and put the start token back in front to obtain the
                    # next decoder input
                    Y_part = torch.roll(Y_part, 1, 1)
                    Y_part[:,0,:] = -1
    finally:
        model.train(was_training)
    return Y_part

#renders every sequence of flattened patches as one grid image and displays it;
#`kind` becomes the suffix of the saved file names (see saveImages)
def showImages(images, kind):
    """Display one 5x5 grid per sequence in ``images``.

    ``images`` is a numpy array ``[batch, seq_len, 784]`` with values centered
    around 0; they are mapped back with ``255 * (value + 0.5)`` before the
    grid is built. Only the first 25 patches of a sequence fit into the grid.
    """
    patches = images.reshape(images.shape[0], images.shape[1], 28, 28)
    grids = [concatentatePartsIntoGrid(255.0*(sample + 0.5)) for sample in patches]
    for saved_path in saveImages(grids, kind):
        display(im(filename=saved_path))

#calculates the mean autoregressive loss over (at most) the first 128 samples
#and shows one prediction / target / input triple per evaluated batch
def calculateLoss(X, Y, kind, epoch):
    """Evaluate ``inference`` on the first batches of ``X`` / ``Y``.

    Args:
        X, Y: input and target tensors, batch-first.
        kind: label used in the printed message and the image file names.
        epoch: epoch number, only used in the printed message.

    Returns:
        The loss averaged over the batches that were actually evaluated, so
        values computed on datasets of different size are comparable.
    """
    batch_size = 32
    # never index past the end of a dataset with fewer than 128 samples
    starts = range(0, min(128, len(X)), batch_size)
    with torch.no_grad():
        l = torch.zeros((), device=device)
        for i in starts:
            X_part = X[i:i+batch_size]
            Y_part = Y[i:i+batch_size]
            Y_exp = addEndTokens(Y_part, device)
            Y_act = inference(X_part)
            l += loss_fn(Y_act, Y_exp)
            # one sample per batch is shown: prediction, target, then input
            showImages(Y_act[:1].cpu().detach().numpy(), kind)
            showImages(Y_exp[:1].cpu().detach().numpy(), kind)
            showImages(X_part[:1].cpu().detach().numpy(), kind)
        # the printed value is the sum over the evaluated batches
        print("the " + kind + " loss is " + str(l) + " at epoch " + str(epoch))
    # normalise by the number of evaluated batches, not by the dataset size
    return l / max(len(starts), 1)

In [None]:
#teacher-forced training; every 10 epochs a few images are shown, the train /
#validation loss is recorded and a checkpoint is written to Drive
n_epochs=3000
batch_size = 32
losses_train = []
losses_valid = []
for e in range(n_epochs):
    # make sure training mode is active even if an earlier cell called model.eval()
    model.train()
    indices = list(range(0, len(X), batch_size))
    random.shuffle(indices) #very important! don't forget randomization
    for i in indices:
        X_part = X[i:i+batch_size]
        X_part = addEndTokens(X_part, device)
        Y_part = Y[i:i+batch_size]
        # decoder input: start token followed by the target patches
        Y_part_tgt = addStartTokens(Y_part, device)
        # expected output: target patches followed by the end token
        Y_part_exp = addEndTokens(Y_part, device)
        Y_part_pred = model(X_part, Y_part_tgt)
        Y_part_pred = Y_part_pred.permute(1,0,2)
        # the loss covers every output position, including the end token
        loss = loss_fn(Y_part_pred, Y_part_exp)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # The cosine schedule is configured with T_max=3000, i.e. the number of
    # epochs, so it is advanced once per epoch. Stepping it per batch would
    # finish the cycle after 3000 batches and let the learning rate rise again.
    scheduler.step()
    if e % 10 == 0:
        # samples come from the last batch of the epoch
        showImages(Y_part_exp[3:4].cpu().detach().numpy(), "train")
        showImages(Y_part_pred[3:4].cpu().detach().numpy(), "train")
        showImages(X_part[3:4].cpu().detach().numpy(), "train")
        losses_train.append(calculateLoss(X, Y, "train", e))
        losses_valid.append(calculateLoss(X_test, Y_test, "valid", e))
        torch.save(model.state_dict(), '/content/drive/MyDrive/transformeraugmented' + str(e) + '.pth')

In [None]:
# plots the recorded training and validation losses, one point per recorded
# evaluation (the x value is the index into the loss lists)
import matplotlib.pyplot as plt
loss_curves = (
    (losses_train, 'Training Loss', 'blue'),
    (losses_valid, 'Validation Loss', 'red'),
)
for curve_values, curve_label, curve_color in loss_curves:
    plt.plot(torch.stack(curve_values).cpu().detach().numpy(), label=curve_label, color=curve_color)
plt.title('Training vs Validation Loss Over Time')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()


In [54]:
def calculateTotalLoss(X, Y, kind, epoch=None):
    """Mean autoregressive loss over the whole dataset.

    Args:
        X, Y: input and target tensors, batch-first.
        kind: suffix used for the file names of the displayed sample images.
        epoch: optional and unused; accepted so the function can be called
            with the same arguments as ``calculateLoss``.

    Returns:
        The loss averaged over all batches, including a final partial batch.
        Roughly 5% of the batches (chosen at random) have one sample displayed.
    """
    batch_size = 32
    indices = list(range(0, len(X), batch_size))
    random.shuffle(indices)
    with torch.no_grad():
        l = 0
        for i in indices:
            X_part = X[i:i+batch_size]
            Y_part = Y[i:i+batch_size]
            Y_exp = addEndTokens(Y_part, device)
            Y_act = inference(X_part)
            l += loss_fn(Y_act, Y_exp)
            if random.random() < 0.05:
                showImages(Y_act[:1].cpu().detach().numpy(), kind)
                showImages(Y_exp[:1].cpu().detach().numpy(), kind)
                showImages(X_part[:1].cpu().detach().numpy(), kind)
    # Divide by the number of batches that were summed. int(len(X)/batch_size)
    # misses the final partial batch and is zero for fewer than 32 samples.
    return l / max(len(indices), 1)


In [None]:
# loads the checkpoints saved every 10 epochs (10..820) and computes the train
# and validation loss of each one; results are appended to the two global lists
losses_train = []
losses_valid = []
def loadModels():
    """Evaluate every checkpoint ``transformer<n>.pth`` for n = 10, 20, ..., 820.

    NOTE(review): the training loop in this notebook writes files named
    ``transformeraugmented<e>.pth``; confirm that the ``transformer<n>.pth``
    files read here are the intended checkpoints.
    """
    for n in range(10, 830, 10):
        print(n)
        # map_location lets checkpoints saved on a GPU load on a CPU-only runtime
        state = torch.load("/content/drive/MyDrive/transformer" + str(n) + ".pth", map_location=device)
        model.load_state_dict(state)
        model.eval()
        # calculateTotalLoss takes (X, Y, kind); passing the epoch as a fourth
        # positional argument raised a TypeError
        losses_train.append(calculateTotalLoss(X, Y, "train"))
        losses_valid.append(calculateTotalLoss(X_test, Y_test, "valid"))
loadModels()

In [None]:
# loads the checkpoint written at epoch 690 and builds the extra test set from
# character folders 4000..5000 (not used for training or validation above)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime
model.load_state_dict(torch.load("/content/drive/MyDrive/transformeraugmented690.pth", map_location=device))
model.eval()
X_extratest, Y_extratest, filesX, filesY = getDataSubset(4000,5000)
X_extratest = convertToTensors(X_extratest).to(device)
Y_extratest = convertToTensors(Y_extratest).to(device)

In [None]:
# calculates loss on the extra test subset and visualizes a few data points
# ("valid" is only the label used in the printed message and the image file
# names; 0 is the epoch number shown in that message)
calculateLoss(X_extratest, Y_extratest, "valid", 0)