<a href="https://colab.research.google.com/github/marianeberga/Teste_Scrum/blob/main/Teste_Redefinicao.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# ===========================
# Password Reset Service (Colab)
# ===========================
import os
import re
import hmac
import base64
import sqlite3
import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

In [6]:
# -------- Configuration and Security --------
DEV_MODE = True                               # Shows OTP/links on the console and in the "Outbox"
APP_BASE_URL = "https://exemplo-app.local"    # Only used to compose the (simulated) link
RESET_EXPIRES_MIN = 15                        # Validity in minutes (OTP/token)
MAX_OTP_ATTEMPTS = 5                          # Maximum number of OTP attempts
PBKDF2_ITERATIONS = 310000                    # PBKDF2 iteration count
DB_PATH = "auth_demo.db"                      # Local database (persists in Colab until restart)

def now_utc():
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)

def new_salt(n: int = 16) -> bytes:
    """Return ``n`` random bytes from the ``secrets`` module (16 by default)."""
    salt = secrets.token_bytes(n)
    return salt

def pbkdf2_hash(value: str, salt: bytes) -> str:
    """Hash ``value`` with PBKDF2-HMAC-SHA256 and the given ``salt``.

    The iteration count comes from ``PBKDF2_ITERATIONS``. The result has the
    form ``pbkdf2$<iterations>$<b64salt>$<b64hash>``.
    """
    derived = hashlib.pbkdf2_hmac("sha256", value.encode("utf-8"), salt, PBKDF2_ITERATIONS)
    salt_b64 = base64.b64encode(salt).decode()
    hash_b64 = base64.b64encode(derived).decode()
    return "$".join(["pbkdf2", str(PBKDF2_ITERATIONS), salt_b64, hash_b64])

def pbkdf2_verify(value: str, hashed: str) -> bool:
    """Check ``value`` against a ``pbkdf2$<iterations>$<b64salt>$<b64hash>`` string.

    The salt and iteration count are read from ``hashed`` itself. Returns
    ``True`` only when the re-derived key matches the stored one (compared
    with ``hmac.compare_digest``). Any malformed input or other error yields
    ``False`` instead of raising.
    """
    try:
        scheme, rounds, salt_b64, digest_b64 = hashed.split("$")
        if scheme == "pbkdf2":
            salt_bytes = base64.b64decode(salt_b64.encode())
            stored = base64.b64decode(digest_b64.encode())
            candidate = hashlib.pbkdf2_hmac(
                "sha256", value.encode("utf-8"), salt_bytes, int(rounds)
            )
            return hmac.compare_digest(candidate, stored)
        return False
    except Exception:
        # Deliberately broad: an unparsable stored hash counts as "no match".
        return False

def make_token() -> str:
    """Return a new URL-safe random token built from 32 random bytes (~43 chars)."""
    token = secrets.token_urlsafe(nbytes=32)
    return token

