diff --git a/r2/r2/lib/emr_helpers.py b/r2/r2/lib/emr_helpers.py index e5f9ec8491..7d6a408a86 100644 --- a/r2/r2/lib/emr_helpers.py +++ b/r2/r2/lib/emr_helpers.py @@ -32,75 +32,9 @@ NOTFOUND = 'NOTFOUND' -def get_compatible_clusters(emr_connection, bootstrap_actions=None, steps=None): - """Return clusters that have specified bootstrap actions and steps. - - Assumes there are no conflicts with bootstrap actions or setup steps: - a cluster is compatible if it contains at least the requested - bootstrap_actions and steps (may contain additional). - - The objects returned by list_bootstrap_actions and list_steps differ from - the ones generated locally by TrafficBase, so careful comparison is - required. - - """ - - bootstrap_actions = bootstrap_actions or [] - steps = steps or [] - +def get_live_clusters(emr_connection): ret = emr_connection.list_clusters(cluster_states=LIVE_STATES) - clusters = ret.clusters - if not clusters: - return [] - - required_actions = { - (action.name, action.path, tuple(sorted(action.args()))) - for action in bootstrap_actions - } - required_steps = { - (step.name, step.jar(), tuple(sorted(step.args()))) - for step in steps - } - - if not required_actions and not required_steps: - return clusters - - compatible_clusters = [] - for cluster in clusters: - ret = emr_connection.list_bootstrap_actions(cluster.id) - bootstrap_actions = [] - bootstrap_actions.extend(ret.actions) - while hasattr(ret, "marker"): - ret = emr_connection.list_bootstrap_actions(cluster.id, marker=ret.marker) - bootstrap_actions.extend(ret.actions) - - cluster_actions = { - (action.name, action.scriptpath, tuple(sorted(arg.value for arg in action.args))) - for action in bootstrap_actions - } - - missing_actions = required_actions.difference(cluster_actions) - if missing_actions: - continue - - ret = emr_connection.list_steps(cluster.id) - steps = [] - steps.extend(ret.steps) - while hasattr(ret, "marker"): - ret = emr_connection.list_steps(cluster.id, marker=ret.marker) - steps.extend(ret.steps) - - cluster_steps = { - (step.name, step.config.jar, tuple(sorted(arg.value for arg in step.config.args))) - for step in steps - } - - missing_steps = required_steps.difference(cluster_steps) - if missing_steps: - continue - - compatible_clusters.append(cluster) - return compatible_clusters + return ret.clusters or [] @memoize('get_step_states', time=60, timeout=60) diff --git a/r2/r2/lib/traffic/emr_traffic.py b/r2/r2/lib/traffic/emr_traffic.py index c4322ab1f0..69be314ec0 100644 --- a/r2/r2/lib/traffic/emr_traffic.py +++ b/r2/r2/lib/traffic/emr_traffic.py @@ -31,7 +31,7 @@ from r2.lib.emr_helpers import ( EmrException, EmrJob, - get_compatible_clusters, + get_live_clusters, get_step_state, LIVE_STATES, COMPLETED, @@ -124,19 +124,19 @@ def __init__(self, input_path, output_path): def _add_step(emr_connection, step, jobflow_name, **jobflow_kw): """Add step to a running jobflow. - Append the step onto a compatible jobflow with the specified name if one - exists, otherwise create a new jobflow and run it. Returns the jobflowid. + Append the step onto a jobflow with the specified name if one exists, + otherwise create a new jobflow and run it. Returns the jobflowid. NOTE: jobflow_kw will be used to configure the jobflow ONLY if a new jobflow is created. """ - running = get_compatible_clusters(emr_connection, - bootstrap_actions=TrafficBase._bootstrap_actions(), - steps=TrafficBase._setup_steps(), - ) + running = get_live_clusters(emr_connection) for cluster in running: + # NOTE: the existing cluster's bootstrap actions aren't checked so we + # are assuming that any cluster with the correct name is compatible + # with our new step if cluster.name == jobflow_name: jobflowid = cluster.id emr_connection.add_jobflow_steps(jobflowid, step)