# POS-Tagger for the german language using keras

# Daten einlesen

In [1]:
import numpy as np
import re
import nltk as nltk

In [2]:
from nltk.corpus import brown
brown_news_tagged = brown.tagged_words(categories='news', tagset='universal')
tag_fd = nltk.FreqDist(tag for (word, tag) in brown_news_tagged)
universal_tags = tag_fd.most_common()

In [3]:
universal_tag_list = []
for i in range(len(universal_tags)):
    universal_tag_list.append(universal_tags[i][0])

In [4]:
train_text = open("POS_German_train.txt","r+") 
test_text = open("POS_German_minitest.txt")

In [5]:
train_text = train_text.readlines()
test_text = test_text.readlines()

# Text vorbereiten

In [6]:
def clean_text_to_sentences(train_text):
    sentences = []
    for i in range(len(train_text)):
        sentences.append(re.sub(r"([();])", "", train_text[i]))
        sentences[i] = re.split("\s", sentences[i])
        sentences[i] = list(filter(None, sentences[i]))
    return sentences

In [7]:
sentences = clean_text_to_sentences(train_text)
test_sentences = clean_text_to_sentences(test_text)

In [8]:
#helper method
def init_list_of_objects(size):
    list_of_objects = list()
    for i in range(0,size):
        list_of_objects.append( list() ) 
    return list_of_objects

In [9]:
# geht davon aus dass maximal 3 slashes in einem wort stecken... 
def seperate_words_and_tags(sentences):
    sentence_words = init_list_of_objects(len(sentences))
    sentence_tags = init_list_of_objects(len(sentences))
    for i in range(len(sentences)):
        for j in range(len(sentences[i])):
            if sentences[i][j].count('/') == 2:
                sentence_words[i].append(re.split("/", sentences[i][j])[0] + '/' + re.split("/", sentences[i][j])[1])
                sentence_tags[i].append(re.split("/", sentences[i][j])[2])
            elif sentences[i][j].count('/') == 3:
                sentence_words[i].append(re.split("/", sentences[i][j])[0] + '/' + re.split("/", sentences[i][j])[1]
                                        + '/' + re.split("/", sentences[i][j])[2])
                sentence_tags[i].append(re.split("/", sentences[i][j])[3])
            else:
                sentence_words[i].append(re.split("/", sentences[i][j])[0])
                sentence_tags[i].append(re.split("/", sentences[i][j])[1])
    return sentence_words, sentence_tags

In [10]:
sentence_words, sentence_tags = seperate_words_and_tags(sentences)
test_sentence_words, test_sentence_tags = seperate_words_and_tags(test_sentences)

In [11]:
myset = []
for i in range(len(sentence_tags)):
    for j in range(len(sentence_tags[i])):
        if(sentence_tags[i][j] not in myset):
            myset.append(sentence_tags[i][j])

In [12]:
myset_test = []
for i in range(len(test_sentence_tags)):
    for j in range(len(test_sentence_tags[i])):
        if(test_sentence_tags[i][j] not in myset_test):
            myset_test.append(test_sentence_tags[i][j])

In [13]:
#should be around 54 -> number of tags in Stuttgart tagset
len(myset)

54

# Stuttgart Tagset auf Universal Tagset reduzieren

Die tags im trainingsset werden auf das Universal Tagset durch ein mapping konvertiert. Folgende Quellen helfen dabei, die verschiedenen Tags auf das Universal Tagset abzustimmen: https://universaldependencies.org/tagset-conversion/de-stts-uposf.html, https://www.nltk.org/_modules/nltk/tag/mapping.html

In [14]:
conversion_mapping = {'$': '.',
                      '$.': '.',
                      '$,': '.',
                      'NE': 'NOUN',
                      'VAFIN': 'VERB',
                      'ADV': 'ADV',
                      'ART': 'DET',
                      'ADJA': 'ADJ',
                      'NN': 'NOUN',
                      'VVFIN': 'VERB',
                      'APPR': 'ADP',
                      'PTKVZ': 'ADP',
                      'PPOSAT': 'DET',
                      'VVPP': 'VERB',
                      'FM': 'X',
                      'ADJD': 'ADJ',
                      'APPRART': 'ADP',
                      'KON': 'CONJ',
                      'KOUS': 'CONJ',
                      'VVINF': 'VERB',
                      'VMFIN': 'VERB',
                      'PAV': 'ADV',
                      'PDAT': 'DET',
                      'KOUI': 'CONJ',
                      'PTKZU': 'PRT',
                      'PIAT': 'DET',
                      'PTKNEG': 'PRT',
                      'PIS': 'PRON',
                      'PRF': 'PRON',
                      'CARD': 'NUM',
                      'PPER': 'PRON',
                      'ITJ': 'PRT',
                      'PDS': 'PRON',
                      'KOKOM': 'CONJ',
                      'PRELS': 'PRON',
                      'APPO': 'ADP',
                      'PWAT': 'DET',
                      'PWAV': 'ADV',
                      'VVIZU': 'VERB',
                      'PWS': 'PRON',
                      'XY': 'X',
                      'PRELAT': 'DET',
                      'TRUNC': 'X',
                      'VAINF': 'VERB',
                      'VMINF': 'VERB',
                      'VAPP': 'VERB',
                      'PTKA': 'PRT',
                      'PTKANT': 'PRT',
                      'APZR': 'ADP',
                      'PPOSS': 'PRON',
                      'VVIMP': 'VERB',
                      'VAIMP': 'VERB',
                      'VMPP': 'VERB',
                     }

