-
Notifications
You must be signed in to change notification settings - Fork 0
/
isolateserver.py
executable file
·1795 lines (1529 loc) · 60.7 KB
/
isolateserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Archives a set of files or directories to an Isolate Server."""
__version__ = '0.9.0'
import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib
from utils import tools
tools.force_local_third_party()
# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue
# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60
# The number of files to check on the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# higher the likelihood it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload', then
# the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a
# trade-off; the more items per request, the lower the effect of HTTP round
# trip latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
# A list of already compressed extension types that should not receive any
# compression before being uploaded. Entries are bare extensions, without the
# leading dot.
ALREADY_COMPRESSED_TYPES = [
'7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
'png', 'wav', 'zip',
]
# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30
# Regular expressions describing paths to exclude by default.
# NOTE(review): the consumer of this tuple is outside this view -- presumably
# each pattern is matched against a path while listing files; confirm.
DEFAULT_BLACKLIST = (
# Temporary vim or python files.
r'^.+\.(?:pyc|swp)$',
# .git or .svn directory.
r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
class Error(Exception):
  """Generic runtime error; base class of the other errors defined here."""


class Aborted(Error):
  """Signals that an operation was aborted."""


class AlreadyExists(Error):
  """Signals that a file already exists."""
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Generates the content of the file at |path|, one chunk at a time.

  Arguments:
    path: path of the file to read; it is opened in binary mode.
    chunk_size: maximum number of bytes requested per read.
    offset: position to seek to before the first read. A falsy value means
        reading from the start of the file.

  Yields:
    Non-empty chunks of data, until a read returns nothing.
  """
  with fs.open(path, 'rb') as stream:
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
def fileobj_path(fileobj):
  """Returns the file system path of a file like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.

  None is returned when:
  - the object has no 'name' attribute, or it is None;
  - the name is neither text nor bytes (for instance the integer file
    descriptor that Python 3 reports for files opened from a descriptor);
  - the name is not an absolute path;
  - the path does not exist.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  if isinstance(name, six.binary_type):
    # If the file like object was created using something like open("test.txt")
    # name will end up being a str (such as a function outside our control,
    # like the standard library). We want all our paths to be unicode objects,
    # so we decode it.
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')
  elif not isinstance(name, six.text_type):
    # Not a path at all (e.g. an int file descriptor); there is nothing to
    # decode and no path to return.
    return None

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None
# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj into dstfileobj.

  When |size| is given, exactly that many bytes are copied and an IOError is
  raised if the source runs out of data first. With the default of -1,
  everything up to the EOF marker is copied; in that mode a source that
  exposes tell() must be positioned at offset 0, otherwise IOError is raised.

  Arguments:
    dstfileobj: object with a write() method receiving the data.
    srcfileobj: object with a read() method providing the data.
    size: number of bytes to copy, or -1 to copy until EOF.
    chunk_size: maximum number of bytes requested per read.
  """
  until_eof = size == -1
  if until_eof and hasattr(srcfileobj, 'tell') and srcfileobj.tell() != 0:
    raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    # Never request more than what is left when a size was requested.
    want = min(chunk_size, size - copied) if size > 0 else chunk_size
    block = srcfileobj.read(want)
    if block:
      dstfileobj.write(block)
      copied += len(block)
      continue
    # The source is exhausted.
    if until_eof:
      break
    raise IOError('partial file, got %s, wanted %s' % (copied, size))
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  When the source object is backed by an existing file (see fileobj_path())
  and no explicit size is requested, the file is linked if it is read only
  (no write bit in |file_mode|, or |file_mode| is None) and copied otherwise.
  In every other case the content is streamed into a new file at |dstpath|.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx

  Arguments:
    srcfileobj: file like object providing the content.
    dstpath: path of the file to create.
    file_mode: permission bits to apply to |dstpath|, or None to leave them
        untouched.
    size: number of bytes to take from |srcfileobj|, or -1 for everything.
    use_symlink: when linking, use a symlink instead of a hardlink.
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
    # A file is read only when none of its write bits is set. Linking a
    # writable file would let the destination's user modify the source.
    readonly = file_mode is None or not (file_mode & write_bits)

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On windows, mode other than removing stat.S_IWRITE is ignored. Returns
    # early to skip slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
