# Spool up EMR Cluster to run HAIL with bootstrapping only

In [1]:
import boto3
from botocore.exceptions import ClientError
import time
import sys
import botocore
import paramiko
import re
import os
import yaml, re

In [2]:
# Absolute path of the directory the notebook is executed from; the log
# file and the YAML configuration are looked up relative to it.
_working_dir = os.getcwd()
PATH = os.path.abspath(_working_dir)
PATH

'/data/hail-on-AWS-spot-instances'

In [3]:
# Configure root logging: every record is written both to the console and
# to a log file located in PATH.
import logging
import bluebee
from bluebee import bgp

# The root logger lets everything through; each handler applies its own
# threshold below.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Destination file and the line layout shared by both handlers
logFile = f'{PATH}/spoolup_EMR_cluster.log'
formatter = logging.Formatter('%(asctime)-18s-%(levelname)-8s %(message)s', datefmt='%d%b%Y %H:%M:%S')

# File handler and console handler, both emitting INFO and above
fh = logging.FileHandler(logFile)
ch = logging.StreamHandler()

# Console handler is attached first, then the file handler
for handler in (ch, fh):
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# Optional debugging switches for the bluebee package (left disabled):
# bgp.api.dump_curl = True
# bluebee.logger.setLevel(logging.DEBUG)

2021021016


