Merge pull request #1 from markrwilliams/faster

Pool.imap for chunked downloads; single transaction; simple partitioning for difficulty. Good night for some old-fashioned XP.
mahmoud committed May 18, 2012
2 parents 02da294 + 055111b commit 440cfa0b53fe23e368837ee84bb9b115141c3030
Showing 2 changed files with 39 additions and 34 deletions.
  1. +10 −0 dabase.py
  2. +29 −34 dabnabbit.py
dabase.py
@@ -16,6 +16,9 @@ class Meta:
class Dabblet(DabModel):
+ CONTEXT_THRESHOLD = 100
+ CHOICES_THRESHOLD = 10
+
title = pw.CharField()
context = pw.TextField()
@@ -46,6 +49,13 @@ def from_page(cls, title, context, source_page, source_order,
ret.source_page = source_page
return ret
+ def rank(self):
+ if len(self.context) > self.CONTEXT_THRESHOLD:
+ if self.choices.count() > self.CHOICES_THRESHOLD:
+ return 1
+ return 2
+ return 3
+
def _asdict(self):
return {'title': self.title,
'source_title': self.source_title,
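
The new rank() method is the "simple partitioning for difficulty" from the commit message: rank 1 when a Dabblet has both a long context (over CONTEXT_THRESHOLD characters) and many candidate choices (over CHOICES_THRESHOLD), rank 2 when the context is long but the choices are few, and rank 3 when the context is short. A minimal standalone sketch of the same bucketing, with a plain function standing in for the model method (choice_count here is a stand-in for the real self.choices.count() query):

CONTEXT_THRESHOLD = 100
CHOICES_THRESHOLD = 10

def rank(context, choice_count):
    # long context and many choices: the most to work with
    if len(context) > CONTEXT_THRESHOLD:
        if choice_count > CHOICES_THRESHOLD:
            return 1
        return 2  # long context, few choices
    return 3      # short context

print rank('x' * 150, 12)  # -> 1
print rank('x' * 150, 3)   # -> 2
print rank('abc', 50)      # -> 3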
dabnabbit.py
@@ -3,6 +3,7 @@
from gevent import monkey
monkey.patch_all()
+import itertools
import requests
import json
import time
@@ -39,6 +40,7 @@ def api_req(action, params=None, raise_exc=False, **kwargs):
raise
else:
resp.error = e
+ resp.results = None
return resp
try:
@@ -50,6 +52,8 @@ def api_req(action, params=None, raise_exc=False, **kwargs):
raise
else:
resp.error = e
+ resp.results = None
+ resp.servedby = None
return resp
mw_error = resp.headers.get('MediaWiki-API-Error')
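
Both error paths in api_req now null out the attributes callers read (resp.results, plus resp.servedby on the second path), so a failed request presents a uniform shape: callers test for None instead of catching AttributeError. A minimal sketch of the consuming pattern this enables, assuming the api_req above (process() is hypothetical, standing in for real result handling):

resp = api_req('query', {'titles': 'Dog'})
if resp.results is None:
    print 'request failed:', resp.error
else:
    process(resp.results)  # process() is a hypothetical handler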
@@ -134,12 +138,9 @@ def get_category_recursive(cat_name, count=None):
ret = list(ret)[:count]
print 'Done, returning', len(ret), 'items'
return list(ret)
-
+
def get_dab_page_ids(date=None, count=500):
- #cat_res = get_category_recursive("Articles_with_links_needing_disambiguation_from_June_2011", count)
cat_res = get_category_recursive("Articles_with_links_needing_disambiguation", count)
- # TODO: Continue query?
- # TODO: Get subcategory of Category:Articles_with_links_needing_disambiguation
return [ a.pageid for a in cat_res ]
@@ -245,7 +246,7 @@ def get_context(dab_a):
d(chosen_context).addClass('dab-context')
# add upperbound/wrapping div
return d(chosen_context)
-
+
def get_dabblets(parsed_page):
"Call with a Page object, the type you'd get from get_articles()"
ret = []
@@ -320,32 +321,15 @@ def submit_solution(title, solution):
P_PER_CALL = 4
DEFAULT_TIMEOUT = 30
-def green_call_list(func, arglist, per_call=P_PER_CALL, timeout=DEFAULT_TIMEOUT):
- import time
- fname = func.__name__
- print 'calling', fname, 'on', len(arglist), 'items...'
- start = time.time()
- jobs = [gevent.spawn(func, arglist[i:i+per_call])
- for i in
- range(0, len(arglist), per_call)]
- print 'using', len(jobs), 'green threads.'
- gevent.joinall(jobs, timeout)
- done_jobs = [j for j in jobs if j.value]
- try:
- ret = sum([j.value for j in done_jobs], [])
- except TypeError as te:
- print '(', fname, "'s results appear to not be iterable)"
- ret = [ j.value for j in done_jobs ]
- print '(', len(done_jobs), 'out of', len(jobs), 'jobs returned, with',
- print len(ret), 'results.)'
- dur = time.time() - start
- print 'done', fname, 'on', len(arglist), 'items in',
- print dur, 'seconds.'
+def chunked_pimap(func, els, concurrency=150, chunk_size=P_PER_CALL):
+ chunked = (els[i:i + chunk_size]
+ for i in xrange(0, len(els), chunk_size))
+ pool = Pool(concurrency)
+ return pool.imap_unordered(func, chunked)
- return ret
-
-def save_a_bunch(count=1000):
+@dabase.dab_db.commit_on_success
+def save_a_bunch(count=1000, concurrency=150, per_call=P_PER_CALL):
import time
db_name = 'abunch'
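
chunked_pimap replaces roughly twenty lines of hand-rolled spawn/joinall bookkeeping: it slices the input list into chunk_size-sized groups and hands them to a gevent Pool, whose imap_unordered yields each chunk's result as soon as it finishes, with at most concurrency greenlets in flight. A self-contained sketch of the pattern (fetch() is a stand-in for the real get_articles()):

from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool

def chunked_pimap(func, els, concurrency=150, chunk_size=4):
    chunked = (els[i:i + chunk_size]
               for i in xrange(0, len(els), chunk_size))
    pool = Pool(concurrency)
    return pool.imap_unordered(func, chunked)

def fetch(page_ids):  # stand-in for get_articles()
    return ['page-%d' % pid for pid in page_ids]

for pages in chunked_pimap(fetch, range(20), concurrency=5, chunk_size=4):
    print pages  # chunks arrive in completion order, not input order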
@@ -355,20 +339,31 @@ def save_a_bunch(count=1000):
page_ids = get_dab_page_ids(count=count)
- pages = green_call_list(get_articles, page_ids)
- dabblets = sum([ get_dabblets(p) for p in pages ], [])
+ dabblets = []
+ for pages in chunked_pimap(get_articles, page_ids,
+ concurrency=concurrency,
+ chunk_size=per_call):
+ for p in pages:
+ dabblets.extend(get_dabblets(p))
- # TODO start transaction
for d in dabblets:
d.save()
- all_choices = green_call_list(get_dab_choices, dabblets)
+ all_choices = []
+ for choice in chunked_pimap(get_dab_choices, dabblets,
+ concurrency=concurrency,
+ chunk_size=per_call):
+ all_choices.extend(choice)
for c in all_choices:
c.save()
# TODO end transaction
end = time.time()
+ for d in dabblets:
+ d.difficulty = d.rank()
+ d.save()
+
print len(dabblets), 'Dabblets saved to', db_name, 'in', end-start, 'seconds'
print len(set([d.title for d in dabblets])), 'unique titles'
print len(set([d.source_title for d in dabblets])), 'unique source pages'
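
The @dabase.dab_db.commit_on_success decorator is the "single transaction" half of the commit message: peewee's 2012-era transaction helper (modern peewee spells this db.atomic()) wraps the whole call in one transaction, so the per-row d.save() and c.save() calls commit together at the end, or roll back together on error, rather than paying a commit per row. Note the stray "# TODO end transaction" comment above is now redundant. A minimal sketch of the decorator against that era's API, assuming a hypothetical SqliteDatabase and Item model:

import peewee as pw

db = pw.SqliteDatabase('abunch.db')  # hypothetical database

class Item(pw.Model):
    title = pw.CharField()
    class Meta:
        database = db

db.connect()
Item.create_table(fail_silently=True)

@db.commit_on_success  # one transaction for the whole call
def save_items(titles):
    for t in titles:
        Item(title=t).save()  # no per-row commit

save_items(['a', 'b', 'c'])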
@@ -391,5 +386,5 @@ def test():
title_articles = get_articles(titles=["Dog"], raise_exc=True)
if __name__ == '__main__':
- dabblets = save_a_bunch(600)
+ dabblets = save_a_bunch(20)
import pdb;pdb.set_trace()
