Skip to content

Add Avro/Kafka code generation with typed producers, consumers, and RPC#184

Merged
oyvindberg merged 3 commits intomainfrom
avro-kafka-codegen
Feb 1, 2026
Merged

Add Avro/Kafka code generation with typed producers, consumers, and RPC#184
oyvindberg merged 3 commits intomainfrom
avro-kafka-codegen

Conversation

@oyvindberg
Copy link
Collaborator

@oyvindberg oyvindberg commented Jan 26, 2026

Typr now generates type-safe JVM code from Apache Avro schemas (.avsc) and protocols (.avpr), with full support for Kafka producers, consumers, and request-reply RPC patterns.

Core Features

  • Records - Immutable data classes (Java records, Kotlin data classes, Scala case classes)
  • Enums - Type-safe enumerations with JSON serialization
  • Complex Unions - Sealed interfaces for multi-type unions like ["string", "int", "boolean"]
  • Recursive Types - Self-referential schemas (trees, linked lists)
  • Logical Types - UUID, date, time, timestamp-millis, decimal with precision
  • $ref Support - Reference schemas from other files
  • Precise Types - Compile-time validated wrappers like Decimal10_2
  • Custom Wrapper Types - Use x-typr-wrapper to generate type-safe ID wrappers

Wire Formats

Wire Format Description
Avro + Confluent Binary Avro with magic byte + schema ID (for Schema Registry)
Avro Plain binary Avro without Schema Registry
JSON JSON serialization (Jackson, Circe, ZIO JSON)

Typed Kafka Producers

Generated producers are fully typed and handle serialization automatically:

// Type-safe producer for order-events topic
public record OrderEventsProducer(
    Producer<String, OrderEvents> producer, 
    String topic
) implements AutoCloseable {
  
  // Send with just key and value
  public Future<RecordMetadata> send(String key, OrderEvents value) {
    return producer.send(new ProducerRecord<>(topic, key, value));
  }
  
  // Send with typed headers
  public Future<RecordMetadata> send(String key, OrderEvents value, StandardHeaders headers) {
    return producer.send(new ProducerRecord<>(topic, null, key, value, headers.toHeaders()));
  }
}

Usage:

var producer = new OrderEventsProducer(kafkaProducer);

// Send strongly-typed events
producer.send(orderId, new OrderPlaced(orderId, customerId, amount, Instant.now(), items, null));
producer.send(orderId, new OrderCancelled(orderId, customerId, "Out of stock", Instant.now(), null));

Multi-Event Topics (Sealed Interfaces)

When multiple event types share a topic, Typr generates a sealed interface:

public sealed interface OrderEvents permits OrderCancelled, OrderPlaced, OrderUpdated {
  static OrderEvents fromGenericRecord(GenericRecord record) {
    return switch (record.getSchema().getFullName()) {
      case "com.example.events.OrderCancelled" -> OrderCancelled.fromGenericRecord(record);
      case "com.example.events.OrderPlaced" -> OrderPlaced.fromGenericRecord(record);
      case "com.example.events.OrderUpdated" -> OrderUpdated.fromGenericRecord(record);
      default -> throw new IllegalArgumentException("Unknown schema: " + record.getSchema().getFullName());
    };
  }
  
  GenericRecord toGenericRecord();
}

Typed Kafka Consumers

Generated consumers dispatch to typed handler methods:

public record OrderEventsConsumer(
    Consumer<String, OrderEvents> consumer,
    OrderEventsHandler handler,
    String topic
) implements AutoCloseable {
  
  public void poll(Duration timeout) {
    ConsumerRecords<String, OrderEvents> records = consumer.poll(timeout);
    records.forEach(record -> {
      String key = record.key();
      OrderEvents value = record.value();
      StandardHeaders headers = StandardHeaders.fromHeaders(record.headers());
      
      switch (value) {
        case OrderPlaced e -> handler.handleOrderPlaced(key, e, headers);
        case OrderCancelled e -> handler.handleOrderCancelled(key, e, headers);
        case OrderUpdated e -> handler.handleOrderUpdated(key, e, headers);
        default -> handler.handleUnknown(key, value, headers);
      }
    });
  }
}

