Skip to content

Commit

Permalink
Refactor backend
Browse files Browse the repository at this point in the history
Split backend into different modules, created a python package.
Created a basic MediaWiki API wrapper.

Bug: T97900
Change-Id: I07768df24e9e1d13c95f85b9f96ea808cf01ece4
  • Loading branch information
sitic committed May 13, 2015
1 parent ba1b26a commit cfd158d
Show file tree
Hide file tree
Showing 23 changed files with 469 additions and 306 deletions.
Empty file added backend/__init__.py
Empty file.
16 changes: 16 additions & 0 deletions backend/__main__.py
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# ISC License
# Copyright (C) 2015 Jan Lebert
from __future__ import print_function
from __future__ import absolute_import
import sys

from .server import run

if len(sys.argv) < 2:
    print("ERROR: no port number as first argument given. Quitting",
          file=sys.stderr)
    sys.exit(1)
else:
    try:
        # Reject a non-numeric argument with a clear message instead of
        # letting int() die with an uncaught ValueError traceback.
        port = int(sys.argv[1])
    except ValueError:
        print("ERROR: port number is not an integer. Quitting",
              file=sys.stderr)
        sys.exit(1)
    run(port)
40 changes: 40 additions & 0 deletions backend/celery/__init__.py
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# ISC License
# Copyright (C) 2015 Jan Lebert
from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
from kombu import Exchange, Queue

from .. import config

# Task logger shared by the modules in this package.
logger = get_task_logger(__name__)

# Redis URL used as the Celery message broker, assembled from the
# project-wide settings in ``config``.
BROKER_URL = 'redis://{server}:{port}/{db}'.format(
    server=config.redis_server,
    port=config.redis_port,
    db=config.redis_db
)

# Celery application instance; the task definitions live in
# backend.celery.tasks.
app = Celery(broker=BROKER_URL,
             include=['backend.celery.tasks'])

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    # Task return values are discarded; results are pushed out through
    # redis pub/sub (see MediaWiki.publish) instead of a result backend.
    CELERY_IGNORE_RESULT=True,
    CELERY_DISABLE_RATE_LIMITS=True,
    CELERY_DEFAULT_QUEUE=config.toolname,
    CELERY_QUEUES=(
        Queue(config.redis_prefix + 'q', Exchange(config.toolname),
              routing_key=config.toolname),
    ),
    # Prefix every broker key with the tool's redis prefix so several
    # tools can share one redis database without key collisions.
    BROKER_TRANSPORT_OPTIONS={
        'fanout_prefix': True,
        'fanout_patterns': True,
        'keyprefix_queue': config.redis_prefix + '.binding.%s',
        'unacked_key': config.redis_prefix + '_unacked',
        'unacked_index_key': config.redis_prefix + '_unacked_index',
        'unacked_mutex_key': config.redis_prefix + '_unacked_mutex'
    },
)
152 changes: 152 additions & 0 deletions backend/celery/api.py
@@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
# ISC License
# Copyright (C) 2015 Jan Lebert
from __future__ import absolute_import
import requests
from requests_oauthlib import OAuth1
from redis import StrictRedis
import json
from datetime import datetime, timedelta

from .. import config
from . import logger


