# Add punctuation to a text

In this notebook we'll try to build a small neural network to apply punctuation to a text, using Skorch to integrate Pytorch with Scikit-learn and be able to easily compare the performance with other ML tools for this use case.

For simplicity, the network will be quite small and fast to train.

In [None]:

import gzip
from random import shuffle

# Upper bound on how many utterances are read from the corpus file.
MAX_UTTERANCES_TO_LOAD = 400_000

utterances = []

# Read the gzipped corpus line by line, keeping the stripped text of every
# line that is at least 15 characters long and is not a <text ...> / </text>
# marker line.
with gzip.open('paisa.raw.utf8.gz', 'rb') as corpus:
    for raw_line in corpus:
        line = raw_line.decode()
        stripped = line.strip()
        is_markup = line.startswith(('<text', '</text>'))
        if len(stripped) >= 15 and not is_markup:
            utterances.append(stripped)
            if len(utterances) >= MAX_UTTERANCES_TO_LOAD:
                break

shuffle(utterances)

# size of what was loaded: number of utterances and length of the longest one
print('Utterances:', len(utterances))
print('Longest one (chars):', max(map(len, utterances)))

A function that returns tuples of text and punctuation, using `isalpha()` and `isdigit()` to tell them apart and accepting multiple characters for the punctuation.

In [None]:
def words_punctuation_tuples(text: str):
    """Split a text into (word, following punctuation) pairs.

    A word is a maximal run of characters for which ``isalpha()`` or
    ``isdigit()`` is true; the punctuation is everything (spaces included)
    between that word and the next one. The generator always yields at least
    one pair: a text starting with punctuation yields an empty word first,
    and a text ending with a word yields an empty punctuation last.
    """
    inside_word = True
    word_begin = 0
    punct_begin = 0

    for pos, ch in enumerate(text):
        is_word_char = ch.isalpha() or ch.isdigit()
        if inside_word and not is_word_char:
            # the word is over, the punctuation starts here
            inside_word = False
            punct_begin = pos
        elif not inside_word and is_word_char:
            # the punctuation is over: emit the finished pair, start a new word
            inside_word = True
            yield text[word_begin:punct_begin], text[punct_begin:pos]
            word_begin = pos
        # otherwise the current word or punctuation simply grows by one char

    if inside_word:
        # the text ended inside a word, so there is no trailing punctuation
        yield text[word_begin:], ''
    else:
        # the text ended with punctuation
        yield text[word_begin:punct_begin], text[punct_begin:]

In [None]:
# Show the (word, punctuation) pairs extracted from the first utterance.
for word, punct in words_punctuation_tuples(utterances[0]):
    print(f"'{word}', '{punct}'")
    

In [None]:
from collections import Counter

# Number of distinct punctuation strings kept as one-hot output classes;
# the rarer ones are ignored.
KNOWN_PUNCTUATIONS = 40

# Count how often each punctuation string follows a word across the corpus.
punctuation = Counter(
    punct
    for ut in utterances
    for _, punct in words_punctuation_tuples(ut)
)
total_tokens = sum(punctuation.values())
print(f'Total tokens: {total_tokens}')
print(f'Will use the {KNOWN_PUNCTUATIONS} most common, which are:')

punct_categories = []
total_partial = 0
for symbol, occurrences in punctuation.most_common(KNOWN_PUNCTUATIONS):
    punct_categories.append(symbol)
    total_partial += occurrences
    # newlines are escaped only for display, the category keeps the raw string
    printable = symbol.replace('\n', '\\n')
    share = occurrences * 100 / total_tokens
    print(f"Symbol \t'{printable}'\t appears {occurrences} \t times"
          f' ({share:0.2f}%)')
print('---')
print(f'Eventually covering {total_partial} out of {total_tokens}'
      f' ({total_partial * 100 / total_tokens:0.2f}%)')

In [None]:
# word frequencies; words are counted exactly as written (case is preserved)
words_freq = Counter(
    word
    for ut in utterances
    for word, _ in words_punctuation_tuples(ut)
)

In [None]:
# Width of the vector that represents a single token.
# With one-hot encoding it is the number of most common words that get their
# own column (the other words are ignored); with embeddings it is the size of
# the embedding (100) plus the 2 values used to encode the casing.
TOKEN_INPUT_SIZE = 100 + 2

In [None]:
from sklearn.preprocessing import OneHotEncoder
import numpy as np


