Skip to content

feat(queue): complete Gateway → Orchestrator queue integration#39

Merged
behinddwalls merged 1 commit into
mainfrom
preetam/queue-integration
Feb 23, 2026
Merged

feat(queue): complete Gateway → Orchestrator queue integration#39
behinddwalls merged 1 commit into
mainfrom
preetam/queue-integration

Conversation

@behinddwalls
Copy link
Copy Markdown
Collaborator

@behinddwalls behinddwalls commented Feb 22, 2026

Summary

Wire up the first stage of the queue pipeline: Gateway publishes land requests
to the queue, Orchestrator consumes and processes them.

Consumer Infrastructure:

  • Add Consumer interface (Register/Start/Stop) for orchestrating multiple controllers
  • Add consumer.Delivery interface to enforce separation of concerns (type-safe)
  • Controllers receive consumer.Delivery (no Ack/Nack), Consumer handles ack/nack
  • Implement subscription lifecycle, automatic ack/nack, metrics, graceful shutdown

Gateway:

  • Land controller publishes requests to land_request queue after storage
  • Queue infrastructure optional (controlled by QUEUE_MYSQL_DSN env var)

Orchestrator:

  • Request controller consumes from land_request queue
  • Wire up consumer with graceful shutdown in main.go

CLAUDE.md:

  • Document RPC vs Queue Message controller patterns
  • Add code style guidelines: use SugaredLogger, use interfaces for contracts

All unit and integration tests pass. Backward compatible with existing tests.

Test Plan

Issues

Stack

  1. @ feat(queue): complete Gateway → Orchestrator queue integration #39
  2. refactor(test): consolidate testing infrastructure with Docker Compose #41

@behinddwalls behinddwalls force-pushed the preetam/queue-integration branch 2 times, most recently from f505bad to 39e6a79 Compare February 22, 2026 07:53
@behinddwalls behinddwalls marked this pull request as ready for review February 22, 2026 08:38
@behinddwalls behinddwalls requested review from a team and sbalabanov as code owners February 22, 2026 08:38
Comment thread e2e_test/suite_test.go
s.log.Logf("Speculator ping: %s", resp.Message)
}

func (s *IntegrationSuite) TestLandRequest() {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am refactoring whole container/testing part, i will add it back in next revision, removing it for as it fails

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added back here: #41

Comment thread core/consumer/consumer.go
Comment thread core/consumer/consumer.go
Comment thread core/consumer/consumer.go
Comment thread consumer/consumer.go Outdated
Comment thread consumer/consumer.go Outdated

// Wait for all consumption goroutines to finish (with timeout)
done := make(chan struct{})
go func() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use a signal channel instead of waitgroup to avoid this goroutine

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment thread entity/request.go
Comment thread example/server/gateway/main.go Outdated
Comment thread example/server/orchestrator/main.go Outdated
case <-sigCh:
fmt.Println("\nShutting down orchestrator server...")
if c != nil {
c.Stop(30000) // Stop consumers with 30s timeout
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be probably better to still return the error indicating whether timeout was hit or not
Move error processing to the highest level possible

Comment thread gateway/controller/land.go Outdated
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Ack malformed messages to prevent infinite retry loop
return nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that does not sound right. May be we need a specific error type indicating that the error is not retryable (i.e. message is broken), or wrap deserialization in the consumer, too?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes..this was dummy logic so i didn't really look much as we will change it, but agreed, if we discover poison pill in controller, we should return a specific error

@behinddwalls behinddwalls force-pushed the preetam/queue-integration branch from c80effa to cac71f2 Compare February 23, 2026 22:09
@sbalabanov
Copy link
Copy Markdown
Contributor

stamping to unblock.
blockers to resolve discussed offline

@behinddwalls behinddwalls force-pushed the preetam/queue-integration branch from cac71f2 to d9faf53 Compare February 23, 2026 22:44
Wire up the first stage of the queue pipeline: Gateway publishes land requests
to the queue, Orchestrator consumes and processes them.

**Consumer Infrastructure:**
- Add Consumer interface (Register/Start/Stop) for orchestrating multiple controllers
- Add consumer.Delivery interface to enforce separation of concerns (type-safe)
- Controllers receive consumer.Delivery (no Ack/Nack), Consumer handles ack/nack
- Implement subscription lifecycle, automatic ack/nack, metrics, graceful shutdown

**Gateway:**
- Land controller publishes requests to land_request queue after storage
- Queue infrastructure optional (controlled by QUEUE_MYSQL_DSN env var)

**Orchestrator:**
- Request controller consumes from land_request queue
- Wire up consumer with graceful shutdown in main.go

**CLAUDE.md:**
- Document RPC vs Queue Message controller patterns
- Add code style guidelines: use SugaredLogger, use interfaces for contracts

All unit and integration tests pass. Backward compatible with existing tests.
@behinddwalls behinddwalls force-pushed the preetam/queue-integration branch from d9faf53 to 1cb8a36 Compare February 23, 2026 22:50
@behinddwalls behinddwalls merged commit 0916f89 into main Feb 23, 2026
1 check failed
@behinddwalls behinddwalls deleted the preetam/queue-integration branch June 2, 2026 18:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants