diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/community_description b/LR/data/specs/v_0_17/models/community_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/community_description rename to LR/data/specs/v_0_17/models/community_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/network_description b/LR/data/specs/v_0_17/models/network_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/network_description rename to LR/data/specs/v_0_17/models/network_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/network_distribution_policy_description b/LR/data/specs/v_0_17/models/network_distribution_policy_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/network_distribution_policy_description rename to LR/data/specs/v_0_17/models/network_distribution_policy_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/node_connectivity_description b/LR/data/specs/v_0_17/models/node_connectivity_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/node_connectivity_description rename to LR/data/specs/v_0_17/models/node_connectivity_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/node_description b/LR/data/specs/v_0_17/models/node_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/node_description rename to LR/data/specs/v_0_17/models/node_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/node_filter_description b/LR/data/specs/v_0_17/models/node_filter_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/node_filter_description rename to LR/data/specs/v_0_17/models/node_filter_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/node_service_description b/LR/data/specs/v_0_17/models/node_service_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/node_service_description rename to 
LR/data/specs/v_0_17/models/node_service_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/resource_data_description b/LR/data/specs/v_0_17/models/resource_data_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/resource_data_description rename to LR/data/specs/v_0_17/models/resource_data_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/models/status_description b/LR/data/specs/v_0_17/models/status_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/models/status_description rename to LR/data/specs/v_0_17/models/status_description diff --git a/LR/data/LR_Tech_Spec_V_0_17/spec.txt b/LR/data/specs/v_0_17/spec.txt similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_17/spec.txt rename to LR/data/specs/v_0_17/spec.txt diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/community_description b/LR/data/specs/v_0_21/models/community_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/community_description rename to LR/data/specs/v_0_21/models/community_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/connection_description b/LR/data/specs/v_0_21/models/connection_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/connection_description rename to LR/data/specs/v_0_21/models/connection_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/filter_description b/LR/data/specs/v_0_21/models/filter_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/filter_description rename to LR/data/specs/v_0_21/models/filter_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/network_description b/LR/data/specs/v_0_21/models/network_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/network_description rename to LR/data/specs/v_0_21/models/network_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/node_description b/LR/data/specs/v_0_21/models/node_description similarity 
index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/node_description rename to LR/data/specs/v_0_21/models/node_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/policy_description b/LR/data/specs/v_0_21/models/policy_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/policy_description rename to LR/data/specs/v_0_21/models/policy_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/resource_data b/LR/data/specs/v_0_21/models/resource_data_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/resource_data rename to LR/data/specs/v_0_21/models/resource_data_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/service_description b/LR/data/specs/v_0_21/models/service_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/service_description rename to LR/data/specs/v_0_21/models/service_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/models/status_description b/LR/data/specs/v_0_21/models/status_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/models/status_description rename to LR/data/specs/v_0_21/models/status_description diff --git a/LR/data/LR_Tech_Spec_V_0_21/spec.txt b/LR/data/specs/v_0_21/spec.txt similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_21/spec.txt rename to LR/data/specs/v_0_21/spec.txt diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/community_description b/LR/data/specs/v_0_23/models/community_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/community_description rename to LR/data/specs/v_0_23/models/community_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/connection_description b/LR/data/specs/v_0_23/models/connection_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/connection_description rename to LR/data/specs/v_0_23/models/connection_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/filter_description 
b/LR/data/specs/v_0_23/models/filter_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/filter_description rename to LR/data/specs/v_0_23/models/filter_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/network_description b/LR/data/specs/v_0_23/models/network_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/network_description rename to LR/data/specs/v_0_23/models/network_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/node_description b/LR/data/specs/v_0_23/models/node_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/node_description rename to LR/data/specs/v_0_23/models/node_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/policy_description b/LR/data/specs/v_0_23/models/policy_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/policy_description rename to LR/data/specs/v_0_23/models/policy_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/resource_data b/LR/data/specs/v_0_23/models/resource_data_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/resource_data rename to LR/data/specs/v_0_23/models/resource_data_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/service_description b/LR/data/specs/v_0_23/models/service_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/service_description rename to LR/data/specs/v_0_23/models/service_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/models/status_description b/LR/data/specs/v_0_23/models/status_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/models/status_description rename to LR/data/specs/v_0_23/models/status_description diff --git a/LR/data/LR_Tech_Spec_V_0_23/node_description b/LR/data/specs/v_0_23/node_description similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/node_description rename to LR/data/specs/v_0_23/node_description diff --git 
a/LR/data/LR_Tech_Spec_V_0_23/spec.txt b/LR/data/specs/v_0_23/spec.txt similarity index 100% rename from LR/data/LR_Tech_Spec_V_0_23/spec.txt rename to LR/data/specs/v_0_23/spec.txt diff --git a/LR/development.ini.orig b/LR/development.ini.orig index ed784b5e..26e7a602 100755 --- a/LR/development.ini.orig +++ b/LR/development.ini.orig @@ -69,6 +69,7 @@ couchdb.db.resourcesview = _design/learningregistry-resources/_view/docs couchdb.db.resourcecount = _design/learningregistry-resources/_view/count couchdb.threshold.distributes = 1000 couchdb.threshold.viewupdate = 100 +couchdb.stale.flag = update_after lr.distribute.url = http://localhost/distribute lr.obtain.docid = access:Basic Obtain service lr.harvest.docid = access:Basic Harvest service @@ -82,16 +83,20 @@ lr.services.docid = administrative:Network Node Services service lr.policy.docid = administrative:Resource Distribution Network Policy service lr.status.docid = administrative:Network Node Status service lr.distribute.docid = distribute:Resource Data Distribution service -models_spec_dir = %(here)s/data/models -spec.models.node_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/node_description -spec.models.resource_data = %(here)s/data/LR_Tech_Spec_V_0_23/models/resource_data -spec.models.network_policy_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/policy_description -spec.models.status_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/status_description -spec.models.filter_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/filter_description -spec.models.node_connectivity_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/connection_description -spec.models.community_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/community_description -spec.models.network_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/network_description -spec.models.node_service_description = %(here)s/data/LR_Tech_Spec_V_0_23/models/service_description +specs_dir = %(here)s/data/specs 
+spec.models.node_description =%(specs_dir)s/v_0_23/models/node_description + +spec.models.resource_data =%(specs_dir)s/v_0_23/models/resource_data_description, + %(specs_dir)s/v_0_21/models/resource_data_description, + %(specs_dir)s/v_0_17/models/resource_data_description + +spec.models.network_policy_description =%(specs_dir)s/v_0_23/models/policy_description +spec.models.status_description =%(specs_dir)s/v_0_23/models/status_description +spec.models.filter_description =%(specs_dir)s/v_0_23/models/filter_description +spec.models.node_connectivity_description =%(specs_dir)s/v_0_23/models/connection_description +spec.models.community_description =%(specs_dir)s/v_0_23/models/community_description +spec.models.network_description =%(specs_dir)s/v_0_23/models/network_description +spec.models.node_service_description =%(specs_dir)s/v_0_23/models/service_description diff --git a/LR/lr/config/routing.py b/LR/lr/config/routing.py index 3096826a..2b039a70 100755 --- a/LR/lr/config/routing.py +++ b/LR/lr/config/routing.py @@ -29,6 +29,10 @@ def mapResource(config_key, member_name, collection_name): map.connect("/"+collection_name,controller=collection_name,action='options',conditions=dict(method=['OPTIONS'])) if member_name == 'swordservice': map.connect("/swordpub",controller='swordservice',action='create') + + if member_name == 'distribute': + map.connect("/destination", controller='distribute', action='destination', + conditions=dict(method='GET')) log.info("Enabling service route for: {0} member: {1} collection: {2}".format(service_doc_id, member_name, collection_name)) else: log.info("Service route for {0} is disabled".format(service_doc_id)) diff --git a/LR/lr/controllers/distribute.py b/LR/lr/controllers/distribute.py index 3ee4e800..429e3b87 100755 --- a/LR/lr/controllers/distribute.py +++ b/LR/lr/controllers/distribute.py @@ -27,158 +27,171 @@ import base64 import pprint +import Queue + log = logging.getLogger(__name__) class DistributeController(BaseController): + 
__TARGET_NODE_INFO = 'taget_node_info' + __OK = 'ok' + __ERROR = 'error' + def __before__(self): self.resource_data = appConfig['couchdb.db.resourcedata'] """REST Controller styled on the Atom Publishing Protocol""" # To properly map this controller, ensure your config/routing.py # file has a resource setup: # map.resource('distribute', 'distribute') - def index(self, format='html'): - """GET /distribute: All items in the collection""" + + def destination(self): + """GET /destination: return node information""" # url('distribute') - distributeInfo = {'OK': True} - - #if sourceLRNode.isServiceAvailable(NodeServiceModel.DISTRIBUTE) == False: - #distributeInfo['OK'] = False - #else: - distributeInfo['node_config'] = sourceLRNode.config - distributeInfo['distribute_sink_url'] = urlparse.urljoin(request.url,self.resource_data) - # Check to see if the couch resource_data is defined in the config if so use it. - if appConfig.has_key("distribute_sink_url"): - distributeInfo['distribute_sink_url'] = appConfig["distribute_sink_url"] + response = {self.__OK: True} + + try: + response[self.__TARGET_NODE_INFO] = sourceLRNode.distributeInfo + except Exception as ex: + log.exception(ex) + response[self.__ERROR] = "Internal error" + + log.info("received distribute request...returning: \n"+pprint.pformat(response, 4)) + return json.dumps(response) + + def _getDistinationInfo(self, connection): + # Make sure we only have one slash in the url path. More than one + #confuses pylons routing libary. 
+ destinationURL = urlparse.urljoin(connection.destination_node_url.strip(), + "destination") + + request = urllib2.Request(destinationURL) + credential = sourceLRNode.getDistributeCredentialFor(destinationURL) + + if credential is not None: + base64string = base64.encodestring('%s:%s' % (credential['username'],credential['password'])).replace("\n", "") + request.add_header("Authorization", "Basic %s" % base64string) + + log.info("\n\nAccess destination node at: "+pprint.pformat(request.__dict__)) + return json.load(urllib2.urlopen(request)) + + def _canDistributeTo(self, connection, sourceNodeInfo): + + if not connection.active: + return {self.__OK: False, + 'connection_id': connection.connection_id, + self.__ERROR: 'Inactive connection'} + + result={self.__OK:True, 'connection_id': connection.connection_id } + sourceNodeInfo = h.dictToObject(sourceNodeInfo) + try: + destinationNodeInfo = h.dictToObject(self._getDistinationInfo(connection)[self.__TARGET_NODE_INFO]) + result['destinationNodeInfo'] = destinationNodeInfo + + if ((sourceNodeInfo.gateway_node or destinationNodeInfo.gateway_node) != connection.gateway_connection): + result[self.__ERROR] = " 'gateway_connection' mismatch between nodes and connection data" + + elif ((sourceNodeInfo.community_id != destinationNodeInfo.community_id) and + ((not sourceNodeInfo.social_community) or (not destinationNodeInfo.social_community))): + result[self.__ERROR] = 'cannot distribute across non social communities' + + elif ((sourceNodeInfo.network_id != destinationNodeInfo.network_id) and + ((not sourceNodeInfo.gateway_node)or(not destinationNodeInfo.gateway_node))): + result[self.__ERROR] = 'cannot distribute across networks (or communities) unless gateway' + + elif ((sourceNodeInfo.gateway_node and destinationNodeInfo.gateway_node) + and (sourceNodeInfo.network_id == destinationNodeInfo.network_id)): + result[self.__ERROR] = 'gateway must only distribute across different networks' + + elif (sourceNodeInfo.gateway_node 
and not destinationNodeInfo.gateway_node): + result[self.__ERROR] = 'gateways can only distribute to gateways' + except urllib2.URLError as ex: + log.exception(ex) + result[self.__ERROR] = "Cannot reach destination node. "+str(ex.reason) + except Exception as ex: + log.exception(ex) + result[self.__ERROR] = "Internal error. Cannot process destination node info" + + if result.has_key(self.__ERROR): + result[self.__OK] = False + + return result + - log.info("received distribute request...returning: \n"+json.dumps(distributeInfo)) - return json.dumps(distributeInfo) - def _getDistributeDestinations(self): """"Method to test the connections and returns a list of destionation node if the connections are valid""" - nodeDestinationList =[] gatewayConnectionList = [] + connectionsStatusInfo = {self.__OK:True, 'connections':[]} + for connection in sourceLRNode.connections: - # Make sure that the connection is active - if connection.active == False: - continue - destinationLRNode = None - - if connection.gateway_connection == True: + # Make sure that the connection is active + connectionsStatusInfo['connections'].append(self._canDistributeTo(connection, sourceLRNode.distributeInfo)) + + if (connectionsStatusInfo['connections'][-1][self.__OK] and + sourceLRNode.distributeInfo['gateway_node'] and + connectionsStatusInfo['connections'][-1]['destinationNodeInfo'].gateway_node and + connection.gateway_connection): gatewayConnectionList.append(connection) - try: - # Make sure we only have one slash in the url path. More than one - #confuses pylons routing libary. 
- destinationURL = urlparse.urljoin(connection.destination_node_url.strip(), - "distribute") - - request = urllib2.Request(destinationURL) - credential = sourceLRNode.getDistributeCredentialFor(destinationURL) - - if credential is not None: - base64string = base64.encodestring('%s:%s' % (credential['username'],credential['password'])).replace("\n", "") - request.add_header("Authorization", "Basic %s" % base64string) - - log.info("\n\nAccess destination node at: "+pprint.pformat(request.__dict__)) - distributeInfo = json.load(urllib2.urlopen(request)) - destinationLRNode = LRNodeModel(distributeInfo['node_config']) - except Exception as ex: - log.exception(ex) - continue - # Use of local variable to store if the connection is gateway connection. It is - # done this way to deal with mismatch between node de and connection - # description. - isGatewayConnection = ( - (sourceLRNode.nodeDescription.gateway_node == True) and - (destinationLRNode.nodeDescription.gateway_node ==True)) - # Skip the connection if there is any mismatch between the connection and - # the node data. - if isGatewayConnection != connection.gateway_connection: - log.info("Skip connection. 'gateway_connection' mismatch between node and connection data") - continue - - # Only one gateway connection is allowed, faulty network description + # Only one gateway connection is allowed, faulty network description if len(gatewayConnectionList) > 1: log.info("***Abort distribution. 
More than one gateway node connection") - #Clear the node destination list no distribution is network description - # is faulty - nodeDestinationList = [] + connectionsStatusInfo[self.__ERROR] ="only one active gateway connection is allowed, faulty network description" break - #Calcuate if the connection is gateway one, if so - #cannot distribute across non social communities - if ((sourceLRNode.communityDescription.community_id != - destinationLRNode.communityDescription.community_id) and - ((sourceLRNode.communityDescription.social_community == False) or - (destinationLRNode.communityDescription.social_community == False))): - log.info("Cannot distribute across non social communities") - continue - # Cannot distribute across networks (or communities) unless gateway - if((isGatewayConnection == False) and - ((sourceLRNode.communityDescription.community_id != - destinationLRNode.communityDescription.community_id) or - (sourceLRNode.networkDescription.network_id != - destinationLRNode.networkDescription.network_id))): - log.info("Different Network. Cannot distribute across networks (or communities) unless gateway") - continue - # Gateway must only distribute across different networks. - if((isGatewayConnection ==True) and - (sourceLRNode.networkDescription.network_id == - destinationLRNode.networkDescription.network_id)): - log.info("Gateway must only distribute across different networks") - continue - # Only gateways can distribute on gateway connection. This is really for - # catching mismatch in the data where a connection says it is between - # gateways when the nodes are not both gateways. 
- if((connection.gateway_connection == True) and - ((sourceLRNode.nodeDescription.gateway_node == False) or - (destinationLRNode.nodeDescription.gateway_node == False))): - log.info("Only gateways can distribute on gateway connection") - continue - nodeInfo = { "distributeInfo": distributeInfo, - "distribute_sink_url": distributeInfo["distribute_sink_url"], - "destinationNode":destinationLRNode} - nodeDestinationList.append(nodeInfo) - - return nodeDestinationList + + if len (sourceLRNode.connections) == 0: + connectionsStatusInfo[self.__ERROR] ="No connection present for distribution" + + if connectionsStatusInfo.has_key(self.__ERROR) : + connectionsStatusInfo[self.__OK] = False + + return connectionsStatusInfo def create(self): """POST / distribute start distribution""" - def doDistribution(destinationNode, server, sourceUrl, destinationUrl, lock): + distributeResults = Queue.Queue() + + def doDistribution(connectionInfo, server, sourceUrl): # We want to always use the replication filter function to replicate # only distributable doc and filter out any other type of documents. # However we don't have any query arguments until we test if there is any filter. 
replicationOptions={'filter':ResourceDataModel.REPLICATION_FILTER, + 'source':sourceUrl, + 'connection_id': connectionInfo['connection_id'], 'query_params': None} # If the destination node is using an filter and is not custom use it # as the query params for the filter function - if ((destinationNode.filterDescription is not None) and - (destinationNode.filterDescription.custom_filter == False)): - replicationOptions['query_params'] = destinationNode.filterDescription.specData + if ((connectionInfo['destinationNodeInfo'].filter_description is not None ) and + (connectionInfo['destinationNodeInfo'].filter_description.get('custom_filter') == False)): + replicationOptions['query_params'] =connectionInfo['destinationNodeInfo'].filter_description #if distinationNode['distribute service'] .service_auth["service_authz"] is not None: #log.info("Destination node '{}' require authentication".format(destinationUrl)) - #Try to get the user name and password the url. + #Try to get the user name and password the url + destinationUrl = connectionInfo['destinationNodeInfo'].resource_data_url + credential = sourceLRNode.getDistributeCredentialFor(destinationUrl) if credential is not None: - parsedUrl = urlparse.urlparse(destinationUrl) + parsedUrl = urlparse.urlparse(destinationUrl) destinationUrl = destinationUrl.replace(parsedUrl.netloc, "{0}:{1}@{2}".format( credential['username'], credential['password'], parsedUrl.netloc)) - log.info("\n\nReplication started\nSource:{0}\nDestionation:{1}\nArgs:{2}".format( - sourceUrl, destinationUrl, str(replicationOptions))) - if replicationOptions['query_params'] is None: del replicationOptions['query_params'] - results = server.replicate(sourceUrl, destinationUrl, **replicationOptions) - log.debug("Replication results: "+str(results)) - with lock: - server = couchdb.Server(appConfig['couchdb.url']) - db = server[appConfig['couchdb.db.node']] - doc = db[appConfig['lr.nodestatus.docid']] - doc['last_out_sync'] = h.nowToISO8601Zformat() - 
doc['out_sync_node'] = destinationNode.nodeDescription.node_name - db[appConfig['lr.nodestatus.docid']] = doc + + replicationOptions['target'] = destinationUrl + + request = urllib2.Request(urlparse.urljoin(appConfig['couchdb.url'], '_replicator'), + headers={'Content-Type':'application/json' }, + data = json.dumps(replicationOptions)) + + log.info("\n\nReplication started\nSource:{0}\nDestionation:{1}\nArgs:{2}".format( + sourceUrl, destinationUrl, pprint.pformat(replicationOptions))) + + results = json.load(urllib2.urlopen(request)) + connectionInfo['replication_results'] = results + distributeResults.put(connectionInfo) + log.debug("Replication results: " + pprint.pformat(results)) + log.info("Distribute.......\n") ##Check if the distribte service is available on the node. @@ -188,17 +201,31 @@ def doDistribution(destinationNode, server, sourceUrl, destinationUrl, lock): if((sourceLRNode.connections is None) or (len(sourceLRNode.connections) ==0)): log.info("No connection present for distribution") - return - log.info("Connections: "+str(sourceLRNode.connections)+"\n") - lock = threading.Lock() - for connectionInfo in self._getDistributeDestinations(): - replicationArgs = (connectionInfo['destinationNode'], - defaultCouchServer, - self.resource_data, - connectionInfo["distribute_sink_url"],lock) - - # Use a thread to do the actual replication. 
- replicationThread = threading.Thread(target=doDistribution, - args=replicationArgs) - replicationThread.start() + return json.dumps({self.__ERROR:''}) + log.info("Connections: \n{0}\n"+pprint.pformat([c.specData for c in sourceLRNode.connections])) + + connectionsStatusInfo = self._getDistributeDestinations() + log.debug("\nSource Node Info:\n{0}".format(pprint.pformat(sourceLRNode.distributeInfo))) + log.debug("\n\n Distribute connections:\n{0}\n\n".format(pprint.pformat(connectionsStatusInfo))) + + + for connectionStatus in connectionsStatusInfo['connections']: + if connectionsStatusInfo.has_key(self.__ERROR) or connectionStatus.has_key(self.__ERROR) == True: + distributeResults.put(connectionStatus) + else: + replicationArgs = (connectionStatus, defaultCouchServer, self.resource_data ) + # Use a thread to do the actual replication. + replicationThread = threading.Thread(target=doDistribution, args=replicationArgs) + replicationThread.start() + replicationThread.join() + log.debug("\n\n\n---------------------distribute threads end--------------------\n\n\n") + + + log.debug("\n\n\n----------Queue results Completed size: {0}--------------\n\n\n".format(distributeResults.qsize())) + connectionsStatusInfo['connections'] = [] + while distributeResults.empty() == False: + connectionsStatusInfo['connections'].append(distributeResults.get()) + log.debug("\n\n======== DISTRIBUTE RESULTS ============\n\n") + log.debug(pprint.pformat(connectionsStatusInfo)) + return json.dumps(connectionsStatusInfo, indent=4) diff --git a/LR/lr/controllers/harvest.py b/LR/lr/controllers/harvest.py index 5a53e5c1..b6af1ccc 100755 --- a/LR/lr/controllers/harvest.py +++ b/LR/lr/controllers/harvest.py @@ -13,7 +13,6 @@ log = logging.getLogger(__name__) import ast import string - from lr.model import LRNode as sourceLRNode, \ NodeServiceModel, ResourceDataModel, LRNodeModel, defaultCouchServer, appConfig BASIC_HARVEST_SERVICE_DOC = appConfig['lr.harvest.docid'] diff --git 
a/LR/lr/controllers/obtain.py b/LR/lr/controllers/obtain.py index 234cc170..b9609a3e 100755 --- a/LR/lr/controllers/obtain.py +++ b/LR/lr/controllers/obtain.py @@ -33,7 +33,7 @@ def get_view(self,view_name = '_design/learningregistry-resources/_view/docs',ke args = {} if len(keys) > 0: args['keys'] = keys - args['stale'] = 'ok' + args['stale'] = appConfig['couchdb.stale.flag'] if self.limit is not None: args['limit'] = self.limit args['include_docs'] = include_docs diff --git a/LR/lr/controllers/slice.py b/LR/lr/controllers/slice.py index cb0c224e..3a728e1a 100644 --- a/LR/lr/controllers/slice.py +++ b/LR/lr/controllers/slice.py @@ -128,7 +128,7 @@ def _set_boolean_param(paramKey, setifempty=True): def _get_view(self,view_name = '_design/learningregistry-slice/_view/docs',keys=[], include_docs = False, resumptionToken=None, limit=None): db_url = '/'.join([appConfig['couchdb.url'],appConfig['couchdb.db.resourcedata']]) - opts = {"stale": "ok", "reduce": False } + opts = {"stale": appConfig['couchdb.stale.flag'], "reduce": False } if include_docs: opts["include_docs"] = True @@ -150,9 +150,14 @@ def _get_view(self,view_name = '_design/learningregistry-slice/_view/docs',keys= return view def _get_view_total(self,view_name = '_design/learningregistry-slice/_view/docs',keys=[], resumptionToken=None): + + if resumptionToken and "maxResults" in resumptionToken and resumptionToken["maxResults"] != None : + return resumptionToken["maxResults"]; + + db_url = '/'.join([appConfig['couchdb.url'],appConfig['couchdb.db.resourcedata']]) - opts = {"stale": "ok", "reduce": True, "group": True } + opts = {"stale": appConfig['couchdb.stale.flag'], "reduce": True, "group": True } if self.enable_flow_control and resumptionToken != None: opts["keys"] = resumptionToken["keys"] @@ -168,6 +173,8 @@ def _get_view_total(self,view_name = '_design/learningregistry-slice/_view/docs' for row in view: if "value" in row: totalDocs += row["value"] + + #resumptionToken["maxResults"] = totalDocs; 
return totalDocs def _get_keys(self, params): @@ -267,6 +274,7 @@ def format_data(self,keys_only,docs, keys, forceUnique, current_rt=None): prefix = '{"documents":[\n' num_sent = 0 doc_count = 0 + update_resumption_max_results = current_rt and "maxResults" in current_rt and current_rt["maxResults"] != None if docs is not None: for row in docs: doc_count += 1 @@ -286,6 +294,8 @@ def format_data(self,keys_only,docs, keys, forceUnique, current_rt=None): prefix = ",\n" else: log.debug("{0} skipping: alreadySent {1} / forceUnique {2}".format(doc_count, repr(alreadySent), forceUnique)) + if update_resumption_max_results: + current_rt["maxResults"] = current_rt["maxResults"] - 1 if doc_count == 0: yield prefix @@ -300,11 +310,11 @@ def format_data(self,keys_only,docs, keys, forceUnique, current_rt=None): offset = 0 if offset+doc_count < maxResults: - rt = ''' "resumption_token":"{0}", '''.format(resumption_token.get_offset_token(self.service_id, offset=offset+doc_count, keys=keys)) + rt = ''' "resumption_token":"{0}", '''.format(resumption_token.get_offset_token(self.service_id, offset=offset+doc_count, keys=keys, maxResults=maxResults)) - yield '\n],'+rt+'"resultCount":'+str(num_sent) +'}' + yield '\n],'+rt+'"resultCount":'+str(maxResults) +'}' # if __name__ == '__main__': # param = {START_DATE: "2011-03-10", END_DATE: "2011-05-01", IDENTITY: "NSDL 2 LR Data Pump", 'search_key': 'Arithmetic'} diff --git a/LR/lr/lib/harvest.py b/LR/lr/lib/harvest.py index 877a9208..03b8d790 100755 --- a/LR/lr/lib/harvest.py +++ b/LR/lr/lib/harvest.py @@ -23,7 +23,7 @@ def get_record(self,id): return None def get_records_by_resource(self,resource_locator): - view_data = h.getView(database_url=self.db_url,view_name='_design/learningregistry-resource-location/_view/docs',method="POST",include_docs=True,keys=[resource_locator], stale='ok') + view_data = 
h.getView(database_url=self.db_url,view_name='_design/learningregistry-resource-location/_view/docs',method="POST",include_docs=True,keys=[resource_locator], stale=appConfig['couchdb.stale.flag']) for doc in view_data: yield doc["doc"] @@ -33,7 +33,7 @@ def list_records(self, from_date, until_date,resumption_token=None, limit=None): def list_metadata_formats(self): return [{'metadataFormat':{'metadataPrefix':'dc'}}] def earliestDate(self): - view = self.db.view('_design/learningregistry-by-date/_view/docs',limit=1,stale='ok') + view = self.db.view('_design/learningregistry-by-date/_view/docs',limit=1,stale=appConfig['couchdb.stale.flag']) if len(view.rows) > 0: return view.rows[0].key else: @@ -42,7 +42,7 @@ def list_identifiers(self, from_date, until_date,resumption_token=None,limit=Non return self.getViewRows(False,until_date,from_date,limit,resumption_token) def getViewRows(self,includeDocs, untilDate,fromDate,limit=None,resumption_token=None): params = { - 'stale':'ok', + 'stale':appConfig['couchdb.stale.flag'], 'include_docs':includeDocs, 'endkey':h.convertToISO8601Zformat(untilDate), 'startkey':h.convertToISO8601Zformat(fromDate), diff --git a/LR/lr/lib/model_parser.py b/LR/lr/lib/model_parser.py index cd36a017..7f9ef215 100755 --- a/LR/lr/lib/model_parser.py +++ b/LR/lr/lib/model_parser.py @@ -460,6 +460,14 @@ def _validateField(self, fieldName, value, modelProp): +fieldName+"' of type "+ modelProp[self._VALUE_TYPE]+"\n\n"+description) + #Check that required string are not empty. 
+ if (self._IS_REQUIRED in modelProp.keys() and + isOfSpecType(value, 'string') and + len(value.strip()) == 0): + raise SpecValidationException( + "Required value for '"+fieldName+"' cannot be an empty string\n\n" + +description) + # Check for matching defined value if (self._VALUE_DEFINED in modelProp.keys() and modelProp[self._VALUE_DEFINED] != value): @@ -526,10 +534,14 @@ def _getModelInfo(self): self._extractData() return self._modelInfo - + def _getDocVersion(self): + if 'doc_version' in self.modelInfo: + return self.modelInfo.doc_version.value + return None modelInfo = property(_getModelInfo, None, None, None) modelName = property(lambda self: self._modelName, None, None, None) + docVersion = property(_getDocVersion, None, None, None) def asJSON(self): """Transforms the parsed model to a valid JSON representation.""" @@ -610,28 +622,21 @@ def extractModelsFromSpec(specFile, destDir='./models/'): parser = OptionParser() parser.add_option("-f", "--file", dest="filepath", - help="The full path of the data model spec definition.", - metavar="FILE") + help="The full path of the data model spec definition.", metavar="FILE") - parser.add_option("-s", "--string", dest="string", - help="String representation of the data model spec definition") + parser.add_option("-s", "--string", dest="string", help="String representation of the data model spec definition") parser.add_option("-j", "--json", dest="json", action = "store_true", - default=False, - help="Show a json representation of data model spec.") + default=False, help="Show a json representation of data model spec.") - parser.add_option("-v", "--validate", dest="source", - help="""Validates a JSON object against the spec data model + parser.add_option("-v", "--validate", dest="source", help="""Validates a JSON object against the spec data model The source JSON object can be a file or a string representation of the JSON object. 
""" ) - parser.add_option("-e", "--extract", dest="specFile", - help="extracts data models from the spec file.", - metavar="FILE") + parser.add_option("-e", "--extract", dest="specFile", help="extracts data models from the spec file.", metavar="FILE") - parser.add_option("-d", "--destination", dest="modelDestination", - help="""Destination path to put data model file specs + parser.add_option("-d", "--destination", dest="modelDestination", help="""Destination path to put data model file specs that are extracted from the main spec.""") diff --git a/LR/lr/lib/oaipmh.py b/LR/lr/lib/oaipmh.py index 9ed79ff1..0922a80a 100755 --- a/LR/lr/lib/oaipmh.py +++ b/LR/lr/lib/oaipmh.py @@ -151,13 +151,13 @@ def list_opts(self, metadataPrefix, from_date=None, until_date=None): # return map(lambda row: row["value"], view_data) def get_records_by_resource(self,resource_locator): - view_data = h.getView(database_url=self.db_url,view_name='_design/learningregistry-resource-location/_view/docs',method="POST", documentHandler=OAIPMHDocumentResolver(), include_docs=True,keys=[resource_locator], stale='ok') + view_data = h.getView(database_url=self.db_url,view_name='_design/learningregistry-resource-location/_view/docs',method="POST", documentHandler=OAIPMHDocumentResolver(), include_docs=True,keys=[resource_locator], stale=appConfig['couchdb.stale.flag']) for doc in view_data: yield doc def list_identifiers_or_records(self,metadataPrefix,from_date=None, until_date=None, rt=None, fc_limit=None, serviceid=None, include_docs=False): '''Returns the list_records as a generator based upon OAI-PMH query''' - opts = { "stale": "ok" }; + opts = { "stale": appConfig['couchdb.stale.flag'] }; if include_docs: opts["include_docs"] = True @@ -195,7 +195,7 @@ def format_docs(row): def list_metadata_formats(self, identity=None, by_doc_ID=False, verb="ListMetadataFormats"): try: - opts = { "stale": "ok" } + opts = { "stale": appConfig['couchdb.stale.flag'] } if identity != None: opts["include_docs"] = 
"true" @@ -243,7 +243,7 @@ def identify(self, database="node"): opts = { "group": True, "limit": 1, - "stale": "ok" + "stale": appConfig['couchdb.stale.flag'] } view_data = self.db.view('oai-pmh-identify-timestamp/docs', **opts) diff --git a/LR/lr/lib/resumption_token.py b/LR/lr/lib/resumption_token.py index c44c5787..cfc1d164 100644 --- a/LR/lr/lib/resumption_token.py +++ b/LR/lr/lib/resumption_token.py @@ -39,21 +39,23 @@ def get_payload(startkey=None, endkey={}, startkey_docid=None, from_date=None, u return payload -def get_offset_payload(offset=None, keys=None): +def get_offset_payload(offset=None, keys=None, maxResults=None): payload = {} if offset: payload["offset"] = offset if keys: payload["keys"] = keys + if maxResults: + payload["maxResults"] = maxResults return payload def get_token(serviceid, startkey=None, endkey={}, startkey_docid=None, from_date=None, until_date=None): return jwt.encode(get_payload(startkey, endkey, startkey_docid, from_date, until_date), serviceid, __JWT_ALG) -def get_offset_token(serviceid, offset=None, keys=None): - return jwt.encode(get_offset_payload(offset, keys), serviceid, __JWT_ALG) +def get_offset_token(serviceid, offset=None, keys=None, maxResults=None): + return jwt.encode(get_offset_payload(offset, keys, maxResults), serviceid, __JWT_ALG) if __name__ == "__main__": diff --git a/LR/lr/model/base_model.py b/LR/lr/model/base_model.py index 3544bced..e5d3db5f 100644 --- a/LR/lr/model/base_model.py +++ b/LR/lr/model/base_model.py @@ -33,6 +33,15 @@ def createDB(name, server=defaultCouchServer): #log.exception(ex) return server[name] + def getModelPasers(modelsPath): + modelParsers = {} + + for path in modelsPath.split(','): + parser = ModelParser(path.strip()) + modelParsers[parser.docVersion] = parser + + return modelParsers + class BaseModel(object): """Base model class for Learning Registry data models""" @@ -41,8 +50,8 @@ class BaseModel(object): _SPEC_DATA = '_specData' _defaultDB = createDB(defaultDBName, server) - 
_modelParser = ModelParser(modelSpec) - + _modelParsers = getModelPasers(modelSpec) + @classmethod def get(cls, doc_id, db=None): sourcDB = db @@ -84,7 +93,7 @@ def __init__(self, data=None): spec_data = json.loads(getFileString(data)) else: spec_data = json.loads(data) - spec_data = {} + spec_data = {} spec_data.update(data) # Remove _id an _rev in data if t there are present so that we can have # a clean specData for validation @@ -102,8 +111,8 @@ def __setattr__(self, name, value): # spec data attribute. if name == self._SPEC_DATA: self.__dict__[self._SPEC_DATA] = value - elif name in self._modelParser.modelInfo.keys(): - self._modelParser.validateField(name, value, self._specData) + elif self._isSpecDataFieldKey(name): + self._validateField(name, value) self.__dict__[self._SPEC_DATA][name] = value elif name in self.__dict__.keys(): self.__dict__[name] = value @@ -113,7 +122,7 @@ def __setattr__(self, name, value): def __getattr__(self, name): # Check if the attribute name is a spec attribute. - if name in self._modelParser.modelInfo.keys(): + if self._isSpecDataFieldKey(name): # If it is a spec attribute and it is set in the _specData # return it otherwise return None return self.__dict__[self._SPEC_DATA].get(name) @@ -123,18 +132,33 @@ def __getattr__(self, name): raise AttributeError("'"+self.__class__.__name__+ "' object has no attribute'"+name+"'") - + def _isSpecDataFieldKey(self, keyName): + #look through all the models spec for field name. + for specModel in self._modelParsers.values(): + if keyName in specModel.modelInfo: + return True + return False + + def _validateField(self, fieldName, value): + # Look for a parser to validate the field against. This done by using the + # version to look for the parser. If doc_version is not set already don't + # don't anything. 
The filed cannot be validate if we don't know what + # doc_version to use + if self.doc_version in self._modelParsers: + self._modelParsers[self.doc_version].validateField(fieldName, value, self._specData) + def _preValidation(self): pass def _validate(self): - self._modelParser.validate(self._specData) + self._modelParsers[self.doc_version].validate(self._specData) def _postValidation(self): pass def toJSON(self): """Returns JSON object of the spec data""" + return json.dumps(self._specData) def validate(self): diff --git a/LR/lr/model/node_config.py b/LR/lr/model/node_config.py index c2a892c3..92f0c84b 100755 --- a/LR/lr/model/node_config.py +++ b/LR/lr/model/node_config.py @@ -143,7 +143,7 @@ def _setNodeStatus(self): def _getStatusDescription(self): count = 0 - view = ResourceDataModel._defaultDB.view(appConfig['couchdb.db.resourcecount'],stale='ok') + view = ResourceDataModel._defaultDB.view(appConfig['couchdb.db.resourcecount'],stale=appConfig['couchdb.stale.flag']) if len(view.rows) > 0: count = view.rows[0].value statusData = {'doc_count': count, @@ -277,9 +277,20 @@ def getDistributeCredentialFor(self, targetUrl): passwords = h.dictToObject(NodeModel._defaultDB[_ACCESS_CREDENTIALS_ID]) credential = passwords.passwords.get(targetUrl) return credential + + def getDistributeInfo(self): + distributeInfo = {'active':self.nodeDescription.active, + 'node_id':self.nodeDescription.node_id, + 'network_id': self.nodeDescription.network_id, + 'community_id': self.nodeDescription.community_id, + 'gateway_node':self.nodeDescription.gateway_node, + 'social_community':self.communityDescription.social_community, + 'resource_data_url': appConfig['lr.distribute_resource_data_url'], + 'filter_description':self.filterDescription.specData + } + return distributeInfo - - + distributeInfo = property(lambda self: self.getDistributeInfo(), None, None, None) nodeDescription = property(lambda self: self._nodeDescription, None, None, None) networkDescription = property(lambda self: 
self._networkDescription, None, None, None) communityDescription = property(lambda self: self._communityDescription, None, None, None) @@ -293,3 +304,4 @@ def getDistributeCredentialFor(self, targetUrl): status = property(_getStatusDescription, None, None, None) + diff --git a/LR/lr/model/node_status.py b/LR/lr/model/node_status.py index ccea6bea..afd5b2bb 100644 --- a/LR/lr/model/node_status.py +++ b/LR/lr/model/node_status.py @@ -25,5 +25,8 @@ class NodeStatusModel(createBaseModel(SPEC_STATUS_DESCRIPTION, DB_NODE)): def __init__(self, data=None): + # Node status is model that create and does not have any doc_version. + # so set its doc_version to node. This model should probably be removed, + # alternate method should be implemented. + self.__dict__['doc_version'] = None super(NodeStatusModel, self).__init__(data) - diff --git a/LR/lr/model/resource_data_monitor/distribute_threshold_handler.py b/LR/lr/model/resource_data_monitor/distribute_threshold_handler.py index ea73aafd..f564864f 100644 --- a/LR/lr/model/resource_data_monitor/distribute_threshold_handler.py +++ b/LR/lr/model/resource_data_monitor/distribute_threshold_handler.py @@ -9,6 +9,7 @@ @author: jpoyau ''' import urllib2 +import pprint from lr.lib.couch_change_monitor import BaseChangeThresholdHandler from pylons import config import logging diff --git a/LR/lr/model/resource_data_monitor/resource_data_handler.py b/LR/lr/model/resource_data_monitor/resource_data_handler.py index 95be4979..b23a0d31 100644 --- a/LR/lr/model/resource_data_monitor/resource_data_handler.py +++ b/LR/lr/model/resource_data_monitor/resource_data_handler.py @@ -27,10 +27,10 @@ def _canHandle(self, change, database): return True return False - def _updateDistributableData(self, newDistributableData, database): + def _updateDistributableData(self, newDistributableData, distributableDocId, database): # Use the ResourceDataModel class to create an object that # contains only a the resource_data spec data. 
- currentDistributable = database[newDistributableData['_id']] + currentDistributable = database[distributableDocId] temp = ResourceDataModel(currentDistributable)._specData del temp['node_timestamp'] @@ -46,12 +46,12 @@ def _updateDistributableData(self, newDistributableData, database): log.exception(e) - def _addDistributableData(self, distributableData, database): + def _addDistributableData(self, distributableData, distributableDocId, database): try: - log.debug('Adding distributable doc %s...\n' % distributableData['_id']) - database[distributableData['_id']] = distributableData + log.debug('Adding distributable doc %s...\n' % distributableDocId) + database[distributableDocId] = distributableData except Exception as e: - log.error("Cannot save distributable document %s\n" % distributableData['_id'] ) + log.error("Cannot save distributable document %s\n" % distributableDocId) log.exception(e) def _handle(self, change, database): @@ -63,12 +63,12 @@ def _handle(self, change, database): del distributableDoc['node_timestamp'] #change thet doc_type distributableDoc['doc_type']='resource_data_distributable' - distributableDoc['_id'] = change['doc']['_id']+"-distributable" + distributableDocId= change['doc']['_id']+"-distributable" # Check to see if a corresponding distributatable document exist. # not create a new distribuation document without the # node_timestamp and _id+distributable. 
- if not distributableDoc['_id'] in database: - self._addDistributableData(distributableDoc, database) + if not distributableDocId in database: + self._addDistributableData(distributableDoc, distributableDocId, database) else: - self._updateDistributableData(distributableDoc, database) + self._updateDistributableData(distributableDoc, distributableDocId, database) diff --git a/LR/lr/model/resource_data_monitor/track_last_sequence.py b/LR/lr/model/resource_data_monitor/track_last_sequence.py index a693c016..cd14b608 100644 --- a/LR/lr/model/resource_data_monitor/track_last_sequence.py +++ b/LR/lr/model/resource_data_monitor/track_last_sequence.py @@ -26,7 +26,7 @@ def __init__(self, sequenceChangeDocId, countThreshold=25, timeThreshold=timedelta(seconds=60)): BaseChangeThresholdHandler.__init__(self, countThreshold, timeThreshold) self._sequenceChangeDocId =sequenceChangeDocId - + def _saveSequence(self, sequence, database): log.debug("Last process change sequence: {0}".format(sequence)) doc ={"_id":self._sequenceChangeDocId, diff --git a/LR/lr/tests/functional/distribute/largeTestNetworkLayout.png b/LR/lr/tests/functional/distribute/largeTestNetworkLayout.png new file mode 100755 index 00000000..98f6c424 Binary files /dev/null and b/LR/lr/tests/functional/distribute/largeTestNetworkLayout.png differ diff --git a/LR/lr/tests/functional/distribute/large_network_graph.dot b/LR/lr/tests/functional/distribute/large_network_graph.dot new file mode 100644 index 00000000..1c776e3e --- /dev/null +++ b/LR/lr/tests/functional/distribute/large_network_graph.dot @@ -0,0 +1,77 @@ +digraph largeNetworkTest{ + style = filled; + node [fontsize=8] + + subgraph gatways{ + node [shape=doublecircle, color=blue, fontsize=12]; + n0; n3; n4; n5; n7; n8; n9; n12; n13 + }; + + subgraph seed_nodes{ + node[fontcolor=cyan, fontsize=12, shape=Mcircle] + n6; n2 + }; + + subgraph cluster_1{ + label = "Open Community One"; + color = yellow; + + subgraph cluster_11{ + label = "OC1_N1"; + color = 
goldenrod; + n0->n1[color=red]; + }; + + subgraph cluster_12 { + label = "OC1_N2"; + color = gold; + + n2->n0[color=red]; + n2->n4->n3[color=green]; + n3->n0[color=green]; + n2->n5->n0[color=green]; + n2->n3[color=green]; + n5->n5[color=red]; + } + }; + subgraph cluster_2{ + label = "Open Community Two"; + color = "greenyellow"; + + subgraph cluster_21 { + label = "OC2_N1"; + color = "yellowgreen"; + + n3->n6[color=red]; + n6->n9[color=green]; + n6->n8[color=green]; + n4->n7->n4[color=green]; + n7->n6[color=red]; + n8->n6[color=red]; + }; + + subgraph cluster_22{ + label = "OC2_N2"; + color = darkseagreen; + + n12->n11[color=red] + n11->n10[color=green]; + n12->n10[color=red]; + n8->n12->n8[color=green]; + } + }; + subgraph cluster_3{ + label = "Closed Community"; + color = "tomato"; + + subgraph cluster_31{ + label = CC2_N1; + color = brown; + + n14->n13[color=green]; + n13->n14[color=red]; + n9->n13[color=red]; + n3->n13[color=red]; + } + } +} diff --git a/LR/lr/tests/functional/distribute/large_network_test.py b/LR/lr/tests/functional/distribute/large_network_test.py new file mode 100644 index 00000000..02140aa8 --- /dev/null +++ b/LR/lr/tests/functional/distribute/large_network_test.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python +# Copyright 2011 Lockheed Martin +# +''' +Created on Nov 11, 2011 + +@author: jpoyau +''' +from lr_node import Node + +import os +from os import path +import ConfigParser +import json +from time import sleep +import pprint + +_PWD = path.abspath(path.dirname(__file__)) +_TEST_DATA_PATH = path.abspath(path.join(_PWD, "../../data/nsdl_dc/data-000000000.json")) +_TEST_NODE_CONFIG_DIR = path.abspath(path.join(_PWD, "config")) + + + +_NODES = [] +_COMMUNITIES = {} +_GATEWAYS =[] +_CONNECTIONS = [] +_DISTRIBUTE_ORDER = [] +_DATA = None + +def generateNodeConfig(numberOfNodes): + + nodeConfigs = {} + for i in range(0, numberOfNodes): + nodeName = "test_node_"+str(i) + port = str(5001+i) + config = ConfigParser.ConfigParser() + #Create the 
sections + config.add_section('couch_info') + config.add_section('pylons_server') + config.add_section('node_config') + #Set the node url + config.set("node_config", "node_url", "http://localhost:"+port) + #Set the pylons server stuff + config.set("pylons_server", "port", port) + config.set("pylons_server", "use", "egg:Paste#http") + #Set the couchdb sutff + config.set("couch_info", "server", "http://localhost:5984") + config.set("couch_info", "resourcedata", nodeName+"_resource_data") + config.set("couch_info", "community", nodeName+"_community") + config.set("couch_info", "node", nodeName+"_node") + config.set("couch_info", "network", nodeName+"_network") + + nodeConfigs[nodeName] = config + return nodeConfigs + +def createNodes(numberOfNodes): + nodes = {} + configs = generateNodeConfig(numberOfNodes) + #Make sure the node name matches the index. Make it easier for debugging. + for c in sorted(configs, key=lambda k: int(k.split('_')[-1])): + node =Node(configs[c], c) + nodes[c] = node + _NODES.append( node) + return nodes + +def createCommunity(communityName, networkName, nodes): + for node in nodes: + node.setCommunityId(communityId) + node.setNetworkId(networkId) + +def setCommunity(community): + for network in community["networks"]: + for node in community["networks"][network]: + node.setCommunityInfo(community["communityId"], community["social_community"]) + node.setNetworkInfo(network) + + + +def createNetwork(): + nodes = createNodes(15) + global _NODES, _COMMUNITIES, _GATEWAYS, _CONNECTIONS, _DISTRIBUTE_ORDER, _DATA + n = _NODES + openCommunityOne = {"communityId": "Open Community One", + "networks": + { + "OC1_N1":n[0:2], + "OC1_N2":n[2:6] + }, + "social_community": True + } + + openCommunityTwo ={ "communityId": "Open Community Two", + "networks": + { + "OC2_N1":n[6:10], + "OC2_N2":n[10:13] + }, + "social_community": True + } + + closedCommunity = { "communityId":"Closed Community", + "networks": + { + "CC2_N1":n[13:], + }, + "social_community": False + 
} + communities =[openCommunityOne, openCommunityTwo, closedCommunity] + _COMMUNITIES = communities + + + gateways =[n[0], n[3], n[4],n[5], n[7], n[8], n[9], n[12],n[13]] + _GATEWAYS = gateways + F= assertNoDistribute + S=assertSucessDistribute + #Add the essert test function to each of the connection destination to test. + connections ={n[0]:{n[1]: F}, + + n[1]:{}, + + n[2]:{n[0]:F, + n[3]:S, + n[4]:S, + n[5]:S}, + + n[3]:{n[13]:F, + n[0]:S, + n[6]:F}, + + n[4]:{n[7]:S, + n[3]:F}, + + n[5]:{n[5]:F, + n[0]:S}, + + n[6]:{n[8]:S, + n[9]:S, + n[1]:F + }, + + n[7]:{n[4]:S, + n[6]:S}, + + n[8]:{n[6]:F, + n[12]:S}, + + n[9]:{n[13]:F}, + + n[10]:{}, + + n[11]:{n[10]:S,}, + + n[12]:{n[10]:F}, + + n[13]:{n[14]:F}, + + n[14]:{n[13]:S} + } + _CONNECTIONS = connections + #Use the distribute order to have the gateway nodes starts for so that + # the data can get propagated throughout the network + _DISTRIBUTE_ORDER =[2, 0, 3, 4, 5, 7, 6, 8, 12, 13, 9, 14, 11, 10, 1] + + for community in communities: + setCommunity(community) + + #set the gateway nodes. + for node in gateways: + node.setNodeInfo(isGateway=True) + + #create node connections. + for node in connections: + for destination in connections[node]: + gatewayConnection = ((destination in gateways) or (node in gateways)) + node.addConnectionTo(destination._getNodeUrl(), gatewayConnection) + + #Populate node[2, 6, 9] as the seed nodes. 
+ _DATA = json.load(file(_TEST_DATA_PATH)) + n[2].publishResourceData(_DATA["documents"]) + n[6].publishResourceData(_DATA["documents"]) + + +def getNodeDistributeResults(sourceNode, destinationNode): + distributeResultsList = [] + for distributeResults in sourceNode._distributeResultsList: + if ('connections' in distributeResults) == False: + continue + for connectionResults in distributeResults['connections']: + if destinationNode._getNodeUrl() in connectionResults: + distributeResultList.append(connectionResults) + return distributeResultsList + + +def assertSucessDistribute(sourceNode, destinationNode): + for results in getNodeDistributeResults(sourceNode, destinationNode): + assert (results[sourceNode.__OK] and + results[sourceNode.__REPLICATION_RESULTS][self.__OK]), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + assert sourceNode.compareDistributedResources(destinationNode), \ + """Distribute failed all source node documents are at destination node.""" + +def assertNoDistribute(sourceNode, destinationNode): + for results in getNodeDistributeResults(sourceNode, destinationNode): + assert (not results[sourceNode.__OK] and + not (sourceNode.__REPLICATION_RESULTS in results)), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + +def assertNodeDistributeResults(sourceNode): + for destinationNode in _CONNECTIONS[sourceNode].keys(): + destinationNode.waitOnChangeMonitor() + _CONNECTIONS[sourceNode][destinationNode](sourceNode, destinationNode) + + +def startNodes(): + for node in _NODES: + node.start() + +def doNodesDistribute(): + for n in _DISTRIBUTE_ORDER: + print("\n\n==========Node {0} distribute results==========".format(_NODES[n]._nodeName)) + results = _NODES[n].distribute() + assert (len(_CONNECTIONS[_NODES[n]]) == 0 or + len(results['connections']) == len(_CONNECTIONS[_NODES[n]])), "Missing connection results" + assertNodeDistributeResults(_NODES[n]) + sleep(5) + +def 
testLargeNetwork(): + createNetwork() + startNodes() + doNodesDistribute() + + +def teardown(): + for node in _NODES: + node.tearDown() + + diff --git a/LR/lr/tests/functional/distribute/lr_node.py b/LR/lr/tests/functional/distribute/lr_node.py index 11b48622..1259529c 100644 --- a/LR/lr/tests/functional/distribute/lr_node.py +++ b/LR/lr/tests/functional/distribute/lr_node.py @@ -30,8 +30,8 @@ from services.Resource_Data_Distribution import __ResourceDataDistributionServiceTemplate as DistributeServiceTemplate import subprocess from lr.lib import helpers as h -from datetime import datetime from time import sleep +import pprint import signal import logging @@ -41,10 +41,11 @@ _PYLONS_CONFIG = path.abspath(path.join(_PWD, "../../../../development.ini.orig")) _RESOURCE_DATA_FILTER_APP = path.abspath(path.join(_PWD, "../../../../../couchdb/resource_data/apps/filtered-replication")) -_DISTRIBUTE_TEST_LOG = "test_distribute.log" - - +_TEST_DISTRIBUTE_DIR_LOG = path.abspath(path.join(path.dirname(_PYLONS_CONFIG), "test_distribute_logs")) + + class Node(object): + _REPLICATOR_DB = '_replicator' _CONFIG_DATABASE_NAMES=["community", "network", "node", "resourcedata"] _RESOURCE_DATA_FILTER = """ function(doc , req) @@ -56,27 +57,38 @@ class Node(object): return false; } """ + def __init__(self, nodeConfig, nodeName, communityId=None, networkId=None): self._nodeConfig = nodeConfig self._nodeName = nodeName - self._pylonsConfigPath = path.abspath(path.join(path.dirname(_PYLONS_CONFIG), - self._nodeName+"_config.ini")) + self._replicatorUrl = urlparse.urljoin(self._nodeConfig.get("couch_info", "server"), self._REPLICATOR_DB) + self._setupFilePaths() self._setupPylonsConfig() self._setupDescriptions() self._setupNode() self._setupDistributeService() self.setNodeInfo(nodeName) if communityId is not None: - self.setCommunityInfo(community) + self.setCommunityInfo(communityId) if networkId is not None: self.setNetworkInfo(networkId) self.removeTestLog() - + # Keep around the 
replication documents that are store in the replication + # database so that they can be deleted when the node is teared down. + self._distributeResultsList = [] + + def _setupFilePaths(self): + + self._pylonsConfigPath = path.abspath(path.join(path.dirname(_PYLONS_CONFIG), + self._nodeName+"_config.ini")) + self._logFilePath = path.abspath(path.join(_TEST_DISTRIBUTE_DIR_LOG, + self._nodeName+".log")) + def _getNodeDatabaseList(self): return [self._nodeConfig.get("couch_info", db) for db in self._CONFIG_DATABASE_NAMES] def _getNodeUrl(self): - return self._nodeConfig.get("node_config", "node_url") + return self._nodeConfig.get("node_config", "node_url").strip() def _setupDescriptions(self): # Set the node, network and community @@ -97,7 +109,7 @@ def _setupResourceData(self): def removeTestLog(self): try: - os.remove(path.abspath(path.join(self._pylonsConfig, _DISTRIBUTE_TEST_LOG))) + os.remove(self._logFilePath) except: pass @@ -110,8 +122,7 @@ def _setupNode(self): self._nodeConfig.get("couch_info", "network"), 'network_policy_description', policy) self._setupResourceData() - - + def _setupDistributeService(self): custom_opts = {} custom_opts["node_endpoint"] = self._getNodeUrl() @@ -139,8 +150,9 @@ def _setupPylonsConfig(self): for option in self._nodeConfig.options("pylons_server"): pylonsConfig.set("server:main", option, self._nodeConfig.get("pylons_server", option)) - #Add the distribute_sink_url - pylonsConfig.set("app:main", "distribute_sink_url", + #Set the ressource data url + + pylonsConfig.set("app:main", "lr.distribute_resource_data_url", urlparse.urljoin(self._nodeConfig.get("couch_info", "server"), self._nodeConfig.get("couch_info", "resourcedata"))) #change the logging level to the highest level to avoid spamming log. 
@@ -149,10 +161,7 @@ def _setupPylonsConfig(self): configFile = open(self._pylonsConfigPath, 'w') pylonsConfig.write(configFile) configFile.close() - - - - + def setCommunityInfo(self, community, isSocialCommunity=True): self._communityDescription["community_id"]= community self._communityDescription["community_name"] = community @@ -222,46 +231,131 @@ def publishResourceData(self, docs): del doc['_rev'] doc['doc_ID'] = uuid.uuid4().hex - now = datetime.utcnow().isoformat()+"Z" + now = h. nowToISO8601Zformat() doc['node_timestamp'] = now doc['create_timestamp'] = now doc['update_timestamp'] = now resourceDatabase[doc['doc_ID']] = doc + self.waitOnChangeMonitor() def addConnectionTo(self, destinationUrl, gateway_connection=False): connection = dict(nodeTemplate.connection_description) - connection['connection_id'] = uuid.uuid4().hex + connection['connection_id'] = "{0}_to_{1}_connection".format(self._getNodeUrl(), destinationUrl).strip() connection['source_node_url']=self._getNodeUrl() connection['gateway_connection'] = gateway_connection connection['destination_node_url'] = destinationUrl setup_utils.PublishDoc(self._server, self._nodeConfig.get("couch_info", "node"), - "{0}_to_{1}_connection".format(self._nodeName, destinationUrl), - connection) - + connection['connection_id'], + connection) + + def _getResourceDataChangeUrl(self): + return "{0}/{1}/{2}".format(self._server.resource.url, + self._nodeConfig.get("couch_info", "resourcedata"), + "_changes") + + def waitOnChangeMonitor(self): + """" Wait that for change monitor the recreate all the local copies of a document + after the distribution has been completed.""" + if hasattr(self, '_pylonsProcess') == False: + return + + try: + # Get both distributable and local copies of the document compare their + # numbers and wait until they are equal. 
+ while (True) : + distributableDocs = self.getResourceDataDocs(include_docs= False, + doc_type='resource_data_distributable') + resourceDataDocs = self.getResourceDataDocs(include_docs=False) + remaining = len(distributableDocs) - len(resourceDataDocs) + + print("\n....{0} difference {1} for change monitor to catchup {2}\n".format( + self._nodeName, remaining, + "\nResourceData Count:\t{0}\nDistributatableData Count: {1}".format( + len(resourceDataDocs), len(distributableDocs)))) + + if remaining == 0: + return; + #Assume that it takes about .01 seconds to generate the new copy. + sleepTime = int(10+abs(remaining)*.01) + + print("\n....{0}, Waiting {1} seconds for change monitor to catchup {2}".format( + self._nodeName, sleepTime, + "\nResourceData Count:\t{0}\nDistributatableData Count: {1}".format( + len(resourceDataDocs), len(distributableDocs)))) - def distribute(self): + sleep(sleepTime) + pass + except Exception as e: + log.exception(e) + print(e) + + def waitOnReplication(self, distributeResults): + """Wait for the replication to complete on the all node connections specifed + by dsistributeResults """ + if (distributeResults is None or + ('connections' in distributeResults) == False or + len (self.getResourceDataDocs(doc_type='resource_data_distributable', include_docs=False) ) == 0): + print ("node {0} has no replication results or distributable docs ....".format(self._nodeName)) + return + + waiting = True + while(waiting): + # Set waiting to false and check to the replication document to see + # if replication not completed for any of the connections, if not then + # reset waiting to true. + waiting =False + for connectionResults in distributeResults['connections']: + if 'replication_results' not in connectionResults: + continue + #Get the replication document. 
+ response = urllib2.urlopen(self._replicatorUrl+'/'+connectionResults['replication_results']['id']) + doc = json.load(response) + response.close() + + print('\n\n---------------Replication Status-----------') + print('<=From node:\t{0}\n=>To node:\t{1}\n<=>completion status: \t{2}\n'.format( + self._nodeName, + connectionResults['destinationNodeInfo']['resource_data_url'].split('/')[-1].split('_resource_data')[0], + doc.get('_replication_state'))) + + if doc.get('_replication_state') != 'completed': + waiting = True + sleep(30) + continue + + def distribute(self, waitOnReplication=True): + """ Distribute to all the node connections. When waited for completion is + true the this method will that distribution is completed before returning + the results. """ if hasattr(self, '_pylonsProcess'): data = json.dumps({"dist":"dist"}) request = urllib2.Request(urlparse.urljoin(self._getNodeUrl(), "distribute"), data, {'Content-Type':'application/json; charset=utf-8'}) - response = urllib2.urlopen(request) - - def getResourceDataDocs(self, filter_description=None): + + self._distributeResultsList.append(json.load(urllib2.urlopen(request))) + print("Distribute reponse: \n{0}".format(pprint.pformat(self._distributeResultsList[-1]))) + + if waitOnReplication: + self.waitOnReplication(self._distributeResultsList[-1]) - db = self._server[self._nodeConfig.get("couch_info", "resourcedata")] + return self._distributeResultsList[-1] + + def getResourceDataDocs(self, filter_description=None, doc_type='resource_data', include_docs=True): + db = self._server[self._nodeConfig.get("couch_info", "resourcedata")] #For source node get all the resource_data documents using the filter # that was using to distribute the document to destination node. 
options = { "filter": "filtered-replication/change_feed_filter", - "include_docs":True, - "doc_type":"resource_data"} + "include_docs":include_docs, + "doc_type":doc_type} if filter_description is not None: options["filter_description"] = json.dumps(filter_description) return db.changes(**options)["results"] - + + def compareDistributedResources(self, destination, filter_description=None): - """This method considered this node as source node. + """This method considers this node as source node. It compares its resource_data document with the destionation node to verify that data was distributed. This comparison assumes that distribute/ replication is done and that there is no other additions or deletions the @@ -300,29 +394,113 @@ def compareDistributedResources(self, destination, filter_description=None): return True def stop(self): - if hasattr(self, '_pylonsProcess'): - os.killpg(self._pylonsProcess.pid, signal.SIGTERM) - + if hasattr(self, '_pylonsProcess') == False: + return + + os.killpg(self._pylonsProcess.pid, signal.SIGTERM) + # Make sure that process is really dead and the port is releaseed. This done + # avoid bug when the node the stop and start methods are called quickly, + # the dead node is still holding port which sometimes tricks + # the start code thinking the node is up and running already. + while True: + try: + response = urllib2.urlopen(self._getNodeUrl()) + if response.code : + os.killpg(self._pylonsProcess.pid, signal.SIGTERM) + sleep(5) + except: + break; + del self._pylonsProcess + def start(self): - command = '(cd {0}; paster serve {1} --log-file {2}.log)'.format( + + # remove any existing log file to start from scratch to avoid ever + # growing log file. + self.removeTestLog() + + #Create the log file directory if it does not exists. 
+ if not path.exists(_TEST_DISTRIBUTE_DIR_LOG): + print("create dir") + os.mkdir(_TEST_DISTRIBUTE_DIR_LOG) + + command = '(cd {0}; paster serve {1} --log-file {2})'.format( path.abspath(path.dirname(self._pylonsConfigPath)), - self._pylonsConfigPath, _DISTRIBUTE_TEST_LOG) + self._pylonsConfigPath, self._logFilePath) + #Create a process group name as so that the shell and all its process # are terminated when stop is called. self._pylonsProcess = subprocess.Popen(command, shell=True, preexec_fn=os.setsid) + + #wait for the node to start before returning. + while True: + sleep(5) + try: + response = urllib2.urlopen(self._getNodeUrl()) + if (response.code /100 ) < 4: + break + except: + continue + #Wait the change monitor to cat + self.waitOnChangeMonitor() + print("node '{0}' started ....\n".format(self._nodeName) ) def resetResourceData(self): del self._server[ self._nodeConfig.get("couch_info", "resourcedata")] self._setupResourceData() - + def restart(self): + self.stop() + self.start() + + def removeReplicationDocs(self): + '''Method to delete replication document results documents from the couchdb + _replicator database. Its seems like replication will not work if there is an existing + replication document stated as completed with the same source and target + database name eventhough those the document is about database thas been + deleted and recreated. ''' + for distributeResults in self._distributeResultsList: + if ('connections' in distributeResults) == False: + continue + for connectionResults in distributeResults['connections']: + if 'replication_results' in connectionResults: + try: + #Use urllib request to remove the replication documents, the python + #couchdb interface has a bug at time of this write that prevents access + # to the _replicator database. 
+ + #first get the lastest version of the doc + response = urllib2.urlopen(self._replicatorUrl+'/'+connectionResults['replication_results']['id']) + doc = json.load(response) + response.close() + print ("\n\n--node {0} deleting replication doc: {1}".format( + self._nodeName, + self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev']))) + + request = urllib2.Request(self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev']), + headers={'Content-Type':'application/json' }) + + request.get_method = lambda: "DELETE" + + urllib2.urlopen(request) + except Exception as e: + log.exception(e) + def tearDown(self): self.stop() - #Delete the generated pylons configuration files - os.remove(self._pylonsConfigPath) + try: + #Delete the generated pylons configuration files + os.remove(self._pylonsConfigPath) + except Exception as e: + log.exception(e) + #Delete the generated database. for database in self._getNodeDatabaseList(): - del self._server[database] - + try: + del self._server[database] + except Exception as e: + log.exception(e) + + #Delete the replication documents + self.removeReplicationDocs() diff --git a/LR/lr/tests/functional/distribute/test_distribute.py b/LR/lr/tests/functional/distribute/test_distribute.py index 80aade25..1856a1d1 100644 --- a/LR/lr/tests/functional/distribute/test_distribute.py +++ b/LR/lr/tests/functional/distribute/test_distribute.py @@ -13,12 +13,17 @@ import ConfigParser import json from time import sleep +import pprint _PWD = path.abspath(path.dirname(__file__)) _TEST_DATA_PATH = path.abspath(path.join(_PWD, "../../data/nsdl_dc/data-000000000.json")) _TEST_NODE_CONFIG_DIR = path.abspath(path.join(_PWD, "config")) class TestDistribute(object): + __OK = 'ok' + __ERROR = 'error' + __REPLICATION_RESULTS = 'replication_results' + __CONNECTIONS = 'connections' @classmethod def setupClass(cls): @@ -77,21 +82,22 @@ def _setupNodePair(self, sourceNode, destinationNode, #add the destination node as connection to the source node. 
if isinstance(isGatewayConnection, bool): - sourceNode.addConnectionTo(destinationNode._getNodeUrl(). gatewayConnection) + sourceNode.addConnectionTo(destinationNode._getNodeUrl(), isGatewayConnection) else: - sourceNode.addConnectionTo(destinationNode._getNodeUrl(), (sourceIsGateway and destinationIsGateway)) + sourceNode.addConnectionTo(destinationNode._getNodeUrl(), (sourceIsGateway or destinationIsGateway)) + + def _doDistributeTest(self, sourceNode, destinationNode): #start the node nodes. sourceNode.start() destinationNode.start() - sleep(60) - #Do the distribute - sourceNode.distribute() - # Wait for two minutes or that that all the document and be transfer to test that - # there are indeed distributed correctly - sleep(60) + #Do the distribute + results = sourceNode.distribute(True) + destinationNode.waitOnChangeMonitor() + return results + def test_common_nodes_same_network_community_no_filter(self): """ This tests distribute/replication between to common nodes on the same network. There is no filter on the destination node. Distribution/replication @@ -104,7 +110,12 @@ def test_common_nodes_same_network_community_no_filter(self): #populate the node with test data. data = json.load(file(_TEST_DATA_PATH)) sourceNode.publishResourceData(data["documents"]) - self._doDistributeTest(sourceNode, destinationNode) + response =self._doDistributeTest(sourceNode, destinationNode) + + assert (response[self.__OK] and + response['connections'][0][self.__REPLICATION_RESULTS][self.__OK]), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + assert sourceNode.compareDistributedResources(destinationNode), \ """Distribute between two common nodes on the same network and community and no filter on the destination node.""" @@ -126,7 +137,12 @@ def test_gatewaynodes_on_different_open_communities(self): #populate the node with test data. 
data = json.load(file(_TEST_DATA_PATH)) sourceNode.publishResourceData(data["documents"]) - self._doDistributeTest(sourceNode, destinationNode) + response =self._doDistributeTest(sourceNode, destinationNode) + + assert (response[self.__OK] and + response[self.__CONNECTIONS][0][self.__REPLICATION_RESULTS][self.__OK]), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + assert sourceNode.compareDistributedResources(destinationNode), \ """Distribute between two gateway nodes on different community and network and no filter on the destination node.""" @@ -177,7 +193,12 @@ def _setup_common_nodes_same_network_and_community_filter(self, data["documents"][i]["active"] = False sourceNode.publishResourceData(data["documents"]) - self._doDistributeTest(sourceNode, destinationNode) + + response =self._doDistributeTest(sourceNode, destinationNode) + + assert (response[self.__OK] and + response[self.__CONNECTIONS][0][self.__REPLICATION_RESULTS][self.__OK]), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) assert sourceNode.compareDistributedResources(destinationNode, destinationNode._nodeFilterDescription), \ @@ -217,7 +238,13 @@ def test_gateway_to_common_node(self): #populate the node with test data. data = json.load(file(_TEST_DATA_PATH)) sourceNode.publishResourceData(data["documents"]) - self._doDistributeTest(sourceNode, destinationNode) + response =self._doDistributeTest(sourceNode, destinationNode) + + assert ( response[self.__OK] and + not response[self.__CONNECTIONS][0][self.__OK] and + response[self.__CONNECTIONS][0][self.__ERROR] =='gateways can only distribute to gateways'), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + # There should be no replication. 
Destination node should be # empty of resource_data docs assert len (destinationNode.getResourceDataDocs()) == 0, \ @@ -239,7 +266,14 @@ def test_gateway_to_gateway_closed_community(self): #populate the node with test data. data = json.load(file(_TEST_DATA_PATH)) sourceNode.publishResourceData(data["documents"]) - self._doDistributeTest(sourceNode, destinationNode) + + response =self._doDistributeTest(sourceNode, destinationNode) + + assert ( response[self.__OK] and + not response[self.__CONNECTIONS][0][self.__OK] and + response[self.__CONNECTIONS][0][self.__ERROR] =='cannot distribute across non social communities'), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + # There should be no replication. Destination node should be # empty of resource_data docs assert len (destinationNode.getResourceDataDocs()) == 0, \ @@ -247,7 +281,7 @@ def test_gateway_to_gateway_closed_community(self): is not allowed between gateway nodes on closed network.""" - def test_common_to_gateway_same_community_network(self): + def test_common_to_gateway_same_community_and_network(self): """ This tests distribute/replication between common node to a gateway common node. distribution/replication should work. """ @@ -255,14 +289,19 @@ def test_common_to_gateway_same_community_network(self): sourceNode =self._NODES[0] destinationNode = self._NODES[1] self._setupNodePair(sourceNode, destinationNode, - destinationIsGateway =True) + destinationIsGateway =True, + isGatewayConnection=True) #populate the node with test data. data = json.load(file(_TEST_DATA_PATH)) sourceNode.publishResourceData(data["documents"]) - self._doDistributeTest(sourceNode, destinationNode) - # There should be no replication. 
Destination node should be - # empty of resource_data docs + + response =self._doDistributeTest(sourceNode, destinationNode) + + assert (response[self.__OK] and + response[self.__CONNECTIONS][0][self.__REPLICATION_RESULTS][self.__OK]), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + assert sourceNode.compareDistributedResources(destinationNode), \ """Distribution from a common node to gateway node should work""" @@ -276,11 +315,21 @@ def test_source_node_with_more_than_two_gateway_connections(self): self._setupNodePair(sourceNode, destinationNode, destinationIsGateway =True) + sourceNode.addConnectionTo(destinationNode._getNodeUrl(), True) sourceNode.addConnectionTo("http://fake.node.org", True) + response =self._doDistributeTest(sourceNode, destinationNode) + + assert ( response[self.__OK] and + not response[self.__CONNECTIONS][0][self.__OK] and + response[self.__CONNECTIONS][0][self.__ERROR].find('Cannot reach destination node')!=-1), \ + "failed to processed replication/distribute:\n{0}".format(pprint.pformat(response)) + # There should be no replication. Destination node should be # empty of resource_data docs assert len (destinationNode.getResourceDataDocs()) == 0, \ """There should be NO distribution/replication. 
Source node connections are invalid""" + + diff --git a/LR/lr/tests/functional/test_publish.py b/LR/lr/tests/functional/test_publish.py index a2347a57..648d6abe 100644 --- a/LR/lr/tests/functional/test_publish.py +++ b/LR/lr/tests/functional/test_publish.py @@ -1,4 +1,9 @@ from lr.tests import * +from lr.model import ResourceDataModel +from time import sleep +import json + +headers={'Content-Type': 'application/json'} class TestPublisherController(TestController): def __init__(self, *args, **kwargs): @@ -6,3 +11,94 @@ def __init__(self, *args, **kwargs): self.controllerName = "publish" def test_index(self): pass + + def test_multiple_version(self): + data = { + "documents": + [ + {"doc_type": "resource_data", + "resource_locator": "http://example.com", + "resource_data": "\n\n http://www.myboe.org/go/resource/23466\n Posters to Go\n PDF version of a set of fifteen posters from the National Portrait Gallery and the Smithsonian American Art Museum. Includes an application for receiving the actual posters for the classroom. 
Arranged into themes: Westward Expansion Civil War Harlem Renaissance World War II and the Sixties.\n \n en-US\n Free access\n text/html\n 2010-07-26\n 2010-07-26\n", + "update_timestamp": "2011-11-07T14:51:07.137671Z", + "TOS": {"submission_attribution": "Smithsonian Education", + "submission_TOS": "http://si.edu/Termsofuse"}, + "resource_data_type": "metadata", + "payload_schema_locator": "http://ns.nsdl.org/schemas/nsdl_dc/nsdl_dc_v1.02.xsd", + "payload_placement": "inline", + "payload_schema": ["NSDL DC 1.02.020"], + "node_timestamp": "2011-12-21T04:57:18.343124Z", + "digital_signature": {"key_location": ["http://pgp.mit.edu:11371/pks/lookup?op=get&search=0xE006035FD5EFEA67"], + "signing_method": "LR-PGP.1.0", + "signature": "-----BEGIN PGP SIGNED MESSAGE-----\nHash: SHA256\n\n316c7e1e9436cc45f5c2f0a1ba8d560adefb22e2ca4abf72fa5279267c88ac6e-----BEGIN PGP SIGNATURE-----\nVersion: BCPG v1.46\n\niQFdBAEBCABHBQJOt+/6QBxTbWl0aHNvbmlhbiBFZHVjYXRpb24gKFNtaXRoc29u\naWFuIEVkdWNhdGlvbikgPGxlYXJuaW5nQHNpLmVkdT4ACgkQ4AYDX9Xv6merCAf+\nPJQ6TX7jTo79a9XKhaSmFbYTgRz+D/uN9ksWJjsmvvoprqjMnsZBivD+3YDE/nTK\nttexx5Gy173Sj0wsojY4UPVezPmwbBjA2+2CG9btTKIsg3WwQpqzPeA/6LT46Ski\n2v3UbbGAMAU00ereuOjmdsqRZkXD/ABtZ/LYVMQCIqVMdR3aeQorHzuLlxTuzt/A\nMFxJb4A+a2jVw5nUM2Ry/x31Cb0pQ9uNO+jIWr8Xl3fjqiD5dUtySmVYvOEjEYcN\nh1twKySLmRWx0OfLN/Fnr1+N+sXT/s7lPCopKV3leEC2FOKDTjHhFM3mKUJvV4E+\ncXWLz6hBgEtIJlRuIj/o"}, + "doc_version": "0.23.0", + "active": True, + "identity": {"signer": "Smithsonian Education ", + "submitter": "Brokers of Expertise on behalf of Smithsonian Education", + "submitter_type": "agent", + "curator": "Smithsonian Education", + "owner": "Smithsonian American Art Museum"} + }, + + {"doc_type": "resource_data", + "resource_locator": "http://example.com/1", + "resource_data": "\n\n http://www.myboe.org/go/resource/23466\n Posters to Go\n PDF version of a set of fifteen posters from the National Portrait Gallery and the Smithsonian American Art Museum. 
Includes an application for receiving the actual posters for the classroom. Arranged into themes: Westward Expansion Civil War Harlem Renaissance World War II and the Sixties.\n \n en-US\n Free access\n text/html\n 2010-07-26\n 2010-07-26\n", + "TOS": {"submission_attribution": "Smithsonian Education", + "submission_TOS": "http://si.edu/Termsofuse"}, + "resource_data_type": "metadata", + "payload_schema_locator": "http://ns.nsdl.org/schemas/nsdl_dc/nsdl_dc_v1.02.xsd", + "payload_placement": "inline", + "payload_schema": ["NSDL DC 1.02.020"], + "node_timestamp": "2011-12-21T04:57:18.343124Z", + "digital_signature": {"key_location": ["http://pgp.mit.edu:11371/pks/lookup?op=get&search=0xE006035FD5EFEA67"], + "signing_method": "LR-PGP.1.0", + "signature": "-----BEGIN PGP SIGNED MESSAGE-----\nHash: SHA256\n\n316c7e1e9436cc45f5c2f0a1ba8d560adefb22e2ca4abf72fa5279267c88ac6e-----BEGIN PGP SIGNATURE-----\nVersion: BCPG v1.46\n\niQFdBAEBCABHBQJOt+/6QBxTbWl0aHNvbmlhbiBFZHVjYXRpb24gKFNtaXRoc29u\naWFuIEVkdWNhdGlvbikgPGxlYXJuaW5nQHNpLmVkdT4ACgkQ4AYDX9Xv6merCAf+\nPJQ6TX7jTo79a9XKhaSmFbYTgRz+D/uN9ksWJjsmvvoprqjMnsZBivD+3YDE/nTK\nttexx5Gy173Sj0wsojY4UPVezPmwbBjA2+2CG9btTKIsg3WwQpqzPeA/6LT46Ski\n2v3UbbGAMAU00ereuOjmdsqRZkXD/ABtZ/LYVMQCIqVMdR3aeQorHzuLlxTuzt/A\nMFxJb4A+a2jVw5nUM2Ry/x31Cb0pQ9uNO+jIWr8Xl3fjqiD5dUtySmVYvOEjEYcN\nh1twKySLmRWx0OfLN/Fnr1+N+sXT/s7lPCopKV3leEC2FOKDTjHhFM3mKUJvV4E+\ncXWLz6hBgEtIJlRuIj/o"}, + "create_timestamp": "2011-11-07T14:51:07.137671Z", + "doc_version": "0.21.0", + "active": True, + "identity": {"signer": "Smithsonian Education ", + "submitter": "Brokers of Expertise on behalf of Smithsonian Education", + "submitter_type": "agent", + "curator": "Smithsonian Education", + "owner": "Smithsonian American Art Museum"} + }, + + { + "doc_type": "resource_data", + "resource_locator": "http://example.com/2", + "resource_data": "\n\n http://www.myboe.org/go/resource/23466\n Posters to Go\n PDF version of a set of fifteen posters from the National Portrait Gallery 
and the Smithsonian American Art Museum. Includes an application for receiving the actual posters for the classroom. Arranged into themes: Westward Expansion Civil War Harlem Renaissance World War II and the Sixties.\n \n en-US\n Free access\n text/html\n 2010-07-26\n 2010-07-26\n", + "TOS": {"submission_attribution": "Smithsonian Education", + "submission_TOS": "http://si.edu/Termsofuse"}, + "resource_data_type": "metadata", + "payload_schema_locator": "http://ns.nsdl.org/schemas/nsdl_dc/nsdl_dc_v1.02.xsd", + "payload_placement": "inline", + "payload_schema": ["NSDL DC 1.02.020"], + "create_timestamp": "2011-11-07T14:51:07.137671Z", + "doc_version": "0.11.0", + "active": True, + "submitter": "Brokers of Expertise on behalf of Smithsonian Education", + "submitter_type": "agent", + } + ] + } + + result = json.loads(self.app.post('/publish', params=json.dumps(data), headers=headers).body) + assert(result['OK']), " Publish was not successfully" + + index = 0 + for docResults in result['document_results']: + assert(docResults['OK'] == True), "Publish should work for doc version {0}".format(data['documents'][index]['doc_version']) + assert('doc_ID' in docResults), "Publish should return doc_ID for doc version {0}".format(data['documents'][index]['doc_version']) + index = index +1 + + ##delete the published testdocuments. 
+ #Wait for documents to be processed + sleep(5) + for doc in result['document_results']: + try: + del ResourceDataModel._defaultDB[doc['doc_ID']] + except: + pass + try: + del ResourceDataModel._defaultDB[doc['doc_ID']+'-distributable'] + except: + pass diff --git a/LR/lr/tests/functional/test_slices.py b/LR/lr/tests/functional/test_slices.py index 8b498a9d..b659c9a0 100644 --- a/LR/lr/tests/functional/test_slices.py +++ b/LR/lr/tests/functional/test_slices.py @@ -37,7 +37,7 @@ def writeTestData(obj): for testIdentity in obj.identities : obj.setupCount = obj.setupCount + 1 setupCountFlag = testName + "setupCount" + str(obj.setupCount) - testDoc = buildTestDoc(testIdentity+testName, [setupCountFlag, obj.testDataKey, testKey+testName, obj.otherKeys[0], obj.otherKeys[1]], "metadata", ["nsdl_dc"]) + testDoc = buildTestDoc(testIdentity+testName, [setupCountFlag, obj.testDataKey, testKey+testName, obj.otherKeys[0], obj.otherKeys[1]], "metadata", [obj.testSchema]) test_data["documents"].append(testDoc) docs_json = json.dumps(test_data) @@ -73,7 +73,7 @@ def writeMultiKeyTestData(obj): for testIdentity in obj.identities : obj.setupCount = obj.setupCount + 1 setupCountFlag = testName + "setupCount" + str(obj.setupCount) - testDoc = buildTestDoc(testIdentity+testName, [setupCountFlag, obj.testDataKey, obj.testKeys[0]+testName, obj.testKeys[1]+testName, obj.testKeys[2]+testName, obj.otherKeys[0], obj.otherKeys[1]], "metadata", ["nsdl_dc"]) + testDoc = buildTestDoc(testIdentity+testName, [setupCountFlag, obj.testDataKey, obj.testKeys[0]+testName, obj.testKeys[1]+testName, obj.testKeys[2]+testName, obj.otherKeys[0], obj.otherKeys[1]], "metadata", [obj.testSchema]) test_data["documents"].append(testDoc) docs_json = json.dumps(test_data) @@ -90,7 +90,7 @@ def writeResumptionTestData(obj): for x in xrange(0,num_docs): obj.setupCount = obj.setupCount + 1 setupCountFlag = testName + "setupCount" + str(obj.setupCount) - testDoc = buildTestDoc(obj.identities[1]+testName, 
[setupCountFlag, obj.testDataKey, obj.testKeys[0]+testName, obj.testKeys[1]+testName, obj.testKeys[2]+testName, obj.otherKeys[0], obj.otherKeys[1]], "metadata", ["nsdl_dc"]) + testDoc = buildTestDoc(obj.identities[1]+testName, [setupCountFlag, obj.testDataKey, obj.testKeys[0]+testName, obj.testKeys[1]+testName, obj.testKeys[2]+testName, obj.otherKeys[0], obj.otherKeys[1]], "metadata", [obj.testSchema]) test_data["documents"].append(testDoc) #i = i+1 @@ -138,6 +138,7 @@ def removeTestData(obj): deleteDistributableFail = 0 print "delete attempt: " + str(deleteAttempts) del_key = quote("{\"tag\": \""+obj.testDataKey+"\"}") + #del_key = quote("{\"tag\": \"metadata\"}") print("del_key: " + del_key) print("del response call: " + obj.couch_url+"/resource_data/_design/learningregistry-slice/_view/docs?reduce=false&key="+del_key) response = urlopen(obj.couch_url+"/resource_data/_design/learningregistry-slice/_view/docs?reduce=false&key="+del_key) @@ -195,6 +196,7 @@ class TestSlicesSmallController(TestController): testKeys = ['alphaTestKey', 'betaTestKey', 'gammaTestKey'] otherKeys = ['deltaTestKey', 'epsilonTestKey', 'zetaTestKey'] + testSchema = 'NSDL' testDataKey = 'lr-test-data-slice-jbrecht' identities = ['FederalMuseumTestIdentity', 'FederalArchivesTestIdentity', 'UniversityXTestIdentity'] test_time_string = "T00:00:00.000000Z" @@ -226,6 +228,7 @@ def __init__(self, *args, **kwargs): self.controllerName = "slice" testKeys = ['alphaTestKey', 'betaTestKey', 'gammaTestKey'] otherKeys = ['deltaTestKey', 'epsilonTestKey', 'zetaTestKey'] + testSchema = 'NSDL' testDataKey = 'lr-test-data-slice-jbrecht' identities = ['FederalMuseumTestIdentity', 'FederalArchivesTestIdentity', 'UniversityXTestIdentity'] test_time_string = "T00:00:00.000000Z" @@ -314,6 +317,13 @@ def _checkTag(self, doc, tag) : return False + def _checkSchema(self, doc, testSchema) : + for schema in doc['payload_schema'] : + if schema.lower() == testSchema.lower() : return True + + return False + + #paramKeys = 
['start_date', 'identity', 'any_tags', 'full_docs'] #call slice with the supplied parameters @@ -386,41 +396,35 @@ def test_by_identity(self): assert self._checkIdentity(docs[1]['resource_data_description'], self.identities[1]+"test_by_identity") assert self._checkIdentity(docs[2]['resource_data_description'], self.identities[1]+"test_by_identity") - #test that there are 100 docs in the first result and 50 in the result after 1 resumption - #grab the service document for slice: http://127.0.0.1:5984/node/access%3Aslice - @DataCleaner("test_resumption", "Resumption") - def test_resumption(self): - - -# response = urlopen(obj.couch_url+"/resource_data/_design/learningregistry-slice/_view/docs?reduce=false&key=\""+obj.testDataKey+"\"") -# body = response.read() -# data = json.loads(body) -# page_size = data["rows"] - - slice_doc = helpers.getServiceDocument("access:slice") - page_size = slice_doc["service_data"]["doc_limit"] - - ##add test to assert that flow control is enabled, check that flow_control in service_data is true - - parameters = {} - parameters[IDENTITY] = self.identities[1]+"test_resumption" - parameters[IDS_ONLY] = False - response = self._slice(parameters) - data = json.loads(response.body) - docs = data["documents"] - if len(docs)!=100 : - print "resumption assert will fail. 
doc count is: " + str(len(docs)) - assert len(docs)==100 - for doc in docs: - assert self._checkIdentity(doc['resource_data_description'], self.identities[1]+"test_resumption") - resumption_token = data["resumption_token"] - parameters[RESUMPTION] = resumption_token - response = self._slice(parameters) - data = json.loads(response.body) - docs = data["documents"] - assert len(docs)==50 - for doc in docs: - assert self._checkIdentity(doc['resource_data_description'], self.identities[1]+"test_resumption") +# #test that there are 100 docs in the first result and 50 in the result after 1 resumption +# #grab the service document for slice: http://127.0.0.1:5984/node/access%3Aslice +# @DataCleaner("test_resumption", "Resumption") +# def test_resumption(self): +# +# slice_doc = helpers.getServiceDocument("access:slice") +# page_size = slice_doc["service_data"]["doc_limit"] +# +# ##add test to assert that flow control is enabled, check that flow_control in service_data is true +# +# parameters = {} +# parameters[IDENTITY] = self.identities[1]+"test_resumption" +# parameters[IDS_ONLY] = False +# response = self._slice(parameters) +# data = json.loads(response.body) +# docs = data["documents"] +# if len(docs)!=100 : +# print "resumption assert will fail. 
doc count is: " + str(len(docs)) +# assert len(docs)==100 +# for doc in docs: +# assert self._checkIdentity(doc['resource_data_description'], self.identities[1]+"test_resumption") +# resumption_token = data["resumption_token"] +# parameters[RESUMPTION] = resumption_token +# response = self._slice(parameters) +# data = json.loads(response.body) +# docs = data["documents"] +# assert len(docs)==50 +# for doc in docs: +# assert self._checkIdentity(doc['resource_data_description'], self.identities[1]+"test_resumption") #test that there are 3 documents with key = testKeys[0] @@ -443,6 +447,26 @@ def test_by_single_key(self): assert self._checkTag(docs[1]['resource_data_description'], self.testKeys[0]+"test_by_single_key") assert self._checkTag(docs[2]['resource_data_description'], self.testKeys[0]+"test_by_single_key") + #test that there are 9*DATA_MULTIPLIER documents with key = "NSDL DC 1.02.020" + @DataCleaner("test_by_paradata_schema") + def test_by_paradata_schema(self): + parameters = {} + parameters[ANY_TAGS] = [self.testSchema] + parameters[IDS_ONLY] = False + response = self._slice(parameters) + docs = self._loadAllDocs(parameters, response) + print "docs length in schema test is: " + str(len(docs)) + if len(docs)!=9*DATA_MULTIPLIER : + print "assert will fail in test_by_paradata_schema. 
docs are:" + for doc in docs : + doc_id = doc["doc_ID"] + couchdoc = self.db[doc_id] + print json.dumps(couchdoc) + assert len(docs)==9*DATA_MULTIPLIER + if len(docs)==9*DATA_MULTIPLIER : + assert self._checkSchema(docs[0]['resource_data_description'], self.testSchema) + assert self._checkSchema(docs[1]['resource_data_description'], self.testSchema) + assert self._checkSchema(docs[2]['resource_data_description'], self.testSchema) @DataCleaner("test_by_multiple_keys", "Multi") def test_by_multiple_keys(self): diff --git a/config/services/Basic_Publish.py b/config/services/Basic_Publish.py index 04a9bd5c..6e6195a4 100644 --- a/config/services/Basic_Publish.py +++ b/config/services/Basic_Publish.py @@ -20,7 +20,7 @@ def install(server, dbname, setupInfo): custom_opts["doc_limit"] = int(active) active = getInput("Enter message size limit in octet. \n"+ - "This should the maximum data size the the node will accept ", None, isInt) + "This should the maximum data size the the node will accept", None, isInt) custom_opts["msg_size_limit"] = int(active) @@ -36,8 +36,6 @@ def install(server, dbname, setupInfo): print("Configured Basic Publish service:\n{0}\n".format(json.dumps(doc, indent=4, sort_keys=True))) - - class __BasicPublishServiceTemplate(ServiceTemplate): def __init__(self): ServiceTemplate.__init__(self) diff --git a/config/setup_node.py b/config/setup_node.py index 4b0d14d3..a5ea8df1 100755 --- a/config/setup_node.py +++ b/config/setup_node.py @@ -129,6 +129,26 @@ def setNetworkId(): t.network_policy_description['network_id'] = network t.network_policy_description['policy_id'] =network+" policy" +def setConfigFile(nodeSetup): + + #create a copy of the existing config file as to not overide it. 
+ if os.path.exists(_PYLONS_CONFIG_DEST): + backup = _PYLONS_CONFIG_DEST+".backup" + print("\nMove existing {0} to {1}".format(_PYLONS_CONFIG_DEST, backup)) + shutil.copyfile(_PYLONS_CONFIG_DEST, backup) + + #Update pylons config file to use the couchdb url + _config.set("app:main", "couchdb.url", nodeSetup['couchDBUrl']) + # set the url to for destribute/replication (that is the url that a source couchdb node + # will use for replication. + _config.set("app:main", "lr.distribute_resource_data_url", nodeSetup['distributeResourceDataUrl']) + + destConfigfile = open(_PYLONS_CONFIG_DEST, 'w') + _config.write(destConfigfile) + destConfigfile.close() + + + if __name__ == "__main__": from optparse import OptionParser @@ -152,19 +172,16 @@ def setNetworkId(): for k in nodeSetup.keys(): print("{0}: {1}".format(k, nodeSetup[k])) - #create a copy of the existing config file as to not overide it. - if os.path.exists(_PYLONS_CONFIG_DEST): - backup = _PYLONS_CONFIG_DEST+".backup" - print("\nMove existing {0} to {1}".format(_PYLONS_CONFIG_DEST, backup)) - shutil.copyfile(_PYLONS_CONFIG_DEST, backup) - - #Update pylons config file to use the couchdb url - _config.set("app:main", "couchdb.url", nodeSetup['couchDBUrl']) + setConfigFile(nodeSetup) + + server = couchdb.Server(url= nodeSetup['couchDBUrl']) + if server.version() < "1.1.0": + _config.set("app:main", "couchdb.stale.flag", "OK") destConfigfile = open(_PYLONS_CONFIG_DEST, 'w') _config.write(destConfigfile) destConfigfile.close() - server = couchdb.Server(url= nodeSetup['couchDBUrl']) + #Create the databases. 
CreateDB(server, dblist=[_RESOURCE_DATA]) diff --git a/config/setup_utils.py b/config/setup_utils.py index c03c7846..91345990 100644 --- a/config/setup_utils.py +++ b/config/setup_utils.py @@ -152,6 +152,10 @@ def getSetupInfo(): isNodeOpen = getInput('Is the node "open" (T/F)', 'T') nodeSetup['open_connect_source'] = (isNodeOpen=='T') + nodeSetup['distributeResourceDataUrl'] = getInput("\nEnter distribute/replication "+ + "resource_data destination URL \n(this is the resource_data URL that another node couchdb "+ + "will use to replicate/distribute to this node)", "{0}/resource_data".format(nodeUrl)) + isDistributeDest = getInput("Does the node want to be the destination for replication (T/F)", 'T') nodeSetup['open_connect_dest'] =(isDistributeDest =='T') return nodeSetup diff --git a/couchdb/resource_data/apps/learningregistry-slice/views/docs/map.js b/couchdb/resource_data/apps/learningregistry-slice/views/docs/map.js index acc9ebf4..faa44dc0 100755 --- a/couchdb/resource_data/apps/learningregistry-slice/views/docs/map.js +++ b/couchdb/resource_data/apps/learningregistry-slice/views/docs/map.js @@ -13,14 +13,38 @@ function(doc) { return false; } + //grab all the identities in identity or submitter/curator/owner/signer (depending on version) + //if any identities are identical, ignore redundant ones. 
if(doc.identity) { - if(doc.identity.submitter) identities.push(doc.identity.submitter.toLowerCase()); - if(doc.identity.curator) identities.push(doc.identity.curator.toLowerCase()); - if(doc.identity.owner) identities.push(doc.identity.owner.toLowerCase()); - if(doc.identity.signer) identities.push(doc.identity.signer.toLowerCase()); + if(doc.identity.submitter) { + identities.push(doc.identity.submitter.toLowerCase()); + } + if(doc.identity.curator) { + var curator = doc.identity.curator.toLowerCase(); + if(!arrayContains(identities,curator)) { + identities.push(curator); + } + } + if(doc.identity.owner) { + var owner = doc.identity.owner.toLowerCase(); + if(!arrayContains(identities,owner)) { + identities.push(owner); + } + } + if(doc.identity.signer) { + var signer = doc.identity.signer.toLowerCase(); + if(!arrayContains(identities,signer)) { + identities.push(signer); + } + } } - if(doc.submitter) identities.push(doc.submitter.toLowerCase()); + if(doc.submitter) { + var submitter = doc.submitter.toLowerCase(); + if(!arrayContains(identities,submitter)) { + identities.push(submitter); + } + } //build identities indices