# coding=utf-8
"""Twitter source class.
Uses the v1.1 REST API:
TODO: collections for twitter accounts; use as activity target?
The Audience Targeting 'to' field is set to @public or @private based on whether
the tweet author's 'protected' field is true or false.
from __future__ import absolute_import, unicode_literals
from future import standard_library
from future.moves.urllib import error as urllib_error
from builtins import range, str, zip
import collections
import datetime
import http.client
import itertools
import logging
import mimetypes
import re
import socket
import urllib.parse, urllib.request
from . import appengine_config
from bs4 import BeautifulSoup
import brevity
from . import source
from oauth_dropins import twitter_auth
from oauth_dropins.webutil import util
API_BLOCK_IDS = 'blocks/ids.json?count=5000&stringify_ids=true&cursor=%s'
API_BLOCKS = 'blocks/list.json?skip_status=true&count=5000&cursor=%s'
API_CURRENT_USER = 'account/verify_credentials.json'
API_DELETE_TWEET = 'statuses/destroy.json'
API_DELETE_FAVORITE = 'favorites/destroy.json'
API_FAVORITES = 'favorites/list.json?screen_name=%s&include_entities=true&tweet_mode=extended'
API_LIST_TIMELINE = 'lists/statuses.json?include_entities=true&tweet_mode=extended&count=%(count)d&slug=%(slug)s&owner_screen_name=%(owner_screen_name)s'
API_LOOKUP = 'statuses/lookup.json?id=%s&include_entities=true&tweet_mode=extended'
API_POST_FAVORITE = 'favorites/create.json'
API_POST_MEDIA = 'statuses/update_with_media.json'
API_POST_RETWEET = 'statuses/retweet/%s.json'
API_POST_TWEET = 'statuses/update.json'
API_RETWEETS = 'statuses/retweets.json?id=%s&tweet_mode=extended'
API_SEARCH = 'search/tweets.json?q=%(q)s&include_entities=true&tweet_mode=extended&result_type=recent&count=%(count)d'
API_STATUS = 'statuses/show.json?id=%s&include_entities=true&tweet_mode=extended'
API_TIMELINE = 'statuses/home_timeline.json?include_entities=true&tweet_mode=extended&count=%d'
API_USER = 'users/show.json?screen_name=%s'
API_USER_TIMELINE = 'statuses/user_timeline.json?include_entities=true&tweet_mode=extended&count=%(count)d&screen_name=%(screen_name)s'
TWEET_URL_RE = re.compile(r'https://twitter\.com/[^/?]+/status(es)?/[^/?]+$')
# Don't hit the RETWEETS endpoint more than this many times per
# get_activities() call.
# TODO: sigh. figure out a better way. dammit twitter, give me a batch API!!!
# Number of IDs to search for at a time
# For read requests only.
# Config constants, as of 2017-11-08:
# * Current max tweet length and expected length of a URL.
# * Max media per tweet.
# * Allowed image formats:
# * Allowed video formats, max video size, and upload chunk size:
# * Max alt text length.
# Update by running help/configuration.json manually in
# TODO: pull these from /help/configuration.json instead (except max tweet length)
IMAGE_MIME_TYPES = frozenset(('image/jpg', 'image/jpeg', 'image/png',
'image/gif', 'image/webp',))
VIDEO_MIME_TYPES = frozenset(('video/mp4',))
MB = 1024 * 1024
# username requirements and limits:
MENTION_RE = re.compile(r'(^|[^\w@/\!?=&])@(\w{1,15})\b', re.UNICODE)
# hashtag requirements and limits:
HASHTAG_RE = re.compile(r'(^|\s)[##](\w+)\b', re.UNICODE)
class OffsetTzinfo(datetime.tzinfo):
"""A simple, DST-unaware tzinfo from given utc offset in seconds.
def __init__(self, utc_offset=0):
utc_offset: Offset of time zone from UTC in seconds
self._offset = datetime.timedelta(seconds=utc_offset)
def utcoffset(self, dt):
return self._offset
def dst(self, dt):
return datetime.timedelta(0)
class Twitter(source.Source):
"""Twitter source class. See file docstring and Source class for details."""
NAME = 'Twitter'
FRONT_PAGE_TEMPLATE = 'templates/twitter_index.html'
POST_ID_RE = re.compile('^[0-9]+$')
# HTML snippet for embedding a tweet.
<script async defer src="//" charset="utf-8"></script>
<br />
<blockquote class="twitter-tweet" lang="en" data-dnt="true">
<a href="%(url)s">#</a></p>
URL_CANONICALIZER = util.UrlCanonicalizer(
def __init__(self, access_token_key, access_token_secret, username=None):
Twitter now requires authentication in v1.1 of their API. You can get an
OAuth access token by creating an app here:
access_token_key: string, OAuth access token key
access_token_secret: string, OAuth access token secret
username: string, optional, the current user. Used in e.g. preview/create.
self.access_token_key = access_token_key
self.access_token_secret = access_token_secret
self.username = username
def get_actor(self, screen_name=None):
"""Returns a user as a JSON ActivityStreams actor dict.
screen_name: string username. Defaults to the current user.
if screen_name is None:
url = API_USER % screen_name
return self.user_to_actor(self.urlopen(url))
def get_activities_response(self, user_id=None, group_id=None, app_id=None,
activity_id=None, start_index=0, count=0,
etag=None, min_id=None, cache=None,
fetch_replies=False, fetch_likes=False,
fetch_shares=False, fetch_events=False,
fetch_mentions=False, search_query=None, **kwargs):
"""Fetches posts and converts them to ActivityStreams activities.
XXX HACK: this is currently hacked for bridgy to NOT pass min_id to the
request for fetching activity tweets themselves, but to pass it to all of
the requests for filling in replies, retweets, etc. That's because we want
to find new replies and retweets of older initial tweets.
TODO: find a better way.
See :meth:`source.Source.get_activities_response()` for details. app_id is
ignored. min_id is translated to Twitter's since_id.
The code for handling ETags (and 304 Not Changed responses and setting
If-None-Match) is here, but unused right now since Twitter evidently doesn't
support ETags. From :
"I've confirmed with our team that we're not explicitly supporting this
family of features."
Likes (ie favorites) are scraped from HTML, since Twitter's REST
API doesn't offer a way to fetch them. You can also get them from the
Streaming API, though, and convert them with streaming_event_to_object().
Shares (ie retweets) are fetched with a separate API call per tweet:
However, retweets are only fetched for the first 15 tweets that have them,
since that's Twitter's rate limit per 15 minute window. :(
Quote tweets are fetched by searching for the possibly quoted tweet's ID,
using the OR operator to search up to 5 IDs at a time, and then checking
the quoted_status_id_str field
Use the group_id @self to retrieve a user_id’s timeline. If user_id is None
or @me, it will return tweets for the current API user.
group_id can be used to specify the slug of a list for which to return tweets.
By default the current API user’s lists will be used, but lists owned by other
users can be fetched by explicitly passing a username to user_id, e.g. to
fetch tweets from the list @exampleuser/example-list you would call
get_activities(user_id='exampleuser', group_id='example-list').
Twitter replies default to including a mention of the user they're replying
to, which overloads mentions a bit. When fetch_mentions is True, we determine
that a tweet mentions the current user if it @-mentions their username and:
* it's not a reply, OR
* it's a reply, but not to the current user, AND
* the tweet it's replying to doesn't @-mention the current user
if group_id is None:
group_id = source.FRIENDS
if user_id and user_id.startswith('@'):
user_id = user_id[1:]
# nested function for lazily fetching the user object if we need it
user = []
def _user():
if not user:
user.append(self.urlopen(API_USER % user_id if user_id else API_CURRENT_USER))
return user[0]
if count:
count += start_index
activities = []
if activity_id:
tweets = [self.urlopen(API_STATUS % activity_id)]
total_count = len(tweets)
if group_id == source.SELF:
if user_id in (None, source.ME):
user_id = ''
'count': count,
'screen_name': user_id,
if fetch_likes:
liked = self.urlopen(API_FAVORITES % user_id)
if liked:
activities += [self._make_like(tweet, _user()) for tweet in liked]
elif group_id == source.SEARCH:
if not search_query:
raise ValueError('search requires search_query parameter')
url = API_SEARCH % {
'q': urllib.parse.quote_plus(search_query.encode('utf-8')),
'count': count,
elif group_id in (source.FRIENDS, source.ALL):
url = API_TIMELINE % (count)
if not user_id:
user_id = _user().get('screen_name')
'count': count,
'slug': group_id,
'owner_screen_name': user_id,
headers = {'If-None-Match': etag} if etag else {}
total_count = None
resp = self.urlopen(url, headers=headers, parse_response=False)
etag ='ETag')
tweet_obj = source.load_json(, url)
if group_id == source.SEARCH:
tweet_obj = tweet_obj.get('statuses', [])
tweets = tweet_obj[start_index:]
except urllib_error.HTTPError as e:
if e.code == 304: # Not Modified, from a matching ETag
tweets = []
# batch get memcached counts of favorites and retweets for all tweets
cached = {}
if cache is not None:
keys = itertools.product(('ATR', 'ATF'), [t['id_str'] for t in tweets])
cached = cache.get_multi('%s %s' % (prefix, id) for prefix, id in keys)
# only update the cache at the end, in case we hit an error before then
cache_updates = {}
if fetch_shares:
retweet_calls = 0
for tweet in tweets:
# don't fetch retweets if the tweet is itself a retweet or if the
# author's account is protected. /statuses/retweets 403s with error
# code 200 (?!) for protected accounts.
if tweet.get('retweeted') or tweet.get('user', {}).get('protected'):
elif retweet_calls >= RETWEET_LIMIT:
logging.warning("Hit Twitter's retweet rate limit (%d) with more to "
"fetch! Results will be incomplete!" % RETWEET_LIMIT)
# store retweets in the 'retweets' field, which is handled by
# tweet_to_activity().
# TODO: make these HTTP requests asynchronous. not easy since we don't
# (yet) require threading support or use a non-blocking HTTP library.
# twitter limits this API endpoint to one call per minute per user,
# which is easy to hit, so we stop before we hit that.
# can't use the statuses/retweets_of_me endpoint because it only
# returns the original tweets, not the retweets or their authors.
id = tweet['id_str']
count = tweet.get('retweet_count')
if count and count != cached.get('ATR ' + id):
url = API_RETWEETS % id
if min_id is not None:
url = util.add_query_params(url, {'since_id': min_id})
tweet['retweets'] = self.urlopen(url)
except urllib_error.URLError as e:
code, _ = util.interpret_http_exception(e)
if code != '404': # 404 means the original tweet was deleted
retweet_calls += 1
cache_updates['ATR ' + id] = count
tweet_activities = [self.tweet_to_activity(t) for t in tweets]
if fetch_replies:
self.fetch_replies(tweet_activities, min_id=min_id)
if fetch_mentions:
# fetch mentions *after* replies so that we don't get replies to mentions
mentions = self.fetch_mentions(_user().get('screen_name'), tweets,
tweet_activities += [self.tweet_to_activity(m) for m in mentions]
if fetch_likes:
for tweet, activity in zip(tweets, tweet_activities):
id = tweet['id_str']
count = tweet.get('favorite_count')
if self.is_public(activity) and count and count != cached.get('ATF ' + id):
resp = util.urlopen(url).read()
html = source.load_json(resp, url).get('htmlUsers', '')
except urllib_error.URLError as e:
util.interpret_http_exception(e) # just log it
likes = self.favorites_html_to_likes(tweet, html)
activity['object'].setdefault('tags', []).extend(likes)
cache_updates['ATF ' + id] = count
activities += tweet_activities
response = self.make_activities_base_response(activities)
response.update({'total_count': total_count, 'etag': etag})
if cache_updates and cache is not None:
return response
def fetch_replies(self, activities, min_id=None):
"""Fetches and injects Twitter replies into a list of activities, in place.
Includes indirect replies ie reply chains, not just direct replies. Searches
for @-mentions, matches them to the original tweets with
in_reply_to_status_id_str, and recurses until it's walked the entire tree.
activities: list of activity dicts
same activities list
# cache searches for @-mentions for individual users. maps username to dict
# mapping tweet id to ActivityStreams reply object dict.
mentions = {}
# find replies
for activity in activities:
# list of ActivityStreams reply object dict and set of seen activity ids
# (tag URIs). seed with the original tweet; we'll filter it out later.
replies = [activity]
_, id = util.parse_tag_uri(activity['id'])
seen_ids = set([id])
for reply in replies:
# get mentions of this tweet's author so we can search them for replies to
# this tweet. can't use statuses/mentions_timeline because i'd need to
# auth as the user being mentioned.
# note that these HTTP requests are synchronous. you can make async
# requests by using urlfetch.fetch() directly, but not with urllib2.
author = reply['actor']['username']
if author not in mentions:
url = API_SEARCH % {
'q': urllib.parse.quote_plus('@' + author),
'count': 100,
if min_id is not None:
url = util.add_query_params(url, {'since_id': min_id})
mentions[author] = self.urlopen(url)['statuses']
# look for replies. add any we find to the end of replies. this makes us
# recursively follow reply chains to their end. (python supports
# appending to a sequence while you're iterating over it.)
for mention in mentions[author]:
id = mention['id_str']
if (mention.get('in_reply_to_status_id_str') in seen_ids and
id not in seen_ids):
items = [r['object'] for r in replies[1:]] # filter out seed activity
activity['object']['replies'] = {
'items': items,
'totalItems': len(items),
def fetch_mentions(self, username, tweets, min_id=None):
"""Fetches a user's @-mentions and returns them as ActivityStreams.
Tries to only include explicit mentions, not mentions automatically created
by @-replying. See the :meth:`get_activities()` docstring for details.
username: string
tweets: list of Twitter API objects. used to find quote tweets quoting them.
min_id: only return activities with ids greater than this
list of activity dicts
# get @-name mentions
url = API_SEARCH % {
'q': urllib.parse.quote_plus('@' + username),
'count': 100,
if min_id is not None:
url = util.add_query_params(url, {'since_id': min_id})
candidates = self.urlopen(url)['statuses']
# fetch in-reply-to tweets (if any)
in_reply_to_ids = util.trim_nulls(
[c.get('in_reply_to_status_id_str') for c in candidates])
origs = {
o.get('id_str'): o for o in
self.urlopen(API_LOOKUP % ','.join(in_reply_to_ids))
} if in_reply_to_ids else {}
# filter out tweets that we don't consider mentions
mentions = []
for c in candidates:
if (c.get('user', {}).get('screen_name') == username or
reply_to = origs.get(c.get('in_reply_to_status_id_str'))
if not reply_to:
reply_to_user = reply_to.get('user', {}).get('screen_name')
mentioned = [u.get('screen_name') for u in
reply_to.get('entities', {}).get('user_mentions', [])]
if username != reply_to_user and username not in mentioned:
# search for quote tweets
# Guideline ("Limit your searches to 10 keywords and operators.")
# implies fewer, but 20 IDs seems to work in practice.
for batch in [
for i in range(0, len(tweets), QUOTE_SEARCH_BATCH_SIZE)
batch_ids = [t['id_str'] for t in batch]
url = API_SEARCH % {
'q': urllib.parse.quote_plus(' OR '.join(batch_ids)),
'count': 100,
if min_id is not None:
url = util.add_query_params(url, {'since_id': min_id})
candidates = self.urlopen(url)['statuses']
for c in candidates:
quoted_status_id = c.get('quoted_status_id_str')
if (quoted_status_id and quoted_status_id in batch_ids and
not c.get('retweeted_status')):
return mentions
def get_comment(self, comment_id, activity_id=None, activity_author_id=None,
"""Returns an ActivityStreams comment object.
comment_id: string comment id
activity_id: string activity id, optional
activity_author_id: string activity author id. Ignored.
activity: activity object, optional
url = API_STATUS % comment_id
return self.tweet_to_object(self.urlopen(url))
def get_share(self, activity_user_id, activity_id, share_id, activity=None):
"""Returns an ActivityStreams 'share' activity object.
activity_user_id: string id of the user who posted the original activity
activity_id: string activity id
share_id: string id of the share object
activity: activity object, optional
url = API_STATUS % share_id
return self.retweet_to_object(self.urlopen(url))
def get_blocklist(self):
"""Returns the current user's block list.
May make multiple API calls, using cursors, to fully fetch large blocklists.
Block lists may have up to 10k users, but each API call only returns 100 at
most, and the API endpoint is rate limited to 15 calls per user per 15m. So
if a user has >1500 users on their block list, we can't get the whole thing
at once. :(
sequence of actor objects
:class:`source.RateLimited` if we hit the rate limit. The partial
attribute will have the list of user ids we fetched before hitting the
return self._get_blocklist_fn(API_BLOCKS,
lambda resp: (self.user_to_actor(user) for user in resp.get('users', [])))
def get_blocklist_ids(self):
"""Returns the current user's block list as a list of Twitter user ids.
May make multiple API calls, using cursors, to fully fetch large blocklists.
Subject to the same rate limiting as get_blocklist(), but each API call
returns ~4k ids, so realistically this can actually fetch blocklists of up
to 75k users at once. Beware though, many Twitter users have even more!
sequence of string Twitter user ids
:class:`source.RateLimited` if we hit the rate limit. The partial
attribute will have the list of user ids we fetched before hitting the
return self._get_blocklist_fn(API_BLOCK_IDS, lambda resp: resp.get('ids', []))
def _get_blocklist_fn(self, api_endpoint, response_fn):
values = []
cursor = '-1'
while cursor and cursor != '0':
resp = self.urlopen(api_endpoint % cursor)
except urllib_error.HTTPError as e:
raise source.RateLimited(str(e), partial=values)
cursor = resp.get('next_cursor_str')
return values
def create(self, obj, include_link=source.OMIT_LINK,
"""Creates a tweet, reply tweet, retweet, or favorite.
obj: ActivityStreams object
include_link: string
ignore_formatting: boolean
a CreationResult whose content will be a dict with 'id', 'url',
and 'type' keys (all optional) for the newly created Twitter
object (or None)
return self._create(obj, preview=False, include_link=include_link,
def preview_create(self, obj, include_link=source.OMIT_LINK,
"""Previews creating a tweet, reply tweet, retweet, or favorite.
obj: ActivityStreams object
include_link: string
ignore_formatting: boolean
a CreationResult whose content will be a unicode string HTML
snippet (or None)
return self._create(obj, preview=True, include_link=include_link,
def _create(self, obj, preview=None, include_link=source.OMIT_LINK,
"""Creates or previews creating a tweet, reply tweet, retweet, or favorite.
obj: ActivityStreams object
preview: boolean
include_link: string
ignore_formatting: boolean
a CreationResult
If preview is True, the content will be a unicode string HTML
snippet. If False, it will be a dict with 'id' and 'url' keys
for the newly created Twitter object.
assert preview in (False, True)
type = obj.get('objectType')
verb = obj.get('verb')
base_obj = self.base_object(obj)
base_id = base_obj.get('id')
base_url = base_obj.get('url')
is_reply = type == 'comment' or 'inReplyTo' in obj
is_rsvp = (verb and verb.startswith('rsvp-')) or verb == 'invite'
images = util.get_list(obj, 'image')
video_url = util.get_first(obj, 'stream', {}).get('url')
has_media = (images or video_url) and (type in ('note', 'article') or is_reply)
lat = obj.get('location', {}).get('latitude')
lng = obj.get('location', {}).get('longitude')
# prefer displayName over content for articles
type = obj.get('objectType')
prefer_content = type == 'note' or (base_url and (type == 'comment'
or obj.get('inReplyTo')))
preview_description = ''
quote_tweet_url = None
for att in obj.get('attachments', []):
url = self.URL_CANONICALIZER(att.get('url', ''))
if url and TWEET_URL_RE.match(url):
quote_tweet_url = url
preview_description += """\
<span class="verb">quote</span>
<a href="%s">this tweet</a>:<br>
<br>and """ % (url, self.embed_post(att))
content = self._content_for_create(
obj, ignore_formatting=ignore_formatting, prefer_name=not prefer_content,
strip_first_video_tag=bool(video_url), strip_quotations=bool(quote_tweet_url))
if not content:
if type == 'activity' and not is_rsvp:
content = verb
elif has_media:
content = ''
return source.creation_result(
abort=False, # keep looking for things to publish,
error_plain='No content text found.',
error_html='No content text found.')
if is_reply and base_url:
# Twitter *used* to require replies to include an @-mention of the
# original tweet's author
# ...but now we use the auto_populate_reply_metadata query param instead:
# the embed URL in the preview can't start with mobile. or www., so just
# hard-code it to index #1 is netloc.
parsed = urllib.parse.urlparse(base_url)
parts = parsed.path.split('/')
if len(parts) < 2 or not parts[1]:
raise ValueError('Could not determine author of in-reply-to URL %s' % base_url)
reply_to_prefix = '@%s ' % parts[1].lower()
if content.lower().startswith(reply_to_prefix):
content = content[len(reply_to_prefix):]
parsed = list(parsed)
parsed[1] = self.DOMAIN
base_url = urllib.parse.urlunparse(parsed)
# need a base_url with the tweet id for the embed HTML below. do this
# *after* checking the real base_url for in-reply-to author username.
if base_id and not base_url:
base_url = '' + base_id
if is_reply and not base_url:
return source.creation_result(
error_plain='Could not find a tweet to reply to.',
error_html='Could not find a tweet to <a href="">reply to</a>. '
'Check that your post has an <a href="">in-reply-to</a> '
'link a Twitter URL or to an original post that publishes a '
'<a href="">rel-syndication</a> link to Twitter.')
# truncate and ellipsize content if it's over the character
# count. URLs will be, so include that when counting.
content = self._truncate(content, obj.get('url'), include_link, type,
# linkify defaults to Twitter's link shortening behavior
preview_content = util.linkify(content, pretty=True, skip_bare_cc_tlds=True)
preview_content = MENTION_RE.sub(
r'\1<a href="\2">@\2</a>', preview_content)
preview_content = HASHTAG_RE.sub(
r'\1<a href="\2">#\2</a>', preview_content)
if type == 'activity' and verb == 'like':
if not base_url:
return source.creation_result(
error_plain='Could not find a tweet to like.',
error_html='Could not find a tweet to <a href="">favorite</a>. '
'Check that your post has a like-of link to a Twitter URL or to an original post that publishes a '
'<a href="">rel-syndication</a> link to Twitter.')
if preview:
preview_description += """\
<span class="verb">favorite</span>
<a href="%s">this tweet</a>:
%s""" % (base_url, self.embed_post(base_obj))
return source.creation_result(description=preview_description)
data = urllib.parse.urlencode({'id': base_id})
self.urlopen(API_POST_FAVORITE, data=data)
resp = {'type': 'like'}
elif type == 'activity' and verb == 'share':
if not base_url:
return source.creation_result(
error_plain='Could not find a tweet to retweet.',
error_html='Could not find a tweet to <a href="">retweet</a>. '
'Check that your post has a repost-of link to a Twitter URL or to an original post that publishes a '
'<a href="">rel-syndication</a> link to Twitter.')
if preview:
preview_description += """\
<span class="verb">retweet</span>
<a href="%s">this tweet</a>:
%s""" % (base_url, self.embed_post(base_obj))
return source.creation_result(description=preview_description)
data = urllib.parse.urlencode({'id': base_id})
resp = self.urlopen(API_POST_RETWEET % base_id, data=data)
resp['type'] = 'repost'
elif type in ('note', 'article') or is_reply or is_rsvp: # a tweet
content = str(content).encode('utf-8')
data = [('status', content)]
if is_reply:
preview_description += """\
<span class="verb">@-reply</span> to <a href="%s">this tweet</a>:
%s""" % (base_url, self.embed_post(base_obj))
('in_reply_to_status_id', base_id),
('auto_populate_reply_metadata', 'true'),
preview_description += '<span class="verb">tweet</span>:'
if video_url:
preview_content += ('<br /><br /><video controls src="%s"><a href="%s">'
'this video</a></video>' % (video_url, video_url))
if not preview:
ret = self.upload_video(video_url)
if isinstance(ret, source.CreationResult):
return ret
data.append(('media_ids', ret))
elif images:
num = len(images)
if num > MAX_MEDIA:
images = images[:MAX_MEDIA]
logging.warning('Found %d photos! Only using the first %d: %r',
num, MAX_MEDIA, images)
preview_content += '<br /><br />' + ' &nbsp; '.join(
'<img src="%s" alt="%s" />' % (img.get('url'), img.get('displayName', ''))
for img in images)
if not preview:
ret = self.upload_images(images)
if isinstance(ret, source.CreationResult):
return ret
data.append(('media_ids', ','.join(ret)))
if lat and lng:
preview_content += (
'<div>at <a href=",%s">'
'%s, %s</a></div>' % (lat, lng, lat, lng))
('lat', lat),
('long', lng),
if preview:
return source.creation_result(content=preview_content,
resp = self.urlopen(API_POST_TWEET, data=urllib.parse.urlencode(sorted(data)))
resp['type'] = 'comment' if is_reply else 'post'
return source.creation_result(
error_plain='Cannot publish type=%s, verb=%s to Twitter' % (type, verb),
error_html='Cannot publish type=%s, verb=%s to Twitter' % (type, verb))
id_str = resp.get('id_str')
if id_str:
resp.update({'id': id_str, 'url': self.tweet_url(resp)})
elif 'url' not in resp:
resp['url'] = base_url
return source.creation_result(resp)
def _truncate(self, content, url, include_link, type, quote_tweet=None):
"""Shorten tweet content to fit within the character limit.
content: string
url: string
include_link: string
type: string: 'article', 'note', etc.
quote_tweet: string URL, optional. If provided,
it will be appended to the content, *after* truncating.
Return: string, the possibly shortened and ellipsized tweet text
if type == 'article':
format = brevity.FORMAT_ARTICLE
format = brevity.FORMAT_NOTE
target_length = MAX_TWEET_LENGTH
if quote_tweet:
target_length -= (TCO_LENGTH + 1)
truncated = brevity.shorten(
# permalink is included only when the text is truncated
permalink=url if include_link != source.OMIT_LINK else None,
# permashortlink is always included
permashortlink=url if include_link == source.INCLUDE_LINK else None,
target_length=target_length, link_length=TCO_LENGTH, format=format)
if quote_tweet:
truncated += ' ' + quote_tweet
return truncated
def upload_images(self, images):
"""Uploads one or more images from web URLs.
Note that files and JSON bodies in media POST API requests are *not*
included in OAuth signatures.
images: sequence of AS image objects, eg:
[{'url': 'http://picture', 'displayName': 'a thing'}, ...]
list of string media ids or :class:`CreationResult` on error
ids = []
for image in images:
url = image.get('url')
if not url:
image_resp = util.urlopen(url)
bad_type = self._check_mime_type(url, image_resp, IMAGE_MIME_TYPES,
'JPG, PNG, GIF, and WEBP images')
if bad_type:
return bad_type
headers = twitter_auth.auth_header(
API_UPLOAD_MEDIA, self.access_token_key, self.access_token_secret, 'POST')
resp = util.requests_post(API_UPLOAD_MEDIA,
files={'media': image_resp},
resp.raise_for_status()'Got: %s', resp.text)
media_id = source.load_json(resp.text, API_UPLOAD_MEDIA)['media_id_string']
alt = image.get('displayName')
if alt:
alt = util.ellipsize(alt, chars=MAX_ALT_LENGTH)
headers = twitter_auth.auth_header(
API_MEDIA_METADATA, self.access_token_key, self.access_token_secret, 'POST')
resp = util.requests_post(API_MEDIA_METADATA,
json={'media_id': media_id,'alt_text': {'text': alt}},
headers=headers)'Got: %s', resp)
return ids
def upload_video(self, url):
"""Uploads a video from web URLs using the chunked upload process.
Chunked upload consists of multiple API calls:
* command=INIT, which allocates the media id
* command=APPEND for each 5MB block, up to 15MB total
* command=FINALIZE
url: string URL of images
string media id or :class:`CreationResult` on error
video_resp = util.urlopen(url)
bad_type = self._check_mime_type(url, video_resp, VIDEO_MIME_TYPES, 'MP4 videos')
if bad_type:
return bad_type
length = video_resp.headers.get('Content-Length')
if not util.is_int(length):
msg = "Couldn't determine your video's size."
return source.creation_result(abort=True, error_plain=msg, error_html=msg)
length = int(length)
if int(length) > MAX_VIDEO_SIZE:
msg = "Your %sMB video is larger than Twitter's %dMB limit." % (
length // MB, MAX_VIDEO_SIZE // MB)
return source.creation_result(abort=True, error_plain=msg, error_html=msg)
media_id = self.urlopen(API_UPLOAD_MEDIA, data=urllib.parse.urlencode({
'command': 'INIT',
'media_type': 'video/mp4',
'total_bytes': length,
headers = twitter_auth.auth_header(
API_UPLOAD_MEDIA, self.access_token_key, self.access_token_secret, 'POST')
i = 0
while True:
chunk = util.FileLimiter(video_resp, UPLOAD_CHUNK_SIZE)
data = {
'command': 'APPEND',
'media_id': media_id,
'segment_index': i,
resp = util.requests_post(API_UPLOAD_MEDIA, data=data,
files={'media': chunk}, headers=headers)
if chunk.ateof:
i += 1
self.urlopen(API_UPLOAD_MEDIA, data=urllib.parse.urlencode({
'command': 'FINALIZE',
'media_id': media_id,
return media_id
def _check_mime_type(url, resp, allowed, label):
"""Checks that a URL is in a set of allowed MIME type(s).
url: string
resp: urlopen result object
allowed: sequence of allowed string MIME types
label: human-readable description of the allowed MIME types, to be used in
an error message
None if the url's MIME type is in the set, :class:`CreationResult`
with abort=True if it isn't
type = resp.headers.get('Content-Type')
if not type:
type, _ = mimetypes.guess_type(url)
if type and type not in allowed:
msg = 'Twitter only supports %s; %s looks like %s' % (label, url, type)
return source.creation_result(abort=True, error_plain=msg, error_html=msg)
def delete(self, id):
"""Deletes a tweet. The authenticated user must have authored it.
id: int or string, tweet id to delete
Returns: CreationResult, content is Twitter API response dict
resp = self.urlopen(API_DELETE_TWEET, data=urllib.parse.urlencode({'id': id}))
return source.creation_result(resp)
def preview_delete(self, id):
"""Previews deleting a tweet.
id: int or string, tweet id to delete
Returns: CreationResult
url = self.status_url(self.username or '_', id)
return source.creation_result(description="""\
<span class="verb">delete</span>
<a href="%s">this tweet</a>:
%s""" % (url, self.embed_post({'url': url})))
def urlopen(self, url, parse_response=True, **kwargs):
"""Wraps :func:`urllib2.urlopen()` and adds an OAuth signature."""
if not url.startswith('http'):
url = API_BASE + url
def request():
resp = twitter_auth.signed_urlopen(
url, self.access_token_key, self.access_token_secret, **kwargs)
return source.load_json(, url) if parse_response else resp
if ('data' not in kwargs and not
(isinstance(url, urllib.request.Request) and url.get_method() == 'POST')):
# this is a GET. retry up to 3x if we deadline.
for _ in range(RETRIES):
return request()
except http.client.HTTPException as e:
if not str(e).startswith('Deadline exceeded'):
except socket.timeout as e:
except urllib_error.HTTPError as e:
code, body = util.interpret_http_exception(e)
if code is None or int(code) not in (500, 501, 502):
raise'Twitter API call failed! Retrying...')
# last try. if it deadlines, let the exception bubble up.
return request()
def base_object(self, obj):
"""Returns the 'base' silo object that an object operates on.
Includes special handling for Twitter photo and video URLs, e.g.
obj: ActivityStreams object
dict, minimal ActivityStreams object. Usually has at least id and
url fields; may also have author.
base_obj = super(Twitter, self).base_object(obj)
url = base_obj.get('url')
if url:
parsed = urllib.parse.urlparse(url)
parts = parsed.path.split('/')
if len(parts) >= 3 and parts[-2] in ('photo', 'video'):
base_obj['id'] = parts[-3]
parsed = list(parsed)
parsed[2] = '/'.join(parts[:-2])
base_obj['url'] = urllib.parse.urlunparse(parsed)
except BaseException as e:
"Couldn't parse object URL %s : %s. Falling back to default logic.",
url, e)
return base_obj
def tweet_to_activity(self, tweet):
"""Converts a tweet to an activity.
tweet: dict, a decoded JSON tweet
an ActivityStreams activity dict, ready to be JSON-encoded
obj = self.tweet_to_object(tweet)
activity = {
'verb': 'post',
'published': obj.get('published'),
'id': obj.get('id'),
'url': obj.get('url'),
'actor': obj.get('author'),
'object': obj,
retweeted = tweet.get('retweeted_status')
if retweeted:
activity['verb'] = 'share'
activity['object'] = self.tweet_to_object(retweeted)
in_reply_to = obj.get('inReplyTo')
if in_reply_to:
activity['context'] = {'inReplyTo': in_reply_to}
# yes, the source field has an embedded HTML link. bleh.
parsed ='<a href="([^"]+)".*>(.+)</a>', tweet.get('source', ''))
if parsed:
url, name = parsed.groups()
activity['generator'] = {'displayName': name, 'url': url}
return self.postprocess_activity(activity)
def tweet_to_object(self, tweet):
"""Converts a tweet to an object.
tweet: dict, a decoded JSON tweet
an ActivityStreams object dict, ready to be JSON-encoded
obj = {}
# always prefer id_str over id to avoid any chance of integer overflow.
# usually shouldn't matter in Python, but still.
id = tweet.get('id_str')
if not id:
return {}
created_at = tweet.get('created_at')
published = self.rfc2822_to_iso8601(created_at)
except ValueError:
# this is probably already ISO 8601, likely from the archive export.
published = created_at
obj = {
'id': self.tag_uri(id),
'objectType': 'note',
'published': published,
'to': [],
retweeted = tweet.get('retweeted_status')
base_tweet = retweeted if retweeted else tweet
entities = self._get_entities(base_tweet)
text = util.WideUnicode(base_tweet.get('text') or '')
user = tweet.get('user')
if user:
obj['author'] = self.user_to_actor(user)
username = obj['author'].get('username')
if username:
obj['url'] = self.status_url(username, id)
protected = user.get('protected')
if protected is not None:
'objectType': 'group',
'alias': '@public' if not protected else '@private',
# media! into attachments.
media = entities.get('media', [])
if media:
types = {
'photo': 'image',
'video': 'video',
'animated_gif': 'video',
obj['attachments'] = [{
'objectType': types.get(m.get('type')),
'image': {'url': m.get('media_url_https') or m.get('media_url')},
'stream': {'url': self._video_url(m)},
} for m in media]
first = obj['attachments'][0]
if first['objectType'] == 'video':
obj['stream'] = first['stream']
obj['image'] = first['image']
# if this tweet is quoting another tweet, include it as an attachment
quoted = tweet.get('quoted_status')
quoted_url = None
if quoted:
quoted_obj = self.tweet_to_object(quoted)
obj.setdefault('attachments', []).append(quoted_obj)
quoted_url = quoted_obj.get('url')
# remove quoted tweet URL from text
url_entities = entities.get('urls', [])
for i, entity in enumerate(url_entities):
indices = entity.get('indices')
if indices and entity.get('expanded_url') == quoted_url:
text = text[:indices[0]] + text[indices[1]:]
del url_entities[i]
# tags
obj['tags'] = [
{'objectType': 'person',
'id': self.tag_uri(t.get('screen_name')),
'url': self.user_url(t.get('screen_name')),
'displayName': t.get('name'),
'indices': t.get('indices')
} for t in entities.get('user_mentions', [])
] + [
{'objectType': 'hashtag',
'url': '' + t.get('text'),
'indices': t.get('indices'),
} for t in entities.get('hashtags', [])
] + [
{'objectType': 'article',
'url': t.get('expanded_url'),
'displayName': t.get('display_url'),
'indices': t.get('indices'),
} for t in entities.get('urls', [])
# media. these are only temporary, to get rid of the image links. the tag
# elements are removed farther down below.
# when there are multiple, twitter usually (always?) only adds a single
# media link to the end of the tweet text, and all of the media objects will
# have the same indices. so de-dupe based on indices.
indices_to_media = {
tuple(t['indices']): {
'objectType': 'image',
'displayName': '',
'indices': t['indices'],
} for t in media if t.get('indices')}
# sort tags by indices, since they need to be processed (below) in order.
obj['tags'].sort(key=lambda t: t.get('indices'))
# RT @username: prefix for retweets
rt_prefix = ''
if retweeted and retweeted.get('text'):
rt_prefix = 'RT <a href="">@%s</a>: ' % (
(retweeted.get('user', {}).get('screen_name'),) * 2)
# text content. linkify entities. convert start/end indices to start/length,
# and replace URLs with real "display" URLs.
text_start, text_end = (0, len(text))
full_text = base_tweet.get('full_text')
if full_text:
text = util.WideUnicode(full_text)
text_start, text_end = (tweet['display_text_range']
if tweet.get('display_text_range')
else (0, len(text)))
obj['to'].extend(tag for tag in obj['tags']
if tag.get('objectType') == 'person'
and tag.get('indices')[1] <= text_start)
# convert start/end indices to start/length, and replace URLs with
# real "display" URLs.
content = util.WideUnicode(rt_prefix + text[text_start:text_end])
offset = len(rt_prefix) - text_start
for t in obj['tags']:
start, end = t.pop('indices', None) or (0, 0)
if start >= text_start and end <= text_end:
start += offset
end += offset
length = end - start
if t['objectType'] in ('article', 'image'):
tag_text = t.get('displayName', t.get('url'))
if tag_text is not None:
content = util.WideUnicode(content[:start] + tag_text + content[end:])
offset += len(tag_text) - length
length = len(tag_text)
t.update({'startIndex': start, 'length': length})
'tags': [t for t in obj['tags'] if t['objectType'] != 'image'] +
[self.retweet_to_object(r) for r in tweet.get('retweets', [])],
'content': content,
# location
place = tweet.get('place')
if place:
obj['location'] = {
'displayName': place.get('full_name'),
'id': self.tag_uri(place.get('id')),
# place['url'] is a JSON API url, not useful for end users. get the
# lat/lon from geo instead.
geo = tweet.get('geo')
if geo:
coords = geo.get('coordinates')
if coords:
obj['location']['url'] = (',%s' %
# inReplyTo
reply_to_screenname = tweet.get('in_reply_to_screen_name')
reply_to_id = tweet.get('in_reply_to_status_id')
if reply_to_id and reply_to_screenname:
obj['inReplyTo'] = [{
'id': self.tag_uri(reply_to_id),
'url': self.status_url(reply_to_screenname, reply_to_id),
return self.postprocess_object(obj)
def _get_entities(tweet):
"""Merges and returns a tweet's entities and extended_entities.
Most entities are in the entities field - urls, hashtags, user_mentions,
symbols, etc. Media are special though: extended_entities is always
preferred. It has videos, animated gifs, and multiple photos. entities only
has one photo at most, either the first or a thumbnail from the video, and
its type is always 'photo' even for videos and animated gifs. (The id and
id_str will be the same.) So ignore it unless extended_entities is missing.
entities = collections.defaultdict(list)
# maps kind to set of id_str, url, and text values we've seen, for de-duping
seen_ids = collections.defaultdict(set)
for field in 'extended_entities', 'entities': # prefer extended_entities!
# kind is media, urls, hashtags, user_mentions, symbols, etc
for kind, values in tweet.get(field, {}).items():
for v in values:
id = v.get('id_str') or v.get('id') or v.get('url') or v.get('text')
if id:
if id in seen_ids[kind]:
return entities
def _video_url(self, media):
"""Returns the best video URL from a media object.
Prefers MIME types that start with video/, then falls back to others.
Twitter videos in extended entities currently often have both .m3u8 (HLS)
and .mp4 variants. Twitter threatened to drop the MP4s in Aug 2016, but
they're still there as of Dec 2017.
media: dict, Twitter media object
Returns: string URL
variants = media.get('video_info', {}).get('variants')
if not variants:
best_bitrate = 0
best_url = None
for variant in variants:
url = variant.get('url')
bitrate = variant.get('bitrate', 0)
type = variant.get('content_type', '')
if url and type.startswith('video/') and bitrate >= best_bitrate:
best_url = url
best_bitrate = bitrate
return best_url or variants[0].get('url')
def user_to_actor(self, user):
"""Converts a tweet to an activity.
user: dict, a decoded JSON Twitter user
an ActivityStreams actor dict, ready to be JSON-encoded
username = user.get('screen_name')
if not username:
return {}
urls = [self.user_url(username)] + util.trim_nulls(
[e.get('expanded_url') for e in itertools.chain(
*(user.get('entities', {}).get(field, {}).get('urls', [])
for field in ('url', 'description')))])
image = user.get('profile_image_url_https') or user.get('profile_image_url')
if image:
# remove _normal for a ~256x256 avatar rather than ~48x48
image = image.replace('_normal.', '.', 1)
return util.trim_nulls({
'objectType': 'person',
'displayName': user.get('name') or username,
'image': {'url': image},
'id': self.tag_uri(username),
# numeric_id is our own custom field that always has the source's numeric
# user id, if available.
'numeric_id': user.get('id_str'),
'published': self.rfc2822_to_iso8601(user.get('created_at')),
'url': urls[0],
'urls': [{'value': u} for u in urls] if len(urls) > 1 else None,
'location': {'displayName': user.get('location')},
'username': username,
'description': user.get('description'),
def retweet_to_object(self, retweet):
"""Converts a retweet to a share activity object.
retweet: dict, a decoded JSON tweet
an ActivityStreams object dict
orig = retweet.get('retweeted_status')
if not orig:
return None
share = self.tweet_to_object(retweet)
'objectType': 'activity',
'verb': 'share',
'object': {'url': self.tweet_url(orig)},
if 'tags' in share:
# the existing tags apply to the original tweet's text, which we replaced
del share['tags']
return self.postprocess_object(share)
def streaming_event_to_object(self, event):
"""Converts a Streaming API event to an object.
Right now, only converts favorite events to like objects.
event: dict, a decoded JSON Streaming API event
an ActivityStreams object dict
source = event.get('source')
tweet = event.get('target_object')
if event.get('event') == 'favorite' and source and tweet:
obj = self._make_like(tweet, source)
obj['published'] = self.rfc2822_to_iso8601(event.get('created_at'))
return obj
def favorites_html_to_likes(self, tweet, html):
"""Converts the HTML from a favorited_popup request to like objects.
html: string
list of ActivityStreams like object dicts
soup = BeautifulSoup(html)
likes = []
for user in soup.find_all(class_='js-user-profile-link'):
username = user.find(class_='username')
if not username:
username = str(username.get_text(''))
if username.startswith('@'):
username = username[1:]
img = user.find(class_='js-action-profile-avatar') or {}
author = {
'id_str': img.get('data-user-id'),
'screen_name': username,
'profile_image_url': img.get('src'),
fullname = user.find(class_='fullname')
if fullname:
author['name'] = fullname.get_text(' ', strip=True)
likes.append(self._make_like(tweet, author))
return likes
def _make_like(self, tweet, liker):
"""Generates and returns a ActivityStreams like object.
tweet: Twitter tweet dict
liker: Twitter user dict
ActivityStreams object dict
tweet_id = tweet.get('id_str')
liker_id = liker.get('id_str')
id = None
url = obj_url = self.tweet_url(tweet)
if liker_id:
id = self.tag_uri('%s_favorited_by_%s' % (tweet_id, liker_id))
url += '#favorited-by-%s' % liker_id
return self.postprocess_object({
'id': id,
'url': url,
'objectType': 'activity',
'verb': 'like',
'object': {'url': obj_url},
'author': self.user_to_actor(liker),
def rfc2822_to_iso8601(time_str):
"""Converts a timestamp string from RFC 2822 format to ISO 8601.
Example RFC 2822 timestamp string generated by Twitter:
'Wed May 23 06:01:13 +0000 2007'
Resulting ISO 8610 timestamp string:
if not time_str:
return None
without_timezone = re.sub(' [+-][0-9]{4} ', ' ', time_str)
timezone ='[+-][0-9]{4}', time_str).group(0)
## convert offset to seconds
offset = 3600 * int(timezone[1:3]) + 60 * int(timezone[3:])
## negative offset
if timezone[0] == '-':
offset = -offset
dt = datetime.datetime.strptime(without_timezone, '%a %b %d %H:%M:%S %Y').replace(tzinfo=OffsetTzinfo(offset))
return dt.isoformat()
def user_url(self, username):
"""Returns the Twitter URL for a given user."""
return 'https://%s/%s' % (self.DOMAIN, username)
def status_url(self, username, id):
"""Returns the Twitter URL for a tweet from a given user with a given id."""
return '%s/status/%s' % (self.user_url(username), id)
def tweet_url(self, tweet):
"""Returns the Twitter URL for a tweet given a tweet object."""
return self.status_url(tweet.get('user', {}).get('screen_name'),
def _validate_id(id):
if not util.is_int(id):
raise ValueError('Twitter ids must be integers; got %s' % id)