API Reference

synthaicode edited this page Oct 25, 2025 · 9 revisions

This page catalogs commonly used APIs. It is not exhaustive, but aims to make core APIs discoverable quickly. For a step‑by‑step flow, see API-Workflow.

  • Purpose: list key attributes, builder, fluent DSL, messaging, time buckets, and core interfaces
  • Quick start path: see API-Workflow

Attributes

  • [KsqlTopic(name)]: Bind an entity to a Kafka topic
    • Params: name (required topic name)
    • Optional: PartitionCount (default 1), ReplicationFactor (default 1)
    • Examples: [KsqlTopic("orders")], [KsqlTopic("orders", PartitionCount=3, ReplicationFactor=2)]
  • [KsqlTimestamp]: Mark the event time (maps to KSQL TIMESTAMP)
    • Types: DateTime / DateTimeOffset (use UTC)
  • [KsqlDecimal(precision, scale)]: Avro decimal mapping
  • [KsqlKey(order)]: Mark primary‑key properties (use order for composite keys)
  • [KsqlIgnore]: Exclude a property from schema/message
  • [KsqlTable]: Treat the entity as a TABLE (default is STREAM)
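
The attributes above might be combined on one entity roughly as follows. This is a sketch, not the library's own sample: the Order entity and its property names are hypothetical, and the attribute classes at the bottom are local stand-ins that mirror the signatures listed above so the snippet compiles without the package. In a real project, delete the stand-ins and reference the library's attributes instead. The page does not say whether [KsqlKey(order)] numbering starts at 0 or 1; only the relative order is relied on here.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Read the metadata back via reflection to show what the annotations carry.
var topic = typeof(Order).GetCustomAttribute<KsqlTopicAttribute>()
            ?? throw new InvalidOperationException("Order has no [KsqlTopic]");
Console.WriteLine($"{topic.Name}: partitions={topic.PartitionCount}, replicas={topic.ReplicationFactor}");
// prints "orders: partitions=3, replicas=2"

// Composite key parts, sorted by the order given to [KsqlKey].
var keyNames = typeof(Order).GetProperties()
    .Where(p => p.IsDefined(typeof(KsqlKeyAttribute)))
    .OrderBy(p => p.GetCustomAttribute<KsqlKeyAttribute>()!.Order)
    .Select(p => p.Name)
    .ToArray();
Console.WriteLine(string.Join(", ", keyNames));
// prints "Broker, Symbol"

// Hypothetical entity bound to the "orders" topic.
// Without [KsqlTable] it is treated as a STREAM (the default per the list above).
[KsqlTopic("orders", PartitionCount = 3, ReplicationFactor = 2)]
public class Order
{
    [KsqlKey(0)] public string Broker { get; set; } = "";
    [KsqlKey(1)] public string Symbol { get; set; } = "";
    [KsqlDecimal(18, 4)] public decimal Amount { get; set; }
    [KsqlTimestamp] public DateTime OccurredAtUtc { get; set; }   // event time, kept in UTC
    [KsqlIgnore] public string DebugNote { get; set; } = "";      // excluded from schema/message
}

// --- Stand-in attribute declarations (assumption, NOT the library's code) ---
[AttributeUsage(AttributeTargets.Class)]
public sealed class KsqlTopicAttribute : Attribute
{
    public KsqlTopicAttribute(string name) { Name = name; }
    public string Name { get; }
    public int PartitionCount { get; set; } = 1;
    public int ReplicationFactor { get; set; } = 1;
}

[AttributeUsage(AttributeTargets.Property)]
public sealed class KsqlKeyAttribute : Attribute
{
    public KsqlKeyAttribute(int order) { Order = order; }
    public int Order { get; }
}

[AttributeUsage(AttributeTargets.Property)]
public sealed class KsqlDecimalAttribute : Attribute
{
    public KsqlDecimalAttribute(int precision, int scale) { Precision = precision; Scale = scale; }
    public int Precision { get; }
    public int Scale { get; }
}

[AttributeUsage(AttributeTargets.Property)]
public sealed class KsqlTimestampAttribute : Attribute { }

[AttributeUsage(AttributeTargets.Property)]
public sealed class KsqlIgnoreAttribute : Attribute { }
```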

Builder API

  • KsqlContextBuilder.Create()
  • .UseConfiguration(IConfiguration cfg)
  • .UseSchemaRegistry(string url)
  • .EnableLogging(ILoggerFactory lf)
  • .BuildContext<TContext>() -> IKsqlContext

Fluent/LINQ API

  • ModelBuilder.Entity<T>(readOnly=false, writeOnly=false): register an entity
  • .ToQuery(Func<IQueryBuilder,IQueryBuilder> build): declare a view
  • Query methods: From<TSource>(), Join<TRight>(expr), Where(expr), Select(selector)
  • Streaming handler: ctx.Set<T>().Where(...).ForEachAsync(...)

Messaging APIs

  • IKsqlContext.Set<T>() -> IEventSet<T>: get an event set
  • IEventSet<T>.AddAsync(T entity, CancellationToken? ct = null): produce
  • IEventSet<T>.ForEachAsync(Func<T,Task> handler, CancellationToken? ct = null): consume (push)
  • Handler helpers: .OnError(...), .WithRetry(...), .Commit(...) (for manual commit)

TimeBucket

  • Read: TimeBucket.ReadAsync<T>(ctx, Period.Minutes(n), string[] keys, CancellationToken ct)
  • Write: TimeBucket.WriteAsync<T>(ctx, Period.Minutes(n), T row, CancellationToken ct = default)
  • Notes: common frames include 1s, 1m, and 5m. 1s rows are stored in the base rows topic (e.g., {topic}_1s_rows).

Key filter rules (ReadAsync/ToListAsync)

  • Order matters: keys must follow the POCO primary‑key order (declaration order or [KsqlKey(order)]).
    • Example: if the key is [BROKER, SYMBOL], provide keys as new[] { "X", "USDJPY" }.
    • Prefix filters: you may pass a left‑prefix only, e.g., new[] { "X" } to select by the first key only.
  • Full scan: passing no keys selects all keys (use with care).
  • Types: provide strings that map predictably to your key parts (avoid binary/opaque formats).
  • Prefix rule: partial filters must be left‑to‑right prefixes; middle or suffix‑only filters are not supported.
  • Convenience: TimeBucket.Get<T>(ctx, period).WithKeys(keys).ToListAsync() is available to build and execute reads fluently.
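
The rules above amount to left-prefix matching over the ordered key parts. The sketch below illustrates that rule on in-memory data only. It does not call the library and is not how ReadAsync is implemented; the helper names (MatchesPrefix, CountMatches) and the sample rows are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stored rows keyed by [BROKER, SYMBOL].
var rows = new List<string[]>
{
    new[] { "X", "USDJPY" },
    new[] { "X", "EURUSD" },
    new[] { "Y", "USDJPY" },
};

// Full key: both parts, in primary-key order.
Console.WriteLine(CountMatches(rows, new[] { "X", "USDJPY" })); // 1
// Left prefix: first key part only.
Console.WriteLine(CountMatches(rows, new[] { "X" }));           // 2
// No keys: full scan, every row matches.
Console.WriteLine(CountMatches(rows, Array.Empty<string>()));   // 3
// Wrong order: read as BROKER = "USDJPY", so nothing matches.
Console.WriteLine(CountMatches(rows, new[] { "USDJPY", "X" })); // 0

// A filter matches when each of its parts equals the row's key part
// at the same position, starting from the left.
bool MatchesPrefix(string[] rowKey, string[] filter) =>
    filter.Length <= rowKey.Length &&
    filter.Select((part, i) => part == rowKey[i]).All(ok => ok);

int CountMatches(List<string[]> source, string[] filter) =>
    source.Count(r => MatchesPrefix(r, filter));
```

Because matching is positional, a filter on SYMBOL alone cannot be expressed: any single-element filter is compared against BROKER.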

Core interfaces

  • IKsqlContext: application context for KSQL operations
  • IEventSet<T>: produce/consume APIs for events
  • IDlqClient: read access to DLQ
  • ITableCache<T>: local table cache (when enabled)

Configuration keys (appsettings.json)

  • KsqlDsl.Common.BootstrapServers: Kafka bootstrap servers
  • KsqlDsl.SchemaRegistry.Url: Schema Registry URL
  • KsqlDsl.KsqlDbUrl: ksqlDB URL
  • KsqlDsl.DlqTopicName: DLQ topic name
  • KsqlDsl.DeserializationErrorPolicy: policy for deserialization failures
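
As a sketch, the keys above could appear in appsettings.json as shown below, assuming each dot in a key name denotes one level of JSON nesting. All values are placeholders for a local development setup (the ports are the usual Kafka, Schema Registry, and ksqlDB defaults), the DLQ topic name is hypothetical, and the accepted policy values are not listed on this page, so that entry is left as a placeholder; see Appsettings for the authoritative layout.

```json
{
  "KsqlDsl": {
    "Common": {
      "BootstrapServers": "localhost:9092"
    },
    "SchemaRegistry": {
      "Url": "http://localhost:8081"
    },
    "KsqlDbUrl": "http://localhost:8088",
    "DlqTopicName": "dead-letter-queue",
    "DeserializationErrorPolicy": "<policy>"
  }
}
```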

DLQ record shape

public sealed class DlqRecord
{
  public string SourceTopic { get; init; } = "";
  public string ErrorCode  { get; init; } = "";
  public string RawText    { get; init; } = "";
}
  • Typical usage: inspect RawText, correlate with ErrorCode, and track origin using SourceTopic.
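
One way to do that triage is to group records by origin and error code and keep one raw sample per group. The sketch below works on an in-memory list and assumes nothing about how records are fetched through IDlqClient; the sample records and error-code strings are hypothetical, and DlqRecord is redeclared only so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample records; in practice these would be read from the DLQ.
var records = new List<DlqRecord>
{
    new() { SourceTopic = "orders",   ErrorCode = "DESERIALIZE", RawText = "{bad json" },
    new() { SourceTopic = "orders",   ErrorCode = "DESERIALIZE", RawText = "<xml/>" },
    new() { SourceTopic = "payments", ErrorCode = "HANDLER",     RawText = "{\"id\":7}" },
};

// Correlate by (SourceTopic, ErrorCode) and keep one RawText sample per group.
var summary = records
    .GroupBy(r => (r.SourceTopic, r.ErrorCode))
    .Select(g => new
    {
        g.Key.SourceTopic,
        g.Key.ErrorCode,
        Count = g.Count(),
        Sample = g.First().RawText,
    })
    .OrderByDescending(s => s.Count)
    .ToList();

foreach (var s in summary)
    Console.WriteLine($"{s.SourceTopic} / {s.ErrorCode}: {s.Count} record(s), e.g. {s.Sample}");

// Same shape as the DlqRecord shown above.
public sealed class DlqRecord
{
    public string SourceTopic { get; init; } = "";
    public string ErrorCode   { get; init; } = "";
    public string RawText     { get; init; } = "";
}
```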

KSQL view example

modelBuilder.Entity<OrderView>().ToQuery(q => q
  .From<Order>()
  .Where(o => o.Amount > 0)
  .Select(o => new OrderView { Id = o.Id, Amount = o.Amount }));

Generates roughly:

CREATE STREAM OrderView AS
SELECT Id, Amount
FROM Order
WHERE Amount > 0;

See also: API-Workflow, Appsettings, Produce-Consume-and-DLQ, Tumbling-Overview
