This repository has been archived by the owner on Jun 28, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 92
/
rssbot.go
451 lines (406 loc) · 14 KB
/
rssbot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
// Package rssbot implements a Service capable of reading Atom/RSS feeds.
package rssbot
import (
"errors"
"fmt"
"html"
"net/http"
"net/url"
"strconv"
"time"
log "github.com/Sirupsen/logrus"
"github.com/die-net/lrucache"
"github.com/gregjones/httpcache"
"github.com/matrix-org/go-neb/database"
"github.com/matrix-org/go-neb/polling"
"github.com/matrix-org/go-neb/types"
"github.com/matrix-org/gomatrix"
"github.com/mmcdole/gofeed"
"github.com/prometheus/client_golang/prometheus"
)
// ServiceType of the RSS Bot service. It is the type string handed to
// types.NewDefaultService when a Service is constructed in init().
const ServiceType = "rssbot"
// cachingClient is the shared HTTP client used by readFeed to fetch feeds.
// It is nil until init() assigns it.
var cachingClient *http.Client
var (
// pollCounter counts feed polls. incrementMetrics labels each sample with the
// feed's host ("url") and the resulting HTTP status code ("http_status").
pollCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "goneb_rss_polls_total",
Help: "The number of feed polls from RSS services",
}, []string{"url", "http_status"})
)
// minPollingIntervalSeconds is the shortest allowed gap between two polls of the
// same feed; queryFeed uses it whenever the configured interval is not larger.
const minPollingIntervalSeconds = 60 * 5 // 5 min (News feeds can be genuinely spammy)
// Service contains the Config fields for this service.
//
// Example request:
//   {
//       feeds: {
//           "http://rss.cnn.com/rss/edition.rss": {
//               poll_interval_mins: 60,
//               rooms: ["!cBrPbzWazCtlkMNQSF:localhost"]
//           },
//           "https://www.wired.com/feed/": {
//               rooms: ["!qmElAGdFYCHoCJuaNt:localhost"]
//           }
//       }
//   }
type Service struct {
types.DefaultService
// Feeds is a map of feed URL to configuration options for this feed.
// Values are stored by value, so updating a feed means copying the entry out,
// modifying it and writing it back into the map.
Feeds map[string]struct {
// Optional. The time to wait between polls, in minutes. Values which are not greater than
// the minimum interval (minPollingIntervalSeconds, i.e. 5 minutes) are ignored and the
// minimum is used instead.
PollIntervalMins int `json:"poll_interval_mins"`
// The list of rooms to send feed updates into. This cannot be empty: Register rejects
// feeds without rooms.
Rooms []string `json:"rooms"`
// True if rss bot is unable to poll this feed (the last fetch failed or returned no items).
// This is populated by Go-NEB. Use /getService to retrieve this value.
IsFailing bool `json:"is_failing"`
// The time of the last successful poll, in seconds since the Unix epoch. This is populated
// by Go-NEB. Use /getService to retrieve this value.
FeedUpdatedTimestampSecs int64 `json:"last_updated_ts_secs"`
// Internal field. When we should poll again, in seconds since the Unix epoch. Zero means the
// feed has not been polled successfully yet; such a feed is due immediately and its first
// successful poll sends nothing to the rooms.
NextPollTimestampSecs int64
// Internal field. The most recently seen GUIDs, newest first. Holds up to twice the largest
// number of items seen in the feed, capped at 10,000 entries.
RecentGUIDs []string
} `json:"feeds"`
}
// Register validates this service's configuration before it is accepted.
//
// With at least one feed configured, every feed URL must be fetchable and parseable and
// must list at least one room; the first violation is returned as an error. On success the
// client is asked to join every configured room (join failures are only logged).
//
// With no feeds configured, registration only succeeds when oldService is an rssbot Service
// which did have feeds, i.e. the caller is removing its last feeds. Otherwise an error is
// returned.
func (s *Service) Register(oldService types.Service, client *gomatrix.Client) error {
	if len(s.Feeds) > 0 {
		for feedURL, cfg := range s.Feeds {
			// Make sure the feed can actually be fetched and parsed.
			_, readErr := readFeed(feedURL)
			if readErr != nil {
				return fmt.Errorf("Failed to read URL %s: %s", feedURL, readErr)
			}
			if len(cfg.Rooms) == 0 {
				return fmt.Errorf("Feed %s has no rooms to send updates to", feedURL)
			}
		}
		s.joinRooms(client)
		return nil
	}

	// No feeds: acceptable only if the previous incarnation had feeds, in which case the
	// caller is deleting us.
	previous, isRSS := oldService.(*Service)
	if !isRSS {
		log.WithField("service", oldService).Error("Old service isn't an rssbot.Service")
		return errors.New("An RSS feed must be specified.")
	}
	if len(previous.Feeds) == 0 {
		return errors.New("An RSS feed must be specified.")
	}
	return nil
}
// joinRooms asks the client to join every distinct room referenced by any configured feed.
// Each room is attempted once even if several feeds share it. Failures are logged and do
// not stop the remaining rooms from being joined.
func (s *Service) joinRooms(client *gomatrix.Client) {
	// Collapse the per-feed room lists into a set so shared rooms are joined only once.
	rooms := make(map[string]struct{})
	for _, cfg := range s.Feeds {
		for _, id := range cfg.Rooms {
			rooms[id] = struct{}{}
		}
	}

	for id := range rooms {
		_, joinErr := client.JoinRoom(id, "", nil)
		if joinErr == nil {
			continue
		}
		log.WithFields(log.Fields{
			log.ErrorKey: joinErr,
			"room_id":    id,
			"user_id":    client.UserID,
		}).Error("Failed to join room")
	}
}
// PostRegister tears the service down when it has been left without any feeds: polling is
// stopped and the service is deleted from the database. A failed deletion is logged.
// Services which still have feeds are left alone.
func (s *Service) PostRegister(oldService types.Service) {
	if len(s.Feeds) != 0 {
		return
	}

	// bye-bye :(
	entry := log.WithFields(log.Fields{
		"service_id":   s.ServiceID(),
		"service_type": s.ServiceType(),
	})
	entry.Info("Deleting service: No feeds remaining.")
	polling.StopPolling(s)

	deleteErr := database.GetServiceDB().DeleteService(s.ServiceID())
	if deleteErr != nil {
		entry.WithError(deleteErr).Error("Failed to delete service")
	}
}
// OnPoll polls every feed which is currently due and forwards unseen items to the
// subscribed rooms.
//
// A feed is due when its NextPollTimestampSecs is zero (never polled successfully) or is
// not later than the current time. An item is forwarded only if its GUID is absent from the
// feed's RecentGUIDs. Items lacking a GUID are given one: the Link if present, otherwise
// the Title.
//
// After polling, the service is persisted so the updated poll times survive. The returned
// time is when OnPoll should be invoked again.
func (s *Service) OnPoll(cli *gomatrix.Client) time.Time {
	logger := log.WithFields(log.Fields{
		"service_id":   s.ServiceID(),
		"service_type": s.ServiceType(),
	})

	// Collect the due feeds first; querying a feed rewrites its map entry, so keep the
	// selection separate from the polling itself.
	nowSecs := time.Now().Unix() // Second resolution
	var due []string
	for feedURL, cfg := range s.Feeds {
		neverPolled := cfg.NextPollTimestampSecs == 0
		if neverPolled || cfg.NextPollTimestampSecs <= nowSecs {
			due = append(due, feedURL)
		}
	}
	if len(due) == 0 {
		return s.nextTimestamp()
	}

	for _, feedURL := range due {
		feed, fresh, err := s.queryFeed(feedURL)
		if err != nil {
			logger.WithField("feed_url", feedURL).WithError(err).Error("Failed to query feed")
			incrementMetrics(feedURL, err)
			continue
		}
		incrementMetrics(feedURL, nil)
		logger.WithFields(log.Fields{
			"feed_url":   feedURL,
			"feed_items": len(feed.Items),
			"new_items":  len(fresh),
		}).Info("Sending new items")

		// Index 0 is the most recent item; walk from the far end so rooms receive the
		// items in chronological order.
		for n := range fresh {
			item := fresh[len(fresh)-1-n]
			if sendErr := s.sendToRooms(cli, feedURL, feed, item); sendErr != nil {
				logger.WithFields(log.Fields{
					"feed_url":   feedURL,
					log.ErrorKey: sendErr,
					"item":       item,
				}).Error("Failed to send item to room")
			}
		}
	}

	// Persist the service to save the next poll times.
	if _, storeErr := database.GetServiceDB().StoreService(s); storeErr != nil {
		logger.WithError(storeErr).Error("Failed to persist next poll times for service")
	}
	return s.nextTimestamp()
}
// incrementMetrics records one poll of the feed at urlStr in pollCounter.
//
// The "url" label is the host part of urlStr (coarser and more useful than the full URL),
// or urlStr itself if it cannot be parsed. The "http_status" label is "200" when err is
// nil, the status code carried by a gofeed.HTTPError, or "0" for any other error (e.g. a
// network timeout).
func incrementMetrics(urlStr string, err error) {
	host := urlStr
	if parsed, parseErr := url.Parse(urlStr); parseErr == nil {
		host = parsed.Host
	}

	// Technically any 2xx, but gofeed doesn't tell us which.
	status := "200"
	if err != nil {
		code := 0 // e.g. network timeout
		if httpErr, isHTTP := err.(gofeed.HTTPError); isHTTP {
			code = httpErr.StatusCode
		}
		status = strconv.Itoa(code)
	}

	pollCounter.With(prometheus.Labels{"url": host, "http_status": status}).Inc()
}
// nextTimestamp returns the time at which OnPoll should next run: the earliest
// NextPollTimestampSecs across all feeds, but never sooner than 60 seconds from now.
//
// The 60 second floor applies whenever the earliest timestamp is not in the future. That
// covers feeds which keep failing (their timestamp is not advanced), feeds which have never
// been polled successfully (timestamp 0) and services without feeds, and it stops the
// poller from tight-looping on feeds which return errors.
func (s *Service) nextTimestamp() time.Time {
	// Track "have we seen a feed yet" explicitly. Zero is a legitimate timestamp (a feed
	// which is due immediately), so it must not double as the "unset" marker: doing so let
	// a later feed overwrite it, making the result depend on map iteration order.
	var earliestNextTs int64
	seenFeed := false
	for _, feedInfo := range s.Feeds {
		if !seenFeed || feedInfo.NextPollTimestampSecs < earliestNextTs {
			earliestNextTs = feedInfo.NextPollTimestampSecs
			seenFeed = true
		}
	}

	// Don't allow times in the past. Set a min re-poll threshold of 60s to avoid
	// tight-looping on feeds which 500.
	now := time.Now().Unix()
	if earliestNextTs <= now {
		earliestNextTs = now + 60
	}
	return time.Unix(earliestNextTs, 0)
}
// queryFeed fetches feedURL, updates the feed's bookkeeping in s.Feeds and returns the
// parsed feed together with the items which have not been seen before.
//
// A fetch/parse error, or a feed with zero items, marks the feed as failing and is returned
// as an error; the poll timestamps are left untouched in that case. On success the feed is
// marked healthy, its next poll time and last-updated time are set, and its list of recent
// GUIDs is refreshed. The first successful poll of a feed never reports new items.
func (s *Service) queryFeed(feedURL string) (*gofeed.Feed, []gofeed.Item, error) {
	log.WithField("feed_url", feedURL).Info("Querying feed")

	feed, err := readFeed(feedURL)
	// Some RSS feeds do not consistently return items, so an empty feed is treated as a
	// failure just like a fetch error.
	if err == nil && len(feed.Items) == 0 {
		err = errors.New("feed has 0 items")
	}
	if err != nil {
		failing := s.Feeds[feedURL]
		failing.IsFailing = true
		s.Feeds[feedURL] = failing
		return nil, nil, err
	}

	// Patch up the item list: make sure each item has a GUID.
	ensureItemsHaveGUIDs(feed)

	info := s.Feeds[feedURL]

	// A zero next-poll timestamp means this is the first ever poll. Don't flood the rooms
	// with the whole backlog; only report items from this point onwards.
	var fresh []gofeed.Item
	if info.NextPollTimestampSecs != 0 {
		fresh = s.newItems(feedURL, feed.Items)
	}

	polledAt := time.Now().Unix() // Second resolution

	// Work out when to next poll this feed: the configured interval if it exceeds the
	// minimum, otherwise the minimum.
	// TODO: Handle the 'sy' Syndication extension to control update interval.
	// See http://www.feedforall.com/syndication.htm and http://web.resource.org/rss/1.0/modules/syndication/
	interval := int64(minPollingIntervalSeconds)
	if info.PollIntervalMins > int(minPollingIntervalSeconds/60) {
		interval = int64(info.PollIntervalMins * 60)
	}

	// Decide how many GUIDs to remember. Remembering every GUID ever would grow without
	// bound, but some feeds briefly shrink to a handful of items and then bounce back, so
	// simply replacing the list each poll would forget (and resend) items. Keep twice the
	// largest item count ever seen for this feed, up to 10,000.
	limit := 2 * len(feed.Items)
	if remembered := len(info.RecentGUIDs); remembered > limit {
		limit = remembered // already 2x'd.
	}
	if limit > 10000 {
		limit = 10000
	}

	// Current GUIDs go first, followed by the previously remembered ones, de-duplicated.
	merged := uniqueStrings(append(uniqueGuids(feed.Items), uniqueStrings(info.RecentGUIDs)...))
	if len(merged) > limit {
		// Truncating the tail favours the NEWEST elements, which are the ones we're most
		// likely to see again.
		merged = merged[:limit]
	}

	// Write the updated bookkeeping back so it gets persisted with the service.
	info.NextPollTimestampSecs = polledAt + interval
	info.FeedUpdatedTimestampSecs = polledAt
	info.RecentGUIDs = merged
	info.IsFailing = false
	s.Feeds[feedURL] = info

	return feed, fresh, nil
}
// newItems returns copies of the entries in allItems whose GUID is not in the RecentGUIDs
// of the feed at feedURL, preserving their order. Nil entries are skipped.
//
// The Title and Description of every returned item are HTML-unescaped. The unescaping is
// applied to the item in allItems before it is copied, so the caller's items are modified
// too.
func (s *Service) newItems(feedURL string, allItems []*gofeed.Item) (items []gofeed.Item) {
	// Index the remembered GUIDs once rather than rescanning the list (which can hold up to
	// 10,000 entries) for every item.
	recent := s.Feeds[feedURL].RecentGUIDs
	alreadySent := make(map[string]struct{}, len(recent))
	for _, guid := range recent {
		alreadySent[guid] = struct{}{}
	}

	for _, item := range allItems {
		if item == nil {
			continue
		}
		// If we've seen this GUID before, we've sent it before.
		if _, sent := alreadySent[item.GUID]; sent {
			continue
		}

		// Decode HTML for <title> and <description>:
		// The RSS 2.0 Spec http://cyber.harvard.edu/rss/rss.html#hrelementsOfLtitemgt supports a
		// bunch of weird ways to put HTML into <title> and <description> tags. Not all RSS feed
		// producers run these fields through entity encoders (some leave characters such as
		// apostrophes raw, others send them as character references). We assume that all feeds
		// are sending HTML for these fields and run them through a standard decoder. This will
		// inevitably break for some people, but that group is probably the smaller one.
		item.Title = html.UnescapeString(item.Title)
		item.Description = html.UnescapeString(item.Description)

		items = append(items, *item)
	}
	return items
}
// sendToRooms posts item as an m.room.message event to every room subscribed to the feed at
// feedURL. Delivery is best-effort: a failure for one room is logged and the remaining
// rooms are still attempted. The returned error is always nil.
func (s *Service) sendToRooms(cli *gomatrix.Client, feedURL string, feed *gofeed.Feed, item gofeed.Item) error {
	entry := log.WithFields(log.Fields{
		"feed_url": feedURL,
		"title":    item.Title,
		"guid":     item.GUID,
	})
	entry.Info("Sending new feed item")

	for _, roomID := range s.Feeds[feedURL].Rooms {
		_, sendErr := cli.SendMessageEvent(roomID, "m.room.message", itemToHTML(feed, item))
		if sendErr == nil {
			continue
		}
		entry.WithError(sendErr).WithField("room_id", roomID).Error("Failed to send to room")
	}
	return nil
}
// itemToHTML renders a feed item as an m.notice HTML message of the form:
//
//	SomeOne posted a new article: Title Of The Entry ( https://someurl.com/blag )
//
// where "SomeOne" is the feed title in italics. The feed title, item title and item link
// are all HTML-escaped before being inserted.
func itemToHTML(feed *gofeed.Feed, item gofeed.Item) gomatrix.HTMLMessage {
	feedTitle := html.EscapeString(feed.Title)
	itemTitle := html.EscapeString(item.Title)
	itemLink := html.EscapeString(item.Link)

	body := fmt.Sprintf("<i>%s</i> posted a new article: %s ( %s )", feedTitle, itemTitle, itemLink)
	return gomatrix.GetHTMLMessage("m.notice", body)
}
// ensureItemsHaveGUIDs fills in a GUID for every item in feed which lacks one, using the
// item's Link if it has one and otherwise its Title. Items with neither keep an empty GUID.
// Items are modified in place. Nil entries are skipped, matching the nil check newItems
// performs on the same list.
func ensureItemsHaveGUIDs(feed *gofeed.Feed) {
	for _, item := range feed.Items {
		if item == nil || item.GUID != "" {
			continue
		}
		// Items are held by pointer, so assigning through item updates the feed itself.
		switch {
		case item.Link != "":
			item.GUID = item.Link
		case item.Title != "":
			item.GUID = item.Title
		}
	}
}
// uniqueStrings returns a newly allocated slice holding the elements of a with repeats
// dropped. The first occurrence of each string is kept and relative order is preserved.
// The result is never nil, even for empty or nil input.
func uniqueStrings(a []string) []string {
	deduped := make([]string, 0, len(a))
	known := make(map[string]struct{}, len(a))
	for i := range a {
		if _, dup := known[a[i]]; dup {
			continue
		}
		known[a[i]] = struct{}{}
		deduped = append(deduped, a[i])
	}
	return deduped
}
// uniqueGuids returns a new slice holding the GUID of each item in a, with duplicate GUIDs
// removed. The first occurrence of each GUID is kept and order is otherwise preserved.
// Nil items are skipped, matching the nil check newItems performs on the same list.
// The result is never nil.
func uniqueGuids(a []*gofeed.Item) []string {
	ret := make([]string, 0, len(a))
	seen := make(map[string]struct{}, len(a))
	for _, item := range a {
		if item == nil {
			continue
		}
		if _, dup := seen[item.GUID]; dup {
			continue
		}
		seen[item.GUID] = struct{}{}
		ret = append(ret, item.GUID)
	}
	return ret
}
// userAgentRoundTripper is an http.RoundTripper which sends every request with the
// "Go-NEB" User-Agent and delegates the actual round trip to Transport.
type userAgentRoundTripper struct {
	// Transport performs the request. http.DefaultTransport is used when it is nil.
	Transport http.RoundTripper
}

