forked from tucnak/telebot
/
poller.go
151 lines (134 loc) · 3.06 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package telebot
import (
"encoding/json"
"sync/atomic"
"time"
)
// Poller is a provider of Updates.
//
// All pollers must implement Poll(), which accepts bot
// pointer and subscription channel and start polling
// synchronously straight away.
type Poller interface {
// Poll is supposed to take the bot object
// subscription channel and start polling
// for Updates immediately.
//
// Poller must listen for stop constantly and close
// it as soon as it's done polling.
Poll(b *Bot, updates chan Update, stop chan struct{})
}
// LongPoller is a classic LongPoller with timeout.
type LongPoller struct {
Limit int
Timeout time.Duration
LastUpdateID int
// AllowedUpdates contains the update types
// you want your bot to receive.
//
// Default values:
//
// edited_message
// channel_post
// edited_channel_post
// inline_query
// chosen_inline_result
// callback_query
// shipping_query
// pre_checkout_query
// poll
// poll_answer
//
AllowedUpdates string
}
type AllowedUpdates []string
func DefaultAllowedUpdates() AllowedUpdates {
return AllowedUpdates(
[]string{
"edited_message",
"channel_post",
"edited_channel_post",
"inline_query",
"chosen_inline_result",
"callback_query",
"shipping_query",
"pre_checkout_query",
"poll",
"poll_answer",
"message_reaction",
},
)
}
func (u AllowedUpdates) Add(updates ...string) AllowedUpdates {
u = append(u, updates...)
return u
}
func (u AllowedUpdates) String() string {
b, _ := json.Marshal(u)
return string(b)
}
// Poll does long polling.
func (p *LongPoller) Poll(b *Bot, dest chan Update, stop chan struct{}) {
for {
select {
case <-stop:
return
default:
}
updates, err := b.getUpdates(p.LastUpdateID+1, p.Limit, p.Timeout, p.AllowedUpdates)
if err != nil {
b.debug(err)
continue
}
for _, update := range updates {
p.LastUpdateID = update.ID
if atomic.LoadInt64(&b.shouldWrireNextUpdate) == 1 {
b.nextUpdate <- update
} else {
dest <- update
}
}
}
}
// MiddlewarePoller is a special kind of poller that acts
// like a filter for updates. It could be used for spam
// handling, banning or whatever.
//
// For heavy middleware, use increased capacity.
type MiddlewarePoller struct {
Capacity int // Default: 1
Poller Poller
Filter func(*Update) bool
}
// NewMiddlewarePoller wait for it... constructs a new middleware poller.
func NewMiddlewarePoller(original Poller, filter func(*Update) bool) *MiddlewarePoller {
return &MiddlewarePoller{
Poller: original,
Filter: filter,
}
}
// Poll sieves updates through middleware filter.
func (p *MiddlewarePoller) Poll(b *Bot, dest chan Update, stop chan struct{}) {
if p.Capacity < 1 {
p.Capacity = 1
}
middle := make(chan Update, p.Capacity)
stopPoller := make(chan struct{})
stopConfirm := make(chan struct{})
go func() {
p.Poller.Poll(b, middle, stopPoller)
close(stopConfirm)
}()
for {
select {
case <-stop:
close(stopPoller)
<-stopConfirm
return
case upd := <-middle:
if p.Filter(&upd) {
dest <- upd
}
}
}
}