/
document.py
741 lines (644 loc) · 26.6 KB
/
document.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
# SPDX-License-Identifier: LGPL-3.0-only
"""Representation of a collection of items."""
import os
import re
from collections import OrderedDict
from itertools import chain
from doorstop import common, settings
from doorstop.common import DoorstopError, DoorstopInfo, DoorstopWarning
from doorstop.core.base import (
BaseFileObject,
BaseValidatable,
add_document,
auto_load,
auto_save,
delete_document,
edit_document,
)
from doorstop.core.item import Item
from doorstop.core.types import UID, Level, Prefix
# Module-level logger, created by the project's `common.logger` helper
# using this module's name.
log = common.logger(__name__)
class Document(BaseValidatable, BaseFileObject):  # pylint: disable=R0902
    """Represents a document directory containing an outline of items.

    A directory counts as a document when it contains the ``CONFIG`` file;
    the item files found beneath that directory make up the document.
    """

    CONFIG = '.doorstop.yml'  # settings file whose presence marks a document
    SKIP = '.doorstop.skip'  # indicates this document should be skipped
    ASSETS = 'assets'  # name of the optional assets sub-directory
    INDEX = 'index.yml'  # name of the temporary index used for manual reordering
    DEFAULT_PREFIX = Prefix('REQ')  # prefix used until one is loaded or set
    DEFAULT_SEP = ''  # default separator between the prefix and the number
    DEFAULT_DIGITS = 3  # default number of digits used for new item UIDs
def __init__(self, path, root=None, **kwargs):
    """Initialize a document from an existing directory.

    :param path: path to document directory
    :param root: path to root of project (defaults to the current working
        directory at the time of the call)
    :param kwargs: optional ``tree`` (tree that contains this document)
        and ``auto`` (automatic save flag, defaults to ``Document.auto``)

    :raises: :class:`~doorstop.common.DoorstopError` if the directory
        does not contain a document configuration file

    """
    super().__init__()
    # Resolve the default per call: a `root=os.getcwd()` default would be
    # evaluated only once, when the function is defined.
    if root is None:
        root = os.getcwd()
    # Ensure the directory is valid
    if not os.path.isfile(os.path.join(path, Document.CONFIG)):
        relpath = os.path.relpath(path, root)
        msg = "no {} in {}".format(Document.CONFIG, relpath)
        raise DoorstopError(msg)
    # Initialize the document
    self.path = path
    self.root = root
    self.tree = kwargs.get('tree')
    self.auto = kwargs.get('auto', Document.auto)
    # Set default values
    self._data['prefix'] = Document.DEFAULT_PREFIX
    self._data['sep'] = Document.DEFAULT_SEP
    self._data['digits'] = Document.DEFAULT_DIGITS
    self._data['parent'] = None  # the root document does not have a parent
    self._items = []  # items found by the last directory scan
    self._itered = False  # becomes True once the items have been scanned
    self.children = []
def __repr__(self):
return "Document('{}')".format(self.path)
def __str__(self):
    """Return the prefix, plus the relative path at higher verbosity."""
    verbose = not common.verbosity < common.STR_VERBOSITY
    if verbose:
        return "{} ({})".format(self.prefix, self.relpath)
    return self.prefix
