Commit
Merge branch 'master' of github.com:sandialabs/slycat
srbdev committed Apr 11, 2016
2 parents e0e6e2a + ec72421 commit 0872a17
Showing 5 changed files with 192 additions and 35 deletions.
155 changes: 127 additions & 28 deletions packages/slycat/web/server/__init__.py
@@ -14,9 +14,70 @@
import slycat.web.server.remote
import stat
import uuid
import sys
import cPickle
import Queue
import threading

config = {}


class ServeCache(object):
"""
class used to cache HQL and metadata queries
usage example:
server_cache = ServeCache()
with server_cache.lock:
apply: crud operation to
server_cache.cache["artifact:aid:mid"]
\
server_cache.cache["artifact:aid:mid"]["artifact:data"]
eg: server_cache.cache["artifact:aid:mid"]["metadata"], server_cache.cache["artifact:aid:mid"]["hql-result"]
NOTE: a parse tree is also generated in order to speed up future unseen calls
"""
__cache = {}
__queue = Queue.Queue()
__lock = threading.Lock()

def __init__(self):
pass
@property
def cache(self):
"""
:return: dict() cache tree; see the class docstring for the layout
"""
return self.__cache
@cache.deleter
def cache(self):
"""
resets the cache to an empty dict {}
:return:
"""
self.__cache = {}
@property
def queue(self):
"""
blocking queue read by slycat.web.server.cleanup to trigger a forced cache cleanup
by the cache cleanup thread.
:return:
"""
return self.__queue
@property
def lock(self):
"""
threading.Lock() used to serialize CRUD operations on the cache.
:return:
"""
return self.__lock
def clean(self):
"""
Request a cleanup pass for the cache.
"""
cherrypy.log.error("updating server cache force cleanup queue")
self.__queue.put("cleanup")
server_cache = ServeCache()  # instantiate our server cache for use here and in slycat.web.server.cleanup
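# A minimal usage sketch of the cache-aside flow described in the docstring
# (aid/mid are hypothetical values and compute_metadata() is a hypothetical
# stand-in for the real HDF5 work):
#
#   key = "artifact:%s%s" % (aid, mid)
#   with server_cache.lock:
#     if key in server_cache.cache and "metadata" in server_cache.cache[key]:
#       results = server_cache.cache[key]["metadata"]  # cache hit
#     else:
#       server_cache.cache.setdefault(key, {})["metadata"] = compute_metadata()
#   server_cache.clean()  # request a forced cleanup pass at any time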

def mix(a, b, amount):
"""Linear interpolation between two numbers. Useful for computing model progress."""
return ((1.0 - amount) * a) + (amount * b)
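# For example, mix(0.5, 1.0, 0.25) == 0.625: a quarter of the way from a=0.5 to b=1.0.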
@@ -122,22 +183,35 @@ def get_model_arrayset_metadata(database, model, aid, arrays=None, statistics=None, unique=None):
if isinstance(unique, basestring):
unique = slycat.hyperchunks.parse(unique)

# Handle legacy behavior.
if arrays is None and statistics is None and unique is None:
with slycat.web.server.hdf5.lock:
with slycat.web.server.hdf5.open(model["artifact:%s" % aid], "r+") as file:
hdf5_arrayset = slycat.hdf5.ArraySet(file)
results = []
for array in sorted(hdf5_arrayset.keys()):
hdf5_array = hdf5_arrayset[array]
results.append({
"array": int(array),
"index" : int(array),
"dimensions" : hdf5_array.dimensions,
"attributes" : hdf5_array.attributes,
"shape": tuple([dimension["end"] - dimension["begin"] for dimension in hdf5_array.dimensions]),
})
return results
with server_cache.lock:
mydict_as_string = cPickle.dumps(server_cache.cache)
cherrypy.log.error("in metadata call, pickled server cache size %s bytes, model %s" % (sys.getsizeof(mydict_as_string), model["_id"]))
if "artifact:%s%s" % (aid, model["_id"]) in server_cache.cache:
cherrypy.log.error("found cached artifact")
if "metadata" in server_cache.cache["artifact:%s%s" % (aid, model["_id"])]:
cherrypy.log.error("metadata cache hit, cache keys: %s" % server_cache.cache.keys())
return server_cache.cache["artifact:%s%s" % (aid, model["_id"])]["metadata"]
else:
server_cache.cache["artifact:%s%s" % (aid, model["_id"])] = {}
cherrypy.log.error("metadata server cache keys: %s" % server_cache.cache.keys())

with slycat.web.server.hdf5.lock:
with slycat.web.server.hdf5.open(model["artifact:%s" % aid], "r+") as file:
hdf5_arrayset = slycat.hdf5.ArraySet(file)
results = []
for array in sorted(hdf5_arrayset.keys()):
hdf5_array = hdf5_arrayset[array]
results.append({
"array": int(array),
"index" : int(array),
"dimensions" : hdf5_array.dimensions,
"attributes" : hdf5_array.attributes,
"shape": tuple([dimension["end"] - dimension["begin"] for dimension in hdf5_array.dimensions]),
})
server_cache.cache["artifact:%s%s" % (aid,model["_id"])]["metadata"] = results
return results

with slycat.web.server.hdf5.lock:
with slycat.web.server.hdf5.open(model["artifact:%s" % aid], "r+") as file: # We have to open the file with writing enabled in case the statistics cache needs to be updated.
Expand Down Expand Up @@ -212,23 +286,48 @@ def get_model_arrayset_data(database, model, aid, hyperchunks):
"""
if isinstance(hyperchunks, basestring):
hyperchunks = slycat.hyperchunks.parse(hyperchunks)

with server_cache.lock:
update_cache = False
if "artifact:%s%s" % (aid,model["_id"]) in server_cache.cache:
if slycat.hyperchunks.tostring(hyperchunks) in server_cache.cache["artifact:%s%s" % (aid,model["_id"])]:
for value in server_cache.cache["artifact:%s%s" % (aid,model["_id"])][slycat.hyperchunks.tostring(hyperchunks)]:
yield value
else:
update_cache = True
server_cache.cache["artifact:%s%s" % (aid,model["_id"])][slycat.hyperchunks.tostring(hyperchunks)] = []
else:
update_cache = True
server_cache.cache["artifact:%s%s" % (aid,model["_id"])] = {}
server_cache.cache["artifact:%s%s" % (aid,model["_id"])][slycat.hyperchunks.tostring(hyperchunks)] = []

with slycat.web.server.hdf5.lock:
with slycat.web.server.hdf5.open(model["artifact:%s" % aid], "r+") as file:
hdf5_arrayset = slycat.hdf5.ArraySet(file)
for array in slycat.hyperchunks.arrays(hyperchunks, hdf5_arrayset.array_count()):
hdf5_array = hdf5_arrayset[array.index]
if update_cache:
with slycat.web.server.hdf5.lock:
with slycat.web.server.hdf5.open(model["artifact:%s" % aid], "r+") as file:
hdf5_arrayset = slycat.hdf5.ArraySet(file)
for array in slycat.hyperchunks.arrays(hyperchunks, hdf5_arrayset.array_count()):
hdf5_array = hdf5_arrayset[array.index]

if array.order is not None:
order = evaluate(hdf5_array, array.order, "order")
if array.index not in server_cache.cache["artifact:%s%s" % (aid,model["_id"])]:
server_cache.cache["artifact:%s%s" % (aid,model["_id"])][array.index]={}

for attribute in array.attributes(len(hdf5_array.attributes)):
values = evaluate(hdf5_array, attribute.expression, "attribute")
for hyperslice in attribute.hyperslices():
if array.order is not None:
yield values[order][hyperslice]
else:
yield values[hyperslice]
order = evaluate(hdf5_array, array.order, "order")

for attribute in array.attributes(len(hdf5_array.attributes)):
if slycat.hyperchunks.tostring(attribute.expression) not in server_cache.cache["artifact:%s%s" % (aid,model["_id"])][array.index]:
server_cache.cache["artifact:%s%s" % (aid,model["_id"])][array.index][slycat.hyperchunks.tostring(attribute.expression)] = evaluate(hdf5_array, attribute.expression, "attribute")

for hyperslice in attribute.hyperslices():
if array.order is not None:
value = server_cache.cache["artifact:%s%s" % (aid,model["_id"])][array.index][slycat.hyperchunks.tostring(attribute.expression)][order][hyperslice]
server_cache.cache["artifact:%s%s" % (aid,model["_id"])][slycat.hyperchunks.tostring(hyperchunks)].append(value)
yield value
else:
value = server_cache.cache["artifact:%s%s" % (aid,model["_id"])][array.index][slycat.hyperchunks.tostring(attribute.expression)][hyperslice]
server_cache.cache["artifact:%s%s" % (aid,model["_id"])][slycat.hyperchunks.tostring(hyperchunks)].append(value)
yield value
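
# A condensed sketch of the replay-or-populate caching above: results are keyed by
# slycat.hyperchunks.tostring(hyperchunks), appended to a list as they stream out,
# and replayed from that list on a repeat query. evaluate_hyperslices() is a
# hypothetical stand-in for the HDF5 work.
def _cached_values_sketch(cache, key, evaluate_hyperslices):
  if key in cache:
    for value in cache[key]:  # replay path: no HDF5 access needed
      yield value
  else:
    cache[key] = []
    for value in evaluate_hyperslices():
      cache[key].append(value)  # remember each hyperslice...
      yield value  # ...while streaming it to the caller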

def get_model_parameter(database, model, aid):
key = "artifact:%s" % aid
33 changes: 33 additions & 0 deletions packages/slycat/web/server/cleanup.py
@@ -7,8 +7,11 @@
import Queue
import slycat.web.server.database.couchdb
import slycat.web.server.hdf5
import slycat.web.server
import threading
import time
import sys
import cPickle

def _array_cleanup_worker():
cherrypy.log.error("Started array cleanup worker.")
@@ -49,10 +52,40 @@ def _login_session_cleanup_worker():
_login_session_cleanup_worker.thread = threading.Thread(name="session-cleanup", target=_login_session_cleanup_worker)
_login_session_cleanup_worker.thread.daemon = True

def _cache_cleanup_worker():
import cherrypy
cherrypy.log.error("Started server cache cleanup worker.")
while True:
time.sleep(datetime.timedelta(minutes=30).total_seconds())
with slycat.web.server.server_cache.lock:
cherrypy.log.error("running server cache-cleanup thread cache size = %s mbs" % (sys.getsizeof(cPickle.dumps(slycat.web.server.server_cache.cache))/1024/1024))
_cache_cleanup()

_cache_cleanup_worker.thread = threading.Thread(name="cache-cleanup", target=_cache_cleanup_worker)
_cache_cleanup_worker.thread.daemon = True

def _forced_cache_cleanup_worker():
import cherrypy
cherrypy.log.error("Started server forced cache cleanup worker.")
while True:
msg = slycat.web.server.server_cache.queue.get()
with slycat.web.server.server_cache.lock:
cherrypy.log.error("running server forced-cache-cleanup thread with: %s" % msg)
_cache_cleanup()

_forced_cache_cleanup_worker.thread = threading.Thread(name="forced-cache-cleanup", target=_forced_cache_cleanup_worker)
_forced_cache_cleanup_worker.thread.daemon = True

def _cache_cleanup():
del slycat.web.server.server_cache.cache
cherrypy.log.error("cache size now = %s mbs" % (sys.getsizeof(cPickle.dumps(slycat.web.server.server_cache.cache))/1024/1024))

def start():
"""Called to start all of the cleanup worker threads."""
_array_cleanup_worker.thread.start()
_login_session_cleanup_worker.thread.start()
_cache_cleanup_worker.thread.start()
_forced_cache_cleanup_worker.thread.start()

def arrays():
"""Request a cleanup pass for unused arrays."""
2 changes: 2 additions & 0 deletions packages/slycat/web/server/engine.py
@@ -169,6 +169,8 @@ def log_configuration(tree, indent=""):
# Setup root server parameters.
configuration["/"] = {}
configuration["/"]["request.dispatch"] = dispatcher
configuration["/"]["tools.caching.on"] = True
configuration["/"]["tools.caching.delay"] = 3600

authentication = configuration["slycat-web-server"]["authentication"]["plugin"]
configuration["/"]["tools.%s.on" % authentication] = True
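
The two added lines switch on CherryPy's built-in in-memory response caching for the whole application, with cached responses expiring after 3600 seconds. A minimal standalone sketch of the same tool outside Slycat's configuration machinery (the Hello app is purely illustrative):

import cherrypy

class Hello(object):
  @cherrypy.expose
  def index(self):
    # Computed once, then served from CherryPy's in-memory cache
    # for up to 3600 seconds (tools.caching.delay).
    return "expensive result"

cherrypy.quickstart(Hello(), "/", {"/": {"tools.caching.on": True, "tools.caching.delay": 3600}})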
5 changes: 4 additions & 1 deletion packages/slycat/web/server/handlers.py
@@ -14,6 +14,7 @@
import os
import time
import Queue
import cPickle
import re
import slycat.email
import slycat.hdf5
@@ -1091,6 +1092,7 @@ def get_model_array_attribute_chunk(mid, aid, array, attribute, **arguments):

@cherrypy.tools.json_out(on = True)
def get_model_arrayset_metadata(mid, aid, **kwargs):
cherrypy.log.error("GET arrayset metadata mid:%s aid:%s kwargs:%s" %(mid, aid, kwargs.keys()))
database = slycat.web.server.database.couchdb.connect()
model = database.get("model", mid)
project = database.get("project", model["project"])
@@ -1122,7 +1124,7 @@ def get_model_arrayset_metadata(mid, aid, **kwargs):
except:
slycat.email.send_error("slycat.web.server.handlers.py get_model_arrayset_metadata", "cherrypy.HTTPError 400 not a valid hyperchunks specification.")
raise cherrypy.HTTPError("400 Not a valid hyperchunks specification.")

cherrypy.log.error("GET arrayset metadata arrays:%s stats:%s unique:%s" %(arrays, statistics, unique))
results = slycat.web.server.get_model_arrayset_metadata(database, model, aid, arrays, statistics, unique)
if "unique" in results:
for unique in results["unique"]:
@@ -1649,6 +1651,7 @@ def get_model_statistics(mid):
total_hdf5_server_size += os.path.getsize(fp)

return {
"server_cache_size": sys.getsizeof(cPickle.dumps(slycat.web.server.server_cache.cache)),
"mid":mid,
"hdf5_file_size":hdf5_file_size,
"total_server_data_size": total_server_data_size,
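
get_model_statistics now reports the pickled size of the server cache alongside the HDF5 figures. A small Python 2 sketch of that measurement, with an illustrative sample dict:

import cPickle
import sys

def pickled_size_mb(obj):
  # sys.getsizeof() of the pickled string approximates the serialized
  # footprint of the object graph, in bytes.
  return sys.getsizeof(cPickle.dumps(obj)) / 1024.0 / 1024.0

print "%.2f MB" % pickled_size_mb({"artifact:aid:mid": {"metadata": range(100000)}})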
32 changes: 26 additions & 6 deletions web-server/plugins/slycat-timeseries-model/js/chunker.js
@@ -4,6 +4,8 @@ DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains certain
rights in this software.
*/
var arrayset_metadata_cache = {};
var arrayset_metadata_retrieval_inprogress = {};
var arrayset_metadata_callbacks = {};

function is_little_endian()
{
@@ -41,25 +43,43 @@ function get_model_array_attribute_metadata(parameters, dfd)
// Retrieve an arrayset's metadata asynchronously, calling a callback when it's ready ...
function get_model_arrayset_metadata(parameters)
{
// It's cached, so just execute callback with cached metadata
if(arrayset_metadata_cache[parameters.server_root + parameters.mid + parameters.aid] !== undefined) {
parameters.metadata = arrayset_metadata_cache[parameters.server_root + parameters.mid + parameters.aid];
if(parameters.metadataSuccess !== undefined) {
parameters.metadataSuccess(parameters);
} else {
parameters.success(parameters);
}
} else {
}
// It's being cached now, so add callback to queue
else if(arrayset_metadata_retrieval_inprogress[parameters.server_root + parameters.mid + parameters.aid]) {
var callback = parameters.metadataSuccess !== undefined ? parameters.metadataSuccess : parameters.success;
arrayset_metadata_callbacks[parameters.server_root + parameters.mid + parameters.aid].push({callback:callback, parameters: parameters});
}
// It's not in the cache and it's not being cached, so retrieve it and execute callback queue
else {
arrayset_metadata_retrieval_inprogress[parameters.server_root + parameters.mid + parameters.aid] = true;
var callback = parameters.metadataSuccess !== undefined ? parameters.metadataSuccess : parameters.success;
if(arrayset_metadata_callbacks[parameters.server_root + parameters.mid + parameters.aid] === undefined)
{
arrayset_metadata_callbacks[parameters.server_root + parameters.mid + parameters.aid] = [];
}
arrayset_metadata_callbacks[parameters.server_root + parameters.mid + parameters.aid].push({callback:callback, parameters: parameters});

$.ajax({
url : parameters.server_root + "models/" + parameters.mid + "/arraysets/" + parameters.aid + "/metadata",
contentType : "application/json",
success: function(metadata)
{
arrayset_metadata_cache[parameters.server_root + parameters.mid + parameters.aid] = metadata;
parameters.metadata = metadata;
if(parameters.metadataSuccess !== undefined) {
parameters.metadataSuccess(parameters);
} else {
parameters.success(parameters);
arrayset_metadata_retrieval_inprogress[parameters.server_root + parameters.mid + parameters.aid] = false;
// Execute callback queue
for(var i=0; i < arrayset_metadata_callbacks[parameters.server_root + parameters.mid + parameters.aid].length; i++)
{
var callback_parameters = arrayset_metadata_callbacks[parameters.server_root + parameters.mid + parameters.aid][i].parameters;
callback_parameters.metadata = metadata;
arrayset_metadata_callbacks[parameters.server_root + parameters.mid + parameters.aid][i].callback(callback_parameters);
}
},
error: function(request, status, reason_phrase)
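
chunker.js now deduplicates concurrent metadata requests: the first caller for a key starts the AJAX fetch and registers its callback, later callers for the same key only register callbacks, and the whole queue is flushed once the response arrives. The same pattern, sketched in Python for illustration; fetch(key, on_done) is a hypothetical asynchronous retrieval function:

import threading

_cache = {}           # key -> metadata
_in_progress = set()  # keys with a fetch already running
_callbacks = {}       # key -> callbacks awaiting that fetch
_lock = threading.Lock()

def get_metadata(key, fetch, callback):
  with _lock:
    if key in _cache:  # cache hit: answer immediately
      callback(_cache[key])
      return
    _callbacks.setdefault(key, []).append(callback)
    if key in _in_progress:  # fetch already running: just wait in the queue
      return
    _in_progress.add(key)

  def on_done(metadata):
    with _lock:
      _cache[key] = metadata
      _in_progress.discard(key)
      waiting = _callbacks.pop(key, [])
    for cb in waiting:  # flush the callback queue exactly once
      cb(metadata)

  fetch(key, on_done)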
