
link votes: Use a queue to update links by domain queries

The queue can be sharded by domain to minimize lock contention, and the
consumer will batch updates to the same link (e.g. several votes for one
link) and to the same domain (e.g. votes for different links on one domain).
commit 7f3113f476ce86fbe7bfdaeccd97a8c5ade24afc (1 parent: 5ebee09), committed by bsimpson63 on Oct 17, 2016
Showing with 88 additions and 24 deletions.
  1. +1 −0 install/reddit.sh
  2. +2 −0 r2/example.ini
  3. +8 −0 r2/r2/config/queues.py
  4. +1 −0 r2/r2/lib/app_globals.py
  5. +62 −24 r2/r2/lib/voting.py
  6. +14 −0 upstart/reddit-consumer-domain_query_q.conf
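
Before the diff itself, a toy sketch of the batching the commit message describes; all fullnames and domains here are hypothetical, and the real logic lives in consume_domain_query_queue() below:

    # toy illustration: many queued votes collapse into one
    # precomputed-query update per domain
    from collections import defaultdict
    votes = [("t3_a", "example.com"), ("t3_a", "example.com"),
             ("t3_b", "example.com"), ("t3_c", "imgs.site.org")]
    by_domain = defaultdict(set)
    for fullname, domain in votes:
        by_domain[domain].add(fullname)
    # four queued votes -> two updates: example.com gets {t3_a, t3_b},
    # imgs.site.org gets {t3_c}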
install/reddit.sh
@@ -592,6 +592,7 @@ set_consumer_count automoderator_q 0
set_consumer_count butler_q 1
set_consumer_count author_query_q 1
set_consumer_count subreddit_query_q 1
+set_consumer_count domain_query_q 1
chown -R $REDDIT_USER:$REDDIT_GROUP $CONSUMER_CONFIG_ROOT/
r2/example.ini
@@ -758,6 +758,8 @@ shard_commentstree_queues = false
shard_author_query_queues = false
# should we split links by subreddit query processing into shards by sr id?
shard_subreddit_query_queues = false
+# should we split links by domain query processing into shards by domain?
+shard_domain_query_queues = false
# chance of a write to the query cache triggering pruning. increasing this will
# potentially slow down writes, but will keep the size of cached queries in check better
querycache_prune_chance = 0.05
r2/r2/config/queues.py
@@ -90,6 +90,7 @@ def declare_queues(g):
"modmail_email_q": MessageQueue(bind_to_self=True),
"author_query_q": MessageQueue(bind_to_self=True),
"subreddit_query_q": MessageQueue(bind_to_self=True),
+ "domain_query_q": MessageQueue(bind_to_self=True),
})
if g.shard_commentstree_queues:
@@ -112,6 +113,13 @@ def declare_queues(g):
}
queues.declare(sharded_subreddit_query_queues)
+ if g.shard_domain_query_queues:
+ sharded_domain_query_queues = {
+ "domain_query_%d_q" % i: MessageQueue(bind_to_self=True)
+ for i in xrange(10)
+ }
+ queues.declare(sharded_domain_query_queues)
+
queues.cloudsearch_changes << "search_changes"
queues.scraper_q << ("new_link", "link_text_edited")
queues.newcomments_q << "new_comment"
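
For reference, a sketch of what the sharded declaration above expands to; the one coupling to watch is that the producer's modulus in add_to_domain_query_q() (below) must stay in step with this shard count:

    # the comprehension declares ten shard queues; hash(...) % 10 in
    # add_to_domain_query_q() must match this count
    shard_queue_names = ["domain_query_%d_q" % i for i in xrange(10)]
    # -> ["domain_query_0_q", "domain_query_1_q", ..., "domain_query_9_q"]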
r2/r2/lib/app_globals.py
@@ -259,6 +259,7 @@ class Globals(object):
'shard_commentstree_queues',
'shard_author_query_queues',
'shard_subreddit_query_queues',
+ 'shard_domain_query_queues',
'authnet_validate',
'ENFORCE_RATELIMIT',
'RL_SITEWIDE_ENABLED',
r2/r2/lib/voting.py
@@ -22,7 +22,6 @@
from collections import defaultdict
from datetime import datetime
-from itertools import product
import json
from pylons import tmpl_context as c, app_globals as g, request
@@ -108,26 +107,6 @@ def update_user_liked(vote):
m.insert(get_disliked(vote.user), [vote])
-# these sorts can be changed by voting - we don't need to do "new" since that's
-# taken care of by new_link and doesn't change afterwards
-SORTS = ["hot", "top", "controversial"]
-
-
-def update_domain_queries(link):
- from r2.lib.db.queries import add_queries, get_domain_links
-
- parsed = UrlParser(link.url)
- if not is_subdomain(parsed.hostname, 'imgur.com'):
- domains = parsed.domain_permutations()
- add_queries(
- queries=[
- get_domain_links(domain, sort, "all")
- for domain, sort in product(domains, SORTS)
- ],
- insert_items=link,
- )
-
-
def consume_link_vote_queue(qname="vote_link_q"):
@g.stats.amqp_processor(qname)
def process_message(msg):
@@ -178,16 +157,19 @@ def process_message(msg):
if vote_valid and link_valid:
add_to_author_query_q(link)
add_to_subreddit_query_q(link)
-
- update_domain_queries(link)
- timer.intermediate("domain_queries")
+ add_to_domain_query_q(link)
timer.stop()
timer.flush()
amqp.consume_items(qname, process_message, verbose=False)
+# these sorts can be changed by voting - we don't need to do "new" since that's
+# taken care of by new_link and doesn't change afterwards
+SORTS = ["hot", "top", "controversial"]
+
+
def add_to_author_query_q(link):
if g.shard_author_query_queues:
author_shard = link.author_id % 10
@@ -279,6 +261,62 @@ def process_message(msgs, chan):
amqp.handle_items(qname, process_message, limit=limit)
+def add_to_domain_query_q(link):
+ parsed = UrlParser(link.url)
+ if is_subdomain(parsed.hostname, 'imgur.com'):
+ # don't build domain listings for imgur
+ return
+
+ if not parsed.domain_permutations():
+ # no valid domains found
+ return
+
+ if g.shard_domain_query_queues:
+ domain_shard = hash(parsed.hostname) % 10
+ queue_name = "domain_query_%s_q" % domain_shard
+ else:
+ queue_name = "domain_query_q"
+ amqp.add_item(queue_name, link._fullname)
+
+
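
The shard is chosen from a hash of the exact hostname, so repeated votes on one hostname always land on one shard. Note, though, that the consumer updates every domain permutation, and two subdomains of the same parent domain can hash to different shards, which is why the commit message says the sharding minimizes rather than eliminates lock contention. A sketch, assuming voting.py's r2.lib.utils helpers and a hypothetical URL:

    from r2.lib.utils import UrlParser  # same helper voting.py uses

    parsed = UrlParser("http://blog.example.com/post")  # hypothetical URL
    shard = hash(parsed.hostname) % 10        # same hostname -> same shard
    queue_name = "domain_query_%s_q" % shard  # e.g. "domain_query_3_q"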
+def consume_domain_query_queue(qname="domain_query_q", limit=1000):
+ @g.stats.amqp_processor(qname)
+ def process_message(msgs, chan):
+ """Update get_domain_links(), the Links by domain precomputed query.
+
+ get_domain_links() is a CachedResult which is stored in permacache. To
+ update these objects we need to do a read-modify-write which requires
+ obtaining a lock. Sharding these updates by domain allows us to run
+ multiple consumers (but ideally just one per shard) to avoid lock
+ contention.
+
+ """
+
+ from r2.lib.db.queries import add_queries, get_domain_links
+
+ link_names = {msg.body for msg in msgs}
+ links = Link._by_fullname(link_names, return_dict=False)
+ print 'Processing %r' % (links,)
+
+ links_by_domain = defaultdict(list)
+ for link in links:
+ parsed = UrlParser(link.url)
+
+ # update the listings for all permutations of the link's domain
+ for domain in parsed.domain_permutations():
+ links_by_domain[domain].append(link)
+
+    for domain, domain_links in links_by_domain.iteritems():
+        with g.stats.get_timer("link_vote_processor.domain_queries"):
+            add_queries(
+                queries=[
+                    get_domain_links(domain, sort, "all") for sort in SORTS],
+                insert_items=domain_links,
+            )
+
+ amqp.handle_items(qname, process_message, limit=limit)
+
+
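
To make the docstring's read-modify-write point concrete, here is a toy model of the critical section add_queries() protects, with a plain dict standing in for permacache (this is not the real CachedResult API):

    # hypothetical cached state for one (domain, sort) key
    permacache = {("example.com", "hot"): ["t3_old"]}
    key = ("example.com", "hot")
    cached = permacache[key]          # read
    cached = ["t3_new"] + cached      # modify: insert, re-rank, trim
    permacache[key] = cached          # write back; two unlocked writers
                                      # interleaving here would lose votes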
def consume_comment_vote_queue(qname="vote_comment_q"):
@g.stats.amqp_processor(qname)
def process_message(msg):
upstart/reddit-consumer-domain_query_q.conf
@@ -0,0 +1,14 @@
+description "update links by domain precomputed queries"
+
+instance $x
+
+stop on reddit-stop or runlevel [016]
+
+respawn
+respawn limit 10 5
+
+nice 10
+script
+ . /etc/default/reddit
+ wrap-job paster run --proctitle domain_query_q$x $REDDIT_INI $REDDIT_ROOT/r2/lib/voting.py -c "consume_domain_query_queue()"
+end script
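
One deployment note: as written, each upstart instance ($x) drains the single unsharded queue. If shard_domain_query_queues were enabled, each consumer process would presumably need to target its own shard via the qname parameter; the call below is an assumption, not part of this commit:

    # hypothetical wiring for a sharded deployment: one process per shard
    consume_domain_query_queue(qname="domain_query_0_q")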
