Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 150 additions & 15 deletions src/workflows_cdk/core/dynamic_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Union, cast
from flask import Flask, request as flask_request, current_app
from flask import Flask, request as flask_request, current_app, Response as FlaskResponse
from flask_cors import CORS
import inspect
import os
Expand All @@ -13,6 +13,7 @@
import importlib.util
import logging
import yaml
import json
from pathlib import Path
from sentry_sdk.integrations.flask import FlaskIntegration
from .errors import ManagedError
Expand All @@ -33,6 +34,78 @@ def load_app_config(app_dir: str) -> Dict[str, Any]:
with open(config_path, "r") as f:
return yaml.safe_load(f) or {}

def load_schema_file(schema_path: str) -> Optional[Dict[str, Any]]:
"""Load and validate a schema file.

Args:
schema_path: Path to the schema file

Returns:
Optional[Dict[str, Any]]: The loaded schema or None if invalid/not found
"""
try:
if not os.path.exists(schema_path):
return None

with open(schema_path, 'r') as f:
schema_data = json.load(f)

# Basic schema validation - ensure it's a dict with required fields
if not isinstance(schema_data, dict):
logging.warning(f"Schema file {schema_path} does not contain a valid JSON object")
return None

return schema_data
except json.JSONDecodeError as e:
logging.warning(f"Failed to parse schema file {schema_path}: {e}")
return None
except Exception as e:
logging.warning(f"Error loading schema file {schema_path}: {e}")
return None

def find_schema_files(directory: str) -> Dict[str, Dict[str, Any]]:
"""Find and load all schema.json files in a directory tree.

Args:
directory: Root directory to search from

Returns:
Dict[str, Dict[str, Any]]: Map of route paths to their schema data
"""
schema_files = {}
try:
for root, _, files in os.walk(directory):
if 'schema.json' in files:
# Load the schema file
schema_path = os.path.join(root, 'schema.json')
schema_data = load_schema_file(schema_path)

if schema_data:
# Calculate the route path based on directory structure
rel_path = os.path.relpath(root, directory)
route_path = '/' + rel_path.replace(os.sep, '/')
if route_path == '/.': # Handle root directory case
route_path = ''
schema_files[route_path] = schema_data

except Exception as e:
logging.error(f"Error scanning for schema files: {e}")

return schema_files

def create_schema_handler(schema_data: Dict[str, Any]) -> Callable[[], FlaskResponse]:
"""Create a handler function for a schema route.

Args:
schema_data: The schema data to return

Returns:
Callable[[], FlaskResponse]: Handler function for the route
"""
def schema_handler() -> FlaskResponse:
return Response(data={"schema": schema_data})
return schema_handler

def log_error_details(app: Flask, error: Union[ManagedError, Exception], is_managed: bool = False) -> Optional[str]:
"""Centralized error logging function."""
# Get full traceback from current exception context
Expand Down Expand Up @@ -97,7 +170,6 @@ def __init__(self, app: Optional[Flask] = None, *,
sentry_dsn: Optional Sentry DSN for error tracking
cors_origins: Optional list of allowed CORS origins
"""

# List to store all discovered routes
self.routes: List[Dict[str, Any]] = []
# Flask application instance
Expand All @@ -112,6 +184,7 @@ def __init__(self, app: Optional[Flask] = None, *,
self.config = config or {}
self.sentry_dsn = sentry_dsn or self.app_settings.get("sentry_dsn")
self.cors_origins = cors_origins
self.schema_routes: Dict[str, Dict[str, Any]] = {}

if app is not None:
self.init_app(app)
Expand Down Expand Up @@ -212,6 +285,37 @@ def _create_route_info(self, function: Callable, rule: Optional[str] = None, opt

return route_info

def _register_schema_route(self, route_dir: str, base_path: str):
"""Register schema route for a directory if schema.json exists and no schema route is defined."""
schema_path = os.path.join(route_dir, 'schema.json')

# Check if schema.json exists
if not os.path.exists(schema_path):
return

# Check if schema route is already registered for this path
schema_route = f"{base_path}/schema"
if any(r.get('path') == schema_route for r in self.routes):
return

# Load schema
schema_data = load_schema_file(schema_path)
if not schema_data:
return

# Create schema route handler
def schema_handler():
return Response(data={"schema": schema_data})

# Register the route
route_info = {
"path": schema_route,
"endpoint": f"{base_path.replace('/', '_')}_schema",
"view_func": schema_handler,
"methods": ["GET", "POST"]
}
self.routes.append(route_info)

def discover_routes(self) -> None:
"""
Automatically discover and register all routes in the routes directory.
Expand Down Expand Up @@ -330,6 +434,35 @@ def routes():
"routes": self.routes
})

def register_schema_routes(self, app: Flask) -> None:
"""Register schema routes for all discovered schema files."""
# Get routes directory from config or use default
routes_dir = self.app_settings.get("routes_directory", "routes")
routes_path = os.path.join(os.getcwd(), routes_dir)