def _dense_one_hot_encoder(**encoder_kwargs):
    """Build a OneHotEncoder that returns dense arrays.

    scikit-learn 1.2 renamed the ``sparse`` argument to ``sparse_output`` and
    1.4 removed the old name, so the new spelling is tried first and the old
    one is used only on releases that do not accept it.
    """
    try:
        return OneHotEncoder(sparse_output=False, **encoder_kwargs)
    except TypeError:
        return OneHotEncoder(sparse=False, **encoder_kwargs)


# the TOKEN_INPUT_SIZE most frequent words, each one gets a one-hot column
most_common_words = words_freq.most_common(TOKEN_INPUT_SIZE)
token_categories = [w for w, _ in most_common_words]
total_covered = sum(c for _, c in most_common_words)
print(f'Most common five words: {words_freq.most_common(5)}')
print(f'With {TOKEN_INPUT_SIZE} we cover {total_covered} tokens out of {total_tokens}'
      f', ({total_covered * 100 / total_tokens:0.2f}% of the total)')

# map them to non-sparse one-hot encoding
oh_enc_words = _dense_one_hot_encoder(
    handle_unknown='ignore', # return all 0 in case of unknown string, no error
    categories='auto', # categories are taken from the data passed to fit
    dtype=np.float32, # use float32 or pytorch will convert to double
)

# only the fitted state is needed here, so the encoded matrix is not computed
oh_enc_words.fit(np.array(token_categories).reshape(-1, 1))

# show that it works both ways
tokens = np.array(['il', 'che']).reshape(-1, 1)
encoded = oh_enc_words.transform(tokens)
decoded = oh_enc_words.inverse_transform(encoded)
print(f'Original: {tokens}')
print(f'Reconstructed: {decoded}')
assert np.array_equal(decoded, tokens)

# show that it ignores unknown tokens
tokens = np.array(['fakewordnotexisting']).reshape(-1, 1)
encoded = oh_enc_words.transform(tokens)
assert np.all(encoded == 0)

del encoded, decoded, tokens

# same with punctuation

oh_enc_punct = _dense_one_hot_encoder(
    handle_unknown='ignore',
    categories='auto' # categories are taken from the data passed to fit
)
oh_enc_punct.fit_transform(np.array(punct_categories).reshape(-1, 1))


Let's define a function to encode words this way, but then let's encode using word embeddings. I use my own GloVe data for Italian, cased, available here: https://github.com/jacopofar/glove-tools/releases

The function based on one-hot encoding is still available and can be used by changing the `ENCODER_FUNCTION` variable accordingly.

In [None]:
# Maps each token found in the vectors file to its float32 vector.
EMBEDDINGS = {}

with gzip.open('vector_it_100.txt.gz', 'rb') as vectors_file:
    # every line is split on whitespace: the token first, then its components
    for fields in map(bytes.split, vectors_file):
        # note that float32 becomes a pytorch float, but float64 becomes a double!
        vector = np.array(list(map(float, fields[1:])), dtype=np.float32)
        EMBEDDINGS[fields[0].decode()] = vector

def encode_from_embeddings(tokens):
    """Encode tokens as their embedding followed by two casing flags.

    The embedding is looked up by the lowercased token. A token without an
    embedding becomes a row of TOKEN_INPUT_SIZE zeros (casing flags included).
    The casing flags are (0, 0) for lowercase, (1, 0) for capitalized,
    (1, 1) for uppercase and (0, 1) for any other casing.

    Returns a float32 array with one row per token.
    """
    rows = []
    for token in tokens:
        lowered = token.lower()
        if lowered not in EMBEDDINGS:
            rows.append(np.zeros(TOKEN_INPUT_SIZE))
            continue

        if token == lowered:
            casing = (0, 0)
        elif token == token.capitalize():
            casing = (1, 0)
        elif token == token.upper():
            casing = (1, 1)
        else:
            # mixed casing that matches none of the patterns above
            casing = (0, 1)

        meta = np.array(casing, dtype=np.float32)
        rows.append(np.hstack((EMBEDDINGS[lowered], meta)))
    return np.array(rows, dtype=np.float32)
        
def encode_as_one_hot(tokens):
    """Encode tokens with the fitted one-hot word encoder, one row per token."""
    column = np.array(tokens).reshape(-1, 1)
    return oh_enc_words.transform(column)


# Sanity check: both encoders must produce matrices of the same shape.
test_words = ['il', 'mio', 'fakewordsnotexisting']
one_hot_sample = encode_as_one_hot(test_words)
print(one_hot_sample)
embedding_sample = encode_from_embeddings(test_words)
print(embedding_sample)

