Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix some SDMF partial-read cases #177

Merged
merged 4 commits into from
Jul 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
132 changes: 71 additions & 61 deletions src/allmydata/mutable/retrieve.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually, DeadReferenceError, \
RemoteException

from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
DownloadStopped, MDMF_VERSION, SDMF_VERSION
from allmydata.util.assertutil import _assert, precondition
from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
Expand Down Expand Up @@ -115,6 +117,10 @@ def __init__(self, filenode, storage_broker, servermap, verinfo,
self.servermap = servermap
assert self._node.get_pubkey()
self.verinfo = verinfo
# TODO: make it possible to use self.verinfo.datalength instead
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.verinfo
self._data_length = datalength
# during repair, we may be called upon to grab the private key, since
# it wasn't picked up during a verify=False checker run, and we'll
# need it for repair to generate a new version.
Expand Down Expand Up @@ -145,8 +151,6 @@ def __init__(self, filenode, storage_broker, servermap, verinfo,
self._status.set_helper(False)
self._status.set_progress(0.0)
self._status.set_active(True)
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.verinfo
self._status.set_size(datalength)
self._status.set_encoding(k, N)
self.readers = {}
Expand Down Expand Up @@ -230,21 +234,37 @@ def _check_for_stopped(self, res):


def download(self, consumer=None, offset=0, size=None):
assert IConsumer.providedBy(consumer) or self._verify

precondition(self._verify or IConsumer.providedBy(consumer))
if size is None:
size = self._data_length - offset
if self._verify:
_assert(size == self._data_length, (size, self._data_length))
self.log("starting download")
self._done_deferred = defer.Deferred()
if consumer:
self._consumer = consumer
# we provide IPushProducer, so streaming=True, per
# IConsumer.
# we provide IPushProducer, so streaming=True, per IConsumer.
self._consumer.registerProducer(self, streaming=True)
self._started = time.time()
self._started_fetching = time.time()
if size == 0:
# short-circuit the rest of the process
self._done()
else:
self._start_download(consumer, offset, size)
return self._done_deferred

def _start_download(self, consumer, offset, size):
precondition((0 <= offset < self._data_length)
and (size > 0)
and (offset+size <= self._data_length),
(offset, size, self._data_length))

self._done_deferred = defer.Deferred()
self._offset = offset
self._read_length = size
self._setup_encoding_parameters()
self._setup_download()
self.log("starting download")
self._started_fetching = time.time()

# The download process beyond this is a state machine.
# _add_active_servers will select the servers that we want to use
# for the download, and then attempt to start downloading. After
Expand All @@ -254,7 +274,6 @@ def download(self, consumer=None, offset=0, size=None):
# will errback. Otherwise, it will eventually callback with the
# contents of the mutable file.
self.loop()
return self._done_deferred

def loop(self):
d = fireEventually(None) # avoid #237 recursion limit problem
Expand All @@ -265,7 +284,6 @@ def loop(self):
d.addErrback(self._error)

def _setup_download(self):
self._started = time.time()
self._status.set_status("Retrieving Shares")

# how many shares do we need?
Expand Down Expand Up @@ -325,6 +343,8 @@ def decode(self, blocks_and_salts, segnum):
"""
# We don't need the block hash trees in this case.
self._block_hash_trees = None
self._offset = 0
self._read_length = self._data_length
self._setup_encoding_parameters()

# _decode_blocks() expects the output of a gatherResults that
Expand Down Expand Up @@ -352,7 +372,7 @@ def _setup_encoding_parameters(self):
self._required_shares = k
self._total_shares = n
self._segment_size = segsize
self._data_length = datalength
#self._data_length = datalength # set during __init__()

if not IV:
self._version = MDMF_VERSION
Expand Down Expand Up @@ -408,34 +428,29 @@ def _setup_encoding_parameters(self):
# offset we were given.
start = self._offset // self._segment_size

assert start < self._num_segments
_assert(start <= self._num_segments,
start=start, num_segments=self._num_segments,
offset=self._offset, segment_size=self._segment_size)
self._start_segment = start
self.log("got start segment: %d" % self._start_segment)
else:
self._start_segment = 0


# If self._read_length is None, then we want to read the whole
# file. Otherwise, we want to read only part of the file, and
# need to figure out where to stop reading.
if self._read_length is not None:
# our end segment is the last segment containing part of the
# segment that we were asked to read.
self.log("got read length %d" % self._read_length)
if self._read_length != 0:
end_data = self._offset + self._read_length

# We don't actually need to read the byte at end_data,
# but the one before it.
end = (end_data - 1) // self._segment_size

assert end < self._num_segments
self._last_segment = end
else:
self._last_segment = self._start_segment
self.log("got end segment: %d" % self._last_segment)
else:
self._last_segment = self._num_segments - 1
# We might want to read only part of the file, and need to figure out
# where to stop reading. Our end segment is the last segment
# containing any of the data that we were asked to read.
_assert(self._read_length > 0, self._read_length)
end_data = self._offset + self._read_length

# We don't actually need to read the byte at end_data, but the one
# before it.
end = (end_data - 1) // self._segment_size
_assert(0 <= end < self._num_segments,
end=end, num_segments=self._num_segments,
end_data=end_data, offset=self._offset,
read_length=self._read_length, segment_size=self._segment_size)
self._last_segment = end
self.log("got end segment: %d" % self._last_segment)

self._current_segment = self._start_segment

Expand Down Expand Up @@ -568,6 +583,7 @@ def _download_current_segment(self):
I download, validate, decode, decrypt, and assemble the segment
that this Retrieve is currently responsible for downloading.
"""

if self._current_segment > self._last_segment:
# No more segments to download, we're done.
self.log("got plaintext, done")
Expand Down Expand Up @@ -654,33 +670,27 @@ def _set_segment(self, segment):
target that is handling the file download.
"""
self.log("got plaintext for segment %d" % self._current_segment)

if self._read_length == 0:
self.log("on first+last segment, size=0, using 0 bytes")
segment = b""

if self._current_segment == self._last_segment:
# trim off the tail
wanted = (self._offset + self._read_length) % self._segment_size
if wanted != 0:
self.log("on the last segment: using first %d bytes" % wanted)
segment = segment[:wanted]
else:
self.log("on the last segment: using all %d bytes" %
len(segment))

if self._current_segment == self._start_segment:
# We're on the first segment. It's possible that we want
# only some part of the end of this segment, and that we
# just downloaded the whole thing to get that part. If so,
# we need to account for that and give the reader just the
# data that they want.
n = self._offset % self._segment_size
self.log("stripping %d bytes off of the first segment" % n)
self.log("original segment length: %d" % len(segment))
segment = segment[n:]
self.log("new segment length: %d" % len(segment))

if self._current_segment == self._last_segment and self._read_length is not None:
# We're on the last segment. It's possible that we only want
# part of the beginning of this segment, and that we
# downloaded the whole thing anyway. Make sure to give the
# caller only the portion of the segment that they want to
# receive.
extra = self._read_length
if self._start_segment != self._last_segment:
extra -= self._segment_size - \
(self._offset % self._segment_size)
extra %= self._segment_size
self.log("original segment length: %d" % len(segment))
segment = segment[:extra]
self.log("new segment length: %d" % len(segment))
self.log("only taking %d bytes of the last segment" % extra)
# Trim off the head, if offset != 0. This should also work if
# start==last, because we trim the tail first.
skip = self._offset % self._segment_size
self.log("on the first segment: skipping first %d bytes" % skip)
segment = segment[skip:]

if not self._verify:
self._consumer.write(segment)
Expand Down