def __iter__(self):
yield from self._iter()
def __len__(self):
return len(list(i for i in self._iter() if i.active))
def __bool__(self):
"""Even empty documents should be considered truthy."""
return True
@staticmethod
@add_document
def new(
    tree, path, root, prefix, sep=None, digits=None, parent=None, auto=None
):  # pylint: disable=R0913,C0301
    """Create a new document on disk and return it.

    :param tree: reference to tree that contains this document
    :param path: path to directory for the new document
    :param root: path to root of the project
    :param prefix: prefix for the new document
    :param sep: separator between prefix and numbers
    :param digits: number of digits for the new document
    :param parent: parent UID for the new document
    :param auto: automatically save the document

    :raises: :class:`~doorstop.common.DoorstopError` if the document
        already exists

    :return: new :class:`~doorstop.core.document.Document`

    """
    # TODO: raise a specific exception for invalid separator characters?
    assert not sep or sep in settings.SEP_CHARS

    # Refuse to overwrite an existing document
    config_path = os.path.join(path, Document.CONFIG)
    if os.path.exists(config_path):
        raise DoorstopError("document already exists: {}".format(path))

    # Create the configuration file (and with it the document directory)
    Document._create(config_path, name='document')

    # Build the document with saving disabled while settings are applied;
    # every setting is assigned, falling back to its current value
    document = Document(path, root=root, tree=tree, auto=False)
    requested = (
        ('prefix', prefix),
        ('sep', sep),
        ('digits', digits),
        ('parent', parent),
    )
    for name, value in requested:
        if value is None:
            value = getattr(document, name)
        setattr(document, name, value)

    # Persist once, honoring the class-wide default when `auto` is unset
    should_save = Document.auto if auto is None else auto
    if should_save:
        document.save()

    return document
def load(self, reload=False):
    """Load the document's properties from its file.

    :param reload: read the file again even if the document was already
        loaded, and reload the document's items as well

    :raises: :class:`~doorstop.common.DoorstopError` if the file contains
        an unexpected setting or a setting with an invalid value

    """
    if self._loaded and not reload:
        return
    log.debug("loading {}...".format(repr(self)))
    # Read text from file
    text = self._read(self.config)
    # Parse YAML data from text
    data = self._load(text, self.config)
    # Store parsed data; a `settings` key without a value would yield
    # None, so fall back to an empty mapping instead of failing below
    sets = data.get('settings') or {}
    for key, value in sets.items():
        try:
            if key == 'prefix':
                self._data[key] = Prefix(value)
            elif key == 'sep':
                self._data[key] = value.strip()
            elif key == 'parent':
                self._data[key] = value.strip()
            elif key == 'digits':
                self._data[key] = int(value)
            else:
                msg = "unexpected document setting '{}' in: {}".format(
                    key, self.config
                )
                raise DoorstopError(msg)
        except (AttributeError, TypeError, ValueError) as exc:
            msg = "invalid value for '{}' in: {}".format(key, self.config)
            # Chain the conversion error so the root cause stays visible
            raise DoorstopError(msg) from exc
    # Set meta attributes
    self._loaded = True
    if reload:
        # Exhaust the generator so every item is reloaded right away
        list(self._iter(reload=reload))
@edit_document
def save(self):
    """Write the document's properties back to its configuration file."""
    log.debug("saving {}...".format(repr(self)))

    # Split the stored data into the `settings` section and everything else
    data = {}
    sets = {}
    for key, value in self._data.items():
        if key == 'prefix':
            sets[key] = str(value)
        elif key in ('sep', 'digits'):
            sets[key] = value
        elif key == 'parent':
            # an unset parent is left out of the file
            if value:
                sets[key] = value
        else:
            data[key] = value
    data['settings'] = sets

    # Serialize to YAML and write the file
    self._write(self._dump(data), self.config)

    # Set meta attributes
    self._loaded = False
    self.auto = True
def _iter(self, reload=False):
    """Yield the document's items, scanning the directory when needed.

    :param reload: rescan the directory and reload each item even if the
        items were already collected

    """
    if self._itered and not reload:
        log.debug("iterating document {}'s loaded items...".format(self))
        yield from list(self._items)
        return

    log.info("loading document {}'s items...".format(self))
    self._items = []
    for dirpath, dirnames, filenames in os.walk(self.path):
        # Prune nested documents in place so the walk does not enter them
        for dirname in list(dirnames):
            nested_config = os.path.join(dirpath, dirname, Document.CONFIG)
            if not os.path.exists(nested_config):
                continue
            nested_dir = os.path.dirname(nested_config)
            dirnames.remove(dirname)
            log.trace("skipped embedded document: {}".format(nested_dir))

        for filename in filenames:
            item_path = os.path.join(dirpath, filename)
            try:
                item = Item(self, item_path, root=self.root, tree=self.tree)
            except DoorstopError:
                continue  # skip non-item files
            self._items.append(item)
            if reload:
                try:
                    item.load(reload=reload)
                except Exception:
                    log.error("Unable to load: %s", item)
                    raise
            if settings.CACHE_ITEMS and self.tree:
                self.tree._item_cache[item.uid] = item  # pylint: disable=W0212
                log.trace("cached item: {}".format(item))

    # Remember that the scan happened, then hand out a snapshot
    self._itered = True
    yield from list(self._items)