Handler interface you implement:

public interface OrderEventsHandler {
  void handleOrderPlaced(String key, OrderPlaced event, StandardHeaders headers);
  void handleOrderCancelled(String key, OrderCancelled event, StandardHeaders headers);
  void handleOrderUpdated(String key, OrderUpdated event, StandardHeaders headers);
  
  default void handleUnknown(String key, OrderEvents event, StandardHeaders headers) {
    throw new IllegalStateException("Unknown event type: " + event.getClass());
  }
}

Typed Kafka Headers

Define header schemas and get type-safe header classes:

public record StandardHeaders(UUID correlationId, Instant timestamp, Optional<String> source) {
  
  public static StandardHeaders fromHeaders(Headers headers) {
    UUID correlationId = UUID.fromString(new String(headers.lastHeader("correlationId").value()));
    Instant timestamp = Instant.ofEpochMilli(Long.parseLong(new String(headers.lastHeader("timestamp").value())));
    Optional<String> source = Optional.ofNullable(headers.lastHeader("source")).map(h -> new String(h.value()));
    return new StandardHeaders(correlationId, timestamp, source);
  }
  
  public Headers toHeaders() {
    Headers headers = new RecordHeaders();
    headers.add("correlationId", correlationId.toString().getBytes(UTF_8));
    headers.add("timestamp", Long.toString(timestamp.toEpochMilli()).getBytes(UTF_8));
    source.ifPresent(v -> headers.add("source", v.getBytes(UTF_8)));
    return headers;
  }
}

Custom Wrapper Types (x-typr-wrapper)

Add x-typr-wrapper to any field to generate a type-safe wrapper:

{
  "type": "record",
  "name": "CustomerOrder",
  "fields": [
    {"name": "orderId", "type": "string", "x-typr-wrapper": "OrderId"},
    {"name": "customerId", "type": "long", "x-typr-wrapper": "CustomerId"},
    {"name": "email", "type": ["null", "string"], "x-typr-wrapper": "Email"}
  ]
}

Generated record uses wrappers:

public record CustomerOrder(
    OrderId orderId,           // Not String!
    CustomerId customerId,     // Not Long!
    Optional<Email> email,     // Not Optional<String>!
    Long amount                // No wrapper specified
) { ... }

Effect Types (Higher-Kinded)

Effect Type Return Type Use Case
Blocking T Synchronous Java code
CompletableFuture CompletableFuture<T> Async Java
Mutiny Uni<T> Quarkus/Reactive
CatsIO IO[T] Scala Cats Effect
ZIO ZIO[Any, Throwable, T] Scala ZIO

Kotlin with Mutiny (Quarkus):

interface UserService {
  fun getUser(userId: String): Uni<Result<User, UserNotFoundError>>
  fun createUser(email: String, name: String): Uni<Result<User, ValidationError>>
}

Scala with Cats IO:

trait UserService {
  def getUser(userId: String): IO[Result[User, UserNotFoundError]]
  def createUser(email: String, name: String): IO[Result[User, ValidationError]]
}

Kafka RPC (from .avpr protocols)

For request/reply patterns, define an Avro protocol and get generated client/server:

// Generated client (Spring Boot)
@Service
public record UserServiceClient(ReplyingKafkaTemplate<String, Object, Object> replyingTemplate) {
  public Result<User, UserNotFoundError> getUser(String userId) throws Exception {
    var reply = replyingTemplate.sendAndReceive(new ProducerRecord<>("user-service-requests", request)).get().value();
    return switch (reply) {
      case GetUserResponse.Success s -> new Result.Ok(s.value());
      case GetUserResponse.Error e -> new Result.Err(e.error());
    };
  }
}

