In [1]:
import logging
import posixpath
import shlex

import boto3 as b3
from botocore.exceptions import ClientError

In [2]:
# Module-level logger used by the helper functions defined in the cells below.
logger=logging.getLogger(__name__)
# Boto3 EMR client that is passed explicitly to the helpers. No region or
# credentials are given here, so boto3 presumably resolves them from the
# environment/configuration — TODO confirm for the target account.
emr_client=b3.client("emr")

In [None]:
def list_clstrs(cluster_id, emr_client):
    """
    Lists the clusters that are in the TERMINATED state.

    :param cluster_id: Not used by this function.
    :param emr_client: The Boto3 EMR client object.
    :return: The response of ``emr_client.list_clusters``, returned as-is.
    """
    state_filter = ['TERMINATED']
    return emr_client.list_clusters(ClusterStates=state_filter)

In [None]:
def define_cluster(cluster_id, emr_client):
    """
    Gets detailed information about a cluster.

    :param cluster_id: The ID of the cluster to describe.
    :param emr_client: The Boto3 EMR client object.
    :return: The ``Cluster`` entry of the ``describe_cluster`` response.
    :raises ClientError: Re-raised unchanged after the failure is logged.
    """
    try:
        # The EMR API names this parameter ``ClusterId``; a lowercase
        # ``cluster_id`` keyword is rejected by boto3 parameter validation
        # before any request is sent.
        response = emr_client.describe_cluster(ClusterId=cluster_id)
        cluster = response['Cluster']
        logger.info("Got data for cluster %s.", cluster_id)
    except ClientError:
        # logger.exception records the traceback, matching how add_step
        # reports its failures.
        logger.exception("Couldn't retreive information for cluster %s.", cluster_id)
        raise
    else:
        return cluster

In [4]:
def add_step(cluster_id, name, script_uri, emr_client, script_args=None):
    """
    Adds a job step that copies a shell script from S3 to /home/hadoop on the
    cluster, moves it to the root directory and runs it with ``sh``.

    The script's file name is taken from the last path component of
    ``script_uri``, so the chmod/mv/sh commands always act on the file that
    was just copied instead of on a hard-coded name.

    :param cluster_id: The ID of the cluster.
    :param name: The name of the step.
    :param script_uri: The S3 URI where the shell script is stored.
    :param emr_client: The Boto3 EMR client object.
    :param script_args: Optional arguments to pass to the script: a single
                        string or an iterable of values. Defaults to none.
    :return: The ID of the newly added step.
    :raises ValueError: If ``script_uri`` does not end in a file name.
    :raises ClientError: Re-raised after the failure is logged.
    """
    uri_text = str(script_uri)
    script_name = posixpath.basename(uri_text)
    if not script_name:
        raise ValueError("script_uri must point to a file, got %r" % (script_uri,))

    if script_args is None:
        script_args = ()
    elif isinstance(script_args, str):
        # A bare string is one argument, not a sequence of characters.
        script_args = (script_args,)

    # shlex.quote leaves plain names/URIs untouched and protects anything
    # containing spaces or shell metacharacters.
    quoted_uri = shlex.quote(uri_text)
    quoted_name = shlex.quote(script_name)
    run_parts = ["sh", quoted_name] + [shlex.quote(str(arg)) for arg in script_args]
    command = ";".join([
        "aws s3 cp " + quoted_uri + " /home/hadoop",
        "chmod u+x /home/hadoop/" + quoted_name,
        "cd /home/hadoop",
        "sudo mv " + quoted_name + " /" + quoted_name,
        "cd /",
        " ".join(run_parts),
    ])

    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[{
                'Name': name,
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ["bash", "-c", command]
                }
            }])
        step_id = response['StepIds'][0]
        logger.info("Started step with ID %s", step_id)
    except ClientError:
        logger.exception("Couldn't start step %s with URI %s.", name, script_uri)
        raise
    else:
        return step_id

# Submit the step; the returned step ID is shown as the cell output.
add_step(
    cluster_id='j-J9564QVPLMDR',
    name='sample_shell_script_03',
    script_uri='s3://command-scripts-01/hello_bash.sh',
    emr_client=emr_client,
)

's-03725611QO4YLXY8A95V'

