Conversation
Four changes land together to unblock running tests under -race across frame and its consumers: - queue/subscriber: guard s.subscription with a mutex so Stop() called from listen() on context cancel and from queueManager.Close() no longer race on the nil-out. Promote s.state to atomic.Int32 (and SubscriberState to int32) so State()/IsIdle() stop racing Receive(). - service: guard Service.driver with a mutex. Stop() calling Shutdown() while initializeServerDrivers()/startServerDriver() is still running no longer races on the field. - data.BaseModel: derive CreatedAt (and ModifiedAt on first insert) from the xid embedded timestamp so sort-by-id == sort-by-created_at. Reject caller-supplied non-xid IDs explicitly rather than falling back to time.Now(); hypertable composite PKs rely on that invariant. - datastore.BaseRepository: BulkCreate retains bare ON CONFLICT DO NOTHING as the portable default, and gains WithBulkCreateConflictColumns(...) so callers can target a specific unique index - e.g. (id, created_at) for hypertable-promoted tables or a domain-unique column for upsert flows.
There was a problem hiding this comment.
Code Review
This pull request updates the BaseModel to derive CreatedAt timestamps from xid identifiers and adds support for configurable ON CONFLICT columns in bulk repository operations. It also significantly improves concurrency safety by introducing mutexes and atomic operations to manage the lifecycle of the queue subscriber and the server driver. Review feedback points out potential race conditions in recreateSubscription that could cause restarts after shutdown and in initializeServerDrivers where concurrent initialization might leak driver instances.
| s.mu.Lock() | ||
| sub := s.subscription | ||
| s.subscription = nil | ||
| s.mu.Unlock() |
There was a problem hiding this comment.
The recreateSubscription method should check if the subscriber is still intended to be active before proceeding with the subscription swap. If Stop() has been called, isInit will be false. Without an early return, this method will attempt to open a new subscription even after the subscriber was explicitly stopped, which can lead to an infinite loop of restarts in the listen goroutine if a receive operation was in progress during the shutdown.
s.mu.Lock()
if !s.isInit.Load() {
s.mu.Unlock()
return
}
sub := s.subscription
s.subscription = nil
s.mu.Unlock()| if s.getDriver() != nil { | ||
| return nil | ||
| } |
There was a problem hiding this comment.
There is a potential race condition in initializeServerDrivers. Multiple concurrent calls could pass the getDriver() == nil check before any of them sets the driver, leading to multiple driver instances being created and leaked. Furthermore, a call to Stop() during initialization might miss the driver if it hasn't been set yet. Consider making the check-and-set operation atomic relative to the driver field by holding the driverMu lock throughout the initialization or using a double-checked locking pattern.
…ver init leak Two refinements from review on PR #646: - recreateSubscription: bail out if isInit is already false (Stop() ran first), and after creating the replacement subscription, re-check isInit and shut down the new subscription if Stop() raced in while we were creating. Previously a late recreate could orphan a live subscription past shutdown. - initializeServerDrivers: do the final "is driver already set" check under the driver mutex. Without it, two concurrent callers could both construct drivers and clobber each other, leaking the http.Server listener from the loser.
Summary
Four fixes that together unblock running the full test suite under
-racein frame and in every consumer that depends on it.
s.subscriptionwith a mutex soStop()called from
listen()on context cancel and fromqueueManager.Close()no longer race on the nil-out. Promote
s.state(andSubscriberState)to
atomic.Int32soState()/IsIdle()stop racingReceive().Service.driverwith a mutex soStop().Shutdown()no longer races
initializeServerDrivers()/startServerDriver(). Thisrace was pre-existing and also blocked enabling
-racein downstreamtest suites.
BeforeCreatenow callsGenIDfirst and derivesCreatedAt/ModifiedAtfrom the xid embedded timestamp. Sort-by-idbecomes equivalent to sort-by-created_at, which is a prerequisite for
promoting these tables to TimescaleDB hypertables with composite
(id, created_at)primary keys. Non-xid caller-supplied IDs are nowrejected explicitly instead of silently falling back to
time.Now().BulkCreatekeeps bareON CONFLICT DO NOTHINGas the portable default. New functional optionWithBulkCreateConflictColumns(cols...)lets callers target a specificunique index - e.g.
(id, created_at)for hypertables or a domaincolumn for upsert flows.
All packages pass under
go test -race -count=1 ./....Test plan
go test -race -count=1 ./queue/...go test -race -count=1 ./data/...go test -race -count=1 ./datastore/...go test -race -count=1 .(service suite incl. TestBackGroundConsumer)go test -race -count=1 ./...(full suite)make format(lint clean)