Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,36 @@ Three services, each following the same layout:
```
<service>/
├── controller/ # Business logic (pure, transport-agnostic)
│ ├── {method}.go # RPC controllers (e.g., land.go, ping.go)
│ ├── {method}_test.go
│ └── {step}/ # Queue message controllers (e.g., request/)
│ ├── {step}.go # Step in workflow
│ └── {step}_test.go
├── proto/ # Proto definitions (.proto files)
├── protopb/ # Generated proto code (committed to repo)
└── integration_test/
```

### Controllers

Controllers contain pure business logic, independent of the transport layer (gRPC/YARPC). They live in `{service}/controller/` and are wired up in `example/server/{service}/main.go`.
Controllers contain pure business logic, independent of infrastructure. There are two types:

**RPC Controllers** - Handle synchronous API requests in `{service}/controller/`. Accept protobuf types, independent of gRPC/YARPC transport.

```go
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error)
```

**Queue Message Controllers** - Process async queue messages in `{service}/controller/{step}/`. Implement `consumer.Controller` interface.

```go
// Receives consumer.Delivery (NOT extension/queue.Delivery)
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
// Return nil to ack, error to nack. Consumer handles ack/nack automatically.
}
```

Controllers receive `consumer.Delivery` (subset interface without Ack/Nack methods) to enforce separation: controllers do business logic, consumer framework handles infrastructure.

### Entities

Expand All @@ -70,7 +92,10 @@ entity/
**Entity guidelines:**
1. Keep entities pure and framework-agnostic — no external dependencies
2. Use value types, not references
3. Prefer `int64` Unix epoch milliseconds over `time.Time`
3. Prefer `int64` milliseconds over `time.Time` and `time.Duration`:
- Timestamps: Unix epoch milliseconds (e.g., `CreatedAt int64`) — use `time.UnixMilli()` method
- Durations/timeouts: milliseconds (e.g., `TimeoutMs int64`, `DelayMs int64`)
- Use `time.Duration(ms) * time.Millisecond` to convert to `time.Duration` when needed
4. Every field must have a comment explaining its meaning
5. Reference other entities by ID (string or int), not directly
6. Use string enums with clear names; assign sentinel values (`""` for strings, `0` for ints) to unreachable/unknown enum variants
Expand Down Expand Up @@ -104,7 +129,9 @@ extension/

### Import Paths

- Controllers: `github.com/uber/submitqueue/{service}/controller`
- RPC Controllers: `github.com/uber/submitqueue/{service}/controller`
- Queue Controllers: `github.com/uber/submitqueue/{service}/controller/{step}`
- Consumer: `github.com/uber/submitqueue/consumer`
- Proto (generated): `github.com/uber/submitqueue/{service}/protopb`
- Extensions: `github.com/uber/submitqueue/extension/{extension}`
- Extension impl: `github.com/uber/submitqueue/extension/{extension}/{impl}`
Expand Down Expand Up @@ -164,6 +191,11 @@ All generated proto files are **committed to the repository**. When modifying `.
- Tests: `{file}_test.go`
- BUILD files: Always `BUILD.bazel`

### Directory Naming

- Use **singular** names for directories (e.g., `mock/` not `mocks/`, `entity/` not `entities/`)
- This applies to all folders including test mocks, extensions, entities, and service directories

### Common Make Targets

```bash
Expand All @@ -189,6 +221,10 @@ make clean-proto # Remove generated proto files
3. Add controller in `{service}/controller/`
4. Wire up in `example/server/{service}/main.go`

**Add new queue message controller:**
1. Create `{service}/controller/{step}/` with controller implementing `consumer.Controller`
2. Wire up in `example/server/{service}/main.go`: register → start → stop on shutdown

**Add new extension implementation:**
1. Create `extension/{extension}/{impl}/` directory
2. Implement factory and core interfaces
Expand All @@ -204,3 +240,20 @@ make clean-proto # Remove generated proto files
1. **Avoid asserting on error messages** — assert on error type if it is part of the contract, or assert generic error otherwise.
2. **Avoid blocking operations for synchronization** — do not use `time.Sleep`. Design the tested routine to signal back (channels, callbacks, condition variables).
3. **Use testify assertions** — use `stretchr/assert` or `require` instead of `t.Fatal()`.

### Code Style Guidelines

1. **Use SugaredLogger for structured logging** — always use `zap.SugaredLogger` with structured logging methods:
- `logger.Debugw(msg, key1, val1, key2, val2, ...)` for debug logs
- `logger.Infow(msg, key1, val1, key2, val2, ...)` for info logs
- `logger.Errorw(msg, key1, val1, key2, val2, ...)` for error logs
- Never use unstructured methods like `Debug()`, `Info()`, `Error()`, or `Printf()`
- Example: `logger.Infow("starting consumer", "subscriber_name", subscriberName, "controller_count", len(controllers))`

2. **Use interfaces for contracts** — define interfaces for public APIs and dependencies:
- Public components should return/accept interfaces, not concrete structs
- Unexported structs implement the interfaces
- Makes testing easier through mocking
- Example: `func New(...) Consumer` returns interface, not `*consumer`
- Implementation struct is unexported: `type consumer struct { ... }`

35 changes: 35 additions & 0 deletions core/consumer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "consumer",
srcs = [
"consumer.go",
"controller.go",
"error.go",
],
importpath = "github.com/uber/submitqueue/core/consumer",
visibility = ["//visibility:public"],
deps = [
"//entity/queue",
"//extension/queue",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "consumer_test",
srcs = [
"consumer_test.go",
"error_test.go",
],
embed = [":consumer"],
deps = [
"//entity/queue",
"//extension/queue",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//zaptest",
],
)
55 changes: 55 additions & 0 deletions core/consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Consumer

The consumer package orchestrates queue message processing. It manages subscription lifecycle, message consumption, ack/nack, and graceful shutdown.

## Interfaces

### Consumer

The top-level orchestrator. Register controllers, start consuming, and stop gracefully.

```go
c := consumer.New(logger, scope, queue, "worker-hostname")

c.Register(myController)
c.Start(ctx)

// On shutdown:
if err := c.Stop(30000); err != nil {
logger.Errorw("consumer stop error", "error", err)
}
```

### Controller

Business logic for processing queue messages. Implement this interface to handle deliveries for a specific topic.

```go
type Controller interface {
Process(ctx context.Context, delivery Delivery) error
Name() string
Topic() string
ConsumerGroup() string
SubscriptionConfig(subscriberName string) queue.SubscriptionConfig
}
```

### Delivery

A restricted view of a queue delivery exposed to controllers. Hides Ack/Nack (handled automatically by Consumer) while exposing message data and `ExtendVisibilityTimeout`.

## Error Handling

Controllers signal processing outcome via the return value of `Process()`:

- **`return nil`** — success, message is acked.
- **`return err`** — retryable failure, message is nacked for retry.
- **`return consumer.NewNonRetryableError(err)`** — poison pill, message is acked and removed from the queue to prevent infinite retry loops.

## Lifecycle

1. **Register** controllers before starting.
2. **Start** subscribes to all topics and spawns consume loops.
3. **Stop** cancels all subscriptions and waits for goroutines to finish (with timeout).

Once stopped, the consumer cannot be restarted — `Register()` and `Start()` return errors.
Loading
Loading