Permalink
Browse files

fixed fsm cleanup for grep

  • Loading branch information...
1 parent 26c2bc0 commit 66f7a7886d216cb35cfaf0cdbd4690f0288a8af0 Navraj Chohan committed Nov 24, 2011
Showing with 62 additions and 25 deletions.
  1. +61 −25 grep/__init__.py
  2. +1 −0 main.py
View
@@ -1,9 +1,11 @@
import logging
+import datetime
from data.grep import GrepDataSet
from data.grep import get_result_query
from data.grep import GrepResults
from records import Record
from google.appengine.ext import db
+from google.appengine.api import memcache
from google.appengine.ext import webapp
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
@@ -118,7 +120,41 @@ def post(self):
record.mr_id = mapreduce_id
record.put()
self.redirect('/grep')
-
+
+class FSMMapperCallBack(webapp.RequestHandler):
+ def post(self):
+ name = self.request.headers["mapreduce-id"]
+ if not name:
+ name = "NAME NOT FOUND"
+ logging.info("MR CALLBACK " + name)
+ q = Record.all()
+ q.filter('mr_id =', name)
+ q.fetch(1)
+ if q:
+ for ii in q:
+ t = memcache.get('fsm_mapper_cleanup')
+ if not t:
+ logging.error("Unable to get datetime from memcache")
+ return False
+ dt, msec = t.split(".")
+ dt = datetime.datetime.strptime(dt, '%Y-%m-%d %H:%M:%S')
+ msec = datetime.timedelta(microseconds = int(msec))
+ fullDatetime = dt + msec
+ ii.end = fullDatetime
+
+ delta = (ii.end - ii.start)
+ ii.total = float(delta.days * 86400 + delta.seconds) + float(delta.microseconds)/1000000
+ ii.state = "Done"
+ ii.put()
+
+ logging.info("updated: record for MR job id %s"%name)
+ else:
+ logging.info("Unable to find record for MR job id %s"%name)
+
+def fsm_mapper(entity):
+ max_time = memcache.get('fsm_mapper_cleanup')
+ if max_time is None or max_time < str(entity.modifiedDate):
+ memcache.set('fsm_mapper_cleanup', str(entity.modifiedDate))
def fsm_calculate_run_time():
""" Fantasm does not give call backs when its done. Must figure it out
@@ -134,29 +170,29 @@ def fsm_calculate_run_time():
logging.error("Unable to find a record for fsm/grep")
return False
- q = None
- record = None
for ii in results:
- if ii.state == "Done":
- logging.error("Last FSM end time has already been calculated")
- q = GrepResults.all()
- if not q:
- logging.error("No query returned for Grep results")
- return False
- record = ii
+ ii.state = "Calculating time"
+ ii.put()
+ shards = ii.num_entities/1000
+ if shards < 1:
+ shards = 1
+ if shards > 256:
+ shards = 256 # max amount of shards allowed
- max_date = None
- while True:
- results = q.fetch(1000)
- for ii in results:
- date = ii.modifiedDate
- if max_date == None or max_date < date:
- max_date = date
- if len(results) < 1000:
- break;
- record.state = "Done"
- record.end = max_date
- delta = (record.end - record.start)
- record.total = float(delta.days * 86400 + delta.seconds) + float(delta.microseconds)/1000000
- record.put()
- return True
+ kind = "GrepResults" #get_output_class(ii.num_entities)
+ mapreduce_id = control.start_map(
+ name="FSM Grep cleanup",
+ handler_spec="grep.fsm_mapper",
+ reader_spec="mapreduce.input_readers.DatastoreInputReader",
+ mapper_parameters={
+ "entity_kind": "data.grep."+kind,
+ "processing_rate": 500
+ },
+ mapreduce_parameters={model.MapreduceSpec.PARAM_DONE_CALLBACK:
+ '/grep/fsm/callback'},
+ shard_count=shards,
+ queue_name="default",
+ )
+ ii.mr_id = mapreduce_id
+ ii.put()
+ return True
View
@@ -76,6 +76,7 @@ def get(self):
('/join/mr/callback', join.mr.MRCallback),
('/join/fsm/callback', join.FSMMapperCallBack),
('/grep/mr/callback', grep.mr.MRCallback),
+ ('/grep/fsm/callback', grep.FSMMapperCallBack),
('/jobs', jobs.PastJobs),
],
debug=True)

0 comments on commit 66f7a78

Please sign in to comment.