forked from textileio/go-textile
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cafe_inbox.go
185 lines (162 loc) · 4.33 KB
/
cafe_inbox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package core
import (
"sync"
"gx/ipfs/QmTRhk7cgjUf2gfQ3p2M9KPECNZEW9XUrmHcFCgog4cPgB/go-libp2p-peer"
"gx/ipfs/QmUJYo4etAQqFfSS2rarFAE97eNGB8ej64YkRT2SmsYD4r/go-ipfs/core"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/textileio/textile-go/crypto"
"github.com/textileio/textile-go/ipfs"
"github.com/textileio/textile-go/pb"
"github.com/textileio/textile-go/repo"
)
const (
	// cafeInFlushGroupSize is the page size used when listing pending
	// messages, i.e. how many messages are handed to one batch
	cafeInFlushGroupSize = 16

	// maxDownloadAttempts is how many failed downloads a message is allowed
	// before it is removed from the inbox
	maxDownloadAttempts = 5
)
// CafeInbox queues and processes inbound cafe messages
type CafeInbox struct {
	// service returns the cafe service used to ask cafes for new messages
	service func() *CafeService
	// threadsService returns the threads service that verifies and handles
	// the envelopes carried by downloaded messages
	threadsService func() *ThreadsService
	// node returns the IPFS node used to download and decrypt messages
	node func() *core.IpfsNode
	// datastore holds the cafe sessions and the pending cafe messages
	datastore repo.Datastore
	// checking is true while a CheckMessages call is in progress
	// NOTE(review): it is read and written without holding mux
	checking bool
	// mux is held by Flush for the duration of a flush
	mux sync.Mutex
}
// NewCafeInbox builds an inbox queue wired to the given service getters,
// node getter and datastore
func NewCafeInbox(
	service func() *CafeService,
	threadsService func() *ThreadsService,
	node func() *core.IpfsNode,
	datastore repo.Datastore,
) *CafeInbox {
	inbox := new(CafeInbox)
	inbox.service = service
	inbox.threadsService = threadsService
	inbox.node = node
	inbox.datastore = datastore
	return inbox
}
// CheckMessages asks each active cafe session for new messages
//
// Every session's cafe is queried in its own goroutine and the call blocks
// until all of them have finished. It returns nil right away when another
// check is already marked as running or when there are no sessions.
// Otherwise it returns one of the errors encountered (which one is
// unspecified when several occur), or nil if every check succeeded.
func (q *CafeInbox) CheckMessages() error {
	// NOTE(review): q.checking is read and written without synchronization,
	// so two simultaneous callers can both pass this guard. Closing that gap
	// needs a dedicated lock or atomic on CafeInbox; q.mux is not used here
	// because Flush holds it for its whole run.
	if q.checking {
		return nil
	}
	q.checking = true
	defer func() {
		q.checking = false
	}()

	// get active cafe sessions
	sessions := q.datastore.CafeSessions().List()
	if len(sessions) == 0 {
		return nil
	}

	var (
		wg sync.WaitGroup
		// errMux guards cerr, which the worker goroutines write concurrently
		errMux sync.Mutex
		cerr   error
	)
	setErr := func(err error) {
		errMux.Lock()
		cerr = err
		errMux.Unlock()
	}

	// check each concurrently
	for _, session := range sessions {
		cafe, err := peer.IDB58Decode(session.CafeId)
		if err != nil {
			// remember the failure but keep checking the other sessions
			setErr(err)
			continue
		}
		wg.Add(1)
		go func(cafe peer.ID) {
			// deferred so the wait group is released even if the check panics
			defer wg.Done()
			if err := q.service().CheckMessages(cafe); err != nil {
				setErr(err)
			}
		}(cafe)
	}
	wg.Wait()

	// wg.Wait orders every setErr call before this read
	return cerr
}
// Add stores an inbound message in the datastore so a later flush can
// process it; it fails if the message date cannot be converted
func (q *CafeInbox) Add(msg *pb.CafeMessage) error {
	log.Debugf("received cafe message from %s: %s", ipfs.ShortenID(msg.PeerId), msg.Id)

	received, err := ptypes.Timestamp(msg.Date)
	if err != nil {
		return err
	}

	record := &repo.CafeMessage{
		Id:     msg.Id,
		PeerId: msg.PeerId,
		Date:   received,
	}
	return q.datastore.CafeMessages().Add(record)
}
// Flush works through the pending messages, starting from the first page.
// It holds the inbox mutex while running and does nothing when either the
// threads service or the cafe service is unavailable.
func (q *CafeInbox) Flush() {
	q.mux.Lock()
	defer q.mux.Unlock()
	log.Debug("flushing cafe inbox")

	// both services are required to handle a message
	if q.threadsService() == nil {
		return
	}
	if q.service() == nil {
		return
	}

	firstPage := q.datastore.CafeMessages().List("", cafeInFlushGroupSize)
	err := q.batch(firstPage)
	if err != nil {
		log.Errorf("cafe inbox batch error: %s", err)
	}
}
// batch flushes msgs and then every following page of pending messages
//
// Messages are handled concurrently within a page, and the page is waited
// on before the next one starts, so at most len(msgs) (normally
// cafeInFlushGroupSize) messages are in flight at once. Because the call
// only returns once all handling has finished, a caller that serializes
// flushes with a lock cannot have a second flush re-list messages that are
// still being handled.
//
// A message that fails to be handled is logged and left in the datastore;
// a successfully handled one is deleted. The returned error is always nil;
// the error result is kept for the existing signature.
//
// NOTE(review): Flush holds q.mux while this runs; confirm nothing
// reachable from handle calls Flush synchronously.
func (q *CafeInbox) batch(msgs []repo.CafeMessage) error {
	for {
		log.Debugf("handling %d cafe messages", len(msgs))
		if len(msgs) == 0 {
			return nil
		}

		// handled[i] reports whether msgs[i] was processed; each goroutine
		// writes only its own slot, so no further locking is required
		handled := make([]bool, len(msgs))
		var wg sync.WaitGroup
		wg.Add(len(msgs))
		for i, msg := range msgs {
			go func(i int, msg repo.CafeMessage) {
				defer wg.Done()
				if err := q.handle(msg); err != nil {
					log.Warningf("unable to handle cafe message %s: %s", msg.Id, err)
					return
				}
				handled[i] = true
			}(i, msg)
		}
		wg.Wait()

		// fetch the next page before deleting anything, so the message used
		// as the paging offset has not been removed by this loop when List
		// runs (presumably List looks the offset up by id; verify against
		// the datastore implementation)
		offset := msgs[len(msgs)-1].Id
		next := q.datastore.CafeMessages().List(offset, cafeInFlushGroupSize)

		// clean up everything that was handled
		for i, msg := range msgs {
			if !handled[i] {
				continue
			}
			if err := q.datastore.CafeMessages().Delete(msg.Id); err != nil {
				log.Errorf("failed to delete cafe message %s: %s", msg.Id, err)
				continue
			}
			log.Debugf("handled cafe message %s", msg.Id)
		}

		// keep going
		msgs = next
	}
}
// handle processes one message: it downloads the payload, decrypts it with
// the node's private key, decodes the envelope, verifies it against the
// sender and passes it to the threads service
//
// A failed download is counted against the message; once the count reaches
// maxDownloadAttempts the message is deleted instead. In both cases the
// download error is returned unless the bookkeeping itself fails. An
// envelope that fails verification is logged and reported as success (nil).
func (q *CafeInbox) handle(msg repo.CafeMessage) error {
	sender, err := peer.IDB58Decode(msg.PeerId)
	if err != nil {
		return err
	}

	// download the actual message
	sealed, dlErr := ipfs.DataAtPath(q.node(), msg.Id)
	if dlErr != nil {
		pending := q.datastore.CafeMessages()
		if msg.Attempts+1 < maxDownloadAttempts {
			// still within budget: record the failed attempt
			if aerr := pending.AddAttempt(msg.Id); aerr != nil {
				return aerr
			}
			return dlErr
		}
		// out of attempts: give up on this message
		if derr := pending.Delete(msg.Id); derr != nil {
			return derr
		}
		return dlErr
	}

	plaintext, err := crypto.Decrypt(q.node().PrivateKey, sealed)
	if err != nil {
		return err
	}

	env := &pb.Envelope{}
	err = proto.Unmarshal(plaintext, env)
	if err != nil {
		return err
	}

	if verr := q.threadsService().service.VerifyEnvelope(env, sender); verr != nil {
		log.Warningf("error verifying cafe message: %s", verr)
		return nil
	}

	// pass to thread service for normal handling
	_, err = q.threadsService().Handle(sender, env)
	return err
}