In [1]:
import boto3

In [2]:
# EMR API client. No region_name is passed, so boto3 resolves the region (and
# credentials) from its default configuration — the outputs below show us-east-1.
emr = boto3.client(service_name='emr')

In [11]:
# Cluster definition passed verbatim to emr.run_job_flow(**JOB_FLOW):
# EMR 6.5.0 with Spark, one on-demand m5.xlarge master plus two on-demand
# m5.xlarge core nodes, logs under s3://indextracker/emr/logs/.
# NOTE(review): KeepJobFlowAliveWhenNoSteps=False and there is no "Steps" key, so
# the cluster may shut itself down once it is up with nothing queued — before the
# steps added later via add_job_flow_steps arrive. Confirm this is intended.
JOB_FLOW = dict(
    Name="indextracker",
    ReleaseLabel="emr-6.5.0",
    LogUri="s3://indextracker/emr/logs/",
    Instances=dict(
        # Both groups share market and instance type; only name/role/count differ.
        InstanceGroups=[
            dict(
                Name=group_name,
                Market="ON_DEMAND",
                InstanceRole=role,
                InstanceType="m5.xlarge",
                InstanceCount=count,
            )
            for group_name, role, count in (
                ("Master nodes", "MASTER", 1),
                ("Slave nodes", "CORE", 2),
            )
        ],
        TerminationProtected=False,
        KeepJobFlowAliveWhenNoSteps=False,
    ),
    ServiceRole="EMR_DefaultRole",
    JobFlowRole="EMR_EC2_DefaultRole",
    Applications=[dict(Name="Spark")],
)

# Setup step: recursively copy the job scripts from S3 into /home/hadoop/, which
# is where get_steps expects the <subject>_raw.py / <subject>_agg.py files.
# On failure, CANCEL_AND_WAIT is requested for the remaining steps.
STEPS = [
    dict(
        Name='setup - copy files',
        ActionOnFailure='CANCEL_AND_WAIT',
        HadoopJarStep=dict(
            Jar='command-runner.jar',
            Args=[
                'aws', 's3', 'cp', '--recursive',
                "s3://indextracker/emr/scripts/tw/",
                '/home/hadoop/',
            ],
        ),
    ),
]


def get_steps(subject, ds):
    """Build the EMR step definitions that process one subject for one date.

    Returns two steps, in order: the "raw" stage, then the "agg" stage. Each step
    runs ``spark-submit /home/hadoop/{subject}_{stage}.py ds`` through
    command-runner.jar and requests CANCEL_AND_WAIT if it fails.

    Args:
        subject: script prefix, e.g. 'futures' -> /home/hadoop/futures_raw.py.
        ds: date label forwarded as the scripts' only argument (the notebook
            passes strings like '2021_12_23').

    Returns:
        A list of two step dicts suitable for emr.add_job_flow_steps(Steps=...).
    """
    def _spark_step(stage):
        # One spark-submit step for the given stage ('raw' or 'agg').
        script_path = f'/home/hadoop/{subject}_{stage}.py'
        return {
            'Name': f'Run Spark {subject} {stage} {ds}',
            'ActionOnFailure': 'CANCEL_AND_WAIT',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', script_path, ds],
            },
        }

    # Order matters: the agg script runs after the raw one.
    return [_spark_step(stage) for stage in ('raw', 'agg')]

In [4]:
# Launch the cluster described by JOB_FLOW; the response carries the new JobFlowId.
run_job_flow_response = emr.run_job_flow(**JOB_FLOW)
run_job_flow_response

{'JobFlowId': 'j-3BW5I1PI9KM50',
 'ClusterArn': 'arn:aws:elasticmapreduce:us-east-1:633008848065:cluster/j-3BW5I1PI9KM50',
 'ResponseMetadata': {'RequestId': '787ff4a0-7f09-46e4-a1bf-0d0ad11f7b49',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '787ff4a0-7f09-46e4-a1bf-0d0ad11f7b49',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '118',
   'date': 'Sat, 05 Mar 2022 07:28:44 GMT'},
  'RetryAttempts': 0}}

In [5]:
# Resolve the id of the live "indextracker" cluster to attach steps to.
# Each ClusterSummary's 'Status' is a dict whose state lives under Status['State'],
# so comparing Status itself to 'TERMINATED' never filtered anything and could return
# a dead cluster. Ask EMR for clusters that can still accept steps instead.
ACTIVE_CLUSTER_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']
lc_dict = emr.list_clusters(ClusterStates=ACTIVE_CLUSTER_STATES)
job_flow_id = next(
    (x['Id'] for x in lc_dict['Clusters'] if x['Name'] == 'indextracker'),
    None,
)
if job_flow_id is None:
    # Fail with an actionable message rather than a bare StopIteration.
    raise LookupError(
        "no active EMR cluster named 'indextracker' "
        f"(states: {ACTIVE_CLUSTER_STATES}); launch one with emr.run_job_flow(**JOB_FLOW)"
    )

In [12]:
# Queue the S3 -> /home/hadoop/ script copy (STEPS) on the cluster resolved above.
setup_step_response = emr.add_job_flow_steps(Steps=STEPS, JobFlowId=job_flow_id)
setup_step_response

{'StepIds': ['s-1PVNX48RHJV95'],
 'ResponseMetadata': {'RequestId': 'fa9c79c1-f03d-4e7b-acb4-f38a6abb45af',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'fa9c79c1-f03d-4e7b-acb4-f38a6abb45af',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '31',
   'date': 'Sat, 05 Mar 2022 07:40:14 GMT'},
  'RetryAttempts': 0}}

In [13]:
# Queue the raw -> agg Spark steps for every subject, for a single date label.
RUN_DATE = '2021_12_23'
SUBJECTS = ('futures', 'options', 'index', 'etf')
for subject in SUBJECTS:
    subject_steps = get_steps(subject, RUN_DATE)
    emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=subject_steps)

{ "region_name" : "us-east-1" }

In [2]:
import json

# JSON form of the run_job_flow cluster configuration.
# NOTE: despite its name, `steps` holds the cluster config (same content as
# JOB_FLOW), not step definitions; parsing it checks that it is valid JSON.
steps = """
{
    "Name": "indextracker",
    "ReleaseLabel": "emr-6.5.0",
    "LogUri": "s3://indextracker/emr/logs/",
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1
            },
            {
                "Name": "Slave nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 2
            }
        ],
        "TerminationProtected": false,
        "KeepJobFlowAliveWhenNoSteps": false
    },
    
    "ServiceRole": "EMR_DefaultRole",
    "JobFlowRole": "EMR_EC2_DefaultRole",

    "Applications": [
        {
            "Name": "Spark"
        }
    ]
}
"""

job_flow_from_json = json.loads(steps)
job_flow_from_json

{'Name': 'indextracker',
 'ReleaseLabel': 'emr-6.5.0',
 'LogUri': 's3://indextracker/emr/logs/',
 'Instances': {'InstanceGroups': [{'Name': 'Master nodes',
    'Market': 'ON_DEMAND',
    'InstanceRole': 'MASTER',
    'InstanceType': 'm5.xlarge',
    'InstanceCount': 1},
   {'Name': 'Slave nodes',
    'Market': 'ON_DEMAND',
    'InstanceRole': 'CORE',
    'InstanceType': 'm5.xlarge',
    'InstanceCount': 2}],
  'TerminationProtected': False,
  'KeepJobFlowAliveWhenNoSteps': False},
 'ServiceRole': 'EMR_DefaultRole',
 'JobFlowRole': 'EMR_EC2_DefaultRole',
 'Applications': [{'Name': 'Spark'}]}