# **Cloud Computing Assignment**

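This notebook lets the user perform a distributed matrix operation (addition or multiplication) on AWS, using a master EC2 instance, worker EC2 instances and two SQS queues. Before running the notebook, make sure to update your AWS credentials, including the credentials file in the .aws folder located in the same working directory as this notebook.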

# *Install and Import the libraries*

In [1]:
pip install boto3 numpy pandas regex

In [2]:
import os 
import boto3
import numpy as np
import pandas as pd
import time
import regex as re

# Boto3 Resource and Client

In [3]:
'''
We first define all the boto3 resources and clients used in this notebook (all in the us-east-1 region)
'''

# ec2 
ec2 = boto3.resource("ec2", region_name='us-east-1')
ec2_client = boto3.client('ec2', region_name='us-east-1')

# sqs
sqs_client = boto3.client('sqs', region_name='us-east-1')
sqs = boto3.resource('sqs', region_name='us-east-1')

# ssm
ssm_client = boto3.client('ssm', region_name='us-east-1')
ssm_waiter = ssm_client.get_waiter('command_executed')

# s3
s3 = boto3.resource('s3', region_name='us-east-1')

# CloudWatch logs
logs_client = boto3.client('logs', region_name='us-east-1')

# *Functions*

## Set-up the AWS architecture

In [4]:
'''
This function first checks whether a master instance is running or pending. If so, it returns its ID.
If a master instance is stopped or stopping, it restarts it, waits until it is running and returns its ID.
If no master instance is found, the function creates one, waits until it is running and returns its ID.

Input: None
Output: A list containing the ID of the master instance
'''
def create_master():
    # We first define the filters that correspond to the master instance
    tags={'Name': 'tag:Name', 'Values': ['Master']}
    running={'Name': 'instance-state-name', 'Values': ['pending', 'running']}
    stopped={'Name': 'instance-state-name', 'Values': ['stopping', 'stopped']}
    
    
    # We check whether such instance is running
    if len(ec2_client.describe_instances(Filters=[tags, running])['Reservations']) :
        # If the instance does exist, we can return the ID of the instance
        return [ec2_client.describe_instances(Filters=[tags, running])['Reservations'][0]['Instances'][0]['InstanceId']]
    
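    # We check if the master instance is stopped or stopping in order to restart it
    elif len(ec2_client.describe_instances(Filters=[tags, stopped])['Reservations']) : 
        MasterId = [ec2_client.describe_instances(Filters=[tags, stopped])['Reservations'][0]['Instances'][0]['InstanceId']]
        # An instance that is still stopping cannot be started, so we first wait until it is fully stopped
        ec2_client.get_waiter('instance_stopped').wait(InstanceIds=MasterId)
        ec2_client.start_instances(
            InstanceIds = MasterId
        )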
        
        for instanceId in MasterId:
            instance = ec2.Instance(instanceId)
            instance.load()
            instance.wait_until_running()
            instance.reload()
        
        return MasterId
        
    else :
        # If no master instance is found, we have to create one
        master = ec2.create_instances(ImageId='ami-0b0dcb5067f052a63', MaxCount=1, MinCount=1, InstanceType='t3a.large', KeyName='vockey', SecurityGroupIds=['sg-01b1b10e44e254cde'],
                                           TagSpecifications=[{
                                               'ResourceType': 'instance',
                                               'Tags': [
                                                   {
                                                       'Key': 'Name',
                                                       'Value': 'Master',
                                                   }
                                               ]
                                           }],
                                        IamInstanceProfile={
                                           'Arn': 'arn:aws:iam::686401610526:instance-profile/LabInstanceProfile'
                                       })
        
        # Once the instance has been created, we should wait until the instance is running before doing anything on it
        for instance in master :
            instance.wait_until_running()
            instance.reload()

        # Once the instance is running, we just need to retrieve its ID and return it
        master_id = [instance.id for instance in master]
    
        return master_id
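The three branches of `create_master` boil down to a decision on the state of the instance tagged `Master`. As a minimal sketch, assuming the state name has already been read from `describe_instances`, that decision can be isolated in a pure function and checked without any AWS call. The helper `master_action` and its return labels are hypothetical and not part of the notebook.

```python
from typing import Optional

# States for which the existing master can be reused as-is
REUSABLE_STATES = {'pending', 'running'}
# States for which the existing master has to be started again
RESTARTABLE_STATES = {'stopping', 'stopped'}


def master_action(state: Optional[str]) -> str:
    """Hypothetical helper mirroring the three branches of create_master.

    `state` is the EC2 state name of the instance tagged Name=Master,
    or None when no such instance exists.
    """
    if state in REUSABLE_STATES:
        return 'reuse'
    if state in RESTARTABLE_STATES:
        return 'restart'
    # Nothing usable (no master, or one that is shutting down or terminated): create a new one
    return 'create'


print(master_action('running'))  # reuse
print(master_action('stopped'))  # restart
print(master_action(None))       # create
```

Keeping the decision separate from the boto3 calls makes the branching easy to test, while `start_instances` and `create_instances` stay as in the notebook.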

In [5]:
'''
This function sends all the needed files and folders to the master instance:
the .aws folder containing the credentials and the Python script to be run on the instance (master.py).
Warning: make sure that the .aws folder (with an up-to-date credentials file), master.py and the labsuser.pem key are in the working directory.
Each transfer is retried until it succeeds, so if a file or folder cannot be found, this function loops forever.

Input: A list containing the ID of the master instance
Output: None
'''
def initialise_master(InstanceId):  
    # We first build the absolute paths of the master script, the SSH key and the .aws folder
    file_path = os.path.abspath('master.py')
    key_path = os.path.abspath('labsuser.pem')
    aws_path = os.path.abspath('.aws')
    
    for inst_id in InstanceId:
        
        inst = ec2.Instance(inst_id)
        
        # We send the .aws folder to the master instance, retrying until scp succeeds (exit status 0).
        # If the folder cannot be found, this loops forever
        a = 1
        while a:
            a = os.system(f'scp -o StrictHostKeyChecking=no -i "{key_path}" -r "{aws_path}" ec2-user@{inst.public_ip_address}:~/')
        
        # Then we send the master script to the instance. As above, this loops forever if the script cannot be found
        a = 1
        while a:
            a = os.system(f'scp -o StrictHostKeyChecking=no -i "{key_path}" "{file_path}" ec2-user@{inst.public_ip_address}:~/')
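The `while a:` loops retry forever, which also covers the short delay before a freshly started instance accepts SSH connections. A bounded alternative is sketched below, assuming `scp` is available on the PATH: it builds the argument list for `subprocess.run` (so paths containing spaces need no manual quoting) and gives up after a fixed number of attempts. The helpers `build_scp_command`, `retry` and `scp_with_retry`, as well as the default of 30 attempts every 5 seconds, are hypothetical choices.

```python
import subprocess
import time
from typing import Callable, List


def build_scp_command(key_path: str, source: str, host: str, recursive: bool = False) -> List[str]:
    # Each element is one argument, so no shell quoting is needed
    cmd = ['scp', '-o', 'StrictHostKeyChecking=no', '-i', key_path]
    if recursive:
        cmd.append('-r')  # Needed to copy a folder such as .aws
    cmd += [source, f'ec2-user@{host}:~/']
    return cmd


def retry(action: Callable[[], bool], max_attempts: int = 30, delay: float = 5.0) -> bool:
    # Calls action() until it returns True, sleeping between failed attempts
    for attempt in range(max_attempts):
        if action():
            return True
        if attempt < max_attempts - 1:
            time.sleep(delay)
    return False


def scp_with_retry(key_path: str, source: str, host: str, recursive: bool = False,
                   max_attempts: int = 30, delay: float = 5.0) -> None:
    cmd = build_scp_command(key_path, source, host, recursive)
    # scp exits with status 0 on success
    if not retry(lambda: subprocess.run(cmd).returncode == 0, max_attempts, delay):
        raise RuntimeError(f'Could not copy {source} to {host} after {max_attempts} attempts')
```

In `initialise_master`, `scp_with_retry(key_path, aws_path, inst.public_ip_address, recursive=True)` would take the place of the first loop, and a call without `recursive` the place of the second. A missing file then raises an error instead of hanging the notebook.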

In [6]:
'''
This function retrieves the IDs of all the worker instances that are running or pending
Input: None
Output: A list of EC2 instance IDs
'''
def getWorkersId() : 
    # We define the tags and the state of the instances we are looking for
    tags={'Name': 'tag:Name', 'Values': ['Worker']}
    state={'Name': 'instance-state-name', 'Values': ['pending', 'running']}
    
    list_workers = []
    
    # We retrieve the instance ID of all the instances that correspond to the filters
    for each_instance in ec2_client.describe_instances(Filters=[tags, state])['Reservations']:
        for inst_id in each_instance['Instances']:
            list_workers.append(inst_id['InstanceId'])
    
    return list_workers
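`describe_instances` nests instances inside reservations, and instances launched by the same `create_instances` call (for example all the workers created with `MaxCount=nb_workers`) share one reservation, which is why `getWorkersId` loops over both levels. Large results can also be split across several response pages. A minimal sketch of the same flattening, written as a pure function so it can be tried on a sample response; `instance_ids` is a hypothetical helper, and the paginator lines in the comment assume the notebook's `ec2_client` and filters.

```python
from typing import Iterable, List


def instance_ids(pages: Iterable[dict]) -> List[str]:
    # Each page is one describe_instances response: Reservations -> Instances -> InstanceId
    return [
        instance['InstanceId']
        for page in pages
        for reservation in page.get('Reservations', [])
        for instance in reservation.get('Instances', [])
    ]


# Offline example: two reservations, the first one holding two instances
sample_page = {
    'Reservations': [
        {'Instances': [{'InstanceId': 'i-aaa'}, {'InstanceId': 'i-bbb'}]},
        {'Instances': [{'InstanceId': 'i-ccc'}]},
    ]
}
print(instance_ids([sample_page]))  # ['i-aaa', 'i-bbb', 'i-ccc']

# With AWS (not run here), the boto3 paginator yields such pages:
# paginator = ec2_client.get_paginator('describe_instances')
# worker_ids = instance_ids(paginator.paginate(Filters=[tags, state]))
```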

In [7]:
'''
This function creates as many worker instances as the input value and then waits until these instances are running.

Input: An integer giving the number of workers to create
Output: A list of the IDs of the created worker instances
'''

def create_workers(nb_workers):
    # We create as many instances as the input value
    instances = ec2.create_instances(ImageId='ami-0b0dcb5067f052a63', MaxCount=nb_workers, MinCount=nb_workers, InstanceType='t2.micro', KeyName='vockey', SecurityGroupIds=['sg-01b1b10e44e254cde'],
                                       TagSpecifications=[{
                                           'ResourceType': 'instance',
                                           'Tags': [
                                               {
                                                   'Key': 'Name',
                                                   'Value': 'Worker',
                                               }
                                           ]
                                       }],
                                    IamInstanceProfile={
                                       'Arn': 'arn:aws:iam::686401610526:instance-profile/LabInstanceProfile'
                                   })
    
    # We wait until all the created instances are running and ready to compute their jobs
    for instance in instances:
        instance.load()
        instance.wait_until_running()
        instance.reload()
    
    # We then retrieve the instance IDs in order to return them
    instance_id = [instance.id for instance in instances]
    
    return instance_id

In [8]:
'''
This function restarts all the stopped workers

Input: None
Output: A list of all the ID of the restarted workers
'''

def restart_workers():
    # We define the tags and the state of the instances we are looking for
    tags={'Name': 'tag:Name', 'Values': ['Worker']}
    state={'Name': 'instance-state-name', 'Values': ['stopped', 'stopping']}
    
    list_workers = []
    
    # We retrieve the instance ID of all the instances that correspond to the filters
    for each_instance in ec2_client.describe_instances(Filters=[tags, state])['Reservations']:
        for inst_id in each_instance['Instances']:
            list_workers.append(inst_id['InstanceId'])
    
    # Nothing to restart if no worker is stopped or stopping
    if list_workers:
        # Instances that are still stopping cannot be started, so we wait until they are fully stopped
        ec2_client.get_waiter('instance_stopped').wait(InstanceIds=list_workers)
        ec2_client.start_instances(
            InstanceIds = list_workers
        )

        for instanceId in list_workers:
            instance = ec2.Instance(instanceId)
            instance.load()
            instance.wait_until_running()
            instance.reload()
        
    return list_workers

In [9]:
'''
This function initialises the worker instances by sending them the .aws folder and the worker script (worker.py) to be run on them.
Each transfer is retried until it succeeds, so if the folder or a file cannot be found, this function loops forever.
Make sure you have updated your AWS credentials file.

Input: A list of the worker instance IDs
Output: None
'''

def initialise_workers(InstanceId):
    # We first build the absolute paths of the worker script, the SSH key and the .aws folder
    file_path = os.path.abspath('worker.py')
    key_path = os.path.abspath('labsuser.pem')
    aws_path = os.path.abspath('.aws')
    
    # We then send all the needed folders and files to the worker instances
    for inst_id in InstanceId:
        inst = ec2.Instance(inst_id)
        
        a = 1
        while a :
            a = os.system(f'scp -o StrictHostKeyChecking=no -i "{key_path}" -r "{aws_path}" ec2-user@{inst.public_ip_address}:~/')
        
        a = 1
        while a :
            a = os.system(f'scp -o StrictHostKeyChecking=no -i "{key_path}" "{file_path}" ec2-user@{inst.public_ip_address}:~/')

In [10]:
'''
This function makes sure that the two SQS queues used by the application exist:
JobToWorker, to which the master sends the work packages, and ResultToMaster, to which the workers send back their results.
A queue is created only if no queue starting with that name is found.

Input: None
Output: None
'''
def initialise_sqs_queues() :
    for queue_name in ['JobToWorker', 'ResultToMaster']:
        response = sqs_client.list_queues(QueueNamePrefix=queue_name)
        
        # list_queues omits the 'QueueUrls' key when no queue matches the prefix
        if not response.get('QueueUrls'):
            sqs.create_queue(QueueName=queue_name)

In [11]:
'''
This function sets up the distributed cloud architecture. It first makes sure that a master instance and at least 8 worker instances are running,
then sends all the needed files and folders to these instances, and finally makes sure that the two SQS queues exist.

Input: None
Output: A list containing the master instance ID, and a list of the worker instance IDs
'''

def setup_application():
    start = time.time()
    print("The set-up of the application has started...")
    MasterId = create_master()
    initialise_master(MasterId)
    
    end_master = time.time()
    print(f"The master instance is now running and ready to process the data")
    print(f"It took {end_master - start} seconds to deploy the master instance\n")
    
    print("We will now set up the 8 processing nodes")
    WorkersIdRestarted = restart_workers()
    WorkersId = getWorkersId()
    
    
    if len(WorkersId) < 8 :
        WorkerIdCreated = create_workers(8-len(WorkersId))
        
        for ID in WorkerIdCreated:
            WorkersId.append(ID)
    
    initialise_workers(WorkersId)
    
    end_workers = time.time()
    print(f"The worker instances are now running and ready to process the work packages send to the queue")
    print(f"It took {end_workers - end_master} seconds to deploy the workers instances\n")
    
    print("We will now make sure that the queues are created and ready to send the work packages and receive the results")
    initialise_sqs_queues()
    
    end = time.time()
    print(f"The SQS Queues are now created and ready")
    print(f"It took {end - end_workers} seconds to deploy the 2 SQS Queues\n")
    
    print(f"in total, it took {end-start} seconds to set up the Cloud environment\n")
    
    return MasterId, WorkersId

## Commands for the EC2 instances

In [12]:
'''
This function uses AWS Systems Manager (SSM) through boto3 to send a command to the master instance.
The command installs the required Python packages and then runs the master script that computes the operation.
The output of the script is sent to CloudWatch Logs so that it can be followed from this notebook.

Input: A list containing the ID of the master instance, the operation to perform and the shapes of the two matrices to be added or multiplied
Output: A dictionary containing information about the SSM command sent to the instance
'''

def run_master_code(master_id, operation, A_shape, B_shape):
    # We first define the commands to be run on the master instance.
    # They are sent as a single SSM command so that the packages are installed before master.py starts
    commands = [
        'cd /home/ec2-user',
        'pip3 install -q numpy boto3 pandas regex tqdm s3fs',
        'python3 -u /home/ec2-user/master.py ' + str(operation) + ' "' + str(A_shape) + '" "' + str(B_shape) + '" 2>&1'
    ]
    
    ssm_command = ssm_client.send_command(
        DocumentName="AWS-RunShellScript", # One of AWS' preconfigured documents
        Parameters={'commands': commands},
        InstanceIds=master_id,
        CloudWatchOutputConfig={
            'CloudWatchLogGroupName': '/aws/ssm/AWS-RunShellScript',
            'CloudWatchOutputEnabled': True
        }
    )
    
    # Then, when the command is running, we return its information in order to check its step and get the outputs
    return ssm_command
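`run_master_code` passes each shape to `master.py` as the text of a Python tuple, e.g. `"(1000, 1000)"`, wrapped in double quotes so that the shell keeps it as a single argument. `master.py` is not shown here, so the parsing side below is an assumption: a sketch that quotes the arguments with `shlex.quote` and turns them back into tuples with `ast.literal_eval`, which, unlike `eval`, only accepts Python literals. `build_master_command` and `parse_shape` are hypothetical names.

```python
import ast
import shlex
from typing import Tuple


def build_master_command(operation: int, A_shape: Tuple[int, int], B_shape: Tuple[int, int]) -> str:
    # shlex.quote keeps "(1000, 1000)" together as one shell argument
    return ' '.join([
        'python3 -u /home/ec2-user/master.py',
        str(operation),
        shlex.quote(str(A_shape)),
        shlex.quote(str(B_shape)),
        '2>&1',
    ])


def parse_shape(arg: str) -> Tuple[int, int]:
    # Hypothetical master-side parsing: safely turn "(1000, 1000)" back into a tuple
    shape = ast.literal_eval(arg)
    if not (isinstance(shape, tuple) and len(shape) == 2 and all(isinstance(n, int) for n in shape)):
        raise ValueError(f'Invalid shape: {arg!r}')
    return shape


cmd = build_master_command(1, (1000, 1000), (1000, 1000))
print(cmd)
# python3 -u /home/ec2-user/master.py 1 '(1000, 1000)' '(1000, 1000)' 2>&1
print(parse_shape(shlex.split(cmd)[4]))  # (1000, 1000)
```

On the master side, the A shape would then be the second command-line argument (for instance `sys.argv[2]`, assuming the argument order used by `run_master_code`).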

In [13]:
'''
This function uses AWS Systems Manager (SSM) through boto3 to send a command to the worker instances.
The command first installs the required Python packages and then runs the worker script.

Input: The IDs of all the worker nodes that will be used and an integer that represents the operation to be performed
Output: A dictionary containing information about the SSM command
'''

def run_worker_code(InstanceId, operation) :
    # First, the commands to be performed are defined
    commands = ['pip3 install numpy boto3 pandas regex', 'python3 /home/ec2-user/worker.py ' + str(operation)]
    
    # Then, we send the command to the worker instances
    ssm_command = ssm_client.send_command(
        DocumentName="AWS-RunShellScript", # One of AWS' preconfigured documents
        Parameters={'commands': commands},
        InstanceIds=InstanceId
    )
    
    return ssm_command

## *Local Set-up*

In [14]:
'''
This function will ask the user which operation to compute on the two input matrices

Input: None
Output: An integer representing the operation to be performed
'''

def Operation() :
    while True :
        print("Which operation do you want to perform ?")
        print("    - Input 1 for matrix addition")
        print("    - Input 2 for matrix multiplication")
        try:
            operation = int(input())
        except ValueError:
            # Non-numeric input: we ask again instead of crashing
            operation = None
        if operation in (1, 2) :
            break
        else :
            print("This input does not correspond to any available operation!\n")
      
    return operation

In [15]:
'''
This function asks the user for the shapes of the two matrices.
For an addition, only one shape is asked since both matrices must have the same shape; for a multiplication, two shapes are asked.
For a multiplication, the given shapes are checked to ensure that the operation can be performed (the number of columns of the first matrix must equal the number of rows of the second).
In both cases, no dimension may exceed 18000. If the shapes are invalid, the user is asked for different shapes.

Input: An integer representing the operation to perform
Output: The shapes of the two input matrices
'''

def Verify_shapes(operation):
    if operation == 1 : # In this case, the operation is an addition
        print("\nYou want to compute an addition of two matrices")
        print("The shape of the matrices should be the same")
        print("The matrices should also be smaller than 18000x18000")
        
        while True:
            # We ask the user for a shape (row column); split() tolerates extra spaces
            shape = input("Input the shape of the matrices in the following format: row column\n").split()

            if len(shape) != 2 or not all(s.isdecimal() and int(s) > 0 for s in shape):
                print("Please input two positive integers separated by a space")
            elif max(int(shape[0]), int(shape[1])) > 18000 :
                print("The proposed shape is too large. The matrices should be smaller than 18000x18000 in order to process the data")
            else :
                break
        
        return (int(shape[0]), int(shape[1])), (int(shape[0]), int(shape[1]))

        
    elif operation == 2 : # In this case, the operation is a multiplication
        print("\nYou want to compute a multiplication of two matrices")
        print("The shape of the matrices should be consistent")

        # We use a while loop to ask for new shapes until they are consistent for matrix multiplication
        while True :
            shape1 = input("Input the shape of the first matrix in the following format: row column\n").split()
            shape2 = input("Input the shape of the second matrix in the same format:\n").split()
            
            # Both inputs must consist of exactly two positive integers
            if len(shape1) != 2 or len(shape2) != 2 or not all(s.isdecimal() and int(s) > 0 for s in shape1 + shape2):
                print('Please input two positive integers separated by a space for each matrix')
                continue
            
            # If the number of columns of the first matrix matches the number of rows of the second one and no dimension exceeds 18000, we stop the loop
            if int(shape1[1]) == int(shape2[0]) and max(int(s) for s in shape1 + shape2) <= 18000:
                break
                
            # If the shapes are not consistent, the user has to give new shapes
            else:
                print('The shapes are not consistent or they are too large')
                print('The first matrix should have as many columns as the second matrix has rows')
         
        # When the shapes are consistent, the function returns them
        return (int(shape1[0]), int(shape1[1])), (int(shape2[0]), int(shape2[1]))
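Because `Verify_shapes` mixes prompting with checking, its rules are hard to test. A minimal sketch, assuming the same 18000 limit, separates parsing and validation from `input()`; `parse_shape_input`, `shapes_are_valid` and `MAX_DIM` are hypothetical names, not part of the notebook.

```python
from typing import Optional, Tuple

MAX_DIM = 18000  # Same size limit as in Verify_shapes


def parse_shape_input(text: str) -> Optional[Tuple[int, int]]:
    # Accepts "row column" with any amount of whitespace; returns None for anything else
    parts = text.split()
    if len(parts) != 2 or not all(p.isdecimal() and int(p) > 0 for p in parts):
        return None
    return int(parts[0]), int(parts[1])


def shapes_are_valid(operation: int, A_shape: Tuple[int, int], B_shape: Tuple[int, int],
                     max_dim: int = MAX_DIM) -> bool:
    # No dimension of either matrix may exceed the limit
    if max(*A_shape, *B_shape) > max_dim:
        return False
    if operation == 1:  # Addition: both matrices must have the same shape
        return A_shape == B_shape
    if operation == 2:  # Multiplication: columns of A must equal rows of B
        return A_shape[1] == B_shape[0]
    return False  # Unknown operation


print(parse_shape_input(' 1000   1000 '))           # (1000, 1000)
print(parse_shape_input('1000x1000'))               # None
print(shapes_are_valid(2, (200, 300), (300, 50)))   # True
print(shapes_are_valid(2, (200, 300), (200, 300)))  # False
```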

In [16]:
'''
This function downloads the input matrices A and B and the output matrix C of the computed operation from the S3 bucket tlbucketcc.
Once the files have been downloaded, the function opens them as pandas DataFrames.

Input: None
Output: The two input matrices A and B and the output matrix C as pandas DataFrames
'''
def dlAndOpenMatrices():
    # We first retrieve the working directory to download the files
    work_dir = os.getcwd()
    
    # We downloads the three matrices
    s3.meta.client.download_file('tlbucketcc', 'A.csv', work_dir + '\\A.csv')
    s3.meta.client.download_file('tlbucketcc', 'B.csv', work_dir + '\\B.csv')
    s3.meta.client.download_file('tlbucketcc', 'C.csv', work_dir + '\\C.csv')
    
    # Then we open them
    C = pd.read_csv('C.csv', header=None)
    A = pd.read_csv('A.csv', header=None)
    B = pd.read_csv('B.csv', header=None)
    
    return A, B, C
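Once `dlAndOpenMatrices` has returned A, B and C, the result can be checked by recomputing the operation locally with NumPy (already imported in this notebook). This only makes sense for small matrices; close to the 18000x18000 limit, the local check would be slow and memory-hungry. The function `check_result` is a hypothetical sketch, and the tolerance guards against small floating-point differences, for instance if the values went through CSV text.

```python
import numpy as np


def check_result(A, B, C, operation, rtol=1e-6):
    # np.asarray accepts pandas DataFrames as well as nested lists
    a, b, c = (np.asarray(m, dtype=float) for m in (A, B, C))
    # Recompute the operation locally: 1 = addition, 2 = multiplication
    expected = a + b if operation == 1 else a @ b
    # Compare shapes first, then values up to a relative tolerance
    return expected.shape == c.shape and bool(np.allclose(c, expected, rtol=rtol))


A = [[1, 2], [3, 4]]
B = [[5, 6], [7, 8]]
print(check_result(A, B, [[6, 8], [10, 12]], operation=1))    # True
print(check_result(A, B, [[19, 22], [43, 50]], operation=2))  # True
print(check_result(A, B, [[0, 0], [0, 0]], operation=1))      # False
```

In the notebook, this would be `A, B, C = dlAndOpenMatrices()` followed by `check_result(A, B, C, operation)`.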

In [17]:
'''
This function decides the number of workers to use depending on the shapes of the input matrices.
The thresholds were chosen arbitrarily, without any performance tests.

Input: The shapes of the two input matrices
Output: The number of workers to use for the computation of the operation
'''

def NbWorkerToDeploy(A_shape, B_shape):
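    # Largest dimension over both matrices (max(A_shape, B_shape) would compare the tuples lexicographically)
    max_shape = max(*A_shape, *B_shape)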
    
    if max_shape <= 100 :
        return 1
        
    elif 100 < max_shape <= 200 :
        return 2
        
    elif 200 < max_shape <= 500 :
        return 3
        
    elif 500 < max_shape <= 1000 :
        return 4
        
    elif 1000 < max_shape <= 2000 :
        return 5
        
    elif 2000 < max_shape <= 5000 :
        return 6
        
    elif 5000 < max_shape <= 10000 :
        return 7
        
    elif 10000 < max_shape :
        return 8
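The `if`/`elif` chain in `NbWorkerToDeploy` is a lookup in a sorted list of thresholds. The sketch below, assuming the same thresholds, expresses it with the standard `bisect` module; `nb_workers` and `THRESHOLDS` are hypothetical names. It also takes the maximum over all four dimensions: `max(A_shape, B_shape)` compares the two tuples lexicographically, so for A=(100, 5000) and B=(200, 10) it picks (200, 10) and misses the 5000.

```python
import bisect

# Inclusive upper bounds of the largest dimension for 1, 2, ..., 7 workers; above the last bound, 8 workers
THRESHOLDS = [100, 200, 500, 1000, 2000, 5000, 10000]


def nb_workers(A_shape, B_shape):
    # Largest dimension over all four numbers
    max_dim = max(*A_shape, *B_shape)
    # bisect_left counts the thresholds strictly below max_dim,
    # so a dimension equal to a bound stays in that bound's bucket
    return bisect.bisect_left(THRESHOLDS, max_dim) + 1


print(nb_workers((1000, 1000), (1000, 1000)))  # 4
print(nb_workers((100, 5000), (200, 10)))      # 6
```

Changing the policy then only means editing the list, which also keeps the bounds visible in one place for later performance tests.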
    

In [18]:
'''
This function uses CloudWatch Logs to monitor the progress of the calculation.
It retrieves the output of the master script in near real time and prints it in the notebook, until the master prints the final keyword "Done!".

Input: The ID of the command sent to the master instance, and the ID of the master instance
Output: None
'''

def get_output(CommandId, MasterId):
    print("\n\nMaster Output:\n")
    
    log_group = '/aws/ssm/AWS-RunShellScript'
    log_stream = CommandId + '/' + MasterId + '/aws-runShellScript/stdout'
    next_token = None

    # We check the outputs until the calculation is over
    while True :
        request = {'logGroupName': log_group, 'logStreamName': log_stream, 'startFromHead': True}
        if next_token:
            # The forward token makes CloudWatch return only the events that have not been read yet
            request['nextToken'] = next_token

        try:
            response = logs_client.get_log_events(**request)
        except logs_client.exceptions.ResourceNotFoundException:
            # At the beginning, the log stream is not created yet
            time.sleep(0.5)
            continue

        next_token = response['nextForwardToken']
        done = False

        # We print every new output; the last keyword "Done!" is removed before printing
        for each_event in response['events']:
            message = each_event['message'].rstrip('\n')
            if message.split('\n')[-1] == 'Done!':
                message = message[:-len('Done!')]
                done = True
            print(message.replace('\r', '\n'))

        # If the master printed "Done!", the calculation is over and we can stop the loop
        if done:
            break
        time.sleep(0.5)
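The end-of-run check in `get_output` relies on the master printing the keyword `Done!` on its last line. That parsing step can be isolated and tested offline, as in the hypothetical helper `split_done` below, which returns the text to print and whether the marker was found. Like the notebook, it assumes that `Done!` appears alone on the final line of a log event.

```python
from typing import Tuple

DONE_MARKER = 'Done!'  # Keyword printed by the master script when the computation is over


def split_done(message: str) -> Tuple[str, bool]:
    # Ignore trailing newlines, then compare the last line with the marker
    stripped = message.rstrip('\n')
    if stripped.split('\n')[-1] == DONE_MARKER:
        # Drop the marker and turn tqdm carriage returns into line breaks
        return stripped[:-len(DONE_MARKER)].replace('\r', '\n'), True
    return message.replace('\r', '\n'), False


print(split_done('All the results have been received\nDone!'))
# ('All the results have been received\n', True)
print(split_done('Receiving Messages: 50%\rReceiving Messages: 100%'))
# ('Receiving Messages: 50%\nReceiving Messages: 100%', False)
```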

In [19]:
'''
This function calls all the previously defined functions. On the first run of the session, it sets up the AWS application architecture.
The user can then choose the operation to perform as well as the shapes of the two input matrices.

Input: None
Output: None. This function only prints the outputs of the master script
'''

setup = 0

def local() :
    # We use a global flag so that the AWS architecture is only set up on the first run of the session
    global setup
    
    if setup == 0 : # If setup is equal to 0, this means that we need to setup the architecture
        MasterId, WorkersId = setup_application()
        setup += 1
    else :
        MasterId = create_master()
        WorkersId = getWorkersId()
        print("The Cloud environment has already been set up!")
    
    # When the application has been set up, the user will be asked for the operation to perform
    operation = Operation()
    
    # Then the user should choose the shapes of the input matrices, which should be consistent
    A_shape, B_shape = Verify_shapes(operation)

    # Depending on the shapes, the number of workers to be used is computed 
    nb_workers_to_deploy = NbWorkerToDeploy(A_shape, B_shape)
    
    # We can then run the worker script. It is an infinite while loop that waits for messages in the SQS queue to process jobs,
    # so we start it first to reduce the overall computation time
    ssm_command_worker = run_worker_code(WorkersId[:nb_workers_to_deploy], operation)
    
    # Then we can run the master script
    ssm_command_master = run_master_code(MasterId, operation, A_shape, B_shape)

    masterCommandId = ssm_command_master['Command']['CommandId']
    
    # We print the master output until the computation is over
    get_output(masterCommandId, MasterId[0])
    
    # Once the computation is over, we stop the worker script because it is an infinite while loop
    command_id_worker = ssm_command_worker['Command']['CommandId']

    response = ssm_client.cancel_command(CommandId=command_id_worker, InstanceIds=WorkersId[:nb_workers_to_deploy])

In [20]:
local()

The set-up of the application has started...
The master instance is now running and ready to process the data
It took 5.648499250411987 seconds to deploy the master instance

We will now set up the 8 processing nodes
The worker instances are now running and ready to process the work packages send to the queue
It took 32.934983253479004 seconds to deploy the workers instances

We will now make sure that the queues are created and ready to send the work packages and receive the results
The SQS Queues are now created and ready
It took 0.5575668811798096 seconds to deploy the 2 SQS Queues

in total, it took 39.1410493850708 seconds to set up the Cloud environment

Which operation do you want to perform ?
    - Input 1 for matrix addition
    - Input 2 for matrix multiplication
 1

You want to compute an addition of two matrices
The shape of the matrices should be the same
The matrices should also be smaller than 18000x18000

Input the shape of the matrices in the following format: row column
 1000 1000

Master Output:

The two matrices have been created. The computation can now start
The operation has been decomposed into 24 work packages
We begin to send the work packages



Sending Messages:   0%|                                                      | 0/24 [00:00<?, ?it/s]
Sending Messages: 100%|█████████████████████████████████████████████| 24/24 [00:02<00:00, 11.28it/s]True

Sending Messages: 100%|█████████████████████████████████████████████| 24/24 [00:02<00:00, 11.27it/s]

All the work packages have been sent to the Queue
It took 2.1688 seconds to send all the work packages
The first result can be received



Receiving Messages:   0%|                                                    | 0/24 [00:00<?, ?it/s]
Receiving Messages: 100%|███████████████████████████████████████████| 24/24 [00:01<00:00, 14.07it/s]True

Receiving Messages: 100%|███████████████████████████████████████████| 24/24 [00:01<00:00, 14.07it/s]

All the results have been received
It took 1.7130 seconds to rece