-
Notifications
You must be signed in to change notification settings - Fork 1
/
twitter.go
132 lines (109 loc) · 2.83 KB
/
twitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main
import (
"encoding/json"
"net/url"
"regexp"
"time"
"github.com/chimeracoder/anaconda"
)
// tweetStream groups two lists of strings: hashtags and users.
// NOTE(review): nothing in the visible part of this file reads these fields;
// presumably they describe what a stream should track/follow — confirm
// against the code that populates them.
type tweetStream struct {
	// Hashtags and Users are both plain string slices; nil when unset.
	Hashtags, Users []string
}
// newTwitterApi builds an anaconda client from the credentials stored in conf.
//
// The consumer key and secret (config keys "twitter.login.consumer_key" and
// "twitter.login.consumer_secret") are registered through anaconda's
// package-level setters; the access token key and secret
// ("twitter.login.access_token_key", "twitter.login.access_token_secret")
// are passed to the client constructor.
func newTwitterApi(conf *Config) *anaconda.TwitterApi {
	consumerKey := conf.GetConfigString("twitter.login.consumer_key")
	anaconda.SetConsumerKey(consumerKey)

	consumerSecret := conf.GetConfigString("twitter.login.consumer_secret")
	anaconda.SetConsumerSecret(consumerSecret)

	accessToken := conf.GetConfigString("twitter.login.access_token_key")
	accessSecret := conf.GetConfigString("twitter.login.access_token_secret")
	return anaconda.NewTwitterApi(accessToken, accessSecret)
}
// newTwitterStream opens a public filter stream on api.
//
// The two values returned by conf.GetTweetFilter are sent as the "track" and
// "follow" request parameters respectively. Both parameters are always set,
// even when the corresponding value is empty.
func newTwitterStream(conf *Config, api *anaconda.TwitterApi) *anaconda.Stream {
	track, follow := conf.GetTweetFilter()
	filter := url.Values{
		"track":  {track},
		"follow": {follow},
	}
	return api.PublicStreamFilter(filter)
}
// convertTweet turns an anaconda.Tweet into the application's Tweet type by
// round-tripping it through JSON: t is marshalled and the resulting bytes are
// unmarshalled into a Tweet.
//
// On either a marshal or an unmarshal failure it returns the zero Tweet
// together with the error.
func convertTweet(t anaconda.Tweet) (Tweet, error) {
	encoded, err := json.Marshal(t)
	if err != nil {
		return Tweet{}, err
	}

	var converted Tweet
	if err := json.Unmarshal(encoded, &converted); err != nil {
		// Discard whatever was partially decoded; callers get a clean zero value.
		return Tweet{}, err
	}
	return converted, nil
}
// tweetLinkPattern matches the scheme prefix of an http or https URL.
// The pattern is a constant, so it is compiled once at package initialisation
// instead of on every processStream call.
var tweetLinkPattern = regexp.MustCompile("https?://")

// processStream ranges over stream.C and forwards every tweet it receives.
//
// Messages that are not an anaconda.Tweet are ignored. For a tweet:
//   - if its text contains an http:// or https:// link, hydrateTweet is
//     started in a new goroutine with the tweet's id and this function moves
//     on to the next message without storing or sending anything itself;
//   - otherwise the tweet is converted with convertTweet, stored through
//     conf.StoreTweet and sent on sseEventStream. Tweets that fail conversion
//     are dropped.
//
// The function returns when the range over stream.C ends.
func processStream(conf *Config, api *anaconda.TwitterApi, stream *anaconda.Stream, sseEventStream chan<- interface{}) {
	for message := range stream.C {
		t, ok := message.(anaconda.Tweet)
		if !ok {
			// Only tweets are forwarded; every other message type is skipped.
			continue
		}

		if tweetLinkPattern.MatchString(t.Text) {
			// Tweet might contain an image
			// we need to hydrate the tweet first
			go hydrateTweet(conf, api, sseEventStream, t.Id)
			continue
		}

		tweet, err := convertTweet(t)
		if err != nil {
			// log here
			continue
		}
		conf.StoreTweet(tweet)
		sseEventStream <- tweet
	}
}
// hydrateTweet fetches the tweet identified by tweetId through api.GetTweet,
// converts it with convertTweet, stores it via conf.StoreTweet and sends it
// on sseEventStream.
//
// If either the fetch or the conversion fails, the function returns without
// storing or sending anything; the error is not reported.
func hydrateTweet(conf *Config, api *anaconda.TwitterApi, sseEventStream chan<- interface{}, tweetId int64) {
	fetched, err := api.GetTweet(tweetId, nil)
	if err != nil {
		// log here
		return
	}

	// Only a successfully converted tweet is stored and published.
	// A conversion failure falls through silently (log here).
	if hydrated, err := convertTweet(fetched); err == nil {
		conf.StoreTweet(hydrated)
		sseEventStream <- hydrated
	}
}
// twitterConnect creates the Twitter client via newTwitterApi and, when
// conf.IsDebugging() reports true, installs anaconda.BasicLogger on it.
func twitterConnect(conf *Config) *anaconda.TwitterApi {
	client := newTwitterApi(conf)
	if !conf.IsDebugging() {
		return client
	}
	client.SetLogger(anaconda.BasicLogger)
	return client
}
// twitterListen runs the streaming loop and never returns.
//
// Each iteration opens a new stream with newTwitterStream, hands it to
// processStream, and — once processStream returns — waits 60 seconds before
// opening the next stream.
func twitterListen(api *anaconda.TwitterApi, conf *Config, sseEventStream chan<- interface{}) {
	// Pause between one stream ending and the next one being opened.
	const reconnectDelay = 60 * time.Second

	for {
		processStream(conf, api, newTwitterStream(conf, api), sseEventStream)
		// stream closed, need to wait & restart it
		time.Sleep(reconnectDelay)
	}
}
// loadRecentTweets runs one search per entry of conf.StreamInfo and adds the
// results to that entry.
//
// Every search uses the entry's GetTweetFilter() as the query and the
// parameters count=100 and include_entities=true. Each returned status is
// converted with convertTweet and passed to the entry's Add method; statuses
// that fail conversion are skipped.
//
// It returns the first error reported by api.GetSearch (entries processed
// before the failure keep what was already added), or nil when every search
// succeeds.
func loadRecentTweets(api *anaconda.TwitterApi, conf *Config) error {
	searchParams := url.Values{}
	searchParams.Set("count", "100")
	searchParams.Set("include_entities", "true")

	for _, info := range conf.StreamInfo {
		result, err := api.GetSearch(info.GetTweetFilter(), searchParams)
		if err != nil {
			return err
		}
		for _, status := range result.Statuses {
			// A status that cannot be converted is dropped (log here).
			if converted, err := convertTweet(status); err == nil {
				info.Add(&converted)
			}
		}
	}
	return nil
}