# `emrspark_lib` plugin creation

In [None]:
import boto3
from botocore.exceptions import ClientError
import subprocess
import json
from pprint import pprint
import requests
import configparser
import time

import logging
logger = logging.getLogger()
logger.setLevel(logging.ERROR)


config = configparser.ConfigParser()
config.read('airflow/config.cfg')

REGION_NAME = config['AWS']['REGION_NAME']
CLUSTER_NAME = config['AWS']['CLUSTER_NAME']

# When empty, use the first available VPC
VPC_ID = config['AWS']['VPC_ID']
# When empty, use the first available subnet
# NOTE: Subnet must have an internet gateway within its routes.
SUBNET_ID = config['AWS']['SUBNET_ID']

# If both the access key and the secret key are set in the config, hand them to
# boto3 explicitly. Otherwise pass only the region and let boto3 resolve the
# credentials on its own (e.g. the ones stored by the OS).

_access_key_id = config['AWS']['AWS_ACCESS_KEY_ID']
_secret_access_key = config['AWS']['AWS_SECRET_ACCESS_KEY']

# Keyword arguments shared by every client created below.
_client_kwargs = {'region_name': REGION_NAME}
if _access_key_id != '' and _secret_access_key != '':
    _client_kwargs['aws_access_key_id'] = _access_key_id
    _client_kwargs['aws_secret_access_key'] = _secret_access_key

ec2 = boto3.client('ec2', **_client_kwargs)
emr = boto3.client('emr', **_client_kwargs)
iam = boto3.client('iam', **_client_kwargs)
    
    
def get_first_available_vpc(ec2_client):
    """Return the ID of the first VPC listed by EC2.

    Args:
        - ec2_client (boto3.EC2.Client): EC2 client object used for the query.
    Return:
        string: 'VpcId' of the first entry in the describe_vpcs response, or ''
                when the response lists no VPC or the entry carries no ID.
    """
    # Query through the client that was passed in, and fall back to a single
    # empty entry when 'Vpcs' is missing or is an empty list.
    vpcs = ec2_client.describe_vpcs().get('Vpcs') or [{}]
    return vpcs[0].get('VpcId', '')

def get_first_available_subnet(ec2_client, vpc_id):
    """Return the ID of the first available subnet of a VPC.

    Args:
        - ec2_client (boto3.EC2.Client): EC2 client object used for the query.
        - vpc_id (string): ID of the VPC whose subnets are listed.
    Return:
        string: 'SubnetId' of the first subnet that belongs to `vpc_id` and is
                in the 'available' state, or '' when no subnet matches.
    """
    response = ec2_client.describe_subnets(
        Filters=[
            {'Name': 'vpc-id', 'Values': [vpc_id]},
            {'Name': 'state', 'Values': ['available']},
        ]
    )
    # Fall back to a single empty entry so an empty result yields '' instead of
    # raising IndexError.
    subnets = response.get('Subnets') or [{}]
    return subnets[0].get('SubnetId', '')

if VPC_ID == '':
    VPC_ID = get_first_available_vpc(ec2)

if SUBNET_ID == '':
    SUBNET_ID = get_first_available_subnet(ec2, VPC_ID)
    
# def create_spark_session():
#     spark = SparkSession \
#         .builder \
#         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
#         .getOrCreate()
#     return spark

# emrlib.create_cluster()
print('vpc:', VPC_ID)
print('subnet:', SUBNET_ID)
print('region:', ec2.meta.region_name)

In [None]:
# Testing getting default ip address.
ip = requests.get('https://api.ipify.org').text
print('My public IP address is:', ip)

## Create Security Group

In [None]:
def _livy_ingress_rule(cidr_ranges, port=8998):
    """Build the IpPermissions entry for inbound TCP traffic on `port`."""
    return {'IpProtocol': 'tcp',
            'FromPort': port,
            'ToPort': port,
            'IpRanges': cidr_ranges}


