
some work on logging

1 parent c249a7f · commit 32b8de444a77c8f643f148ba271bca805540befd · drelu committed Jun 7, 2012
@@ -1,8 +1,8 @@
# SAGA Implementation: bliss, saga
[DEFAULT]
-saga=saga
+saga=bliss
# Logging config
# logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL
# logging.level=logging.DEBUG
-logging.level=logging.INFO
+logging.level=logging.FATAL
@@ -47,10 +47,15 @@
#print("Set logging level: %s"%(logging_level))
- logging.basicConfig(level=logging_level, datefmt='%m/%d/%Y %I:%M:%S %p',
+ logging.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p',
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(name='bigjob')
+ #logger.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p',
+ # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger.setLevel(logging_level)
+
+ paramiko_logger = logging.getLogger(name="paramiko.transport")
+ paramiko_logger.setLevel(logging.ERROR)
#logging.basicConfig(level=logging_level)
saga = default_dict["saga"]
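
Taken together, this hunk moves the level decision off the root logger: basicConfig now sets only the timestamp and message format, the named bigjob logger carries the configured level, and paramiko's chatty paramiko.transport logger is capped at ERROR. A minimal standalone sketch of the resulting pattern (FATAL here mirrors the new bigjob.conf default; in the real code the level is read from that config file):

    import logging

    # Root handler gets only formatting -- no global level is forced.
    logging.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p',
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # The package logger carries the level read from bigjob.conf.
    logger = logging.getLogger('bigjob')
    logger.setLevel(logging.FATAL)

    # Cap paramiko's verbose transport logger independently.
    logging.getLogger('paramiko.transport').setLevel(logging.ERROR)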
@@ -73,21 +78,10 @@
# define external-facing API
-from bigjob.bigjob_manager import bigjob as myBigjob
-from bigjob.bigjob_manager import subjob as mySubjob
-
-
-class subjob(mySubjob):
- pass
-
-
-class bigjob(myBigjob):
- pass
-
-
+from bigjob.bigjob_manager import bigjob
+from bigjob.bigjob_manager import subjob
try:
- from bigjob.bigjob_manager import description as myDescription
- class description(myDescription):
- pass
+ from bigjob.bigjob_manager import description
except:
pass
+
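
The deleted wrapper subclasses added nothing beyond their base classes, so the public API is now re-exported directly from bigjob_manager. Consumer code is unchanged either way; a short usage sketch (assuming a BigJob checkout on the import path):

    # The public classes come straight from the package root, as before.
    from bigjob import bigjob, subjob

    # 'description' is only guaranteed when the backend provides it,
    # mirroring the package's own guarded import above.
    try:
        from bigjob import description
    except ImportError:
        description = None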
@@ -22,6 +22,10 @@
from bigjob import SAGA_BLISS
from bigjob.state import Running, New, Failed, Done, Unknown
+# import other BigJob packages
+# import API
+import api.base
+sys.path.append(os.path.dirname(__file__))
if SAGA_BLISS == False:
try:
@@ -44,10 +48,6 @@
logger.warn("SAGA Bliss not found")
-# import other BigJob packages
-# import API
-import api.base
-sys.path.append(os.path.dirname(__file__))
if is_bliss:
import bliss.saga as saga
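
This hunk hoists the intra-package imports above the SAGA/Bliss selection so that api.base is bound on both code paths. A sketch of the moved block in isolation (only meaningful inside the BigJob source tree):

    import os
    import sys

    # api.base defines the abstract Pilot API this manager implements.
    import api.base

    # Keep this module's directory importable for later dynamic imports
    # (a common Python 2 era idiom; the diff appends after the import).
    sys.path.append(os.path.dirname(__file__))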
@@ -25,7 +25,7 @@
import bigjob.bigjob_manager
# Log everything, and send it to stderr.
-logging.basicConfig(level=logging.DEBUG)
+#logging.basicConfig(level=logging.DEBUG)
COORDINATION_URL="advert://advert.cct.lsu.edu:8080"
@@ -2,10 +2,6 @@
import os
import time
-import logging
-#from bigjob import logger
-#logger.setLevel(logging.FATAL)
-
sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, os.getcwd() + "/../")
from pilot import PilotComputeService, ComputeDataService, State
@@ -20,8 +16,8 @@
tcp://* (ZMQ - listening to all interfaces)
"""
-COORDINATION_URL = "advert://localhost/?dbtype=sqlite3"
-#COORDINATION_URL = "redis://localhost:6379"
+#COORDINATION_URL = "advert://localhost/?dbtype=sqlite3"
+COORDINATION_URL = "redis://localhost:6379"
if __name__ == "__main__":
@@ -57,18 +53,18 @@
compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
- logging.debug("Finished setup. Waiting for scheduling of CU")
+ print("Finished setup. Waiting for scheduling of CU")
compute_data_service.wait()
while compute_unit != State.Done:
- logging.debug("Final state check...")
+ print("Final state check...")
state_cu = compute_unit.get_state()
print "PCS State %s" % pilot_compute_service
print "CU: %s State: %s"%(compute_unit, state_cu)
if state_cu==State.Done:
break
time.sleep(2)
- logging.debug("Terminate Pilot Compute and Compute Data Service")
+ print("Terminate Pilot Compute and Compute Data Service")
compute_data_service.cancel()
pilot_compute_service.cancel()
@@ -27,7 +27,7 @@ class NoCoordinationAdaptor:
def get_base_url(cls, application_id):
surl = SAGAUrl(cls.BASE_URL)
base_url = surl.scheme + "://" + surl.host + "/" + application_id + "/"
- logging.debug(base_url)
+ logger.debug(base_url)
return base_url
###########################################################################
@@ -17,7 +17,7 @@
paramiko_logger = paramiko.util.logging.getLogger()
-paramiko_logger.setLevel(logging.WARN)
+#paramiko_logger.setLevel(logging.ERROR)
class SSHFileAdaptor(object):
""" BigData Coordination File Management for Pilot Store """
@@ -83,20 +83,20 @@ def put_du(self, du):
def put_du_paramiko(self, du):
- logging.debug("Copy DU using Paramiko")
+ logger.debug("Copy DU using Paramiko")
for i in du.list_data_unit_items():
remote_path = os.path.join(self.path, str(du.id), os.path.basename(i.local_url))
- logging.debug("Put file: %s to %s"%(i.local_url, remote_path))
+ logger.debug("Put file: %s to %s"%(i.local_url, remote_path))
if i.local_url.startswith("ssh://"):
# check if remote path is directory
if self.__is_remote_directory(i.local_url):
- logging.warning("Path %s is a directory. Ignored."%i.local_url)
+ logger.warning("Path %s is a directory. Ignored."%i.local_url)
continue
self.__third_party_transfer(i.local_url, remote_path)
else:
if stat.S_ISDIR(os.stat(i.local_url).st_mode):
- logging.warning("Path %s is a directory. Ignored."%i.local_url)
+ logger.warning("Path %s is a directory. Ignored."%i.local_url)
continue
#self.__sftp.put(i.local_url, remote_path, self.put_progress, True)
ssh_client, sftp_client = self.__create_sftp_client()
@@ -106,21 +106,21 @@ def put_du_paramiko(self, du):
def put_du_scp(self, du):
- logging.debug("Copy DU using SCP")
+ logger.debug("Copy DU using SCP")
for i in du.list_data_unit_items():
remote_path = os.path.join(self.path, str(du.id), os.path.basename(i.local_url))
- logging.debug("Put file: %s to %s"%(i.local_url, remote_path))
+ logger.debug("Put file: %s to %s"%(i.local_url, remote_path))
if i.local_url.startswith("ssh://"):
# check if remote path is directory
if self.__is_remote_directory(i.local_url):
- logging.warning("Path %s is a directory. Ignored."%i.local_url)
+ logger.warning("Path %s is a directory. Ignored."%i.local_url)
continue
#self.__third_party_transfer(i.local_url, remote_path)
else:
if stat.S_ISDIR(os.stat(i.local_url).st_mode):
- logging.warning("Path %s is a directory. Ignored."%i.local_url)
+ logger.warning("Path %s is a directory. Ignored."%i.local_url)
continue
result = urlparse.urlparse(i.local_url)
source_host = result.netloc
@@ -162,7 +162,7 @@ def remove_du(self, du):
def put_progress(self, transfered_bytes, total_bytes):
- logging.debug("Bytes transfered %d/%d"%(transfered_bytes, total_bytes))
+ logger.debug("Bytes transfered %d/%d"%(transfered_bytes, total_bytes))
@@ -212,7 +212,7 @@ def __remove_directory(self, path):
if self.__exists(path):
for filename in self.__sftp.listdir(path):
filepath = os.path.join(path, filename)
- logging.debug("Delete %s"%filepath)
+ logger.debug("Delete %s"%filepath)
if stat.S_ISDIR(self.__sftp.stat(filepath).st_mode):
[self.__remove_directory(filepath)]
else:
@@ -300,16 +300,16 @@ def __third_party_transfer_host(self, source_url, target_url):
sftp.put("%s", "%s")
"""%(target_host, source_path, target_path)
- logging.debug("Execute: \n%s"%python_script)
+ logger.debug("Execute: \n%s"%python_script)
source_client = paramiko.SSHClient()
source_client.load_system_host_keys()
source_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
source_client.connect(source_host)
stdin, stdout, stderr = source_client.exec_command("python -c \'%s\'"%python_script)
stdin.close()
- logging.debug("************************************************")
- logging.debug("Stdout: %s\nStderr:%s", stdout.read(), stderr.read())
- logging.debug("************************************************")
+ logger.debug("************************************************")
+ logger.debug("Stdout: %s\nStderr:%s", stdout.read(), stderr.read())
+ logger.debug("************************************************")
def __third_party_transfer(self, source_url, target_path):
@@ -336,16 +336,16 @@ def __third_party_transfer(self, source_url, target_path):
"""%(source_host, source_path, target_path)
- logging.debug("Execute: \n%s"%python_script)
+ logger.debug("Execute: \n%s"%python_script)
source_client = paramiko.SSHClient()
source_client.load_system_host_keys()
source_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
source_client.connect(self.host)
stdin, stdout, stderr = source_client.exec_command("python -c \'%s\'"%python_script)
stdin.close()
- logging.debug("************************************************")
- logging.debug("Stdout: %s\nStderr:%s", stdout.read(), stderr.read())
- logging.debug("************************************************")
+ logger.debug("************************************************")
+ logger.debug("Stdout: %s\nStderr:%s", stdout.read(), stderr.read())
+ logger.debug("************************************************")
def __exists(self, path):
"""Return True if the remote path exists
@@ -45,21 +45,20 @@ class PilotComputeService(PilotComputeService):
'coordination_url'
)
- def __init__(self, coordination_url=COORDINATION_URL, pjs_url=None):
+ def __init__(self, coordination_url=COORDINATION_URL, pcs_url=None):
""" Create a PilotJobService object.
Keyword arguments:
- pjs_id -- Don't create a new, but connect to an existing (optional)
+ pcs_id -- Don't create a new, but connect to an existing (optional)
"""
- self.__mjs = None
self.pilot_computes=[]
self.coordination_url=coordination_url
- if pjs_url==None: # new pjs
+ if pcs_url==None: # new pjs
self.id = self.PJS_ID_PREFIX+str(uuid.uuid1())
self.url = "pilotjob://localhost/"+self.id
else:
- logger.error("Reconnect to PJS currently not supported.")
+ logger.error("Reconnect to PCS currently not supported.")
def create_pilot(self, rm=None, pilot_compute_description=None, pj_type=None, context=None):
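
After the rename, the constructor keyword is pcs_url instead of pjs_url (and the unused __mjs field is gone). A hedged usage sketch, pairing the service with the redis coordination backend switched on in the example above:

    from pilot import PilotComputeService

    COORDINATION_URL = "redis://localhost:6379"

    # Start a new service; reconnecting via pcs_url is not yet supported,
    # per the error branch above.
    pcs = PilotComputeService(coordination_url=COORDINATION_URL)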
@@ -69,17 +68,10 @@ def create_pilot(self, rm=None, pilot_compute_description=None, pj_type=None, co
pilot_compute_description -- PilotJob Description
Return value:
- A PilotJob handle
+ A PilotCompute object
"""
-
- if self.__mjs == None:
- logging.debug("Create Dynamic BigJob Service")
- #self.__mjs = many_job_service([], self.coordination_url)
-
bj_dict = self.__translate_pj_bj_description(pilot_compute_description)
bj = self.__start_bigjob(bj_dict)
-
- #bigjob = self.__mjs.add_resource(resource_description)
pj = PilotCompute(self, bj, pilot_compute_description)
self.pilot_computes.append(pj)
return pj
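
create_pilot now documents what it actually returns: a PilotCompute handle wrapping the started BigJob. A sketch of a call; the description keys are assumptions, since this diff only confirms that the private translator produces a BigJob dict containing 'resource_url' and, optionally, 'processes_per_node':

    # Key names are illustrative assumptions, not confirmed by this diff.
    pilot_compute_description = {
        "service_url": "fork://localhost",
        "number_of_processes": 1,
    }

    pilot_compute = pcs.create_pilot(
        pilot_compute_description=pilot_compute_description)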
@@ -144,7 +136,7 @@ def __repr__(self):
def __start_bigjob(self, bj_dict):
""" private method - starts a bigjob on the defined resource """
gram_url = bj_dict["resource_url"]
- logging.debug("start bigjob at: " + gram_url)
+ logger.debug("start bigjob at: " + gram_url)
bj = bigjob(self.coordination_url)
ppn="1"
if ("processes_per_node" in bj_dict):
@@ -184,13 +176,13 @@ def __init__(self, pilot_compute_service=None,
pilot_compute_description=None,
pilot_url=None):
if pilot_url==None:
- logging.debug("Create PilotCompute for BigJob: " + str(bigjob))
+ logger.debug("Create PilotCompute for BigJob: " + str(bigjob))
self.pilot_compute_description=pilot_compute_description
self.__pilot_compute_service=pilot_compute_service
self.__bigjob = bigjob
self.__subjobs = []
else:
- logging.debug("Reconnect to an existing Pilot Compute")
+ logger.debug("Reconnect to an existing Pilot Compute")
self.__bigjob = bigjob(pilot_url=pilot_url)
@@ -213,7 +205,7 @@ def get_free_nodes(self):
def submit_cu(self, compute_unit):
""" Submits compute unit to Bigjob """
- logging.debug("Submit CU to big-job")
+ logger.debug("Submit CU to big-job")
sj = subjob()
sj.submit_job(self.__bigjob.pilot_url, compute_unit.subjob_description)
self.__subjobs.append(sj)
@@ -3,8 +3,6 @@
import sys
import os
import logging
-logging.basicConfig(level=logging.DEBUG)
-
import uuid
import random
import threading
@@ -125,7 +123,7 @@ def create_du(self, du):
def put_du(self, du):
- logging.debug("Put PD: %s to PS: %s"%(du.id,self.service_url))
+ logger.debug("Put PD: %s to PS: %s"%(du.id,self.service_url))
self.__filemanager.create_du(du.id)
self.__filemanager.put_du(du)
self.data_units[du.id] = du
@@ -23,7 +23,7 @@ def set_pilot_jobs(self, pilot_jobs):
def schedule_pilot_data(self, data_unit_description=None):
- logging.debug("Schedule to PD - # Avail pilots: %d"%len(self.pilot_data))
+ logger.debug("Schedule to PD - # Avail pilots: %d"%len(self.pilot_data))
candidate_pilot_data = []
if data_unit_description.has_key("affinity_datacenter_label") and data_unit_description.has_key("affinity_machine_label"):
for i in self.pilot_data:
@@ -51,7 +51,7 @@ def schedule_pilot_job(self, compute_unit_description=None):
TODO: incorporate potential data movements to co-locate PD/WU
"""
- logging.debug("Schedule to PJ - # Avail PJs: %d"%len(self.pilot_jobs))
+ logger.debug("Schedule to PJ - # Avail PJs: %d"%len(self.pilot_jobs))
candidate_pilot_jobs = []
required_number_of_processes=1
if compute_unit_description.has_key("number_of_processes"):