def copy_assets(self, dest):
    """Copy the contents of the assets directory into `dest`, if any.

    :param dest: destination directory for the copied contents

    """
    if self.assets:
        common.copy_dir_contents(self.assets, dest)
# properties #############################################################
@property
def config(self):
"""Get the path to the document's file."""
return os.path.join(self.path, Document.CONFIG)
@property
def assets(self):
"""Get the path to the document's assets if they exist else `None`."""
path = os.path.join(self.path, Document.ASSETS)
return path if os.path.isdir(path) else None
@property
@auto_load
def prefix(self):
    """Get the prefix stored for this document."""
    stored = self._data['prefix']
    return stored
@prefix.setter
@auto_save
@auto_load
def prefix(self, value):
    """Set the document's prefix, converting the value to a `Prefix`."""
    new_prefix = Prefix(value)
    self._data['prefix'] = new_prefix
    # TODO: should the new prefix be applied to all items?
@property
@auto_load
def sep(self):
    """Get the separator placed between prefix and number in new UIDs."""
    stored = self._data['sep']
    return stored
@sep.setter
@auto_save
@auto_load
def sep(self, value):
    """Set the prefix-number separator to use for new item UIDs.

    :param value: one of ``settings.SEP_CHARS``; an empty value (including
        ``None``) selects "no separator"

    """
    # TODO: raise a specific exception for invalid separator characters?
    assert not value or value in settings.SEP_CHARS
    # An empty value passes the check above; `None` has no `.strip()`, so
    # store the empty separator for it instead of raising AttributeError
    self._data['sep'] = value.strip() if value else ''
    # TODO: should the new separator be applied to all items?
@property
@auto_load
def digits(self):
    """Get the digit count used when numbering new item UIDs."""
    stored = self._data['digits']
    return stored
@digits.setter
@auto_save
@auto_load
def digits(self, value):
    """Set the digit count used when numbering new item UIDs.

    The value is stored as given, without conversion.
    """
    data = self._data
    data['digits'] = value
    # TODO: should the new digits be applied to all items?
@property
@auto_load
def parent(self):
    """Get the prefix of this document's parent document."""
    stored = self._data['parent']
    return stored
@parent.setter
@auto_save
@auto_load
def parent(self, value):
    """Set the prefix of this document's parent document.

    A falsy value is stored as an empty string.
    """
    if value:
        parent_prefix = str(value)
    else:
        parent_prefix = ""
    self._data['parent'] = parent_prefix
@property
def items(self):
"""Get an ordered list of active items in the document."""
return sorted(i for i in self._iter() if i.active)
@property
def depth(self):
"""Return the maximum item level depth."""
return max(item.depth for item in self)
@property
def next_number(self):
    """Get the next free item number for the document.

    The local candidate is one more than the highest existing item
    number (or 1 when there are no items). When the tree provides a
    `request_next_number` callable, it is asked repeatedly until it
    returns `None` or a number that is not behind the local candidate;
    a truthy answer then replaces the local number.
    """
    try:
        highest = max(item.number for item in self)
    except ValueError:
        number = 1
    else:
        number = highest + 1
    log.debug("next number (local): {}".format(number))

    if self.tree and self.tree.request_next_number:
        remote_number = 0
        while remote_number is not None and remote_number < number:
            if remote_number:
                # a previous answer was still below the local number
                log.warning("server is behind, requesting next number...")
            remote_number = self.tree.request_next_number(self.prefix)
            log.debug("next number (remote): {}".format(remote_number))
        if remote_number:
            number = remote_number

    return number
