-
Notifications
You must be signed in to change notification settings - Fork 0
Kafka Ksql Linq User Guide
synthaicode edited this page Nov 1, 2025
·
7 revisions
This guide walks you from zero to a working setup with POCO/DSL, messaging, viewing, operations (DLQ/monitoring), and appsettings (including Tumbling topic settings).
-
[KsqlTopic],[KsqlTimestamp],[KsqlTable], and where needed:[KsqlKey],[KsqlDecimal] - Define POCOs in your project and keep application logic separate (immutability optional)
[KsqlTopic("basic-produce-consume")]
public class BasicMessage
{
public int Id { get; set; }
[KsqlTimestamp] public DateTime CreatedAt { get; set; }
public string Text { get; set; } = string.Empty;
}
[KsqlTable("hourly-counts")]
public class HourlyCount
{
[KsqlKey] public int Hour { get; set; }
public long Count { get; set; }
}
public class MyAppContext : KsqlContext
{
public EventSet<BasicMessage> BasicMessages { get; set; }
public EventSet<HourlyCount> HourlyCounts { get; set; }
}Express CTAS queries with a LINQ projection:
modelBuilder.Entity<OrderSummary>().ToQuery(q => q
.From<Order>()
.Join<Customer>((o, c) => o.CustomerId == c.Id)
.Where((o, c) => c.IsActive)
.Select((o, c) => new OrderSummary { OrderId = o.Id, CustomerName = c.Name }));Tumbling + aggregation example:
modelBuilder.Entity<HourlyCount>().ToQuery(q => q
.From<BasicMessage>()
.Tumbling(m => m.CreatedAt, new Windows { Hours = new[] { 1 } })
.GroupBy(m => m.CreatedAt.Hour)
.Select(g => new HourlyCount { Hour = g.Key, Count = g.Count() }));await ctx.BasicMessages.AddAsync(new BasicMessage
{
Id = Random.Shared.Next(),
CreatedAt = DateTime.UtcNow,
Text = "Basic Flow"
});
await ctx.BasicMessages.ForEachAsync(async m =>
{
Console.WriteLine($"Consumed: {m.Text}");
await Task.CompletedTask;
});Header/metadata + manual commit (autoCommit: false):
var headers = new Dictionary<string, string> { ["source"] = "api" };
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await ctx.BasicMessages.AddAsync(
new BasicMessage { Id = 1, CreatedAt = DateTime.UtcNow, Text = "With header" },
headers,
cts.Token);
await ctx.BasicMessages.ForEachAsync(
(m, h, meta) =>
{
Console.WriteLine($"{meta.Topic}:{meta.Offset} => {h["source"]}");
ctx.BasicMessages.Commit(m);
return Task.CompletedTask;
},
timeout: TimeSpan.FromSeconds(10),
autoCommit: false);Define views using ToQuery, and use From/Join/Where/Select idioms:
modelBuilder.Entity<OrderSummary>().ToQuery(q => q
.From<Order>()
.Join<Customer>((o, c) => o.CustomerId == c.Id)
.Where((o, c) => c.IsActive)
.Select((o, c) => new OrderSummary { OrderId = o.Id, CustomerName = c.Name }));Tumbling with aggregation:
modelBuilder.Entity<HourlyCount>().ToQuery(q => q
.From<BasicMessage>()
.Tumbling(m => m.CreatedAt, new Windows { Hours = new[] { 1 } })
.GroupBy(m => m.CreatedAt.Hour)
.Select(g => new HourlyCount { Hour = g.Key, Count = g.Count() }));- Use
.OnError(ErrorAction.DLQ)to route failures to DLQ; verify viactx.Dlq.ReadAsync/ForEachAsync - Note: DLQ stores raw payloads; for transactional/stateful cases, combine with topic design and retries
await ctx.BasicMessages
.OnError(ErrorAction.DLQ)
.ForEachAsync(_ => throw new Exception("fail"));
await ctx.Dlq.ForEachAsync(r => { Console.WriteLine(r.RawText); return Task.CompletedTask; });Set at least BootstrapServers / SchemaRegistry.Url / KsqlDbUrl.
{
"KsqlDsl": {
"Common": { "BootstrapServers": "localhost:9092", "ClientId": "app" },
"SchemaRegistry": { "Url": "http://localhost:8085" },
"KsqlDbUrl": "http://localhost:8088",
"DlqTopicName": "dead-letter-queue",
"DeserializationErrorPolicy": "DLQ"
}
}Use KsqlDsl:Topics.<name>.* to customize producers/consumers and topic creation.
{
"KsqlDsl": {
"Topics": {
"basic-produce-consume": {
"Producer": { "Acks": "All" },
"Consumer": { "GroupId": "custom-group", "AutoOffsetReset": "Earliest" },
"Creation": { "NumPartitions": 3, "ReplicationFactor": 1 }
}
}
}
}Base topics retain long history; live topics keep short windows.
{
"KsqlDsl": {
"Topics": {
"bar_1m": {
"Creation": {
"NumPartitions": 3,
"ReplicationFactor": 1,
"Configs": {
"retention.ms": "3600000",
"cleanup.policy": "delete"
}
}
},
"bar_1m_live": {
"Creation": {
"NumPartitions": 2,
"ReplicationFactor": 1,
"Configs": { "retention.ms": "300000" }
}
}
}
}
}- Base (
bar_1m) retains; Live (bar_1m_live) is short‑retention for current windows - If using 1s rows (
<base>_1s_rows), it is typically short‑retention (for check/monitoring) - Local state (RocksDB) is used for tables/time buckets; see Tumbling
For SASL_SSL and other advanced settings, see Appsettings.
- DI/Logging, cancellation/timeouts, scaling, and health checks follow standard .NET practices
- For Schema Registry compatibility (FORWARD/STRICT), manage changes via POCO definitions
- API and capabilities: KSQL-API
- Tumbling basics: Tumbling
- Examples: Examples
- Configuration: Configuration-Reference, Appsettings
Guide
Core Concepts
Tumbling
- Tumbling-Overview
- Tumbling-Definition
- Tumbling-Consumption
- Tumbling-Topics-Config
- Tumbling-State-Store
- Tumbling-Schedule-Last
- Tumbling-Migration
Operations
- Produce-Consume-and-DLQ
- Operations-Startup-and-Monitoring (Index)
- Operations-Startup
- Lag-Monitoring-and-Tuning
- Streamiz-Clear
- Appsettings
- Examples
Reference