Skip to content

Commit

Permalink
Merge 8adff8b into 7407e7c
Browse files Browse the repository at this point in the history
  • Loading branch information
meejah committed Jul 6, 2016
2 parents 7407e7c + 8adff8b commit f298831
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 106 deletions.
127 changes: 85 additions & 42 deletions src/allmydata/client.py
@@ -1,4 +1,4 @@
import os, stat, time, weakref
import os, stat, time, weakref, yaml
from allmydata import node
from base64 import urlsafe_b64encode

Expand All @@ -9,6 +9,8 @@
from twisted.python.filepath import FilePath
from pycryptopp.publickey import rsa

from foolscap.api import eventually

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
Expand Down Expand Up @@ -122,10 +124,13 @@ def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
# All tub.registerReference must happen *after* we upcall, since
# that's what does tub.setLocation()
self.warn_flag = False
self.introducer_clients = []
self.introducer_furls = []
self.started_timestamp = time.time()
self.logSource="Client"
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
self.init_introducer_client()
self.load_connections()
self.init_stats_provider()
self.init_secrets()
self.init_node_key()
Expand Down Expand Up @@ -170,17 +175,63 @@ def _sequencer(self):
nonce = _make_secret().strip()
return seqnum, nonce

def init_introducer_client(self):
self.introducer_furl = self.get_config("client", "introducer.furl")
introducer_cache_filepath = FilePath(os.path.join(self.basedir, "private", "introducer_cache.yaml"))
ic = IntroducerClient(self.tub, self.introducer_furl,
self.nickname,
str(allmydata.__full_version__),
str(self.OLDEST_SUPPORTED_VERSION),
self.get_app_versions(),
self._sequencer, introducer_cache_filepath)
self.introducer_client = ic
ic.setServiceParent(self)
def old_introducer_config_compatiblity(self):
tahoe_cfg_introducer_furl = self.get_config("client", "introducer.furl", None)
if tahoe_cfg_introducer_furl is not None:
tahoe_cfg_introducer_furl = tahoe_cfg_introducer_furl.encode('utf-8')

for nick in self.connections_config['introducers'].keys():
if tahoe_cfg_introducer_furl == self.connections_config['introducers'][nick]['furl']:
log.err("Introducer furl specified in both tahoe.cfg and connections.yaml; please fix impossible configuration.")
self.warn_flag = True
return

if u"introducer" in self.connections_config['introducers'].keys():
if tahoe_cfg_introducer_furl is not None:
log.err("Introducer nickname in connections.yaml must not be called 'introducer' if the tahoe.cfg file also specifies and introducer.")
self.warn_flag = True

if tahoe_cfg_introducer_furl is not None:
self.connections_config['introducers'][u"introducer"] = {}
self.connections_config['introducers'][u"introducer"]['furl'] = tahoe_cfg_introducer_furl

def load_connections(self):
"""
Load the connections.yaml file if it exists, otherwise
create a default configuration.
"""
self.warn_flag = False
connections_filepath = FilePath(os.path.join(self.basedir, "private", "connections.yaml"))
def construct_unicode(loader, node):
return node.value
yaml.SafeLoader.add_constructor("tag:yaml.org,2002:str",
construct_unicode)
try:
with connections_filepath.open() as f:
self.connections_config = yaml.safe_load(f)
except EnvironmentError:
exists = False
self.connections_config = { 'servers' : {},
'introducers' : {},
}
connections_filepath.setContent(yaml.safe_dump(self.connections_config))

self.old_introducer_config_compatiblity()
introducers = self.connections_config['introducers']
for nickname in introducers:
introducer_cache_filepath = FilePath(os.path.join(self.basedir, "private", nickname))
self.introducer_furls.append(introducers[nickname]['furl'])
ic = IntroducerClient(introducers[nickname]['furl'],
nickname,
str(allmydata.__full_version__),
str(self.OLDEST_SUPPORTED_VERSION),
self.get_app_versions(),
self._sequencer, introducer_cache_filepath)
self.introducer_clients.append(ic)

# init introducer_clients as usual
for ic in self.introducer_clients:
ic.setServiceParent(self)

def init_stats_provider(self):
gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
Expand Down Expand Up @@ -298,7 +349,9 @@ def init_storage(self):
ann = {"anonymous-storage-FURL": furl,
"permutation-seed-base32": self._init_permutation_seed(ss),
}
self.introducer_client.publish("storage", ann, self._node_key)

for ic in self.introducer_clients:
ic.publish("storage", ann, self._node_key)

def init_client(self):
helper_furl = self.get_config("client", "helper.furl", None)
Expand Down Expand Up @@ -360,32 +413,13 @@ def init_client_storage_broker(self):
helper = storage_client.ConnectedEnough(sb, connection_threshold)
self.upload_ready_d = helper.when_connected_enough()