assert one_hot_sample.shape == embedding_sample.shape

# encoder used by the following cells to turn tokens into vectors
ENCODER_FUNCTION = encode_from_embeddings

Now an utterance can be transformed into a list of fixed-length features and corresponding outputs.

To do this, for every token a fixed window of tokens is exported:

In [None]:
# short sample sentence used to try out the feature extraction
DEMO_TEXT = 'e ora che faccio?'

# Each feature row is a window of WINDOW_LEFT_SIZE + WINDOW_RIGHT_SIZE
# consecutive token vectors; the same two sizes are the number of padding
# rows added before and after the utterance.
WINDOW_LEFT_SIZE = 7 # words before the punctuation element
WINDOW_RIGHT_SIZE = 5 # words after the punctuation element

def utterance_to_features_set(utterance: str):
    """Turn an utterance into per-token feature rows and punctuation targets.

    The utterance is split into (word, punctuation) pairs, the words are
    encoded with ENCODER_FUNCTION, and the resulting matrix is padded with
    WINDOW_LEFT_SIZE rows of ones before and WINDOW_RIGHT_SIZE rows of ones
    after. Row ``i`` of X is the flattened window of
    WINDOW_LEFT_SIZE + WINDOW_RIGHT_SIZE consecutive padded rows starting at
    padded row ``i``, i.e. the WINDOW_LEFT_SIZE words before word ``i``
    followed by word ``i`` and the next WINDOW_RIGHT_SIZE - 1 words.

    NOTE(review): with this alignment the word right before the punctuation
    sits in the right part of the window and the last padding row is never
    read, which looks like an off-by-one with respect to the window
    constants' comments. It is left unchanged because already trained
    weights expect this layout - confirm before changing it.

    Returns:
        A tuple ``(X, Y)`` of float32 arrays with one row per pair. X has
        TOKEN_INPUT_SIZE * (WINDOW_LEFT_SIZE + WINDOW_RIGHT_SIZE) columns,
        Y has KNOWN_PUNCTUATIONS columns holding the one-hot encoded
        punctuation (all zeros for a punctuation unknown to the encoder).
    """
    tokens = list(words_punctuation_tuples(utterance))
    window_size = WINDOW_LEFT_SIZE + WINDOW_RIGHT_SIZE

    # fill with 2 just to later check that nothing is left unassigned
    X = np.full(
        (len(tokens), TOKEN_INPUT_SIZE * window_size),
        2, dtype=np.float32)

    words_vectors = ENCODER_FUNCTION([w for w, _ in tokens])
    # float32 padding: float64 ones would upcast the whole stacked matrix,
    # only for it to be cast back when copied into X
    words_vectors = np.vstack((
        np.ones((WINDOW_LEFT_SIZE, TOKEN_INPUT_SIZE), dtype=np.float32),
        words_vectors,
        np.ones((WINDOW_RIGHT_SIZE, TOKEN_INPUT_SIZE), dtype=np.float32),
    ))

    for idx in range(len(tokens)):
        X[idx, :] = words_vectors[idx: idx + window_size, :].reshape(-1)

    # Encode every punctuation mark with a single transform call instead of
    # one call per token; assigning into a preallocated array keeps the
    # float32 dtype and the KNOWN_PUNCTUATIONS width check.
    Y = np.empty((len(tokens), KNOWN_PUNCTUATIONS), dtype=np.float32)
    Y[:, :] = oh_enc_punct.transform(
        np.array([punct for _, punct in tokens]).reshape(-1, 1))
    return X, Y

X, Y = utterance_to_features_set(DEMO_TEXT)
# Check that the placeholder value 2 was not left anywhere. The comparison is
# element-wise because looking only at the maximum would miss a leftover 2
# whenever the matrix also contains a larger value.
assert not np.any(X == 2.0)
    


Conversely, I define a helper that rebuilds and shows the punctuation from the same Y array, so that the result of a model can be seen immediately.

In [None]:
# features and one-hot punctuation of the demo sentence; Y is turned back
# into text at the end of this cell
X, Y = utterance_to_features_set(DEMO_TEXT)
def punct_to_text(utterance: str, Y: np.array) -> (str, int, int):
    """Rebuild the utterance using the punctuation encoded in Y.

    Y holds one row per (word, punctuation) pair of the utterance and is
    decoded with the punctuation one-hot encoder. Each word is followed by
    its decoded punctuation, or by '[???]' when the decoded value is None.

    Returns a tuple with the rebuilt text, the number of decoded
    punctuations equal to the original ones, and the number of rows in Y.
    """
    decoded_rows = oh_enc_punct.inverse_transform(Y)
    pieces = []
    matches = 0
    pairs = zip(words_punctuation_tuples(utterance), decoded_rows)
    for (word, expected), row in pairs:
        guess = row[0]
        pieces.append(word)
        pieces.append('[???]' if guess is None else guess)
        if guess == expected:
            matches += 1
    return ''.join(pieces), matches, len(decoded_rows)