In [15]:
def convert_stuttgart_tagset_to_universal_tagset(tags):
    for i in range(len(tags)):
        for index, data in enumerate(tags[i]):
            for key, value in conversion_mapping.items():
                if key in data:
                    tags[i][index] = data.replace(key, conversion_mapping[key])   
    return tags

In [16]:
sentence_tags = convert_stuttgart_tagset_to_universal_tagset(sentence_tags)
test_sentence_tags = convert_stuttgart_tagset_to_universal_tagset(test_sentence_tags)

In [17]:
#make sure that there only real tags
def filter_correct_tags_only(tags):
    tag_lists = init_list_of_objects(len(tags))
    for i in range(len(tags)):
        for j in range(len(tags[i])):
            if(tags[i][j] in universal_tag_list):
                tag_lists[i].append(tags[i][j])
    return tag_lists

In [18]:
sentence_tags = filter_correct_tags_only(sentence_tags)
test_sentence_tags = filter_correct_tags_only(test_sentence_tags)

# Split data

In [19]:
from sklearn.model_selection import train_test_split
 
(train_sentences, dev_test_sentences, train_tags, dev_test_tags) = train_test_split(sentence_words, sentence_tags, test_size=0.2)

# Sätze für Keras Modell vorbereiten (padding + one-hot-encoding)

In [20]:
#unique words and tags
words, tags = set([]), set([])
 
for s in train_sentences:
    for w in s:
        words.add(w.lower())

In [21]:
for ts in train_tags:
    for t in ts:
        tags.add(t)

In [22]:
#give words and tags a number
word_index = {w: i + 2 for i, w in enumerate(list(words))}
word_index['<PAD>'] = 0  
word_index['<UNK>'] = 1  
 
tag_index = {t: i + 1 for i, t in enumerate(list(tags))}
tag_index['<PAD>'] = 0  

In [23]:
train_sentences_X, dev_test_sentences_X, train_tags_y, dev_test_tags_y = [], [], [], []

In [24]:
def words_to_numbers(sentences):
    X = []
    for s in sentences:
        s_int = []
        for w in s:
            try:
                s_int.append(word_index[w.lower()])
            except:
                s_int.append(word_index['<UNK>'])
        X.append(s_int)
    return X

In [25]:
#tags to numbers
for s in train_tags:
    train_tags_y.append([tag_index[t] for t in s])
for s in dev_test_tags:
    dev_test_tags_y.append([tag_index[t] for t in s])

In [26]:
train_sentences_X = words_to_numbers(train_sentences)
dev_test_sentences_X = words_to_numbers(dev_test_sentences)

In [27]:
MAX_LENGTH = len(max(train_sentences_X, key=len))

In [28]:
from keras.preprocessing.sequence import pad_sequences
 
train_sentences_X = pad_sequences(train_sentences_X, maxlen=MAX_LENGTH, padding='post')
dev_test_sentences_X = pad_sequences(dev_test_sentences_X, maxlen=MAX_LENGTH, padding='post')
train_tags_y = pad_sequences(train_tags_y, maxlen=MAX_LENGTH, padding='post')
dev_test_tags_y = pad_sequences(dev_test_tags_y, maxlen=MAX_LENGTH, padding='post')
 
print(train_sentences_X[0])
print(train_tags_y[0])

Using TensorFlow backend.


[ 5279 44381 63891 36031  3556 53641 11345  6288 53716 11345 22625 11121
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0     0     0
     0     0     0     0     0     0     0     0     0     0]