# load static server specifications from tahoe.cfg, if any.
# Not quite ready yet.
#if self.config.has_section("client-server-selection"):
# server_params = {} # maps serverid to dict of parameters
# for (name, value) in self.config.items("client-server-selection"):
# pieces = name.split(".")
# if pieces[0] == "server":
# serverid = pieces[1]
# if serverid not in server_params:
# server_params[serverid] = {}
# server_params[serverid][pieces[2]] = value
# for serverid, params in server_params.items():
# server_type = params.pop("type")
# if server_type == "tahoe-foolscap":
# s = storage_client.NativeStorageClient(*params)
# else:
# msg = ("unrecognized server type '%s' in "
# "tahoe.cfg [client-server-selection]server.%s.type"
# % (server_type, serverid))
# raise storage_client.UnknownServerTypeError(msg)
# sb.add_server(s.serverid, s)

# check to see if we're supposed to use the introducer too
if self.get_config("client-server-selection", "use_introducer",
default=True, boolean=True):
sb.use_introducer(self.introducer_client)
# utilize the loaded static server specifications
servers = self.connections_config['servers']
for server_key in servers.keys():
eventually(self.storage_broker.got_static_announcement, server_key, servers[server_id]['announcement'])

for ic in self.introducer_clients:
sb.use_introducer(ic)

def get_storage_broker(self):
return self.storage_broker
Expand Down Expand Up @@ -508,9 +542,18 @@ def _check_exit_trigger(self, exit_trigger_file):
def get_encoding_parameters(self):
return self.encoding_params

# In case we configure multiple introducers
def introducer_connection_statuses(self):
status = []
if self.introducer_clients:
for ic in self.introducer_clients:
s = ic.connected_to_introducer()
status.append(s)
return status

def connected_to_introducer(self):
if self.introducer_client:
return self.introducer_client.connected_to_introducer()
if len(self.introducer_clients) > 0:
return True
return False

def get_renewal_secret(self): # this will go away
Expand Down
25 changes: 21 additions & 4 deletions src/allmydata/introducer/client.py
Expand Up @@ -3,6 +3,8 @@
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable, eventually
from foolscap.api import Tub

from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import IIntroducerClient, \
RIIntroducerSubscriberClient_v2
Expand All @@ -17,13 +19,16 @@ class InvalidCacheError(Exception):

V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"

class IntroducerClient(service.Service, Referenceable):
class IntroducerClient(service.MultiService, Referenceable):
implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)

def __init__(self, tub, introducer_furl,
def __init__(self, introducer_furl,
nickname, my_version, oldest_supported,
app_versions, sequencer, cache_filepath):
self._tub = tub
service.MultiService.__init__(self)

self._tub = Tub()
self._tub.setServiceParent(self)
self.introducer_furl = introducer_furl

assert type(nickname) is unicode
Expand All @@ -46,6 +51,7 @@ def __init__(self, tub, introducer_furl,
self._canary = Referenceable()

self._publisher = None
self._since = None

self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
Expand Down Expand Up @@ -77,7 +83,7 @@ def _debug_retired(self, res):
return res

def startService(self):
service.Service.startService(self)
service.MultiService.startService(self)
self._introducer_error = None
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
self._introducer_reconnector = rc
Expand Down Expand Up @@ -144,13 +150,15 @@ def _got_versioned_introducer(self, publisher):
if V2 not in publisher.version:
raise InsufficientVersionError("V2", publisher.version)
self._publisher = publisher
self._since = int(time.time())
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
self._maybe_subscribe()

def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
self._publisher = None
self._since = int(time.time())
self._subscriptions.clear()

def log(self, *args, **kwargs):
Expand Down Expand Up @@ -325,3 +333,12 @@ def _deliver_announcements(self, key_s, ann):

def connected_to_introducer(self):
return bool(self._publisher)

def get_since(self):
return self._since

def get_last_received_data_time(self):
if self._publisher is None:
return None
else:
return self._publisher.getDataLastReceivedAt()
11 changes: 11 additions & 0 deletions src/allmydata/storage_client.py
Expand Up @@ -139,6 +139,17 @@ def _got_connection(self):
# this is called by NativeStorageClient when it is connected
self._server_listeners.notify()

def got_static_announcement(self, key_s, ann):
print "got static announcement"
if key_s is not None:
precondition(isinstance(key_s, str), key_s)
precondition(key_s.startswith("v0-"), key_s)
assert ann["service-name"] == "storage"
s = NativeStorageServer(key_s, ann) # XXX tub_options=...
server_id = s.get_serverid()
self.servers[server_id] = s
s.start_connecting(self._trigger_connections)

def _got_announcement(self, key_s, ann):
if key_s is not None:
precondition(isinstance(key_s, str), key_s)
Expand Down

0 comments on commit f298831

Please sign in to comment.