In [4]:
def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """
    # Imported locally so the function does not depend on an extra
    # file-level import. boto3's managed transfer reports failed uploads as
    # S3UploadFailedError rather than as a botocore ClientError.
    from boto3.exceptions import S3UploadFailedError

    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name

    # Upload the file; the call returns nothing useful on success
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket, object_name)
    except (ClientError, S3UploadFailedError) as e:
        # Log the failure and report it through the return value
        logging.error(e)
        return False
    return True

In [5]:
# Get the configuration as a yaml object. The file is opened in a `with`
# block so the handle is closed as soon as parsing is done.
with open(PATH+"/config_EMR_spot.yaml") as config_file:
    c = yaml.load(config_file, Loader=yaml.SafeLoader)

# All settings live under the top-level 'config' key
conf = c['config']
logger.info(f'Configuration settings: {c}')

30Mar2021 18:53:11-INFO     Configuration settings: {'config': {'EMR_RELEASE': 'emr-6.2.0', 'EMR_CLUSTER_NAME': 'tdeboer-hail', 'EC2_NAME_TAG': 'tdeboer-hail-EMR', 'OWNER_TAG': 'tdeboer-ilmn', 'PROJECT_TAG': 'GRE_on_ICA', 'REGION': 'us-east-1', 'MASTER_INSTANCE_TYPE': 'm4.large', 'WORKER_INSTANCE_TYPE': 'r4.4xlarge', 'WORKER_COUNT': '1', 'WORKER_BID_PRICE': '0.90', 'MASTER_HD_SIZE': '250', 'WORKER_HD_SIZE': '500', 'SUBNET_ID': '', 'S3_BUCKET': 's3://ilmn-hail/', 'BOOTSTRAP_S3_URI': 's3://ilmn-hail/bootstrap_hail/bootstrap.sh', 'KEY_NAME': 'hail-ES-GRE', 'PATH_TO_KEY': '/data/', 'WORKER_SECURITY_GROUP': 'sg-0df1e5704ca2a8196', 'MASTER_SECURITY_GROUP': 'sg-0bab1202c0aa453b3'}}


In [6]:
# Store or update the bootstrapping file in the S3 bucket (same as used for the logs, for now)
# BOOTSTRAP_S3_URI looks like "s3://<bucket>/<key>": splitting on the first
# three slashes yields ['s3:', '', '<bucket>', '<key>'].
s = conf['BOOTSTRAP_S3_URI'].split('/',3)
bucket = s[2]
# Named object_key (not `object`) so the builtin is not shadowed for the
# rest of the session.
object_key = s[3]
# Raise explicitly instead of asserting: asserts are removed under `python -O`
if not upload_file('bootstrap.sh',bucket,object_key):
    raise RuntimeError(f"Could not upload bootstrap.sh to {conf['BOOTSTRAP_S3_URI']}")

30Mar2021 18:53:12-INFO     Found credentials in shared credentials file: ~/.aws/credentials


In [None]:
# Assemble the AWS CLI call that creates the cluster from the settings in conf.
# The MASTER instance group is pinned to a single node; only the CORE group
# scales with WORKER_COUNT (and is the only group with a spot BidPrice).
# A doubled backslash emits a shell line continuation; a single trailing
# backslash only joins the Python source lines inside the JSON arguments.
command = f'''aws emr create-cluster \\
--name "{conf['EMR_CLUSTER_NAME']}" \\
--release-label {conf['EMR_RELEASE']} \\
--applications Name=Spark Name=Hadoop \\
--tags 'project={conf['PROJECT_TAG']}' 'Owner={conf['OWNER_TAG']}' 'Name={conf['EC2_NAME_TAG']}' \\
--ec2-attributes '{{"KeyName":"{conf['KEY_NAME']}","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"{conf['SUBNET_ID']}",\
  "EmrManagedMasterSecurityGroup":"{conf['MASTER_SECURITY_GROUP']}","EmrManagedSlaveSecurityGroup":"{conf['WORKER_SECURITY_GROUP']}"}}' \\
--service-role EMR_DefaultRole \\
--log-uri '{conf['S3_BUCKET']}' \\
--instance-groups '[{{"InstanceCount":1,\
  "EbsConfiguration":{{"EbsBlockDeviceConfigs":[{{"VolumeSpecification":{{"SizeInGB":{conf['MASTER_HD_SIZE']},"VolumeType":"gp2"}},\
  "VolumesPerInstance":1}}]}},"InstanceGroupType":"MASTER","InstanceType":"{conf['MASTER_INSTANCE_TYPE']}","Name":"Master-Instance"}},\
  {{"InstanceCount":{conf['WORKER_COUNT']},"BidPrice":"{conf['WORKER_BID_PRICE']}",\
  "EbsConfiguration":{{"EbsBlockDeviceConfigs":[{{"VolumeSpecification":{{"SizeInGB":{conf['WORKER_HD_SIZE']},\
  "VolumeType":"gp2"}},"VolumesPerInstance":1}}]}},"InstanceGroupType":"CORE","InstanceType":"{conf['WORKER_INSTANCE_TYPE']}",\
  "Name":"Core-Group"}}]' \\
--configurations file://./spark-configs.json \\
--auto-scaling-role EMR_AutoScaling_DefaultRole \\
--ebs-root-volume-size 32 \\
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \\
--region {conf['REGION']} \\
--bootstrap-actions Path='{conf['BOOTSTRAP_S3_URI']}'
'''
logger.info(f'Executing following command: \n{command}')

In [None]:
# Run the AWS CLI command and capture what it prints. The pipe is always
# closed; close() returns None when the command exited successfully.
pipe = os.popen(command)
try:
    cluster_id_json = pipe.read()
finally:
    exit_status = pipe.close()

#My default profile exports TEXT: whitespace-separated fields, with the
#cluster ID as the second one
fields = re.split(r'\s', cluster_id_json)
if exit_status is not None or len(fields) < 2 or not fields[1]:
    # Stop here with a clear message instead of failing later on a bad ID
    err = (f'"aws emr create-cluster" did not return a cluster ID '
           f'(exit status: {exit_status}, output: "{cluster_id_json.strip()}")')
    logger.error(err)
    sys.exit(err)
cluster_id = fields[1]

# Gives EMR cluster information
client_EMR = boto3.client('emr', region_name=conf['REGION'])
logger.warning(f'Created cluster with ID: "{cluster_id}"')

In [None]:
# Cluster state update
status_EMR = 'STARTING'
tic = time.time()
# Wait until the cluster is created
logger.info('Creating EMR...')

# States from which the cluster will never become ready. TERMINATED is
# included so that a cluster that was shut down without errors does not
# keep this loop polling forever.
failed_states = ('TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS')

# Poll every 30 seconds; the loop is left only through `break` (cluster
# ready) or `sys.exit` (cluster failed).
while True:
    details_EMR = client_EMR.describe_cluster(ClusterId=cluster_id)
    status_EMR = details_EMR.get('Cluster').get('Status').get('State')
    logger.info('Cluster status: '+status_EMR)
    if status_EMR == 'WAITING':
        logger.warning('Cluster successfully created! Starting HAIL installation...')
        toc = time.time()-tic
        logger.warning("Total time to provision your cluster: %.2f "%(toc/60)+" minutes")
        break
    if status_EMR in failed_states:
        # Surface the reason reported by EMR, when there is one
        reason = details_EMR['Cluster']['Status'].get('StateChangeReason', {}).get('Message')
        if reason:
            logger.error(f'Cluster state change reason: {reason}')
        err = "Cluster un-successfully created. Ending installation..."
        logger.error(err)
        sys.exit(err)
    time.sleep(30)

In [None]:
# Get public DNS from master node
master_dns = details_EMR.get('Cluster').get('MasterPublicDnsName')
if not master_dns:
    err = 'The cluster description does not contain a master DNS name.'
    logger.error(err)
    sys.exit(err)

# A public EC2 host name starts with "ec2-a-b-c-d."; the dashed part is the
# public IP address. Any other name is used unchanged as the host.
ec2_match = re.match(r'ec2-(\d+(?:-\d+){3})\.', master_dns)
if ec2_match:
    master_IP = ec2_match.group(1).replace('-', '.')
else:
    logger.warning(f'Could not derive a public IP from "{master_dns}"; using the DNS name instead')
    master_IP = master_dns
logger.info(f'Master IP address: {master_IP}')
logger.warning(f'This is the public JupyterLab link: "http://{master_IP}:8192"')

In [None]:
# Final confirmation once the cluster is up ("ENR" typo in the message corrected to "EMR")
logger.warning(f'Successfully started EMR cluster "{cluster_id}"')