In [5]:
import os
import sys
from datetime import datetime

import boto3
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# sys.path entries are used verbatim by the import system (no shell-style
# '~' expansion), so resolve the home directory before appending; otherwise
# the `import secret` that follows cannot find the module via this entry.
sys.path.append(os.path.expanduser('~/github/je_scripts/github_scripts/scratch/local/'))
import secret

def initiate_connection():
    """Build a boto3 EMR client for us-east-1 and print a success message.

    Credentials are read from the ``secret`` module. The client is neither
    returned nor stored anywhere, so it is discarded when this function
    exits; nothing is handed on to other callers.

    NOTE(review): constructing a client presumably does not issue any
    request to AWS, so the printed message only confirms that the client
    object was built -- confirm against the boto3 documentation.
    """
    client_options = {
        'region_name': 'us-east-1',
        'aws_access_key_id': secret.cph_access_key,
        'aws_secret_access_key': secret.cph_secret_key,
    }
    boto3.client('emr', **client_options)
    print('Connection to EMR successful')
            
def _spot_instance_fleet(name, fleet_type, instance_type):
    """Return an instance-fleet spec requesting one spot unit of ``instance_type``."""
    return {
        'Name': name,
        'InstanceFleetType': fleet_type,
        'TargetOnDemandCapacity': 0,
        'TargetSpotCapacity': 1,
        'InstanceTypeConfigs': [
            {
                'InstanceType': instance_type,
                'BidPriceAsPercentageOfOnDemandPrice': 100.0,
            },
        ],
        'LaunchSpecifications': {
            'SpotSpecification': {
                'TimeoutDurationMinutes': 60,
                'TimeoutAction': 'TERMINATE_CLUSTER',
            },
        },
    }


def _script_bootstrap_action(name, path):
    """Return a bootstrap-action spec that runs the script stored at ``path``."""
    return {
        'Name': name,
        'ScriptBootstrapAction': {
            'Path': path,
        },
    }


