Imports

In [3]:
import os
import xml.etree.ElementTree as ET
import cv2
import numpy as np
import matplotlib.pyplot as plt
import glob

#use seaborn for plotting defaults
import seaborn as sns; sns.set()

Data acquisition

In [4]:
def from_folder(folder_path):
    """Load every stroke set found in the XML files under ``folder_path``.

    The folder is searched recursively for ``*.xml`` files. In each file the
    first ``StrokeSet`` element is read; every ``Stroke`` child becomes an
    array with one ``(x, y, time)`` row per ``Point`` child. The strokes of a
    file are then passed through ``format_strokeset``.

    A file is skipped when it has no ``StrokeSet`` element, when none of its
    strokes contains a point, or when ``format_strokeset`` rejects it by
    returning ``None``. The returned list therefore never contains ``None``.

    Args:
        folder_path: Root directory to search.

    Returns:
        A list with one formatted stroke set (a list of arrays) per usable
        file, ordered by file path.
    """
    strokesets = []
    xml_paths = glob.glob(os.path.join(folder_path, "**/*.xml"), recursive=True)
    # glob gives no ordering guarantee; sorting keeps the result stable between runs.
    for xml_path in sorted(xml_paths):
        stroke_set_node = ET.parse(xml_path).find(".//StrokeSet")
        if stroke_set_node is None:
            continue

        strokes = []
        for stroke_node in stroke_set_node.findall("./Stroke"):
            points = [
                (float(point.get("x")), float(point.get("y")), float(point.get("time")))
                for point in stroke_node.findall("./Point")
            ]
            # A stroke without points has no geometry and cannot be concatenated later.
            if points:
                strokes.append(np.array(points))
        if not strokes:
            continue

        formatted = format_strokeset(strokes)
        if formatted is not None:
            strokesets.append(formatted)
    return strokesets

def format_strokeset(strokeset):
    """Move a stroke set to a local origin in space and time.

    The x/y coordinates of all strokes are shifted so that the top-left corner
    of their common bounding box becomes ``(0, 0)``. The time column of each
    stroke is shifted so that the stroke's own first point has time ``0``.

    Args:
        strokeset: Sequence of arrays, one per stroke, each with one
            ``(x, y, time)`` row per point. Non-empty arrays are modified in
            place.

    Returns:
        A list with the shifted, non-empty strokes, or ``None`` when there is
        no point at all or the bounding box has zero width or zero height.
    """
    # Strokes without points would break both the concatenation and stroke[0, 2] below.
    strokes = [stroke for stroke in strokeset if len(stroke) > 0]
    if not strokes:
        return None

    # Bounding box over the x and y columns of every point.
    xy = np.concatenate(strokes, axis=0)[:, :2]
    x_min, y_min = np.min(xy, axis=0)
    x_max, y_max = np.max(xy, axis=0)
    w, h = x_max - x_min, y_max - y_min
    if w == 0 or h == 0:
        return None

    # Shift x/y to the bbox corner and subtract each stroke's first timestamp
    # from its time column, so every stroke starts at time 0.
    for stroke in strokes:
        stroke -= [x_min, y_min, stroke[0, 2]]

    return strokes

def display_strokeset(strokeset, scale_factor=0.1, title="Strokeset"):
    """Draw a stroke set as white polylines on a black image and show it with OpenCV.

    The points are shifted to the corner of their bounding box, multiplied by
    ``scale_factor`` and truncated to integer pixel coordinates. Nothing is
    shown (and ``None`` is returned) when the scaled bounding box is zero
    pixels wide or high. Otherwise a window named ``title`` is opened, the
    call waits for a key press and then closes all OpenCV windows.

    Args:
        strokeset: Sequence of arrays with one ``(x, y, time)`` row per point.
        scale_factor: Multiplier applied to the coordinates before drawing.
        title: Name of the OpenCV window.
    """
    # np.concatenate copies, so the in-place arithmetic below leaves the input untouched.
    points = np.concatenate(strokeset, axis=0)
    lower_x, lower_y = np.min(points[:, :2], axis=0)
    upper_x, upper_y = np.max(points[:, :2], axis=0)
    width = int(scale_factor * (upper_x - lower_x))
    height = int(scale_factor * (upper_y - lower_y))
    if 0 in (width, height):
        return

    canvas = np.zeros((height, width), dtype=np.uint8)

    # Move to the bbox corner, scale, then keep only x/y as integer pixels.
    points -= [lower_x, lower_y, 0]
    points *= scale_factor
    pixels = points[:, :2].astype(np.int32)

    # Walk through the flat pixel array stroke by stroke so that consecutive
    # strokes are not joined by a line.
    start = 0
    for original_stroke in strokeset:
        stop = start + len(original_stroke)
        cv2.polylines(canvas, [pixels[start:stop]], isClosed=False, color=255, thickness=2)
        start = stop

    cv2.imshow(title, canvas)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [5]:
