In [1]:
import time
import re
import numpy as np
import operator
from threading import active_count
from multiprocessing import Pool
from nltk.collocations import TrigramCollocationFinder
from nltk.metrics.scores import accuracy
from nltk.metrics import ConfusionMatrix

languages = ['deu', 'eng', 'fra', 'ita', 'nld', 'spa']

In [2]:
class TextProcessor():
    
    @staticmethod
    def clean_string(string):
        punctuation_marks_regex = r'[\n\t/\d\':,-_\(\)\.\?\";!]'
        punctuation_free_text = re.sub(punctuation_marks_regex, '', string).strip()
        lower_puntuation_free_text = punctuation_free_text.lower()
        lower_puntuation_and_spaces_free_text = re.sub(' +', ' ', lower_puntuation_free_text)
        return lower_puntuation_and_spaces_free_text

    @staticmethod
    def call(text, freq_filter=0):
        corpora = TrigramCollocationFinder.from_words(
            TextProcessor.clean_string(text)
        )
        corpora.apply_freq_filter(freq_filter)
        return corpora

In [3]:
class Corpus:
    LAMBDA = 1
    
    def __init__(self, text):
        self.corpora = self.process_text(text)
        
        self.counts = {}
        self.lidstone_probabilities = {}
        
        self._len = None
        self._set_len = None    
        
    def process_text(self, text):
        return TextProcessor.call(text, 5)
    
    def lidstone_probability(self, n_gram):            
        if not n_gram in self.lidstone_probabilities:
            self.lidstone_probabilities[n_gram] = (self.count(n_gram) + Corpus.LAMBDA)\
                / (len(self) + self.set_len() * Corpus.LAMBDA)

        return self.lidstone_probabilities[n_gram]
    
    def count(self, n_gram):
        return self.corpora.ngram_fd[n_gram] if n_gram in self.corpora.ngram_fd else 0
    
    def set_len(self):
        if not self._set_len: self._set_len = len(self.corpora.ngram_fd)
        return self._set_len
    
    def __len__(self):
        if not self._len: self._len = sum(self.corpora.ngram_fd.values())
        return self._len
    
    def __repr__(self):
        return f'<Corpus>'

In [4]:
class LanguageClassifier():
    
    def __init__(self, training_set):
        """
        Arguments:
        training_set(Map) -- map cotaining the corpus as values and the lang name as key
        """
        self.training_set = training_set
    
    def classify(self, text):
        processed_text = TextProcessor.call(text).ngram_fd.keys()
        return max({
            lang: self.calc_probability(processed_text, corpus) 
            for lang, corpus in self.training_set.items()
        }.items(), key=operator.itemgetter(1))[0]
    
    def calc_probability(self, n_grams, corpus):
        return sum([
            np.log(corpus.lidstone_probability(n_gram))
            for n_gram in n_grams
        ])
        
    def __repr__(self):
        return f'<LanguageClassifier training_set={self.training_set}>'
        

In [5]:
train_time = time.time()

def read_file_to_corpus(lang):
    with open(f'./given/langId/{lang}_trn.txt', 'r') as raw_text:
        return (lang, Corpus(raw_text.read()))
                
with Pool(active_count()) as pool:
    train_languages_corpus = pool.map(read_file_to_corpus, languages)
                
cls = LanguageClassifier({ c[0]: c[1] for c in train_languages_corpus })

train_time = int(time.time() - train_time)

In [6]:
test_corpus = []
for lang in languages:
    with open(f'./given/langId/{lang}_tst.txt', 'r') as raw_text:
        [test_corpus.append((lang, line)) for line in raw_text.readlines()]

In [7]:
test_time = time.time()

def classify(line):
    return {
        'line': line[1],
        'cls': cls.classify(line[1]),
        'actual': line[0]
    }

with Pool(active_count()) as pool:
    classification = pool.map(classify, test_corpus)

test_time = int(time.time() - test_time)

In [8]:
classes = [cls['cls'] for cls in classification]
actual = [cls['actual'] for cls in classification]

In [9]:
print(f"""
Learning time: {train_time} seconds.
Test time: {test_time} seconds.
Accuracy is {accuracy(actual, classes)}
Confussion_matrix:
{ConfusionMatrix(actual, classes).pretty_format()}
""")


Learning time: 76 seconds.
Test time: 68 seconds.
Accuracy is 0.9984994247794988
Confussion_matrix:
    |    d    e    f    i    n    s |
    |    e    n    r    t    l    p |
    |    u    g    a    a    d    a |
----+-------------------------------+
deu |<9975>   6    .    1    6    2 |
eng |    .<9982>   1    1    3    . |
fra |    .    8<9982>   3    3    4 |
ita |    1    7    3<9978>   .   11 |
nld |    5    9    .    3<9982>   1 |
spa |    .    3    2    7    .<9988>|
----+-------------------------------+
(row = reference; col = test)


