Skip to content

Commit

Permalink
Merge PR #255 from meejah/2774.status-api-only.0-part1
Browse files Browse the repository at this point in the history
(rebased to current master, added a few fixups)
  • Loading branch information
warner committed Apr 12, 2016
2 parents bb71841 + d385bb7 commit 47b9218
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/allmydata/blacklist.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_version(self):
def get_best_readable_version(self):
raise FileProhibited(self.reason)

def download_best_version(self):
def download_best_version(self, progress=None):
raise FileProhibited(self.reason)

def get_best_mutable_version(self):
Expand Down
4 changes: 2 additions & 2 deletions src/allmydata/dirnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,15 +588,15 @@ def set_nodes(self, entries, overwrite=True):
return d


def add_file(self, namex, uploadable, metadata=None, overwrite=True):
def add_file(self, namex, uploadable, metadata=None, overwrite=True, progress=None):
"""I upload a file (using the given IUploadable), then attach the
resulting FileNode to the directory at the given name. I return a
Deferred that fires (with the IFileNode of the uploaded file) when
the operation completes."""
name = normalize(namex)
if self.is_readonly():
return defer.fail(NotWriteableError())
d = self._uploader.upload(uploadable)
d = self._uploader.upload(uploadable, progress=progress)
d.addCallback(lambda results:
self._create_and_validate_node(results.get_uri(), None,
name))
Expand Down
14 changes: 13 additions & 1 deletion src/allmydata/immutable/encode.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class UploadAborted(Exception):
class Encoder(object):
implements(IEncoder)

def __init__(self, log_parent=None, upload_status=None):
def __init__(self, log_parent=None, upload_status=None, progress=None):
object.__init__(self)
self.uri_extension_data = {}
self._codec = None
Expand All @@ -86,6 +86,7 @@ def __init__(self, log_parent=None, upload_status=None):
self._log_number = log.msg("creating Encoder %s" % self,
facility="tahoe.encoder", parent=log_parent)
self._aborted = False
self._progress = progress

