# Language Identification based on deep neural networks with ngrams
This approach of language identification follows the paper: Language Identification a Neural Network Approach, https://core.ac.uk/download/pdf/62918899.pdf

Feature extraction is partly inspired by:
https://github.com/conorosully/medium-articles/blob/master/src/language_classification.ipynb

In [1]:
# Specify whether model should train or be loaded from file

TRAINING = False

## Prepare dataset
The data can be downloaded from: https://downloads.tatoeba.org/exports/

In [2]:
# imports

import pandas as pd
from torch.utils.data import Dataset
from sklearn.feature_extraction.text import CountVectorizer
import torch
import numpy as np

In [3]:
# Set data parameters

FILES = {'SENTENCES': 'sentences.csv'}
MIN_SENTENCE_LENGTH = 20
MAX_SENTENCE_LENGTH = 200
MAX_SAMPLES = 5000
LANG = ['deu', 'eng', 'fra']
MAX_FEATURES = 40
N = 3

In [4]:
# Define helper functions

# get most frequent ngrams for a specific language
def _get_ngrams(corpus):
    vectorizer = CountVectorizer(analyzer='char',
                                ngram_range=(N, N),
                                max_features=MAX_FEATURES)
    
    X = vectorizer.fit_transform(corpus)
    
    feature_names = vectorizer.get_feature_names()
    return X, feature_names

# get set of most frequent ngrams for every language
def _get_vocab(data):
    
    vocab = set()
        
    # get ngrams for every language
    for l in LANG:
        corpus = data[data.lang==l]['text']
        _, ngrams = _get_ngrams(corpus)
        vocab.update(ngrams)
    
    return vocab

def _count_ngrams(data, vocab):
    # Get data based on vocab
    vectorizer = CountVectorizer(analyzer='char',
                        ngram_range=(N, N),
                        vocabulary=vocab)

    X = vectorizer.transform(data['text'])
    feature_names = vectorizer.get_feature_names()
    features = pd.DataFrame(data=X.toarray(), columns=feature_names)

    return features

# Get the ordinal representation of a language, given a language identifier
def _langToIndex(l):
    return int(LANG.index(l))

# Get the text representation of a language, given the ordinal language value
def _indexToLang(i):
    return LANG[i]


# Get the ordinal representation of a language tensor
def _lineToTensor(line):
    tensor = torch.zeros(len(line))
    for li, l in enumerate(line):
        tensor[li] = _langToIndex(l)
        tensor = tensor.long()
    return tensor

def _normalize(features):
    # Normalize matrix
    count_min = features.min()
    count_max = features.max()

    # replace zero values to prevent division by zero
    diff = (count_max - count_min).replace(0, 1)

    features = (features - count_min) / diff
    return features

In [5]:
# Setup dataset generates a dataset fromm the specified csv_file and parameters
# The dataset consists of a normalized frequency count of occurences of ngrams
def _setup_datasets(csv_file, vocab=None):
    data_frame = pd.read_csv(csv_file,
              sep='\t',
              encoding='utf8',
              index_col=0,
              names=['lang', 'text'])

    # Filter text by length
    filter_len = [True if MIN_SENTENCE_LENGTH <= len(t)
                  <= MAX_SENTENCE_LENGTH else False for t in data_frame['text']]
    data_frame = data_frame[filter_len]

    # Filter text by language
    filter_lang = [True if l in LANG else False for l in data_frame['lang']]
    data_frame = data_frame[filter_lang]
    
    # Sample data per language
    data_trim = pd.DataFrame(columns=['lang', 'text'])
    
    for l in LANG:
        n_samples = min(MAX_SAMPLES, len(data_frame[data_frame['lang']==l]))
        lang_trim = data_frame[data_frame['lang']==l].sample(n_samples, random_state = 100)
        data_trim = data_trim.append(lang_trim)

    # Setup vocab from ngrams
    if vocab is None:
        vocab = _get_vocab(data_trim)

    # Get data based on vocab
    features = _count_ngrams(data_trim, vocab)

    # Normalize matrix
    _normalize(features)

    labels = _lineToTensor(list(data_trim['lang']))
    data = torch.tensor(features.values.astype(np.float32))

    return LanguageIdentificationDataset(data, labels, vocab)

In [6]:
# Define Dataset for Language Identification

class LanguageIdentificationDataset(Dataset):
    """Dataset with text in various languages"""
    
    def __init__(self, data, labels, vocab):
        
        self._data = data
        self._labels = labels
        self._vocab = vocab
        
    def __len__(self):
        return len(self._data)
    
    def __getitem__(self, i):
        return self._data[i], self._labels[i]
    
    def __iter__(self):
        for x in self._data:
            yield x
            
    def get_labels(self):
        return self._labels
    
    def get_vocab(self):
        return self._vocab
    
    def get_lang(self):
        return self._lang

In [7]:
# Generate specific datasets

# The sentences dataset consists of sentences in different languages
def Sentences():
    return _setup_datasets(FILES['SENTENCES'])

# Load a custom dataset by specifying a csv file.
# The csv file should have columns ['text', 'lang']
def CustomDataset(csv_file, vocab):
    return _setup_datasets(csv_file, vocab=vocab)

## Load data
Loading the dataset might take a while

In [8]:
# Select dataset

if TRAINING:

    dataset = Sentences()
    
    # dataset = CustomDataset(small.csv)

## Modelling

