Skip to content
This repository has been archived by the owner on Aug 4, 2020. It is now read-only.

Commit

Permalink
Switch to native protocol driver, cql3
Browse files Browse the repository at this point in the history
  • Loading branch information
thobbs committed Feb 27, 2014
1 parent 50c5b7c commit 678abbe
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 185 deletions.
250 changes: 143 additions & 107 deletions cass.py
@@ -1,23 +1,16 @@
import time from datetime import datetime
from uuid import uuid1, UUID


from pycassa.pool import ConnectionPool from cassandra.cluster import Cluster
from pycassa.columnfamily import ColumnFamily
from pycassa.cassandra.ttypes import NotFoundException


__all__ = ['get_user_by_username', 'get_friend_usernames', __all__ = ['get_user_by_username', 'get_friend_usernames',
'get_follower_usernames', 'get_users_for_usernames', 'get_friends', 'get_follower_usernames', 'get_users_for_usernames', 'get_friends',
'get_followers', 'get_timeline', 'get_userline', 'get_tweet', 'save_user', 'get_followers', 'get_timeline', 'get_userline', 'get_tweet', 'save_user',
'save_tweet', 'add_friends', 'remove_friends', 'DatabaseError', 'save_tweet', 'add_friends', 'remove_friends', 'DatabaseError',
'NotFound', 'InvalidDictionary', 'PUBLIC_USERLINE_KEY'] 'NotFound', 'InvalidDictionary', 'PUBLIC_USERLINE_KEY']


POOL = ConnectionPool('Twissandra') CLUSTER = Cluster(['127.0.0.1'])

SESSION = CLUSTER.connect('twissandra')
USER = ColumnFamily(POOL, 'User')
FRIENDS = ColumnFamily(POOL, 'Friends')
FOLLOWERS = ColumnFamily(POOL, 'Followers')
TWEET = ColumnFamily(POOL, 'Tweet')
TIMELINE = ColumnFamily(POOL, 'Timeline')
USERLINE = ColumnFamily(POOL, 'Userline')


# NOTE: Having a single userline key to store all of the public tweets is not # NOTE: Having a single userline key to store all of the public tweets is not
# scalable. Currently, Cassandra requires that an entire row (meaning # scalable. Currently, Cassandra requires that an entire row (meaning
Expand Down Expand Up @@ -48,64 +41,45 @@ class NotFound(DatabaseError):
class InvalidDictionary(DatabaseError): class InvalidDictionary(DatabaseError):
pass pass


def _get_friend_or_follower_usernames(cf, username, count):
"""
Gets the social graph (friends or followers) for a username.
"""
try:
friends = cf.get(str(username), column_count=count)
except NotFoundException:
return []
return friends.keys()


def _get_line(table, username, start, limit):
    """
    Gets a timeline or a userline given a username, a start, and a limit.

    ``table`` is the name of the CQL table to read (the callers in this
    module pass "timeline" or "userline"). It is formatted straight into the
    statement text, so it must only ever be a trusted, hard-coded name.

    ``start`` is the string form of a timeuuid, as returned in the second
    element of a previous call, or a false value to read the first page.

    Returns a ``(tweets, next_timeuuid)`` tuple. ``next_timeuuid`` is the
    string to pass as ``start`` to read the following page, or None when
    fewer than ``limit`` entries came back.
    """
    # First we need to get the raw timeline (in the form of tweet ids)
    query = "SELECT time, tweet_id FROM {table} WHERE username=%s {time_clause} LIMIT %s"

    if not start:
        time_clause = ''
        params = (username, limit)
    else:
        time_clause = 'AND time < %s'
        params = (username, UUID(start), limit)

    query = query.format(table=table, time_clause=time_clause)

    # Materialize the rows: they are measured with len() and walked twice
    # below, which must work whatever kind of iterable execute() hands back.
    results = list(SESSION.execute(query, params))
    if not results:
        return [], None

    if len(results) == limit:
        # A full page may have more entries behind it, so hand back the
        # oldest timeuuid on this page as the cursor for the next one.
        # uuid.UUID objects order by their raw 128-bit integer value, which
        # does not follow creation time for version 1 UUIDs, so compare on
        # the embedded timestamp instead.
        oldest_timeuuid = min((row.time for row in results),
                              key=lambda timeuuid: timeuuid.time)

        # Present the string version of the oldest_timeuuid for the UI...
        next_timeuuid = str(oldest_timeuuid)
    else:
        next_timeuuid = None

    # Now we fetch the tweets themselves; all lookups are issued before any
    # result is waited on so that they can overlap.
    futures = [
        SESSION.execute_async(
            "SELECT * FROM tweets WHERE tweet_id=%s", (row.tweet_id, ))
        for row in results]

    tweets = [f.result()[0] for f in futures]
    return (tweets, next_timeuuid)




