Skip to content

Commit

Permalink
Merge pull request #183 from vijos/aiomongo
Browse files Browse the repository at this point in the history
use aiomongo (custom branch for now)
  • Loading branch information
iceboy233 committed May 13, 2017
2 parents e675ae2 + d714baf commit d3a5201
Show file tree
Hide file tree
Showing 35 changed files with 179 additions and 183 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
aiohttp>=2.0.7
jinja2>=2.9.0
sockjs>=0.6.0
motor
hoedown
accept
aioamqp
Expand All @@ -12,3 +11,4 @@ httpagentparser
geoip2
GitPython
PyYAML
git+https://github.com/iceb0y/aiomongo
6 changes: 4 additions & 2 deletions vj4/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sockjs
from aiohttp import web

from vj4 import db
from vj4 import error
from vj4.service import bus
from vj4.service import smallcache
Expand Down Expand Up @@ -46,8 +47,9 @@ def __init__(self):
# Initialize components.
staticmanifest.init(static_path)
locale.load_translations(translation_path)
asyncio.get_event_loop().run_until_complete(
asyncio.gather(tools.ensure_all_indexes(), bus.init()))
loop = asyncio.get_event_loop()
loop.run_until_complete(db.init())
loop.run_until_complete(asyncio.gather(tools.ensure_all_indexes(), bus.init()))
smallcache.init()

# Load views.
Expand Down
35 changes: 12 additions & 23 deletions vj4/db.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,23 @@
from motor import motor_asyncio
import aiomongo
import functools

from vj4.util import options

options.define('db_host', default='localhost', help='Database hostname or IP address.')
options.define('db_name', default='test', help='Database name.')


class Database(object):
_instance = None
async def init():
client = await aiomongo.create_client('mongodb://' + options.db_host)
global _db
_db = client.get_database(options.db_name)

def __new__(cls):
if not cls._instance:
client = motor_asyncio.AsyncIOMotorClient(options.db_host)
cls._instance = motor_asyncio.AsyncIOMotorDatabase(client, options.db_name)
return cls._instance

@functools.lru_cache()
def coll(name):
return aiomongo.Collection(_db, name)

class Collection(object):
_instances = {}

def __new__(cls, name):
if name not in cls._instances:
cls._instances[name] = motor_asyncio.AsyncIOMotorCollection(Database(), name)
return cls._instances[name]


class GridFS(object):
_instances = {}

