/
loop.go
161 lines (138 loc) · 4.38 KB
/
loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright 2017, 2022 Tamás Gulácsi. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
package imapclient
import (
"context"
"crypto/sha512"
"encoding/base64"
"errors"
"fmt"
"hash"
"io"
"strconv"
"time"
"github.com/go-logr/logr"
"github.com/tgulacsi/go/temp"
)
var (
// ShortSleep is the duration which is used for sleep after a round that
// delivered at least one message without error.
ShortSleep = 1 * time.Second
// LongSleep is the duration which is used for sleep after a round that
// returned an error or delivered no message.
LongSleep = 5 * time.Minute
// ErrSkip from DeliverFunc means leave the message as is: it is neither
// marked as seen nor moved to the outbox or the error box.
ErrSkip = errors.New("skip move")
)
// DeliveryLoop runs delivery rounds over and over until ctx is done.
//
// Every round checks the inbox ("INBOX" when inbox is empty) for mails with
// the specified pattern in the subject (or for any unseen mail if
// pattern == ""), reads each message and calls deliver with the message,
// its UID and its hash.
//
// If deliver returns no error, the message is marked as Seen and, if outbox
// is not empty, moved to outbox. If deliver returns ErrSkip, the message is
// left as is. On any other error the message is moved to errbox when errbox
// is not empty.
//
// Between rounds the loop waits ShortSleep when the round delivered at least
// one message without error, and LongSleep otherwise. Errors of a round are
// only logged; DeliveryLoop returns nil once ctx is done.
func DeliveryLoop(ctx context.Context, c Client, inbox, pattern string, deliver DeliverFunc, outbox, errbox string, logger logr.Logger) error {
	if inbox == "" {
		inbox = "INBOX"
	}
	for {
		// nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable
		count, roundErr := one(ctx, c, inbox, pattern, deliver, outbox, errbox, logger)
		if roundErr == nil {
			logger.Info("DeliveryLoop one round", "count", count)
		} else {
			logger.Error(roundErr, "DeliveryLoop one round", "count", count)
		}

		// Stop right away when the context ended during the round.
		if ctx.Err() != nil {
			return nil
		}

		// Only a productive, error-free round earns the short pause.
		pause := LongSleep
		if count != 0 && roundErr == nil {
			pause = ShortSleep
		}

		timer := time.NewTimer(pause)
		select {
		case <-ctx.Done():
			// Release the timer; drain its channel if it fired concurrently.
			if !timer.Stop() {
				<-timer.C
			}
			return nil
		case <-timer.C:
		}
	}
}
// HashArray holds a message digest in a fixed-size, comparable array of
// sha512.Size224 bytes.
type HashArray [sha512.Size224]byte

// String returns the digest in padded, URL-safe base64 encoding.
func (h HashArray) String() string {
	raw := h[:]
	return base64.URLEncoding.EncodeToString(raw)
}

// Hash wraps a hash.Hash and adds the Array convenience method.
type Hash struct{ hash.Hash }

// NewHash returns a new Hash backed by SHA-512/224.
func NewHash() *Hash {
	inner := sha512.New512_224()
	return &Hash{Hash: inner}
}

// Array returns the current digest as a HashArray.
// The state of the underlying hash is not changed.
func (h Hash) Array() HashArray {
	var digest HashArray
	// Sum appends to the zero-length slice that is backed by digest,
	// so the sum is written into the array in place.
	h.Sum(digest[:0])
	return digest
}
// MkDeliverFunc returns a DeliverFunc that forwards every call, with all
// arguments unchanged, to deliver.
//
// The ctx parameter is not used: the returned function passes on the context
// it is called with.
func MkDeliverFunc(ctx context.Context, deliver DeliverFunc) DeliverFunc {
	forward := func(callCtx context.Context, body io.ReadSeeker, uid uint32, hsh HashArray) error {
		return deliver(callCtx, body, uid, hsh)
	}
	return forward
}
// DeliverOne performs a single round of message reading and delivery,
// without looping. An empty inbox name means "INBOX".
//
// It returns the number of messages delivered in this round.
func DeliverOne(ctx context.Context, c Client, inbox, pattern string, deliver DeliverFunc, outbox, errbox string, logger logr.Logger) (int, error) {
	mailbox := inbox
	if mailbox == "" {
		mailbox = "INBOX"
	}
	return one(ctx, c, mailbox, pattern, deliver, outbox, errbox, logger)
}
// DeliverFunc is the type for message delivery.
//
// r is the message data, uid is the message UID sent by the IMAP server,
// and hsh is the hash computed over the message data while it was read.
//
// A nil return value counts the message as delivered. A non-nil error makes
// the caller move the message to the error box, if one is set, unless the
// error is (or wraps) ErrSkip, in which case the message is left as is.
type DeliverFunc func(ctx context.Context, r io.ReadSeeker, uid uint32, hsh HashArray) error
// one performs a single delivery round: it connects, lists the matching
// messages of inbox, then reads, hashes and delivers them one by one.
//
// A message that cannot be read is logged and skipped. When deliver fails,
// the message is moved to errbox (if set), except for ErrSkip, which leaves
// it untouched. A delivered message is marked as seen and moved to outbox
// (if set); failures of these follow-up steps are only logged.
//
// It returns the number of messages delivered. The error is non-nil only
// when connecting or listing fails, or when ctx ends between two messages.
func one(ctx context.Context, c Client, inbox, pattern string, deliver DeliverFunc, outbox, errbox string, logger logr.Logger) (int, error) {
	logger = logger.WithValues("inbox", inbox)

	if connErr := c.Connect(ctx); connErr != nil {
		logger.Error(connErr, "Connecting")
		return 0, fmt.Errorf("connect: %w", connErr)
	}
	defer c.Close(ctx, true)

	// The flag given to List is set only when both target boxes are configured.
	bothBoxes := outbox != "" && errbox != ""
	uids, listErr := c.List(ctx, inbox, pattern, bothBoxes)
	logger.Info("List", "uids", uids, "error", listErr)
	if listErr != nil {
		return 0, fmt.Errorf("list %v/%v: %w", c, inbox, listErr)
	}

	delivered := 0
	digest := NewHash() // reused for every message, reset before each read
	for _, uid := range uids {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return delivered, ctxErr
		}
		msgLog := logger.WithValues("uid", uid)

		// Read the message into a buffer and feed the hash in the same pass.
		digest.Reset()
		body := temp.NewMemorySlurper(strconv.FormatUint(uint64(uid), 10))
		_, readErr := c.ReadTo(ctx, io.MultiWriter(body, digest), uid)
		if readErr != nil {
			body.Close()
			msgLog.Error(readErr, "Read")
			continue
		}

		deliverErr := deliver(ctx, body, uid, digest.Array())
		body.Close()
		if deliverErr != nil {
			msgLog.Error(deliverErr, "deliver")
			// ErrSkip, or no error box: leave the message where it is.
			if errbox == "" || errors.Is(deliverErr, ErrSkip) {
				continue
			}
			if moveErr := c.Move(ctx, uid, errbox); moveErr != nil {
				msgLog.Error(moveErr, "move to", "errbox", errbox)
			}
			continue
		}

		delivered++
		if markErr := c.Mark(ctx, uid, true); markErr != nil {
			msgLog.Error(markErr, "mark seen")
		}
		if outbox == "" {
			continue
		}
		if moveErr := c.Move(ctx, uid, outbox); moveErr != nil {
			msgLog.Error(moveErr, "move to", "outbox", outbox)
		}
	}
	return delivered, nil
}