"""A collection of classes for processing and tokenizing different data input formats,
specifically Wikipedia corpora output from WikiExtractor (https://github.com/attardi/wikiextractor)
and IARPA Babel Program transcript files (http://www.iarpa.gov/index.php/research-programs/babel).
The output from these classes is intended for use with the InfixerModel object in preprocessor."""
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import collections
import glob
import logging
import os.path
import re
from nltk.corpus import names
from nltk.tokenize import wordpunct_tokenize
_logger = logging.getLogger(__name__)


class GeneralTokenizer(object):
    """A superclass for deriving specialized tokenizers.

    In its pure form, it performs a simple word-and-punctuation tokenization
    taken directly from NLTK. Tokenization is done at initialization, and
    optional filtering and editing can be done afterwards by calling the
    following methods:

        clean_tokens
        filter_tokens

    Output from this class can be obtained by calling the following methods:

        get_tokens
        output_text
    """

    def __init__(self, target):
        """Initialize a GeneralTokenizer object with a file or a directory of files.

        :param target: a file or directory of files containing text to be tokenized.
        """
        if os.path.isdir(target):
            self.target = target  # stored for evaluation in other methods
            self.dir = target
            self._tokens = self._get_dir_tokens(self.dir)
        elif os.path.isfile(target):
            self.target = target  # stored for evaluation in other methods
            self.dir, self.filename = os.path.split(os.path.abspath(target))
            # pass the full path so the file can be opened regardless of the
            # current working directory
            self._tokens = self._get_file_tokens(os.path.join(self.dir, self.filename))
        else:
            raise ValueError("File or directory not readable.")

        self._data_len = len(self._tokens)

    def __len__(self):
        """Return the number of token types found at initialization."""
        return self._data_len

    # methods for extracting tokens from files and/or directories of files

    def _extract_tokens(self, file_text):
        """Extract tokens from a file's text and return a Counter dictionary.

        This method is designed specifically so that it can be overridden
        easily while maintaining _get_file_tokens and _get_dir_tokens.
        """
        token_dict = collections.Counter()

        # does a simple word and punctuation tokenization on the text
        tokens = wordpunct_tokenize(file_text)
        for token in tokens:
            token_dict[token] += 1

        return token_dict

    def _get_file_tokens(self, filename):
        """Get all unique tokens from a text file.

        This method needs to have a return instead of direct assignment to
        _tokens so that it can be called directly or as a subroutine
        of _get_dir_tokens, as needed.
        """
        with open(filename, 'r') as infile:
            infile_text = infile.read()

        # extract and count unique tokens
        token_dict = self._extract_tokens(infile_text)

        # logging
        filename = os.path.basename(infile.name)
        token_count = len(token_dict)

        # log info for each file in a dir only if logging.DEBUG
        if os.path.isdir(self.target):
            _logger.debug("{} token types in {}".format(token_count, filename))
        else:
            _logger.info("{} token types in {}".format(token_count, filename))

        return token_dict

    def _get_dir_tokens(self, directory):
        """Get all unique tokens from a directory of text files.

        This method needs to have a return instead of direct assignment to
        _tokens so that _get_file_tokens can be called directly or as
        a subroutine, as needed.
        """
        tokens_all = collections.Counter()

        # join with the directory so matching works whether or not the path
        # ends in a separator
        files = glob.glob(os.path.join(directory, "*"))
        for f in files:
            tokens_file = self._get_file_tokens(f)
            tokens_all.update(tokens_file)

        n_out = len(list(tokens_all.keys()))

        # logging
        _logger.info('{} token types found in {} files'.format(n_out, len(files)))

        return tokens_all

    # methods for removing various types of unwanted data

    def clean_tokens(self, rm_dupes=True, rm_names=True, rm_non_words=True,
                     rm_non_latin=True, rm_uppercase=True):
        """Call methods for removing various types of unwanted data in batch fashion.

        :param rm_dupes: remove duplicate upper-case tokens, preserving case and counts
        :param rm_names: remove names present in NLTK's names corpus
        :param rm_non_words: remove digits, non-alphanumeric tokens, and all-caps words
        :param rm_non_latin: remove tokens containing non-Latin characters
        :param rm_uppercase: remove upper-case words
        """
        if rm_dupes:
            self._remove_duplicates()
        if rm_names:
            self._remove_names()
        if rm_non_words:
            self._remove_non_words()
        if rm_non_latin:
            self._remove_non_latin()
        if rm_uppercase:
            self._remove_uppercase()

    def _remove_duplicates(self):
        """Remove duplicate upper-case tokens, preserving their counts in the lower-case forms."""
        # duplicates are tokens whose lower-cased form is also present as a
        # separate token (e.g. 'The' when 'the' exists)
        dupes = {key: count for (key, count) in self._tokens.items()
                 if key != key.lower() and key.lower() in self._tokens}
        no_dupes = collections.Counter({key: count for (key, count) in self._tokens.items()
                                        if key not in dupes})

        # use the Counter.update() method to preserve counts for duplicates
        dupes_lower = collections.Counter()
        for (key, count) in dupes.items():
            dupes_lower[key.lower()] += count
        no_dupes.update(dupes_lower)

        # logging
        _logger.info('{} duplicate tokens removed'.format(len(dupes)))

        self._tokens = no_dupes

    def _remove_names(self):
        """Remove names present in NLTK's names corpus."""
        name_set = set(names.words())
        no_names = {key: count for (key, count) in self._tokens.items()
                    if key not in name_set}

        # logging
        num_removed = len(self._tokens) - len(no_names)
        _logger.info('{} name tokens removed'.format(num_removed))

        self._tokens = collections.Counter(no_names)

    def _remove_non_words(self):
        """Remove digits, non-alphanumeric tokens, and all-caps words."""
        # pre-filter count of self._tokens for later comparison and logging
        base_len = len(self._tokens)

        regex = re.compile(r'(^\w*\d+\w*$|^\W*$|^[A-Z]*$|^.*_.*$)')
        matches_out = {key: count for (key, count) in self._tokens.items()
                       if regex.search(key) is None}

        # logging
        num_removed = base_len - len(matches_out)
        _logger.info('{} non-word tokens removed'.format(num_removed))

        self._tokens = collections.Counter(matches_out)

    def _remove_non_latin(self):
        """Remove tokens containing characters outside the basic and extended Latin unicode ranges."""
        regex = re.compile(r'[^\x00-\x7F\x80-\xFF\u0100-\u017F\u0180-\u024F\u1E00-\u1EFF]')
        matches_out = {key: count for (key, count) in self._tokens.items()
                       if regex.search(key) is None}

        _logger.info('Non-Latin tokens removed')

        self._tokens = collections.Counter(matches_out)

    def _remove_uppercase(self):
        """Remove upper-case words. Only run AFTER _remove_duplicates."""
        tokens_caps = {key for key in self._tokens.keys()
                       if key[0].isupper()}
        no_caps = {key: count for (key, count) in self._tokens.items()
                   if key not in tokens_caps}

        _logger.info('Uppercase tokens removed')

        self._tokens = collections.Counter(no_caps)

    def filter_tokens(self, filter_source, length=10000):
        """Filter the tokens using a specified outside word list.

        :param filter_source: a frequency-list file with one 'word ###' entry per line
        :param length: the number of entries from the top of the list to filter against
        """
        with open(filter_source, 'r') as f:
            data = f.read()

        # structured for a file of entries in the form 'word ###\n'
        regex = re.compile(r'(\w*) \d*')
        filter_set = set(regex.findall(data)[:length])
        tokens_filtered = {key: count for (key, count) in self._tokens.items()
                           if key.lower() not in filter_set}

        _logger.info('Tokens filtered using {}'.format(filter_source))

        self._tokens = collections.Counter(tokens_filtered)
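
    # For illustration, filter_tokens expects a frequency list with one
    # 'word count' entry per line, roughly like the hypothetical file below;
    # the top `length` words in the list are removed from the token set:
    #
    #     the 23135851162
    #     of 13151942776
    #     and 12997637966
    #
    #     tokenizer.filter_tokens('wordlist.txt', length=5000)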

    # methods for writing the token sets to various types of files in various
    # configurations

    def get_tokens(self, output_type='items'):
        """Return tokens as a list object.

        :param output_type: 'items' (default), 'elements', or 'counts'
        """
        # build the correct output for the requested type, or raise an error
        if output_type == 'items':
            out_list = list(self._tokens.keys())
        elif output_type == 'elements':
            out_list = list(self._tokens.elements())
        elif output_type == 'counts':
            out_list = ['{}\t{}'.format(key, count) for (key, count)
                        in self._tokens.items()]
        else:
            err_msg = "output_type: 'items' (default), 'elements', 'counts'"
            raise ValueError(err_msg)

        return out_list

    def output_text(self, outfile, output_type='items'):
        """Output tokens to a text file.

        :param outfile: the file where tokens should be written
        :param output_type: 'items' (default), 'elements', or 'counts'
        """
        # build the correct output for the requested type, or raise an error
        if output_type == 'items':
            out_list = self._tokens.keys()
        elif output_type == 'elements':
            out_list = self._tokens.elements()
        elif output_type == 'counts':
            out_list = ['{}\t{}'.format(key, count) for (key, count)
                        in self._tokens.items()]
        else:
            err_msg = "output_type: 'items' (default), 'elements', 'counts'"
            raise ValueError(err_msg)

        with open(outfile, 'w') as out_file:
            out_file.write('\n'.join(out_list))

        out_msg = "{} tokens written to {}".format(len(self._tokens), outfile)
        _logger.info(out_msg)
        print(out_msg)
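
    # A sketch of the three output_type options; the tokens and counts shown
    # are hypothetical:
    #
    #     get_tokens('items')     ->  ['kite', 'run']
    #     get_tokens('elements')  ->  ['kite', 'kite', 'run']    # one entry per occurrence
    #     get_tokens('counts')    ->  ['kite\t2', 'run\t1']      # tab-separated token and count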


class WikipediaTokenizer(GeneralTokenizer):
    """A class for tokenizing the output of WikiExtractor corpus parsing.

    This class allows quick tokenization of Wikipedia corpora (dumps.wikimedia.org)
    that have been parsed using WikiExtractor (https://github.com/attardi/wikiextractor).
    It inherits all of the methods of GeneralTokenizer, with only a change to the
    private _extract_tokens method.
    """

    def __init__(self, target):
        """Initialize a WikipediaTokenizer object with a file or a directory of files.

        :param target: a file or directory of files containing text to be tokenized.
        """
        GeneralTokenizer.__init__(self, target)

    # methods for extracting tokens from files and/or directories of files

    def _extract_tokens(self, file_text):
        """Extract tokens from a file's text and return a Counter dictionary."""
        token_dict = collections.Counter()

        # matches and removes beginning and end tags
        regex = re.compile(r'(<doc id.*>|<\/doc>)')
        data = regex.sub('', file_text)

        tokens = wordpunct_tokenize(data)
        for token in tokens:
            token_dict[token] += 1

        return token_dict
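
    # For reference, WikiExtractor wraps each article's plain text in document
    # tags roughly like the snippet below (the article text is hypothetical);
    # the regex above strips those tag lines before tokenization:
    #
    #     <doc id="12" url="..." title="Anarchism">
    #     Anarchism is a political philosophy ...
    #     </doc>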


class BabelTokenizer(GeneralTokenizer):
    """A class for tokenizing IARPA Babel Program audio transcript files.

    It inherits all of the methods of GeneralTokenizer, with only a change to the
    private _extract_tokens method.
    """

    def __init__(self, target):
        """Initialize a BabelTokenizer object with a file or a directory of files.

        :param target: a file or directory of files containing text to be tokenized.
        """
        GeneralTokenizer.__init__(self, target)

    def _extract_tokens(self, file_text):
        """Extract tokens from a Babel transcript and return a Counter dictionary."""
        token_dict = collections.Counter()

        # capture the transcript line that follows each '[###.###]' timestamp
        regex = re.compile(r'\[\d*\.\d*\]\n(.*)')
        matches = regex.findall(file_text)

        # tokens are collected in a set, so each token type is counted at most
        # once per file
        tokens = set()
        for match in matches:
            wp_tokenized = wordpunct_tokenize(match)
            tokens.update(wp_tokenized)

        for token in tokens:
            token_dict[token] += 1

        return token_dict
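
# For illustration, a Babel transcript alternates '[seconds]' timestamps and
# utterance lines, roughly like the hypothetical snippet below, and the class
# is used the same way as GeneralTokenizer (paths are hypothetical):
#
#     [0.000]
#     hello how are you
#     [2.350]
#     fine thanks
#
#     babel = BabelTokenizer('transcripts/')
#     babel.clean_tokens()
#     babel.output_text('babel_tokens.txt', 'counts')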