Skip to content

Commit

Permalink
unacknowledged and incomplete scans are re-published to the notificat…
Browse files Browse the repository at this point in the history
…ion queue
  • Loading branch information
0xdiba committed Dec 14, 2015
1 parent ad3344b commit 0c68439
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions database/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/mozilla/tls-observatory/logger"
)

// RegisterScanListener "subscribes" to the notifications published to the scan_listener notifier.
// It has as input the usual sb attributes and returns an int64 channel which can be consumed
// for newly created scan id's.
func (db *DB) RegisterScanListener(dbname, user, password, hostport, sslmode string) <-chan int64 {

log := logger.GetLogger()
Expand All @@ -24,20 +27,19 @@ func (db *DB) RegisterScanListener(dbname, user, password, hostport, sslmode str
}

listenerChan := make(chan int64)
listenerName := "scan_listener"

conn_info := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=%s",
connInfo := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=%s",
user, password, hostport, dbname, sslmode)

go func() {

listener_name := "scan_listener"

listener := pq.NewListener(conn_info, 100*time.Millisecond, 10*time.Second, reportProblem)
err := listener.Listen(listener_name)
listener := pq.NewListener(connInfo, 100*time.Millisecond, 10*time.Second, reportProblem)
err := listener.Listen(listenerName)

if err != nil {
log.WithFields(logrus.Fields{
"listener": listener_name,
"listener": listenerName,
"error": err.Error(),
}).Error("could not listen for notification")
close(listenerChan)
Expand Down Expand Up @@ -66,6 +68,30 @@ func (db *DB) RegisterScanListener(dbname, user, password, hostport, sslmode str

}()

go func() {
fiveminticker := time.NewTicker(1 * time.Minute)

This comment has been minimized.

Copy link
@jvehent

jvehent Dec 14, 2015

Contributor

oh look, a five minutes ticker that ticks every minute :p

This comment has been minimized.

Copy link
@0xdiba

0xdiba Dec 14, 2015

Author Contributor

Debugging got into the repository.
At least I did it on a separate branch 😄

unackedQuery := fmt.Sprintf("select pg_notify('%s', ''||id ) from scans where ack=FALSE and timestamp < NOW() - INTERVAL '30 seconds'", listenerName)
zerocomplQuery := "update scans set ack=false where completion_perc=0 and timestamp < NOW() - INTERVAL '1 minute'"

This comment has been minimized.

Copy link
@jvehent

jvehent Dec 14, 2015

Contributor

inline those in the Exec call for clarity, no need to store them as variables.

for {
select {
case <-fiveminticker.C:

This comment has been minimized.

Copy link
@jvehent

jvehent Dec 14, 2015

Contributor

I don't mind the ticker, but this could just be a time.Sleep() since there's only one select case.

This comment has been minimized.

Copy link
@0xdiba

0xdiba Dec 14, 2015

Author Contributor

Yeah I had in mind to have a separate ticker/case for each different abnormality we wanted to handle.
I'll convert it to a Sleep() since there is no need for that now.

_, err := db.Exec(zerocomplQuery)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Error("Could not run zero completion update query")
}

_, err = db.Exec(unackedQuery)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Error("Could not run unacknowledged scans periodic check.")
}
}
}
}()

return listenerChan
}

Expand Down

0 comments on commit 0c68439

Please sign in to comment.