def initiate_cluster():
    """Submit the 'JE_Test_EMR' job flow to EMR and return its job flow id.

    The EMR client is created inside this function. The only other client
    in this file is a local variable of ``initiate_connection`` and is not
    visible here, so relying on a shared ``connection`` name would raise
    ``NameError``.

    Returns:
        The ``JobFlowId`` string from the ``run_job_flow`` response. When
        this function is used as a PythonOperator callable the returned
        value is presumably published as the task's XCom -- confirm for the
        Airflow version in use.

    Raises:
        Whatever ``boto3.client`` or ``run_job_flow`` raise; nothing is
        caught here.
    """
    print('Initiating cluster...')

    emr_client = boto3.client(
        'emr',
        region_name='us-east-1',
        aws_access_key_id=secret.cph_access_key,
        aws_secret_access_key=secret.cph_secret_key,
    )

    glue_metastore_factory = (
        'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'
    )

    bootstrap_scripts = [
        ('LiveIntent shell installer', 's3://us-config-mojn/emr/add-liveintent-shell'),
        ('Idle cluster reaper', 's3://us-config-mojn/emr/idle_timeout.sh'),
        ('Prometheus export start', 's3://us-config-mojn/emr/emr_bootstrap_node_exporter.sh'),
        ('Additional AWS Credential Profiles', 's3://us-config-mojn/emr/add-profile-configuration'),
    ]

    cluster = emr_client.run_job_flow(
        Name='JE_Test_EMR',
        LogUri='s3n://debug-dwh-liveintent-com/emr-debug-logs/JE_Test_EMR/',
        ReleaseLabel='emr-5.20.0',
        VisibleToAllUsers=True,
        EbsRootVolumeSize=10,
        ScaleDownBehavior='TERMINATE_AT_TASK_COMPLETION',
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
        Tags=[
            {'Key': 'Name', 'Value': 'JE-Test'},
            {'Key': 'cost-center', 'Value': 'JE'},
        ],
        Applications=[
            {'Name': 'Spark'},
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
        ],
        Instances={
            # Master and core fleets differ only in name, type and instance type.
            'InstanceFleets': [
                _spot_instance_fleet('Master', 'MASTER', 'm3.xlarge'),
                _spot_instance_fleet('Core', 'CORE', 'i2.xlarge'),
            ],
            'Ec2KeyName': 'airflow',
            'Ec2SubnetId': 'subnet-badaee95',
            'EmrManagedMasterSecurityGroup': 'sg-21078352',
            'EmrManagedSlaveSecurityGroup': 'sg-32048041',
            'KeepJobFlowAliveWhenNoSteps': True,
        },
        BootstrapActions=[
            _script_bootstrap_action(action_name, script_path)
            for action_name, script_path in bootstrap_scripts
        ],
        Configurations=[
            {
                'Classification': 'mapred-site',
                'Properties': {
                    'mapreduce.job.userlog.retain.hours': '2',
                    'mapred.reduce.slowstart.completed.maps': '0.8',
                    'mapreduce.task.timeout': '6000000',
                    'mapred.child.java.opts': '-Xmx8192M',
                },
            },
            {
                'Classification': 'yarn-site',
                'Properties': {
                    'yarn.resourcemanager.scheduler.class': 'org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler',
                    'yarn.nodemanager.log.retain-seconds': '1800',
                    'yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage': '96.0',
                    'yarn.scheduler.fair.allocation.file': '/home/hadoop/fair-scheduler.xml',
                },
            },
            {
                'Classification': 'spark-defaults',
                'Properties': {
                    'spark.executor.memory': '5g',
                    'spark.driver.memory': '10g',
                    'spark.driver.maxResultSize': '0',
                    'spark.shuffle.compress': 'true',
                    'spark.rdd.compress': 'true',
                    'spark.shuffle.service.enabled': 'true',
                    'spark.yarn.executor.memoryOverhead': '1024',
                    'spark.dynamicAllocation.maxExecutors': '600',
                },
            },
            {
                'Classification': 'hdfs-site',
                'Properties': {
                    'dfs.replication': '3',
                },
            },
            {
                # hadoop-env carries its settings in a nested 'export' classification.
                'Classification': 'hadoop-env',
                'Properties': {},
                'Configurations': [
                    {
                        'Classification': 'export',
                        'Properties': {
                            'HADOOP_HEAPSIZE': '40000',
                        },
                    },
                ],
            },
            {
                'Classification': 'emrfs-site',
                'Properties': {
                    'fs.s3.enableServerSideEncryption': 'false',
                },
            },
            {
                'Classification': 'spark-hive-site',
                'Properties': {
                    'hive.metastore.client.factory.class': glue_metastore_factory,
                },
            },
            {
                'Classification': 'hive-site',
                'Properties': {
                    'hive.metastore.client.factory.class': glue_metastore_factory,
                },
            },
        ],
    )

    job_flow_id = cluster['JobFlowId']
    print(f'Cluster initiation success: {job_flow_id}')
    return job_flow_id


# DAG 'test_launch_emr': cron schedule '0 12 * * *' (minute 0, hour 12, every
# day), starting 2019-07-11, with catchup disabled.
dag = DAG(
    'test_launch_emr',
    description='Launch EMR DAG',
    schedule_interval='0 12 * * *',
    start_date=datetime(2019, 7, 11),
    catchup=False,
)

# No-op entry task; the only task here that is configured with retries.
dummy_operator = DummyOperator(
    task_id='dummy_task',
    retries=3,
    dag=dag,
)

# Runs initiate_connection().
initiate_boto3 = PythonOperator(
    task_id='initiate_connection',
    python_callable=initiate_connection,
    dag=dag,
)

# Runs initiate_cluster().
initiate_emr = PythonOperator(
    task_id='initiate_cluster',
    python_callable=initiate_cluster,
    dag=dag,
)

# Linear ordering: dummy_task -> initiate_connection -> initiate_cluster.
dummy_operator >> initiate_boto3 >> initiate_emr


<Task(PythonOperator): initiate_cluster>