def __repr__(self):
if hasattr(self, "_storage_index"):
Expand All @@ -105,6 +106,8 @@ def set_encrypted_uploadable(self, uploadable):
def _got_size(size):
self.log(format="file size: %(size)d", size=size)
self.file_size = size
if self._progress:
self._progress.set_progress_total(self.file_size)
d.addCallback(_got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
Expand Down Expand Up @@ -436,6 +439,7 @@ def _send_segment(self, (shares, shareids), segnum):
shareid = shareids[i]
d = self.send_block(shareid, segnum, block, lognum)
dl.append(d)

block_hash = hashutil.block_hash(block)
#from allmydata.util import base32
#log.msg("creating block (shareid=%d, blocknum=%d) "
Expand All @@ -445,6 +449,14 @@ def _send_segment(self, (shares, shareids), segnum):
self.block_hashes[shareid].append(block_hash)

dl = self._gather_responses(dl)

def do_progress(ign):
done = self.segment_size * (segnum + 1)
if self._progress:
self._progress.set_progress(done)
return ign
dl.addCallback(do_progress)

def _logit(res):
self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
(self,
Expand Down
13 changes: 10 additions & 3 deletions src/allmydata/immutable/filenode.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,13 @@ def __init__(self, filecap, storage_broker, secret_holder, terminator,
# we keep it here, we should also put this on CiphertextFileNode
def __hash__(self):
return self.u.__hash__()

def __eq__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
else:
return False

def __ne__(self, other):
if isinstance(other, ImmutableFileNode):
return self.u.__eq__(other.u)
Expand All @@ -273,12 +275,16 @@ def get_readonly_uri(self):

def get_uri(self):
return self.u.to_string()

def get_cap(self):
return self.u

def get_readcap(self):
return self.u.get_readonly()

def get_verify_cap(self):
return self.u.get_verify_cap()

def get_repair_cap(self):
# CHK files can be repaired with just the verifycap
return self.u.get_verify_cap()
Expand All @@ -288,6 +294,7 @@ def get_storage_index(self):

def get_size(self):
return self.u.get_size()

def get_current_size(self):
return defer.succeed(self.get_size())

Expand All @@ -305,6 +312,7 @@ def is_allowed_in_immutable_directory(self):

def check_and_repair(self, monitor, verify=False, add_lease=False):
return self._cnode.check_and_repair(monitor, verify, add_lease)

def check(self, monitor, verify=False, add_lease=False):
return self._cnode.check(monitor, verify, add_lease)

Expand All @@ -316,14 +324,13 @@ def get_best_readable_version(self):
"""
return defer.succeed(self)


def download_best_version(self):
def download_best_version(self, progress=None):
"""
Download the best version of this file, returning its contents
as a bytestring. Since there is only one version of an immutable
file, we download and return the contents of this file.
"""
d = consumer.download_to_data(self)
d = consumer.download_to_data(self, progress=progress)
return d

# for an immutable file, download_to_data (specified in IReadable)
Expand Down
5 changes: 4 additions & 1 deletion src/allmydata/immutable/literal.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ def get_best_readable_version(self):
return defer.succeed(self)


def download_best_version(self):
def download_best_version(self, progress=None):
if progress is not None:
progress.set_progress_total(len(self.u.data))
progress.set_progress(len(self.u.data))
return defer.succeed(self.u.data)


Expand Down
3 changes: 2 additions & 1 deletion src/allmydata/immutable/offloaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
def __init__(self, storage_index,
helper, storage_broker, secret_holder,
incoming_file, encoding_file,
log_number):
log_number, progress=None):
upload.CHKUploader.__init__(self, storage_broker, secret_holder, progress=progress)
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
Expand Down
36 changes: 26 additions & 10 deletions src/allmydata/immutable/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE
DEFAULT_MAX_SEGMENT_SIZE, IProgress
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

Expand Down Expand Up @@ -623,7 +623,7 @@ class EncryptAnUploadable:
implements(IEncryptedUploadable)
CHUNKSIZE = 50*1024

def __init__(self, original, log_parent=None):
def __init__(self, original, log_parent=None, progress=None):
precondition(original.default_params_set,
"set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original)
Expand All @@ -636,6 +636,7 @@ def __init__(self, original, log_parent=None):
self._file_size = None
self._ciphertext_bytes_read = 0
self._status = None
self._progress = progress

def set_upload_status(self, upload_status):
self._status = IUploadStatus(upload_status)
Expand All @@ -656,6 +657,8 @@ def _got_size(size):
self._file_size = size
if self._status:
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return size
d.addCallback(_got_size)
return d
Expand Down Expand Up @@ -894,7 +897,7 @@ def set_results(self, value):
class CHKUploader:
server_selector_class = Tahoe2ServerSelector

def __init__(self, storage_broker, secret_holder):
def __init__(self, storage_broker, secret_holder, progress=None):
# server_selector needs storage_broker and secret_holder
self._storage_broker = storage_broker
self._secret_holder = secret_holder
Expand All @@ -904,6 +907,7 @@ def __init__(self, storage_broker, secret_holder):
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
self._progress = progress

# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
Expand Down Expand Up @@ -947,8 +951,11 @@ def start_encrypted(self, encrypted):
eu = IEncryptedUploadable(encrypted)

started = time.time()
self._encoder = e = encode.Encoder(self._log_number,
self._upload_status)
self._encoder = e = encode.Encoder(
self._log_number,
self._upload_status,
progress=self._progress,
)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
Expand Down Expand Up @@ -1073,19 +1080,22 @@ def _got(data):

class LiteralUploader:

def __init__(self):
def __init__(self, progress=None):
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
s.set_active(False)
self._progress = progress

def start(self, uploadable):
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
self._size = size
self._status.set_size(size)
if self._progress:
self._progress.set_progress_total(size)
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
Expand All @@ -1109,6 +1119,8 @@ def _build_results(self, uri):
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
self._status.set_results(ur)
if self._progress:
self._progress.set_progress(self._size)
return ur

def close(self):
Expand Down Expand Up @@ -1503,12 +1515,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55

def __init__(self, helper_furl=None, stats_provider=None, history=None):
def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._progress = progress
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)

Expand Down Expand Up @@ -1542,12 +1555,13 @@ def get_helper_info(self):
return (self._helper_furl, bool(self._helper))


def upload(self, uploadable):
def upload(self, uploadable, progress=None):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
assert self.parent
assert self.running
assert progress is None or IProgress.providedBy(progress)

uploadable = IUploadable(uploadable)
d = uploadable.get_size()
Expand All @@ -1556,13 +1570,15 @@ def _got_size(size):
precondition(isinstance(default_params, dict), default_params)
precondition("max_segment_size" in default_params, default_params)
uploadable.set_default_encoding_parameters(default_params)
if progress:
progress.set_progress_total(size)

if self.stats_provider:
self.stats_provider.count('uploader.files_uploaded', 1)
self.stats_provider.count('uploader.bytes_uploaded', size)

if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader()
uploader = LiteralUploader(progress=progress)
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
Expand All @@ -1575,7 +1591,7 @@ def _got_size(size):
else:
storage_broker = self.parent.get_storage_broker()
secret_holder = self.parent._secret_holder
uploader = CHKUploader(storage_broker, secret_holder)
uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
d2.addCallback(lambda x: uploader.start(eu))

self._all_uploads[uploader] = None
Expand Down

0 comments on commit 47b9218

Please sign in to comment.