Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
Add initial test integration with the new activity service
Browse files Browse the repository at this point in the history
This service is used to count active visitors within contexts. This
initial implementation allows us to shadow-write and shadow-read
depending on a set of feature flags.
  • Loading branch information
spladug committed Dec 17, 2015
1 parent 6e60419 commit c88036c
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 3 deletions.
5 changes: 5 additions & 0 deletions r2/example.ini
Expand Up @@ -722,6 +722,11 @@ page_cache_time = 90
commentpane_cache_time = 120


############################################ SERVICES
###### activity
activity_endpoint =


[server:main]
use = egg:Paste#http
host = 0.0.0.0
Expand Down
18 changes: 17 additions & 1 deletion r2/r2/lib/app_globals.py
Expand Up @@ -36,7 +36,9 @@
import sys

from sqlalchemy import engine, event
from baseplate import Baseplate
from baseplate import Baseplate, config as baseplate_config
from baseplate.thrift_pool import ThriftConnectionPool
from baseplate.context.thrift import ThriftContextFactory

import pkg_resources
import pytz
Expand All @@ -62,6 +64,7 @@
)
from r2.lib.configparse import ConfigValue, ConfigValueParser
from r2.lib.contrib import ipaddress
from r2.lib.contrib.activity_thrift import ActivityService
from r2.lib.eventcollector import EventQueue
from r2.lib.lock import make_lock_factory
from r2.lib.manager import db_manager
Expand Down Expand Up @@ -323,6 +326,10 @@ class Globals(object):
'cpm_selfserve_geotarget_country',
'cpm_selfserve_collection',
],

ConfigValue.baseplate(baseplate_config.Endpoint): [
"activity_endpoint",
],
}

