diff --git a/telepresence/connect/ssh.py b/telepresence/connect/ssh.py index d92906ca31..2601953c7c 100644 --- a/telepresence/connect/ssh.py +++ b/telepresence/connect/ssh.py @@ -13,7 +13,6 @@ # limitations under the License. from subprocess import CalledProcessError -from time import time, sleep from typing import List from telepresence.runner import Runner @@ -71,12 +70,10 @@ def bg_command(self, additional_args: List[str]) -> List[str]: def wait(self) -> None: """Return when SSH server can be reached.""" - start = time() - while time() - start < 30: + for _ in self.runner.loop_until(30, 0.25): try: self.runner.check_call(self.command(["/bin/true"])) - except CalledProcessError: - sleep(0.25) - else: return + except CalledProcessError: + pass raise RuntimeError("SSH isn't starting.") diff --git a/telepresence/main.py b/telepresence/main.py index c63a289b0f..3316c1b77d 100644 --- a/telepresence/main.py +++ b/telepresence/main.py @@ -20,7 +20,7 @@ from telepresence import connect, mount, outbound, proxy, remote_env from telepresence.runner import wait_for_exit, Runner from telepresence.cli import parse_args, crash_reporting -from telepresence.output import Output +from telepresence.runner.output import Output from telepresence.startup import KubeInfo, final_checks from telepresence.usage_tracking import call_scout diff --git a/telepresence/outbound/container.py b/telepresence/outbound/container.py index 5e353f2f0a..1ab5f4dc95 100644 --- a/telepresence/outbound/container.py +++ b/telepresence/outbound/container.py @@ -15,7 +15,6 @@ import argparse import json from subprocess import CalledProcessError, Popen -from time import sleep from typing import List, Callable, Dict, Tuple, Optional import os @@ -117,7 +116,8 @@ def run_docker_command( ) # Wait for sshuttle to be running: - while True: + sshuttle_ok = False + for _ in runner.loop_until(120, 1): try: runner.check_call( docker_runify([ @@ -128,11 +128,11 @@ def run_docker_command( except CalledProcessError as e: if e.returncode == 100: # We're good! + sshuttle_ok = True break elif e.returncode == 125: # Docker failure, probably due to original container not - # starting yet... so sleep and try again: - sleep(1) + # starting yet... so try again: continue else: raise @@ -140,6 +140,11 @@ def run_docker_command( raise RuntimeError( "Waiting container exited prematurely. File a bug, please!" ) + if not sshuttle_ok: + # This used to loop forever. Now we time out after two minutes. + raise RuntimeError( + "Waiting for network container timed out. File a bug, please!" + ) # Start the container specified by the user: container_name = random_name() diff --git a/telepresence/outbound/local.py b/telepresence/outbound/local.py index 060c1b4aff..1edc0dadea 100644 --- a/telepresence/outbound/local.py +++ b/telepresence/outbound/local.py @@ -15,7 +15,6 @@ import os import sys from subprocess import CalledProcessError, Popen -from time import time, sleep from typing import Dict, List from telepresence.outbound.workarounds import apply_workarounds @@ -60,16 +59,16 @@ def set_up_torsocks(runner: Runner, socks_port: int) -> Dict[str, str]: ] launch_env = os.environ.copy() launch_env.update(torsocks_env) - start = time() - while time() - start < 10: - try: - runner.check_call(test_proxying_cmd, env=launch_env) - span.end() - return torsocks_env - except CalledProcessError: - sleep(0.1) - span.end() - raise RuntimeError("SOCKS network proxying failed to start...") + try: + for _ in runner.loop_until(10, 0.1): + try: + runner.check_call(test_proxying_cmd, env=launch_env) + return torsocks_env + except CalledProcessError: + pass + raise RuntimeError("SOCKS network proxying failed to start...") + finally: + span.end() def terminate_local_process(runner, process): diff --git a/telepresence/outbound/vpn.py b/telepresence/outbound/vpn.py index 17465d9b0b..105de0f6a9 100644 --- a/telepresence/outbound/vpn.py +++ b/telepresence/outbound/vpn.py @@ -15,7 +15,6 @@ import ipaddress import json from subprocess import CalledProcessError -from time import time, sleep from typing import List from telepresence.connect.ssh import SSH @@ -275,18 +274,16 @@ def get_hellotelepresence(counter=iter(range(10000))): ]) subspan = runner.span("sshuttle-wait") - start = time() probes = 0 - while time() - start < 20: + for _ in runner.loop_until(20, 0.1): probes += 1 try: get_hellotelepresence() if probes >= REQUIRED_HELLOTELEPRESENCE_DNS_PROBES: break except CalledProcessError: - sleep(0.1) + pass - sleep(1) # just in case there's more to startup get_hellotelepresence() subspan.end() span.end() diff --git a/telepresence/proxy/remote.py b/telepresence/proxy/remote.py index 4cd683a5b2..656e050ebd 100644 --- a/telepresence/proxy/remote.py +++ b/telepresence/proxy/remote.py @@ -14,7 +14,6 @@ import json from subprocess import STDOUT, CalledProcessError -from time import time, sleep from typing import Optional, Dict from telepresence import image_version @@ -110,8 +109,7 @@ def get_deployment_json( def wait_for_pod(runner: Runner, remote_info: RemoteInfo) -> None: """Wait for the pod to start running.""" span = runner.span() - start = time() - while time() - start < 120: + for _ in runner.loop_until(120, 0.25): try: pod = json.loads( runner.get_output( @@ -121,7 +119,6 @@ def wait_for_pod(runner: Runner, remote_info: RemoteInfo) -> None: ) ) except CalledProcessError: - sleep(0.25) continue if pod["status"]["phase"] == "Running": for container in pod["status"]["containerStatuses"]: @@ -130,7 +127,6 @@ def wait_for_pod(runner: Runner, remote_info: RemoteInfo) -> None: ): span.end() return - sleep(0.25) span.end() raise RuntimeError( "Pod isn't starting or can't be found: {}".format(pod["status"]) @@ -165,8 +161,7 @@ def get_remote_info( if run_id: cmd.append("--selector=telepresence={}".format(run_id)) - start = time() - while time() - start < 120: + for _ in runner.loop_until(120, 1): pods = json.loads(runner.get_output(runner.kubectl(cmd)))["items"] for pod in pods: name = pod["metadata"]["name"] @@ -207,7 +202,6 @@ def get_remote_info( return remote_info # Didn't find pod... - sleep(1) span.end() raise RuntimeError( diff --git a/telepresence/remote_env.py b/telepresence/remote_env.py index 031fa1357f..40f981f9c0 100644 --- a/telepresence/remote_env.py +++ b/telepresence/remote_env.py @@ -14,7 +14,6 @@ from json import loads, dump from subprocess import CalledProcessError -from time import time, sleep from typing import Dict, Tuple, List from telepresence.proxy.remote import RemoteInfo @@ -64,18 +63,16 @@ def get_remote_env(runner: Runner, remote_info: RemoteInfo) -> Dict[str, str]: Get the environment variables we want to copy from the remote pod """ span = runner.span() - # It may take a few seconds for the SSH proxies to get going: - start = time() - while time() - start < 10: - try: - env = get_env_variables(runner, remote_info) - break - except CalledProcessError: - sleep(0.25) - else: + try: + # It may take a few seconds for the SSH proxies to get going: + for _ in runner.loop_until(10, 0.25): + try: + return get_env_variables(runner, remote_info) + except CalledProcessError: + pass raise runner.fail("Error: Failed to get environment variables") - span.end() - return env + finally: + span.end() def _serialize_as_env_file(env: Dict[str, str]) -> Tuple[str, List[str]]: diff --git a/telepresence/runner/__init__.py b/telepresence/runner/__init__.py index 530e9ea34e..d5b7783f20 100644 --- a/telepresence/runner/__init__.py +++ b/telepresence/runner/__init__.py @@ -30,7 +30,7 @@ Background, BackgroundThread, BackgroundProcess, TrackedBG ) from telepresence.runner.cache import Cache -from telepresence.output import Output +from telepresence.runner.output import Output from telepresence.runner.span import Span from telepresence.utilities import str_command @@ -242,6 +242,39 @@ def require(self, commands: typing.Iterable[str], message: str) -> None: "for more information." ) + # Time + + def time(self) -> float: + """ + Return the time in seconds since the epoch. + """ + return time() + + def sleep(self, seconds: float) -> None: + """ + Suspend execution for the given number of seconds. + """ + sleep(seconds) + + def loop_until(self, loop_seconds: float, + sleep_seconds: float) -> typing.Iterable[int]: + """ + Yield a loop counter during the loop time, then end. Sleep the + specified amount between loops. Always run at least once. + + :param loop_seconds: How long the loop should run + :param sleep_seconds: How long to sleep between loops + :return: yields the loop counter, 0 onward + """ + end_time = self.time() + loop_seconds - sleep_seconds + counter = 0 + while True: + yield counter + counter += 1 + if self.time() >= end_time: + break + self.sleep(sleep_seconds) + # Subprocesses def _make_logger(self, track, capture=None): diff --git a/telepresence/output.py b/telepresence/runner/output.py similarity index 100% rename from telepresence/output.py rename to telepresence/runner/output.py diff --git a/tests/test_unit.py b/tests/test_unit.py index e96912a71c..a07700df34 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -25,7 +25,7 @@ import telepresence.cli import telepresence.outbound.container import telepresence.proxy.deployment -import telepresence.output +import telepresence.runner.output import telepresence.outbound.vpn import telepresence.main @@ -242,15 +242,15 @@ def test_covering_cidr(ips): def test_output_file(): """Test some reasonable values for the log file""" # stdout - lf_dash = telepresence.output.Output("-") + lf_dash = telepresence.runner.output.Output("-") assert lf_dash.logfile is sys.stdout, lf_dash.logfile # /dev/null -- just make sure we don't crash - telepresence.output.Output("/dev/null") + telepresence.runner.output.Output("/dev/null") # Regular file -- make sure the file has been truncated o_content = "original content\n" with tempfile.NamedTemporaryFile(mode="w", delete=False) as out: out.write(o_content + "This should be truncated away.\nThis too.\n") - lf_file = telepresence.output.Output(out.name) + lf_file = telepresence.runner.output.Output(out.name) n_content = "replacement content\n" lf_file.write(n_content) with open(out.name) as in_again: