# Pipeline ETL - API → MongoDB Atlas

In [None]:
!pip install -q pymongo python-dotenv requests pandas dnspython

In [None]:
import os
from typing import List

from fastapi import FastAPI, Header, HTTPException, Depends
from pydantic import BaseModel
from pymongo import MongoClient, UpdateOne

## 1️⃣ Configuração

In [None]:
import os
import requests
from pymongo import MongoClient

# Endpoint of the remote config service; carregar_config_seguro() reads
# API_KEY and ATLAS_URI from its JSON response.
# NOTE(review): the request is sent without any credentials — confirm the
# endpoint is not publicly reachable, since it hands out secrets.
CONFIG_URL = "https://r-config-service.up.railway.app/config"

def carregar_config_seguro():
    """Fetch the pipeline credentials from the remote config service.

    Performs a GET on ``CONFIG_URL`` (10-second timeout) and reads the
    ``API_KEY`` and ``ATLAS_URI`` entries from the JSON body.

    Returns:
        A ``(api_key, atlas_uri)`` tuple.

    Raises:
        requests.HTTPError: if the service answers with an error status.
        RuntimeError: if either value is missing or empty in the response.
    """
    resposta = requests.get(CONFIG_URL, timeout=10)
    resposta.raise_for_status()
    payload = resposta.json()

    credenciais = (payload.get("API_KEY"), payload.get("ATLAS_URI"))

    # Both entries must be present and non-empty before the pipeline continues.
    if not all(credenciais):
        raise RuntimeError("❌ r-config-service não retornou todas as variáveis")

    print("✅ Variáveis recebidas do r-config-service")
    return credenciais

def conectar_mongodb(atlas_uri, db_name="test"):
    """Connect to MongoDB and return a handle to one database.

    Args:
        atlas_uri: MongoDB connection string.
        db_name: Name of the database to return. Defaults to ``"test"``,
            the database this pipeline has always used.

    Returns:
        The ``Database`` object for ``db_name``.

    Raises:
        pymongo.errors.PyMongoError: if the server cannot be reached or
            rejects the ping (e.g. bad credentials).
    """
    client = MongoClient(atlas_uri)

    # MongoClient connects lazily: constructing it does not contact the
    # server. Force one round trip so the success message below is only
    # printed when the deployment is actually reachable.
    client.admin.command("ping")

    db = client[db_name]
    print("✅ Conexão com MongoDB Atlas estabelecida")
    return db

if __name__ == "__main__":
    # Bootstrap: load the credentials and open the database handle.
    # The names bound here (db, collection) are module-level and are read
    # again by the Load step further down.
    api_key, atlas_uri = carregar_config_seguro()
    db = conectar_mongodb(atlas_uri)

    collection = db["users"]
    # Read every document in "users", leaving out MongoDB's _id field.
    # NOTE(review): this prints the whole collection — confirm that is
    # acceptable for large collections or for records holding card numbers.
    registros = list(collection.find({}, {"_id": 0}))
    print("Registros:", registros)



## 2️⃣ Extract

In [None]:

import requests
import pandas as pd

# Source endpoint for the Extract step; its response body is parsed as JSON.
API_URL = "https://users-api-etl.up.railway.app/users"

# 10-second timeout; raise_for_status() aborts the cell on an HTTP error status.
response = requests.get(API_URL, timeout=10)
response.raise_for_status()

# Flatten the JSON payload into a DataFrame.
# NOTE(review): json_normalize joins nested keys with "." by default, while the
# Transform step reads flat names such as "account_id" — confirm the API
# returns flat records.
data = response.json()
df = pd.json_normalize(data)

print(f"✅ {len(df)} registros extraídos")
# Last expression of the cell: the notebook displays the first rows.
df.head()


## 3️⃣ Transform

In [None]:

# Discard records without an id or a name, then store the id as an integer.
df = df.dropna(subset=["id", "name"]).assign(
    id=lambda frame: frame["id"].astype(int)
)

def _value_or_default(row, key, default):
    """Return ``row[key]``, or *default* when it is absent or null.

    ``row.get(key, default)`` alone only covers an absent key. A key that is
    present but holds None/NaN (a column that exists in the frame but has no
    value for this record) would otherwise reach ``int()`` and raise, or be
    written to the document as NaN/None.
    """
    value = row.get(key, default)
    # is_scalar guards pd.isna against list-like values, for which it would
    # return an array instead of a single bool.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return default
    return value