# round trip of the demo sentence through its own one-hot punctuation
demo_round_trip = punct_to_text(DEMO_TEXT, Y)
print(demo_round_trip)


Let's try to predict some punctuation using a sample text as training and a 2-NN as model

In [None]:
from sklearn.neighbors import KNeighborsRegressor
# toy model: 2 nearest neighbours fitted on a single utterance
neigh = KNeighborsRegressor(n_neighbors=2)

train_text = utterances[0]
print('training text:\n', train_text)
X_train, Y_train = utterance_to_features_set(train_text)
neigh.fit(X_train, Y_train)

# predict the punctuation of a different utterance
predict_text = utterances[2]
X_predict, Y_predict = utterance_to_features_set(predict_text)
Y = neigh.predict(X_predict)
print('\nprediction text:\n', predict_text)
print('---')
print('\npredicted punctuation:\n', punct_to_text(predict_text, Y))

In [None]:
from datetime import datetime

from sklearn.ensemble import RandomForestClassifier

# number of utterances, taken from the end of the list, used for validation
VALIDATION_UTTERANCES = 3000
# number of utterances per training chunk; also the growth step of the
# training set when the model is refitted from scratch
PARTIAL_FIT_CHUNK_SIZE = 1000

def measure_model(model, partial_fit=None):
    """Train and predict many times and show the accuracy.

    The last VALIDATION_UTTERANCES utterances are used for validation, the
    others for training. With an incremental fit the model receives
    consecutive chunks of PARTIAL_FIT_CHUNK_SIZE utterances through
    ``partial_fit``; otherwise it is refitted from scratch on a prefix of
    the training utterances that grows by PARTIAL_FIT_CHUNK_SIZE each round.
    After every round the accuracy on the validation utterances is printed
    and appended, tab separated, to 'score.csv'.

    Args:
        model: estimator exposing ``fit``/``predict`` and, for the
            incremental mode, ``partial_fit``.
        partial_fit: enforces (True) or disables (False) the incremental
            fit; if left None the behavior is decided based on the
            availability of the ``partial_fit`` attribute.
    """
    if partial_fit is None:
        partial_fit = hasattr(model, 'partial_fit')

    # utterances from this index on are reserved for validation
    training_limit = len(utterances) - VALIDATION_UTTERANCES

    if partial_fit:
        # the end of the last chunk is clamped so that it never reaches into
        # the validation utterances
        training_ranges = [
            (i, min(i + PARTIAL_FIT_CHUNK_SIZE, training_limit))
            for i in range(0, training_limit, PARTIAL_FIT_CHUNK_SIZE)
        ]
    else:
        training_ranges = [
            (0, i)
            for i in range(10, training_limit, PARTIAL_FIT_CHUNK_SIZE)
        ]

    validation_set = utterances[-VALIDATION_UTTERANCES:]

    for start_idx, end_idx in training_ranges:
        # collect the per-utterance matrices and stack them once: stacking
        # after every utterance copies the whole accumulated array each time
        feature_chunks = []
        target_chunks = []
        for ut in utterances[start_idx:end_idx]:
            X, Y = utterance_to_features_set(ut)
            feature_chunks.append(X)
            target_chunks.append(Y)
        X_train = np.vstack(feature_chunks)
        Y_train = np.vstack(target_chunks)

        print(f'{datetime.now().isoformat()} Training with range'
              f' {(start_idx, end_idx)}, partial fit: {partial_fit}')
        if partial_fit:
            model.partial_fit(X_train, Y_train)
        else:
            model.fit(X_train, Y_train)

        total_ok = 0
        total_in_validation = 0

        for ut in validation_set:
            X, _ = utterance_to_features_set(ut)
            Y = model.predict(X)
            reconstructed, ok, total = punct_to_text(ut, Y)

            total_in_validation += total
            total_ok += ok

        accuracy = total_ok * 100 / total_in_validation
        print(f'{datetime.now().isoformat()} - After training until utterance {end_idx}'
              f' there were {total_ok}/{total_in_validation} correct values'
              f', ({accuracy:0.3f}% of the total)')
        with open('score.csv', 'a') as f:
            f.write(f'{datetime.now().isoformat()}\t{end_idx}\t{accuracy:0.4f}\n')


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsRegressor
from sklearn.tree import DecisionTreeRegressor

