Skip to content

Commit

Permalink
potentionally fixed threading issue
Browse files Browse the repository at this point in the history
  • Loading branch information
wegrata committed May 16, 2013
1 parent 6d4366e commit 5fd220a
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from datetime import timedelta, datetime
import logging
from base_change_handler import BaseChangeHandler

log = logging.getLogger(__name__)

class BaseChangeThresholdHandler(BaseChangeHandler):
Expand All @@ -20,7 +19,7 @@ def __init__(self, countThreshold, timeThreshold=timedelta.max):
self._timeThreshold = timeThreshold
self._changeCount = 0
self._lastActionTime = datetime.now()

def _resetChangeToThreshold(self):
"""Resets each threshold independently"""
if self._changeCount >= self._countThreshold:
Expand All @@ -30,7 +29,7 @@ def _resetChangeToThreshold(self):

def _shouldTakeAction(self):
# log.debug("class: {0} count: {1} countThreshold: {2} timedelta: {3} timethreshold: {4}\n\n".format(
# self.__class__.__name__, self._changeCount, self._countThreshold,
# self.__class__.__name__, self._changeCount, self._countThreshold,
# (datetime.now() -self._lastActionTime) , self._timeThreshold))

if ((self._changeCount >= self._countThreshold) or
Expand All @@ -43,7 +42,7 @@ def handle(self, change, database):
#check to see if we can care about the change. if so increase the count.
if self._canHandle(change, database):
self._changeCount = self._changeCount + 1

if self._shouldTakeAction():
self._handle(change, database)
self._resetChangeToThreshold()
22 changes: 11 additions & 11 deletions LR/lr/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pylons import *
from lr.lib import ModelParser, SpecValidationException, helpers as h
from uuid import uuid4
import couchdb, os, logging, datetime, re, pprint
import couchdb, os, logging, datetime, re, pprint
log = logging.getLogger(__name__)

from resource_data import ResourceDataModel, appConfig
Expand All @@ -26,11 +26,11 @@


def _getNodeDocument(docModel, docType, numType=None, isRequired=True):

docs = docModel.getAll()
docModels = filter(lambda row :'doc_type' in row and row['doc_type']==docType, docs)
numModelDocs = len(docModels)

if numType is not None and isRequired and numModelDocs != numType:
raise(Exception("Error {0} of type '{1}' is required in database '{2}'".
format(numType, docType, docModel._defaultDB.name)))
Expand All @@ -42,7 +42,7 @@ def _getNodeDocument(docModel, docType, numType=None, isRequired=True):
return {}
else:
results = []
for d in docModels:
for d in docModels:
model = docModel(d)
model.validate()
results.append(model.specData)
Expand All @@ -54,29 +54,29 @@ def getNodeConfig():
#get community desciption.
docs = CommunityModel.getAll()
nodeConfig = {}

nodeConfig['community_description']=_getNodeDocument(CommunityModel,
'community_description', 1)
nodeConfig['network_description'] = _getNodeDocument(NetworkModel,
nodeConfig['network_description'] = _getNodeDocument(NetworkModel,
'network_description', 1)
nodeConfig['network_policy_description'] = _getNodeDocument(NetworkPolicyModel,
'policy_description', 1)
nodeConfig['node_description'] = _getNodeDocument(NodeModel,
nodeConfig['node_description'] = _getNodeDocument(NodeModel,
"node_description", 1)
nodeConfig['node_filter_description'] = _getNodeDocument(NodeFilterModel,
"filter_description", 1, False)
nodeConfig['node_services'] =_getNodeDocument(NodeServiceModel,
nodeConfig['node_services'] =_getNodeDocument(NodeServiceModel,
"service_description",None)
nodeConfig['node_connectivity'] = _getNodeDocument(NodeConnectivityModel,
"connection_description", None)
return nodeConfig

LRNode = LRNodeModel(getNodeConfig())

# Start process that listens the resource_data databasechange feed in order
# Start process that listens the resource_data databasechange feed in order
# to mirror distributable/resource_data type documents, udpate views and fire
# periodic distribution.
monitorResourceDataChanges()
# monitorResourceDataChanges()



7 changes: 4 additions & 3 deletions LR/lr/model/resource_data_monitor/compaction_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ def _compactView(self, compactUrl, credentials):
#curl -H "Content-Type: application/json" -X POST http://localhost:5984/dbname/_compact/designname

req = urllib2.Request(compactUrl, data="", headers={"Content-Type": "application/json"})
base64string = base64.encodestring('%s:%s' % credentials).replace('\n', '')
req.add_header("Authorization", "Basic %s" % base64string)
if credentials is not None and len(credentials) > 0:
base64string = base64.encodestring('%s:%s' % credentials).replace('\n', '')
req.add_header("Authorization", "Basic %s" % base64string)
log.debug(urllib2.urlopen(req).read())

def _handle(self, change, database):
log.debug("class: {0} Updating views ...".format(self.__class__.__name__))
try:
designDocs = database.view('_all_docs', include_docs=True,
startkey='_design%2F', endkey='_design0')
startkey='_design%2F', endkey='_design0')
for designDoc in designDocs:
viewInfo = "{0}/{1}/_info".format(database.resource.url, designDoc.id)
viewInfo = json.load(urllib2.urlopen(viewInfo))
Expand Down
8 changes: 5 additions & 3 deletions LR/lr/model/resource_data_monitor/incoming_copy_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import couchdb
from threading import Thread
from pylons import config
from os import getpid
from lr.lib import SpecValidationException, helpers as h
from lr.lib.couch_change_monitor import BaseChangeHandler
from lr.model import ResourceDataModel
Expand Down Expand Up @@ -47,13 +48,14 @@ def handleDocument(newDoc):
try:
# newDoc['node_timestamp'] = h.nowToISO8601Zformat()
ResourceDataModelValidator.set_timestamps(newDoc)
repl_helper.handle(newDoc)
del newDoc['_rev']
self.repl_helper.handle(newDoc)
# rd = ResourceDataModel(newDoc)
# rd.save(log_exceptions=False)
except SpecValidationException as e:
log.error(newDoc['_id'] + str(e))
except ResourceConflict:
log.error('conflict')
except ResourceConflict as ex:
log.exception(ex)
except Exception as ex:
should_delete = False # don't delete something unexpected happend
log.error(ex)
Expand Down
8 changes: 5 additions & 3 deletions LR/lr/websetup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

from lr.config.environment import load_environment
from lr.plugins import init_plugins

from lr.model.resource_data_monitor import monitorResourceDataChanges
log = logging.getLogger(__name__)


def setup_app(command, conf, vars):
"""Place any commands to setup lr here"""
# Don't reload the app if it was loaded under the testing environment
if not pylons.test.pylonsapp:
load_environment(conf.global_conf, conf.local_conf)

assert init_plugins() is not None, "Plugins not Loading!!!!"
monitorResourceDataChanges()
print("got here")
assert init_plugins() is not None, "Plugins not Loading!!!!"

0 comments on commit 5fd220a

Please sign in to comment.