From a0fec2f004b0260dd85cd012d471e441403c4887 Mon Sep 17 00:00:00 2001 From: Joe Cabrera Date: Sat, 26 Jul 2014 11:48:36 -0400 Subject: [PATCH] pep8 finished --- examples/follow_user.py | 3 +- examples/search_results.py | 4 +- examples/stream.py | 6 +- setup.py | 5 +- tests/config.py | 3 +- tests/test_core.py | 12 +-- tests/test_endpoints.py | 22 ++++-- twython/api.py | 145 +++++++++++++++++++++++++------------ twython/streaming/api.py | 33 ++++++--- 9 files changed, 156 insertions(+), 77 deletions(-) diff --git a/examples/follow_user.py b/examples/follow_user.py index b9e78a88..21db370d 100644 --- a/examples/follow_user.py +++ b/examples/follow_user.py @@ -9,7 +9,8 @@ if len(sys.argv) >= 2: target = sys.argv[1] else: - target = raw_input("User to follow: ") # For Python 3.x use: target = input("User to follow: ") + target = raw_input("User to follow: ") + # For Python 3.x use: target = input("User to follow: ") # Requires Authentication as of Twitter API v1.1 twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET) diff --git a/examples/search_results.py b/examples/search_results.py index 99841396..3eff41d3 100644 --- a/examples/search_results.py +++ b/examples/search_results.py @@ -8,5 +8,7 @@ print e for tweet in search_results['statuses']: - print 'Tweet from @%s Date: %s' % (tweet['user']['screen_name'].encode('utf-8'), tweet['created_at']) + print 'Tweet from @%s Date: %s' % (tweet['user']['screen_nam\ + e'].encode('utf-8'), + tweet['created_at']) print tweet['text'].encode('utf-8'), '\n' diff --git a/examples/stream.py b/examples/stream.py index 0ff5c048..1d1ee1ff 100644 --- a/examples/stream.py +++ b/examples/stream.py @@ -16,5 +16,7 @@ def on_error(self, status_code, data): OAUTH_TOKEN, OAUTH_TOKEN_SECRET) stream.statuses.filter(track='twitter') -#stream.user() # Read the authenticated users home timeline (what they see on Twitter) in real-time -#stream.site(follow='twitter') +# stream.user() +# Read the authenticated users home timeline +# (what they see on Twitter) in real-time +# stream.site(follow='twitter') diff --git a/setup.py b/setup.py index d2d3570d..e6c8e94f 100755 --- a/setup.py +++ b/setup.py @@ -29,9 +29,10 @@ license=open('LICENSE').read(), url='https://github.com/ryanmcgrath/twython/tree/master', keywords='twitter search api tweet twython stream', - description='Actively maintained, pure Python wrapper for the Twitter API. Supports both normal and streaming Twitter APIs', + description='Actively maintained, pure Python wrapper for the \ + Twitter API. Supports both normal and streaming Twitter APIs', long_description=open('README.rst').read() + '\n\n' + - open('HISTORY.rst').read(), + open('HISTORY.rst').read(), include_package_data=True, packages=packages, classifiers=[ diff --git a/tests/config.py b/tests/config.py index d26b6e1a..65cc2f70 100644 --- a/tests/config.py +++ b/tests/config.py @@ -23,7 +23,8 @@ # Test Ids test_tweet_id = os.environ.get('TEST_TWEET_ID', '318577428610031617') test_list_slug = os.environ.get('TEST_LIST_SLUG', 'team') -test_list_owner_screen_name = os.environ.get('TEST_LIST_OWNER_SCREEN_NAME', 'twitterapi') +test_list_owner_screen_name = os.environ.get('TEST_LIST_OWNER_SCREEN_NAME', + 'twitterapi') test_tweet_object = {u'contributors': None, u'truncated': False, u'text': u'http://t.co/FCmXyI6VHd is a #cool site, lol! @mikehelmick should #checkitout. If you can! #thanks Love, @__twython__ https://t.co/67pwRvY6z9', u'in_reply_to_status_id': None, u'id': 349683012054683648, u'favorite_count': 0, u'source': u'web', u'retweeted': False, u'coordinates': None, u'entities': {u'symbols': [], u'user_mentions': [{u'id': 29251354, u'indices': [45, 57], u'id_str': u'29251354', u'screen_name': u'mikehelmick', u'name': u'Mike Helmick'}, {u'id': 1431865928, u'indices': [104, 116], u'id_str': u'1431865928', u'screen_name': u'__twython__', u'name': u'Twython'}], u'hashtags': [{u'indices': [28, 33], u'text': u'cool'}, {u'indices': [65, 76], u'text': u'checkitout'}, {u'indices': [90, 97], u'text': u'thanks'}], u'urls': [{u'url': u'http://t.co/FCmXyI6VHd', u'indices': [0, 22], u'expanded_url': u'http://google.com', u'display_url': u'google.com'}, {u'url': u'https://t.co/67pwRvY6z9', u'indices': [117, 140], u'expanded_url': u'https://github.com', u'display_url': u'github.com'}]}, u'in_reply_to_screen_name': None, u'id_str': u'349683012054683648', u'retweet_count': 0, u'in_reply_to_user_id': None, u'favorited': False, u'user': {u'follow_request_sent': False, u'profile_use_background_image': True, u'default_profile_image': True, u'id': 1431865928, u'verified': False, u'profile_text_color': u'333333', u'profile_image_url_https': u'https://si0.twimg.com/sticky/default_profile_images/default_profile_3_normal.png', u'profile_sidebar_fill_color': u'DDEEF6', u'entities': {u'description': {u'urls': []}}, u'followers_count': 1, u'profile_sidebar_border_color': u'C0DEED', u'id_str': u'1431865928', u'profile_background_color': u'3D3D3D', u'listed_count': 0, u'profile_background_image_url_https': u'https://si0.twimg.com/images/themes/theme1/bg.png', u'utc_offset': None, u'statuses_count': 2, u'description': u'', u'friends_count': 1, u'location': u'', u'profile_link_color': u'0084B4', u'profile_image_url': u'http://a0.twimg.com/sticky/default_profile_images/default_profile_3_normal.png', u'following': False, u'geo_enabled': False, u'profile_background_image_url': u'http://a0.twimg.com/images/themes/theme1/bg.png', u'screen_name': u'__twython__', u'lang': u'en', u'profile_background_tile': False, u'favourites_count': 0, u'name': u'Twython', u'notifications': False, u'url': None, u'created_at': u'Thu May 16 01:11:09 +0000 2013', u'contributors_enabled': False, u'time_zone': None, u'protected': False, u'default_profile': False, u'is_translator': False}, u'geo': None, u'in_reply_to_user_id_str': None, u'possibly_sensitive': False, u'lang': u'en', u'created_at': u'Wed Jun 26 00:18:21 +0000 2013', u'in_reply_to_status_id_str': None, u'place': None} test_tweet_html = 'google.com is a #cool site, lol! @mikehelmick should #checkitout. If you can! #thanks Love, @__twython__ github.com' diff --git a/tests/test_core.py b/tests/test_core.py index 5b8a3749..98d3a476 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -28,8 +28,8 @@ def get_url(self, endpoint): return '%s/%s.json' % (self.api.api_url % self.api.api_version, endpoint) def register_response(self, method, url, body='{}', match_querystring=False, - status=200, adding_headers=None, stream=False, - content_type='application/json; charset=utf-8'): + status=200, adding_headers=None, stream=False, + content_type='application/json; charset=utf-8'): """Wrapper function for responses for simpler unit tests""" # responses uses BytesIO to hold the body so it needs to be in bytes @@ -37,7 +37,7 @@ def register_response(self, method, url, body='{}', match_querystring=False, body = bytes(body, 'UTF-8') responses.add(method, url, body, match_querystring, - status, adding_headers, stream, content_type) + status, adding_headers, stream, content_type) @responses.activate def test_request_should_handle_full_endpoint(self): @@ -77,7 +77,7 @@ def test_request_should_post_request_regardless_of_case(self): @responses.activate def test_request_should_throw_exception_with_invalid_http_method(self): """Test that request() throws an exception when an invalid HTTP method is passed""" - #TODO(cash): should Twython catch the AttributeError and throw a TwythonError + # TODO(cash): should Twython catch the AttributeError and throw a TwythonError self.assertRaises(AttributeError, self.api.request, endpoint='search/tweets', method='INVALID') @responses.activate @@ -315,5 +315,5 @@ def test_html_for_tweet_short_url(self): """Test using expanded url in HTML for Tweet displays full urls""" tweet_text = self.api.html_for_tweet(test_tweet_object, False) # Make sure HTML doesn't contain the display OR expanded url - self.assertTrue(not 'http://google.com' in tweet_text) - self.assertTrue(not 'google.com' in tweet_text) + self.assertTrue('http://google.com' not in tweet_text) + self.assertTrue('google.com' not in tweet_text) diff --git a/tests/test_endpoints.py b/tests/test_endpoints.py index 6204a577..b7478d03 100644 --- a/tests/test_endpoints.py +++ b/tests/test_endpoints.py @@ -20,8 +20,10 @@ def setUp(self): 'allow_redirects': False } + # This is so we can hit coverage that Twython sets + # User-Agent for us if none is supplied oauth2_client_args = { - 'headers': {} # This is so we can hit coverage that Twython sets User-Agent for us if none is supplied + 'headers': {} } self.api = Twython(app_key, app_secret, @@ -42,7 +44,8 @@ def test_get_user_timeline(self): """Test returning timeline for authenticated user and random user succeeds""" self.api.get_user_timeline() # Authenticated User Timeline - self.api.get_user_timeline(screen_name='twitter') # Random User Timeline + self.api.get_user_timeline(screen_name='twitter') + # Random User Timeline @unittest.skip('skipping non-updated test') def test_get_protected_user_timeline_following(self): @@ -82,7 +85,8 @@ def test_show_status(self): @unittest.skip('skipping non-updated test') def test_update_and_destroy_status(self): """Test updating and deleting a status succeeds""" - status = self.api.update_status(status='Test post just to get deleted :( %s' % int(time.time())) + status = self.api.update_status(status='Test post just to get \ + deleted :( %s' % int(time.time())) self.api.destroy_status(id=status['id_str']) @unittest.skip('skipping non-updated test') @@ -117,7 +121,8 @@ def test_get_sent_messages(self): def test_send_get_and_destroy_direct_message(self): """Test sending, getting, then destory a direct message succeeds""" message = self.api.send_direct_message(screen_name=protected_twitter_1, - text='Hey d00d! %s' % int(time.time())) + text='Hey d00d! %s\ + ' % int(time.time())) self.api.get_direct_message(id=message['id_str']) self.api.destroy_direct_message(id=message['id_str']) @@ -127,7 +132,8 @@ def test_send_direct_message_to_non_follower(self): """Test sending a direct message to someone who doesn't follow you fails""" self.assertRaises(TwythonError, self.api.send_direct_message, - screen_name=protected_twitter_2, text='Yo, man! %s' % int(time.time())) + screen_name=protected_twitter_2, text='Yo, man! \ + %s' % int(time.time())) # Friends & Followers @unittest.skip('skipping non-updated test') @@ -348,7 +354,8 @@ def test_create_update_destroy_list_add_remove_list_members(self): the_list = self.api.create_list(name='Stuff %s' % int(time.time())) list_id = the_list['id_str'] - self.api.update_list(list_id=list_id, name='Stuff Renamed %s' % int(time.time())) + self.api.update_list(list_id=list_id, name='Stuff Renamed \ + %s' % int(time.time())) screen_names = ['johncena', 'xbox'] # Multi add/delete members @@ -359,7 +366,8 @@ def test_create_update_destroy_list_add_remove_list_members(self): # Single add/delete member self.api.add_list_member(list_id=list_id, screen_name='justinbieber') - self.api.delete_list_member(list_id=list_id, screen_name='justinbieber') + self.api.delete_list_member(list_id=list_id, + screen_name='justinbieber') self.api.delete_list(list_id=list_id) diff --git a/twython/api.py b/twython/api.py index 9ae69a15..3a37909a 100644 --- a/twython/api.py +++ b/twython/api.py @@ -194,12 +194,14 @@ def _request(self, url, method='GET', params=None, api_call=None): raise ExceptionType(error_message, error_code=response.status_code, - retry_after=response.headers.get('retry-after')) + retry_after=response.headers.get('retry-\ + after')) try: content = response.json() except ValueError: - raise TwythonError('Response was not valid JSON. Unable to decode.') + raise TwythonError('Response was not valid JSON. \ + Unable to decode.') return content @@ -209,7 +211,8 @@ def _get_error_message(self, response): error_message = 'An error occurred processing your request.' try: content = response.json() - # {"errors":[{"code":34,"message":"Sorry, that page does not exist"}]} + # {"errors":[{"code":34,"message":"Sorry, + # that page does not exist"}]} error_message = content['errors'][0]['message'] except ValueError: # bad json data from Twitter for an error @@ -223,13 +226,18 @@ def _get_error_message(self, response): def request(self, endpoint, method='GET', params=None, version='1.1'): """Return dict of response received from Twitter's API - :param endpoint: (required) Full url or Twitter API endpoint (e.g. search/tweets) + :param endpoint: (required) Full url or Twitter API endpoint + (e.g. search/tweets) :type endpoint: string - :param method: (optional) Method of accessing data, either GET or POST. (default GET) + :param method: (optional) Method of accessing data, either + GET or POST. (default GET) :type method: string - :param params: (optional) Dict of parameters (if any) accepted the by Twitter API endpoint you are trying to access (default None) + :param params: (optional) Dict of parameters (if any) accepted + the by Twitter API endpoint you are trying to + access (default None) :type params: dict or None - :param version: (optional) Twitter API version to access (default 1.1) + :param version: (optional) Twitter API version to access + (default 1.1) :type version: string :rtype: dict @@ -242,7 +250,8 @@ def request(self, endpoint, method='GET', params=None, version='1.1'): else: url = '%s/%s.json' % (self.api_url % version, endpoint) - content = self._request(url, method=method, params=params, api_call=url) + content = self._request(url, method=method, params=params, + api_call=url) return content @@ -258,7 +267,8 @@ def get_lastfunction_header(self, header, default_return_value=None): """Returns a specific header from the last API call This will return None if the header is not present - :param header: (required) The name of the header you want to get the value of + :param header: (required) The name of the header you want to get + the value of Most useful for the following header information: x-rate-limit-limit, @@ -268,21 +278,31 @@ def get_lastfunction_header(self, header, default_return_value=None): """ if self._last_call is None: - raise TwythonError('This function must be called after an API call. It delivers header information.') + raise TwythonError('This function must be called after an API call. \ + It delivers header information.') return self._last_call['headers'].get(header, default_return_value) - def get_authentication_tokens(self, callback_url=None, force_login=False, screen_name=''): - """Returns a dict including an authorization URL, ``auth_url``, to direct a user to - - :param callback_url: (optional) Url the user is returned to after they authorize your app (web clients only) - :param force_login: (optional) Forces the user to enter their credentials to ensure the correct users account is authorized. - :param screen_name: (optional) If forced_login is set OR user is not currently logged in, Prefills the username input box of the OAuth login screen with the given value + def get_authentication_tokens(self, callback_url=None, force_login=False, + screen_name=''): + """Returns a dict including an authorization URL, ``auth_url``, to + direct a user to + + :param callback_url: (optional) Url the user is returned to after + they authorize your app (web clients only) + :param force_login: (optional) Forces the user to enter their + credentials to ensure the correct users + account is authorized. + :param screen_name: (optional) If forced_login is set OR user is + not currently logged in, Prefills the username + input box of the OAuth login screen with the + given value :rtype: dict """ if self.oauth_version != 1: - raise TwythonError('This method can only be called when your OAuth version is 1.0.') + raise TwythonError('This method can only be called when your \ + OAuth version is 1.0.') request_args = {} if callback_url: @@ -290,15 +310,18 @@ def get_authentication_tokens(self, callback_url=None, force_login=False, screen response = self.client.get(self.request_token_url, params=request_args) if response.status_code == 401: - raise TwythonAuthError(response.content, error_code=response.status_code) + raise TwythonAuthError(response.content, + error_code=response.status_code) elif response.status_code != 200: - raise TwythonError(response.content, error_code=response.status_code) + raise TwythonError(response.content, + error_code=response.status_code) request_tokens = dict(parse_qsl(response.content.decode('utf-8'))) if not request_tokens: raise TwythonError('Unable to decode request tokens.') - oauth_callback_confirmed = request_tokens.get('oauth_callback_confirmed') == 'true' + oauth_callback_confirmed = request_tokens.get('oauth_callback_confirmed') \ + == 'true' auth_url_params = { 'oauth_token': request_tokens['oauth_token'], @@ -314,21 +337,28 @@ def get_authentication_tokens(self, callback_url=None, force_login=False, screen if callback_url and not oauth_callback_confirmed: auth_url_params['oauth_callback'] = self.callback_url - request_tokens['auth_url'] = self.authenticate_url + '?' + urlencode(auth_url_params) + request_tokens['auth_url'] = self.authenticate_url + \ + '?' + urlencode(auth_url_params) return request_tokens def get_authorized_tokens(self, oauth_verifier): - """Returns a dict of authorized tokens after they go through the :class:`get_authentication_tokens` phase. + """Returns a dict of authorized tokens after they go through the + :class:`get_authentication_tokens` phase. - :param oauth_verifier: (required) The oauth_verifier (or a.k.a PIN for non web apps) retrieved from the callback url querystring + :param oauth_verifier: (required) The oauth_verifier (or a.k.a PIN + for non web apps) retrieved from the callback url querystring :rtype: dict """ if self.oauth_version != 1: - raise TwythonError('This method can only be called when your OAuth version is 1.0.') + raise TwythonError('This method can only be called when your \ + OAuth version is 1.0.') - response = self.client.get(self.access_token_url, params={'oauth_verifier': oauth_verifier}, headers={'Content-Type': 'application/json'}) + response = self.client.get(self.access_token_url, + params={'oauth_verifier': oauth_verifier}, + headers={'Content-Type': 'application/\ + json'}) if response.status_code == 401: try: @@ -341,7 +371,8 @@ def get_authorized_tokens(self, oauth_verifier): except ValueError: content = {} - raise TwythonError(content.get('error', 'Invalid / expired Token'), error_code=response.status_code) + raise TwythonError(content.get('error', 'Invalid / expired To \ + ken'), error_code=response.status_code) authorized_tokens = dict(parse_qsl(response.content.decode('utf-8'))) if not authorized_tokens: @@ -350,12 +381,14 @@ def get_authorized_tokens(self, oauth_verifier): return authorized_tokens # pragma: no cover def obtain_access_token(self): - """Returns an OAuth 2 access token to make OAuth 2 authenticated read-only calls. + """Returns an OAuth 2 access token to make OAuth 2 authenticated + read-only calls. :rtype: string """ if self.oauth_version != 2: - raise TwythonError('This method can only be called when your OAuth version is 2.0.') + raise TwythonError('This method can only be called when your \ + OAuth version is 2.0.') data = {'grant_type': 'client_credentials'} basic_auth = HTTPBasicAuth(self.app_key, self.app_secret) @@ -377,8 +410,10 @@ def obtain_access_token(self): def construct_api_url(api_url, **params): """Construct a Twitter API url, encoded, with parameters - :param api_url: URL of the Twitter API endpoint you are attempting to construct - :param \*\*params: Parameters that are accepted by Twitter for the endpoint you're requesting + :param api_url: URL of the Twitter API endpoint you are attempting + to construct + :param \*\*params: Parameters that are accepted by Twitter for the + endpoint you're requesting :rtype: string Usage:: @@ -387,7 +422,8 @@ def construct_api_url(api_url, **params): >>> twitter = Twython() >>> api_url = 'https://api.twitter.com/1.1/search/tweets.json' - >>> constructed_url = twitter.construct_api_url(api_url, q='python', result_type='popular') + >>> constructed_url = twitter.construct_api_url(api_url, q='python', + result_type='popular') >>> print constructed_url https://api.twitter.com/1.1/search/tweets.json?q=python&result_type=popular @@ -403,7 +439,8 @@ def construct_api_url(api_url, **params): def search_gen(self, search_query, **params): # pragma: no cover warnings.warn( - 'This method is deprecated. You should use Twython.cursor instead. [eg. Twython.cursor(Twython.search, q=\'your_query\')]', + 'This method is deprecated. You should use Twython.cursor instead. \ + [eg. Twython.cursor(Twython.search, q=\'your_query\')]', TwythonDeprecationWarning, stacklevel=2 ) @@ -412,14 +449,17 @@ def search_gen(self, search_query, **params): # pragma: no cover def cursor(self, function, return_pages=False, **params): """Returns a generator for results that match a specified query. - :param function: Instance of a Twython function (Twython.get_home_timeline, Twython.search) - :param \*\*params: Extra parameters to send with your request (usually parameters accepted by the Twitter API endpoint) + :param function: Instance of a Twython function + (Twython.get_home_timeline, Twython.search) + :param \*\*params: Extra parameters to send with your request + (usually parameters accepted by the Twitter API endpoint) :rtype: generator Usage:: >>> from twython import Twython - >>> twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET) + >>> twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, + OAUTH_TOKEN_SECRET) >>> results = twitter.cursor(twitter.search, q='python') >>> for result in results: @@ -427,7 +467,8 @@ def cursor(self, function, return_pages=False, **params): """ if not hasattr(function, 'iter_mode'): - raise TwythonError('Unable to create generator for Twython method "%s"' % function.__name__) + raise TwythonError('Unable to create generator for Twython \ + method "%s"' % function.__name__) while True: content = function(**params) @@ -446,22 +487,26 @@ def cursor(self, function, return_pages=False, **params): for result in results: yield result - if function.iter_mode == 'cursor' and content['next_cursor_str'] == '0': + if function.iter_mode == 'cursor' and \ + content['next_cursor_str'] == '0': raise StopIteration try: if function.iter_mode == 'id': - if not 'max_id' in params: - # Add 1 to the id because since_id and max_id are inclusive + if 'max_id' not in params: + # Add 1 to the id because since_id and + # max_id are inclusive if hasattr(function, 'iter_metadata'): - since_id = content[function.iter_metadata].get('since_id_str') + since_id = content[function.iter_metadata]\ + .get('since_id_str') else: since_id = content[0]['id_str'] params['since_id'] = (int(since_id) - 1) elif function.iter_mode == 'cursor': params['cursor'] = content['next_cursor_str'] except (TypeError, ValueError): # pragma: no cover - raise TwythonError('Unable to generate next page of search results, `page` is not a number.') + raise TwythonError('Unable to generate next page of search \ + results, `page` is not a number.') @staticmethod def unicode2utf8(text): @@ -483,11 +528,14 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False): """Return HTML for a tweet (urls, mentions, hashtags replaced with links) :param tweet: Tweet object from received from Twitter API - :param use_display_url: Use display URL to represent link (ex. google.com, github.com). Default: True - :param use_expanded_url: Use expanded URL to represent link (e.g. http://google.com). Default False + :param use_display_url: Use display URL to represent link + (ex. google.com, github.com). Default: True + :param use_expanded_url: Use expanded URL to represent link + (e.g. http://google.com). Default False If use_expanded_url is True, it overrides use_display_url. - If use_display_url and use_expanded_url is False, short url will be used (t.co/xxxxx) + If use_display_url and use_expanded_url is False, short url will + be used (t.co/xxxxx) """ if 'retweeted_status' in tweet: @@ -502,7 +550,8 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False): start, end = entity['indices'][0], entity['indices'][1] mention_html = '@%(screen_name)s' - text = text.replace(tweet['text'][start:end], mention_html % {'screen_name': entity['screen_name']}) + text = text.replace(tweet['text'][start:end], + mention_html % {'screen_name': entity['screen_name']}) # Hashtags for entity in entities['hashtags']: @@ -514,7 +563,8 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False): # Urls for entity in entities['urls']: start, end = entity['indices'][0], entity['indices'][1] - if use_display_url and entity.get('display_url') and not use_expanded_url: + if use_display_url and entity.get('display_url') \ + and not use_expanded_url: shown_url = entity['display_url'] elif use_expanded_url and entity.get('expanded_url'): shown_url = entity['expanded_url'] @@ -522,6 +572,7 @@ def html_for_tweet(tweet, use_display_url=True, use_expanded_url=False): shown_url = entity['url'] url_html = '%s' - text = text.replace(tweet['text'][start:end], url_html % (entity['url'], shown_url)) + text = text.replace(tweet['text'][start:end], + url_html % (entity['url'], shown_url)) return text diff --git a/twython/streaming/api.py b/twython/streaming/api.py index c814acbf..47678e47 100644 --- a/twython/streaming/api.py +++ b/twython/streaming/api.py @@ -38,8 +38,11 @@ def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret, retired :param retry_in: (optional) Amount of time (in secs) the previous API call should be tried again - :param client_args: (optional) Accepts some requests Session parameters and some requests Request parameters. - See http://docs.python-requests.org/en/latest/api/#sessionapi and requests section below it for details. + :param client_args: (optional) Accepts some requests Session + parameters and some requests Request parameters. + See + http://docs.python-requests.org/en/latest/api/#sessionapi + and requests section below it for details. [ex. headers, proxies, verify(SSL verification)] :param handlers: (optional) Array of message types for which corresponding handlers will be called @@ -53,11 +56,12 @@ def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret, self.client_args = client_args or {} default_headers = {'User-Agent': 'Twython Streaming v' + __version__} - if not 'headers' in self.client_args: + if 'headers' not in self.client_args: # If they didn't set any headers, set our defaults for them self.client_args['headers'] = default_headers elif 'User-Agent' not in self.client_args['headers']: - # If they set headers, but didn't include User-Agent.. set it for them + # If they set headers, but didn't include User-Agent.. + # set it for them self.client_args['headers'].update(default_headers) self.client_args['timeout'] = timeout @@ -87,7 +91,8 @@ def __init__(self, app_key, app_secret, oauth_token, oauth_token_secret, self.connected = False - self.handlers = handlers if handlers else ['delete', 'limit', 'disconnect'] + self.handlers = handlers if handlers else \ + ['delete', 'limit', 'disconnect'] self.chunk_size = chunk_size @@ -103,7 +108,8 @@ def _request(self, url, method='GET', params=None): def _send(retry_counter): requests_args = {} for k, v in self.client_args.items(): - # Maybe this should be set as a class variable and only done once? + # Maybe this should be set as a class + # variable and only done once? if k in ('timeout', 'allow_redirects', 'verify'): requests_args[k] = v @@ -121,7 +127,8 @@ def _send(retry_counter): if response.status_code != 200: self.on_error(response.status_code, response.content) - if self.retry_count and (self.retry_count - retry_counter) > 0: + if self.retry_count and \ + (self.retry_count - retry_counter) > 0: time.sleep(self.retry_in) retry_counter += 1 _send(retry_counter) @@ -140,13 +147,19 @@ def _send(retry_counter): line = line.decode('utf-8') data = json.loads(line) except ValueError: # pragma: no cover - self.on_error(response.status_code, 'Unable to decode response, not valid JSON.') + self.on_error(response.status_code, + 'Unable to decode response, \ + not valid JSON.') else: if self.on_success(data): # pragma: no cover for message_type in self.handlers: if message_type in data: - handler = getattr(self, 'on_' + message_type, None) - if handler and callable(handler) and not handler(data.get(message_type)): + handler = getattr(self, + 'on_' + message_type, + None) + if handler \ + and callable(handler) \ + and not handler(data.get(message_type)): break response.close()