Merge pull request #249 from vEpiphyte/epiphyte_remcycle_updates
Epiphyte remcycle updates
vEpiphyte committed May 31, 2017
2 parents d4b6616 + 353e5e5 commit 0cc8029
Showing 4 changed files with 357 additions and 26 deletions.
10 changes: 8 additions & 2 deletions synapse/lib/cache.py
@@ -73,7 +73,7 @@ def _checkCacheTimes(self):
finally:
if not self.isfini and self.maxtime != None:
ival = self.maxtime / 10.0
self.sched.insec(ival, self._checkCacheTimes )
self.schevt = self.sched.insec(ival, self._checkCacheTimes )

def clear(self):
'''
@@ -174,7 +174,7 @@ def __len__(self):
return len(self.cache)

def __iter__(self):
return list( self.cache.items() )
return iter(list(self.cache.items()))

def __contains__(self, item):
return item in self.cache

def _onCacheFini(self):
for key in self.keys():
@@ -242,6 +245,9 @@ def clear(self):
def __len__(self):
return len(self.fifo)

def __contains__(self, item):
return item in self.cache

class TufoCache(Cache):

def __init__(self, core, maxtime=None):
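The new __iter__ and __contains__ methods let Cache (and FixedCache) be used with the in operator and plain iteration. A minimal sketch of the resulting behavior, mirroring the test_cache_magic test added further below (the key and value here are illustrative, not from the diff):

import synapse.lib.cache as s_cache

c = s_cache.Cache()
c.put('foo', 'bar')

print('foo' in c)       # True, via the new __contains__
for key, valu in c:     # __iter__ yields (key, value) pairs from a snapshot of the cache
    print(key, valu)
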
183 changes: 159 additions & 24 deletions synapse/lib/remcycle.py
@@ -29,6 +29,7 @@
import synapse.async as s_async
import synapse.compat as s_compat
import synapse.cortex as s_cortex
import synapse.lib.cache as s_cache
import synapse.lib.ingest as s_ingest
import synapse.lib.config as s_config
import synapse.lib.threads as s_threads
@@ -40,13 +41,21 @@
MIN_WORKER_THREADS = 'web_min_worker_threads'
MAX_WORKER_THREADS = 'web_max_worker_threads'
MAX_SPOOL_FILESIZE = 'web_max_spool_file_size'
CACHE_ENABLED = 'web_result_cache_enable'
CACHE_TIMEOUT = 'web_result_cache_timeout'
HYPNOS_BASE_DEFS = (
(MIN_WORKER_THREADS, {'type': 'int', 'doc': 'Minimum number of worker threads to spawn', 'defval': 8}),
(MAX_WORKER_THREADS, {'type': 'int', 'doc': 'Maximum number of worker threads to spawn', 'defval': 64}),
(MAX_SPOOL_FILESIZE, {'type': 'int',
'doc': 'Maximum spoolfile size, in bytes, to use for storing responses associated with '
'APIs that have ingest definitions.',
'defval': s_axon.megabyte * 2})
'defval': s_axon.megabyte * 2}),
(CACHE_ENABLED, {'type': 'bool',
'doc': 'Enable caching of job results for a period of time, retrievable by jobid.',
'defval': False}),
(CACHE_TIMEOUT, {'type': 'int',
'doc': 'Timeout value, in seconds, that the results will persist in the cache.',
'defval': 300})
)

class Nyx(object):
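
As a quick illustration, the two new options could be set at construction time through the opts dictionary handled by the Config mixin, as documented in the Hypnos docstring below (a sketch under that assumption; the timeout value shown is arbitrary):

import synapse.lib.remcycle as s_remcycle

# Enable the new result cache with a 60 second retention window.
hypnos = s_remcycle.Hypnos(opts={'web_result_cache_enable': True,
                                 'web_result_cache_timeout': 60})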
@@ -267,6 +276,22 @@ class Hypnos(s_config.Config):
The Hypnos object inherits from the Config object, and as such has both
configable parameters and an EventBus available for message passing.
Notes:
The following items may be passed via kwargs to change the Hypnos
object behavior:
* ioloop: Tornado ioloop used by the IO thread. This would normally
be left unset, and an ioloop will be created for the io
thread. This is provided as a helper for testing.
* content_type_skip: A list of content-type values which will not have
any attempts to decode data done on them.
Args:
core (synapse.cores.common.Cortex): A cortex used to store ingest data.
By default a ram cortex is used.
opts (dict): Optional configuration data for the Config mixin.
defs (tuple): Default configuration data for the Config mixin.
'''

def __init__(self,
@@ -275,22 +300,6 @@ def __init__(self,
defs=HYPNOS_BASE_DEFS,
*args,
**kwargs):
'''
Notes:
The following items may be passed via kwargs to change the Hypnos object
behavior:
* ioloop: Tornado ioloop used by the IO thread. This would normally
be left unset, and an ioloop will be created for the io
thread. This is provided as a helper for testing.
Args:
core (synapse.cores.common.Cortex): A cortex used to store ingest data. By default,
a ram cortex is used.
opts (dict): Optional configuration data for the Config mixin.
defs (tuple): Default configuration data for the Config mixin.
'''

s_config.Config.__init__(self,
opts,
defs)
@@ -328,9 +337,19 @@ def __init__(self,
self._web_api_ingests = collections.defaultdict(list)
self._web_api_gest_opens = {}

self.web_cache = s_cache.Cache()
self.web_cache_enabled = self.getConfOpt(CACHE_ENABLED)
if self.web_cache_enabled:
self.webCacheEnable()
# Setup Fini handlers
self.onfini(self._onHypoFini)

# List of content-type headers to skip processing on
self._web_content_type_skip = set([])
self.webContentTypeSkipAdd('application/octet-stream')
for ct in kwargs.get('content_type_skip', []):
self.webContentTypeSkipAdd(ct)

def __repr__(self):
d = {'name': self.__class__.__name__,
'loc': hex(id(self)),
@@ -352,6 +371,8 @@ def _onHypoFini(self):
self.web_boss.fini()
# Stop the consuming pool
self.web_pool.fini()
# Stop the web cache
self.web_cache.fini()

def getWebDescription(self):
'''
@@ -361,8 +382,6 @@ def getWebDescription(self):
Returns:
dict: Dictionary describing the registered namespace API data.
'''
# Make copies of object so the returned multable dictionary does not
# affect the
d = {}
for ns in self._web_namespaces:
nsd = {'doc': self._web_docs[ns]}
@@ -666,8 +685,7 @@ def _webFlattenHttpResponse(resp):
'effective_url': resp.effective_url, })
return resp_dict

@staticmethod
def _webProcessResponseFlatten(resp_dict):
def _webProcessResponseFlatten(self, resp_dict):
'''
Process a flattened HTTP response to extract as much meaningful data
out of it as possible.
@@ -691,9 +709,9 @@ def _webProcessResponseFlatten(resp_dict):
return
# Try to do a clean decoding of the provided data if possible.
ct = resp_dict.get('headers', {}).get('Content-Type', 'text/plain')
if ct.lower() == 'application/octet-stream':
return
ct_type, ct_params = cgi.parse_header(ct)
if ct_type.lower() in self._web_content_type_skip:
return
charset = ct_params.get('charset', 'utf-8').lower()
try:
resp_dict['data'] = resp_dict.get('data').decode(charset)
@@ -943,4 +961,121 @@ def webJobWait(self, jid, timeout=None):
bool: async.Boss.wait() result.
'''
return self.web_boss.wait(jid, timeout=timeout)
return self.web_boss.wait(jid, timeout=timeout)

def webContentTypeSkipAdd(self, content_type):
'''
Add a content-type value to be skipped from any sort of decoding
attempts.
Args:
content_type (str): Content-type value to skip.
'''
self._web_content_type_skip.add(content_type)

def webContentTypeSkipDel(self, content_type):
'''
Removes a content-type value from the set of values to be skipped
from any sort of decoding attempts.
Args:
content_type (str): Content-type value to remove.
'''
if content_type in self._web_content_type_skip:
self._web_content_type_skip.remove(content_type)

def _cacheRequestResults(self, event):
'''
Cache the request response/errors.
Ideally, the cached results will be msgpack serializable. If the
api_args provided were not msgpack serializable, attempts to get
cached results over Telepath may fail.
Args:
event ((str, dict)): job:fini event.
Returns:
None: Returns None.
'''
jid, job = event[1].get('job')
task_kwargs = job['task'][2] # type: dict
# Cache items which are likely going to be msgpack serializable.
resp = task_kwargs.get('resp', {}).copy()
if 'ingdata' in resp:
resp.pop('ingdata', None)
# Convert the spooled temporary file back into a bytes object
data = resp.pop('data')
data.seek(0)
data = data.read()
resp['data'] = data
d = {'web_api_name': task_kwargs.get('web_api_name'),
'resp': resp,
'api_args': task_kwargs.get('api_args', {})}
if 'err' in job:
d['err'] = job['err']
d['errmsg'] = job['errmsg']
d['errfile'] = job['errfile']
d['errline'] = job['errline']
self.web_cache.put(jid, d)

def webCacheGet(self, jid):
'''
Retrieve the cached web response for a given job id.
Args:
jid (str): Job ID to retrieve.
Returns:
dict: A dictionary containing the job response data. It will have
the following keys:
* web_api_name: Name of the API
* resp: Dictionary containing response data
* api_args: Args used when crafting the HTTPRequest with Nyx
* err (optional): Error type if an error is encountered.
* errmsg (optional): Error message if an error is encountered.
* errfile (optional): Empty string if an error is encountered.
* errline (optional): Empty string if an error is encountered.
'''
if not self.web_cache_enabled:
logger.warning('Cached response requested but cache not enabled.')
return self.web_cache.get(jid)

def webCachePop(self, jid):
'''
Retrieve the cached web response for a given job id and remove it from the cache.
Args:
jid (str): Job ID to retrieve.
Returns:
dict: A dictionary containing the job response data. See the docs
for webCacheGet for the dictionary details.
'''
if not self.web_cache_enabled:
logger.warning('Cache deletion requested but cache not enabled.')
return self.web_cache.pop(jid)

def webCacheEnable(self):
'''
Enable caching of results from fireWebApi.
'''
self.web_cache_enabled = True
self.setConfOpt(CACHE_ENABLED, True)
self.web_cache.setMaxTime(self.getConfOpt(CACHE_TIMEOUT))
self.web_boss.on('job:fini', self._cacheRequestResults)

def webCacheDisable(self):
'''
Disable caching of results from fireWebApi and clear all data from the cache.
'''
self.web_boss.off('job:fini', self._cacheRequestResults)
self.webCacheClear()
self.setConfOpt(CACHE_ENABLED, False)
self.web_cache_enabled = False

def webCacheClear(self):
'''
Clear all the contents of the web cache.
'''
self.web_cache.clear()
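
Taken together, the cache and content-type additions could be exercised roughly as follows (a sketch, not from the diff: the fireWebApi call and the 'someapi:fake' name are placeholders, since that API's signature is not part of this change):

hypnos.webCacheEnable()                          # subscribe to job:fini events and start caching
hypnos.webContentTypeSkipAdd('application/pdf')  # skip decode attempts for PDF responses

jid = hypnos.fireWebApi('someapi:fake')          # placeholder for firing a registered web API
hypnos.webJobWait(jid, timeout=30)               # block until the job completes

ret = hypnos.webCacheGet(jid)                    # peek at the cached response dict
ret = hypnos.webCachePop(jid)                    # or retrieve it and evict it from the cache

hypnos.webCacheDisable()                         # stop caching and clear cached results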
77 changes: 77 additions & 0 deletions synapse/tests/test_cache.py
@@ -47,7 +47,9 @@ def onmiss(key):
return 10

c.setOnMiss( onmiss )
self.false('woot' in c)
self.assertEqual( c.get('woot'), 10 )
self.true('woot' in c)

def test_cache_tufo(self):
core = s_cortex.openurl('ram:///')
@@ -112,6 +114,10 @@ def getfoo(x):
cache = s_cache.KeyCache(getfoo)

self.assertEqual( cache[10], 'asdf' )
# Ensure put/pop methods work.
cache.put(20, 'wasd')
self.eq(cache[20], 'wasd')
self.eq(cache.pop(20), 'wasd')

def test_cache_fixed(self):

@@ -121,7 +127,10 @@ def getfoo(x):
return x + 20

cache = s_cache.FixedCache(maxsize=3, onmiss=getfoo)
self.false(30 in cache)
self.eq( cache.get(30), 50 )
self.eq(len(cache), 1)
self.true(30 in cache)
self.eq( cache.get(30), 50 )
self.eq( cache.get(30), 50 )
self.eq( cache.get(30), 50 )
@@ -143,3 +152,71 @@ def getfoo(x):
self.eq( cache.get(30), 50 )

self.eq( data[30], 3 )

def test_cache_magic(self):
c = s_cache.Cache()
c.put(1, 'a')
c.put(2, 'b')
keys = set([])
values = set([])

self.eq(len(c), 2)

cvs = c.values()
cvs.sort()
self.eq(cvs, ['a', 'b'])

cks = c.keys()
cks.sort()
self.eq(cks, [1, 2])

for k, v in c:
keys.add(k)
values.add(v)

self.eq(keys, {1, 2})
self.eq(values, {'a', 'b'})

def test_cache_clearing(self):
c = s_cache.Cache()

d = {}
def flush(event):
key = event[1].get('key')
d[key] = c.get(key)

c.on('cache:flush', flush)
c.put(1, 'a')
c.put(2, 'b')
self.eq(len(c), 2)

c.flush(1)
self.true(1 in d)
self.eq(d, {1: 'a'})
self.eq(len(c), 2) # A straight flush doesn't remove the key.

c.clear()
self.eq(len(c), 0)

def test_cache_fini(self):
c = s_cache.Cache(maxtime=0.1)
c.put(1, 'a')
self.nn(c.schevt)
self.nn(c.schevt[1])
c.fini()
self.none(c.schevt[1])
self.eq(len(c), 0)

def test_cache_defval(self):
# Ensure default behaviors are covered.
c = s_cache.Cache()
r = c.get('foo')
self.none(r)

fc = s_cache.FixedCache(maxsize=10)
fr = fc.get('foo')
self.none(fr)

od = s_cache.OnDem()
with self.raises(KeyError) as cm:
od.get('foo')