@property
def skip(self):
"""Indicate the document should be skipped."""
return os.path.isfile(os.path.join(self.path, Document.SKIP))
@property
def index(self):
"""Get the path to the document's index if it exists else `None`."""
path = os.path.join(self.path, Document.INDEX)
return path if os.path.isfile(path) else None
@index.setter
def index(self, value):
    """Write the document's index file when given a truthy value.

    A falsy value leaves the file system untouched.
    """
    if not value:
        return
    path = os.path.join(self.path, Document.INDEX)
    log.info("creating {} index...".format(self))
    lines = self._lines_index(self.items)
    common.write_lines(lines, path)
@index.deleter
def index(self):
    """Delete the document's index if it exists.

    Nothing is logged or deleted when the document has no index file.
    """
    path = self.index
    if path is None:
        # No index file: avoid handing `None` to `common.delete`
        return
    log.info("deleting {} index...".format(self))
    common.delete(path)
# actions ################################################################
# decorators are applied to methods in the associated classes
def add_item(self, number=None, level=None, reorder=True):
    """Create a new item in the document and return it.

    :param number: desired item number
    :param level: desired item level
    :param reorder: update levels of document items

    :return: added :class:`~doorstop.core.item.Item`

    """
    # Never go below the next free number
    number = max(number or 0, self.next_number)
    log.debug("next number: {}".format(number))

    # Derive the level from the last active item unless one was requested
    try:
        previous = self.items[-1]
    except IndexError:
        next_level = level
    else:
        if level:
            next_level = level
        else:
            previous_level = previous.level
            if previous_level.heading:
                # first entry beneath a heading
                next_level = previous_level >> 1
                next_level.heading = False
            else:
                next_level = previous_level + 1
    log.debug("next level: {}".format(next_level))

    uid = UID(self.prefix, self.sep, number, self.digits)
    item = Item.new(self.tree, self, self.path, self.root, uid, level=next_level)

    # An explicitly requested level may collide with an existing one
    if level and reorder:
        self.reorder(keep=item)
    return item
# decorators are applied to methods in the associated classes
def remove_item(self, value, reorder=True):
    """Delete the item identified by `value` and return it.

    :param value: item or UID
    :param reorder: update levels of document items

    :raises: :class:`~doorstop.common.DoorstopError` if the item
        cannot be found

    :return: removed :class:`~doorstop.core.item.Item`

    """
    removed = self.find_item(UID(value))
    removed.delete()
    if reorder:
        self.reorder()
    return removed
# decorators are applied to methods in the associated classes
def reorder(self, manual=True, automatic=True, start=None, keep=None, _items=None):
    """Reorder a document's items.

    Two methods are used to create the outline order:

    - manual: specify the order using an updated index file
    - automatic: shift duplicate levels and compress gaps

    :param manual: enable manual ordering using the index (if one exists)
    :param automatic: enable automatic ordering (after manual ordering)
    :param start: level to start numbering (None = use current start)
    :param keep: item or UID to keep over duplicates

    """
    # Manual pass: apply the index file, then remove it
    if manual and self.index:
        log.info("reordering {} from index...".format(self))
        self._reorder_from_index(self, self.index)
        del self.index

    if not automatic:
        return

    # Automatic pass over the supplied items, or the active items
    log.info("reordering {} automatically...".format(self))
    items = _items or self.items
    kept = None
    if keep:
        kept = self.find_item(keep)
    self._reorder_automatic(items, start=start, keep=kept)
