diff --git a/APIM-Policy/.gitignore b/APIM-Policy/.gitignore new file mode 100644 index 00000000..ace0ad02 --- /dev/null +++ b/APIM-Policy/.gitignore @@ -0,0 +1,2 @@ +# Ignore all files in the 'untracked' directory +untracked/ \ No newline at end of file diff --git a/docs/BACKEND_HOSTS.md b/docs/BACKEND_HOSTS.md index 7507c531..474843a1 100644 --- a/docs/BACKEND_HOSTS.md +++ b/docs/BACKEND_HOSTS.md @@ -90,3 +90,52 @@ IP1="10.0.1.5" ## Mixing Methods You can mix methods across different hosts (e.g., `Host1` uses a connection string, `Host2` uses the legacy format), but you should not mix definitions for the *same* host number. If `Host1` is a connection string, `Probe_path1` and `IP1` will be ignored. + +--- + +## Path-Based Routing + +The `path` parameter in the connection string controls which requests are routed to each host. + +### How Path Matching Works + +1. **Specific paths take precedence**: Hosts with explicit paths (e.g., `/api/v1`) are matched before catch-all hosts. +2. **Path prefix is stripped**: When forwarding to a matched host, the matching prefix is removed from the request path. +3. **Catch-all fallback**: Hosts with `path=/` or no path specified handle requests that don't match any specific path. + +### Path Matching Examples + +**Configuration:** +```bash +Host1="host=https://chat-service.internal;path=/chat" +Host2="host=https://embed-service.internal;path=/embeddings" +Host3="host=https://default-service.internal;path=/" +``` + +**Request Routing:** + +| Incoming Request | Matched Host | Forwarded Path | +|-----------------|--------------|----------------| +| `GET /chat/completions` | Host1 | `GET /completions` | +| `POST /embeddings/create` | Host2 | `POST /create` | +| `GET /models` | Host3 | `GET /models` | +| `GET /chat` | Host1 | `GET /` | + +### Path Configuration Options + +| Path Value | Behavior | +|------------|----------| +| `/api/v1` | Matches requests starting with `/api/v1`, strips prefix | +| `/api/v1/*` | Same as above (wildcard is implicit) | +| `/` | Catch-all, matches any path, no stripping | +| `/*` | Same as `/` | +| (empty) | Same as `/` | + +### Best Practices + +1. **Use specific paths for service isolation**: Route different AI models or API versions to dedicated backends. +2. **Always have a catch-all**: Include at least one host with `path=/` to handle unexpected routes. +3. **Avoid overlapping paths**: If you have `/api` and `/api/v1`, the more specific path (`/api/v1`) should be tried first. + +See [LOAD_BALANCING.md](LOAD_BALANCING.md) for details on how hosts are selected after path filtering. + diff --git a/docs/CIRCUIT_BREAKER.md b/docs/CIRCUIT_BREAKER.md index 5b38ebbd..74403a5a 100644 --- a/docs/CIRCUIT_BREAKER.md +++ b/docs/CIRCUIT_BREAKER.md @@ -29,4 +29,46 @@ Control the sensitivity of the circuit breaker using these environment variables * **Tolerant**: Set `CBErrorThreshold=100`. Useful for "flaky" non-critical backends where you strictly prefer retries over disabling the host. ## Global Safety Net -The proxy monitors the state of **all** circuit breakers. If **all** configured backends are tripped (meaning the entire backend tier is down), the proxy may enter a fail-safe mode or return a `503 Service Unavailable` to the client immediately, protecting the proxy itself from resource exhaustion. + +The proxy monitors the state of **all** circuit breakers. If **all** configured backends are tripped (meaning the entire backend tier is down), the proxy returns a `503 Service Unavailable` to the client immediately, protecting the proxy itself from resource exhaustion. + +--- + +## Integration with Load Balancing + +The circuit breaker is checked **per-host** during the backend selection loop. This means: + +1. **A single tripped host doesn't block the request** - the proxy simply skips to the next host in the iterator. +2. **Healthy hosts continue receiving traffic** - only the failing host is isolated. +3. **Automatic recovery** - as the circuit closes, traffic resumes without manual intervention. + +### Request Flow with Circuit Breaker + +``` +FOR EACH HOST in load balancer: + │ + ├─ CheckFailedStatus() ──[OPEN]──► SKIP (log and continue to next host) + │ + └─[CLOSED]──► Send request to host + │ + ├─[Success]──► Return response ✓ + │ + └─[Failure]──► Record failure, try next host + (may trip circuit if threshold exceeded) +``` + +### Example Scenario + +``` +Hosts: [A, B, C] +Circuit Breaker Status: A=OPEN, B=CLOSED, C=CLOSED + +Request arrives: + 1. Iterator selects Host A → Circuit OPEN → SKIP + 2. Iterator selects Host B → Circuit CLOSED → Send request → 200 OK ✓ + +Result: Request succeeds despite Host A being unhealthy +``` + +See [LOAD_BALANCING.md](LOAD_BALANCING.md) for details on how hosts are selected and iterated. + diff --git a/docs/CONFIGURATION_SETTINGS.md b/docs/CONFIGURATION_SETTINGS.md new file mode 100644 index 00000000..1bc94af9 --- /dev/null +++ b/docs/CONFIGURATION_SETTINGS.md @@ -0,0 +1,222 @@ +# BackendOptions Settings - Organized by Restart Requirement + +## Legend + +| Tag | Description | +|-----|-------------| +| **[WARM]** | Hot-reloadable (read per-request or periodically refreshed) | +| **[DRAIN]** | Requires draining all workers (stop accepting, wait for in-flight to complete) | +| **[COLD]** | Requires cold restart (read once at startup, configures DI/infrastructure) | +| **[PARTIAL]** | Mixed - some settings WARM, others COLD/DRAIN | + +--- + +## [WARM] Settings - Can be changed without restart + +### Async (per-request settings) + +| Setting | Property Name | +|---------|---------------| +| Timeout | `AsyncTimeout` | +| TTLSecs | `AsyncTTLSecs` | +| TriggerTimeout | `AsyncTriggerTimeout` | +| ClientRequestHeader | `AsyncClientRequestHeader` | +| ClientConfigFieldName | `AsyncClientConfigFieldName` | + +### Logging - Read per-request or per-event + +| Setting | Property Name | +|---------|---------------| +| Console | `LogConsole` | +| ConsoleEvent | `LogConsoleEvent` | +| Poller | `LogPoller` | +| Probes | `LogProbes` | +| Headers | `LogHeaders` | +| AllRequestHeaders | `LogAllRequestHeaders` | +| AllRequestHeadersExcept | `LogAllRequestHeadersExcept` | +| AllResponseHeaders | `LogAllResponseHeaders` | +| AllResponseHeadersExcept | `LogAllResponseHeadersExcept` | + +### Request - Read per-request + +| Setting | Property Name | +|---------|---------------| +| MaxAttempts | `MaxAttempts` | +| TimeoutHeader | `TimeoutHeader` | +| TTLHeader | `TTLHeader` | +| DefaultTTLSecs | `DefaultTTLSecs` | +| RequiredHeaders | `RequiredHeaders` | +| StripHeaders | `StripRequestHeaders` | +| DisallowedHeaders | `DisallowedHeaders` | +| DependencyHeaders | `DependancyHeaders` | + +### Response - Read per-response + +| Setting | Property Name | +|---------|---------------| +| StripHeaders | `StripResponseHeaders` | + +### StatusCodes - Read per-response + +| Setting | Property Name | +|---------|---------------| +| Acceptable | `AcceptableStatusCodes` | + +### Validation - Read per-request + +| Setting | Property Name | +|---------|---------------| +| Headers | `ValidateHeaders` | +| AuthAppID.Enabled | `ValidateAuthAppID` | +| AuthAppID.Url | `ValidateAuthAppIDUrl` | +| AuthAppID.FieldName | `ValidateAuthAppFieldName` | +| AuthAppID.Header | `ValidateAuthAppIDHeader` | + +### Server (metadata only) + +| Setting | Property Name | +|---------|---------------| +| IDStr | `IDStr` | +| ContainerApp | `ContainerApp` | +| Revision | `Revision` | + +--- + +## [DRAIN] Settings - Require stopping all workers before restart + +> ⚠️ These settings affect shared state, external connections, or would cause inconsistency during rolling update. Drain all in-flight requests before changing. + +### Async - Switching modes or connections with in-flight requests causes data loss + +| Setting | Property Name | Reason | +|---------|---------------|--------| +| Enabled | `AsyncModeEnabled` | Mode switch with in-flight requests | +| BlobStorage.ConnectionString | `AsyncBlobStorageConnectionString` | Connection change with pending writes | +| BlobStorage.UseMI | `AsyncBlobStorageUseMI` | Auth change with pending writes | +| BlobStorage.AccountUri | `AsyncBlobStorageAccountUri` | Connection change with pending writes | +| ServiceBus.ConnectionString | `AsyncSBConnectionString` | Connection change with pending messages | +| ServiceBus.Queue | `AsyncSBQueue` | Queue change with pending messages | +| ServiceBus.UseMI | `AsyncSBUseMI` | Auth change with pending messages | +| ServiceBus.Namespace | `AsyncSBNamespace` | Namespace change with pending messages | + +### Hosts - Changing backends with in-flight requests causes routing errors + +| Setting | Property Name | Reason | +|---------|---------------|--------| +| Hosts | `Hosts` | Backend routing changes | + +### LoadBalancing - Changing strategy mid-flight causes uneven distribution + +| Setting | Property Name | Reason | +|---------|---------------|--------| +| Mode | `LoadBalanceMode` | Strategy change mid-flight | +| IterationMode | `IterationMode` | Iterator behavior change | +| UseSharedIterators | `UseSharedIterators` | State inconsistency with active iterators | + +### OAuth - Changing auth mid-flight causes 401s on in-flight requests + +| Setting | Property Name | Reason | +|---------|---------------|--------| +| Enabled | `UseOAuth` | Auth change mid-flight | +| UseGov | `UseOAuthGov` | Endpoint change mid-flight | +| Audience | `OAuthAudience` | Token audience change | + +### Server - Infrastructure changes with active queue + +| Setting | Property Name | Reason | +|---------|---------------|--------| +| Port | `Port` | Listener stop required | +| Workers | `Workers` | Worker count with active queue | +| MaxQueueLength | `MaxQueueLength` | Queue resize with pending requests | + +### Storage - Storage changes with pending writes = data loss + +| Setting | Property Name | Reason | +|---------|---------------|--------| +| DbEnabled | `StorageDbEnabled` | Toggling with pending writes | +| DbContainerName | `StorageDbContainerName` | Container change with pending writes | + +--- + +## [COLD] Settings - Require restart but can use rolling update + +### Async.BlobStorage + +| Setting | Property Name | +|---------|---------------| +| WorkerCount | `AsyncBlobWorkerCount` | + +### CircuitBreaker - Configured at startup + +| Setting | Property Name | +|---------|---------------| +| ErrorThreshold | `CircuitBreakerErrorThreshold` | +| Timeslice | `CircuitBreakerTimeslice` | + +### HealthProbe - Timer and sidecar client created at startup + +| Setting | Property Name | +|---------|---------------| +| Sidecar | `HealthProbeSidecar` | +| SidecarEnabled | `HealthProbeSidecarEnabled` | +| SidecarUrl | `HealthProbeSidecarUrl` | + +### Hosts + +| Setting | Property Name | +|---------|---------------| +| HostName | `HostName` | + +### LoadBalancing.SharedIterator + +| Setting | Property Name | +|---------|---------------| +| TTLSeconds | `SharedIteratorTTLSeconds` | +| CleanupIntervalSeconds | `SharedIteratorCleanupIntervalSeconds` | + +### Polling - Poller timer configured at startup + +| Setting | Property Name | +|---------|---------------| +| Interval | `PollInterval` | +| Timeout | `PollTimeout` | +| SuccessRate | `SuccessRate` | + +### Request + +| Setting | Property Name | +|---------|---------------| +| Timeout | `Timeout` (HttpClient timeout) | + +### Server + +| Setting | Property Name | +|---------|---------------| +| TerminationGracePeriodSeconds | `TerminationGracePeriodSeconds` | +| TrackWorkers | `TrackWorkers` | + +--- + +## [PARTIAL] Settings - Mixed restart requirements + +### Priority + +| Setting | Property Name | Requirement | +|---------|---------------|-------------| +| Default | `DefaultPriority` | [WARM] | +| KeyHeader | `PriorityKeyHeader` | [WARM] | +| Keys | `PriorityKeys` | [WARM] | +| Values | `PriorityValues` | [WARM] | +| Workers | `PriorityWorkers` | [DRAIN] | + +### User + +| Setting | Property Name | Requirement | +|---------|---------------|-------------| +| IDFieldName | `UserIDFieldName` | [WARM] | +| ProfileHeader | `UserProfileHeader` | [WARM] | +| ConfigUrl | `UserConfigUrl` | [WARM] | +| PriorityThreshold | `UserPriorityThreshold` | [WARM] | +| UniqueHeaders | `UniqueUserHeaders` | [WARM] | +| SuspendedConfigUrl | `SuspendedUserConfigUrl` | [WARM] | +| UseProfiles | `UseProfiles` | [COLD] | diff --git a/docs/LOAD_BALANCING.md b/docs/LOAD_BALANCING.md new file mode 100644 index 00000000..a0576adb --- /dev/null +++ b/docs/LOAD_BALANCING.md @@ -0,0 +1,238 @@ +# Load Balancing & Backend Selection + +SimpleL7Proxy uses a sophisticated multi-stage algorithm to select the optimal backend for each request. This document explains how backends are chosen, filtered, and iterated. + +## Algorithm Overview + +``` +REQUEST ARRIVES + │ + ▼ +┌──────────────────────────┐ +│ 1. Filter hosts by path │ → Specific path hosts OR catch-all hosts +└──────────────────────────┘ + │ + ▼ +┌──────────────────────────┐ +│ 2. Create Iterator │ → RoundRobin / Latency / Random +│ (LoadBalanceMode) │ +└──────────────────────────┘ + │ + ▼ +┌──────────────────────────┐ +│ 3. FOR EACH HOST: │ +│ ├─ Circuit breaker OK?│ → Skip if OPEN +│ ├─ TTL not expired? │ → 412 if expired +│ ├─ Send request │ +│ └─ Success? → RETURN │ +└──────────────────────────┘ + │ + ▼ (all hosts failed) +┌──────────────────────────┐ +│ 429s collected? → Requeue│ +│ Else → 503 Service │ +│ Unavailable │ +└──────────────────────────┘ +``` + +--- + +## Stage 1: Path-Based Host Filtering + +Before load balancing, hosts are filtered based on the request path. The proxy maintains two categories of hosts: + +| Category | Description | Example Path | +|----------|-------------|--------------| +| **Specific Path Hosts** | Hosts with explicit path prefixes | `/api/v1/*`, `/chat/*`, `/embeddings` | +| **Catch-All Hosts** | Hosts that handle any path | `/` or `/*` | + +### Matching Rules + +1. **Specific paths take precedence**: If any host's path matches the request, only those hosts are used. +2. **Path prefix is stripped**: When forwarding to a matched host, the matching prefix is removed from the request path. +3. **Catch-all fallback**: If no specific path matches, catch-all hosts are used with the original path. + +### Example + +``` +Configured Hosts: + Host1: path=/api/v1 → https://api-v1.internal + Host2: path=/api/v2 → https://api-v2.internal + Host3: path=/ → https://default.internal + +Request: GET /api/v1/users/123 + +Result: + - Matches Host1 (specific path /api/v1) + - Forwarded as: GET /users/123 to https://api-v1.internal + - Host2 and Host3 are NOT considered +``` + +--- + +## Stage 2: Load Balance Mode + +Once hosts are filtered, an iterator is created based on the configured `LoadBalanceMode`. + +### Available Modes + +| Mode | Environment Variable | Behavior | +|------|---------------------|----------| +| **Round Robin** | `LoadBalanceMode=roundrobin` | Uses a **global counter** shared across all workers. Each request gets the "next" host, ensuring fair distribution. | +| **Latency** | `LoadBalanceMode=latency` | Hosts are **sorted by average latency** (lowest first). Fastest hosts are tried first. | +| **Random** | `LoadBalanceMode=random` | Hosts are **shuffled randomly** for each request. All hosts are tried but in unpredictable order. | + +### Configuration + +```bash +# Default is random +LoadBalanceMode=latency +``` + +### When to Use Each Mode + +| Scenario | Recommended Mode | +|----------|------------------| +| All backends have equal capacity | `roundrobin` | +| Backends have different response times | `latency` | +| Want to avoid predictable patterns | `random` | +| Testing/debugging specific hosts | `roundrobin` with single host | + +--- + +## Stage 3: Iteration Mode + +The iteration mode controls how many times the proxy attempts to reach backends before giving up. + +| Mode | Environment Variable | Behavior | +|------|---------------------|----------| +| **SinglePass** | `IterationMode=SinglePass` | Try each matching host **once**. If all fail → error. | +| **MultiPass** | `IterationMode=MultiPass` | Retry across all hosts up to `MaxAttempts` total. Will cycle through hosts multiple times. | + +### Configuration + +```bash +IterationMode=SinglePass +MaxAttempts=30 # Only used in MultiPass mode +``` + +### Example: MultiPass with 3 Hosts + +``` +Hosts: [A, B, C] +MaxAttempts: 7 + +Attempt 1: Host A → 503 (fail) +Attempt 2: Host B → 503 (fail) +Attempt 3: Host C → 503 (fail) +Attempt 4: Host A → 503 (fail) # Second pass begins +Attempt 5: Host B → 503 (fail) +Attempt 6: Host C → 503 (fail) +Attempt 7: Host A → 200 (success!) ✓ +``` + +--- + +## Stage 4: Shared vs Per-Request Iterators + +Control whether concurrent requests share iterator state or each get their own. + +| Setting | Behavior | +|---------|----------| +| `UseSharedIterators=false` (default) | Each request gets its **own iterator**. Simple but may cause uneven distribution under high concurrency. | +| `UseSharedIterators=true` | Requests to the **same path** share an iterator. Ensures fair distribution across concurrent requests. | + +### When to Use Shared Iterators + +- **High concurrency**: Many simultaneous requests to the same path +- **Fair distribution required**: Need to ensure all backends get equal traffic +- **Round-robin mode**: Most beneficial when combined with `roundrobin` + +### Configuration + +```bash +UseSharedIterators=true +SharedIteratorTTLSeconds=300 # How long to keep unused iterators +SharedIteratorCleanupIntervalSeconds=60 # Cleanup frequency +``` + +--- + +## Stage 5: Per-Host Circuit Breaker Check + +Before sending a request to each host, the circuit breaker status is checked. + +``` +FOR EACH HOST in iterator: + └─ CheckFailedStatus() ──[OPEN]──► SKIP (continue to next host) + └─[CLOSED]──► Proceed with request +``` + +- **OPEN circuit**: Host is skipped immediately, no request sent +- **CLOSED circuit**: Request is attempted +- **All circuits OPEN**: Returns `503 Service Unavailable` + +See [CIRCUIT_BREAKER.md](CIRCUIT_BREAKER.md) for detailed circuit breaker configuration. + +--- + +## Response Handling + +After sending a request, the response determines the next action: + +| Response | Action | +|----------|--------| +| `2xx` (Success) | Return response to client ✓ | +| `3xx`, `404`, `5xx` | Try next host | +| `429` with `S7PREQUEUE` header | Collect for potential requeue, try next host | +| `412` (Precondition Failed) | Request TTL expired, stop iteration | + +### Requeue Behavior + +If all hosts return `429` with the `S7PREQUEUE` header, the request is requeued with a delay based on the shortest `retry-after` value. + +--- + +## Monitoring & Diagnostics + +### Logging + +Enable debug logging to see backend selection: + +```bash +LogHeaders=true +``` + +Log output includes: +- Which hosts matched the path +- Which host was selected +- Circuit breaker status for skipped hosts +- Attempt count and duration + +### Metrics + +Key metrics to monitor: +- `BackendAttempts`: Number of hosts tried per request +- `Backend-Host`: Which host ultimately served the request +- `Total-Latency`: End-to-end request duration + +--- + +## Configuration Summary + +| Variable | Default | Description | +|----------|---------|-------------| +| `LoadBalanceMode` | `random` | Algorithm: `roundrobin`, `latency`, or `random` | +| `IterationMode` | `SinglePass` | Retry strategy: `SinglePass` or `MultiPass` | +| `MaxAttempts` | `30` | Max total attempts (MultiPass only) | +| `UseSharedIterators` | `false` | Share iterators across concurrent requests | +| `SharedIteratorTTLSeconds` | `300` | TTL for unused shared iterators | +| `SharedIteratorCleanupIntervalSeconds` | `60` | Cleanup interval for expired iterators | + +--- + +## Related Documentation + +- [BACKEND_HOSTS.md](BACKEND_HOSTS.md) - Host configuration and connection strings +- [CIRCUIT_BREAKER.md](CIRCUIT_BREAKER.md) - Circuit breaker configuration +- [CONFIGURATION_SETTINGS.md](CONFIGURATION_SETTINGS.md) - All configuration options diff --git a/docs/design.md b/docs/design.md index ba506822..7dabfd58 100644 --- a/docs/design.md +++ b/docs/design.md @@ -1,12 +1,47 @@ This document serves as the high level description of the source code. -Overview: - -The service receives incoming requests and retransmits it to the lowest latency backend. If the backend fails, the service will retry the request against the next lowest latency backend. When deployed in Azure Container Apps, it can be setup with auto-scale to allow it handle larger volumes. The service utilizes the DNS record for each backend host, however in the case that DNS is not available, it can be configured with the hostname and IP address combination as well. Details for each request can be logged to application insights or an EventHub endpoint. When configured with Oauth2, the service will obtain a token for the backend requests. - - - -Libraries used: +## Overview + +SimpleL7Proxy is a high-performance Layer 7 reverse proxy with intelligent backend selection. It receives incoming requests, queues them by priority, and routes them to healthy backends using configurable load balancing strategies. + +### Key Capabilities + +- **Multi-mode Load Balancing**: Supports round-robin, latency-based, and random backend selection +- **Path-Based Routing**: Routes requests to specific backends based on URL path matching +- **Circuit Breaker**: Automatically stops traffic to failing backends and self-heals +- **Priority Queuing**: Higher priority requests are processed before lower priority ones +- **Retry Logic**: Automatically retries failed requests against alternate backends +- **Async Mode**: Supports long-running requests via Azure Service Bus notifications + +### Request Flow + +``` +┌─────────────┐ ┌──────────────┐ ┌────────────────┐ ┌─────────────┐ +│ Client │───►│ Server.cs │───►│ Priority Queue │───►│ ProxyWorker │ +│ │ │ (Listener) │ │ │ │ │ +└─────────────┘ └──────────────┘ └────────────────┘ └──────┬──────┘ + │ + ┌────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ Backend Selection (IteratorFactory) │ +│ 1. Filter hosts by request path (specific vs catch-all) │ +│ 2. Create iterator based on LoadBalanceMode (roundrobin/latency/random) │ +│ 3. For each host: check circuit breaker → send request → handle response │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ + ┌───────────────┐ + │ Backend Hosts │ + └───────────────┘ +``` + +When deployed in Azure Container Apps, it can be set up with auto-scale to handle larger volumes. The service utilizes DNS records for each backend host; however, if DNS is not available, it can be configured with hostname and IP address combinations. Details for each request can be logged to Application Insights or an EventHub endpoint. When configured with OAuth2, the service will obtain a token for backend requests. + +--- + +## Libraries Used Purpose: Authenticate to Azure and acquire Oauth2 tokens. * Azure.Core: Provides essential classes and utilities for working with Azure services, including HTTP pipeline configuration, retry policies, and diagnostics. @@ -32,22 +67,50 @@ Purpose: Provide .NET service and dependency injection. * Microsoft.Extensions.Options: Supports the configuration and management of options and settings in your application, including support for validating and reloading options. -Main Code flow: +## Main Code Flow + +| File | Purpose | +|------|---------| +| `Program.cs` | Startup, read configuration, setup .NET DI container | +| `Server.cs` | Listens for incoming requests and inserts into the priority queue | +| `Backends.cs` | Measures latencies to backends, manages host health and circuit breakers | +| `ProxyWorker.cs` | Pulls messages from the priority queue and proxies to backends | + +### Backend Selection Components + +| File | Purpose | +|------|---------| +| `IteratorFactory.cs` | Creates load-balanced iterators based on configuration | +| `RoundRobinHostIterator.cs` | Distributes requests evenly using a global counter | +| `LatencyHostIterator.cs` | Orders hosts by average response latency | +| `RandomHostIterator.cs` | Shuffles hosts randomly for each request | +| `SharedIteratorRegistry.cs` | Manages shared iterators for concurrent requests | +| `CircuitBreaker.cs` | Tracks failures and trips circuits for unhealthy hosts | + +## Helpers + +| File | Purpose | +|------|---------| +| `AppInsightsTextWriter.cs` | Sends data to console and Application Insights | +| `EventHubClient.cs` | Sends messages to EventHub | +| `PriorityQueue.cs` | Implements priority queue (highest priority exits first) | + +## Runtime Objects + +| File | Purpose | +|------|---------| +| `BaseHostHealth.cs` | Represents a single backend with health metrics | +| `RequestData.cs` | Data received for an incoming request | +| `ProxyData.cs` | Represents a response received from a backend | + +--- -Program.cs : Startup , read configuration, setup the .NET framework service -Server.cs : listens for incoming requests and inserts into the priority queue -backends.cs : measures the latencies to the backends and orders the hosts based on success rate. -proxyWorker.cs: pulls a message from the priority queue and proxies the request to the best backend +## Related Documentation -Helpers: -AppInsightsTextWriter.cs : sends 1 copy of the data to the console and another copy to app insights -EventHubClient.cs : sends a message into the eventhub -PriorityQueue: implements a priority queue, highest priority exits first +- [LOAD_BALANCING.md](LOAD_BALANCING.md) - Detailed backend selection algorithm +- [CIRCUIT_BREAKER.md](CIRCUIT_BREAKER.md) - Circuit breaker configuration +- [BACKEND_HOSTS.md](BACKEND_HOSTS.md) - Host configuration options -Runtime Objects: -Backend.cs : represents a single backend -RequestData.cs : data received for an incoming request -ProxyData.cs : represents a response received from a backend diff --git a/src/SimpleL7Proxy/Backend/Backends.cs b/src/SimpleL7Proxy/Backend/Backends.cs index f2a2d957..4fb2be83 100644 --- a/src/SimpleL7Proxy/Backend/Backends.cs +++ b/src/SimpleL7Proxy/Backend/Backends.cs @@ -384,7 +384,7 @@ private async Task GetHostStatus(BaseHostHealth host, HttpClient client) private void FilterActiveHosts() { var newActiveHosts = _backendHosts - .Where(h => h.SuccessRate() > _successRate) + .Where(h => h.SuccessRate() >= _successRate) .Select(h => { h.CalculatedAverageLatency = h.AverageLatency(); @@ -447,7 +447,7 @@ private void DisplayHostStatus() if (_backendHosts != null) foreach (var host in _backendHosts.OrderBy(h => h.AverageLatency())) { - statusIndicator = host.SuccessRate() > _successRate ? "Good " : "Errors"; + statusIndicator = host.SuccessRate() >= _successRate ? "Good " : "Errors"; var roundedLatency = Math.Round(host.AverageLatency(), 3); var successRatePercentage = Math.Round(host.SuccessRate() * 100, 2); var hoststatus = host.GetStatus(out int calls, out int errors, out double average); diff --git a/src/SimpleL7Proxy/Backend/BaseHostHealth.cs b/src/SimpleL7Proxy/Backend/BaseHostHealth.cs index 884d86af..e3df7042 100644 --- a/src/SimpleL7Proxy/Backend/BaseHostHealth.cs +++ b/src/SimpleL7Proxy/Backend/BaseHostHealth.cs @@ -34,7 +34,7 @@ protected BaseHostHealth(HostConfig config, ILogger logger) public override string ToString() { - return $"{Protocol}://{Host}:{Port}"; + return $"{Protocol}://{Hostname}:{Port}"; } #region Runtime Performance Tracking diff --git a/src/SimpleL7Proxy/Backend/HostConfig.cs b/src/SimpleL7Proxy/Backend/HostConfig.cs index 75d8e490..6b6b0a87 100644 --- a/src/SimpleL7Proxy/Backend/HostConfig.cs +++ b/src/SimpleL7Proxy/Backend/HostConfig.cs @@ -210,7 +210,7 @@ private static ParsedConfig TryParseConfig(string hostname, string? probepath, s // try an parse the hostname if for non direct mode hosts. if (!result.DirectMode) { - result.Hostname = hostname; + result.Hostname = result.Host; } else { Console.WriteLine($"Direct mode host detected: {result.Host}"); diff --git a/src/SimpleL7Proxy/Backend/ProbableHostHealth.cs b/src/SimpleL7Proxy/Backend/ProbableHostHealth.cs index 750effb4..1b2142eb 100644 --- a/src/SimpleL7Proxy/Backend/ProbableHostHealth.cs +++ b/src/SimpleL7Proxy/Backend/ProbableHostHealth.cs @@ -12,6 +12,7 @@ public class ProbeableHostHealth : BaseHostHealth private readonly bool[] _callResults = new bool[MaxData]; private int _currentIndex = 0; private int _count = 0; + private bool _hasHadFirstSuccess = false; public string ProbePath => Config.ProbePath; public string ProbeUrl => Config.ProbeUrl; @@ -26,6 +27,20 @@ public ProbeableHostHealth(HostConfig hostConfig, ILogger logger) public override void AddCallSuccess(bool success) { + // Ignore failures until we've had at least one success (cold start warmup) + if (!_hasHadFirstSuccess) + { + if (success) + { + _hasHadFirstSuccess = true; + } + else + { + // Skip recording failures before first success + return; + } + } + // Add the new call result to the circular buffer _callResults[_currentIndex] = success; _currentIndex = (_currentIndex + 1) % MaxData; @@ -37,9 +52,9 @@ public override void AddCallSuccess(bool success) public override double SuccessRate() { - // If there are no call results, return 0.0 - if (_count == 0) - return 0.0; + // If we haven't had any successful probes yet, return 1.0 to give benefit of doubt during warmup + if (!_hasHadFirstSuccess || _count == 0) + return 1.0; // Count successful calls in the active portion of the buffer int successCount = 0; diff --git a/src/SimpleL7Proxy/BlobStorage/BlobWriteQueue.cs b/src/SimpleL7Proxy/BlobStorage/BlobWriteQueue.cs index 1e769d71..81197020 100644 --- a/src/SimpleL7Proxy/BlobStorage/BlobWriteQueue.cs +++ b/src/SimpleL7Proxy/BlobStorage/BlobWriteQueue.cs @@ -30,6 +30,9 @@ public class BlobWriteQueueOptions /// Enable batching optimization for writes to same container. public bool EnableBatching { get; set; } = true; + /// Enable deduplication of writes to the same blob (keeps only the last write). Set to false to write all operations. + public bool EnableDeduplication { get; set; } = true; + /// Metrics logging interval in seconds. public int MetricsIntervalSeconds { get; set; } = 30; } @@ -175,6 +178,23 @@ public async Task EnqueueAsync(BlobWriteOperation operation, CancellationT try { var workerId = GetWorkerForBlob(operation.ContainerName, operation.BlobName); + + // Back-pressure: graduated delays based on queue depth to slow down producers + var queueDepth = _workerChannels[workerId].Reader.Count; + int delayMs = queueDepth switch + { + >= 150 => 300, + >= 100 => 200, + >= 50 => 100, + _ => 0 + }; + + if (delayMs > 0) + { + _logger.LogDebug("[BlobWriteQueue] Back-pressure: queue depth {Depth} - delaying {Delay}ms", queueDepth, delayMs); + await Task.Delay(delayMs, cancellationToken).ConfigureAwait(false); + } + await _workerChannels[workerId].Writer.WriteAsync(operation, cancellationToken).ConfigureAwait(false); Interlocked.Increment(ref _operationsQueued); @@ -210,17 +230,53 @@ public Task StartAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("[BlobWriteQueue] Stopping..."); + + // Signal no more writes will come - workers will exit after draining foreach (var channel in _workerChannels) { channel.Writer.Complete(); } - _shutdownCts.Cancel(); - + + // Wait for workers to finish processing remaining items + // DON'T cancel the shutdown token - let in-flight operations complete + var shutdownTimeout = TimeSpan.FromSeconds(60); // Allow time for blob operations to complete + + _logger.LogInformation("[BlobWriteQueue] Waiting for workers to complete (timeout: {Timeout}s)...", shutdownTimeout.TotalSeconds); + try { - await Task.WhenAll(_workers).ConfigureAwait(false); + var workerTask = Task.WhenAll(_workers); + var completedTask = await Task.WhenAny(workerTask, Task.Delay(shutdownTimeout, cancellationToken)) + .ConfigureAwait(false); + + if (completedTask != workerTask) + { + _logger.LogWarning("[BlobWriteQueue] Shutdown timeout - cancelling remaining operations"); + // Only cancel as a last resort after timeout + _shutdownCts.Cancel(); + + // Give cancelled operations a moment to clean up + try + { + await Task.WhenAny(workerTask, Task.Delay(5000)).ConfigureAwait(false); + } + catch { } + } + else + { + // Workers completed normally + await workerTask.ConfigureAwait(false); + _logger.LogDebug("[BlobWriteQueue] All workers completed gracefully"); + } + } + catch (OperationCanceledException) + { + _logger.LogDebug("[BlobWriteQueue] Shutdown cancelled"); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "[BlobWriteQueue] Error during shutdown"); } - catch (OperationCanceledException) { } var avgQueueTime = _operationsCompleted > 0 ? _totalQueueTimeMs / _operationsCompleted : 0; var avgProcessTime = _operationsCompleted > 0 ? _totalProcessTimeMs / _operationsCompleted : 0; @@ -416,39 +472,49 @@ private async Task ExecuteBatchAsync( try { - // Deduplicate by container+blob name - keep only the LAST (most recent) write for each unique blob - // Group by both container and blob name to handle same blob name in different containers - var deduplicatedOps = batch - .GroupBy(op => $"{op.ContainerName}/{op.BlobName}") - .Select(group => group.OrderBy(op => op.EnqueuedAt).Last()) // Keep chronologically last operation - .ToList(); - - var duplicateCount = batch.Count - deduplicatedOps.Count; - if (duplicateCount > 0) + List deduplicatedOps; + + if (_options.EnableDeduplication) { - _logger.LogDebug("[Worker-{WorkerId}] Deduplicated {DuplicateCount} operations - Processing {UniqueCount} unique blobs", - workerId, duplicateCount, deduplicatedOps.Count); - - // Mark duplicate (superseded) operations as successful - var duplicateOps = batch + // Deduplicate by container+blob name - keep only the LAST (most recent) write for each unique blob + // Group by both container and blob name to handle same blob name in different containers + deduplicatedOps = batch .GroupBy(op => $"{op.ContainerName}/{op.BlobName}") - .SelectMany(group => group.OrderBy(op => op.EnqueuedAt).SkipLast(1)); // All except the last - - foreach (var dupOp in duplicateOps) + .Select(group => group.OrderBy(op => op.EnqueuedAt).Last()) // Keep chronologically last operation + .ToList(); + + var duplicateCount = batch.Count - deduplicatedOps.Count; + if (duplicateCount > 0) { - _logger.LogTrace("[Worker-{WorkerId}] Operation {OperationId} superseded by later write to {Container}/{Blob} (enqueued at {EnqueuedAt})", - workerId, dupOp.OperationId, dupOp.ContainerName, dupOp.BlobName, dupOp.EnqueuedAt.ToString("HH:mm:ss.fff")); + _logger.LogDebug("[Worker-{WorkerId}] Deduplicated {DuplicateCount} operations - Processing {UniqueCount} unique blobs", + workerId, duplicateCount, deduplicatedOps.Count); - dupOp.SetResult(new BlobWriteResult - { - Success = true, - Duration = TimeSpan.Zero, - QueueTime = DateTime.UtcNow - dupOp.EnqueuedAt - }); + // Mark duplicate (superseded) operations as successful + var duplicateOps = batch + .GroupBy(op => $"{op.ContainerName}/{op.BlobName}") + .SelectMany(group => group.OrderBy(op => op.EnqueuedAt).SkipLast(1)); // All except the last - Interlocked.Increment(ref _operationsCompleted); + foreach (var dupOp in duplicateOps) + { + _logger.LogTrace("[Worker-{WorkerId}] Operation {OperationId} superseded by later write to {Container}/{Blob} (enqueued at {EnqueuedAt})", + workerId, dupOp.OperationId, dupOp.ContainerName, dupOp.BlobName, dupOp.EnqueuedAt.ToString("HH:mm:ss.fff")); + + dupOp.SetResult(new BlobWriteResult + { + Success = true, + Duration = TimeSpan.Zero, + QueueTime = DateTime.UtcNow - dupOp.EnqueuedAt + }); + + Interlocked.Increment(ref _operationsCompleted); + } } } + else + { + // No deduplication - write all operations + deduplicatedOps = batch; + } // Execute all UNIQUE (most recent) writes in parallel var writeTasks = deduplicatedOps.Select(async operation => diff --git a/src/SimpleL7Proxy/BlobStorage/BlobWriter.cs b/src/SimpleL7Proxy/BlobStorage/BlobWriter.cs index bc0042f0..ac11b75c 100644 --- a/src/SimpleL7Proxy/BlobStorage/BlobWriter.cs +++ b/src/SimpleL7Proxy/BlobStorage/BlobWriter.cs @@ -104,7 +104,7 @@ public async Task InitClientAsync(string userId, string containerName) /// A writable stream to the blob. public async Task CreateBlobAndGetOutputStreamAsync(string userId, string blobName) { - _logger.LogTrace($"[BLOB-TRACE] CreateBlobAndGetOutputStreamAsync | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] CreateBlobAndGetOutputStreamAsync | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); // Get the client for the userId if (!_containerClients.TryGetValue(userId, out var _containerClient)) @@ -130,12 +130,12 @@ public async Task CreateBlobAndGetOutputStreamAsync(string userId, strin { try { - _logger.LogTrace($"[BLOB-TRACE] WRITE-START | Container: {userId} | Blob: {blobName} | Attempt: {attempt + 1}/{maxRetries} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] WRITE-START | Container: {userId} | Blob: {blobName} | Attempt: {attempt + 1}/{maxRetries} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); // OpenWriteAsync will create the blob if it does not exist and return a writable stream. var stream = await blobClient.OpenWriteAsync(overwrite: true).ConfigureAwait(false); - _logger.LogTrace($"[BLOB-TRACE] WRITE-SUCCESS | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] WRITE-SUCCESS | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); return stream; } catch (Azure.RequestFailedException ex) when (ex.Status == 409 && attempt < maxRetries - 1) @@ -183,7 +183,7 @@ public async Task BlobExistsAsync(string userId, string blobName) public async Task ReadBlobAsStreamAsync(string userId, string blobName) { - _logger.LogTrace($"[BLOB-TRACE] READ-START | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] READ-START | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); // Get the client for the userId if (!_containerClients.TryGetValue(userId, out var _containerClient)) @@ -201,7 +201,7 @@ public async Task ReadBlobAsStreamAsync(string userId, string blobName) var blobClient = _containerClient.GetBlobClient(blobName); var stream = await blobClient.OpenReadAsync().ConfigureAwait(false); - _logger.LogTrace($"[BLOB-TRACE] READ-SUCCESS | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] READ-SUCCESS | Container: {userId} | Blob: {blobName} | Thread: {System.Threading.Thread.CurrentThread.ManagedThreadId} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); return stream; } catch (Exception ex) diff --git a/src/SimpleL7Proxy/BlobStorage/QueuedBlobWriter.cs b/src/SimpleL7Proxy/BlobStorage/QueuedBlobWriter.cs index d89e013d..a207b5aa 100644 --- a/src/SimpleL7Proxy/BlobStorage/QueuedBlobWriter.cs +++ b/src/SimpleL7Proxy/BlobStorage/QueuedBlobWriter.cs @@ -55,6 +55,12 @@ public override async Task FlushAsync(CancellationToken cancellationToken) // Queue the buffered data for writing var data = _buffer.ToArray(); + + // Clear the buffer BEFORE enqueueing to prevent duplicate writes + // if FlushAsync is called multiple times + _buffer.SetLength(0); + _buffer.Position = 0; + var operation = new BlobWriteOperation { ContainerName = _containerName, diff --git a/src/SimpleL7Proxy/CoordinatedShutdownService.cs b/src/SimpleL7Proxy/CoordinatedShutdownService.cs index cbafb5c5..33bd1286 100644 --- a/src/SimpleL7Proxy/CoordinatedShutdownService.cs +++ b/src/SimpleL7Proxy/CoordinatedShutdownService.cs @@ -116,11 +116,20 @@ public async Task StopAsync(CancellationToken cancellationToken) ["WorkerStates"] = string.Join(", ", HealthCheckService.GetWorkerState()) }; data.SendEvent(); - _backendTokenProvider?.StopAsync(cancellationToken).ConfigureAwait(false); - _backupAPIService?.StopAsync(cancellationToken).ConfigureAwait(false); - _serviceBusRequestService?.StopAsync(cancellationToken).ConfigureAwait(false); + + if (_backendTokenProvider != null) + await _backendTokenProvider.StopAsync(cancellationToken).ConfigureAwait(false); + + // BackupAPIService is NOT registered as IHostedService - we control its shutdown explicitly here + // to ensure it stops AFTER all proxy workers have completed and flushed their status updates + if (_backupAPIService != null) + await _backupAPIService.StopAsync(cancellationToken).ConfigureAwait(false); + + // ServiceBusRequestService is stopped explicitly here for ordering control + if (_serviceBusRequestService != null) + await _serviceBusRequestService.StopAsync(cancellationToken).ConfigureAwait(false); + _eventClient?.StopTimer(); - //await Task.CompletedTask; } } \ No newline at end of file diff --git a/src/SimpleL7Proxy/DTO/RequestDataBackupService.cs b/src/SimpleL7Proxy/DTO/RequestDataBackupService.cs index 4ca8466f..6027b458 100644 --- a/src/SimpleL7Proxy/DTO/RequestDataBackupService.cs +++ b/src/SimpleL7Proxy/DTO/RequestDataBackupService.cs @@ -48,15 +48,15 @@ public async Task RestoreIntoAsync(RequestData rdata) var bodyBlobName = blobname + ".body"; if (await _blobWriter.BlobExistsAsync(Constants.Server, bodyBlobName)) { - _logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: ReadBody | Guid: {rdata.Guid} | Container: {Constants.Server} | Blob: {bodyBlobName} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: ReadBody | Guid: {rdata.Guid} | Container: {Constants.Server} | Blob: {bodyBlobName} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); using Stream bodyStream = await _blobWriter.ReadBlobAsStreamAsync(Constants.Server, bodyBlobName); var bodyStreamReader = new StreamReader(bodyStream); var datastr = await bodyStreamReader.ReadToEndAsync(); rdata.setBody(Encoding.UTF8.GetBytes(datastr)); - _logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: ReadBody-Complete | Guid: {rdata.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: ReadBody-Complete | Guid: {rdata.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); } - _logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: Complete | Guid: {rdata.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] BackupService.RestoreIntoAsync | Action: Complete | Guid: {rdata.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); return; } catch (BlobWriterException e) @@ -81,7 +81,7 @@ public async Task BackupAsync(RequestData requestData) { var operation = "Creating blob"; - _logger.LogTrace($"[BLOB-TRACE] BackupService.BackupAsync | Action: Start | Guid: {requestData.Guid} | Container: {Constants.Server} | Blob: {requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogTrace($"[BLOB-TRACE] BackupService.BackupAsync | Action: Start | Guid: {requestData.Guid} | Container: {Constants.Server} | Blob: {requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); try { diff --git a/src/SimpleL7Proxy/Events/AppInsightsEventClient.cs b/src/SimpleL7Proxy/Events/AppInsightsEventClient.cs index f7f9afb3..9dd47bbd 100644 --- a/src/SimpleL7Proxy/Events/AppInsightsEventClient.cs +++ b/src/SimpleL7Proxy/Events/AppInsightsEventClient.cs @@ -15,6 +15,7 @@ public Task StartTimer() } public void StopTimer() { } public int Count => 0; + public string ClientType => "AppInsights"; public void SendData(string? value) => telemetryClient.TrackEvent(value); public void SendData(ProxyEvent proxyEvent) diff --git a/src/SimpleL7Proxy/Events/BackupAPI/BackupStatService.cs b/src/SimpleL7Proxy/Events/BackupAPI/BackupStatService.cs index 41493bca..dec1daa9 100644 --- a/src/SimpleL7Proxy/Events/BackupAPI/BackupStatService.cs +++ b/src/SimpleL7Proxy/Events/BackupAPI/BackupStatService.cs @@ -70,6 +70,13 @@ public Task StartAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken) { + // Guard against double-stop + if (isShuttingDown) + { + _logger.LogDebug("[SHUTDOWN] BackupAPIService.StopAsync called but already stopping"); + return; + } + _logger.LogInformation("[SHUTDOWN] BackupAPIService stopping..."); isShuttingDown = true; @@ -95,7 +102,8 @@ public async Task StopAsync(CancellationToken cancellationToken) } } - // Don't dispose the cancellation token source here - let the task complete + // Now that the task has completed (or timed out), dispose the CTS + _cancellationTokenSource?.Dispose(); } public bool UpdateStatus(RequestAPIDocument message) @@ -236,10 +244,10 @@ public async Task EventWriter(CancellationToken token) } } - _cancellationTokenSource?.Dispose(); + // Note: Don't dispose CTS here - StopAsync owns the lifecycle } - _logger.LogInformation("Backup API service is stopping."); + _logger.LogInformation("Backup API service stopped."); } DateTime _lastDrainTime = DateTime.UtcNow; diff --git a/src/SimpleL7Proxy/Events/BackupAPI/IBackupStatService.cs b/src/SimpleL7Proxy/Events/BackupAPI/IBackupStatService.cs index fdecea0c..16a175d8 100644 --- a/src/SimpleL7Proxy/Events/BackupAPI/IBackupStatService.cs +++ b/src/SimpleL7Proxy/Events/BackupAPI/IBackupStatService.cs @@ -6,6 +6,7 @@ namespace SimpleL7Proxy.BackupAPI public interface IBackupAPIService { + Task StartAsync(CancellationToken cancellationToken); Task StopAsync(CancellationToken cancellationToken); bool UpdateStatus(RequestAPIDocument message); diff --git a/src/SimpleL7Proxy/Events/CompositeEventClient.cs b/src/SimpleL7Proxy/Events/CompositeEventClient.cs index 653604ca..4c1c4313 100644 --- a/src/SimpleL7Proxy/Events/CompositeEventClient.cs +++ b/src/SimpleL7Proxy/Events/CompositeEventClient.cs @@ -37,6 +37,7 @@ public int Count return count; } } + public string ClientType => string.Join(", ", eventClients.Select(c => c.ClientType)); public void SendData(string? value) { foreach (var client in eventClients) diff --git a/src/SimpleL7Proxy/Events/EventHubClient.cs b/src/SimpleL7Proxy/Events/EventHubClient.cs index 31a7922b..d495dc16 100644 --- a/src/SimpleL7Proxy/Events/EventHubClient.cs +++ b/src/SimpleL7Proxy/Events/EventHubClient.cs @@ -36,6 +36,7 @@ public EventHubClient(EventHubConfig? config, ILogger logger) } public int Count => _logBuffer.Count; + public string ClientType => isRunning ? "EventHub" : "EventHub (Disabled)"; public async Task StartAsync(CancellationToken cancellationToken) { // Handle null or invalid configuration gracefully - just don't start the service diff --git a/src/SimpleL7Proxy/Events/IEventClient.cs b/src/SimpleL7Proxy/Events/IEventClient.cs index 00dd8ff6..4fe71f5d 100644 --- a/src/SimpleL7Proxy/Events/IEventClient.cs +++ b/src/SimpleL7Proxy/Events/IEventClient.cs @@ -5,6 +5,7 @@ namespace SimpleL7Proxy.Events; public interface IEventClient { int Count { get; } + string ClientType { get; } //public Task StartTimer(); public void StopTimer(); diff --git a/src/SimpleL7Proxy/Events/LogFileEventClient.cs b/src/SimpleL7Proxy/Events/LogFileEventClient.cs index 3170d26c..46e1676d 100644 --- a/src/SimpleL7Proxy/Events/LogFileEventClient.cs +++ b/src/SimpleL7Proxy/Events/LogFileEventClient.cs @@ -40,6 +40,7 @@ public LogFileEventClient(string filename) } public int Count => _logBuffer.Count; + public string ClientType => "LogFile"; public Task StartAsync(CancellationToken cancellationToken) diff --git a/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs b/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs index e5e68f58..14d12f90 100644 --- a/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs +++ b/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs @@ -113,8 +113,9 @@ public Task StopAsync(CancellationToken cancellationToken) // Only set the flag if we're not already shutting down if (!isShuttingDown) { - _logger.LogInformation("[SHUTDOWN] ⏹ AsyncFeeder shutting down"); isShuttingDown = true; + _cancellationTokenSource.Cancel(); + _logger.LogInformation("[SHUTDOWN] ⏹ AsyncFeeder shutting down"); return readerTask ?? Task.CompletedTask; } @@ -147,7 +148,7 @@ public async Task EventReader(CancellationToken token) try { - await processor.StartProcessingAsync().ConfigureAwait(false); + await processor.StartProcessingAsync(token).ConfigureAwait(false); _logger.LogInformation("[SERVICE] ✓ AsyncFeeder successfully started processing messages from 'feeder' queue"); } catch (UnauthorizedAccessException ex) @@ -183,7 +184,14 @@ public async Task EventReader(CancellationToken token) catch (TaskCanceledException) { // Task was canceled, exit gracefully - _logger.LogInformation("[SHUTDOWN] AsyncFeeder service task was canceled."); + if (!isShuttingDown) + { + _logger.LogWarning("[SHUTDOWN] AsyncFeeder service task was canceled unexpectedly."); + } + else + { + _logger.LogInformation($"[SHUTDOWN] AsyncFeeder service shutdown initiated."); + } } catch (OperationCanceledException) { diff --git a/src/SimpleL7Proxy/Program.cs b/src/SimpleL7Proxy/Program.cs index f4ee1c56..b85e9a4c 100644 --- a/src/SimpleL7Proxy/Program.cs +++ b/src/SimpleL7Proxy/Program.cs @@ -116,6 +116,9 @@ public static async Task Main(string[] args) var backupAPIService = serviceProvider.GetRequiredService(); var userPriority = serviceProvider.GetRequiredService(); RequestData.InitializeServiceBusRequestService(serviceBusRequestService, backupAPIService, userPriority, options.Value); + + // Manually start BackupAPIService (not registered as IHostedService for controlled shutdown) + await backupAPIService.StartAsync(CancellationToken.None); //_ = serviceBusService.StartAsync(CancellationToken.None); @@ -245,6 +248,7 @@ private static void ConfigureDependencyInjection(IServiceCollection services, IL BatchWaitTimeMs = 100, MaxBatchSize = 25, EnableBatching = true, + EnableDeduplication = true, // Disable deduplication - write all operations MetricsIntervalSeconds = 30 }; }); @@ -319,8 +323,9 @@ private static void ConfigureDependencyInjection(IServiceCollection services, IL services.AddSingleton(sp => sp.GetRequiredService()); services.AddHostedService(sp => sp.GetRequiredService()); + // Note: BackupAPIService is NOT registered as IHostedService - its lifecycle is controlled + // explicitly by CoordinatedShutdownService to ensure proper shutdown ordering services.AddSingleton(); - services.AddHostedService(sp => (BackupAPIService)sp.GetRequiredService()); services.AddSingleton(); services.AddSingleton(); diff --git a/src/SimpleL7Proxy/Proxy/AsyncWorker.cs b/src/SimpleL7Proxy/Proxy/AsyncWorker.cs index d7cae198..dadf13e0 100644 --- a/src/SimpleL7Proxy/Proxy/AsyncWorker.cs +++ b/src/SimpleL7Proxy/Proxy/AsyncWorker.cs @@ -236,10 +236,8 @@ private void SetBlobNames(bool isBackground = false) } headerBlobName = dataBlobName + "-Headers"; - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.CreateUserBlobs | Action: CreateBlobs | Guid: {_requestData.Guid} | UserId: {_userId} | DataBlob: {dataBlobName} | HeaderBlob: {headerBlobName} | IsBackground: {isBackground} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); - - _logger.LogTrace("[AsyncWorker:{Guid}] Creating user blobs - Data: {DataBlob}, Header: {HeaderBlob}", - _requestData.Guid, dataBlobName, headerBlobName); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.CreateUserBlobs | Action: CreateBlobs | Guid: {Guid} | UserId: {UserId} | DataBlob: {DataBlob} | HeaderBlob: {HeaderBlob} | IsBackground: {IsBackground}", + // _requestData.Guid, _userId, dataBlobName, headerBlobName, isBackground); // Create both blobs in parallel var dataStreamTask = _blobWriter.CreateBlobAndGetOutputStreamAsync(_userId, dataBlobName); @@ -250,7 +248,7 @@ private void SetBlobNames(bool isBackground = false) var dataStream = await dataStreamTask; var headerStream = await headerStreamTask; - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.CreateUserBlobs | Action: CreateBlobs-Complete | Guid: {_requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.CreateUserBlobs | Action: CreateBlobs-Complete | Guid: {Guid}", _requestData.Guid); return (dataStream, headerStream); } @@ -306,14 +304,14 @@ public async Task GetOrCreateDataStreamAsync() { if (_requestData.OutputStream == null) { - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.GetOrCreateDataStream | Action: LazyCreate | Guid: {_requestData.Guid} | DataBlob: {dataBlobName} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.GetOrCreateDataStream | Action: LazyCreate | Guid: {Guid} | DataBlob: {DataBlob}", _requestData.Guid, dataBlobName); try { var dataStream = await _blobWriter.CreateBlobAndGetOutputStreamAsync(_userId, dataBlobName); _requestData.OutputStream = new BufferedStream(dataStream); - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.GetOrCreateDataStream | Action: Created | Guid: {_requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.GetOrCreateDataStream | Action: Created | Guid: {Guid}", _requestData.Guid); } catch (Exception ex) { @@ -428,6 +426,14 @@ public async Task StartAsync() await _requestData.Context.Response.OutputStream.WriteAsync(message).ConfigureAwait(false); await _requestData.Context.Response.OutputStream.FlushAsync().ConfigureAwait(false); _requestData.Context.Response.Close(); + + // CRITICAL: Clear the OutputStream after sending 202 response + // The client connection is now closed, so the original OutputStream is invalid. + // GetOrCreateDataStreamAsync() checks if OutputStream is null to decide whether + // to create a new blob stream. Without this, it would return the closed client + // stream instead of creating a blob stream, causing data to be lost. + _requestData.OutputStream = null; + _logger.LogDebug("[AsyncWorker:{Guid}] 202 response written and connection closed", _requestData.Guid); } catch (Exception writeEx) @@ -436,10 +442,13 @@ public async Task StartAsync() _requestData.Guid); //proxyEventData["x-Status"] = "Network Error"; // Client disconnected? + + // Even on error, clear the OutputStream - the client is disconnected anyway + _requestData.OutputStream = null; } - _logger.LogInformation("[AsyncWorker:{Guid}] Async worker started successfully - DataBlob: {DataBlobUri}, HeaderBlob: {HeaderBlobUri}", - _requestData.Guid, _dataBlobUri, _headerBlobUri); + // _logger.LogInformation("[AsyncWorker:{Guid}] Async worker started successfully - DataBlob: {DataBlobUri}, HeaderBlob: {HeaderBlobUri}", + // _requestData.Guid, _dataBlobUri, _headerBlobUri); _taskCompletionSource.TrySetResult(true); // Set the task completion source to indicate that the worker has started } else @@ -490,7 +499,8 @@ public async Task WriteHeaders(HttpStatusCode status, WebHeaderCollection // Create or recreate the stream if needed if (_hos == null) { - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.WriteHeaders | Action: RecreateStream | Guid: {_requestData.Guid} | UserId: {_userId} | HeaderBlob: {headerBlobName} | Attempt: {attempt + 1}/{MaxRetryAttempts} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.WriteHeaders | Action: RecreateStream | Guid: {Guid} | UserId: {UserId} | HeaderBlob: {HeaderBlob} | Attempt: {Attempt}/{MaxAttempts}", + // _requestData.Guid, _userId, headerBlobName, attempt + 1, MaxRetryAttempts); var stream = await _blobWriter.CreateBlobAndGetOutputStreamAsync(_userId, headerBlobName) .ConfigureAwait(false); @@ -503,7 +513,7 @@ public async Task WriteHeaders(HttpStatusCode status, WebHeaderCollection } _hos = stream; - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.WriteHeaders | Action: RecreateStream-Complete | Guid: {_requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.WriteHeaders | Action: RecreateStream-Complete | Guid: {Guid}", _requestData.Guid); } // Convert WebHeaderCollection to Dictionary for proper JSON serialization @@ -585,31 +595,57 @@ public async Task WriteHeaders(HttpStatusCode status, WebHeaderCollection /// private async Task ResetStreamAsync() { + // Reset header output stream if (_hos != null) { - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.ResetStream | Action: Reset | Guid: {_requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.ResetStream | Action: Reset | Guid: {Guid}", _requestData.Guid); try { await _hos.FlushAsync().ConfigureAwait(false); _hos.Dispose(); - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.ResetStream | Action: Disposed | Guid: {_requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.ResetStream | Action: Disposed | Guid: {Guid}", _requestData.Guid); } catch (ObjectDisposedException) { - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.ResetStream | Action: AlreadyDisposed | Guid: {_requestData.Guid} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.ResetStream | Action: AlreadyDisposed | Guid: {Guid}", _requestData.Guid); // Stream was already disposed, ignore } catch (Exception ex) { - Console.WriteLine($"[BLOB-TRACE] AsyncWorker.ResetStream | Action: Error | Guid: {_requestData.Guid} | Error: {ex.Message} | Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}"); - _logger.LogError(ex, "[AsyncWorker:{Guid}] Error while resetting stream - Type: {ExceptionType}", - _requestData.Guid, ex.GetType().FullName); + _logger.LogError(ex, "[BLOB-TRACE] AsyncWorker.ResetStream | Action: Error | Guid: {Guid} | Error: {ErrorMessage}", + _requestData.Guid, ex.Message); } finally { _hos = null; } } + + // Reset data output stream - CRITICAL: must flush and close to commit blob data + if (_requestData?.OutputStream != null) + { + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.ResetDataStream | Action: Reset | Guid: {Guid}", _requestData.Guid); + try + { + await _requestData.OutputStream.FlushAsync().ConfigureAwait(false); + await _requestData.OutputStream.DisposeAsync().ConfigureAwait(false); + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.ResetDataStream | Action: Disposed | Guid: {Guid}", _requestData.Guid); + } + catch (ObjectDisposedException) + { + //_logger.LogInformation("[BLOB-TRACE] AsyncWorker.ResetDataStream | Action: AlreadyDisposed | Guid: {Guid}", _requestData.Guid); + // Stream was already disposed, ignore + } + catch (Exception ex) + { + _logger.LogError(ex, "[BLOB-TRACE] AsyncWorker.ResetDataStream | Action: Error | Guid: {Guid} | Error: {ErrorMessage}", + _requestData.Guid, ex.Message); + } + finally + { + _requestData.OutputStream = null; + } + } } /// diff --git a/src/SimpleL7Proxy/Proxy/HealthCheckService.cs b/src/SimpleL7Proxy/Proxy/HealthCheckService.cs index c44933d4..d78a29d9 100644 --- a/src/SimpleL7Proxy/Proxy/HealthCheckService.cs +++ b/src/SimpleL7Proxy/Proxy/HealthCheckService.cs @@ -383,10 +383,11 @@ public void BuildHealthResponse(string path, int hostCount, bool hasFailedHosts, .Append('\n'); // Add event hub status - _stringBuilder.Append("Event Hub: "); + _stringBuilder.Append("Event Client: "); if (_eventClient != null) { - _stringBuilder.Append("Enabled - ") + _stringBuilder.Append(_eventClient.ClientType) + .Append(" - ") .Append(_eventClient.Count) .Append(" Items"); } @@ -523,7 +524,7 @@ public HealthStatusEnum GetStatus() bool hasFailed = _backends.CheckFailedStatus(); // Debug logging - remove after fixing - Console.WriteLine($"[STARTUP-DEBUG] IsReadyToWork={isReady}, hostCount={hostCount}, hasFailed={hasFailed}, activeWorkers={ActiveWorkers}"); + // Console.WriteLine($"[STARTUP-DEBUG] IsReadyToWork={isReady}, hostCount={hostCount}, hasFailed={hasFailed}, activeWorkers={ActiveWorkers}"); if (!isReady) { diff --git a/src/SimpleL7Proxy/Proxy/ProxyWorker.cs b/src/SimpleL7Proxy/Proxy/ProxyWorker.cs index f3f06e96..8bff0f72 100644 --- a/src/SimpleL7Proxy/Proxy/ProxyWorker.cs +++ b/src/SimpleL7Proxy/Proxy/ProxyWorker.cs @@ -468,7 +468,7 @@ public async Task TaskRunnerAsync() eventData.Exception = ex; eventData["WorkerState"] = workerState; - if (ex.Message == "Cannot access a disposed object." || ex.Message.StartsWith("Unable to write data") || ex.Message.Contains("Broken Pipe")) // The client likely closed the connection + if (ex.Message.StartsWith("Cannot access a disposed object") || ex.Message.StartsWith("Unable to write data") || ex.Message.Contains("Broken Pipe")) // The client likely closed the connection { _logger.LogInformation("Client closed connection: {FullURL}", incomingRequest?.FullURL ?? "Unknown"); eventData["InnerErrorDetail"] = "Client Disconnected"; @@ -588,45 +588,51 @@ private async Task WriteResponseAsync(RequestData request, ProxyData pr) var context = request.Context; - // Set the response status code - context.Response.StatusCode = (int)pr.StatusCode; + // For async requests that triggered, the 202 Accepted response was already sent and + // the connection was closed by AsyncWorker. Skip writing to the HttpListenerResponse. + // The actual backend response will be streamed to blob storage in StreamResponseAsync. + if (!request.AsyncTriggered) + { + // Set the response status code + context.Response.StatusCode = (int)pr.StatusCode; - // Copy headers to the response - //ProxyHelperUtils.CopyHeaders(request.Headers, proxyRequest, true, _options.StripRequestHeaders); + // Copy headers to the response + //ProxyHelperUtils.CopyHeaders(request.Headers, proxyRequest, true, _options.StripRequestHeaders); - //CopyHeadersToResponse(pr.Headers, context.Response.Headers); // Already done? + //CopyHeadersToResponse(pr.Headers, context.Response.Headers); // Already done? - // Set content-specific headers - if (pr.ContentHeaders != null) - { - foreach (var key in pr.ContentHeaders.AllKeys) + // Set content-specific headers + if (pr.ContentHeaders != null) { - switch (key.ToLower()) + foreach (var key in pr.ContentHeaders.AllKeys) { - case "content-length": - var length = pr.ContentHeaders[key]; - if (long.TryParse(length, out var contentLength)) - { - context.Response.ContentLength64 = contentLength; - } - else - { - Console.WriteLine($"Invalid Content-Length: {length}"); - } - break; + switch (key.ToLower()) + { + case "content-length": + var length = pr.ContentHeaders[key]; + if (long.TryParse(length, out var contentLength)) + { + context.Response.ContentLength64 = contentLength; + } + else + { + _logger.LogWarning("Invalid Content-Length: {Length}", length); + } + break; - case "content-type": - context.Response.ContentType = pr.ContentHeaders[key]; - break; + case "content-type": + context.Response.ContentType = pr.ContentHeaders[key]; + break; - default: - context.Response.Headers[key] = pr.ContentHeaders[key]; - break; + default: + context.Response.Headers[key] = pr.ContentHeaders[key]; + break; + } } } - } - context.Response.KeepAlive = false; + context.Response.KeepAlive = false; + } // we need 3 things: // 1. The processor to use => pr.StreamingProcessor @@ -672,7 +678,15 @@ public void ExpelAsyncRequest() // Called during shutdown to evict any in-progress async requests ... then worker will exit _isEvictingAsyncRequest = true; _logger.LogDebug("Expelling async request in progress, cancelling the token."); - _asyncExpelSource.Cancel(); + try + { + _asyncExpelSource.Cancel(); + } + catch (ObjectDisposedException) + { + // CTS was already disposed - worker has finished or encountered an error + _logger.LogDebug("AsyncExpelSource already disposed - worker has completed"); + } } } @@ -1550,24 +1564,35 @@ private async Task StreamResponseAsync(RequestData request, ProxyData pr) { _logger.LogDebug("Streaming to {Destination} for request {Guid}", destinationType, request.Guid); await processor.CopyToAsync(proxyResponse.Content, destination).ConfigureAwait(false); + + // Explicit flush for async blob streams - QueuedBlobStream requires FlushAsync to enqueue the data + if (destinationType == "async blob") + { + //_logger.LogInformation("[BLOB-TRACE] StreamResponseAsync | Action: FlushAfterCopy | Guid: {Guid}", request.Guid); + await destination.FlushAsync().ConfigureAwait(false); + //_logger.LogInformation("[BLOB-TRACE] StreamResponseAsync | Action: FlushComplete | Guid: {Guid}", request.Guid); + } + } + else + { + //_logger.LogInformation("[BLOB-TRACE] StreamResponseAsync | Action: NoData | Guid: {Guid} | Destination: {HasDestination} | Content: {HasContent}", + // request.Guid, destination != null, proxyResponse.Content != null); } } catch (HttpListenerException ex) { - _logger.LogDebug(ex, "Client disconnected during streaming for request {Guid}", request.Guid); + _logger.LogDebug(ex, "[BLOB-TRACE] StreamResponseAsync | Action: Error-HttpListener | Guid: {Guid} | Error: {ErrorMessage}", + request.Guid, ex.Message); } catch (Exception ex) when (ex is IOException || ex.InnerException is IOException) { - _logger.LogDebug(ex, "IO error or client disconnected for request {Guid}", request.Guid); + _logger.LogDebug(ex, "[BLOB-TRACE] StreamResponseAsync | Action: Error-IO | Guid: {Guid} | Error: {ErrorMessage}", + request.Guid, ex.Message); } catch (Exception ex) { - _logger.LogError(ex, "Error streaming response for request {Guid}. Type: {ExType}, Message: {ExMessage}, StackTrace: {StackTrace}, InnerException: {InnerEx}", - request.Guid, - ex.GetType().FullName, - ex.Message, - ex.StackTrace, - ex.InnerException?.ToString() ?? "none"); + _logger.LogError(ex, "[BLOB-TRACE] StreamResponseAsync | Action: Error-General | Guid: {Guid} | Error: {ErrorMessage} | Type: {ExType}", + request.Guid, ex.Message, ex.GetType().FullName); // throw new ProxyErrorException( // ProxyErrorException.ErrorType.ClientDisconnected, // HttpStatusCode.InternalServerError, @@ -1641,8 +1666,10 @@ private async Task HandleBackgroundCheckResultAsync( double timeout = request.Timeout; CancellationTokenSource cts; - // ✅ Dispose old CTS before creating new one + // ✅ Dispose old CTS before creating new one and clear the reference _asyncExpelSource?.Dispose(); + _asyncExpelSource = null; + if (request.runAsync) { timeout = _options.AsyncTimeout; diff --git a/src/SimpleL7Proxy/Proxy/RequestLifecycleManager.cs b/src/SimpleL7Proxy/Proxy/RequestLifecycleManager.cs index c4d57813..a0037788 100644 --- a/src/SimpleL7Proxy/Proxy/RequestLifecycleManager.cs +++ b/src/SimpleL7Proxy/Proxy/RequestLifecycleManager.cs @@ -95,13 +95,13 @@ public void TransitionToSuccess(RequestData request, HttpStatusCode statusCode) case RequestType.Async: SetStatus(request, ServiceBusMessageStatusEnum.AsyncProcessed, RequestAPIStatusEnum.Completed); - _logger.LogInformation("[Lifecycle:{Guid}] Async request completed successfully - Status: {StatusCode}", + _logger.LogDebug("[Lifecycle:{Guid}] Async request completed successfully - Status: {StatusCode}", request.Guid, statusCode); break; case RequestType.AsyncBackground: SetStatus(request, ServiceBusMessageStatusEnum.BackgroundRequestSubmitted, RequestAPIStatusEnum.BackgroundProcessing); - _logger.LogInformation("[Lifecycle:{Guid}] Background request submitted - BackgroundRequestId: {BackgroundRequestId}, Status: {StatusCode}", + _logger.LogDebug("[Lifecycle:{Guid}] Background request submitted - BackgroundRequestId: {BackgroundRequestId}, Status: {StatusCode}", request.Guid, request.BackgroundRequestId, statusCode); break; @@ -109,13 +109,13 @@ public void TransitionToSuccess(RequestData request, HttpStatusCode statusCode) if (request.BackgroundRequestCompleted) { SetStatus(request, ServiceBusMessageStatusEnum.AsyncProcessed, RequestAPIStatusEnum.Completed); - _logger.LogInformation("[Lifecycle:{Guid}] Background check completed successfully - Status: {StatusCode}", + _logger.LogDebug("[Lifecycle:{Guid}] Background check completed successfully - Status: {StatusCode}", request.Guid, statusCode); } else { SetStatus(request, ServiceBusMessageStatusEnum.CheckingBackgroundRequestStatus, RequestAPIStatusEnum.BackgroundProcessing); - _logger.LogInformation("[Lifecycle:{Guid}] Background check still processing - Status: {StatusCode}", + _logger.LogDebug("[Lifecycle:{Guid}] Background check still processing - Status: {StatusCode}", request.Guid, statusCode); } break; diff --git a/src/SimpleL7Proxy/User/UserProfile.cs b/src/SimpleL7Proxy/User/UserProfile.cs index a4ab4632..5ea24b18 100644 --- a/src/SimpleL7Proxy/User/UserProfile.cs +++ b/src/SimpleL7Proxy/User/UserProfile.cs @@ -84,6 +84,10 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken) { await ConfigReader(stoppingToken).ConfigureAwait(false); } + catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } catch (Exception ex) { lock (_profileErrorEventLock) diff --git a/test/nullserver/Python/claude-code-cli.txt b/test/nullserver/Python/claude-code-cli.txt new file mode 100644 index 00000000..a587a83a --- /dev/null +++ b/test/nullserver/Python/claude-code-cli.txt @@ -0,0 +1,279 @@ +event: message_start +data: {"type":"message_start","message":{"model":"claude-sonnet-4-5-20250929","id":"msg_vrtx_01NeA5aqgeRtDTptwxARLuk8","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":10,"cache_creation_input_tokens":440,"cache_read_input_tokens":18281,"cache_creation":{"ephemeral_5m_input_tokens":440,"ephemeral_1h_input_tokens":0},"output_tokens":3}} } + +event: ping +data: {"type": "ping"} + +event: content_block_start +data: {"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":"","signature":""} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"The user aske"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"d \"what is the size of Earth?\""} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" -"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" this is a simple"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" factual question about Earth's physical"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" dimensions,"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" which"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" I"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" answere"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"d."} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"\n\nLooking"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" at the context"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":":"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"\n- This"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" is a general"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" knowledge"}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" question\n- It's not"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" relate"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"d to programming"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" or"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" software"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" development"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"\n- The"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" user got"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" a"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" complete"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" answer"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"\n- There"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"'s no obvious"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" follow-up action"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" in"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" a"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" development"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" context\n\nThis"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" seems like either"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":":\n1. A test/"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"random"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" question\n2. The"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" user exploring"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" what"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" Claude"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" Code"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" can"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" do\n3. A genuine curios"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"ity question\n\nWhat"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" would they"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" naturally"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" type next?"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"\n- They"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" might ask another"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" question"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"\n- They might ask"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" something"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" programming"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"-related\n- They might not"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" respon"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"d at all\n- There"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"'s no obvious follow-up from"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" \""} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"what is the size of Earth?\""} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"\n\nThe"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" guidance"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" says \"Stay"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" silent if the next step isn't obvious"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" from what the user said.\"\n\nThis"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" is not"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" a situation where there"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"'s an"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" obvious next step."} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" The"}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" user aske"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"d a fact"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"ual question,"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" got"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" an"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" answer, and there's no clear"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" indication"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" of what they want"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" to do next. This isn"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"'t a programming task"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" with"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" a natural"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" progression"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":".\n\nI should stay"} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":" silent."} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":""} } + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"ErAJCkgICxACGAIqQDbNfL48i+sSPNTolJT540txct72oFEOLEMqEz8NI2lXHuNe/Zb30a491IDiqqlbav75RyMbHzjNXeU1SJuHQtASDML7YJjH1U+SLPZMJhoMaGAP8yzSt+FpSChvIjB8s8jZk432wpy4dBDCp0yzLRtNT+d4BLdREiLpeqq/tmtiTZWUZc4QWW6/vLLiXdEqlQj6cIyUQUjq/8ROQrFq0AgvtxQHchPSgJaJ0B9MubnZljAbZ7DoQnv2YaZNK7fA/e2FslVd7kxMiGWi1bOGoF2Hxb2SXkZT72PIpJjRhCaBPHrn6OKfogJSVp6EGwIM+lNe90yjqhxxfJ+KgVMA1YO599Sx9rO0flK2hL9VxqpHf9vAyjROb2yrBBIjaWcXdRZugk3vZkI3UTnVOpoSoCGy4QEymjUid1L1SiU5qEJmSXO1T/k5cX1XIZ3BqQwqojeCbSDctZv/tMEmx88XNgXmSFVEM0CrBZyz9jR7MjJFZfFM+H0A356/r9IO4Wpf/LPI+x0HEy3jFtjKM/8VM3YWu5UDsgyxjRaaCD4/4xlGVp9xCjHluuLah6MjqsThhk3QVgupItZITkLKBq1Z65NaY6IoV4eh3pTvYjg+F9eQUukMdxiq+49afgIgcerBUsK1gYuDt3vM17XeLxUm3DRkadxoGLb7ISSyDHheAYaH6i28i+/CagGUXksZzglaQTC4Zbuphy9PJnPhmPofQdfbiKpUW42DgXsUIV20h1cveFwfqjXNsOSyN7Sp7csHBi8Vj4uQH0W7tXzEp+bylPxSlINOTecSr5Vf0zvSWQxe+fmZaAhA6USi2t2rQ2OAPWWhZwmB0fj8zu/QaTw/fq2LU9lKfjNhCIs/dda1veZgMrH/dDk6/Nc9kqblb+NfqVFC0qL3233AAAHFS8P157W6eWtGLFqMHT0hcw2NUdTXGbEgnSQopWwX05/SoLvnNerNf9oJXEq1IYpClTBhcMqa3DI+y7fCa8YaJStzGGrjVIQTtQ1aduJjJgl3EtcPeI1hnglruoUD2IyhMK8fNmhOx7TZCNAxOxkXksmoYAtYzd5J9jX4zZk2G02hH6umLqqT0H2myXjF7/hKTQX3HwV7tNj750T9T5qBtmDPiB/mIKwBnEbprQtOfxHSaxd6I5r8q0aDGPfynJSEk4U9iOymL2F1dE1nSRvDpw26vTwIJPppQh49MwInp8OlaHuQZ5risCng6vPtK8rag/z6iVDLVhqvMWWatNspK95gbnzulC+T5uZSr0kFXibMDr2oBq1o0k1kjl74poJGLeEj/OGaKZK/MyxCaLcRqHqZT4KMiqtgEgMjnof88/EptrQ+nOgAvCWCzmelV5yKpRK/Gy0T/wTCdgRmqEigySYSob2xTDhl55AXK6RG3+qCD6yBGNSztlCo4nrYzJy4QUxXtsa9Jhh8Y2W4MMfAIAVPAM8dtXD08PH8GPPJuC4STMHGEV767SsSj4Pn1jiigLCWq4zeAuDhwm1+by/XoOKMJlQ094atuL5xbL6W/KwC1zgC7auLjmvnNXEHDCuRElHSf8x1Pw4yJm204sbFKj9cVfbaFud+bepmGAE="} } + +event: content_block_stop +data: {"type":"content_block_stop","index":0 } + +event: message_delta +data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":246} } + +event: message_stop +data: {"type":"message_stop" } + diff --git a/test/nullserver/Python/stream_server.py b/test/nullserver/Python/stream_server.py index 7d3a5585..0d94c614 100644 --- a/test/nullserver/Python/stream_server.py +++ b/test/nullserver/Python/stream_server.py @@ -164,6 +164,7 @@ def do_GET(self): if parsed_path.path.startswith('/file/'): filename = parsed_path.path[len('/file/'):] + print("Opening file for streaming response: " + parsed_path.path + " -> " + filename ) # Make sure the file exists if not os.path.exists(filename): self.send_response(404) diff --git a/test/openai/call-proxy.sh b/test/openai/call-proxy.sh index 577551ef..96d5cf50 100644 --- a/test/openai/call-proxy.sh +++ b/test/openai/call-proxy.sh @@ -7,7 +7,7 @@ # Default API key (fallback if not specified in HOSTMAP) # source env.sh - + # Map host aliases to hostname|folder|apikey (use empty after | for no folder, use empty for default APIKEY) declare -A HOSTMAP HOSTMAP["tr2"]="nvmtr2apim.azure-api.net|bf2|" @@ -19,7 +19,8 @@ HOSTMAP["local-resp"]="localhost:8000|resp|" HOSTMAP["local-direct"]="localhost:8000|api2/openai|" HOSTMAP["local"]="localhost:8000||" HOSTMAP["openai3"]="nvmopenai3.openai.azure.com|openai|" -HOSTMAP["nvm2"]="nvm2.openai.azure.com|openai|" +HOSTMAP["nvm2"]="nvm2.openai.azure.com|openai|" +HOSTMAP["lopenai"]="localhost:8000|openai|" HOSTMAP["foundry"]="localhost:8000|aif2/openai|" HOSTMAP["null"]="localhost:3000||" HOSTMAP["aca"]="simplel7dev.agreeableisland-74a4ba5f.eastus.azurecontainerapps.io|" @@ -30,6 +31,7 @@ HOSTMAP["aca-resp"]="simplel7dev.agreeableisland-74a4ba5f.eastus.azurecontainera # Map request types to HTTP method and partial URLs (format: "METHOD /url") declare -A URLS +URLS["4.0chat"]="POST /deployments/gpt-4o/chat/completions?api-version=2025-01-01-preview" URLS["4.1chat"]="POST /deployments/gpt-4.1/chat/completions?api-version=2025-01-01-preview" URLS["4.1request"]="POST /v1/responses" URLS["4.1response"]="GET /v1/responses" @@ -251,4 +253,4 @@ else else "${curl_cmd[@]}" fi -fi \ No newline at end of file +fi< \ No newline at end of file diff --git a/test/openai/response_call.json b/test/openai/response_call.json index 6473c154..3eb62f98 100644 --- a/test/openai/response_call.json +++ b/test/openai/response_call.json @@ -5,7 +5,7 @@ "content": "I am going to Paris, what should I see?" } ], - "background": true, + "background": false, "model": "gpt-4o", "metadata": {} } \ No newline at end of file diff --git a/test/openai/response_call_background.json b/test/openai/response_call_background.json new file mode 100644 index 00000000..6473c154 --- /dev/null +++ b/test/openai/response_call_background.json @@ -0,0 +1,11 @@ +{ + "input": [ + { + "role": "user", + "content": "I am going to Paris, what should I see?" + } + ], + "background": true, + "model": "gpt-4o", + "metadata": {} +} \ No newline at end of file