Skip to content

Commit

Permalink
queue: implement adding a message to the queue that gets sent to mult…
Browse files Browse the repository at this point in the history
…iple recipients

and in a way that allows us to send that message to multiple recipients in a
single smtp transaction.
  • Loading branch information
mjl- committed Mar 5, 2024
1 parent 15e450d commit 47ebfa8
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 276 deletions.
8 changes: 4 additions & 4 deletions dmarcdb/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,13 +842,13 @@ Period: %s - %s UTC
continue
}

qm := queue.MakeMsg(mox.Conf.Static.Postmaster.Account, from.Path(), rcpt.address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil)
qm := queue.MakeMsg(from.Path(), rcpt.address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil)
// Don't try as long as regular deliveries, and stop before we would send the
// delayed DSN. Though we also won't send that due to IsDMARCReport.
qm.MaxAttempts = 5
qm.IsDMARCReport = true

err = queueAdd(ctx, log, &qm, msgf)
err = queueAdd(ctx, log, mox.Conf.Static.Postmaster.Account, msgf, qm)
if err != nil {
tempError = true
log.Errorx("queueing message with dmarc aggregate report", err)
Expand Down Expand Up @@ -997,13 +997,13 @@ Submitting-URI: %s
continue
}

qm := queue.MakeMsg(mox.Conf.Static.Postmaster.Account, fromAddr.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil)
qm := queue.MakeMsg(fromAddr.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil)
// Don't try as long as regular deliveries, and stop before we would send the
// delayed DSN. Though we also won't send that due to IsDMARCReport.
qm.MaxAttempts = 5
qm.IsDMARCReport = true

