Permalink
Browse files

Adds optional waiting to cs tail (#854)

* Adds optional retrying for cs tail

* Renames 'retry' to 'wait'

* Changes flag to --wait

* Bumps version

* Marks test_manage_task_random_binary_output as xfail

* Removes the option to specify how long to wait for

* Removes f from string

* Removes superfluous import

* Introduces CookRetriableException

* Stops waiting after 1 day (and re-raises the last exception)
  • Loading branch information...
dposada authored and sradack committed May 27, 2018
1 parent 01f5e76 commit ca4bcb8b7af0c6c96a302e5beff9d28e0f45a7d8
@@ -0,0 +1,2 @@
class CookRetriableException(Exception):
    """Signals a failure that the caller may retry instead of treating it as fatal.

    Raised for conditions that can resolve on their own, such as a job that
    has no instances yet or a sandbox file that has not been created.
    """
@@ -3,6 +3,7 @@
from urllib.parse import urlparse, parse_qs

from cook import http
from cook.exceptions import CookRetriableException


def instance_to_agent_url(instance):
@@ -53,7 +54,7 @@ def retrieve_instance_sandbox_directory(instance, job):
directories = [e['directory'] for e in cook_executors if e['id'] == instance_id]

if len(directories) == 0:
raise Exception(f'Unable to retrieve sandbox directory for job instance {instance_id}.')
raise CookRetriableException(f'Unable to retrieve sandbox directory for job instance {instance_id}.')

if len(directories) > 1:
# This should not happen, but we'll be defensive anyway
@@ -74,11 +75,11 @@ def read_file(instance, sandbox_dir, path, offset=None, length=None):

resp = http.__get(f'{agent_url}/files/read', params=params)
if resp.status_code == 404:
raise Exception(f"Cannot open '{path}' for reading (file was not found).")
raise CookRetriableException(f"Cannot open '{path}' for reading (file was not found).")

if resp.status_code != 200:
logging.error(f'mesos agent returned status code {resp.status_code} and body {resp.text}')
raise Exception('Could not read the file.')
raise CookRetriableException('Could not read the file.')

return resp.json()

@@ -9,6 +9,7 @@
from urllib.parse import urlparse, parse_qs

from cook import http, colors, mesos, progress
from cook.exceptions import CookRetriableException
from cook.util import is_valid_uuid, wait_until, print_info, distinct, partition


@@ -234,25 +235,40 @@ def __get_latest_instance(job):
instance = max(instances, key=itemgetter('start_time'))
return instance

raise Exception(f'Job {job["uuid"]} currently has no instances.')
raise CookRetriableException(f'Job {job["uuid"]} currently has no instances.')


def query_unique_and_run(clusters, entity_ref, command_fn):
def query_unique_and_run(clusters, entity_ref, command_fn, wait=False):
"""Calls query_unique and then calls the given command_fn on the resulting job instance"""
query_result = query_unique(clusters, entity_ref)
if query_result['type'] == Types.JOB:
job = query_result['data']
instance = __get_latest_instance(job)
directory = mesos.retrieve_instance_sandbox_directory(instance, job)
command_fn(instance, directory)
elif query_result['type'] == Types.INSTANCE:
instance, job = query_result['data']
directory = mesos.retrieve_instance_sandbox_directory(instance, job)
command_fn(instance, directory)

def query_unique_and_run():
query_result = query_unique(clusters, entity_ref)
if query_result['type'] == Types.JOB:
job = query_result['data']
instance = __get_latest_instance(job)
directory = mesos.retrieve_instance_sandbox_directory(instance, job)
command_fn(instance, directory)
elif query_result['type'] == Types.INSTANCE:
instance, job = query_result['data']
directory = mesos.retrieve_instance_sandbox_directory(instance, job)
command_fn(instance, directory)
else:
# This should not happen, because query_unique should
# only return a map with type "job" or type "instance"
raise Exception(f'Encountered error when querying for {entity_ref}.')

if wait:
# Importing tenacity locally to prevent startup time
# from increasing in the default (i.e. don't wait) case
import tenacity
one_day_in_seconds = 24 * 60 * 60
r = tenacity.Retrying(wait=tenacity.wait_fixed(5),
retry=tenacity.retry_if_exception_type(CookRetriableException),
stop=tenacity.stop_after_delay(one_day_in_seconds),
reraise=True)
r.call(query_unique_and_run)
else:
# This should not happen, because query_unique should
# only return a map with type "job" or type "instance"
raise Exception(f'Encountered error when querying for {entity_ref}.')
query_unique_and_run()


def resource_to_entity_type(resource):
@@ -124,14 +124,15 @@ def tail(clusters, args, _):
lines = args.get('lines')
follow = args.get('follow')
sleep_interval = args.get('sleep-interval')
wait = args.get('wait')

if len(entity_refs) > 1:
# argparse should prevent this, but we'll be defensive anyway
raise Exception(f'You can only provide a single uuid.')

command_fn = partial(tail_for_instance, path=path, num_lines_to_print=lines,
follow=follow, follow_sleep_seconds=sleep_interval)
query_unique_and_run(clusters_of_interest, entity_refs[0], command_fn)
query_unique_and_run(clusters_of_interest, entity_refs[0], command_fn, wait)


def register(add_parser, add_defaults):
@@ -143,6 +144,9 @@ def register(add_parser, add_defaults):
parser.add_argument('--sleep-interval', '-s',
help=f'with -f, sleep for N seconds (default {DEFAULT_FOLLOW_SLEEP_SECS}) between iterations',
metavar='N', type=float)
parser.add_argument('--wait', '-w',
help='wait indefinitely for the job to be running and for the file to become available',
action='store_true')
parser.add_argument('uuid', nargs=1)
parser.add_argument('path', nargs='?')

@@ -1 +1 @@
VERSION = '2.6.0'
VERSION = '2.7.0'
@@ -12,6 +12,7 @@
'pytz',
'requests',
'tabulate',
'tenacity',
'tzlocal',
]