def transform_user(row):
    """Convert one flat user record into the nested document to be stored.

    Args:
        row: A mapping-like record (``pandas.Series`` or ``dict``) that must
            contain ``"id"`` and ``"name"``. The account and card fields are
            optional; missing or null ones fall back to ``0``, ``0.0`` or
            ``""``.

    Returns:
        A dict with ``id``, ``name``, nested ``account`` and ``card`` dicts,
        and empty ``features`` / ``news`` lists.

    Raises:
        KeyError: if ``"id"`` or ``"name"`` is absent.
        ValueError: if a numeric field holds a value that cannot be converted.
    """
    return {
        "id": int(row["id"]),
        "name": row["name"],
        "account": {
            "id": int(_value_or_default(row, "account_id", 0)),
            "number": _value_or_default(row, "account_number", ""),
            "agency": _value_or_default(row, "agency", ""),
            "balance": float(_value_or_default(row, "balance", 0.0)),
            "limit": float(_value_or_default(row, "account_limit", 0.0)),
        },
        "card": {
            "id": int(_value_or_default(row, "card_id", 0)),
            "number": _value_or_default(row, "card_number", ""),
            "limit": float(_value_or_default(row, "card_limit", 0.0)),
        },
        "features": [],
        "news": [],
    }

# Build one document per row. Rows are iterated explicitly rather than via
# df.apply(..., axis=1).tolist(): when the frame has no rows, apply() can hand
# back a DataFrame, which has no tolist(). Iterating yields [] in that case.
users = [transform_user(row) for _, row in df.iterrows()]
print(f"✅ {len(users)} usuários transformados")


## 4️⃣ Load

In [None]:

from pymongo import UpdateOne

collection = db["users"]
# Unique index on the business key "id", which the upserts below match on.
collection.create_index("id", unique=True)

# $setOnInsert + upsert: a document is written only when no document with the
# same "id" exists yet; existing documents are left untouched.
operations = [
    UpdateOne(
        {"id": user["id"]},
        {"$setOnInsert": user},
        upsert=True
    )
    for user in users
]

# bulk_write() raises InvalidOperation when given an empty request list, so an
# empty extraction is reported as zero inserts instead of failing the cell.
if operations:
    result = collection.bulk_write(operations, ordered=False)
    inserted_count = result.upserted_count
else:
    result = None
    inserted_count = 0

print("Inseridos:", inserted_count)



In [None]:
def _seed_user(user_id, name, *, account_number, agency, balance,
               account_limit, card_number, card_limit):
    """Assemble one hand-written user document.

    The account and the card reuse the user's id; ``features`` and ``news``
    start as fresh empty lists for every document.
    """
    return {
        "id": user_id,
        "name": name,
        "account": {
            "id": user_id,
            "number": account_number,
            "agency": agency,
            "balance": balance,
            "limit": account_limit,
        },
        "card": {
            "id": user_id,
            "number": card_number,
            "limit": card_limit,
        },
        "features": [],
        "news": [],
    }


# Two extra users defined by hand, in addition to the extracted ones.
extra_users = [
    _seed_user(
        1001,
        "Carlos Silva",
        account_number="00012345-6",
        agency="0001",
        balance=2500.75,
        account_limit=1000.0,
        card_number="4111-1111-1111-1111",
        card_limit=3000.0,
    ),
    _seed_user(
        1002,
        "Ana Pereira",
        account_number="00098765-4",
        agency="0002",
        balance=5200.00,
        account_limit=2000.0,
        card_number="5500-0000-0000-0004",
        card_limit=5000.0,
    ),
]


In [None]:
# Extracted users first, then the hand-written ones, in a new list.
users_all = [*users, *extra_users]

In [None]:
from pymongo import UpdateOne

# One insert-only upsert per document, matched on the business key "id":
# $setOnInsert writes the document only when no match exists yet.
operations = [
    UpdateOne({"id": doc["id"]}, {"$setOnInsert": doc}, upsert=True)
    for doc in users_all
]

# ordered=False: the operations are independent, so one failure need not stop
# the rest.
result = collection.bulk_write(operations, ordered=False)
print("Inseridos:", result.upserted_count)


In [None]:
# Disabled spot check for the two hand-written users (ids 1001 and 1002):
# collection.find({"id": {"$in": [1001, 1002]}})
# Total number of documents in the collection; as the last expression of the
# cell, the notebook displays the value.
collection.count_documents({})

## 5️⃣ Validação

In [None]:

from pprint import pprint

# Sample at most 8 stored documents, without MongoDB's _id, and pretty-print them.
sample_cursor = collection.find({}, {"_id": 0}).limit(8)
docs = list(sample_cursor)
pprint(docs)