# Each entry pairs the label printed before a run with a factory, so that
# every model is built right before it is measured.
baseline_runs = (
    ('K-neighbor with 3 neighbors:',
     lambda: KNeighborsRegressor(n_neighbors=3, n_jobs=-1)),
    ('Decision tree:',
     lambda: DecisionTreeRegressor()),
    ('K-neighbor with 5 neighbors:',
     lambda: KNeighborsRegressor(n_neighbors=5, n_jobs=-1)),
    ('Random Forest with 5 trees:',
     lambda: RandomForestClassifier(n_estimators=5, n_jobs=-1)),
)

for label, build_model in baseline_runs:
    print(label)
    measure_model(build_model())





Now let's try with neural networks using skorch.
First we define a network with two hidden layers of 40 units each, using tanh as the activation function, with an additional ReLU applied right before the output layer.

Then, skorch wraps it and makes it possible to use it as a scikit-learn model.

In [None]:
from torch import nn
from torch import tanh
import torch.nn.functional as F
from skorch import NeuralNetRegressor

class RegressorModule(nn.Module):
    """Feed-forward network scoring each known punctuation for a window.

    The input is a flattened window of
    TOKEN_INPUT_SIZE * (WINDOW_LEFT_SIZE + WINDOW_RIGHT_SIZE) values and the
    output has KNOWN_PUNCTUATIONS values. Two linear layers, each followed by
    ``nonlin``, feed a ReLU and then the linear output layer.
    """

    def __init__(
            self,
            num_units=40,
            nonlin=tanh,
    ):
        super().__init__()
        self.num_units = num_units
        self.nonlin = nonlin

        input_size = TOKEN_INPUT_SIZE * (WINDOW_LEFT_SIZE + WINDOW_RIGHT_SIZE)
        hidden_size = 40

        self.dense0 = nn.Linear(input_size, num_units)
        self.dense1 = nn.Linear(num_units, hidden_size)
        # NOTE(review): dense2 is never called by forward(); it is kept
        # because it is still a registered layer of the module, so dropping
        # it would presumably break loading previously saved parameters.
        self.dense2 = nn.Linear(num_units, hidden_size)
        self.output = nn.Linear(hidden_size, KNOWN_PUNCTUATIONS)

    def forward(self, X, **kwargs):
        """Return the output layer values for the batch X."""
        hidden = self.nonlin(self.dense0(X))
        hidden = self.nonlin(self.dense1(hidden))
        return self.output(F.relu(hidden))

from pathlib import Path

net_regr = NeuralNetRegressor(
    RegressorModule,
    max_epochs=30,
    lr=0.003,
)

# the net has to be initialized before saved weights can be loaded into it
net_regr.initialize()

# Resume from previously saved weights only when the file exists: on a first
# run nothing has been saved yet and loading unconditionally would fail.
if Path('punctuation_weights.pkl').exists():
    net_regr.load_params(f_params='punctuation_weights.pkl')
else:
    print('No saved weights found, training from scratch')

print('Neural network:')
measure_model(net_regr)

In [None]:
from random import choice

# Characters replaced by '-' before predicting: most of the punctuation is
# removed to ensure that there's no side channel or bug allowing the model
# to "cheat".
masked_chars = str.maketrans(dict.fromkeys('.,;’?!', '-'))

for _ in range(10):
    ut = choice(utterances)
    print('Original:')
    print(ut)
    ut = ut.translate(masked_chars)

    X, _ = utterance_to_features_set(ut)
    Y = net_regr.predict(X)
    reconstructed, ok, total = punct_to_text(ut, Y)
    print('Reconstructed:')
    print(reconstructed)
    print('\n---\n')

In [None]:
# store the current parameters of the network in 'punctuation_weights.pkl'
net_regr.save_params(f_params='punctuation_weights.pkl')

In [None]:
# Predict the punctuation of a hand-written sentence and print the result.
ut = "hey amico mi dici qual'è il problema?"
X, _ = utterance_to_features_set(ut)
# the call was split across two lines as `net_reg` / `r.predict(X)`, which
# fails with a NameError; it is the prediction of the trained network
Y = net_regr.predict(X)
reconstructed, ok, total = punct_to_text(ut, Y)
print(reconstructed)