-
Notifications
You must be signed in to change notification settings - Fork 0
/
tweetclient.py
178 lines (136 loc) · 6.76 KB
/
tweetclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from __future__ import annotations
import os
import random
import InstaTweet
from tweepy import OAuth1UserHandler, API, Media, TweepyException
from typing import Union, Optional
from . import InstaPost
class TweetClient:
MAX_HASHTAGS = 5
DEFAULT_KEYS = {
'Consumer Key': 'string',
'Consumer Secret': 'string',
'Access Token': 'string',
'Token Secret': 'string'
}
def __init__(self, profile: InstaTweet.Profile, proxies: dict = None):
"""Initialize TweetClient using a :class:`~.Profile`
Basically just a wrapper for tweepy.
It uses the settings of a profile to initialize the API and send tweets
:param profile: the profile to use when initializing a :class:`tweepy.API` object
:param proxies: optional proxies to use when making API requests
"""
self.profile = profile
self.proxies = proxies
self.api = self.get_api()
def get_api(self) -> API:
"""Initializes a :class:`~.tweepy.API` object using the API keys of the loaded :class:`~.Profile`"""
return API(
auth=self.get_oauth(self.profile.twitter_keys),
user_agent=self.profile.user_agent,
proxy=self.proxies
)
@staticmethod
def get_oauth(api_keys: dict) -> OAuth1UserHandler:
"""Initializes and returns an :class:`~.OAuth1UserHandler` object from tweepy using the specified API keys
:param api_keys: Twitter developer API keys with v1.1 endpoint access
"""
if missing_keys := [key for key in TweetClient.DEFAULT_KEYS if key not in api_keys]:
raise KeyError(
f"Missing the following Twitter Keys: {missing_keys}"
)
if bad_keys := [key for key in TweetClient.DEFAULT_KEYS if not api_keys[key] or api_keys[key] == 'string']:
raise ValueError(
f"Invalid values for the following Twitter keys: {bad_keys}"
)
return OAuth1UserHandler(
consumer_key=api_keys['Consumer Key'],
consumer_secret=api_keys['Consumer Secret'],
access_token=api_keys['Access Token'],
access_token_secret=api_keys['Token Secret']
)
def send_tweet(self, post: InstaPost, hashtags: Optional[list[str]] = None) -> bool:
"""Composes and sends a Tweet using an already-downloaded Instagram post
:param post: the post to tweet
:param hashtags: a list of hashtags to randomly chose from and include in the tweet
.. admonition:: How Tweets are Composed and Sent
:class: instatweet
The :attr:`.InstaPost.filepath` -- set by :meth:`~.download_post` -- is used as the media source
The body of the tweet is then generated by :meth:`~build_tweet` as folows:
- The :attr:`InstaPost.caption` is used as a starting point
- If you've :meth:`~.add_hashtags` for the user, will randomly :meth:`~pick_hashtags` to include
- Lastly, the :attr:`InstaPost.permalink` is added to the end
"""
if not post.is_downloaded:
raise FileNotFoundError('Post must be downloaded first')
media_ids = self.upload_media(post)
if not isinstance(media_ids, list):
return False
try:
tweet = self.api.update_status(
status=self.build_tweet(post, hashtags),
media_ids=media_ids,
)
except TweepyException as e:
print('Failed to send tweet for {}:\nResponse: {}'.format(post, e))
return False
print(f'Sent tweet for {post}')
return post.add_tweet_data(tweet)
def upload_media(self, post: InstaPost) -> Union[list, bool]:
"""Uploads the media from an already-downloaded Instagram post to Twitter
.. note:: If the post is a carousel, only the first 4 photos/videos will be uploaded
:param post: the Instagram post to use as the media source
:return: the list of uploaded media ids (if API upload was successful) or ``False``
"""
if not post.is_downloaded:
raise FileNotFoundError('Post must be downloaded first')
content = post.children[:4] if post.is_carousel else [post]
media_ids = []
for media in content:
media_upload = self.api.media_upload(
filename=media.filepath,
media_category='TWEET_VIDEO' if media.is_video else 'TWEET_IMAGE',
wait_for_async_finalize=True,
chunked=True)
if hasattr(media_upload, 'processing_info'):
if media_upload.processing_info['state'] != 'succeeded':
print(f'Failed to upload media to Twitter for {media}')
return False
media_ids.append(str(media_upload.media_id))
print(f'Successfully uploaded media to Twitter for {post}')
return media_ids
def build_tweet(self, post: InstaPost, hashtags: Optional[list[str]] = None) -> str:
"""Uses an :class:`~.InstaPost` to build the body text of a tweet
:param post: the post that's being tweeted; the caption and link are used
:param hashtags: optional list of hashtags to randomly pick from and include
:return: the text to use for the tweet
"""
tags = self.pick_hashtags(hashtags)
caption = post.caption.strip().replace('@', '@/') # Avoid tagging randos on Twitter
characters = 280 - len(tags) - len(post.permalink) - 2
tweet = "{text}\n{hashtags}\n{link}".format(
text=caption[:characters],
hashtags=tags,
link=post.permalink
)
return tweet
@staticmethod
def pick_hashtags(hashtags: list[str]) -> str:
"""Randomly picks hashtags from the provided list and returns them as a single string
The number of hashtags chosen will either be 1 less than the length of the list (to avoid using the same tags
in every tweet), or the value of :attr:`~.MAX_HASHTAGS`, whichever is smaller
:param hashtags: a list of hashtags to randomly choose from
:Example:
::
from InstaTweet import TweetClient
>> TweetClient.pick_hashtags(['cat','dog','woof'])
"#woof #cat\\n"
.. note:: A newline is added to help with formatting & character counting in :meth:`~.build_tweet`
"""
if not hashtags:
return ''
if not isinstance(hashtags, list):
raise TypeError('Provide a list of hashtags')
num_hashtags = min(len(hashtags) - 1, TweetClient.MAX_HASHTAGS) # Pick at most MAX_HASHTAGS
random_hashtags = random.sample(hashtags, max(1, num_hashtags)) # Pick at least 1
return ' '.join(f'#{hashtag}' for hashtag in random_hashtags) + '\n'