def _revoke_livy_ingress(ec2_client, group):
    """Revoke every CIDR-based inbound rule of `group` that targets TCP port 8998."""
    for permission in group.get('IpPermissions', []):
        is_livy_rule = (permission.get('IpProtocol') == 'tcp'
                        and permission.get('FromPort') == 8998
                        and permission.get('ToPort') == 8998)
        # Revoke all CIDR ranges of the rule, not only the first one.
        cidr_ranges = [{'CidrIp': ip_range['CidrIp']}
                       for ip_range in permission.get('IpRanges', [])]
        if is_livy_rule and cidr_ranges:
            ec2_client.revoke_security_group_ingress(
                GroupId=group['GroupId'],
                IpPermissions=[_livy_ingress_rule(cidr_ranges)])


def create_security_group(ec2_client, name, desc, vpc_id, ip=None):
    """ Create a security group, or refresh the inbound rule of an existing one.

    The group is looked up by name. When it exists, its inbound rules on TCP
    port 8998 are revoked; when it does not, a new group is created. In both
    cases a single inbound rule for TCP port 8998 restricted to `ip`/32 is then
    authorized.

    Args:
        - ec2_client (boto3.EC2.Client): EC2 client object.
        - name (string): Name of Security Group
        - desc (string): Description of Security Group (used only on creation)
        - vpc_id (string): ID of the VPC in which a new group is created.
        - ip (string): The IP address of this machine. Only this machine can connect to the cluster.
                       If None, use https://api.ipify.org service to get public IP address.
    Return:
        string: The ID of the security group. When an AWS ClientError occurs it
                is printed and the ID known so far is returned, which is None
                if the failure happened before the group was found or created.
    """
    security_group_id = None

    try:
        response = ec2_client.describe_security_groups(
            Filters=[
                {'Name': 'group-name', 'Values': [name]}
            ]
        )
        groups = response['SecurityGroups']

        if ip is None:
            ip = requests.get('https://api.ipify.org').text

        if groups:
            # Reuse the existing group, dropping the rules bound to an old IP.
            security_group_id = groups[0]['GroupId']
            _revoke_livy_ingress(ec2_client, groups[0])
        else:
            response = ec2_client.create_security_group(GroupName=name,
                                                        Description=desc,
                                                        VpcId=vpc_id)
            security_group_id = response['GroupId']

        # Create the inbound rule that admits only this machine's IP.
        ec2_client.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[_livy_ingress_rule([{'CidrIp': '{}/32'.format(ip)}])])
        return security_group_id
    except ClientError as e:
        print(e)
        return security_group_id


In [None]:
master_sg_id = create_security_group(ec2, '{}SG'.format(CLUSTER_NAME), 'Master SG for {}'.format(CLUSTER_NAME), VPC_ID)
slave_sg_id = create_security_group(ec2, '{}SlaveSG'.format(CLUSTER_NAME), 'Slave SG for {}'.format(CLUSTER_NAME), VPC_ID)

## Create EMR Cluster

In [None]:
testvar = None

