In [None]:
"""
Demo de Seguran√ßa em IoT - AWS IoT Core
MBA FIAP - Internet das Coisas

VERS√ÉO PARA JUPYTER NOTEBOOK LOCAL
Execute c√©lula por c√©lula no Jupyter
"""

# ============================================================================
# C√âLULA 1: IMPORTS E CONFIGURA√á√ÉO
# ============================================================================

print("="*70)
print("üîê DEMO DE SEGURAN√áA EM IoT - AWS IoT Core")
print("="*70)
print()

# Imports
import json
import time
import ssl
from datetime import datetime
import os

print("‚úÖ Bibliotecas importadas")
print()

In [None]:
# ============================================================================
# C√âLULA 2: CONFIGURAR PATHS E VARI√ÅVEIS
# ============================================================================

# ‚ö†Ô∏è AJUSTE ESTES VALORES com suas configura√ß√µes
AWS_IOT_ENDPOINT = "seu-endpoint-aqui-ats.iot.us-east-1.amazonaws.com"
AWS_REGION = "us-east-1"
THING_NAME = "sensor-01-secure"
CLIENT_ID = "sensor-01"

# Caminho dos certificados no seu Mac
CERTS_DIR = "/Users/dmacedo/Documents/Codes/Projects/sec_iot_fiap/aws_iot_certs"

# Arquivos de certificado
cert_file = os.path.join(CERTS_DIR, "sensor-01-certificate.pem.crt")
key_file = os.path.join(CERTS_DIR, "sensor-01-private.pem.key")
root_ca_file = os.path.join(CERTS_DIR, "AmazonRootCA1.pem")

# Verificar
print("üìã Configura√ß√£o:")
print(f"   Endpoint: {AWS_IOT_ENDPOINT}")
print(f"   Region:   {AWS_REGION}")
print(f"   Thing:    {THING_NAME}")
print(f"   Client:   {CLIENT_ID}")
print()

print("üîê Verificando certificados...")
for f, name in [(cert_file, "Certificado"), (key_file, "Chave privada"), (root_ca_file, "Root CA")]:
    if os.path.exists(f):
        print(f"   ‚úÖ {name} encontrado")
    else:
        print(f"   ‚ùå {name} N√ÉO encontrado: {f}")

print()



In [None]:
# ============================================================================
# C√âLULA 3: INSTALAR PAHO-MQTT
# ============================================================================

print("üì¶ Instalando paho-mqtt...")

try:
    import paho.mqtt.client as mqtt
    print("‚úÖ paho-mqtt j√° instalado")
except ImportError:
    print("‚è≥ Instalando...")
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "paho-mqtt==1.6.1", "--quiet"])
    import paho.mqtt.client as mqtt
    print("‚úÖ paho-mqtt instalado")

print()

In [None]:
# ============================================================================
# C√âLULA 4: CONFIGURAR CALLBACKS MQTT
# ============================================================================

# Vari√°veis de controle
connected_flag = False
publish_success = False

def on_connect(client, userdata, flags, rc):
    """Callback de conex√£o"""
    global connected_flag
    if rc == 0:
        connected_flag = True
        print("\nüîí ‚úÖ CONECTADO AO AWS IoT CORE!")
        print("   üîê mTLS conclu√≠do com sucesso")
        print("   ‚úîÔ∏è Cliente validou servidor AWS")
        print("   ‚úîÔ∏è Servidor validou certificado X.509 do dispositivo\n")
    else:
        print(f"\n‚ùå ERRO na conex√£o. C√≥digo: {rc}")
        errors = {
            1: "Protocolo incorreto",
            2: "Client ID inv√°lido",
            3: "Servidor indispon√≠vel",
            4: "Credenciais inv√°lidas",
            5: "N√£o autorizado (verificar pol√≠ticas)"
        }
        print(f"   {errors.get(rc, 'Erro desconhecido')}\n")

def on_publish(client, userdata, mid):
    """Callback de publica√ß√£o"""
    global publish_success
    publish_success = True
    print(f"   ‚úÖ Mensagem {mid} publicada")

def on_message(client, userdata, message):
    """Callback de recebimento"""
    print(f"\nüì® Mensagem recebida:")
    print(f"   T√≥pico: {message.topic}")
    print(f"   Payload: {message.payload.decode()}\n")

def on_subscribe(client, userdata, mid, granted_qos):
    """Callback de subscri√ß√£o"""
    print(f"   ‚úÖ Subscrito com sucesso")

print("‚úÖ Callbacks configurados")
print()

In [None]:
# ============================================================================
# C√âLULA 5: CRIAR CLIENTE MQTT E CONFIGURAR TLS
# ============================================================================