class MediaWiki(object):
    """Minimal MediaWiki API wrapper with OAuth support.

    Results can be pushed to a redis pub/sub channel via :meth:`publish`.
    """

    def __init__(self, host="https://en.wikipedia.org", path="/w/api.php",
                 access_token=None, redis_channel=None):
        """
        :param host: wiki base URL (scheme + domain, no trailing slash)
        :param path: path to api.php on that host
        :param access_token: JSON-encoded OAuth access token with ``key``
            and ``secret`` members, or None for unauthenticated requests
        :param redis_channel: redis pub/sub channel used by publish()
        """
        self.api_url = host + path
        # dbname -> wiki-info dict, filled by _get_wikis()
        self.wikis = {}

        user_agent = "crosswatch (https://tools.wmflabs.org/crosswatch;" +\
                     "crosswatch@tools.wmflabs.org) python-requests/" +\
                     requests.__version__
        self.headers = {'User-Agent': user_agent}

        if access_token:
            # Construct an auth object with the consumer and access tokens
            access_token = json.loads(access_token)
            self.auth = OAuth1(config.consumer_token.key,
                               client_secret=config.consumer_token.secret,
                               resource_owner_key=access_token['key'],
                               resource_owner_secret=access_token['secret'])
        else:
            self.auth = None

        self.redis_channel = redis_channel
        self.redis = StrictRedis(
            host=config.redis_server,
            port=config.redis_port,
            db=config.redis_db
        )

    def publish(self, message):
        """JSON-encode ``message`` and publish it to the redis channel."""
        if not self.redis_channel:
            raise Exception("No redis channel set to publish to")
        self.redis.publish(self.redis_channel, json.dumps(message))

    def timestamp(self, daysdelta=0):
        """
        :param daysdelta: offset in days from now (may be negative)
        :return: UTC timestamp in MediaWiki format (YYYYMMDDHHMMSS)
        """
        time = datetime.utcnow() + timedelta(days=daysdelta)
        return time.strftime("%Y%m%d%H%M%S")

    def query(self, params):
        """Do a single API request and return the decoded JSON response.

        :param params: dict of API parameters; 'format' is forced to json
        :raises Exception: when the API response contains an 'error' member
        """
        params['format'] = "json"
        response = requests.get(self.api_url, params=params, auth=self.auth,
                                headers=self.headers).json()

        if 'error' in response:
            logger.error(response['error']['code'])
            if response['error']['code'] == "mwoauth-invalid-authorization":
                raise Exception("OAuth authentication failed")
            raise Exception(str(response['error']['code']))
        if 'warnings' in response:
            # logger.warn() is a deprecated alias; use warning().
            logger.warning("API-request warning: " + str(response['warnings']))
        return response

    def query_gen(self, params):
        """Yield the 'query' part of each API response, following the
        API's 'continue' continuation protocol until exhausted.
        """
        params['format'] = "json"
        params['action'] = "query"
        last_continue = {'continue': ""}
        while True:
            p = params.copy()
            p.update(last_continue)
            response = requests.get(self.api_url, params=p, auth=self.auth,
                                    headers=self.headers).json()

            if 'error' in response:
                raise Exception(str(response['error']))
            if 'warnings' in response:
                warning = response['warnings']['query']['*']
                logger.warning("API-request warning: " + warning)
            if 'query' in response:
                yield response['query']
            if 'continue' not in response:
                break
            last_continue = response['continue']

    def get_username(self):
        """Return the username the current credentials belong to.

        :raises Exception: on OAuth failure
        :raises KeyError: when the response has an unexpected shape
        """
        params = {
            'action': "query",
            'meta': "userinfo",
        }
        response = self.query(params)
        try:
            return response['query']['userinfo']['name']
        except KeyError:
            # Use .get() here: indexing response['error'] directly would
            # raise a second, misleading KeyError when no 'error' member
            # is present.
            if response.get('error', {}).get('code') == \
                    "mwoauth-invalid-authorization":
                logger.error('mwoauth-invalid-authorization')
                raise Exception("OAuth authentication failed")
            raise

    def get_wikis(self, use_cache=True):
        """Return {dbname: wiki info} for all Wikimedia wikis.

        Results are cached in redis for one day; pass use_cache=False to
        force a fresh sitematrix API request.
        """
        key = config.redis_prefix + 'cached_wikis'
        wikis = self.redis.get(key)
        if use_cache and wikis:
            wikis = json.loads(wikis)
        else:
            # Cache miss, do api request and fill cache
            wikis = self._get_wikis()
            self.redis.setex(key, 86400, json.dumps(wikis))  # 1 day exp.

        return wikis

    def _get_wikis(self):
        """Fetch the sitematrix and build the self.wikis dict."""
        params = {'action': "sitematrix"}
        data = self.query(params)
        for key, val in data['sitematrix'].items():
            # 'count' is metadata, not a language entry.
            if key == 'count':
                continue

            if 'code' in val:
                # Language entry: a list of sites sharing one language code.
                for site in val['site']:
                    self._parse_sitematrix(site, val['code'], val['name'])
            else:
                # 'specials' entry: sites without a language code.
                for site in val:
                    self._parse_sitematrix(site, '', '')

        return self.wikis

    def _parse_sitematrix(self, site, lang, langname):
        """Normalize one sitematrix site entry and store it in self.wikis.

        :param site: raw site dict from the sitematrix API
        :param lang: language code ('' for special wikis)
        :param langname: language name ('' for special wikis)
        """
        wiki = {
            'lang': lang,
            'langname': langname,
            'url': site['url'],
            'dbname': site['dbname'],
            'group': site['code']
        }
        # The API reports Wikipedias with code 'wiki'; use a clearer name.
        if wiki['group'] == 'wiki':
            wiki['group'] = 'wikipedia'

        # Wikis a user cannot have a useful watchlist on.
        inactive_wikis = ('closed', 'private', 'fishbowl')
        # any() over a generator -- no need to materialize a list first.
        if any(key in site for key in inactive_wikis):
            wiki['closed'] = True

        self.wikis[site['dbname']] = wiki
