aggregate-known: avoid keeping an issuer's full serial list in memory
jschanck committed Nov 3, 2022
1 parent 14f3aeb commit 54157b2
Showing 1 changed file with 65 additions and 54 deletions.
go/cmd/aggregate-known/aggregate-known.go (65 additions, 54 deletions)
@@ -1,11 +1,13 @@
 package main

 import (
+    "bufio"
     "context"
     "encoding/json"
     "flag"
     "os"
     "os/signal"
+    "path/filepath"
     "sync"
     "syscall"
     "time"
@@ -37,37 +39,53 @@ type knownWorkUnit struct {
 }

 type knownWorker struct {
-    loadStorage storage.StorageBackend
-    saveStorage storage.StorageBackend
+    savePath    string
     remoteCache storage.RemoteCache
 }

-func (kw knownWorker) run(wg *sync.WaitGroup, workChan <-chan knownWorkUnit, quitChan <-chan struct{}) {
+func (kw knownWorker) run(ctx context.Context, wg *sync.WaitGroup, workChan <-chan knownWorkUnit) {
     defer wg.Done()

-    ctx := context.Background()
+    err := os.MkdirAll(kw.savePath, permModeDir)
+    if err != nil && !os.IsExist(err) {
+        glog.Fatalf("Could not make directory %s: %s", kw.savePath, err)
+    }

     for tuple := range workChan {
-        var serialCount int
-        serials := make([]types.Serial, 0, 128*1024)
-
-        for _, expDate := range tuple.expDates {
-            select {
-            case <-quitChan:
-                glog.Warningf("Signal on worker quit channel, quitting (count=%d).", serialCount)
-                return
-            default:
-            }
+        // Wrap in anonymous function to defer a writer.Flush & fd.Close per work unit
+        func() {
+            path := filepath.Join(kw.savePath, tuple.issuer.ID())
+            fd, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, permMode)
+            if err != nil {
+                glog.Fatalf("[%s] Could not open known certificates file: %s", tuple.issuer.ID(), err)
+            }
+            defer fd.Close()
+
+            writer := bufio.NewWriter(fd)
+            defer writer.Flush()
+
+            var serialCount uint64
+
+            for _, expDate := range tuple.expDates {
+                select {
+                case <-ctx.Done():
+                    glog.Warningf("Signal on worker quit channel, quitting (count=%d).", serialCount)
+                    return
+                default:
+                }

             if expDate.IsExpiredAt(time.Now()) {
                 if glog.V(1) {
                     glog.Warningf("Date %s is expired now, skipping (issuer=%s)", expDate, tuple.issuer.ID())
                 }
                 continue
             }

             // Sharded by expiry date, so this should be fairly small.
             known := storage.NewKnownCertificates(expDate, tuple.issuer, kw.remoteCache)

             knownSet := known.Known()
-            knownSetLen := len(knownSet)
+            knownSetLen := uint64(len(knownSet))

             if knownSetLen == 0 {
                 // This is almost certainly due to an hour-rollover since the loader ran, and expired all the next hour's
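
The loop body above is wrapped in an immediately-invoked anonymous function because defer is function-scoped in Go: a defer placed directly in the loop would not run until run() returned, leaving one open file descriptor and one unflushed buffer queued up per work unit. A standalone sketch of that scoping rule, where processAll and the file names are illustrative rather than taken from the commit:

// Sketch only: per-iteration cleanup via an anonymous function.
package main

import (
    "fmt"
    "os"
    "path/filepath"
)

func processAll(names []string) {
    for _, name := range names {
        // Without this wrapper, every fd.Close would be queued up until
        // processAll returned, holding all the files open at once.
        func() {
            fd, err := os.Create(filepath.Join(os.TempDir(), name))
            if err != nil {
                fmt.Println("open:", err)
                return // abandons just this unit, like the worker's early return
            }
            defer fd.Close() // runs at the end of this iteration

            fmt.Fprintln(fd, "processed:", name)
        }()
    }
}

func main() {
    processAll([]string{"a.txt", "b.txt"})
}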
@@ -76,24 +94,27 @@ func (kw knownWorker) run(wg *sync.WaitGroup, workChan <-chan knownWorkUnit, quitChan <-chan struct{}) {
                     " (current count this worker=%d)", tuple.issuerDN, tuple.issuer.ID(), expDate, serialCount)
             }

-            serials = append(serials, knownSet...)
             serialCount += knownSetLen

-            // This assertion should catch issues where append failed to append everything. For improvement
-            // in processing speed, pull this out, but right now it seems valuable.
-            if len(serials) != serialCount {
-                glog.Fatalf("expDate=%s issuer=%s serial count math error! expected %d but got %d", expDate,
-                    tuple.issuer.ID(), serialCount, len(serials))
+                for _, s := range knownSet {
+                    _, err := writer.WriteString(s.HexString())
+                    if err != nil {
+                        glog.Fatalf("[%s] Could not write serials: %s", tuple.issuer.ID(), err)
+                    }
+                    err = writer.WriteByte('\n')
+                    if err != nil {
+                        glog.Fatalf("[%s] Could not write serials: %s", tuple.issuer.ID(), err)
+                    }
+                }
             }
-        }
+            glog.Infof("[%s] %d total known serials for %s (shards=%d)", tuple.issuer.ID(),
+                serialCount, tuple.issuerDN, len(tuple.expDates))
+        }()

-        if err := kw.saveStorage.StoreKnownCertificateList(ctx, tuple.issuer, serials); err != nil {
-            glog.Fatalf("[%s] Could not save known certificates file: %s", tuple.issuer.ID(), err)
+        select {
+        case <-ctx.Done():
+            return
+        default:
         }

-        glog.Infof("[%s] %d total known serials for %s (times=%d, len=%d, cap=%d)", tuple.issuer.ID(),
-            serialCount, tuple.issuerDN, len(tuple.expDates), len(serials), cap(serials))
     }
 }
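
The dedicated quitChan is gone: cancellation now arrives through the context.Context handed to run, and the non-blocking select on ctx.Done() polls it between shards and again between work units, so a worker stops at the next clean boundary rather than mid-file. A self-contained sketch of the same shape, with made-up job values and timings:

// Sketch only: cooperative cancellation through a context.
package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, jobs <-chan int) {
    for j := range jobs {
        // Non-blocking poll: fall through immediately if not cancelled.
        select {
        case <-ctx.Done():
            fmt.Println("cancelled, stopping at a clean boundary")
            return
        default:
        }
        fmt.Println("processing job", j)
        time.Sleep(10 * time.Millisecond)
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    jobs := make(chan int, 8)
    for i := 0; i < 8; i++ {
        jobs <- i
    }
    close(jobs)

    go func() {
        time.Sleep(25 * time.Millisecond)
        cancel() // stands in for the SIGINT/SIGTERM handler in main()
    }()
    worker(ctx, jobs)
}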

@@ -107,7 +128,21 @@ func checkPathArg(strObj string, confOptionName string, ctconfig *config.CTConfig) {

 func main() {
     ctconfig.Init()
+
     ctx := context.Background()
+    ctx, cancelMain := context.WithCancel(ctx)
+
+    // Try to handle SIGINT and SIGTERM gracefully
+    sigChan := make(chan os.Signal, 1)
+    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+    defer close(sigChan)
+    go func() {
+        sig := <-sigChan
+        glog.Infof("Signal caught: %s..", sig)
+        cancelMain()
+        signal.Stop(sigChan) // Restore default behavior
+    }()
+
     storageDB, remoteCache := engine.GetConfiguredStorage(ctx, ctconfig)
     defer glog.Flush()
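
main() now converts the first SIGINT or SIGTERM into a context cancellation right at startup, and signal.Stop then restores default delivery, so a second Ctrl-C terminates the process immediately instead of waiting out a graceful drain. Reduced to just the wiring, with the real work loop elided, the pattern looks roughly like this:

// Sketch only: translate the first SIGINT/SIGTERM into ctx cancellation.
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        sig := <-sigChan
        fmt.Println("signal caught:", sig)
        cancel()
        signal.Stop(sigChan) // a second signal now gets default (fatal) handling
    }()

    <-ctx.Done() // a real program would run its workers here instead
    fmt.Println("shutting down")
}

Go 1.16 added signal.NotifyContext, which packages this same channel-and-goroutine wiring into a single call.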

@@ -121,8 +156,6 @@ func main() {

     engine.PrepareTelemetry("aggregate-known", ctconfig)

-    saveBackend := storage.NewLocalDiskBackend(permMode, *knownpath)
-
     mozIssuers := rootprogram.NewMozillaIssuers()
     if err := mozIssuers.LoadEnrolledIssuers(*enrolledpath); err != nil {
         glog.Fatalf("Failed to load enrolled issuers from disk: %s", err)
@@ -197,44 +230,22 @@ func main() {
             glog.Fatalf("Channel overflow. Aborting at %+v", wu)
         }
     }

     // Signal that was the last work
     close(workChan)

     glog.Infof("Starting worker processes to handle %d work units", count)

-    // Handle signals from the OS
-    sigChan := make(chan os.Signal, 1)
-    signal.Notify(sigChan, syscall.SIGTERM, os.Interrupt)
-    defer signal.Stop(sigChan)
-
-    // Exit signal, used by signals from the OS
-    quitChan := make(chan struct{})
-
     var wg sync.WaitGroup

     // Start the workers
     for t := 0; t < *ctconfig.NumThreads; t++ {
         wg.Add(1)
         worker := knownWorker{
-            saveStorage: saveBackend,
+            savePath:    *knownpath,
             remoteCache: remoteCache,
         }
-        go worker.run(&wg, workChan, quitChan)
+        go worker.run(ctx, &wg, workChan)
     }

-    // Set up a notifier for the workers closing
-    doneChan := make(chan bool)
-    go func(wait *sync.WaitGroup) {
-        wg.Wait()
-        doneChan <- true
-    }(&wg)
-
-    select {
-    case <-sigChan:
-        glog.Infof("Signal caught, stopping threads at next opportunity.")
-        quitChan <- struct{}{}
-    case <-doneChan:
-        glog.Infof("Completed.")
-    }
+    wg.Wait()
 }
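
With signal handling moved to the top of main(), the shutdown path collapses: closing workChan still marks the end of the feed, the shared context covers early exit, and a bare wg.Wait() replaces the doneChan notifier goroutine plus the select over sigChan and doneChan. A minimal sketch of the resulting pool shape, with illustrative worker and job counts:

// Sketch only: a worker pool drained by channel close plus wg.Wait().
package main

import (
    "context"
    "fmt"
    "sync"
)

func run(ctx context.Context, wg *sync.WaitGroup, work <-chan int) {
    defer wg.Done()
    for w := range work {
        select {
        case <-ctx.Done():
            return // early exit path, normally driven by a signal
        default:
        }
        fmt.Println("work unit", w)
    }
}

func main() {
    ctx := context.Background()

    work := make(chan int, 16)
    for i := 0; i < 16; i++ {
        work <- i
    }
    close(work) // signal that was the last work

    var wg sync.WaitGroup
    for t := 0; t < 4; t++ {
        wg.Add(1)
        go run(ctx, &wg, work)
    }
    wg.Wait() // returns once every worker has drained the channel
}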
