Permalink
Browse files

optimizing performance of ComputeDataService

  • Loading branch information...
1 parent 45d7d51 commit 15d1931187bc3eecaf519e5feb6c63bcf7ba7a1d @drelu drelu committed Jul 31, 2013
View
@@ -665,19 +665,24 @@ def dequeue_new_jobs(self):
job_counter = 0
while self.is_stopped(self.base_url)==False:
if len(self.freenodes)==0:
- time.sleep(3)
+ time.sleep(1)
continue
- logger.debug("Dequeue sub-job from PilotCompute queue: " + self.base_url)
- job_url=self.coordination.dequeue_job(self.base_url)
+ if self.cds_queue_url!=None:
+ logger.debug("Dequeue sub-job from PilotCompute queue: " + self.base_url
+ + " AND ComputeDataService queue: " + self.cds_queue_url)
+ job_url=self.coordination.dequeue_job(self.base_url, self.cds_queue_url)
+ else:
+ logger.debug("Dequeue sub-job from PilotCompute queue: " + self.base_url)
+ job_url=self.coordination.dequeue_job(self.base_url)
logger.debug("Dequed:%s"%str(job_url))
- if job_url==None:
- if self.cds_queue_url!=None:
- logger.debug("Dequeue sub-job from ComputeDataServicequeue: " + self.cds_queue_url)
- job_url=self.coordination.dequeue_job(self.cds_queue_url)
- logger.debug("Dequed:%s"%str(job_url))
- if job_url==None:
- time.sleep(3)
- continue
+# if job_url==None:
+# if self.cds_queue_url!=None:
+# logger.debug("Dequeue sub-job from ComputeDataServicequeue: " + self.cds_queue_url)
+# job_url=self.coordination.dequeue_job(self.cds_queue_url)
+# logger.debug("Dequed:%s"%str(job_url))
+# if job_url==None:
+# time.sleep(3)
+# continue
if job_url=="STOP":
break
@@ -687,7 +692,8 @@ def dequeue_new_jobs(self):
request = WorkRequest(self.start_new_job_in_thread, [job_url])
self.threadpool.putRequest(request)
- #time.sleep(1)
+ if self.coordination.get_queue_length(self.cds_queue_url)==0 and self.coordination.get_queue_length(self.base_url)==0:
+ time.sleep(1)
# wait for termination of Worker Threads
# self.threadpool.wait()
@@ -204,15 +204,28 @@ def queue_job(self, pilot_url, job_url):
self.redis_client.lpush(queue_name, job_url)
- def dequeue_job(self, pilot_url):
+ def dequeue_job(self, pilot_url, pilot_url2=None):
""" deque to new job of a certain pilot """
- queue_name = pilot_url + ":queue"
- logger.debug("Dequeue sub-job from: " + queue_name
- + " number queued items: " + str(self.redis_client.llen(queue_name)))
+ queue_list = []
+ queue_name = pilot_url + ":queue"
+ queue_list.append(queue_name)
+ if pilot_url2!=None:
+ queue_name2 = pilot_url2 + ":queue"
+ queue_list.append(queue_name2)
+ logger.debug("Dequeue sub-job from: " + str(queue_list))
+ #+ " number queued items: " + str(self.redis_client.llen(queue_name)))
#self.redis_client.set(queue_name + ':last_out', pickle.dumps(datetime.datetime.now()))
- job_url = self.redis_client.brpop(queue_name, timeout=5)
+ job_url = self.redis_client.brpop(queue_list, timeout=1)
+ #job_url = self.redis_client.rpop(queue_name)
+ logger.debug("Dequeued: " + str(job_url))
if job_url==None:
return job_url
logger.debug("Dequeued: " + str(job_url))
return job_url[1]
+
+ def get_queue_length(self, pilot_url):
+ queue_name = pilot_url + ":queue"
+ length = self.redis_client.llen(queue_name)
+ logger.debug("Queue: " + queue_name + " number queued items: " + str(length))
+ return length
@@ -7,7 +7,7 @@
from pilot import PilotComputeService, ComputeDataService, State
-COORDINATION_URL = "redis://ILikeBigJob_wITH-REdIS@gw68.quarry.iu.teragrid.org:6379"
+COORDINATION_URL = "redis://localhost:6379"
if __name__ == "__main__":
@@ -136,6 +136,18 @@ def list_pilot_compute(self):
return self.pilot_job_service
+ def get_details(self):
+ """ returns a list with dicts that contains the details of the Pilot Compute,
+ - job state
+ - description
+ - ...
+ """
+ pilot_details=[]
+ for pcs in self.pilot_job_services:
+ for pc in pcs.list_pilots():
+ pilot_details.append(pc.get_details())
+ return pilot_details
+
###########################################################################
# Compute Data Service private methods
def __submit_cu(self, compute_unit):
@@ -6,7 +6,7 @@
COORDINATION_URL = "redis://localhost"
-NUMBER_CUS=512
+NUMBER_CUS=128
NUMBER_SLOTS=16
if __name__ == "__main__":
@@ -35,12 +35,17 @@
"error": "stderr.txt",
}
+ cds = ComputeDataService()
+ cds.add_pilot_compute_service(pilot_compute_service)
+
+ unitservice = pilotjob
+
for i in range(0,NUMBER_CUS):
- compute_unit = pilotjob.submit_compute_unit(compute_unit_description)
+ compute_unit = unitservice.submit_compute_unit(compute_unit_description)
print("Finished setup. Waiting for scheduling of CU")
- pilotjob.wait()
+ unitservice.wait()
runtime=time.time()-start

0 comments on commit 15d1931

Please sign in to comment.