Configuration Reference
Kafka.Ksql.Linq uses appsettings.json to configure Kafka, Schema Registry, ksqlDB, per‑topic policies, and optional local caches. This reference reflects the KsqlDslOptions structure and related configuration classes.
{
"KsqlDsl": {
"Common": { /* global Kafka settings */ },
"Topics": { /* per-topic settings */ },
"SchemaRegistry": { /* schema registry settings */ },
"Entities": [ /* optional entity cache settings */ ],
"DlqTopicName": "dead_letter_queue",
"DlqOptions": { /* DLQ topic settings */ },
"DeserializationErrorPolicy": "Skip|Retry|DLQ",
"ReadFromFinalTopicByDefault": false,
"KsqlDbUrl": "http://localhost:8088"
}
}

| Key | Description |
|---|---|
| BootstrapServers | Kafka broker endpoints |
| ClientId | Client identifier |
| RequestTimeoutMs | Operation timeout (ms) |
| MetadataMaxAgeMs | Metadata maximum age (ms) |
| SecurityProtocol | Plaintext / SaslPlaintext / SaslSsl / Ssl |
| SaslMechanism | e.g. Plain, ScramSha256, ScramSha512 |
| SaslUsername, SaslPassword | SASL credentials |
| SslCaLocation | CA certificate path |
| SslCertificateLocation | Client certificate path |
| SslKeyLocation | Private key path |
| SslKeyPassword | Private key password |
| AdditionalProperties | Extra Kafka properties (key-value) |

"Common": {
"BootstrapServers": "localhost:9092",
"ClientId": "ksql-dsl-client",
"RequestTimeoutMs": 30000,
"MetadataMaxAgeMs": 300000,
"SecurityProtocol": "Plaintext",
"SaslMechanism": "Plain",
"SaslUsername": "user",
"SaslPassword": "pass",
"SslCaLocation": "/path/ca.pem",
"SslCertificateLocation": "/path/cert.pem",
"SslKeyLocation": "/path/key.pem",
"SslKeyPassword": "secret",
"AdditionalProperties": {}
}

Producer settings map to Ksql.Linq.Configuration.Messaging.ProducerSection, Consumer settings map to ConsumerSection, and topic structure settings (the Creation block) map to TopicCreationSection.

"Topics": {
"my-topic": {
"Producer": {
"Acks": "All",
"CompressionType": "Snappy",
"EnableIdempotence": true,
"MaxInFlightRequestsPerConnection": 1,
"LingerMs": 5,
"BatchSize": 16384,
"DeliveryTimeoutMs": 120000,
"RetryBackoffMs": 100,
"Retries": 2147483647,
"BufferMemory": 33554432,
"Partitioner": null
},
"Consumer": {
"GroupId": "my-group",
"AutoOffsetReset": "Latest",
"EnableAutoCommit": true,
"AutoCommitIntervalMs": 5000,
"SessionTimeoutMs": 30000,
"HeartbeatIntervalMs": 3000,
"MaxPollIntervalMs": 300000,
"MaxPollRecords": 500,
"FetchMinBytes": 1,
"FetchMaxWaitMs": 500,
"FetchMaxBytes": 52428800,
"PartitionAssignmentStrategy": null,
"IsolationLevel": "ReadUncommitted"
},
"Creation": {
"NumPartitions": 1,
"ReplicationFactor": 1,
"Configs": {},
"EnableAutoCreation": false
}
}
}

For windowed/tumbling pipelines that generate timeframe topics (e.g., bar_1m_live), KsqlDsl:Topics settings are resolved with these rules:
- Priority: exact name first, then progressively shortened by underscores (e.g., bar_1m_live → bar_1m → bar)
- Suffixes like .pub/.int inherit structure from the base name; define Creation on the base, not on .pub/.int

Provide Creation.NumPartitions/Creation.ReplicationFactor and optional Creation.Configs.retention.ms at the base to cascade to related derivatives; specify a complete topic name to override only that derivative.

"Topics": {
"bar_1m": {
"Creation": {
"NumPartitions": 2,
"Configs": { "retention.ms": "60000" }
}
},
"bar_1m_live": {
"Creation": {
"NumPartitions": 3,
"Configs": { "retention.ms": "120000" }
}
}
}

In the example, bar_1m_live inherits from bar_1m but is overridden by its complete‑name entry.

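The lookup order described above can be sketched in a few lines of C#. This is only an illustration of the candidate ordering, not the library's implementation: TopicConfigLookup, CandidateNames, and Resolve are hypothetical names, and the sketch assumes that .pub/.int suffixes are stripped before the underscore fallback starts. It does not model how individual Creation fields are merged between a base entry and a derivative.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Kafka.Ksql.Linq.
public static class TopicConfigLookup
{
    // Lists the KsqlDsl:Topics keys to try for a topic, most specific first.
    public static IEnumerable<string> CandidateNames(string topicName)
    {
        // Assumption: ".pub" / ".int" derivatives resolve through their base name.
        var name = topicName;
        foreach (var suffix in new[] { ".pub", ".int" })
        {
            if (name.EndsWith(suffix, StringComparison.Ordinal))
            {
                name = name.Substring(0, name.Length - suffix.Length);
                break;
            }
        }

        // Exact name first, then drop one trailing "_segment" at a time.
        while (true)
        {
            yield return name;
            var cut = name.LastIndexOf('_');
            if (cut <= 0) yield break;
            name = name.Substring(0, cut);
        }
    }

    // Returns the first configured key that matches, or null when none does.
    public static string? Resolve(string topicName, ICollection<string> configuredKeys)
    {
        foreach (var candidate in CandidateNames(topicName))
        {
            if (configuredKeys.Contains(candidate)) return candidate;
        }
        return null;
    }
}
```

With the two keys from the example (bar_1m and bar_1m_live), Resolve("bar_1m_live", keys) selects bar_1m_live, a topic such as bar_1m_final falls back to bar_1m, and bar_5m_live resolves to null because neither bar_5m nor bar is configured.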
KsqlDsl:Entities maps to Ksql.Linq.Configuration.EntityConfiguration.
"Entities": [
{
"Entity": "OrderEntity",
"SourceTopic": "orders",
"EnableCache": true,
"StoreName": "orders_store",
"BaseDirectory": "/var/lib/ksql_cache"
}
]

| Key | Description |
|---|---|
| Entity | Target POCO class name |
| SourceTopic | Source Kafka topic |
| EnableCache | Enable caching (bool) |
| StoreName | Cache name (defaults to topic-based name) |
| BaseDirectory | Root directory for RocksDB |

- ValidationMode is configurable and defaults to Strict.

"DlqTopicName": "dead_letter_queue",
"DlqOptions": {
"RetentionMs": 5000,
"NumPartitions": 1,
"ReplicationFactor": 1,
"EnableAutoCreation": true,
"AdditionalConfigs": {
"cleanup.policy": "delete"
}
}

If unspecified, DlqTopicName defaults to dead-letter-queue.

| Key | Description |
|---|---|
| DlqTopicName | DLQ topic name |
| RetentionMs | Message retention (ms) |
| NumPartitions | Partition count |
| ReplicationFactor | Replication factor |
| EnableAutoCreation | Automatically create topic |
| AdditionalConfigs | Extra topic configs |

| Kafka setting item | DSL specification | appsettings.json key | Notes |
|---|---|---|---|
| Bootstrap servers | - | KsqlDsl:Common:BootstrapServers | Kafka cluster |
| Schema Registry URL | - | KsqlDsl:SchemaRegistry:Url | Schema Registry |
| ksqlDB URL | - | KsqlDsl:KsqlDbUrl | ksqlDB REST endpoint |
| AutoOffsetReset | .WithAutoOffsetReset(...) | KsqlDsl:Topics.<topic>.Consumer.AutoOffsetReset | Earliest / Latest |
| GroupId | .WithGroupId(...) | KsqlDsl:Topics.<topic>.Consumer.GroupId | Consumer group ID |
| Topic name | [KsqlTopic("orders")] | KsqlDsl:Topics.<topic> | Name comes from attribute/Fluent APIs |
| Partition count | Fluent or attribute | KsqlDsl:Topics.<topic>.Creation.NumPartitions | DSL or config |
| Replication factor | - (configure) | KsqlDsl:Topics.<topic>.Creation.ReplicationFactor | Cluster‑dependent |
| DLQ configuration | .OnError(ErrorAction.DLQ) | KsqlDsl:DlqTopicName, KsqlDsl:DlqOptions.* | Retention, partitions, etc. |

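The colon-separated keys in the table follow the .NET configuration path convention. As a rough sketch, and assuming the application reads its settings through Microsoft.Extensions.Configuration (with the Microsoft.Extensions.Configuration.Json package referenced), individual values can be inspected as shown here. In an IConfiguration path every level is separated by a colon, so the sketch addresses the per-topic consumer group as KsqlDsl:Topics:<topic>:Consumer:GroupId. KsqlDslSettingsProbe and its members are hypothetical names used only for illustration; how Kafka.Ksql.Linq itself binds these values is not shown.

```csharp
#nullable enable
using System;
using System.IO;
using System.Text;
using Microsoft.Extensions.Configuration;

// Hypothetical helper, not part of Kafka.Ksql.Linq.
public static class KsqlDslSettingsProbe
{
    // Sample content in the appsettings.json shape used by this reference.
    public const string SampleJson = @"{
  ""KsqlDsl"": {
    ""Common"": { ""BootstrapServers"": ""localhost:9092"", ""ClientId"": ""app"" },
    ""KsqlDbUrl"": ""http://localhost:8088"",
    ""Topics"": {
      ""orders"": {
        ""Consumer"": { ""GroupId"": ""orders-consumer"", ""AutoOffsetReset"": ""Earliest"" }
      }
    }
  }
}";

    // Builds an IConfiguration from JSON text; a real application would
    // typically call AddJsonFile("appsettings.json") instead.
    public static IConfiguration Load(string json)
    {
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
        // Build() reads the stream, so it can be disposed afterwards.
        return new ConfigurationBuilder().AddJsonStream(stream).Build();
    }

    // Reads the consumer group for one topic; returns null when the key is absent.
    public static string? ConsumerGroupId(IConfiguration config, string topic) =>
        config[$"KsqlDsl:Topics:{topic}:Consumer:GroupId"];
}
```

Calling Load(SampleJson) and then reading config["KsqlDsl:Common:BootstrapServers"] yields localhost:9092, and ConsumerGroupId(config, "orders") yields orders-consumer. A topic with no entry returns null, which makes it easy to spot keys that are expected in appsettings.json but missing.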
public class Order
{
public string ProductId { get; set; }
public decimal Amount { get; set; }
}
public class MyKsqlContext : KsqlContext
{
protected override void OnModelCreating(KsqlModelBuilder modelBuilder)
{
modelBuilder.Entity<Order>()
.WithGroupId("orders-consumer")
.WithAutoOffsetReset(AutoOffsetReset.Earliest);
modelBuilder.Entity<OrderCount>()
.WithGroupId("order-counts-consumer")
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.UseFinalTopic();
}
}

{
"KsqlDsl": {
"Common": { "BootstrapServers": "localhost:9092", "ClientId": "app" },
"SchemaRegistry": { "Url": "http://localhost:8081" },
"KsqlDbUrl": "http://localhost:8088",
"Topics": {
"orders": {
"Consumer": {
"GroupId": "orders-consumer",
"AutoOffsetReset": "Earliest"
}
}
}
}
}