In [None]:
import os
import sys
import json
import logging
import boto3
import requests
from datetime import datetime
from dotenv import load_dotenv
from lambda_flights_raw.utils.models import StateVector

# Load settings from a .env file (python-dotenv) before any AWS client is
# created, so values defined there are already in the environment.
load_dotenv()

# AWS service clients, created once at module level.
# kinesis_client is used by send_states_to_kinesis; secrets_client is not
# referenced by the cells visible in this notebook.
kinesis_client = boto3.client("kinesis")
secrets_client = boto3.client("secretsmanager")

# OpenSky endpoints: the state-vector REST API and the OAuth2 token issuer.
OPENSKY_STATES_URL = "https://opensky-network.org/api/states/all"
OPENSKY_TOKEN_URL = (
    "https://auth.opensky-network.org"
    "/auth/realms/opensky-network/protocol/openid-connect/token"
)



In [None]:
# Root logger configuration: INFO level, records written to stdout.

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Attach the stdout handler only when the root logger has none yet, so
# re-running this cell does not stack handlers and duplicate every log line.
if len(logger.handlers) == 0:
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    logger.addHandler(handler)



2026-02-15 12:47:11,168 - INFO - root - teste de log INFO


In [17]:
# opensky functions 

def _get_opensky_credentials_from_secret():
    """
    Return the OpenSky OAuth2 client credentials as ``(client_id, client_secret)``.

    The values are read from the ``OPENSKY_CLIENT_ID`` and
    ``OPENSKY_CLIENT_SECRET`` environment variables. Despite the function
    name, AWS Secrets Manager is not queried here.

    Returns:
        tuple: ``(client_id, client_secret)`` when both variables are set and
        non-empty; otherwise ``(None, None)``, after logging an error.
    """
    client_id = os.environ.get("OPENSKY_CLIENT_ID")
    client_secret = os.environ.get("OPENSKY_CLIENT_SECRET")

    # An empty string counts as missing, same as an unset variable.
    if not client_id or not client_secret:
        logger.error(
            "OpenSky credentials missing: set the OPENSKY_CLIENT_ID and "
            "OPENSKY_CLIENT_SECRET environment variables"
        )
        return None, None

    return client_id, client_secret

def get_opensky_access_token():
    """
    Authenticate against OpenSky with the OAuth2 client-credentials grant.

    Returns:
        The bearer access token string, or None when the credentials are
        unavailable, the HTTP request fails, or the response carries no
        ``access_token`` value.
    """
    client_id, client_secret = _get_opensky_credentials_from_secret()
    if not (client_id and client_secret):
        return None

    try:
        token_response = requests.post(
            OPENSKY_TOKEN_URL,
            data={
                "grant_type": "client_credentials",
                "client_id": client_id,
                "client_secret": client_secret,
            },
            timeout=10,
        )
        token_response.raise_for_status()
        token = token_response.json().get("access_token")
    except requests.RequestException as exc:
        logger.error(f"Error obtaining OpenSky access token: {exc}")
        return None

    if token:
        logger.info("Successfully obtained OpenSky access token")
        return token

    logger.error("No access_token in OpenSky auth response")
    return None

def convert_states_response_to_json(
    states: list[StateVector], origin_country: str = "Brazil"
) -> dict:
    """
    Build the JSON-serialisable payload for the states of one origin country.

    Args:
        states: State vectors to filter. Each item must expose an
            ``origin_country`` attribute and a ``to_dict()`` method.
        origin_country: Only states whose ``origin_country`` equals this
            value are kept. Defaults to ``"Brazil"``.

    Returns:
        dict with three keys:
            ``"timestamp"``: ``datetime.now().isoformat()`` taken when the
                function is called (naive local time, no UTC offset).
            ``"total_states"``: number of states kept.
            ``"states"``: list of ``state.to_dict()`` results, in input order.
    """
    timestamp = datetime.now().isoformat()
    selected = [
        state.to_dict()
        for state in states
        if state.origin_country == origin_country
    ]
    return {
        "timestamp": timestamp,
        "total_states": len(selected),
        "states": selected,
    }