# QUERYING APIs # QUERYING APIs
Expand All @@ -114,35 +88,53 @@ def get_user_by_username(username):
""" """
Given a username, this gets the user record. Given a username, this gets the user record.
""" """
try: rows = SESSION.execute("SELECT * FROM users WHERE username=%s", (username, ))
user = USER.get(str(username)) if not rows:
except NotFoundException:
raise NotFound('User %s not found' % (username,)) raise NotFound('User %s not found' % (username,))
return user else:
return rows[0]



def get_friend_usernames(username, count=5000):
    """
    Given a username, returns a list with the usernames of the people that
    the user is following.

    At most ``count`` rows are requested from the friends table.
    """
    query = "SELECT friend FROM friends WHERE username=%s LIMIT %s"
    friend_names = []
    for entry in SESSION.execute(query, (username, count)):
        friend_names.append(entry.friend)
    return friend_names



def get_follower_usernames(username, count=5000):
    """
    Given a username, gets the usernames of the people following that user.

    At most ``count`` rows are requested from the followers table. The
    result is a list of usernames.
    """
    rows = SESSION.execute(
        "SELECT follower FROM followers WHERE username=%s LIMIT %s",
        (username, count))
    # Read the column by attribute, exactly as get_friend_usernames does:
    # rows that are named tuples cannot be indexed with a string key.
    return [row.follower for row in rows]



def get_users_for_usernames(usernames):
    """
    Given a list of usernames, returns the matching user records in the
    same order.

    Raises NotFound for the first username that has no row in the users
    table.
    """
    lookup = "SELECT * FROM users WHERE username=%s"

    # Fire off every lookup before waiting on any of them.
    pending = [SESSION.execute_async(lookup, (name, )) for name in usernames]

    found = []
    for name, request in zip(usernames, pending):
        rows = request.result()
        if not rows:
            raise NotFound('User %s not found' % (name,))
        found.append(rows[0])
    return found



def get_friends(username, count=5000): def get_friends(username, count=5000):
""" """
Expand All @@ -151,92 +143,136 @@ def get_friends(username, count=5000):
friend_usernames = get_friend_usernames(username, count=count) friend_usernames = get_friend_usernames(username, count=count)
return get_users_for_usernames(friend_usernames) return get_users_for_usernames(friend_usernames)



def get_followers(username, count=5000):
    """
    Given a username, returns the user records of the people following
    that user, looking up at most ``count`` follower usernames.
    """
    return get_users_for_usernames(
        get_follower_usernames(username, count=count))



def get_timeline(username, start=None, limit=40):
    """
    Given a username, returns one page of their timeline (tweets from
    people they follow), read from the "timeline" table via _get_line.
    """
    page = _get_line("timeline", username, start, limit)
    return page



def get_userline(username, start=None, limit=40):
    """
    Given a username, returns one page of their userline (their own
    tweets), read from the "userline" table via _get_line.
    """
    page = _get_line("userline", username, start, limit)
    return page



def get_tweet(tweet_id):
    """
    Given a tweet id, returns the first matching row of the tweets table.

    Raises NotFound when the query yields no rows.
    """
    matches = SESSION.execute(
        "SELECT * FROM tweets WHERE tweet_id=%s", (tweet_id, ))
    if matches:
        return matches[0]
    raise NotFound('Tweet %s not found' % (tweet_id,))



def get_tweets_for_tweet_ids(tweet_ids):
    """
    Given a list of tweet ids, this gets the associated tweet object for each
    one.

    The tweets are returned in the same order as ``tweet_ids``. Raises
    NotFound for the first id that has no row in the tweets table.
    """
    # The ids are walked twice (once to issue the queries, once to pair each
    # id with its result), so take a list copy in case an iterator was given.
    tweet_ids = list(tweet_ids)

    # Issue every lookup before waiting on any of them.
    futures = [
        SESSION.execute_async(
            "SELECT * FROM tweets WHERE tweet_id=%s", (tweet_id, ))
        for tweet_id in tweet_ids]

    tweets = []
    # Pair each id with its own future; zipping the leftover loop variable
    # ``tweet_id`` here instead of ``tweet_ids`` made the function fail.
    for tweet_id, future in zip(tweet_ids, futures):
        result = future.result()
        if not result:
            raise NotFound('Tweet %s not found' % (tweet_id,))
        tweets.append(result[0])

    return tweets