# Only proceed if auto-registration is enabled
if self.app_settings.get("automatically_register_schema_routes", True):
# Find all schema files
schema_files = find_schema_files(routes_path)

# Register each schema route
for route_path, schema_data in schema_files.items():
# Skip if a route already exists
schema_route = f"{route_path}/schema"
if any(r.get('path') == schema_route for r in self.routes):
continue

# Create and register the route
handler = create_schema_handler(schema_data)
route_info = {
"path": schema_route,
"endpoint": f"schema_{route_path.replace('/', '_')}",
"view_func": handler,
"methods": ["GET", "POST"]
}
self.routes.append(route_info)
self.schema_routes[route_path] = schema_data

def init_app(self, app: Flask) -> None:
"""Initialize the router with a Flask app and register all discovered routes."""
self.app = app
Expand All @@ -345,28 +478,30 @@ def init_app(self, app: Flask) -> None:
# Configure components
self.configure_logging(app)
self.configure_cors(app)


# First discover all routes in the project
self.discover_routes()

# Register schema routes
self.register_schema_routes(app)

# Register error handlers
self.register_error_handlers(app)

# Register each discovered route with Flask
for route_info in self.routes:
app.add_url_rule(
route_info["path"],
endpoint=route_info["endpoint"],
view_func=route_info["view_func"],
methods=route_info["methods"],
**{k: v for k, v in route_info.items() if k not in ["path", "endpoint", "view_func", "methods"]}
)
print(f"Registered route: {route_info['path']} with methods {route_info['methods']}") if self.environment == "dev" or self.environment == "development" else None

# Register core routes
self._register_core_routes(app)


# Register all discovered routes
for route in self.routes:
app.add_url_rule(
route["path"],
endpoint=route["endpoint"],
view_func=route["view_func"],
methods=route.get("methods", ["POST"]),
**{k: v for k, v in route.items() if k not in ["path", "endpoint", "view_func", "methods"]}
)
if self.environment in ["dev", "development"]:
print(f"Registered route: {route['path']} with methods {route['methods']}")

def route(self, rule: Optional[str] = None, **options: Any) -> Callable:
"""
Expand Down
30 changes: 21 additions & 9 deletions src/workflows_cdk/core/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,32 @@
from werkzeug.exceptions import HTTPException
from workflows_cdk.core.errors import ManagedError


class Response:
"""Standardized response class for API endpoints."""

# Cache environment check
_IS_PRODUCTION = os.getenv("ENVIRONMENT", "").lower() == "prod"

@staticmethod
def create_response(
data: Any = None,
metadata: Optional[Dict[str, Any]] = None,
status_code: int = 200
) -> FlaskResponse:
"""Create a standardized response."""
response_data = {"data": data}
if metadata:
response_data["metadata"] = metadata
return make_response(jsonify(response_data), status_code)

def __new__(
cls,
data: Any = None,
metadata: Optional[Dict[str, Any]] = None,
status_code: int = 200
) -> FlaskResponse:
"""Create a new success response."""
return cls.create_response(data, metadata, status_code)

@classmethod
def success(
Expand All @@ -26,14 +45,7 @@ def success(
status_code: int = 200
) -> FlaskResponse:
"""Create a success response."""
response_data = {
"data": data,
}

if metadata:
response_data["metadata"] = metadata

return make_response(jsonify(response_data), status_code)
return cls.create_response(data, metadata, status_code)

@classmethod
def error(
Expand Down
4 changes: 2 additions & 2 deletions src/workflows_cdk/core/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ def init_sentry(app: Flask, dsn: Optional[str] = None) -> None:
attach_stacktrace=True,
send_default_pii=False,
include_source_context=True,

debug=False,
)
app.logger.info("Sentry initialized successfully with debug mode enabled")
app.logger.info("Sentry initialized successfully")
22 changes: 15 additions & 7 deletions src/workflows_cdk/core/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,29 @@ def validate_request(request: Any, required_fields: List[str]):
Raises:
ManagedError: If validation fails
"""
if not request.json:
raise ManagedError("request body is empty")

data = request.json["data"]
credentials = request.json["credentials"]
# Skip validation for schema routes
if request.path.endswith('/schema'):
return

# Skip validation if no required fields
if not required_fields:
return

# Only validate if there's a JSON body
if request.is_json and request.json:
data = request.json.get("data")
credentials = request.json.get("credentials")

if not data or not credentials:
raise ManagedError("Missing required fields: data or credentials")
if not data or not credentials:
raise ManagedError("Missing required fields: data or credentials")

if required_fields:
missing_fields = [field for field in required_fields if field not in request.json]
if missing_fields:
raise ManagedError(f"Missing required fields: {', '.join(missing_fields)}")



def parse_str_to_json(data):
try:
return json.loads(data)
Expand Down