// Generated server (Spring Boot)
@Service
public record UserServiceServer(UserServiceHandler handler) {
  @KafkaListener(topics = "user-service-requests")
  @SendTo
  public Object handleRequest(UserServiceRequest request) {
    return switch (request) {
      case GetUserRequest r -> handleGetUser(r);
      case CreateUserRequest r -> handleCreateUser(r);
    };
  }
}

Test Projects

Project Language Framework Wire Format Effect Type
java Java - Avro + Confluent Blocking
java-vanilla Java - Avro Blocking
java-async Java - Avro + Confluent CompletableFuture
java-json Java - JSON Blocking
java-spring Java Spring Boot Avro + Confluent Blocking
java-quarkus Java Quarkus Avro + Confluent Blocking
kotlin Kotlin - Avro + Confluent Blocking
kotlin-json Kotlin - JSON Blocking
kotlin-quarkus-mutiny Kotlin Quarkus Avro + Confluent Uni (Mutiny)
scala Scala 3 - Avro + Confluent Blocking
scala-cats Scala 3 Cats Effect Avro + Confluent IO
scala-json Scala 3 - JSON Blocking

🤖 Generated with Claude Code

oyvindberg and others added 3 commits January 26, 2026 18:44
Typr now generates type-safe JVM code from Apache Avro schemas (.avsc) and
protocols (.avpr), with full support for Kafka producers, consumers, and
request-reply RPC patterns.

- **Records** - Immutable data classes (Java records, Kotlin data classes, Scala case classes)
- **Enums** - Type-safe enumerations with JSON serialization
- **Complex Unions** - Sealed interfaces for multi-type unions like `["string", "int", "boolean"]`
- **Recursive Types** - Self-referential schemas (trees, linked lists)
- **Logical Types** - UUID, date, time, timestamp-millis, decimal with precision
- **$ref Support** - Reference schemas from other files
- **Precise Types** - Compile-time validated wrappers like `Decimal10_2`

- **AvroEncoded** - Binary Avro with Confluent Schema Registry
- **JsonEncoded** - JSON serialization (Jackson, Circe, ZIO JSON)

- **Typed Producers** - `orderEventsProducer.send(new OrderPlaced(...))`
- **Typed Consumers** - Abstract handlers with methods per event type
- **Multi-Event Topics** - Sealed interfaces for topics with multiple event types
- **Serializers/Deserializers** - Kafka Serde implementations
- **Topic Bindings** - Type-safe `TypedTopic<K, V>` definitions

- **Spring Boot** - `@Service`, `KafkaTemplate`, `@KafkaListener`, `@SendTo`
- **Quarkus** - `@ApplicationScoped`, `Emitter`, `@Incoming`, `@Outgoing`

- **Service Interfaces** - Clean async interfaces from Avro protocol definitions
- **Result ADT** - `Result<User, UserNotFoundError>` for explicit error handling
- **Client/Server** - Generated Kafka request-reply implementations
- **Effect Types** - Blocking, CompletableFuture, Uni (Mutiny), IO (Cats), ZIO

| Project | Language | Framework | Wire Format |
|---------|----------|-----------|-------------|
| java | Java | - | Avro Binary |
| java-vanilla | Java | - | Avro Binary |
| java-async | Java | - | CompletableFuture |
| java-json | Java | - | JSON |
| java-spring | Java | Spring Boot | Avro + RPC |
| java-quarkus | Java | Quarkus | Avro + RPC |
| kotlin | Kotlin | - | Avro Binary |
| kotlin-json | Kotlin | - | JSON |
| kotlin-quarkus-mutiny | Kotlin | Quarkus | Uni effect |
| scala | Scala 3 | - | Avro Binary |
| scala-cats | Scala 3 | Cats Effect | IO effect |
| scala-json | Scala 3 | - | JSON |

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Change relative link ../readme.md to absolute /db/ path.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@oyvindberg oyvindberg merged commit cbaa3da into main Feb 1, 2026
12 checks passed
@oyvindberg oyvindberg deleted the avro-kafka-codegen branch February 1, 2026 00:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant