Framework REST API enterprise-grade construído com FastAPI, Clean Architecture e Domain-Driven Design
- Visão Geral
- Características
- Arquitetura
- Estrutura do Projeto
- Início Rápido
- Módulos
- Padrões de Design
- Autenticação e Autorização
- Recursos Avançados
- Configuração
- Testes
- Deploy
- Comandos Úteis
- Stack Tecnológica
- Documentação
- Conformidade
- Contribuindo
- Licença
Python API Base é um framework REST API reutilizável e pronto para produção, projetado para acelerar o desenvolvimento backend com Python. Fornece uma base sólida baseada nos princípios de Clean Architecture e Domain-Driven Design (DDD), aproveitando Python Generics (PEP 695) para maximizar reuso de código e type safety.
| Problema | Solução |
|---|---|
| Boilerplate repetitivo | CRUD genérico com apenas 3 arquivos |
| Falta de type safety | Generics PEP 695 com suporte completo de IDE |
| Segurança complexa | JWT + RBAC + Rate Limiting prontos |
| Observabilidade difícil | OpenTelemetry + structlog integrados |
| Testes trabalhosos | 200+ testes property-based inclusos |
| Deploy complicado | Docker, K8s, Terraform, Serverless prontos |
- CRUD Zero Boilerplate - Crie endpoints REST completos com apenas 3 arquivos: entidade, use case e router
- Generics Type-Safe -
IRepository[T],BaseUseCase[T],GenericCRUDRouter[T]com suporte completo de IDE - Clean Architecture - Separação clara de responsabilidades em 5 camadas
- Domain-Driven Design - Entidades, Value Objects, Aggregates, Specifications, Domain Events
- Autenticação JWT - Access tokens (30min) + Refresh tokens (7 dias)
- RBAC - Role-Based Access Control com composição de permissões
- Rate Limiting - slowapi com limites configuráveis por endpoint
- Security Headers - CSP, HSTS, X-Frame-Options, X-Content-Type-Options
- Password Policy - Argon2 hashing + política configurável
- Token Revocation - Blacklist via Redis
- Circuit Breaker - Proteção contra falhas em cascata
- Retry Pattern - Backoff exponencial com jitter
- Bulkhead - Isolamento de recursos
- Timeout - Controle de tempo de execução
- Tracing - OpenTelemetry com exportação OTLP
- Metrics - Prometheus endpoint
/metrics - Logging - structlog com formato JSON/ECS
- Correlation ID - Rastreamento de requests
- PostgreSQL - SQLAlchemy 2.0 + Alembic migrations (banco relacional)
- ScyllaDB/Cassandra - Banco NoSQL de alta performance para dados distribuídos
- Redis - Cache + Token storage
- Elasticsearch - Full-text search
- Kafka - Event streaming
- MinIO/S3 - Object storage
- RabbitMQ - Background tasks
- gRPC Server - Comunicação service-to-service de alta performance via HTTP/2
- Protocol Buffers - Serialização binária eficiente com type safety
- Interceptors - Auth, Logging, Tracing, Metrics integrados
- Health Checks - Protocolo padrão gRPC para Kubernetes probes
- Streaming - Suporte a server, client e bidirectional streaming
- Resilience - Retry com exponential backoff + Circuit Breaker
O projeto implementa Clean Architecture com 5 camadas principais:
┌─────────────────────────────────────────────────────────────────┐
│ INTERFACE LAYER │
│ (FastAPI Routers, Middleware, GraphQL, Versioning) │
├─────────────────────────────────────────────────────────────────┤
│ APPLICATION LAYER │
│ (Use Cases, Commands, Queries, DTOs, Mappers, Services) │
├─────────────────────────────────────────────────────────────────┤
│ DOMAIN LAYER │
│ (Entities, Value Objects, Aggregates, Specifications, Events) │
├─────────────────────────────────────────────────────────────────┤
│ INFRASTRUCTURE LAYER │
│ (Database, Cache, Messaging, Storage, Auth, Observability) │
├─────────────────────────────────────────────────────────────────┤
│ CORE LAYER │
│ (Configuration, DI Container, Protocols, Base Types) │
└─────────────────────────────────────────────────────────────────┘
Client → Middleware → Router → Use Case → Domain → Infrastructure → Database
↓
Domain Events → Event Bus → Handlers
python-api-base/
├── src/ # Código fonte principal
│ ├── core/ # Kernel da aplicação (8 módulos, 28 subpastas)
│ │ ├── base/ # Classes base abstratas
│ │ │ ├── cqrs/ # CQRS base classes
│ │ │ ├── domain/ # Domain base classes
│ │ │ ├── events/ # Event base classes
│ │ │ └── repository/ # Repository base classes
│ │ ├── config/ # Configurações centralizadas
│ │ │ ├── database/ # Database settings
│ │ │ ├── observability/ # Observability settings
│ │ │ └── security/ # Security settings
│ │ ├── di/ # Container de Injeção de Dependência
│ │ ├── errors/ # Exception handlers RFC 7807
│ │ │ ├── base/ # Base error classes
│ │ │ ├── http/ # HTTP error responses
│ │ │ └── status/ # Status codes
│ │ ├── protocols/ # Interfaces/Protocolos
│ │ │ ├── application/ # Application protocols
│ │ │ ├── data_access/ # Data access protocols
│ │ │ ├── domain/ # Domain protocols
│ │ │ └── entity/ # Entity protocols
│ │ ├── shared/ # Utilitários compartilhados
│ │ └── types/ # Type aliases e definições
│ │
│ ├── domain/ # Camada de Domínio (3 módulos, 11 subpastas)
│ │ ├── common/ # Componentes compartilhados
│ │ │ ├── specification/ # Specification pattern
│ │ │ └── value_objects/ # Value Objects base
│ │ ├── users/ # Bounded Context: Usuários
│ │ │ ├── aggregates/ # User aggregate root
│ │ │ ├── events/ # Domain events
│ │ │ ├── repositories/ # Repository interfaces
│ │ │ ├── services/ # Domain services
│ │ │ └── value_objects/ # User value objects
│ │ └── examples/ # Bounded Context: Exemplos
│ │ ├── item/ # Item aggregate
│ │ └── pedido/ # Pedido aggregate
│ │
│ ├── application/ # Camada de Aplicação (1 módulo, 11 subpastas)
│ │ ├── common/ # CQRS, Middleware, Batch, Export
│ │ ├── services/ # Cross-cutting services
│ │ │ ├── feature_flags/ # Feature flags service
│ │ │ │ ├── config/ # Configuration
│ │ │ │ ├── core/ # Core enums
│ │ │ │ ├── models/ # Models
│ │ │ │ ├── service/ # Service implementation
│ │ │ │ └── strategies/ # Evaluation strategies
│ │ │ ├── file_upload/ # File upload service
│ │ │ │ ├── models/ # Upload models
│ │ │ │ ├── service/ # Upload service
│ │ │ │ └── validators/ # File validators
│ │ │ └── multitenancy/ # Multi-tenant service
│ │ │ ├── middleware/ # Tenant middleware
│ │ │ ├── models/ # Tenant models
│ │ │ └── repository/ # Tenant repository
│ │ ├── users/ # Use cases de usuários
│ │ └── examples/ # Use cases de exemplos
│ │
│ ├── infrastructure/ # Camada de Infraestrutura (29 módulos, 18+ subpastas)
│ │ ├── auth/ # Autenticação
│ │ │ ├── jwt/ # JWT implementation
│ │ │ ├── oauth/ # OAuth providers
│ │ │ ├── policies/ # Password policies
│ │ │ ├── token_store/ # Token storage
│ │ │ └── validators/ # JWT validators
│ │ ├── cache/ # Cache
│ │ │ ├── core/ # Config, models, protocols
│ │ │ └── providers/ # Cache providers
│ │ ├── dapr/ # Dapr integration
│ │ │ ├── core/ # Client, errors, health
│ │ │ ├── patterns/ # Invoke, pubsub, bindings
│ │ │ └── services/ # State, secrets, actors
│ │ ├── db/ # Database
│ │ │ ├── core/ # Session management
│ │ │ ├── event_sourcing/ # Event sourcing
│ │ │ ├── middleware/ # DB middleware
│ │ │ ├── migrations/ # Alembic migrations
│ │ │ ├── models/ # SQLAlchemy models
│ │ │ ├── query_builder/ # Query builder pattern
│ │ │ ├── repositories/ # Repository implementations
│ │ │ ├── saga/ # Saga pattern
│ │ │ ├── search/ # Search functionality
│ │ │ └── uow/ # Unit of Work
│ │ ├── elasticsearch/ # Search engine
│ │ │ ├── core/ # Client, config, document
│ │ │ └── operations/ # Query, index, search
│ │ ├── generics/ # Generic infrastructure
│ │ │ └── core/ # Config, errors, protocols
│ │ ├── kafka/ # Event streaming
│ │ ├── minio/ # Object storage
│ │ ├── messaging/ # Messaging
│ │ ├── multitenancy/ # Multi-tenant support
│ │ ├── observability/ # Telemetry, Logging, Metrics
│ │ ├── prometheus/ # Prometheus metrics
│ │ ├── ratelimit/ # Rate limiting
│ │ ├── rbac/ # Role-Based Access Control
│ │ ├── redis/ # Redis client
│ │ ├── resilience/ # Circuit Breaker, Retry
│ │ ├── scylladb/ # ScyllaDB/Cassandra
│ │ ├── security/ # Field encryption
│ │ ├── storage/ # File storage
│ │ ├── sustainability/ # GreenOps/Kepler
│ │ └── tasks/ # Background tasks
│ │
│ ├── interface/ # Camada de Interface (2 módulos, 10 subpastas)
│ │ ├── errors/ # Error handlers HTTP
│ │ ├── graphql/ # GraphQL schema
│ │ │ ├── core/ # Schema, router
│ │ │ ├── queries/ # Query definitions
│ │ │ ├── mutations/ # Mutation definitions
│ │ │ ├── resolvers/ # Resolver functions
│ │ │ ├── mappers/ # DTO mappers
│ │ │ ├── relay/ # Relay pagination
│ │ │ └── types/ # Type definitions
│ │ ├── middleware/ # HTTP middleware
│ │ ├── routes/ # Route definitions
│ │ ├── v1/ # API v1 endpoints
│ │ │ ├── auth/ # Auth routes
│ │ │ ├── core/ # Health, cache, infra
│ │ │ ├── enterprise/ # Enterprise features
│ │ │ ├── examples/ # Example routes
│ │ │ ├── features/ # Kafka, storage, etc
│ │ │ ├── items/ # Items routes
│ │ │ └── users/ # Users routes
│ │ ├── v2/ # API v2 endpoints
│ │ └── versioning/ # API versioning
│ │
│ └── main.py # Application entry point
│
├── tests/ # Testes
│ ├── unit/ # Testes unitários
│ ├── integration/ # Testes de integração
│ ├── properties/ # Property-based tests (Hypothesis)
│ ├── e2e/ # End-to-end tests
│ ├── performance/ # Load tests (k6)
│ └── factories/ # Test factories (Polyfactory)
│
├── docs/ # Documentação
│ ├── adr/ # Architecture Decision Records
│ ├── api/ # API documentation
│ ├── architecture/ # Architecture docs
│ ├── guides/ # User guides
│ ├── infrastructure/ # Infrastructure docs
│ ├── layers/ # Layer documentation
│ ├── operations/ # Operations runbooks
│ └── testing/ # Testing guides
│
├── deployments/ # Configurações de deploy
│ ├── docker/ # Docker Compose files
│ ├── helm/ # Helm charts
│ ├── k8s/ # Kubernetes manifests
│ ├── serverless/ # AWS Lambda, Vercel
│ └── terraform/ # Infrastructure as Code
│
├── scripts/ # Scripts utilitários
├── alembic/ # Database migrations
├── .env.example # Template de configuração
├── pyproject.toml # Dependências e configuração
├── Makefile # Comandos de automação
└── README.md # Este arquivo
| Software | Versão | Obrigatório |
|---|---|---|
| Python | 3.12+ | ✅ |
| uv | latest | Recomendado |
| Docker | 24+ | ✅ |
| Docker Compose | 2.0+ | ✅ |
# Clonar repositório
git clone https://github.com/example/python-api-base.git
cd python-api-base
# Instalar com uv (recomendado)
uv sync --dev
# Ou com pip
pip install -e ".[dev]"# Copiar template de configuração
cp .env.example .env
# Gerar chave secreta (OBRIGATÓRIO)
python -c "import secrets; print(secrets.token_urlsafe(64))"
# Editar .env e configurar SECURITY__SECRET_KEY# Iniciar PostgreSQL e Redis
docker compose -f deployments/docker/docker-compose.base.yml up -d postgres redis
# Ou usar Makefile
make setup-db# Aplicar migrations
uv run alembic upgrade head
# Ou usar Makefile
make migrate# Desenvolvimento com hot reload
uv run uvicorn src.main:app --reload --host 0.0.0.0 --port 8000
# Ou usar Makefile
make run# Health check
curl http://localhost:8000/health/live
# Resposta: {"status": "healthy"}
# Documentação
# Swagger UI: http://localhost:8000/docs
# ReDoc: http://localhost:8000/redoc| Endpoint | Descrição |
|---|---|
http://localhost:8000 |
Base da API |
http://localhost:8000/docs |
Swagger UI |
http://localhost:8000/redoc |
ReDoc |
http://localhost:8000/health/live |
Liveness Check |
http://localhost:8000/health/ready |
Readiness Check |
http://localhost:8000/metrics |
Prometheus Metrics |
http://localhost:8000/graphql |
GraphQL Playground (se habilitado) |
| Módulo | Descrição | Exports Principais |
|---|---|---|
core.config |
Configurações centralizadas | Settings, get_settings, DatabaseSettings, SecuritySettings |
core.protocols |
Interfaces/Protocolos | AsyncRepository, Entity, Mapper, UnitOfWork |
core.di |
Container de DI | Container, get_container |
core.errors |
Exception handlers | AppException, ValidationError, NotFoundError |
core.types |
Type definitions | EntityId, JsonDict, Result |
core.shared |
Utilitários | get_logger, cached |
| Módulo | Descrição | Componentes |
|---|---|---|
domain.common |
Componentes compartilhados | Specification, ValueObject, DomainEvent |
domain.users |
Bounded Context Usuários | User, Email, Password, IUserRepository |
domain.examples |
Exemplos de implementação | Item, Pedido |
| Módulo | Descrição | Componentes |
|---|---|---|
application.common.cqrs |
CQRS Pattern | Command, Query, CommandBus, QueryBus |
application.common.batch |
Operações em lote | BatchProcessor, BatchResult |
application.common.export |
Exportação de dados | DataExporter, ExportFormat |
application.services |
Serviços cross-cutting | FeatureFlagService, FileUploadService, TenantService |
application.users |
Use cases de usuários | CreateUserCommand, GetUserQuery, UserDTO |
| Módulo | Descrição | Componentes |
|---|---|---|
infrastructure.auth |
Autenticação | JWTService, PasswordPolicy, TokenStore |
infrastructure.cache |
Cache | CacheProvider, RedisCacheProvider, @cached |
infrastructure.db |
Database | AsyncSession, Repository, QueryBuilder |
infrastructure.resilience |
Resiliência | CircuitBreaker, Retry, Bulkhead, Timeout |
infrastructure.observability |
Observabilidade | TelemetryService, LoggingMiddleware |
infrastructure.rbac |
Autorização | RBACChecker, Permission, Role |
infrastructure.kafka |
Event streaming | KafkaProducer, KafkaConsumer |
infrastructure.elasticsearch |
Search | ElasticsearchRepository, SearchQuery |
infrastructure.minio |
Object storage | MinIOClient, FileUploadHandler |
infrastructure.redis |
Redis client | RedisClient, DistributedLock |
| Módulo | Descrição | Componentes |
|---|---|---|
interface.v1 |
API v1 endpoints | users_router, examples_router, health_router |
interface.v2 |
API v2 endpoints | examples_router |
interface.middleware |
HTTP middleware | SecurityHeadersMiddleware, LoggingMiddleware |
interface.graphql |
GraphQL | schema, resolvers, dataloader |
interface.errors |
Error handlers | exception_handler, validation_handler |
Encapsula regras de negócio em objetos composáveis.
from domain.common.specification import equals, greater_than, is_null
# Specifications individuais
active_users = equals("is_active", True)
premium_users = equals("subscription", "premium")
not_deleted = is_null("deleted_at")
# Composição com AND/OR
target_users = active_users.and_spec(premium_users).and_spec(not_deleted)
# Uso com SQLAlchemy
query = select(User).where(target_users.to_sql_condition(User))Operadores disponíveis: EQ, NE, GT, GE, LT, LE, CONTAINS, STARTS_WITH, IN, IS_NULL, IS_NOT_NULL
Separação de operações de leitura e escrita.
from application.common.cqrs import Command, Query
from core.types import Result
# Command (Escrita)
@dataclass
class CreateUserCommand(Command[User]):
email: str
name: str
password: str
async def execute(self, repository: IUserRepository) -> Result[User, str]:
if await repository.exists_by_email(self.email):
return Err("Email already exists")
user = User(email=self.email, name=self.name)
return Ok(await repository.create(user))
# Query (Leitura)
@dataclass
class GetUserQuery(Query[UserDTO | None]):
user_id: str
cacheable: bool = True
async def execute(self, repository: IUserRepository) -> UserDTO | None:
user = await repository.get(self.user_id)
return UserMapper.to_dto(user) if user else NoneAbstrai o acesso a dados através de interfaces.
# Interface (Domain)
class IUserRepository(AsyncRepository[User, str], Protocol):
async def get_by_email(self, email: str) -> User | None: ...
async def exists_by_email(self, email: str) -> bool: ...
# Implementação (Infrastructure)
class UserRepository(IUserRepository):
def __init__(self, session: AsyncSession):
self._session = session
async def get(self, id: str) -> User | None:
return await self._session.get(UserModel, id)
async def create(self, user: User) -> User:
self._session.add(user)
await self._session.flush()
return userfrom infrastructure.resilience import CircuitBreaker, Retry, Bulkhead, Timeout
# Circuit Breaker - Proteção contra falhas em cascata
circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
result = await circuit.execute(lambda: external_api.call())
# Retry com Exponential Backoff
retry = Retry(max_attempts=3, base_delay=1.0, exponential_base=2.0)
result = await retry.execute(lambda: flaky_service.call())
# Bulkhead - Limitador de concorrência
bulkhead = Bulkhead(max_concurrent=10, max_wait=5.0)
result = await bulkhead.execute(lambda: expensive_operation())
# Timeout
timeout = Timeout(timeout=30.0)
result = await timeout.execute(lambda: slow_operation())from infrastructure.auth.jwt import JWTService
jwt_service = JWTService(secret_key="...", algorithm="HS256")
# Criar tokens
access_token = jwt_service.create_access_token(
user_id="123",
roles=["admin"],
expires_minutes=30
)
refresh_token = jwt_service.create_refresh_token(user_id="123")
# Validar token
payload = jwt_service.verify_token(access_token)
# payload: {"sub": "123", "roles": ["admin"], "exp": ..., "jti": "..."}from infrastructure.auth.token_store import TokenStore
token_store = TokenStore(redis_client)
# Revogar token (logout)
await token_store.revoke_token(token_jti, expires_at)
# Verificar se está revogado
is_revoked = await token_store.is_revoked(token_jti)
# Revogar todos os tokens de um usuário
await token_store.revoke_all_user_tokens(user_id)from infrastructure.rbac import RBACChecker, Role, Permission
# Definir roles com permissões
admin_role = Role(
name="admin",
permissions=[
Permission(resource="users", action="read"),
Permission(resource="users", action="write"),
Permission(resource="users", action="delete"),
]
)
editor_role = Role(
name="editor",
permissions=[
Permission(resource="posts", action="read"),
Permission(resource="posts", action="write"),
]
)
# Verificar permissão
rbac = RBACChecker()
rbac.register_role(admin_role)
rbac.register_role(editor_role)
has_access = rbac.has_permission(
user_roles=["editor"],
resource="posts",
action="write"
) # Truefrom infrastructure.auth.password_policy import PasswordPolicy
policy = PasswordPolicy(
min_length=12,
require_uppercase=True,
require_lowercase=True,
require_digit=True,
require_special=True,
max_repeated_chars=3,
)
result = policy.validate("MyP@ssw0rd123")
if not result.is_valid:
print(result.errors) # Lista de erros de validaçãofrom fastapi import Depends
from interface.dependencies import get_current_user, require_permission
@router.get("/users")
async def list_users(
current_user: User = Depends(get_current_user),
_: None = Depends(require_permission("users", "read"))
):
return await user_service.list_all()
@router.delete("/users/{user_id}")
async def delete_user(
user_id: str,
current_user: User = Depends(get_current_user),
_: None = Depends(require_permission("users", "delete"))
):
return await user_service.delete(user_id)O Python API Base inclui suporte completo a gRPC para comunicação eficiente entre microsserviços.
# Gerar código Python a partir dos protos
make proto-gen
# Iniciar servidor gRPC
make grpc-run
# Testar com grpcurl
grpcurl -plaintext localhost:50051 list
grpcurl -plaintext localhost:50051 grpc.health.v1.Health/Check# .env
GRPC__SERVER__ENABLED=true
GRPC__SERVER__PORT=50051
GRPC__SERVER__REFLECTION_ENABLED=true
GRPC__CLIENT__DEFAULT_TIMEOUT=30.0
GRPC__CLIENT__MAX_RETRIES=3# 1. Definir proto (protos/myservice/service.proto)
# 2. Gerar código: make proto-gen
# 3. Implementar servicer:
from src.interface.grpc.servicers.base import BaseServicer
class MyServiceServicer(BaseServicer):
async def GetResource(self, request, context):
use_case = self.get_use_case(GetResourceUseCase)
result = await use_case.execute(request.id)
return self._to_proto(result)from src.infrastructure.grpc.client import GRPCClientFactory
factory = GRPCClientFactory(
default_timeout=30.0,
retry_config=RetryConfig(max_retries=3),
circuit_breaker_config=CircuitBreakerConfig(failure_threshold=5),
)
channel = await factory.create_channel("service:50051")
stub = factory.create_stub(MyServiceStub, channel)
response = await stub.GetResource(GetResourceRequest(id="123"))| Interceptor | Descrição |
|---|---|
AuthInterceptor |
Validação JWT via metadata |
LoggingInterceptor |
Logs com correlation ID |
TracingInterceptor |
OpenTelemetry spans |
MetricsInterceptor |
Métricas Prometheus |
ErrorInterceptor |
Conversão de erros para gRPC status |
# Kubernetes probe configuration
livenessProbe:
grpc:
port: 50051
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
grpc:
port: 50051
initialDelaySeconds: 5
periodSeconds: 5| Endpoint | Porta | Descrição |
|---|---|---|
| gRPC Server | 50051 | Serviços gRPC |
| Health Check | 50051 | grpc.health.v1.Health |
| Reflection | 50051 | Service discovery |
from infrastructure.cache import cached, RedisCacheProvider
# Decorator simples
@cached(ttl=300)
async def get_user(user_id: str) -> User:
return await repository.get(user_id)
# Com key builder customizado
@cached(ttl=60, key_builder=lambda **kw: f"user:{kw['user_id']}")
async def get_user_profile(user_id: str) -> UserProfile:
return await profile_service.get(user_id)
# Provider direto
cache = RedisCacheProvider(redis_client)
await cache.set("key", value, ttl=300)
value = await cache.get("key")
await cache.delete("key")
await cache.clear_pattern("user:*")from infrastructure.messaging import EventBus, DomainEvent
# Definir evento
@dataclass
class UserCreatedEvent(DomainEvent):
user_id: str
email: str
created_at: datetime
# Subscribe
@event_bus.subscribe("user.created")
async def handle_user_created(event: UserCreatedEvent):
await send_welcome_email(event.email)
await create_audit_log(event)
# Publish
await event_bus.publish(UserCreatedEvent(
user_id="123",
email="user@example.com",
created_at=datetime.utcnow()
))from infrastructure.observability.telemetry import traced
@traced(name="process_payment", attributes={"provider": "stripe"})
async def process_payment(order_id: str, amount: float) -> bool:
# Span criado automaticamente
# Exceções registradas como eventos
return await stripe.charge(order_id, amount)from core.shared.logging import get_logger
logger = get_logger(__name__)
logger.info(
"order_processed",
order_id=order_id,
total=order.total,
items_count=len(order.items),
customer_id=order.customer_id,
)
# Output JSON: {"event": "order_processed", "order_id": "...", ...}from infrastructure.feature_flags import FeatureFlagService
flags = FeatureFlagService()
# Verificar flag
if await flags.is_enabled("new_checkout_flow", user_context):
return await new_checkout(order)
else:
return await legacy_checkout(order)
# Com fallback
result = await flags.get_value("max_items_per_order", default=10)from infrastructure.storage import FileUploadHandler, FileValidator
validator = FileValidator(
max_size=10 * 1024 * 1024, # 10MB
allowed_extensions=["jpg", "png", "pdf"],
allowed_mimetypes=["image/jpeg", "image/png", "application/pdf"],
)
handler = FileUploadHandler(storage_provider, validator)
# Upload
file_info = await handler.upload(file, folder="documents")
# file_info: FileInfo(id="...", url="...", size=..., mimetype="...")
# Download
content = await handler.download(file_info.id)from infrastructure.multitenancy import TenantContext, tenant_middleware
# Middleware extrai tenant do header/subdomain
@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
tenant_id = extract_tenant(request)
with TenantContext(tenant_id):
return await call_next(request)
# Uso em queries
async def get_users():
tenant_id = TenantContext.current()
return await repository.get_by_tenant(tenant_id)Recursos implementados seguindo as melhores práticas de API para 2025, com 95+ property-based tests.
Endpoint para distribuição de chaves públicas JWT RS256.
# Obter chaves públicas
curl http://localhost:8000/.well-known/jwks.json
# OpenID Configuration
curl http://localhost:8000/.well-known/openid-configurationfrom infrastructure.auth.jwt.jwks import JWKSService, initialize_jwks_service
# Inicializar com chave privada
initialize_jwks_service(private_key_pem=private_key, algorithm="RS256")
# Rotação de chaves com grace period
jwks_service = get_jwks_service()
jwks_service.rotate_current_key(new_public_key_pem, "RS256")Previne thundering herd com jitter de 5-15% no TTL.
from infrastructure.cache.providers import RedisCacheWithJitter, JitterConfig
cache = RedisCacheWithJitter[dict](
redis_client=redis, # ou redis_url="redis://localhost:6379"
config=JitterConfig(
min_jitter_percent=0.05, # 5% mínimo
max_jitter_percent=0.15, # 15% máximo
),
)
# Get or compute com stampede prevention
user = await cache.get_or_compute(
key="user:123",
compute_fn=lambda: repository.get_by_id("123"),
ttl=300, # TTL com jitter automático
)Suporte a operações idempotentes via header Idempotency-Key.
# Request idempotente
curl -X POST http://localhost:8000/api/v1/examples/items \
-H "Content-Type: application/json" \
-H "Idempotency-Key: unique-request-id-123" \
-d '{"name": "Item", "sku": "SKU-001"}'
# Retry retorna resposta cacheada
curl -X POST http://localhost:8000/api/v1/examples/items \
-H "Content-Type: application/json" \
-H "Idempotency-Key: unique-request-id-123" \
-d '{"name": "Item", "sku": "SKU-001"}'
# Header: X-Idempotent-Replayed: truefrom infrastructure.idempotency import IdempotencyHandler, IdempotencyMiddleware
# Configurar middleware (automático via main.py)
app.add_middleware(
IdempotencyMiddleware,
methods={"POST", "PUT"},
required_endpoints={"/api/v1/payments"},
)Utilitários para validação de alta performance.
from core.shared.validation import TypeAdapterCache, validate_json_fast
# Cache de TypeAdapter (evita recriação)
adapter = TypeAdapterCache(UserDTO)
user = adapter.validate_json(b'{"name": "John", "email": "john@example.com"}')
# Serialização rápida
json_bytes = adapter.dump_json(user)
# Validação em lote com coleta de erros
valid, errors = validate_bulk(UserDTO, items)Endpoints para probes de Kubernetes.
| Endpoint | Descrição | Uso |
|---|---|---|
/health/live |
Liveness probe | Processo vivo |
/health/ready |
Readiness probe | Dependências OK |
/health/startup |
Startup probe | Inicialização completa |
# Verificar startup
curl http://localhost:8000/health/startup
# {"startup_complete": true, "uptime_seconds": 123.45}| Variável | Tipo | Default | Descrição |
|---|---|---|---|
APP_NAME |
string | "My API" | Nome da aplicação |
DEBUG |
bool | false | Modo debug (NUNCA true em produção) |
VERSION |
string | "0.1.0" | Versão da API |
API_PREFIX |
string | "/api/v1" | Prefixo das rotas |
| Variável | Tipo | Default | Descrição |
|---|---|---|---|
DATABASE__URL |
string | - | Connection string PostgreSQL |
DATABASE__POOL_SIZE |
int | 10 | Tamanho do pool de conexões |
DATABASE__MAX_OVERFLOW |
int | 20 | Conexões extras permitidas |
DATABASE__ECHO |
bool | false | Log de queries SQL |
| Variável | Tipo | Default | Descrição |
|---|---|---|---|
SECURITY__SECRET_KEY |
string | - | Chave JWT (mín 32 chars) OBRIGATÓRIO |
SECURITY__CORS_ORIGINS |
list | ["*"] | Origens CORS permitidas |
SECURITY__RATE_LIMIT |
string | "100/minute" | Rate limit |
SECURITY__ALGORITHM |
string | "HS256" | Algoritmo JWT |
SECURITY__ACCESS_TOKEN_EXPIRE_MINUTES |
int | 30 | Expiração access token |
SECURITY__REFRESH_TOKEN_EXPIRE_DAYS |
int | 7 | Expiração refresh token |
| Variável | Tipo | Default | Descrição |
|---|---|---|---|
REDIS__URL |
string | "redis://localhost:6379/0" | URL Redis |
REDIS__ENABLED |
bool | false | Habilitar Redis |
REDIS__TOKEN_TTL |
int | 604800 | TTL de tokens (segundos) |
| Variável | Tipo | Default | Descrição |
|---|---|---|---|
OBSERVABILITY__LOG_LEVEL |
string | "INFO" | Nível de log |
OBSERVABILITY__LOG_FORMAT |
string | "json" | Formato (json/console) |
OBSERVABILITY__OTLP_ENDPOINT |
string | null | Endpoint OpenTelemetry |
OBSERVABILITY__SERVICE_NAME |
string | "python-api-base" | Nome do serviço |
OBSERVABILITY__PROMETHEUS_ENABLED |
bool | true | Habilitar Prometheus |
OBSERVABILITY__KAFKA_ENABLED |
bool | false | Habilitar Kafka |
OBSERVABILITY__ELASTICSEARCH_ENABLED |
bool | false | Habilitar Elasticsearch |
# Application
APP_NAME="My API"
DEBUG=false
VERSION="1.0.0"
# Database
DATABASE__URL=postgresql+asyncpg://user:password@localhost:5432/mydb
DATABASE__POOL_SIZE=20
# Security (OBRIGATÓRIO)
SECURITY__SECRET_KEY=your-super-secret-key-at-least-32-characters-long
SECURITY__CORS_ORIGINS=["https://app.example.com"]
SECURITY__RATE_LIMIT=100/minute
# Redis
REDIS__URL=redis://localhost:6379/0
REDIS__ENABLED=true
# Observability
OBSERVABILITY__LOG_LEVEL=INFO
OBSERVABILITY__LOG_FORMAT=json
OBSERVABILITY__OTLP_ENDPOINT=http://localhost:4317
OBSERVABILITY__PROMETHEUS_ENABLED=truepython -c "import secrets; print(secrets.token_urlsafe(64))"tests/
├── conftest.py # Fixtures compartilhadas
├── factories/ # Test factories (Polyfactory, Hypothesis strategies)
│ ├── entity_factory.py
│ ├── hypothesis_strategies.py
│ └── mock_repository.py
├── unit/ # Testes unitários (isolados, sem I/O)
│ ├── application/
│ ├── core/
│ ├── domain/
│ ├── infrastructure/
│ └── interface/
├── integration/ # Testes de integração (com banco, Redis, etc)
│ ├── db/
│ ├── infrastructure/
│ └── interface/
├── properties/ # Property-based tests (Hypothesis) - 200+ arquivos
├── e2e/ # End-to-end tests
│ └── api/
└── performance/ # Load tests (k6)
├── smoke.js
└── stress.js
# Todos os testes
uv run pytest
# Com cobertura
uv run pytest --cov=src --cov-report=html --cov-report=term
# Por tipo
uv run pytest tests/unit/ # Unitários
uv run pytest tests/integration/ # Integração
uv run pytest tests/properties/ # Property-based
uv run pytest tests/e2e/ # End-to-end
# Teste específico
uv run pytest tests/unit/domain/users/test_user.py::TestUser::test_create
# Por marcador
uv run pytest -m "unit"
uv run pytest -m "integration"
uv run pytest -m "property"
# Com verbose
uv run pytest -v
# Paralelismo
uv run pytest -n autoO projeto inclui 200+ arquivos de testes property-based cobrindo:
| Categoria | Exemplos |
|---|---|
| JWT | Token round-trip, validação, expiração |
| RBAC | Permission composition, role inheritance |
| Repository | CRUD consistency, soft delete |
| Cache | Invalidation, TTL, serialization |
| Security | Headers presence, sanitization |
| Errors | RFC 7807 format, status codes |
| Resilience | Circuit breaker states, retry backoff |
| Specification | Composition laws, SQL conversion |
# Exemplo de property test
from hypothesis import given, strategies as st
@given(st.emails())
def test_email_validation_accepts_valid_emails(email: str):
"""Qualquer email válido deve ser aceito."""
result = validate_email(email)
assert result.is_valid
@given(st.integers(), st.integers())
def test_specification_and_is_commutative(a: int, b: int):
"""AND composition deve ser comutativa."""
spec1 = equals("value", a)
spec2 = equals("value", b)
obj = {"value": a}
assert spec1.and_spec(spec2).is_satisfied_by(obj) == \
spec2.and_spec(spec1).is_satisfied_by(obj)# Instalar k6: https://k6.io/docs/get-started/installation/
# Smoke test (verificação básica)
k6 run tests/performance/smoke.js
# Stress test (encontrar limites)
k6 run tests/performance/stress.js
# Com URL customizada
k6 run -e BASE_URL=http://api.example.com tests/performance/smoke.js- Mínimo exigido: 80%
- Branch coverage: 75%
# Gerar relatório HTML
uv run pytest --cov=src --cov-report=html
# Abrir relatório
open htmlcov/index.html # macOS
xdg-open htmlcov/index.html # Linux
start htmlcov/index.html # WindowsO projeto suporta múltiplas estratégias de deploy:
| Método | Localização | Descrição |
|---|---|---|
| Docker | deployments/docker/ |
Docker Compose para dev/staging/prod |
| Kubernetes | deployments/k8s/ |
Manifests K8s com Kustomize |
| Helm | deployments/helm/ |
Helm charts para K8s |
| Knative | deployments/knative/ |
Serverless Kubernetes-native (scale-to-zero) |
| ArgoCD | deployments/argocd/ |
GitOps continuous delivery |
| Terraform | deployments/terraform/ |
IaC para AWS/GCP/Azure |
| Serverless | deployments/serverless/ |
AWS Lambda, Vercel |
# Desenvolvimento
docker compose -f deployments/docker/docker-compose.base.yml \
-f deployments/docker/docker-compose.dev.yml up -d
# Produção
docker compose -f deployments/docker/docker-compose.base.yml \
-f deployments/docker/docker-compose.production.yml up -d
# Build da imagem
docker build -t my-api:latest -f deployments/docker/dockerfiles/Dockerfile.prod .# Aplicar manifests
kubectl apply -k deployments/k8s/base/
# Com overlay de ambiente
kubectl apply -k deployments/k8s/overlays/production/# Instalar chart
helm install my-api deployments/helm/api/ \
--namespace my-api \
--create-namespace \
--values deployments/helm/api/values-production.yamlcd deployments/terraform
# Inicializar
terraform init
# Planejar
terraform plan -var-file=environments/production.tfvars
# Aplicar
terraform apply -var-file=environments/production.tfvarsDeploy serverless nativo para Kubernetes com auto-scaling e scale-to-zero:
# Deploy para desenvolvimento (scale-to-zero habilitado)
kubectl apply -k deployments/knative/overlays/dev
# Deploy para produção (minScale=2 para alta disponibilidade)
kubectl apply -k deployments/knative/overlays/prod
# Verificar status
kubectl get ksvc -n my-api
# Obter URL do serviço
kubectl get ksvc python-api-base -n my-api -o jsonpath='{.status.url}'Recursos inclusos:
- Auto-scaling com scale-to-zero
- Traffic splitting para canary deployments
- CloudEvents para event-driven architecture
- Integração com Istio (mTLS, observability)
- Integração com Kafka via Knative Eventing
Documentação completa: Knative README
O projeto inclui configuração completa de GitOps com ArgoCD para continuous delivery declarativo.
# Instalar ArgoCD
kubectl apply -k deployments/argocd/overlays/dev
# Obter senha admin
kubectl -n argocd get secret argocd-initial-admin-secret -o jsonpath="{.data.password}" | base64 -d
# Acessar UI
kubectl port-forward svc/argocd-server -n argocd 8080:443
# Abrir: https://localhost:8080
# Aplicar Applications
kubectl apply -k deployments/argocd/applicationsAmbientes configurados:
| Ambiente | Auto-Sync | Self-Heal | Aprovação |
|---|---|---|---|
| Dev | ✅ | ✅ | Automática |
| Staging | ✅ | ❌ | Automática |
| Prod | ❌ | ❌ | Manual |
Recursos incluídos:
- ApplicationSets para geração dinâmica
- Image Updater para atualização automática de imagens
- Notificações Slack para eventos de sync
- Sealed Secrets para gestão segura de secrets
- Sync hooks (PreSync migrations, PostSync smoke tests)
Documentação completa: deployments/argocd/README.md
O projeto inclui um Makefile completo para automação:
# Setup inicial completo
make setup
# Desenvolvimento
make run # Iniciar servidor dev
make run-prod # Iniciar servidor produção
# Database
make migrate # Aplicar migrations
make migrate-down # Rollback última migration
make migrate-create msg="add users table" # Criar migration
# Testes
make test # Todos os testes
make test-unit # Unitários
make test-integration # Integração
make test-property # Property-based
make test-cov # Com cobertura
# Qualidade de código
make lint # Verificar lint (ruff)
make lint-fix # Corrigir lint
make format # Formatar código
make type-check # Type check (mypy)
make check # Todas as verificações
# Segurança
make security # Scan de segurança (bandit)
make security-full # Scan completo + secrets
# Docker
make docker-up # Iniciar serviços
make docker-down # Parar serviços
make docker-logs # Ver logs
make docker-rebuild # Rebuild imagens
# Documentação
make docs-serve # Servir docs localmente
make docs-build # Build docs
# Limpeza
make clean # Limpar arquivos temporários
make clean-all # Limpar tudo incluindo venv
# Utilitários
make generate-secret # Gerar secret key
make health # Verificar saúde do sistema
make validate # Validar configurações
# ArgoCD / GitOps
make validate-argocd # Validar manifests ArgoCD
make argocd-install-dev # Instalar ArgoCD (dev)
make argocd-install-prod # Instalar ArgoCD (prod)
make argocd-password # Obter senha admin
make argocd-port-forward # Port-forward UI
make argocd-status # Status das applications
make argocd-sync-dev # Sync dev
make argocd-sync-prod # Sync prod (manual)
make test-argocd # Property tests ArgoCD# Validar documentação
python scripts/validate_docs.py
# Validar configurações
python scripts/validate-config.py
python scripts/validate-config.py --strict # Modo estrito
python scripts/validate-config.py --fix # Auto-fix
# Seed de dados de exemplo
python scripts/seed_examples.py# Comandos via CLI
uv run api-cli --help| Categoria | Tecnologia | Versão |
|---|---|---|
| Linguagem | Python | 3.12+ |
| Framework Web | FastAPI | 0.115+ |
| Validação | Pydantic | 2.9+ |
| ORM | SQLAlchemy + SQLModel | 2.0+ |
| Migrations | Alembic | 1.14+ |
| DI Container | dependency-injector | 4.42+ |
| Categoria | Tecnologia | Uso |
|---|---|---|
| Database Relacional | PostgreSQL | Banco principal (ACID, transações) |
| Database NoSQL | ScyllaDB/Cassandra | Alta performance, dados distribuídos, time-series |
| ORM Async | asyncpg | Driver async PostgreSQL |
| Cache | Redis | Cache, tokens, rate limit |
| Search | Elasticsearch | Full-text search |
ScyllaDB é utilizado para cenários que exigem:
- Alta taxa de escrita (logs, eventos, métricas)
- Dados distribuídos geograficamente
- Time-series data
- Escalabilidade horizontal
from infrastructure.scylladb import ScyllaDBRepository
# Configuração via variáveis de ambiente
# SCYLLADB__HOSTS=["localhost:9042"]
# SCYLLADB__KEYSPACE=my_keyspace
repo = ScyllaDBRepository(entity_class=EventLog)
await repo.insert(event)
events = await repo.find_by_partition("user_123")| Categoria | Tecnologia | Uso |
|---|---|---|
| Event Streaming | Kafka (aiokafka) | Domain events |
| Message Queue | RabbitMQ | Background tasks |
| Object Storage | MinIO | File uploads |
| Categoria | Tecnologia | Uso |
|---|---|---|
| Logging | structlog | Structured JSON logs |
| Tracing | OpenTelemetry | Distributed tracing |
| Metrics | Prometheus | Métricas customizadas |
| Log Aggregation | Elasticsearch | Centralização de logs |
| Categoria | Tecnologia | Uso |
|---|---|---|
| Rate Limiting | slowapi | Proteção contra abuse |
| Password Hashing | passlib (Argon2) | Hash seguro de senhas |
| JWT | python-jose | Tokens de autenticação |
| Encryption | cryptography | Criptografia de campos |
| Categoria | Tecnologia | Uso |
|---|---|---|
| Test Framework | pytest | Execução de testes |
| Async Tests | pytest-asyncio | Testes assíncronos |
| Property Tests | Hypothesis | Property-based testing |
| Factories | polyfactory | Geração de dados de teste |
| Mocking | respx | Mock de HTTP requests |
| Load Tests | k6 | Testes de carga |
| Coverage | pytest-cov | Cobertura de código |
| Categoria | Tecnologia | Uso |
|---|---|---|
| Linter | ruff | Lint e formatação |
| Type Checker | mypy | Verificação de tipos |
| Security Scanner | bandit | Análise de segurança |
| Pre-commit | pre-commit | Git hooks |
| Categoria | Tecnologia | Uso |
|---|---|---|
| GraphQL | Strawberry | Schema e resolvers |
| Categoria | Tecnologia | Uso |
|---|---|---|
| Service Mesh | Istio 1.20+ | mTLS, traffic management, observability |
| GitOps | ArgoCD | Continuous delivery |
| Container Orchestration | Kubernetes 1.28+ | Orquestração de containers |
| Infrastructure as Code | Terraform 1.6+ | Provisionamento de infraestrutura |
| Helm | Helm 3.14+ | Packaging de aplicações K8s |
| Documento | Descrição | Localização |
|---|---|---|
| Overview | Visão geral do sistema | docs/overview.md |
| Architecture | Arquitetura detalhada | docs/architecture.md |
| Modules | Descrição dos módulos | docs/modules.md |
| Patterns | Padrões de implementação | docs/patterns.md |
| Configuration | Configuração completa | docs/configuration.md |
| Getting Started | Guia de início rápido | docs/getting-started.md |
| Testing | Guia de testes | docs/testing.md |
| Deployment | Guia de deploy | docs/deployment.md |
| ADR | Título | Status |
|---|---|---|
| ADR-001 | JWT Authentication | Accepted |
| ADR-002 | RBAC Implementation | Accepted |
| ADR-003 | API Versioning | Accepted |
| ADR-004 | Token Revocation | Accepted |
| ADR-005 | Repository Pattern | Accepted |
| ADR-006 | Specification Pattern | Accepted |
| ADR-007 | CQRS Implementation | Accepted |
| ADR-008 | Cache Strategy | Accepted |
| ADR-009 | Resilience Patterns | Accepted |
| ADR-010 | Error Handling | Accepted |
| ADR-011 | Observability Stack | Accepted |
| ADR-012 | Clean Architecture | Accepted |
| ADR-013 | SQLModel Production Readiness | Accepted |
| ADR-014 | API Best Practices 2025 | Accepted |
| ADR-015 | GitOps with ArgoCD | Accepted |
| ADR-016 | Core Modules Restructuring 2025 | Accepted |
| ADR-017 | Core Modules Code Review 2025 | Accepted |
| ADR-018 | Istio Service Mesh | Accepted |
| ADR-019 | Kepler GreenOps | Accepted |
| Guia | Descrição |
|---|---|
docs/guides/getting-started.md |
Início rápido |
docs/guides/testing-guide.md |
Como escrever testes |
docs/guides/security-guide.md |
Práticas de segurança |
docs/guides/debugging-guide.md |
Debugging e troubleshooting |
docs/guides/integration-guide.md |
Integração com serviços |
docs/guides/cqrs-middleware-guide.md |
Usando CQRS |
docs/guides/bounded-context-guide.md |
Criando bounded contexts |
| Documento | Descrição |
|---|---|
docs/api/openapi.yaml |
OpenAPI 3.1 spec |
docs/api/security.md |
Segurança da API |
docs/api/versioning.md |
Versionamento |
docs/api/rest/ |
Endpoints REST |
docs/api/graphql/ |
Schema GraphQL |
| Documento | Descrição |
|---|---|
docs/infrastructure/postgresql.md |
Configuração PostgreSQL |
docs/infrastructure/redis.md |
Configuração Redis |
docs/infrastructure/elasticsearch.md |
Configuração Elasticsearch |
docs/infrastructure/kafka.md |
Configuração Kafka |
docs/infrastructure/minio.md |
Configuração MinIO |
| Runbook | Descrição |
|---|---|
docs/runbooks/database-connection-issues.md |
Problemas de conexão DB |
docs/runbooks/cache-failures.md |
Falhas de cache |
docs/runbooks/circuit-breaker-open.md |
Circuit breaker aberto |
| Categoria | Módulos | Subpastas | Status |
|---|---|---|---|
| Core | 8 | 28 | ✅ Reorganizado |
| Application | 1 | 11 | ✅ Reorganizado |
| Domain | 3 | 11 | ✅ Reorganizado |
| Interface | 2 | 10 | ✅ Reorganizado |
| Infrastructure | 29 | 18+ | ✅ Reorganizado |
| Total | 43 | 78+ | ✅ Production-Ready |
| Categoria | Score | Detalhe |
|---|---|---|
| Core Generic Patterns (R1-R10) | 98% | All PEP 695 |
| Infrastructure & Quality (R11-R20) | 92% | Full coverage |
| Production Features (R21-R30) | 92% | Enterprise-ready |
- Compilação: ✅ Sem erros
- Imports: ✅ Todos validados
- Circular Dependencies: ✅ Nenhuma
- Backward Compatibility: ✅ 100%
- Code Review: ✅ 43/43 módulos aprovados
- Documentação: ✅ 19 ADRs
| Padrão | Status | Descrição |
|---|---|---|
| Clean Architecture | ✅ 100% | Separação de camadas com dependências unidirecionais |
| OWASP API Security Top 10 | ✅ 100% | Proteção contra vulnerabilidades comuns |
| 12-Factor App | ✅ 100% | Cloud-native design |
| RFC 7807 | ✅ Implementado | Problem Details for HTTP APIs |
| RFC 8594 | ✅ Implementado | Deprecation Headers |
| OpenAPI 3.1 | ✅ Implementado | Documentação automática |
| SOLID Principles | ✅ Implementado | Código manutenível |
| DDD | ✅ Implementado | Domain-Driven Design |
| PEP 695 | ✅ Implementado | Modern Python Generics |
| Recurso | Implementação |
|---|---|
| Autenticação | JWT com access (30min) e refresh tokens (7 dias) |
| Autorização | RBAC com composição de permissões |
| Rate Limiting | slowapi com limites configuráveis |
| Headers | CSP, HSTS, X-Frame-Options, X-Content-Type-Options |
| Senhas | Argon2 hashing + política configurável |
| Input | Validação Pydantic + sanitização |
| Tokens | Revogação via Redis blacklist |
| Logs | Redação automática de PII |
| Encryption | Field-level encryption para dados sensíveis |
1. Entidade (src/domain/products/entities.py):
from sqlmodel import SQLModel, Field
from ulid import ULID
class Product(SQLModel, table=True):
__tablename__ = "products"
id: str = Field(default_factory=lambda: str(ULID()), primary_key=True)
name: str = Field(max_length=100)
price: float = Field(ge=0)
description: str | None = Field(default=None)
is_active: bool = Field(default=True)2. DTOs (src/application/products/dtos.py):
from pydantic import BaseModel, Field
class ProductCreate(BaseModel):
name: str = Field(min_length=1, max_length=100)
price: float = Field(ge=0)
description: str | None = None
class ProductUpdate(BaseModel):
name: str | None = None
price: float | None = Field(default=None, ge=0)
description: str | None = None
class ProductResponse(BaseModel):
id: str
name: str
price: float
description: str | None
is_active: bool
model_config = {"from_attributes": True}3. Router (src/interface/v1/products_router.py):
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/products", tags=["Products"])
@router.post("/", response_model=ProductResponse, status_code=201)
async def create_product(
data: ProductCreate,
session: AsyncSession = Depends(get_session),
) -> ProductResponse:
product = Product(**data.model_dump())
session.add(product)
await session.commit()
await session.refresh(product)
return ProductResponse.model_validate(product)
@router.get("/{product_id}", response_model=ProductResponse)
async def get_product(
product_id: str,
session: AsyncSession = Depends(get_session),
) -> ProductResponse:
product = await session.get(Product, product_id)
if not product:
raise HTTPException(404, "Product not found")
return ProductResponse.model_validate(product)4. Migration:
uv run alembic revision --autogenerate -m "Add products table"
uv run alembic upgrade headsqlalchemy.exc.OperationalError: connection refused
Solução:
# Verificar se PostgreSQL está rodando
docker ps | grep postgres
# Iniciar se necessário
docker compose -f deployments/docker/docker-compose.base.yml up -d postgrespydantic_core._pydantic_core.ValidationError: secret_key
Solução:
# Gerar chave segura
python -c "import secrets; print(secrets.token_urlsafe(64))"
# Adicionar ao .env
SECURITY__SECRET_KEY=<chave_gerada>ModuleNotFoundError: No module named 'core'
Solução:
# Verificar PYTHONPATH
export PYTHONPATH="${PYTHONPATH}:${PWD}/src"
# Ou instalar em modo editável
pip install -e .OSError: [Errno 98] Address already in use
Solução:
# Windows
netstat -ano | findstr :8000
taskkill /PID <PID> /F
# Linux/Mac
lsof -i :8000
kill -9 <PID>
# Ou usar outra porta
uvicorn src.main:app --port 8001redis.exceptions.ConnectionError: Error connecting to localhost:6379
Solução:
# Iniciar Redis
docker compose -f deployments/docker/docker-compose.base.yml up -d redis
# Ou desabilitar Redis no .env
REDIS__ENABLED=falseVeja CONTRIBUTING.md para diretrizes detalhadas.
- Fork o repositório
- Crie uma branch (
git checkout -b feature/nova-feature) - Faça suas alterações seguindo os coding standards
- Escreva/atualize testes (cobertura mínima 80%)
- Execute verificações (
make check) - Commit (
git commit -m 'feat(scope): add nova feature') - Push (
git push origin feature/nova-feature) - Abra um Pull Request
- Arquivos: kebab-case
- Classes: PascalCase
- Funções/Variáveis: snake_case
- Constantes: UPPER_SNAKE_CASE
- Complexidade máxima: 10
- Linhas por arquivo: 200-400 (máx 500)
- Linhas por função: 10-50 (máx 75)
MIT License - veja LICENSE para detalhes.
- FastAPI Documentation
- Pydantic Documentation
- SQLAlchemy Documentation
- Hypothesis Documentation
- OpenTelemetry Python
Desenvolvido com ❤️ usando Python e FastAPI
O Python API Base inclui suporte completo para monitoramento de sustentabilidade e práticas GreenOps usando Kepler (CNCF Sandbox).
| Componente | Descrição |
|---|---|
| Kepler | Coleta métricas de energia via eBPF/RAPL |
| Carbon Calculator | Calcula emissões de CO2 baseado em intensidade regional |
| Cost Tracker | Correlaciona custos energéticos com gastos cloud |
| Grafana Dashboard | Visualização de consumo e emissões |
| Prometheus Alerts | Alertas para anomalias de consumo |
# Deploy Kepler
kubectl apply -k deployments/kepler/overlays/production
# Verificar métricas
kubectl port-forward -n kepler-system daemonset/kepler 9102:9102
curl http://localhost:9102/metrics | grep kepler_container_joules| Endpoint | Descrição |
|---|---|
GET /api/v1/sustainability/metrics |
Métricas de energia |
GET /api/v1/sustainability/emissions |
Emissões de carbono |
GET /api/v1/sustainability/reports/{namespace} |
Relatório de sustentabilidade |
GET /api/v1/sustainability/costs |
Custos energéticos |
GET /api/v1/sustainability/export/csv |
Exportar CSV |
GET /api/v1/sustainability/export/json |
Exportar JSON |
from infrastructure.sustainability import SustainabilityService
service = SustainabilityService()
# Obter métricas de carbono
metrics = await service.get_carbon_metrics(namespace="production")
# Gerar relatório
report = await service.generate_report(
namespace="production",
period_start=datetime(2024, 1, 1),
period_end=datetime(2024, 1, 31),
baseline_emissions=Decimal("50000"),
target_emissions=Decimal("40000"),
)
print(f"Progress: {report.progress_percentage}%")# Consumo de energia por namespace (kWh/h)
sum by (namespace) (rate(kepler_container_joules_total[5m])) / 3600000
# Emissões de carbono estimadas (gCO2/h)
(sum(rate(kepler_container_joules_total[5m])) / 3600000) * 400
# Top 10 pods por consumo
topk(10, sum by (pod) (rate(kepler_container_joules_total[5m])))
O Python API Base inclui integração completa com Dapr 1.14 para construção de microsserviços distribuídos.
| Building Block | Descrição | Componente |
|---|---|---|
| Service Invocation | Comunicação service-to-service | HTTP/gRPC |
| Pub/Sub | Mensageria assíncrona | Kafka |
| State Management | Armazenamento de estado | Redis |
| Secrets | Gerenciamento de secrets | Vault/K8s |
| Bindings | Integrações externas | Cron, Kafka |
| Actors | Atores virtuais | Redis |
| Workflows | Orquestração de processos | Dapr Workflow |
# Iniciar com Docker Compose
cd deployments/dapr
docker-compose -f docker-compose.dapr.yaml up -d
# Ou com Dapr CLI
dapr run --app-id python-api --app-port 8000 -- python -m uvicorn src.main:app# .env
DAPR_ENABLED=true
DAPR_HTTP_ENDPOINT=http://localhost:3500
DAPR_GRPC_ENDPOINT=localhost:50001
DAPR_APP_ID=python-apifrom infrastructure.dapr.client import get_dapr_client
from infrastructure.dapr.state import StateManager
from infrastructure.dapr.pubsub import PubSubManager
# State Management
client = get_dapr_client()
state = StateManager(client, "statestore")
await state.save("key", b"value")
item = await state.get("key")
# Pub/Sub
pubsub = PubSubManager(client, "pubsub")
await pubsub.publish("orders", {"order_id": "123"})
# Service Invocation
from infrastructure.dapr.invoke import ServiceInvoker
invoker = ServiceInvoker(client)
response = await invoker.invoke("order-service", "orders")Políticas de resiliência configuráveis em deployments/dapr/config/resiliency.yaml:
- Timeouts: Duração máxima de operações
- Retries: Retry com backoff exponencial
- Circuit Breakers: Proteção contra falhas em cascata
