Utils.py
from __future__ import unicode_literals, print_function, division
from io import open
import os
import time
import math
import random

import torch
import torch.nn as nn
import pandas as pd
from nltk.tokenize import RegexpTokenizer
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer

# Word-level tokenizer: keeps alphanumeric tokens and drops punctuation
rTokenizer = RegexpTokenizer(r'\w+')
"""
Class with the main functions used to develop the training configuration of the RNN
"""
class Utils():
def __init__(self, n_words=None, all_categories=[], Word2Index=None):
# super(Utils, self).__init__()
self.n_words = n_words
self.all_categories = all_categories
self.all_sentences = []
self.all_labels = []
self.Word2Index = Word2Index
self.n_categories = 3
"""
Generate list of OpenSubtitles files in directory
"""
def findFiles(self, data_dir):
list_files = os.listdir(os.path.dirname(os.path.realpath(__file__))+"/"+data_dir)
list_files = [f for f in list_files if f[0:13] == "OpenSubtitles"]
return list_files
"""
Read an input file with sentences and return the top lim sentences from 15 to 100 characters
"""
def readLines(self, filename, lim):
f = open(os.path.dirname(os.path.realpath(__file__))+"/"+filename, "r", encoding='utf-8')
c = 0
sentences = []
while True:
# read line
line = f.readline().rstrip()
if len(line) > 15 & len(line) < 100:
sentences.append(line)
c += 1
# if not line:
if c == lim:
break
f.close()
return sentences
"""
Encode word string to corresponding index, if unknown word it is routed to the token index <UNK> which is 0
"""
def WordToIndex(self, word):
try:
index = self.Word2Index[word.lower()]
except:
index = self.Word2Index['<UNK>']
return index
"""
Convert word string to pytorch tensor
"""
def WordToTensor(self, word):
tensor = torch.zeros(1, self.n_words)
tensor[0][self.WordToIndex(word)] = 1
return tensor
"""
Tokenize and encode sentence into pytorch tensor
"""
def SentToTensor(self, sent):
# Remove punctuation in this tokenizer, words only
sent = self.tokenizer(sent)
sent = [word.lower() for word in sent]
# sent = [word for word in sent if word in ]
tensor = torch.zeros(len(sent), 1, self.n_words)
for li, word in enumerate(sent):
tensor[li][0][self.WordToIndex(word)] = 1
return tensor
"""
Used to get category from output prediction tensor
"""
def categoryFromOutput(self, output):
top_n, top_i = output.topk(1)
category_i = top_i[0].item()
return self.all_categories[category_i], category_i
"""
Funcion to load data from datasets/OpenSubs files and split it into training and validation. It also creates the
vocabulary file using a CountVectorizer
"""
def load_data(self, data_size, data_dir):
self.data_size = data_size
self.data_dir = data_dir
files = self.findFiles(data_dir=data_dir)
# Build the category_lines dictionary, a list of names per language
category_lines = {}
self.all_categories = ['da', 'no', 'sv']
sentences_da = self.readLines(data_dir+"/OpenSubtitles.da-en.da", int(data_size / 3))
self.all_sentences.extend(sentences_da)
self.all_labels.extend(["da" for i in range(len(sentences_da))])
sentences_no = self.readLines(data_dir+"/OpenSubtitles.en-no.no", int(data_size / 3))
self.all_sentences.extend(sentences_no)
self.all_labels.extend(["no" for i in range(len(sentences_no))])
sentences_sv = self.readLines(data_dir+"/OpenSubtitles.en-sv.sv", int(data_size / 3))
self.all_sentences.extend(sentences_sv)
self.all_labels.extend(["sv" for i in range(len(sentences_sv))])
print(self.all_categories)
df = pd.DataFrame([self.all_sentences, self.all_labels]).T
df_train, df_val = train_test_split(df, shuffle=True, test_size=min(0.15, 10000 / len(self.all_sentences)))
# Max val size of 10000. Don't need more really for validation purposes
print("Sentences used for training: " + str(df_train.shape[0]))
print("Sentences used for validation: " + str(df_val.shape[0]))
print(df_val.shape)
vectorizer = CountVectorizer(tokenizer=self.tokenizer)
vectorizer.fit_transform(self.all_sentences)
self.Word2Index = vectorizer.vocabulary_
self.n_words = len(self.Word2Index)
self.n_categories = len(self.all_categories)
#example
print(self.WordToTensor('stora').size())
#example
print(self.SentToTensor('jeg vil vite hva, som skjer med deg.').size())
return self.n_words, self.n_categories, self.Word2Index, df_train, df_val
"""
Get random sentence from dataframe. df can be training or validation df. Used to sample training/validation data
"""
def randomTrainingExample(self, df):
sample = df.sample()
category = sample[1].values[0]
sent = sample[0].values[0]
category_tensor = torch.tensor([self.all_categories.index(category)], dtype=torch.long)
line_tensor = self.SentToTensor(sent)
return category, sent, category_tensor, line_tensor
"""
Measure time since beginning of training
"""
def timeSince(self, since):
now = time.time()
s = now - since
m = math.floor(s / 60)
s -= m * 60
return '%dm %ds' % (m, s)
"""
Evaluate a sentence using a trained rnn model
"""
def evaluate(self, line_tensor, rnn):
hidden = rnn.initHidden()
for i in range(line_tensor.size()[0]):
output, hidden = rnn(line_tensor[i], hidden)
return output, rnn
"""
Main training function. lr decay implemented
"""
def train(self, category_tensor, line_tensor, iter, rnn, lr, lr_decay):
hidden = rnn.initHidden()
rnn.zero_grad()
for i in range(line_tensor.size()[0]):
output, hidden = rnn(line_tensor[i], hidden)
criterion = nn.NLLLoss()
loss = criterion(output, category_tensor)
loss.backward()
#Learning rate decay
lr_w_decay = lr*pow(lr_decay, iter/10000)
for p in rnn.parameters():
p.data.add_(p.grad.data, alpha=-lr_w_decay)
return output, rnn
"""
Tokenizer function
"""
def tokenizer(self, sent):
tokens = rTokenizer.tokenize(sent)
tokens = [token.lower() for token in tokens]
return tokens
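
# ----------------------------------------------------------------------------
# Minimal usage sketch, assuming subtitle files are available under a
# "datasets/OpenSubs" directory next to this file (the actual location may
# differ per setup). SimpleRNN below is only a placeholder module that matches
# the interface Utils expects (initHidden() and a forward returning
# (log_probs, hidden)); the project's own RNN class may differ.
# ----------------------------------------------------------------------------
if __name__ == "__main__":
    class SimpleRNN(nn.Module):
        def __init__(self, input_size, hidden_size, output_size):
            super(SimpleRNN, self).__init__()
            self.hidden_size = hidden_size
            self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
            self.i2o = nn.Linear(input_size + hidden_size, output_size)
            self.softmax = nn.LogSoftmax(dim=1)

        def forward(self, word_tensor, hidden):
            combined = torch.cat((word_tensor, hidden), 1)
            hidden = self.i2h(combined)
            output = self.softmax(self.i2o(combined))
            return output, hidden

        def initHidden(self):
            return torch.zeros(1, self.hidden_size)

    utils = Utils()
    # Load ~30000 sentences split evenly across da/no/sv and build the vocabulary
    n_words, n_categories, word2index, df_train, df_val = utils.load_data(
        data_size=30000, data_dir="datasets/OpenSubs")  # assumed data location

    rnn = SimpleRNN(n_words, 128, n_categories)  # 128 hidden units, illustrative choice
    start = time.time()
    for it in range(1, 5001):
        category, sent, category_tensor, line_tensor = utils.randomTrainingExample(df_train)
        output, rnn = utils.train(category_tensor, line_tensor, it, rnn, lr=0.005, lr_decay=0.9)
        if it % 1000 == 0:
            guess, _ = utils.categoryFromOutput(output)
            print('%d (%s) true=%s guess=%s' % (it, utils.timeSince(start), category, guess))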