import importlib.util
import logging
import os
from itertools import cycle
from pathlib import Path
from shutil import copyfile
from statistics import mean
from typing import List, Tuple, Dict, Iterable

import joblib
import numpy as np
from sklearn_crfsuite import metrics
from tabulate import tabulate

from medacy.data.annotations import Annotations, EntTuple
from medacy.data.dataset import Dataset
from medacy.pipeline_components.feature_extractors import FeatureTuple
from medacy.pipelines.base.base_pipeline import BasePipeline

DEFAULT_NUM_FOLDS = 10


def create_folds(y, num_folds=DEFAULT_NUM_FOLDS) -> List[Tuple[List[int], List[int]]]:
    """
    Partitions a data set of sequence labels and classifications into a number of stratified folds. Each partition
    should have an evenly distributed representation of sequence labels. Without stratification, under-represented
    labels may not appear in some folds. Returns an iterable [(X*, y*), ...] where each element contains the indices
    of the train and test set for the particular testing fold.

    See Dietterich, 1997 "Approximate Statistical Tests for Comparing Supervised Classification
    Algorithms" for in-depth analysis.

    :param y: a collection of sequence labels
    :param num_folds: the number of folds (defaults to 10, but must be >= 2)
    :return: an iterable of (train_indices, test_indices) tuples
    """
    if not isinstance(num_folds, int) or num_folds < 2:
        raise ValueError(f"'num_folds' must be an int >= 2, but is {repr(num_folds)}")

    # labels are ordered by most examples in data
    labels = np.unique([label for sequence in y for label in sequence])
    labels = np.flip(labels)

    added = np.ones(len(y), dtype=bool)
    partitions = [[] for _ in range(num_folds)]
    partition_cycler = cycle(partitions)

    for label in labels:
        possible_sequences = [index for index, sequence in enumerate(y) if label in sequence]
        for index in possible_sequences:
            if added[index]:
                partition = next(partition_cycler)
                partition.append(index)
                added[index] = 0

    train_test_array = []
    for i, y in enumerate(partitions):
        X = []
        for j, partition in enumerate(partitions):
            if i != j:
                X += partition
        train_test_array.append((X, y))

    return train_test_array
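
# Shape sketch of create_folds (the label names below are hypothetical): each returned element is a
# (train_indices, test_indices) pair, one per fold, where every index refers back to a sequence in y.
#
#     folds = create_folds([['O', 'Drug'], ['O'], ['Drug'], ['O'], ['ADE'], ['O']], num_folds=3)
#     for train_indices, test_indices in folds:
#         ...  # train on the sequences at train_indices, evaluate on those at test_indices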


def sequence_to_ann(X: List[FeatureTuple], y: List[str], file_names: Iterable[str]) -> Dict[str, Annotations]:
    """
    Creates a dictionary of document-level Annotations objects for a given sequence

    :param X: A list of sentence level zipped (features, indices, document_name) tuples
    :param y: A list of sentence-level lists of tags
    :param file_names: A list of file names that are used by these sequences
    :return: A dictionary mapping txt file names (the whole path) to their Annotations objects, where the
        Annotations are constructed from the X and y data given here.
    """
    # Flattening nested structures into 2d lists
    anns = {filename: Annotations([]) for filename in file_names}
    tuples_by_doc = {filename: [] for filename in file_names}
    document_indices = []
    span_indices = []

    for sequence in X:
        document_indices += [sequence.file_name] * len(sequence.features)
        span_indices.extend(sequence.indices)

    groundtruth = [element for sentence in y for element in sentence]

    # Map the predicted sequences to their corresponding documents
    i = 0
    while i < len(groundtruth):
        if groundtruth[i] == 'O':
            i += 1
            continue

        entity = groundtruth[i]
        document = document_indices[i]
        first_start, first_end = span_indices[i]

        # Ensure that consecutive tokens with the same label are merged
        while i < len(groundtruth) - 1 and groundtruth[i + 1] == entity:  # If inside entity, keep incrementing
            i += 1
        last_start, last_end = span_indices[i]

        tuples_by_doc[document].append((entity, first_start, last_end))
        i += 1

    # Create the Annotations objects
    for file_name, tups in tuples_by_doc.items():
        ann_tups = []
        with open(file_name) as f:
            text = f.read()
        for tup in tups:
            entity, start, end = tup
            ent_text = text[start:end]
            new_tup = EntTuple(entity, start, end, ent_text)
            ann_tups.append(new_tup)
        anns[file_name].annotations = ann_tups

    return anns


def write_ann_dicts(output_dir: Path, dict_list: List[Dict[str, Annotations]]) -> Dict[str, Annotations]:
    """
    Merges a list of dicts of Annotations into one dict representing all the individual ann files and writes the
    ann data to file for both the individual fold Annotations and the combined ones.

    :param output_dir: Path object of the output directory (a subdirectory is made for each fold)
    :param dict_list: a list of file_name: Annotations dictionaries
    :return: the merged Annotations dict
    """
    file_names = set()
    for d in dict_list:
        file_names |= set(d.keys())

    all_annotations_dict = {filename: Annotations([]) for filename in file_names}

    for i, fold_dict in enumerate(dict_list, 1):
        fold_dir = output_dir / f"fold_{i}"
        os.mkdir(fold_dir)
        for file_name, ann in fold_dict.items():
            # Write the Annotations from the individual fold to file;
            # note that this is written to the fold_dir, which is a subfolder of the output_dir
            ann.to_ann(fold_dir / (os.path.basename(file_name).rstrip("txt") + "ann"))
            # Merge the Annotations from the fold into the inter-fold Annotations
            all_annotations_dict[file_name] |= ann

    # Write the Annotations that are the combination of all folds to file
    for file_name, ann in all_annotations_dict.items():
        output_file_path = output_dir / (os.path.basename(file_name).rstrip("txt") + "ann")
        ann.to_ann(output_file_path)

    return all_annotations_dict


class Model:
    """
    A medaCy Model allows the fitting of a named entity recognition model to a given dataset according to the
    configuration of a given medaCy pipeline. Once fitted, Model instances can be used to predict over documents.
    Also included is a function for cross validating over a dataset for measuring the performance of a pipeline.

    :ivar pipeline: a medaCy pipeline, must be a subclass of BasePipeline (see medacy.pipelines.base.BasePipeline)
    :ivar model: weights, if the model has been fitted
    :ivar X_data: X_data from the pipeline; primarily for internal use
    :ivar y_data: y_data from the pipeline; primarily for internal use
    """

    def __init__(self, medacy_pipeline, model=None):
        if not isinstance(medacy_pipeline, BasePipeline):
            raise TypeError("Pipeline must be a medaCy pipeline that interfaces medacy.pipelines.base.BasePipeline")

        self.pipeline = medacy_pipeline
        self.model = model

        # These arrays will store the sequences of features and sequences of corresponding labels
        self.X_data = []
        self.y_data = []

        # Run an initializing document through the pipeline to register all token extensions.
        # This allows the gathering of pipeline information prior to fitting with live data.
        doc = self.pipeline(medacy_pipeline.spacy_pipeline.make_doc("Initialize"), predict=True)
        if doc is None:
            raise IOError("Model could not be initialized with the set pipeline.")

    def preprocess(self, dataset):
        """
        Preprocess dataset into a list of sequences and tags.

        :param dataset: Dataset object to preprocess.
        """
        self.X_data = []
        self.y_data = []

        # Run all Docs through the pipeline before extracting features, allowing for pipeline components
        # that require inter-dependent doc objects
        docs = [self._run_through_pipeline(data_file) for data_file in dataset if data_file.txt_path]

        for doc in docs:
            features, labels = self._extract_features(doc)
            self.X_data += features
            self.y_data += labels

    def fit(self, dataset: Dataset, groundtruth_directory: Path = None):
        """
        Runs dataset through the designated pipeline, extracts features, and fits a conditional random field.

        :param dataset: Instance of Dataset.
        :param groundtruth_directory: optional directory to write the pipeline-processed groundtruth annotations to
        :return model: a trained instance of a sklearn_crfsuite.CRF model.
        """
        groundtruth_directory = Path(groundtruth_directory) if groundtruth_directory else False

        report = self.pipeline.get_report()
        self.preprocess(dataset)

        if groundtruth_directory:
            logging.info(f"Writing dataset groundtruth to {groundtruth_directory}")
            for file_path, ann in sequence_to_ann(self.X_data, self.y_data, {x[2] for x in self.X_data}).items():
                ann.to_ann(groundtruth_directory / (os.path.splitext(os.path.basename(file_path))[0] + ".ann"))

        learner_name, learner = self.pipeline.get_learner()
        logging.info(f"Training: {learner_name}")

        train_data = [x[0] for x in self.X_data]
        learner.fit(train_data, self.y_data)
        logging.info(f"Successfully Trained: {learner_name}\n{report}")

        self.model = learner
        return self.model

    def _predict_document(self, doc):
        """
        Generates an Annotations object of predictions of the given model over the corresponding document. The passed
        document is assumed to be annotated by the same pipeline utilized when training the model.

        :param doc: A spacy document
        :return: an Annotations object containing the model predictions
        """
        feature_extractor = self.pipeline.get_feature_extractor()

        features, indices = feature_extractor.get_features_with_span_indices(doc)
        predictions = self.model.predict(features)
        predictions = [element for sentence in predictions for element in sentence]  # flatten 2d list
        span_indices = [element for sentence in indices for element in sentence]  # parallel array containing indices

        annotations = []

        i = 0
        while i < len(predictions):
            if predictions[i] == 'O':
                i += 1
                continue

            entity = predictions[i]
            first_start, first_end = span_indices[i]

            # Ensure that consecutive tokens with the same label are merged
            while i < len(predictions) - 1 and predictions[i + 1] == entity:  # If inside entity, keep incrementing
                i += 1
            last_start, last_end = span_indices[i]

            labeled_text = doc.text[first_start:last_end]
            new_ent = EntTuple(entity, first_start, last_end, labeled_text)
            annotations.append(new_ent)

            logging.debug(f"{doc._.file_name}: Predicted {entity} at ({first_start}, {last_end}) {labeled_text}")

            i += 1

        return Annotations(annotations)

    def predict(self, input_data, prediction_directory=None):
        """
        Generates predictions over a string or a Dataset utilizing the pipeline equipped to the instance.

        :param input_data: a string, Dataset, or directory path to predict over
        :param prediction_directory: The directory to write predictions if doing bulk prediction
            (default: */prediction* sub-directory of Dataset)
        :return: if input_data is a str, returns an Annotations of the predictions;
            if input_data is a Dataset or a valid directory path, returns a Dataset of the predictions.

        Note that if input_data is supposed to be a directory path but the directory is not found, it will be predicted
        over as a string. This can be prevented by validating inputs with os.path.isdir().
        """
        if self.model is None:
            raise RuntimeError("Must fit or load a pickled model before predicting")

        if isinstance(input_data, str) and not os.path.isdir(input_data):
            doc = self.pipeline.spacy_pipeline.make_doc(input_data)
            doc.set_extension('file_name', default=None, force=True)
            doc._.file_name = 'STRING_INPUT'
            doc = self.pipeline(doc, predict=True)
            annotations = self._predict_document(doc)
            return annotations

        if isinstance(input_data, Dataset):
            input_files = [d.txt_path for d in input_data]
            # Change input_data to point to the Dataset's directory path so that we can use it
            # to create the prediction directory
            input_data = input_data.data_directory
        elif os.path.isdir(input_data):
            input_files = [os.path.join(input_data, f) for f in os.listdir(input_data) if f.endswith('.txt')]
        else:
            raise ValueError(f"'input_data' must be a string (which can be a directory path) or a Dataset, but is {repr(input_data)}")

        if prediction_directory is None:
            prediction_directory = os.path.join(input_data, 'predictions')

        if os.path.isdir(prediction_directory):
            logging.warning("Overwriting existing predictions at %s", prediction_directory)
        else:
            os.mkdir(prediction_directory)

        for file_path in input_files:
            file_name = os.path.splitext(os.path.basename(file_path))[0]
            logging.info("Predicting file: %s", file_path)

            with open(file_path, 'r') as f:
                doc = self.pipeline.spacy_pipeline.make_doc(f.read())

            doc.set_extension('file_name', default=None, force=True)
            doc._.file_name = file_name

            # run through the pipeline
            doc = self.pipeline(doc, predict=True)

            # Predict, creating a new Annotations object
            annotations = self._predict_document(doc)
            logging.debug("Writing to: %s", os.path.join(prediction_directory, file_name + ".ann"))
            annotations.to_ann(write_location=os.path.join(prediction_directory, file_name + ".ann"))

            # Copy the txt file so that the output will also be a Dataset
            copyfile(file_path, os.path.join(prediction_directory, file_name + ".txt"))

        return Dataset(prediction_directory)
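
    # Sketch of the two prediction modes (paths and text below are illustrative assumptions):
    #
    #     ann = model.predict("The patient was given 50 mg of tramadol.")       # returns an Annotations
    #     out = model.predict(Dataset('/data/new_notes'),
    #                         prediction_directory='/data/new_notes/predictions')  # returns a Dataset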

    def cross_validate(self, training_dataset, num_folds=DEFAULT_NUM_FOLDS, prediction_directory=None, groundtruth_directory=None):
        """
        Performs k-fold stratified cross-validation using our model and pipeline.

        If the training dataset, groundtruth_directory, and prediction_directory are passed, the intermediate
        predictions made during cross validation are written to prediction_directory. This allows one to construct
        a confusion matrix or to compute the prediction ambiguity with the methods present in the Dataset class,
        supporting pipeline development without a designated evaluation set.

        :param training_dataset: Dataset that is being cross validated
        :param num_folds: number of folds to split training data into for cross validation, defaults to 10
        :param prediction_directory: directory to write predictions of cross validation to
        :param groundtruth_directory: directory to write the ground truth medaCy evaluates on
        :return: a dictionary of per-entity precision, recall, and F1 statistics aggregated over all folds
        """
        if num_folds <= 1:
            raise ValueError("Number of folds for cross validation must be greater than 1, but is %s" % repr(num_folds))

        groundtruth_directory = Path(groundtruth_directory) if groundtruth_directory else False
        prediction_directory = Path(prediction_directory) if prediction_directory else False
        for d in [groundtruth_directory, prediction_directory]:
            if d and not d.exists():
                raise NotADirectoryError(f"Options groundtruth_directory and prediction_directory must be existing directories, but one is {d}")

        pipeline_report = self.pipeline.get_report()

        self.preprocess(training_dataset)

        if not (self.X_data and self.y_data):
            raise RuntimeError("Must have features and labels extracted for cross validation")
        tags = sorted(self.pipeline.entities)
        logging.info(f'Tagset: {tags}')

        eval_stats = {}

        # Dicts for storing the mapping of sequences to their corresponding file, per fold
        fold_groundtruth_dicts = []
        fold_prediction_dicts = []
        file_names = {x.file_name for x in self.X_data}

        folds = create_folds(self.y_data, num_folds)

        for fold_num, fold_data in enumerate(folds, 1):
            train_indices, test_indices = fold_data
            fold_statistics = {}
            learner_name, learner = self.pipeline.get_learner()

            X_train = [self.X_data[index] for index in train_indices]
            y_train = [self.y_data[index] for index in train_indices]
            X_test = [self.X_data[index] for index in test_indices]
            y_test = [self.y_data[index] for index in test_indices]

            logging.info("Training Fold %i", fold_num)

            train_data = [x[0] for x in X_train]
            test_data = [x[0] for x in X_test]
            learner.fit(train_data, y_train)
            y_pred = learner.predict(test_data)

            if groundtruth_directory:
                ann_dict = sequence_to_ann(X_test, y_test, file_names)
                fold_groundtruth_dicts.append(ann_dict)

            if prediction_directory:
                ann_dict = sequence_to_ann(X_test, y_pred, file_names)
                fold_prediction_dicts.append(ann_dict)

            # Write the metrics for this fold.
            for label in tags:
                fold_statistics[label] = {
                    "recall": metrics.flat_recall_score(y_test, y_pred, average='weighted', labels=[label]),
                    "precision": metrics.flat_precision_score(y_test, y_pred, average='weighted', labels=[label]),
                    "f1": metrics.flat_f1_score(y_test, y_pred, average='weighted', labels=[label])
                }

            # add averages
            fold_statistics['system'] = {
                "recall": metrics.flat_recall_score(y_test, y_pred, average='weighted', labels=tags),
                "precision": metrics.flat_precision_score(y_test, y_pred, average='weighted', labels=tags),
                "f1": metrics.flat_f1_score(y_test, y_pred, average='weighted', labels=tags)
            }

            table_data = [
                [label,
                 format(fold_statistics[label]['precision'], ".3f"),
                 format(fold_statistics[label]['recall'], ".3f"),
                 format(fold_statistics[label]['f1'], ".3f")
                 ] for label in tags + ['system']
            ]

            logging.info('\n' + tabulate(table_data, headers=['Entity', 'Precision', 'Recall', 'F1'], tablefmt='orgtbl'))

            eval_stats[fold_num] = fold_statistics

        statistics_all_folds = {}

        for label in tags + ['system']:
            statistics_all_folds[label] = {
                'precision_average': mean(eval_stats[fold][label]['precision'] for fold in eval_stats),
                'precision_max': max(eval_stats[fold][label]['precision'] for fold in eval_stats),
                'precision_min': min(eval_stats[fold][label]['precision'] for fold in eval_stats),
                'recall_average': mean(eval_stats[fold][label]['recall'] for fold in eval_stats),
                'recall_max': max(eval_stats[fold][label]['recall'] for fold in eval_stats),
                'f1_average': mean(eval_stats[fold][label]['f1'] for fold in eval_stats),
                'f1_max': max(eval_stats[fold][label]['f1'] for fold in eval_stats),
                'f1_min': min(eval_stats[fold][label]['f1'] for fold in eval_stats),
            }

        entity_counts = training_dataset.compute_counts()
        entity_counts['system'] = sum(v for k, v in entity_counts.items() if k in self.pipeline.entities)

        table_data = [
            [f"{label} ({entity_counts[label]})",  # Entity (Count)
             format(statistics_all_folds[label]['precision_average'], ".3f"),
             format(statistics_all_folds[label]['recall_average'], ".3f"),
             format(statistics_all_folds[label]['f1_average'], ".3f"),
             format(statistics_all_folds[label]['f1_min'], ".3f"),
             format(statistics_all_folds[label]['f1_max'], ".3f")
             ] for label in tags + ['system']
        ]

        # Combine the pipeline report and the resulting data, then log it or print it (whichever ensures that it prints)
        output_str = '\n' + pipeline_report + '\n\n' + tabulate(
            table_data,
            headers=['Entity (Count)', 'Precision', 'Recall', 'F1', 'F1_Min', 'F1_Max'],
            tablefmt='orgtbl'
        )

        if logging.root.level > logging.INFO:
            print(output_str)
        else:
            logging.info(output_str)

        # Write groundtruth and predictions to file
        if groundtruth_directory:
            write_ann_dicts(groundtruth_directory, fold_groundtruth_dicts)
        if prediction_directory:
            write_ann_dicts(prediction_directory, fold_prediction_dicts)

        return statistics_all_folds
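
    # Cross-validation sketch (the directory names are illustrative assumptions; both directories
    # must already exist because cross_validate checks for them before running):
    #
    #     stats = model.cross_validate(Dataset('/data/annotated_notes'),
    #                                  num_folds=5,
    #                                  prediction_directory='/data/cv_predictions',
    #                                  groundtruth_directory='/data/cv_groundtruth')
    #     print(stats['system']['f1_average'])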

    def _run_through_pipeline(self, data_file):
        """
        Runs a DataFile through the pipeline, returning the resulting Doc object

        :param data_file: instance of DataFile
        :return: a Doc object
        """
        nlp = self.pipeline.spacy_pipeline
        logging.info("Processing file: %s", data_file.file_name)

        with open(data_file.txt_path, 'r', encoding='utf-8') as f:
            doc = nlp.make_doc(f.read())

        # Link ann_path to doc
        doc.set_extension('gold_annotation_file', default=None, force=True)
        doc.set_extension('file_name', default=None, force=True)

        doc._.gold_annotation_file = data_file.ann_path
        doc._.file_name = data_file.txt_path

        # run 'er through
        return self.pipeline(doc)

    def _extract_features(self, doc):
        """
        Extracts features from a Doc

        :param doc: an instance of Doc
        :return: a tuple of the feature dict and label list
        """
        feature_extractor = self.pipeline.get_feature_extractor()
        features, labels = feature_extractor(doc)
        logging.info(f"{doc._.file_name}: Feature Extraction Completed (num_sequences={len(labels)})")
        return features, labels

    def load(self, path):
        """
        Loads a pickled model.

        :param path: File path to the saved model to load
        :return: None
        """
        model_name, model = self.pipeline.get_learner()

        if model_name == 'BiLSTM+CRF' or model_name == 'BERT':
            model.load(path)
            self.model = model
        else:
            self.model = joblib.load(path)

    def dump(self, path):
        """
        Dumps a model into a pickle file

        :param path: File path to write the fitted model to
        :return: None
        """
        if self.model is None:
            raise RuntimeError("Must fit model before dumping.")

        model_name, _ = self.pipeline.get_learner()
        if model_name == 'BiLSTM+CRF' or model_name == 'BERT':
            self.model.save(path)
        else:
            joblib.dump(self.model, path)
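
    # Persisting and restoring a fitted CRF model, sketched (the path is an illustrative assumption;
    # BiLSTM+CRF and BERT learners delegate to their own save/load methods, per the branches above):
    #
    #     model.dump('/models/clinical_crf.pkl')
    #     restored = Model(pipeline)              # same pipeline configuration as at fit time
    #     restored.load('/models/clinical_crf.pkl')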

    @staticmethod
    def load_external(package_name):
        """
        Loads an external medaCy-compatible model. Requires the model's package to be installed.
        Alternatively, you can import the package directly and call its .load() method.

        :param package_name: the package name of the model
        :return: an instance of Model that is configured and loaded - ready for prediction.
        """
        if importlib.util.find_spec(package_name) is None:
            raise ImportError("Package not installed: %s" % package_name)
        return importlib.import_module(package_name).load()
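
    # Loading a published model package, sketched (the package name below is an illustrative
    # assumption; any installed medaCy-compatible model package exposing load() would work):
    #
    #     model = Model.load_external('medacy_model_clinical_notes')
    #     annotations = model.predict('The patient was prescribed 1 tablet of Advil.')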