---
title: "Dependency locality and information locality in natural language"
authors:
    - name: "Philippos Triantafyllou"
    - name: "Mikolaj Golecki"
    - name: "Benoît Crabbé (for the dependency trees)"
date: last-modified
toc: true
embed-resources: true
---

## Introduction

In [49]:
import random
import torch
import numpy as np
from typing import List, Dict, Tuple

Wet set a global random seed for reproducibility.

In [3]:
random.seed(42)
np.random.seed(42)

torch.manual_seed(42)

if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

## Naive tokenizer

Simple tokenizer class. Roughly resembles HuggingFace tokenizer.

In [4]:
import torch
from typing import List

class NaiveTokenizer:
  
    def __init__(self, base_vocabulary, unk='<unk>', pad='<pad>'):
        assert(type(base_vocabulary) == list)
        self.unk = unk
        self.pad = pad
        self.vocabulary = []
        self.types2idx  = {}
        self.add_tokens([self.unk, self.pad] + base_vocabulary)


    def add_tokens(self, tokens):
        if not type(tokens) == list:
            tokens = [tokens]

        for token in tokens:
            if token not in self.vocabulary:
                self.vocabulary.append(token)

        self.types2idx = {el: idx for idx, el in enumerate(self.vocabulary)}


    def tokenize(self, string: str):
        tokens = string.split()
        return tokens


    def convert_tokens_to_ids(self, tokens: List[str]):
        unkid = self.types2idx[self.unk]
        return [self.types2idx.get(token, unkid) for token in tokens]


    def encode(self, string):
        tokens = self.tokenize(string)
        return self.convert_tokens_to_ids(tokens)


    def decode(self, ids):
        tokens = [self.vocabulary[idx] for idx in ids]
        return " ".join(tokens)


    def __call__(self, string):
        return self.encode(string)


    @property
    def pad_id(self):
        return self.types2idx[self.pad]


    @property
    def vocab_size(self):
        return len(self.vocabulary)


    def decode_ngram(self, ngram_sequence):
        return self.decode(ngram[-1] for ngram in ngram_sequence)


    def pad_batch(self, batch_codes: List[List[int]]):
        max_len = max([len(sentence) for sentence in batch_codes])
        padded_codes = [
            sentence + [self.pad_id] * (max_len - len(sentence))
            for sentence in batch_codes
        ]
        return torch.LongTensor(padded_codes)

In [9]:
base_vocabulary = "Language models are cool ."
tokenizer = NaiveTokenizer(base_vocabulary.split())
codes = tokenizer("Language models are not so cool .")
decoded = tokenizer.decode(codes)
tokens = tokenizer.tokenize("Language models are not so cool .")

In [12]:
#| echo: false

print("Base vocab:", [base_vocabulary])
print("Tokens:", tokens)
print("Codes:", codes)
print("Decoded:", decoded)
print("Tokens:", tokens)

Base vocab: ['Language models are cool .']
Tokens: ['Language', 'models', 'are', 'not', 'so', 'cool', '.']
Codes: [2, 3, 4, 0, 0, 5, 6]
Decoded: Language models are <unk> <unk> cool .
Tokens: ['Language', 'models', 'are', 'not', 'so', 'cool', '.']


We can give two sentences and batch.

In [16]:
sentences = ["Language models are cool .", "Language models are not so cool ."]
batch = [tokenizer(sentence) for sentence in sentences]

In [17]:
#| echo: false

print(batch)

[[2, 3, 4, 5, 6], [2, 3, 4, 0, 0, 5, 6]]


We see that the first sentence is shorter than the second one. We need to pad. Padding also converts the batch to a tensor. 

In [18]:
#| echo: false

print(tokenizer.pad_batch(batch))

tensor([[2, 3, 4, 5, 6, 1, 1],
        [2, 3, 4, 0, 0, 5, 6]])


## Dataloader

For the training/test sets of the UD corpora:

- Vocabulary to be passed to the tokenizer is a list of words;
- Data to be passed to the dataloader is a list of strings (examples);
- Dedicated function handles this later on.

We can see in the `ngramify` method that we also pad with the length of the ngram size - 1 in order to work around the common problem of ngram models: they use the first tokens as context, and don't predict them.

In [19]:
from torch.utils.data import Dataset, DataLoader

