In [1]:
import pandas as pd
import numpy as np
import random
from nltk.tokenize import sent_tokenize, word_tokenize

In [2]:
import string
import re
from typing import List
import json

In [3]:
def write_to_file(data: str, file: str):
    with open(file, 'w') as f:
        f.write(data)

In [4]:
def read_file(file):
    result = []
    with open(file) as f:
        lines = f.readlines()
        for line in lines:
            l = line.strip()
            if len(l) > 0:
                result.append(l)
    return result

In [5]:
def load_json(file):
    with open(file) as json_file:
        data = json.load(json_file)
    
    return data

# Data

In [6]:
data_file = "data/corpus.txt"

In [7]:
data = read_file(data_file)

## Generate dataset

In [33]:
class DatasetGenerator:
    
    END_OF_SENT_PUNCT = ['.', '!', '?']
    
    
    def generate(self, data: List[str]):
        result = []
        for data_line in data:
            result += self.generate_for_text(data_line)
        return result

    def generate_for_text(self, text: str):
        sentences = sent_tokenize(text)

        size = len(sentences)
        start = 0
        result = []
        i = 0
        while i < size:
            num_of_sents = self.get_random_num_of_sentences()
            end = i + num_of_sents if i + num_of_sents < len(sentences) else size

            data = self.__generate_data_for_sentences(sentences[i:end])
            i = end

            if len(data) > 0:
                result.append(data)

        return result

    def __generate_data_for_sentences(self, sentences: List[str]):

        result = []
        sentences_size = len(sentences)
        for i in range(0, sentences_size):

            sent_data = []
            tokens = word_tokenize(sentences[i])
            tokens_size = len(tokens) 
            should_set_end_word = False
            for j in range(tokens_size- 1, -1, -1):
                token = tokens[j]
                
                if token in self.END_OF_SENT_PUNCT:
                    if j == tokens_size - 1 and i < sentences_size - 1:
                        should_set_end_word = True
                    continue

                word = self.__randomly_lowercase_word(token, word_idx=j, sent_idx=i)
                
                if should_set_end_word:
                    sent_data.insert(0, [word, True])
                    should_set_end_word = False
                else:
                    sent_data.insert(0, [word, False])
            
            if len(sent_data) > 0:
                result += sent_data
            
        return result

    def __randomly_lowercase_word(self, word: str, word_idx: int, sent_idx: int):
        if sent_idx == 0 or word_idx > 0:
            return word

        should_be_lower = False #random.choice([True, False])
        if should_be_lower:
            return word.lower()

        return word

    def __is_punctuation(self, token: str):
        if token in string.punctuation:
            return True
        return False

    def __is_end_word_in_sent(self, word: str, word_idx: int, num_token_in_sents: int):
        if self.word_pattern.match(word) is None:
            return False

        if word_idx == num_token_in_sents - 1:
            return True
        
        return False

    def get_random_num_of_sentences(self):
        num = random.choices([1, 2, 3, 4], weights=[1, 12, 4, 3], k=1)[0]
        return num

In [34]:
dataset_generator = DatasetGenerator()

In [35]:
text = "Suicide won't oure shyness problems.  [Headline over Beth Winship's teen-advice column, Morning Union , Springfield, (Massachusetts), .  Submitted by .]"

In [36]:
dataset_generator.generate_for_text("Suicide won't oure shyness problems.  [Headline over Beth Winship's teen-advice column, Morning Union , Springfield, (Massachusetts), .  Submitted by .]")

[[['Suicide', False],
  ['wo', False],
  ["n't", False],
  ['oure', False],
  ['shyness', False],
  ['problems', True],
  ['[', False],
  ['Headline', False],
  ['over', False],
  ['Beth', False],
  ['Winship', False],
  ["'s", False],
  ['teen-advice', False],
  ['column', False],
  [',', False],
  ['Morning', False],
  ['Union', False],
  [',', False],
  ['Springfield', False],
  [',', False],
  ['(', False],
  ['Massachusetts', False],
  [')', False],
  [',', False]],
 [['Submitted', False], ['by', False], [']', False]]]

In [37]:
dataset = dataset_generator.generate(data)

In [38]:
dataset[:1]

