API Workflow

synthaicode edited this page Oct 25, 2025 · 2 revisions

Purpose: a concise checklist for getting started from code: Build → Register → Produce/Consume → View → DLQ (dead-letter queue). For detailed APIs, see API-Reference.

1) Build the context (load configuration)

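using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

// Load settings from appsettings.json.
var configuration = new ConfigurationBuilder()
  .AddJsonFile("appsettings.json").Build();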

var ctx = KsqlContextBuilder.Create()
  .UseConfiguration(configuration)
  .UseSchemaRegistry(configuration["KsqlDsl:SchemaRegistry:Url"]!)
  .EnableLogging(LoggerFactory.Create(b => b.AddConsole()))
  .BuildContext<MyAppContext>();
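
For orientation, here is a minimal sketch of the appsettings.json fragment that the snippet above reads. Only the key path KsqlDsl:SchemaRegistry:Url is taken from the code; the URL value is a placeholder assumption for a local Schema Registry, and any other settings the library expects are omitted here (see Appsettings for the full layout).

```json
{
  "KsqlDsl": {
    "SchemaRegistry": {
      // Placeholder value (assumption): replace with your Schema Registry address.
      "Url": "http://localhost:8081"
    }
  }
}
```

The nested sections map to the colon-separated key used in configuration["KsqlDsl:SchemaRegistry:Url"].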

2) Register entities (model)

[KsqlTopic("basic-produce-consume")]
public class BasicMessage
{
  public int Id { get; set; }
  [KsqlTimestamp] public DateTime CreatedAt { get; set; }
  public string Text { get; set; } = string.Empty;
}

// Inside MyAppContext: register the entity with the model.
protected override void OnModelCreating(IModelBuilder b)
  => b.Entity<BasicMessage>();

3) Produce and consume

await ctx.Set<BasicMessage>().AddAsync(new BasicMessage
{
  Id = Random.Shared.Next(),
  CreatedAt = DateTime.UtcNow,
  Text = "Basic Flow"
});

await ctx.Set<BasicMessage>().ForEachAsync(m =>
{
  Console.WriteLine($"Consumed: {m.Text}");
  return Task.CompletedTask;
});

4) Define a view (ToQuery)

// Inside OnModelCreating; modelBuilder is the IModelBuilder parameter (named b in step 2).
modelBuilder.Entity<OrderSummary>().ToQuery(q => q
  .From<Order>()
  .Join<Customer>((o, c) => o.CustomerId == c.Id)
  .Where((o, c) => c.IsActive)
  .Select((o, c) => new OrderSummary { OrderId = o.Id, CustomerName = c.Name }));

5) Check DLQ (read failures)

await foreach (var rec in ctx.Dlq.ReadAsync())
{
  Console.WriteLine(rec.RawText);
}

See also: API-Reference, Appsettings, Produce-Consume-and-DLQ