forked from snarfed/portablecontacts-unofficial
-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitter_demo.py
99 lines (76 loc) · 3.07 KB
/
twitter_demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/python
"""Twitter front page and OAuth demo handlers.
Mostly copied from https://github.com/wasauce/tweepy-examples .
"""
__author__ = ['Ryan Barrett <portablecontacts@ryanb.org>']
import logging
import urllib
import webapp2
from webob import exc
import appengine_config
import tweepy
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext import db
OAUTH_CALLBACK = 'http://%s/oauth_callback' % appengine_config.HOST
class OAuthToken(db.Model):
"""Datastore model class for an OAuth token.
"""
token_key = db.StringProperty(required=True)
token_secret = db.StringProperty(required=True)
class StartAuthHandler(webapp2.RequestHandler):
"""Starts three-legged OAuth with Twitter.
Fetches an OAuth request token, then redirects to Twitter's auth page to
request an access token.
"""
def get(self):
try:
auth = tweepy.OAuthHandler(appengine_config.TWITTER_APP_KEY,
appengine_config.TWITTER_APP_SECRET,
OAUTH_CALLBACK)
auth_url = auth.get_authorization_url()
except tweepy.TweepError, e:
msg = 'Could not create Twitter OAuth request token: '
logging.exception(msg)
raise exc.HTTPInternalServerError(msg + `e`)
# store the request token for later use in the callback handler
OAuthToken(token_key = auth.request_token.key,
token_secret = auth.request_token.secret,
).put()
logging.info('Generated request token, redirecting to Twitter: %s', auth_url)
self.redirect(auth_url)
class CallbackHandler(webapp2.RequestHandler):
"""The OAuth callback. Fetches an access token and redirects to the front page.
"""
def get(self):
oauth_token = self.request.get('oauth_token', None)
oauth_verifier = self.request.get('oauth_verifier', None)
if oauth_token is None:
raise exc.HTTPBadRequest('Missing required query parameter oauth_token.')
# Lookup the request token
request_token = OAuthToken.gql('WHERE token_key=:key', key=oauth_token).get()
if request_token is None:
raise exc.HTTPBadRequest('Invalid oauth_token: %s' % oauth_token)
# Rebuild the auth handler
auth = tweepy.OAuthHandler(appengine_config.TWITTER_APP_KEY,
appengine_config.TWITTER_APP_SECRET)
auth.set_request_token(request_token.token_key, request_token.token_secret)
# Fetch the access token
try:
access_token = auth.get_access_token(oauth_verifier)
except tweepy.TweepError, e:
msg = 'Twitter OAuth error, could not get access token: '
logging.exception(msg)
raise exc.HTTPInternalServerError(msg + `e`)
params = {'access_token_key': access_token.key,
'access_token_secret': access_token.secret,
}
self.redirect('/?%s' % urllib.urlencode(params))
def main():
application = webapp2.WSGIApplication(
[('/start_auth', StartAuthHandler),
('/oauth_callback', CallbackHandler),
],
debug=appengine_config.DEBUG)
run_wsgi_app(application)
if __name__ == '__main__':
main()