/
model.py
567 lines (487 loc) · 17.5 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
# -*- coding: utf-8 -*-
"""
This module provides features to manipulate model files.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
import struct
from binascii import crc32
from io import BytesIO
import base64
import subprocess
import tempfile
import optparse
import msgpack
import json
from .compat import *
from ._stdio import print, printe, get_stdio
from ._process import JubaProcess
class JubaDump(object):
    """
    ``JubaDump`` provides a high-level dump of Jubatus models.
    ``jubadump`` command must be installed.
    """

    @classmethod
    def dump_file(cls, target):
        """
        Returns the dumped model data structure of the model file path ``target``.

        Runs ``jubadump -i <target>`` and parses its standard output as JSON.
        Raises ``InvalidModelFormatError`` when the command exits with a
        non-zero status; the message carries the command's standard error
        output and the exit status.
        """
        proc = JubaProcess.get_process(
            ['jubadump', '-i', target],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        (stdout, stderr) = proc.communicate()
        status = proc.returncode
        if status != 0:
            # The pipe yields bytes; decode them so that the message does not
            # contain a ``b'...'`` repr on Python 3.  Undecodable bytes are
            # replaced rather than masking the real failure.
            if isinstance(stderr, bytes):
                stderr = stderr.decode('utf-8', 'replace')
            raise InvalidModelFormatError('{0} (exit with status {1})'.format(stderr, status))
        return json.loads(stdout.decode())

    @classmethod
    def dump(cls, data):
        """
        Returns the dumped model data structure of the raw model data.

        ``data`` is written to a named temporary file which is then handed to
        ``dump_file``; the file is removed when the ``with`` block exits.
        """
        with tempfile.NamedTemporaryFile(mode='wb', prefix='jubakit-jubadump-') as f:
            f.write(data)
            # Make sure the bytes are on disk before the external command reads them.
            f.flush()
            return cls.dump_file(f.name)
class JubaModel(object):
    """
    ``JubaModel`` provides features to perform low-level manipulation of Jubatus model data structure.

    A model is made of three parts: ``header`` (fixed-width binary fields),
    ``system`` and ``user`` (both msgpack-encoded containers).
    """

    def __init__(self):
        self.header = self.Header()
        self.system = self.SystemContainer()
        self.user = self.UserContainer()
        # Serialized bytes of the user container exactly as they were read
        # from the input; ``None`` when the model was built from Python
        # objects only (e.g. a JSON file without a ``user_raw`` record).
        self._user_raw = None

    @classmethod
    def load_binary(cls, f, validate=True):
        """
        Loads Jubatus binary model file from binary stream ``f``.
        When ``validate`` is ``True``, the model file format is strictly validated.

        Validation checks that both data sections have the size announced in
        the header and that the CRC32 over header (without its crc32 field),
        system_data and user_data matches the header's ``crc32``.
        Raises ``InvalidModelFormatError`` when a check fails.
        """
        m = cls()
        checksum = 0

        # Load header
        h = cls.Header.load(f)
        m.header = h
        if validate:
            checksum = crc32(h.dumps(False), checksum)

        # Load system_data.  The size check runs before parsing so that a
        # truncated file is reported as such instead of as a decoder error.
        buf = f.read(h.system_data_size)
        if validate:
            if h.system_data_size != len(buf):
                raise InvalidModelFormatError(
                    'EOF detected while reading system_data: ' +
                    'expected {0} bytes, got {1} bytes'.format(h.system_data_size, len(buf)))
            checksum = crc32(buf, checksum)
        m.system = cls.SystemContainer.loads(buf)

        # Load user_data (same ordering as above)
        buf = f.read(h.user_data_size)
        if validate:
            if h.user_data_size != len(buf):
                raise InvalidModelFormatError(
                    'EOF detected while reading user_data: ' +
                    'expected {0} bytes, got {1} bytes'.format(h.user_data_size, len(buf)))
            checksum = crc32(buf, checksum)
        m.user = cls.UserContainer.loads(buf)
        m._user_raw = buf

        if validate:
            # Convert the checksum into 32-bit unsigned integer (for Python 2/3 compatibility)
            checksum = checksum & 0xffffffff
            # Check CRC
            if checksum != h.crc32:
                raise InvalidModelFormatError(
                    'CRC32 mismatch: expected {0}, got {1}'.format(checksum, h.crc32))
        return m

    def dump_binary(self, f):
        """
        Dumps the model as Jubatus binary model file to binary stream ``f``.

        The header is written as-is; call ``fix_header`` beforehand when the
        contents were modified.
        """
        # Dump header
        self.header.dump(f)

        # Dump system_data
        self.system.dump(f)

        # Dump user_data; prefer the original bytes when we have them.
        if self._user_raw is None:
            printe('Warning: conversion from Python object to binary model format may generate corrupt model')
            self.user.dump(f)
        else:
            f.write(self._user_raw)

    @classmethod
    def load_json(cls, f):
        """
        Loads model file saved as JSON file from text stream ``f``.

        The record must contain ``header`` and ``system`` sections and either
        ``user_raw`` (base64 of the serialized user container) or ``user``.
        Raises ``InvalidModelFormatError`` when a required section is missing.
        """
        m = cls()
        record = json.load(f)

        # Load header
        if 'header' not in record:
            raise InvalidModelFormatError('header section does not exist')
        m.header.set(record['header'])

        # Load system_data
        if 'system' not in record:
            raise InvalidModelFormatError('system section does not exist')
        m.system.set(record['system'])

        # Load user_data
        if 'user_raw' in record:
            if 'user' in record:
                printe('Notice: using "user_raw" record from JSON; "user" record is ignored')
            raw = base64.b64decode(record['user_raw'])
            try:
                m.user = cls.UserContainer.loads(raw)
            except UnicodeDecodeError:
                # Keep the raw bytes so the model can still be written back
                # in binary form, but expose no decoded user data.
                printe('Warning: model contains non UTF-8 strings; cannot be loaded')
                m.user = cls.UserContainer()
                m.user.user_data = None
            m._user_raw = raw
        elif 'user' in record:
            m.user.set(record['user'])
        else:
            raise InvalidModelFormatError('user or user_raw section does not exist')
        return m

    def dump_json(self, f, without_raw=False):
        """
        Dumps the model as JSON file to a text stream ``f``.

        Unless ``without_raw`` is true, the serialized user container is also
        emitted base64-encoded as ``user_raw``.  The record is omitted when no
        raw bytes are held (model loaded from JSON without ``user_raw``).
        """
        record = {}

        # Dump header
        record['header'] = dict(self.header.get())

        # Dump system_data
        record['system'] = dict(self.system.get())

        # Dump user_data
        record['user'] = dict(self.user.get())
        if not without_raw and self._user_raw is not None:
            record['user_raw'] = base64.b64encode(self._user_raw).decode()
        json.dump(record, f, indent=2)

    def dump_text(self, f):
        """
        Dumps the model as human-readable text format to a text stream ``f``.
        """
        buf = []
        for (heading, obj) in [('Meta Data', self.header),
                               ('System Data', self.system),
                               ('User Data', self.user)]:
            buf.append("------------------------------------------")
            buf.append(heading)
            buf.append("------------------------------------------")
            for (k, v) in obj.get():
                buf.append('{0:24}{1}'.format(k, v))
            buf.append('')
        f.write('\n'.join(buf))

    @classmethod
    def predict_format(cls, filename):
        """
        Predicts the format of the model file named ``filename`` from its
        first byte.
        Returns ``binary`` or ``json``.

        Raises ``InvalidModelFormatError`` when the first byte matches neither
        format (this includes an empty file).
        """
        with open(filename, 'rb') as f:
            sig = f.read(1)
        # Compare one-byte slices: indexing bytes yields an int on Python 3
        # and fails on an empty read.
        if sig == cls.Header._MAGIC[:1]:
            return 'binary'
        if sig == b'{':
            return 'json'
        raise InvalidModelFormatError('model format cannot be predicted')

    def fix_header(self):
        """
        Repairs the header values.

        Resets the magic value, recomputes both data section sizes and the
        CRC32 checksum from the current contents.
        """
        # Update magic
        self.header.magic = self.header._MAGIC

        # Update system_data_size
        system_raw = self.system.dumps()
        self.header.system_data_size = len(system_raw)

        # Update user_data_size.  Without raw bytes, use the serialized user
        # container, which is what ``dump_binary`` writes in that case.
        user_raw = self._user_raw
        if user_raw is None:
            user_raw = self.user.dumps()
        self.header.user_data_size = len(user_raw)

        # Update crc32 (computed over the header without its crc32 field)
        header_raw = self.header.dumps(False)
        checksum = 0
        checksum = crc32(header_raw, checksum)
        checksum = crc32(system_raw, checksum)
        checksum = crc32(user_raw, checksum)

        # Convert the checksum into 32-bit unsigned integer (for Python 2/3 compatibility)
        self.header.crc32 = (checksum & 0xffffffff)

    def data(self):
        """
        Returns the actual model data part.
        This method is a quick shortcut for ``return self.user.user_data``.
        """
        return self.user.user_data

    class ModelPart(object):
        """
        Base class of a model section whose attributes are described by
        ``fields()``.
        """

        def __init__(self):
            for (key, _, default) in self.fields():
                setattr(self, key, default)

        @classmethod
        def fields(cls):
            """
            Returns the list of (property_name, data_type, default_value).
            """
            raise NotImplementedError

        def get(self):
            """
            Returns the list of (property_name, current_value) in field order.
            """
            return [(key, getattr(self, key)) for (key, _, _) in self.fields()]

        def set(self, record):
            """
            Assigns every field from the mapping ``record``; ``bytes`` values
            are decoded to text first.  Raises ``KeyError`` on a missing field.
            """
            for (key, _, _) in self.fields():
                new_value = record[key]
                if isinstance(new_value, bytes):
                    new_value = new_value.decode()
                setattr(self, key, new_value)

        @classmethod
        def load(cls, f, *args, **kwargs):
            # Must be implemented in sub classes.
            raise NotImplementedError

        @classmethod
        def loads(cls, data, *args, **kwargs):
            """
            Same as ``load`` but reads from the bytes ``data``.
            """
            return cls.load(BytesIO(data), *args, **kwargs)

        def dump(self, f, *args, **kwargs):
            # Must be implemented in sub classes.
            raise NotImplementedError

        def dumps(self, *args, **kwargs):
            """
            Same as ``dump`` but returns the serialized bytes.
            """
            f = BytesIO()
            self.dump(f, *args, **kwargs)
            return f.getvalue()

    class Header(ModelPart):
        """
        Binary header: the magic value followed by big-endian integer fields.
        """

        # Magic value for binary model files.
        _MAGIC = b'jubatus\0'

        @classmethod
        def fields(cls):
            # The second element is the ``struct`` format of the field.
            return [
                ('format_version', b'>Q', 1),
                ('jubatus_version_major', b'>I', 0),
                ('jubatus_version_minor', b'>I', 0),
                ('jubatus_version_maint', b'>I', 0),
                ('crc32', b'>I', 0),
                ('system_data_size', b'>Q', 0),
                ('user_data_size', b'>Q', 0),
            ]

        @classmethod
        def load(cls, f):
            """
            Reads a header from binary stream ``f``.
            Raises ``InvalidModelFormatError`` on a wrong magic value or a
            short read.
            """
            h = cls()
            magic = f.read(8)
            if len(magic) != 8 or magic != cls._MAGIC:
                raise InvalidModelFormatError('invalid magic value: {0}'.format(str(magic)))
            for (key, fmt, _) in cls.fields():
                size = struct.calcsize(fmt)
                raw = f.read(size)
                if len(raw) != size:
                    raise InvalidModelFormatError('failed to read {0} in header (expected {1} bytes, got {2} bytes)'.format(key, size, len(raw)))
                try:
                    value = struct.unpack(fmt, raw)[0]
                except (ValueError, struct.error):
                    # struct reports malformed input with struct.error.
                    raise InvalidModelFormatError('failed to parse {0} value {1} as {2}'.format(key, str(raw), fmt))
                setattr(h, key, value)
            return h

        def dump(self, f, checksum=True):
            """
            Writes the header to binary stream ``f``.  With ``checksum=False``
            the crc32 field is left out (the form used to compute the CRC).
            """
            f.write(bytes(self._MAGIC))
            for (key, fmt, _) in self.fields():
                if key == 'crc32' and not checksum:
                    continue  # skip checksum if checksum == False
                f.write(struct.pack(fmt, getattr(self, key)))

    class Container(ModelPart):
        """
        Section stored as a msgpack array holding the field values in
        ``fields()`` order.
        """

        @classmethod
        def load(cls, f):
            # Assumes everything is encoded in UTF-8.
            # This means that if some records (e.g., config files, feature vector
            # keys) are not encoded in UTF-8, the model cannot be loaded. However,
            # such models cannot be written out to text or JSON, so we don't really
            # care. Callers are responsible for handling UnicodeDecodeError.
            # NOTE(review): the ``encoding`` keyword was removed in msgpack 1.0
            # (``raw=False`` is the replacement) -- confirm the msgpack
            # version this package pins before upgrading.
            values = msgpack.load(f, encoding='utf-8', unicode_errors='strict')
            field_names = [name for (name, _, _) in cls.fields()]
            c = cls()
            c.set(dict(zip(field_names, values)))
            return c

        def dump(self, f):
            """
            Writes the field values to binary stream ``f`` as one msgpack array.
            """
            values = [value for (_, value) in self.get()]
            msgpack.dump(values, f)

    class SystemContainer(Container):
        @classmethod
        def fields(cls):
            return [
                ('version', int, 0),
                ('timestamp', int, 0),
                ('type', bytes, b''),
                ('id', bytes, b''),
                ('config', bytes, b''),
            ]

    class UserContainer(Container):
        @classmethod
        def fields(cls):
            # A new list (and a new default dict) is built on every call, so
            # instances never share the ``user_data`` default.
            return [
                ('version', int, 0),
                ('user_data', dict, {}),
            ]
class InvalidModelFormatError(Exception):
    """
    Raised when model data does not follow the expected format.
    """
class JubaModelError(Exception):
    """
    Error carrying a message prefixed with ``Error: ``.

    When a truthy ``e`` is given, its type name and string form are appended
    to the message.
    """

    def __init__(self, msg, e=None):
        text = 'Error: {0}'.format(msg)
        if e:
            text = 'Error: {0} ({1}): {2}'.format(msg, type(e).__name__, str(e))
        super(JubaModelError, self).__init__(text)
class _JubaModelOptionParser(optparse.OptionParser, object):
def __init__(self, *args, **kwargs):
self._error = False
super(_JubaModelOptionParser, self).__init__(*args, **kwargs)
def error(self, msg):
print('Error: {0}'.format(msg))
self._error = True
class _JubaModelCommand(object):
    """
    Provides command line interface for ``jubamodel`` command.
    """

    @classmethod
    def run(cls, target, in_fmt, out_fmt, output=None,
            fix_header=False, output_config=None,
            replace_config=None, replace_version=None,
            no_validate=False):
        """
        Loads the model file ``target``, applies the requested modifications
        and writes the result.

        ``in_fmt`` is ``auto``, ``binary`` or ``json``; ``out_fmt`` is
        ``text``, ``binary`` or ``json``.  Text and JSON go to standard output
        when ``output`` is not given; binary output requires ``output``.
        Every failure is reported as ``JubaModelError``.
        """
        # Predict model file format
        if in_fmt == 'auto':
            try:
                in_fmt = JubaModel.predict_format(target)
            except InvalidModelFormatError as e:
                raise JubaModelError('{0}: invalid model file format'.format(target), e)
            except Exception as e:
                raise JubaModelError('{0}: failed to predict model format'.format(target), e)

        # Load model file
        try:
            if in_fmt == 'binary':
                with open(target, 'rb') as f:
                    m = JubaModel.load_binary(f, not no_validate)
            elif in_fmt == 'json':
                with open(target, 'r') as f:
                    m = JubaModel.load_json(f)
            else:
                raise ValueError(in_fmt)
        except InvalidModelFormatError as e:
            raise JubaModelError('{0}: failed to parse model as {1}'.format(target, in_fmt), e)
        except Exception as e:
            raise JubaModelError('{0}: failed to load from model'.format(target), e)

        # Replace config file
        if replace_config is not None:
            # Wrap I/O failures: the caller only handles JubaModelError.
            try:
                with open(replace_config) as f:
                    m.system.config = f.read()
            except Exception as e:
                raise JubaModelError('{0}: failed to read config'.format(replace_config), e)
            if not fix_header:
                printe('Warning: replacing config without fixing header; may generate corrupt model')

        # Replace version
        if replace_version is not None:
            # ValueError covers both non-numeric parts and a wrong part count.
            try:
                (major, minor, maint) = map(int, replace_version.split('.'))
            except ValueError as e:
                raise JubaModelError('{0}: invalid version'.format(replace_version), e)
            m.header.jubatus_version_major = major
            m.header.jubatus_version_minor = minor
            m.header.jubatus_version_maint = maint
            if not fix_header:
                printe('Warning: replacing version without fixing header; may generate corrupt model')

        # Repair header
        if fix_header:
            try:
                m.fix_header()
            except Exception as e:
                raise JubaModelError('{0}: failed to fix header'.format(target), e)

        # Output model contents.  The precondition is checked outside the
        # ``try`` so its error is not re-wrapped as a write failure.
        if out_fmt == 'binary' and not output:
            raise JubaModelError('output file must be specified for binary output')
        try:
            if out_fmt == 'binary':
                with open(output, 'wb') as f:
                    m.dump_binary(f)
            elif out_fmt == 'json':
                if not output:
                    m.dump_json(get_stdio()[1])  # stdout
                else:
                    with open(output, 'w') as f:
                        m.dump_json(f)
            elif out_fmt == 'text':
                if not output:
                    m.dump_text(get_stdio()[1])  # stdout
                else:
                    with open(output, 'w') as f:
                        m.dump_text(f)
        except Exception as e:
            raise JubaModelError('{0}: failed to write model'.format(output), e)

        # Output config
        if output_config:
            try:
                with open(output_config, 'w') as f:
                    f.write(m.system.config)
            except Exception as e:
                raise JubaModelError('{0}: failed to write config'.format(output_config), e)

    @classmethod
    def start(cls, args):
        """
        Parses the command line ``args`` and runs the command.

        Returns the process exit status: 0 on success (or ``--help``), 1 on
        invalid parameters, 2 when option parsing failed and 3 when ``run``
        raised ``JubaModelError``.
        """
        USAGE = '''
jubamodel [--in-format IN_FORMAT] [--out-format OUT_FORMAT]
[--output OUTPUT] [--output-config OUTPUT_CONFIG]
[--no-validate] [--fix-header] model_file
jubamodel --help'''

        EPILOG = ' model_file input model file in format specified by --in-format'

        # TODO: migrate to argparse (which must be added into dependency to support Python 2.6)
        parser = _JubaModelOptionParser(add_help_option=False, usage=USAGE, epilog=EPILOG)

        # arguments
        parser.add_option('-i', '--in-format', choices=('auto','binary','json'), default='auto',
                          help='model input format (default: %default)')
        parser.add_option('-o', '--out-format', choices=('text','binary','json'), default='text',
                          help='model output format (default: %default)')
        parser.add_option('-O', '--output', type='str', default=None,
                          help='specify output file instead of stdout')
        parser.add_option('-C', '--output-config', type='str', default=None,
                          help='specify output file of config extracted from model')
        parser.add_option('-R', '--replace-config', type='str', default=None,
                          help='replace configuration in model with specified file')
        parser.add_option('-Z', '--replace-version', type='str', default=None,
                          help='replace Jubatus version in model file')
        parser.add_option('-f', '--no-validate', action='store_true', default=False,
                          help='disable validation of binary model files')
        parser.add_option('-F', '--fix-header', action='store_true', default=False,
                          help='recompute CRC32 checksum and fix corrupt header if possible')
        parser.add_option('-h', '--help', action='store_true', default=False,
                          help='show usage')

        def print_usage():
            print('JubaModel - Jubatus Low-Level Model Manipulation Tool')
            print()
            parser.print_help(get_stdio()[1])  # stdout
            print()
            print('Supported Formats:')
            print(' IN_FORMAT: auto | binary | json')
            print(' OUT_FORMAT: text | binary | json')

        (args, files) = parser.parse_args(args)

        # Failed to parse options.
        if parser._error:
            print_usage()
            return 2

        # Help option is specified.
        if args.help:
            print_usage()
            return 0

        # Validate parameters.
        if len(files) == 0:
            print('Error: no model file specified')
            print_usage()
            return 1
        if len(files) != 1:
            print('Error: cannot specify multiple model files at once')
            print_usage()
            return 1
        if args.out_format == 'binary' and args.output is None:
            print('Error: --output must be specified to output in binary format')
            print_usage()
            return 1

        try:
            cls.run(
                target=files[0],
                in_fmt=args.in_format,
                out_fmt=args.out_format,
                output=args.output,
                output_config=args.output_config,
                replace_config=args.replace_config,
                replace_version=args.replace_version,
                no_validate=args.no_validate,
                fix_header=args.fix_header,
            )
        except JubaModelError as e:
            print(e)
            return 3
        return 0
def _main():
    """
    Entry point for ``jubamodel`` command.

    Hands the command line arguments (without the program name) to
    ``_JubaModelCommand.start`` and exits with the status it returns.
    """
    exit_status = _JubaModelCommand.start(sys.argv[1:])
    sys.exit(exit_status)