83 changes: 83 additions & 0 deletions backend/celery/tasks.py
@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
"""Celery tasks"""
# ISC License
# Copyright (C) 2015 Jan Lebert
from __future__ import absolute_import

from . import app, logger
from .api import MediaWiki


@app.task
def initial_task(obj):
    """Fan out one watchlist-fetch task per default wiki for a user.

    :param obj: dict with connection information; must contain
        'access_token' (and is forwarded to watchlistgetter with a
        'wiki' member added per project)
    """
    mw = MediaWiki(access_token=obj['access_token'])
    wikis = mw.get_wikis()
    # 'itwiki' was listed twice in the original list, which dispatched a
    # duplicate fetch for the same wiki; deduplicated here.
    projects = ['enwiki', 'dewiki', 'itwiki', 'frwiki',
                'commonswiki', 'wikidatawiki', 'enwiktionary', 'dewiktionary',
                'metawiki', 'mediawikiwiki']
    for project in projects:
        obj['wiki'] = wikis[project]
        watchlistgetter.delay(obj)


def fix_urls(html, url):
    """Rewrite site-relative anchors in ``html`` into absolute links
    that open in a new tab.

    :param html: HTML fragment (e.g. a parsed edit summary)
    :param url: wiki base URL to prepend to relative hrefs
    :return: the rewritten HTML fragment
    """
    absolute_anchor = u'<a target="_blank" href="' + url + u'/'
    return html.replace(u'<a href="/', absolute_anchor)


@app.task
def watchlistgetter(obj):
    """
    Get the watchlist for a wiki and publish it to the user's redis channel.

    :param obj: dict with wiki and connection information; reads
        obj['wiki'] (dbname/url/group/lang/langname), obj['access_token'],
        obj['redis_channel'] and optionally obj['watchlistperiod']
    """
    logger.info("Reading watchlist items for wiki " +
                obj['wiki']['dbname'])
    mw = MediaWiki(host=obj['wiki']['url'],
                   access_token=obj['access_token'],
                   redis_channel=obj['redis_channel'])
    # Number of days of history to fetch; defaults to one day.
    if 'watchlistperiod' in obj:
        days = obj['watchlistperiod']
    else:
        days = 1
    params = {
        'list': "watchlist",
        'wlallrev': "",
        'wltype': "edit|new",
        'wllimit': 500,
        # Fetch back to 'days' days ago (MediaWiki timestamps go newest
        # to oldest, so the end of the range is in the past).
        'wlend': mw.timestamp(daysdelta=-days),
        'wlprop': "ids|flags|title|parsedcomment|user|timestamp|sizes|" +
                  "notificationtimestamp|loginfo"
    }

    # One iteration per API continuation batch; each batch is published
    # as its own message.
    for response in mw.query_gen(params):
        items = []
        for item in response['watchlist']:
            # Annotate each entry with the wiki it came from, since
            # entries from many wikis end up in one channel.
            item['project'] = obj['wiki']['dbname']
            item['projecturl'] = obj['wiki']['url']

            if 'commenthidden' in item:
                item['parsedcomment'] = "<s>edit summary removed</s>"
            item['parsedcomment'] = fix_urls(item['parsedcomment'],
                                             obj['wiki']['url'])
            item['projectgroup'] = obj['wiki']['group']
            item['projectlang'] = obj['wiki']['lang']
            item['projectlangname'] = obj['wiki']['langname']
            # Collapse flag presence into the single-letter markers the
            # consumer displays.
            if 'bot' in item:
                item['bot'] = "b"
            if 'minor' in item:
                item['minor'] = "m"
            if 'new' in item:
                item['new'] = "n"
            items.append(item)
        # NOTE(review): 'entires' looks like a typo for 'entries', but the
        # message consumer must use the identical key -- confirm both
        # sides before renaming.
        message = {
            'msgtype': 'watchlist',
            'entires': items
        }
        if items:
            mw.publish(message)