@@ -539,6 +539,10 @@ def assertions(driver, task_id, sandbox_directory):
command = 'sleep 100'
self.run_command_in_manage_task_runner(command, assertions, 2)

# FIXME - remove the xfail mark once the issue with this test crashing is resolved:
# https://github.com/twosigma/Cook/issues/856
@pytest.mark.xfail
@unittest.skip('This test fails occasionally')
def test_manage_task_random_binary_output(self):
def assertions(driver, task_id, sandbox_directory):
expected_statuses = [{'task_id': {'value': task_id}, 'state': cook.TASK_STARTING},
@@ -793,6 +793,22 @@ def test_tail_default_path(self):
self.assertEqual(0, cp.returncode, cp.stderr)
self.assertIn(text, cli.decode(cp.stdout))

def test_tail_wait(self):
    """Checks tail with and without --wait against a file that appears late.

    The submitted job only writes ``foo`` after sleeping, so a plain tail
    issued right away is expected to fail, while a tail with ``--wait`` is
    expected to succeed once the file exists. A path that is never created
    must still fail when ``--wait`` is not given.
    """
    # No --wait should fail: foo is only written after a 10 second sleep
    cp, uuids = cli.submit("bash -c 'sleep 10; echo hello > foo'", self.cook_url)
    self.assertEqual(0, cp.returncode, cp.stderr)
    cp = cli.tail(uuids[0], 'foo', self.cook_url)
    self.assertEqual(1, cp.returncode, cp.stderr)
    # With --wait, tail should keep retrying until foo can be read
    cp = cli.tail(uuids[0], 'foo', self.cook_url, '--wait')
    self.assertEqual(0, cp.returncode, cli.decode(cp.stderr))
    self.assertEqual('hello\n', cli.decode(cp.stdout))
    # Tailing a file that doesn't exist should fail
    path = uuid.uuid4()
    cp = cli.tail(uuids[0], path, self.cook_url)
    self.assertEqual(1, cp.returncode, cli.decode(cp.stderr))
    self.assertEqual(f"Cannot open '{path}' for reading (file was not found).\n", cli.decode(cp.stderr))

def test_ls(self):

def entry(name):

0 comments on commit ca4bcb8

Please sign in to comment.