In [None]:
# Import necessary libraries
import json  # For handling JSON data
import os  # For reading configuration (e.g. the API key) from environment variables
import uuid  # For generating unique identifiers
from datetime import datetime  # For handling datetime objects

import boto3  # AWS SDK for Python to interact with AWS services
import pandas as pd  # For data manipulation and processing
import requests  # For making HTTP requests to external APIs

# Constants - These values should be configured as per your environment
S3_BUCKET = 'S3_BUCKET_NAME'  # Name of the S3 bucket where data will be stored
S3_KEY = 'offset.json'  # S3 object key for the offset file (tracking the last processed record)
KINESIS_STREAM_NAME = 'DATA_STREAM_NAME'  # The name of the Kinesis stream for data ingestion
# API access key for authenticating API requests. It is read from the
# AVIATIONSTACK_API_KEY environment variable so the real secret does not have to
# be written into the source; the placeholder is used when the variable is unset.
API_ACCESS_KEY = os.environ.get('AVIATIONSTACK_API_KEY', 'YOUR_API_KEY')
AIRLINE_NAME = 'American Airlines'  # The airline for which the data is being tracked
BATCH_SIZE = 100  # The number of records to fetch per API call

# Initialize AWS clients once at module level; the functions below share them.
# S3 client: used to read and write the offset file (S3_BUCKET / S3_KEY).
s3 = boto3.client('s3')

# Kinesis client: used to put flight records onto KINESIS_STREAM_NAME.
kinesis = boto3.client('kinesis')

# The functions defined below this point use the constants and clients declared above.


In [None]:
def get_partition_key():
    """
    Produce a fresh random partition key.

    Every call creates a new version-4 UUID, so successive keys are
    independent of one another.

    Returns:
        str: The UUID in its canonical hyphenated text form.
    """
    fresh_id = uuid.uuid4()
    return str(fresh_id)


In [None]:
def _new_offset_state():
    """Build the default offset state: today's date, offset 0, total not yet known."""
    return {
        'current_date': datetime.now().strftime('%Y-%m-%d'),
        'offset': 0,
        'total_records': None
    }


def get_offset_data():
    """
    Retrieve the current offset data from S3.

    Reads the JSON offset file (S3_BUCKET / S3_KEY) that records how far
    processing got. The stored state is only reused when its 'current_date'
    equals today's date; otherwise processing starts again from offset 0.

    Returns:
        dict: Always contains 'current_date' (str, 'YYYY-MM-DD'), 'offset'
              (non-negative int) and 'total_records' (value from the file, or
              None). A fresh state is returned when the file does not exist,
              cannot be read or parsed, is not a JSON object, or belongs to a
              different date.
    """
    try:
        # Fetch and decode the offset JSON file from S3
        obj = s3.get_object(Bucket=S3_BUCKET, Key=S3_KEY)
        data = json.loads(obj['Body'].read().decode())

    except s3.exceptions.NoSuchKey:
        # First run (or file deleted): nothing to resume from
        return _new_offset_state()

    except Exception as e:
        # Best-effort: any other read/parse failure is logged and treated as "no state"
        print(f"Error reading offset from S3: {e}")
        return _new_offset_state()

    fresh_state = _new_offset_state()

    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a list or a number): unusable as state
        print(f"Error reading offset from S3: unexpected content {data!r}")
        return fresh_state

    # If the date has changed, reset the offset and total_records
    if data.get('current_date') != fresh_state['current_date']:
        return fresh_state

    # Callers index 'offset' and 'total_records' directly and do arithmetic on the
    # offset, so make sure both keys exist and the offset is a usable integer.
    stored_offset = data.get('offset')
    if not isinstance(stored_offset, int) or stored_offset < 0:
        data['offset'] = 0
    data.setdefault('total_records', None)

    return data


