code review: Store live promotions as C* data

1 parent 6bfa552 commit 1c3374300232f49b657f09066ef64cd10b73be56 @kemitche kemitche committed Oct 1, 2012
Showing with 80 additions and 57 deletions.
  1. +1 −0 r2/r2/config/queues.py
  2. +53 −27 r2/r2/lib/promote.py
  3. +10 −28 r2/r2/models/bidding.py
  4. +2 −2 r2/r2/models/token.py
  5. +14 −0 upstart/reddit-consumer-update_promos_q.conf
r2/r2/config/queues.py
@@ -83,6 +83,7 @@ def declare_queues():
"log_q": MessageQueue(bind_to_self=True),
"usage_q": MessageQueue(bind_to_self=True, durable=False),
"cloudsearch_changes": MessageQueue(bind_to_self=True),
+ "update_promos_q": MessageQueue(bind_to_self=True),
})
queues.cloudsearch_changes << "search_changes"
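For orientation, here is a minimal sketch of how a queue declared this way gets exercised, using only the amqp helpers that appear later in this commit (add_item, handle_items); the payloads and callback are illustrative, not part of the change:

    import json

    from r2.lib import amqp

    # Producer side: enqueue a transient JSON payload (mirrors Run() below).
    amqp.add_item('update_promos_q', json.dumps('all'),
                  delivery_mode=amqp.DELIVERY_TRANSIENT)

    # Consumer side: handle_items feeds batches of messages to a callback.
    def _process(msgs, chan):
        for msg in msgs:
            print json.loads(msg.body)  # each body is a JSON-encoded payload

    amqp.handle_items('update_promos_q', _process, limit=100)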
r2/r2/lib/promote.py
@@ -22,6 +22,8 @@
from __future__ import with_statement
+import json
+
from r2.models import *
from r2.models.bidding import SponsorBoxWeightings, WeightingRef
from r2.lib.wrapped import Wrapped
@@ -30,9 +32,10 @@
from r2.lib.memoize import memoize
from r2.lib.template_helpers import get_domain
from r2.lib.utils import Enum, UniqueIterator
-from organic import keep_fresh_links
+from r2.lib.organic import keep_fresh_links
from pylons import g, c
from datetime import datetime, timedelta
+from r2.lib import amqp
from r2.lib.db.queries import make_results, db_sort, add_queries, merge_results
import itertools
@@ -48,6 +51,10 @@
CAMPAIGN = Enum("start", "end", "bid", "sr", "trans_id")
+UPDATE_QUEUE = 'update_promos_q'
+QUEUE_ALL = 'all'
+
+
@memoize("get_promote_srid")
def get_promote_srid(name = 'promos'):
try:
@@ -548,9 +555,9 @@ def accept_promotion(link):
now = promo_datetime_now(0)
if link._fullname in set(l.thing_name for l in
PromotionWeights.get_campaigns(now)):
- PromotionLog.add(link, 'requeued')
+ PromotionLog.add(link, 'Marked promotion for acceptance')
charge_pending(0) # campaign must be charged before it will go live
- make_daily_promotions()
+ queue_changed_promo(link, "accepted")
if link._spam:
link._spam = False
link._commit()
@@ -564,24 +571,10 @@ def reject_promotion(link, reason = None):
# while we're doing work here, it will correctly exclude it
set_status(link, STATUS.rejected)
- # Updates just the permacache list
- # permacache doesn't check the srids list; send an empty list
- links, weighted = get_live_promotions([], _use_cass=False)
+ links = get_live_promotions([SponsorBoxWeightings.ALL_ADS_ID])[0]
if link._fullname in links:
- links.remove(link._fullname)
- for k in list(weighted.keys()):
- weighted[k] = [(lid, w, cid) for lid, w, cid in weighted[k]
- if lid != link._fullname]
- if not weighted[k]:
- del weighted[k]
- set_live_promotions(links, weighted, which=("permacache",))
- PromotionLog.add(link, 'dequeued')
-
- # Updates just the Cassandra version
- campaigns = PromoCampaign._by_link(link._id)
- subreddits = Subreddit._by_name([c.sr_name for c in campaigns],
- return_dict=False)
- SponsorBoxWeightings.remove_link(link._fullname, subreddits)
+ PromotionLog.add(link, 'Marked promotion for rejection')
+ queue_changed_promo(link, "rejected")
# Send a rejection email (unless the advertiser requested the reject)
if not c.user or c.user._id != link.author_id:
@@ -735,26 +728,25 @@ def weight_schedule(by_sr):
def promotion_key():
return "current_promotions:1"
-def get_live_promotions(srids, _use_cass=False):
- if _use_cass:
+def get_live_promotions(srids, from_permacache=True):
+ if not from_permacache:
timer = g.stats.get_timer("promote.get_live.cass")
timer.start()
links = set()
weights = {}
find_srids = set(srids)
if '' in find_srids:
find_srids.remove('')
- find_srids.add(SponsorBoxWeightings.DEFAULT_SR_ID)
+ find_srids.add(SponsorBoxWeightings.FRONT_PAGE)
ads = SponsorBoxWeightings.load_multi(find_srids)
for srid, refs in ads.iteritems():
- links.update([ref.data['link'] for ref in refs])
+ links.update(ref.data['link'] for ref in refs)
promos = [ref.to_promo() for ref in refs]
- if srid == SponsorBoxWeightings.DEFAULT_SR_ID:
+ if srid == SponsorBoxWeightings.FRONT_PAGE:
srid = ''
elif srid == SponsorBoxWeightings.ALL_ADS_ID:
srid = 'all'
weights[srid] = promos
- links.update([ad.data['link'] for ad in ads])
timer.stop()
else:
timer = g.stats.get_timer("promote.get_live.permacache")
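Assuming get_live_promotions still returns the (links, weights) pair that reject_promotion indexes above, the new Cassandra read path can be exercised directly; the subreddit id below is a hypothetical placeholder:

    from r2.lib.promote import get_live_promotions

    sr_id = 12345  # hypothetical subreddit _id
    # '' selects the front page; it is translated to
    # SponsorBoxWeightings.FRONT_PAGE before the load_multi() call.
    links, weights = get_live_promotions(['', sr_id], from_permacache=False)
    # links: set of promoted link fullnames
    # weights: maps '' (front page), 'all', or an srid to the promo
    # tuples produced by WeightingRef.to_promo() for that row.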
@@ -1038,8 +1030,42 @@ def get(cls, link):
def Run(offset = 0):
+ '''reddit-job-update_promos: Intended to be run hourly to pull in
+ scheduled changes to ads
+
+ '''
charge_pending(offset = offset + 1)
charge_pending(offset = offset)
- make_daily_promotions(offset = offset)
+ amqp.add_item(UPDATE_QUEUE, json.dumps(QUEUE_ALL),
+ delivery_mode=amqp.DELIVERY_TRANSIENT)
+
+
+def run_changed(drain=False, limit=100, sleep_time=10, verbose=False):
+ '''reddit-consumer-update_promos: amqp consumer of update_promos_q
+
+ Handles asynchronous acceptance/rejection of ads that are scheduled to be live
+ right now
+
+ '''
+ @g.stats.amqp_processor(UPDATE_QUEUE)
+ def _run(msgs, chan):
+ items = [json.loads(msg.body) for msg in msgs]
+ if QUEUE_ALL in items:
+ # QUEUE_ALL is just an indicator to run make_daily_promotions.
+ # There's no promotion log to update in this case.
+ items.remove(QUEUE_ALL)
+ make_daily_promotions()
+ links = Link._by_fullname([i["link"] for i in items])
+ for item in items:
+ PromotionLog.add(links[item["link"]],
+ "Finished remaking current promotions (this link"
+ " was: %(message)s)" % item,
+ commit=True)
+ amqp.handle_items(UPDATE_QUEUE, _run, limit=limit, drain=drain,
+ sleep_time=sleep_time, verbose=verbose)
+
+
+def queue_changed_promo(link, message):
+ msg = {"link": link._fullname, "message": message}
+ amqp.add_item(UPDATE_QUEUE, json.dumps(msg),
+ delivery_mode=amqp.DELIVERY_TRANSIENT)
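To make the new flow concrete, here is the round trip a single message takes (the fullname is illustrative): queue_changed_promo serializes a small dict, run_changed's consumer decodes it, and QUEUE_ALL is the bare "rebuild everything" marker that Run() enqueues hourly:

    import json

    # What queue_changed_promo(link, "accepted") puts on the wire:
    body = json.dumps({"link": "t3_xxxxx", "message": "accepted"})

    # What _run() sees after decoding:
    item = json.loads(body)
    assert item == {"link": "t3_xxxxx", "message": "accepted"}

    # The hourly cron instead enqueues the bare marker:
    assert json.loads(json.dumps('all')) == 'all'   # QUEUE_ALL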
r2/r2/models/bidding.py
@@ -641,13 +641,13 @@ class SponsorBoxWeightings(object):
# the data if something goes wrong.
_ttl = datetime.timedelta(days=1)
- DEFAULT_SR_ID = 0
+ FRONT_PAGE = 0
# TODO The concept of an "All ads row" should not be needed
# after the permacache implementation is removed.
ALL_ADS_ID = -1
- _IDX_ROWKEY_FMT = '%s.latest'
+ _IDX_ROWKEY_FMT = '%s/index'
_IDX_COLUMN_KEY = 0
class ID(int):
@@ -661,35 +661,21 @@ def __init__(self, subreddit, timestamp, items, is_srid=True):
self.timeslot = tdb_cassandra.date_serializer.pack(timestamp)
@classmethod
- def index_column(cls, subreddit):
- return subreddit._id
-
- @classmethod
def index_rowkey(cls, subreddit):
return cls._IDX_ROWKEY_FMT % subreddit._id
def rowkey(self):
- return '%s.%s' % (self.subreddit._id, self.timeslot)
+ return '%s/%s' % (self.subreddit._id, self.timeslot)
@classmethod
def get_latest_rowkey(cls, subreddit):
- # This is a 2 layered function so the memoize key can be an ID instead
- # of a Subreddit object
- return cls._get_latest_rowkey(cls.index_rowkey(subreddit))
-
- @classmethod
- @memoize('sponsor_box_weightings_rowkey', time=60 * 60, stale=False)
- def _get_latest_rowkey(cls, idx_rowkey, _update=False):
- if _update:
- # Don't spoil the memoize cache with outdated data
- rcl = tdb_cassandra.CL.QUORUM
- else:
- rcl = cls._read_consistency_level
+ idx_rowkey = cls.index_rowkey(subreddit)
try:
- return cls._cf.get(idx_rowkey, columns=[cls._IDX_COLUMN_KEY],
- read_consistency_level=rcl)[cls._IDX_COLUMN_KEY]
+ row = cls._cf.get(idx_rowkey, columns=[cls._IDX_COLUMN_KEY])
except tdb_cassandra.NotFoundException:
return None
+ else:
+ return row[cls._IDX_COLUMN_KEY]
@classmethod
def load_by_sr(cls, sr_id):
@@ -701,8 +687,7 @@ def load_by_sr(cls, sr_id):
return data
@classmethod
- @memoize('sponsor_box_weightings__load', time=60 * 60, stale=True)
- def _load(cls, rowkey, _update=False):
+ def _load(cls, rowkey):
return [WeightingRef.from_cass(val)
for dummy, val in cls._cf.xget(rowkey)]
@@ -721,9 +706,6 @@ def _set_as_latest(self):
self._cf.insert(self.index_rowkey(self.subreddit),
{self._IDX_COLUMN_KEY: rowkey},
ttl=self._ttl)
-
- self._get_latest_rowkey(self.index_rowkey(self.subreddit),
- _update=True)
@tdb_cassandra.will_write
def set_timeslots(self):
@@ -756,7 +738,7 @@ def set_from_weights(cls, all_weights):
all_ads = itertools.chain.from_iterable(all_weights.itervalues())
weights[cls.ALL_ADS_ID] = all_ads
if '' in weights:
- weights[cls.DEFAULT_SR_ID] = weights.pop('')
+ weights[cls.FRONT_PAGE] = weights.pop('')
timeslot = datetime.datetime.now(g.tz)
@@ -783,7 +765,7 @@ def remove_link(cls, link_fn, from_subreddits, include_all_sr=True):
if include_all_sr:
srs = itertools.chain(from_subreddits, [cls.ID(cls.ALL_ADS_ID)])
else:
- srs = iter(from_subreddits)
+ srs = from_subreddits
for subreddit in srs:
current = cls.load_by_sr(subreddit._id)
updated = [r for r in current if r.data['link'] != link_fn]
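The index scheme this file simplifies works as a pointer row: '<srid>/index' stores, under column 0, the rowkey of the newest '<srid>/<timeslot>' row. A sketch of following that pointer, assuming a Subreddit instance is at hand:

    from r2.models import Subreddit
    from r2.models.bidding import SponsorBoxWeightings

    sr = Subreddit._by_name('promos')  # any Subreddit works here
    latest = SponsorBoxWeightings.get_latest_rowkey(sr)  # may be None
    if latest is not None:
        # _load() yields the WeightingRefs stored in that timeslot row
        refs = SponsorBoxWeightings._load(latest)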
r2/r2/models/token.py
@@ -505,7 +505,7 @@ class OAuth2RefreshTokensByUser(tdb_cassandra.View):
class EmailVerificationToken(ConsumableToken):
_use_db = True
_connection_pool = "main"
- _ttl = timedelta(hours=12)
+ _ttl = datetime.timedelta(hours=12)
token_size = 20
@classmethod
@@ -520,7 +520,7 @@ def valid_for_user(self, user):
class PasswordResetToken(ConsumableToken):
_use_db = True
_connection_pool = "main"
- _ttl = timedelta(hours=12)
+ _ttl = datetime.timedelta(hours=12)
token_size = 20
@classmethod
upstart/reddit-consumer-update_promos_q.conf
@@ -0,0 +1,14 @@
+description "reject or accept promotions in between the hourly update_promos cron"
+
+instance $x
+
+stop on reddit-stop or runlevel [016]
+
+respawn
+respawn limit 10 5
+
+nice 10
+script
+ . /etc/default/reddit
+ wrap-job paster run --proctitle update_promos_q$x $REDDIT_INI $REDDIT_ROOT/r2/lib/promote.py -c 'run_changed()'
+end script
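Because the job declares "instance $x", several consumers can run in parallel; with stock upstart an instance would be started along the lines of "start reddit-consumer-update_promos_q x=0" (the exact invocation depends on the install's init scripts), and run_changed() is the loop each instance executes.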