def hash_token_for_storage(token: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``token`` (UTF-8 encoded)."""
    digest = hashlib.sha256()
    digest.update(token.encode("utf-8"))
    return digest.hexdigest()

def make_otp() -> str:
    """Return a random 6-digit code as a string, left-padded with zeros."""
    number = secrets.randbelow(1_000_000)
    return str(number).zfill(6)

def check_password_strength(password: str) -> Optional[str]:
    """Validate ``password`` against the demo strength rules.

    Rules, checked in this order: at least 8 characters, one ASCII lowercase
    letter, one ASCII uppercase letter, one digit, and one special character.

    Returns:
        ``None`` when every rule passes, otherwise the (Portuguese) message
        for the first rule that failed.
    """
    if len(password) < 8:
        return "A senha deve ter pelo menos 8 caracteres."

    rules = (
        (r"[a-z]", "A senha deve conter pelo menos 1 letra minúscula."),
        (r"[A-Z]", "A senha deve conter pelo menos 1 letra maiúscula."),
        (r"\d", "A senha deve conter pelo menos 1 dígito."),
        # "_" is part of \w, so [^\w\s] alone would never accept it as a
        # special character; allow it explicitly.
        (r"[^\w\s]|_", "A senha deve conter pelo menos 1 caractere especial."),
    )
    for pattern, message in rules:
        if not re.search(pattern, password):
            return message
    return None

def normalize_phone(phone: str) -> str:
    """Return only the digits of ``phone``; a falsy value gives an empty string."""
    digits = re.findall(r"\d", phone or "")
    return "".join(digits)

# -------- Banco de dados (SQLite) --------
def get_conn() -> sqlite3.Connection:
    """Open a new connection to ``DB_PATH`` whose rows are ``sqlite3.Row`` objects.

    Each call opens a fresh connection; closing it is up to the caller.
    """
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row lets callers read columns by name, e.g. row["id"].
    connection.row_factory = sqlite3.Row
    return connection

def init_db():
    """Create the ``users`` and ``password_resets`` tables if they are missing.

    The connection is closed explicitly: a sqlite3 connection used as a
    context manager only commits or rolls back, it does not close itself.
    """
    conn = get_conn()
    try:
        with conn:  # commit on success, roll back if a statement fails
            conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT UNIQUE,
                phone TEXT UNIQUE,
                password_hash TEXT NOT NULL,
                created_at TEXT NOT NULL
            )""")
            conn.execute("""
            CREATE TABLE IF NOT EXISTS password_resets (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                token_hash TEXT,
                otp_hash TEXT,
                expires_at TEXT NOT NULL,
                attempts INTEGER NOT NULL DEFAULT 0,
                max_attempts INTEGER NOT NULL DEFAULT 5,
                used INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL,
                FOREIGN KEY(user_id) REFERENCES users(id)
            )""")
    finally:
        conn.close()

def clear_all():
    """Delete every row from ``password_resets`` and then from ``users``.

    Resets are removed first because they reference users. The connection is
    closed explicitly, since ``with conn`` only commits or rolls back.
    """
    conn = get_conn()
    try:
        with conn:  # both deletes are committed together, or rolled back
            conn.execute("DELETE FROM password_resets")
            conn.execute("DELETE FROM users")
    finally:
        conn.close()

def create_user(email: str, phone: str, password: str):
    """Insert a new user with a PBKDF2-hashed password.

    The e-mail is stored stripped and lower-cased, which matches how
    ``find_user_by_identifier`` normalizes its input before looking it up.
    The phone is stored as digits only.

    Raises:
        ValueError: if ``password`` fails ``check_password_strength``; the
            message is the one returned by that check.
    """
    err = check_password_strength(password)
    if err:
        raise ValueError(err)
    pwd_hash = pbkdf2_hash(password, new_salt())
    row = (
        email.strip().lower(),
        normalize_phone(phone),
        pwd_hash,
        now_utc().isoformat(),
    )
    conn = get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "INSERT INTO users (email, phone, password_hash, created_at) VALUES (?, ?, ?, ?)",
                row,
            )
    finally:
        # `with conn` does not close the connection, so do it here.
        conn.close()

def find_user_by_identifier(identifier: str) -> Optional[sqlite3.Row]:
    """Look a user up by e-mail first and, failing that, by phone number.

    ``identifier`` is stripped and lower-cased for the e-mail comparison and
    reduced to its digits for the phone comparison. The phone lookup is
    skipped when the identifier contains no digits.

    Returns:
        The matching ``users`` row, or ``None`` when nothing matches.
    """
    email = identifier.strip().lower()
    phone = normalize_phone(identifier)
    conn = get_conn()
    try:
        user = conn.execute(
            "SELECT * FROM users WHERE lower(email)=?", (email,)
        ).fetchone()
        if user is None and phone:
            user = conn.execute(
                "SELECT * FROM users WHERE phone=?", (phone,)
            ).fetchone()
        return user
    finally:
        # The original `with get_conn()` never closed the connection.
        conn.close()

def insert_password_reset(user_id: int, otp: str, token: str):
    """Store a new pending reset for ``user_id``.

    Only hashes are persisted: the OTP is hashed with PBKDF2 and a fresh
    salt, the token with SHA-256. The row expires ``RESET_EXPIRES_MIN``
    minutes after creation and allows ``MAX_OTP_ATTEMPTS`` OTP attempts.
    """
    otp_hash = pbkdf2_hash(otp, new_salt())
    token_hash = hash_token_for_storage(token)
    # Read the clock once so created_at and expires_at share the same base.
    created = now_utc()
    expires_at = (created + timedelta(minutes=RESET_EXPIRES_MIN)).isoformat()
    conn = get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("""
            INSERT INTO password_resets (user_id, token_hash, otp_hash, expires_at, attempts, max_attempts, used, created_at)
            VALUES (?, ?, ?, ?, 0, ?, 0, ?)
            """, (user_id, token_hash, otp_hash, expires_at, MAX_OTP_ATTEMPTS, created.isoformat()))
    finally:
        # `with conn` does not close the connection, so do it here.
        conn.close()