class NgramsLanguageModelDataSet(Dataset):

    def __init__(self, N: int, data: List[str], tokenizer):
        self.N = N
        self.tokenizer = tokenizer
        self.data = [self.ngramify(tokenizer(sentence)) for sentence in data]
        self.data = [ngram for sent in self.data for ngram in sent if len(ngram) == self.N]

    def ngramify(self, token_list: List[str]):
        padded_tokens = [self.tokenizer.pad_id] * (self.N - 1) + token_list
        return [padded_tokens[i:i+self.N] for i in range(len(token_list))]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

In [20]:
def normalize(sentence: str):
    return sentence.replace(".", " . ")

Mock dataset.

In [22]:
zebra_dataset = """
There are five houses.
The Englishman lives in the red house.
The Spaniard owns the dog.
Coffee is drunk in the green house.
The Ukrainian drinks tea.
The green house is immediately to the right of the ivory house.
The Old Gold smoker owns snails.
Kools are smoked in the yellow house.
Milk is drunk in the middle house.
The Norwegian lives in the first house.
The man who smokes Chesterfields lives in the house next to the man with the fox.
Kools are smoked in the house next to the house where the horse is kept.
The Lucky Strike smoker drinks orange juice.
The Japanese smokes Parliaments.
The Norwegian lives next to the blue house.
I have lost my keys, yet now I found them.
"""

In [23]:
base_vocabulary = normalize(zebra_dataset).split()
data = [normalize(sent) for sent in zebra_dataset.split('\n')]
print("Vocabulary:", base_vocabulary[:10])
print("Dataset:", sentences)

Vocabulary: ['There', 'are', 'five', 'houses', '.', 'The', 'Englishman', 'lives', 'in', 'the']
Dataset: ['Language models are cool .', 'Language models are not so cool .']


We instantiate a tokenizer and ngramify our dataset, we can see that the prefix padding is successful.

In [24]:
tokenizer = NaiveTokenizer(base_vocabulary)
dataset = NgramsLanguageModelDataSet(5, data, tokenizer)

for ngram in dataset[:5]:
    print(ngram)

[1, 1, 1, 1, 2]
[1, 1, 1, 2, 3]
[1, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]


Pass it through the dataloader and pad to normalize example lengths.

In [26]:
dataloader = DataLoader(dataset, batch_size=10, shuffle=True, collate_fn=tokenizer.pad_batch)

for batch in dataloader:
    print(batch)
    break

tensor([[13, 18, 24, 25, 11],
        [60, 61, 62, 63, 57],
        [ 1,  1,  1, 36, 18],
        [25, 11, 56, 13,  6],
        [ 1,  1,  1,  1, 17],
        [19, 10, 11, 20, 13],
        [ 1,  1,  1,  7, 20],
        [38,  9, 10, 11, 39],
        [ 9, 10, 11, 12, 13],
        [ 1,  1,  1,  1, 57]])


## Neural network language model

In [27]:
from tqdm import tqdm

In [28]:
import torch.nn as nn
from torch.nn.functional import tanh, log_softmax

class NNLM (nn.Module):

    def __init__(self, emb_size, vocab_size, hidden_size, memory_size, pad_id):
        super().__init__()
        self.pad_id = pad_id
        self.wordemb = nn.Embedding(vocab_size, emb_size, padding_idx=pad_id)
        self.lm_in = nn.Linear(emb_size * memory_size, hidden_size)
        self.lm_out = nn.Linear(hidden_size, vocab_size)


    def forward(self, X):
        input_embeddings = self.wordemb(X)
        input_embeddings = torch.flatten(input_embeddings, start_dim=1)
        hidden_embeddings = self.lm_in(input_embeddings)
        logits = self.lm_out(tanh(hidden_embeddings))
        return logits


    def __call__(self, batch):
        model_device = next(self.parameters()).device
        batch = batch.to(model_device)

        X = batch[:,:-1]
        Y = batch[:,-1]
        logits = log_softmax(self.forward(X), dim=1)
        target_logits = torch.gather(logits, 1, Y.unsqueeze(1))
        return target_logits, logits


    def train(self, dataloader, epochs, device="cpu"): # type: ignore
        self.to(device)
        cross_entropy = nn.CrossEntropyLoss(ignore_index=self.pad_id)
        optimizer = torch.optim.AdamW(self.parameters(), lr=0.005)
        epoch_losses = []

        pbar = tqdm(range(1, epochs+1))
        for epoch in pbar:
            loss_list = []
            for batch in dataloader:
                X =  batch[:,:-1]
                Y =  batch[:,-1]
                X = X.to(device)
                Y = Y.to(device)
                logits = self.forward(X)
                loss = cross_entropy(logits,Y)
                loss.backward()
                loss_list.append(loss.item())
                optimizer.step()
                optimizer.zero_grad()

            avg_loss = sum(loss_list) / len(loss_list)
            epoch_losses.append(avg_loss)
            pbar.set_description(f"Epoch {epoch}")
            pbar.set_postfix(loss=avg_loss)
        return epoch_losses