print("üîß Criando cliente MQTT...")

# Criar cliente
client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv311)

# Anexar callbacks
client.on_connect = on_connect
client.on_publish = on_publish
client.on_message = on_message
client.on_subscribe = on_subscribe

print("‚úÖ Cliente criado")
print()

# ‚≠ê CONFIGURAR TLS/SSL (PONTO CR√çTICO DE SEGURAN√áA)
print("‚≠ê Configurando TLS/SSL com mTLS...")
print()

client.tls_set(
    ca_certs=root_ca_file,           # Valida servidor AWS
    certfile=cert_file,              # Identidade do cliente
    keyfile=key_file,                # Chave privada
    cert_reqs=ssl.CERT_REQUIRED,     # Exige cert do servidor
    tls_version=ssl.PROTOCOL_TLSv1_2 # TLS 1.2+
)

print("‚úÖ TLS configurado:")
print("   üîê TLS 1.2+")
print("   üîê Autentica√ß√£o M√∫tua (mTLS)")
print("   üîê Certificado X.509")
print("   üîê Valida√ß√£o obrigat√≥ria do servidor")
print()

In [None]:
# ============================================================================
# C√âLULA 6: CONECTAR AO AWS IoT CORE
# ============================================================================

print("üîå Conectando ao AWS IoT Core...")
print(f"   Endpoint: {AWS_IOT_ENDPOINT}")
print(f"   Porta: 8883 (MQTT/TLS)")
print()

# Resetar flag
connected_flag = False

# Conectar
client.connect(AWS_IOT_ENDPOINT, 8883, keepalive=60)
client.loop_start()

# Aguardar conex√£o
timeout = 15
start_time = time.time()
while not connected_flag and (time.time() - start_time) < timeout:
    time.sleep(0.5)

if not connected_flag:
    print("‚ùå Timeout na conex√£o")
    print("   Verifique: endpoint, certificados, pol√≠ticas")
else:
    print("‚úÖ Conex√£o estabelecida com sucesso!")

print()

In [None]:
# ============================================================================
# C√âLULA 7: TESTE 1 - T√≥pico PERMITIDO
# ============================================================================

print("="*70)
print("üß™ TESTE 1: Publica√ß√£o em T√≥pico PERMITIDO")
print("="*70)
print()

allowed_topic = "iot/security/demo/sensor01/temperature"
print(f"T√≥pico: {allowed_topic}")
print(f"Pol√≠tica IoT: iot/security/demo/* ‚úÖ MATCH")
print()

# Payload
payload_1 = {
    "device_id": CLIENT_ID,
    "timestamp": datetime.now().isoformat(),
    "temperature": 23.5,
    "humidity": 65.2,
    "test": "ALLOWED_TOPIC"
}

print("üì§ Publicando...")
print(f"   Payload: {json.dumps(payload_1, indent=2)}")
print()

# Publicar
publish_success = False
client.publish(allowed_topic, json.dumps(payload_1), qos=1)
time.sleep(2)

if publish_success:
    print("‚úÖ RESULTADO: AUTORIZADO")
    print()
    print("   üîç An√°lise de Seguran√ßa:")
    print("   ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ")
    print("   ‚úîÔ∏è Pol√≠tica IoT permitiu")
    print("   ‚úîÔ∏è T√≥pico corresponde ao padr√£o")
    print("   ‚úîÔ∏è Dados criptografados via TLS 1.2+")
    print("   ‚úîÔ∏è Integridade garantida")
else:
    print("‚ö†Ô∏è N√£o confirmado (aguarde ou verifique pol√≠ticas)")

print()

In [None]:
# ============================================================================
# C√âLULA 8: TESTE 2 - T√≥pico NEGADO ‚≠ê MOMENTO-CHAVE!
# ============================================================================

print("="*70)
print("üß™ TESTE 2: T√≥pico N√ÉO PERMITIDO ‚≠ê MOMENTO-CHAVE DA DEMO")
print("="*70)
print()

denied_topic = "iot/production/data"
print(f"T√≥pico: {denied_topic}")
print(f"Pol√≠tica IoT: iot/security/demo/*")
print(f"Match: ‚ùå N√ÉO CORRESPONDE")
print()
print("üéØ Objetivo: Demonstrar Princ√≠pio do Menor Privil√©gio")
print()

# Payload
payload_2 = {
    "device_id": CLIENT_ID,
    "timestamp": datetime.now().isoformat(),
    "data": "Tentativa n√£o autorizada",
    "test": "DENIED_TOPIC"
}

print("üì§ Tentando publicar em t√≥pico fora do escopo...")
print(f"   Payload: {json.dumps(payload_2, indent=2)}")
print()

