forked from flashmob/go-guerrilla
-
Notifications
You must be signed in to change notification settings - Fork 0
/
p_redis.go
127 lines (112 loc) · 3.88 KB
/
p_redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package backends
import (
"fmt"
"github.com/flashmob/go-guerrilla/mail"
"github.com/flashmob/go-guerrilla/response"
)
// ----------------------------------------------------------------------------------
// Processor Name: redis
// ----------------------------------------------------------------------------------
// Description : Saves the e.Data (email data) and e.DeliveryHeader together in redis
// : using the hash generated by the "hash" processor and stored in
// : e.Hashes
// ----------------------------------------------------------------------------------
// Config Options: redis_expire_seconds int - how many seconds until the stored email expires
// : redis_interface string - <host>:<port> eg, 127.0.0.1:6379
// --------------:-------------------------------------------------------------------
// Input : e.Data
// : e.DeliveryHeader generated by Header() processor
// :
// ----------------------------------------------------------------------------------
// Output : Sets e.QueuedId to the first hash, e.Hashes[0]
// ----------------------------------------------------------------------------------
func init() {
processors["redis"] = func() Decorator {
return Redis()
}
}
// RedisProcessorConfig holds the settings of the redis processor. Redis()
// obtains it from the backend config through Svc.ExtractConfig; the json tags
// name the corresponding config keys.
type RedisProcessorConfig struct {
// RedisExpireSeconds is passed to the SETEX command as the number of seconds
// after which the stored email expires.
RedisExpireSeconds int `json:"redis_expire_seconds"`
// RedisInterface is the address of the redis server to dial over tcp,
// written as <host>:<port>, eg, 127.0.0.1:6379
RedisInterface string `json:"redis_interface"`
}
// RedisProcessor holds the redis connection used by the redis processor,
// together with a flag recording whether that connection has been opened.
type RedisProcessor struct {
// isConnected is set to true by redisConnection after a successful dial.
isConnected bool
// conn is the connection returned by RedisDialer.
conn RedisConn
}
// redisConnection makes sure r has a connection to redis.
//
// When r is already marked as connected it does nothing and returns nil.
// Otherwise it dials redisInterface over tcp using RedisDialer, stores the
// returned connection in r.conn and, if the dial succeeded, marks r as
// connected. A dial error is returned unchanged.
func (r *RedisProcessor) redisConnection(redisInterface string) (err error) {
	if r.isConnected {
		// already connected, nothing to dial
		return nil
	}
	if r.conn, err = RedisDialer("tcp", redisInterface); err != nil {
		return err
	}
	r.isConnected = true
	return nil
}
// Redis returns the decorator of the "redis" processor, which stores the
// email data in redis.
//
// Calling Redis registers two hooks with Svc:
//   - an initializer that extracts the RedisProcessorConfig from the backend
//     config and opens the redis connection;
//   - a shutdowner that closes the connection if one is open.
//
// The returned processor only acts on TaskSaveMail; any other task is handed
// straight to the next processor. For TaskSaveMail it:
//  1. requires at least one entry in e.Hashes (otherwise it fails with
//     StorageError) and copies e.Hashes[0] into e.QueuedId;
//  2. stores the message under that hash with SETEX, expiring after
//     config.RedisExpireSeconds. The stored value is the "zlib-compressor"
//     entry of e.Values when one is present, otherwise the envelope itself;
//  3. sets e.Values["redis"] so that the next processor knows the message
//     data is in redis;
//  4. hands the envelope to the next processor.
//
// When connecting or SETEX fails, the processor returns a
// FailBackendTransaction result together with the error. After a failed
// SETEX the connection is closed and marked as disconnected, so that the next
// save dials a new connection instead of reusing a broken one.
func Redis() Decorator {
	var config *RedisProcessorConfig
	redisClient := &RedisProcessor{}

	// read the config into RedisProcessorConfig
	Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
		configType := BaseConfig(&RedisProcessorConfig{})
		bcfg, err := Svc.ExtractConfig(backendConfig, configType)
		if err != nil {
			return err
		}
		config = bcfg.(*RedisProcessorConfig)
		if redisErr := redisClient.redisConnection(config.RedisInterface); redisErr != nil {
			return fmt.Errorf("redis cannot connect, check your settings: %s", redisErr)
		}
		return nil
	}))

	// When shutting down
	Svc.AddShutdowner(ShutdownWith(func() error {
		if !redisClient.isConnected {
			return nil
		}
		// Keep the flag truthful: once closed, the connection must not be
		// reused or closed a second time.
		redisClient.isConnected = false
		return redisClient.conn.Close()
	}))

	return func(p Processor) Processor {
		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
			if task != TaskSaveMail {
				// nothing to do for this task
				return p.Process(e, task)
			}
			if len(e.Hashes) == 0 {
				Log().Error("Redis needs a Hasher() process before it")
				return NewResult(response.Canned.FailBackendTransaction), StorageError
			}
			hash := e.Hashes[0]
			e.QueuedId = hash

			// Store the compressed data if a compressor was set, otherwise
			// the envelope itself.
			var stringer fmt.Stringer = e
			if c, ok := e.Values["zlib-compressor"]; ok {
				stringer = c.(*DataCompressor)
			}

			// Re-dials only when the connection is marked as disconnected.
			if err := redisClient.redisConnection(config.RedisInterface); err != nil {
				Log().WithError(err).Warn("Error while connecting to redis")
				return NewResult(response.Canned.FailBackendTransaction), err
			}
			if _, err := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, stringer); err != nil {
				Log().WithError(err).Warn("Error while SETEX to redis")
				// The connection may be unusable after a failed command. Drop
				// it so the next save reconnects instead of failing forever.
				// The Close error is ignored on purpose: the command error
				// returned below is the one the caller needs.
				_ = redisClient.conn.Close()
				redisClient.isConnected = false
				return NewResult(response.Canned.FailBackendTransaction), err
			}

			e.Values["redis"] = "redis" // the next processor will know to look in redis for the message data
			return p.Process(e, task)
		})
	}
}