/
messages.go
134 lines (118 loc) · 3.65 KB
/
messages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package mtpwrap
import (
"context"
"fmt"
"runtime/trace"
"github.com/gotd/td/telegram/message"
"github.com/gotd/td/telegram/query"
"github.com/gotd/td/telegram/query/messages"
"github.com/gotd/td/tg"
)
// SearchAllMyMessages is a convenience wrapper around SearchAllMessages that
// searches the chat or channel `dlg` with the sender set to
// tg.InputPeerSelf. The callback `cb` is handed to SearchAllMessages
// unchanged and may be nil.
func (c *Client) SearchAllMyMessages(ctx context.Context, dlg Entity, cb func(n int)) ([]messages.Elem, error) {
	self := &tg.InputPeerSelf{}
	return c.SearchAllMessages(ctx, dlg, self, cb)
}
// SearchAllMessages searches the chat or channel `dlg` for ALL messages sent
// by `who` and returns them as a slice of messages.Elem.
//
// Results are cached under a key derived from dlg.GetID(). On a cache hit the
// cached slice is returned without building a search query, and cb (if not
// nil) is invoked once with the number of cached messages. On a cache miss the
// messages are gathered by collectMessages, which receives cb, and the result
// is stored in the cache before it is returned. A failure to store the result
// is reported as an error.
//
// NOTE(review): the cache key does not include `who`, so a result cached for
// one sender is returned for any other sender in the same dialog — confirm
// that callers always use the same `who` per dialog.
func (c *Client) SearchAllMessages(ctx context.Context, dlg Entity, who tg.InputPeerClass, cb func(n int)) ([]messages.Elem, error) {
	key := cacheKey(dlg.GetID())

	if cached, err := c.cache.Get(key); err == nil {
		// Checked assertion: an entry of an unexpected type (or a nil entry)
		// is treated as a cache miss instead of panicking.
		if msgs, ok := cached.([]messages.Elem); ok {
			if cb != nil {
				cb(len(msgs))
			}
			return msgs, nil
		}
	}

	ip, err := asInputPeer(dlg)
	if err != nil {
		return nil, err
	}

	bld := query.Messages(c.cl.API()).
		Search(ip).
		BatchSize(defBatchSize).
		FromID(who).
		Filter(&tg.InputMessagesFilterEmpty{})
	elems, err := collectMessages(ctx, bld, cb)
	if err != nil {
		return nil, err
	}

	if err := c.cache.Set(key, elems); err != nil {
		return nil, err
	}
	return elems, nil
}
// DeleteMessages revokes the given messages in the chat or channel `dlg` and
// returns the sum of the GetPtsCount() values of the API responses.
//
// The message IDs are split into batches of at most defBatchSize and one
// revoke request is sent per batch. The cache entry for `dlg` is removed
// before the first request is sent. If a request fails, processing stops and
// 0 is returned together with the wrapped error, even if earlier batches
// succeeded. The whole call is recorded as a runtime/trace task named
// "DeleteMessages".
func (c *Client) DeleteMessages(ctx context.Context, dlg Entity, messages []messages.Elem) (int, error) {
	ctx, task := trace.NewTask(ctx, "DeleteMessages")
	defer task.End()

	peer, err := asInputPeer(dlg)
	if err != nil {
		trace.Log(ctx, "logic", err.Error())
		return 0, err
	}

	// Extract the message ID for the element at index i.
	msgID := func(i int) int { return messages[i].Msg.GetID() }
	batches := splitBy(defBatchSize, messages, msgID)
	trace.Logf(ctx, "logic", "split chunks: %d", len(batches))

	// The cached search result is about to become stale, drop it.
	if removed := c.cache.Remove(cacheKey(dlg.GetID())); removed {
		trace.Log(ctx, "logic", "cache cleared")
	}

	var deleted int
	for i := range batches {
		affected, err := message.NewSender(c.cl.API()).To(peer).Revoke().Messages(ctx, batches[i]...)
		if err != nil {
			trace.Logf(ctx, "api", "revoke error: %s", err)
			return 0, fmt.Errorf("failed to delete: %w", err)
		}
		deleted += affected.GetPtsCount()
	}

	trace.Log(ctx, "logic", "ok")
	return deleted, nil
}
// asInputPeer converts ent to a tg.InputPeerClass by calling its AsInputPeer
// method. Only *tg.Chat and *tg.Channel are supported; for any other value an
// error naming the value's dynamic type is returned.
func asInputPeer(ent Entity) (tg.InputPeerClass, error) {
	if chat, ok := ent.(*tg.Chat); ok {
		return chat.AsInputPeer(), nil
	}
	if channel, ok := ent.(*tg.Channel); ok {
		return channel.AsInputPeer(), nil
	}
	return nil, fmt.Errorf("unsupported input peer type: %T", ent)
}
// splitBy maps every index of input through fn and groups the results, in
// order, into chunks of at most n values each; only the last chunk may be
// shorter. fn is called exactly once per index, in ascending order, and
// receives the index (not the element) of input.
//
// A non-positive n disables chunking: all values are returned in a single
// chunk instead of panicking on a division by zero. An empty input yields an
// empty, non-nil result.
func splitBy[T, S any](n int, input []S, fn func(i int) T) [][]T {
	total := len(input)
	if total == 0 {
		return [][]T{}
	}
	// Clamping n to total also keeps the arithmetic below free of overflow.
	if n <= 0 || n > total {
		n = total
	}

	// Ceiling division, so the outer slice never has to grow.
	out := make([][]T, 0, (total+n-1)/n)
	for start := 0; start < total; start += n {
		end := start + n
		if end > total {
			end = total
		}
		chunk := make([]T, 0, end-start)
		for i := start; i < end; i++ {
			chunk = append(chunk, fn(i))
		}
		out = append(out, chunk)
	}
	return out
}
// collectMessages drains the iterator produced by the search query builder b
// into a slice. It is adapted from the td/telegram/message package, extended
// with an optional progress callback: cb, when not nil, is invoked with 1
// after every collected element.
//
// The slice is pre-sized using the total reported by the iterator. A failure
// to obtain that total is returned wrapped, with a nil slice. Otherwise the
// elements collected so far are returned together with the iterator's error,
// if any.
func collectMessages(ctx context.Context, b *messages.SearchQueryBuilder, cb func(n int)) ([]messages.Elem, error) {
	it := b.Iter()

	total, err := it.Total(ctx)
	if err != nil {
		return nil, fmt.Errorf("get total: %w", err)
	}

	// Substitute a no-op so the loop body does not need a nil check.
	notify := cb
	if notify == nil {
		notify = func(int) {}
	}

	elems := make([]messages.Elem, 0, total)
	for it.Next(ctx) {
		elems = append(elems, it.Value())
		notify(1)
	}
	return elems, it.Err()
}