/
sender.go
92 lines (80 loc) · 1.82 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package smtpSender
import (
"errors"
"sync"
)
// Config profile for sender pool
type Config struct {
Hostname string
Iface string
Port int
Stream int
MapIP map[string]string
SMTPserver *SMTPserver
}
// Pipe email pipe for send email
type Pipe struct {
wg sync.WaitGroup
email chan Email
config []Config
}
var ErrPipeStopped = errors.New("email streaming pipe stopped")
// NewPipe return new stream sender pipe
func NewPipe(conf ...Config) Pipe {
pipe := Pipe{}
pipe.config = append(pipe.config, conf...)
return pipe
}
// Start stream sender
func (pipe *Pipe) Start() {
pipe.wg = sync.WaitGroup{}
pipe.email = make(chan Email, len(pipe.config))
go func() {
for i := range pipe.config {
pipe.wg.Add(1)
go func(conf *Config) {
backet := make(chan struct{}, conf.Stream)
for email := range pipe.email {
backet <- struct{}{}
pipe.wg.Add(1)
go func(e Email) {
conn := new(Connect)
conn.SetHostName(conf.Hostname)
conn.SetSMTPport(conf.Port)
conn.SetIface(conf.Iface)
conn.mapIP = conf.MapIP
e.Send(conn, conf.SMTPserver)
<-backet
pipe.wg.Done()
}(email)
}
pipe.wg.Done()
}(&pipe.config[i])
}
}()
}
// Send add email to stream
func (pipe *Pipe) Send(email Email) (err error) {
defer func(eml *Email, err *error) {
if e := recover(); e != nil {
*err = ErrPipeStopped
//eml.ResultFunc(Result{ID: eml.ID, Err: errors.New("421 email streaming pipe stopped")})
} else {
*err = nil
}
}(&email, &err)
pipe.email <- email
return
}
// Stop stream sender
func (pipe *Pipe) Stop() {
close(pipe.email)
pipe.wg.Wait()
}
// NewEmailPipe return new chanel for stream send
// Deprecated: use NewPipe
func NewEmailPipe(conf ...Config) chan<- Email {
pipe := NewPipe(conf...)
pipe.Start()
return pipe.email
}