def zip_compress(content_generator, level=7):
  """Compresses the chunks of |content_generator| into a single zlib stream.

  Arguments:
    content_generator: iterable of chunks of data to compress.
    level: zlib compression level.

  Yields:
    Non-empty chunks of compressed data, ending with the flushed tail of the
    stream.
  """
  deflater = zlib.compressobj(level)
  for piece in (deflater.compress(raw) for raw in content_generator):
    # The compressor buffers internally; it may have nothing to emit yet.
    if piece:
      yield piece
  remainder = deflater.flush(zlib.Z_FINISH)
  if remainder:
    yield remainder
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Decompresses the zlib stream carried by |content_generator|.

  Data is decompressed in small pieces (no larger than |chunk_size|) so that a
  zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Arguments:
    content_generator: iterable of chunks of compressed data.
    chunk_size: maximum size of each decompressed piece.

  Yields:
    Non-empty chunks of decompressed data.

  Raises:
    IOError if zlib reports an error, or if some input is left undecompressed
    once the stream is exhausted.
  """
  inflater = zlib.decompressobj()
  consumed = 0
  try:
    for chunk in content_generator:
      consumed += len(chunk)
      piece = inflater.decompress(chunk, chunk_size)
      while True:
        if piece:
          yield piece
        # Input that did not fit in the |chunk_size| output limit is kept by
        # zlib in unconsumed_tail; feed it back until it is drained.
        leftover = inflater.unconsumed_tail
        if not leftover:
          break
        piece = inflater.decompress(leftover, chunk_size)
    remainder = inflater.flush()
    if remainder:
      yield remainder
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (consumed, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (no compression) when the extension of |filename| is listed in
  ALREADY_COMPRESSED_TYPES, 7 otherwise. The comparison is case insensitive.
  """
  # os.path.splitext() keeps the leading dot ('.png') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('png'), so drop the dot
  # before the lookup.
  file_ext = os.path.splitext(filename)[1].lower()[1:]
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
def create_directories(base_directory, files):
  """Creates, under |base_directory|, every directory the files live in.

  Arguments:
    base_directory: directory the relative paths are joined to.
    files: sized collection of relative file paths.
  """
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect each parent directory along with all of its own ancestors.
  needed = set()
  for path in files:
    parent = os.path.dirname(path)
    while parent:
      needed.add(parent)
      parent = os.path.dirname(parent)

  # Sorted order guarantees a directory is created before what it contains.
  for relative in sorted(needed):
    target = os.path.join(base_directory, relative)
    if not fs.isdir(target):
      fs.mkdir(target)