def find_valid_reset_for_user(user_id: int) -> Optional[sqlite3.Row]:
    """Return the newest unused, unexpired reset row for ``user_id``, or ``None``.

    Expiry is decided by comparing the stored ISO-8601 ``expires_at`` text
    with the current UTC time rendered the same way.
    """
    conn = get_conn()
    try:
        cursor = conn.execute("""
        SELECT * FROM password_resets
        WHERE user_id=? AND used=0 AND expires_at > ?
        ORDER BY id DESC LIMIT 1
        """, (user_id, now_utc().isoformat()))
        return cursor.fetchone()
    finally:
        # The original `with get_conn()` never closed the connection.
        conn.close()

def find_valid_reset_by_identifier(identifier: str) -> Optional[sqlite3.Row]:
    """Resolve ``identifier`` to a user and return that user's current valid reset.

    Returns ``None`` when no user matches; otherwise whatever
    ``find_valid_reset_for_user`` returns for the user's id.
    """
    account = find_user_by_identifier(identifier)
    return find_valid_reset_for_user(account["id"]) if account else None

def find_reset_by_token(token: str) -> Optional[sqlite3.Row]:
    """Return the newest unused, unexpired reset row matching ``token``, or ``None``.

    The lookup uses the SHA-256 hash of ``token``, which is what the table
    stores in ``token_hash``.
    """
    token_hash = hash_token_for_storage(token)
    conn = get_conn()
    try:
        cursor = conn.execute("""
        SELECT * FROM password_resets
        WHERE token_hash=? AND used=0 AND expires_at > ?
        ORDER BY id DESC LIMIT 1
        """, (token_hash, now_utc().isoformat()))
        return cursor.fetchone()
    finally:
        # The original `with get_conn()` never closed the connection.
        conn.close()

def mark_reset_used(reset_id: int):
    """Flag the reset row ``reset_id`` as used so later lookups skip it."""
    conn = get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("UPDATE password_resets SET used=1 WHERE id=?", (reset_id,))
    finally:
        # `with conn` does not close the connection, so do it here.
        conn.close()

def update_attempts(reset_id: int, attempts: int):
    """Set the ``attempts`` counter of reset row ``reset_id`` to ``attempts``.

    The value is written as given; the caller computes the new count.
    """
    conn = get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "UPDATE password_resets SET attempts=? WHERE id=?",
                (attempts, reset_id),
            )
    finally:
        # `with conn` does not close the connection, so do it here.
        conn.close()

def update_user_password(user_id: int, new_password: str):
    """Replace the stored password hash of ``user_id``.

    The new password is hashed with PBKDF2 and a fresh salt.

    Raises:
        ValueError: if ``new_password`` fails ``check_password_strength``;
            nothing is written in that case.
    """
    err = check_password_strength(new_password)
    if err:
        raise ValueError(err)
    new_hash = pbkdf2_hash(new_password, new_salt())
    conn = get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "UPDATE users SET password_hash=? WHERE id=?",
                (new_hash, user_id),
            )
    finally:
        # `with conn` does not close the connection, so do it here.
        conn.close()

# -------- "Outbox" de Desenvolvimento (para visualizar OTP/links) --------
class DevOutbox:
    """In-memory stand-in for e-mail/SMS delivery used during development.

    All state lives in class-level dictionaries, so it is shared by every
    caller of the class.
    """

    emails = {}     # emails[email] = {"subject": ..., "body": ...}
    sms = {}        # sms[phone] = {"body": ...}
    last_otp = {}   # last_otp[identifier] = "123456"
    last_link = {}  # last_link[identifier] = "https://..."

    @classmethod
    def send_email(cls, to_email: str, subject: str, body: str):
        """Record the message under the lower-cased address; echo it when DEV_MODE is on."""
        message = dict(subject=subject, body=body)
        cls.emails[to_email.lower()] = message
        if not DEV_MODE:
            return
        print(f"[DEV][EMAIL] To: {to_email}\nSubject: {subject}\n{body}\n")

    @classmethod
    def send_sms(cls, to_phone: str, body: str):
        """Record the message under the digits-only number; echo it when DEV_MODE is on."""
        digits = normalize_phone(to_phone)
        cls.sms[digits] = dict(body=body)
        if not DEV_MODE:
            return
        print(f"[DEV][SMS] To: +{digits}\n{body}\n")