[[['The', False],
  ['Decline', False],
  ['of', False],
  ['the', False],
  ['Northeastern', False],
  ['Ohio', False],
  ['Steel', False],
  ['Market', False],
  [':', False],
  ['Why', False],
  ['Did', False],
  ['Attempts', False],
  ['by', False],
  ['Union', False],
  ['Leaders', False],
  ['and', False],
  ['Concerned', False],
  ['Citizens', False],
  ['to', False],
  ['Revive', False],
  ['the', False],
  ['Steel', False],
  ['Industry', False],
  ['Ultimately', False],
  ['Fail', False],
  ['An', False],
  ['Extended', False],
  ['Essay', False],
  ['in', False],
  ['HistoryBy', False],
  ['Philosophy', False],
  ['E.', False],
  ['WalkerCandidate', False],
  ['Number', False],
  [':', False],
  ['xxxxxxxxxSupervisor', False],
  [':', False],
  ['Alexis', False],
  ['MamauxDate', False],
  [':', False],
  ['January', False],
  ['12', False],
  [',', False],
  ['2004Word', False],
  ['Count', False],
  [':', False],
  ['3215United', False],
  ['World', False],
  ['College', F

Save dataet to json file

In [39]:
dataset_file = "data/dataset.json"
write_to_file(data=json.dumps(dataset), file=dataset_file)

## N-Gram

In [40]:
def read_n_gram_file(ngram_file):
    result = dict()
    lines = read_file(ngram_file)
    for line in lines:
        line_split = line.split('\t')
        key = ' '.join(line_split[1:])
        result[key] = line_split[0]
        
    return result

Here, we use ngrams downloaded from here: https://www.ngrams.info/download_coca.asp

In [41]:
bi_gram_dict = read_n_gram_file("data/w2.txt")
three_gram_dict = read_n_gram_file("data/w3.txt")
four_gram_dict = read_n_gram_file("data/w4.txt")
five_gram_dict = read_n_gram_file("data/w5.txt")

# Testing

In [42]:
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

- Напишіть базове рішення та метрику для тестування якості.
- Для тестування використайте корпус run-on-test.json. Формат корпусу:
```
[
  [
    ["Thanks", false],
    ["for", false],
    ["talking", false],
    ["to", false],
    ["me", true],
    ["let", false],
    ["'s", false],
    ["meet", false],
    ["again", false],
    ["tomorrow", true],
    ["Bye", false],
    [".", false]
  ],
...
]
```

In [43]:
test_file = "../../../tasks/06-language-as-sequence/run-on-test.json"

In [44]:
test_data = load_json(test_file)

## Baseline

In [45]:
class StubBaselineRunOnSentences:
    
    def find_sentences(self, merged_sentences: List[str]):
        result = []
        sentence_tokens_list = []
        for merged_sentence in merged_sentences:
            tokens = word_tokenize(merged_sentence)
            sentence_tokens_list.append(tokens)
            
            
        return self.classify_stub(sentence_tokens_list)
        
        
    
    
    def classify_stub(self, sentence_tokens_list):
        #TODO: insert call of classification model
        result = []
        for tokens in sentence_tokens_list:
            sent_result = [[token, False] for token in tokens]
            result.append(sent_result)
        return result

## Metric

In [46]:
class Metric:
    
    @staticmethod
    def measure(result_true, result_predicted):
        assert len(result_true) == len(result_predicted)
        
        size = len(result_true)
        y_pred_flatten, y_true_flatten = [], []
        num_of_mathced_sentences = 0
        
        for i in range(0, size):
            sent_true = result_true[i]
            sent_pred = result_predicted[i]
            
            
            y_true = np.array(sent_true)[:, 1]
            y_pred = np.array(sent_pred)[:, 1]
            
            y_true_flatten += y_true.tolist()
            y_pred_flatten += y_pred.tolist()
            
            if sum(y_true == y_pred) == len(y_pred):
                num_of_mathced_sentences += 1
            
            
        return classification_report(y_true_flatten, y_pred_flatten), num_of_mathced_sentences / size

Let's show example of this metric on test data vs predicted result in stub baseline

In [47]:
stub_baseline_run_on_sentecnes = StubBaselineRunOnSentences()

In [48]:
sentence_tokens_list = [ np.array(item)[:, 0].tolist() for item in test_data]

In [49]:
test_pred_data = stub_baseline_run_on_sentecnes.classify_stub(sentence_tokens_list)

In [50]:
clf_report, matched_sents = Metric.measure(test_data, test_pred_data)

In [51]:
print(clf_report)

              precision    recall  f1-score   support

       False       0.97      1.00      0.98      4542
        True       0.00      0.00      0.00       155

   micro avg       0.97      0.97      0.97      4697
   macro avg       0.48      0.50      0.49      4697
weighted avg       0.94      0.97      0.95      4697



In [52]:
print(f"There are matched {100 * matched_sents}% sentences.")

There are matched 25.0% sentences.


## Classifier

In [53]:
import sklearn_crfsuite

In [57]:
X = [np.array(sent)[:, 0].tolist() for sent in dataset]
y = [np.array(sent)[:, 1].tolist() for sent in dataset]

In [58]:
class RunOnSentenceCRFClassifier:
    def __init__(self, ngrams_dict_list):
        self.crf = sklearn_crfsuite.CRF(
            algorithm='lbfgs',
            c1=0.1,
            c2=0.1,
            max_iterations=100,
            all_possible_transitions=True)
        
        self.ngrams_dict_list = [bi_gram_dict, three_gram_dict, four_gram_dict, five_gram_dict]
    
    
    def fit(self, X, y):
        X_data = [self.sent2features(s) for s in X]
        self.crf.fit(X_data, y)
        
    def predict(self, X):
        X_data = [self.sent2features(s) for s in X]
        return self.crf.predict(X)    
    
    def word2features(self, sent, pos):
        word = sent[pos]

        features = {
            'bias': 1.0,
            'word.lower()': word.lower(),
            'word.isupper()': word.isupper(),
            'word.istitle()': word.istitle(),
            'word.isdigit()': word.isdigit(),
        }
        if pos > 0:
            word1 = sent[pos-1]
            features.update({
                '-1:word.lower()': word1.lower(),
                '-1:word.istitle()': word1.istitle(),
                '-1:word.isupper()': word1.isupper(),
            })
        else:
            features['BOS'] = True

        if pos < len(sent)-1:
            word1 = sent[pos+1]
            features.update({
                '+1:word.lower()': word1.lower(),
                '+1:word.istitle()': word1.istitle(),
                '+1:word.isupper()': word1.isupper(),
            })
        else:
            features['EOS'] = True
            
            
        self.__update_features_with_nagram(features, sent, pos)

        return features
    
    
    def sent2features(self, sent):
        return [self.word2features(sent, i) for i in range(len(sent))]
    
    
    def split_dataset_into_X_and_y(self, dataset):
        X = [np.array(sent)[:, 0].tolist() for sent in dataset]
        y = [np.array(sent)[:, 1].tolist() for sent in dataset]
        
        return X, y
    
    def X_y_to_dataset_format(self, X, y):
        assert len(X) == len(y)
        
        result = []
        for i in range(0, len(y)):
            sent = [[X[i][j], y[i][j]] for j in range(0, len(X[i]))]
            result.append(sent)
        
        return result
    
    
    def __update_features_with_nagram(self, features, sent, pos):
        
        i = pos
        while i < len(sent) - 1 and i - pos < len(self.ngrams_dict_list):
            ngram = ' '.join(sent[pos: pos + i + 1])
            
            
            n = i - pos + 2
            if ngram in self.ngrams_dict_list[i - pos]:
                features[f"{n}-gram"] = self.ngrams_dict_list[i - pos][ngram]
            else:
                features[f"{n}-gram"] = 0
            
            i += 1
        

In [59]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [60]:
classifier = RunOnSentenceCRFClassifier([bi_gram_dict, three_gram_dict, four_gram_dict, five_gram_dict])

In [61]:
classifier.fit(X_train, y_train)

In [62]:
y_pred = classifier.predict(X_test)

In [63]:
y_pred_flatten, y_test_flatten = [], []
for i in range(0, len(y_pred)):
    y_pred_flatten += y_pred[i]
    y_test_flatten += y_test[i]

In [64]:
print(classification_report(y_test_flatten, y_pred_flatten))

              precision    recall  f1-score   support

       False       0.98      1.00      0.99     32271
        True       0.00      0.00      0.00       687

   micro avg       0.98      0.98      0.98     32958
   macro avg       0.49      0.50      0.49     32958
weighted avg       0.96      0.98      0.97     32958



### Test on available test corpus

In [65]:
X_test = [np.array(sent)[:, 0].tolist() for sent in test_data]
y_test = [np.array(sent)[:, 1].tolist() for sent in test_data]

In [66]:
y_pred = classifier.predict(X_test)

In [67]:
pred_data = classifier.X_y_to_dataset_format(X_test, y_pred)

In [68]:
test_pred_data = classifier.X_y_to_dataset_format(X_test, y_test)

In [69]:
clf_report, matched_sents = Metric.measure(test_data, pred_data)

In [70]:
print(clf_report)

              precision    recall  f1-score   support

       False       0.97      1.00      0.98      4542
        True       0.00      0.00      0.00       155

   micro avg       0.97      0.97      0.97      4697
   macro avg       0.48      0.50      0.49      4697
weighted avg       0.94      0.97      0.95      4697



In [71]:
matched_sents

0.25