In [None]:
# ETL monolítico para rodar no Colab

import pandas as pd
import os
from datetime import datetime

# =========================
# Configurações
# =========================
class Config:
    """Static settings for the ETL run: file locations and recommendation tuning."""

    # --- Recommendation tuning ---
    # Points awarded for each skill shared between a user and a job.
    SCORE_PER_SKILL = 1
    # Jobs whose score is below this value are not recommended.
    MIN_SCORE = 2
    # Upper bound on the number of recommendations kept per user.
    MAX_RECS_PER_USER = 2

    # Icon URL stored alongside every generated news row.
    ICON_URL = "https://digitalinnovationone.github.io/santander-dev-week-2023-api/icons/credit.svg"

    # --- CSV locations (all inside DATA_DIR) ---
    DATA_DIR = "data"
    USERS_CSV = os.path.join(DATA_DIR, "users.csv")
    JOBS_CSV = os.path.join(DATA_DIR, "jobs.csv")
    USER_NEWS_CSV = os.path.join(DATA_DIR, "user_news.csv")

# =========================
# Utils
# =========================
def log(msg):
    """Print ``msg`` to stdout, prefixed with the current local time in brackets."""
    # The format spec after the colon is handed to datetime's strftime.
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{stamp}] {msg}")

def normalize_skills(text):
    """Turn a ``;``-separated skills cell into a list of clean, lowercase names.

    Missing values (as detected by ``pd.isna``) and blank strings yield an
    empty list. Empty fragments between separators are dropped; order and
    duplicates are preserved.
    """
    if pd.isna(text):
        return []
    raw = str(text)
    if raw.strip() == "":
        return []
    fragments = (piece.strip() for piece in raw.split(";"))
    return [fragment.lower() for fragment in fragments if fragment]

def validate_required_columns(df, required, filename):
    """Ensure ``df`` contains every column named in ``required``.

    Args:
        df: Object exposing a ``columns`` iterable (a pandas DataFrame here).
        required: Any iterable of column names (set, list, tuple, ...).
        filename: Label used in the error message to identify the source.

    Raises:
        ValueError: If one or more required columns are absent. The missing
            names are listed in sorted order so the message is deterministic.
    """
    # set(...) lets callers pass a list or tuple, not only a set.
    missing = set(required).difference(df.columns)
    if missing:
        # Rendered like a set literal, but in a stable order.
        listing = ", ".join(repr(col) for col in sorted(missing, key=str))
        raise ValueError(f"{filename} está faltando colunas: {{{listing}}}")

# =========================
# Extract
# =========================
def extract_users(path):
    """Load the users CSV at ``path`` and add a parsed ``skills_norm`` column.

    Raises ``ValueError`` (via ``validate_required_columns``) when any of
    ``user_id``, ``name`` or ``skills`` is missing from the file.
    """
    log(f"Lendo {path}")
    users = pd.read_csv(path)

    expected_columns = {"user_id", "name", "skills"}
    validate_required_columns(users, expected_columns, "users.csv")

    # Each cell becomes a list of lowercase skill names.
    raw_skills = users["skills"]
    users["skills_norm"] = raw_skills.apply(normalize_skills)
    return users

def extract_jobs(path):
    """Load the jobs CSV at ``path`` and add a parsed ``required_skills_norm`` column.

    Raises ``ValueError`` (via ``validate_required_columns``) when any of
    ``job_id``, ``title``, ``company``, ``required_skills``, ``location`` or
    ``level`` is missing from the file.
    """
    log(f"Lendo {path}")
    jobs = pd.read_csv(path)

    expected_columns = {
        "job_id",
        "title",
        "company",
        "required_skills",
        "location",
        "level",
    }
    validate_required_columns(jobs, expected_columns, "jobs.csv")

    # Each cell becomes a list of lowercase skill names.
    raw_skills = jobs["required_skills"]
    jobs["required_skills_norm"] = raw_skills.apply(normalize_skills)
    return jobs

def extract_user_news(path):
    """Load the existing news CSV at ``path``, creating it first if needed.

    When the file is missing or zero bytes long, a header-only CSV with the
    columns ``user_id, news_id, icon, description, created_at`` is written
    before reading, so the caller always receives a DataFrame.

    Args:
        path: Location of the output CSV.

    Returns:
        The DataFrame read back from ``path``.
    """
    log(f"Verificando saída {path}")
    # A zero-byte file has no header row and would make read_csv raise
    # EmptyDataError, so treat it the same as a missing file.
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        parent = os.path.dirname(path)
        if parent:
            # to_csv does not create intermediate directories.
            os.makedirs(parent, exist_ok=True)
        pd.DataFrame(columns=["user_id","news_id","icon","description","created_at"]).to_csv(path, index=False)
    return pd.read_csv(path)

# =========================
# Transform
# =========================
def score_match(user_skills, job_skills):
    """Score how well a user's skills cover a job's required skills.

    Returns a ``(score, matched)`` tuple: ``score`` is the number of shared
    skills times ``Config.SCORE_PER_SKILL`` and ``matched`` is the sorted
    list of those shared skills.
    """
    shared = set(user_skills).intersection(job_skills)
    points = len(shared) * Config.SCORE_PER_SKILL
    return points, sorted(shared)

