## Configuration Reference: appsettings.json (Kafka.Ksql.Linq)

Kafka.Ksql.Linq uses `appsettings.json` to configure Kafka, Schema Registry, ksqlDB, per-topic policies, and optional local caches. This reference reflects the `KsqlDslOptions` structure and related configuration classes.

---

### 1. Basic structure

```json
{
  "KsqlDsl": {
    "Common": { /* global Kafka settings */ },
    "Topics": { /* per-topic settings */ },
    "SchemaRegistry": { /* schema registry settings */ },
    "Entities": [ /* optional entity cache settings */ ],
    "DlqTopicName": "dead_letter_queue",
    "DlqOptions": { /* DLQ topic settings */ },
    "DeserializationErrorPolicy": "Skip|Retry|DLQ",
    "ReadFromFinalTopicByDefault": false,
    "KsqlDbUrl": "http://localhost:8088"
  }
}
```

---

### 1.1 Common (Kafka-wide settings)

| Key | Description |
|-----|-------------|
| `BootstrapServers` | Kafka broker endpoints |
| `ClientId` | Client identifier |
| `RequestTimeoutMs` | Operation timeout (ms) |
| `MetadataMaxAgeMs` | Metadata maximum age (ms) |
| `SecurityProtocol` | `Plaintext` / `SaslPlaintext` / `SaslSsl` / `Ssl` |
| `SaslMechanism` | e.g. `Plain`, `ScramSha256`, `ScramSha512` |
| `SaslUsername`, `SaslPassword` | SASL credentials |
| `SslCaLocation` | CA certificate path |
| `SslCertificateLocation` | Client certificate path |
| `SslKeyLocation` | Private key path |
| `SslKeyPassword` | Private key password |
| `AdditionalProperties` | Extra Kafka properties (key-value) |

```json
"Common": {
  "BootstrapServers": "localhost:9092",
  "ClientId": "ksql-dsl-client",
  "RequestTimeoutMs": 30000,
  "MetadataMaxAgeMs": 300000,
  "SecurityProtocol": "Plaintext",
  "SaslMechanism": "Plain",
  "SaslUsername": "user",
  "SaslPassword": "pass",
  "SslCaLocation": "/path/ca.pem",
  "SslCertificateLocation": "/path/cert.pem",
  "SslKeyLocation": "/path/key.pem",
  "SslKeyPassword": "secret",
  "AdditionalProperties": {}
}
```

---

### 1.2 Topics (per-topic settings)

Producer settings map to `Ksql.Linq.Configuration.Messaging.ProducerSection`, Consumer settings map to `ConsumerSection`, and structure settings map to `TopicCreationSection`.

```json
"Topics": {
  "my-topic": {
    "Producer": {
      "Acks": "All",
      "CompressionType": "Snappy",
      "EnableIdempotence": true,
      "MaxInFlightRequestsPerConnection": 1,
      "LingerMs": 5,
      "BatchSize": 16384,
      "DeliveryTimeoutMs": 120000,
      "RetryBackoffMs": 100,
      "Retries": 2147483647,
      "BufferMemory": 33554432,
      "Partitioner": null
    },
    "Consumer": {
      "GroupId": "my-group",
      "AutoOffsetReset": "Latest",
      "EnableAutoCommit": true,
      "AutoCommitIntervalMs": 5000,
      "SessionTimeoutMs": 30000,
      "HeartbeatIntervalMs": 3000,
      "MaxPollIntervalMs": 300000,
      "MaxPollRecords": 500,
      "FetchMinBytes": 1,
      "FetchMaxWaitMs": 500,
      "FetchMaxBytes": 52428800,
      "PartitionAssignmentStrategy": null,
      "IsolationLevel": "ReadUncommitted"
    },
    "Creation": {
      "NumPartitions": 1,
      "ReplicationFactor": 1,
      "Configs": {},
      "EnableAutoCreation": false
    }
  }
}
```

#### Dynamic topic configuration

For windowed/tumbling pipelines that generate timeframe topics (e.g., `bar_1m_live`), `KsqlDsl:Topics` settings are resolved with these rules:

- Priority: exact name first, then progressively shortened by underscores (e.g., `bar_1m_live` → `bar_1m` → `bar`)
- Suffixes like `.pub`/`.int` inherit structure from the base name; define `Creation` on the base, not on `.pub`/`.int`

Provide `Creation.NumPartitions`/`Creation.ReplicationFactor` and optional `Creation.Configs.retention.ms` at the base to cascade to related derivatives; specify a complete topic name to override only that derivative.

```json
"Topics": {
  "bar_1m": {
    "Creation": {
      "NumPartitions": 2,
      "Configs": { "retention.ms": "60000" }
    }
  },
  "bar_1m_live": {
    "Creation": {
      "NumPartitions": 3,
      "Configs": { "retention.ms": "120000" }
    }
  }
}
```

In the example, `bar_1m_live` inherits from `bar_1m` but is overridden by its complete-name entry.

---

### 1.4 Entities (table cache settings)

`KsqlDsl:Entities` maps to `Ksql.Linq.Configuration.EntityConfiguration`.

```json
"Entities": [
  {
    "Entity": "OrderEntity",
    "SourceTopic": "orders",
    "EnableCache": true,
    "StoreName": "orders_store",
    "BaseDirectory": "/var/lib/ksql_cache"
  }
]
```

| Key | Description |
|-----|-------------|
| `Entity` | Target POCO class name |
| `SourceTopic` | Source Kafka topic |
| `EnableCache` | Enable caching (bool) |
| `StoreName` | Cache name (defaults to topic-based name) |
| `BaseDirectory` | Root directory for RocksDB |

---

### 1.5 Validation

- `ValidationMode` is configurable and defaults to `Strict`.

---

### 1.6 DLQ settings

```json
"DlqTopicName": "dead_letter_queue",
"DlqOptions": {
  "RetentionMs": 5000,
  "NumPartitions": 1,
  "ReplicationFactor": 1,
  "EnableAutoCreation": true,
  "AdditionalConfigs": {
    "cleanup.policy": "delete"
  }
}
```

If unspecified, `DlqTopicName` defaults to `dead-letter-queue`.

| Key | Description |
|-----|-------------|
| `DlqTopicName` | DLQ topic name |
| `RetentionMs` | Message retention (ms) |
| `NumPartitions` | Partition count |
| `ReplicationFactor` | Replication factor |
| `EnableAutoCreation` | Automatically create topic |
| `AdditionalConfigs` | Extra topic configs |

---

### Mapping DSL to appsettings

In the keys below, `<name>` stands for the topic name.

| Kafka setting item | DSL specification | appsettings.json key | Notes |
|--------------------|-------------------|----------------------|-------|
| Bootstrap servers | n/a | `KsqlDsl:Common:BootstrapServers` | Kafka cluster |
| Schema Registry URL | n/a | `KsqlDsl:SchemaRegistry:Url` | Schema Registry |
| ksqlDB URL | n/a | `KsqlDsl:KsqlDbUrl` | ksqlDB REST endpoint |
| AutoOffsetReset | `.WithAutoOffsetReset(...)` | `KsqlDsl:Topics.<name>.Consumer.AutoOffsetReset` | `Earliest` / `Latest` |
| GroupId | `.WithGroupId(...)` | `KsqlDsl:Topics.<name>.Consumer.GroupId` | Consumer group ID |
| Topic name | `[KsqlTopic("orders")]` | `KsqlDsl:Topics.<name>` | Name comes from attribute/Fluent APIs |
| Partition count | Fluent or attribute | `KsqlDsl:Topics.<name>.Creation.NumPartitions` | DSL or config |
| Replication factor | n/a (config only) | `KsqlDsl:Topics.<name>.Creation.ReplicationFactor` | Cluster-dependent |
| DLQ configuration | `.OnError(ErrorAction.DLQ)` | `KsqlDsl:DlqTopicName`, `KsqlDsl:DlqOptions.*` | Retention, partitions, etc. |

---

### 2. Implementation example (MyKsqlContext, Order, OrderCount)

```csharp
// POCO mapped to the "orders" topic.
public class Order
{
    public string ProductId { get; set; }
    public decimal Amount { get; set; }
}

// OrderCount is a second POCO referenced below; its definition is not shown in this reference.

public class MyKsqlContext : KsqlContext
{
    protected override void OnModelCreating(KsqlModelBuilder modelBuilder)
    {
        // Consumer settings for Order; these mirror the appsettings.json entry below.
        modelBuilder.Entity<Order>()
            .WithGroupId("orders-consumer")
            .WithAutoOffsetReset(AutoOffsetReset.Earliest);

        // Consumer settings for OrderCount, read from the final topic.
        modelBuilder.Entity<OrderCount>()
            .WithGroupId("order-counts-consumer")
            .WithAutoOffsetReset(AutoOffsetReset.Latest)
            .UseFinalTopic();
    }
}
```

```json
{
  "KsqlDsl": {
    "Common": {
      "BootstrapServers": "localhost:9092",
      "ClientId": "app"
    },
    "SchemaRegistry": {
      "Url": "http://localhost:8081"
    },
    "KsqlDbUrl": "http://localhost:8088",
    "Topics": {
      "orders": {
        "Consumer": {
          "GroupId": "orders-consumer",
          "AutoOffsetReset": "Earliest"
        }
      }
    }
  }
}
```
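
#### Sketch: how the topic-name fallback could be evaluated

The lookup order described under "Dynamic topic configuration" (exact name, then progressively shortened at underscores, with `.pub`/`.int` derivatives falling back to the base name) can be illustrated with a small standalone program. This is only a sketch of the rule as stated in this reference, not the library's implementation: `TopicSettingsLookup`, `CandidateKeys`, and `ResolvePartitions` are hypothetical names, the suffix list is assumed from the two examples given, and the sketch returns the first matching entry rather than modelling any field-by-field merging the library may perform.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of Kafka.Ksql.Linq.
public static class TopicSettingsLookup
{
    // Assumption: only the ".pub"/".int" suffixes named in this reference are handled.
    private static readonly string[] DerivedSuffixes = { ".pub", ".int" };

    // Returns the keys to probe under KsqlDsl:Topics, most specific first.
    public static List<string> CandidateKeys(string topicName)
    {
        string name = topicName;

        // Derivatives such as "bar_1m_live.pub" fall back to their base name.
        foreach (string suffix in DerivedSuffixes)
        {
            if (name.EndsWith(suffix, StringComparison.Ordinal))
            {
                name = name.Substring(0, name.Length - suffix.Length);
                break;
            }
        }

        // Exact name first, then drop one trailing "_segment" at a time.
        var keys = new List<string>();
        while (true)
        {
            keys.Add(name);
            int cut = name.LastIndexOf('_');
            if (cut <= 0) break;
            name = name.Substring(0, cut);
        }
        return keys;
    }

    // Returns the value of the first configured candidate, or null if none matches.
    public static int? ResolvePartitions(
        IReadOnlyDictionary<string, int> configured, string topicName)
    {
        foreach (string key in CandidateKeys(topicName))
        {
            if (configured.TryGetValue(key, out int partitions)) return partitions;
        }
        return null;
    }
}

public static class Program
{
    public static void Main()
    {
        // Mirrors the NumPartitions values from the bar_1m / bar_1m_live example.
        var configured = new Dictionary<string, int>
        {
            ["bar_1m"] = 2,
            ["bar_1m_live"] = 3,
        };

        Console.WriteLine(string.Join(" -> ", TopicSettingsLookup.CandidateKeys("bar_1m_live")));
        // bar_1m_live -> bar_1m -> bar

        Console.WriteLine(TopicSettingsLookup.ResolvePartitions(configured, "bar_1m_live"));     // 3
        Console.WriteLine(TopicSettingsLookup.ResolvePartitions(configured, "bar_1m_final"));    // 2
        Console.WriteLine(TopicSettingsLookup.ResolvePartitions(configured, "bar_1m_live.pub")); // 3
    }
}
```

With the two entries from the earlier JSON example, `bar_1m_live` resolves to its own entry, while a derivative without a complete-name entry (the hypothetical `bar_1m_final`) falls back to `bar_1m`. A topic with no matching entry at any level, such as `bar_5m_live` here, yields `null` in this sketch.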