In [1]:
import re
import requests
import io
import tarfile
import csv
import torch
import torch.nn as nn
import random
import sys
import concurrent.futures
import time
from collections import Counter
from collections import namedtuple

from nestedtensor import torch

URL = "https://github.com/le-scientifique/torchDatasets/raw/master/dbpedia_csv.tar.gz"

In [2]:
Point = namedtuple('Point', 'label text')

def get_data(URL):
    r = requests.get(URL)
    file_like_object = io.BytesIO(r.content)
    tar = tarfile.open(fileobj=file_like_object)
    d = {}
    for member in tar.getmembers():
        if member.isfile() and member.name.endswith('csv'):
            k = 'train' if 'train' in member.name else 'test'
            d[k] = tar.extractfile(member)
    return d


def preprocess(iterator):
    def _preprocess(line):
        line = line.decode('UTF-8')
        line = line.lower()
        line = re.sub(r'[^0-9a-zA-Z,\s]', "", line)
        line = line.split(',')
        label = int(line[0]) - 1
        text = (" ".join(line[1:])).split()
        if len(line) > 2:
            return Point(label=label, text=text)
    for line in iterator:
        yield _preprocess(line)


def build_vocab(iterator):
    counter = Counter()
    labels = set()
    for point in iterator:
        counter.update(point.text)
        labels.add(point.label)
    vocab = {}
    for i, (word, count) in enumerate(counter.most_common()):
        vocab[word] = i

    return vocab, labels

In [3]:
data = get_data(URL)
data = {k: list(preprocess(v)) for (k, v) in data.items()}
vocab, labels = build_vocab(data['train'])
UNK = len(vocab)

In [4]:
class TextSentiment(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super().__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text):
        return self.fc(self.embedding(text))

In [5]:
embed_dim = 10
model = TextSentiment(len(vocab) + 1, embed_dim, len(labels)).cuda()
criterion = torch.nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=1.0)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

In [6]:
def create_batch(data):
    data = torch.nested_tensor(
        [torch.tensor(list(map(lambda x: vocab.get(x, UNK), tokens))) for tokens in data])
    return data

def yield_data_futures(data):
    random.shuffle(data)
    labels = []
    batch_data = []
    futures = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        for i, point in enumerate(data):
            # Stop accumulating lines of text once we reach 4000 tokens or more
            # This yields variable batch sizes, but with consistent memory pressure
            if sum(map(len, batch_data), 0) < 10000:
                labels.append(point.label)
                batch_data.append(point.text)
            else:
                if len(futures) < 40:
                    futures.append((torch.tensor(labels), executor.submit(create_batch, batch_data)))
                else:
                    yield futures[0]
                    futures = futures[1:]
                labels = []
                batch_data = []

    for future in futures:
        yield future

In [7]:
num_tokens = sum(map(lambda x: len(x.text), data['train']))
print("Total number of tokens: {}".format(num_tokens))
for epoch in range(5):
    i = 0
    t0 = time.time()
    for labels, future in yield_data_futures(data['train']):
        batch = future.result()
        labels = labels.to('cuda', non_blocking=True)
        batch = batch.to('cuda', non_blocking=True)
        optimizer.zero_grad()
        output = model(batch)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        if i % 16 == 1:
            sys.stderr.write(
                "\rtime: {:3.0f}s epoch: {:3.0f} lr: {:3.6f} loss: {:3.6f}".format(
                    time.time() - t0, 
                    epoch, 
                    scheduler.get_lr()[0],
                    loss, 
                )
            )
            sys.stderr.flush()
        i += batch.numel()
    scheduler.step()
    sys.stderr.write('\n')

Total number of tokens: 27205880


time:  21s epoch:   0 lr: 1.000000 loss: 0.767902
time:  21s epoch:   1 lr: 0.810000 loss: 0.425534
time:  21s epoch:   2 lr: 0.729000 loss: 0.274846
time:  21s epoch:   3 lr: 0.656100 loss: 0.274525
time:  21s epoch:   4 lr: 0.590490 loss: 0.209720


In [8]:
output = [(tb[0], model(tb[1].result().to('cuda')).argmax(1).cpu()) for tb in yield_data_futures(data['test'])]
predictions = torch.cat(list(map(lambda x: x[1], output)))
labels = torch.cat(list(map(lambda x: x[0], output)))

print("Test accuracy: {}".format((labels == predictions).sum().float() / len(labels)))

Test accuracy: 0.940569281578064
