landim32/mqMonitor

mqMonitor - Process Pipeline Monitor


Overview

mqMonitor is a real-time process pipeline monitoring system built with .NET 8, RabbitMQ, PostgreSQL, and a React 19 frontend. It provides full observability over distributed process pipelines using event-driven architecture, the Saga pattern (choreography), and CQRS.

Processes flow through configurable pipeline stages (e.g., Report > Account > Routine > Payment > Notification > Audit), with each stage handled by an independent worker. The monitor captures all lifecycle events, projects them into a read model, and pushes real-time updates to the frontend via SignalR.

                                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                    β”‚         React Frontend (Vite)        β”‚
                                    β”‚    Kanban Board + Process Details    β”‚
                                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                                   β”‚ SignalR WebSocket
                                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                    β”‚          Monitor API (.NET 8)        β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                       β”‚  REST API + Event Projection + Saga  β”‚
β”‚  Producer  │──→ RabbitMQ ──→      β”‚  Cancel Consumer + Compensation      β”‚
β”‚   (CLI)   β”‚    Pipeline    β”Œβ”€β”€β†’   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    Exchange    β”‚                     β”‚
                     β”‚       β”‚              PostgreSQL (Read Model)
              β”Œβ”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”
              β–Ό              β–Ό
     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
     β”‚   Stage 1   β”‚ β”‚   Stage N   β”‚    ← Independent Workers
     β”‚   Worker    β”‚ β”‚   Worker    β”‚      (one per pipeline stage)
     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ“– Guide: Creating a New Worker

This is the main extension point of mqMonitor. Each pipeline stage needs a worker that consumes messages from its queue, processes them, and forwards them to the next stage.

Step 1 β€” Register the stage in appsettings.json

Add your new stage to the Pipeline.Stages array in MqMonitor.API/appsettings.json:

{
  "Pipeline": {
    "PipelineExchange": "processes.pipeline",
    "Stages": [
      { "Name": "report",       "DisplayName": "Report",       "QueueName": "processes.report",       "RoutingKey": "pipeline.report",       "MaxPriority": 10, "PrefetchCount": 1, "DlqName": "processes.report.dlq",       "RetryDelayMs": 5000, "MaxRetries": 3 },
      { "Name": "account",      "DisplayName": "Account",      "QueueName": "processes.account",      "RoutingKey": "pipeline.account",      "MaxPriority": 10, "PrefetchCount": 1, "DlqName": "processes.account.dlq",      "RetryDelayMs": 5000, "MaxRetries": 3 },

      // βœ… Add your new stage here:
      { "Name": "myStage",      "DisplayName": "My Stage",     "QueueName": "processes.my-stage",     "RoutingKey": "pipeline.myStage",      "MaxPriority": 10, "PrefetchCount": 1, "DlqName": "processes.my-stage.dlq",   "RetryDelayMs": 5000, "MaxRetries": 3 }
    ]
  }
}

Stage configuration fields:

| Field | Description |
| --- | --- |
| Name | Unique identifier used internally (lowercase, no spaces) |
| DisplayName | Human-readable name shown in the UI |
| QueueName | RabbitMQ queue name (convention: processes.<name>) |
| RoutingKey | Routing key for the pipeline exchange (convention: pipeline.<name>) |
| MaxPriority | Maximum message priority level (1-10) |
| PrefetchCount | Number of unacknowledged messages the worker may hold at once |
| DlqName | Dead Letter Queue name (convention: processes.<name>.dlq) |
| RetryDelayMs | Delay in milliseconds before retrying a failed message |
| MaxRetries | Maximum retry attempts before the message is sent to the DLQ |

Note: The topology setup (RabbitMqTopologySetup) automatically creates the queue, DLQ, retry queue, and all bindings from this configuration. No manual RabbitMQ setup needed.
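To make the naming conventions concrete, here is a small illustrative TypeScript sketch (the helper name `stageNames` is hypothetical; the real setup lives in `RabbitMqTopologySetup` on the C# side) that derives the conventional queue, routing-key, DLQ, and retry-queue names from a stage's Name:

```typescript
// Hypothetical helper: derives the conventional RabbitMQ names for a
// pipeline stage from its Name, mirroring the conventions in the table above.
interface StageNames {
  queueName: string;      // processes.<name>
  routingKey: string;     // pipeline.<name>
  dlqName: string;        // processes.<name>.dlq
  retryQueueName: string; // processes.<name>.retry (used by the retry flow in Step 3)
}

function stageNames(name: string): StageNames {
  return {
    queueName: `processes.${name}`,
    routingKey: `pipeline.${name}`,
    dlqName: `processes.${name}.dlq`,
    retryQueueName: `processes.${name}.retry`,
  };
}
```

Overriding any field in appsettings.json takes precedence; the conventions only describe the defaults used throughout this README.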

Step 2 β€” Create the worker project

# From the repository root:
mkdir examples/MqMonitor.Example.MyStageWorker

Create the .csproj file:

<!-- examples/MqMonitor.Example.MyStageWorker/MqMonitor.Example.MyStageWorker.csproj -->
<Project Sdk="Microsoft.NET.Sdk.Worker">
  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
  </ItemGroup>
  <ItemGroup>
    <ProjectReference Include="..\..\MqMonitor.Application\MqMonitor.Application.csproj" />
    <ProjectReference Include="..\..\MqMonitor.Domain\MqMonitor.Domain.csproj" />
    <ProjectReference Include="..\..\MqMonitor.Infra\MqMonitor.Infra.csproj" />
  </ItemGroup>
</Project>

Add to the solution:

dotnet sln add examples/MqMonitor.Example.MyStageWorker/MqMonitor.Example.MyStageWorker.csproj

Step 3 β€” Implement the worker (Program.cs)

using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Options;
using MqMonitor.Application;
using MqMonitor.Domain.Enums;
using MqMonitor.Domain.Messaging.Interfaces;
using MqMonitor.Infra.Configuration;
using MqMonitor.Infra.Messaging.Contracts;
using MqMonitor.Infra.RabbitMq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddMqMonitor(builder.Configuration);

var host = builder.Build();

// Configure RabbitMQ topology (creates queues if they don't exist)
using (var scope = host.Services.CreateScope())
{
    var topology = scope.ServiceProvider.GetRequiredService<RabbitMqTopologySetup>();
    topology.Configure();
}

// Resolve services
var connectionFactory = host.Services.GetRequiredService<RabbitMqConnectionFactory>();
var publisher = host.Services.GetRequiredService<IMessagePublisher>();
var pipelineSettings = host.Services.GetRequiredService<IOptions<PipelineSettings>>().Value;
var logger = host.Services.GetRequiredService<ILogger<Program>>();

// ─── CONFIGURE THESE 3 VALUES ───────────────────────────────
const string TARGET_STAGE = "myStage";     // Must match the Name in appsettings
const string? NEXT_STAGE = "nextStage";    // Name of the next stage, or null if this is the final stage
const int ERROR_PERCENTAGE = 10;           // Simulated error rate (0-100)
// ─────────────────────────────────────────────────────────────

var stage = pipelineSettings.Stages.First(s => s.Name == TARGET_STAGE);
var workerName = $"worker-{TARGET_STAGE}-{Environment.MachineName}-{Guid.NewGuid().ToString()[..8]}";

logger.LogInformation("Starting {Worker} for stage '{Stage}'", workerName, TARGET_STAGE);

var channel = connectionFactory.CreateChannel();
channel.BasicQos(prefetchSize: 0, prefetchCount: (ushort)stage.PrefetchCount, global: false);

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (_, ea) =>
{
    var retryCount = 0;
    if (ea.BasicProperties.Headers?.TryGetValue("x-retry-count", out var rc) == true)
        retryCount = Convert.ToInt32(rc);

    try
    {
        var processEvent = JsonSerializer.Deserialize<ProcessEvent>(
            Encoding.UTF8.GetString(ea.Body.ToArray()));

        if (processEvent == null)
        {
            channel.BasicReject(ea.DeliveryTag, requeue: false);
            return;
        }

        // 1. Notify: stage started
        publisher.PublishEvent(new ProcessEvent
        {
            ProcessId = processEvent.ProcessId,
            Status = ProcessStatusEnum.StageStarted.ToConstant(),
            Worker = workerName,
            CurrentStage = TARGET_STAGE,
            Message = processEvent.Message,
            Priority = processEvent.Priority,
            Timestamp = DateTime.UtcNow
        }, RabbitMqConstants.ProcessStageStarted);

        // ──────────────────────────────────────────────
        // 2. YOUR BUSINESS LOGIC HERE
        //    Replace the simulated delay with real work.
        await Task.Delay(Random.Shared.Next(5000, 30001));
        // ──────────────────────────────────────────────

        // 3. Check for failure (replace with real error handling)
        if (Random.Shared.Next(100) < ERROR_PERCENTAGE)
        {
            var errorMsg = $"Failure at stage '{TARGET_STAGE}'";

            publisher.PublishEvent(new ProcessEvent
            {
                ProcessId = processEvent.ProcessId,
                Status = ProcessStatusEnum.Failed.ToConstant(),
                Worker = workerName, CurrentStage = TARGET_STAGE,
                ErrorMessage = errorMsg, Message = processEvent.Message,
                Priority = processEvent.Priority, Timestamp = DateTime.UtcNow
            }, RabbitMqConstants.ProcessFailed);

            // Trigger saga compensation for all completed stages
            publisher.PublishEvent(new ProcessEvent
            {
                ProcessId = processEvent.ProcessId,
                Status = ProcessStatusEnum.Compensating.ToConstant(),
                Worker = workerName, CurrentStage = TARGET_STAGE,
                ErrorMessage = errorMsg, Message = processEvent.Message,
                Priority = processEvent.Priority, Timestamp = DateTime.UtcNow
            }, RabbitMqConstants.ProcessCompensating);
        }
        else if (NEXT_STAGE != null)
        {
            // 4a. Stage completed β€” forward to next stage
            publisher.PublishEvent(new ProcessEvent
            {
                ProcessId = processEvent.ProcessId,
                Status = ProcessStatusEnum.StageCompleted.ToConstant(),
                Worker = workerName, CurrentStage = TARGET_STAGE,
                NextStage = NEXT_STAGE, Message = processEvent.Message,
                Priority = processEvent.Priority, Timestamp = DateTime.UtcNow
            }, RabbitMqConstants.ProcessStageCompleted);

            publisher.PublishToPipeline(new ProcessEvent
            {
                ProcessId = processEvent.ProcessId,
                Status = ProcessStatusEnum.Queued.ToConstant(),
                CurrentStage = NEXT_STAGE, Message = processEvent.Message,
                Priority = processEvent.Priority, Timestamp = DateTime.UtcNow
            }, $"pipeline.{NEXT_STAGE}", (byte)processEvent.Priority);
        }
        else
        {
            // 4b. Final stage β€” process finished
            publisher.PublishEvent(new ProcessEvent
            {
                ProcessId = processEvent.ProcessId,
                Status = ProcessStatusEnum.Finished.ToConstant(),
                Worker = workerName, CurrentStage = TARGET_STAGE,
                Message = processEvent.Message, Priority = processEvent.Priority,
                Timestamp = DateTime.UtcNow
            }, RabbitMqConstants.ProcessFinished);
        }

        channel.BasicAck(ea.DeliveryTag, multiple: false);
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "[{Worker}] Error processing message", workerName);
        if (retryCount < stage.MaxRetries)
        {
            // Retry: publish to per-stage retry queue (TTL auto-routes back)
            var retryQueueName = $"{stage.QueueName}.retry";
            var props = channel.CreateBasicProperties();
            props.Persistent = true;
            props.Headers = new Dictionary<string, object> { { "x-retry-count", retryCount + 1 } };
            if (ea.BasicProperties.Priority > 0) props.Priority = ea.BasicProperties.Priority;
            channel.BasicPublish(exchange: "", routingKey: retryQueueName, basicProperties: props, body: ea.Body);
            channel.BasicAck(ea.DeliveryTag, multiple: false);
        }
        else
        {
            // Max retries exceeded β€” reject sends to DLQ via dead letter exchange
            channel.BasicReject(ea.DeliveryTag, requeue: false);
        }
    }
};

channel.BasicConsume(queue: stage.QueueName, autoAck: false, consumer: consumer);
logger.LogInformation("[{Worker}] Listening on queue '{Queue}'", workerName, stage.QueueName);

await host.RunAsync();

Step 4 β€” Add to Docker Compose

Add the worker service to docker-compose.yml:

  my-stage-worker:
    build:
      context: .
      dockerfile: examples/Dockerfile       # Shared Dockerfile for all workers
      args:
        PROJECT_NAME: MqMonitor.Example.MyStageWorker
    container_name: mqmonitor-my-stage-worker
    environment:
      RabbitMq__HostName: rabbitmq
      RabbitMq__Port: 5672
      RabbitMq__UserName: ${RABBITMQ_DEFAULT_USER}
      RabbitMq__Password: ${RABBITMQ_DEFAULT_PASS}
    depends_on:
      rabbitmq:
        condition: service_healthy

Step 5 β€” Run and verify

docker compose up -d --build

Verify in the RabbitMQ Management UI (http://localhost:15672) that your new queue has 1 consumer connected.

Worker lifecycle summary

Message arrives on stage queue
  β†’ Worker publishes  process.stage.started
  β†’ Worker executes business logic
  β†’ On SUCCESS:
      If has next stage β†’ publishes process.stage.completed + forwards to next queue
      If final stage    β†’ publishes process.finished
  β†’ On FAILURE:
      Publishes process.failed + process.compensating (triggers saga compensation)
  β†’ On EXCEPTION:
      Retries via per-stage retry queue (TTL auto-routes back)
      After max retries β†’ BasicReject sends to DLQ
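The exception branch of the lifecycle can be distilled into a pure decision function. This is an illustrative TypeScript sketch of the logic, not code from the repository (the actual implementation is the catch block in Step 3):

```typescript
// Sketch of the worker's failure handling: retry through the per-stage
// retry queue until MaxRetries is reached, then reject to the DLQ.
type FailureAction =
  | { kind: "retry"; nextRetryCount: number; delayMs: number }
  | { kind: "dead-letter" };

function onFailure(
  retryCount: number,   // from the x-retry-count header (0 on first delivery)
  maxRetries: number,   // MaxRetries from the stage configuration
  retryDelayMs: number, // RetryDelayMs from the stage configuration
): FailureAction {
  if (retryCount < maxRetries) {
    // Re-publish with an incremented x-retry-count header; the retry
    // queue's TTL routes the message back to the stage queue after the delay.
    return { kind: "retry", nextRetryCount: retryCount + 1, delayMs: retryDelayMs };
  }
  // Max retries exceeded: BasicReject(requeue: false) dead-letters the message.
  return { kind: "dead-letter" };
}
```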

πŸš€ Features

  • πŸ“‘ Real-time monitoring β€” SignalR WebSocket pushes updates instantly to the frontend
  • πŸ”„ Dynamic pipeline β€” Stages configured in appsettings.json, auto-creates RabbitMQ topology
  • πŸ“‹ Kanban board β€” Visual process flow across pipeline stages with live updates
  • πŸ” Saga pattern (choreography) β€” Automatic compensation in reverse order on failures
  • 🚫 Process cancellation β€” Cancel running processes with automatic saga compensation
  • πŸ“Š Metrics dashboard β€” Process counts, stage breakdown, success/failure rates
  • πŸ”ƒ Retry + DLQ β€” Per-stage retry queues with TTL and Dead Letter Queues
  • ⚑ Priority queues β€” Message priority support across all pipeline stages
  • πŸ—οΈ Clean Architecture β€” 8-project solution with clear dependency boundaries
  • 🐳 Docker ready β€” Full stack with one docker compose up

πŸ› οΈ Technologies Used

Backend

  • .NET 8 β€” API, workers, and producer
  • ASP.NET Core β€” REST API with Swagger
  • Entity Framework Core 8 β€” PostgreSQL ORM with Code First migrations
  • RabbitMQ.Client 6.8 β€” Message broker integration
  • AutoMapper 13 β€” Object mapping between layers
  • SignalR β€” Real-time WebSocket communication

Frontend

  • React 19 β€” UI framework
  • TypeScript 5.9 β€” Type-safe development
  • Vite 7 β€” Build tool and dev server
  • Tailwind CSS 4 β€” Utility-first styling
  • @microsoft/signalr β€” Real-time WebSocket client
  • Radix UI β€” Accessible dialog components
  • Lucide React β€” Icon library
  • Sonner β€” Toast notifications

Infrastructure

  • RabbitMQ 3 β€” Message broker with Management UI
  • PostgreSQL 16 β€” Process state database
  • Docker & Docker Compose β€” Container orchestration
  • Nginx β€” Frontend static file serving

πŸ“ Project Structure

mqMonitor/
β”œβ”€β”€ MqMonitor.DTO/                    # Data Transfer Objects (no dependencies)
β”œβ”€β”€ MqMonitor.Domain/                 # Entities, enums, interfaces
β”‚   β”œβ”€β”€ Entities/                     # ProcessExecutionModel, SagaStepModel, EventLogModel
β”‚   β”œβ”€β”€ Enums/                        # ProcessStatusEnum
β”‚   β”œβ”€β”€ Messaging/Interfaces/         # IMessagePublisher
β”‚   └── Services/Interfaces/          # IProcessQueryService, IEventProjectionService
β”œβ”€β”€ MqMonitor.Infra.Interfaces/       # Repository interfaces
β”œβ”€β”€ MqMonitor.Infra/                  # Infrastructure implementations
β”‚   β”œβ”€β”€ Configuration/                # RabbitMqConstants, PipelineSettings
β”‚   β”œβ”€β”€ Context/                      # MonitorDbContext (EF Core)
β”‚   β”œβ”€β”€ Mapping/Profiles/             # AutoMapper profiles
β”‚   β”œβ”€β”€ Messaging/Contracts/          # ProcessEvent, CancelProcessCommand
β”‚   β”œβ”€β”€ RabbitMq/                     # ConnectionFactory, Publisher, TopologySetup
β”‚   β”œβ”€β”€ Repository/                   # EF Core repositories
β”‚   └── Services/                     # EventProjectionService, ProcessQueryService
β”œβ”€β”€ MqMonitor.Application/            # DI composition root (Initializer.cs)
β”œβ”€β”€ MqMonitor.API/                    # ASP.NET Core Web API
β”‚   β”œβ”€β”€ Controllers/                  # ProcessesController, QueuesController
β”‚   β”œβ”€β”€ Consumers/                    # ProcessEventConsumer, CancelCommandConsumer, CompensationConsumer
β”‚   β”œβ”€β”€ Hubs/                         # MonitorHub (SignalR)
β”‚   └── Services/                     # QueueStatsBackgroundService
β”œβ”€β”€ examples/                         # Example workers and tools
β”‚   β”œβ”€β”€ Dockerfile                    # Shared multi-stage Dockerfile
β”‚   β”œβ”€β”€ MqMonitor.Producer/           # CLI tool for sending test processes
β”‚   β”œβ”€β”€ MqMonitor.Example.ReportWorker/
β”‚   β”œβ”€β”€ MqMonitor.Example.AccountWorker/
β”‚   β”œβ”€β”€ MqMonitor.Example.RoutineWorker/
β”‚   β”œβ”€β”€ MqMonitor.Example.PaymentWorker/
β”‚   β”œβ”€β”€ MqMonitor.Example.NotificationWorker/
β”‚   └── MqMonitor.Example.AuditWorker/
β”œβ”€β”€ mqmonitor-app/                    # React frontend
β”‚   β”œβ”€β”€ src/
β”‚   β”‚   β”œβ”€β”€ components/               # kanban/, process/, queue/, table/, ui/
β”‚   β”‚   β”œβ”€β”€ contexts/                 # ProcessContext, QueueContext
β”‚   β”‚   β”œβ”€β”€ hooks/                    # useProcess, useQueue
β”‚   β”‚   β”œβ”€β”€ pages/                    # DashboardPage, ProcessPage
β”‚   β”‚   β”œβ”€β”€ services/                 # processService, queueService
β”‚   β”‚   └── types/                    # TypeScript interfaces
β”‚   └── Dockerfile                    # Node build + Nginx serve
β”œβ”€β”€ scripts/                          # init-db.sql
β”œβ”€β”€ docker-compose.yml                # Full stack orchestration
β”œβ”€β”€ .env.example                      # Environment template
└── mqMonitor.sln                     # .NET solution file

Dependency graph

MqMonitor.DTO (no deps)
    └── MqMonitor.Domain
        └── MqMonitor.Infra.Interfaces
            └── MqMonitor.Infra
                └── MqMonitor.Application
                    β”œβ”€β”€ MqMonitor.API
                    └── examples/
                        β”œβ”€β”€ MqMonitor.Producer
                        └── MqMonitor.Example.*

βš™οΈ Environment Configuration

1. Copy the environment template

cp .env.example .env

2. Edit the .env file

# PostgreSQL
POSTGRES_DB=process_monitor
POSTGRES_USER=monitor
POSTGRES_PASSWORD=your_secure_password_here
POSTGRES_PORT=5432

# RabbitMQ
RABBITMQ_DEFAULT_USER=guest
RABBITMQ_DEFAULT_PASS=your_secure_password_here
RABBITMQ_PORT=5672
RABBITMQ_MANAGEMENT_PORT=15672

# Monitor API
MONITOR_API_PORT=5000
ASPNETCORE_ENVIRONMENT=Development

# Frontend
FRONTEND_PORT=3000

IMPORTANT: Never commit the .env file with real credentials. Only the .env.example should be version controlled.


🐳 Docker Setup

Quick Start

# 1. Copy and configure environment
cp .env.example .env

# 2. Build and start all services
docker compose up -d --build

# 3. Verify all containers are running
docker compose ps

Accessing Services

| Service | URL | Description |
| --- | --- | --- |
| Frontend | http://localhost:3000 | React monitoring dashboard |
| API | http://localhost:5000 | REST API + Swagger |
| Swagger | http://localhost:5000/swagger | API documentation |
| RabbitMQ Management | http://localhost:15672 | Queue management UI |
| SignalR Hub | ws://localhost:5000/hubs/monitor | Real-time WebSocket |

Docker Compose Commands

| Action | Command |
| --- | --- |
| Start all services | docker compose up -d |
| Start with rebuild | docker compose up -d --build |
| Stop all services | docker compose stop |
| View status | docker compose ps |
| View logs (all) | docker compose logs -f |
| View API logs | docker compose logs -f monitor |
| View worker logs | docker compose logs -f report-worker |
| Remove containers | docker compose down |
| Remove containers + volumes | docker compose down -v |

πŸ”§ Manual Setup (Without Docker)

Prerequisites

  • .NET 8 SDK
  • Node.js 20+
  • PostgreSQL 16
  • RabbitMQ 3.x with Management plugin

Backend

# 1. Restore and build
dotnet restore mqMonitor.sln
dotnet build mqMonitor.sln

# 2. Run database migrations (from API project)
cd MqMonitor.API
dotnet ef database update

# 3. Start the API (includes monitor, cancel, and compensation consumers)
dotnet run --project MqMonitor.API

# 4. Start example workers (one terminal per worker)
dotnet run --project examples/MqMonitor.Example.ReportWorker
dotnet run --project examples/MqMonitor.Example.AccountWorker
dotnet run --project examples/MqMonitor.Example.RoutineWorker
dotnet run --project examples/MqMonitor.Example.PaymentWorker
dotnet run --project examples/MqMonitor.Example.NotificationWorker
dotnet run --project examples/MqMonitor.Example.AuditWorker

Frontend

cd mqmonitor-app
npm install
npm run dev

Frontend available at http://localhost:5173

Producer CLI

dotnet run --project examples/MqMonitor.Producer
=== Process Producer (Pipeline) ===
Commands:
  send <stage> [count] [priority] - Send processes to a pipeline stage
  stages                          - List available stages
  quit                            - Exit

> send report 5        # Send 5 processes starting at Report stage
> send account 3 8     # Send 3 processes to Account with priority 8
> stages               # List all configured pipeline stages

πŸ“š API Documentation

Full Swagger documentation available at /swagger when running in Development mode.

Process Endpoints

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /api/processes | List all processes (filter: ?stage=X or ?status=Y) |
| POST | /api/processes | Create a new process |
| GET | /api/processes/{id} | Get process details |
| GET | /api/processes/{id}/events | Get event history |
| GET | /api/processes/{id}/saga | Get saga step timeline |
| PUT | /api/processes/{id}/priority | Update process priority |
| POST | /api/processes/{id}/cancel | Cancel a running process |
| GET | /api/processes/metrics | Get execution metrics |

Queue Endpoints

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /api/queues | Get all queue stats |
| GET | /api/queues/{name} | Get specific queue stats |
| GET | /api/queues/pipeline | Get pipeline overview |
| GET | /api/queues/stages | Get configured stages |

SignalR Hub

Connect to /hubs/monitor for real-time updates.

| Client Method | Description |
| --- | --- |
| SubscribeToAll() | Receive all process updates |
| SubscribeToProcess(processId) | Receive updates for a specific process |
| SubscribeToQueue(queueName) | Receive queue stats for a specific queue |

| Server Event | Payload | Description |
| --- | --- | --- |
| ProcessUpdated | ProcessExecutionInfo | Process state changed |
| QueueStatsUpdated | QueueStatusInfo[] | Queue stats refreshed (every 5s) |

πŸ—οΈ Architecture

Event-Driven Pipeline

The system implements an event-driven architecture where each process flows through configurable pipeline stages:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Report  β”‚ β†’  β”‚ Account  β”‚ β†’  β”‚ Routine  β”‚ β†’  β”‚ Payment  β”‚ β†’  β”‚  Notif.  β”‚ β†’  β”‚  Audit   β”‚
β”‚  Worker  β”‚    β”‚  Worker  β”‚    β”‚  Worker  β”‚    β”‚  Worker  β”‚    β”‚  Worker  β”‚    β”‚  Worker  β”‚
β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜
     β”‚               β”‚               β”‚               β”‚               β”‚               β”‚
     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                          β”‚
                                    events exchange
                                    (process.#)
                                          β”‚
                                β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                β”‚  Monitor API       β”‚
                                β”‚  ProcessEvent      β”‚  β†’ PostgreSQL (read model)
                                β”‚  Consumer          β”‚  β†’ SignalR (real-time push)
                                β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Saga Pattern (Choreography)

Each process execution tracks its saga steps. On failure, the system automatically compensates completed stages in reverse order:

Normal flow:      Report βœ“ β†’ Account βœ“ β†’ Routine βœ“ β†’ Payment βœ— (FAILED)
Compensation:     Routine ← Account ← Report    (reverse order)
Final state:      All completed steps marked as COMPENSATED
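The reverse ordering can be sketched as a pure function over completed saga steps. This is an illustrative TypeScript sketch under assumed names (`SagaStep` and `completedAt` are hypothetical; the repository tracks steps in its SagaStepModel entity):

```typescript
// Sketch of choreography-style compensation ordering: compensate the
// most recently completed stage first, ending with the earliest one.
interface SagaStep {
  stage: string;
  completedAt: number; // completion timestamp (hypothetical field name)
}

function compensationOrder(completedSteps: SagaStep[]): string[] {
  return [...completedSteps]
    .sort((a, b) => b.completedAt - a.completedAt) // newest first
    .map(s => s.stage);
}
```

For the failure above (Report, Account, Routine completed, Payment failed), this yields Routine, then Account, then Report.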

RabbitMQ Topology

| Exchange | Type | Purpose |
| --- | --- | --- |
| processes.events | Topic | Process lifecycle events (process.#) |
| processes.commands | Topic | Control commands (cancel.process) |
| processes.pipeline | Topic | Stage-to-stage routing (pipeline.<stage>) |
| processes.dlx | Topic | Dead Letter Exchange |

| Queue | Binding | Consumer |
| --- | --- | --- |
| processes.monitor | process.# on events exchange | ProcessEventConsumer (API) |
| processes.cancel | cancel.process on commands exchange | CancelCommandConsumer (API) |
| processes.compensation | process.compensating on events exchange | CompensationConsumer (API) |
| processes.<stage> | pipeline.<stage> on pipeline exchange | Stage Worker |
| processes.<stage>.dlq | pipeline.<stage>.# on DLX | None (investigation) |
| processes.<stage>.retry | TTL routes back to pipeline exchange | Auto (RabbitMQ TTL) |
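For intuition on why a single process.# binding on the events exchange catches every lifecycle event: in RabbitMQ topic matching, `*` substitutes exactly one dot-separated word and `#` substitutes zero or more words. A minimal re-implementation of that matching rule (for illustration only, not how the broker is implemented):

```typescript
// Minimal topic-pattern matcher: '*' = exactly one word, '#' = zero or
// more words, words separated by dots. Mirrors RabbitMQ topic exchange
// semantics for simple patterns.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split(".");
  const k = routingKey.split(".");

  function match(i: number, j: number): boolean {
    if (i === p.length) return j === k.length; // pattern consumed: key must be too
    if (p[i] === "#") {
      // '#' may absorb zero or more remaining words
      for (let skip = j; skip <= k.length; skip++) {
        if (match(i + 1, skip)) return true;
      }
      return false;
    }
    if (j === k.length) return false;          // key exhausted, pattern is not
    if (p[i] === "*" || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}
```

So process.stage.started, process.failed, and process.compensating all match process.#, which is why one monitor queue sees the whole lifecycle.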

Design Patterns

| Pattern | Implementation |
| --- | --- |
| CQRS | Write via RabbitMQ events, read via projected PostgreSQL model |
| Event Sourcing (append-only log) | event_logs table stores all raw events |
| Saga (Choreography) | Workers decide flow; compensation in reverse step order |
| Idempotent Consumer | Deduplication by EventId in EventProjectionService |
| Competing Consumers | Multiple worker instances can share the same stage queue |
| Dead Letter Queue | Failed messages routed to per-stage DLQ via DLX |
| Retry with TTL | Per-stage retry queue with configurable delay, auto-routes back |
| Priority Queue | RabbitMQ x-max-priority on all pipeline queues |
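The Idempotent Consumer pattern matters because RabbitMQ redelivers messages after a worker crash, so the same event can arrive twice. An illustrative TypeScript sketch of dedup-by-EventId (the class and in-memory Set here are hypothetical stand-ins; the real EventProjectionService presumably checks persisted event logs):

```typescript
// Sketch of an idempotent event projector: an event is applied to the
// read model at most once, keyed by its EventId.
class EventProjector {
  private seen = new Set<string>();

  /** Returns true if the event was projected, false if it was a duplicate. */
  project(eventId: string, apply: () => void): boolean {
    if (this.seen.has(eventId)) return false; // duplicate delivery: no-op
    apply();                                  // update the read model
    this.seen.add(eventId);                   // record only after a successful apply
    return true;
  }
}
```

Recording the id only after `apply()` succeeds means a crash mid-projection leads to a retried (not lost) event, trading duplicates for at-least-once safety.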

πŸ’Ύ Database

Tables

| Table | PK | Purpose |
| --- | --- | --- |
| process_executions | process_id | Current state of each process (read model) |
| event_logs | event_id | Append-only log of all events (event store) |
| saga_steps | step_id | Saga step tracking per process |

Backup

docker compose exec postgres pg_dump -U monitor process_monitor > backup.sql

Restore

docker compose exec -T postgres psql -U monitor process_monitor < backup.sql

πŸ” Troubleshooting

Worker not connecting to queue

Check:

docker compose logs -f <worker-name>

Common causes:

  • RabbitMQ not healthy yet β€” workers depend on service_healthy condition
  • Stage name in worker doesn't match appsettings.json configuration
  • Environment variables RabbitMq__HostName and credentials not set

Multiple consumers on the same queue

Check: RabbitMQ Management UI > Queues > Consumers column

Common causes:

  • Old Docker containers still running: docker ps -a | grep mqmonitor
  • Fix: docker compose down && docker compose up -d --build

Process stuck in STARTED status

Common causes:

  • Worker crashed during processing β€” message was not acknowledged
  • Fix: Check worker logs, message will be redelivered after worker restart

Frontend not receiving updates

Check: Browser DevTools > Console for SignalR connection errors

Common causes:

  • API not running or CORS not configured for frontend URL
  • SignalR connection failed β€” check ws://localhost:5000/hubs/monitor

🀝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Development Setup

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/AmazingFeature)
  3. Make your changes
  4. Build the solution (dotnet build mqMonitor.sln)
  5. Commit your changes (git commit -m 'Add some AmazingFeature')
  6. Push to the branch (git push origin feature/AmazingFeature)
  7. Open a Pull Request

Coding Standards

  • Follow Clean Architecture dependency rules (dependencies point inward; inner layers never reference outer layers)
  • Domain entities use private setters with factory methods (Create, Reconstruct)
  • All RabbitMQ constants defined in RabbitMqConstants.cs
  • AutoMapper profiles: 2 per entity (EF ↔ Domain, Domain ↔ DTO)
  • Database: snake_case table/column names via EF conventions

πŸ‘¨β€πŸ’» Author

Developed by Rodrigo Landim Carneiro


πŸ“„ License

This project is licensed under the MIT License β€” see the LICENSE file for details.


πŸ™ Acknowledgments


If you find this project useful, please consider giving it a star!