In [31]:
lang_model = NNLM(128, tokenizer.vocab_size, 128, 4, tokenizer.pad_id)
losses = lang_model.train(dataloader, 100, device="cpu")

Epoch 100: 100%|██████████| 100/100 [00:00<00:00, 122.21it/s, loss=0.392]


Here we test our model on a mock test input.

In [32]:
test_example = ["The Lucky Strike smoker drinks orange juice ."]
test_set = NgramsLanguageModelDataSet(5, test_example, tokenizer)
test_loader = DataLoader(test_set, len(test_set), shuffle=False, collate_fn=tokenizer.pad_batch)

In [33]:
#| echo: false

for batch in test_loader:
    print(batch)

tensor([[ 1,  1,  1,  1,  7],
        [ 1,  1,  1,  7, 50],
        [ 1,  1,  7, 50, 51],
        [ 1,  7, 50, 51, 31],
        [ 7, 50, 51, 31, 22],
        [50, 51, 31, 22, 52],
        [51, 31, 22, 52, 53],
        [31, 22, 52, 53,  6]])


We can view the surprisal values for each predicted token.

In [34]:
for batch in test_loader:
    example = tokenizer.decode_ngram(batch)
    surprisals, logits = lang_model(batch)
    print(example)
    print(list(zip(example.split(),(-(surprisals)).tolist())))

The Lucky Strike smoker drinks orange juice .
[('The', [0.26668089628219604]), ('Lucky', [2.749154806137085]), ('Strike', [0.0012287693098187447]), ('smoker', [0.0006437613046728075]), ('drinks', [0.0007314390386454761]), ('orange', [0.0004146431456319988]), ('juice', [0.00056429672986269]), ('.', [0.00017021637177094817])]


## Recurrent neural network language model

In [35]:
import torch.nn as nn
from torch.nn.functional import log_softmax

class myLSTM (nn.Module):

    def __init__(self, emb_size, vocab_size, hidden_size, pad_id):
        super().__init__()
        self.pad_id = pad_id
        self.wordemb  = nn.Embedding(vocab_size, emb_size, padding_idx=pad_id)
        self.lstm     = nn.LSTM(emb_size, hidden_size, batch_first=True)
        self.lm_out   = nn.Linear(hidden_size, vocab_size)


    def enable_eval_mode(self):
        return super().train(False)


    def forward(self, X):
        input_embeddings = self.wordemb(X)
        hidden_embeddings, _ = self.lstm(input_embeddings)
        latest_hidden = hidden_embeddings[:,-1,:]
        logits = self.lm_out(latest_hidden)
        return logits


    def __call__(self, batch):
        model_device = next(self.parameters()).device
        batch = batch.to(model_device)
        X = batch[:,:-1]
        Y = batch[:,-1]
        logits = log_softmax(self.forward(X),dim=1)
        target_logits = torch.gather(logits, 1, Y.unsqueeze(1))
        return target_logits, logits


    def train(self, dataloader, epochs, device="cpu"): # type: ignore
        self.to(device)
        cross_entropy = nn.CrossEntropyLoss(ignore_index=self.pad_id)
        optimizer = torch.optim.AdamW(self.parameters(), lr=0.005)
        epoch_losses = []

        pbar = tqdm(range(1, epochs+1))
        for epoch in pbar:
            loss_list = []
            for batch in dataloader:
                X = batch[:,:-1]
                Y = batch[:,-1]
                X = X.to(device)
                Y = Y.to(device)
                logits = self.forward(X)
                loss = cross_entropy(logits,Y)
                loss.backward()
                loss_list.append(loss.item())
                optimizer.step()
                optimizer.zero_grad()

            avg_loss = sum(loss_list) / len(loss_list)
            epoch_losses.append(avg_loss)
            pbar.set_description(f"Epoch {epoch}")
            pbar.set_postfix(loss=avg_loss)
        return epoch_losses

In [37]:
lang_model = myLSTM(128, tokenizer.vocab_size, 128, tokenizer.pad_id)
losses = lang_model.train(dataloader, 100, device="cpu")

Epoch 100: 100%|██████████| 100/100 [00:01<00:00, 53.76it/s, loss=0.416]