In [3]:
def add_step(cluster_id, name, script_uri, emr_client, script_args=('mar',)):
    """
    Adds a job step that copies a shell script from S3 to /home/hadoop on the
    cluster, moves it to the root directory and runs it with ``sh``.

    The script's file name is taken from the last path component of
    ``script_uri``, so the chmod/mv/sh commands always act on the file that
    was just copied instead of on a hard-coded name.

    :param cluster_id: The ID of the cluster.
    :param name: The name of the step.
    :param script_uri: The S3 URI where the shell script is stored.
    :param emr_client: The Boto3 EMR client object.
    :param script_args: Arguments to pass to the script: a single string or
                        an iterable of values. Defaults to ``('mar',)``, the
                        argument this definition previously hard-coded; pass
                        ``None`` or an empty sequence for no arguments.
    :return: The ID of the newly added step.
    :raises ValueError: If ``script_uri`` does not end in a file name.
    :raises ClientError: Re-raised after the failure is logged.
    """
    uri_text = str(script_uri)
    script_name = posixpath.basename(uri_text)
    if not script_name:
        raise ValueError("script_uri must point to a file, got %r" % (script_uri,))

    if script_args is None:
        script_args = ()
    elif isinstance(script_args, str):
        # A bare string is one argument, not a sequence of characters.
        script_args = (script_args,)

    # shlex.quote leaves plain words untouched (so the default argument is
    # emitted as mar rather than 'mar', which the shell treats identically)
    # and protects anything containing spaces or shell metacharacters.
    quoted_uri = shlex.quote(uri_text)
    quoted_name = shlex.quote(script_name)
    run_parts = ["sh", quoted_name] + [shlex.quote(str(arg)) for arg in script_args]
    command = ";".join([
        "aws s3 cp " + quoted_uri + " /home/hadoop",
        "chmod u+x /home/hadoop/" + quoted_name,
        "cd /home/hadoop",
        "sudo mv " + quoted_name + " /" + quoted_name,
        "cd /",
        " ".join(run_parts),
    ])

    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[{
                'Name': name,
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ["bash", "-c", command]
                }
            }])
        step_id = response['StepIds'][0]
        logger.info("Started step with ID %s", step_id)
    except ClientError:
        logger.exception("Couldn't start step %s with URI %s.", name, script_uri)
        raise
    else:
        return step_id

# Submit the step; the returned step ID is shown as the cell output.
add_step(
    cluster_id='j-1NIYIYJCQ8JDI',
    name='sqoop_hadoop_mv_cmd_revised',
    script_uri='s3://command-scripts-01/hadoop_mv_cmd.sh',
    emr_client=emr_client,
)

's-1038974H850L4F704V3'

In [4]:
def add_step(cluster_id, name, script_uri, emr_client, script_args=None):
    """
    Adds a job step that copies a shell script from S3 to /home/hadoop on the
    cluster, moves it to the root directory and runs it with ``sh``.

    The script's file name is taken from the last path component of
    ``script_uri``, so the chmod/mv/sh commands always act on the file that
    was just copied instead of on a hard-coded name.

    :param cluster_id: The ID of the cluster.
    :param name: The name of the step.
    :param script_uri: The S3 URI where the shell script is stored.
    :param emr_client: The Boto3 EMR client object.
    :param script_args: Optional arguments to pass to the script: a single
                        string or an iterable of values. Defaults to none.
    :return: The ID of the newly added step.
    :raises ValueError: If ``script_uri`` does not end in a file name.
    :raises ClientError: Re-raised after the failure is logged.
    """
    uri_text = str(script_uri)
    script_name = posixpath.basename(uri_text)
    if not script_name:
        raise ValueError("script_uri must point to a file, got %r" % (script_uri,))

    if script_args is None:
        script_args = ()
    elif isinstance(script_args, str):
        # A bare string is one argument, not a sequence of characters.
        script_args = (script_args,)

    # shlex.quote leaves plain names/URIs untouched and protects anything
    # containing spaces or shell metacharacters.
    quoted_uri = shlex.quote(uri_text)
    quoted_name = shlex.quote(script_name)
    run_parts = ["sh", quoted_name] + [shlex.quote(str(arg)) for arg in script_args]
    command = ";".join([
        "aws s3 cp " + quoted_uri + " /home/hadoop",
        "chmod u+x /home/hadoop/" + quoted_name,
        "cd /home/hadoop",
        "sudo mv " + quoted_name + " /" + quoted_name,
        "cd /",
        " ".join(run_parts),
    ])

    try:
        response = emr_client.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[{
                'Name': name,
                'ActionOnFailure': 'CONTINUE',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ["bash", "-c", command]
                }
            }])
        step_id = response['StepIds'][0]
        logger.info("Started step with ID %s", step_id)
    except ClientError:
        logger.exception("Couldn't start step %s with URI %s.", name, script_uri)
        raise
    else:
        return step_id

# Submit the step; the returned step ID is shown as the cell output.
add_step(
    cluster_id='j-25PY7D3QAM26S',
    name='bootstrap_action_01',
    script_uri='s3://command-scripts-01/bootstrap_actions/mysql_driver_bootstrap_action.sh',
    emr_client=emr_client,
)

's-034151738G3YQULALU93'