Skip to content

Commit

Permalink
Implements PeerSelector class to track peerids
Browse files Browse the repository at this point in the history
Implements the IPeerSelector class as PeerSelector and integrates the
class into Tahoe2ServerSelector in order to track peerids and existing
share allocations among the peers
  • Loading branch information
markberger committed Aug 28, 2013
1 parent e7af5f7 commit 5875c31
Showing 1 changed file with 63 additions and 1 deletion.
64 changes: 63 additions & 1 deletion src/allmydata/immutable/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE
DEFAULT_MAX_SEGMENT_SIZE, IPeerSelector
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO
from happiness_upload import Happiness_Upload


# this wants to live in storage, not here
Expand Down Expand Up @@ -201,8 +202,57 @@ def abort_some_buckets(self, sharenums):
def str_shareloc(shnum, bucketwriter):
return "%s: %s" % (shnum, bucketwriter.get_servername(),)

class PeerSelector():
implements(IPeerSelector)

def __init__(self, num_segments, total_shares, needed_shares, servers_of_happiness):
self.num_segments = num_segments
self.total_shares = total_shares
self.needed_shares = needed_shares
self.min_happiness = servers_of_happiness

self.existing_shares = {}
self.peers = set()
self.full_peers = set()
self.bad_peers = set()

def add_peer_with_shares(self, peerid, shnum):
if peerid in self.existing_shares.keys():
self.existing_shares[peerid].add(shnum)
else:
self.existing_shares[peerid] = set([shnum])

def confirm_share_allocation(peerid, shnum):
pass

def add_peers(self, peerids=set):
self.peers = peerids.copy()

def mark_full_peer(self, peerid):
self.full_peers.add(peerid)
self.peers.remove(peerid)

def mark_bad_peer(self, peerid):
if peerid in self.peers:
self.peers.remove(peerid)
self.bad_peers.add(peerid)
elif peerid in self.full_peers:
self.full_peers.remove(peerid)
self.bad_peers.add(peerid)

def get_tasks(self):
shares = set(range(self.total_shares))
self.h = Happiness_Upload(self.peers, self.full_peers, shares, self.existing_shares)
return self.h.generate_mappings()

def is_healthy(self):
return self.min_happiness <= self.h.happiness()


class Tahoe2ServerSelector(log.PrefixingLogMixin):

peer_selector_class = PeerSelector

def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
Expand All @@ -215,6 +265,7 @@ def __init__(self, upload_id, logparent=None, upload_status=None):
log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
self.log("starting", level=log.OPERATIONAL)


def __repr__(self):
return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

Expand All @@ -234,6 +285,9 @@ def get_shareholders(self, storage_broker, secret_holder,
if self._status:
self._status.set_status("Contacting Servers..")

self.peer_selector = self.peer_selector_class(num_segments, total_shares,
needed_shares, servers_of_happiness)

self.total_shares = total_shares
self.servers_of_happiness = servers_of_happiness
self.needed_shares = needed_shares
Expand Down Expand Up @@ -271,10 +325,16 @@ def _get_maxsize(server):
v0 = server.get_rref().version
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]

self.peer_selector.add_peers(set(server.get_serverid() for server in all_servers))

writeable_servers = [server for server in all_servers
if _get_maxsize(server) >= allocated_size]
readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

for server in readonly_servers:
self.peer_selector.mark_full_peer(server.get_serverid())

# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
client_renewal_secret = secret_holder.get_renewal_secret()
Expand Down Expand Up @@ -351,6 +411,7 @@ def _handle_existing_response(self, res, tracker):
if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s"
% (tracker.get_name(), res), level=log.UNUSUAL)
self.peer_selector.mark_bad_peer(serverid)
self.error_count += 1
self.bad_query_count += 1
else:
Expand All @@ -361,6 +422,7 @@ def _handle_existing_response(self, res, tracker):
% (tracker.get_name(), tuple(sorted(buckets))),
level=log.NOISY)
for bucket in buckets:
self.peer_selector.add_peer_with_shares(serverid, bucket)
self.preexisting_shares.setdefault(bucket, set()).add(serverid)
self.homeless_shares.discard(bucket)
self.full_count += 1
Expand Down

0 comments on commit 5875c31

Please sign in to comment.