# Parse every XML file below the relative folder "original" into a list of stroke sets
# (one entry per file, each entry being what format_strokeset returned for it).
strokesets = from_folder("original")

Preprocessing

In [105]:
from collections import Counter

# Flatten the per-file stroke sets into one flat list of strokes.
# format_strokeset() returns None for degenerate stroke sets, so such entries
# are skipped instead of letting extend() fail on them.
strokes = []
for strokeset in strokesets:
    if strokeset is None:
        continue
    strokes.extend(strokeset)

# Histogram of stroke lengths (number of points per stroke).
# The extra [0] adds one artificial observation of length 0, so the file always
# contains a row for 0 and that row's count is one higher than the real count.
lengths = [len(stroke) for stroke in strokes]
freq_list = dict(Counter(lengths + [0]))

# save the histogram, sorted by length, as "<length> <count>" lines
with open('strokes_length.txt', 'w') as f:
    for key in sorted(freq_list):
        f.write("%s %s\n" % (key, freq_list[key]))

# keep only strokes with 3 to 300 points (inclusive)
min_length = 3
max_length = 300
strokes = [stroke for stroke in strokes if min_length <= len(stroke) <= max_length]

# zero-pad every kept stroke at the end to exactly max_length points.
# Filling a preallocated array gives shape (0, max_length, 3) even when no
# stroke survives the filter, and avoids one temporary array per stroke.
padded_strokes = np.zeros((len(strokes), max_length, 3))
for row, stroke in enumerate(strokes):
    padded_strokes[row, :len(stroke)] = stroke
strokes = padded_strokes

def normalize_strokes(strokes):
    """Shift every stroke to start at the origin and standardize each channel.

    First, the first point of each stroke is subtracted from all points of
    that stroke. Then each channel (last axis) is standardized with the mean
    and standard deviation taken over all strokes and all points together.

    Every row of a stroke is shifted, so rows that held zeros before the call
    (for example padding) do not hold zeros afterwards.

    A channel that is constant after the shift has a standard deviation of 0;
    it is only centred (and therefore becomes all zeros) instead of being
    divided by zero.

    Args:
        strokes: Array indexed as ``[stroke, point, channel]``. It is not modified.

    Returns:
        A new array of the same shape with the normalized values.
    """
    # Orient all the strokes to start from the origin
    shifted = strokes - strokes[:, 0:1, :]

    # Per-channel statistics over every stroke and every point
    means = np.mean(shifted, axis=(0, 1), keepdims=True)
    stds = np.std(shifted, axis=(0, 1), keepdims=True)

    # Guard against 0/0 -> NaN for constant channels
    stds[stds == 0] = 1.0

    return (shifted - means) / stds

# Replace the padded array with its origin-shifted, per-channel standardized version.
# Note that this rebinds `strokes`; the un-normalized array is no longer reachable afterwards.
strokes = normalize_strokes(strokes)


In [112]:
# Sanity check: one row per stroke kept by the length filter, each padded to 300 points of 3 values.
# NOTE(review): the recorded output is a torch.Size, so this cell was last run after the
# tensor conversion further down; on a fresh top-to-bottom run it would show a plain tuple.
strokes.shape

torch.Size([267141, 300, 3])

Auto-encoder for correcting anomalies

In [113]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from torch import optim


In [111]:
# Convert the stroke array to a float32 tensor for PyTorch.
# torch.as_tensor accepts both a NumPy array and an existing tensor, so running
# this cell a second time is harmless (torch.from_numpy raises on a tensor).
strokes = torch.as_tensor(strokes, dtype=torch.float32)

In [None]:
# Number of points per stroke that the autoencoder's output layer produces.
# Must equal the padded stroke length used during preprocessing (max_length).
MAX_STROKE_LEN = 300

In [None]:
# Dataset

class StrokeDataset(Dataset):
    """Dataset for auto-encoding: each item is a stroke paired with itself."""

    def __init__(self, strokes):
        # The collection is stored as given (no copy); it only needs len() and indexing.
        self.strokes = strokes

    def __len__(self):
        """Return the number of strokes."""
        return len(self.strokes)

    def __getitem__(self, index):
        """Return ``(input, target)``, where both are the stroke at ``index``."""
        sample = self.strokes[index]
        return (sample, sample)

