Operations Startup

synthaicode edited this page Oct 25, 2025 · 3 revisions

Purpose: phased steps for safely bringing up a Ksql.Linq app. Covers Schema Registry registration and the initial DDL; monitoring and tuning are documented elsewhere.

Phases (steps)

  • Phase 0: register models (Set<T>() registration, type definitions)
  • Phase 1: register schemas (Schema Registry)
    • Register Avro key/value schemas for all entities
    • Check schema compatibility settings and type mappings (e.g., decimal)
  • Phase 2: base DDL (KSQL)
    • Create Kafka topics (num.partitions / replication.factor)
    • Run CREATE STREAM/TABLE in ksqlDB
  • Phase 3: execution DDL (KSQL)
    • Run CREATE TABLE AS SELECT (CTAS), or CREATE STREAM followed by INSERT INTO
    • Verify that each query is in the RUNNING state and that records flow into the target topics
  • Note (rows_last init)
    • If <base>_1s_rows exists, ensure <base>_1s_rows_last also exists (create it if missing)
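
As a rough illustration of Phases 2 and 3, the statements below sketch a base stream and a CTAS built on top of it. The names `orders` and `order_counts`, the column list, and the partition/replica counts are hypothetical. In a real app the DDL comes from the registered models, so this is only the general shape, not what Ksql.Linq necessarily emits.

```sql
-- Phase 2 (base DDL): hypothetical stream over an Avro topic.
-- PARTITIONS/REPLICAS let ksqlDB create the topic when it does not exist yet.
CREATE STREAM IF NOT EXISTS orders (
  order_id    VARCHAR KEY,
  customer_id VARCHAR,
  amount      DECIMAL(18, 2)
) WITH (
  KAFKA_TOPIC  = 'orders',
  KEY_FORMAT   = 'AVRO',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS   = 1,
  REPLICAS     = 1
);

-- Phase 3 (execution DDL): hypothetical CTAS; this starts a persistent query.
CREATE TABLE IF NOT EXISTS order_counts AS
  SELECT customer_id,
         COUNT(*) AS order_count
  FROM orders
  GROUP BY customer_id
  EMIT CHANGES;
```

IF NOT EXISTS keeps both statements safe to re-run on restart, which suits a phased startup where earlier phases may already have completed.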

Verification commands

  • SHOW STREAMS; / SHOW TABLES;
  • DESCRIBE <name>;
  • SELECT * FROM <name> EMIT CHANGES LIMIT 1;

Startup Fill (considerations)

  • Explicitly register the TABLEs to be monitored; verify Kafka/ksqlDB connectivity and permissions, and confirm that the base DDL has been applied
  • Perform the initial “fill” and create rows_last if it is missing
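
The create-if-missing step for rows_last could look roughly like the statement below. Everything in it is an assumption for illustration: the base name `ticks`, the columns `symbol` and `price`, and the use of LATEST_BY_OFFSET to keep the most recent row per key. The DDL that Ksql.Linq actually generates may differ.

```sql
-- Assumption: ticks_1s_rows is an existing stream with columns symbol and price.
-- IF NOT EXISTS turns the statement into a no-op when the table is already there.
CREATE TABLE IF NOT EXISTS ticks_1s_rows_last AS
  SELECT symbol,
         LATEST_BY_OFFSET(price) AS last_price
  FROM ticks_1s_rows
  GROUP BY symbol
  EMIT CHANGES;
```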

Skipping Schema Registry (option)

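  • If Schema Registry (SR) is managed out of band, have the context skip registration
using Ksql.Linq; // assumed namespace for KsqlContext
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

public class AppContext : KsqlContext
{
    protected override bool SkipSchemaRegistration => true; // skip SR registration and related DDL
    public AppContext(IConfiguration cfg, ILoggerFactory? lf = null) : base(cfg, lf) { }
}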
  • Additional options
var options = new KsqlContextOptions()
    .UseSchemaRegistry(new SchemaRegistryConfig { Url = "http://unused" })
    .ConfigureValidation(autoRegister: false, failOnErrors: false);

DDL generation (examples)

  • Generate CREATE STREAM/TABLE for a simple entity
var gen = new Ksql.Linq.Query.Pipeline.DDLQueryGenerator();
var model = ctx.ModelRegistry.Get(typeof(BasicMessage));
var provider = new EntityModelDdlAdapter(model);
var ddl = model.StreamTableType == StreamTableType.Table
    ? gen.GenerateCreateTable(provider)
    : gen.GenerateCreateStream(provider);
Console.WriteLine(ddl);
  • Generate execution DDL (CTAS/INSERT)
var ddl2 = KsqlCreateStatementBuilder.Build(
    model.GetTopicName(), model.QueryModel!, model.KeySchemaFullName, model.ValueSchemaFullName,
    t => ctx.ResolveTopicName(t));
Console.WriteLine(ddl2);

Notes

  • When skipping SR registration, make sure the referenced subjects already exist in Schema Registry and that the DDL matches them before execution
  • Keep SR/client initialization minimal when skipping (e.g., a dummy URL)
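
One way to check that a subject exists and that the DDL lines up with it is ksqlDB's schema inference. When a stream is declared over an Avro topic without a column list, ksqlDB loads the value columns from the `<topic>-value` subject and rejects the statement if that subject is missing. The stream and topic names below are hypothetical.

```sql
-- Assumption: the subject orders-value was registered out of band.
-- No value columns are declared, so ksqlDB reads them from Schema Registry.
CREATE STREAM IF NOT EXISTS orders_check WITH (
  KAFKA_TOPIC  = 'orders',
  VALUE_FORMAT = 'AVRO'
);

-- Compare the inferred columns with the DDL the app is about to run.
DESCRIBE orders_check;

-- Remove the probe stream afterwards; the underlying topic is left untouched.
DROP STREAM IF EXISTS orders_check;
```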
