forked from henriklied/django-twitter-oauth
/
utils.py
74 lines (60 loc) · 3.1 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import oauth
from django.conf import settings
# Signature method used as the default by every request helper in this module.
signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()

# Host used to build the default OAuth endpoint URLs below.  Every value can
# be overridden from the Django settings module.
SERVER = getattr(settings, 'OAUTH_SERVER', 'twitter.com')
REQUEST_TOKEN_URL = getattr(settings, 'OAUTH_REQUEST_TOKEN_URL', 'https://%s/oauth/request_token' % SERVER)
ACCESS_TOKEN_URL = getattr(settings, 'OAUTH_ACCESS_TOKEN_URL', 'https://%s/oauth/access_token' % SERVER)
# The authorization URL is built from the request token, so it defaults to
# HTTPS like the two token endpoints instead of plain HTTP.
AUTHORIZATION_URL = getattr(settings, 'OAUTH_AUTHORIZATION_URL', 'https://%s/oauth/authorize' % SERVER)

# Consumer credentials; the fallbacks are placeholders and must be replaced
# through the CONSUMER_KEY / CONSUMER_SECRET settings.
CONSUMER_KEY = getattr(settings, 'CONSUMER_KEY', 'YOUR_KEY')
CONSUMER_SECRET = getattr(settings, 'CONSUMER_SECRET', 'YOUR_SECRET')

# We use this URL to check whether Twitter's OAuth worked.
TWITTER_CHECK_AUTH = 'https://twitter.com/account/verify_credentials.json'
# Resource requested by get_friends().
TWITTER_FRIENDS = 'https://twitter.com/statuses/friends.json'
def request_oauth_resource(consumer, url, access_token, parameters=None, signature_method=signature_method):
    """
    Build and sign an OAuth request for the resource at ``url``.

    usage: request_oauth_resource(consumer, '/url/', your_access_token, parameters=dict())

    The request is signed with ``signature_method`` using the consumer and
    the access token.  This function takes no connection and does not send
    anything itself; pass the result to ``fetch_response``.

    Returns the signed OAuthRequest object.
    """
    signed_request = oauth.OAuthRequest.from_consumer_and_token(
        consumer,
        token=access_token,
        http_url=url,
        parameters=parameters,
    )
    signed_request.sign_request(signature_method, consumer, access_token)
    return signed_request
def fetch_response(oauth_request, connection):
    """
    Send ``oauth_request`` over ``connection`` and return the response body.

    The request is issued with the request's own HTTP method against the URL
    produced by ``oauth_request.to_url()``.  The body is returned exactly as
    read from the response; the status code is not inspected.
    """
    target_url = oauth_request.to_url()
    connection.request(oauth_request.http_method, target_url)
    return connection.getresponse().read()
def get_unauthorised_request_token(consumer, connection, signature_method=signature_method):
    """
    Request a new, not yet authorised, request token.

    A request for REQUEST_TOKEN_URL is signed with the consumer only (no
    token is passed to the signer), sent over ``connection``, and the
    response body is parsed into an OAuthToken.

    Returns the parsed OAuthToken.
    """
    token_request = oauth.OAuthRequest.from_consumer_and_token(
        consumer,
        http_url=REQUEST_TOKEN_URL,
    )
    # There is no token at this stage, so None is passed as the token.
    token_request.sign_request(signature_method, consumer, None)
    raw_token = fetch_response(token_request, connection)
    return oauth.OAuthToken.from_string(raw_token)
def get_authorisation_url(consumer, token, signature_method=signature_method):
    """
    Return the signed URL for authorising ``token``.

    Builds a request for AUTHORIZATION_URL from the consumer and the given
    token, signs it with ``signature_method`` and returns it serialised as a
    URL string.  No network traffic happens here.
    """
    authorise_request = oauth.OAuthRequest.from_consumer_and_token(
        consumer,
        token=token,
        http_url=AUTHORIZATION_URL,
    )
    authorise_request.sign_request(signature_method, consumer, token)
    signed_url = authorise_request.to_url()
    return signed_url
def exchange_request_token_for_access_token(consumer, connection, request_token, signature_method=signature_method):
    """
    Trade ``request_token`` for an access token.

    A request for ACCESS_TOKEN_URL is signed with the consumer and the
    request token, sent over ``connection``, and the response body is parsed
    into an OAuthToken.

    Returns the parsed OAuthToken.
    """
    exchange_request = oauth.OAuthRequest.from_consumer_and_token(
        consumer,
        token=request_token,
        http_url=ACCESS_TOKEN_URL,
    )
    exchange_request.sign_request(signature_method, consumer, request_token)
    raw_token = fetch_response(exchange_request, connection)
    access_token = oauth.OAuthToken.from_string(raw_token)
    return access_token
def is_authenticated(consumer, connection, access_token):
    """
    Check ``access_token`` against the TWITTER_CHECK_AUTH resource.

    Returns the raw response body when it contains the text 'screen_name',
    otherwise False.  The body is not parsed; this is a plain containment
    test on whatever ``fetch_response`` returned.
    """
    check_request = request_oauth_resource(consumer, TWITTER_CHECK_AUTH, access_token)
    body = fetch_response(check_request, connection)
    return body if 'screen_name' in body else False
def get_friends(consumer, connection, access_token, page=0):
    """
    Get friends on Twitter.

    Requests the TWITTER_FRIENDS resource with ``page`` as the 'page'
    parameter and returns the response body exactly as received from
    ``fetch_response`` (it is not decoded here).
    """
    query = {'page': page}
    friends_request = request_oauth_resource(consumer, TWITTER_FRIENDS, access_token, query)
    return fetch_response(friends_request, connection)