In [9]:
# imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [10]:
class Network(nn.Module):
    def __init__(self, vocab):
        super(Network, self).__init__()
        self.vocab = vocab
        dim_input = len(vocab)
        dim_output = len(LANG)
        self.fc1 = nn.Linear(dim_input, dim_input)
        self.fc2 = nn.Linear(dim_input, dim_input)
        self.fc3 = nn.Linear(dim_input, dim_output)
        
    def forward(self, x):
        x = F.sigmoid(self.fc1(x))
        x = F.sigmoid(self.fc2(x))
        x = F.softmax(self.fc3(x))
        return x

In [11]:
def train_step(model, dataset_train, optimizer, criterion):
    
    train_loss = 0
    train_acc = 0
    
    data = torch.utils.data.DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)
    for i, (inp, lab) in enumerate(data):
        optimizer.zero_grad()
        pred = model(inp)
        loss = criterion(pred, lab)
        train_loss += loss.item()
        loss.backward()
        optimizer.step()
        train_acc += (pred.argmax(1) == lab).sum().item()

    return train_loss / len(dataset_train), train_acc / len(dataset_train)

In [12]:
def test(model, dataset_val):
    criterion = nn.CrossEntropyLoss()
    
    val_loss = 0
    val_acc = 0
    
    data = torch.utils.data.DataLoader(dataset_val, batch_size=BATCH_SIZE, shuffle=True)
    
    for i, (inp, lab) in enumerate(data):
        with torch.no_grad():
            pred = model(inp)
            loss = criterion(pred, lab)
            val_loss += loss.item()
            val_acc += (pred.argmax(1) == lab).sum().item()

    return val_loss / len(dataset_val), val_acc / len(dataset_val)

## Training

In [13]:
import time
from torch.utils.data.dataset import random_split

In [14]:
# Set training parameters

TEST_SIZE = 0.2
VAL_SIZE = 0.05
TRAIN_SIZE = 0.75

N_EPOCHS = 5
RATE = 0.001
BATCH_SIZE = 4

if TRAINING:
    # Split data for training, validation and testing
    data_len = len(dataset)
    train_len = int(TRAIN_SIZE * data_len)
    val_len = int(VAL_SIZE * data_len)
    test_len = int(TEST_SIZE * data_len)

    train_data, val_data, test_data = random_split(dataset, [train_len, val_len, test_len])

In [15]:
def train(model, train_data, val_data):

    optimizer = torch.optim.Adam(model.parameters(), lr=RATE)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(N_EPOCHS):

        start_time = time.time()
        train_loss, train_acc = train_step(model, train_data, optimizer, criterion)
        valid_loss, valid_acc = test(model, val_data)

        secs = int(time.time() - start_time)
        mins = secs / 60
        secs = secs % 60


        print('Epoch: %d' %(epoch + 1), " | time in %d minutes, %d seconds" %(mins, secs))
        print(f'\tLoss: {train_loss:.4f}(train)\t|\tAcc: {train_acc * 100:.1f}%(train)')
        print(f'\tLoss: {valid_loss:.4f}(valid)\t|\tAcc: {valid_acc * 100:.1f}%(valid)')

In [16]:
if TRAINING:
    vocab = dataset.get_vocab()
    model = Network(vocab)
    train(model, train_data, val_data)

## Saving and loading the model

In [17]:
PATH = "model_saved"

# Save model
def save_model(model):
    torch.save(model, PATH)

# Load model
def load_model():
    model = torch.load(PATH)
    model.eval()
    return model

if TRAINING:
    save_model(model)
else:
    model = load_model()

## Testing

In [18]:
if TRAINING:
    # Test with test data
    
    test_loss, test_acc = test(model, test_data)
    print(f'\tLoss: {test_loss:.4f}(test)\t|\tAcc: {test_acc * 100:.1f}%(test)')

In [19]:
# Test with custom dataset from csv file

def test_dataset(csv_file, model):
    vocab = model.vocab
    dataset = CustomDataset(csv_file, vocab)
    test_loss, test_acc = test(model, dataset)
    print(f'\tLoss: {test_loss:.4f}(test)\t|\tAcc: {test_acc * 100:.1f}%(test)')
    
test_dataset("sample.csv", model)

	Loss: 0.1838(test)	|	Acc: 100.0%(test)


  x = F.softmax(self.fc3(x))


# Predictions

In [24]:
def predict_sentence(text, model):
    vocab = model.vocab
    df = pd.DataFrame([text], columns=['text'])
    features = _count_ngrams(df, vocab)
    with torch.no_grad():
        tensor = torch.tensor(features.values.astype(np.float32))
        pred = model(tensor)
        return _indexToLang(pred.argmax(1).item())

In [25]:
# Predict single sentence

sample_string_eng = "Hell is empty and all the devils are here."
sample_string_deu = "Kein Genuß ist vorübergehend; denn der Eindruck, den er zurückläßt, ist bleibend."
sample_string_fra = "Il faut manger pour vivre et non pas vivre pour manger."


# English
print(f"{sample_string_eng}")
print(f"Prediction: {predict_sentence(sample_string_eng, model)}")
print(f"Ground truth: eng\n")

# German
print(f"{sample_string_deu}")
print(f"Prediction: {predict_sentence(sample_string_deu, model)}")
print(f"Ground truth: deu\n")

# French
print(f"{sample_string_fra}")
print(f"Prediction: {predict_sentence(sample_string_fra, model)}")
print(f"Ground truth: fra\n")

Hell is empty and all the devils are here.
Prediction: eng
Ground truth: eng

Kein Genuß ist vorübergehend; denn der Eindruck, den er zurückläßt, ist bleibend.
Prediction: deu
Ground truth: deu

Il faut manger pour vivre et non pas vivre pour manger.
Prediction: fra
Ground truth: fra



  x = F.softmax(self.fc3(x))
