forked from snarfed/portablecontacts-unofficial
-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitter.py
167 lines (132 loc) · 5.38 KB
/
twitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/python
"""Twitter source class.
Uses the REST API: https://dev.twitter.com/docs/api
TODO: port to
http://code.google.com/p/oauth/source/browse/#svn%2Fcode%2Fpython . tweepy is
just a wrapper around that anyway.
snarfed_org user id: 139199211
Python code to pretty-print JSON responses from Twitter REST API:
pprint(json.loads(urllib.urlopen(
'https://api.twitter.com/1/users/lookup.json?screen_name=snarfed_org').read()))
pprint(json.loads(urllib.urlopen(
'https://api.twitter.com/1/followers/ids.json?screen_name=snarfed_org').read()))
"""
__author__ = ['Ryan Barrett <portablecontacts@ryanb.org>']
import cgi
import collections
import datetime
try:
import json
except ImportError:
import simplejson as json
import logging
import re
import urlparse
import appengine_config
import source
import tweepy
API_FRIENDS_URL = 'https://api.twitter.com/1/friends/ids.json?user_id=%d'
API_USERS_URL = 'https://api.twitter.com/1/users/lookup.json?user_id=%s'
API_ACCOUNT_URL = 'https://api.twitter.com/1/account/verify_credentials.json'
class Twitter(source.Source):
"""Implements the PortableContacts API for Twitter.
"""
DOMAIN = 'twitter.com'
ITEMS_PER_PAGE = 100
FRONT_PAGE_TEMPLATE = 'templates/twitter_index.html'
AUTH_URL = '/start_auth'
def get_contacts(self, user_id=None, startIndex=0, count=0):
"""Returns a (Python) list of PoCo contacts to be JSON-encoded.
OAuth credentials must be provided in access_token_key and
access_token_secret query parameters if the current user is protected, or to
receive any protected friends in the returned contacts.
Args:
user_id: integer or string. if provided, only this user will be returned.
startIndex: int >= 0
count: int >= 0
"""
if user_id is not None:
ids = [user_id]
else:
resp = self.urlfetch(API_FRIENDS_URL % self.get_current_user())
if count == 0:
end = self.ITEMS_PER_PAGE - startIndex
else:
end = startIndex + count
ids = json.loads(resp)['ids'][startIndex:end]
if not ids:
return []
ids_str = ','.join(str(id) for id in ids)
resp = self.urlfetch(API_USERS_URL % ids_str)
return [self.to_poco(user) for user in json.loads(resp)]
def get_current_user(self):
"""Returns the currently authenticated user's id.
"""
resp = self.urlfetch(API_ACCOUNT_URL)
return json.loads(resp)['id']
def urlfetch(self, url, **kwargs):
"""Wraps Source.urlfetch(), signing with OAuth if there's an access token.
TODO: unit test this
"""
request = self.handler.request
access_token_key = request.get('access_token_key')
access_token_secret = request.get('access_token_secret')
if access_token_key and access_token_secret:
logging.info('Found access token key %s and secret %s',
access_token_key, access_token_secret)
auth = tweepy.OAuthHandler(appengine_config.TWITTER_APP_KEY,
appengine_config.TWITTER_APP_SECRET)
auth.set_access_token(access_token_key, access_token_secret)
method = kwargs.get('method', 'GET')
headers = kwargs.setdefault('headers', {})
parsed = urlparse.urlparse(url)
url_without_query = urlparse.urlunparse(list(parsed[0:4]) + ['', ''])
auth.apply_auth(url_without_query, method, headers,
# TODO: switch to urlparse.parse_qsl after python27 runtime
dict(cgi.parse_qsl(parsed.query)))
logging.info('Populated Authorization header from access token: %s',
headers.get('Authorization'))
return super(Twitter, self).urlfetch(url, **kwargs)
def to_poco(self, tw):
"""Converts a Twitter user to a PoCo contact.
Args:
tw: dict, a decoded JSON Twitter user
"""
pc = collections.defaultdict(dict)
pc['accounts'] = [{'domain': self.DOMAIN}]
# tw should always have 'id' (it's an int)
if 'id' in tw:
pc['id'] = str(tw['id'])
pc['accounts'][0]['userid'] = str(tw['id'])
if 'screen_name' in tw:
pc['accounts'][0]['username'] = tw['screen_name']
# tw should always have 'name'
if 'name' in tw:
pc['displayName'] = tw['name']
pc['name']['formatted'] = tw['name']
if 'created_at' in tw:
# created_at is formatted like 'Sun, 01 Jan 11:44:57 +0000 2012'.
# remove the time zone, then parse the string, then reformat as ISO 8601.
created_at = re.sub(' [+-][0-9]{4} ', ' ', tw['created_at'])
created_at = datetime.datetime.strptime(created_at, '%a %b %d %H:%M:%S %Y')
pc['published'] = created_at.isoformat()
if 'profile_image_url' in tw:
pc['photos'] = [{'value': tw['profile_image_url'], 'primary': 'true'}]
if 'url' in tw:
pc['urls'] = [{'value': tw['url'], 'type': 'home'}]
if 'location' in tw:
pc['addresses'] = [{
'formatted': tw['location'],
'type': 'home',
}]
utc_offset = tw.get('utc_offset')
if utc_offset is not None:
# twitter's utc_offset field is seconds, oddly, not hours.
pc['utcOffset'] = '%+03d:00' % (tw['utc_offset'] / 60 / 60)
# also note that twitter's time_zone field provides the user's
# human-readable time zone, e.g. 'Pacific Time (US & Canada)'. i'd need to
# include tzdb to parse that, though, and i don't need to since utc_offset
# works fine.
if 'description' in tw:
pc['note'] = tw['description']
return dict(pc)