def _create_symlinks(base_directory, files):
"""Creates any symlinks needed by the given set of files."""
for filepath, properties in files:
if 'l' not in properties:
continue
if sys.platform == 'win32':
# TODO(maruel): Create symlink via the win32 api.
logging.warning('Ignoring symlink %s', filepath)
continue
outfile = os.path.join(base_directory, filepath)
try:
os.symlink(properties['l'], outfile) # pylint: disable=E1101
except OSError as e:
if e.errno == errno.EEXIST:
raise AlreadyExists('File %s already exists.' % outfile)
raise
class _ThreadFile(object):
"""Multithreaded fake file. Used by TarBundle."""
def __init__(self):
self._data = threading_utils.TaskChannel()
self._offset = 0
def __iter__(self):
return self._data
def tell(self):
return self._offset
def write(self, b):
self._data.send_result(b)
self._offset += len(b)
def close(self):
self._data.send_done()
class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    """Initializes the item.

    Arguments:
      path: path of the file on disk.
      algo: hashing algorithm used to compute the digest when it is not given.
      digest: digest of the content, if already known.
      size: size of the file, if already known; otherwise the file is stat'ed.
      high_priority: forwarded as is to isolate_storage.Item.
    """
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(
        digest,
        size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    """Path of the file on disk."""
    return self._path

  @property
  def digest(self):
    """Digest of the file; the file is hashed on first access if needed."""
    if self._digest:
      return self._digest
    self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    """Metadata dict of the file, computed once then cached.

    Its 'h' entry holds the digest.
    """
    if self._meta:
      return self._meta
    # TODO(maruel): Inline.
    self._meta = isolated_format.file_to_metadata(self.path, 0, False)
    # We need to hash right away.
    self._meta['h'] = self.digest
    return self._meta

  def content(self):
    """Returns a generator over the chunks of the file content."""
    return file_read(self.path)
class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    """Initializes an empty bundle.

    Arguments:
      root: path prefix of the bundled items; it is stripped, along with one
          extra character, from the item paths that are yielded individually.
      algo: factory of hash objects used to compute the digest of the tarball.
    """
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._algo = algo
    self._root_len = len(root) + 1
    self._items = []
    self._meta = None
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    """Digest of the tarball; the tarball is generated on first access."""
    if self._digest:
      return self._digest
    self._prepare()
    return self._digest

  @property
  def size(self):
    """Size of the tarball; the tarball is generated if it is not known."""
    if self._size is not None:
      return self._size
    self._prepare()
    return self._size

  def try_add(self, item):
    """Try to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should be
    migrated to use the Go code instead.

    Returns:
      True if the item was added. Bundling is currently disabled, see the
      unconditional return below, so this is always False.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    # Room taken in the archive: the content plus a header, on 512 bytes
    # boundaries.
    padded = (item.size + 512) & ~511
    if padded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += padded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains less than 5 items, the items are yielded
    individually. Otherwise the bundle itself is yielded once, as a tarball.
    """
    if len(self._items) >= 5:
      # Reading the digest ensures self._meta is set.
      archive_name = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, archive_name, self._meta
      return
    # The tarball is too small, yield individual items, if any.
    for item in self._items:
      yield item, item.path[self._root_len:], item.meta

  def content(self):
    """Generates the tarfile content on the fly.

    The archive is written by a secondary thread into a _ThreadFile, while the
    calling thread yields the data as it becomes available.
    """
    sink = _ThreadFile()

    def _tar_thread():
      try:
        archive = tarfile.open(
            fileobj=sink, mode='w', format=tarfile.PAX_FORMAT,
            encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          archive.add(item.path)
        archive.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        # Always close so that the consuming loop below terminates.
        sink.close()

    worker = threading.Thread(target=_tar_thread)
    worker.start()
    try:
      for data in sink:
        yield data
    finally:
      worker.join()

  def _prepare(self):
    """Generates the tarball once to compute its digest, size and metadata."""
    hasher = self._algo()
    nbytes = 0
    for chunk in self.content():
      hasher.update(chunk)
      nbytes += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = hasher.hexdigest()
    self._size = nbytes
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }
class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    """Initializes the item; the digest of |buf| is computed right away.

    Arguments:
      buf: the data to push.
      algo: callable returning a hash object for the data it is given.
      high_priority: forwarded as is to isolate_storage.Item.
    """
    buf_digest = algo(buf).hexdigest()
    buf_size = len(buf)
    super(BufferItem, self).__init__(
        digest=buf_digest, size=buf_size, high_priority=high_priority)
    self._buffer = buf

  def content(self):
    """Returns the whole buffer as a single-chunk list."""
    return [self._buffer]
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    """Wraps |storage_api|; the thread pools are created lazily."""
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    # Signal handlers replaced by __enter__, to be restored by __exit__.
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface.

    Installs SIGINT and SIGTERM handlers that call abort().
    """
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface.

    Waits for the pending tasks then restores the previous signal handlers.
    Exceptions are never swallowed.
    """
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main threads enumerates 'items' and pushes to the first thread. Then it
    join() all the threads, waiting for them to complete.

        (enumerate items of Item, this can be slow as disk is traversed)
              |
              v
        _create_items_batches_thread                 Thread #1
        (generates list(Item), every 3s or 20~100 items)
              |
              v
        _do_lookups_thread                           Thread #2
           |         |
           v         v
       (missing)  (was on server)
           |
           v
        _handle_missing_thread                       Thread #3
           |
           v
        (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represents data to
             upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            # Timed out; False (unlike the None end marker) only triggers a
            # flush of the partial batch below.
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              # A full batch means items come in fast; move on to the next
              # batch size, staying on the last one once reached.
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            # End marker: every item was enumerated.
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          # Without blocking, forward the results that are already there.
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        # No more batches; wait for the lookups still in flight.
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          detector.ping()
          # Without blocking, collect the uploads that already completed.
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        # Nothing more to start; wait for the uploads still in flight.
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only first isolate_storage.Item that matches it.
      # All other items are just indistinguishable copies from the point of view
      # of isolate server (it doesn't care about paths at all, only content and
      # digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger items
        # being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        # End marker; sent even on failure so that the threads terminate.
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        # zlib produces bytes: joining with a text '' separator raises
        # TypeError on python 3. b'' is the same as '' on python 2.
        data = b''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not same as the
      # one provided by 'item'. Since '[data]' is a list, it can safely be
      # reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
class FetchQueue(object):
"""Fetches items from Storage and places them into ContentAddressedCache.
It manages multiple concurrent fetch operations. Acts as a bridge between
Storage and ContentAddressedCache so that Storage and ContentAddressedCache
don't depend on each other at all.
"""
def __init__(self, storage, cache):
  """Initializes the queue.

  Arguments:
    storage: object whose async_fetch() is used to start the fetches.
    cache: object the fetched items are written to. Iterating over it gives
        the initial set of digests considered as already fetched.
  """
  self.storage = storage
  self.cache = cache
  # Channel through which the digest of each completed fetch comes back.
  self._channel = threading_utils.TaskChannel()
  # Digests currently being fetched.
  self._pending = set()
  # Digests that were requested through add().
  self._accessed = set()
  # Digests known to be in the cache.
  self._fetched = set(cache)
  # Pending digests that the caller waits for, see wait_on()/wait().
  self._waiting_on = set()
  # Already fetched digests the caller waits for which are not yet returned by
  # wait().
  self._waiting_on_ready = set()
def add(
    self,
    digest,
    size=local_caching.UNKNOWN_FILE_SIZE,
    priority=threading_utils.PRIORITY_MED):
  """Starts asynchronous fetch of item |digest|.

  Nothing is started if the item is already being fetched, or if it is
  already in the cache and the cache confirms it with touch().

  Arguments:
    digest: hex digest of the item to fetch.
    size: expected size of the item, if known.
    priority: thread pool task priority for the fetch.
  """
  # Fetching it now?
  if digest in self._pending:
    return

  # Mark this file as in use, verify_all_cached will later ensure it is still
  # in cache.
  self._accessed.add(digest)

  # Already fetched? Notify cache to update item's LRU position.
  was_fetched = digest in self._fetched
  # 'touch' returns True if item is in cache and not corrupted.
  if was_fetched and self.cache.touch(digest, size):
    return
  if was_fetched:
    logging.error('%s is corrupted', digest)
    self._fetched.remove(digest)

  # TODO(maruel): It should look at the free disk space, the current cache
  # size and the size of the new item on every new item:
  # - Trim the cache as more entries are listed when free disk space is low,
  #   otherwise if the amount of data downloaded during the run > free disk
  #   space, it'll crash.
  # - Make sure there's enough free disk space to fit all dependencies of
  #   this run! If not, abort early.

  # Start fetching.
  self._pending.add(digest)
  write_to_cache = functools.partial(self.cache.write, digest)
  self.storage.async_fetch(
      self._channel, priority, digest, size, write_to_cache)
def wait_on(self, digest):
  """Registers |digest| as one of the items wait() has to return.

  Items that are already fetched are tracked separately, so that wait() can
  return them first.
  """
  if digest in self._fetched:
    bucket = self._waiting_on_ready
  else:
    bucket = self._waiting_on
  bucket.add(digest)
def wait(self):
  """Waits until any of waited-on items is retrieved.

  Once this happens, it is removed from the waited-on set and returned.

  This function is called in two waves. The first wave it is done for HIGH
  priority items, the isolated files themselves. The second wave it is called
  for all the files.

  Asserts that there is something to wait on when no already fetched item
  can be returned. Raises RuntimeError if every pending fetch completes
  without producing a waited-on item.
  """
  # Flush any already fetched items.
  if self._waiting_on_ready:
    return self._waiting_on_ready.pop()

  assert self._waiting_on, 'Needs items to wait on'

  # Wait for one waited-on item to be fetched.
  while self._pending:
    done = self._channel.next()
    self._pending.remove(done)
    self._fetched.add(done)
    try:
      self._waiting_on.remove(done)
    except KeyError:
      # Nobody waits on this one; keep draining.
      continue
    return done

  # Should never reach this point due to assert above.
  raise RuntimeError('Impossible state')
@property
def wait_queue_empty(self):
  """True when wait() has no digest left to return."""
  has_waiters = bool(self._waiting_on or self._waiting_on_ready)
  return not has_waiters
def inject_local_file(self, path, algo):
  """Adds a local file to the cache as if it was fetched from storage.

  The whole file is read in memory.

  Arguments:
    path: path of the file to add.
    algo: callable returning a hash object for the data it is given.

  Returns:
    The hex digest of the file content.
  """
  with fs.open(path, 'rb') as stream:
    content = stream.read()
  digest = algo(content).hexdigest()
  self.cache.write(digest, [content])
  self._fetched.add(digest)
  return digest
@property
def pending_count(self):
  """Number of items whose fetch was started and has not been collected."""
  in_flight = self._pending
  return len(in_flight)
def verify_all_cached(self):
"""True if all accessed items are in cache."""
# Not thread safe, but called after all work is done.