Merge pull request #846 from krinsman/step6
Step 6
parente committed Dec 9, 2019
2 parents fffd462 + cb768ff, commit e8bf3bf
Showing 14 changed files with 201 additions and 281 deletions.
26 changes: 9 additions & 17 deletions nbviewer/app.py
@@ -18,7 +18,6 @@
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

from tornado import web, httpserver, ioloop, log
from tornado.httpclient import AsyncHTTPClient

import tornado.options
from tornado.options import define, options
@@ -30,15 +29,10 @@

from .handlers import init_handlers
from .cache import DummyAsyncCache, AsyncMultipartMemcache, MockCache, pylibmc
from .index import NoSearch, ElasticSearch
from .index import NoSearch
from .formats import configure_formats

from .providers import default_providers, default_rewrites

try:
from .providers.url.client import NBViewerCurlAsyncHTTPClient as HTTPClientClass
except ImportError:
from .providers.url.client import NBViewerSimpleAsyncHTTPClient as HTTPClientClass
from .providers.url.client import NBViewerAsyncHTTPClient as HTTPClientClass
from .ratelimit import RateLimiter

from .log import log_request
@@ -93,6 +87,13 @@ class NBViewer(Application):
gist_handler = Unicode(default_value="nbviewer.providers.gist.handlers.GistHandler", help="The Tornado handler to use for viewing notebooks stored as GitHub Gists").tag(config=True)
user_gists_handler = Unicode(default_value="nbviewer.providers.gist.handlers.UserGistsHandler", help="The Tornado handler to use for viewing directory containing all of a user's Gists").tag(config=True)

client = Any().tag(config=True)
@default('client')
def _default_client(self):
client = HTTPClientClass()
client.cache = self.cache
return client

index = Any().tag(config=True)
@default('index')
def _load_index(self):
@@ -160,15 +161,6 @@ def cache(self):

return cache

# for some reason this needs to be a computed property,
# and not a traitlets Any(), otherwise nbviewer won't run
@cached_property
def client(self):
AsyncHTTPClient.configure(HTTPClientClass)
client = AsyncHTTPClient()
client.cache = self.cache
return client

@cached_property
def env(self):
env = Environment(loader=FileSystemLoader(self.template_paths), autoescape=True)
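
The removed comment above conceded that client "needs to be a computed property, and not a traitlets Any(), otherwise nbviewer won't run"; this commit makes the traitlet version work by constructing the client directly in a @default factory instead of going through AsyncHTTPClient.configure(). A minimal sketch of that traitlets pattern, where MiniApp and the dict stand-in for NBViewerAsyncHTTPClient are illustrative, not nbviewer code:

    from traitlets import Any, default
    from traitlets.config import Application

    class MiniApp(Application):
        # Configurable: users may inject their own client via config.
        client = Any().tag(config=True)

        @default("client")
        def _default_client(self):
            # Runs lazily on first access, so other attributes
            # (e.g. a cache) already exist by then.
            return dict()  # hypothetical stand-in for NBViewerAsyncHTTPClient()

    app = MiniApp()
    print(type(app.client))  # <class 'dict'>; the factory ran on first access
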
75 changes: 38 additions & 37 deletions nbviewer/cache.py
@@ -9,9 +9,8 @@
from time import monotonic

from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import Future
from asyncio import Future

from tornado import gen
from tornado.log import app_log

try:
@@ -28,25 +27,25 @@ class MockCache(object):
def __init__(self, *args, **kwargs):
pass

def get(self, key):
async def get(self, key):
f = Future()
f.set_result(None)
return f
return await f

def set(self, key, value, *args, **kwargs):
async def set(self, key, value, *args, **kwargs):
f = Future()
f.set_result(None)
return f
return await f

def add(self, key, value, *args, **kwargs):
async def add(self, key, value, *args, **kwargs):
f = Future()
f.set_result(True)
return f
return await f

def incr(self, key):
async def incr(self, key):
f = Future()
f.set_result(None)
return f
return await f
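
Each MockCache method keeps its old body (make a Future, resolve it immediately) and simply awaits the result, the smallest change that lets callers write await cache.get(key) under native asyncio. A standalone sketch of the same shape, with TinyCache as an illustrative name:

    import asyncio

    class TinyCache:
        async def get(self, key):
            # Resolve a Future immediately and await it, mirroring
            # MockCache.get above; a plain `return None` would behave
            # identically to callers.
            f = asyncio.Future()
            f.set_result(None)
            return await f

    print(asyncio.run(TinyCache().get("k")))  # None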

class DummyAsyncCache(object):
"""Dummy Async Cache. Just stores things in a dict of fixed size."""
@@ -55,10 +54,10 @@ def __init__(self, limit=10):
self._cache_order = []
self.limit = limit

def get(self, key):
async def get(self, key):
f = Future()
f.set_result(self._get(key))
return f
return await f

def _get(self, key):
value, deadline = self._cache.get(key, (None, None))
@@ -68,7 +67,7 @@ def _get(self, key):
else:
return value

def set(self, key, value, expires=0):
async def set(self, key, value, expires=0):
if key in self._cache and self._cache_order[-1] != key:
idx = self._cache_order.index(key)
del self._cache_order[idx]
@@ -87,18 +86,18 @@ def set(self, key, value, expires=0):
self._cache[key] = (value, deadline)
f = Future()
f.set_result(True)
return f
return await f

def add(self, key, value, expires=0):
async def add(self, key, value, expires=0):
f = Future()
if self._get(key) is not None:
f.set_result(False)
else:
self.set(key, value, expires)
await self.set(key, value, expires)
f.set_result(True)
return f
return await f

def incr(self, key):
async def incr(self, key):
f = Future()
if self._get(key) is not None:
value, deadline = self._cache[key]
@@ -107,7 +106,7 @@ def incr(self, key):
else:
value = None
f.set_result(value)
return f
return await f

class AsyncMemcache(object):
"""Wrap pylibmc.Client to run in a background thread
@@ -119,8 +118,12 @@ def __init__(self, *args, **kwargs):

self.mc = pylibmc.Client(*args, **kwargs)
self.mc_pool = pylibmc.ThreadMappedPool(self.mc)

def _call_in_thread(self, method_name, *args, **kwargs):

self.loop = asyncio.get_event_loop()

async def _call_in_thread(self, method_name, *args, **kwargs):
# https://stackoverflow.com/questions/34376814/await-future-from-executor-future-cant-be-used-in-await-expression

key = args[0]
if 'multi' in method_name:
key = sorted(key)[0].decode('ascii') + '[%i]' % len(key)
@@ -130,19 +133,19 @@ def f():
with self.mc_pool.reserve() as mc:
meth = getattr(mc, method_name)
return meth(*args, **kwargs)
return self.pool.submit(f)
return await self.loop.run_in_executor(self.pool, f)

def get(self, *args, **kwargs):
return self._call_in_thread('get', *args, **kwargs)
async def get(self, *args, **kwargs):
return await self._call_in_thread('get', *args, **kwargs)

def set(self, *args, **kwargs):
return self._call_in_thread('set', *args, **kwargs)
async def set(self, *args, **kwargs):
return await self._call_in_thread('set', *args, **kwargs)

def add(self, *args, **kwargs):
return self._call_in_thread('add', *args, **kwargs)
async def add(self, *args, **kwargs):
return await self._call_in_thread('add', *args, **kwargs)

def incr(self, *args, **kwargs):
return self._call_in_thread('incr', *args, **kwargs)
async def incr(self, *args, **kwargs):
return await self._call_in_thread('incr', *args, **kwargs)
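
The key change in _call_in_thread is returning await self.loop.run_in_executor(self.pool, f) instead of the bare self.pool.submit(f): a concurrent.futures.Future cannot be used in an await expression, which is what the StackOverflow link in the new code explains. A self-contained sketch, with blocking_lookup standing in for a pylibmc call:

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    def blocking_lookup(key):
        # Pretend this talks to memcached and blocks for a while.
        return "value-for-" + key

    async def main():
        loop = asyncio.get_event_loop()
        pool = ThreadPoolExecutor(max_workers=2)
        # run_in_executor bridges the thread pool to asyncio: it returns
        # an awaitable asyncio future, unlike pool.submit(...).
        return await loop.run_in_executor(pool, blocking_lookup, "key")

    print(asyncio.run(main()))  # value-for-key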

class AsyncMultipartMemcache(AsyncMemcache):
"""subclass of AsyncMemcache that splits large files into multiple chunks
@@ -154,11 +157,10 @@ def __init__(self, *args, **kwargs):
self.max_chunks = kwargs.pop('max_chunks', 16)
super(AsyncMultipartMemcache, self).__init__(*args, **kwargs)

@gen.coroutine
def get(self, key, *args, **kwargs):
async def get(self, key, *args, **kwargs):
keys = [('%s.%i' % (key, idx)).encode()
for idx in range(self.max_chunks)]
values = yield self._call_in_thread('get_multi', keys, *args, **kwargs)
values = await self._call_in_thread('get_multi', keys, *args, **kwargs)
parts = []
for key in keys:
if key not in values:
@@ -171,10 +173,9 @@ def get(self, key, *args, **kwargs):
except zlib.error as e:
app_log.error("zlib decompression of %s failed: %s", key, e)
else:
raise gen.Return(result)
return result
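
The rewrite of get() drops the @gen.coroutine decorator, replaces yield with await, and turns raise gen.Return(result) into a plain return result. The two spellings side by side (the client.fetch() call is a hypothetical placeholder; tornado must be installed):

    from tornado import gen

    @gen.coroutine
    def old_get(client):
        # Pre-async/await Tornado: yield futures and "return" a value
        # from the generator by raising gen.Return.
        value = yield client.fetch()
        raise gen.Return(value)

    async def new_get(client):
        # Native coroutine: await directly and return normally.
        value = await client.fetch()
        return value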

@gen.coroutine
def set(self, key, value, *args, **kwargs):
async def set(self, key, value, *args, **kwargs):
chunk_size = self.chunk_size
compressed = zlib.compress(value)
offsets = range(0, len(compressed), chunk_size)
@@ -186,5 +187,5 @@ def set(self, key, value, *args, **kwargs):
values[('%s.%i' % (key, idx)).encode()] = compressed[
offset:offset + chunk_size
]
return self._call_in_thread('set_multi', values, *args, **kwargs)
return await self._call_in_thread('set_multi', values, *args, **kwargs)
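
The multipart logic is easiest to see in isolation: compress once, then slice into numbered subkeys that set_multi and get_multi can handle. A sketch of the set() side, where the function name and sizes are illustrative, not nbviewer API:

    import zlib

    def to_chunks(key, value, chunk_size):
        # Mirrors AsyncMultipartMemcache.set above: compress the value,
        # then store slice number idx under subkey "<key>.<idx>".
        compressed = zlib.compress(value)
        return {
            ("%s.%i" % (key, idx)).encode(): compressed[offset:offset + chunk_size]
            for idx, offset in enumerate(range(0, len(compressed), chunk_size))
        }

    chunks = to_chunks("nb", b"\x00" * 4096, chunk_size=8)
    print(sorted(chunks))  # [b'nb.0', b'nb.1', b'nb.2', ...]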

1 change: 0 additions & 1 deletion nbviewer/formats.py
@@ -5,7 +5,6 @@
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

import re
import os

from nbconvert.exporters.export import exporter_map