@staticmethod
def _lines_index(items):
    """Yield the (pseudo) YAML lines that make up the document index.

    :param items: items to list in the outline; the first item's level
        becomes the `initial` value

    """
    rule = '#' * settings.MAX_LINE_LENGTH
    header = (
        rule,
        '# THIS TEMPORARY FILE WILL BE DELETED AFTER DOCUMENT REORDERING',
        '# MANUALLY INDENT, DEDENT, & MOVE ITEMS TO THEIR DESIRED LEVEL',
        '# A NEW ITEM WILL BE ADDED FOR ANY UNKNOWN IDS, i.e. - new: ',
        '# THE COMMENT WILL BE USED AS THE ITEM TEXT FOR NEW ITEMS',
        '# CHANGES WILL BE REFLECTED IN THE ITEM FILES AFTER CONFIRMATION',
        rule,
        '',
    )
    yield from header

    first_level = items[0].level if items else 1.0
    yield "initial: {}".format(first_level)
    yield "outline:"
    for item in items:
        indent = " " * item.depth
        text_lines = item.text.strip().splitlines()
        # the first line of the item text is shown as a trailing comment
        comment = text_lines[0] if text_lines else ""
        entry = indent + "- {u}: # {c}".format(u=item.uid, c=comment)
        limit = settings.MAX_LINE_LENGTH
        if len(entry) > limit:
            entry = entry[: limit - 3] + '...'
        yield entry
@staticmethod
def _read_index(path):
    """Load the index, converting comments to text entries for each item.

    Each outline line that carries a trailing ``# comment`` is rewritten
    so that the comment becomes a nested ``text`` entry, then the result
    is parsed as YAML.

    :param path: path to the index file

    :return: data returned by ``common.load_yaml`` for the rewritten text

    """
    with open(path, 'r', encoding='utf-8') as stream:
        text = stream.read()
    # Compiled once; it is applied to every line of the index
    pattern = re.compile(r'(\s+)(- [\w\d-]+\s*): # (.+)$')
    yaml_text = []
    for line in text.split('\n'):
        m = pattern.search(line)
        if m:
            prefix = m.group(1)
            uid = m.group(2)
            # The comment is embedded in a double-quoted YAML scalar, where
            # a backslash starts an escape sequence: escape backslashes
            # first, then the quotes
            item_text = m.group(3).replace('\\', '\\\\').replace('"', '\\"')
            yaml_text.append('{p}{u}:'.format(p=prefix, u=uid))
            yaml_text.append(' {p}- text: "{t}"'.format(p=prefix, t=item_text))
        else:
            yaml_text.append(line)
    return common.load_yaml('\n'.join(yaml_text), path)
@staticmethod
def _reorder_from_index(document, path):
    """Apply the outline stored in an index file to a document's items.

    Items named in the outline receive new levels (unknown entries are
    created); active items that are missing from the outline afterwards
    are deleted.

    :param document: :class:`~doorstop.core.document.Document` to order
    :param path: path to the index file

    """
    data = document._read_index(path)  # pylint: disable=protected-access

    # Read updated values
    initial = data.get('initial', 1.0)
    outline = data.get('outline', [])

    # Update levels, collecting the UIDs that appear in the outline
    kept_ids = []
    Document._reorder_section(outline, Level(initial), document, kept_ids)

    # Drop every item that was removed from the outline
    for item in document.items:
        if item.uid in kept_ids:
            continue
        log.info('Deleting %s', item.uid)
        item.delete()