def build_message(user_name, job, score, matched_skills):
    """Compose the recommendation sentence shown to a user for one job.

    ``job`` is any mapping-like object providing ``title``, ``company``,
    ``location`` and ``level``. When ``matched_skills`` is empty, an em dash
    stands in for the skill list.
    """
    if matched_skills:
        skills_text = ", ".join(matched_skills)
    else:
        skills_text = "—"

    opening = f"{user_name}, esta vaga pode ser ideal para você: {job['title']} em {job['company']} "
    details = f"({job['location']}, nível {job['level']}). Compatibilidade {score} "
    reason = f"pelas habilidades: {skills_text}."
    return opening + details + reason

def recommend_jobs_for_user(user_row, jobs_df):
    """Pick the best-scoring jobs for one user.

    Every row of ``jobs_df`` is scored against ``user_row["skills_norm"]``;
    jobs scoring below ``Config.MIN_SCORE`` are dropped. The survivors are
    ordered by score, highest first (ties keep their ``jobs_df`` order), and
    at most ``Config.MAX_RECS_PER_USER`` are returned.

    Returns:
        A list of dicts with keys ``job`` (the job row), ``score`` and
        ``matched_skills``.
    """
    skills = user_row["skills_norm"]
    candidates = []
    for _, job_row in jobs_df.iterrows():
        points, common = score_match(skills, job_row["required_skills_norm"])
        if points < Config.MIN_SCORE:
            continue
        candidates.append({"job": job_row, "score": points, "matched_skills": common})

    # sorted() is stable, so equal scores stay in their original order.
    ranked = sorted(candidates, key=lambda entry: entry["score"], reverse=True)
    return ranked[:Config.MAX_RECS_PER_USER]

# =========================
# Load
# =========================
def next_news_id(existing_df):
    """Return the next free ``news_id`` for ``existing_df``.

    Args:
        existing_df: DataFrame of already stored news with a ``news_id``
            column.

    Returns:
        ``1`` when the frame is empty or holds no usable numeric id,
        otherwise the largest existing id plus one, as a Python ``int``.
    """
    if existing_df.empty:
        return 1
    # Coerce to numbers so string ids compare numerically ("10" > "9") and
    # unparseable values become NaN instead of breaking max().
    ids = pd.to_numeric(existing_df["news_id"], errors="coerce")
    highest = ids.max()
    # max() skips NaN; it is NaN only when no row has a valid id.
    if pd.isna(highest):
        return 1
    return int(highest) + 1

def news_exists(existing_df, user_id, description):
    """Tell whether ``existing_df`` already holds this exact news for the user.

    A row matches when both its ``user_id`` and its ``description`` equal the
    given values. Returns a plain ``bool``.
    """
    same_user = existing_df["user_id"] == user_id
    same_text = existing_df["description"] == description
    return bool((same_user & same_text).any())

def append_news(existing_df, user_id, description, icon_url=Config.ICON_URL):
    """Return ``existing_df`` with one extra news row, unless it is a duplicate.

    If the same ``user_id``/``description`` pair is already present, the skip
    is logged and ``existing_df`` is returned as-is. Otherwise a new frame is
    returned with the row appended, carrying the next free ``news_id`` and
    the current local time (ISO format, seconds precision) as ``created_at``.
    """
    already_there = news_exists(existing_df, user_id, description)
    if already_there:
        log(f"News já existe (user_id={user_id}). Pulando.")
        return existing_df

    record = dict(
        user_id=user_id,
        news_id=next_news_id(existing_df),
        icon=icon_url,
        description=description,
        created_at=datetime.now().isoformat(timespec="seconds"),
    )
    addition = pd.DataFrame([record])
    return pd.concat([existing_df, addition], ignore_index=True)

def save_news(df, path):
    """Write ``df`` to ``path`` as CSV (without the index) and log the destination."""
    df.to_csv(path, index=False)
    confirmation = f"Salvo em {path}"
    log(confirmation)

# =========================
# Pipeline
# =========================
def run_pipeline():
    """Run the whole ETL: read inputs, build recommendations, persist news.

    Users and jobs are loaded from the paths in ``Config``. For each user the
    recommended jobs are turned into messages and appended to the news table
    (duplicates are skipped by ``append_news``). The table is then saved back
    to ``Config.USER_NEWS_CSV``.

    Returns:
        The final news DataFrame, including previously stored rows.
    """
    log("Início do ETL")
    users_df = extract_users(Config.USERS_CSV)
    jobs_df = extract_jobs(Config.JOBS_CSV)
    news_df = extract_user_news(Config.USER_NEWS_CSV)

    total_msgs = 0
    for _, user in users_df.iterrows():
        recs = recommend_jobs_for_user(user, jobs_df)
        if not recs:
            log(f"Sem recomendações para {user['name']}.")
            continue
        for rec in recs:
            msg = build_message(user["name"], rec["job"], rec["score"], rec["matched_skills"])
            rows_before = len(news_df)
            news_df = append_news(news_df, user["user_id"], msg, Config.ICON_URL)
            # append_news returns the frame unchanged for duplicates, so only
            # count rows that were really added.
            total_msgs += len(news_df) - rows_before

    save_news(news_df, Config.USER_NEWS_CSV)
    log(f"ETL finalizado. Mensagens geradas: {total_msgs}")
    return news_df

# Run the pipeline and show the result.
# NOTE(review): the bare `head(10)` expression presumably relies on notebook
# cell display (the file header mentions Colab); run as a plain script it
# produces no output.
output_df = run_pipeline()
output_df.head(10)