In [None]:
# Auto-encoder

# LSTM() returns tuple of (tensor, (recurrent state))
class extract_tensor(nn.Module):
    """Adapter that lets an ``nn.LSTM`` be used inside ``nn.Sequential``.

    It receives the ``(output, state)`` pair returned by the LSTM and passes on
    only the output of the last time step.
    """

    def forward(self, x):
        # x is the LSTM result: (output sequence, recurrent state); the state is dropped.
        tensor, _ = x
        # The LSTMs here use batch_first=True, so the output is (batch, seq_len, hidden).
        # Keep the final time step only -> (batch, hidden).
        return tensor[:, -1, :]


class StrokeAutoencoder(nn.Module):
    """LSTM auto-encoder that compresses a stroke into a latent vector and rebuilds it.

    Encoder: LSTM -> last time step -> ReLU -> Linear -> ReLU, giving a
    ``(batch, latent_dim)`` code. Decoder: the code is fed as a one-step
    sequence to a second LSTM, followed by ReLU and a Linear layer that emits
    all ``seq_len * input_dim`` values at once.

    The decoder ends with the Linear layer and has no activation after it, so
    the reconstruction can take negative values.

    Args:
        input_dim: Number of values per point.
        hidden_dim: Hidden size of both LSTMs.
        latent_dim: Size of the latent code.
        seq_len: Number of points in a reconstructed stroke. When ``None``
            (the default) the module-level ``MAX_STROKE_LEN`` is used.
    """

    def __init__(self, input_dim, hidden_dim, latent_dim, seq_len=None):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        # Fall back to the notebook-wide constant when no explicit length is given.
        self.seq_len = MAX_STROKE_LEN if seq_len is None else seq_len

        # Encoder layers
        self.encoder = nn.Sequential(
            nn.LSTM(input_dim, hidden_dim, batch_first=True),
            extract_tensor(),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
            nn.ReLU(),
        )

        # Decoder layers. No activation after the last Linear: a trailing ReLU
        # would clamp the reconstruction to values >= 0.
        self.decoder = nn.Sequential(
            nn.LSTM(latent_dim, hidden_dim, batch_first=True),
            extract_tensor(),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim * self.seq_len),
        )

    def forward(self, x):
        """Reconstruct ``x``; returns a tensor viewed as ``(-1, seq_len, input_dim)``."""
        # Encode the input sequence into (batch, latent_dim)
        latent = self.encoder(x)

        # The decoder LSTM expects a sequence: present the code as a single time step
        z = latent.view(-1, 1, self.latent_dim)

        # Decode into a flat vector and fold it back into a sequence of points
        x_hat = self.decoder(z)
        x_hat = x_hat.view(-1, self.seq_len, self.input_dim)

        return x_hat


In [None]:
# Model and dataset properties

# Fix the random state so weight initialisation, the train/validation split and
# the shuffling order are the same on every run of the notebook.
SEED = 42
torch.manual_seed(SEED)

model = StrokeAutoencoder(input_dim=3, hidden_dim=128, latent_dim=64)

dataset = StrokeDataset(strokes)

# 80 % of the strokes for training, the remainder for validation
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SEED),
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Mean squared reconstruction error, optimised with Adam
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Training loop

num_epochs = 10
print_every = 10  # number of batches between two progress messages

for epoch in range(num_epochs):
    # Training pass
    model.train()
    running_loss = 0.0  # sum of batch losses since the last progress message
    train_loss = 0.0    # sample-weighted sum over the whole epoch
    for i, (input_seq, target_seq) in enumerate(train_loader):
        optimizer.zero_grad()
        output_seq = model(input_seq)
        loss = criterion(output_seq, target_seq)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        # Weight by batch size so a smaller final batch does not skew the epoch mean.
        train_loss += batch_loss * input_seq.size(0)

        if (i+1) % print_every == 0:
            print(f"Epoch {epoch+1}/{num_epochs}, Batch {i+1}/{len(train_loader)}: train_loss={running_loss/print_every:.4f}")
            # Only the short-term accumulator is reset; the epoch total keeps growing.
            running_loss = 0.0
    train_loss /= len(train_dataset)

    # Validation pass (no gradients, no parameter updates)
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for input_seq, target_seq in val_loader:
            output_seq = model(input_seq)
            val_loss += criterion(output_seq, target_seq).item() * input_seq.size(0)
    val_loss /= len(val_dataset)

    # Print the per-sample mean training and validation losses for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}: train_loss={train_loss:.4f} val_loss={val_loss:.4f}")