@staticmethod
def _reorder_section(section, level, document, list_of_ids):
    """Recursive function to reorder a section of an outline.

    A `dict` is handled as a single outline entry (UID mapped to its
    subsection); a `list` is handled as a run of sibling entries. Any
    other value is ignored.

    :param section: recursive `list` of `dict` loaded from document index
    :param level: current :class:`~doorstop.core.types.Level`
    :param document: :class:`~doorstop.core.document.Document` to order
    :param list_of_ids: list extended with the UID of every item that is
        found or created while walking the outline

    """
    if isinstance(section, dict):  # a section
        # Get the item and subsection
        uid = list(section.keys())[0]
        # 'text' entries carry an item's text, not an item of their own
        if uid == 'text':
            return
        subsection = section[uid]
        # An item is a header if it has a subsection
        # NOTE: `level` is modified in place here
        level.heading = False
        item_text = ''
        if isinstance(subsection, str):
            item_text = subsection
        elif isinstance(subsection, list):
            if 'text' in subsection[0]:
                item_text = subsection[0]['text']
                # entries beyond the leading text entry are children
                if len(subsection) > 1:
                    level.heading = True
        try:
            # Existing item: only its level is updated
            item = document.find_item(uid)
            item.level = level
            log.info("Found ({}): {}".format(uid, level))
            list_of_ids.append(uid)
        except DoorstopError:
            # Unknown UID: create a new item at this level and use the
            # index comment as its text
            item = document.add_item(level=level, reorder=False)
            list_of_ids.append(item.uid)
            if level.heading:
                item.normative = False
            item.text = item_text
            log.info("Created ({}): {}".format(item.uid, level))
        # Process the heading's subsection
        if subsection:
            Document._reorder_section(subsection, level >> 1, document, list_of_ids)
    elif isinstance(section, list):  # a list of sections
        # Process each subsection
        for index, subsection in enumerate(section):
            # each sibling is offset from `level` by its list position
            Document._reorder_section(
                subsection, level + index, document, list_of_ids
            )
@staticmethod
def _reorder_automatic(items, start=None, keep=None):
    """Reorder a document's items automatically.

    Items are visited in the order produced by `_items_by_level`; each
    one is assigned the computed "next" level, derived from the level
    assigned to the item before it and the change between the two
    items' original levels.

    :param items: items to reorder
    :param start: level to start numbering (None = use current start)
    :param keep: item to keep over duplicates

    """
    # nlevel: level to assign next; plevel: original level of the
    # previously visited item
    nlevel = plevel = None
    for clevel, item in Document._items_by_level(items, keep=keep):
        log.debug("current level: {}".format(clevel))
        # Determine the next level
        if not nlevel:
            # Use the specified or current starting level
            nlevel = Level(start) if start else clevel
            nlevel.heading = clevel.heading
            log.debug("next level (start): {}".format(nlevel))
        else:
            # Adjust the next level to be the same depth
            if len(clevel) > len(nlevel):
                nlevel >>= len(clevel) - len(nlevel)
                log.debug("matched current indent: {}".format(nlevel))
            elif len(clevel) < len(nlevel):
                nlevel <<= len(nlevel) - len(clevel)
                # nlevel += 1
                log.debug("matched current dedent: {}".format(nlevel))
            nlevel.heading = clevel.heading
            # Check for a level jump
            _size = min(len(clevel.value), len(plevel.value))
            for index in range(max(_size - 1, 1)):
                if clevel.value[index] > plevel.value[index]:
                    # shorten to the part that increased, bump it, then
                    # restore the current item's depth
                    nlevel <<= len(nlevel) - 1 - index
                    nlevel += 1
                    nlevel >>= len(clevel) - len(nlevel)
                    msg = "next level (jump): {}".format(nlevel)
                    log.debug(msg)
                    break
            # Check for a normal increment
            # (this `else` belongs to the `for`: it runs only when no
            # jump was found, i.e. the loop ended without `break`)
            else:
                if len(nlevel) <= len(plevel):
                    nlevel += 1
                    msg = "next level (increment): {}".format(nlevel)
                    log.debug(msg)
                else:
                    msg = "next level (indent/dedent): {}".format(nlevel)
                    log.debug(msg)
        # Apply the next level
        if clevel == nlevel:
            log.info("{}: {}".format(item, clevel))
        else:
            log.info("{}: {} to {}".format(item, clevel, nlevel))
        # a copy is assigned so later changes to nlevel do not affect
        # the item
        item.level = nlevel.copy()
        # Save the current level as the previous level
        plevel = clevel.copy()
