Skip to content

Quick Start

synthaicode edited this page Oct 25, 2025 · 4 revisions

Prerequisites

  • .NET 8 SDK
  • Kafka / Schema Registry / ksqlDB running

What you will do

  • Load configuration (BootstrapServers / SchemaRegistry.Url / KsqlDbUrl) and create KsqlContext
  • Register entities with Entity<T>() and register Avro schemas in Schema Registry
  • Apply required KSQL DDL via CREATE / CREATE IF NOT EXISTS
  • Produce with AddAsync and consume with ForEachAsync (Push)

Minimal appsettings.json

{
  "KsqlDsl": {
    "Common": { "BootstrapServers": "localhost:9092", "ClientId": "app" },
    "SchemaRegistry": { "Url": "http://localhost:8081" },
    "KsqlDbUrl": "http://localhost:8088",
    "DlqTopicName": "dead-letter-queue",
    "DeserializationErrorPolicy": "DLQ"
  }
}

Minimal code (produce/consume)

using Ksql.Linq;
using Ksql.Linq.Core.Attributes;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

[KsqlTopic("quickstart-basic")]
public class Hello { public int Id { get; set; } public string Text { get; set; } = ""; }

public class AppCtx : KsqlContext
{
  public AppCtx(IConfiguration cfg, ILoggerFactory? lf=null) : base(cfg, lf) {}
  public EventSet<Hello> Hellos { get; set; }
  protected override void OnModelCreating(IModelBuilder b) => b.Entity<Hello>();
}

var cfg = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
await using var ctx = new AppCtx(cfg, LoggerFactory.Create(b => b.AddConsole()));

await ctx.Hellos.AddAsync(new Hello { Id = 1, Text = "Hello Ksql.Linq" });
await ctx.Hellos.ForEachAsync(m => { Console.WriteLine(m.Text); return Task.CompletedTask; });

Run

  1. Save appsettings.json
  2. Put the code into Program.cs
  3. dotnet run

Next steps

Clone this wiki locally