if err := queueAdd(ctx, log, &qm, msgf); err != nil {
if err := queueAdd(ctx, log, mox.Conf.Static.Postmaster.Account, msgf, qm); err != nil {
log.Errorx("queueing message with dmarc error report", err)
metricReportError.Inc()
} else {
Expand Down
7 changes: 6 additions & 1 deletion dmarcdb/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,12 @@ func TestSendReports(t *testing.T) {
aggrAddrs := map[string]struct{}{}
errorAddrs := map[string]struct{}{}

queueAdd = func(ctx context.Context, log mlog.Log, qm *queue.Msg, msgFile *os.File) error {
queueAdd = func(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...queue.Msg) error {
if len(qml) != 1 {
return fmt.Errorf("queued %d messages, expected 1", len(qml))
}
qm := qml[0]

// Read message file. Also write copy to disk for inspection.
buf, err := io.ReadAll(&moxio.AtReader{R: msgFile})
tcheckf(t, err, "read report message")
Expand Down
4 changes: 2 additions & 2 deletions gentestdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ Accounts:
const qmsg = "From: <test0@mox.example>\r\nTo: <other@remote.example>\r\nSubject: test\r\n\r\nthe message...\r\n"
_, err = fmt.Fprint(mf, qmsg)
xcheckf(err, "writing message")
qm := queue.MakeMsg("test0", mailfrom, rcptto, false, false, int64(len(qmsg)), "<test@localhost>", prefix, nil)
err = queue.Add(ctxbg, c.log, &qm, mf)
qm := queue.MakeMsg(mailfrom, rcptto, false, false, int64(len(qmsg)), "<test@localhost>", prefix, nil)
err = queue.Add(ctxbg, c.log, "test0", mf, qm)
xcheckf(err, "enqueue message")

// Create three accounts.
Expand Down
102 changes: 70 additions & 32 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,14 @@ var Localserve bool
// Use MakeMsg to make a message with fields that Add needs. Add will further set
// queueing related fields.
type Msg struct {
ID int64
ID int64

// A message for multiple recipients will get a BaseID that is identical to the
// first Msg.ID queued. They may be delivered in a single SMTP transaction if they
// are going to the same mail server. For messages with a single recipient, this
// field will be 0.
BaseID int64 `bstore:"index"`

Queued time.Time `bstore:"default now"`
SenderAccount string // Failures are delivered back to this local account. Also used for routing.
SenderLocalpart smtp.Localpart // Should be a local user and domain.
Expand Down Expand Up @@ -208,10 +215,9 @@ func Count(ctx context.Context) (int, error) {
}

// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
now := time.Now()
return Msg{
SenderAccount: senderAccount,
SenderLocalpart: sender.Localpart,
SenderDomain: sender.IPDomain,
RecipientLocalpart: recipient.Localpart,
Expand All @@ -228,43 +234,56 @@ func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf
}
}

// Add a new message to the queue. The queue is kicked immediately to start a
// first delivery attempt.
// Add one or more new messages to the queue. They'll get the same BaseID, so they
// can be delivered in a single SMTP transaction, with a single DATA command, but
// may be split into multiple transactions if errors/limits are encountered. The
// queue is kicked immediately to start a first delivery attempt.
//
// ID must be 0 and will be set after inserting in the queue.
// ID of the messagse must be 0 and will be set after inserting in the queue.
//
// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
// such as Queued, NextAttempt, LastAttempt, LastError.
func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error {
// todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759
func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error {
if len(qml) == 0 {
return fmt.Errorf("must queue at least one message")
}

if qm.ID != 0 {
return fmt.Errorf("id of queued message must be 0")
for _, qm := range qml {
if qm.ID != 0 {
return fmt.Errorf("id of queued messages must be 0")
}
}

if Localserve {
if qm.SenderAccount == "" {
if senderAccount == "" {
return fmt.Errorf("cannot queue with localserve without local account")
}
acc, err := store.OpenAccount(log, qm.SenderAccount)
acc, err := store.OpenAccount(log, senderAccount)
if err != nil {
return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
}
defer func() {
err := acc.Close()
log.Check(err, "closing account")
}()
m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
conf, _ := acc.Conf()
dest := conf.Destinations[qm.Sender().String()]
err = nil
acc.WithWLock(func() {
err = acc.DeliverDestination(log, dest, &m, msgFile)
for i, qm := range qml {
qml[i].SenderAccount = senderAccount
m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
dest := conf.Destinations[qm.Sender().String()]
err = acc.DeliverDestination(log, dest, &m, msgFile)
if err != nil {
err = fmt.Errorf("delivering message: %v", err)
return // Returned again outside WithWLock.
}
}
})
if err != nil {
return fmt.Errorf("delivering message: %v", err)
if err == nil {
log.Debug("immediately delivered from queue to sender")
}
log.Debug("immediately delivered from queue to sender")
return nil
return err
}

tx, err := DB.Begin(ctx, true)
Expand All @@ -279,30 +298,49 @@ func Add(ctx context.Context, log mlog.Log, qm *Msg, msgFile *os.File) error {
}
}()

if err := tx.Insert(qm); err != nil {
return err
// Insert messages into queue. If there are multiple messages, they all get a
// non-zero BaseID that is the Msg.ID of the first message inserted.
var baseID int64
for i := range qml {
qml[i].SenderAccount = senderAccount
qml[i].BaseID = baseID
if err := tx.Insert(&qml[i]); err != nil {
return err
}
if i == 0 && len(qml) > 1 {
baseID = qml[i].ID
qml[i].BaseID = baseID
if err := tx.Update(&qml[i]); err != nil {
return err
}
}
}

dst := qm.MessagePath()
var paths []string
defer func() {
if dst != "" {
err := os.Remove(dst)
log.Check(err, "removing destination message file for queue", slog.String("path", dst))
for _, p := range paths {
err := os.Remove(p)
log.Check(err, "removing destination message file for queue", slog.String("path", p))
}
}()
dstDir := filepath.Dir(dst)
os.MkdirAll(dstDir, 0770)
if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
return fmt.Errorf("linking/copying message to new file: %s", err)
} else if err := moxio.SyncDir(log, dstDir); err != nil {
return fmt.Errorf("sync directory: %v", err)

for _, qm := range qml {
dst := qm.MessagePath()
paths = append(paths, dst)
dstDir := filepath.Dir(dst)
os.MkdirAll(dstDir, 0770)
if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
return fmt.Errorf("linking/copying message to new file: %s", err)
} else if err := moxio.SyncDir(log, dstDir); err != nil {
return fmt.Errorf("sync directory: %v", err)
}
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("commit transaction: %s", err)
}
tx = nil
dst = ""
paths = nil

queuekick()
return nil
Expand Down

0 comments on commit 47ebfa8

Please sign in to comment.