# Tentar publicar
publish_success = False
client.publish(denied_topic, json.dumps(payload_2), qos=1)
time.sleep(3)

if publish_success:
    print("‚ö†Ô∏è ALERTA: Foi autorizado!")
    print("   Verificar pol√≠tica - pode estar muito permissiva")
else:
    print("‚úÖ RESULTADO: NEGADO (como esperado!)")
    print()
    print("   üîç An√°lise de Seguran√ßa:")
    print("   ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ")
    print("   ‚úîÔ∏è Pol√≠tica IoT BLOQUEOU a opera√ß√£o")
    print("   ‚úîÔ∏è Princ√≠pio do Menor Privil√©gio aplicado")
    print("   ‚úîÔ∏è Dispositivo limitado ao escopo definido")
    print("   ‚úîÔ∏è T√≥pico fora do padr√£o = NEGADO")
    print()
    print("   üõ°Ô∏è Seguran√ßa Demonstrada:")
    print("   ‚Ä¢ Controle de acesso granular por t√≥pico")
    print("   ‚Ä¢ Preven√ß√£o de acesso n√£o autorizado")
    print("   ‚Ä¢ Isolamento entre ambientes (demo vs production)")
    print("   ‚Ä¢ Se dispositivo for comprometido, dano √© LIMITADO")

print()

In [None]:
# ============================================================================
# C√âLULA 9: TESTE 3 - Subscribe e Receive
# ============================================================================

print("="*70)
print("üß™ TESTE 3: Subscri√ß√£o e Comunica√ß√£o Bidirecional")
print("="*70)
print()

subscribe_topic = "iot/security/demo/sensor01/commands"
print(f"T√≥pico: {subscribe_topic}")
print()

# Subscrever
print("üì• Subscrevendo...")
client.subscribe(subscribe_topic, qos=1)
time.sleep(2)

# Publicar para si mesmo
print("üì§ Publicando mensagem de teste...")
test_command = {
    "command": "STATUS_CHECK",
    "timestamp": datetime.now().isoformat(),
    "from": "control_center"
}
client.publish(subscribe_topic, json.dumps(test_command), qos=1)

print("‚è≥ Aguardando recebimento...")
time.sleep(3)

print()
print("‚úÖ Subscribe/Receive testados")
print("   ‚úîÔ∏è Comunica√ß√£o bidirecional funcionando")
print("   ‚úîÔ∏è Permiss√µes de Subscribe e Receive verificadas")
print()

In [None]:
# ============================================================================
# C√âLULA 10: RESUMO DOS CONCEITOS
# ============================================================================

print("="*70)
print("üìö CONCEITOS DE SEGURAN√áA DEMONSTRADOS")
print("="*70)
print()

concepts = {
    "1. mTLS": "Autentica√ß√£o m√∫tua - cliente e servidor se validam",
    "2. X.509": "Certificado digital √∫nico por dispositivo",
    "3. Pol√≠ticas IoT": "Controle granular de acesso (Teste 2 comprovou!)",
    "4. TLS 1.2+": "Criptografia forte em tr√¢nsito",
    "5. Menor Privil√©gio": "Permiss√µes m√≠nimas - limita danos",
    "6. Defense in Depth": "M√∫ltiplas camadas de prote√ß√£o"
}

for conceito, descricao in concepts.items():
    print(f"   ‚úîÔ∏è {conceito}: {descricao}")

print()
print("‚≠ê Destaque: TESTE 2 demonstrou que mesmo dispositivo")
print("   AUTENTICADO √© BLOQUEADO ao tentar acessar recursos")
print("   fora do seu escopo. Isso √© SEGURAN√áA EM A√á√ÉO!")
print()

In [None]:
# ============================================================================
# C√âLULA 11: DESCONECTAR E FINALIZAR
# ============================================================================

print("üßπ Desconectando...")
client.loop_stop()
client.disconnect()
time.sleep(1)

print()
print("="*70)
print("‚úÖ DEMONSTRA√á√ÉO CONCLU√çDA COM SUCESSO!")
print("="*70)
print()
print("Conceitos Demonstrados:")
print("  ‚úÖ Autentica√ß√£o m√∫tua (mTLS)")
print("  ‚úÖ Certificados X.509")
print("  ‚úÖ Pol√≠ticas IoT granulares")
print("  ‚úÖ Criptografia em tr√¢nsito (TLS 1.2+)")
print("  ‚úÖ Princ√≠pio do Menor Privil√©gio ‚≠ê‚≠ê‚≠ê")
print("  ‚úÖ Defesa em Profundidade")
print()
print("üéì Pronto para apresentar no MBA FIAP!")
print()