def send_states_to_kinesis(json_resultado: dict, batch_size: int = 500) -> bool:
    """
    Send every state in ``json_resultado["states"]`` to Kinesis with PutRecords.

    The stream name is read from the ``KINESIS_STREAM`` environment variable.
    Each state becomes one record: its JSON serialisation is the payload and
    its ``icao24`` value (or ``"unknown"`` when missing or empty) is the
    partition key.

    Args:
        json_resultado: dict holding a ``"states"`` list of dicts.
        batch_size: Number of records per PutRecords request. Values above
            500 are reduced to 500, the per-request record limit of
            PutRecords; values below 1 are rejected.

    Returns:
        True when there was nothing to send or every batch was accepted with
        no failed records. False when the stream is not configured,
        ``batch_size`` is invalid, or any batch raised or reported failed
        records. Failed records are logged, not retried.
    """
    max_records_per_request = 500  # PutRecords per-request record limit

    stream_name = os.environ.get("KINESIS_STREAM")
    if not stream_name:
        logger.error("KINESIS_STREAM environment variable not set")
        return False

    states = json_resultado["states"]
    if not states:
        logger.info("No states to send to Kinesis")
        return True

    # A non-positive step would either raise in range() (0) or produce an
    # empty range (negative) and report success without sending anything.
    if batch_size < 1:
        logger.error(f"Invalid batch_size {batch_size}: it must be at least 1")
        return False
    if batch_size > max_records_per_request:
        logger.warning(
            f"batch_size {batch_size} exceeds the PutRecords limit; "
            f"using {max_records_per_request}"
        )
        batch_size = max_records_per_request

    logger.info(f"Sending {len(states)} states to Kinesis stream '{stream_name}' in batches of {batch_size}...")

    # Build every record up front, then slice the list into batches.
    records = [
        {
            "Data": json.dumps(state),
            "PartitionKey": state.get("icao24") or "unknown",
        }
        for state in states
    ]

    all_ok = True
    for batch_number, start in enumerate(range(0, len(records), batch_size)):
        batch = records[start : start + batch_size]
        try:
            response = kinesis_client.put_records(
                StreamName=stream_name,
                Records=batch,
            )
        except Exception as e:
            # Keep going: one failing batch must not prevent the rest from
            # being attempted.
            all_ok = False
            logger.error(f"Error sending batch {batch_number} to Kinesis: {e}")
            continue

        # PutRecords can succeed as a call while rejecting individual records.
        failed = response.get("FailedRecordCount", 0)
        if failed > 0:
            all_ok = False
            logger.error(
                f"Batch {batch_number} - {failed}/{len(batch)} records failed"
            )
        else:
            logger.info(
                f"Batch {batch_number} - sent {len(batch)} records successfully"
            )

    logger.info(f"Finished sending {len(states)} states to Kinesis (batched)")
    return all_ok


def get_opensky_states(access_token):
    """
    Fetch the current state vectors from the OpenSky ``/api/states/all`` endpoint.

    Args:
        access_token: OAuth2 bearer token sent in the ``Authorization``
            header. A missing or empty token is rejected without making a
            request.

    Returns:
        list[StateVector]: One item per row that parsed successfully. The
        list is empty when no token was given, the HTTP request failed, or
        the response held no states. Rows that fail to parse are skipped
        with a warning.
    """
    # Without this guard a failed authentication (token is None) would be
    # sent as the literal header "Bearer None".
    if not access_token:
        logger.error("No OpenSky access token provided; skipping states request")
        return []

    headers = {"Authorization": f"Bearer {access_token}"}

    try:
        resp = requests.get(OPENSKY_STATES_URL, headers=headers, timeout=15)
        resp.raise_for_status()
        body = resp.json()
    except requests.RequestException as e:
        logger.error(f"Error calling OpenSky states API: {e}")
        return []

    # "states" may be absent or null in the response; treat both as empty.
    raw_states = body.get("states", []) or []
    states: list[StateVector] = []

    for row in raw_states:
        try:
            state = StateVector.from_api_response(row)
            states.append(state)
        except Exception as e:
            # One malformed row must not discard the whole response.
            logger.warning(f"Failed to parse state vector: {e}")

    logger.info(f"Retrieved {len(states)} state vectors from OpenSky API")
    return states



In [18]:
# Run the fetch pipeline: authenticate, download the state vectors, then build
# the JSON payload. access_token is None when authentication fails; the
# results stay in notebook globals so the next cell can send json_resultado.
access_token = get_opensky_access_token()
states = get_opensky_states(access_token)
json_resultado = convert_states_response_to_json(states)



2026-02-15 13:19:59,559 - INFO - root - Successfully obtained OpenSky access token
2026-02-15 13:20:01,736 - INFO - root - Retrieved 9764 state vectors from OpenSky API


In [19]:
# Push the payload's states to Kinesis; ok is False if the stream is not
# configured or any batch failed.
ok = send_states_to_kinesis(json_resultado)

2026-02-15 13:20:15,322 - INFO - root - Sending 89 states to Kinesis stream 'flight-radar-kinesis-stream-flights' in batches of 500...
2026-02-15 13:20:16,462 - INFO - root - Batch 0 - sent 89 records successfully
2026-02-15 13:20:16,463 - INFO - root - Finished sending 89 states to Kinesis (batched)