With the exact same test input as before, we can see how the surprisals change when we use an LSTM.

In [38]:
for batch in test_loader:
    example = tokenizer.decode_ngram(batch)
    surprisals, logits = lang_model(batch)
    print(example)
    print(list(zip(example.split(),(-(surprisals)).tolist())))

The Lucky Strike smoker drinks orange juice .
[('The', [0.507267415523529]), ('Lucky', [2.672044277191162]), ('Strike', [0.0005697772721759975]), ('smoker', [0.0004401430196594447]), ('drinks', [0.0006123098428361118]), ('orange', [0.000773250067140907]), ('juice', [0.0005809764843434095]), ('.', [0.00013517419574782252])]


## Universal dependencies datasets

In [54]:
import os

os.makedirs("data/train", exist_ok=True)
os.makedirs("data/test", exist_ok=True)

In [56]:
ud_languages = [
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_English-GUM/refs/heads/master/en_gum-ud-train.conllu", "english"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_German-HDT/refs/heads/master/de_hdt-ud-train-a-1.conllu", "german"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Greek-GDT/refs/heads/master/el_gdt-ud-train.conllu", "greek"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Polish-MPDT/refs/heads/master/pl_mpdt-ud-train.conllu", "polish"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Hungarian-Szeged/refs/heads/master/hu_szeged-ud-train.conllu", "hungarian1"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Hungarian-Szeged/refs/heads/dev/hu_szeged-ud-dev.conllu", "hungarian2"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Latin-ITTB/refs/heads/master/la_ittb-ud-train.conllu", "latin"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Arabic-PADT/refs/heads/master/ar_padt-ud-train.conllu", "arabic"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_French-GSD/refs/heads/master/fr_gsd-ud-train.conllu", "french"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Turkish-Atis/refs/heads/master/tr_atis-ud-train.conllu", "turkish"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Chinese-GSDSimp/refs/heads/master/zh_gsdsimp-ud-train.conllu", "mandarin")
]

for url, l_name in ud_languages:
    outpath = f"data/train/{l_name}.conllu"
    !wget -q "{url}" -O "{outpath}"

In [58]:
train_datasets["hungarian"] = train_datasets["hungarian1"] + train_datasets["hungarian2"]
del train_datasets["hungarian1"]
del train_datasets["hungarian2"]

NameError: name 'train_datasets' is not defined

In [57]:
ud_languages_test = [
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_English-GUM/refs/heads/master/en_gum-ud-test.conllu", "english"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_German-HDT/refs/heads/master/de_hdt-ud-test.conllu", "german"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Greek-GDT/refs/heads/master/el_gdt-ud-test.conllu", "greek"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Polish-MPDT/refs/heads/master/pl_mpdt-ud-test.conllu", "polish"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Hungarian-Szeged/refs/heads/master/hu_szeged-ud-test.conllu", "hungarian"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Latin-ITTB/refs/heads/master/la_ittb-ud-test.conllu", "latin"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Arabic-PADT/refs/heads/master/ar_padt-ud-test.conllu", "arabic"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_French-GSD/refs/heads/master/fr_gsd-ud-test.conllu", "french"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Turkish-Atis/refs/heads/master/tr_atis-ud-test.conllu", "turkish"),
    ("https://raw.githubusercontent.com/UniversalDependencies/UD_Chinese-GSDSimp/refs/heads/master/zh_gsdsimp-ud-test.conllu", "mandarin")
]

for url, l_name in ud_languages_test:
    outpath = f"data/test/{l_name}.conllu"
    !wget -q "{url}" -O "{outpath}"

### Methods

Here are the methods we use to format and process our datasets.

In case we do not have the conllu library.

In [42]:
!pip -q install conllu

Method to load a dataset from a `.conllu` file. Here we only store the fields that relate to dependencies but we can also store other information like PoS tags.

In [46]:
import conllu

def load_conllu(filename):
    for sent in conllu.parse_incr(open(filename, "r", encoding="utf-8")):
        sent = sent.filter(id=lambda x: isinstance(x, int))
        ids = [token["id"] for token in sent]
        deps = [token["head"] for token in sent]
        tokens = [token["form"] for token in sent]
        yield ids, deps, tokens

Method goes to the relevant path and loads the files in a dictionary where the keys are the different languages.

In [47]:
import os
import glob