In [None]:
def update_offset_data(offset, total_records):
    """
    Persist the processing state to the offset file in S3.

    The object at S3_BUCKET / S3_KEY is created or overwritten with a JSON
    document holding today's date, the given offset and the given total.
    Failures are printed and otherwise ignored, so this function never raises
    on a failed write.

    Args:
        offset (int): Offset to store as the resume point.
        total_records (int or None): Total record count to store alongside it.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    state = dict(current_date=today, offset=offset, total_records=total_records)

    try:
        # Serialise inside the try so that a non-serialisable value is reported
        # the same way as a failed upload
        s3.put_object(Bucket=S3_BUCKET, Key=S3_KEY, Body=json.dumps(state))
    except Exception as err:
        print(f"Error writing offset to S3: {err}")


In [None]:
def fetch_flight_data(offset, timeout=30):
    """
    Fetch a batch of flight data from the AviationStack API.

    Sends a GET request to the flights endpoint with the API key, airline
    name, batch size and offset as query parameters, and returns the batch as
    a flattened pandas DataFrame together with the response's pagination info.

    Args:
        offset (int): The offset value to start fetching records from (used for pagination).
        timeout (float or tuple): Timeout in seconds passed to requests.get, so a
            stalled connection raises instead of blocking forever. Defaults to 30.

    Returns:
        tuple:
            pd.DataFrame: The batch, with nested objects flattened into
                '<parent>_<child>' columns; empty when the response has no
                'data' or an empty 'data' list.
            dict or None: The response's 'pagination' entry, or None if absent.

    Raises:
        RuntimeError: If the response status code is not 200.
        requests.exceptions.RequestException: On connection errors or timeouts.
    """
    # Let requests build and URL-encode the query string (the airline name contains a space)
    query = {
        'access_key': API_ACCESS_KEY,
        'airline_name': AIRLINE_NAME,
        'limit': BATCH_SIZE,
        'offset': offset,
    }

    response = requests.get(
        "https://api.aviationstack.com/v1/flights",
        params=query,
        timeout=timeout,
    )

    # Anything other than 200 (OK) is treated as a failed fetch
    if response.status_code != 200:
        raise RuntimeError(
            f"API request failed with status {response.status_code}: {response.text}"
        )

    response_data = response.json()
    pagination = response_data.get('pagination', None)
    flights = response_data.get('data')

    # Missing or empty 'data': nothing to normalise
    if not flights:
        return pd.DataFrame(), pagination

    # Flatten nested JSON objects into '<parent>_<child>' columns
    return pd.json_normalize(flights, sep='_'), pagination


In [None]:
def validate_schema(df):
    """
    Validate the schema and data integrity of the flight DataFrame.

    Checks that:
    - Every expected field is present.
    - 'flight_iata' holds only strings.
    - 'departure' and 'arrival' are not null in any row.

    A field counts as present when a column with that exact name exists. For
    'departure' and 'arrival', flattened sub-columns ('departure_*',
    'arrival_*') also count, because fetch_flight_data flattens nested objects
    with pd.json_normalize(sep='_') and a nested object therefore never
    appears under its bare name. Such a field is null in a row only when all
    of its columns are null in that row.

    Args:
        df (pd.DataFrame): The DataFrame containing flight data.

    Raises:
        ValueError: If an expected field is missing, 'flight_iata' contains a
                    non-string value, or 'departure'/'arrival' is null in a row.
    """
    # Fields expected in the incoming data, in the order they are reported
    expected_columns = ['flight_iata', 'departure', 'arrival', 'status', 'airline_name']

    # Fields that may arrive as a nested object and hence as '<field>_*' columns
    nested_fields = ('departure', 'arrival')

    # NOTE(review): the AviationStack documentation names this field
    # 'flight_status'; it is accepted as an alias of 'status' - confirm against
    # a live payload.
    aliases = {'status': ['flight_status']}

    # Map every expected field to the DataFrame columns that carry it
    columns_by_field = {}
    for field in expected_columns:
        accepted_names = [field, *aliases.get(field, [])]
        matches = [col for col in df.columns if col in accepted_names]
        if field in nested_fields:
            prefix = f"{field}_"
            matches += [col for col in df.columns if str(col).startswith(prefix)]
        columns_by_field[field] = matches

    # Check for missing fields
    missing_columns = [field for field in expected_columns if not columns_by_field[field]]
    if missing_columns:
        raise ValueError(f"Missing expected columns: {', '.join(missing_columns)}")

    # Validate that 'flight_iata' contains only strings
    if not df['flight_iata'].map(lambda value: isinstance(value, str)).all():
        raise ValueError("Column 'flight_iata' must contain only strings.")

    # A row lacks departure/arrival when none of the field's columns has a value
    for field in nested_fields:
        if df[columns_by_field[field]].isnull().all(axis=1).any():
            raise ValueError(f"Column '{field}' contains null values, which are not allowed.")

    # If all checks pass, log success
    print("Schema validation passed.")


In [None]:
def main():
    """
    Extract, validate, and stream flight data, one API batch per loop iteration.

    Each iteration:
    - Fetches a batch with fetch_flight_data() starting at the current offset
      (the initial offset and total come from get_offset_data()).
    - Validates the batch with validate_schema().
    - Sends every record to the Kinesis stream as a newline-terminated JSON
      document, using a fresh partition key per record.
    - Advances the offset by BATCH_SIZE and persists it with update_offset_data().

    The loop stops when:
    - the fetch raises,
    - the batch is empty or carries no pagination info,
    - schema validation fails,
    - the total record count is unknown and the API response does not supply it,
    - fewer than BATCH_SIZE records remain (that trailing partial batch has
      already been fetched but is not sent), or
    - the offset reaches the total record count.

    A record that fails to send is logged and skipped; the offset still
    advances past it.
    """

    # Load current offset and total record metadata from S3
    offset_data = get_offset_data()
    current_offset = offset_data['offset']
    total_records = offset_data['total_records']

    while True:
        print(f"Fetching data with offset {current_offset}...")

        try:
            # Fetch batch of flight data from API
            df, pagination = fetch_flight_data(current_offset)
        except Exception as e:
            # Break loop if API fetch fails
            print(f"Failed to fetch data: {e}")
            break

        # Stop processing if no more data or pagination info is missing
        if df.empty or not pagination:
            print("No more data to process or pagination info missing.")
            break

        # Validate schema of incoming data
        try:
            validate_schema(df)
        except ValueError as e:
            print(f"Schema validation failed: {e}")
            break

        # Set total_records only once, based on the first pagination response
        if total_records is None:
            total_records = pagination.get('total', None)
            if total_records is None:
                print("Total records info missing from API response.")
                break

        # Only full batches are streamed: stop when fewer than BATCH_SIZE remain
        remaining = total_records - current_offset
        if remaining < BATCH_SIZE:
            print(f"Not enough data available to fetch next batch. Remaining: {remaining}")
            break

        # Convert DataFrame to list of per-row dicts for streaming
        records = df.to_dict(orient='records')

        # Send each record to Kinesis stream
        for record in records:
            try:
                # Missing values come out of the DataFrame as float NaN, which
                # json.dumps would write as the non-standard token NaN; map them
                # to None so the payload is valid JSON (null).
                payload = {
                    key: None if isinstance(value, float) and pd.isna(value) else value
                    for key, value in record.items()
                }
                kinesis.put_record(
                    StreamName=KINESIS_STREAM_NAME,
                    Data=json.dumps(payload) + '\n',
                    PartitionKey=get_partition_key()
                )
                print(f"Successfully sent record with flight number: {record.get('flight_iata', 'N/A')}")
            except Exception as e:
                # Best-effort delivery: log the failure and continue with the next record
                print(f"Failed to send record: {record}. Error: {e}")

        # Increment offset and update metadata in S3
        current_offset += BATCH_SIZE
        update_offset_data(current_offset, total_records)

        # Break loop if all records have been processed
        if current_offset >= total_records:
            print("Reached end of available records.")
            break


if __name__ == "__main__":
    main()