if __name__ == '__main__':
    # Allow running this module directly to start a celery worker.
    app.start()
36 changes: 10 additions & 26 deletions backend/config.py.sample
@@ -1,37 +1,21 @@
# -*- coding: utf-8 -*-
# ISC License
# Copyright (C) 2015 Jan Lebert
from __future__ import absolute_import
import socket
from mwoauth import ConsumerToken

# https://pypi.python.org/pypi/celerybeat-redis/0.0.7
toolname = 'crosswatch'
redis_server = 'tools-redis'

# Outside Tool Labs (hostnames starting with 'tools'), fall back to a
# local redis instance for development.
if not socket.gethostname().startswith('tools'):
    redis_server = 'localhost'
redis_port = 6379
redis_db = 3
# NOTE(review): the duplicated assignments below appear to be an artifact
# of a diff rendering (old and new lines fused together); only the last
# assignment of each name takes effect at runtime.
redis_prefix = 'CHANGE THIS STRING'  # random string
redis_prefix = 'REPLACE THIS STRING'  # random string
sql_user = 'REPLACE THIS STRING'
sql_passwd = 'REPLACE THIS STRING'

oauth_consumer_key = 'CHANGE THIS STRING'
oauth_consumer_secret = 'CHANGE THIS STRING'
oauth_consumer_key = 'REPLACE THIS STRING'
oauth_consumer_secret = 'REPLACE THIS STRING'
consumer_token = ConsumerToken(oauth_consumer_key, oauth_consumer_secret)


# NOTE(review): this class appears only as removed lines in the diff view
# (the settings moved to backend/celery/__init__.py in this commit). It
# references names that are not imported in this file (config, Queue,
# Exchange), so it would raise NameError if executed -- do not resurrect
# it without restoring those imports.
class celeryconfig():
    # Celery configuration holder formerly consumed by the worker setup.
    BROKER_URL = 'redis://{server}:{port}/{db}'.format(
        server=redis_server, port=redis_port, db=redis_db)
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_IGNORE_RESULT=True
    CELERY_DISABLE_RATE_LIMITS=True
    CELERY_DEFAULT_QUEUE=config.toolname
    CELERY_QUEUES=(
        Queue(config.redis_prefix + 'q', Exchange(config.toolname),
              routing_key=config.toolname),
    )
    BROKER_TRANSPORT_OPTIONS = {
        'fanout_prefix': True,
        'fanout_patterns': True,
        'keyprefix_queue': redis_prefix + '.binding.%s',
        'unacked_key': redis_prefix + '_unacked',
        'unacked_index_key': redis_prefix + '_unacked_index_key',
        'unacked_mutex_key': redis_prefix + '_unacked_mutex_key'
    }
30 changes: 0 additions & 30 deletions backend/oauth_handler.py

This file was deleted.

1 change: 0 additions & 1 deletion backend/public

This file was deleted.

0 comments on commit cfd158d

Please sign in to comment.