def load_from_file(data_dir="."):
    datasets = {}
    for path in glob.glob(os.path.join(data_dir, "*.conllu")):
        lang = os.path.splitext(os.path.basename(path))[0]
        datasets[lang] = list(load_conllu(path))
        print(f"Loaded {lang}.conllu")
    return datasets

As we will train models on the datasets, we do not want to have datasets of varying lengths as this will bias the parameters.

In [48]:
def normalize_lengths(language_datasets):
    for lang, vals in language_datasets.items():

        filtered = [
            (ids, deps, tokens)
            for ids, deps, tokens in vals
            if len(tokens) >= 5
        ]

        if len(filtered) > 2000:
            filtered = random.sample(filtered, 2000)

        language_datasets[lang] = filtered
    return language_datasets

We then want to load dependency trees. For this we have a small `DependencyTree` class.

In [50]:
from random import shuffle

class DependencyTree:

    def __init__(self, tokens=None, edges=None):
        self.edges: List[Tuple[int, int]]  = [] if edges is None else edges
        self.tokens: List[str] = ["ROOT"] if tokens is None else tokens


    def copy(self):
        return DependencyTree(self.tokens[:], self.edges[:])


    def N(self):
        return len(self.tokens)


    def sum_dep_length(self):
        return sum( abs(dep_idx-gov_idx) for (gov_idx,dep_idx) in self.edges )


    def mean_dep_length(self):
        return np.mean([abs(dep_idx - gov_idx) for (gov_idx, dep_idx) in self.edges]).item()


    @property
    def sentence(self):
        return self.tokens[1:]


    def shuffle_random(self, new_order=None, seed=42):
        random.seed(seed)
        original_order = list(range(len(self.tokens)))

        if new_order:
            target_order = new_order
            original_order, target_order = target_order, original_order
        else:
            target_order = original_order[1:]
            shuffle(target_order)
            target_order = [0] + target_order

        dependency_mappings = dict(zip(original_order, target_order))

        self.edges = [
            (dependency_mappings[gov], dependency_mappings[dep])
            for gov, dep in self.edges
        ]

        new_tokens = ["NA"] * len(self.tokens)
        new_tokens[0] = self.tokens[0]

        for original_order in range(len(self.tokens)):
            new_tokens[dependency_mappings[original_order]] = self.tokens[original_order]
        self.tokens = new_tokens


    def shuffle_projective(self):
        children = {}
        for gov, dep in self.edges:
            deps = children.get(gov, [])
            deps.append(dep)
            children[gov] = deps

        for key in children:
            if key == 0:
                shuffle(children[key])
                children[key] = [0] + children[key]
            else:
                children[key].append(key)
                shuffle(children[key])

        stack = set([])
        new_order = [0]
        while len(new_order) < len(self.tokens):
            for idx, el in enumerate(new_order):
                if el not in stack:
                    stack.add(el)
                    left, right = new_order[:idx], new_order[idx+1:]
                    new_order = left + children.get(el, [el]) + right
                    break

        assert(len(new_order) == len(self.tokens))
        expected = set(range(len(self.tokens)))
        actual = set(new_order)
        assert(expected == actual)

        self.shuffle_random(new_order)

Two different read functions, one reads directly form the `.conllu` file, another form the loaded datasets. The second one is used.

In [51]:
def read_v1(data):
    for ids, deps, tokens in data:
        dep_tree = DependencyTree()

        for token in tokens:
            dep_tree.tokens.append(token)

        for gov, child in zip(deps, ids):
            dep_tree.edges.append((int(gov), int(child)))

        yield dep_tree


def read_v2(stream):
    exclude_chars =".-"
    dep_tree = DependencyTree()

    for line in stream:
        line = line.strip()
        if (line.isspace() or line == "" or line.startswith('#')):
            if dep_tree.N() > 1:
                yield dep_tree
                dep_tree = DependencyTree()
        else:
            idx, word, _, _, _, _, gov_idx, *_  = line.split()
            if not any(char in gov_idx for char in exclude_chars) and not any(char in idx for char in exclude_chars):
                dep_tree.tokens.append(word)
                dep_tree.edges.append((int(gov_idx), int(idx)))

    return dep_tree

Convert to trees.

In [53]:
def convert_to_trees(data, links, is_stream=False):
    lang_trees = {}

    if is_stream:
        for _, lang in links:
            lang_trees[lang] = list(read_v2(open(f"{lang}.conllu", 'r', encoding='utf-8')))

    else:
        for lang, vals in data.items():
            lang_trees[lang] = list(read_v1(vals))

    return lang_trees

### Pipeline

We will install train and test datasets from universal dependencies.