# Spark Cluster Slave Nodes Creation

### Importing Required Libraries ::
* __boto3__: Required to connect to AWS and perform AWS tasks
* __botocore__: Required to handle the exceptions raised by boto3 tasks
* __paramiko__: Required to run commands inside EC2 instances
* __json__: Converts native Python dictionaries to strings so they can be written to files
* __pickle__: Stores the configuration dictionary that will be used in the next notebooks
* __datetime__, __pprint__, __os__, __sys__, __time__: General purpose use

In [1]:
import boto3, botocore, paramiko
from datetime import datetime
import pprint, os, sys, time, json, pickle
from botocore.exceptions import ClientError

### Reading current status and available details ::
* The user can provide specific configurations using the provided configuration file format. If the user does not provide a configuration, or provides it in the wrong format, default values will be used. Please check the **README.MD** file for the default values.

* Along with the user-defined variables, we will extract the details of the Stack.

In [2]:
try:
    with open("cluster_config.json", "r") as config_file:
        user_config = json.load(config_file)

    region = user_config.get('Region', "us-east-1")
    wrk_spc_dir = user_config['WorkspaceDirectory']
    user = user_config.get('UserName', "root")
    cluster_instance_type = user_config.get('InstanceType', "t2.micro")
    cluster_key_pair_path = user_config['KeyPairPath']
    cluster_key_pair_name = user_config['KeyPairName']
    project_tag = user_config.get('ProjectTag', "SparkCluster")
    slave_count = user_config.get('SlaveCount', 3)
    pickle_file = wrk_spc_dir + "/SparkClusterOnAWSEC2_" + user + "_CurrentStatus.pkl"
    if os.path.exists(pickle_file):
        with open(pickle_file, 'rb') as pickle_handle:
            user_config = pickle.load(pickle_handle)
        cluster_subnet_list = user_config['SubnetList']
        cluster_security_group_list = [user_config['SecurityGroupId']]
        run_id = user_config['RunId']
    else:
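        print("Status file '" + pickle_file + "' is not available, which is unexpected. Please start running from 'cloudformation_stack_creation.ipynb' file.")
        # Raise a specific exception carrying the path, so the handler below prints something useful.
        raise FileNotFoundError(pickle_file)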
except Exception as e:
    print("Unexpected error while fetching available status: " + str(e))
    exit()
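
The defaulting behaviour described above can also be gathered into one place. The following is only a minimal sketch: `load_cluster_config`, `DEFAULTS` and `REQUIRED_KEYS` are hypothetical names that are not part of this notebook, and the key names and default values are copied from the cell above.

```python
import json
import os
import tempfile

# Defaults mirror the values used in the cell above; required keys have no default.
DEFAULTS = {
    "Region": "us-east-1",
    "UserName": "root",
    "InstanceType": "t2.micro",
    "ProjectTag": "SparkCluster",
    "SlaveCount": 3,
}
REQUIRED_KEYS = ("WorkspaceDirectory", "KeyPairPath", "KeyPairName")


def load_cluster_config(path):
    """Hypothetical helper: read the JSON config, check required keys, apply defaults."""
    with open(path, "r") as config_file:
        user_config = json.load(config_file)
    missing = [key for key in REQUIRED_KEYS if key not in user_config]
    if missing:
        raise KeyError("Missing required configuration keys: " + ", ".join(missing))
    # User-supplied values override the defaults.
    return {**DEFAULTS, **user_config}


# Usage with a throwaway config file (all values below are made up).
with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "cluster_config.json")
    with open(config_path, "w") as handle:
        json.dump({"WorkspaceDirectory": tmp_dir,
                   "KeyPairPath": "/path/to/key.pem",
                   "KeyPairName": "example-key",
                   "SlaveCount": 2}, handle)
    config = load_cluster_config(config_path)

print(config["Region"], config["SlaveCount"])
```

Listing the required keys explicitly makes a missing `WorkspaceDirectory`, `KeyPairPath` or `KeyPairName` fail with a message that names the key.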

### Creating boto3 session, clients and resources ::

In [3]:
try:
    session = boto3.session.Session(region_name=region)
    ec2_client = session.client('ec2')
    ec2_resource = session.resource('ec2')
    print("Required Boto3 objects are defined.")
except ClientError as e:
    print("Unexpected error while creating boto3 session, client and resources: " + str(e))
    exit()

Required Boto3 objects are defined.


### Check for already running Master Node for current user ::
* A running master node is required for any cluster to run. This step checks whether a master node is available for this user or not.

In [4]:
try:
    master_instances = ec2_resource.instances.filter(
        Filters=[
            {
                'Name': 'instance-state-name',
                'Values': ['running']
            },
            {
                'Name': 'tag:Project',
                'Values': [project_tag]
            },
            {
                'Name': 'tag:User',
                'Values': [user]
            },
            {
                'Name': 'tag:NodeType',
                'Values': ['Master']
            }
        ]
    )
except ClientError as e:
    print("Unexpected error while looking for already running Master node EC2 instance for user-'" + user + "': " + str(e))
    exit()
try:
    master_instance_list = list(master_instances)
    if master_instance_list:
        master_instance = master_instance_list[0]
        spark_cluster_master_private_ip = master_instance.private_ip_address
        print("Instance('" + master_instance.id + "') is available master node for User('" + user + "'). Public DNS: '" + master_instance.public_dns_name + "'.")
        instance_creation_flag = 1
    else:
        print("Master node is not available for User('" + user + "'). Please create the master node first by executing the 'master_node_creation.ipynb' notebook.")
        exit()
except Exception as e:
    print("Unexpected error while extracting Spark Cluster Master node details: " + str(e))
    exit()

Instance('i-0cda438648bad66f2') is available master node for User('ccbp-dev-user-saumalya'). Public DNS: 'ec2-3-87-179-171.compute-1.amazonaws.com'.


### Fetching latest Image id ::
This image ID will be used to create the slave nodes. The following configuration is already in place in the image:
* The Spark distribution is already present in the image
* All packages required to run pyspark are already installed in the image
* Jupyter notebook is configured
* The following commands must be executed before a Spark session/context can be created using this master node:

    _import findspark_

    _findspark.init('/home/ec2-user/spark-2.4.5-bin-hadoop2.7')_

    _import pyspark_

In [5]:
if instance_creation_flag:
    try:
        node_images_list = ec2_client.describe_images(
            Filters=[
                {
                    'Name': 'tag:Project',
                    'Values': [project_tag]
                },
                {
                    'Name': 'state',
                    'Values': ['available']
                }
            ]
        )
    except ClientError as e:
        print("Unexpected error while fetching node images: " + str(e))
        exit()

    try:
        node_image_createdates = [(datetime.strptime(img['CreationDate'][:-5], '%Y-%m-%dT%H:%M:%S'), img['ImageId']) for img in node_images_list['Images']]
        # Sort on the creation timestamp (x[0]), not the image ID (x[1]), so the newest image comes first.
        latest_image_id = sorted(node_image_createdates, key=lambda x: x[0], reverse=True)[0][1]
        print("Image('" + latest_image_id + "') will be used to create the nodes.")
    except Exception as e:
        print("Unexpected error while extracting latest node image: " + str(e))
        exit()

Image('ami-0b109626cd1d1e84c') will be used to create the nodes.
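
As an illustration of picking the latest image, here is a small sketch that selects the newest entry by its `CreationDate`. It assumes the timestamps have the form `2020-04-18T09:30:00.000Z`; `latest_image_id_from` and the sample image records are hypothetical. The sample IDs are chosen so that ordering by ID and ordering by date disagree.

```python
from datetime import datetime


def latest_image_id_from(images):
    """Hypothetical helper: return the ImageId of the most recently created image."""
    def created_at(image):
        # Assumed timestamp format: '2020-04-18T09:30:00.000Z'.
        return datetime.strptime(image["CreationDate"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return max(images, key=created_at)["ImageId"]


# Made-up records shaped like the 'Images' entries of a describe_images response.
sample_images = [
    {"ImageId": "ami-zzz-older", "CreationDate": "2020-03-01T08:00:00.000Z"},
    {"ImageId": "ami-aaa-newer", "CreationDate": "2020-04-18T09:30:00.000Z"},
]
print(latest_image_id_from(sample_images))  # ami-aaa-newer
```

`max` raises `ValueError` on an empty list, so a caller would still need to handle the case where no image carries the project tag.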


### Check for already running Slave Nodes for current user ::
* If some slave nodes are already running, those nodes will be reused as the slave nodes of the current user.
* To create a new set of slave nodes, please terminate the current slaves using 'slave_nodes_termination.ipynb', then re-run this notebook.

In [6]:
if instance_creation_flag:
    try:
        instances = ec2_resource.instances.filter(
            Filters=[
                {
                    'Name': 'instance-state-name',
                    'Values': ['running']
                },
                {
                    'Name': 'tag:Project',
                    'Values': [project_tag]
                },
                {
                    'Name': 'tag:User',
                    'Values': [user]
                },
                {
                    'Name': 'tag:NodeType',
                    'Values': ['Slave']
                }
            ]
        )
    except ClientError as e:
        print("Unexpected error while looking for already running slave node EC2 instances for user-'" + user + "': " + str(e))
        exit()
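
The master and slave lookups use the same four filters and differ only in the `NodeType` tag value. A possible way to avoid repeating the filter list is sketched here; `build_node_filters` is a hypothetical helper that is not part of this notebook, and the tag values in the usage lines are made up.

```python
def build_node_filters(project_tag, user, node_type, state="running"):
    """Hypothetical helper: build the Filters list used for the instance lookups."""
    return [
        {"Name": "instance-state-name", "Values": [state]},
        {"Name": "tag:Project", "Values": [project_tag]},
        {"Name": "tag:User", "Values": [user]},
        {"Name": "tag:NodeType", "Values": [node_type]},
    ]


# Example values only; in the notebook these come from the configuration.
slave_filters = build_node_filters("SparkCluster", "example-user", "Slave")
master_filters = build_node_filters("SparkCluster", "example-user", "Master")
print(slave_filters[3])
```

The resulting list could then be passed as `ec2_resource.instances.filter(Filters=slave_filters)`.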

### Instantiating EC2 servers for slave nodes on AWS ::
* The __create_instances__ API of the EC2 resource is used to instantiate the EC2 nodes, which will be used as the slave nodes of our Spark cluster.
* The __instance type__, __key-pair__ name, __subnet id__ and __security group list__ are provided as declared in the previous cells.
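
Each `create_instances` request carries the same set of tags. As a rough sketch, the tag block could be produced by a small function; `build_tag_specifications` is a hypothetical name, and the tag keys follow the ones used in this notebook. The run id is converted with `str()` here on the assumption that tag values must be strings.

```python
def build_tag_specifications(project_tag, run_id, user, node_type):
    """Hypothetical helper: build the TagSpecifications argument for create_instances."""
    tags = {
        "Project": project_tag,
        "RunId": str(run_id),
        "User": user,
        "Name": "SparkCluster" + node_type + "_" + str(run_id),
        "NodeType": node_type,
    }
    return [{
        "ResourceType": "instance",
        "Tags": [{"Key": key, "Value": value} for key, value in tags.items()],
    }]


# Example values only.
tag_specifications = build_tag_specifications("SparkCluster", "20200418", "example-user", "Slave")
print(tag_specifications[0]["Tags"][3])
```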

In [7]:
if instance_creation_flag:
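    running_slave_list = list(instances)
    if running_slave_list:
        # Report once instead of once per instance, and hand the running nodes to the next cell.
        triggered_slave_instance_list = running_slave_list
        print("Some slave nodes are already running for user-'" + user + "'. Those will be reused. If you want a new set of slave nodes, please terminate those and re-run this notebook.")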
    else:
        print("No slave nodes are running for user-'" + user + "'. New nodes will be created.")
        try:
            triggered_slave_instance_list_1 = ec2_resource.create_instances(
                BlockDeviceMappings=[
                    {
                        'DeviceName': '/dev/xvda',
                        'Ebs': {
                            'DeleteOnTermination': True
                        }
                    },
                ],
                ImageId=latest_image_id,
                InstanceType=cluster_instance_type,
                KeyName=cluster_key_pair_name,
                MaxCount=int(slave_count/2),
                MinCount=1,
                NetworkInterfaces=[
                    {
                        'DeviceIndex': 0,
                        'SubnetId' : cluster_subnet_list[0],
                        'Groups': cluster_security_group_list,
                        'AssociatePublicIpAddress': True            
                    }
                ],
                TagSpecifications=[
                    {
                        'ResourceType': 'instance',
                        'Tags': [
                            {
                                'Key': 'Project',
                                'Value': project_tag
                            },
                            {
                                'Key': 'RunId',
                                'Value': run_id
                            },
                            {
                                'Key': 'User',
                                'Value': user
                            },
                            {
                                'Key': 'Name',
                                'Value': 'SparkClusterSlave_' + str(run_id)
                            },
                            {
                                'Key': 'NodeType',
                                'Value': 'Slave'
                            }
                        ]
                    }
                ]
            )
            triggered_slave_instance_list_2 = ec2_resource.create_instances(
                BlockDeviceMappings=[
                    {
                        'DeviceName': '/dev/xvda',
                        'Ebs': {
                            'DeleteOnTermination': True
                        }
                    },
                ],
                ImageId=latest_image_id,
                InstanceType=cluster_instance_type,
                KeyName=cluster_key_pair_name,
                MaxCount=slave_count - int(slave_count/2),
                MinCount=1,
                NetworkInterfaces=[
                    {
                        'DeviceIndex': 0,
                        'SubnetId' : cluster_subnet_list[1],
                        'Groups': cluster_security_group_list,
                        'AssociatePublicIpAddress': True            
                    }
                ],
                TagSpecifications=[
                    {
                        'ResourceType': 'instance',
                        'Tags': [
                            {
                                'Key': 'Project',
                                'Value': project_tag
                            },
                            {
                                'Key': 'RunId',
                                'Value': run_id
                            },
                            {
                                'Key': 'User',
                                'Value': user
                            },
                            {
                                'Key': 'Name',
                                'Value': 'SparkClusterSlave_' + str(run_id)
                            },
                            {
                                'Key': 'NodeType',
                                'Value': 'Slave'
                            }
                        ]
                    }
                ]
            )
            triggered_slave_instance_list = triggered_slave_instance_list_1 + triggered_slave_instance_list_2
            print("Requested number of slave nodes are triggered. Will wait till those are in 'running' state and then proceed with configuration.")
        except ClientError as e:
            print("Unexpected error while creating Spark Cluster slave node EC2 instances for user-'" + user + "': " + str(e))
            exit()

No slave nodes are running for user-'ccbp-dev-user-saumalya'. New nodes will be created.
Requested number of slave nodes are triggered. Will wait till those are in 'running' state and then proceed with configuration.
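
The cell above divides the requested slaves between two subnets using `int(slave_count/2)` for the first and the remainder for the second. When `slave_count` is 1, `int(slave_count/2)` is 0, so the first request would be made with `MaxCount=0` and `MinCount=1`. The sketch below shows one way the split could be generalised to any number of subnets while skipping empty shares. `split_across_subnets` and the subnet names are hypothetical, and unlike the cell above it gives the extra node to the first subnet rather than the second.

```python
def split_across_subnets(total, subnet_ids):
    """Hypothetical helper: spread `total` instances over the subnets as evenly as possible."""
    base, extra = divmod(total, len(subnet_ids))
    plan = []
    for index, subnet_id in enumerate(subnet_ids):
        # The first `extra` subnets receive one additional instance.
        count = base + (1 if index < extra else 0)
        if count > 0:
            plan.append((subnet_id, count))
    return plan


print(split_across_subnets(3, ["subnet-a", "subnet-b"]))  # [('subnet-a', 2), ('subnet-b', 1)]
print(split_across_subnets(1, ["subnet-a", "subnet-b"]))  # [('subnet-a', 1)]
```

Each `(subnet_id, count)` pair would then correspond to one `create_instances` request with `MaxCount=count`.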


### Fetching required information of the Slave Nodes ::
* Need to iterate and probe a few times to check whether all slave nodes are up before we can fetch information.
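
The probing idea can be sketched independently of AWS. In the example below, `wait_for_running` and `fake_fetch_states` are hypothetical names, and the fetcher is a stand-in that pretends each instance is running from its second probe onwards; it is not the notebook's own implementation.

```python
import time


def wait_for_running(fetch_states, instance_ids, attempts=60, delay=10, sleep=time.sleep):
    """Poll until every instance reports 'running'; return (running, pending) id lists."""
    pending = list(instance_ids)
    running = []
    for _ in range(attempts):
        # fetch_states is a hypothetical callable: list of ids -> {id: state name}.
        states = fetch_states(pending)
        for instance_id in list(pending):
            if states.get(instance_id) == "running":
                running.append(instance_id)
                pending.remove(instance_id)
        if not pending:
            break
        sleep(delay)
    return running, pending


# Fake fetcher for illustration: each instance becomes 'running' on its second probe.
probe_counts = {}


def fake_fetch_states(ids):
    states = {}
    for instance_id in ids:
        probe_counts[instance_id] = probe_counts.get(instance_id, 0) + 1
        states[instance_id] = "running" if probe_counts[instance_id] >= 2 else "pending"
    return states


running, pending = wait_for_running(
    fake_fetch_states, ["i-aaa", "i-bbb"], attempts=5, delay=0, sleep=lambda seconds: None
)
print(running, pending)  # ['i-aaa', 'i-bbb'] []
```

Passing the sleep function as a parameter keeps the loop testable without real delays. boto3 also provides an `instance_running` waiter through `ec2_client.get_waiter('instance_running')`, which may be a simpler alternative when the intermediate progress messages are not needed.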

In [8]:
if instance_creation_flag:
    if triggered_slave_instance_list:
        triggered_slave_instance_ids = [instance.id for instance in triggered_slave_instance_list]
    else:
        print("Slave instances are not triggered properly.")
        exit()

    spark_cluster_slave_list = []
    try:
        probe_limit = 60
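        for _ in range(probe_limit):
            # Instances launched by separate create_instances calls are returned in separate reservations, so scan all of them.
            reservations = ec2_client.describe_instances(InstanceIds=triggered_slave_instance_ids)['Reservations']
            ec2_spark_cluster_slaves = [instance for reservation in reservations for instance in reservation['Instances']]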
            for instance in ec2_spark_cluster_slaves:
                if instance['State']['Code'] == 16:
                    spark_cluster_slave_list.append({'InstanceId': instance['InstanceId'], 'PublicDnsName': instance['PublicDnsName']})
                    triggered_slave_instance_ids.remove(instance['InstanceId'])
            if len(spark_cluster_slave_list) < slave_count:
                print("All Requested EC2 nodes are not running yet. Following are the running instances. Going to sleep for 10 seconds before next probing.")
                print(spark_cluster_slave_list)
                time.sleep(10)
            else:
                print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
                print("All Requested EC2 slave nodes are running. Starting to configure those.")
                print(spark_cluster_slave_list)
                print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
                break
        else:
            print("All Requested EC2 nodes are not up after 10 mins, which is not expected. Please check the status in AWS console. Remember to terminate the already running instances before running creation notebook again. Quitting process!")
            print(spark_cluster_slave_list)
            exit()
    except Exception as e:
        print("Unexpected error while extracting Spark Cluster slave node details: " + str(e))
        exit()

All Requested EC2 nodes are not running yet. Following are the running instances. Going to sleep for 10 seconds before next probing.
[]
All Requested EC2 nodes are not running yet. Following are the running instances. Going to sleep for 10 seconds before next probing.
[]
All Requested EC2 nodes are not running yet. Following are the running instances. Going to sleep for 10 seconds before next probing.
[{'InstanceId': 'i-0f7ebf9ec6108e5c4', 'PublicDnsName': 'ec2-3-232-95-241.compute-1.amazonaws.com'}, {'InstanceId': 'i-0a29289898713fb3d', 'PublicDnsName': 'ec2-18-209-240-76.compute-1.amazonaws.com'}]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
All Requested EC2 slave nodes are running. Starting to configure those.
[{'InstanceId': 'i-0f7ebf9ec6108e5c4', 'PublicDnsName': 'ec2-3-232-95-241.compute-1.amazonaws.com'}, {'InstanceId': 'i-0a29289898713fb3d', 'PublicDnsName': 'ec2-18-209-240-76.compute-1.amazonaws.com'}, {'InstanceId': 'i-