diff --git a/docs/quix-cloud/managed-services/dynamic-configuration.md b/docs/quix-cloud/managed-services/dynamic-configuration.md index a11eb8b5..90a6b394 100644 --- a/docs/quix-cloud/managed-services/dynamic-configuration.md +++ b/docs/quix-cloud/managed-services/dynamic-configuration.md @@ -132,14 +132,341 @@ This service can leverage a blob storage configured on our platform (see [blob s The blob storage configuration is automatically injected only when `contentStore` is set to `file`. -## SDK Integration +## API Reference -The **Quix Streams SDK** provides built-in functionality to: +The Dynamic Configuration Manager provides a REST API for managing configurations. The API is available at the service endpoint once deployed. -- Subscribe to configuration change events. -- Download and cache the latest configuration from the API. -- Join configuration values with live data streams at the right time. +### Base URL +``` +http://:/api/v1 +``` + +### Authentication +All API requests require authentication via the `Authorization` header: +``` +Authorization: Bearer +``` + +### Create Configuration + +Create a new configuration: + +```http +POST /api/v1/configurations +Content-Type: application/json + +{ + "metadata": { + "type": "device-config", + "target_key": "sensor-001", + "valid_from": "2024-01-01T00:00:00Z", + "category": "sensors" + }, + "content": { + "device": { + "name": "Temperature Sensor 001", + "model": "TS-2000", + "location": "Building A, Floor 2" + }, + "calibration": { + "offset": 0.5, + "scale": 1.02, + "last_calibrated": "2024-01-01T00:00:00Z" + }, + "firmware": { + "version": "2.1.3", + "features": ["temperature", "humidity"] + } + }, + "replace": false +} +``` + +### Request Body + +- **metadata.type** (string, required): Configuration type identifier +- **metadata.target_key** (string, required): Target key for configuration matching +- **metadata.valid_from** (ISO8601 datetime, optional): When this configuration becomes valid +- **metadata.category** (string, optional): Category for grouping configurations +- **content** (object, optional): The actual configuration data (JSON object) +- **replace** (boolean, optional): If true, creates a new version if configuration already exists (default: false) + +**Note:** The configuration `id` is automatically generated from `type` and `target_key` using SHA-1 hash. + +### Update Configuration + +Update an existing configuration (creates a new version): + +```http +PUT /api/v1/configurations/{id} +Content-Type: application/json + +{ + "metadata": { + "valid_from": "2024-01-15T00:00:00Z", + "category": "sensors" + }, + "content": { + "device": { + "name": "Temperature Sensor 001", + "model": "TS-2000", + "location": "Building A, Floor 3" + }, + "calibration": { + "offset": 0.3, + "scale": 1.01, + "last_calibrated": "2024-01-15T10:30:00Z" + } + } +} +``` + +### Request Body + +- **metadata.valid_from** (ISO8601 datetime, optional): Update when this configuration becomes valid +- **metadata.category** (string, optional): Update the category +- **content** (object, optional): Updated configuration data + +**Note:** Only fields you provide will be updated. Omitted fields remain unchanged. + +### Upload Binary Configuration + +For non-JSON configurations (firmware files, calibration data, etc.), use the file upload endpoint: + +```http +POST /api/v1/configurations/{id}/versions/{version}/content +Content-Type: multipart/form-data + +file: +``` + +**Note:** Binary content must be uploaded separately after creating the configuration metadata. The service automatically detects and stores binary content with appropriate `content_type`. + +### Search Configurations + +Search for configurations using various criteria: + +```http +GET /api/v1/configurations/search?type=device-config&target_key=sensor-001&limit=10&offset=0 +``` + +### Query Parameters + +- **type** (string, optional): Filter by configuration type +- **target_key** (string, optional): Filter by target key +- **category** (string, optional): Filter by category +- **limit** (integer, optional): Maximum number of results (default: 20) +- **offset** (integer, optional): Number of results to skip (default: 0) + +### Get Configuration + +Retrieve a specific configuration: + +```http +GET /api/v1/configurations/{id} +``` + +### Get Configuration Version + +Retrieve a specific version of a configuration: + +```http +GET /api/v1/configurations/{id}/versions/{version} +``` + +### Delete Configuration + +Delete a configuration (all versions): + +```http +DELETE /api/v1/configurations/{id} +``` + +### Storage Modes + +#### MongoDB Mode (Default) +- **Content Type**: JSON only +- **Size Limit**: 16 MB per configuration +- **Use Case**: Structured configuration data +- **Setup**: Configure MongoDB connection parameters + +#### File Mode (Blob Storage) +- **Content Type**: Any binary data +- **Size Limit**: Depends on blob storage provider +- **Use Case**: Large files, firmware, binary data +- **Setup**: Configure blob storage credentials + +To use file mode, set `contentStore: file` in your deployment configuration. + +## Using with Quix Streams join_lookup + +The Dynamic Configuration Manager integrates seamlessly with Quix Streams' `join_lookup` feature to enrich streaming data with configuration data in real-time. + +### Basic Integration + +```python +from quixstreams import Application +from quixstreams.dataframe.joins.lookups import QuixConfigurationService + +# Initialize the application +app = Application() + +# Create a lookup instance pointing to your configuration topic +lookup = QuixConfigurationService( + topic=app.topic("device-configurations"), + app_config=app.config +) + +# Create your main data stream +sdf = app.dataframe(app.topic("sensor-data")) + +# Enrich sensor data with device configuration +sdf = sdf.join_lookup( + lookup=lookup, + fields={ + "device_name": lookup.json_field( + jsonpath="$.device.name", + type="device-config" + ), + "calibration_params": lookup.json_field( + jsonpath="$.calibration", + type="device-config" + ), + "firmware_version": lookup.json_field( + jsonpath="$.firmware.version", + type="device-config" + ) + }, + on="device_id" # The field to match on +) + +# Process the enriched data +sdf = sdf.apply(lambda value: { + **value, + "device_info": f"{value['device_name']} (v{value['firmware_version']})" +}) + +# Output to destination topic +sdf.to_topic(app.topic("enriched-sensor-data")) + +if __name__ == "__main__": + app.run() +``` + +### Advanced Configuration Matching + +Use custom key matching logic for complex scenarios: + +```python +def custom_key_matcher(value, key): + """Custom logic to determine configuration key""" + device_type = value.get("device_type", "unknown") + location = value.get("location", "default") + return f"{device_type}-{location}" + +# Use custom key matching +sdf = sdf.join_lookup( + lookup=lookup, + fields={ + "config": lookup.json_field( + jsonpath="$", + type="location-config" + ) + }, + on=custom_key_matcher +) +``` + +### Binary Configuration Support + +For non-JSON configurations (firmware files, calibration data, etc.): + +```python +sdf = sdf.join_lookup( + lookup=lookup, + fields={ + "firmware_binary": lookup.bytes_field( + type="firmware" + ), + "calibration_data": lookup.bytes_field( + type="calibration" + ) + }, + on="device_id" +) +``` + +### How join_lookup Works with Dynamic Configuration + +1. **Configuration Events**: When configurations are updated via the API, lightweight Kafka events are published to your configuration topic. + +2. **Real-time Processing**: The `join_lookup` feature listens to these events, fetches the latest configuration content, and caches it locally. + +3. **Stream Enrichment**: As your main data stream processes records, `join_lookup` automatically enriches each record with the appropriate configuration data based on the matching key and timestamp. + +4. **Version Management**: The system automatically handles configuration versioning, ensuring that each record is enriched with the configuration version that was valid at the time the record was created. + +5. **Performance Optimization**: Local caching minimizes API calls and reduces latency for high-throughput applications. + +### Advanced Use Cases + +#### Custom Target Key Matching + +For complex matching logic that goes beyond simple field matching: + +```python +class CustomLookup(QuixConfigurationService): + def _config_id(self, type: str, key: str) -> str: + """Override to implement custom ID generation""" + # Custom logic for generating configuration IDs + if type == "device-config": + # Extract device type and location from key + parts = key.split("-") + device_type = parts[0] + location = parts[1] if len(parts) > 1 else "default" + return f"{device_type}-{location}" + return super()._config_id(type, key) + +# Use custom lookup +custom_lookup = CustomLookup( + topic=app.topic("device-configurations"), + app_config=app.config +) +``` + +#### Multi-Type Configuration Enrichment + +Enrich with multiple configuration types: + +```python +sdf = sdf.join_lookup( + lookup=lookup, + fields={ + # Device configuration + "device_info": lookup.json_field( + jsonpath="$.device", + type="device-config" + ), + # Location configuration + "location_info": lookup.json_field( + jsonpath="$.location", + type="location-config" + ), + # Calibration data + "calibration": lookup.json_field( + jsonpath="$", + type="calibration-config" + ) + }, + on="device_id" +) +``` + +### Benefits -👉 See [SDK -documentation](../../quix-streams/api-reference/dataframe.html#streamingdataframejoin_lookup) -for details on `join_lookup` and related features. +- **Real-time Updates**: Configuration changes are immediately available to your streaming applications +- **Large File Support**: Handle configuration files too large for direct Kafka streaming +- **Version Control**: Automatic versioning ensures data consistency +- **Performance**: Local caching minimizes API calls and latency +- **Flexibility**: Support for both JSON and binary configuration content +- **Scalability**: Efficient handling of high-throughput data streams