Add pre-preloading for bulk queries that use the preloader.
mxsasha committed Mar 12, 2020
1 parent 02512b3 commit 20d06ab
Showing 4 changed files with 140 additions and 32 deletions.
4 changes: 4 additions & 0 deletions docs/users/queries.rst
@@ -6,6 +6,10 @@ IRRd accepts whois queries on port 43 (by default).
The encoding used is always UTF-8, though many objects fit 7-bit ASCII.
Line separators are a single newline (``\n``) character.

.. note::
Due to caching in IRRd, it can take up to 60 seconds for some types
of changes to be visible in the result of some queries.

.. contents:: :backlinks: none

IRRd vs RIPE style
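The 60-second figure in the note above corresponds to MAX_MEMORY_LIFETIME introduced in irrd/storage/preload.py further down: answers can be served from an in-memory snapshot that is only rebuilt once it is older than that. A minimal, hypothetical sketch of such a time-bounded snapshot (illustrative names, not IRRd's actual API):

    import time

    MAX_MEMORY_LIFETIME = 60  # seconds; mirrors the constant added in irrd/storage/preload.py

    class SnapshotCache:
        """Illustrative only: answer from a snapshot, rebuild it once it is stale."""

        def __init__(self, loader):
            self._loader = loader          # callable that rebuilds the snapshot
            self._data = None
            self._loaded_at = 0.0

        def get(self):
            if self._data is None or time.time() - self._loaded_at > MAX_MEMORY_LIFETIME:
                self._data = self._loader()
                self._loaded_at = time.time()
            return self._data

    cache = SnapshotCache(loader=lambda: {'AS65537': {'192.0.2.0/24'}})
    print(cache.get())  # repeated calls within 60 seconds return the same snapshot

Until the snapshot is rebuilt, changes committed to the backing store are not visible, which is the delay the documentation note warns about.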
3 changes: 2 additions & 1 deletion irrd/server/whois/query_parser.py
@@ -57,6 +57,7 @@ def __init__(self, client_ip: str, client_str: str) -> None:
self.key_fields_only = False
self.client_ip = client_ip
self.client_str = client_str
self.preloader = Preloader()

def handle_query(self, query: str) -> WhoisQueryResponse:
"""
@@ -67,7 +68,6 @@ def handle_query(self, query: str) -> WhoisQueryResponse:
self.database_handler = DatabaseHandler()
self.key_fields_only = False
self.object_classes = []
self.preloader = Preloader()

if query.startswith('!'):
try:
@@ -123,6 +123,7 @@ def handle_irrd_command(self, full_command: str) -> WhoisQueryResponse:
raise WhoisQueryParserException(f'Missing parameter for {command} query')

if command == '!':
self.preloader.load_routes_into_memory()
self.multiple_command_mode = True
result = None
response_type = WhoisQueryResponseType.NO_RESPONSE
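A rough sketch of the connection-level pattern this change introduces: the preloader is built once per connection instead of once per query, and the cost of loading routes into memory is only paid when the client enables multiple-command (!!) mode. The class names below are stand-ins, not IRRd's actual classes:

    class FakePreloader:
        """Stand-in for irrd.storage.preload.Preloader."""
        def load_routes_into_memory(self):
            print('routes snapshotted into memory')

    class ConnectionState:
        def __init__(self):
            # Created once per connection rather than per query, so the
            # in-memory snapshot can be reused across many !g/!6 queries.
            self.preloader = FakePreloader()
            self.multiple_command_mode = False

        def handle_irrd_command(self, command: str):
            # '!!' on the wire; the leading '!' is assumed stripped by the parser
            if command == '!':
                self.preloader.load_routes_into_memory()
                self.multiple_command_mode = True

    conn = ConnectionState()
    conn.handle_irrd_command('!')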
111 changes: 87 additions & 24 deletions irrd/storage/preload.py
@@ -12,10 +12,14 @@
from irrd.rpki.status import RPKIStatus
from .queries import RPSLDatabaseQuery


SENTINEL_HASH_CREATED = b'SENTINEL_HASH_CREATED'
REDIS_ORIGIN_ROUTE4_STORE_KEY = b'irrd-preload-origin-route4'
REDIS_ORIGIN_ROUTE6_STORE_KEY = b'irrd-preload-origin-route6'
REDIS_ORIGIN_LIST_SEPARATOR = ','
REDIS_PRELOAD_RELOAD_CHANNEL = b'irrd-preload-reload-channel'
REDIS_ORIGIN_LIST_SEPARATOR = ','
REDIS_KEY_ORIGIN_SOURCE_SEPARATOR = '_'
MAX_MEMORY_LIFETIME = 60

logger = logging.getLogger(__name__)

@@ -34,10 +38,29 @@ class Preloader:
needs to be updated. This interface can be used from any thread
or process.
"""
_loaded_in_memory_time = False

def __init__(self):
self._redis_conn = redis.Redis.from_url(get_setting('redis_url'))

def routes_for_origins(self, origins: Union[List[str], Set[str]], sources: Optional[List[str]], ip_version: Optional[int]=None) -> Set[str]:
def signal_reload(self, object_classes_changed: Optional[Set[str]]=None) -> None:
"""
Perform a (re)load.
Should be called after changes to the DB have been committed.
This will signal the process running PreloadStoreManager to reload
the store.
If object_classes_changed is provided, a reload is only performed
if those classes are relevant to the data in the preload store.
"""
relevant_object_classes = {'route', 'route6'}
if object_classes_changed is not None and not object_classes_changed.intersection(relevant_object_classes):
return
self._redis_conn.publish(REDIS_PRELOAD_RELOAD_CHANNEL, 'reload')

def routes_for_origins(self, origins: Union[List[str], Set[str]], sources: List[str],
ip_version: Optional[int] = None) -> Set[str]:
"""
Retrieve all prefixes (in str format) originating from the provided origins,
from the given sources.
@@ -53,44 +76,84 @@ def routes_for_origins(self, origins: Union[List[str], Set[str]], sources: Optio
if not origins or not sources:
return set()

if self._loaded_in_memory_time:
return self._routes_for_origins_memory(origins, sources, ip_version)
else:
return self._routes_for_origins_redis(origins, sources, ip_version)

def _routes_for_origins_memory(self, origins: Union[List[str], Set[str]],
sources: List[str], ip_version: Optional[int]=None) -> Set[str]:
"""
Implementation of routes_for_origins() when retrieving from memory.
Updates the in-memory store if it's older than MAX_MEMORY_LIFETIME.
"""
# The in-memory store is a snapshot, and therefore has a limited lifetime,
# so that long-running connections don't end up serving very outdated data.
if time.time() - self._loaded_in_memory_time > MAX_MEMORY_LIFETIME:
self.load_routes_into_memory()

prefix_sets: Set[str] = set()
for origin in origins:
for source in sources:
if not ip_version or ip_version == 4:
prefix_sets.update(self._origin_route4_store[origin][source])
if not ip_version or ip_version == 6:
prefix_sets.update(self._origin_route6_store[origin][source])

return prefix_sets

def _routes_for_origins_redis(self, origins: Union[List[str], Set[str]],
sources: List[str], ip_version: Optional[int]=None) -> Set[str]:
"""
Implementation of routes_for_origins() when retrieving directly from redis.
"""
while not self._redis_conn.exists(REDIS_ORIGIN_ROUTE4_STORE_KEY):
time.sleep(1) # pragma: no cover

prefixes: Set[str] = set()
redis_keys = []
for source in sources:
for origin in origins:
redis_keys.append(f'{source}-{origin}'.encode('ascii'))
redis_keys.append(f'{source}{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}{origin}'.encode('ascii'))

if not ip_version or ip_version == 4:
prefixes_for_origins = self._redis_conn.hmget(REDIS_ORIGIN_ROUTE4_STORE_KEY, redis_keys)
def _load(hmap_key):
prefixes_for_origins = self._redis_conn.hmget(hmap_key, redis_keys)
for prefixes_for_origin in prefixes_for_origins:
if prefixes_for_origin:
prefixes.update(prefixes_for_origin.decode('ascii').split(REDIS_ORIGIN_LIST_SEPARATOR))

if not ip_version or ip_version == 4:
_load(REDIS_ORIGIN_ROUTE4_STORE_KEY)

if not ip_version or ip_version == 6:
prefixes_for_origins = self._redis_conn.hmget(REDIS_ORIGIN_ROUTE6_STORE_KEY, redis_keys)
for prefixes_for_origin in prefixes_for_origins:
if prefixes_for_origin:
prefixes.update(prefixes_for_origin.decode('ascii').split(REDIS_ORIGIN_LIST_SEPARATOR))
_load(REDIS_ORIGIN_ROUTE6_STORE_KEY)

return prefixes

def signal_reload(self, object_classes_changed: Optional[Set[str]]=None) -> None:
    """
    Perform a (re)load.
    Should be called after changes to the DB have been committed.

    This will signal the process running PreloadStoreManager to reload
    the store.

    If object_classes_changed is provided, a reload is only performed
    if those classes are relevant to the data in the preload store.
    """
    relevant_object_classes = {'route', 'route6'}
    if object_classes_changed is not None and not object_classes_changed.intersection(relevant_object_classes):
        return
    self._redis_conn.publish(REDIS_PRELOAD_RELOAD_CHANNEL, 'reload')

def load_routes_into_memory(self):
    """
    Pre-preload all routes into memory. This increases performance for
    routes_for_origins() by up to 100x, but takes about 0.2-0.5s.
    It also means origins are no longer updated as long as this object exists.
    """
    while not self._redis_conn.exists(REDIS_ORIGIN_ROUTE4_STORE_KEY):
        time.sleep(1)  # pragma: no cover
    logger.debug(f'Preloader pre-pre(re)loading routes into memory')

    self._origin_route4_store = defaultdict(lambda: defaultdict(set))
    self._origin_route6_store = defaultdict(lambda: defaultdict(set))

    def _load(redis_key, target):
        for key, routes in self._redis_conn.hgetall(redis_key).items():
            if key == SENTINEL_HASH_CREATED:
                continue
            source, origin = key.decode('ascii').split(REDIS_KEY_ORIGIN_SOURCE_SEPARATOR)
            target[origin][source].update(routes.decode('ascii').split(REDIS_ORIGIN_LIST_SEPARATOR))

    _load(REDIS_ORIGIN_ROUTE4_STORE_KEY, self._origin_route4_store)
    _load(REDIS_ORIGIN_ROUTE6_STORE_KEY, self._origin_route6_store)

    self._loaded_in_memory_time = time.time()


class PreloadStoreManager(multiprocessing.Process):
@@ -178,8 +241,8 @@ def update_route_store(self, new_origin_route4_store, new_origin_route6_store) -
origin_route6_str_dict = {k: REDIS_ORIGIN_LIST_SEPARATOR.join(v) for k, v in new_origin_route6_store.items()}
# Redis can't handle empty dicts, but the dict needs to be present
# in order not to block queries.
origin_route4_str_dict['SENTINEL_HASH_CREATED'] = '1'
origin_route6_str_dict['SENTINEL_HASH_CREATED'] = '1'
origin_route4_str_dict[SENTINEL_HASH_CREATED] = '1'
origin_route6_str_dict[SENTINEL_HASH_CREATED] = '1'
pipeline.hmset(REDIS_ORIGIN_ROUTE4_STORE_KEY, origin_route4_str_dict)
pipeline.hmset(REDIS_ORIGIN_ROUTE6_STORE_KEY, origin_route6_str_dict)
pipeline.execute()
@@ -246,7 +309,7 @@ def run(self, mock_database_handler=None) -> None:

for result in dh.execute_query(q):
prefix = result['ip_first']
key = result['source'] + '-AS' + str(result['asn_first'])
key = result['source'] + REDIS_KEY_ORIGIN_SOURCE_SEPARATOR + 'AS' + str(result['asn_first'])
length = result['prefix_length']

if result['ip_version'] == 4:
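For reference, a hedged sketch of the Redis hash layout the preload store uses after this change: one hash per address family, fields named '<source>_<origin>' (joined with REDIS_KEY_ORIGIN_SOURCE_SEPARATOR), values holding a comma-separated list of prefixes, plus a sentinel field so the hash always exists. The constants mirror preload.py; the example assumes a local Redis on the default port and uses only standard redis-py calls:

    import redis

    REDIS_ORIGIN_ROUTE4_STORE_KEY = b'irrd-preload-origin-route4'
    REDIS_KEY_ORIGIN_SOURCE_SEPARATOR = '_'
    REDIS_ORIGIN_LIST_SEPARATOR = ','
    SENTINEL_HASH_CREATED = b'SENTINEL_HASH_CREATED'

    conn = redis.Redis()  # IRRd itself reads the URL from the redis_url setting

    # Writing, in the style of update_route_store(): field -> joined prefixes
    conn.hmset(REDIS_ORIGIN_ROUTE4_STORE_KEY, {
        SENTINEL_HASH_CREATED: '1',
        'TEST1_AS65547': '192.0.2.128/25,198.51.100.0/25',
    })

    # Reading everything back, in the style of load_routes_into_memory()
    store = {}
    for key, routes in conn.hgetall(REDIS_ORIGIN_ROUTE4_STORE_KEY).items():
        if key == SENTINEL_HASH_CREATED:
            continue
        source, origin = key.decode('ascii').split(REDIS_KEY_ORIGIN_SOURCE_SEPARATOR)
        store.setdefault(origin, {}).setdefault(source, set()).update(
            routes.decode('ascii').split(REDIS_ORIGIN_LIST_SEPARATOR))
    print(store)

Keying the in-memory store by origin first and source second matches how routes_for_origins() iterates, so a lookup is two dictionary accesses per origin/source pair rather than a Redis round trip.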
54 changes: 47 additions & 7 deletions irrd/storage/tests/test_preload.py
@@ -7,7 +7,7 @@
from irrd.rpki.status import RPKIStatus
from irrd.utils.test_utils import flatten_mock_calls
from ..database_handler import DatabaseHandler
from ..preload import Preloader, PreloadStoreManager, PreloadUpdater
from ..preload import Preloader, PreloadStoreManager, PreloadUpdater, REDIS_KEY_ORIGIN_SOURCE_SEPARATOR
from ..queries import RPSLDatabaseQuery

# Use different
@@ -88,11 +88,11 @@ def test_routes_for_origins(self, mock_redis_keys):

preload_manager.update_route_store(
{
'TEST2-AS65546': {'192.0.2.0/25'},
'TEST1-AS65547': {'192.0.2.128/25', '198.51.100.0/25'},
f'TEST2{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65546': {'192.0.2.0/25'},
f'TEST1{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65547': {'192.0.2.128/25', '198.51.100.0/25'},
},
{
'TEST2-AS65547': {'2001:db8::/32'},
f'TEST2{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65547': {'2001:db8::/32'},
},
)
sources = ['TEST1', 'TEST2']
@@ -114,6 +114,46 @@ def test_routes_for_origins(self, mock_redis_keys):
preloader.routes_for_origins(['AS65547'], [], 2)
assert 'Invalid IP version: 2' in str(ve.value)

def test_routes_for_origins_memory(self, mock_redis_keys, monkeypatch):
preloader = Preloader()
preload_manager = PreloadStoreManager()

preload_manager.update_route_store(
{
f'TEST2{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65546': {'192.0.2.0/25'},
f'TEST1{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65547': {'192.0.2.128/25', '198.51.100.0/25'},
},
{
f'TEST2{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65547': {'2001:db8::/32'},
},
)
preloader._redis_conn.hmget = None # Enforce that loading from redis per item can't work
preloader.load_routes_into_memory()

sources = ['TEST1', 'TEST2']
assert preloader.routes_for_origins([], sources) == set()
assert preloader.routes_for_origins(['AS65545'], sources) == set()
assert preloader.routes_for_origins(['AS65546'], []) == set()
assert preloader.routes_for_origins(['AS65546'], sources, 4) == {'192.0.2.0/25'}
assert preloader.routes_for_origins(['AS65547'], sources, 4) == {'192.0.2.128/25', '198.51.100.0/25'}
assert preloader.routes_for_origins(['AS65546'], sources, 6) == set()
assert preloader.routes_for_origins(['AS65547'], sources, 6) == {'2001:db8::/32'}
assert preloader.routes_for_origins(['AS65546'], sources) == {'192.0.2.0/25'}
assert preloader.routes_for_origins(['AS65547'], sources) == {'192.0.2.128/25', '198.51.100.0/25', '2001:db8::/32'}
assert preloader.routes_for_origins(['AS65547', 'AS65546'], sources, 4) == {'192.0.2.0/25', '192.0.2.128/25', '198.51.100.0/25'}

assert preloader.routes_for_origins(['AS65547', 'AS65546'], ['TEST1']) == {'192.0.2.128/25', '198.51.100.0/25'}
assert preloader.routes_for_origins(['AS65547', 'AS65546'], ['TEST2']) == {'192.0.2.0/25', '2001:db8::/32'}

# Load an empty route store. This should not affect the in-memory store,
# because that is only periodically updated, i.e. same response for now.
preload_manager.update_route_store({}, {})
assert preloader.routes_for_origins(['AS65546'], sources, 4) == {'192.0.2.0/25'}

# Make the preloader think the in-memory store is expired, forcing a refresh
preloader._loaded_in_memory_time = time.time() - 3600
assert preloader.routes_for_origins(['AS65546'], sources, 4) == set()


class TestPreloadUpdater:
def test_preload_updater(self, monkeypatch):
@@ -168,11 +208,11 @@ def test_preload_updater(self, monkeypatch):
'update_route_store',
(
{
'TEST1-AS65546': {'192.0.2.0/25'},
'TEST1-AS65547': {'192.0.2.128/25', '198.51.100.0/25'},
f'TEST1{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65546': {'192.0.2.0/25'},
f'TEST1{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65547': {'192.0.2.128/25', '198.51.100.0/25'},
},
{
'TEST2-AS65547': {'2001:db8::/32'}
f'TEST2{REDIS_KEY_ORIGIN_SOURCE_SEPARATOR}AS65547': {'2001:db8::/32'}
},
),
{}
