Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
emr_traffic: Don't check existing cluster's bootstrap actions and steps
Browse files Browse the repository at this point in the history
This was generally overkill and we should just assume that any cluster with
the correct name is compatible. Doing the extra checks was running into
AWS rate limiting.
  • Loading branch information
bsimpson63 committed Jul 11, 2016
1 parent ae2f0af commit a61823a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 75 deletions.
70 changes: 2 additions & 68 deletions r2/r2/lib/emr_helpers.py
Expand Up @@ -32,75 +32,9 @@
NOTFOUND = 'NOTFOUND'


def get_compatible_clusters(emr_connection, bootstrap_actions=None, steps=None):
"""Return clusters that have specified bootstrap actions and steps.
Assumes there are no conflicts with bootstrap actions or setup steps:
a cluster is compatible if it contains at least the requested
bootstrap_actions and steps (may contain additional).
The objects returned by list_bootstrap_actions and list_steps differ from
the ones generated locally by TrafficBase, so careful comparison is
required.
"""

bootstrap_actions = bootstrap_actions or []
steps = steps or []

def get_live_clusters(emr_connection):
ret = emr_connection.list_clusters(cluster_states=LIVE_STATES)
clusters = ret.clusters
if not clusters:
return []

required_actions = {
(action.name, action.path, tuple(sorted(action.args())))
for action in bootstrap_actions
}
required_steps = {
(step.name, step.jar(), tuple(sorted(step.args())))
for step in steps
}

if not required_actions and not required_steps:
return clusters

compatible_clusters = []
for cluster in clusters:
ret = emr_connection.list_bootstrap_actions(cluster.id)
bootstrap_actions = []
bootstrap_actions.extend(ret.actions)
while hasattr(ret, "marker"):
ret = emr_connection.list_bootstrap_actions(cluster.id, marker=ret.marker)
bootstrap_actions.extend(ret.actions)

cluster_actions = {
(action.name, action.scriptpath, tuple(sorted(arg.value for arg in action.args)))
for action in bootstrap_actions
}

missing_actions = required_actions.difference(cluster_actions)
if missing_actions:
continue

ret = emr_connection.list_steps(cluster.id)
steps = []
steps.extend(ret.steps)
while hasattr(ret, "marker"):
ret = emr_connection.list_steps(cluster.id, marker=ret.marker)
steps.extend(ret.steps)

cluster_steps = {
(step.name, step.config.jar, tuple(sorted(arg.value for arg in step.config.args)))
for step in steps
}

missing_steps = required_steps.difference(cluster_steps)
if missing_steps:
continue

compatible_clusters.append(cluster)
return compatible_clusters
return ret.clusters or []


@memoize('get_step_states', time=60, timeout=60)
Expand Down
14 changes: 7 additions & 7 deletions r2/r2/lib/traffic/emr_traffic.py
Expand Up @@ -31,7 +31,7 @@
from r2.lib.emr_helpers import (
EmrException,
EmrJob,
get_compatible_clusters,
get_live_clusters,
get_step_state,
LIVE_STATES,
COMPLETED,
Expand Down Expand Up @@ -124,19 +124,19 @@ def __init__(self, input_path, output_path):
def _add_step(emr_connection, step, jobflow_name, **jobflow_kw):
"""Add step to a running jobflow.
Append the step onto a compatible jobflow with the specified name if one
exists, otherwise create a new jobflow and run it. Returns the jobflowid.
Append the step onto a jobflow with the specified name if one exists,
otherwise create a new jobflow and run it. Returns the jobflowid.
NOTE: jobflow_kw will be used to configure the jobflow ONLY if a new
jobflow is created.
"""

running = get_compatible_clusters(emr_connection,
bootstrap_actions=TrafficBase._bootstrap_actions(),
steps=TrafficBase._setup_steps(),
)
running = get_live_clusters(emr_connection)

for cluster in running:
# NOTE: the existing cluster's bootstrap actions and steps aren't checked,
# so we are assuming that any cluster with the correct name is compatible
# with our new step
if cluster.name == jobflow_name:
jobflowid = cluster.id
emr_connection.add_jobflow_steps(jobflowid, step)
Expand Down

0 comments on commit a61823a

Please sign in to comment.