# -------- Serviço de Recuperação --------
class PasswordResetService:
    """Password recovery flow: request a reset, confirm the OTP, set a new password."""

    @staticmethod
    def request_reset(identifier: str):
        """Start a reset for the account matching ``identifier`` (e-mail or phone).

        An OTP, a token and a link are generated whether or not the account
        exists. Only when a user is found is the reset stored and the message
        "sent" through ``DevOutbox``: by e-mail when the identifier contains
        ``@``, otherwise by SMS.

        Returns:
            A dict with a neutral ``message`` that is the same for known and
            unknown identifiers.
        """
        identifier = identifier.strip()
        user = find_user_by_identifier(identifier)

        # Always generate the artifacts (neutral reply, does not reveal existence).
        otp = make_otp()
        token = make_token()
        link = f"{APP_BASE_URL}/reset-password?token={token}"

        if user:
            insert_password_reset(user["id"], otp, token)

            if "@" in identifier:
                subject = "Redefinição de Senha"
                body = (
                    "Você solicitou a redefinição de senha.\n\n"
                    f"Código OTP: {otp}\n"
                    f"Link (válido por {RESET_EXPIRES_MIN} minutos): {link}\n\n"
                    "Se não foi você, ignore esta mensagem."
                )
                DevOutbox.send_email(identifier, subject, body)
            else:
                phone = normalize_phone(identifier)
                body = f"OTP: {otp}\nLink: {link}\nValidade: {RESET_EXPIRES_MIN} min"
                DevOutbox.send_sms(phone, body)

        # Test shortcut, filled even when no real user matched. It holds the
        # plaintext OTP and link, so it is only kept while DEV_MODE is on.
        if DEV_MODE:
            DevOutbox.last_otp[identifier] = otp
            DevOutbox.last_link[identifier] = link

        # Neutral reply
        return {"message": "Se o identificador existir, enviaremos instruções de redefinição."}

    @staticmethod
    def confirm_otp(identifier: str, otp: str):
        """Validate the OTP and return a new single-use reset token.

        On success the stored token hash of the reset row is replaced by the
        hash of the newly issued token.

        Returns:
            ``{"reset_token": ..., "expires_in_minutes": ...}`` on success, or
            the neutral ``{"message": ...}`` dict when no valid reset exists
            for ``identifier``.

        Raises:
            ValueError: when the attempt limit has been reached, or when the
                OTP does not match (the attempt counter is increased first).
        """
        pr = find_valid_reset_by_identifier(identifier)
        if not pr:
            # Does not reveal whether the account exists.
            return {"message": "Se o identificador existir, enviaremos instruções de redefinição."}

        if pr["attempts"] >= pr["max_attempts"]:
            raise ValueError("Limite de tentativas excedido. Solicite um novo código.")

        # A missing OTP (None) is treated as a wrong code rather than crashing.
        if not pbkdf2_verify((otp or "").strip(), pr["otp_hash"]):
            update_attempts(pr["id"], pr["attempts"] + 1)
            raise ValueError("Código OTP inválido.")

        # Issue a new token and replace the token hash stored on the row.
        new_token = make_token()
        new_token_hash = hash_token_for_storage(new_token)
        conn = get_conn()
        try:
            with conn:  # commit on success, roll back on error
                conn.execute(
                    "UPDATE password_resets SET token_hash=? WHERE id=?",
                    (new_token_hash, pr["id"]),
                )
        finally:
            # `with conn` does not close the connection, so do it here.
            conn.close()

        return {"reset_token": new_token, "expires_in_minutes": RESET_EXPIRES_MIN}

    @staticmethod
    def reset_password(reset_token: str, new_password: str):
        """Set a new password when ``reset_token`` is valid, unexpired and unused.

        The reset row is marked as used only after the password was updated,
        so a rejected (weak) password leaves the token usable.

        Returns:
            A dict with a success ``message``.

        Raises:
            ValueError: when the token is unknown, expired or already used, or
                when ``new_password`` fails the strength check.
        """
        # A missing token (None) is treated as invalid rather than crashing.
        pr = find_reset_by_token((reset_token or "").strip())
        if not pr:
            raise ValueError("Token inválido ou expirado.")

        # The row comes from SELECT *, so it already carries user_id; no
        # second query (and no extra connection) is needed.
        user_id = pr["user_id"]

        update_user_password(user_id, new_password)
        mark_reset_used(pr["id"])
        return {"message": "Senha redefinida com sucesso."}

