<a href="https://colab.research.google.com/github/yuliiabosher/Cyber_Resilience_Course/blob/main/Testing_lambda_function_locally_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Testing the lambda function code locally

#### Installing the boto3 module

In [1]:
!pip install boto3

Collecting boto3
  Downloading boto3-1.34.64-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting botocore<1.35.0,>=1.34.64 (from boto3)
  Downloading botocore-1.34.64-py3-none-any.whl (12.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.0/12.0 MB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Downloading s3transfer-0.10.1-py3-none-any.whl (82 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.2/82.2 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jmespath, botocore, s3transfer, boto3
Successfully installed boto3-1.34.64 botocore-1.34.64 jmespath-1.0.1 s3transfer-0.10.1


#### Importing the required modules

In [2]:
import pandas as pd
import io
import os
import json
import csv
import boto3
from IPython.display import clear_output

#### Saving an access key and a secret key in an environment variable
---
This is only required to test the Lambda code locally as in the production environment an AWS role was assigned to the Lambda function. The role that grants full access to AWS S3 allows Lambda access the bucket without the need for a secret and access keys

In [3]:
def set_environment_variable_values():
  ACCESS_KEY = input("Please enter the AWS access key: ")
  SECRET_ACCESS_KEY = input("Please enter the AWS secret access key: ")
  BUCKET_NAME = input("Please enter the name of the bucket in S3: ")
  os.environ['ACCESS_KEY'] = ACCESS_KEY
  os.environ['SECRET_ACCESS_KEY'] = SECRET_ACCESS_KEY
  os.environ['BUCKET_NAME'] = BUCKET_NAME
  clear_output()
  return None

set_environment_variable_values()

#### Creating a connection to the S3 bucket
---

In [4]:
def get_S3_client():
	resource = boto3.client(
     "s3",
		aws_access_key_id = os.environ.get('ACCESS_KEY'),
		aws_secret_access_key = os.environ.get('SECRET_ACCESS_KEY')
	)
	return resource

s3_client = get_S3_client()

#### Defining show_schools_data_in_bucket(client, filename) function
---
Some adjustments were made so that the function could be tested locally. Some original Lambda code lines are hashed out and some lines were added.

In [7]:
def show_schools_data_in_bucket(client, filename):
    try:
        s3_client = client    # added
        # s3_client = boto3.client('s3')
        s3 = boto3.resource('s3')
        bucket = s3.Bucket(os.environ.get("BUCKET_NAME"))
        obj = s3_client.get_object(Bucket=os.environ.get("BUCKET_NAME"), Key=filename)
        data = obj['Body'].read().decode('utf-8').splitlines()
        records = csv.reader(data)
        data_array =[]
        for row in records:
            data_array.append(row)
        return "The data has been found", data_array
    except Exception as e:
      if str(e) == 'An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist.': # added
    # except botocore.exceptions.ClientError as e:
      #  if e.response["Error"]["Code"] == "NoSuchKey":
        return "There is no such file", []
      else:
        # return e.response["Error"]["Code"], []
        return str(e), []                                               # added
    # except:
      #  return "There was an error", []

#### Defining add_school_data_to_bucket(client, filename, filedata) function
---
Some adjustments were made so that the function could be tested locally. Some original Lambda code lines were commented out and new lines were added.

In [70]:
def add_school_data_to_bucket(client, filename, filedata):
    try:
        s3_client = client # added
        #s3_client = boto3.client('s3')
        s3 = boto3.resource('s3')
        bucket = s3.Bucket(os.environ.get("BUCKET_NAME"))
        obj = s3_client.get_object(Bucket=os.environ.get("BUCKET_NAME"), Key=filename)
        data = obj['Body'].read().decode('utf-8').splitlines()
        records = csv.reader(data)
        names_array = []
        data_array =[]                   # added
        # with open('/tmp/schools_list.csv', 'w', newline='') as f:
        for row in records:
          names_array.append(row[0])
          data_array.append(row)         # added
          # writer = csv.writer(f)
          # writer.writerow(row)
        new_data_rows = []
        # with open('/tmp/schools_list.csv', 'a', newline='') as f:
        for i in filedata:
          if i[0] not in names_array:
            new_data_rows.append(i)
          # writer = csv.writer(f)
          # writer.writerow(i)
        if len(new_data_rows) == 0:
            return "These schools are already in a file", []
        else:
            # bucket.upload_file('/tmp/schools_list.csv', filename)
            file_object =  io.StringIO()     # added
            pd.DataFrame(data_array+new_data_rows).to_csv(file_object, index=False, header=False) # added
            response = s3_client.put_object(Bucket=os.environ.get('BUCKET_NAME'), Body=file_object.getvalue(), Key=filename) # added
            return "New data been successfully added", new_data_rows
    except Exception as e:
      if str(e) == 'An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist.':   # added
    # except botocore.exceptions.ClientError as e:
     #   if e.response["Error"]["Code"] == "NoSuchKey":
        # with open('/tmp/schools_list.csv', 'w', newline='') as f:
          # writer = csv.writer(f)
           # writer.writerow(headers)
           # for row in filedata:
            #    writer.writerow(row)
        # bucket.upload_file('/tmp/schools_list.csv', filename)
        file_object =  io.StringIO()    # added
        pd.DataFrame(filedata).to_csv(file_object, index=False, header=False) # added
        response = s3_client.put_object(Bucket=os.environ.get('BUCKET_NAME'), Body=file_object.getvalue(), Key=filename) # added
        return "New file been successfully created", filedata
      else:
        return str(e), []      # added
         # return e.response["Error"]["Code"], []

#### Defining the lambda_handler(event, context) function
---
Some minor edits were made to the lambda_handler(event, context) function running in the production environment in order to make it possible to test it locally

In [71]:
def lambda_handler(event, context):
    try:
        filename = "schools_list.csv"
        # client = boto3.client('s3')
        client = s3_client                      # added
        if event["httpMethod"] == "POST":
            if "body" in event.keys():
                request = event["body"]
                if type(request) is not dict:
                    request = json.loads(request)
                if request is not None and "data" in request.keys():
                    '''if there is no data return a message and an empty list '''
                    data = request["data"]
                    if len(data) == 0:
                        message, return_data = "Please enter valid data", []
                        statuscode = 404
                    else:
                        message, return_data = add_school_data_to_bucket(client, filename, data)
                        statuscode = 200
                else:
                    message, return_data = "Error in the POST request occured", []
                    statuscode = 404
        elif event["httpMethod"] == "GET":
            message, return_data = show_schools_data_in_bucket(client, filename)
            message="Success"
            statuscode = 200
        else:
            message, return_data = "Error occured", []
            statuscode = 404
        return {'statusCode': statuscode,
                'headers': {'Content-Type': 'application/json',
                            'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                            'Access-Control-Allow-Methods': 'POST',
                            'Access-Control-Allow-Origin': '*'},
                #'body': json.dumps({"message": message, "data": return_data})
                'body': {"message": message, "data": return_data}    # added
                }
    except:
        return {'statusCode': 404,
                'headers': {'Content-Type': 'application/json',
                            'Access-Control-Allow-Headers': 'Content-Type,X-Api-Key',
                            'Access-Control-Allow-Methods': 'POST',
                            'Access-Control-Allow-Origin': '*'},
                # 'body': json.dumps({"message": "There was an error", "data": []})
                'body': {"message": "There was an error", "data": []} # added
                }

#### Testing a new file creation
---
All the files were deleted from the bucket before the test. Asserting that the message 'New file been successfully created' is returned when trying to add data to a file that does not exist

In [75]:
event0 = {
    "httpMethod": "POST",
    "body": {
    "data": [
        [
            "Bannockburn Primary School & Nursery",
            51.4869172,
            0.1015561
        ],
        [
            "St Margaret Clitherow Primary School",
            51.50103289999999,
            0.1132992
        ]
    ]
}
}
test0 = lambda_handler(event0, None)
if test0['body']['message'] == 'New file been successfully created':
  print('test0 passed')
else:
  print('test0 failed')

test0 passed


#### Testing that the function does not add schools already in the file
---
Test1 should follow test0. Asserting that the message 'These schools are already in a file' is returned when trying to add schools already in the bucket file

In [76]:
event1 = {
    "httpMethod": "POST",
    "body": {
    "data": [
        [
            "Bannockburn Primary School & Nursery",
            51.4869172,
            0.1015561
        ],
        [
            "St Margaret Clitherow Primary School",
            51.50103289999999,
            0.1132992
        ]
    ]
}
}
test1 = lambda_handler(event1, None)
if test1['body']['message'] == 'These schools are already in a file':
  print('test1 passed')
else:
  print('test1 failed')

test1 passed


#### Testing the usage of the httpMethod GET
---
Asserting that statusCode 200 is returned when httpMethod GET is used

In [77]:
event2 = {
    "httpMethod": "GET"
}

test2 = lambda_handler(event2, None)
if test2['statusCode'] == 200:
  print('test2 passed')
else:
  print('test2 failed')

test2 passed


#### Testing if the Lambda function fails gracefully if the httpMethod is not POST or GET
---
Asserting that when httpMethod DELETE is used the 'Error occured' message is returned

In [78]:
event3 = {"httpMethod": "DELETE"}
test3 = lambda_handler(event3, None)
if test3['body']['message'] == 'Error occured':
  print('test3 passed')
else:
  print('test3 failed')

test3 passed


#### Testing that the Lambda function fails gracefully with body as an empty string
---
Asserting that when the httpMethod POST is used and the "body" is an empty string, statusCode 404 is displayed

In [79]:
event4 = {
    "httpMethod": "POST",
    "body": ""
}
test4 = lambda_handler(event4, None)
if test4['statusCode'] == 404:
  print('test4 passed')
else:
  print('test4 failed')

test4 passed


#### Testing that the Lambda function fails gracefully with data as an empty list
---
Asserting that the statusCode 404 is returned when httpMethod POST is used and the data field in input is an empty list

In [80]:
event5 = {
    "httpMethod": "POST",
    "body": {
    "data": []
}
}
test5 = lambda_handler(event5, None)
if test5['statusCode'] == 404:
  print('test5 passed')
else:
  print('test5 failed')

test5 passed


#### Testing that the Lambda function fails gracefully with data as an empty string
---
Asserting that when httpMethod POST is used the message 'Please enter valid data' is returned if data field in input is an empty string

In [81]:
event6 = {
    "httpMethod": "POST",
    "body": {
    "data": ""
}
}
test6 = lambda_handler(event6, None)
if test6['body']['message'] == 'Please enter valid data':
  print('test6 passed')
else:
  print('test6 failed')

test6 passed


#### Testing that the Lambda function fails gracefully if the data field is a single digit integer
---
Asserting that the message 'There was an error' is returned if the httpMethod is POST and the data field is 1

In [82]:
event7 = {
    "httpMethod": "POST",
    "body": {
    "data": 1
}
}
test7 = lambda_handler(event7, None)
if test7['body']['message'] == 'There was an error':
  print('test7 passed')
else:
  print('test7 failed')

test7 passed


#### Testing that the Lambda function fails gracefully if the there is no body
---
Asserting that an empty list is returned if the httpMethod is POST and there is no body in input

In [83]:
event8 = {
    "httpMethod": "POST"
}
test8 = lambda_handler(event8, None)
if test8['body']['data'] == []:
  print('test8 passed')
else:
  print('test8 failed')

test8 passed


#### Testing that the new data is added successfully if a school is not listed in the bucket file
---
Test9 should only be executed once in the intended order. Asserting that the list of lists is returned with the newly added data if the httpMethod POST is used and the data input field contains a school not in the bucket file

In [84]:
event9 = {
    "httpMethod": "POST",
    "body": {"data":[['Riverside Primary Academy',54.9564501,-1.6441569]]}
}
test9 = lambda_handler(event9, None)
if test9['body']['data'] == [['Riverside Primary Academy', 54.9564501, -1.6441569]]:
  print('test9 passed')
else:
  print('test9 failed')

test9 passed


#### Testing that all data has been added correctly
---
Assering that data variable returned is [['Bannockburn Primary School & Nursery', '51.4869172', '0.1015561'], ['St Margaret Clitherow Primary School', '51.50103289999999', '0.1132992'], ['Riverside Primary Academy', '54.9564501', '-1.6441569']] after running all the tests from test0 to test10

In [86]:
event10 = {
    "httpMethod": "GET"
}
test10 = lambda_handler(event10, None)
if test10['body']['data'] == [['Bannockburn Primary School & Nursery', '51.4869172', '0.1015561'], ['St Margaret Clitherow Primary School', '51.50103289999999', '0.1132992'], ['Riverside Primary Academy', '54.9564501', '-1.6441569']]:
  print('test10 passed')
else:
  print('test10 failed')

test10 passed