@staticmethod
def _items_by_level(items, keep=None):
"""Iterate through items by level with the kept item first."""
# Collect levels
levels = OrderedDict()
for item in items:
if item.level in levels:
levels[item.level].append(item)
else:
levels[item.level] = [item]
# Reorder levels
for level, items_at_level in levels.items():
# Reorder items at this level
if keep in items_at_level:
# move the kept item to the front of the list
log.debug("keeping {} level over duplicates".format(keep))
items_at_level.remove(keep)
items_at_level.insert(0, keep)
for item in items_at_level:
yield level, item
def find_item(self, value, _kind=''):
    """Return the active item whose UID matches `value`.

    :param value: item or UID

    :raises: :class:`~doorstop.common.DoorstopError` if the item
        cannot be found

    :return: matching :class:`~doorstop.core.item.Item`

    """
    uid = UID(value)
    for candidate in self:
        if not candidate.uid == uid:
            continue
        if candidate.active:
            return candidate
        # a matching but inactive item does not count; keep looking
        log.trace("item is inactive: {}".format(candidate))
    raise DoorstopError("no matching{} UID: {}".format(_kind, uid))
def get_issues(
    self, skip=None, document_hook=None, item_hook=None
):  # pylint: disable=unused-argument
    """Yield all the document's issues.

    :param skip: list of document prefixes to skip
    :param document_hook: must be left as `None`
    :param item_hook: function to call for custom item validation

    :return: generator of :class:`~doorstop.common.DoorstopError`,
        :class:`~doorstop.common.DoorstopWarning`,
        :class:`~doorstop.common.DoorstopInfo`

    """
    assert document_hook is None
    if skip is None:
        skip = []
    hook = item_hook or (lambda **kwargs: [])

    if self.prefix in skip:
        log.info("skipping document %s...", self)
        return
    log.info("checking document %s...", self)

    # A document without active items has nothing else to check
    items = self.items
    if not items:
        yield DoorstopWarning("no items")
        return

    # Reorder or check item levels
    if settings.REORDER:
        self.reorder(_items=items)
    elif settings.CHECK_LEVELS:
        yield from self._get_issues_level(items)

    # Check each item with the custom hook and its own validation
    for item in items:
        issues = chain(
            hook(item=item, document=self, tree=self.tree),
            item.get_issues(skip=skip),
        )
        for issue in issues:
            # Prepend the item's UID to yielded exceptions
            if isinstance(issue, Exception):
                yield type(issue)("{}: {}".format(item.uid, issue))
@staticmethod
def _get_issues_level(items):
    """Yield the issues found between the levels of consecutive items.

    :param items: list of items in outline order

    """
    for prev, item in zip(items, items[1:]):
        puid, plev = prev.uid, prev.level
        nuid, nlev = item.uid, item.level
        log.debug("checking level {} to {}...".format(plev, nlev))

        # Duplicate level
        if plev == nlev:
            first, second = sorted((puid, nuid))
            msg = "duplicate level: {} ({}, {})".format(plev, first, second)
            yield DoorstopWarning(msg)

        # Skipped level
        pvalue = plev.value
        nvalue = nlev.value
        length = min(len(pvalue), len(nvalue))
        for index in range(length):
            # Types of skipped levels:
            # 1. over: 1.0 --> 1.2
            # 2. out: 1.1 --> 3.0
            jumped = nvalue[index] - pvalue[index] > 1
            # 3. over and out: 1.1 --> 2.2
            over_and_out = (
                pvalue[index] != nvalue[index]
                and index + 1 < length
                and nvalue[index + 1] not in (0, 1)
            )
            if jumped or over_and_out:
                msg = "skipped level: {} ({}), {} ({})".format(
                    plev, puid, nlev, nuid
                )
                yield DoorstopInfo(msg)
                break
@delete_document
def delete(self, path=None):
    """Delete every item of the document, then the document itself.

    :param path: not used by this method's own body

    """
    for doomed in self:
        doomed.delete()
    # the document is deleted in the decorated method