Skip to content
Permalink
Browse files

Added tweet importer

  • Loading branch information
ericflo committed Jun 16, 2009
1 parent e550632 commit df526835eef42aceb09c2bca2c56e04c76186f71
Showing with 155 additions and 97 deletions.
  1. +1 −0 __init__.py
  2. +3 −50 get_twitter_data.py
  3. +98 −47 insert_twitter_data.py
  4. +53 −0 utils.py
@@ -0,0 +1 @@

@@ -1,59 +1,11 @@
import sys

from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol

from cassandra import Cassandra
from cassandra.ttypes import NotFoundException

def with_thrift(host='localhost', port=9160):
# TODO: Connection pooling, reuse, etc.
def _inner(func):
def __inner(*args, **kwargs):
socket = TSocket.TSocket(host, port)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Cassandra.Client(protocol)
val = None
try:
transport.open()
val = func(client, *args, **kwargs)
finally:
transport.close()
return val
return __inner
return _inner

@with_thrift()
def get_id(client, username):
try:
col = client.get_column('TwitterClone', username, 'usernames:id')
except NotFoundException:
return None
return col.value

@with_thrift()
def get_friends(client, user_id):
try:
cols = client.get_slice_super('TwitterClone', str(user_id),
'user_edges:friends', -1, -1)[0].columns
except (NotFoundException, IndexError):
return None
return [int(c.value) for c in cols]

@with_thrift()
def get_followers(client, user_id):
try:
cols = client.get_slice_super('TwitterClone', str(user_id),
'user_edges:followers', -1, -1)[0].columns
except (NotFoundException, IndexError):
return None
return [int(c.value) for c in cols]
from utils import with_thrift, get_user_id, get_friends, get_followers

def main():
username = sys.argv[1]

user_id = get_id(username)
user_id = get_user_id(username)
if user_id is None:
print 'Unable to find data for user %s' % (username,)
return None
@@ -67,6 +19,7 @@ def main():
print "FOLLOWERS:"
print followers
print '--------------------'
print '%s friends and %s followers' % (len(friends), len(followers))

if __name__ == '__main__':
main()
@@ -5,33 +5,13 @@
import json
import time

from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from utils import with_thrift, get_friends, get_followers

from cassandra import Cassandra
from cassandra.ttypes import batch_mutation_t, column_t, superColumn_t

USERS = ['ericflo', 'eston', 'kneath', 'mihasya', 'mmalone', 'rcrowley',
USERS = ['ericflo', 'eston', 'kneath', 'mihasya', 'mjmalone', 'rcrowley',
'thauber']

def with_thrift(host='localhost', port=9160):
# TODO: Connection pooling, reuse, etc.
def _inner(func):
def __inner(*args, **kwargs):
socket = TSocket.TSocket(host, port)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Cassandra.Client(protocol)
val = None
try:
transport.open()
val = func(client, *args, **kwargs)
finally:
transport.close()
return val
return __inner
return _inner

@with_thrift()
def import_users(client, usernames):
UNWANTED_KEYS = ('status', 'favourites_count', 'followers_count',
@@ -56,25 +36,24 @@ def import_users(client, usernames):
users.append(data)

users.sort(key=lambda u: u['created_at'])

tt = time.time()


for user in users:
columns = []
user_id = str(user['id'])
for key, value in user.items():
columns.append(column_t(
columnName=key,
value=str(value),
timestamp=tt
value=unicode(value).encode('utf-8'),
timestamp=user['created_at_in_seconds']
))
client.batch_insert(batch_mutation_t(
table='TwitterClone',
key=user_id,
cfmap={'users': columns},
), True)
if columns:
client.batch_insert(batch_mutation_t(
table='TwitterClone',
key=user_id,
cfmap={'users': columns},
), True)
client.insert('TwitterClone', user['screen_name'], 'usernames:id',
user_id, tt, True)
user_id, user['created_at_in_seconds'], True)

@with_thrift()
def import_social_graph(client, usernames):
@@ -97,10 +76,11 @@ def import_social_graph(client, usernames):
value=str(follower),
timestamp=tt
))
supercolumns.append(superColumn_t(
name='followers',
columns=follower_columns
))
if follower_columns:
supercolumns.append(superColumn_t(
name='followers',
columns=follower_columns
))

