Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/bad-key-revoker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ var maxSerials = 100
// sendMessage sends a single email to the provided address with the revoked
// serials
func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error {
err := bkr.mailer.Connect()
conn, err := bkr.mailer.Connect()
if err != nil {
return err
}
defer func() {
_ = bkr.mailer.Close()
_ = conn.Close()
}()
mutSerials := make([]string, len(serials))
copy(mutSerials, serials)
Expand All @@ -213,7 +213,7 @@ func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error {
if err != nil {
return err
}
err = bkr.mailer.SendMail([]string{addr}, bkr.emailSubject, message.String())
err = conn.SendMail([]string{addr}, bkr.emailSubject, message.String())
if err != nil {
return err
}
Expand Down
146 changes: 93 additions & 53 deletions cmd/expiration-mailer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"sort"
"strings"
"sync"
"text/template"
"time"

Expand Down Expand Up @@ -53,6 +54,7 @@ type mailer struct {
emailTemplate *template.Template
subjectTemplate *template.Template
nagTimes []time.Duration
parallelSends uint
limit int
clk clock.Clock
stats mailerStats
Expand All @@ -66,7 +68,7 @@ type mailerStats struct {
processingLatency prometheus.Histogram
}

func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error {
func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Certificate) error {
if len(contacts) == 0 {
return nil
}
Expand Down Expand Up @@ -161,7 +163,7 @@ func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error {
m.log.Infof("attempting send JSON=%s", string(logStr))

startSending := m.clk.Now()
err = m.mailer.SendMail(emails, subjBuf.String(), msgBuf.String())
err = conn.SendMail(emails, subjBuf.String(), msgBuf.String())
if err != nil {
m.log.Errf("failed send JSON=%s", string(logStr))
return err
Expand Down Expand Up @@ -196,6 +198,11 @@ func (m *mailer) certIsRenewed(names []string, issued time.Time) (bool, error) {
return present, err
}

type work struct {
regID int64
certs []core.Certificate
}

func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate) {
regIDToCerts := make(map[int64][]core.Certificate)

Expand All @@ -205,76 +212,104 @@ func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate)
regIDToCerts[cert.RegistrationID] = cs
}

err := m.mailer.Connect()
if err != nil {
m.log.AuditErrf("Error connecting to send nag emails: %s", err)
return
var wg sync.WaitGroup
workChan := make(chan work)

parallelSends := m.parallelSends
if parallelSends == 0 {
parallelSends = 1
}
defer func() {
_ = m.mailer.Close()
}()

for regID, certs := range regIDToCerts {
reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID})
for senderNum := uint(0); senderNum < parallelSends; senderNum++ {
conn, err := m.mailer.Connect()
if err != nil {
m.log.AuditErrf("Error fetching registration %d: %s", regID, err)
m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc()
continue
m.log.AuditErrf("connecting parallel sender %d: %s", senderNum, err)
close(workChan)
return
}

parsedCerts := []*x509.Certificate{}
for _, cert := range certs {
parsedCert, err := x509.ParseCertificate(cert.DER)
if err != nil {
// TODO(#1420): tell registration about this error
m.log.AuditErrf("Error parsing certificate %s: %s", cert.Serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc()
continue
}

renewed, err := m.certIsRenewed(parsedCert.DNSNames, parsedCert.NotBefore)
if err != nil {
m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err)
// assume not renewed
} else if renewed {
m.log.Debugf("Cert %s is already renewed", cert.Serial)
m.stats.renewalCount.With(prometheus.Labels{}).Inc()
err := m.updateCertStatus(cert.Serial)
wg.Add(1)
go func(conn bmail.Conn, ch <-chan work) {
defer wg.Done()
for w := range ch {
err := m.sendToOneRegID(ctx, conn, w.regID, w.certs)
if err != nil {
m.log.AuditErrf("Error updating certificate status for %s: %s", cert.Serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
m.log.AuditErr(err.Error())
}
continue
}
conn.Close()
}(conn, workChan)

parsedCerts = append(parsedCerts, parsedCert)
}
// For politeness' sake, don't open more than 1 new connection per
// second.
time.Sleep(time.Second)
}
for regID, certs := range regIDToCerts {
workChan <- work{regID, certs}
}
close(workChan)
wg.Wait()
}

if len(parsedCerts) == 0 {
// all certificates are renewed
continue
}
func (m *mailer) sendToOneRegID(ctx context.Context, conn bmail.Conn, regID int64, certs []core.Certificate) error {
reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID})
if err != nil {
m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc()
return fmt.Errorf("fetching registration %d: %w", regID, err)
}

if reg.Contact == nil {
continue
}
if reg.Contact == nil {
return nil
}

err = m.sendNags(reg.Contact, parsedCerts)
parsedCerts := []*x509.Certificate{}
for _, cert := range certs {
parsedCert, err := x509.ParseCertificate(cert.DER)
if err != nil {
m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc()
m.log.AuditErrf("Error sending nag emails: %s", err)
continue
m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc()
// TODO(#1420): tell registration about this error
return fmt.Errorf("parsing certificate %s: %w", cert.Serial, err)
}
for _, cert := range parsedCerts {
serial := core.SerialToString(cert.SerialNumber)
err = m.updateCertStatus(serial)

renewed, err := m.certIsRenewed(parsedCert.DNSNames, parsedCert.NotBefore)
if err != nil {
return fmt.Errorf("expiration-mailer: error fetching renewal state: %w", err)
} else if renewed {
m.stats.renewalCount.With(prometheus.Labels{}).Inc()
err := m.updateCertStatus(cert.Serial)
if err != nil {
m.log.AuditErrf("Error updating certificate status for %s: %s", serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
continue
return fmt.Errorf("updating certificate status for %s: %w", cert.Serial, err)
}
continue
}

parsedCerts = append(parsedCerts, parsedCert)
}

if len(parsedCerts) == 0 {
// all certificates are renewed
return nil
}

err = m.sendNags(conn, reg.Contact, parsedCerts)
if err != nil {
m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc()
return fmt.Errorf("sending nag emails: %w", err)
}
for _, cert := range parsedCerts {
serial := core.SerialToString(cert.SerialNumber)
err = m.updateCertStatus(serial)
if err != nil {
// Don't return immediately; we'd like to at least try and update the status for
// all certificates, even if one of them experienced an error (which might have
// been intermittent)
m.log.AuditErrf("updating certificate status for %s: %s", serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
continue
}
}
return nil
}

func (m *mailer) findExpiringCertificates(ctx context.Context) error {
Expand Down Expand Up @@ -402,8 +437,12 @@ type Config struct {
// Path to a text/template email template
EmailTemplate string

// How often to process a batch of certificates
Frequency cmd.ConfigDuration

// How many parallel goroutines should process each batch of emails
ParallelSends uint

TLS cmd.TLSConfig
SAService *cmd.GRPCClientConfig

Expand Down Expand Up @@ -589,6 +628,7 @@ func main() {
emailTemplate: tmpl,
nagTimes: nags,
limit: c.Mailer.CertLimit,
parallelSends: c.Mailer.ParallelSends,
clk: clk,
stats: initStats(scope),
}
Expand Down
45 changes: 42 additions & 3 deletions cmd/expiration-mailer/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"errors"
"fmt"
"math/big"
"net"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/letsencrypt/boulder/db"
berrors "github.com/letsencrypt/boulder/errors"
blog "github.com/letsencrypt/boulder/log"
bmail "github.com/letsencrypt/boulder/mail"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/mocks"
"github.com/letsencrypt/boulder/sa"
Expand Down Expand Up @@ -128,7 +130,9 @@ func TestSendNags(t *testing.T) {
DNSNames: []string{"example.com"},
}

err := m.sendNags([]string{emailA}, []*x509.Certificate{cert})
conn, err := m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = m.sendNags(conn, []string{emailA}, []*x509.Certificate{cert})
test.AssertNotError(t, err, "Failed to send warning messages")
test.AssertEquals(t, len(mc.Messages), 1)
test.AssertEquals(t, mocks.MailerMessage{
Expand All @@ -138,7 +142,9 @@ func TestSendNags(t *testing.T) {
}, mc.Messages[0])

mc.Clear()
err = m.sendNags([]string{emailA, emailB}, []*x509.Certificate{cert})
conn, err = m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = m.sendNags(conn, []string{emailA, emailB}, []*x509.Certificate{cert})
test.AssertNotError(t, err, "Failed to send warning messages")
test.AssertEquals(t, len(mc.Messages), 2)
test.AssertEquals(t, mocks.MailerMessage{
Expand All @@ -153,7 +159,9 @@ func TestSendNags(t *testing.T) {
}, mc.Messages[1])

mc.Clear()
err = m.sendNags([]string{}, []*x509.Certificate{cert})
conn, err = m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = m.sendNags(conn, []string{}, []*x509.Certificate{cert})
test.AssertNotError(t, err, "Not an error to pass no email contacts")
test.AssertEquals(t, len(mc.Messages), 0)

Expand Down Expand Up @@ -217,6 +225,37 @@ func TestProcessCerts(t *testing.T) {
}
}

func TestProcessCertsParallel(t *testing.T) {
testCtx := setup(t, []time.Duration{time.Hour * 24 * 7})

testCtx.m.parallelSends = 2
certs := addExpiringCerts(t, testCtx)
log.Clear()
testCtx.m.processCerts(context.Background(), certs)
// Test that the lastExpirationNagSent was updated for the certificate
// corresponding to serial4, which is set up as "already renewed" by
// addExpiringCerts.
if len(log.GetAllMatching("DEBUG: SQL: UPDATE certificateStatus .*2006-01-02 15:04:05.999999999.*\"000000000000000000000000000000001339\"")) != 1 {
t.Errorf("Expected an update to certificateStatus, got these log lines:\n%s",
strings.Join(log.GetAllMatching(".*"), "\n"))
}
}

type erroringMailClient struct{}

func (e erroringMailClient) Connect() (bmail.Conn, error) {
return nil, errors.New("whoopsie-doo")
}

func TestProcessCertsConnectError(t *testing.T) {
testCtx := setup(t, []time.Duration{time.Hour * 24 * 7})

testCtx.m.mailer = erroringMailClient{}
certs := addExpiringCerts(t, testCtx)
// Checking that this terminates rather than deadlocks
testCtx.m.processCerts(context.Background(), certs)
}

func TestFindExpiringCertificates(t *testing.T) {
testCtx := setup(t, []time.Duration{time.Hour * 24, time.Hour * 24 * 4, time.Hour * 24 * 7})

Expand Down
4 changes: 3 additions & 1 deletion cmd/expiration-mailer/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func TestSendEarliestCertInfo(t *testing.T) {
serial2,
)

err := ctx.m.sendNags([]string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB})
conn, err := ctx.m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = ctx.m.sendNags(conn, []string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB})
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/notify-mailer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ func (m *mailer) run() error {
m.log.Infof("Address %q was associated with the most recipients (%d)",
mostRecipients, mostRecipientsLen)

err = m.mailer.Connect()
conn, err := m.mailer.Connect()
if err != nil {
return err
}

defer func() { _ = m.mailer.Close() }()
defer func() { _ = conn.Close() }()

startTime := m.clk.Now()
sortedAddresses := sortAddresses(addressToRecipient)
Expand All @@ -186,7 +186,7 @@ func (m *mailer) run() error {
continue
}

err = m.mailer.SendMail([]string{address}, m.subject, messageBody)
err = conn.SendMail([]string{address}, m.subject, messageBody)
if err != nil {
var badAddrErr bmail.BadAddressSMTPError
if errors.As(err, &badAddrErr) {
Expand Down
Loading