# Initialize the database when this cell is imported/run
init_db()
print("✅ Serviço de recuperação carregado e banco inicializado.")


✅ Serviço de recuperação carregado e banco inicializado.


In [7]:
# Recreate the sample data (optional)
clear_all()
init_db()

# Sample users: (email, phone, password)
sample_users = (
    ("alice@example.com", "+55 (41) 99999-0000", "Senha@123"),
    ("bob@example.com", "+55 (41) 98888-1111", "BobSeguro@2024!"),
)
try:
    # Stops at the first failure, so later users are not created after an error.
    for user_email, user_phone, user_password in sample_users:
        create_user(user_email, user_phone, user_password)
    print("👤 Usuários criados com sucesso.")
except Exception as e:
    print("Aviso:", e)


👤 Usuários criados com sucesso.


In [8]:
# By e-mail:
PasswordResetService.request_reset("alice@example.com")

# OR by phone (free format; it will be normalized):
# PasswordResetService.request_reset("+55 41 99999-0000")

# Quick look at what the dev outbox recorded for this identifier.
print("\n🔎 Outbox DEV (acesso rápido em teste):")
print("Último OTP:", DevOutbox.last_otp.get("alice@example.com"))
print("Último Link:", DevOutbox.last_link.get("alice@example.com"))


[DEV][EMAIL] To: alice@example.com
Subject: Redefinição de Senha
Você solicitou a redefinição de senha.

Código OTP: 111160
Link (válido por 15 minutos): https://exemplo-app.local/reset-password?token=-LA3STIFgvl5swjY833lPWGhxAkHmQilS_KKaIKYHIw

Se não foi você, ignore esta mensagem.


🔎 Outbox DEV (acesso rápido em teste):
Último OTP: 111160
Último Link: https://exemplo-app.local/reset-password?token=-LA3STIFgvl5swjY833lPWGhxAkHmQilS_KKaIKYHIw


In [9]:
# Paste here the OTP printed by the previous cell:
otp_recebido = DevOutbox.last_otp.get("alice@example.com")  # simulated (in production the user types it)
print("Usando OTP:", otp_recebido)

resp = PasswordResetService.confirm_otp("alice@example.com", otp_recebido)
resp


Usando OTP: 111160


{'reset_token': 'S0wN6KyS1-yw9Hqb32FMwKK1cJ6Pq7J5wwxQM-geBpA',
 'expires_in_minutes': 15}

In [10]:
reset_token = resp.get("reset_token")  # from the previous step
nova_senha = "NovaSenha@2025!"         # example (meets the strength rules)

PasswordResetService.reset_password(reset_token, nova_senha)


{'message': 'Senha redefinida com sucesso.'}

In [11]:
# Request - via phone
PasswordResetService.request_reset("+55 (41) 98888-1111")
otp_bob = DevOutbox.last_otp.get("+55 (41) 98888-1111")
print("OTP Bob:", otp_bob)

# Confirm
resp_bob = PasswordResetService.confirm_otp("+55 (41) 98888-1111", otp_bob)
resp_bob

# Reset
PasswordResetService.reset_password(resp_bob["reset_token"], "BobForte@2025!")


[DEV][SMS] To: +5541988881111
OTP: 945867
Link: https://exemplo-app.local/reset-password?token=IFhaXuBPpM8b8VlqaUrxXSBY_N995GMIa5YtavhUA7A
Validade: 15 min

OTP Bob: 945867


{'message': 'Senha redefinida com sucesso.'}