Skip to content

Controllers

santiago edited this page Jan 29, 2026 · 1 revision

Controllers

Controllers organize your Azure Functions triggers using a clean, class-based approach.

Base Controller

All controllers must inherit from BaseController:

import logging
from azfunc_boot import BaseController, DependencyContainer
import azure.functions as func

class MyController(BaseController):
    """Minimal controller that registers a single GET endpoint."""

    # Route this controller's endpoint is registered under.
    PATH = "my-endpoint"

    def __init__(self, container: DependencyContainer, bp: func.Blueprint):
        """Hand the container and blueprint to the base class and create a logger."""
        super().__init__(container, bp)
        self._logger = logging.getLogger(__name__)

    def register_routes(self):
        """Register ``my_method`` as the GET handler for ``PATH``."""
        get_only = self.bp.route(self.PATH, methods=["GET"])
        get_only(self.my_method)

    async def my_method(self, req: func.HttpRequest) -> func.HttpResponse:
        """Answer every request with a plain ``200 OK``."""
        # Your route handler logic here
        ok_response = func.HttpResponse("OK", status_code=200)
        return ok_response

Key Components

PATH Attribute

The PATH attribute defines the base path for all routes in this controller:

class UserController(BaseController):
    """Controller whose routes are all rooted at ``PATH``."""

    PATH = "users"  # All routes will be under /users

    def register_routes(self):
        """Register ``get_users`` as the GET handler for ``PATH``."""
        # Route will be: /users
        list_route = self.bp.route(self.PATH, methods=["GET"])
        list_route(self.get_users)

register_routes Method

Override register_routes() to define your routes and triggers:

def register_routes(self):
    """Register this controller's HTTP routes, timer and Service Bus queue trigger.

    Each ``self.bp.<trigger>(...)`` call returns a decorator, which is applied
    immediately to the handler method that should run for that trigger.
    """
    # HTTP routes
    self.bp.route(self.PATH, methods=["GET"])(self.get_items)
    self.bp.route(f"{self.PATH}/{{id}}", methods=["GET"])(self.get_item)
    self.bp.route(self.PATH, methods=["POST"])(self.create_item)

    # Timer trigger -- arg_name is required by the Blueprint API and names
    # the handler parameter that receives the timer object.
    self.bp.timer_trigger(
        arg_name="timer",
        schedule="0 */5 * * * *"
    )(self.scheduled_task)

    # Service Bus queue trigger
    self.bp.service_bus_queue_trigger(
        arg_name="msg",
        queue_name="myqueue",
        connection="ServiceBusConnection"
    )(self.process_queue_message)

Automatic Scope Management

Controllers automatically create and dispose scoped services for each request:

async def create_user(self, req: func.HttpRequest) -> func.HttpResponse:
    """Illustrate per-request scoping: resolve a scoped service and reply.

    Returns:
        A ``201`` response, so the handler honours its declared return type.
    """
    # A scope is automatically created for this request
    # Scoped services are automatically disposed after the request

    user_service = self.container.get_service(IUserService)
    # Use the service...

    # Scope is automatically disposed when method completes
    # If user_service implements IDisposable, dispose() is called
    return func.HttpResponse(status_code=201)

Supported Triggers

The framework supports all Azure Functions triggers through the Blueprint API:

HTTP Triggers

def register_routes(self):
    """Register two HTTP handlers on ``PATH``: a plain one and one with an explicit auth level."""
    accepts_get_and_post = self.bp.route(self.PATH, methods=["GET", "POST"])
    accepts_get_and_post(self.handle_request)

    # With auth level
    requires_function_key = self.bp.route(
        self.PATH,
        methods=["GET"],
        auth_level=func.AuthLevel.FUNCTION,
    )
    requires_function_key(self.authenticated_endpoint)

Timer Triggers

def register_routes(self):
    """Bind ``scheduled_task`` to a timer trigger."""
    every_five_minutes = "0 */5 * * * *"  # Every 5 minutes
    bind_timer = self.bp.timer_trigger(
        arg_name="timer",
        schedule=every_five_minutes,
    )
    bind_timer(self.scheduled_task)

async def scheduled_task(self, timer: func.TimerRequest):
    """Handle a timer firing; this example only logs that it happened."""
    log = self._logger
    log.info("Timer triggered")
    # Your scheduled logic here

Service Bus Triggers

def register_routes(self):
    """Bind ``process_message`` to the ``myqueue`` Service Bus queue."""
    trigger_options = {
        "arg_name": "msg",
        "queue_name": "myqueue",
        "connection": "ServiceBusConnection",
    }
    bind_queue = self.bp.service_bus_queue_trigger(**trigger_options)
    bind_queue(self.process_message)

async def process_message(self, msg: func.ServiceBusMessage):
    """Log the decoded body of an incoming Service Bus message."""
    body_text = msg.get_body().decode()
    self._logger.info(f"Received message: {body_text}")
    # Process the message

Blob Triggers

def register_routes(self):
    """Bind ``process_blob`` to blobs matching ``samples/{name}``."""
    trigger_options = {
        "arg_name": "blob",
        "path": "samples/{name}",
        "connection": "AzureWebJobsStorage",
    }
    bind_blob = self.bp.blob_trigger(**trigger_options)
    bind_blob(self.process_blob)

async def process_blob(self, blob: func.InputStream):
    """Log the name of the blob that fired the trigger."""
    blob_name = blob.name
    self._logger.info(f"Processing blob: {blob_name}")
    # Process the blob

Event Hub Triggers

from typing import List


