This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

db: Send an initialization tick for Trigger

Our codebase assumes that database triggers will send an initialization
tick so that the code can perform an initial sync. This is necessary to
prevent race conditions where an event that would normally cause a
trigger happens before the Trigger is registered with the database.

This commit makes the Trigger function send an initialization tick.
TriggerTick already had this behavior.
  • Loading branch information...
kklin committed Nov 26, 2017
1 parent 142e889 commit 6bcbd48019af426a51c8ab180d19629d654e1111
Showing with 25 additions and 8 deletions.
  1. +10 −7 db/db.go
  2. +6 −0 db/db_test.go
  3. +5 −1 db/logger.go
  4. +4 −0 minion/engine_test.go
View
@@ -103,7 +103,8 @@ func (tr Transaction) Run(do func(db Database) error) error {
// Trigger registers a new database trigger that watches changes to 'tableName'. Any
// change to the table, including row insertions, deletions, and modifications, will
// cause a notification on 'Trigger.C'.
// cause a notification on 'Trigger.C'. So that clients properly initialize,
// Trigger() sends an initialization tick at startup.
func (cn Conn) Trigger(tt ...TableType) Trigger {
trigger := Trigger{C: make(chan struct{}, 1), stop: make(chan struct{})}
cn.Txn(tt...).Run(func(db Database) error {
@@ -113,6 +114,8 @@ func (cn Conn) Trigger(tt ...TableType) Trigger {
}
return nil
})
trigger.C <- struct{}{}
c.Inc("Trigger")
return trigger
}
@@ -128,17 +131,17 @@ func (cn Conn) TriggerTick(seconds int, tt ...TableType) Trigger {
defer ticker.Stop()
for {
select {
case trigger.C <- struct{}{}:
c.Inc("Trigger")
default:
}
select {
case <-ticker.C:
case <-trigger.stop:
return
}
select {
case trigger.C <- struct{}{}:
c.Inc("Trigger")
default:
}
}
}()
View
@@ -258,6 +258,12 @@ func TestTrigger(t *testing.T) {
ct := conn.Trigger(BlueprintTable)
ct2 := conn.Trigger(BlueprintTable)
// The initial ticks.
triggerRecv(t, mt)
triggerRecv(t, mt2)
triggerRecv(t, ct)
triggerRecv(t, ct2)
triggerNoRecv(t, mt)
triggerNoRecv(t, mt2)
triggerNoRecv(t, ct)
View
@@ -11,7 +11,11 @@ func (conn Conn) runLogger() {
for _, t := range AllTables {
t := t
go func() {
for range conn.Trigger(t).C {
trigger := conn.Trigger(t).C
// Drain the initial trigger to avoid logging the empty database
// tables.
<-trigger
for range trigger {
conn.logTable(t)
}
}()
View
@@ -18,6 +18,8 @@ const testImage = "alpine"
func TestContainerTxn(t *testing.T) {
conn := db.New()
trigg := conn.Trigger(db.ContainerTable).C
// Drain the initial trigger.
<-trigg
testContainerTxn(t, conn, blueprint.Blueprint{})
assert.False(t, fired(trigg))
@@ -260,6 +262,8 @@ func testContainerTxn(t *testing.T, conn db.Conn, bp blueprint.Blueprint) {
func TestConnectionTxn(t *testing.T) {
conn := db.New()
trigg := conn.Trigger(db.ConnectionTable).C
// Drain the initial trigger.
<-trigg
testConnectionTxn(t, conn, blueprint.Blueprint{})
assert.False(t, fired(trigg))

0 comments on commit 6bcbd48

Please sign in to comment.