// RoundTrip sends req with its User-Agent header set to "Go-NEB".
//
// The http.RoundTripper contract forbids modifying the caller's request, so the header is
// set on a shallow copy of req which carries its own copy of the headers; req itself is
// left untouched.
func (rt userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	outgoing := new(http.Request)
	*outgoing = *req

	// The shallow copy shares the header map with req, so give it a map (and value slices)
	// of its own before changing anything.
	outgoing.Header = make(http.Header, len(req.Header)+1)
	for name, values := range req.Header {
		outgoing.Header[name] = append([]string(nil), values...)
	}
	outgoing.Header.Set("User-Agent", "Go-NEB")

	transport := rt.Transport
	if transport == nil {
		transport = http.DefaultTransport
	}
	return transport.RoundTrip(outgoing)
}
// readFeed downloads feedURL with the shared caching HTTP client and parses the body as a
// feed. Transport errors are returned as-is; a response whose status is outside the 2xx
// range is returned as a gofeed.HTTPError carrying that status. The response body is
// always closed before returning.
func readFeed(feedURL string) (*gofeed.Feed, error) {
	// Don't use the parser's ParseURL because it leaks on non-2xx responses as of
	// 2016/11/29 (cac19c6c27); fetch the document ourselves instead.
	resp, err := cachingClient.Get(feedURL)
	if resp != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		return nil, err
	}

	if success := resp.StatusCode >= 200 && resp.StatusCode < 300; !success {
		return nil, gofeed.HTTPError{
			StatusCode: resp.StatusCode,
			Status:     resp.Status,
		}
	}
	return gofeed.NewParser().Parse(resp.Body)
}
// init wires up the package: it builds the shared feed-fetching HTTP client, registers the
// constructor for rssbot services and registers the poll counter with Prometheus.
func init() {
	// Upper bound on a whole feed fetch (connect, redirects and reading the body). Without
	// a timeout a stalled feed server would block the poll that is waiting on it forever.
	const feedFetchTimeout = 60 * time.Second

	lruCache := lrucache.New(1024*1024*20, 0) // 20 MB cache, no max-age
	cachingClient = &http.Client{
		Timeout:   feedFetchTimeout,
		Transport: userAgentRoundTripper{Transport: httpcache.NewTransport(lruCache)},
	}

	types.RegisterService(func(serviceID, serviceUserID, webhookEndpointURL string) types.Service {
		return &Service{
			DefaultService: types.NewDefaultService(serviceID, serviceUserID, ServiceType),
		}
	})
	prometheus.MustRegister(pollCounter)
}