def recreate_default_roles(iam_client):
    """Delete the default EMR roles and recreate them through the AWS CLI.

    Each cleanup step is attempted on its own, so an entity that is already
    missing does not prevent the remaining entities from being removed.

    Args:
        - iam_client (boto3.IAM.Client): IAM client object.
    Return:
        bytes: Output of the `aws emr create-default-roles` command.
    Raise:
        subprocess.CalledProcessError: If the AWS CLI command exits with a
            non-zero status.
    """
    ec2_role = 'EMR_EC2_DefaultRole'
    service_role = 'EMR_DefaultRole'
    # Ordered so that each role is detached from what references it before
    # the role itself is deleted.
    cleanup_steps = [
        (iam_client.remove_role_from_instance_profile,
         {'InstanceProfileName': ec2_role, 'RoleName': ec2_role}),
        (iam_client.delete_instance_profile,
         {'InstanceProfileName': ec2_role}),
        (iam_client.detach_role_policy,
         {'RoleName': ec2_role,
          'PolicyArn': 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role'}),
        (iam_client.delete_role,
         {'RoleName': ec2_role}),
        (iam_client.detach_role_policy,
         {'RoleName': service_role,
          'PolicyArn': 'arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole'}),
        (iam_client.delete_role,
         {'RoleName': service_role}),
    ]
    for step, kwargs in cleanup_steps:
        try:
            step(**kwargs)
        except iam_client.exceptions.NoSuchEntityException:
            # Nothing to remove for this step; carry on with the next one.
            continue
    return subprocess.check_output(['aws', 'emr', 'create-default-roles'])


def recreate_key_pair(ec2_client, key_name):
    """Delete the key pair called `key_name`, then create it again.

    Args:
        - ec2_client (boto3.EC2.Client): EC2 client object.
        - key_name (string): Name of key, usually 'xxx_pem'
    Return:
        The response of the create_key_pair call:

        dict: {
            'KeyFingerprint': 'string',
            'KeyMaterial': 'string',
            'KeyName': 'string',
            'KeyPairId': 'string'
        }
    """
    # Remove whatever is registered under this name before creating the key.
    ec2_client.delete_key_pair(KeyName=key_name)
    return ec2_client.create_key_pair(KeyName=key_name)


class ClusterError(Exception):
    """Exception that keeps the value given to its constructor in `last_guess`."""

    def __init__(self, last_guess):
        # Stored as-is so handlers can inspect it.
        self.last_guess = last_guess
            
def create_emr_cluster(emr_client, cluster_name, master_sg, slave_sg, keypair_name, subnet_id, job_flow_role='EMR_EC2_DefaultRole', service_role='EMR_DefaultRole', release_label='emr-5.9.0',
                   master_instance_type='m3.xlarge', num_core_nodes=3, core_node_instance_type='m3.xlarge'):
    """ Create an EMR cluster, or reuse an active one with the same name.

    Args:
        - emr_client (boto3.EMR.Client): EMR client object.
        - cluster_name (string): Name of the cluster; also used to find an active cluster to reuse.
        - master_sg (string): ID of the security group for the master node.
        - slave_sg (string): ID of the security group for the core nodes.
        - keypair_name (string): Name of the EC2 key pair for the instances.
        - subnet_id (string): ID of the subnet in which the instances are launched.
        - job_flow_role (string): Role passed as JobFlowRole.
        - service_role (string): Role passed as ServiceRole.
        - release_label (string): EMR release label.
        - master_instance_type (string): Instance type of the master node.
        - num_core_nodes (int): Number of core nodes.
        - core_node_instance_type (string): Instance type of the core nodes.
    Return:
        string: ID of the reused or newly created cluster.
    Raise:
        Exception: If the new cluster is no longer in an active state right
            after creation; the message carries the state change reason.
    """
    active_states = ['STARTING', 'RUNNING', 'WAITING', 'BOOTSTRAPPING']

    # Avoid recreating cluster
    clusters = emr_client.list_clusters(ClusterStates=active_states)
    for cluster in clusters['Clusters']:
        if cluster['Name'] == cluster_name:
            return cluster['Id']

    # Create cluster
    cluster_response = emr_client.run_job_flow(
        Name=cluster_name,
        ReleaseLabel=release_label,
        Instances={
            'InstanceGroups': [
                {
                    'Name': "Master nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': master_instance_type,
                    'InstanceCount': 1
                },
                {
                    'Name': "Slave nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': core_node_instance_type,
                    'InstanceCount': num_core_nodes
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': True,
            'Ec2SubnetId': subnet_id,
            'Ec2KeyName': keypair_name,
            'EmrManagedMasterSecurityGroup': master_sg,
            'EmrManagedSlaveSecurityGroup': slave_sg
        },
        VisibleToAllUsers=True,
        JobFlowRole=job_flow_role,
        ServiceRole=service_role,
        Applications=[
            {'Name': 'hadoop'},
            {'Name': 'spark'},
            {'Name': 'hive'},
            {'Name': 'livy'},
            {'Name': 'zeppelin'}
        ]
    )
    cluster_id = cluster_response['JobFlowId']

    # One describe call yields both the state and the reason. A cluster that
    # has already advanced past STARTING to another active state is healthy,
    # so only a non-active state is reported as an error.
    status = emr_client.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']
    if status['State'] not in active_states:
        reason = status.get('StateChangeReason') or {}
        raise Exception("Cluster error: {} - {}".format(reason.get('Code'), reason.get('Message')))
    return cluster_id



In [None]:
keypair = recreate_key_pair(ec2, '{}_pem'.format(CLUSTER_NAME))

In [None]:
keypair

In [None]:
recreate_default_roles(iam)
print(iam.get_role(RoleName='EMR_EC2_DefaultRole'))
print(iam.get_role(RoleName='EMR_DefaultRole'))
# Wait a bit until the roles are ready, otherwise we'd get Failed to authorize instance profile arn.../instance-profile/EMR_EC2_DefaultRole

In [None]:
cluster_id = create_emr_cluster(emr, CLUSTER_NAME, master_sg_id, slave_sg_id, keypair['KeyName'], SUBNET_ID)

In [None]:
cluster_id

In [None]:
emr.describe_cluster(ClusterId=cluster_id)

## Create Spark Session

Wait until the cluster is in the WAITING state, then create a Spark session.

In [None]:
def get_cluster_status(cluster_id):
    """Return the state string EMR currently reports for the cluster.

    Uses the module-level `emr` client.
    """
    status = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']
    return status['State']


def is_cluster_ready(cluster_id):
    """Tell whether the cluster is currently in the 'WAITING' state."""
    current_state = get_cluster_status(cluster_id)
    return current_state == 'WAITING'


def get_cluster_dns(cluster_id):
    """Return the 'MasterPublicDnsName' EMR reports for the cluster.

    Uses the module-level `emr` client.
    """
    description = emr.describe_cluster(ClusterId=cluster_id)['Cluster']
    return description['MasterPublicDnsName']


def spark_url(master_dns, location='', port=8998):
    """Build the HTTP URL of a resource on the master node.

    Args:
        - master_dns (string): Host name of the master node.
        - location (string): Path appended after the port, e.g. '/sessions/0'.
        - port (int): Port to connect to; defaults to 8998.
    Return:
        string: 'http://<master_dns>:<port><location>'
    """
    template = 'http://{}:{}{}'
    return template.format(master_dns, port, location)

    
def create_spark_session(master_dns):
    """Request a new PySpark session from the Livy server on the master node.

    Args:
        - master_dns (string): Host name of the cluster's master node.
    Return:
        The headers of the HTTP response to the session request.
    """
    spark_conf = {"spark.jars.packages" : "saurfang:spark-sas7bdat:2.0.0-s_2.11",
                  "spark.driver.extraJavaOptions" : "-Dlog4jspark.root.logger=WARN,console"
                 }
    payload = json.dumps({'kind': 'pyspark', "conf" : spark_conf})
    # spark_url defaults to port 8998, the port on which the Livy server runs.
    sessions_endpoint = spark_url(master_dns) + '/sessions'
    response = requests.post(sessions_endpoint, data=payload,
                             headers={'Content-Type': 'application/json'})
    logging.info(response.json())
    return response.headers

In [None]:
if is_cluster_ready(cluster_id):
    cluster_dns = get_cluster_dns(cluster_id)
    ss_headers = create_spark_session(cluster_dns)
    print(ss_headers)
else:
    print("Cluster is not ready (status is {}), run this code cell again later.".format(get_cluster_status(cluster_id)))

## Send Spark jobs

We will try pulling some stock market data from Quandl and QuoteMedia. Stock names are available here:

- NASDAQ: https://old.nasdaq.com/screening/companies-by-name.aspx?letter=0&exchange=nasdaq&render=download
- AMEX: https://old.nasdaq.com/screening/companies-by-name.aspx?letter=0&exchange=amex&render=download
- NYSE: https://old.nasdaq.com/screening/companies-by-name.aspx?letter=0&exchange=nyse&render=download

### Test Quandl request

In [None]:
import pandas as pd
df = pd.read_csv('https://old.nasdaq.com/screening/companies-by-name.aspx?letter=0&exchange=nasdaq&render=download')
df.head(5)

In [None]:
exchange_map = {
    'nasdaq': 'FNSQ',
    'nyse': 'FNYX'
}

def get_short_interests_pandas(exchange, ticker, api_key):
    """Download a FINRA dataset from the Quandl API into a pandas DataFrame.

    Args:
        - exchange (string): Exchange part of the dataset code, e.g. 'FNYX'.
        - ticker (string): Ticker part of the dataset code, e.g. 'FB'.
        - api_key (string): Quandl API key.
    Return:
        pandas.DataFrame: The rows of the dataset, with the column names
                          given in the response.
    Raise:
        Exception: If the response status code is not 200.
    """
    url = "https://www.quandl.com/api/v3/datasets/FINRA/{}_{}?api_key={}".format(exchange, ticker, api_key)
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Error when connecting to Quandl API.")
    dataset = response.json()['dataset']
    return pd.DataFrame(data=dataset['data'], columns=dataset['column_names'])

df = get_short_interests_pandas('FNYX', 'FB', config['Quandl']['API_KEY'])
print(df.describe())
df.head(5)

### Quandl request through Spark

[Spark cannot pull data from a URL.](https://stackoverflow.com/questions/29741082/how-to-access-a-web-url-using-a-spark-context/29741462) The alternative is to download the data with pandas and write it to S3.

In [None]:
from pyspark.sql import SparkSession

# Get (or create) the Spark session, configured with the hadoop-aws package.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

def get_short_interests(spark, exchange, ticker, api_key):
    """Download a FINRA dataset from Quandl and store it as a parquet table.

    The HTTP request is made with `requests`; Spark only receives the parsed
    rows.

    Args:
        - spark (SparkSession): Spark session used to build and write the table.
        - exchange (string): Exchange part of the dataset code, e.g. 'FNYX'.
        - ticker (string): Ticker part of the dataset code, e.g. 'FB'.
        - api_key (string): Quandl API key.
    Return:
        The Spark DataFrame that was written to the 'test_table' path.
    """
    url = "https://www.quandl.com/api/v3/datasets/FINRA/{}_{}?api_key={}".format(exchange, ticker, api_key)
    dataset = requests.get(url).json()['dataset']
    df = spark.createDataFrame(dataset['data'], dataset['column_names'])
    df.createOrReplaceTempView('test')
    table = spark.sql("SELECT * FROM test")
    table_path = "test_table"
    table.write.mode('overwrite').parquet(table_path)
    # Hand the table back so callers that assign the result get the data.
    return table

df = get_short_interests(spark, 'FNYX', 'FB', config['Quandl']['API_KEY'])

### Send Spark job to download from Quandl

In [None]:
logger.setLevel(logging.INFO)

def wait_for_spark(session_url, session_headers, poll_interval=5):
    """Wait until the Livy session reports the 'idle' state.

    Args:
        - session_url (string): URL of the Livy session.
        - session_headers: Headers sent with each status request.
        - poll_interval (number): Seconds to sleep between two status requests.
    Raise:
        RuntimeError: If the session reports a state from which it is not
            expected to become idle, so the wait cannot succeed.
    """
    # Session states after which Livy does not return the session to 'idle'.
    terminal_states = ('shutting_down', 'error', 'dead', 'killed', 'success')
    while True:
        response = requests.get(session_url, headers=session_headers)
        status = response.json()['state']
        logging.info("Spark session status: {}".format(status))
        if status == 'idle':
            return
        if status in terminal_states:
            raise RuntimeError("Spark session will not become idle (state: {})".format(status))
        # Sleep only when another poll is actually needed.
        time.sleep(poll_interval)
        
    
def submit_spark_job(session_url, session_headers, code):
    """Submit a code statement to a Livy session once the session is idle.

    Args:
        - session_url (string): URL of the Livy session.
        - session_headers: Headers used while waiting for the session.
        - code (string): Source code to run in the session.
    Return:
        The HTTP response of the statement submission.
    """
    wait_for_spark(session_url, session_headers)
    statements_url = "{}/statements".format(session_url)
    job = {'code': code}
    response = requests.post(statements_url, data=json.dumps(job),
                             headers={'Content-Type': 'application/json'})
    # Call json() so the parsed body is logged rather than the bound method.
    logging.info(response.json())
    return response
    
    

In [None]:
exchange = 'FNYX'
ticker = 'FB'
code = """
import requests
url = "https://www.quandl.com/api/v3/datasets/FINRA/{exchange}_{ticker}?api_key={quandl_api}"
result = requests.get(url).json()
df = spark.createDataFrame(result['dataset']['data'], result['dataset']['column_names'])
df.createOrReplaceTempView('test')
table = spark.sql("SELECT * FROM test")
table_path = "s3://short-interest-effect/data/test_table"
table.write.mode('overwrite').parquet(table_path)
"""


code = code.format(exchange=exchange, ticker=ticker, quandl_api=config['Quandl']['API_KEY'])
job_response = submit_spark_job(spark_url(cluster_dns, location=ss_headers['location']), ss_headers, code)
print(job_response.status_code)
print(job_response.headers)
print(job_response.json()['output'])

In [None]:
job_response.headers['location'].split('/statements', 1)[0]

## Track Spark job status

The following code can be run several times to check the result of the above statement.

In [None]:
statement_status = ''
statements_url = spark_url(cluster_dns, location=job_response.headers['location'])
statements_response = requests.get(statements_url, headers={'content-Type': 'application/json'})
print(statements_response)
print(statements_response.headers)
print("State:", statements_response.json()['state'])
print("Output:\n",statements_response.json()['output'])

session_url = spark_url(cluster_dns, location=job_response.headers['location'].split('/statements', 1)[0])
log_url = session_url + '/log'


In [None]:
import time

def track_spark_job(master_dns, response_headers, poll_interval=10):
    """Poll a submitted Livy statement until it finishes and return the session log.

    Args:
        - master_dns (string): Host name of the cluster's master node.
        - response_headers: Headers of the statement submission response; the
                            'location' entry must hold the statement path
                            ('/sessions/<id>/statements/<id>').
        - poll_interval (number): Seconds to sleep between two status requests.
    Return:
        The 'log' entry of the session's '/log' response.
    Raise:
        ValueError: If the statement ends in the 'error' or 'cancelled' state,
            or finishes with an output status of 'error'.
    """
    json_headers = {'Content-Type': 'application/json'}
    host = 'http://' + master_dns + ':8998'
    # If a statement takes longer than a few milliseconds to execute, Livy
    # returns early and provides a statement URL that can be polled until it
    # is complete.
    statement_url = host + response_headers['location']
    session_url = host + response_headers['location'].split('/statements', 1)[0]
    print(session_url)

    # Poll the status of the submitted code
    while True:
        statement = requests.get(statement_url, headers=json_headers).json()
        statement_status = statement['state']
        logging.info('Statement status: ' + statement_status)
        logging.info(statement)
        if 'progress' in statement:
            logging.info('Progress: ' + str(statement['progress']))
        if statement_status == 'available':
            break
        if statement_status in ('error', 'cancelled'):
            # These states never turn into 'available'; stop instead of looping forever.
            raise ValueError('Statement ended in state: ' + statement_status)
        time.sleep(poll_interval)

    output = statement['output']
    final_statement_status = output['status']
    if final_statement_status == 'error':
        logging.error('Statement exception: ' + output['evalue'])
        for trace in output['traceback']:
            logging.error(trace)
        raise ValueError('Final Statement Status: ' + final_statement_status)

    # Get the logs
    lines = requests.get(session_url + '/log', headers=json_headers).json()['log']
    logging.info('Final Statement Status: ' + final_statement_status)
    return lines

In [None]:
# Pass the headers of the statement submission: their 'location' is the
# statement path that track_spark_job polls. The session headers point at the
# session itself, whose state never becomes 'available'.
track_spark_job(cluster_dns, job_response.headers)

## Kill Spark session

In [None]:
def kill_spark_session(session_url):
    """Send an HTTP DELETE request to the given session URL."""
    json_headers = {'Content-Type': 'application/json'}
    requests.delete(session_url, headers=json_headers)

In [None]:
kill_spark_session(spark_url(cluster_dns, location=ss_headers['location']))

## Delete Cluster

In [None]:
def delete_cluster(emr_client, cluster_id):
    """Ask EMR to terminate the cluster and print the outcome.

    Args:
        - emr_client (boto3.EMR.Client): EMR client object.
        - cluster_id (string): ID of the cluster to terminate.

    An AWS ClientError is printed instead of being raised.
    """
    try:
        emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
    except ClientError as e:
        print(e)
    else:
        print('Cluster {} Deleted'.format(cluster_id))

In [None]:
delete_cluster(emr, cluster_id)

## Delete Key Pair

In [None]:
ec2.delete_key_pair(KeyName=keypair['KeyName'])

## Delete Security Group

In [None]:
emr.describe_cluster(ClusterId=cluster_id)

In [None]:
def is_cluster_terminated(cluster_id):
    """Tell whether the cluster's state contains the text 'TERMINATED'.

    Uses the module-level `emr` client. Because this is a substring check,
    every state whose name includes 'TERMINATED' counts as terminated.
    """
    state = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
    return 'TERMINATED' in state

def delete_security_group(ec2, sgid):
    """Revoke the inbound rules of a security group and delete the group.

    All requests go through the client that is passed in, so they use that
    client's region and credentials.

    Args:
        - ec2 (boto3.EC2.Client): EC2 client object.
        - sgid (string): ID of the security group to delete.

    An AWS ClientError is printed instead of being raised.
    """
    try:
        groups = ec2.describe_security_groups(GroupIds=[sgid])['SecurityGroups']
        ip_permissions = groups[0]['IpPermissions'] if groups else []
        # Clear the inbound rules first -- presumably so that rules referring
        # to other groups do not block the deletion (TODO confirm).
        if ip_permissions:
            ec2.revoke_security_group_ingress(GroupId=sgid, IpPermissions=ip_permissions)
        ec2.delete_security_group(GroupId=sgid)
        print('Security Group {} Deleted'.format(sgid))
    except ClientError as e:
        print(e)

In [None]:
if is_cluster_terminated(cluster_id):
    # delete_security_group(ec2, sg)
    delete_security_group(ec2, master_sg_id)
    delete_security_group(ec2, slave_sg_id)
else:
    cluster = emr.describe_cluster(ClusterId=cluster_id)
    state = cluster['Cluster']['Status']['State']
    print("Cluster is not terminated. If it is terminating, wait until the status is TERMINATED. Current cluster state: {}".format(state))

## References

- EMR creation that works: https://github.com/dai-dao/udacity-data-engineering-capstone/blob/master/dags/lib/emr_lib.py
- On Security Group Creation and Deletion: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/ec2-example-security-group.html
- On how to recreate EMR_EC2_DefaultRole: https://aws.amazon.com/premiumsupport/knowledge-center/emr-default-role-invalid/
- Using Apache Livy with Spark on EMR: https://aws.amazon.com/blogs/big-data/orchestrate-apache-spark-applications-using-aws-step-functions-and-apache-livy/