friends_url = 'http://twitter.com/friends/ids/%s.json' % (username,)
friends = json.loads(urllib2.urlopen(friends_url).read())
@@ -112,21 +92,92 @@ def import_social_graph(client, usernames):
value=str(friend),
timestamp=tt
))
supercolumns.append(superColumn_t(
name='friends',
columns=friend_columns
))

client.batch_insert_superColumn(batch_mutation_t(
table='TwitterClone',
key=str(data['id']),
cfmap={'user_edges': supercolumns}
), True)
if friend_columns:
supercolumns.append(superColumn_t(
name='friends',
columns=friend_columns
))

if supercolumns:
client.batch_insert_superColumn(batch_mutation_t(
table='TwitterClone',
key=str(data['id']),
cfmap={'user_edges': supercolumns}
), True)

@with_thrift()
def import_tweets(client, usernames):
for username in usernames:
print 'Importing tweets for %s' % (username,)
url = 'http://twitter.com/statuses/user_timeline/%s.json' % (username,)

tweets = json.loads(urllib2.urlopen(url).read())
supercolumns = []
user_tweet_columns = []
for tweet in tweets:
user = tweet.pop('user', None)
user_id = str(user['id'])

tweet['user_id'] = user['id']

parsed_date = rfc822.parsedate(tweet['created_at'])
created_at_in_seconds = calendar.timegm(parsed_date)
tweet['created_at'] = datetime.datetime.fromtimestamp(
created_at_in_seconds)
tweet['created_at_in_seconds'] = created_at_in_seconds

tweet_id = str(tweet['id'])
columns = []
for key, value in tweet.items():
columns.append(column_t(
columnName=key,
value=unicode(value).encode('utf-8'),
timestamp=created_at_in_seconds
))

if columns:
client.batch_insert(batch_mutation_t(
table='TwitterClone',
key=tweet_id,
cfmap={'tweets': columns},
), True)

user_tweet_columns.append(column_t(
columnName=tweet_id,
value=tweet_id,
timestamp=created_at_in_seconds
))

friend_tweet_columns = []
for follower in get_followers(user['id']):
friend_tweet_columns.append(column_t(
columnName=tweet_id,
value=tweet_id,
timestamp=created_at_in_seconds
))
if friend_tweet_columns:
supercolumns.append(superColumn_t(
name='friend_tweets',
columns=friend_tweet_columns
))

if user_tweet_columns:
supercolumns.append(superColumn_t(
name='user_tweets',
columns=user_tweet_columns
))

if supercolumns:
client.batch_insert_superColumn(batch_mutation_t(
table='TwitterClone',
key=user_id,
cfmap={'tweet_edges': supercolumns}
), True)

def main():
import_users(USERS)
import_social_graph(USERS)
import_tweets(USERS)

if __name__ == '__main__':
main()
@@ -0,0 +1,53 @@
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol

from cassandra import Cassandra
from cassandra.ttypes import NotFoundException

def with_thrift(host='localhost', port=9160):
# TODO: Connection pooling, reuse, etc.
def _inner(func):
def __inner(*args, **kwargs):
socket = TSocket.TSocket(host, port)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Cassandra.Client(protocol)
val = None
try:
transport.open()
val = func(client, *args, **kwargs)
finally:
transport.close()
return val
return __inner
return _inner

@with_thrift()
def get_user_id(client, username):
try:
col = client.get_column('TwitterClone', username, 'usernames:id')
except NotFoundException:
return None
return col.value

@with_thrift()
def get_friends(client, user_id):
try:
cols = client.get_slice_super('TwitterClone', str(user_id),
'user_edges:friends', -1, -1)[0].columns
except (NotFoundException, IndexError):
return None
return [int(c.value) for c in cols]

@with_thrift()
def get_followers(client, user_id):
try:
cols = client.get_slice_super('TwitterClone', str(user_id),
'user_edges:followers', -1, -1)[0].columns
except (NotFoundException, IndexError):
return None
return [int(c.value) for c in cols]

@with_thrift()
def get_tweets(user_id):
pass

0 comments on commit df52683

Please sign in to comment.
You can’t perform that action at this time.