[ 9 12  4  3  1  1  1  7 10  1  3  7  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0
  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  

In [29]:
from keras.utils import to_categorical
train_tags_y_one_hot_encoded = to_categorical(train_tags_y, num_classes=len(tag_index), dtype='float32')

In [30]:
from keras.models import Sequential
from keras.layers import Dense, LSTM, InputLayer, Bidirectional, TimeDistributed, Embedding, Activation
from keras.optimizers import Adam
 
model = Sequential()
model.add(InputLayer(input_shape=(MAX_LENGTH, )))
model.add(Embedding(len(word_index), 128))
model.add(Bidirectional(LSTM(256, return_sequences=True)))
model.add(TimeDistributed(Dense(len(tag_index))))
model.add(Activation('softmax'))
 
model.compile(loss='categorical_crossentropy',
              optimizer=Adam(0.001),
              metrics=['accuracy'])
 
model.summary()

Instructions for updating:
Colocations handled automatically by placer.
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding_1 (Embedding)      (None, 130, 128)          8186496   
_________________________________________________________________
bidirectional_1 (Bidirection (None, 130, 512)          788480    
_________________________________________________________________
time_distributed_1 (TimeDist (None, 130, 13)           6669      
_________________________________________________________________
activation_1 (Activation)    (None, 130, 13)           0         
Total params: 8,981,645
Trainable params: 8,981,645
Non-trainable params: 0
_________________________________________________________________


In [31]:
model.fit(train_sentences_X, train_tags_y_one_hot_encoded, 
          batch_size=128, 
          epochs=3, 
          validation_split=0.2)

Instructions for updating:
Use tf.cast instead.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
Train on 25600 samples, validate on 6400 samples
Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x1a2027ec50>

In [32]:
#dev test
scores = model.evaluate(dev_test_sentences_X, to_categorical(dev_test_tags_y, len(tag_index)))
print("acc = " , f"{model.metrics_names[1]}: {scores[1] * 100}")   

acc =  acc: 99.35884482860565


# Vorbereitung für den finalen Testset

In [33]:
test_words_X = words_to_numbers(test_sentence_words)

In [34]:
test_sentence_words = pad_sequences(test_words_X, maxlen=MAX_LENGTH, padding='post')

In [36]:
#reverse
def reverse_num_to_tokens(nums, index):
    tokens = []
    for categoricals in nums:
        token = []
        for categorical in categoricals:
            token.append(index[np.argmax(categorical)])
        tokens.append(token)
    return tokens

In [37]:
predictions = model.predict(test_sentence_words)

In [39]:
predicted_tags = reverse_num_to_tokens(predictions, {i: t for t, i in tag_index.items()})

In [40]:
def remove_padding(tags):
    tag_lists = init_list_of_objects(len(tags))
    for i in range(len(tags)):
        tag_lists[i] = (list(filter(lambda a: a != '<PAD>', tags[i])))
    return tag_lists

In [41]:
predicted_tags = remove_padding(predicted_tags)

In [42]:
#correct small length errors
for i in range(len(predicted_tags)):
    while(len(predicted_tags[i]) != len(test_sentence_tags[i])):
        if(len(predicted_tags[i]) < len(test_sentence_tags[i])):
           test_sentence_tags[i] = test_sentence_tags[i][:-1]
        else:
           predicted_tags[i] = predicted_tags[i][:-1]

In [43]:
def flatten(a_list):
    flat_list = [item for sublist in a_list for item in sublist]
    return flat_list

In [44]:
predicted_tags_flattened = flatten(predicted_tags)

In [45]:
test_sentence_tags_flattened = flatten(test_sentence_tags)

In [46]:
from sklearn.metrics import classification_report
print(classification_report(test_sentence_tags_flattened, predicted_tags_flattened))

              precision    recall  f1-score   support

           .       0.98      0.98      0.98      1178
         ADJ       0.93      0.79      0.85       699
         ADP       0.95      0.99      0.97      1069
         ADV       0.92      0.88      0.90       437
        CONJ       0.95      0.89      0.92       308
         DET       0.96      0.98      0.97      1055
        NOUN       0.91      0.98      0.94      2394
         NUM       1.00      0.91      0.96       164
        PRON       0.94      0.90      0.92       291
         PRT       0.96      0.92      0.94        84
        VERB       0.97      0.93      0.95       960
           X       0.50      0.10      0.17        20

   micro avg       0.94      0.94      0.94      8659
   macro avg       0.91      0.85      0.87      8659
weighted avg       0.94      0.94      0.94      8659



In [47]:
from sklearn.metrics import accuracy_score
accuracy_score = accuracy_score(test_sentence_tags_flattened, predicted_tags_flattened)
print("Achieved accuracy on the final test dataset: " , accuracy_score, " / ", round((accuracy_score * 100),2), "%")

Achieved accuracy on the final test dataset:  0.9434114793856103  /  94.34 %