# INSERTING APIs # INSERTING APIs


def save_user(username, password):
    """
    Writes a (username, password) row into the users table.
    """
    statement = "INSERT INTO users (username, password) VALUES (%s, %s)"
    SESSION.execute(statement, (username, password))



def save_tweet(tweet_id, username, tweet, timestamp=None):
    """
    Stores a tweet and records it in the relevant lines.

    ``tweet`` is written as the body column of the tweets table. The
    ``timestamp`` argument is currently ignored: every entry is stamped
    with a freshly generated uuid1().
    """
    # TODO don't ignore timestamp
    timeuuid = uuid1()

    # The tweet itself goes in first...
    SESSION.execute(
        "INSERT INTO tweets (tweet_id, username, body) VALUES (%s, %s, %s)",
        (tweet_id, username, tweet))

    # ...then the author's userline, then the public one.
    userline_insert = (
        "INSERT INTO userline (username, time, tweet_id) VALUES (%s, %s, %s)")
    for line_owner in (username, PUBLIC_USERLINE_KEY):
        SESSION.execute(userline_insert, (line_owner, timeuuid, tweet_id))

    # Finally fan the tweet out to the author's own timeline and to the
    # timeline of each follower, waiting for every write to finish.
    timeline_insert = (
        "INSERT INTO timeline (username, time, tweet_id) VALUES (%s, %s, %s)")
    recipients = [username] + get_follower_usernames(username)
    pending = [
        SESSION.execute_async(timeline_insert, (recipient, timeuuid, tweet_id))
        for recipient in recipients]

    for request in pending:
        request.result()



def add_friends(from_username, to_usernames):
    """
    Records that ``from_username`` follows each user in ``to_usernames``.

    For every target user one row is inserted into friends and one into
    followers, both stamped with the same datetime.utcnow() value.
    """
    since = datetime.utcnow()
    friend_insert = (
        "INSERT INTO friends (username, friend, since) VALUES (%s, %s, %s)")
    follower_insert = (
        "INSERT INTO followers (username, follower, since) VALUES (%s, %s, %s)")

    pending = []
    for target in to_usernames:
        pending.extend([
            SESSION.execute_async(
                friend_insert, (from_username, target, since)),
            SESSION.execute_async(
                follower_insert, (target, from_username, since)),
        ])

    # Wait for every write to complete.
    for request in pending:
        request.result()



def remove_friends(from_username, to_usernames):
    """
    Deletes the relationship from ``from_username`` to each user in
    ``to_usernames``.

    For every target user the matching row is deleted from both the
    friends and the followers tables.
    """
    friend_delete = "DELETE FROM friends WHERE username=%s AND friend=%s"
    follower_delete = "DELETE FROM followers WHERE username=%s AND follower=%s"

    pending = []
    for target in to_usernames:
        pending.extend([
            SESSION.execute_async(friend_delete, (from_username, target)),
            SESSION.execute_async(follower_delete, (target, from_username)),
        ])

    # Wait for every delete to complete.
    for request in pending:
        request.result()
5 changes: 2 additions & 3 deletions requirements.txt
@@ -1,5 +1,4 @@
Django Django==1.5.5
simplejson simplejson
thrift cassandra-driver
pycassa
loremipsum loremipsum
4 changes: 2 additions & 2 deletions templates/tweets/userline.html
Expand Up @@ -23,8 +23,8 @@ <h2 class="grid_4 suffix_5">{{ username }}&rsquo;s Timeline</h2>
{% if request.user.is_authenticated %} {% if request.user.is_authenticated %}
{% ifnotequal request.user.id user.id %} {% ifnotequal request.user.id user.id %}
<form method="POST" action="{% url 'modify_friend' %}?next={{ request.path }}"> <form method="POST" action="{% url 'modify_friend' %}?next={{ request.path }}">
<input type="hidden" name="{% if 'user.friend' %}remove{% else %}add{% endif %}-friend" value="{{ user.id }}" /> <input type="hidden" name="{% if is_friend %}remove{% else %}add{% endif %}-friend" value="{{ user.id }}" />
<input type="submit" value="{% if 'user.friend' %}Remove{% else %}Add{% endif %} Friend" /> <input type="submit" value="{% if is_friend %}Remove{% else %}Add{% endif %} Friend" />
</form> </form>
{% endifnotequal %} {% endifnotequal %}
{% else %} {% else %}
Expand Down

1 comment on commit 678abbe

@matthewrudy
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, this is much clearer,
thanks.

Please sign in to comment.