# corpus.py
import json
import boto3
import logging
from os import environ
from indra.statements import stmts_to_json, stmts_from_json
from . import file_defaults, InvalidCorpusError, CACHE, default_bucket, \
default_key_base, default_profile
from .util import _stmts_dict_to_json, _json_to_stmts_dict, _json_dumper, \
_json_loader
logger = logging.getLogger(__name__)
class Corpus(object):
    """Represent a corpus of statements with curation.

    Parameters
    ----------
    corpus_id : str
        The identifier of the corpus; used to construct the S3 key prefix
        (<default_key_base>/<corpus_id>/) and the local cache directory.
    statements : list[indra.statement.Statement]
        A list of INDRA Statements to embed in the corpus.
    raw_statements : list[indra.statement.Statement]
        A List of raw statements forming the basis of the statements in
        'statements'.
    meta_data : dict
        A dict with meta data to associate with the corpus.
    aws_name : str
        The name of the profile in the AWS credential file to use. 'default' is
        used by default.

    Attributes
    ----------
    statements : dict
        A dict of INDRA Statements keyed by UUID.
    raw_statements : list
        A list of the raw statements
    curations : dict
        A dict keeping track of the curations submitted so far for Statement
        UUIDs in the corpus.
    meta_data : dict
        A dict with meta data associated with the corpus
    """
    def __init__(self, corpus_id, statements=None, raw_statements=None,
                 meta_data=None, aws_name=default_profile):
        self.corpus_id = corpus_id
        # Key assembled statements by UUID so curations can reference them
        self.statements = {st.uuid: st for st in statements} if statements \
            else {}
        self.raw_statements = raw_statements if raw_statements else []
        self.curations = {}
        self.meta_data = meta_data if meta_data else {}
        self.aws_name = aws_name
        # The boto3 S3 client is created lazily by _get_s3_client
        self._s3 = None

    def _get_s3_client(self):
        """Return a cached boto3 S3 client, creating it on first access.

        Credentials found in the environment take precedence over the
        stored AWS profile named by self.aws_name.
        """
        if self._s3 is None:
            if environ.get('AWS_ACCESS_KEY_ID') and \
                    environ.get('AWS_SECRET_ACCESS_KEY'):
                logger.info('Got credentials in environment for client')
                self._s3 = boto3.session.Session(
                    aws_access_key_id=environ.get('AWS_ACCESS_KEY_ID'),
                    aws_secret_access_key=environ.get('AWS_SECRET_ACCESS_KEY')
                ).client('s3')
            else:
                logger.info('Using stored AWS profile for client')
                self._s3 = boto3.session.Session(
                    profile_name=self.aws_name).client('s3')
        return self._s3

    def __str__(self):
        return 'Corpus(%s -> %s)' % (str(self.statements), str(self.curations))

    def __repr__(self):
        return str(self)

    @classmethod
    def load_from_s3(cls, corpus_id, aws_name=default_profile,
                     bucket=default_bucket, force_s3_reload=False,
                     raise_exc=False):
        """Create a Corpus and populate it from S3 or the local cache.

        Parameters
        ----------
        corpus_id : str
            The identifier of the corpus to load.
        aws_name : str
            The name of the AWS credential profile to use.
        bucket : str
            The S3 bucket to fetch the corpus from.
        force_s3_reload : bool
            If True, bypass the local cache and load directly from S3.
        raise_exc : bool
            If True, raise InvalidCorpusError when the corpus fails to load.

        Returns
        -------
        Corpus
            The loaded corpus object.
        """
        corpus = cls(corpus_id, statements=[], aws_name=aws_name)
        corpus.s3_get(bucket, cache=(not force_s3_reload),
                      raise_exc=raise_exc)
        return corpus

    def s3_put(self, bucket=default_bucket, cache=True):
        """Push a corpus object to S3 in the form of four json files

        The json files representing the object (raw statements, assembled
        statements, curations and meta data) have S3 keys of the format
        <key_base_name>/<name>/<file>.json

        Parameters
        ----------
        bucket : str
            The S3 bucket to upload the Corpus to. Default: 'world-modelers'.
        cache : bool
            If True, also create a local cache of the corpus. Default: True.

        Returns
        -------
        keys : list(str)
            A list of three strings giving the S3 keys of the pushed raw
            statement, assembled statement and curation objects, or None if
            the upload failed. Note that the meta data file is uploaded and
            cached as well, but its key is not part of the return value
            (kept this way for backwards compatibility).
        """
        # Note that the S3 path to each json file is of the form
        # <bucket>/indra_models/<corpus_id>/<file>.json"
        s3key = '%s/%s/' % (default_key_base, self.corpus_id)
        raw = s3key + file_defaults['raw'] + '.json'
        sts = s3key + file_defaults['sts'] + '.json'
        cur = s3key + file_defaults['cur'] + '.json'
        meta = s3key + file_defaults['meta'] + '.json'
        try:
            s3 = self._get_s3_client()
            # Structure and upload raw statements
            self._s3_put_file(s3, raw, stmts_to_json(self.raw_statements),
                              bucket)
            # Structure and upload assembled statements
            self._s3_put_file(s3, sts, _stmts_dict_to_json(self.statements),
                              bucket)
            # Structure and upload curations
            self._s3_put_file(s3, cur, self.curations, bucket)
            # Upload meta data
            self._s3_put_file(s3, meta, self.meta_data, bucket)
            if cache:
                # Cache all four files; previously meta was uploaded but
                # never cached here, leaving a stale or missing local copy
                self._save_to_cache(raw, sts, cur, meta)
            return list((raw, sts, cur))
        except Exception as e:
            logger.exception('Failed to put on s3: %s' % e)
            return None

    @staticmethod
    def _s3_put_file(s3, key, json_obj, bucket=default_bucket):
        """Does the json.dumps operation for the the upload, i.e. json_obj
        must be an object that can be turned into a bytestring using
        json.dumps"""
        logger.info('Uploading %s to S3' % key)
        s3.put_object(Body=json.dumps(json_obj, indent=1),
                      Bucket=bucket, Key=key)

    def _save_to_cache(self, raw=None, sts=None, cur=None, meta=None):
        """Helper method that saves the current state of the provided
        file keys

        Each argument, when provided, is a full S3 key of the form
        <base_name>/<dirname>/<file>.json; the <base_name>/ prefix is
        stripped to obtain the path below the local CACHE directory.
        """
        # Raw statements
        if raw:
            rawf = CACHE.joinpath(raw.replace(default_key_base + '/', ''))
            if not rawf.is_file():
                rawf.parent.mkdir(exist_ok=True, parents=True)
                rawf.touch(exist_ok=True)
            _json_dumper(jsonobj=stmts_to_json(self.raw_statements),
                         fpath=rawf.as_posix())
        # Assembled statements
        if sts:
            stsf = CACHE.joinpath(sts.replace(default_key_base + '/', ''))
            if not stsf.is_file():
                stsf.parent.mkdir(exist_ok=True, parents=True)
                stsf.touch(exist_ok=True)
            _json_dumper(jsonobj=_stmts_dict_to_json(self.statements),
                         fpath=stsf.as_posix())
        # Curations
        if cur:
            curf = CACHE.joinpath(cur.replace(default_key_base + '/', ''))
            if not curf.is_file():
                curf.parent.mkdir(exist_ok=True, parents=True)
                curf.touch(exist_ok=True)
            _json_dumper(jsonobj=self.curations, fpath=curf.as_posix())
        # Meta data
        if meta:
            metaf = CACHE.joinpath(meta.replace(default_key_base + '/', ''))
            if not metaf.is_file():
                metaf.parent.mkdir(exist_ok=True, parents=True)
                metaf.touch(exist_ok=True)
            _json_dumper(jsonobj=self.meta_data, fpath=metaf.as_posix())

    def s3_get(self, bucket=default_bucket, cache=True,
               raise_exc=False):
        """Fetch a corpus object from S3 in the form of three json files

        The json files representing the object have S3 keys of the format
        <s3key>/statements.json and <s3key>/raw_statements.json.

        Parameters
        ----------
        bucket : str
            The S3 bucket to fetch the Corpus from. Default: 'world-modelers'.
        cache : bool
            If True, look for corpus in local cache instead of loading it
            from s3. Default: True.
        raise_exc : bool
            If True, raise InvalidCorpusError when corpus failed to load
        """
        # Note that the S3 path to each json file is of the form
        # <bucket>/indra_models/<corpus_id>/<file>.json"
        s3key = '%s/%s/' % (default_key_base, self.corpus_id)
        raw = s3key + file_defaults['raw'] + '.json'
        sts = s3key + file_defaults['sts'] + '.json'
        cur = s3key + file_defaults['cur'] + '.json'
        meta = s3key + file_defaults['meta'] + '.json'
        try:
            logger.info('Loading corpus: %s' % s3key)
            s3 = self._get_s3_client()
            # Get and process raw statements; prefer cache, fall back to S3
            raw_stmt_jsons = []
            if cache:
                raw_stmt_jsons = self._load_from_cache(raw) or []
            if not raw_stmt_jsons:
                raw_stmt_jsons_str = s3.get_object(
                    Bucket=bucket, Key=raw)['Body'].read()
                raw_stmt_jsons = json.loads(raw_stmt_jsons_str) or []
            self.raw_statements = stmts_from_json(raw_stmt_jsons)
            # Get and process assembled statements from list to dict
            json_stmts = []
            if cache:
                json_stmts = self._load_from_cache(sts) or []
            if not json_stmts:
                json_stmts = json.loads(s3.get_object(
                    Bucket=bucket, Key=sts)['Body'].read()) or []
            self.statements = _json_to_stmts_dict(json_stmts)
            # Get and process curations if any
            curation_json = {}
            if cache:
                curation_json = self._load_from_cache(cur) or {}
            if not curation_json:
                curation_json = json.loads(s3.get_object(
                    Bucket=bucket, Key=cur)['Body'].read()) or {}
            self.curations = curation_json
            # Meta data is optional: a missing file must not fail the load
            meta_json = {}
            try:
                if cache:
                    meta_json = self._load_from_cache(meta)
                if not meta_json:
                    meta_json = json.loads(s3.get_object(
                        Bucket=bucket, Key=meta)['Body'].read())
            except Exception as e:
                if isinstance(e, s3.exceptions.NoSuchKey):
                    logger.warning('No meta data found on s3')
                else:
                    logger.warning('No meta data found')
                meta_json = {}
            self.meta_data = meta_json
        except Exception as e:
            if raise_exc:
                raise InvalidCorpusError('Failed to get from s3: %s' % e)
            else:
                logger.warning('Failed to get from s3: %s' % e)

    def upload_curations(self, look_in_cache=False,
                         save_to_cache=False, bucket=default_bucket):
        """Upload the current state of curations for the corpus

        Parameters
        ----------
        look_in_cache : bool
            If True, when no curations are available check if there are
            curations cached locally. Default: False
        save_to_cache : bool
            If True, also save current curation state to cache. If
            look_in_cache is True, this option will have no effect. Default:
            False.
        bucket : str
            The bucket to upload to. Default: 'world-modelers'.
        """
        # Get curation file key
        file_key = '%s/%s/%s.json' % (default_key_base, self.corpus_id,
                                      file_defaults['cur'])
        # First see if we have any curations, then check in cache if
        # look_in_cache == True
        if self.curations:
            curations = self.curations
        elif look_in_cache:
            curations = self._load_from_cache(file_key)
        else:
            curations = None
        # Only upload if we actually have any curations to upload
        if curations:
            self._s3_put_file(s3=self._get_s3_client(),
                              key=file_key,
                              json_obj=curations,
                              bucket=bucket)
        if self.curations and save_to_cache and not look_in_cache:
            self._save_to_cache(cur=file_key)

    def get_curations(self, look_in_cache=False):
        """Get curations for the corpus

        Parameters
        ----------
        look_in_cache : bool
            If True, look in local cache if there are no curations loaded

        Returns
        -------
        dict
            The curations for this corpus, if any
        """
        if self.curations:
            curations = self.curations
        elif look_in_cache:
            file_key = '%s/%s/%s.json' % (default_key_base, self.corpus_id,
                                          file_defaults['cur'])
            curations = self._load_from_cache(file_key) or {}
        else:
            curations = {}
        return curations

    @staticmethod
    def _load_from_cache(file_key):
        """Load a json object from the local cache, or None if not cached.

        Assumes file_key is cleaned, contains the file name and contains
        the initial file base name:
        <base_name>/<dirname>/<file>.json
        """
        # Remove <base_name> and get local path to file
        local_file = CACHE.joinpath(
            '/'.join([s for s in file_key.split('/')[1:]]))
        # Load json object
        if local_file.is_file():
            return _json_loader(local_file.as_posix())
        return None