-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_answers.py
53 lines (43 loc) · 1.75 KB
/
stream_answers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import json
import time
import requests
import collect_followers
import sql_api
import twitter_api
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled (the standard SQL escape) so that text
    taken from tweets cannot terminate the literal early and alter the
    statement.
    """
    return "'" + str(value).replace("'", "''") + "'"


def stream(tweet_id):
    """Monitor answers to a tweet.

    Replaces the currently installed stream rules with the ones
    ``api.set_rules(tweet_id)`` installs, then reads the filtered stream line
    by line. For every payload that carries the expected fields, the
    Twitter/Mastodon username pair is written to the ``usernames`` table and
    the reply is written to the ``queue`` table together with the current
    Unix timestamp.

    Args:
        tweet_id: Identifier handed to ``api.set_rules``.

    Raises:
        requests.HTTPError: If the stream endpoint answers with an error
            status. Previously such a response was consumed silently and the
            function simply returned.
    """
    # Make connection to SQL database.
    db = sql_api.DB()
    api = twitter_api.API()

    # Start from a clean slate: drop the existing rules before adding ours.
    rules = api.get_rules()
    api.delete_all_rules(rules)
    api.set_rules(tweet_id)

    # The context manager releases the streaming connection when the loop
    # ends or an exception propagates.
    with requests.get(
        "https://api.twitter.com/2/tweets/search/stream?expansions=referenced_tweets.id,author_id",
        auth=api.bearer_oauth,
        stream=True,
    ) as response:
        response.raise_for_status()

        for response_line in response.iter_lines():
            # Skip empty lines (presumably keep-alive signals on the stream).
            if not response_line:
                continue

            json_response = json.loads(response_line)

            # Payloads lacking the expected fields are skipped on purpose
            # (best effort). Only the field extraction is guarded, so errors
            # raised by the database layer are not swallowed here.
            try:
                includes = json_response['includes']
                twitter_username = includes['users'][0]['username']
                mastodon_username = collect_followers.extract_mastodon_handle(
                    includes['tweets'][0]['text']
                )
                reply_id = str(json_response['data']['id'])
            except (KeyError, IndexError):
                continue

            # Add Mastodon username to db.
            # NOTE(review): db.commit is assumed to execute the SQL string it
            # is given; if it accepts bound parameters, prefer those over
            # literal quoting.
            columns = '"t_username", "m_username"'
            values = [_sql_quote(twitter_username), _sql_quote(mastodon_username)]
            sql = f'INSERT OR REPLACE INTO usernames ({columns}) VALUES ({", ".join(values)})'
            db.commit(sql)

            # Add reply to queue
            columns = '"t_username", "tweet_id", "timestamp"'
            values = [
                _sql_quote(twitter_username),
                _sql_quote(reply_id),
                str(int(time.time())),
            ]
            sql = f'INSERT OR REPLACE INTO queue ({columns}) VALUES ({", ".join(values)})'
            db.commit(sql)
if __name__ == "__main__":
    import sys

    # Use the tweet ID from the command line when one is given; otherwise
    # ask for it interactively.
    cli_args = sys.argv[1:]
    tweet_id = cli_args[0] if cli_args else input('Tweet ID: ')
    stream(tweet_id)