def __new__(cls, name):
if name not in cls._instances:
cls._instances[name] = motor_asyncio.AsyncIOMotorGridFS(Database(), name)
return cls._instances[name]
@functools.lru_cache()
def fs(name):
return aiomongo.GridFS(_db, name)
4 changes: 2 additions & 2 deletions vj4/handler/contest.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async def get(self, *, tid: objectid.ObjectId, pid: document.convert_doc_id):
rdocs = await record.get_user_in_problem_multi(uid, self.domain_id, pdoc['doc_id']) \
.sort([('_id', -1)]) \
.limit(10) \
.to_list(None)
.to_list()
else:
rdocs = []
if not self.prefer_json:
Expand Down Expand Up @@ -326,7 +326,7 @@ async def post(self, *, title: str, content: str, rule: int,
pdocs = await problem.get_multi(domain_id=self.domain_id, doc_id={'$in': pids},
fields={'doc_id': 1}) \
.sort('doc_id', 1) \
.to_list(None)
.to_list()
exist_pids = [pdoc['doc_id'] for pdoc in pdocs]
if len(pids) != len(exist_pids):
for pid in pids:
Expand Down
6 changes: 3 additions & 3 deletions vj4/handler/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def prepare_contest(self):
if self.has_perm(builtin.PERM_VIEW_CONTEST):
tdocs = await contest.get_multi(self.domain_id) \
.limit(self.CONTESTS_ON_MAIN) \
.to_list(None)
.to_list()
tsdict = await contest.get_dict_status(self.domain_id, self.user['_id'],
(tdoc['doc_id'] for tdoc in tdocs))
else:
Expand All @@ -35,7 +35,7 @@ async def prepare_training(self):
tdocs = await training.get_multi(self.domain_id) \
.sort('doc_id', 1) \
.limit(self.TRAININGS_ON_MAIN) \
.to_list(None)
.to_list()
tsdict = await training.get_dict_status(self.domain_id, self.user['_id'],
(tdoc['doc_id'] for tdoc in tdocs))
else:
Expand All @@ -47,7 +47,7 @@ async def prepare_discussion(self):
if self.has_perm(builtin.PERM_VIEW_DISCUSSION):
ddocs = await discussion.get_multi(self.domain_id) \
.limit(self.DISCUSSIONS_ON_MAIN) \
.to_list(None)
.to_list()
vndict = await discussion.get_dict_vnodes(self.domain_id, map(discussion.node_id, ddocs))
else:
ddocs = []
Expand Down
6 changes: 3 additions & 3 deletions vj4/handler/home.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def modify_udoc(self, udict, key):
@base.require_priv(builtin.PRIV_USER_PROFILE)
async def get(self):
# TODO(iceboy): projection, pagination.
mdocs = await message.get_multi(self.user['_id']).sort([('_id', -1)]).limit(50).to_list(None)
mdocs = await message.get_multi(self.user['_id']).sort([('_id', -1)]).limit(50).to_list()
udict = await user.get_dict(
itertools.chain.from_iterable((mdoc['sender_uid'], mdoc['sendee_uid']) for mdoc in mdocs),
fields=user.PROJECTION_PUBLIC)
Expand Down Expand Up @@ -240,7 +240,7 @@ async def get(self):
dids = list(dudict.keys())
ddocs = await domain.get_multi(**{'$or': [{'_id': {'$in': dids}},
{'owner_uid': self.user['_id']}]}) \
.to_list(None)
.to_list()
can_manage = {}
for ddoc in builtin.DOMAINS + ddocs:
role = dudict.get(ddoc['_id'], {}).get('role', builtin.ROLE_DEFAULT)
Expand Down Expand Up @@ -276,7 +276,7 @@ def file_url(self, fdoc):

@base.require_priv(builtin.PRIV_USER_PROFILE)
async def get(self):
ufdocs = await userfile.get_multi(owner_uid=self.user['_id']).to_list(None)
ufdocs = await userfile.get_multi(owner_uid=self.user['_id']).to_list()
fdict = await fs.get_meta_dict(ufdoc.get('file_id') for ufdoc in ufdocs)
self.render('home_file.html', ufdocs=ufdocs, fdict=fdict)

Expand Down
6 changes: 3 additions & 3 deletions vj4/handler/problem.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ async def get(self, *, pid: document.convert_doc_id):
if pdoc.get('hidden', False):
self.check_perm(builtin.PERM_VIEW_PROBLEM_HIDDEN)
udoc = await user.get_by_uid(pdoc['owner_uid'])
tdocs = await training.get_multi(self.domain_id, **{'dag.pids': pid}).to_list(None) \
tdocs = await training.get_multi(self.domain_id, **{'dag.pids': pid}).to_list() \
if self.has_perm(builtin.PERM_VIEW_TRAINING) else None
ctdocs = await contest.get_multi(self.domain_id, pids=pid).to_list(None) \
ctdocs = await contest.get_multi(self.domain_id, pids=pid).to_list() \
if self.has_perm(builtin.PERM_VIEW_CONTEST) else None
path_components = self.build_path(
(self.translate('problem_main'), self.reverse_url('problem_main')),
Expand Down Expand Up @@ -227,7 +227,7 @@ async def get(self, *, pid: document.convert_doc_id):
.get_user_in_problem_multi(uid, self.domain_id, pdoc['doc_id']) \
.sort([('_id', -1)]) \
.limit(10) \
.to_list(None)
.to_list()
if not self.prefer_json:
path_components = self.build_path(
(self.translate('problem_main'), self.reverse_url('problem_main')),
Expand Down
2 changes: 1 addition & 1 deletion vj4/handler/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async def get(self, *, uid_or_name: str='', pid: str='', tid: str=''):
query = await self.get_filter_query(uid_or_name, pid, tid)
# TODO(iceboy): projection, pagination.
rdocs = await record.get_all_multi(**query,
get_hidden=self.has_priv(builtin.PRIV_VIEW_HIDDEN_RECORD)).sort([('_id', -1)]).limit(50).to_list(None)
get_hidden=self.has_priv(builtin.PRIV_VIEW_HIDDEN_RECORD)).sort([('_id', -1)]).limit(50).to_list()
# TODO(iceboy): projection.
udict, pdict = await asyncio.gather(
user.get_dict(rdoc['uid'] for rdoc in rdocs),
Expand Down
4 changes: 2 additions & 2 deletions vj4/handler/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async def post(self, *, title: str, content: str, dag: str):
pdocs = await problem.get_multi(domain_id=self.domain_id, doc_id={'$in': pids},
fields={'doc_id': 1, 'hidden': 1}) \
.sort('doc_id', 1) \
.to_list(None)
.to_list()
exist_pids = [pdoc['doc_id'] for pdoc in pdocs]
if len(pids) != len(exist_pids):
for pid in pids:
Expand Down Expand Up @@ -229,7 +229,7 @@ async def post(self, *, tid: objectid.ObjectId, title: str, content: str, dag: s
pdocs = await problem.get_multi(domain_id=self.domain_id, doc_id={'$in': pids},
fields={'doc_id': 1, 'hidden': 1}) \
.sort('doc_id', 1) \
.to_list(None)
.to_list()
exist_pids = [pdoc['doc_id'] for pdoc in pdocs]
if len(pids) != len(exist_pids):
for pid in pids:
Expand Down
8 changes: 4 additions & 4 deletions vj4/handler/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,17 @@ async def get(self, *, uid: int):
bg = random.randint(1, 21)
rdocs = record.get_multi(get_hidden=self.has_priv(builtin.PRIV_VIEW_HIDDEN_RECORD),
uid=uid).sort([('_id', -1)])
rdocs = await rdocs.limit(10).to_list(None)
rdocs = await rdocs.limit(10).to_list()
# TODO(twd2): check status, eg. test, hidden problem, ...
pdocs = problem.get_multi(domain_id=self.domain_id, owner_uid=uid).sort([('_id', -1)])
pcount = await pdocs.count()
pdocs = await pdocs.limit(10).to_list(None)
pdocs = await pdocs.limit(10).to_list()
psdocs = problem.get_multi_solution_by_uid(self.domain_id, uid)
pscount = await psdocs.count()
psdocs = await psdocs.limit(10).to_list(None)
psdocs = await psdocs.limit(10).to_list()
ddocs = discussion.get_multi(self.domain_id, owner_uid=uid)
dcount = await ddocs.count()
ddocs = await ddocs.limit(10).to_list(None)
ddocs = await ddocs.limit(10).to_list()
self.render('user_detail.html', is_self_profile=is_self_profile,
udoc=udoc, dudoc=dudoc, sdoc=sdoc, email=email, bg=bg,
rdocs=rdocs, pdocs=pdocs, pcount=pcount, psdocs=psdocs, pscount=pscount,
Expand Down
2 changes: 1 addition & 1 deletion vj4/job/difficulty.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def update_problem(domain_id: str, pid: document.convert_doc_id):
@domainjob.wrap
async def recalc(domain_id: str):
pdocs = problem.get_multi(domain_id=domain_id)
coll = db.Collection('document')
coll = db.coll('document')
bulk = coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Calculating')
Expand Down
6 changes: 3 additions & 3 deletions vj4/job/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
@argmethod.wrap
async def sync_length():
_logger.info('Userfile length')
coll = db.Collection('document')
coll = db.coll('document')
ufdocs = userfile.get_multi()
bulk = coll.initialize_unordered_bulk_op()
execute = False
Expand Down Expand Up @@ -51,13 +51,13 @@ async def sync_usage():
}
}
]
coll = db.Collection('domain.user')
coll = db.coll('domain.user')
await coll.update_many({'domain_id': userfile.STORE_DOMAIN_ID},
{'$set': {'usage_userfile': 0}})
bulk = coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Counting')
async for adoc in db.Collection('document').aggregate(pipeline):
async for adoc in db.coll('document').aggregate(pipeline):
bulk.find({'domain_id': userfile.STORE_DOMAIN_ID,
'uid': adoc['_id']}) \
.update_one({'$set': {'usage_userfile': adoc['usage_userfile']}})
Expand Down
24 changes: 12 additions & 12 deletions vj4/job/num.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ async def discussion(domain_id: str):
}
}
]
coll = db.Collection('document')
coll = db.coll('document')
await coll.update_many({'domain_id': domain_id, 'doc_type': document.TYPE_DISCUSSION},
{'$set': {'num_replies': 0}})
bulk = coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Counting')
async for adoc in db.Collection('document').aggregate(pipeline):
async for adoc in db.coll('document').aggregate(pipeline):
bulk.find({'domain_id': domain_id,
'doc_type': document.TYPE_DISCUSSION,
'doc_id': adoc['_id']}) \
Expand All @@ -58,13 +58,13 @@ async def contest(domain_id: str):
}
}
]
coll = db.Collection('document')
coll = db.coll('document')
await coll.update_many({'domain_id': domain_id, 'doc_type': document.TYPE_CONTEST},
{'$set': {'attend': 0}})
bulk = coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Counting')
async for adoc in db.Collection('document.status').aggregate(pipeline):
async for adoc in db.coll('document.status').aggregate(pipeline):
bulk.find({'domain_id': domain_id,
'doc_type': document.TYPE_CONTEST,
'doc_id': adoc['_id']}) \
Expand All @@ -89,13 +89,13 @@ async def training(domain_id: str):
}
}
]
coll = db.Collection('document')
coll = db.coll('document')
await coll.update_many({'domain_id': domain_id, 'doc_type': document.TYPE_TRAINING},
{'$set': {'enroll': 0}})
bulk = coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Counting')
async for adoc in db.Collection('document.status').aggregate(pipeline):
async for adoc in db.coll('document.status').aggregate(pipeline):
bulk.find({'domain_id': domain_id,
'doc_type': document.TYPE_TRAINING,
'doc_id': adoc['_id']}) \
Expand All @@ -120,13 +120,13 @@ async def problem(domain_id: str):
}
}
]
user_coll = db.Collection('domain.user')
user_coll = db.coll('domain.user')
await user_coll.update_many({'domain_id': domain_id},
{'$set': {'num_problems': 0}})
user_coll = user_coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Counting')
async for adoc in db.Collection('document').aggregate(pipeline):
async for adoc in db.coll('document').aggregate(pipeline):
user_coll.find({'domain_id': domain_id,
'uid': adoc['_id']}) \
.upsert().update_one({'$set': {'num_problems': adoc['num_problems']}})
Expand All @@ -150,13 +150,13 @@ async def problem_solution(domain_id: str):
}
}
]
coll = db.Collection('document')
coll = db.coll('document')
await coll.update_many({'domain_id': domain_id, 'doc_type': document.TYPE_PROBLEM_SOLUTION},
{'$set': {'vote': 0}})
bulk = coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Counting')
async for adoc in db.Collection('document.status').aggregate(pipeline):
async for adoc in db.coll('document.status').aggregate(pipeline):
bulk.find({'domain_id': domain_id,
'doc_type': document.TYPE_PROBLEM_SOLUTION,
'doc_id': adoc['_id']}) \
Expand All @@ -178,13 +178,13 @@ async def problem_solution(domain_id: str):
}
}
]
user_coll = db.Collection('domain.user')
user_coll = db.coll('domain.user')
await user_coll.update_many({'domain_id': domain_id},
{'$set': {'num_liked': 0}})
user_bulk = user_coll.initialize_unordered_bulk_op()
execute = False
_logger.info('Counting')
async for adoc in db.Collection('document').aggregate(pipeline):
async for adoc in db.coll('document').aggregate(pipeline):
user_bulk.find({'domain_id': domain_id,
'uid': adoc['_id']}) \
.upsert().update_one({'$set': {'num_liked': adoc['num_liked']}})
Expand Down
2 changes: 1 addition & 1 deletion vj4/job/rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def run(domain_id: str, keyword: str='rp', rank_field: str='rank', level_f
last_dudoc = {keyword: None}
rank = 0
count = 0
user_coll = db.Collection('domain.user')
user_coll = db.coll('domain.user')
user_bulk = user_coll.initialize_unordered_bulk_op()
async for dudoc in dudocs:
count += 1
Expand Down
6 changes: 3 additions & 3 deletions vj4/job/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ async def user_in_problem(uid: int, domain_id: str, pid: document.convert_doc_id
@domainjob.wrap
async def run(domain_id: str):
_logger.info('Clearing previous statuses')
await db.Collection('document.status').update_many(
await db.coll('document.status').update_many(
{'domain_id': domain_id, 'doc_type': document.TYPE_PROBLEM},
{'$unset': {'journal': '', 'rev': '', 'status': '', 'rid': '',
'num_submit': '', 'num_accept': ''}})
pdocs = problem.get_multi(domain_id=domain_id, fields={'_id': 1, 'doc_id': 1}).sort('doc_id', 1)
dudoc_factory = functools.partial(dict, num_submit=0, num_accept=0)
dudoc_updates = collections.defaultdict(dudoc_factory)
status_coll = db.Collection('document.status')
status_coll = db.coll('document.status')
async for pdoc in pdocs:
_logger.info('Problem {0}'.format(pdoc['doc_id']))
# TODO(twd2): ignore no effect statuses like system error, ...
Expand Down Expand Up @@ -101,7 +101,7 @@ async def run(domain_id: str):
await document.set(domain_id, document.TYPE_PROBLEM, pdoc['doc_id'], **pdoc_update)
# users' num_submit, num_accept
execute = False
user_coll = db.Collection('domain.user')
user_coll = db.coll('domain.user')
user_bulk = user_coll.initialize_unordered_bulk_op()
_logger.info('Updating users')
for uid, dudoc_update in dudoc_updates.items():
Expand Down

0 comments on commit d3a5201

Please sign in to comment.