live_config_spec = {
Expand Down Expand Up @@ -775,6 +782,15 @@ def setup(self):

self.startup_timer.intermediate("memcache")

################# THRIFT-BASED SERVICES
activity_endpoint = self.config.get("activity_endpoint")
if activity_endpoint:
activity_pool = ThriftConnectionPool(activity_endpoint, timeout=0.1)
self.baseplate.add_to_context("activity_service",
ThriftContextFactory(activity_pool, ActivityService.Client))

self.startup_timer.intermediate("thrift")

################# CASSANDRA
keyspace = "reddit"
self.cassandra_pools = {
Expand Down
4 changes: 2 additions & 2 deletions r2/r2/lib/baseplate_integration.py
Expand Up @@ -34,12 +34,12 @@


class R2BaseplateObserver(BaseplateObserver):
def make_root_observer(self, context, root_span):
def on_root_span_created(self, context, root_span):
return R2RootSpanObserver()


class R2RootSpanObserver(RootSpanObserver):
def make_child_observer(self, span):
def on_child_span_created(self, span):
return R2SpanObserver(span.name)


Expand Down
6 changes: 6 additions & 0 deletions r2/r2/lib/configparse.py
Expand Up @@ -92,6 +92,12 @@ def timeinterval(v, key=None):
def messages(v, key=None):
return ConfigValue.messages_re.findall(v.decode("string_escape"))

@staticmethod
def baseplate(baseplate_parser):
    """Adapt a single-argument parser to the ``(v, key=None)`` parser shape.

    The sibling parsers in this class take ``(v, key=None)``; the wrapped
    ``baseplate_parser`` is invoked with the raw value only.

    Returns a callable accepting ``(v, key=None)`` that hands ``v`` to
    ``baseplate_parser`` and returns whatever it returns.
    """
    def parse_with_baseplate(v, key=None):
        # `key` is accepted for signature compatibility and never forwarded.
        parsed = baseplate_parser(v)
        return parsed

    return parse_with_baseplate


class ConfigValueParser(dict):
def __init__(self, raw_data):
Expand Down
56 changes: 56 additions & 0 deletions r2/r2/lib/contrib/activity.thrift
@@ -0,0 +1,56 @@
include "baseplate.thrift"

/** A unique identifier for a given "context".
A context is an area of the service which a user may be active within, such as
a subreddit or live thread.
*/
typedef string ContextID

/** A unique identifier for a given visitor.
A visitor may be a logged-in user's ID or a logged-out user's LOID value. The
value is not actually stored, but only used to update the internal counters.
*/
typedef string VisitorID


/** A count of visitors active within a context.
If the count is low enough, some fuzzing is applied to the number. If this
kicks in, the `is_fuzzed` attribute will be True.
*/
struct ActivityInfo {
/** Number of visitors active in the context; not exact when `is_fuzzed` is True. */
1: optional i32 count;
/** True when fuzzing was applied to `count`. */
2: optional bool is_fuzzed;
}

/** A specified context ID was invalid.
Declared in the `throws` clause of `ActivityService.count_activity`.
*/
exception InvalidContextIDException {
}

/** Records visitor activity within contexts and reports how many visitors are
currently active in a context.
*/
service ActivityService extends baseplate.BaseplateService {
/** Register a visitor's activity within a given context.
The visitor's activity will be recorded but will expire over time. If the
user continues to be active within the context, this endpoint should be
called occasionally to ensure they continue to be counted.
This method is `oneway`; no indication of success or failure is returned.
*/
oneway void record_activity(1: ContextID context_id, 2: VisitorID visitor_id),

/** Count how many visitors are currently active in a given context.
The results of this call are cached for a period of time to ensure that if
the count is fuzzed, the fuzzing is stable. This prevents repeated requests
from revealing the range of fuzzing and therefore the true value.
Throws `InvalidContextIDException` when the given context ID is invalid.
*/
ActivityInfo count_activity(1: ContextID context_id)
throws (1: InvalidContextIDException invalid_context_id),
}
12 changes: 12 additions & 0 deletions r2/r2/lib/pages/pages.py
Expand Up @@ -23,8 +23,11 @@

from collections import Counter, OrderedDict

from thrift import Thrift

from r2.config import feature
from r2.lib.contrib.ipaddress import ip_address
from r2.lib.contrib import activity_thrift
from r2.lib.db.operators import asc
from r2.lib.wrapped import Wrapped, Templated, CachedTemplate
from r2.models import (
Expand Down Expand Up @@ -1122,6 +1125,7 @@ class LoginFormWide(CachedTemplate):
pass



class SubredditInfoBar(CachedTemplate):
"""When not on Default, renders a sidebox which gives info about
the current reddit, including links to the moderator and
Expand All @@ -1143,6 +1147,14 @@ def __init__(self, site = None):

self.accounts_active, self.accounts_active_fuzzed = self.sr.get_accounts_active()

if c.activity_service and feature.is_enabled("activity_service_read"):
try:
info = c.activity_service.count_activity(self.sr._fullname)
self.visitor_count = info.count
self.visitor_count_is_fuzzed = info.is_fuzzed
except Thrift.TException as exc:
g.log.warning("failed to fetch activity: %s", exc)

if c.user_is_loggedin and c.user.pref_show_flair:
self.flair_prefs = FlairPrefs()
else:
Expand Down
9 changes: 9 additions & 0 deletions r2/r2/models/account.py
Expand Up @@ -52,6 +52,8 @@
import hashlib
from pycassa.system_manager import ASCII_TYPE

from thrift import Thrift


trylater_hooks = hooks.HookRegistrar()
COOKIE_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S'
Expand Down Expand Up @@ -615,6 +617,13 @@ def update_sr_activity(self, sr):
if not self._spam:
AccountsActiveBySR.touch(self, sr)

if c.activity_service and feature.is_enabled("activity_service_write"):
try:
c.activity_service.record_activity(
sr._fullname, self._fullname)
except Thrift.TException as exc:
g.log.warning("failed to update activity: %s", exc)

def get_trophy_id(self, uid):
'''Return the ID of the Trophy associated with the given "uid"
Expand Down
6 changes: 6 additions & 0 deletions r2/r2/templates/subredditinfobar.html
Expand Up @@ -61,6 +61,12 @@ <h1 class="hover redditname">
</p>
%endif

%if feature.is_enabled("activity_service_view") and getattr(thing, "visitor_count", None):
<p class="users-online ${'fuzzed' if thing.visitor_count_is_fuzzed else ''}" title="${_('logged-in users viewing this subreddit in the past 15 minutes')}">
${unsafe(Score.users_here_now(thing.visitor_count, prepend='~' if thing.visitor_count_is_fuzzed else ''))}
</p>
%endif

%if thing.sr.moderator:
<div class="leavemoderator">
${text_with_links(ModListing.remove_self_title % dict(action='(%(action)s)'),
Expand Down

1 comment on commit c88036c

@13steinj
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question: Is the automatically generated r2/lib/contrib/activity_thrift folder supposed to be in the .gitignore? I only noticed it because I wanted to run git add -A and saw a whole bunch of files I didn't write.

Please sign in to comment.