def register_routes(self):
    """Bind ``process_events`` to the ``myeventhub`` Event Hub, delivered in batches."""
    # The Blueprint decorator for Event Hubs is event_hub_message_trigger.
    self.bp.event_hub_message_trigger(
        arg_name="events",
        event_hub_name="myeventhub",
        connection="EventHubConnection",
        # Deliver a batch per invocation so the handler receives a list;
        # without this a single event is passed and len(events) would fail.
        cardinality=func.Cardinality.MANY
    )(self.process_events)

async def process_events(self, events: List[func.EventHubEvent]):
    """Log how many events arrived in this batch."""
    self._logger.info(f"Received {len(events)} events")
    # Process events

Cosmos DB Triggers

def register_routes(self):
    """Bind ``process_documents`` to changes in a Cosmos DB container."""
    # cosmos_db_trigger targets the v4 Cosmos DB extension, whose settings are
    # named container_name / connection (the older collection_name /
    # connection_string_setting belong to cosmos_db_trigger_v3).
    self.bp.cosmos_db_trigger(
        arg_name="documents",
        database_name="mydb",
        container_name="mycollection",
        connection="CosmosDBConnection"
    )(self.process_documents)

async def process_documents(self, documents: func.DocumentList):
    """Log how many changed documents were delivered."""
    document_count = len(documents)
    self._logger.info(f"Received {document_count} documents")
    # Process documents

Complete Example

import json
import logging

import azure.functions as func
from azfunc_boot import BaseController, DependencyContainer

from services.user_service import IUserService


class UserController(BaseController):
    """HTTP controller exposing CRUD endpoints for users under ``PATH``.

    Each handler resolves ``IUserService`` from the container, awaits a single
    service call and answers with a JSON body (or an empty 204 for deletes).
    """

    PATH = "users"

    def __init__(self, container: DependencyContainer, bp: func.Blueprint):
        """Hand the container and blueprint to the base class and create a logger."""
        super().__init__(container, bp)
        self._logger = logging.getLogger(__name__)

    @staticmethod
    def _json_response(payload, status_code: int) -> func.HttpResponse:
        """Serialise *payload* with ``json.dumps`` into an ``application/json`` response."""
        return func.HttpResponse(
            json.dumps(payload),
            status_code=status_code,
            mimetype="application/json"
        )

    def register_routes(self):
        """Register the five user endpoints on the blueprint."""
        # GET /users
        self.bp.route(self.PATH, methods=["GET"])(self.get_users)

        # GET /users/{id}
        self.bp.route(f"{self.PATH}/{{id}}", methods=["GET"])(self.get_user)

        # POST /users
        self.bp.route(self.PATH, methods=["POST"])(self.create_user)

        # PUT /users/{id}
        self.bp.route(f"{self.PATH}/{{id}}", methods=["PUT"])(self.update_user)

        # DELETE /users/{id}
        self.bp.route(f"{self.PATH}/{{id}}", methods=["DELETE"])(self.delete_user)

    async def get_users(self, req: func.HttpRequest) -> func.HttpResponse:
        """Return every user as a JSON array (200)."""
        user_service: IUserService = self.container.get_service(IUserService)
        users = await user_service.get_all()
        return self._json_response([u.to_dict() for u in users], 200)

    async def get_user(self, req: func.HttpRequest) -> func.HttpResponse:
        """Return the user named by the ``id`` route parameter, or 404 if none is found."""
        user_id = req.route_params.get("id")
        user_service: IUserService = self.container.get_service(IUserService)
        user = await user_service.get_by_id(user_id)

        if not user:
            return func.HttpResponse("User not found", status_code=404)

        return self._json_response(user.to_dict(), 200)

    async def create_user(self, req: func.HttpRequest) -> func.HttpResponse:
        """Create a user from the JSON request body.

        Returns:
            201 with the created user, 400 when the body is not valid JSON,
            or 500 when creating or serialising the user fails.
        """
        try:
            body = req.get_json()
        except ValueError:
            # get_json() raises ValueError for a body that is not valid JSON;
            # that is a client error, not a server failure.
            return func.HttpResponse("Invalid JSON body", status_code=400)

        try:
            user_service: IUserService = self.container.get_service(IUserService)
            user = await user_service.create(body)
            return self._json_response(user.to_dict(), 201)
        except Exception:
            # Boundary for this handler: keep the traceback in the log and
            # keep internal error details out of the response body.
            self._logger.exception("Error creating user")
            return func.HttpResponse("Internal server error", status_code=500)

    async def update_user(self, req: func.HttpRequest) -> func.HttpResponse:
        """Update the user named by the ``id`` route parameter from the JSON body.

        Returns:
            200 with the updated user, 400 when the body is not valid JSON,
            or 404 when the service yields no user.
        """
        user_id = req.route_params.get("id")
        try:
            body = req.get_json()
        except ValueError:
            return func.HttpResponse("Invalid JSON body", status_code=400)

        user_service: IUserService = self.container.get_service(IUserService)
        user = await user_service.update(user_id, body)

        # NOTE(review): assumes update() yields a falsy value for an unknown
        # id, mirroring get_by_id() -- confirm against your service contract.
        if not user:
            return func.HttpResponse("User not found", status_code=404)

        return self._json_response(user.to_dict(), 200)

    async def delete_user(self, req: func.HttpRequest) -> func.HttpResponse:
        """Delete the user named by the ``id`` route parameter and answer 204."""
        user_id = req.route_params.get("id")
        user_service: IUserService = self.container.get_service(IUserService)
        await user_service.delete(user_id)
        return func.HttpResponse("", status_code=204)

Related Topics

Clone this wiki locally