Cleanup localcluster (dask#2693)
* Remove address handling (handled in scheduler)
* Move ip= keyword to host=
mrocklin committed May 14, 2019
1 parent bb80c5e commit fd31ecc
Showing 2 changed files with 33 additions and 38 deletions.
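The user-facing effect of the "ip= to host=" change, as a minimal sketch (the surrounding keyword arguments are illustrative, not taken from the commit):

from distributed import LocalCluster, Client

# New spelling: pass the listen address via host=.
with LocalCluster(host="127.0.0.1", n_workers=1, processes=False) as cluster:
    with Client(cluster) as client:
        assert client.submit(lambda x: x + 1, 4).result() == 5

# Old spelling: ip= is still accepted, but now warns
# "The ip keyword has been moved to host" and is aliased to host=.
with LocalCluster(ip="127.0.0.1", n_workers=1, processes=False) as cluster:
    pass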
57 changes: 24 additions & 33 deletions distributed/deploy/local.py
@@ -15,7 +15,6 @@
from ..compatibility import get_thread_identity
from ..core import CommClosedError
from ..utils import (
get_ip_interface,
sync,
ignoring,
All,
@@ -51,8 +50,10 @@ class LocalCluster(Cluster):
silence_logs: logging level
Level of logs to print out to stdout. ``logging.WARN`` by default.
Use a falsey value like False or None for no change.
host: string
Host address on which the scheduler will listen, defaults to only localhost
ip: string
IP address on which the scheduler will listen, defaults to only localhost
Deprecated. See ``host`` above.
dashboard_address: str
Address on which to listen for the Bokeh diagnostics server like
'localhost:8787' or '0.0.0.0:8787'. Defaults to ':8787'.
@@ -88,13 +89,9 @@ class LocalCluster(Cluster):
>>> c = Client(cluster) # connect to local cluster # doctest: +SKIP
Add a new worker to the cluster
Scale the cluster to three workers
>>> w = cluster.start_worker(ncores=2) # doctest: +SKIP
Shut down the extra worker
>>> cluster.stop_worker(w) # doctest: +SKIP
>>> cluster.scale(3) # doctest: +SKIP
Pass extra keyword arguments to Bokeh
@@ -109,6 +106,7 @@ def __init__(
loop=None,
start=None,
ip=None,
host=None,
scheduler_port=0,
silence_logs=logging.WARN,
dashboard_address=":8787",
@@ -125,6 +123,10 @@ def __init__(
worker_class=None,
**worker_kwargs
):
if ip is not None:
warnings.warn("The ip keyword has been moved to host")
host = ip

if start is not None:
msg = (
"The start= parameter is deprecated. "
@@ -145,8 +147,8 @@ def __init__(
self.processes = processes

if protocol is None:
if ip and "://" in ip:
protocol = ip.split("://")[0]
if host and "://" in host:
protocol = host.split("://")[0]
elif security:
protocol = "tls://"
elif not self.processes and not scheduler_port:
@@ -155,12 +157,12 @@ def __init__(
protocol = "tcp://"
if not protocol.endswith("://"):
protocol = protocol + "://"
self.protocol = protocol

if host is None and not protocol.startswith("inproc") and not interface:
host = "127.0.0.1"

self.silence_logs = silence_logs
self._asynchronous = asynchronous
self.security = security
self.interface = interface
services = services or {}
worker_services = worker_services or {}
if silence_logs:
@@ -184,6 +186,8 @@ def __init__(
"ncores": threads_per_worker,
"services": worker_services,
"dashboard_address": worker_dashboard_address,
"interface": interface,
"protocol": protocol,
}
)

@@ -192,14 +196,16 @@ def __init__(

self.scheduler = Scheduler(
loop=self.loop,
host=host,
services=services,
service_kwargs=service_kwargs,
security=security,
port=scheduler_port,
interface=interface,
protocol=protocol,
dashboard_address=dashboard_address,
blocked_handlers=blocked_handlers,
)
self.scheduler_port = scheduler_port

self.workers = []
self.worker_kwargs = worker_kwargs
@@ -210,7 +216,7 @@ def __init__(
worker_class = Worker if not processes else Nanny
self.worker_class = worker_class

self.start(ip=ip, n_workers=n_workers)
self.start(n_workers=n_workers)

clusters_to_close.add(self)

@@ -251,32 +257,17 @@ def start(self, **kwargs):
self.sync(self._start, **kwargs)

@gen.coroutine
def _start(self, ip=None, n_workers=0):
def _start(self, n_workers=0):
"""
Start all cluster services.
"""
if self.status == "running":
return

if self.protocol == "inproc://":
address = self.protocol
else:
if ip is None:
if self.interface:
ip = get_ip_interface(self.interface)
else:
ip = "127.0.0.1"

if "://" in ip:
address = ip
else:
address = self.protocol + ip
if self.scheduler_port:
address += ":" + str(self.scheduler_port)

self.scheduler.start(address)
self.scheduler.start()

yield [self._start_worker(**self.worker_kwargs) for i in range(n_workers)]
yield self.scheduler

self.status = "running"

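For reference, the protocol inference that replaces the old address handling can be restated as a standalone helper. This is only an illustration covering the branches visible in the hunks above (the branch elided by the diff, between the processes/scheduler_port check and the tcp fallback, is omitted); it is not the commit's exact code.

def infer_protocol(host=None, security=None):
    # An explicit address such as "tls://0.0.0.0" carries its own protocol.
    if host and "://" in host:
        protocol = host.split("://")[0]
    # A Security object implies TLS.
    elif security:
        protocol = "tls://"
    # Plain hostnames and IPs fall back to TCP.
    else:
        protocol = "tcp://"
    if not protocol.endswith("://"):
        protocol = protocol + "://"
    return protocol

assert infer_protocol(host="tls://0.0.0.0") == "tls://"
assert infer_protocol(host="127.0.0.1") == "tcp://"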
14 changes: 9 additions & 5 deletions distributed/deploy/tests/test_local.py
@@ -128,7 +128,7 @@ def test_move_unserializable_data():
assert y.result() is lock


def test_transports():
def test_transports_inproc():
"""
Test the transport chosen by LocalCluster depending on arguments.
"""
@@ -140,6 +140,8 @@ def test_transports():
with Client(c.scheduler.address) as e:
assert e.submit(inc, 4).result() == 5


def test_transports_tcp():
# Have nannies => need TCP
with LocalCluster(
1, processes=True, silence_logs=False, dashboard_address=None
@@ -149,6 +151,8 @@ def test_transports():
with Client(c.scheduler.address) as e:
assert e.submit(inc, 4).result() == 5


def test_transports_tcp_port():
# Scheduler port specified => need TCP
with LocalCluster(
1,
@@ -417,7 +421,7 @@ def test_remote_access(loop):
scheduler_port=0,
silence_logs=False,
dashboard_address=None,
ip="",
host="",
loop=loop,
) as c:
sync(loop, assert_can_connect_from_everywhere_4_6, c.scheduler.port)
@@ -620,7 +624,7 @@ def test_local_tls(loop):
silence_logs=False,
security=security,
dashboard_address=False,
ip="tls://0.0.0.0",
host="tls://0.0.0.0",
loop=loop,
) as c:
sync(
@@ -690,7 +694,7 @@ def test_local_tls_restart(loop):
silence_logs=False,
security=security,
dashboard_address=False,
ip="tls://0.0.0.0",
host="tls://0.0.0.0",
loop=loop,
) as c:
with Client(c.scheduler.address, loop=loop, security=security) as client:
@@ -750,7 +754,7 @@ def test_protocol_tcp(loop):
)
def test_protocol_ip(loop):
with LocalCluster(
ip="tcp://127.0.0.2", loop=loop, n_workers=0, processes=False
host="tcp://127.0.0.2", loop=loop, n_workers=0, processes=False
) as cluster:
assert cluster.scheduler.address.startswith("tcp://127.0.0.2")

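The first bullet of the commit message (address handling now lives in the scheduler) is what the updated test_protocol_ip asserts: LocalCluster forwards host and protocol to the Scheduler, and the resulting listen address is read back from it. Restated as a standalone check, under the same assumption as the test that 127.0.0.2 is bindable locally:

from distributed import LocalCluster

with LocalCluster(host="tcp://127.0.0.2", n_workers=0, processes=False) as cluster:
    # The Scheduler, not LocalCluster, turns host/protocol into the listen address.
    assert cluster.scheduler.address.startswith("tcp://127.0.0.2")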
