# **AWS Lambda**

In [None]:
import io
from urllib.parse import unquote_plus

import boto3
import pandas as pd

# Module-level S3 client: created once at import time and shared by every call to lambda_handler.
s3 = boto3.client('s3')

def _swap_raw_folder(key, folder):
    """Return *key* with its first ``raw`` path segment replaced by *folder*.

    Only a whole path segment equal to ``raw`` is rewritten, so file names or
    folders that merely contain the letters "raw" are left untouched.
    Returns ``None`` when the key has no such segment.
    """
    segments = key.split("/")
    try:
        segments[segments.index("raw")] = folder
    except ValueError:
        return None
    return "/".join(segments)


def _put_csv(bucket, key, frame):
    """Serialise *frame* to CSV (without the index) and upload it to S3."""
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=frame.to_csv(index=False).encode("utf-8"),
    )


def lambda_handler(event, context):
    """Split an uploaded flights CSV into anomalous and clean records.

    Reads the object named by the first record of an S3 notification event,
    writes rows whose ``delay`` exceeds 500 to the ``anomalies`` folder and
    all remaining rows to the ``processed`` folder. The source object is
    never modified.

    Args:
        event: S3 notification payload; only ``event['Records'][0]`` is used.
        context: Lambda context object (unused).

    Returns:
        A dict with a ``status`` key:
        ``"processed"`` plus ``clean_records`` / ``anomaly_records`` counts,
        ``"error"`` when the CSV has no ``delay`` column, or
        ``"skipped"`` when the object key has no ``raw`` folder segment.

    Raises:
        KeyError: if the event does not have the S3 notification shape.
    """
    s3_info = event['Records'][0]['s3']
    bucket = s3_info['bucket']['name']
    # Object keys arrive URL-encoded in S3 notifications (e.g. spaces as '+').
    key = unquote_plus(s3_info['object']['key'])

    anomaly_key = _swap_raw_folder(key, "anomalies")
    processed_key = _swap_raw_folder(key, "processed")
    if anomaly_key is None or processed_key is None:
        # Without a 'raw' segment the output key would equal the input key,
        # overwriting the source object and potentially re-triggering this
        # function, so bail out before touching S3.
        return {
            "status": "skipped",
            "message": f"Key '{key}' has no 'raw' folder segment; nothing to process.",
        }

    # Read the CSV file from S3
    response = s3.get_object(Bucket=bucket, Key=key)
    df = pd.read_csv(response['Body'], encoding='utf-8')

    # Ensure 'delay' column exists
    if 'delay' not in df.columns:
        return {"status": "error", "message": "Column 'delay' not found in dataset."}

    # Detect anomalies (flights delayed over 500 minutes). The clean set is
    # the complement of the anomaly mask so that rows with a missing delay
    # are kept instead of silently vanishing from both outputs.
    is_anomaly = df['delay'] > 500
    anomalies = df[is_anomaly]
    clean_data = df[~is_anomaly]

    # Write anomalies to the 'anomalies' folder
    if not anomalies.empty:
        _put_csv(bucket, anomaly_key, anomalies)

    # Write clean data to 'processed' instead of overwriting 'raw'
    if not clean_data.empty:
        _put_csv(bucket, processed_key, clean_data)

    return {
        "status": "processed",
        "clean_records": len(clean_data),
        "anomaly_records": len(anomalies),
    }


# **Step Functions state machine definition**

In [None]:
{
  "Comment": "Starts the AirlineRawDataCrawler Glue crawler, polls every 60 seconds until it reports READY, then runs AirlineTransformJob. A failure in any task publishes an SNS alert and fails the execution.",
  "StartAt": "RunCrawler",
  "States": {
    "RunCrawler": {
      "Comment": "Kick off the Glue crawler; the call returns immediately, so completion is detected by the polling loop below.",
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
      "Parameters": {
        "Name": "AirlineRawDataCrawler"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "SendSNSAlert"
        }
      ],
      "Next": "WaitForCrawler"
    },
    "WaitForCrawler": {
      "Comment": "Pause between crawler status checks.",
      "Type": "Wait",
      "Seconds": 60,
      "Next": "CheckCrawlerStatus"
    },
    "CheckCrawlerStatus": {
      "Comment": "Fetch the crawler description so its State can be inspected.",
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:glue:getCrawler",
      "Parameters": {
        "Name": "AirlineRawDataCrawler"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "SendSNSAlert"
        }
      ],
      "Next": "CrawlerStatusChoice"
    },
    "CrawlerStatusChoice": {
      "Comment": "Proceed to the Glue job once the crawler is READY; otherwise keep polling.",
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Crawler.State",
          "StringEquals": "READY",
          "Next": "RunGlueJob"
        }
      ],
      "Default": "WaitForCrawler"
    },
    "RunGlueJob": {
      "Comment": "Run the transform job and wait for it to finish (.sync integration).",
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "AirlineTransformJob"
      },
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "SendSNSAlert"
        }
      ],
      "Next": "Success"
    },
    "SendSNSAlert": {
      "Comment": "Notify the failure topic, then end the execution as failed.",
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:891612582976:AirlineDataFailureTopic",
        "Message": "Airline data pipeline failed. Check logs.",
        "Subject": "Airline Pipeline Failure"
      },
      "Next": "Fail"
    },
    "Success": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "Error": "AirlinePipelineFailed",
      "Cause": "A pipeline task failed; an alert was published to AirlineDataFailureTopic."
    }
  }
}


# **EventBridge rule (event pattern)**

In [None]:
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {"name": ["airline-data-bucket-cc"]},
    "object": {"key": [{"prefix": "raw/"}]}
  }
}