from datetime import datetime
from errno import ENOENT
from fnmatch import fnmatchcase
from itertools import chain, izip, repeat
import os
from os import stat, makedirs
from os.path import islink, relpath, join, split
from shutil import rmtree
import subprocess
import sys
from sys import exc_info
from traceback import format_exc
from uuid import uuid1
from binaryornot.helpers import is_binary_string
from concurrent.futures import as_completed, ProcessPoolExecutor
from click import progressbar
from flask import current_app
from funcy import ichunks, first, suppress
from pyelasticsearch import (ElasticSearch, ElasticHttpNotFoundError,
IndexAlreadyExistsError, bulk_chunks, Timeout,
ConnectionError)
from dxr.app import make_app, dictify_links
from dxr.config import FORMAT
from dxr.es import UNINDEXED_STRING, UNANALYZED_STRING, TREE, create_index_and_wait
from dxr.exceptions import BuildError
from dxr.filters import LINE, FILE
from dxr.lines import es_lines, finished_tags
from dxr.mime import decode_data, is_binary_image
from dxr.utils import (open_log, deep_update, append_update,
append_update_by_line, append_by_line, bucket, split_content_lines)
from dxr.vcs import VcsCache


def full_traceback(callable, *args, **kwargs):
    """Work around the wretched exception reporting of concurrent.futures.

    Futures generally gives no access to the traceback of the task; you get
only a traceback into the guts of futures, plus the description line of
the task's traceback. We jam the full traceback of any exception that
occurs into the message of the exception: disgusting but informative.
"""
try:
return callable(*args, **kwargs)
except Exception:
        raise Exception(format_exc())


def index_and_deploy_tree(tree, verbose=False):
    """Index a tree, and make it accessible.

    :arg tree: The TreeConfig of the tree to build
"""
config = tree.config
es = ElasticSearch(config.es_hosts,
timeout=config.es_indexing_timeout,
max_retries=config.es_indexing_retries)
index_name = index_tree(tree, es, verbose=verbose)
if 'index' not in tree.config.skip_stages:
        deploy_tree(tree, es, index_name)


def deploy_tree(tree, es, index_name):
"""Point the ES aliases and catalog records to a newly built tree, and
delete any obsoleted index.
"""
config = tree.config
# Make new index live:
alias = config.es_alias.format(format=FORMAT, tree=tree.name)
swap_alias(alias, index_name, es)
# Create catalog index if it doesn't exist.
try:
create_index_and_wait(
es,
config.es_catalog_index,
settings={
'settings': {
'index': {
# Fewer should be faster:
'number_of_shards': 1,
# This should be cranked up until it's on all nodes,
# so it's always a fast read:
'number_of_replicas': config.es_catalog_replicas
},
},
'mappings': {
TREE: {
'_all': {
'enabled': False
},
'properties': {
'name': UNANALYZED_STRING,
'format': UNANALYZED_STRING,
# In case es_alias changes in the conf file:
'es_alias': UNINDEXED_STRING,
# Needed so new trees or edited descriptions can show
# up without a WSGI restart:
'description': UNINDEXED_STRING,
# ["clang", "pygmentize"]:
'enabled_plugins': UNINDEXED_STRING,
'generated_date': UNINDEXED_STRING
# We may someday also need to serialize some plugin
# configuration here.
}
}
}
})
except IndexAlreadyExistsError:
pass
# Insert or update the doc representing this tree. There'll be a little
# race between this and the alias swap. We'll live.
es.index(config.es_catalog_index,
doc_type=TREE,
doc=dict(name=tree.name,
format=FORMAT,
es_alias=alias,
description=tree.description,
enabled_plugins=[p.name for p in tree.enabled_plugins],
generated_date=config.generated_date),
             id='%s/%s' % (FORMAT, tree.name))


def swap_alias(alias, index, es):
    """Point an ES alias to a new index, and delete the old index.

    :arg index: The new index name
"""
# Get the index the alias currently points to.
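    # (es.aliases() returns a dict keyed by index name; first() pulls out
    # that name, if any.)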
old_index = first(es.aliases(alias))
# Make the alias point to the new index.
removal = ([{'remove': {'index': old_index, 'alias': alias}}] if
old_index else [])
es.update_aliases(removal + [{'add': {'index': index, 'alias': alias}}]) # atomic
# Delete the old index.
if old_index:
        es.delete_index(old_index)


def index_tree(tree, es, verbose=False):
"""Index a single tree into ES and the filesystem, and return the
name of the new ES index.
"""
    config = tree.config

    def new_pool():
        return ProcessPoolExecutor(max_workers=config.workers)

    def farm_out(method_name):
        """Farm out a call to all tree indexers across a process pool.

        Return the tree indexers, including any mutations the method call
        might have made.

        Show progress while doing it.
        """
if not config.workers:
return [save_scribbles(ti, method_name) for ti in tree_indexers]
else:
futures = [pool.submit(full_traceback, save_scribbles, ti, method_name)
for ti in tree_indexers]
return [future.result() for future in
                    show_progress(futures, 'Running %s' % method_name)]

    def delete_index_quietly(es, index):
        """Delete an index, and ignore any error.

        This cannot be done inline in the except clause below, because, even
        if we catch this exception, it spoils the exception info in that
        scope, making the bare ``raise`` raise the not-found error rather than
        whatever went wrong earlier.
        """
try:
es.delete_index(index)
except Exception:
            pass

    print "Starting tree '%s'." % tree.name
# Note starting time
start_time = datetime.now()
skip_indexing = 'index' in config.skip_stages
skip_build = 'build' in config.skip_stages
skip_cleanup = skip_indexing or skip_build or 'clean' in config.skip_stages
# Create and/or clear out folders:
ensure_folder(tree.object_folder, tree.source_folder != tree.object_folder)
ensure_folder(tree.temp_folder, not skip_cleanup)
ensure_folder(tree.log_folder, not skip_cleanup)
ensure_folder(join(tree.temp_folder, 'plugins'), not skip_cleanup)
for plugin in tree.enabled_plugins:
ensure_folder(join(tree.temp_folder, 'plugins', plugin.name),
not skip_cleanup)
vcs_cache = VcsCache(tree)
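    # Instantiate a TreeToIndex for each plugin that provides one; they all
    # share the one VCS cache.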
tree_indexers = [p.tree_to_index(p.name, tree, vcs_cache) for p in
tree.enabled_plugins if p.tree_to_index]
try:
if not skip_indexing:
        # Substitute the format, tree name, and a fresh UUID into the index
        # name so the new index can coexist with the old until the alias swap.
index = tree.es_index.format(format=FORMAT,
tree=tree.name,
unique=uuid1())
create_index_and_wait(
es,
index,
settings={
'settings': {
'index': {
'number_of_shards': tree.es_shards, # Fewer should be faster, assuming enough RAM.
'number_of_replicas': 0 # for speed
},
# Default analyzers and mappings are in the core plugin.
'analysis': reduce(
deep_update,
(p.analyzers for p in tree.enabled_plugins),
{}),
# DXR indices are immutable once built. Turn the
# refresh interval down to keep the segment count low
# while indexing. It will make for less merging later.
# We could also simply call "optimize" after we're
# done indexing, but it is unthrottled; we'd have to
# use shard allocation to do the indexing on one box
# and then move it elsewhere for actual use.
'refresh_interval':
'%is' % config.es_refresh_interval
},
'mappings': reduce(deep_update,
(p.mappings for p in
tree.enabled_plugins),
{})
})
else:
index = None
print "Skipping indexing (due to 'index' in 'skip_stages')"
# Run pre-build hooks:
with new_pool() as pool:
tree_indexers = farm_out('pre_build')
# Tear down pool to let the build process use more RAM.
if not skip_build:
# Set up env vars, and build:
build_tree(tree, tree_indexers, verbose)
else:
print "Skipping rebuild (due to 'build' in 'skip_stages')"
# Post-build, and index files:
if not skip_indexing:
with new_pool() as pool:
tree_indexers = farm_out('post_build')
index_files(tree, tree_indexers, index, pool, es)
# refresh() times out in prod. Wait until it doesn't. That
# probably means things are ready to rock again.
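            # repeat(None) never runs out, so the bar keeps spinning until a
            # refresh finally succeeds.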
with aligned_progressbar(repeat(None), label='Refreshing index') as bar:
for _ in bar:
try:
es.refresh(index=index)
                    except (ConnectionError, Timeout):
                        pass
else:
break
es.update_settings(
index,
{
'settings': {
'index': {
'number_of_replicas': 1 # fairly arbitrary
}
}
})
except Exception as exc:
# If anything went wrong, delete the index, because we're not
# going to have a way of returning its name if we raise an
# exception.
if not skip_indexing:
delete_index_quietly(es, index)
raise
print "Finished '%s' in %s." % (tree.name, datetime.now() - start_time)
if not skip_cleanup:
# By default, we remove the temp files, because they're huge.
rmtree(tree.temp_folder)
    return index


def aligned_progressbar(*args, **kwargs):
"""Fall through to click's progress bar, but line up all the bars so they
aren't askew."""
return progressbar(
        *args, bar_template='%(label)-18s [%(bar)s] %(info)s', **kwargs)


def show_progress(futures, message):
    """Show progress and yield futures as they complete."""
with aligned_progressbar(as_completed(futures),
length=len(futures),
show_eta=False, # never even close
label=message) as bar:
for future in bar:
            yield future


def save_scribbles(obj, method):
    """Call obj.method(), then return obj so the master process can see
    anything method() scribbled on it.

    This is meant to run in a remote process.
"""
getattr(obj, method)()
    return obj


def ensure_folder(folder, clean=False):
    """Ensure the existence of a folder.

    :arg clean: Whether to ensure that the folder is empty
"""
if clean and os.path.isdir(folder):
rmtree(folder)
if not os.path.isdir(folder):
        makedirs(folder)


def _unignored_folders(folders, source_path, ignore_filenames, ignore_paths):
    """Yield the folders from ``folders`` which are not ignored by the given
    patterns and paths.

    :arg source_path: Relative path to the source directory
    :arg ignore_filenames: Filename-based globs to be ignored
    :arg ignore_paths: Path-based globs to be ignored
    """
for folder in folders:
if not any(fnmatchcase(folder, p) for p in ignore_filenames):
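            # Normalize to a '/'-separated path with leading and trailing
            # slashes, the form the ignore_paths globs are matched against.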
folder_path = '/' + join(source_path, folder).replace(os.sep, '/') + '/'
if not any(fnmatchcase(folder_path, p) for p in ignore_paths):
                yield folder


def unicode_contents(path, encoding_guess):  # TODO: Make accessible to TreeToIndex.post_build.
    """Return the unicode contents of a file if we can figure out a decoding,
    or else None.

    :arg path: A sufficient path to the file
    :arg encoding_guess: A guess at the encoding of the file, to be applied if
        it seems to be text
    """
    # Read just enough of the file to guess whether it's binary.
    with open(path, 'rb') as source_file:
        initial_portion = source_file.read(4096)
if not is_binary_string(initial_portion):
# Move the cursor back to the start of the file.
source_file.seek(0)
decoded, contents = decode_data(source_file.read(),
encoding_guess,
can_be_binary=False)
if decoded:
                return contents


def unignored(folder, ignore_paths, ignore_filenames, want_folders=False):
    """Return an iterable of absolute paths to unignored source tree files or
    the folders that contain them.

    Returned files include both binary and text ones.

    :arg want_folders: If falsey, return files. If truthy, return folders
        instead.
    """
    def raise_(exc):
        raise exc

# TODO: Expose a lot of pieces of this as routines plugins can call.
for root, folders, files in os.walk(folder, topdown=True, onerror=raise_):
# Find relative path
rel_path = relpath(root, folder)
if rel_path == '.':
rel_path = ''
if not want_folders:
for f in files:
# Ignore file if it matches an ignore pattern
if any(fnmatchcase(f, e) for e in ignore_filenames):
continue # Ignore the file.
path = join(rel_path, f)
# Ignore file if its path (relative to the root) matches an
# ignore path.
if any(fnmatchcase("/" + path.replace(os.sep, "/"), e) for e in ignore_paths):
continue # Ignore the file.
yield join(root, f)
# Exclude folders that match an ignore pattern.
# os.walk listens to any changes we make in `folders`.
folders[:] = _unignored_folders(
folders, rel_path, ignore_filenames, ignore_paths)
if want_folders:
for f in folders:
            yield join(root, f)


def index_file(tree, tree_indexers, path, es, index):
    """Index a single file into ES, and build a static HTML representation of it.

    For the moment, we execute plugins in series, figuring that we have plenty
    of files to keep our processors busy in most trees that take very long. I'm
    a little afraid of the cost of passing potentially large TreesToIndex to
    worker processes. That goes at 52MB/s on my OS X laptop, measuring by the
    size of the pickled object and including the pickling and unpickling time.

    :arg path: Absolute path to the file to index
    :arg index: The ES index name
"""
try:
contents = unicode_contents(path, tree.source_encoding)
except IOError as exc:
if exc.errno == ENOENT and islink(path):
# It's just a bad symlink (or a symlink that was swiped out
# from under us--whatever)
return
else:
raise
rel_path = relpath(path, tree.source_folder)
is_text = isinstance(contents, unicode)
is_link = islink(path)
# Index by line if the contents are text and the path is not a symlink.
index_by_line = is_text and not is_link
if index_by_line:
lines = split_content_lines(contents)
num_lines = len(lines)
needles_by_line = [{} for _ in xrange(num_lines)]
annotations_by_line = [[] for _ in xrange(num_lines)]
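    # Accumulators for what each plugin's FileToIndex hands back: per-file
    # needles and links, plus per-line refs and regions.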
refses, regionses = [], []
needles = {}
linkses = []
for tree_indexer in tree_indexers:
file_to_index = tree_indexer.file_to_index(rel_path, contents)
if file_to_index.is_interesting():
# Per-file stuff:
append_update(needles, file_to_index.needles())
if not is_link:
linkses.append(file_to_index.links())
# Per-line stuff:
if index_by_line:
refses.append(file_to_index.refs())
regionses.append(file_to_index.regions())
append_update_by_line(needles_by_line,
file_to_index.needles_by_line())
append_by_line(annotations_by_line,
                           file_to_index.annotations_by_line())

    def docs():
        """Yield documents for bulk indexing.

        Big Warning: docs also clears the contents of all elements of
        needles_by_line because they will no longer be used.
        """
# Index a doc of type 'file' so we can build folder listings.
# At the moment, we send to ES in the same worker that does the
# indexing. We could interpose an external queueing system, but I'm
# willing to potentially sacrifice a little speed here for the easy
# management of self-throttling.
file_info = stat(path)
folder_name, file_name = split(rel_path)
# Hard-code the keys that are hard-coded in the browse()
# controller. Merge with the pluggable ones from needles:
doc = dict(# Some non-array fields:
folder=folder_name,
name=file_name,
size=file_info.st_size,
modified=datetime.fromtimestamp(file_info.st_mtime),
is_folder=False,
# And these, which all get mashed into arrays:
**needles)
links = dictify_links(chain.from_iterable(linkses))
if links:
doc['links'] = links
yield es.index_op(doc, doc_type=FILE)
# Index all the lines.
if index_by_line:
for total, annotations_for_this_line, tags in izip(
needles_by_line,
annotations_by_line,
es_lines(finished_tags(lines,
chain.from_iterable(refses),
chain.from_iterable(regionses)))):
# Duplicate the file-wide needles into this line:
total.update(needles)
# We bucket tags into refs and regions for ES because later at
# request time we want to be able to merge them individually
# with those from skimmers.
refs_and_regions = bucket(tags, lambda index_obj: "regions" if
isinstance(index_obj['payload'], basestring) else
"refs")
if 'refs' in refs_and_regions:
total['refs'] = refs_and_regions['refs']
if 'regions' in refs_and_regions:
total['regions'] = refs_and_regions['regions']
if annotations_for_this_line:
total['annotations'] = annotations_for_this_line
yield es.index_op(total)
# Because needles_by_line holds a reference, total is not
# garbage collected. Since we won't use it again, we can clear
# the contents, saving substantial memory on long files.
total.clear()
# Indexing a 277K-line file all in one request makes ES time out (>60s),
# so we chunk it up. 300 docs is optimal according to the benchmarks in
# https://bugzilla.mozilla.org/show_bug.cgi?id=1122685. So large docs like
# images don't make our chunk sizes ridiculous, there's a size ceiling as
# well: 10000 is based on the 300 and an average of 31 chars per line.
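    # The doc_type here is only a default for the request; the FILE doc's own
    # index_op metadata overrides it.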
for chunk in bulk_chunks(docs(), docs_per_chunk=300, bytes_per_chunk=10000):
        es.bulk(chunk, index=index, doc_type=LINE)


def index_chunk(tree,
                tree_indexers,
                paths,
                index,
                swallow_exc=False,
                worker_number=None):
    """Index a pile of files.

    This is the entrypoint for indexer pool workers.

    :arg worker_number: A unique number assigned to this worker so it knows
        what to call its log file
"""
path = '(no file yet)'
try:
        # So we can use Flask's url_for():
with make_app(tree.config).test_request_context():
es = current_app.es
try:
# Don't log if single-process:
log = (worker_number and
open_log(tree.log_folder,
'index-chunk-%s.log' % worker_number))
for path in paths:
log and log.write('Starting %s.\n' % path)
index_file(tree, tree_indexers, path, es, index)
log and log.write('Finished chunk.\n')
finally:
log and log.close()
except Exception as exc:
if swallow_exc:
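            # Hand the formatted traceback back to the master process as
            # data; it can't see our stack otherwise.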
type, value, traceback = exc_info()
return format_exc(), type, value, path
else:
                raise


def index_folders(tree, index, es):
"""Index the folder hierarchy into ES."""
with aligned_progressbar(unignored(tree.source_folder,
tree.ignore_paths,
tree.ignore_filenames,
want_folders=True),
show_eta=False, # never even close
label='Indexing folders') as folders:
for folder in folders:
rel_path = relpath(folder, tree.source_folder)
superfolder_path, folder_name = split(rel_path)
es.index(index, FILE, {
'path': [rel_path], # array for consistency with non-folder file docs
'folder': superfolder_path,
'name': folder_name,
                'is_folder': True})


def index_files(tree, tree_indexers, index, pool, es):
    """Divide source files into groups, and send them out to be indexed."""

    def path_chunks(tree):
"""Return an iterable of worker-sized iterables of paths."""
return ichunks(500, unignored(tree.source_folder,
tree.ignore_paths,
                                      tree.ignore_filenames))

    index_folders(tree, index, es)
if not tree.config.workers:
for paths in path_chunks(tree):
index_chunk(tree,
tree_indexers,
paths,
index,
swallow_exc=False)
else:
futures = [pool.submit(index_chunk,
tree,
tree_indexers,
paths,
index,
worker_number=worker_number,
swallow_exc=True)
for worker_number, paths in enumerate(path_chunks(tree), 1)]
for future in show_progress(futures, 'Indexing files'):
result = future.result()
if result:
formatted_tb, type, value, path = result
print 'A worker failed while indexing %s:' % path
print formatted_tb
# Abort everything if anything fails:
                raise type, value  # exits with non-zero


def _fill_and_write_template(jinja_env, template_name, out_path, vars):
"""Get the template `template_name` from the template folder, substitute in
`vars`, and write the result to `out_path`."""
template = jinja_env.get_template(template_name)
    template.stream(**vars).dump(out_path, encoding='utf-8')


def build_tree(tree, tree_indexers, verbose):
"""Set up env vars, and run the build command."""
if not tree.build_command:
return
# Set up build environment variables:
environ = os.environ.copy()
for ti in tree_indexers:
environ.update(ti.environment(environ))
# Call make or whatever:
with open_log(tree.log_folder, 'build.log', verbose) as log:
print 'Building tree'
workers = max(tree.config.workers, 1)
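        # The build command may spell the worker count either as the legacy
        # '$jobs' placeholder or as a '{workers}' format field.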
        r = subprocess.call(
            tree.build_command.replace('$jobs', str(workers))
                              .format(workers=workers),
            shell=True,
            stdout=log,
            stderr=log,
            env=environ,
            cwd=tree.object_folder
        )
# Abort if build failed:
if r != 0:
print >> sys.stderr, ("Build command for '%s' failed, exited non-zero."
% tree.name)
if not verbose:
print >> sys.stderr, 'Log follows:'
with open(log.name) as log_file:
print >> sys.stderr, ' | %s ' % ' | '.join(log_file)
raise BuildError