diff --git a/ReleaseNotes/version2.2.md b/ReleaseNotes/version2.2.md index d96ea9a0..384ce82d 100644 --- a/ReleaseNotes/version2.2.md +++ b/ReleaseNotes/version2.2.md @@ -1,6 +1,18 @@ # Release Notes # -2.2.10-d1 +2.2.10.2 + +Proxy: +* Performance improvements in server for Probe path and header checks +* Bug fix when checking for change config parameters +* Spindown old hosts that are no longer in the config +* De-register old hosts from the global circuit breaker when removed +* Bug Fix for gov cloud default credential +* Add MaxEvents to control how many undrained events can be in memory +* Slow down the incoming requests as we start to use up the event buffer + + +2.2.10.1 Deployment: * Update script to assign the data reader role to the ACA serivce prinicpal for app configuration diff --git a/SimpleL7Proxy.sln b/SimpleL7Proxy.sln index fd973ffb..e0e60132 100644 --- a/SimpleL7Proxy.sln +++ b/SimpleL7Proxy.sln @@ -66,8 +66,7 @@ Global GlobalSection(NestedProjects) = preSolution {1BEAB9EB-9991-41ED-99B5-F7A8D374491A} = {806C4881-406B-4504-BCC9-781F89BFCCB9} {F562EE95-23FC-48A2-B5CD-21438BFE8926} = {7FB4896E-B4A7-4030-A112-06B8E8A701B7} - {8911A389-8D4E-4268-90B7-9AC12C59861E} = {7FB4896E-B4A7-4030-A112-06B8E8A701B7} - {9F502DB0-81A5-4AAB-B915-AAABAA55B0A3} = {8911A389-8D4E-4268-90B7-9AC12C59861E} + {9F502DB0-81A5-4AAB-B915-AAABAA55B0A3} = {7FB4896E-B4A7-4030-A112-06B8E8A701B7} {5315AE06-D9C2-456F-BE0C-4B6996540CC9} = {7FB4896E-B4A7-4030-A112-06B8E8A701B7} {F045A36C-FFC3-4732-B898-1FF6ED71B4AE} = {5315AE06-D9C2-456F-BE0C-4B6996540CC9} {B7220CAA-AAAA-4E8E-BECE-6420B7002D4A} = {7FB4896E-B4A7-4030-A112-06B8E8A701B7} diff --git a/deployment/AppConfiguration/deploy.sh b/deployment/AppConfiguration/deploy.sh index 82fd1c80..c1a30ba1 100644 --- a/deployment/AppConfiguration/deploy.sh +++ b/deployment/AppConfiguration/deploy.sh @@ -316,8 +316,9 @@ for entry in "${CONFIG_ENTRIES[@]}"; do SOURCE="cs-default" # Handle enum defaults like "TypeName.Value" → "Value" # Only match Identifier.Identifier (e.g. IterationModeEnum.SinglePass) - # Avoid mangling URLs, file paths, or floats that also contain dots - if [[ "${VALUE}" =~ ^[A-Za-z_][A-Za-z0-9_]*\.[A-Za-z_][A-Za-z0-9_]*$ ]]; then + # Require PascalCase first segment (uppercase start) so filenames + # like "eventslog.json" are not mistaken for enum qualifiers. + if [[ "${VALUE}" =~ ^[A-Z][A-Za-z0-9_]*\.[A-Za-z_][A-Za-z0-9_]*$ ]]; then VALUE="${VALUE##*.}" fi fi diff --git a/docs/AZURE_APP_CONFIGURATION.md b/docs/AZURE_APP_CONFIGURATION.md new file mode 100644 index 00000000..1413c7a6 --- /dev/null +++ b/docs/AZURE_APP_CONFIGURATION.md @@ -0,0 +1,258 @@ +# Azure App Configuration Integration + +This document describes how to use Azure App Configuration for hot-reloading **[Warm]** settings without restarting the proxy. + +## Overview + +The proxy supports three types of configuration settings: + +| Type | Behavior | Label in App Config | Example | +|------|----------|--------------------|---------| +| **Warm** | Hot-reloaded from Azure App Config | *(none)* or `APPCONFIG_LABEL` | `MaxAttempts`, `LogConsole`, `DefaultPriority` | +| **Cold** | Read at startup, requires restart | `Cold` | `PollInterval`, `Timeout`, `Workers` | +| **Hidden** | Not published | — | `AsyncBlobStorageConnectionString` (parsed at runtime) | + +Both Warm and Cold settings are stored under the `Warm:` key prefix (for a single `Select("Warm:*")` query), but are distinguished by their **label**: +- Warm settings use the deployment label (default: no label) +- Cold settings always use label `Cold` + +This makes it easy to identify which settings require a restart when browsing the Azure portal's Configuration Explorer — just look at the **Label** column. + +## Environment Variables + +Configure the connection to Azure App Configuration: + +```bash +# Option 1: Managed Identity (recommended for production) +AZURE_APPCONFIG_ENDPOINT=https://your-appconfig.azconfig.io + +# Option 2: Connection String (for development) +AZURE_APPCONFIG_CONNECTION_STRING=Endpoint=https://...;Id=...;Secret=... + +# Optional: Label filter (default: no label) +AZURE_APPCONFIG_LABEL=Production + +# Optional: Refresh interval in seconds (default: 30) +AZURE_APPCONFIG_REFRESH_SECONDS=30 +``` + +## Azure App Configuration Key Structure + +All settings are stored under the `Warm:` key prefix. **Labels** distinguish the reload mode: + +``` +# Warm settings (label = none or APPCONFIG_LABEL) — hot-reloaded +Warm:MaxAttempts = 3 +Warm:LogConsole = true +Warm:DefaultPriority = 5 +Warm:Sentinel = 1 # Change this to trigger refresh + +# Cold settings (label = Cold) — read at startup only +Warm:Server:Port = 8000 +Warm:Server:Workers = 100 +Warm:Server:Timeout = 100000 +``` + +### Sentinel Key Pattern + +The `Warm:Sentinel` key is used to trigger configuration refresh: + +1. The refresh service polls Azure App Configuration every N seconds +2. It only checks if `Warm:Sentinel` has changed +3. If changed, **all** Warm settings are reloaded +4. This minimizes API calls while allowing instant updates + +**To trigger a refresh:** Update `Warm:Sentinel` to any new value (e.g., increment a counter or use a timestamp). + +## Setting Up Azure App Configuration + +### 1. Create the Resource + +```bash +# Create resource group +az group create --name rg-proxy --location eastus + +# Create App Configuration store +az appconfig create \ + --name appconfig-proxy \ + --resource-group rg-proxy \ + --location eastus \ + --sku Standard +``` + +### 2. Assign Managed Identity Access + +```bash +# Get the Container App's managed identity +IDENTITY_ID=$(az containerapp show \ + --name your-proxy-app \ + --resource-group rg-proxy \ + --query identity.principalId -o tsv) + +# Get App Configuration resource ID +APPCONFIG_ID=$(az appconfig show \ + --name appconfig-proxy \ + --resource-group rg-proxy \ + --query id -o tsv) + +# Assign App Configuration Data Reader role +az role assignment create \ + --role "App Configuration Data Reader" \ + --assignee $IDENTITY_ID \ + --scope $APPCONFIG_ID +``` + +### 3. Import Initial Settings + +Create a JSON file with your warm settings: + +```json +{ + "Warm:MaxAttempts": 3, + "Warm:DefaultPriority": 5, + "Warm:LogConsole": true, + "Warm:LogProbes": false, + "Warm:AcceptableStatusCodes": "[200, 201, 202]", + "Warm:Sentinel": "1" +} +``` + +Import to App Configuration: + +```bash +az appconfig kv import \ + --name appconfig-proxy \ + --source file \ + --path warm-settings.json \ + --format json \ + --label Production +``` + +### 4. Update Container App Environment + +```bash +az containerapp update \ + --name your-proxy-app \ + --resource-group rg-proxy \ + --set-env-vars \ + AZURE_APPCONFIG_ENDPOINT=https://appconfig-proxy.azconfig.io \ + AZURE_APPCONFIG_LABEL=Production \ + AZURE_APPCONFIG_REFRESH_SECONDS=30 +``` + +## Available Warm Settings + +These settings can be hot-reloaded: + +### Logging +- `LogConsole` - Enable console logging +- `LogConsoleEvent` - Enable console event logging +- `LogPoller` - Log poller activity +- `LogProbes` - Log health probe activity +- `LogHeaders` - Headers to include in logs +- `LogAllRequestHeaders` - Log all request headers +- `LogAllRequestHeadersExcept` - Exclude specific request headers +- `LogAllResponseHeaders` - Log all response headers +- `LogAllResponseHeadersExcept` - Exclude specific response headers + +### Request Processing +- `MaxAttempts` - Maximum retry attempts +- `DefaultPriority` - Default request priority +- `DefaultTTLSecs` - Default time-to-live +- `TimeoutHeader` - Header containing timeout value +- `TTLHeader` - Header containing TTL value + +### Validation +- `RequiredHeaders` - Headers that must be present +- `DisallowedHeaders` - Headers that are not allowed +- `ValidateHeaders` - Header validation rules +- `ValidateAuthAppID` - Enable app ID validation +- `ValidateAuthAppIDUrl` - URL for app ID validation +- `ValidateAuthAppFieldName` - Field name for app ID +- `ValidateAuthAppIDHeader` - Header containing app ID + +### User Management +- `UserConfigUrl` - URL for user configuration +- `SuspendedUserConfigUrl` - URL for suspended users +- `UserProfileHeader` - Header containing user profile +- `UserIDFieldName` - Field name for user ID +- `UniqueUserHeaders` - Headers that identify unique users +- `UserPriorityThreshold` - Priority threshold for users + +### Priority +- `PriorityKeyHeader` - Header containing priority key +- `PriorityKeys` - List of priority keys +- `PriorityValues` - Priority values for each key + +### Response Handling +- `AcceptableStatusCodes` - Status codes to accept +- `StripResponseHeaders` - Headers to remove from response +- `StripRequestHeaders` - Headers to remove from request + +### Async Settings (timing only) +- `AsyncTimeout` - Async operation timeout +- `AsyncTTLSecs` - Async TTL in seconds +- `AsyncTriggerTimeout` - Trigger timeout +- `AsyncClientRequestHeader` - Client async header +- `AsyncClientConfigFieldName` - Config field name + +## Monitoring Refresh + +The proxy logs configuration refresh activity: + +``` +[CONFIG] ✓ Azure App Configuration initialized with Warm settings refresh +[CONFIG] Azure App Configuration refresh service started with 30s interval +[CONFIG] Configuration refresh check completed - changes detected +[CONFIG] Warm settings changed - applying to BackendOptions +[CONFIG] ✓ Warm settings applied successfully +``` + +## Troubleshooting + +### Settings Not Refreshing + +1. Check the sentinel key was updated: + ```bash + az appconfig kv show --name appconfig-proxy --key "Warm:Sentinel" + ``` + +2. Verify the label filter matches: + ```bash + az appconfig kv list --name appconfig-proxy --label Production + ``` + +3. Check proxy logs for refresh errors + +### Authentication Failures + +1. Verify managed identity is enabled on the Container App +2. Check role assignment is correct (App Configuration Data Reader) +3. Ensure the endpoint URL is correct + +### Performance Considerations + +- Default refresh interval is 30 seconds +- Only the sentinel key is checked on each poll +- Full refresh only occurs when sentinel changes +- Consider longer intervals for production (60-120 seconds) + +## Example: Changing MaxAttempts at Runtime + +```bash +# Update the setting +az appconfig kv set \ + --name appconfig-proxy \ + --key "Warm:MaxAttempts" \ + --value "5" \ + --label Production + +# Trigger refresh by updating sentinel +az appconfig kv set \ + --name appconfig-proxy \ + --key "Warm:Sentinel" \ + --value "$(date +%s)" \ + --label Production +``` + +Within 30 seconds (or your configured interval), all proxy instances will pick up the new value without restart. diff --git a/docs/CONFIGURATION_SETTINGS.md b/docs/CONFIGURATION_SETTINGS.md index 1bc94af9..96a19ed4 100644 --- a/docs/CONFIGURATION_SETTINGS.md +++ b/docs/CONFIGURATION_SETTINGS.md @@ -27,9 +27,9 @@ | Setting | Property Name | |---------|---------------| -| Console | `LogConsole` | -| ConsoleEvent | `LogConsoleEvent` | -| Poller | `LogPoller` | +| LogToConsole | `LogToConsole` | +| LogToEvents | `LogToEvents` | +| LogToAI | `LogToAI` | | Probes | `LogProbes` | | Headers | `LogHeaders` | | AllRequestHeaders | `LogAllRequestHeaders` | @@ -140,11 +140,13 @@ ## [COLD] Settings - Require restart but can use rolling update -### Async.BlobStorage +### Async | Setting | Property Name | |---------|---------------| -| WorkerCount | `AsyncBlobWorkerCount` | +| BlobStorageConfig | `AsyncBlobStorageConfig` | +| SBConfig | `AsyncSBConfig` | +| BlobWorkerCount | `AsyncBlobWorkerCount` | ### CircuitBreaker - Configured at startup @@ -188,13 +190,38 @@ |---------|---------------| | Timeout | `Timeout` (HttpClient timeout) | +### Security + +| Setting | Property Name | +|---------|---------------| +| IgnoreSSLCert | `IgnoreSSLCert` | + ### Server | Setting | Property Name | |---------|---------------| +| MaxEvents | `MaxEvents` | | TerminationGracePeriodSeconds | `TerminationGracePeriodSeconds` | | TrackWorkers | `TrackWorkers` | +### Logging - Configured at startup + +| Setting | Property Name | +|---------|---------------| +| Level | `LogLevel` | +| EventLoggers | `EventLoggers` | +| EventHeaders | `EventHeaders` | +| LogFileName | `LogFileName` | + +### EventHub - Configured at startup + +| Setting | Property Name | +|---------|---------------| +| ConnectionString | `EventHubConnectionString` | +| Name | `EventHubName` | +| Namespace | `EventHubNamespace` | +| StartupSeconds | `EventHubStartupSeconds` | + --- ## [PARTIAL] Settings - Mixed restart requirements @@ -220,3 +247,6 @@ | UniqueHeaders | `UniqueUserHeaders` | [WARM] | | SuspendedConfigUrl | `SuspendedUserConfigUrl` | [WARM] | | UseProfiles | `UseProfiles` | [COLD] | +| UserConfigRequired | `UserConfigRequired` | [COLD] | +| UserConfigRefreshIntervalSecs | `UserConfigRefreshIntervalSecs` | [COLD] | +| UserSoftDeleteTTLMinutes | `UserSoftDeleteTTLMinutes` | [COLD] | diff --git a/docs/ENVIRONMENT_VARIABLES.md b/docs/ENVIRONMENT_VARIABLES.md index 6df20e94..705b9447 100644 --- a/docs/ENVIRONMENT_VARIABLES.md +++ b/docs/ENVIRONMENT_VARIABLES.md @@ -33,7 +33,7 @@ For production deployments, consider also configuring: | Variable | Type | Description | Default | | ----------------------------- | ---- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------- | -| **MaxQueueLength** | int | Sets the maximum number of requests allowed in the queue. | 10 | +| **MaxQueueLength** | int | Sets the maximum number of requests allowed in the queue. | 1000 | | **Port** | int | The port on which SimpleL7Proxy listens for incoming traffic. | 80 | | **TERMINATION_GRACE_PERIOD_SECONDS** | int | The number of seconds SimpleL7Proxy waits before forcing itself to shut down. | 30 | | **Workers** | int | The number of worker threads used to process incoming proxy requests. | 10 | @@ -51,6 +51,9 @@ For production deployments, consider also configuring: | ----------------------------- | ---- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------- | | **SuspendedUserConfigUrl** | string | URL or file path to fetch the list of suspended users. | file:config.json | | **UseProfiles** | bool | If true, enables user profile functionality for custom handling based on user profiles. | false | +| **UserConfigRequired** | bool | If true, a valid user profile must be found for the request to proceed. Requires restart. | false | +| **UserConfigRefreshIntervalSecs** | int | Interval in seconds between user configuration refreshes. Requires restart. | 3600 (1 hour) | +| **UserSoftDeleteTTLMinutes** | int | Time in minutes before a soft-deleted user profile is permanently removed. Requires restart. | 360 (6 hours) | | **UserConfigUrl** | string | URL or file path to fetch user configuration data. | file:config.json | | **UserPriorityThreshold** | float | Floating point threshold (0.0-1.0) for user priority calculations. If a user owns more than this percentage of requests, their priority is lowered to prevent monopolization. For details, see [Advanced Configuration](ADVANCED_CONFIGURATION.md#user-governance). | 0.1 | | **ValidateAuthAppFieldName** | string | Name of the field in the authentication payload to validate as the App ID. | authAppID | @@ -101,9 +104,11 @@ For production deployments, consider also configuring: | **LOGTOFILE** | bool | **Legacy.** When **EVENT_LOGGERS** is not set: `true` enables file logging, `false` enables EventHub logging. Prefer **EVENT_LOGGERS** for new deployments. | false | | **LogHeaders** | string | Comma-separated list of specific headers to log for debugging. | (empty) | | **LogProbes** | bool | If true, logs details about health probe requests to backends. | false | -| **LogConsoleEvent** | bool | If true, logs events to the console output. | true | -| **LogConsole** | bool | Enables general console logging. | true | -| **LogPoller** | bool | Enables logging for the backend poller. | true | +| **LogToConsole** | string list | Comma-separated list of log categories to write to the console. Use `*` for all categories. | * | +| **LogToEvents** | string list | Comma-separated list of log categories to send to event loggers (EventHub, file). | async,backend,circuitbreaker,custom,exception,profile,proxy,enqueued,auth | +| **LogToAI** | string list | Comma-separated list of log categories to send to Application Insights. Use `*` for all categories. | * | +| **LOG_LEVEL** | string | Minimum logging level (e.g., `Trace`, `Debug`, `Information`, `Warning`, `Error`, `Critical`). | Information | +| **EVENT_HEADERS** | string | Fully-qualified class name for event header enrichment. | SimpleL7Proxy.Events.CommonEventHeaders | | **StorageDbContainerName** | string | Container name for request storage if enabled. | Requests | | **StorageDbEnabled** | bool | Enables archiving requests to storage. | false | | **RequestIDPrefix** | string | The prefix appended to every request ID. | S7P | @@ -111,21 +116,23 @@ For production deployments, consider also configuring: ## Async Processing Variables | Variable | Type | Description | Default | -| -------BlobStorageAccountUri**| string | Uri for Blob Storage (overrides AsyncBlobStorageConfig). | (empty) | -| **AsyncBlobStorageUseMI** | bool | Use Managed Identity for Blob Storage. | false | +| ----------------------------- | ---- | ------------------------------------------------------------------------------------------------------------------- | ---------------------------------------- | +| **AsyncBlobStorageConfig** | string | Composite connection string for Azure Blob Storage. Format: `uri=,mi=`. Parsed into `AsyncBlobStorageAccountUri` and `AsyncBlobStorageUseMI`. | uri=https://mystorageaccount.blob.core.windows.net,mi=true | +| **AsyncBlobStorageAccountUri**| string | URI for Blob Storage (parsed from AsyncBlobStorageConfig). | (empty) | +| **AsyncBlobStorageConnectionString** | string | Connection string for Azure Blob Storage (parsed from AsyncBlobStorageConfig). | example-connection-string | +| **AsyncBlobStorageUseMI** | bool | Use Managed Identity for Blob Storage (parsed from AsyncBlobStorageConfig). | false | | **AsyncBlobWorkerCount** | int | Number of workers for async blob processing. | 2 | -| **AsyncClientConfigFieldName** | string | User profile field name that designates if the client configuration. It contains enabled, containername, topic, timeout. | async-config | +| **AsyncClientConfigFieldName** | string | User profile field name that designates the client configuration. It contains enabled, containername, topic, timeout. | async-config | | **AsyncClientRequestHeader** | string | Header indicating async mode is requested. | AsyncMode | -| **AsyncSBConnectionString** | string | Azure Service Bus connection string for async operations. | example-sb-connection-string | -| **AsyncSBNamespace** | string | Service Bus namespace (overrides AsyncSBConfig). | (empty) | -| **AsyncSBQueue** | string | Service Bus queue name (overrides AsyncSBConfig). | (empty) | -| **AsyncSBUseMI** | bool | Use Managed Identity for Service Bus. | false | +| **AsyncModeEnabled** | bool | Enables or disables async processing mode. Requires restart. | false | +| **AsyncSBConfig** | string | Composite connection string for Azure Service Bus. Format: `cs=,ns=,q=,mi=`. Parsed into individual SB settings. | cs=example-sb-connection-string,ns=example-namespace,q=requeststatus,mi=false | +| **AsyncSBConnectionString** | string | Azure Service Bus connection string (parsed from AsyncSBConfig). | example-sb-connection-string | +| **AsyncSBNamespace** | string | Service Bus namespace (parsed from AsyncSBConfig). | (empty) | +| **AsyncSBQueue** | string | Service Bus queue name (parsed from AsyncSBConfig). | (empty) | +| **AsyncSBUseMI** | bool | Use Managed Identity for Service Bus (parsed from AsyncSBConfig). | false | +| **AsyncTimeout** | int | Timeout in milliseconds for async operations. The maximum amount of time an async request will run for. | 1800000 (30 min) | +| **AsyncTriggerTimeout** | int | Timeout for async trigger operations in ms. | 10000 | | **AsyncTTLSecs** | int | TTL for async requests in seconds. | 86400 (24 hours) | -| **AsyncTriggerTimeout** | int | Timeout for async trigger operations in ms. | 10000 | false | -| **AsyncTimeout** | int | Timeout in milliseconds for async operations. The maximum amount of time async request will run for. | 1800000 (30 min) | -| **AsyncBlobStorageConnectionString** | string | Connection string for Azure Blob Storage used in async mode. | example-connection-string | -| **AsyncClientConfigFieldName** | string | User profile field name that designates if the client configuration. It contains enabled, containername, topic, timeout. | async-config | -| **AsyncSBConnectionString** | string | Azure Service Bus connection string for async operations. | example-sb-connection-string | ## Connection Management Variables @@ -146,7 +153,7 @@ For production deployments, consider also configuring: | Variable | Type | Description | Default | | ----------------------------- | ---- | ------------------------------------------------------------------------------------------------------------------- | ---------------------------------------- | -| **AcceptableStatusCodes** | int array | The list of HTTP status codes considered successful. If a host returns a code not in this list, it's deemed a failure. | 200, 401, 403, 404, 408, 410, 412, 417, 400 | +| **AcceptableStatusCodes** | int array | The list of HTTP status codes considered successful. If a host returns a code not in this list, it's deemed a failure. | 200, 202, 401, 403, 404, 408, 410, 412, 417, 400 | | **APPENDHOSTSFILE / AppendHostsFile** | bool | If true, appends host/IP pairs to /etc/hosts for DNS resolution. Both case variants are supported. | false | | **CBErrorThreshold** | int | The error threshold percentage for the circuit breaker. If the error rate surpasses this value in **CBTimeslice** time period, the circuit breaks. | 50 | | **CBTimeslice** | int | The duration (in seconds) of the sampling window for the circuit breaker's error rate. | 60 | @@ -165,6 +172,10 @@ For production deployments, consider also configuring: | **Timeout** | int | Connection timeout (in milliseconds) for each backend request. If exceeded, SimpleL7Proxy tries the next available host. | 1200000 (20 mins) | | **UseOAuth** | bool | Enables or disables OAuth token fetching for outgoing requests. | false | | **UseOAuthGov** | bool | If true, uses the government cloud OAuth endpoint for token acquisition. | false | +| **UseSharedIterators** | bool | When true, requests to the same path share the same host iterator for fair round-robin distribution. | false | +| **SharedIteratorTTLSeconds** | int | How long (in seconds) an unused shared iterator lives before cleanup. | 60 | +| **SharedIteratorCleanupIntervalSeconds** | int | How often (in seconds) to run cleanup of stale shared iterators. | 30 | +| **MaxEvents** | int | Maximum number of events the proxy can store in memory. | 100000 | ## User Profile Configuration diff --git a/docs/OBSERVABILITY.md b/docs/OBSERVABILITY.md index 307f0981..c915db18 100644 --- a/docs/OBSERVABILITY.md +++ b/docs/OBSERVABILITY.md @@ -9,18 +9,68 @@ Data is emitted to the following configured sinks: 3. **Local Log File**: JSON event log for debugging/testing. Include `file` in `EVENT_LOGGERS` and optionally set `LOGFILE_NAME`. 4. **Console/Stdout**: For container logging and local debugging. -Event Hubs and Local Log File are **sibling backends** managed by the `CompositeEventClient` — they can run simultaneously. Set `EVENT_LOGGERS=file,eventhub` to enable both. Each backend self-registers on successful startup; if one fails (e.g., EventHub timeout), the others continue unaffected. +--- + +## Event Logging Architecture + +Every proxied request produces a `ProxyEvent` — a key/value dictionary containing request metadata, timing, token usage, and other dimensions. These events are serialized to JSON and dispatched to one or more **event logger backends** via the `CompositeEventClient` fan-out. + +### How It Works + +``` +ProxyEvent.SendEvent() + └── CompositeEventClient.SendData(json) + ├── LogFileEventClient.SendData(json) ← if "file" enabled + ├── EventHubClient.SendData(json) ← if "eventhub" enabled + └── CustomLogger.SendData(json) ← if custom type enabled +``` + +1. `ProxyEvent` collects per-request data (status, duration, tokens, headers, etc.). +2. Common fields (version, revision, container name) are injected from **Event Headers** (`ICommonEventData`). +3. The event is serialized and passed to `CompositeEventClient.SendData()`. +4. `CompositeEventClient` iterates a `FrozenDictionary` snapshot of registered backends — zero-lock, zero-allocation on the hot path. +5. Each backend buffers events in a `ConcurrentQueue` and flushes them asynchronously on a background loop. + +### Configuration Reference + +| Environment Variable | Default | Description | +| :--- | :--- | :--- | +| `EVENT_LOGGERS` | `file` | Comma-separated list of backends to enable: `file`, `eventhub`, or a fully-qualified type name. | +| `LOGFILE_NAME` | `eventslog.json` | Output path for the local file logger. | +| `LOGTOFILE` | `false` | **(Legacy)** When `EVENT_LOGGERS` is not set, `true` → `file`, `false` → `eventhub`. | +| `EVENTHUB_CONNECTIONSTRING` | — | Connection string for Azure Event Hubs. | +| `EVENTHUB_NAMESPACE` | — | Event Hub namespace for managed-identity auth (alternative to connection string). | +| `EVENTHUB_NAME` | — | The specific Event Hub name to write to. | +| `EVENT_HEADERS` | `SimpleL7Proxy.Events.CommonEventHeaders` | Fully-qualified type name of the `ICommonEventData` implementation that supplies default event fields. | + +### Built-in Backends + +**Local Log File** (`file`) +- Writes JSON lines to disk via `LogFileEventClient`. +- Buffers events in a `ConcurrentQueue` and flushes batches of up to 99 events every 500 ms using a reusable `StringBuilder`. +- Suitable for local debugging and testing. + +**Azure Event Hubs** (`eventhub`) +- Streams events to Azure Event Hubs via `EventHubClient`. +- Supports connection-string auth or managed-identity auth (`EVENTHUB_NAMESPACE`). +- Batches events using the Event Hubs SDK `EventDataBatch` for throughput. +- If the Event Hub connection fails at startup, the backend is silently disabled and other backends continue. + +Both backends are **sibling sinks** managed by `CompositeEventClient` — they can run simultaneously. Set `EVENT_LOGGERS=file,eventhub` to enable both. Each backend self-registers on successful startup; if one fails (e.g., EventHub timeout), the others continue unaffected. + +--- ## Custom Event Loggers -Besides the built-in `file` and `eventhub` backends, you can create your own logger by implementing `IEventClient` and `IHostedService` in the `SimpleL7Proxy` assembly. +Besides the built-in `file` and `eventhub` backends, you can create your own event logger by implementing `IEventClient` and `IHostedService` in the `SimpleL7Proxy` assembly. ### Steps -1. Create a class that implements both interfaces. -2. Accept `CompositeEventClient` and `ILogger` in the constructor (DI resolves them automatically). -3. In `StartAsync`, perform any setup, then call `_composite.Add(this)` to register. -4. Reference it by fully-qualified name in `EVENT_LOGGERS`. +1. Create a class that implements both `IEventClient` and `IHostedService`. +2. Accept `CompositeEventClient` (and any other DI services) in the constructor. +3. In `StartAsync`, perform setup and call `_composite.Add(this)` to register with the fan-out. +4. In `SendData`, process or forward the JSON event string. +5. Reference the class by its **fully-qualified type name** in `EVENT_LOGGERS`. ### Example @@ -67,7 +117,69 @@ public class ConsoleEventLogger : IEventClient, IHostedService EVENT_LOGGERS=file,SimpleL7Proxy.Events.ConsoleEventLogger ``` -> **Note:** Only types within the `SimpleL7Proxy` assembly are resolved. External assemblies cannot be loaded via `EVENT_LOGGERS` for security. +The proxy uses `ActivatorUtilities.CreateInstance` to construct custom loggers, so all constructor dependencies are resolved from DI automatically. If instantiation fails, the error is logged and the remaining loggers continue. + +> **Security:** Only types within the `SimpleL7Proxy` assembly are resolved. External assemblies cannot be loaded via `EVENT_LOGGERS`. + +--- + +## Custom Event Headers + +Every event includes a set of **default fields** (version, revision, container name) that are injected by an `ICommonEventData` implementation. You can customize these fields by providing your own class. + +### Default Behaviour + +The built-in `CommonEventHeaders` class produces: + +| Field | Source | +| :--- | :--- | +| `Ver` | `Constants.VERSION` (build-time) | +| `Revision` | `BackendOptions.Revision` | +| `ContainerApp` | `BackendOptions.ContainerApp` | + +### Creating a Custom Implementation + +1. Create a class that implements `ICommonEventData`. +2. Accept `IOptions` in the constructor. +3. Return a `FrozenDictionary` from `DefaultEventData()`. +4. Set `EVENT_HEADERS` to the fully-qualified type name. + +```csharp +using System.Collections.Frozen; +using Microsoft.Extensions.Options; +using SimpleL7Proxy.Config; + +namespace SimpleL7Proxy.Events; + +public class MyCustomEventHeaders : ICommonEventData +{ + private readonly FrozenDictionary _data; + + public MyCustomEventHeaders(IOptions options) + { + var bo = options.Value; + _data = new Dictionary + { + ["Ver"] = Constants.VERSION, + ["Revision"] = bo.Revision, + ["ContainerApp"] = bo.ContainerApp, + ["Region"] = Environment.GetEnvironmentVariable("AZURE_REGION") ?? "unknown", + ["Tenant"] = Environment.GetEnvironmentVariable("TENANT_ID") ?? "default" + }.ToFrozenDictionary(); + } + + public FrozenDictionary DefaultEventData() => _data; +} +``` + +**Usage:** +``` +EVENT_HEADERS=SimpleL7Proxy.Events.MyCustomEventHeaders +``` + +### Fallback Behaviour + +If the configured `EVENT_HEADERS` type cannot be found, does not implement `ICommonEventData`, or throws during construction, the proxy logs a warning and falls back to `CommonEventHeaders` automatically. The proxy will always start successfully regardless of event header misconfiguration. ## AI Token Metrics (Streaming) Standard gateways cannot count tokens in streaming responses (Server-Sent Events/SSE) because the "usage" field is often only sent in the final chunk, or requires aggregating chunks. diff --git a/src/SimpleL7Proxy/Backend/BackendTokenProvider.cs b/src/SimpleL7Proxy/Backend/BackendTokenProvider.cs index 0092bcca..563620a2 100644 --- a/src/SimpleL7Proxy/Backend/BackendTokenProvider.cs +++ b/src/SimpleL7Proxy/Backend/BackendTokenProvider.cs @@ -2,9 +2,6 @@ using System.Threading; using System.Threading.Tasks; using Azure.Core; -using Azure.Identity; -using Microsoft.ApplicationInsights; -using Microsoft.Extensions.Options; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using SimpleL7Proxy.Config; @@ -19,14 +16,14 @@ public class BackendTokenProvider : IHostedService private readonly HashSet _audiences = new(); private readonly Dictionary _refreshTasks = new(); private static CancellationToken _cancellationToken = CancellationToken.None; - private readonly BackendOptions _options; + private readonly DefaultCredential _defaultCredential; private readonly ILogger _logger; public BackendTokenProvider( - IOptions backendOptions, + DefaultCredential defaultCredential, ILogger logger) { - _options = backendOptions.Value; + _defaultCredential = defaultCredential; _logger = logger; } @@ -78,12 +75,7 @@ public void StartTokenRefresh() private void StartAudienceRefreshTask(string audience) { if (_refreshTasks.ContainsKey(audience)) return; - var options = new DefaultAzureCredentialOptions(); - if (_options.UseOAuthGov == true) - { - options.AuthorityHost = AzureAuthorityHosts.AzureGovernment; - } - var credential = new DefaultAzureCredential(options); + var credential = _defaultCredential.Credential; var refreshTask = Task.Run(async () => { try diff --git a/src/SimpleL7Proxy/Backend/Backends.cs b/src/SimpleL7Proxy/Backend/Backends.cs index 0d2b9d81..08683837 100644 --- a/src/SimpleL7Proxy/Backend/Backends.cs +++ b/src/SimpleL7Proxy/Backend/Backends.cs @@ -1,10 +1,8 @@ using System.Diagnostics; using Microsoft.Extensions.Options; using System.Text; -using Azure.Identity; using Azure.Core; using System.Text.Json; -using Microsoft.ApplicationInsights; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Hosting; using System.Threading; @@ -338,13 +336,16 @@ private async Task GetHostStatus(BaseHostHealth host, HttpClient client) //"S7P-Uri Format Exception"; _probeEvent["Code"] = "-"; } - catch (System.Threading.Tasks.TaskCanceledException e) + catch (System.Threading.Tasks.TaskCanceledException) { - // WriteOutput($"Poller: Host Timeout: {host.host}"); - _probeEvent.Type = EventType.Exception; - _probeEvent.Exception = e; - _probeEvent["Code"] = "-"; + // Probe timeout is a normal operational signal — host is slow/down. + // Not an exception; the circuit breaker handles it via AddCallSuccess(false). + _probeEvent.Type = EventType.Poller; + _probeEvent["Code"] = "Timeout"; _probeEvent["Timeout"] = client.Timeout.TotalMilliseconds.ToString(); + + + _circuitBreaker.TrackStatus(408, true, "ProbeTimeout"); } catch (HttpRequestException e) { diff --git a/src/SimpleL7Proxy/Backend/BaseHostHealth.cs b/src/SimpleL7Proxy/Backend/BaseHostHealth.cs index e3df7042..47869f31 100644 --- a/src/SimpleL7Proxy/Backend/BaseHostHealth.cs +++ b/src/SimpleL7Proxy/Backend/BaseHostHealth.cs @@ -29,7 +29,8 @@ public abstract class BaseHostHealth protected BaseHostHealth(HostConfig config, ILogger logger) { Config = config ?? throw new ArgumentNullException(nameof(config)); - logger.LogInformation($"[CONFIG] ✓ {(config.DirectMode ? "Direct" : "APIM")} host registered: {config.Host} Path: {config.PartialPath} Probe: {config.ProbePath}"); + logger.LogInformation("[HOST-MANAGER] ✓ {Mode} host registered: {Host} | Path: {Path} | Probe: {Probe}", + config.DirectMode ? "Direct" : "APIM", config.Host, config.PartialPath, config.ProbePath); } public override string ToString() diff --git a/src/SimpleL7Proxy/Backend/CircuitBreaker.cs b/src/SimpleL7Proxy/Backend/CircuitBreaker.cs index ae396d2d..87ffe558 100644 --- a/src/SimpleL7Proxy/Backend/CircuitBreaker.cs +++ b/src/SimpleL7Proxy/Backend/CircuitBreaker.cs @@ -24,6 +24,7 @@ public class CircuitBreaker : ICircuitBreaker // Instance state tracking private bool _isCurrentlyBlocked = false; + private bool _isDeregistered = false; public string ID { get; set; } = ""; @@ -125,6 +126,27 @@ public bool CheckFailedStatus() return isCurrentlyFailed; } + /// + /// Removes this instance from the global circuit-breaker counters. + /// Safe to call multiple times — only the first call has any effect. + /// + public void Deregister() + { + if (_isDeregistered) return; + _isDeregistered = true; + + Interlocked.Decrement(ref _totalCircuitBreakersCount); + + if (_isCurrentlyBlocked) + { + _isCurrentlyBlocked = false; + Interlocked.Decrement(ref _blockedCircuitBreakersCount); + } + + _logger.LogDebug("[CB] Circuit breaker {ID} deregistered. Total: {Total}, Blocked: {Blocked}", + ID, _totalCircuitBreakersCount, _blockedCircuitBreakersCount); + } + /// /// Checks if all circuit breakers globally are in a failed state /// diff --git a/src/SimpleL7Proxy/Backend/HostCollectionManager.cs b/src/SimpleL7Proxy/Backend/HostCollectionManager.cs index e98657ad..827224df 100644 --- a/src/SimpleL7Proxy/Backend/HostCollectionManager.cs +++ b/src/SimpleL7Proxy/Backend/HostCollectionManager.cs @@ -9,7 +9,7 @@ namespace SimpleL7Proxy.Backend; /// /// Singleton manager that owns the authoritative host list. /// Reads are lock-free (volatile snapshot reference). -/// Writes (CRUD) take a lock, build a new snapshot, and atomically swap. +/// Writes build a new snapshot and atomically swap via volatile reference. /// Old snapshots remain valid for any in-flight workers holding a reference. /// /// Startup flow: @@ -20,7 +20,6 @@ namespace SimpleL7Proxy.Backend; /// public sealed class HostCollectionManager : IHostHealthCollection { - private readonly object _writeLock = new(); private volatile HostCollectionSnapshot _current; private HostCollectionSnapshot? _pending; private List? _stagedConfigs; @@ -47,13 +46,11 @@ public void StageHost(HostConfig config) { ArgumentNullException.ThrowIfNull(config, nameof(config)); - lock (_writeLock) - { - _stagedConfigs ??= []; - _stagedConfigs.Add(config); - _logger.LogDebug("[HOST-MANAGER] Staged host: {Host} ({Count} staged)", - config.Host, _stagedConfigs.Count); - } + config.FreezeHash(); + _stagedConfigs ??= []; + _stagedConfigs.Add(config); + _logger.LogDebug("[HOST-MANAGER] Staged host: {Host} ({Count} staged)", + config.Host, _stagedConfigs.Count); } /// @@ -64,105 +61,211 @@ public void LoadFromConfig(IEnumerable hostConfigs) { ArgumentNullException.ThrowIfNull(hostConfigs, nameof(hostConfigs)); - lock (_writeLock) - { - _version++; - _pending = HostCollectionSnapshot.Build(hostConfigs, _logger, _version); - _logger.LogInformation("[HOST-MANAGER] Pending snapshot built (v{Version}, {Count} hosts)", - _version, _pending.Hosts.Count); - } + _version++; + _pending = HostCollectionSnapshot.Build(hostConfigs, _logger, _version); + _logger.LogInformation("[HOST-MANAGER] Pending snapshot built (v{Version}, {Count} hosts)", + _version, _pending.Hosts.Count); } /// /// Atomically swaps the pending snapshot in as Current. - /// If hosts were staged via , builds the snapshot from them. + /// If hosts were staged via , deduplicates them by + /// , activates only unique configs + /// (creating circuit breakers), and builds the snapshot. /// If was used, uses the pre-built pending snapshot. /// Freezes the snapshot before activation. /// public void Activate() { - lock (_writeLock) + // Build from staged configs if present + if (_stagedConfigs != null && _stagedConfigs.Count > 0) { - // Build from staged configs if present - if (_stagedConfigs != null && _stagedConfigs.Count > 0) + var uniqueConfigs = DeduplicateStagedConfigs(_stagedConfigs); + _stagedConfigs = null; + + // Compare staged hashes against current snapshot — skip rebuild if identical + if (MatchesCurrentSnapshot(uniqueConfigs)) { - _version++; - _pending = HostCollectionSnapshot.Build(_stagedConfigs, _logger, _version); - _stagedConfigs = null; + _logger.LogDebug("[HOST-MANAGER] Staged hosts match current snapshot — no changes, skipping activation"); + return; } - if (_pending == null) + // Activate circuit breakers only for configs that survived dedup + _logger.LogInformation("[HOST-MANAGER] Activating {Count} host(s)...", uniqueConfigs.Count); + foreach (var config in uniqueConfigs) { - _logger.LogWarning("[HOST-MANAGER] Activate() called with no pending snapshot or staged hosts"); - return; + config.Activate(); } - _pending.Freeze(); + _version++; + _pending = HostCollectionSnapshot.Build(uniqueConfigs, _logger, _version); + } - var oldVersion = _current.Version; - _current = _pending; - _pending = null; + if (_pending == null) + { + _logger.LogWarning("[HOST-MANAGER] Activate() called with no pending snapshot or staged hosts"); + return; + } - _logger.LogInformation("[HOST-MANAGER] ✓ Snapshot activated (v{OldVersion} → v{NewVersion}, {Count} hosts)", - oldVersion, _current.Version, _current.Hosts.Count); + _pending.Freeze(); - IteratorFactory.InvalidateCache(); - } + // Mark hosts from the old snapshot that are not in the new one for spin-down + var oldSnapshot = _current; + SpinDownRetiredHosts(oldSnapshot, _pending); + + var oldVersion = _current.Version; + _current = _pending; + _pending = null; + + _logger.LogInformation("[HOST-MANAGER] ✓ Snapshot activated (v{OldVersion} → v{NewVersion}, {Count} hosts)", + oldVersion, _current.Version, _current.Hosts.Count); + + IteratorFactory.InvalidateCache(); } - /// - public BaseHostHealth AddHost(HostConfig config) + /// + /// Removes duplicate staged configs based on . + /// Keeps the first occurrence of each unique configuration. + /// Configs without a frozen hash are always kept (treated as unique). + /// + private List DeduplicateStagedConfigs(List configs) { - ArgumentNullException.ThrowIfNull(config, nameof(config)); + var seen = new HashSet(StringComparer.Ordinal); + var unique = new List(configs.Count); + var dupCount = 0; - lock (_writeLock) + foreach (var config in configs) { - // Create the new host health instance - BaseHostHealth host; - if (config.DirectMode || string.IsNullOrEmpty(config.ProbePath) || config.ProbePath == "/") + var hash = config.FrozenHash; + if (hash == null || seen.Add(hash)) { - host = new NonProbeableHostHealth(config, _logger); + unique.Add(config); } else { - host = new ProbeableHostHealth(config, _logger); + dupCount++; + _logger.LogWarning("[HOST-MANAGER] Duplicate host config skipped: {Host} (hash {Hash})", + config.Host, hash); } + } - // Build new list including the new host - var newHosts = new List(_current.Hosts) { host }; - _version++; - _current = HostCollectionSnapshot.BuildFromHosts(newHosts, _version, _logger); + if (dupCount > 0) + { + _logger.LogInformation("[HOST-MANAGER] Dedup: {Original} staged → {Unique} unique ({Dups} duplicates removed)", + configs.Count, unique.Count, dupCount); + } + + return unique; + } + + /// + /// Returns true when the staged configs have the exact same set of + /// frozen hashes as the current active snapshot (same count, same hashes). + /// + private bool MatchesCurrentSnapshot(List stagedConfigs) + { + var currentHosts = _current.Hosts; + if (stagedConfigs.Count != currentHosts.Count) + return false; + + // Build a bag of current hashes (multiset comparison — order doesn't matter) + var currentHashes = new Dictionary(currentHosts.Count, StringComparer.Ordinal); + foreach (var host in currentHosts) + { + var hash = host.Config.FrozenHash; + if (hash == null) return false; // unhashed → can't compare, treat as changed + currentHashes[hash] = currentHashes.GetValueOrDefault(hash) + 1; + } + + foreach (var config in stagedConfigs) + { + var hash = config.FrozenHash; + if (hash == null) return false; + if (!currentHashes.TryGetValue(hash, out var count) || count == 0) + return false; + currentHashes[hash] = count - 1; + } + + return true; + } - _logger.LogInformation("[CRUD] ✓ Host added: {Host} (v{Version}, total: {Count})", - config.Host, _version, _current.Hosts.Count); + /// + /// Marks hosts in the old snapshot that are absent from the new snapshot + /// for graceful spin-down via . + /// Comparison uses so only config + /// changes are detected — identity (Guid) differences are ignored. + /// + private void SpinDownRetiredHosts(HostCollectionSnapshot oldSnapshot, HostCollectionSnapshot newSnapshot) + { + if (oldSnapshot.Hosts.Count == 0) return; - IteratorFactory.InvalidateCache(); - return host; + // Build the set of hashes in the new snapshot + var newHashes = new HashSet(newSnapshot.Hosts.Count, StringComparer.Ordinal); + foreach (var host in newSnapshot.Hosts) + { + var hash = host.Config.FrozenHash; + if (hash != null) newHashes.Add(hash); + } + + foreach (var oldHost in oldSnapshot.Hosts) + { + var hash = oldHost.Config.FrozenHash; + if (hash != null && newHashes.Contains(hash)) + continue; + + oldHost.Config.SpinDown(); + _logger.LogInformation("[HOST-MANAGER] Host spinning down: {Host} (removed from active snapshot)", + oldHost.Config.Host); } } /// - public bool RemoveHost(Guid hostId) + public BaseHostHealth AddHost(HostConfig config) { - lock (_writeLock) + ArgumentNullException.ThrowIfNull(config, nameof(config)); + + // Create the new host health instance + BaseHostHealth host; + if (config.DirectMode || string.IsNullOrEmpty(config.ProbePath) || config.ProbePath == "/") { - var existing = _current.Hosts.FirstOrDefault(h => h.guid == hostId); - if (existing == null) - { - _logger.LogWarning("[CRUD] Host not found for removal: {HostId}", hostId); - return false; - } + host = new NonProbeableHostHealth(config, _logger); + } + else + { + host = new ProbeableHostHealth(config, _logger); + } - var newHosts = _current.Hosts.Where(h => h.guid != hostId).ToList(); - _version++; - _current = HostCollectionSnapshot.BuildFromHosts(newHosts, _version, _logger); + // Build new list including the new host + var newHosts = new List(_current.Hosts) { host }; + _version++; + _current = HostCollectionSnapshot.BuildFromHosts(newHosts, _version, _logger); - _logger.LogInformation("[CRUD] ✓ Host removed: {Host} (v{Version}, total: {Count})", - existing.Host, _version, _current.Hosts.Count); + _logger.LogInformation("[CRUD] ✓ Host added: {Host} (v{Version}, total: {Count})", + config.Host, _version, _current.Hosts.Count); + + IteratorFactory.InvalidateCache(); + return host; + } - IteratorFactory.InvalidateCache(); - return true; + /// + public bool RemoveHost(Guid hostId) + { + var existing = _current.Hosts.FirstOrDefault(h => h.guid == hostId); + if (existing == null) + { + _logger.LogWarning("[CRUD] Host not found for removal: {HostId}", hostId); + return false; } + + var newHosts = _current.Hosts.Where(h => h.guid != hostId).ToList(); + _version++; + _current = HostCollectionSnapshot.BuildFromHosts(newHosts, _version, _logger); + + _logger.LogInformation("[CRUD] ✓ Host removed: {Host} (v{Version}, total: {Count})", + existing.Host, _version, _current.Hosts.Count); + + IteratorFactory.InvalidateCache(); + return true; } /// @@ -170,28 +273,25 @@ public bool UpdateHost(Guid hostId, Action mutate) { ArgumentNullException.ThrowIfNull(mutate, nameof(mutate)); - lock (_writeLock) + var existing = _current.Hosts.FirstOrDefault(h => h.guid == hostId); + if (existing == null) { - var existing = _current.Hosts.FirstOrDefault(h => h.guid == hostId); - if (existing == null) - { - _logger.LogWarning("[CRUD] Host not found for update: {HostId}", hostId); - return false; - } + _logger.LogWarning("[CRUD] Host not found for update: {HostId}", hostId); + return false; + } - // Apply the mutation - mutate(existing.Config); + // Apply the mutation + mutate(existing.Config); - // Re-categorize (host may have moved between specific-path and catch-all) - _version++; - _current = HostCollectionSnapshot.BuildFromHosts( - new List(_current.Hosts), _version, _logger); + // Re-categorize (host may have moved between specific-path and catch-all) + _version++; + _current = HostCollectionSnapshot.BuildFromHosts( + new List(_current.Hosts), _version, _logger); - _logger.LogInformation("[CRUD] ✓ Host updated: {Host} (v{Version})", - existing.Host, _version); + _logger.LogInformation("[CRUD] ✓ Host updated: {Host} (v{Version})", + existing.Host, _version); - IteratorFactory.InvalidateCache(); - return true; - } + IteratorFactory.InvalidateCache(); + return true; } } diff --git a/src/SimpleL7Proxy/Backend/HostCollectionSnapshot.cs b/src/SimpleL7Proxy/Backend/HostCollectionSnapshot.cs index b787bd1d..f5260464 100644 --- a/src/SimpleL7Proxy/Backend/HostCollectionSnapshot.cs +++ b/src/SimpleL7Proxy/Backend/HostCollectionSnapshot.cs @@ -127,7 +127,7 @@ public static HostCollectionSnapshot Build( CategorizeHost(host, specificPathHosts, catchAllHosts); } - logger.LogCritical("[CONFIG] Host categorization complete: {SpecificCount} specific hosts, {CatchAllCount} catch-all hosts", + logger.LogInformation("[HOST-MANAGER] Categorized: {SpecificCount} specific-path, {CatchAllCount} catch-all", specificPathHosts.Count, catchAllHosts.Count); return new HostCollectionSnapshot(hosts, specificPathHosts, catchAllHosts, version, logger); diff --git a/src/SimpleL7Proxy/Backend/HostConfig.cs b/src/SimpleL7Proxy/Backend/HostConfig.cs index eae3059e..d156af70 100644 --- a/src/SimpleL7Proxy/Backend/HostConfig.cs +++ b/src/SimpleL7Proxy/Backend/HostConfig.cs @@ -2,9 +2,9 @@ using System.Collections.Generic; using System.Linq; using System.Net; +using System.Security.Cryptography; using System.Text; using System.Threading.Tasks; -using Microsoft.ApplicationInsights; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.DependencyInjection; @@ -22,7 +22,7 @@ public class HostConfig public static BackendTokenProvider? _tokenProvider; private static ILogger? _logger = Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance; private static IServiceProvider? _serviceProvider; - private readonly ICircuitBreaker _circuitBreaker; + private ICircuitBreaker? _circuitBreaker; public Guid Guid { get; } = Guid.NewGuid(); private ParsedConfig ParsedConfig { get; set; } public string Audience => ParsedConfig.Audience; @@ -46,19 +46,117 @@ public class HostConfig public string Url => ParsedConfig.Host; public string ProbeUrl { get; set; } - + private string? _frozenHash; + + /// + /// Computes a deterministic SHA256 hash over all configuration-relevant fields. + /// Two instances with identical configuration will + /// produce the same hash regardless of runtime state (Guid, circuit breaker, etc.). + /// + public string Hash() + { + // Order must stay stable — append every ParsedConfig field. + var sb = new StringBuilder(512); + sb.Append(Audience).Append('|'); + sb.Append(DirectMode).Append('|'); + sb.Append(Host).Append('|'); + sb.Append(Hostname).Append('|'); + sb.Append(IpAddr ?? string.Empty).Append('|'); + sb.Append(PartialPath).Append('|'); + sb.Append(ProbePath).Append('|'); + sb.Append(Processor ?? string.Empty).Append('|'); + sb.Append(StripPrefix).Append('|'); + sb.Append(UseOAuth).Append('|'); + sb.Append(UsesRetryAfter); + + Span hashBytes = stackalloc byte[SHA256.HashSizeInBytes]; + SHA256.HashData(Encoding.UTF8.GetBytes(sb.ToString()), hashBytes); + return Convert.ToHexStringLower(hashBytes); + } + + /// + /// Snapshots the current so it can be retrieved later + /// via without recomputing. + /// Call this once after construction when the configuration is final. + /// + public void FreezeHash() => _frozenHash = Hash(); + + /// + /// Returns the hash captured by , or null + /// if has not been called yet. + /// + public string? FrozenHash => _frozenHash; + + /// + /// Whether has been called (circuit breaker created). + /// + public bool IsActivated => _circuitBreaker is not null; + + /// + /// Whether this host has been marked for spin-down. + /// A spinning-down host is no longer part of the active snapshot but may + /// still be referenced by in-flight workers holding an old snapshot. + /// Once all in-flight work drains, the host can be reclaimed by GC. + /// + public bool IsSpinningDown { get; private set; } + + /// + /// UTC timestamp when was called, or null + /// if the host is still active. + /// + public DateTime? SpinDownTime { get; private set; } + + /// + /// Marks this host for graceful spin-down. The circuit breaker stops + /// accepting new status tracking, and the host will no longer be selected + /// for new requests. In-flight requests on old snapshots continue + /// to completion. + /// + public void SpinDown() + { + if (IsSpinningDown) return; + IsSpinningDown = true; + SpinDownTime = DateTime.UtcNow; + + // Remove this host's circuit breaker from global counters. + // Deregister() is itself idempotent, but we only reach here once + // because of the IsSpinningDown guard above. + _circuitBreaker?.Deregister(); + } + + /// + /// Creates the circuit breaker and other expensive runtime resources. + /// Call this after the dedup check — only for configs that will actually + /// be used. Safe to call multiple times (no-op after first call). + /// + public void Activate() + { + if (_circuitBreaker is not null) return; + + if (_serviceProvider == null) + throw new InvalidOperationException("HostConfig service provider not initialized. Call Initialize first."); + + _circuitBreaker = _serviceProvider.GetService() + ?? throw new InvalidOperationException("ICircuitBreaker service not registered in DI container."); + + _circuitBreaker.ID = ParsedConfig.Host; + } + + private ICircuitBreaker CircuitBreaker => + _circuitBreaker ?? throw new InvalidOperationException( + $"HostConfig for '{Host}' has not been activated. Call Activate() before using circuit breaker."); /// /// Tracks status for circuit breaker /// - public void TrackStatus(int code, bool wasFailure, string state) => _circuitBreaker.TrackStatus(code, wasFailure, state); + public void TrackStatus(int code, bool wasFailure, string state) => CircuitBreaker.TrackStatus(code, wasFailure, state); /// /// Checks if this host's circuit breaker is in failed state /// - public bool CheckFailedStatus() => _circuitBreaker.CheckFailedStatus(); + public bool CheckFailedStatus() => CircuitBreaker.CheckFailedStatus(); - public string GetCircuitBreakerStatusString() => _circuitBreaker.GetCircuitBreakerStatusString(); + public string GetCircuitBreakerStatusString() => CircuitBreaker.GetCircuitBreakerStatusString(); /// /// Initializes the HostConfig with required dependencies @@ -78,22 +176,11 @@ public static void Initialize(BackendTokenProvider tokenProvider, ILogger logger /// public HostConfig(string hostname, string? probepath = "", string? ip = null, string? audience = "") { - // Get CircuitBreaker instance from DI container - if (_serviceProvider == null) - throw new InvalidOperationException("HostConfig service provider not initialized. Call SetServiceProvider first."); - - _circuitBreaker = _serviceProvider.GetService() - ?? throw new InvalidOperationException("ICircuitBreaker service not registered in DI container."); - _logger?.LogDebug("[CONFIG] Configuring backend host: {hostname}", hostname); ParsedConfig = TryParseConfig(hostname, probepath, ip, audience); - // If host does not have a protocol, add one - string hostForUri = ParsedConfig.Host; - _circuitBreaker.ID = hostForUri; - // parse the host, protocol and port - Uri uri = new Uri(hostForUri); + Uri uri = new Uri(ParsedConfig.Host); Protocol = uri.Scheme; Port = uri.Port; @@ -223,7 +310,7 @@ private static ParsedConfig TryParseConfig(string hostname, string? probepath, s result.Hostname = new Uri(result.Host).Host; if (string.IsNullOrEmpty(result.Processor)) { - result.Processor = "Default"; + result.Processor = StreamProcessor.StreamProcessorFactory.DEFAULT_PROCESSOR; } } diff --git a/src/SimpleL7Proxy/Backend/ICircuitBreaker.cs b/src/SimpleL7Proxy/Backend/ICircuitBreaker.cs index 9461cd10..dafb8888 100644 --- a/src/SimpleL7Proxy/Backend/ICircuitBreaker.cs +++ b/src/SimpleL7Proxy/Backend/ICircuitBreaker.cs @@ -4,7 +4,14 @@ public interface ICircuitBreaker { public string ID { get; set; } void TrackStatus(int code, bool wasFailure, string state); + bool CheckFailedStatus(); + /// + /// Removes this instance from the global circuit-breaker counters. + /// Must be called exactly once when the owning host is retired. + /// + void Deregister(); + public string GetCircuitBreakerStatusString(); } \ No newline at end of file diff --git a/src/SimpleL7Proxy/Backend/Iterators/IteratorFactory.cs b/src/SimpleL7Proxy/Backend/Iterators/IteratorFactory.cs index 82ab2543..1612261e 100644 --- a/src/SimpleL7Proxy/Backend/Iterators/IteratorFactory.cs +++ b/src/SimpleL7Proxy/Backend/Iterators/IteratorFactory.cs @@ -108,8 +108,9 @@ private static (List hosts, string modifiedPath) FilterHostsByPa List catchAllHosts, string requestPath) { - // Evaluate all matches once + // Evaluate all matches once, excluding hosts marked for spin-down var matchedHosts = specificHosts + .Where(host => !host.Config.IsSpinningDown) .Select(host => (host, result: host.Config.SupportsPath(requestPath))) .Where(x => x.result.IsMatch) .ToList(); @@ -120,8 +121,9 @@ private static (List hosts, string modifiedPath) FilterHostsByPa return (matchedHosts.Select(x => x.host).ToList(), matchedHosts[0].result.StrippedPath); } - // No specific match - return catch-all hosts with original path - return (catchAllHosts, requestPath); + // No specific match - return catch-all hosts, excluding spinning-down ones + var activeCatchAll = catchAllHosts.Where(h => !h.Config.IsSpinningDown).ToList(); + return (activeCatchAll, requestPath); } diff --git a/src/SimpleL7Proxy/BlobStorage/BlobWriterFactory.cs b/src/SimpleL7Proxy/BlobStorage/BlobWriterFactory.cs index 9aa5835a..7a18ed79 100644 --- a/src/SimpleL7Proxy/BlobStorage/BlobWriterFactory.cs +++ b/src/SimpleL7Proxy/BlobStorage/BlobWriterFactory.cs @@ -1,6 +1,5 @@ using System.Reflection.Metadata.Ecma335; using Azure.Storage.Blobs; -using Azure.Identity; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -18,15 +17,18 @@ public interface IBlobWriterFactory public class BlobWriterFactory : IBlobWriterFactory { + private readonly DefaultCredential _defaultCredential; private readonly IOptionsMonitor _optionsMonitor; private readonly ILogger _logger; private readonly ILogger _nullBlobWriterLogger; public BlobWriterFactory( + DefaultCredential defaultCredential, IOptionsMonitor optionsMonitor, ILogger logger, ILogger nullBlobWriterLogger) { + _defaultCredential = defaultCredential; _optionsMonitor = optionsMonitor; _logger = logger; _nullBlobWriterLogger = nullBlobWriterLogger; @@ -86,7 +88,7 @@ private IBlobWriter CreateBlobWriterWithManagedIdentity(string storageAccountUri blobServiceUri = new Uri(storageAccountUri); // Use DefaultAzureCredential for managed identity - var credential = new DefaultAzureCredential(); + var credential = _defaultCredential.Credential; var blobServiceClient = new BlobServiceClient(blobServiceUri, credential); var blobWriter = new BlobWriter(blobServiceClient, _logger); blobWriter.UsesMI = true; // Set on BlobWriter, not BlobServiceClient diff --git a/src/SimpleL7Proxy/Config/AppConfigBootstrap.cs b/src/SimpleL7Proxy/Config/AppConfigBootstrap.cs index f749e381..aa131709 100644 --- a/src/SimpleL7Proxy/Config/AppConfigBootstrap.cs +++ b/src/SimpleL7Proxy/Config/AppConfigBootstrap.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.Logging; using Azure.Data.AppConfiguration; -using Azure.Identity; namespace SimpleL7Proxy.Config; @@ -20,10 +19,15 @@ public class AppConfigBootstrap private readonly string? _endpoint; private readonly string? _connectionString; private readonly string? _labelFilter; + private readonly BackendOptions _defaultOptions; + private readonly DefaultCredential _defaultCredential; - public AppConfigBootstrap(ILogger logger) + public AppConfigBootstrap(ILogger logger, BackendOptions backendOptions, DefaultCredential defaultCredential) { + _logger = logger; + _defaultOptions = backendOptions; + _defaultCredential = defaultCredential; _endpoint = Environment.GetEnvironmentVariable("AZURE_APPCONFIG_ENDPOINT"); _connectionString = Environment.GetEnvironmentVariable("AZURE_APPCONFIG_CONNECTION_STRING"); @@ -86,8 +90,10 @@ public void Start() { try { + var credential = _defaultCredential.Credential; + ConfigurationClient client = !string.IsNullOrEmpty(_endpoint) - ? new ConfigurationClient(new Uri(_endpoint), new DefaultAzureCredential()) + ? new ConfigurationClient(new Uri(_endpoint), credential) : new ConfigurationClient(_connectionString!); // Build a lookup from App Config key path → env var name using the descriptors. diff --git a/src/SimpleL7Proxy/Config/AzureAppConfigurationExtensions.cs b/src/SimpleL7Proxy/Config/AzureAppConfigurationExtensions.cs index 6c671dcd..8c9127f1 100644 --- a/src/SimpleL7Proxy/Config/AzureAppConfigurationExtensions.cs +++ b/src/SimpleL7Proxy/Config/AzureAppConfigurationExtensions.cs @@ -1,7 +1,6 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration.AzureAppConfiguration; using Microsoft.Extensions.Logging; -using Azure.Identity; using Microsoft.Extensions.DependencyInjection; @@ -63,6 +62,7 @@ public static IServiceCollection AddAzureAppConfigurationWithWarmRefresh( /// public static IConfigurationBuilder AddAzureAppConfigurationWithWarmSupport( this IConfigurationBuilder builder, + DefaultCredential defaultCredential, ILogger? logger = null) { var endpoint = Environment.GetEnvironmentVariable("AZURE_APPCONFIG_ENDPOINT"); @@ -89,7 +89,9 @@ public static IConfigurationBuilder AddAzureAppConfigurationWithWarmSupport( { if (!string.IsNullOrEmpty(endpoint)) { - options.Connect(new Uri(endpoint), new DefaultAzureCredential()); + var credential = defaultCredential.Credential; + + options.Connect(new Uri(endpoint), credential); logger?.LogInformation("[CONFIG] Connecting to Azure App Configuration via Managed Identity: {Endpoint}", endpoint); } else diff --git a/src/SimpleL7Proxy/Config/AzureAppConfigurationRefreshService.cs b/src/SimpleL7Proxy/Config/AzureAppConfigurationRefreshService.cs index 3dfa5925..4b22e88d 100644 --- a/src/SimpleL7Proxy/Config/AzureAppConfigurationRefreshService.cs +++ b/src/SimpleL7Proxy/Config/AzureAppConfigurationRefreshService.cs @@ -63,7 +63,7 @@ public AzureAppConfigurationRefreshService( _refreshInterval = TimeSpan.FromSeconds(intervalSeconds); _logger.LogInformation("[CONFIG] Discovered {Count} warm-decorated BackendOptions properties", _warmDescriptors.Count); - _logger.LogInformation("[CONFIG] Warm BackendOptions tracked for in-place update: {WarmConfigs}", + _logger.LogDebug("[CONFIG] Warm BackendOptions tracked for in-place update: {WarmConfigs}", string.Join(", ", _warmConfigNames.OrderBy(n => n, StringComparer.OrdinalIgnoreCase))); } @@ -331,6 +331,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) catch (Exception ex) { _logger.LogWarning(ex, "[CONFIG] Configuration refresh failed - will retry"); + Console.WriteLine(ex.StackTrace); } } diff --git a/src/SimpleL7Proxy/Config/BackendOptions.cs b/src/SimpleL7Proxy/Config/BackendOptions.cs index 18f88653..8cb77ab4 100644 --- a/src/SimpleL7Proxy/Config/BackendOptions.cs +++ b/src/SimpleL7Proxy/Config/BackendOptions.cs @@ -36,16 +36,16 @@ public class BackendOptions public string LoadBalanceMode { get; set; } = "latency"; // "latency", "roundrobin", "random" // ── Logging ── - [ConfigOption("Logging:LogConsole")] - public bool LogConsole { get; set; } = true; - [ConfigOption("Logging:LogConsoleEvent")] - public bool LogConsoleEvent { get; set; } = false; - [ConfigOption("Logging:LogPoller")] - public bool LogPoller { get; set; } = true; + [ConfigOption("Logging:LogToConsole")] + public List LogToConsole { get; set; } = ["*"]; + [ConfigOption("Logging:LogToEvents")] + public List LogToEvents { get; set; } = ["async","backend","circuitbreaker","custom","exception","profile","proxy","enqueued","auth"]; + [ConfigOption("Logging:LogToAI")] + public List LogToAI { get; set; } = ["*"]; + + public bool LogProbes { get; set; } = true; [ConfigOption("Logging:LogHeaders")] public List LogHeaders { get; set; } = []; - [ConfigOption("Logging:LogProbes")] - public bool LogProbes { get; set; } = true; [ConfigOption("Logging:LogAllRequestHeaders")] public bool LogAllRequestHeaders { get; set; } = false; [ConfigOption("Logging:LogAllRequestHeadersExcept")] @@ -134,7 +134,7 @@ public class BackendOptions public string OAuthAudience { get; set; } = ""; [ConfigOption("Security:UseOAuth", Mode = ConfigMode.Cold)] public bool UseOAuth { get; set; } = false; - [ConfigOption("Security:UseOAuthGov", Mode = ConfigMode.Cold)] + [ConfigOption("Security:UseOAuthGov", Mode = ConfigMode.Hidden)] public bool UseOAuthGov { get; set; } = false; // ── Server ── @@ -142,6 +142,8 @@ public class BackendOptions public IterationModeEnum IterationMode { get; set; } = IterationModeEnum.SinglePass; [ConfigOption("Server:MaxQueueLength", Mode = ConfigMode.Cold)] public int MaxQueueLength { get; set; } = 1000; + [ConfigOption("Server:MaxEvents", Mode = ConfigMode.Cold)] + public int MaxEvents { get; set; } = 100000; [ConfigOption("Server:PollInterval", Mode = ConfigMode.Cold)] public int PollInterval { get; set; } = 15000; [ConfigOption("Server:PollTimeout", Mode = ConfigMode.Cold)] @@ -223,6 +225,10 @@ public class BackendOptions public string AppInsightsConnectionString { get; set; } = ""; [ConfigOption("Logging:EventLoggers", ConfigName = "EVENT_LOGGERS", Mode = ConfigMode.Cold)] public string EventLoggers { get; set; } = "file"; + + [ConfigOption("Logging:EventData", ConfigName = "EVENT_HEADERS", Mode = ConfigMode.Cold)] + public string EventHeaders { get; set; } = "SimpleL7Proxy.Events.CommonEventHeaders"; + [ConfigOption("Logging:LogToFile", ConfigName = "LOGTOFILE", Mode = ConfigMode.Hidden)] public bool LogToFile { get; set; } = false; [ConfigOption("Logging:LogFileName", ConfigName = "LOGFILE_NAME", Mode = ConfigMode.Cold)] @@ -236,7 +242,7 @@ public class BackendOptions [ConfigOption("EventHub:Namespace", ConfigName = "EVENTHUB_NAMESPACE", Mode = ConfigMode.Cold)] public string EventHubNamespace { get; set; } = ""; [ConfigOption("EventHub:StartupSeconds", ConfigName = "EVENTHUB_STARTUP_SECONDS", Mode = ConfigMode.Cold)] - public int EventHubStartupSeconds { get; set; } = 0; + public int EventHubStartupSeconds { get; set; } = 10; // ── Security ── [ConfigOption("Security:IgnoreSSLCert", ConfigName = "IgnoreSSLCert", Mode = ConfigMode.Cold)] diff --git a/src/SimpleL7Proxy/Config/BackendOptionsExtensions.cs b/src/SimpleL7Proxy/Config/BackendOptionsExtensions.cs new file mode 100644 index 00000000..e6d6a9ea --- /dev/null +++ b/src/SimpleL7Proxy/Config/BackendOptionsExtensions.cs @@ -0,0 +1,24 @@ +namespace SimpleL7Proxy.Config; + +/// +/// Extension methods for that provide a cleaner +/// calling syntax for configuration parsing operations. +/// +public static class BackendOptionsExtensions +{ + /// + /// Applies a single configuration field from the environment dictionary to this + /// instance, falling back to the corresponding + /// default value when the environment variable is absent or set to the + /// default placeholder. + /// + /// The instance to update. + /// Dictionary of environment/configuration key-value pairs. + /// A default instance providing fallback values. + /// The environment variable (dictionary key) to look up. + /// The name of the property to set. + public static void ApplyFieldFromEnv(this BackendOptions target, Dictionary env, BackendOptions defaults, string envVar, string property) + { + ConfigParser.ApplyFieldFromEnv(env, target, defaults, envVar, property); + } +} diff --git a/src/SimpleL7Proxy/Config/ConfigBootstrapper.cs b/src/SimpleL7Proxy/Config/ConfigBootstrapper.cs index 6f328cac..6ba4818e 100644 --- a/src/SimpleL7Proxy/Config/ConfigBootstrapper.cs +++ b/src/SimpleL7Proxy/Config/ConfigBootstrapper.cs @@ -1,4 +1,3 @@ -using Microsoft.ApplicationInsights; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.ApplicationInsights.WorkerService; using Microsoft.Extensions.Configuration; @@ -29,7 +28,6 @@ public static class ConfigBootstrapper { private static ILogger? _logger; static Dictionary EnvVars = new Dictionary(); - private static readonly BackendOptions s_defaults = new(); public static BackendOptions CreateBackendOptions(ILogger logger, AppConfigBootstrap appConfigBootstrap) @@ -53,9 +51,12 @@ public static BackendOptions CreateBackendOptions(ILogger logger, AppConfigBoots { foreach (var kvp in appConfigSettings) { - // strip [" and "] from keys and values if present to support both raw and JSON-style formats - string key = kvp.Key.Trim().TrimStart('[').TrimEnd(']').TrimStart('"').TrimEnd('"'); - string value = kvp.Value.Trim().TrimStart('[').TrimEnd(']').TrimStart('"').TrimEnd('"'); + // Keys from AppConfigBootstrap are already plain config names (e.g. "DependancyHeaders"). + // Values may be JSON-style arrays like ["a","b"] — leave them intact; + // downstream parsers (ToListOfString, ReadEnvironmentVariableOrDefault) + // already handle bracket/quote stripping correctly. + string key = kvp.Key.Trim(); + string value = kvp.Value.Trim(); effectiveEnvironment[key] = value; } _logger?.LogInformation("[BOOTSTRAP] Applied {Count} App Configuration value(s) to effective environment", appConfigSettings.Count); @@ -63,13 +64,18 @@ public static BackendOptions CreateBackendOptions(ILogger logger, AppConfigBoots var backendOptions = ConfigParser.ParseOptions(effectiveEnvironment); ConfigureHttpClientFromOptions(effectiveEnvironment, backendOptions); - OutputEnvVars(backendOptions); - return backendOptions; } - private static void OutputEnvVars(BackendOptions backendOptions) + public static void OutputEnvVars(BackendOptions backendOptions) { + + ProxyEvent pe = new() + { + Type = EventType.CustomEvent, + ["Message"] = "Configuration loaded", + }; + // Build Warm / Cold / Hidden buckets from [ConfigOption] attributes var warm = new SortedDictionary(StringComparer.OrdinalIgnoreCase); var cold = new SortedDictionary(StringComparer.OrdinalIgnoreCase); @@ -91,12 +97,17 @@ private static void OutputEnvVars(BackendOptions backendOptions) _ => hidden }; bucket[$"{attr.Mode}:{attr.KeyPath}"] = display; + pe[attr.KeyPath]= display; } + Console.WriteLine("Writing to disk"); + + pe.SendEvent(); + // generate a JSON representation for logging var all = warm.Concat(cold).Concat(hidden).ToDictionary(kvp => kvp.Key, kvp => kvp.Value); string json = System.Text.Json.JsonSerializer.Serialize(all, new System.Text.Json.JsonSerializerOptions { WriteIndented = true }); - + // _logger?.LogInformation("Effective configuration:\n{ConfigJson}", json); static string MaskSensitive(string key, string value) @@ -344,7 +355,7 @@ private static void ConfigureHttpClientFromOptions(Dictionary en } /// - /// Clears and re-populates by iterating + /// Clears and re-populates the backend host list by iterating /// over Host1..N, Probe_path1..N, and IP1..N keys. /// /// Each parsed is staged into @@ -374,7 +385,6 @@ private static void ConfigureHttpClientFromOptions(Dictionary en public static void RegisterBackends(BackendOptions backendOptions, IConfiguration? configuration = null, Dictionary? cfg = null, IHostHealthCollection? hostCollection = null) { //backendOptions.Client.Timeout = TimeSpan.FromMilliseconds(backendOptions.Timeout); - var hostSettingsSnapshot = new Dictionary(StringComparer.OrdinalIgnoreCase); string? ReadWithFallback(string key) { @@ -384,81 +394,57 @@ public static void RegisterBackends(BackendOptions backendOptions, IConfiguratio ?? configuration?[$"Cold:{key}"] ?? configuration?[key]; - if (!string.IsNullOrWhiteSpace(configured)) - { - return configured.Trim(); - } - - return Environment.GetEnvironmentVariable(key)?.Trim(); + return !string.IsNullOrWhiteSpace(configured) + ? configured.Trim() + : Environment.GetEnvironmentVariable(key)?.Trim(); } - backendOptions.Hosts.Clear(); + var hostsFileContent = new StringBuilder(); - int i = 1; - StringBuilder sb = new(); - while (true) + foreach (var entry in ReadHostEntries(ReadWithFallback)) { - - var hostKey = $"Host{i}"; - var probePathKey = $"Probe_path{i}"; - var ipKey = $"IP{i}"; - - - var hostname = ReadWithFallback(hostKey); - if (string.IsNullOrEmpty(hostname)) break; - - var probePath = ReadWithFallback(probePathKey); - var ip = ReadWithFallback(ipKey); - - _logger.LogInformation($"Found a Host: {hostKey}, Probe Path: {probePathKey}, HostName: {hostname}"); - hostSettingsSnapshot[hostKey] = hostname; - if (!string.IsNullOrEmpty(probePath)) - { - hostSettingsSnapshot[probePathKey] = probePath; - } - - if (!string.IsNullOrEmpty(ip)) - { - hostSettingsSnapshot[ipKey] = ip; - } - - try - { - _logger?.LogDebug($"Found host {hostname} with probe path {probePath} and IP {ip}"); - - // Resolve HostConfig from DI using the factory - HostConfig bh = new HostConfig(hostname, probePath, ip, backendOptions.OAuthAudience); - backendOptions.Hosts.Add(bh); - hostCollection?.StageHost(bh); - - sb.AppendLine($"{ip} {bh.Host}"); - } - - catch (UriFormatException e) - { - _logger?.LogError($"Could not add Host{i} with {hostname} : {e.Message}"); - Console.WriteLine(e.StackTrace); - } - - i++; + try + { + var hostConfig = new HostConfig(entry.Hostname, entry.ProbePath, entry.Ip, backendOptions.OAuthAudience); + hostCollection?.StageHost(hostConfig); + hostsFileContent.AppendLine($"{entry.Ip} {hostConfig.Host}"); + } + catch (UriFormatException e) + { + _logger?.LogError(e, "Could not add {HostKey} with {Hostname}", entry.HostKey, entry.Hostname); + } } - var appendHostsFile = ReadWithFallback("APPENDHOSTSFILE") - ?? ReadWithFallback("AppendHostsFile"); + AppendHostsFileIfEnabled( + ReadWithFallback("APPENDHOSTSFILE") ?? ReadWithFallback("AppendHostsFile"), + hostsFileContent); - if (!string.IsNullOrEmpty(appendHostsFile)) - { - hostSettingsSnapshot["APPENDHOSTSFILE"] = appendHostsFile; - } + hostCollection?.Activate(); + } - if (appendHostsFile?.Equals("true", StringComparison.OrdinalIgnoreCase) == true) + + private record ParsedHostEntry(string HostKey, string Hostname, string? ProbePath, string? Ip); + private static IEnumerable ReadHostEntries(Func readWithFallback) + { + for (int i = 1; ; i++) { - _logger?.LogInformation($"Appending {sb} to /etc/hosts"); - using StreamWriter sw = File.AppendText("/etc/hosts"); - sw.WriteLine(sb.ToString()); + var hostname = readWithFallback($"Host{i}"); + if (string.IsNullOrEmpty(hostname)) yield break; + + yield return new ParsedHostEntry( + $"Host{i}", + hostname, + readWithFallback($"Probe_path{i}"), + readWithFallback($"IP{i}") + ); } + } + private static void AppendHostsFileIfEnabled(string? flag, StringBuilder hostsContent) + { + if (flag?.Equals("true", StringComparison.OrdinalIgnoreCase) != true) return; - // Snapshot is updated only after all Host/Probe_path/IP entries are parsed and applied. - hostCollection?.Activate(); + _logger?.LogInformation("Appending {HostEntries} to /etc/hosts", hostsContent); + using StreamWriter sw = File.AppendText("/etc/hosts"); + sw.WriteLine(hostsContent.ToString()); } } diff --git a/src/SimpleL7Proxy/Config/ConfigParser.cs b/src/SimpleL7Proxy/Config/ConfigParser.cs index da885463..08db8f02 100644 --- a/src/SimpleL7Proxy/Config/ConfigParser.cs +++ b/src/SimpleL7Proxy/Config/ConfigParser.cs @@ -1,4 +1,3 @@ -using Microsoft.Extensions.Configuration; using SimpleL7Proxy.Backend.Iterators; using System.Reflection; @@ -22,6 +21,7 @@ private static readonly (string envVar, string property)[] SimpleFields = ("DefaultPriority", "DefaultPriority"), ("DefaultTTLSecs", "DefaultTTLSecs"), ("MaxQueueLength", "MaxQueueLength"), + ("MaxEvents", "MaxEvents"), ("MaxAttempts", "MaxAttempts"), ("PollInterval", "PollInterval"), ("PollTimeout", "PollTimeout"), @@ -61,10 +61,6 @@ private static readonly (string envVar, string property)[] SimpleFields = ("AsyncModeEnabled", "AsyncModeEnabled"), ("LogAllRequestHeaders", "LogAllRequestHeaders"), ("LogAllResponseHeaders", "LogAllResponseHeaders"), - ("LogConsole", "LogConsole"), - ("LogConsoleEvent", "LogConsoleEvent"), - ("LogPoller", "LogPoller"), - ("LogProbes", "LogProbes"), ("StorageDbEnabled", "StorageDbEnabled"), ("UseOAuth", "UseOAuth"), ("UseOAuthGov", "UseOAuthGov"), @@ -77,6 +73,9 @@ private static readonly (string envVar, string property)[] SimpleFields = ("LogAllRequestHeadersExcept", "LogAllRequestHeadersExcept"), ("LogAllResponseHeadersExcept", "LogAllResponseHeadersExcept"), ("LogHeaders", "LogHeaders"), + ("LogToConsole", "LogToConsole"), + ("LogToEvents", "LogToEvents"), + ("LogToAI", "LogToAI"), ("PriorityKeys", "PriorityKeys"), ("RequiredHeaders", "RequiredHeaders"), ("StripRequestHeaders", "StripRequestHeaders"), @@ -87,6 +86,7 @@ private static readonly (string envVar, string property)[] SimpleFields = ("LOG_LEVEL", "LogLevel"), ("APPINSIGHTS_CONNECTIONSTRING", "AppInsightsConnectionString"), ("EVENT_LOGGERS", "EventLoggers"), + ("EVENT_HEADERS", "EventHeaders"), ("LOGTOFILE", "LogToFile"), ("LOGFILE_NAME", "LogFileName"), @@ -117,7 +117,7 @@ public static BackendOptions ParseOptions(Dictionary appCfgVars) // 2. Value from environment variable (if set) // 3. Value from environment variable alias (if set) // 4. Value from App Configuration (if set) - ApplyFieldFromEnv(appCfgVars, opts, defaults, envVarName, propertyName); + opts.ApplyFieldFromEnv(appCfgVars, defaults, envVarName, propertyName); } opts.AcceptableStatusCodes = ReadEnvironmentVariableOrDefault(appCfgVars, "AcceptableStatusCodes", defaults.AcceptableStatusCodes); @@ -140,7 +140,6 @@ public static BackendOptions ParseOptions(Dictionary appCfgVars) ApplyDerivedSettingsFromConfigNames( opts, - configuration: null, nameof(BackendOptions.HealthProbeSidecar), nameof(BackendOptions.LoadBalanceMode), nameof(BackendOptions.PriorityKeys), @@ -280,7 +279,6 @@ public static void ApplyDerivedSettings(BackendOptions backendOptions, params Pr public static void ApplyDerivedSettingsFromConfigNames( BackendOptions backendOptions, - IConfiguration? configuration, params string[] changedConfigNames) { if (changedConfigNames.Length == 0) @@ -292,7 +290,6 @@ public static void ApplyDerivedSettingsFromConfigNames( .ToDictionary(d => d.ConfigName, d => d.Property, StringComparer.OrdinalIgnoreCase); var changedProperties = new List(changedConfigNames.Length); - var shouldRefreshBackends = false; foreach (var changedConfigName in changedConfigNames) { @@ -305,22 +302,12 @@ public static void ApplyDerivedSettingsFromConfigNames( { changedProperties.Add(property); } - - if (IsBackendHostConfigName(changedConfigName)) - { - shouldRefreshBackends = true; - } } if (changedProperties.Count > 0) { ApplyDerivedSettings(backendOptions, [.. changedProperties]); } - - if (shouldRefreshBackends) - { - ConfigBootstrapper.RegisterBackends(backendOptions, configuration, null); - } } public static bool IsBackendHostConfigName(string configName) @@ -656,7 +643,7 @@ private static List ToListOfString(string s) trimmed = trimmed[1..^1]; } - return [.. trimmed.Split(',').Select(p => p.Trim())]; + return [.. trimmed.Split(',').Select(p => p.Trim().Trim('"')).Where(p => p.Length > 0)]; } private static List ToListOfInt(string s) @@ -672,7 +659,7 @@ private static List ToListOfInt(string s) trimmed = trimmed[1..^1]; } - return trimmed.Split(',').Select(p => int.Parse(p.Trim())).ToList(); + return trimmed.Split(',').Select(p => int.Parse(p.Trim().Trim('"'))).ToList(); } private static Dictionary ParseConfigString(string config, Dictionary keyAliases) diff --git a/src/SimpleL7Proxy/Config/DefaultCredential.cs b/src/SimpleL7Proxy/Config/DefaultCredential.cs new file mode 100644 index 00000000..d1cfabd4 --- /dev/null +++ b/src/SimpleL7Proxy/Config/DefaultCredential.cs @@ -0,0 +1,12 @@ +using Microsoft.Extensions.Options; +using Azure.Identity; + +namespace SimpleL7Proxy.Config; + +public class DefaultCredential(BackendOptions options) +{ + public DefaultAzureCredential Credential { get; } = + new(options.UseOAuthGov == true + ? new DefaultAzureCredentialOptions { AuthorityHost = AzureAuthorityHosts.AzureGovernment } + : new DefaultAzureCredentialOptions()); +} diff --git a/src/SimpleL7Proxy/Config/WarmOptions.cs b/src/SimpleL7Proxy/Config/WarmOptions.cs index 5631d049..45874411 100644 --- a/src/SimpleL7Proxy/Config/WarmOptions.cs +++ b/src/SimpleL7Proxy/Config/WarmOptions.cs @@ -210,20 +210,22 @@ public static (List Changes, Dictionary ParsedVal Dictionary snapshot, ILogger? logger = null) { - var changes = new List(); - var parsedValues = new Dictionary(StringComparer.OrdinalIgnoreCase); + var changeList = new List(); + var updates = new Dictionary(StringComparer.OrdinalIgnoreCase); var hostChanges = new Dictionary(StringComparer.OrdinalIgnoreCase); var defaultTarget = new BackendOptions(); var env = new Dictionary(1, StringComparer.OrdinalIgnoreCase); + // iterate over the configuration snapshot values to detect changes compared to the live options foreach (var kvp in snapshot) { - var snapshotKey = kvp.Key["Warm:".Length..]; var rawValue = kvp.Value; if (string.IsNullOrEmpty(rawValue)) continue; + // we only care about warm changes with a "Warm:" prefix + var snapshotKey = kvp.Key["Warm:".Length..]; if (snapshotKey.StartsWith("Host") || snapshotKey.StartsWith("Probe") || snapshotKey.StartsWith("IP")) { // skip Host/Probe/IP entries which are used for dynamic host discovery and not mapped to BackendOptions properties @@ -231,6 +233,7 @@ public static (List Changes, Dictionary ParsedVal continue; } + // filter out the keys we dont care about if (!_warmDescriptorsByKeyPath.Value.TryGetValue(snapshotKey, out var descriptor) && !_warmDescriptorsByConfigName.Value.TryGetValue(snapshotKey, out descriptor)) continue; @@ -242,31 +245,30 @@ public static (List Changes, Dictionary ParsedVal var currentValue = field.GetValue(liveOptions); - object? newValue; - if (rawValue == DefaultPlaceholder) - { - newValue = field.GetValue(defaultTarget); - } - else - { - env.Clear(); - env[configName] = rawValue; + // ApplyFieldFromEnv handles DefaultPlaceholder ("-") internally — + // it treats it as absent and falls back to the default value. + // Use a single-entry dict because snapshot keys are "Warm:KeyPath" + // but ApplyFieldFromEnv looks up by configName (e.g. "DependancyHeaders"). + env.Clear(); + env[configName] = rawValue; - ConfigParser.ApplyFieldFromEnv( - env, - defaultTarget, - liveOptions, - configName, - field.Name); + defaultTarget.ApplyFieldFromEnv( + env, + liveOptions, + configName, + field.Name); - newValue = field.GetValue(defaultTarget); - } + var newValue = field.GetValue(defaultTarget); - if (Equals(currentValue, newValue)) + if (DeepEquals(currentValue, newValue)) continue; - parsedValues[configName] = newValue; - changes.Add(new ConfigChange + // // debug outout .. show all three: new config, old value amd new parsed value + // logger?.LogInformation("[CONFIG] Detected change for {Property}: {Old} → {New} (raw: {Raw})", + // descriptor.ConfigName, FormatValue(currentValue), FormatValue(newValue), rawValue); + + updates[configName] = newValue; + changeList.Add(new ConfigChange { PropertyName = configName, KeyPath = descriptor.Attribute.KeyPath, @@ -275,7 +277,48 @@ public static (List Changes, Dictionary ParsedVal }); } - return (changes, parsedValues, hostChanges); + return (changeList, updates, hostChanges); + } + + /// + /// Formats a value for logging, expanding collections into readable strings. + /// + private static string FormatValue(object? rawValue) + { + if (rawValue == null) return ""; + return rawValue switch + { + string s => s, + int[] arr => string.Join(", ", arr), + IEnumerable list => string.Join(", ", list), + IEnumerable list => string.Join(", ", list), + IDictionary dict => string.Join(", ", dict.Select(kvp => $"{kvp.Key}={kvp.Value}")), + IDictionary dict => string.Join(", ", dict.Select(kvp => $"{kvp.Key}:{kvp.Value}")), + _ => rawValue.ToString() ?? "" + }; + } + + /// + /// Deep-compares two values, handling collections (List, array, Dictionary) + /// that would otherwise fail reference-equality checks via . + /// + private static bool DeepEquals(object? a, object? b) + { + if (ReferenceEquals(a, b)) return true; + if (a is null || b is null) return false; + if (a.GetType() != b.GetType()) return false; + + return (a, b) switch + { + (int[] la, int[] lb) => la.SequenceEqual(lb), + (IList la, IList lb) => la.SequenceEqual(lb), + (IList la, IList lb) => la.SequenceEqual(lb), + (IDictionary da, IDictionary db) => + da.Count == db.Count && da.All(kvp => db.TryGetValue(kvp.Key, out var v) && v == kvp.Value), + (IDictionary da, IDictionary db) => + da.Count == db.Count && da.All(kvp => db.TryGetValue(kvp.Key, out var v) && v == kvp.Value), + _ => Equals(a, b) + }; } private static IReadOnlyList DiscoverDescriptors() diff --git a/src/SimpleL7Proxy/Constants.cs b/src/SimpleL7Proxy/Constants.cs index 2bf0ed94..a81942cd 100644 --- a/src/SimpleL7Proxy/Constants.cs +++ b/src/SimpleL7Proxy/Constants.cs @@ -11,7 +11,7 @@ public static class Constants public const string RoundRobin = "roundrobin"; public const string Random = "random"; public const string Server = "simplel7proxy"; - public const string VERSION = "2.2.10.1"; + public const string VERSION = "2.2.10.2"; public const int AnyPriority = -1; diff --git a/src/SimpleL7Proxy/Events/CommonEventHeaders.cs b/src/SimpleL7Proxy/Events/CommonEventHeaders.cs new file mode 100644 index 00000000..99849708 --- /dev/null +++ b/src/SimpleL7Proxy/Events/CommonEventHeaders.cs @@ -0,0 +1,18 @@ +using System.Collections.Frozen; +using Microsoft.Extensions.Options; +using SimpleL7Proxy.Config; + +namespace SimpleL7Proxy.Events; + +public class CommonEventHeaders(IOptions options) : ICommonEventData +{ + private readonly FrozenDictionary _defaultEventData = + new Dictionary + { + ["Ver"] = Constants.VERSION, + ["Revision"] = options.Value.Revision, + ["ContainerApp"] = options.Value.ContainerApp + }.ToFrozenDictionary(); + + public FrozenDictionary DefaultEventData() => _defaultEventData; +} \ No newline at end of file diff --git a/src/SimpleL7Proxy/Events/CompositeEventClient.cs b/src/SimpleL7Proxy/Events/CompositeEventClient.cs index 4c8c35db..921c7a74 100644 --- a/src/SimpleL7Proxy/Events/CompositeEventClient.cs +++ b/src/SimpleL7Proxy/Events/CompositeEventClient.cs @@ -41,16 +41,13 @@ public async Task StopTimerAsync() } } + // Return the max count of all the clients public int Count { get { - var count = 0; - foreach (var client in _frozen.Keys) - { - count += client.Count; - } - return count; + var snapshot = _frozen; + return snapshot.Count == 0 ? 0 : snapshot.Keys.Max(c => c.Count); } } diff --git a/src/SimpleL7Proxy/Events/EventHubClient.cs b/src/SimpleL7Proxy/Events/EventHubClient.cs index d5463060..736199c5 100644 --- a/src/SimpleL7Proxy/Events/EventHubClient.cs +++ b/src/SimpleL7Proxy/Events/EventHubClient.cs @@ -17,6 +17,7 @@ public class EventHubClient : IEventClient, IHostedService, IDisposable private bool _disposed = false; private readonly EventHubConfig? _config; + private readonly DefaultCredential _defaultCredential; private EventHubProducerClient? _producerClient; private EventDataBatch? _batchData; private readonly ILogger _logger; @@ -33,9 +34,13 @@ public class EventHubClient : IEventClient, IHostedService, IDisposable private static int entryCount = 0; //public EventHubClient(string connectionString, string eventHubName, ILogger? logger = null) - public EventHubClient(CompositeEventClient composite, IOptions options, ILogger logger) + public EventHubClient(CompositeEventClient composite, + IOptions options, + ILogger logger, + DefaultCredential defaultCredential) { var BackendOptions = options?.Value ?? throw new ArgumentNullException(nameof(options)); + _defaultCredential = defaultCredential ?? throw new ArgumentNullException(nameof(defaultCredential)); try { _config = new EventHubConfig(BackendOptions); @@ -45,6 +50,8 @@ public EventHubClient(CompositeEventClient composite, IOptions o logger.LogError(ex, "Failed to initialize EventHubConfig. EventHubClient will be disabled."); _config = null; } + + _composite = composite ?? throw new ArgumentNullException(nameof(composite)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); // All initialization happens in StartAsync @@ -66,17 +73,21 @@ public async Task StartAsync(CancellationToken cancellationToken) { try { if (!string.IsNullOrEmpty(_config.ConnectionString)) { + + _logger.LogInformation("[EVENT HUB] connecting via connection string, eventhubname :" + _config.EventHubName ); _producerClient = new EventHubProducerClient(_config.ConnectionString, _config.EventHubName); } else if (!string.IsNullOrEmpty(_config.EventHubNamespace)) { + + var credential = _defaultCredential.Credential; - // NOTE: this breaks in gov cloud because of the namespace suffix.. needs a better solution var fullyQualifiedNamespace = _config.EventHubNamespace; - if (!fullyQualifiedNamespace.EndsWith(".servicebus.windows.net")) + if (!fullyQualifiedNamespace.EndsWith(".servicebus.windows.net") && + !fullyQualifiedNamespace.EndsWith(".servicebus.usgovcloudapi.net")) fullyQualifiedNamespace = $"{_config.EventHubNamespace}.servicebus.windows.net"; - _producerClient = new EventHubProducerClient(fullyQualifiedNamespace, _config.EventHubName, new Azure.Identity.DefaultAzureCredential()); + _producerClient = new EventHubProducerClient(fullyQualifiedNamespace, _config.EventHubName, credential); } _batchData = await _producerClient!.CreateBatchAsync(cts.Token).ConfigureAwait(false); @@ -216,22 +227,6 @@ public void SendData(string? value) _logBuffer.Enqueue(value); } - // public void SendData(Dictionary data) - // { - // if (!isRunning || isShuttingDown) return; - - // string jsonData = JsonSerializer.Serialize(data); - // SendData(jsonData); - // } - - // public void SendData(ConcurrentDictionary eventData, string? name = null) - // { - // if (!isRunning || isShuttingDown) return; - - // string jsonData = JsonSerializer.Serialize(eventData); - // SendData(jsonData); - // } - protected virtual void Dispose(bool disposing) { if (!_disposed) diff --git a/src/SimpleL7Proxy/Events/EventType.cs b/src/SimpleL7Proxy/Events/EventType.cs new file mode 100644 index 00000000..feb33a55 --- /dev/null +++ b/src/SimpleL7Proxy/Events/EventType.cs @@ -0,0 +1,22 @@ +namespace SimpleL7Proxy.Events; + +public enum EventType +{ + AsyncProcessing, + Backend, + BackendRequest, + CircuitBreakerError, + Console, + CustomEvent, + Exception, + Poller, + Probe, + ProfileError, + ProxyError, + ProxyRequest, + ProxyRequestEnqueued, + ProxyRequestExpired, + ProxyRequestRequeued, + ServerError, + Authentication, +} \ No newline at end of file diff --git a/src/SimpleL7Proxy/Events/ICommonEventData.cs b/src/SimpleL7Proxy/Events/ICommonEventData.cs new file mode 100644 index 00000000..a34de08b --- /dev/null +++ b/src/SimpleL7Proxy/Events/ICommonEventData.cs @@ -0,0 +1,7 @@ +using System.Collections.Frozen; +namespace SimpleL7Proxy.Events; + +public interface ICommonEventData +{ + FrozenDictionary DefaultEventData(); +} diff --git a/src/SimpleL7Proxy/Events/IEventClient.cs b/src/SimpleL7Proxy/Events/IEventClient.cs index fb2adab5..ca00360b 100644 --- a/src/SimpleL7Proxy/Events/IEventClient.cs +++ b/src/SimpleL7Proxy/Events/IEventClient.cs @@ -9,7 +9,5 @@ public interface IEventClient public Task StopTimerAsync(); void SendData(string? value); -// void SendData(Dictionary data); - //void SendData( ConcurrentDictionary eventData, string? name=""); - //void SendData(ProxyEvent eventData, IDictionary? extraProperties = null); + } diff --git a/src/SimpleL7Proxy/Events/LogFileEventClient.cs b/src/SimpleL7Proxy/Events/LogFileEventClient.cs index 48be7957..a7bd6d48 100644 --- a/src/SimpleL7Proxy/Events/LogFileEventClient.cs +++ b/src/SimpleL7Proxy/Events/LogFileEventClient.cs @@ -4,6 +4,11 @@ using System.Text; using System.Text.Json; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + + +using SimpleL7Proxy.Config; namespace SimpleL7Proxy.Events; @@ -22,9 +27,11 @@ public class LogFileEventClient : IEventClient, IHostedService private static int entryCount = 0; private readonly CompositeEventClient _composite; + private readonly StringBuilder _sb = new(); private static Stream log = null!; private static StreamWriter writer = null!; - public LogFileEventClient(string filename, CompositeEventClient composite) + + public LogFileEventClient(string filename, CompositeEventClient composite, IOptions options ) { _composite = composite ?? throw new ArgumentNullException(nameof(composite)); // create file stream to a log file @@ -88,8 +95,7 @@ public async Task EventWriter(CancellationToken token) { while (true) { - LogNextBatch(99); - if (_logBuffer.Count == 0) + if (LogNextBatch(99) == 0) break; } @@ -104,22 +110,24 @@ public async Task EventWriter(CancellationToken token) } // Add the log to the batch up to count number at a time - private void LogNextBatch(int count) + private int LogNextBatch(int count) { - int initialCount = count; + _sb.Clear(); + int drained = 0; - for (int i = 0; i < initialCount; i++) + while (drained < count && _logBuffer.TryDequeue(out string? line)) { - if (!_logBuffer.TryDequeue(out string? log)) - { - break; - } - - writer.WriteLine(log); - Interlocked.Decrement(ref entryCount); + _sb.AppendLine(line); + drained++; } - writer.Flush(); + if (drained > 0) + { + writer.Write(_sb); + writer.Flush(); + Interlocked.Add(ref entryCount, -drained); + } + return drained; } public async Task StopTimerAsync() diff --git a/src/SimpleL7Proxy/Events/LogTargetAttr.cs b/src/SimpleL7Proxy/Events/LogTargetAttr.cs new file mode 100644 index 00000000..e2514a73 --- /dev/null +++ b/src/SimpleL7Proxy/Events/LogTargetAttr.cs @@ -0,0 +1,64 @@ +namespace SimpleL7Proxy.Events; + +/// +/// Per-event-type enable/disable flags for a single log destination. +/// Constructed from a config list (e.g. ["*"], ["BackendRequest","Exception"]). +/// +public class LogTargetAttr +{ + public bool Async; + public bool BackendRequest; + public bool CircuitBreakerError; + public bool Console; + public bool CustomEvent; + public bool Exception; + public bool ProfileError; + public bool ProxyRequest; + public bool ProxyRequestEnqueued; + public bool Authentication; + + /// + /// Returns whether the given is enabled for this destination. + /// + public bool IsEnabled(EventType type) => type switch + { + EventType.AsyncProcessing => Async, + EventType.Backend or EventType.Poller or EventType.Probe => BackendRequest, + EventType.CircuitBreakerError => CircuitBreakerError, + EventType.Console => Console, + EventType.CustomEvent => CustomEvent, + EventType.Exception or EventType.ServerError => Exception, + EventType.ProfileError => ProfileError, + EventType.ProxyError or EventType.BackendRequest + or EventType.ProxyRequest or EventType.ProxyRequestExpired + or EventType.ProxyRequestRequeued => ProxyRequest, + EventType.ProxyRequestEnqueued => ProxyRequestEnqueued, + EventType.Authentication => Authentication, + _ => true, + }; + + /// + /// Creates a from a config list. + /// A list containing "*" enables all event types. + /// + public static LogTargetAttr From(List? configList) + { + var list = configList ?? []; + bool all = list.Contains("*"); + var set = all ? null : new HashSet(list, StringComparer.OrdinalIgnoreCase); + + return new LogTargetAttr + { + Async = all || set!.Contains("async"), + BackendRequest = all || set!.Contains("backend"), + CircuitBreakerError = all || set!.Contains("circuitbreaker"), + Console = all || set!.Contains("console"), + CustomEvent = all || set!.Contains("custom"), + Exception = all || set!.Contains("exception"), + ProfileError = all || set!.Contains("profile"), + ProxyRequest = all || set!.Contains("proxy"), + ProxyRequestEnqueued = all || set!.Contains("enqueued"), + Authentication = all || set!.Contains("auth"), + }; + } +} diff --git a/src/SimpleL7Proxy/Events/ProxyEvent.cs b/src/SimpleL7Proxy/Events/ProxyEvent.cs index ed47434b..8629b01d 100644 --- a/src/SimpleL7Proxy/Events/ProxyEvent.cs +++ b/src/SimpleL7Proxy/Events/ProxyEvent.cs @@ -14,33 +14,24 @@ namespace SimpleL7Proxy.Events { - public enum EventType - { - AsyncProcessing, - Backend, - BackendRequest, - CircuitBreakerError, - Console, - CustomEvent, - Exception, - Poller, - Probe, - ProfileError, - ProxyError, - ProxyRequest, - ProxyRequestEnqueued, - ProxyRequestExpired, - ProxyRequestRequeued, - ServerError, - Authentication, - } - - public class ProxyEvent : ConcurrentDictionary + public class ProxyEvent : ConcurrentDictionary, IConfigChangeSubscriber { private static IOptions _options = null!; private static IEventClient? _eventClient; private static TelemetryClient? _telemetryClient; - private static readonly Uri LOCALHOSTURI = new Uri("http://localhost"); + + /// + /// Singleton instance used for config-change subscription. + /// Created by . + /// + private static ProxyEvent? _subscriberInstance; + private static readonly Uri LOCALHOSTURI = new Uri("http://localhost"); + + // ── Per-event-type log routing ── + // Updated by UpdateLogTargets() from BackendOptions.LogToConsole / LogToEvents / LogToAI lists. + public static LogTargetAttr ConAttr = new(); + public static LogTargetAttr EventAttr = new(); + public static LogTargetAttr AIAttr = new(); public EventType Type { get; set; } = EventType.Console; public HttpStatusCode Status { get; set; } = 0; @@ -55,6 +46,7 @@ public class ProxyEvent : ConcurrentDictionary public static void Initialize( IOptions backendOptions, IEventClient? eventClient = null, + ICommonEventData? commonEventData = null, TelemetryClient? telemetryClient = null) { _options = backendOptions ?? throw new ArgumentNullException(nameof(backendOptions)); @@ -62,13 +54,45 @@ public static void Initialize( _telemetryClient = telemetryClient; // null when APPINSIGHTS_CONNECTIONSTRING is not set // Set default parameters that should be included with every event (frozen = immutable + optimized reads) - DefaultParams = new Dictionary(3) - { - ["Ver"] = Constants.VERSION, - ["Revision"] = _options.Value.Revision, - ["ContainerApp"] = _options.Value.ContainerApp - }.ToFrozenDictionary(); + DefaultParams = commonEventData?.DefaultEventData() ?? DefaultParams; + UpdateLogTargets(backendOptions.Value); + } + + /// + /// Subscribes to warm config changes for LogToConsole, LogToEvents, and LogToAI. + /// Call once after when the is available. + /// + public static void SubscribeToConfigChanges(ConfigChangeNotifier notifier) + { + _subscriberInstance ??= new ProxyEvent(); + notifier.Subscribe(_subscriberInstance, + o => o.LogToConsole, + o => o.LogToEvents, + o => o.LogToAI); + } + + /// + public Task OnConfigChangedAsync( + IReadOnlyList changes, + BackendOptions backendOptions, + CancellationToken cancellationToken) + { + UpdateLogTargets(backendOptions); + return Task.CompletedTask; + } + + /// + /// Parses , , + /// and into , , + /// . A list containing "*" enables all event types for that destination. + /// Safe to call on config hot-reload. + /// + public static void UpdateLogTargets(BackendOptions options) + { + ConAttr = LogTargetAttr.From(options.LogToConsole); + EventAttr = LogTargetAttr.From(options.LogToEvents); + AIAttr = LogTargetAttr.From(options.LogToAI); } /// @@ -113,79 +137,36 @@ public void SendEvent() { try { - bool logEvent = false; - bool logDependency = false; - bool logRequest = false; - bool logException = false; - bool logToEventClient = false; - // Console.WriteLine($"Sending event: {Type} with Status: {Status} and Duration: {Duration.TotalMilliseconds} ms"); + bool logToConsole = ConAttr.IsEnabled(Type); + bool logToEventClient = EventAttr.IsEnabled(Type); + bool logToAI = AIAttr.IsEnabled(Type); - // Determine the type of telemetry to send based on event type - switch (Type) + // Determine AI telemetry shape based on event type + if (logToAI && _telemetryClient is not null) { - case EventType.Backend: - case EventType.CustomEvent: - case EventType.Probe: - if (_options?.Value.LogProbes == true) - { - logEvent = true; - logToEventClient = true; - } - break; - case EventType.ServerError: - case EventType.CircuitBreakerError: - logEvent = true; - logToEventClient = true; - break; - case EventType.Console: - if (_options?.Value.LogConsole == true) - { - logEvent = true; - logToEventClient = true; - } - break; - case EventType.Poller: - if (_options?.Value.LogPoller == true) - { - logEvent = true; - logToEventClient = true; - } - break; - case EventType.BackendRequest: - logDependency = true; - logToEventClient = true; - break; - case EventType.ProxyRequestEnqueued: - case EventType.ProxyRequestRequeued: - logEvent = true; - logToEventClient = true; - break; - case EventType.ProxyRequestExpired: - case EventType.ProxyError: - case EventType.ProxyRequest: - logRequest = true; - logToEventClient = true; - break; - case EventType.Exception: - logException = true; - logToEventClient = true; - break; - default: - // For any other event type, we can log it as a custom event - logEvent = true; - logToEventClient = true; - break; - } - - // Add replica-lifetime values at send time - - if (_telemetryClient is not null) - { - if (logDependency) TrackDependancy(); - else if (logRequest) TrackRequest(); - else if (logEvent) TrackEvent(); - else if (logException) TrackException(); + switch (Type) + { + case EventType.BackendRequest: + TrackDependancy(); + break; + + case EventType.Exception: + case EventType.ServerError: + TrackException(); + break; + + case EventType.ProxyError: + case EventType.ProxyRequest: + case EventType.ProxyRequestExpired: + case EventType.ProxyRequestRequeued: + TrackRequest(); + break; + + default: + TrackEvent(); + break; + } } if (logToEventClient && _eventClient is not null) @@ -194,13 +175,11 @@ public void SendEvent() eventParams["Type"] = "S7P-" + Type.ToString(); eventParams["MID"] = MID ?? "N/A"; AddDefaultProperties(eventParams); - // Send the event to all registered event clients (EventHub, LogFile, etc.) _eventClient.SendData(ConvertToJson(this, eventParams)); } } catch (Exception ex) { - // Prevent telemetry errors from affecting application operation Console.Error.WriteLine($"Error sending telemetry: {ex.Message}"); } } @@ -393,7 +372,8 @@ public void WriteOutput(string data = "") Type = EventType.Console; - if (_options.Value.LogConsoleEvent) + // Only send to event client if this event type is enabled for it + if ( EventAttr.Console ) { _eventClient?.SendData(ConvertToJson(this)); } diff --git a/src/SimpleL7Proxy/Events/ServiceBus/ServiceBusFactory.cs b/src/SimpleL7Proxy/Events/ServiceBus/ServiceBusFactory.cs index 698404fb..fef5ada7 100644 --- a/src/SimpleL7Proxy/Events/ServiceBus/ServiceBusFactory.cs +++ b/src/SimpleL7Proxy/Events/ServiceBus/ServiceBusFactory.cs @@ -1,4 +1,3 @@ -using Azure.Identity; using Azure.Messaging.ServiceBus; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -13,6 +12,7 @@ namespace SimpleL7Proxy.ServiceBus { public class ServiceBusFactory { + private readonly DefaultCredential _defaultCredential; private readonly IOptionsMonitor _optionsMonitor; private readonly ILogger _logger; @@ -29,10 +29,12 @@ public class ServiceBusFactory /// The factory initializes the ServiceBusClient only when AsyncModeEnabled is true in the backend options. /// This prevents unnecessary connection creation when async processing is not required. /// - public ServiceBusFactory(IOptionsMonitor optionsMonitor, ILogger logger) + public ServiceBusFactory(IOptionsMonitor optionsMonitor, ILogger logger, DefaultCredential defaultCredential) { _optionsMonitor = optionsMonitor ?? throw new ArgumentNullException(nameof(optionsMonitor)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _defaultCredential = defaultCredential ?? throw new ArgumentNullException(nameof(defaultCredential)); + var options = optionsMonitor.CurrentValue; try { @@ -120,7 +122,7 @@ private ServiceBusClient CreateServiceBusClientWithManagedIdentity(string fullyQ _logger.LogInformation("[INIT] ✓ ServiceBusClient created with managed identity - Namespace: {Namespace}", fullyQualifiedNamespace); // Use DefaultAzureCredential for managed identity authentication - var credential = new DefaultAzureCredential(); + var credential = _defaultCredential.Credential; return new ServiceBusClient(fullyQualifiedNamespace, credential); } catch (Exception ex) diff --git a/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs b/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs index fd59ffce..bd6f9613 100644 --- a/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs +++ b/src/SimpleL7Proxy/Feeder/AsyncFeeder.cs @@ -1,5 +1,4 @@ using Azure.Core; -using Azure.Identity; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; diff --git a/src/SimpleL7Proxy/Feeder/NormalRequest.cs b/src/SimpleL7Proxy/Feeder/NormalRequest.cs index 7d15727e..7213050e 100644 --- a/src/SimpleL7Proxy/Feeder/NormalRequest.cs +++ b/src/SimpleL7Proxy/Feeder/NormalRequest.cs @@ -1,6 +1,5 @@ using Azure.Core; -using Azure.Identity; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; diff --git a/src/SimpleL7Proxy/Feeder/OpenAIBackgroundRequest.cs b/src/SimpleL7Proxy/Feeder/OpenAIBackgroundRequest.cs index 9b9c6e95..8535cc54 100644 --- a/src/SimpleL7Proxy/Feeder/OpenAIBackgroundRequest.cs +++ b/src/SimpleL7Proxy/Feeder/OpenAIBackgroundRequest.cs @@ -1,5 +1,4 @@ using Azure.Core; -using Azure.Identity; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; diff --git a/src/SimpleL7Proxy/ProbeServer.cs b/src/SimpleL7Proxy/ProbeServer.cs index 88a22cde..c5d9f5e8 100644 --- a/src/SimpleL7Proxy/ProbeServer.cs +++ b/src/SimpleL7Proxy/ProbeServer.cs @@ -17,6 +17,7 @@ using SimpleL7Proxy.Config; using Shared.HealthProbe; using SimpleL7Proxy.Queue; +using SimpleL7Proxy.Events; namespace SimpleL7Proxy; @@ -32,6 +33,7 @@ public class ProbeServer : BackgroundService, IConfigChangeSubscriber private static HealthStatusEnum _readinessStatus = HealthStatusEnum.ReadinessZeroHosts; private static HealthStatusEnum _startupStatus = HealthStatusEnum.StartupZeroHosts; + private static int _activeUndrainedEvents = 0; // Active snapshots published to readers (use Volatile.Read/Write for memory ordering) @@ -39,6 +41,7 @@ public class ProbeServer : BackgroundService, IConfigChangeSubscriber private Timer? _probeTimer; private readonly BackendOptions _backendOptions; private HttpClient? _selfCheckClient; + private IEventClient? _eventClient; static readonly byte[] s_okBytes = Encoding.UTF8.GetBytes("OK\n"); static readonly int s_okLength = s_okBytes.Length; @@ -50,12 +53,19 @@ public class ProbeServer : BackgroundService, IConfigChangeSubscriber public static HealthStatusEnum StartupStatus = HealthStatusEnum.StartupZeroHosts; private static int FailedAttempts = 0; - public ProbeServer(IBackendService backends, HealthCheckService healthService, ILogger logger, IOptions backendOptions, ConfigChangeNotifier configChangeNotifier) + public ProbeServer( + IBackendService backends, + HealthCheckService healthService, + ILogger logger, + IOptions backendOptions, + ConfigChangeNotifier configChangeNotifier, + IEventClient eventClient) { _backends = backends ?? throw new ArgumentNullException(nameof(backends)); _healthService = healthService ?? throw new ArgumentNullException(nameof(healthService)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _backendOptions = backendOptions?.Value ?? throw new ArgumentNullException(nameof(backendOptions)); + _eventClient = eventClient ?? throw new ArgumentNullException(nameof(eventClient)); // Subscribe for HealthProbeSidecar changes (HealthProbeSidecarEnabled & Url are parsed from it) configChangeNotifier.Subscribe(this, options => options.HealthProbeSidecar); @@ -90,6 +100,7 @@ private void StartProbeServer() _probeTimer = new Timer(_ => { _startupStatus = _readinessStatus = _healthService.GetStatus(); + _activeUndrainedEvents = _eventClient?.Count ?? 0; // Push to sidecar if enabled (fire-and-forget async to avoid blocking threadpool) var client = _selfCheckClient; @@ -143,6 +154,9 @@ private async Task PushStatusToSidecarAsync(HttpClient selfCheckClient) } } + public int EventCount => _activeUndrainedEvents; + + // TODO: no need for stopwatch any longer public async Task LivenessResponseAsync(HttpListenerContext lc) { // Liveness probe check - use pre-allocated objects diff --git a/src/SimpleL7Proxy/Program.cs b/src/SimpleL7Proxy/Program.cs index 0ca3bd5a..d0f7e7c4 100644 --- a/src/SimpleL7Proxy/Program.cs +++ b/src/SimpleL7Proxy/Program.cs @@ -60,16 +60,21 @@ public static async Task Main(string[] args) }); var startupLogger = startupLoggerFactory.CreateLogger(); - var appConfigBootstrap = new AppConfigBootstrap(startupLoggerFactory.CreateLogger()); - - // Kick off App Configuration download early so values are ready - // by the time LoadBackendOptions reads environment variables. + BackendOptions defaultBackendOptions = new BackendOptions + { + // Bootstrap the bootstrapper !!!! + // We can't even connect to App Config unless we know this + UseOAuthGov = string.Equals( + Environment.GetEnvironmentVariable("UseOAuthGov"), "true", StringComparison.OrdinalIgnoreCase) + }; + DefaultCredential defaultCredential = new DefaultCredential(defaultBackendOptions); + var appConfigBootstrap = new AppConfigBootstrap(startupLoggerFactory.CreateLogger(), defaultBackendOptions, defaultCredential); appConfigBootstrap.Start(); var hostBuilder = Host.CreateDefaultBuilder(args) .ConfigureAppConfiguration((hostContext, config) => { - config.AddAzureAppConfigurationWithWarmSupport(startupLogger); + config.AddAzureAppConfigurationWithWarmSupport(defaultCredential, startupLogger); }) .ConfigureLogging(logging => { @@ -83,7 +88,7 @@ public static async Task Main(string[] args) .ConfigureServices((hostContext, services) => { ConfigureApplicationInsights(services); - ConfigureDependencyInjection(services, startupLogger, appConfigBootstrap); + ConfigureDependencyInjection(services, startupLogger, appConfigBootstrap, defaultCredential); }); @@ -116,7 +121,9 @@ public static async Task Main(string[] args) // Initialize ProxyEvent with BackendOptions - ProxyEvent.Initialize(options, eventClient, telemetryClient); + var commonEventData = serviceProvider.GetRequiredService(); + ProxyEvent.Initialize(options, eventClient, commonEventData, telemetryClient); + ProxyEvent.SubscribeToConfigChanges(serviceProvider.GetRequiredService()); // Initialize HostConfig with all required dependencies including service provider for circuit breaker DI HostConfig.Initialize(backendTokenProvider, startupLogger, serviceProvider); @@ -126,6 +133,7 @@ public static async Task Main(string[] args) var hostCollection = serviceProvider.GetRequiredService(); ConfigBootstrapper.RegisterBackends(options.Value, configuration, null, hostCollection); + try { //ServiceBusRequestService? serviceBusService = null; @@ -159,6 +167,18 @@ public static async Task Main(string[] args) startupLogger.LogError(ex, "[ERROR] ✗ ServiceBus initialization failed"); } + // Log confirmation once all IHostedService.StartAsync calls have completed. + // This fires after the framework has started every hosted service (including event loggers). + var appLifetime = serviceProvider.GetRequiredService(); + appLifetime.ApplicationStarted.Register(() => + { + var composite = serviceProvider.GetRequiredService(); + ConfigBootstrapper.OutputEnvVars(options.Value); + + startupLogger.LogInformation("[INIT] ✓ All hosted services started — active event loggers: {Loggers}", + composite.ClientType); + }); + try { await frameworkHost.RunAsync(); @@ -214,84 +234,19 @@ private static void ConfigureApplicationInsights(IServiceCollection services) } } - private static void ConfigureDependencyInjection(IServiceCollection services, ILogger startupLogger, AppConfigBootstrap appConfigBootstrap) + private static void ConfigureDependencyInjection(IServiceCollection services, ILogger startupLogger, AppConfigBootstrap appConfigBootstrap, DefaultCredential defaultCredential) { services.AddSingleton(appConfigBootstrap); + services.AddSingleton(defaultCredential); TryAddCompositeEventClient(services); // register the backend options var backendOptions = ConfigBootstrapper.CreateBackendOptions(startupLogger, appConfigBootstrap); services.AddBackendHostConfiguration(startupLogger, backendOptions); - // EVENT_LOGGERS is a comma-separated list of event logger backends to enable. - // Supported values: "file", "eventhub" - // Example: EVENT_LOGGERS="file,eventhub" enables both simultaneously. - // Falls back to legacy LOGTOFILE behaviour when EVENT_LOGGERS is not set. - - var eventLoggersRaw = backendOptions.EventLoggers; - HashSet enabledLoggers; - - if (!string.IsNullOrWhiteSpace(eventLoggersRaw)) - { - enabledLoggers = new HashSet( - eventLoggersRaw.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries), - StringComparer.OrdinalIgnoreCase); - Console.WriteLine($"[CONFIG] EVENT_LOGGERS: {string.Join(", ", enabledLoggers)}"); - } - else - { - // Legacy fallback: LOGTOFILE=true → file, otherwise → eventhub - enabledLoggers = new HashSet(StringComparer.OrdinalIgnoreCase) - { - backendOptions.LogToFile ? "file" : "eventhub" - }; - Console.WriteLine($"[CONFIG] EVENT_LOGGERS not set, falling back to legacy: {string.Join(", ", enabledLoggers)}"); - } - - // Ensure CompositeEventClient is registered before any individual clients - - foreach ( var loggername in enabledLoggers) - { - if (loggername == "file") - { - services.AddSingleton(svc => - new LogFileEventClient(backendOptions.LogFileName, svc.GetRequiredService())); - services.AddSingleton(svc => (IHostedService)svc.GetRequiredService()); - } - else if ( loggername == "eventhub") - { - // EventHubClient reads its own config from env vars (EVENTHUB_CONNECTIONSTRING, EVENTHUB_NAME, etc.) - services.AddSingleton(); - services.AddSingleton(svc => svc.GetRequiredService()); - } - else { - // Reflection fallback: resolve type name within this assembly only (prevents cross-assembly loading) - var loggerType = typeof(Program).Assembly.GetType(loggername, throwOnError: false); - if (loggerType == null) - { - startupLogger.LogWarning("[CONFIG] Event logger type '{LoggerType}' not found. Skipping.", loggername); - continue; - } - - if (!typeof(IEventClient).IsAssignableFrom(loggerType)) - { - startupLogger.LogWarning("[CONFIG] Event logger type '{LoggerType}' does not implement IEventClient. Skipping.", loggername); - continue; - } - - // Register as concrete singleton so DI can resolve constructor dependencies - services.AddSingleton(loggerType); - - // If it implements IHostedService, register it so the host calls StartAsync - if (typeof(IHostedService).IsAssignableFrom(loggerType)) - { - services.AddSingleton(svc => (IHostedService)svc.GetRequiredService(loggerType)); - } - - startupLogger.LogInformation("[CONFIG] Registered event logger: {LoggerType}", loggername); - } - - } + // Register event headers and event loggers + RegisterEventHeaders(services, startupLogger, backendOptions); + RegisterEventLoggers(services, startupLogger, backendOptions, backendOptions.EventLoggers); // Wire up Azure App Configuration warm-refresh service (no-op if AZURE_APPCONFIG_ENDPOINT is not set) @@ -432,6 +387,104 @@ private static void ConfigureDependencyInjection(IServiceCollection services, IL services.AddHostedService(); } + private static void RegisterEventHeaders(IServiceCollection services, ILogger startupLogger, BackendOptions backendOptions) + { + var registered = false; + var eventdataclass = backendOptions.EventHeaders; + try + { + var dataType = string.IsNullOrEmpty(eventdataclass) + ? null + : typeof(Program).Assembly.GetType(eventdataclass, throwOnError: false); + + if (dataType != null && typeof(ICommonEventData).IsAssignableFrom(dataType)) + { + var instance = (ICommonEventData)Activator.CreateInstance(dataType, Options.Create(backendOptions))!; + services.AddSingleton(dataType, instance); + services.AddSingleton(instance); + registered = true; + } + } + catch (Exception ex) + { + startupLogger.LogWarning(ex, "[CONFIG] Failed to register EventHeaders '{EventDataType}'.", eventdataclass); + } + finally + { + if (!registered) + { + startupLogger.LogWarning("[CONFIG] EventHeaders '{EventDataType}' not found or invalid. Falling back to CommonEventHeaders.", eventdataclass); + services.AddSingleton(); + } + } + } + + private static void RegisterEventLoggers(IServiceCollection services, ILogger startupLogger, BackendOptions backendOptions, string? eventLoggersRaw) + { + HashSet enabledLoggers; + if (!string.IsNullOrWhiteSpace(eventLoggersRaw)) + { + enabledLoggers = new HashSet( + eventLoggersRaw.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries), + StringComparer.OrdinalIgnoreCase); + Console.WriteLine($"[CONFIG] EVENT_LOGGERS: {string.Join(", ", enabledLoggers)}"); + } + else + { + enabledLoggers = new HashSet(StringComparer.OrdinalIgnoreCase) + { + backendOptions.LogToFile ? "file" : "eventhub" + }; + Console.WriteLine($"[CONFIG] EVENT_LOGGERS not set, falling back to legacy: {string.Join(", ", enabledLoggers)}"); + } + + foreach (var loggername in enabledLoggers) + { + if (loggername == "file") + { + services.AddSingleton(svc => + new LogFileEventClient(backendOptions.LogFileName, svc.GetRequiredService(), svc.GetRequiredService>())); + services.AddSingleton(svc => (IHostedService)svc.GetRequiredService()); + } + else if (loggername == "eventhub") + { + services.AddSingleton(); + services.AddSingleton(svc => svc.GetRequiredService()); + } + else + { + try + { + var loggerType = typeof(Program).Assembly.GetType(loggername, throwOnError: false); + if (loggerType == null || !typeof(IEventClient).IsAssignableFrom(loggerType)) + { + startupLogger.LogWarning("[CONFIG] Event logger type '{LoggerType}' not found or does not implement IEventClient. Skipping.", loggername); + continue; + } + + var capturedType = loggerType; + services.AddSingleton(capturedType, svc => + { + var instance = ActivatorUtilities.CreateInstance(svc, capturedType); + startupLogger.LogInformation("[CONFIG] ✓ Instantiated event logger: {LoggerType}", capturedType.Name); + return instance; + }); + + if (typeof(IHostedService).IsAssignableFrom(capturedType)) + { + services.AddSingleton(svc => (IHostedService)svc.GetRequiredService(capturedType)); + } + + startupLogger.LogInformation("[CONFIG] Registered event logger: {LoggerType}", loggername); + } + catch (Exception ex) + { + startupLogger.LogWarning(ex, "[CONFIG] Failed to register event logger '{LoggerType}'. Skipping.", loggername); + } + } + } + } + /// /// Ensures CompositeEventClient is registered exactly once. /// diff --git a/src/SimpleL7Proxy/Proxy/ProxyErrorException.cs b/src/SimpleL7Proxy/Proxy/ProxyErrorException.cs index ab29be8a..06c08412 100644 --- a/src/SimpleL7Proxy/Proxy/ProxyErrorException.cs +++ b/src/SimpleL7Proxy/Proxy/ProxyErrorException.cs @@ -15,7 +15,8 @@ public enum ErrorType InvalidHeader, DisallowedAppID, UnknownProfile, - AsyncWorkerError + AsyncWorkerError, + ContentTooLarge } public ErrorType Type { get; set; } = type; diff --git a/src/SimpleL7Proxy/Proxy/ProxyWorker.cs b/src/SimpleL7Proxy/Proxy/ProxyWorker.cs index 35ee6791..072383a5 100644 --- a/src/SimpleL7Proxy/Proxy/ProxyWorker.cs +++ b/src/SimpleL7Proxy/Proxy/ProxyWorker.cs @@ -1075,6 +1075,16 @@ public async Task ProxyToBackEndAsync(RequestData request) } } } + catch (OutOfMemoryException oomEx) + { + TriggerHostCB = false; + _logger.LogCritical(oomEx, "Out of memory caching request body for {Guid}", request.Guid); + intCode = (int)HttpStatusCode.RequestEntityTooLarge; // 413 + throw new ProxyErrorException( + ProxyErrorException.ErrorType.ContentTooLarge, + HttpStatusCode.InternalServerError, + $"Request body too large to buffer: {oomEx.Message}"); + } catch (S7PRequeueException e) { TriggerHostCB = false; @@ -1590,7 +1600,7 @@ private static bool IsInvalidHeaderException(Exception ex) private async Task StreamResponseAsync(RequestData request, ProxyData pr) { ProxyEvent requestSummary = request.EventData; - string processWith = pr.StreamingProcessor ?? "DefaultStream"; + string processWith = pr.StreamingProcessor ?? StreamProcessorFactory.DEFAULT_PROCESSOR; var proxyResponse = pr.BodyResponseMessage; if (proxyResponse == null) @@ -1607,7 +1617,8 @@ private async Task StreamResponseAsync(RequestData request, ProxyData pr) _logger.LogDebug("Resolved processor: {ProcessorName} for request {Guid}", resolvedProcessor, request.Guid); // Route response to appropriate destination based on execution mode - Stream? destination = null; + Stream? destination; + bool needsFlush = false; string destinationType; if (request.IsBackgroundCheck && request.asyncWorker != null) @@ -1619,6 +1630,7 @@ private async Task StreamResponseAsync(RequestData request, ProxyData pr) else if (request.runAsync && request.asyncWorker != null) { destinationType = "async blob"; + needsFlush = true; // QueuedBlobStream requires FlushAsync to enqueue data destination = await request.asyncWorker.GetOrCreateDataStreamAsync().ConfigureAwait(false); } else if (request.OutputStream != null) @@ -1629,27 +1641,19 @@ private async Task StreamResponseAsync(RequestData request, ProxyData pr) else { _logger.LogError("OutputStream is null for request {Guid}, cannot stream response", request.Guid); - destinationType = "none"; + return; } - if (destination != null && proxyResponse.Content != null) + if (proxyResponse.Content != null) { _logger.LogDebug("Streaming to {Destination} for request {Guid}", destinationType, request.Guid); await processor.CopyToAsync(proxyResponse.Content, destination).ConfigureAwait(false); - - // Explicit flush for async blob streams - QueuedBlobStream requires FlushAsync to enqueue the data - if (destinationType == "async blob") + + if (needsFlush) { - //_logger.LogInformation("[BLOB-TRACE] StreamResponseAsync | Action: FlushAfterCopy | Guid: {Guid}", request.Guid); await destination.FlushAsync().ConfigureAwait(false); - //_logger.LogInformation("[BLOB-TRACE] StreamResponseAsync | Action: FlushComplete | Guid: {Guid}", request.Guid); } } - else - { - //_logger.LogInformation("[BLOB-TRACE] StreamResponseAsync | Action: NoData | Guid: {Guid} | Destination: {HasDestination} | Content: {HasContent}", - // request.Guid, destination != null, proxyResponse.Content != null); - } } catch (HttpListenerException ex) { diff --git a/src/SimpleL7Proxy/Proxy/StreamProcessor/CompleteAllUsageProcessor.cs b/src/SimpleL7Proxy/Proxy/StreamProcessor/CompleteAllUsageProcessor.cs index 572ebd58..b8139fb4 100644 --- a/src/SimpleL7Proxy/Proxy/StreamProcessor/CompleteAllUsageProcessor.cs +++ b/src/SimpleL7Proxy/Proxy/StreamProcessor/CompleteAllUsageProcessor.cs @@ -13,6 +13,10 @@ namespace SimpleL7Proxy.StreamProcessor /// public class CompleteAllUsageProcessor : JsonStreamProcessor { + // Pre-compiled regex for extracting usage/usageMetadata JSON blocks from streaming responses + private static readonly Regex s_usageJsonRegex = new( + @"""(?:[uU]sage|[uU]sage[mM]etadata)"":\s*(\{(?:[^{}]|(?\{)|(?<-open>\}))*(?(open)(?!))\})", + RegexOptions.Singleline | RegexOptions.Compiled); protected override int MaxLines => 100; protected override int MinLineLength => 1; @@ -37,9 +41,7 @@ protected override void ProcessLastLines(string[] lastLines, string primaryLine) int startIndex = Array.IndexOf(lastLines, primaryLine); var input = string.Join(" ", lastLines[startIndex..]); - // Use a regex to extract the json for either usage or usageMetadata. - var jsonPattern = @"""(?:[uU]sage|[uU]sage[mM]etadata)"":\s*(\{(?:[^{}]|(?\{)|(?<-open>\}))*\}(?(open)(?!)))"; - var matches = Regex.Matches(input, jsonPattern, RegexOptions.Singleline); + var matches = s_usageJsonRegex.Matches(input); int count=0; if (matches.Count > 0) diff --git a/src/SimpleL7Proxy/Proxy/StreamProcessor/JsonStreamProcessor.cs b/src/SimpleL7Proxy/Proxy/StreamProcessor/JsonStreamProcessor.cs index e9cc4bac..381a1f5b 100644 --- a/src/SimpleL7Proxy/Proxy/StreamProcessor/JsonStreamProcessor.cs +++ b/src/SimpleL7Proxy/Proxy/StreamProcessor/JsonStreamProcessor.cs @@ -42,6 +42,11 @@ namespace SimpleL7Proxy.StreamProcessor /// public abstract class JsonStreamProcessor : BaseStreamProcessor { + // Pre-compiled regex for extracting "id" field from JSON lines + private static readonly Regex s_idRegex = new( + @"\s*""id""\s*:\s*""([^""]+)""", + RegexOptions.Singleline | RegexOptions.Compiled); + protected Dictionary data = new(); protected virtual int MaxLines { get; } = 10; protected virtual int MinLineLength { get; } = 20; @@ -146,8 +151,6 @@ public override async Task CopyToAsync(System.Net.Http.HttpContent sourceContent string? usageLine = null; - var idPattern = @"\s*""id""\s*:\s*""([^""]+)"""; - var backgroundRequestFound = false; var modelFound = false; BackgroundCompleted = false; @@ -188,8 +191,7 @@ public override async Task CopyToAsync(System.Net.Http.HttpContent sourceContent // Console.WriteLine("This is a background request : " + line); - var match = Regex.Match(line, idPattern, RegexOptions.Singleline); - var jsonBlock = String.Empty; + var match = s_idRegex.Match(line); if (match.Success) { diff --git a/src/SimpleL7Proxy/Proxy/StreamProcessor/MultiAllUsageProcessor.cs b/src/SimpleL7Proxy/Proxy/StreamProcessor/MultiAllUsageProcessor.cs index 78dec28d..e76679b7 100644 --- a/src/SimpleL7Proxy/Proxy/StreamProcessor/MultiAllUsageProcessor.cs +++ b/src/SimpleL7Proxy/Proxy/StreamProcessor/MultiAllUsageProcessor.cs @@ -13,6 +13,10 @@ namespace SimpleL7Proxy.StreamProcessor /// public class MultiLineAllUsageProcessor : JsonStreamProcessor { + // Pre-compiled regex for extracting usage/usageMetadata JSON blocks from streaming responses + private static readonly Regex s_usageJsonRegex = new( + @"""(?:[uU]sage|[uU]sage[mM]etadata)"":\s*(\{(?:[^{}]|(?\{)|(?<-open>\}))*(?(open)(?!))\})", + RegexOptions.Singleline | RegexOptions.Compiled); protected override int MaxLines => 100; protected override int MinLineLength => 1; @@ -36,9 +40,7 @@ protected override void ProcessLastLines(string[] lastLines, string primaryLine) int startIndex = Array.IndexOf(lastLines, primaryLine); var input = string.Join(" ", lastLines[startIndex..]); - // Use a regex to extract the json for either usage or usageMetadata. - var jsonPattern = @"""(?:[uU]sage|[uU]sage[mM]etadata)"":\s*(\{(?:[^{}]|(?\{)|(?<-open>\}))*\}(?(open)(?!)))"; - var match = Regex.Match(input, jsonPattern, RegexOptions.Singleline); + var match = s_usageJsonRegex.Match(input); var jsonBlock = String.Empty; if (match.Success) diff --git a/src/SimpleL7Proxy/Proxy/StreamProcessor/StreamProcessorFactory.cs b/src/SimpleL7Proxy/Proxy/StreamProcessor/StreamProcessorFactory.cs index 7a353705..b156644f 100644 --- a/src/SimpleL7Proxy/Proxy/StreamProcessor/StreamProcessorFactory.cs +++ b/src/SimpleL7Proxy/Proxy/StreamProcessor/StreamProcessorFactory.cs @@ -27,8 +27,8 @@ public sealed class StreamProcessorFactory }; // Constants for processor selection logic - private const string DEFAULT_PROCESSOR = "Default"; - private const string STREAM_PROCESSOR = "DefaultStream"; + public const string DEFAULT_PROCESSOR = "DefaultStream"; + public const string STREAM_PROCESSOR = "DefaultStream"; private static readonly string[] PROCESSOR_SUFFIXES = ["Processor", "Parser"]; private const string TOKEN_PROCESSOR_HEADER = "TOKENPROCESSOR"; private const string EVENT_STREAM_MEDIA = "text/event-stream"; @@ -79,11 +79,15 @@ DEFAULT_PROCESSOR when mediaType.Equals(EVENT_STREAM_MEDIA, StringComparison.Ord /// /// Gets a stream processor instance by name with fallback to default. - /// Optimized for performance - uses singleton for default processor to reduce allocations. + /// Returns a pre-allocated singleton for "DefaultStream" (stateless, no allocation). + /// Creates a new instance per call for all other processors (OpenAI, AllUsage, + /// MultiLineAllUsage, AllUsage-2) since they may hold per-request state. + /// Falls back to the default processor if the requested name is unknown or construction fails. /// /// The name of the processor to retrieve - /// The actual processor name that was used (after fallback) - /// An instance of the requested stream processor + /// The actual processor name that was used (may differ from + /// if fallback occurred) + /// A singleton or new instance of the requested stream processor public IStreamProcessor GetStreamProcessor(string processorName, out string resolvedProcessorName) { if (!ProcessorFactories.TryGetValue(processorName, out var factory)) diff --git a/src/SimpleL7Proxy/server.cs b/src/SimpleL7Proxy/server.cs index f6cd4a4a..7f2b97d0 100644 --- a/src/SimpleL7Proxy/server.cs +++ b/src/SimpleL7Proxy/server.cs @@ -1,6 +1,6 @@ using System.Net; using System.Text.Json; -using Microsoft.ApplicationInsights; +using System.Collections.Frozen; using Microsoft.ApplicationInsights.DataContracts; using Microsoft.Extensions.Options; using System.Collections.Concurrent; @@ -50,9 +50,14 @@ public class Server : BackgroundService, IConfigChangeSubscriber private readonly ProbeServer _probeServer; + // Precomputed frozen collections for O(1) hot-path lookups, recomputed on config change + private volatile FrozenSet _disallowedHeaders; + private volatile FrozenSet _priorityKeys; + private volatile FrozenDictionary _priorityKeyToValue; + // Precomputed validation rules to avoid dictionary iteration and string ops per request private readonly record struct ValidateHeaderRule(string SourceHeader, string AllowedValuesHeader, string DisplayName); - private readonly ValidateHeaderRule[] _validateHeaderRules; + private ValidateHeaderRule[] _validateHeaderRules; // Constructor to initialize the server with backend options and telemetry client. public Server( @@ -121,7 +126,13 @@ public Server( options => options.PollInterval ]); - // Precompute validation header rules once at startup + // Precompute frozen sets and validation rules at startup + _disallowedHeaders = _options.DisallowedHeaders.ToFrozenSet(StringComparer.OrdinalIgnoreCase); + _priorityKeys = _options.PriorityKeys.ToFrozenSet(StringComparer.OrdinalIgnoreCase); + _priorityKeyToValue = _options.PriorityKeys + .Zip(_options.PriorityValues) + .ToFrozenDictionary(x => x.First, x => x.Second, StringComparer.OrdinalIgnoreCase); + _validateHeaderRules = _options.ValidateHeaders .Select(kvp => new ValidateHeaderRule( kvp.Key, @@ -151,7 +162,21 @@ public Task OnConfigChangedAsync( CancellationToken cancellationToken) { _logger.LogInformation("[CONFIG] Server changed — Settings live updated without restart"); - // apply the changes + + // Recompute frozen sets from updated options + _disallowedHeaders = backendOptions.DisallowedHeaders.ToFrozenSet(StringComparer.OrdinalIgnoreCase); + _priorityKeys = backendOptions.PriorityKeys.ToFrozenSet(StringComparer.OrdinalIgnoreCase); + _priorityKeyToValue = _options.PriorityKeys + .Zip(_options.PriorityValues) + .ToFrozenDictionary(x => x.First, x => x.Second, StringComparer.OrdinalIgnoreCase); + + _validateHeaderRules = backendOptions.ValidateHeaders + .Select(kvp => new ValidateHeaderRule( + kvp.Key, + kvp.Value, + kvp.Key.StartsWith("S7", StringComparison.Ordinal) ? kvp.Key[2..] : kvp.Key)) + .ToArray(); + return Task.CompletedTask; } @@ -244,6 +269,11 @@ public async Task Run(CancellationToken cancellationToken) bool doUserProfile = _options.UseProfiles; // Only enable async mode if configured AND blob storage is available (not using NullBlobWriter) bool doAsync = _options.AsyncModeEnabled && !(_blobWriter is NullBlobWriter); + int maxEvents = _options.MaxEvents; + int maxEvents_90Percent = (int)(maxEvents * .9); + int maxEvents_80Percent = (int)(maxEvents * .8); + int maxEvents_60Percent = (int)(maxEvents * .6); + int maxEvents_50Percent = (int)(maxEvents * .5); while (!cancellationToken.IsCancellationRequested) { @@ -270,35 +300,25 @@ public async Task Run(CancellationToken cancellationToken) } // if it's a probe, then bypass all the below checks and enqueue the request - if (Constants.probes.Contains(lc.Request.Url?.PathAndQuery)) + var probePath = lc.Request.Url?.PathAndQuery; + switch (probePath) { - // Get ProbeData from pool using modulo rotation - var probePath = lc.Request.Url!.PathAndQuery; - var fallthrough = false; - - // Fast-path for probes to avoid queue and worker latency - switch (probePath) - { - case Constants.Liveness: - await _probeServer.LivenessResponseAsync(lc); - break; - case Constants.Readiness: - await _probeServer.ReadinessResponseAsync(lc); - break; - case Constants.Startup: - await _probeServer.StartupResponseAsync(lc); - break; - default: - fallthrough = true; - break; - - } - - if (!fallthrough) + case Constants.Liveness: + await _probeServer.LivenessResponseAsync(lc); + continue; + case Constants.Readiness: + await _probeServer.ReadinessResponseAsync(lc); continue; + case Constants.Startup: + await _probeServer.StartupResponseAsync(lc); + continue; + case Constants.Health: + case Constants.ForceGC: + break; // fall through to queue path + default: + break; // not a probe, fall through } - - + int priority = _options.DefaultPriority; int userPriorityBoost = 0; var notEnqued = false; @@ -306,7 +326,7 @@ public async Task Run(CancellationToken cancellationToken) var retrymsg = ""; var logmsg = ""; - Interlocked.Increment(ref counter); + counter++; var requestId = _options.IDStr + counter.ToString(); //delayCts.Cancel(); @@ -334,274 +354,296 @@ public async Task Run(CancellationToken cancellationToken) if (!_isShuttingDown) { - try + int eventCount = _probeServer.EventCount; + if ( eventCount > maxEvents_50Percent) { + int cnt = eventCount > maxEvents_90Percent ? 1000 + : eventCount > maxEvents_80Percent ? 5000 + : eventCount > maxEvents_60Percent ? 200 + : 100; + await Task.Delay(cnt); + } + + if ( eventCount > maxEvents) { - rd.Debug = rd.Headers["S7PDEBUG"] != null && string.Equals(rd.Headers["S7PDEBUG"], "true", StringComparison.OrdinalIgnoreCase); + notEnqued = true; + notEnquedCode = 429; + retrymsg = ed["Message"] = "Max Events Exceeds Threshold"; + logmsg = "MAX EVENTS => 429:"; + } + else if (_backends.CheckFailedStatus()) + // Check circuit breaker status and enqueue the request + { + notEnqued = true; + notEnquedCode = 429; - if (_options.ValidateAuthAppID) - { - string? authAppID = rd.Headers[_options.ValidateAuthAppIDHeader]; - if (!string.IsNullOrEmpty(authAppID) && _userProfile.IsAuthAppIDValid(authAppID)) - { - if (rd.Debug) - Console.WriteLine($"AuthAppID {rd.Headers[_options.ValidateAuthAppIDHeader]} is valid."); - } - else - { - if (rd.Debug) - Console.WriteLine($"AuthAppID {rd.Headers[_options.ValidateAuthAppIDHeader]} is invalid."); + ed["Message"] = "Circuit breaker on - 429"; + retrymsg = $"Too many failures in last {_options.CircuitBreakerTimeslice} seconds"; + logmsg = "Circuit breaker on => 429:"; + } + else if (_requestsQueue.thrdSafeCount >= _options.MaxQueueLength) + { + notEnqued = true; + notEnquedCode = 429; - throw new ProxyErrorException( - ProxyErrorException.ErrorType.DisallowedAppID, - HttpStatusCode.Forbidden, - "Invalid AuthAppID: " + rd.Headers[_options.ValidateAuthAppIDHeader] + "\n" - ); - } - } + retrymsg = ed["Message"] = "Queue is full"; + logmsg = "Queue is full => 429:"; + } + else if (_backends.ActiveHostCount() == 0) + { + notEnqued = true; + notEnquedCode = 429; - // Remove any disallowed headers - foreach (var header in _options.DisallowedHeaders) - { - if (rd.Debug && !String.IsNullOrEmpty(rd.Headers.Get(header))) - Console.WriteLine($"Disallowed header {header} removed from request."); - rd.Headers.Remove(header); - } + retrymsg = ed["Message"] = "No active hosts"; + logmsg = "No active hosts => 429:"; + } + else + { - rd.UserID = ""; - // Normalize path once: ensure non-empty and starts with '/' - if (string.IsNullOrEmpty(rd.Path)) - rd.Path = "/"; - else if (!rd.Path.StartsWith('/')) - rd.Path = "/" + rd.Path; - rd.Headers["S7Path"] = rd.Path; // Copy path - // Lookup the user profile and add the headers to the request - if (doUserProfile) + try { - var requestUser = rd.Headers[_options.UserProfileHeader]; - if (!string.IsNullOrEmpty(requestUser)) - { - rd.profileUserId = requestUser; - (var headers, var isSoftDeleted, var isStale) = _userProfile.GetUserProfile(requestUser); + rd.Debug = rd.Headers["S7PDEBUG"] != null && string.Equals(rd.Headers["S7PDEBUG"], "true", StringComparison.OrdinalIgnoreCase); + - if (headers != null && headers.Count > 0) + if (_options.ValidateAuthAppID) + { + string? authAppID = rd.Headers[_options.ValidateAuthAppIDHeader]; + if (!string.IsNullOrEmpty(authAppID) && _userProfile.IsAuthAppIDValid(authAppID)) { - foreach (var header in headers) - { - if (!header.Key.StartsWith("internal-")) - { - rd.Headers.Set(header.Key, header.Value); - if (rd.Debug) - Console.WriteLine($"Add Header: {header.Key} = {header.Value}"); - } - } + if (rd.Debug) + Console.WriteLine($"AuthAppID {rd.Headers[_options.ValidateAuthAppIDHeader]} is valid."); } else { if (rd.Debug) - Console.WriteLine($"User profile for {requestUser} not found."); + Console.WriteLine($"AuthAppID {rd.Headers[_options.ValidateAuthAppIDHeader]} is invalid."); + throw new ProxyErrorException( - ProxyErrorException.ErrorType.UnknownProfile, + ProxyErrorException.ErrorType.DisallowedAppID, HttpStatusCode.Forbidden, - "User profile not found: " + requestUser + "\n" + "Invalid AuthAppID: " + rd.Headers[_options.ValidateAuthAppIDHeader] + "\n" ); } } - } - // Check for any required headers - if (_options.RequiredHeaders.Count > 0) - { - // Note: Returns the first missing required header only - var missing = _options.RequiredHeaders.FirstOrDefault(x => string.IsNullOrEmpty(rd.Headers[x])); - if (!string.IsNullOrEmpty(missing)) + // Remove any disallowed headers (FrozenSet for O(1) contains, but iterate to remove) + foreach (var header in _disallowedHeaders) { - if (rd.Debug) - Console.WriteLine($"Required header {missing} is missing from request."); - - throw new ProxyErrorException( - ProxyErrorException.ErrorType.IncompleteHeaders, - HttpStatusCode.ExpectationFailed, - "Required header is missing: " + missing - ); + if (rd.Debug && !string.IsNullOrEmpty(rd.Headers.Get(header))) + Console.WriteLine($"Disallowed header {header} removed from request."); + rd.Headers.Remove(header); } - } - // Validate headers using precomputed rules and zero-alloc span tokenization - if (_validateHeaderRules.Length > 0) - { - foreach (ref readonly var rule in _validateHeaderRules.AsSpan()) + rd.UserID = ""; + // Normalize path once: ensure non-empty and starts with '/' + if (string.IsNullOrEmpty(rd.Path)) + rd.Path = "/"; + else if (!rd.Path.StartsWith('/')) + rd.Path = "/" + rd.Path; + rd.Headers["S7Path"] = rd.Path; // Copy path + // Lookup the user profile and add the headers to the request + if (doUserProfile) { - var lookup = rd.Headers[rule.SourceHeader]!.AsSpan().Trim(); - var allowedSpan = rd.Headers[rule.AllowedValuesHeader]!.AsSpan(); - bool matched = false; - - foreach (var range in allowedSpan.Split(',')) + var requestUser = rd.Headers[_options.UserProfileHeader]; + if (!string.IsNullOrEmpty(requestUser)) { - var pattern = allowedSpan[range].Trim(); - if (pattern.Length > 0 && pattern[^1] == '*') + rd.profileUserId = requestUser; + (var headers, var isSoftDeleted, var isStale) = _userProfile.GetUserProfile(requestUser); + + if (headers != null && headers.Count > 0) { - if (lookup.StartsWith(pattern[..^1], StringComparison.OrdinalIgnoreCase)) + foreach (var header in headers) { - matched = true; - break; + if (!header.Key.StartsWith("internal-")) + { + rd.Headers.Set(header.Key, header.Value); + if (rd.Debug) + Console.WriteLine($"Add Header: {header.Key} = {header.Value}"); + } } } - else if (lookup.Equals(pattern, StringComparison.OrdinalIgnoreCase)) + else { - matched = true; - break; + if (rd.Debug) + Console.WriteLine($"User profile for {requestUser} not found."); + throw new ProxyErrorException( + ProxyErrorException.ErrorType.UnknownProfile, + HttpStatusCode.Forbidden, + "User profile not found: " + requestUser + "\n" + ); } } + } - if (!matched) + // Check for any required headers + if (_options.RequiredHeaders.Count > 0) + { + // Note: Returns the first missing required header only + var missing = _options.RequiredHeaders.FirstOrDefault(x => string.IsNullOrEmpty(rd.Headers[x])); + if (!string.IsNullOrEmpty(missing)) { if (rd.Debug) - Console.WriteLine($"Validation check failed for {rule.DisplayName}: {lookup}"); + Console.WriteLine($"Required header {missing} is missing from request."); + throw new ProxyErrorException( - ProxyErrorException.ErrorType.InvalidHeader, + ProxyErrorException.ErrorType.IncompleteHeaders, HttpStatusCode.ExpectationFailed, - $"Validation check failed for {rule.DisplayName}: {lookup}\n" + "Required header is missing: " + missing ); } } - if (rd.Debug) - Console.WriteLine($"Validation check passed for all headers."); - } - // Determine priority boost based on the UserID - if (_options.UniqueUserHeaders.Count > 0) - { - foreach (var header in _options.UniqueUserHeaders) + // Validate headers using precomputed rules and zero-alloc span tokenization + if (_validateHeaderRules.Length > 0) { - rd.UserID += rd.Headers[header] ?? ""; - } - } - - if (String.IsNullOrEmpty(rd.UserID)) - { - rd.UserID = "defaultUser"; - } + foreach (ref readonly var rule in _validateHeaderRules.AsSpan()) + { + var lookup = rd.Headers[rule.SourceHeader]!.AsSpan().Trim(); + var allowedSpan = rd.Headers[rule.AllowedValuesHeader]!.AsSpan(); + bool matched = false; - ed["UserID"] = rd.UserID; - ed["S7P-ID"] = rd.MID; + foreach (var range in allowedSpan.Split(',')) + { + var pattern = allowedSpan[range].Trim(); + if (pattern.Length > 0 && pattern[^1] == '*') + { + if (lookup.StartsWith(pattern[..^1], StringComparison.OrdinalIgnoreCase)) + { + matched = true; + break; + } + } + else if (lookup.Equals(pattern, StringComparison.OrdinalIgnoreCase)) + { + matched = true; + break; + } + } - if (rd.Debug) - Console.WriteLine($"UserID: {rd.UserID}"); + if (!matched) + { + if (rd.Debug) + Console.WriteLine($"Validation check failed for {rule.DisplayName}: {lookup}"); + throw new ProxyErrorException( + ProxyErrorException.ErrorType.InvalidHeader, + HttpStatusCode.ExpectationFailed, + $"Validation check failed for {rule.DisplayName}: {lookup}\n" + ); + } + } + if (rd.Debug) + Console.WriteLine($"Validation check passed for all headers."); + } - // ASYNC: Determine if the request is allowed async operation - if (doAsync && bool.TryParse(rd.Headers[_options.AsyncClientRequestHeader], out var asyncEnabled) && asyncEnabled) - { - var clientInfo = _userProfile.GetAsyncParams(rd.profileUserId); - if (clientInfo != null) + // Determine priority boost based on the UserID + if (_options.UniqueUserHeaders.Count > 0) { - rd.runAsync = true; - rd.AsyncBlobAccessTimeoutSecs = clientInfo.AsyncBlobAccessTimeoutSecs; - rd.BlobContainerName = clientInfo.ContainerName; - rd.SBTopicName = clientInfo.SBTopicName; - rd.AsyncClientConfig = clientInfo; // Store the full config for AsyncWorker - ed["AsyncBlobContainer"] = clientInfo.ContainerName; - ed["AsyncSBTopic"] = clientInfo.SBTopicName; - ed["BlobAccessTimeout"] = clientInfo.AsyncBlobAccessTimeoutSecs.ToString(); - ed["GenerateSAS"] = clientInfo.GenerateSasTokens.ToString(); + foreach (var header in _options.UniqueUserHeaders) + { + rd.UserID += rd.Headers[header] ?? ""; + } } - if (rd.Debug) + if (String.IsNullOrEmpty(rd.UserID)) { - Console.WriteLine($"AsyncEnabled: {rd.runAsync}"); + rd.UserID = "defaultUser"; } - } - // Determine priority boost based on the UserID - rd.Guid = _userPriority.addRequest(rd.UserID); - bool shouldBoost = _userPriority.boostIndicator(rd.UserID, out float boostValue); - userPriorityBoost = shouldBoost ? 1 : 0; + ed["UserID"] = rd.UserID; + ed["S7P-ID"] = rd.MID; - ed["GUID"] = rd.Guid.ToString(); + if (rd.Debug) + Console.WriteLine($"UserID: {rd.UserID}"); - var priorityKey = rd.Headers[_priorityHeaderName]; - if (!string.IsNullOrEmpty(priorityKey) && _options.PriorityKeys.Contains(priorityKey)) //lookup the priority - { - var index = _options.PriorityKeys.IndexOf(priorityKey); - if (index >= 0) + // ASYNC: Determine if the request is allowed async operation + if (doAsync && bool.TryParse(rd.Headers[_options.AsyncClientRequestHeader], out var asyncEnabled) && asyncEnabled) { - priority = _options.PriorityValues[index]; - } - } - rd.Priority = priority; - rd.Priority2 = userPriorityBoost; - rd.EnqueueTime = DateTime.UtcNow; - - ed["S7P-Priority"] = priority.ToString(); - ed["S7P-Priority2"] = userPriorityBoost.ToString(); + var clientInfo = _userProfile.GetAsyncParams(rd.profileUserId); + if (clientInfo != null) + { + rd.runAsync = true; + rd.AsyncBlobAccessTimeoutSecs = clientInfo.AsyncBlobAccessTimeoutSecs; + rd.BlobContainerName = clientInfo.ContainerName; + rd.SBTopicName = clientInfo.SBTopicName; + rd.AsyncClientConfig = clientInfo; // Store the full config for AsyncWorker + ed["AsyncBlobContainer"] = clientInfo.ContainerName; + ed["AsyncSBTopic"] = clientInfo.SBTopicName; + ed["BlobAccessTimeout"] = clientInfo.AsyncBlobAccessTimeoutSecs.ToString(); + ed["GenerateSAS"] = clientInfo.GenerateSasTokens.ToString(); + } - // Save the timeout header value if it exists - if (rd.Headers[_options.TimeoutHeader] != null && int.TryParse(rd.Headers[_options.TimeoutHeader], out var timeout)) - { - rd.defaultTimeout = timeout; - } - else - { - rd.defaultTimeout = _options.Timeout; - } + if (rd.Debug) + { + Console.WriteLine($"AsyncEnabled: {rd.runAsync}"); + } + } - // Calculate expiresAt time based on the timeout header or default TTL - rd.CalculateExpiration(_options.DefaultTTLSecs, _options.TTLHeader); - ed["DefaultTimeout"] = rd.defaultTimeout.ToString(); + // Determine priority boost based on the UserID + rd.Guid = _userPriority.addRequest(rd.UserID); + bool shouldBoost = _userPriority.boostIndicator(rd.UserID, out float boostValue); + userPriorityBoost = shouldBoost ? 1 : 0; + + ed["GUID"] = rd.Guid.ToString(); + + var priorityKey = rd.Headers[_priorityHeaderName]; + // if (!string.IsNullOrEmpty(priorityKey) && _priorityKeys.Contains(priorityKey)) //lookup the priority + // { + // var index = _options.PriorityKeys.IndexOf(priorityKey); + // if (index >= 0) + // { + // priority = _options.PriorityValues[index]; + // } + // } + if (!string.IsNullOrEmpty(priorityKey) && _priorityKeyToValue.TryGetValue(priorityKey, out var mappedPriority)) + { + priority = mappedPriority; + } + rd.Priority = priority; + rd.Priority2 = userPriorityBoost; + rd.EnqueueTime = DateTime.UtcNow; - // Check circuit breaker status and enqueue the request - if (_backends.CheckFailedStatus()) - { - notEnqued = true; - notEnquedCode = 429; + ed["S7P-Priority"] = priority.ToString(); + ed["S7P-Priority2"] = userPriorityBoost.ToString(); - ed["Message"] = "Circuit breaker on - 429"; - retrymsg = $"Too many failures in last {_options.CircuitBreakerTimeslice} seconds"; - logmsg = "Circuit breaker on => 429:"; - } - else if (_requestsQueue.thrdSafeCount >= _options.MaxQueueLength) - { - notEnqued = true; - notEnquedCode = 429; + // Save the timeout header value if it exists + if (rd.Headers[_options.TimeoutHeader] != null && int.TryParse(rd.Headers[_options.TimeoutHeader], out var timeout)) + { + rd.defaultTimeout = timeout; + } + else + { + rd.defaultTimeout = _options.Timeout; + } - retrymsg = ed["Message"] = "Queue is full"; - logmsg = "Queue is full => 429:"; - } - else if (_backends.ActiveHostCount() == 0) - { - notEnqued = true; - notEnquedCode = 429; + // Calculate expiresAt time based on the timeout header or default TTL + rd.CalculateExpiration(_options.DefaultTTLSecs, _options.TTLHeader); + ed["DefaultTimeout"] = rd.defaultTimeout.ToString(); - retrymsg = ed["Message"] = "No active hosts"; - logmsg = "No active hosts => 429:"; - } + // Enqueue the request + if (!_requestsQueue.Enqueue(rd, priority, userPriorityBoost, rd.EnqueueTime)) + { + notEnqued = true; + notEnquedCode = 429; + retrymsg = ed["Message"] = "Failed to enqueue request"; + logmsg = "Failed to enqueue request => 429:"; + } - // Enqueue the request + // ASYNC: If the request is allowed to run async, set the status + if (!notEnqued && doAsync) + { + rd.SBStatus = ServiceBusMessageStatusEnum.Queued; + } - else if (!_requestsQueue.Enqueue(rd, priority, userPriorityBoost, rd.EnqueueTime)) + } + catch (ProxyErrorException e) { notEnqued = true; - notEnquedCode = 429; + notEnquedCode = (int)e.StatusCode; - retrymsg = ed["Message"] = "Failed to enqueue request"; - logmsg = "Failed to enqueue request => 429:"; + logmsg = retrymsg = ed["Message"] = e.Message; } - - // ASYNC: If the request is allowed to run async, set the status - if (!notEnqued && doAsync) - { - rd.SBStatus = ServiceBusMessageStatusEnum.Queued; - } - - } - catch (ProxyErrorException e) - { - notEnqued = true; - notEnquedCode = (int)e.StatusCode; - - logmsg = retrymsg = ed["Message"] = e.Message; - } + } // end of allowed to proccess check } else { diff --git a/test/eventHub/writeToStorage/BlobWriter.cs b/test/eventHub/writeToStorage/BlobWriter.cs index 1c197a95..7d8eb7f3 100644 --- a/test/eventHub/writeToStorage/BlobWriter.cs +++ b/test/eventHub/writeToStorage/BlobWriter.cs @@ -1,4 +1,3 @@ -using Azure.Identity; using Azure.Storage.Blobs; using Microsoft.Extensions.Logging; using System.Text; @@ -11,13 +10,16 @@ namespace EventHubToStorage; public class BlobWriter { private readonly BlobServiceClient _blobServiceClient; + private readonly DefaultCredential _defaultCredential; + private readonly ILogger? _logger; public bool UsesMI { get; set; } - private BlobWriter(BlobServiceClient blobServiceClient, ILogger? logger = null) + private BlobWriter(BlobServiceClient blobServiceClient, DefaultCredential defaultCredential, ILogger? logger = null) { _blobServiceClient = blobServiceClient; + _defaultCredential = defaultCredential; _logger = logger; } @@ -29,7 +31,7 @@ public static BlobWriter CreateWithManagedIdentity(string storageAccountUri, ILo try { var blobServiceUri = new Uri(storageAccountUri); - var credential = new DefaultAzureCredential(); + var credential = _defaultCredential.Credential; var blobServiceClient = new BlobServiceClient(blobServiceUri, credential); var blobWriter = new BlobWriter(blobServiceClient, logger)