diff --git a/requirements.txt b/requirements.txt
index f6a5ebec..5a013202 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,7 +13,6 @@ certifi==0.0.8
 chardet==1.0.1
 distribute==0.6.10
 iso8601==0.1.4
-librato==0.2
 lxml==2.3.4
 newrelic==1.4.0.137
 nose==1.1.2
diff --git a/totalimpact/backend.py b/totalimpact/backend.py
index 1bffe42c..96c88b89 100755
--- a/totalimpact/backend.py
+++ b/totalimpact/backend.py
@@ -11,27 +11,6 @@ logger.setLevel(logging.DEBUG)
 
 thread_count = defaultdict(dict)
 
-#import librato
-# mylibrato = librato.LibratoConnection(os.environ["LIBRATO_METRICS_USER"], os.environ["LIBRATO_METRICS_TOKEN"])
-# def get_or_create(metric_type, name, description):
-#     if metric_type=="counter":
-#         try:
-#             metric = mylibrato.get_counter(name)
-#         except librato.exceptions.ClientError:
-#             metric = mylibrato.create_counter(name, description)
-#     else:
-#         try:
-#             metric = mylibrato.get_gauge(name)
-#         except librato.exceptions.ClientError:
-#             metric = mylibrato.create_gauge(name, description)
-#     return metric
-
-# librato_provider_thread_start = get_or_create("guage", "provider_thread_start", "+1 when a provider thread is started")
-# librato_provider_thread_end = get_or_create("guage", "provider_thread_end", "+1 when a provider thread is ended")
-# librato_provider_thread_run_duration = get_or_create("gauge", "provider_thread_run_duration", "elapsed time for a provider thread to run")
-# librato_provider_thread_launch_duration = get_or_create("gauge", "provider_thread_launch_duration", "elapsed time for a provider thread to launch")
-# librato_provider_thread_count = get_or_create("gauge", "provider_thread_count", "number of threads running")
-
 
 class RedisQueue(object):
     def __init__(self, queue_name, myredis):
@@ -103,7 +82,7 @@ def __init__(self, provider, polling_interval, alias_queue, provider_queue, couc
         self.couch_queues = couch_queues
         self.wrapper = wrapper
         self.myredis = myredis
-        self.name = "worker_"+self.provider_name
+        self.name = self.provider_name+"_worker"
 
     # last variable is an artifact so it has same call signature as other callbacks
     def add_to_couch_queue_if_nonzero(self, tiid, new_content, method_name, dummy=None):
@@ -132,7 +111,7 @@ def wrapper(cls, tiid, input_aliases_dict, provider, method_name, aliases_provid
 #            "wrapper", tiid=tiid, provider_name=provider.provider_name, method_name=method_name, aliases=aliases))
 
         provider_name = provider.provider_name
-        worker_name = "worker_"+provider_name
+        worker_name = provider_name+"_worker"
 
         input_alias_tuples = ItemFactory.alias_tuples_from_dict(input_aliases_dict)
         method = getattr(provider, method_name)
@@ -158,12 +137,9 @@ def wrapper(cls, tiid, input_aliases_dict, provider, method_name, aliases_provid
 
         logger.info("{:20}: RETURNED {tiid} {method_name} {provider_name} : {response}".format(
             worker_name, tiid=tiid, method_name=method_name.upper(), provider_name=provider_name.upper(), response=response))
-        
+
         callback(tiid, response, method_name, aliases_providers_run)
 
-        # librato_provider_thread_run_duration.add(time.time()-start_time, source=provider_name)
-        # librato_provider_thread_end.add(1, source=provider_name)
-        # librato_provider_thread_count.add(len(thread_count[provider_name].keys()), source=provider_name)
         del thread_count[provider_name][tiid+method_name]
 
         return response
@@ -171,34 +147,32 @@ def wrapper(cls, tiid, input_aliases_dict, provider, method_name, aliases_provid
     def run(self):
         provider_message = self.provider_queue.pop()
         if provider_message:
-            logger.info("POPPED from queue for {provider}".format(
-                provider=self.provider_name))
+            #logger.info("POPPED from queue for {provider}".format(
+            #    provider=self.provider_name))
 
             (tiid, alias_dict, method_name, aliases_providers_run) = provider_message
             if method_name == "aliases":
                 callback = self.add_to_alias_and_couch_queues
             else:
                 callback = self.add_to_couch_queue_if_nonzero
 
-            thread_count[self.provider.provider_name][tiid+method_name] = 1
+            #logger.info("BEFORE STARTING thread for {tiid} {method_name} {provider}".format(
+            #    method_name=method_name.upper(), tiid=tiid, num=len(thread_count[self.provider.provider_name].keys()),
+            #    provider=self.provider.provider_name.upper()))
 
-            logger.info("BEFORE STARTING thread for {tiid} {method_name} {provider}, now at {num} {provider} threads".format(
-                method_name=method_name.upper(), tiid=tiid, num=len(thread_count[self.provider.provider_name].keys()),
-                provider=self.provider.provider_name.upper()))
+            thread_count[self.provider.provider_name][tiid+method_name] = 1
+            number_of_threads_for_this_provider = len(thread_count[self.provider.provider_name].keys())
+            number_of_total_provider_threads = sum([len(thread_count[p].keys()) for p in thread_count])
 
-            logger.info("NUMBER of {provider} threads = {num}".format(
-                num=len(thread_count[self.provider.provider_name].keys()),
+            logger.info("NUMBER of {provider} threads = {num_provider}, all provider threads = {num_total}".format(
+                num_provider=number_of_threads_for_this_provider,
+                num_total=number_of_total_provider_threads,
                 provider=self.provider.provider_name.upper()))
 
-            # librato_provider_thread_start.add(1, source=self.provider.provider_name)
-            # librato_provider_thread_count.add(len(thread_count[self.provider.provider_name].keys()), source=self.provider.provider_name)
-
             t = threading.Thread(target=ProviderWorker.wrapper, 
                 args=(tiid, alias_dict, self.provider, method_name, aliases_providers_run, callback), 
                 name=self.provider_name+"-"+method_name.upper()+"-"+tiid[0:4])
             t.start()
 
-            # librato_provider_thread_launch_duration.add(time.time()-start_time, source=self.provider.provider_name)
-
         # sleep to give the provider a rest :)
         time.sleep(self.polling_interval)
@@ -209,7 +183,7 @@ def __init__(self, couch_queue, myredis, mydao):
         self.couch_queue = couch_queue
         self.myredis = myredis
         self.mydao = mydao
-        self.name = "worker_" + self.couch_queue.queue_name
+        self.name = self.couch_queue.queue_name + "_worker"
 
     def update_item_with_new_aliases(self, alias_dict, item):
         if alias_dict == item["aliases"]:
@@ -324,15 +298,15 @@ def sniffer(cls, item_aliases, aliases_providers_run, provider_config=default_se
     def run(self):
         alias_message = self.alias_queue.pop()
         if alias_message:
-            logger.info("{:20}: alias_message said {alias_message}".format(
-                "Backend.run", alias_message=alias_message))
+            logger.info("alias_message said {alias_message}".format(
+                alias_message=alias_message))
             (tiid, alias_dict, aliases_providers_run) = alias_message
 
             relevant_provider_names = self.sniffer(alias_dict, aliases_providers_run)
-            logger.info("{:20}: for {tiid} sniffer got input {alias_dict}".format(
-                "Backend", tiid=tiid, alias_dict=alias_dict))
-            logger.info("{:20}: for {tiid} sniffer returned {providers}".format(
-                "Backend", tiid=tiid, providers=relevant_provider_names))
+            logger.info("backend for {tiid} sniffer got input {alias_dict}".format(
+                tiid=tiid, alias_dict=alias_dict))
+            logger.info("backend for {tiid} sniffer returned {providers}".format(
+                tiid=tiid, providers=relevant_provider_names))
 
             # list out the method names so they are run in that priority, biblio before metrics
             for method_name in ["aliases", "biblio", "metrics"]:
@@ -356,11 +330,11 @@ def main():
     # these need to match the tiid alphabet defined in models:
     couch_queues = {}
    for i in "abcdefghijklmnopqrstuvwxyz1234567890":
-        couch_queues[i] = PythonQueue("couch_queue_"+i)
+        couch_queues[i] = PythonQueue(i+"_couch_queue")
         couch_worker = CouchWorker(couch_queues[i], myredis, mydao)
         couch_worker.spawn_and_loop()
-        logger.info("{:20}: launched backend couch worker with couch_queue_{i}".format(
-            "Backend", i=i))
+        logger.info("launched backend couch worker with {i}_couch_queue".format(
+            i=i))
 

     polling_interval = 0.1   # how many seconds between polling to talk to provider
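With the Librato gauges gone, the reworked ProviderWorker.run() keeps its own thread bookkeeping: thread_count maps each provider name to a dict of live tiid+method_name keys, so the per-provider count and the total are just dict sizes. A minimal, self-contained sketch of that accounting (provider names and tiids here are made up for illustration):

    from collections import defaultdict

    thread_count = defaultdict(dict)  # provider_name -> {tiid+method_name: 1}

    def note_thread_started(provider_name, tiid, method_name):
        thread_count[provider_name][tiid + method_name] = 1

    def note_thread_finished(provider_name, tiid, method_name):
        del thread_count[provider_name][tiid + method_name]

    note_thread_started("dryad", "tiid1", "metrics")
    note_thread_started("dryad", "tiid2", "metrics")
    note_thread_started("github", "tiid1", "aliases")

    # the same expressions run() uses for its log line:
    number_of_threads_for_this_provider = len(thread_count["dryad"].keys())                       # 2
    number_of_total_provider_threads = sum([len(thread_count[p].keys()) for p in thread_count])   # 3
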
diff --git a/totalimpact/dao.py b/totalimpact/dao.py
index 6d3a7a58..c79e9e37 100644
--- a/totalimpact/dao.py
+++ b/totalimpact/dao.py
@@ -107,18 +107,18 @@ def save(self, doc):
         if "_id" not in doc:
             raise KeyError("tried to save doc with '_id' key unset.")
 
-        logger.info("%20s saving id '%s'" %("dao", doc["_id"]))
+        #logger.info("dao saving id '%s'" %(doc["_id"]))
 
         retry = True
         while retry:
             try:
                 response = self.db.save(doc)
                 retry = False
             except couchdb.ResourceConflict, e:
-                logger.info("%20s Couch conflict saving %s; will retry" % ("dao", doc["_id"]))
+                logger.info("dao Couch conflict saving %s; will retry" % (doc["_id"]))
                 newer_doc = self.get(doc["_id"])
                 doc["_rev"] = newer_doc["_rev"]
                 time.sleep(0.1)
-        logger.info("%20s saved %s" %("dao", doc["_id"]))
+        logger.info("dao saved %s" %(doc["_id"]))
 
         return response
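The save() hunk above is the whole conflict-handling story: on couchdb.ResourceConflict, adopt the newest revision's _rev and try the write again. A standalone sketch of the same loop, where db and doc stand in for the dao's own attributes:

    import time
    import couchdb

    def save_with_conflict_retry(db, doc):
        # mirrors dao.save(): retry until CouchDB accepts the write
        if "_id" not in doc:
            raise KeyError("tried to save doc with '_id' key unset.")
        retry = True
        while retry:
            try:
                response = db.save(doc)
                retry = False
            except couchdb.ResourceConflict:
                # someone saved a newer revision first; adopt its _rev and retry
                doc["_rev"] = db[doc["_id"]]["_rev"]
                time.sleep(0.1)
        return response
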
method_name in ["aliases", "biblio", "metrics"]: @@ -356,11 +330,11 @@ def main(): # these need to match the tiid alphabet defined in models: couch_queues = {} for i in "abcdefghijklmnopqrstuvwxyz1234567890": - couch_queues[i] = PythonQueue("couch_queue_"+i) + couch_queues[i] = PythonQueue(i+"_couch_queue") couch_worker = CouchWorker(couch_queues[i], myredis, mydao) couch_worker.spawn_and_loop() - logger.info("{:20}: launched backend couch worker with couch_queue_{i}".format( - "Backend", i=i)) + logger.info("launched backend couch worker with {i}_couch_queue".format( + i=i)) polling_interval = 0.1 # how many seconds between polling to talk to provider diff --git a/totalimpact/dao.py b/totalimpact/dao.py index 6d3a7a58..c79e9e37 100644 --- a/totalimpact/dao.py +++ b/totalimpact/dao.py @@ -107,18 +107,18 @@ def save(self, doc): if "_id" not in doc: raise KeyError("tried to save doc with '_id' key unset.") - logger.info("%20s saving id '%s'" %("dao", doc["_id"])) + #logger.info("dao saving id '%s'" %(doc["_id"])) retry = True while retry: try: response = self.db.save(doc) retry = False except couchdb.ResourceConflict, e: - logger.info("%20s Couch conflict saving %s; will retry" % ("dao", doc["_id"])) + logger.info("dao Couch conflict saving %s; will retry" % (doc["_id"])) newer_doc = self.get(doc["_id"]) doc["_rev"] = newer_doc["_rev"] time.sleep(0.1) - logger.info("%20s saved %s" %("dao", doc["_id"])) + logger.info("dao saved %s" %(doc["_id"])) return response diff --git a/totalimpact/providers/provider.py b/totalimpact/providers/provider.py index 0a412fd9..ea95c0e3 100644 --- a/totalimpact/providers/provider.py +++ b/totalimpact/providers/provider.py @@ -119,11 +119,11 @@ def _get_error(self, status_code, response=None): text = "" if status_code >= 500: error = ProviderServerError(response) - self.logger.info("%20s ProviderServerError status code=%i, %s, %s" + self.logger.info("%s ProviderServerError status code=%i, %s, %s" % (self.provider_name, status_code, text, str(headers))) else: error = ProviderClientError(response) - self.logger.info("%20s ProviderClientError status code=%i, %s, %s" + self.logger.info("%s ProviderClientError status code=%i, %s, %s" % (self.provider_name, status_code, text, str(headers))) raise(error) @@ -136,7 +136,7 @@ def _get_templated_url(self, template, id, method=None): def relevant_aliases(self, aliases): filtered = [alias for alias in aliases if self.is_relevant_alias(alias)] - #self.logger.debug("%20s relevant_aliases are %s given %s" % (self.provider_name, str(filtered), str(aliases))) + #self.logger.debug("%s relevant_aliases are %s given %s" % (self.provider_name, str(filtered), str(aliases))) return filtered @@ -207,7 +207,7 @@ def member_items(self, if not self.provides_members: raise NotImplementedError() - self.logger.debug("%20s getting member_items for %s" % (self.provider_name, query_string)) + self.logger.debug("%s getting member_items for %s" % (self.provider_name, query_string)) if not provider_url_template: provider_url_template = self.member_items_url_template @@ -219,7 +219,7 @@ def member_items(self, response = self.http_get(url, cache_enabled=cache_enabled) if response.status_code != 200: - self.logger.info("%20s status_code=%i" + self.logger.info("%s status_code=%i" % (self.provider_name, response.status_code)) if response.status_code == 404: raise ProviderItemNotFoundError @@ -246,7 +246,7 @@ def biblio(self, # Only lookup biblio for items with appropriate ids if not id: - #self.logger.debug("%20s not checking biblio, no relevant alias" 
diff --git a/totalimpact/tiredis.py b/totalimpact/tiredis.py
index a8c66077..1afcdf09 100644
--- a/totalimpact/tiredis.py
+++ b/totalimpact/tiredis.py
@@ -24,7 +24,7 @@ def get_num_providers_left(self, item_id):
 
 def decr_num_providers_left(self, item_id, provider_name):
     num_providers_left = self.decr(item_id)
-    logger.info("%20s bumped providers_run with %s for %s. %s left to run." % ("tiredis", 
+    logger.info("bumped providers_run with %s for %s. %s left to run." % (
         provider_name, item_id, num_providers_left))
     return int(num_providers_left)
 
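For context on the tiredis hunk: decr_num_providers_left() decrements a per-item Redis counter of providers still to run; only the log line changes, dropping the padded "tiredis" prefix. A sketch of the underlying countdown using redis-py, with an illustrative item id as the key:

    import redis

    r = redis.StrictRedis()
    r.set("abcd1234", 3)           # item starts with three providers to run
    num_left = r.decr("abcd1234")  # one provider finished; Redis returns 2
    print(int(num_left))
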