This repository has been archived by the owner on Jul 7, 2020. It is now read-only.
forked from keybase/client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
storage_ephemeral_purge.go
151 lines (134 loc) · 5.3 KB
/
storage_ephemeral_purge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package storage
import (
"github.com/keybase/client/go/protocol/chat1"
"github.com/keybase/client/go/protocol/gregor1"
context "golang.org/x/net/context"
)
func (s *Storage) GetAllPurgeInfo(ctx context.Context, uid gregor1.UID) (allPurgeInfo map[string]chat1.EphemeralPurgeInfo, err error) {
defer s.Trace(ctx, func() error { return err }, "GetAllPurgeInfo")()
return s.ephemeralTracker.getAllPurgeInfo(ctx, uid)
}
// For a given conversation, purge all ephemeral messages from
// purgeInfo.MinUnexplodedID to the present, updating bookkeeping for the next
// time we need to purge this conv.
func (s *Storage) EphemeralPurge(ctx context.Context, convID chat1.ConversationID, uid gregor1.UID, purgeInfo *chat1.EphemeralPurgeInfo) (newPurgeInfo *chat1.EphemeralPurgeInfo, explodedMsgs []chat1.MessageUnboxed, err Error) {
defer s.Trace(ctx, func() error { return err }, "EphemeralPurge")()
locks.Storage.Lock()
defer locks.Storage.Unlock()
if purgeInfo == nil {
return nil, nil, nil
}
// Fetch secret key
key, ierr := getSecretBoxKey(ctx, s.G().ExternalG(), DefaultSecretUI)
if ierr != nil {
return nil, nil, MiscError{Msg: "unable to get secret key: " + ierr.Error()}
}
ctx, err = s.engine.Init(ctx, key, convID, uid)
if err != nil {
return nil, nil, err
}
maxMsgID, err := s.idtracker.getMaxMessageID(ctx, convID, uid)
if err != nil {
return nil, nil, err
}
// We don't care about holes.
maxHoles := int(maxMsgID-purgeInfo.MinUnexplodedID) + 1
var target int
if purgeInfo.MinUnexplodedID == 0 {
target = 0 // we need to traverse the whole conversation
} else {
target = maxHoles
}
rc := NewHoleyResultCollector(maxHoles, NewSimpleResultCollector(target))
err = s.engine.ReadMessages(ctx, rc, convID, uid, maxMsgID)
switch err.(type) {
case nil:
// ok
if len(rc.Result()) == 0 {
err := s.ephemeralTracker.inactivatePurgeInfo(ctx, convID, uid)
return nil, nil, err
}
case MissError:
s.Debug(ctx, "record-only ephemeralTracker: no local messages")
// We don't have these messages in cache, so don't retry this
// conversation until further notice.
err := s.ephemeralTracker.inactivatePurgeInfo(ctx, convID, uid)
return nil, nil, err
default:
return nil, nil, err
}
newPurgeInfo, explodedMsgs, err = s.ephemeralPurgeHelper(ctx, convID, uid, rc.Result())
if err != nil {
return nil, nil, err
}
err = s.ephemeralTracker.setPurgeInfo(ctx, convID, uid, newPurgeInfo)
return newPurgeInfo, explodedMsgs, err
}
func (s *Storage) explodeExpiredMessages(ctx context.Context, convID chat1.ConversationID,
uid gregor1.UID, msgs []chat1.MessageUnboxed) (err Error) {
defer s.Trace(ctx, func() error { return err }, "explodeExpiredMessages")()
purgeInfo, _, err := s.ephemeralPurgeHelper(ctx, convID, uid, msgs)
if err != nil {
return err
}
// We may only be merging in some subset of messages, we only update if the
// info we get is more restrictive that what we have already
return s.ephemeralTracker.maybeUpdatePurgeInfo(ctx, convID, uid, purgeInfo)
}
// Before adding or removing messages from storage, nuke any expired ones and
// give info for our bookkeeping for the next time we have to purge.
// requires msgs to be sorted by descending message ID
func (s *Storage) ephemeralPurgeHelper(ctx context.Context, convID chat1.ConversationID,
uid gregor1.UID, msgs []chat1.MessageUnboxed) (purgeInfo *chat1.EphemeralPurgeInfo, explodedMsgs []chat1.MessageUnboxed, err Error) {
defer s.Trace(ctx, func() error { return err }, "ephemeralPurgeHelper convID: %v, uid: %v, numMessages %v", convID, uid, len(msgs))()
if msgs == nil || len(msgs) == 0 {
return nil, nil, nil
}
nextPurgeTime := gregor1.Time(0)
minUnexplodedID := msgs[0].GetMessageID()
var allAssets []chat1.Asset
var hasExploding bool
for i, msg := range msgs {
if !msg.IsValid() {
s.Debug(ctx, "skipping invalid msg: %v", msg.GetMessageID())
continue
}
mvalid := msg.Valid()
if mvalid.IsEphemeral() {
if !mvalid.IsEphemeralExpired(s.clock.Now()) {
hasExploding = true
// Keep track of the minimum ephemeral message that is not yet
// exploded.
if msg.GetMessageID() < minUnexplodedID {
minUnexplodedID = msg.GetMessageID()
}
// Keep track of the next time we'll have purge this conv.
if nextPurgeTime == 0 || mvalid.Etime() < nextPurgeTime {
nextPurgeTime = mvalid.Etime()
}
s.Debug(ctx, "skipping unexpired ephemeral msg: %v, etime: %v, now: %v", msg.GetMessageID(), mvalid.Etime().Time(), s.clock.Now())
} else if mvalid.MessageBody.IsNil() {
s.Debug(ctx, "skipping already exploded message: %v", msg.GetMessageID())
} else {
msgPurged, assets := s.purgeMessage(mvalid)
allAssets = append(allAssets, assets...)
explodedMsgs = append(explodedMsgs, msgPurged)
msgs[i] = msgPurged
s.Debug(ctx, "purging ephemeral msg: %v", msgPurged.GetMessageID())
}
}
}
// queue asset deletions in the background
s.assetDeleter.DeleteAssets(ctx, uid, convID, allAssets)
s.Debug(ctx, "purging %v ephemeral messages", len(explodedMsgs))
if err = s.engine.WriteMessages(ctx, convID, uid, explodedMsgs); err != nil {
s.Debug(ctx, "write messages failed: %v", err)
return nil, nil, err
}
return &chat1.EphemeralPurgeInfo{
ConvID: convID,
MinUnexplodedID: minUnexplodedID,
NextPurgeTime: nextPurgeTime,
IsActive: hasExploding,
}, explodedMsgs, nil
}