diff --git a/examples/follow_user.py b/examples/follow_user.py
index b9e78a88..21db370d 100644
--- a/examples/follow_user.py
+++ b/examples/follow_user.py
@@ -9,7 +9,8 @@
if len(sys.argv) >= 2:
target = sys.argv[1]
else:
- target = raw_input("User to follow: ") # For Python 3.x use: target = input("User to follow: ")
+ target = raw_input("User to follow: ")
+ # For Python 3.x use: target = input("User to follow: ")
# Requires Authentication as of Twitter API v1.1
twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
diff --git a/examples/search_results.py b/examples/search_results.py
index 99841396..3eff41d3 100644
--- a/examples/search_results.py
+++ b/examples/search_results.py
@@ -8,5 +8,7 @@
print e
for tweet in search_results['statuses']:
- print 'Tweet from @%s Date: %s' % (tweet['user']['screen_name'].encode('utf-8'), tweet['created_at'])
+ print 'Tweet from @%s Date: %s' % (tweet['user']['screen_nam\
+ e'].encode('utf-8'),
+ tweet['created_at'])
print tweet['text'].encode('utf-8'), '\n'
diff --git a/examples/stream.py b/examples/stream.py
index 0ff5c048..1d1ee1ff 100644
--- a/examples/stream.py
+++ b/examples/stream.py
@@ -16,5 +16,7 @@ def on_error(self, status_code, data):
OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
stream.statuses.filter(track='twitter')
-#stream.user() # Read the authenticated users home timeline (what they see on Twitter) in real-time
-#stream.site(follow='twitter')
+# stream.user()
+# Read the authenticated users home timeline
+# (what they see on Twitter) in real-time
+# stream.site(follow='twitter')
diff --git a/setup.py b/setup.py
index d2d3570d..e6c8e94f 100755
--- a/setup.py
+++ b/setup.py
@@ -29,9 +29,10 @@
license=open('LICENSE').read(),
url='https://github.com/ryanmcgrath/twython/tree/master',
keywords='twitter search api tweet twython stream',
- description='Actively maintained, pure Python wrapper for the Twitter API. Supports both normal and streaming Twitter APIs',
+ description='Actively maintained, pure Python wrapper for the \
+ Twitter API. Supports both normal and streaming Twitter APIs',
long_description=open('README.rst').read() + '\n\n' +
- open('HISTORY.rst').read(),
+ open('HISTORY.rst').read(),
include_package_data=True,
packages=packages,
classifiers=[
diff --git a/tests/config.py b/tests/config.py
index d26b6e1a..65cc2f70 100644
--- a/tests/config.py
+++ b/tests/config.py
@@ -23,7 +23,8 @@
# Test Ids
test_tweet_id = os.environ.get('TEST_TWEET_ID', '318577428610031617')
test_list_slug = os.environ.get('TEST_LIST_SLUG', 'team')
-test_list_owner_screen_name = os.environ.get('TEST_LIST_OWNER_SCREEN_NAME', 'twitterapi')
+test_list_owner_screen_name = os.environ.get('TEST_LIST_OWNER_SCREEN_NAME',
+ 'twitterapi')
test_tweet_object = {u'contributors': None, u'truncated': False, u'text': u'http://t.co/FCmXyI6VHd is a #cool site, lol! @mikehelmick should #checkitout. If you can! #thanks Love, @__twython__ https://t.co/67pwRvY6z9', u'in_reply_to_status_id': None, u'id': 349683012054683648, u'favorite_count': 0, u'source': u'web', u'retweeted': False, u'coordinates': None, u'entities': {u'symbols': [], u'user_mentions': [{u'id': 29251354, u'indices': [45, 57], u'id_str': u'29251354', u'screen_name': u'mikehelmick', u'name': u'Mike Helmick'}, {u'id': 1431865928, u'indices': [104, 116], u'id_str': u'1431865928', u'screen_name': u'__twython__', u'name': u'Twython'}], u'hashtags': [{u'indices': [28, 33], u'text': u'cool'}, {u'indices': [65, 76], u'text': u'checkitout'}, {u'indices': [90, 97], u'text': u'thanks'}], u'urls': [{u'url': u'http://t.co/FCmXyI6VHd', u'indices': [0, 22], u'expanded_url': u'http://google.com', u'display_url': u'google.com'}, {u'url': u'https://t.co/67pwRvY6z9', u'indices': [117, 140], u'expanded_url': u'https://github.com', u'display_url': u'github.com'}]}, u'in_reply_to_screen_name': None, u'id_str': u'349683012054683648', u'retweet_count': 0, u'in_reply_to_user_id': None, u'favorited': False, u'user': {u'follow_request_sent': False, u'profile_use_background_image': True, u'default_profile_image': True, u'id': 1431865928, u'verified': False, u'profile_text_color': u'333333', u'profile_image_url_https': u'https://si0.twimg.com/sticky/default_profile_images/default_profile_3_normal.png', u'profile_sidebar_fill_color': u'DDEEF6', u'entities': {u'description': {u'urls': []}}, u'followers_count': 1, u'profile_sidebar_border_color': u'C0DEED', u'id_str': u'1431865928', u'profile_background_color': u'3D3D3D', u'listed_count': 0, u'profile_background_image_url_https': u'https://si0.twimg.com/images/themes/theme1/bg.png', u'utc_offset': None, u'statuses_count': 2, u'description': u'', u'friends_count': 1, u'location': u'', u'profile_link_color': u'0084B4', u'profile_image_url': u'http://a0.twimg.com/sticky/default_profile_images/default_profile_3_normal.png', u'following': False, u'geo_enabled': False, u'profile_background_image_url': u'http://a0.twimg.com/images/themes/theme1/bg.png', u'screen_name': u'__twython__', u'lang': u'en', u'profile_background_tile': False, u'favourites_count': 0, u'name': u'Twython', u'notifications': False, u'url': None, u'created_at': u'Thu May 16 01:11:09 +0000 2013', u'contributors_enabled': False, u'time_zone': None, u'protected': False, u'default_profile': False, u'is_translator': False}, u'geo': None, u'in_reply_to_user_id_str': None, u'possibly_sensitive': False, u'lang': u'en', u'created_at': u'Wed Jun 26 00:18:21 +0000 2013', u'in_reply_to_status_id_str': None, u'place': None}
test_tweet_html = 'google.com is a #cool site, lol! @mikehelmick should #checkitout. If you can! #thanks Love, @__twython__ github.com'
diff --git a/tests/test_core.py b/tests/test_core.py
index 5b8a3749..98d3a476 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -28,8 +28,8 @@ def get_url(self, endpoint):
return '%s/%s.json' % (self.api.api_url % self.api.api_version, endpoint)
def register_response(self, method, url, body='{}', match_querystring=False,
- status=200, adding_headers=None, stream=False,
- content_type='application/json; charset=utf-8'):
+ status=200, adding_headers=None, stream=False,
+ content_type='application/json; charset=utf-8'):
"""Wrapper function for responses for simpler unit tests"""
# responses uses BytesIO to hold the body so it needs to be in bytes
@@ -37,7 +37,7 @@ def register_response(self, method, url, body='{}', match_querystring=False,
body = bytes(body, 'UTF-8')
responses.add(method, url, body, match_querystring,
- status, adding_headers, stream, content_type)
+ status, adding_headers, stream, content_type)
@responses.activate
def test_request_should_handle_full_endpoint(self):
@@ -77,7 +77,7 @@ def test_request_should_post_request_regardless_of_case(self):
@responses.activate
def test_request_should_throw_exception_with_invalid_http_method(self):
"""Test that request() throws an exception when an invalid HTTP method is passed"""
- #TODO(cash): should Twython catch the AttributeError and throw a TwythonError
+ # TODO(cash): should Twython catch the AttributeError and throw a TwythonError
self.assertRaises(AttributeError, self.api.request, endpoint='search/tweets', method='INVALID')
@responses.activate
@@ -315,5 +315,5 @@ def test_html_for_tweet_short_url(self):
"""Test using expanded url in HTML for Tweet displays full urls"""
tweet_text = self.api.html_for_tweet(test_tweet_object, False)
# Make sure HTML doesn't contain the display OR expanded url
- self.assertTrue(not 'http://google.com' in tweet_text)
- self.assertTrue(not 'google.com' in tweet_text)
+ self.assertTrue('http://google.com' not in tweet_text)
+ self.assertTrue('google.com' not in tweet_text)
diff --git a/tests/test_endpoints.py b/tests/test_endpoints.py
index 6204a577..b7478d03 100644
--- a/tests/test_endpoints.py
+++ b/tests/test_endpoints.py
@@ -20,8 +20,10 @@ def setUp(self):
'allow_redirects': False
}
+ # This is so we can hit coverage that Twython sets
+ # User-Agent for us if none is supplied
oauth2_client_args = {
- 'headers': {} # This is so we can hit coverage that Twython sets User-Agent for us if none is supplied
+ 'headers': {}
}
self.api = Twython(app_key, app_secret,
@@ -42,7 +44,8 @@ def test_get_user_timeline(self):
"""Test returning timeline for authenticated user and random user
succeeds"""
self.api.get_user_timeline() # Authenticated User Timeline
- self.api.get_user_timeline(screen_name='twitter') # Random User Timeline
+ self.api.get_user_timeline(screen_name='twitter')
+ # Random User Timeline
@unittest.skip('skipping non-updated test')
def test_get_protected_user_timeline_following(self):
@@ -82,7 +85,8 @@ def test_show_status(self):
@unittest.skip('skipping non-updated test')
def test_update_and_destroy_status(self):
"""Test updating and deleting a status succeeds"""
- status = self.api.update_status(status='Test post just to get deleted :( %s' % int(time.time()))
+ status = self.api.update_status(status='Test post just to get \
+ deleted :( %s' % int(time.time()))
self.api.destroy_status(id=status['id_str'])
@unittest.skip('skipping non-updated test')
@@ -117,7 +121,8 @@ def test_get_sent_messages(self):
def test_send_get_and_destroy_direct_message(self):
"""Test sending, getting, then destory a direct message succeeds"""
message = self.api.send_direct_message(screen_name=protected_twitter_1,
- text='Hey d00d! %s' % int(time.time()))
+ text='Hey d00d! %s\
+ ' % int(time.time()))
self.api.get_direct_message(id=message['id_str'])
self.api.destroy_direct_message(id=message['id_str'])
@@ -127,7 +132,8 @@ def test_send_direct_message_to_non_follower(self):
"""Test sending a direct message to someone who doesn't follow you
fails"""
self.assertRaises(TwythonError, self.api.send_direct_message,
- screen_name=protected_twitter_2, text='Yo, man! %s' % int(time.time()))
+ screen_name=protected_twitter_2, text='Yo, man! \
+ %s' % int(time.time()))
# Friends & Followers
@unittest.skip('skipping non-updated test')
@@ -348,7 +354,8 @@ def test_create_update_destroy_list_add_remove_list_members(self):
the_list = self.api.create_list(name='Stuff %s' % int(time.time()))
list_id = the_list['id_str']
- self.api.update_list(list_id=list_id, name='Stuff Renamed %s' % int(time.time()))
+ self.api.update_list(list_id=list_id, name='Stuff Renamed \
+ %s' % int(time.time()))
screen_names = ['johncena', 'xbox']
# Multi add/delete members
@@ -359,7 +366,8 @@ def test_create_update_destroy_list_add_remove_list_members(self):
# Single add/delete member
self.api.add_list_member(list_id=list_id, screen_name='justinbieber')
- self.api.delete_list_member(list_id=list_id, screen_name='justinbieber')
+ self.api.delete_list_member(list_id=list_id,
+ screen_name='justinbieber')
self.api.delete_list(list_id=list_id)
diff --git a/twython/api.py b/twython/api.py
index 9ae69a15..3a37909a 100644
--- a/twython/api.py
+++ b/twython/api.py
@@ -194,12 +194,14 @@ def _request(self, url, method='GET', params=None, api_call=None):
raise ExceptionType(error_message,
error_code=response.status_code,
- retry_after=response.headers.get('retry-after'))
+ retry_after=response.headers.get('retry-\
+ after'))
try:
content = response.json()
except ValueError:
- raise TwythonError('Response was not valid JSON. Unable to decode.')
+ raise TwythonError('Response was not valid JSON. \
+ Unable to decode.')
return content
@@ -209,7 +211,8 @@ def _get_error_message(self, response):
error_message = 'An error occurred processing your request.'
try:
content = response.json()
- # {"errors":[{"code":34,"message":"Sorry, that page does not exist"}]}
+ # {"errors":[{"code":34,"message":"Sorry,
+ # that page does not exist"}]}
error_message = content['errors'][0]['message']
except ValueError:
# bad json data from Twitter for an error
@@ -223,13 +226,18 @@ def _get_error_message(self, response):
def request(self, endpoint, method='GET', params=None, version='1.1'):
"""Return dict of response received from Twitter's API
- :param endpoint: (required) Full url or Twitter API endpoint (e.g. search/tweets)
+ :param endpoint: (required) Full url or Twitter API endpoint
+ (e.g. search/tweets)
:type endpoint: string
- :param method: (optional) Method of accessing data, either GET or POST. (default GET)
+ :param method: (optional) Method of accessing data, either
+ GET or POST. (default GET)
:type method: string
- :param params: (optional) Dict of parameters (if any) accepted the by Twitter API endpoint you are trying to access (default None)
+ :param params: (optional) Dict of parameters (if any) accepted
+ the by Twitter API endpoint you are trying to
+ access (default None)
:type params: dict or None
- :param version: (optional) Twitter API version to access (default 1.1)
+ :param version: (optional) Twitter API version to access
+ (default 1.1)
:type version: string
:rtype: dict
@@ -242,7 +250,8 @@ def request(self, endpoint, method='GET', params=None, version='1.1'):
else:
url = '%s/%s.json' % (self.api_url % version, endpoint)
- content = self._request(url, method=method, params=params, api_call=url)
+ content = self._request(url, method=method, params=params,
+ api_call=url)
return content
@@ -258,7 +267,8 @@ def get_lastfunction_header(self, header, default_return_value=None):
"""Returns a specific header from the last API call
This will return None if the header is not present
- :param header: (required) The name of the header you want to get the value of
+ :param header: (required) The name of the header you want to get
+ the value of
Most useful for the following header information:
x-rate-limit-limit,
@@ -268,21 +278,31 @@ def get_lastfunction_header(self, header, default_return_value=None):
"""
if self._last_call is None:
- raise TwythonError('This function must be called after an API call. It delivers header information.')
+ raise TwythonError('This function must be called after an API call. \
+ It delivers header information.')
return self._last_call['headers'].get(header, default_return_value)
- def get_authentication_tokens(self, callback_url=None, force_login=False, screen_name=''):
- """Returns a dict including an authorization URL, ``auth_url``, to direct a user to
-
- :param callback_url: (optional) Url the user is returned to after they authorize your app (web clients only)
- :param force_login: (optional) Forces the user to enter their credentials to ensure the correct users account is authorized.
- :param screen_name: (optional) If forced_login is set OR user is not currently logged in, Prefills the username input box of the OAuth login screen with the given value
+ def get_authentication_tokens(self, callback_url=None, force_login=False,
+ screen_name=''):
+ """Returns a dict including an authorization URL, ``auth_url``, to
+ direct a user to
+
+ :param callback_url: (optional) Url the user is returned to after
+ they authorize your app (web clients only)
+ :param force_login: (optional) Forces the user to enter their
+ credentials to ensure the correct users
+ account is authorized.
+ :param screen_name: (optional) If forced_login is set OR user is
+ not currently logged in, Prefills the username
+ input box of the OAuth login screen with the
+ given value
:rtype: dict
"""
if self.oauth_version != 1:
- raise TwythonError('This method can only be called when your OAuth version is 1.0.')
+ raise TwythonError('This method can only be called when your \
+ OAuth version is 1.0.')
request_args = {}
if callback_url:
@@ -290,15 +310,18 @@ def get_authentication_tokens(self, callback_url=None, force_login=False, screen
response = self.client.get(self.request_token_url, params=request_args)
if response.status_code == 401:
- raise TwythonAuthError(response.content, error_code=response.status_code)
+ raise TwythonAuthError(response.content,
+ error_code=response.status_code)
elif response.status_code != 200:
- raise TwythonError(response.content, error_code=response.status_code)
+ raise TwythonError(response.content,
+ error_code=response.status_code)
request_tokens = dict(parse_qsl(response.content.decode('utf-8')))
if not request_tokens:
raise TwythonError('Unable to decode request tokens.')
- oauth_callback_confirmed = request_tokens.get('oauth_callback_confirmed') == 'true'
+ oauth_callback_confirmed = request_tokens.get('oauth_callback_confirmed') \
+ == 'true'
auth_url_params = {
'oauth_token': request_tokens['oauth_token'],
@@ -314,21 +337,28 @@ def get_authentication_tokens(self, callback_url=None, force_login=False, screen
if callback_url and not oauth_callback_confirmed:
auth_url_params['oauth_callback'] = self.callback_url
- request_tokens['auth_url'] = self.authenticate_url + '?' + urlencode(auth_url_params)
+ request_tokens['auth_url'] = self.authenticate_url + \
+ '?' + urlencode(auth_url_params)
return request_tokens
def get_authorized_tokens(self, oauth_verifier):
- """Returns a dict of authorized tokens after they go through the :class:`get_authentication_tokens` phase.
+ """Returns a dict of authorized tokens after they go through the
+ :class:`get_authentication_tokens` phase.
- :param oauth_verifier: (required) The oauth_verifier (or a.k.a PIN for non web apps) retrieved from the callback url querystring
+ :param oauth_verifier: (required) The oauth_verifier (or a.k.a PIN
+ for non web apps) retrieved from the callback url querystring
:rtype: dict
"""
if self.oauth_version != 1:
- raise TwythonError('This method can only be called when your OAuth version is 1.0.')
+ raise TwythonError('This method can only be called when your \
+ OAuth version is 1.0.')
- response = self.client.get(self.access_token_url, params={'oauth_verifier': oauth_verifier}, headers={'Content-Type': 'application/json'})
+ response = self.client.get(self.access_token_url,
+ params={'oauth_verifier': oauth_verifier},
+ headers={'Content-Type': 'application/\
+ json'})
if response.status_code == 401:
try:
@@ -341,7 +371,8 @@ def get_authorized_tokens(self, oauth_verifier):
except ValueError:
content = {}
- raise TwythonError(content.get('error', 'Invalid / expired Token'), error_code=response.status_code)
+ raise TwythonError(content.get('error', 'Invalid / expired To \
+ ken'), error_code=response.status_code)
authorized_tokens = dict(parse_qsl(response.content.decode('utf-8')))
if not authorized_tokens:
@@ -350,12 +381,14 @@ def get_authorized_tokens(self, oauth_verifier):
return authorized_tokens # pragma: no cover
def obtain_access_token(self):
- """Returns an OAuth 2 access token to make OAuth 2 authenticated read-only calls.
+ """Returns an OAuth 2 access token to make OAuth 2 authenticated
+ read-only calls.
:rtype: string
"""
if self.oauth_version != 2:
- raise TwythonError('This method can only be called when your OAuth version is 2.0.')
+ raise TwythonError('This method can only be called when your \
+ OAuth version is 2.0.')
data = {'grant_type': 'client_credentials'}
basic_auth = HTTPBasicAuth(self.app_key, self.app_secret)
@@ -377,8 +410,10 @@ def obtain_access_token(self):
def construct_api_url(api_url, **params):
"""Construct a Twitter API url, encoded, with parameters
- :param api_url: URL of the Twitter API endpoint you are attempting to construct
- :param \*\*params: Parameters that are accepted by Twitter for the endpoint you're requesting
+ :param api_url: URL of the Twitter API endpoint you are attempting
+ to construct
+ :param \*\*params: Parameters that are accepted by Twitter for the
+ endpoint you're requesting
:rtype: string
Usage::
@@ -387,7 +422,8 @@ def construct_api_url(api_url, **params):
>>> twitter = Twython()
>>> api_url = 'https://api.twitter.com/1.1/search/tweets.json'
- >>> constructed_url = twitter.construct_api_url(api_url, q='python', result_type='popular')
+ >>> constructed_url = twitter.construct_api_url(api_url, q='python',
+ result_type='popular')
>>> print constructed_url
https://api.twitter.com/1.1/search/tweets.json?q=python&result_type=popular
@@ -403,7 +439,8 @@ def construct_api_url(api_url, **params):
def search_gen(self, search_query, **params): # pragma: no cover
warnings.warn(
- 'This method is deprecated. You should use Twython.cursor instead. [eg. Twython.cursor(Twython.search, q=\'your_query\')]',
+ 'This method is deprecated. You should use Twython.cursor instead. \
+ [eg. Twython.cursor(Twython.search, q=\'your_query\')]',
TwythonDeprecationWarning,
stacklevel=2
)
@@ -412,14 +449,17 @@ def search_gen(self, search_query, **params): # pragma: no cover
def cursor(self, function, return_pages=False, **params):
"""Returns a generator for results that match a specified query.
- :param function: Instance of a Twython function (Twython.get_home_timeline, Twython.search)
- :param \*\*params: Extra parameters to send with your request (usually parameters accepted by the Twitter API endpoint)
+ :param function: Instance of a Twython function
+ (Twython.get_home_timeline, Twython.search)
+ :param \*\*params: Extra parameters to send with your request
+ (usually parameters accepted by the Twitter API endpoint)
:rtype: generator
Usage::
>>> from twython import Twython
- >>> twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
+ >>> twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN,
+ OAUTH_TOKEN_SECRET)
>>> results = twitter.cursor(twitter.search, q='python')
>>> for result in results:
@@ -427,7 +467,8 @@ def cursor(self, function, return_pages=False, **params):
"""
if not hasattr(function, 'iter_mode'):
- raise TwythonError('Unable to create generator for Twython method "%s"' % function.__name__)
+ raise TwythonError('Unable to create generator for Twython \
+ method "%s"' % function.__name__)
while True:
content = function(**params)
@@ -446,22 +487,26 @@ def cursor(self, function, return_pages=False, **params):
for result in results:
yield result
- if function.iter_mode == 'cursor' and content['next_cursor_str'] == '0':
+ if function.iter_mode == 'cursor' and \
+ content['next_cursor_str'] == '0':
raise StopIteration
try:
if function.iter_mode == 'id':
- if not 'max_id' in params:
- # Add 1 to the id because since_id and max_id are inclusive
+ if 'max_id' not in params:
+ # Add 1 to the id because since_id and
+ # max_id are inclusive
if hasattr(function, 'iter_metadata'):
- since_id = content[function.iter_metadata].get('since_id_str')
+ since_id = content[function.iter_metadata]\
+ .get('since_id_str')
else:
since_id = content[0]['id_str']
params['since_id'] = (int(since_id) - 1)
elif function.iter_mode == 'cursor':
params['cursor'] = content['next_cursor_str']
except (TypeError, ValueError): # pragma: no cover
- raise TwythonError('Unable to generate next page of search results, `page` is not a number.')
+ raise TwythonError('Unable to generate next page of search \
+ results, `page` is not a number.')
@staticmethod
def unicode2utf8(text):
@@ -483,11 +528,14 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False):
"""Return HTML for a tweet (urls, mentions, hashtags replaced with links)
:param tweet: Tweet object from received from Twitter API
- :param use_display_url: Use display URL to represent link (ex. google.com, github.com). Default: True
- :param use_expanded_url: Use expanded URL to represent link (e.g. http://google.com). Default False
+ :param use_display_url: Use display URL to represent link
+ (ex. google.com, github.com). Default: True
+ :param use_expanded_url: Use expanded URL to represent link
+ (e.g. http://google.com). Default False
If use_expanded_url is True, it overrides use_display_url.
- If use_display_url and use_expanded_url is False, short url will be used (t.co/xxxxx)
+ If use_display_url and use_expanded_url is False, short url will
+ be used (t.co/xxxxx)
"""
if 'retweeted_status' in tweet:
@@ -502,7 +550,8 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False):
start, end = entity['indices'][0], entity['indices'][1]
mention_html = '@%(screen_name)s'
- text = text.replace(tweet['text'][start:end], mention_html % {'screen_name': entity['screen_name']})
+ text = text.replace(tweet['text'][start:end],
+ mention_html % {'screen_name': entity['screen_name']})
# Hashtags
for entity in entities['hashtags']:
@@ -514,7 +563,8 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False):
# Urls
for entity in entities['urls']:
start, end = entity['indices'][0], entity['indices'][1]
- if use_display_url and entity.get('display_url') and not use_expanded_url:
+ if use_display_url and entity.get('display_url') \
+ and not use_expanded_url:
shown_url = entity['display_url']
elif use_expanded_url and entity.get('expanded_url'):
shown_url = entity['expanded_url']
@@ -522,6 +572,7 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False):
shown_url = entity['url']
url_html = '%s'
- text = text.replace(tweet['text'][start:end], url_html % (entity['url'], shown_url))
+ text = text.replace(tweet['text'][start:end],
+ url_html % (entity['url'], shown_url))
return text
diff --git a/twython/streaming/api.py b/twython/streaming/api.py
index c814acbf..47678e47 100644
--- a/twython/streaming/api.py
+++ b/twython/streaming/api.py
@@ -38,8 +38,11 @@ def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
retired
:param retry_in: (optional) Amount of time (in secs) the previous
API call should be tried again
- :param client_args: (optional) Accepts some requests Session parameters and some requests Request parameters.
- See http://docs.python-requests.org/en/latest/api/#sessionapi and requests section below it for details.
+ :param client_args: (optional) Accepts some requests Session
+ parameters and some requests Request parameters.
+ See
+ http://docs.python-requests.org/en/latest/api/#sessionapi
+ and requests section below it for details.
[ex. headers, proxies, verify(SSL verification)]
:param handlers: (optional) Array of message types for which
corresponding handlers will be called
@@ -53,11 +56,12 @@ def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
self.client_args = client_args or {}
default_headers = {'User-Agent': 'Twython Streaming v' + __version__}
- if not 'headers' in self.client_args:
+ if 'headers' not in self.client_args:
# If they didn't set any headers, set our defaults for them
self.client_args['headers'] = default_headers
elif 'User-Agent' not in self.client_args['headers']:
- # If they set headers, but didn't include User-Agent.. set it for them
+ # If they set headers, but didn't include User-Agent..
+ # set it for them
self.client_args['headers'].update(default_headers)
self.client_args['timeout'] = timeout
@@ -87,7 +91,8 @@ def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret,
self.connected = False
- self.handlers = handlers if handlers else ['delete', 'limit', 'disconnect']
+ self.handlers = handlers if handlers else \
+ ['delete', 'limit', 'disconnect']
self.chunk_size = chunk_size
@@ -103,7 +108,8 @@ def _request(self, url, method='GET', params=None):
def _send(retry_counter):
requests_args = {}
for k, v in self.client_args.items():
- # Maybe this should be set as a class variable and only done once?
+ # Maybe this should be set as a class
+ # variable and only done once?
if k in ('timeout', 'allow_redirects', 'verify'):
requests_args[k] = v
@@ -121,7 +127,8 @@ def _send(retry_counter):
if response.status_code != 200:
self.on_error(response.status_code, response.content)
- if self.retry_count and (self.retry_count - retry_counter) > 0:
+ if self.retry_count and \
+ (self.retry_count - retry_counter) > 0:
time.sleep(self.retry_in)
retry_counter += 1
_send(retry_counter)
@@ -140,13 +147,19 @@ def _send(retry_counter):
line = line.decode('utf-8')
data = json.loads(line)
except ValueError: # pragma: no cover
- self.on_error(response.status_code, 'Unable to decode response, not valid JSON.')
+ self.on_error(response.status_code,
+ 'Unable to decode response, \
+ not valid JSON.')
else:
if self.on_success(data): # pragma: no cover
for message_type in self.handlers:
if message_type in data:
- handler = getattr(self, 'on_' + message_type, None)
- if handler and callable(handler) and not handler(data.get(message_type)):
+ handler = getattr(self,
+ 'on_' + message_type,
+ None)
+ if handler \
+ and callable(handler) \
+ and not handler(data.get(message_type)):
break
response.close()