Further work on Google Storage integration for PD

commit 8573856916c0cc22da48a72e7755893912e8113c (parent: 69824f9)
Andre Luckow (drelu) authored
4 .gitignore
@@ -9,4 +9,6 @@ build/*
cli/bj-*
examples/pilot-api/work/bj-*
dump.rdb
-*.log
+*.log
+examples/pilot-api/gce.dat
+gce.dat
2  VERSION
@@ -1 +1 @@
-0.4.89
+0.4.90
4 bigjob.conf
@@ -5,5 +5,5 @@ saga=saga
# Logging config
# logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL
# logging.level=logging.DEBUG
-logging.level=logging.FATAL
-#logging.level=logging.DEBUG
+#logging.level=logging.FATAL
+logging.level=logging.DEBUG
42 bigjob/bigjob_agent.py
@@ -17,6 +17,7 @@
import logging
import shutil
from string import Template
+
logging.basicConfig(level=logging.DEBUG)
try:
@@ -27,8 +28,10 @@
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../ext/threadpool-1.2.7/src/")
logging.debug(str(sys.path))
from threadpool import *
-from bigjob import logger
+# BigJob/Pilot framework classes
+from bigjob import logger
+from pilot.impl.pilotdata_manager import DataUnit, PilotData
logger.debug("Python Version: " + str(sys.version_info))
if sys.version_info < (2, 5):
@@ -341,13 +344,14 @@ def execute_job(self, job_url, job_dict):
if not os.path.isabs(error):
error=os.path.join(workingdirectory, error)
+
# append job to job list
self.jobs.append(job_url)
# File Stage-In of dependent data units
- if job_dict.has_key("input_data"):
- self.__stage_in_data_units(job_dict["input_data"])
+ if job_dict.has_key("InputData"):
+ self.__stage_in_data_units(eval(job_dict["InputData"]), workingdirectory)
# File Stage-In - Move pilot-level files to working directory of sub-job
if self.pilot_description!=None:
@@ -445,14 +449,9 @@ def execute_job(self, job_url, job_dict):
self.coordination.set_job_state(job_url, str(bigjob.state.Running))
except:
traceback.print_exc(file=sys.stderr)
+
- def __stage_in_data_units(self, input_data=[]):
- """ stage in data units specified in input_data field """
- for i in input_data:
- pass
-
-
-
+
def allocate_nodes(self, job_dict):
""" allocate nodes
allocated nodes will be written to machinefile advert-launcher-machines-<jobid>
@@ -711,6 +710,20 @@ def stop_background_thread(self):
self.stop=True
+ #############################################################################
+ # Private methods
+
+ def __stage_in_data_units(self, input_data=[], target_directory="."):
+ """ stage in data units specified in input_data field """
+ logger.debug("Stage in input files")
+ for i in input_data:
+ pd_url = self.__get_pd_url(i)
+ du_id = self.__get_du_id(i)
+ pd = PilotData(pd_url=pd_url)
+ du = pd.get_du(du_id)
+ du.export(target_directory)
+
+
def __expand_directory(self, directory):
""" expands directory name $HOME or ~ to the working directory
on the respective machine
@@ -727,8 +740,15 @@ def __expand_directory(self, directory):
pass
return directory
+
-
+ def __get_pd_url(self, du_url):
+ url = du_url[:du_url.index(":du-")]
+ return url
+
+ def __get_du_id(self, du_url):
+ du_id = du_url[du_url.index("du-"):]
+ return du_id
def __get_launch_method(self, requested_method):
""" returns desired execution method: ssh, aprun """
52 coordination/bigjob_coordination_redis.py
@@ -11,7 +11,7 @@
import time
from bigjob import logger
-from redis import *
+import redis
if sys.version_info < (2, 5):
sys.path.append(os.path.dirname( os.path.abspath( __file__) ) + "/../ext/uuid-1.30/")
@@ -71,14 +71,14 @@ def __init__(self, server=REDIS_SERVER, server_port=REDIS_SERVER_PORT, server_co
logger.debug("Connect to Redis: " + server + " Port: " + str(server_port))
if self.password==None:
- self.redis = Redis(host=server, port=server_port, db=0)
+ self.redis_client = redis.Redis(host=server, port=server_port, db=0)
else:
- self.redis = Redis(host=server, port=server_port, password=self.password, db=0)
- #self.redis_pubsub = self.redis.pubsub() # redis pubsub client
+ self.redis_client = redis.Redis(host=server, port=server_port, password=self.password, db=0)
+ #self.redis_client_pubsub = self.redis_client.pubsub() # redis pubsub client
#self.resource_lock = threading.RLock()
- self.pipe = self.redis.pipeline()
+ self.pipe = self.redis_client.pipeline()
try:
- self.redis.ping()
+ self.redis_client.ping()
except:
logger.error("Please start Redis server!")
raise Exception("Please start Redis server!")
@@ -92,13 +92,13 @@ def get_address(self):
def set_pilot_state(self, pilot_url, new_state, stopped=False):
logger.debug("update state of pilot job to: " + str(new_state)
+ " stopped: " + str(stopped))
- self.redis.hmset(pilot_url, {"state":str(new_state), "stopped":str(stopped)})
+ self.redis_client.hmset(pilot_url, {"state":str(new_state), "stopped":str(stopped)})
if stopped==True:
self.queue_job(pilot_url, "STOP")
def get_pilot_state(self, pilot_url):
- state = self.redis.hgetall(pilot_url)
+ state = self.redis_client.hgetall(pilot_url)
return state
@@ -106,27 +106,27 @@ def get_pilot_state(self, pilot_url):
# Pilot-Job State
def set_pilot_description(self, pilot_url, description):
logger.debug("update description of pilot job to: " + str(description))
- self.redis.hmset(pilot_url + ":description", {"description":description})
+ self.redis_client.hmset(pilot_url + ":description", {"description":description})
def get_pilot_description(self, pilot_url):
- description = self.redis.hgetall(pilot_url + ":description")
+ description = self.redis_client.hgetall(pilot_url + ":description")
return description
#def is_pilot_stopped(self,pilot_url):
- # state = self.redis.hgetall(pilot_url)
+ # state = self.redis_client.hgetall(pilot_url)
# if state==None or not state.has_key("stopped"):
# return True
# return state["stopped"]
def get_jobs_of_pilot(self, pilot_url):
""" returns array of job_url that are associated with a pilot """
- jobs = self.redis.keys(pilot_url+":jobs:*")
+ jobs = self.redis_client.keys(pilot_url+":jobs:*")
jobs_fqdn = [os.path.join(self.get_address(), i)for i in jobs]
return jobs_fqdn
def delete_pilot(self, pilot_url):
- items = self.redis.keys(pilot_url+"*")
+ items = self.redis_client.keys(pilot_url+"*")
for i in items:
self.pipe.delete(i)
self.pipe.execute()
@@ -136,33 +136,33 @@ def delete_pilot(self, pilot_url):
def set_job_state(self, job_url, new_state):
#self.resource_lock.acquire()
logger.debug("set job state to: " + str(new_state))
- self.redis.hset(job_url, "state", str(new_state))
+ self.redis_client.hset(job_url, "state", str(new_state))
if new_state=="Unknown":
- self.redis.hset(job_url,"start_time", str(time.time()))
+ self.redis_client.hset(job_url,"start_time", str(time.time()))
elif new_state=="Running":
- self.redis.hset(job_url,"end_queue_time", str(time.time()))
+ self.redis_client.hset(job_url,"end_queue_time", str(time.time()))
elif new_state=="Done":
- self.redis.hset(job_url, "end_time", str(time.time()))
+ self.redis_client.hset(job_url, "end_time", str(time.time()))
#self.resource_lock.release()
def get_job_state(self, job_url):
- return self.redis.hget(job_url, "state")
+ return self.redis_client.hget(job_url, "state")
#####################################################################################
# Sub-Job Description
def set_job(self, job_url, job_dict):
- self.redis.hmset(job_url, job_dict)
+ self.redis_client.hmset(job_url, job_dict)
self.set_job_state(job_url, "Unknown")
def get_job(self, job_url):
- return self.redis.hgetall(job_url)
+ return self.redis_client.hgetall(job_url)
def delete_job(self, job_url):
- self.redis.delete(job_url+"*")
+ self.redis_client.delete(job_url+"*")
#####################################################################################
@@ -170,17 +170,17 @@ def delete_job(self, job_url):
def queue_job(self, pilot_url, job_url):
""" queue new job to pilot """
queue_name = pilot_url + ":queue"
- self.redis.set(queue_name + ':last_in', pickle.dumps(datetime.datetime.now()))
- self.redis.lpush(queue_name, job_url)
+ self.redis_client.set(queue_name + ':last_in', pickle.dumps(datetime.datetime.now()))
+ self.redis_client.lpush(queue_name, job_url)
def dequeue_job(self, pilot_url):
""" deque to new job of a certain pilot """
queue_name = pilot_url + ":queue"
logger.debug("Dequeue sub-job from: " + queue_name
- + " number queued items: " + str(self.redis.llen(queue_name)))
- self.redis.set(queue_name + ':last_out', pickle.dumps(datetime.datetime.now()))
- job_url = self.redis.brpop(queue_name, 1)
+ + " number queued items: " + str(self.redis_client.llen(queue_name)))
+ self.redis_client.set(queue_name + ':last_out', pickle.dumps(datetime.datetime.now()))
+ job_url = self.redis_client.brpop(queue_name, 1)
if job_url==None:
return job_url
logger.debug("Dequeued: " + str(job_url))
4 examples/example_local_single.py
@@ -19,9 +19,9 @@
tcp://* (ZMQ - listening to all interfaces)
"""
-COORDINATION_URL = "advert://localhost/?dbtype=sqlite3"
+#COORDINATION_URL = "advert://localhost/?dbtype=sqlite3"
#COORDINATION_URL = "tcp://*"
-#COORDINATION_URL = "redis://localhost:6379"
+COORDINATION_URL = "redis://localhost:6379"
# for running BJ from local dir
sys.path.insert(0, os.getcwd() + "/../")
82 examples/pilot-api/example-pilot-compute-data-gce.py
@@ -0,0 +1,82 @@
+import sys
+import os
+import time
+import logging
+import uuid
+#logging.basicConfig(level=logging.DEBUG)
+
+sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
+from pilot import PilotComputeService, PilotDataService, ComputeDataService, State
+
+
+COORDINATION_URL = "redis://localhost:6379"
+
+if __name__ == "__main__":
+
+
+ # create pilot data service (factory for data pilots (physical, distributed storage))
+ # and pilot data
+ pilot_data_service = PilotDataService(coordination_url=COORDINATION_URL)
+ pilot_data_description={
+ "service_url": "gs://pilot-data-" + str(uuid.uuid1()),
+ "size": 100,
+ "affinity_datacenter_label": "us-google",
+ "affinity_machine_label": ""
+ }
+ pd = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description)
+
+
+ # Create Data Unit Description
+ #base_dir = "../data1"
+ #url_list = os.listdir(base_dir)
+ # make absolute paths
+ #absolute_url_list = [os.path.join(base_dir, i) for i in url_list]
+ data_unit_description = {
+ "file_urls": [os.path.join(os.getcwd(), "test.txt")],
+ "affinity_datacenter_label": "us-google",
+ "affinity_machine_label": ""
+ }
+
+ # submit pilot data to a pilot store
+ data_unit = pd.submit_data_unit(data_unit_description)
+ data_unit.wait()
+ print("Data Unit URL: " + data_unit.get_url())
+
+ pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)
+
+ # create pilot job service and initiate a pilot job
+ pilot_compute_description = {
+ #"service_url": 'gce+ssh://api.google.com',
+ "service_url": 'fork://localhost',
+ "number_of_processes": 1,
+ 'affinity_datacenter_label': "us-google",
+ 'affinity_machine_label': ""
+ }
+
+ pilotjob = pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)
+
+ compute_data_service = ComputeDataService()
+ compute_data_service.add_pilot_compute_service(pilot_compute_service)
+ compute_data_service.add_pilot_data_service(pilot_data_service)
+
+
+ # start work unit
+ compute_unit_description = {
+ "executable": "/bin/cat",
+ "arguments": ["test.txt"],
+ "number_of_processes": 1,
+ "output": "stdout.txt",
+ "error": "stderr.txt",
+ "input_data": [data_unit.get_url()],
+ "output_data": []
+ }
+
+ compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)
+ logging.debug("Finished setup of ComputeDataService. Waiting for scheduling of PD")
+ compute_data_service.wait()
+
+
+ logging.debug("Terminate Pilot Compute/Data Service")
+ compute_data_service.cancel()
+ pilot_data_service.cancel()
+ pilot_compute_service.cancel()
46 pilot/coordination/redis_adaptor.py
@@ -1,8 +1,6 @@
import logging
import json
import pdb
-from redis import *
-
from pilot import *
from bigjob import logger
import bliss.saga as saga
@@ -65,13 +63,15 @@ def add_pd(cls, pds_url, pd):
@classmethod
def update_pd(cls, pd):
+ du_urls=None
if len(pd.data_units) > 0:
du_urls = [i.url for i in pd.data_units.values()]
pd_dict={
"data_units": du_urls,
"pilot_data": pd.to_dict(),
- "pilot_data_description": pd.pilot_data_description
+ "pilot_data_description": pd.pilot_data_description,
+ "security_context": pd.security_context
}
cls.__store_entry(pd.url+RedisCoordinationAdaptor.SEPARATOR + "info", pd_dict)
@@ -89,15 +89,7 @@ def list_pd(cls, pds_url):
""" return a list of urls to pd managed by the PDS """
pds_url = cls.__get_url(pds_url)
logger.debug("List PD at %s"%pds_url)
- #pds_dir = saga.advert.directory(pds_url, saga.advert.Create |
- # saga.advert.CreateParents |
- # saga.advert.ReadWrite)
- #pd_list = pds_dir.list()
- #pd_full_urls = []
- #for i in pd_list:
- # pd_full_urls.append(pds_url + "/" + i)
- #return pd_full_urls
@classmethod
def delete_pd(cls, pds_url):
@@ -170,7 +162,7 @@ def get_du(cls, du_url):
@classmethod
def update_du(cls, du):
- logger.debug("**** Update pilot data at: " + du.url)
+ logger.debug("**** Update data unit at: " + du.url)
du_dict_list = [i.to_dict() for i in du.data_unit_items]
du_urls = [i.url for i in du.pilot_data]
du_dict = {
@@ -183,19 +175,12 @@ def update_du(cls, du):
@classmethod
- def list_du(cls, dus_url):
+ def list_du(cls, pd_url):
""" return a list of urls to du managed by the PDS """
- dus_url = cls.__get_url(dus_url)
- logger.debug("List PDS at %s"%dus_url)
- #dus_dir = saga.advert.directory(dus_url, saga.advert.Create |
- # saga.advert.CreateParents |
- # saga.advert.ReadWrite)
-
- #du_list = dus_dir.list()
- #du_full_urls = []
- #for i in du_list:
- # du_full_urls.append(dus_url + "/" + i)
- return du_full_urls
+ pd_url = cls.__get_url(pd_url)
+ logger.debug("List PDS at %s"%pd_url)
+ dus = cls.__list_keys(pd_url+":du-*")
+ return dus
@classmethod
@@ -229,6 +214,7 @@ def get_cds_url(cls, application_url, cds_id):
# internal Redis-related methods
@classmethod
def __get_redis_api_client(cls):
+ import redis
''' Initialize Redis API Client '''
server_port=6379
saga_url = saga.Url(RedisCoordinationAdaptor.BASE_URL)
@@ -236,9 +222,9 @@ def __get_redis_api_client(cls):
server = saga_url.host
if username==None or username=="":
- redis_client = Redis(host=server, port=server_port, db=0)
+ redis_client = redis.Redis(host=server, port=server_port, db=0)
else:
- redis_client = Redis(host=server, port=server_port, password=username, db=0)
+ redis_client = redis.Redis(host=server, port=server_port, password=username, db=0)
try:
redis_client.ping()
@@ -252,6 +238,14 @@ def __get_redis_api_client(cls):
def __get_url(cls, url):
return url
+
+ @classmethod
+ def __list_keys(cls, search_url):
+ redis_client = cls.__get_redis_api_client()
+ keys = redis_client.keys(search_url)
+ keys_normalized = [i[:i.index(":info")] for i in keys]
+ return keys_normalized
+
@classmethod
def __store_entry(cls, entry_url, content):
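Illustration (outside the diff): roughly what the new __list_keys helper does, assuming coordination entries are stored under keys of the form <pd-url>:du-<uuid>:info on a local Redis instance; the key prefix below is a placeholder.

import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)
pd_url = "bigdata:pds-1111:pd-2222"                 # placeholder key prefix
keys = redis_client.keys(pd_url + ":du-*")          # e.g. ["...:du-3333:info", ...]
du_urls = [k[:k.index(":info")] for k in keys]      # strip the ":info" suffix, as __list_keys does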
6 pilot/filemanagement/bliss_adaptor.py
@@ -31,6 +31,12 @@ def __init__(self, service_url):
self.__state=State.New
+ def get_security_context(self):
+ """ Returns security context that needs to be available on the distributed
+ node in order to access this Pilot Data """
+ return None
+
+
def initialize_pilotdata(self):
# check whether directory exists
8 pilot/filemanagement/globusonline_adaptor.py
@@ -45,7 +45,13 @@ def __init__(self, service_url):
# initialize ssh client
self.__state=State.New
-
+
+
+ def get_security_context(self):
+ """ Returns security context that needs to be available on the distributed
+ node in order to access this Pilot Data """
+ return None
+
def initialize_pilotdata(self):
# check whether directory exists
57 pilot/filemanagement/gs_adaptor.py
@@ -18,6 +18,7 @@
from apiclient.discovery import build
from oauth2client.file import Storage
from oauth2client.client import OAuth2WebServerFlow
+from oauth2client.client import Credentials
from oauth2client.tools import run
import httplib2
import urllib
@@ -42,16 +43,21 @@ class GSFileAdaptor(object):
- def __init__(self, service_url):
+ def __init__(self, service_url, security_context=None):
# Initializations of instance variables
self.service_url = service_url
self.bucket_name = self.__get_bucket_name(service_url)
self.__state=State.New
# Do OAUTH authentication
- storage = Storage('gce.dat')
- self.credentials = storage.get()
+ if security_context!=None:
+ logger.debug("Attempt to restore credentials from security context: " + str(security_context))
+ self.credentials = Credentials.new_from_json(security_context)
+ else:
+ storage = Storage('gce.dat')
+ self.credentials = storage.get()
if self.credentials is None or self.credentials.invalid == True:
+ logger.debug("No valid credential found. Run new OAuth authentication round...")
flow = OAuth2WebServerFlow(
client_id=OAUTH2_CLIENT_ID,
client_secret=OAUTH2_CLIENT_SECRET,
@@ -60,7 +66,13 @@ def __init__(self, service_url):
user_agent='bigjob-client/1.0')
self.credentials = run(flow, storage)
-
+
+
+ def get_security_context(self):
+ """ Returns security context that needs to be available on the distributed
+ node in order to access this Pilot Data """
+ return self.credentials.to_json()
+
def initialize_pilotdata(self):
# check whether directory exists
@@ -74,6 +86,7 @@ def initialize_pilotdata(self):
gs = self.__get_api_client()[0]
gs.buckets().insert(body=request_dict).execute()
except:
+ logger.debug("Error creating bucket: " + self.bucket_name)
pass # Do nothing if bucket already exists
@@ -92,10 +105,10 @@ def get_state(self):
def create_du(self, du_id):
gs = self.__get_api_client()[0]
- o = gs.objects().insert(bucket=self.bucket_name, name=str(du_id)+"/",
+ o = gs.objects().insert(bucket=self.bucket_name, name=str(du_id)+"/du_info",
body={'media': {
- "contentType":"text/ascii",
- "data":""
+ "contentType":"text/ascii",
+ "data": du_id
}
}
).execute()
@@ -106,17 +119,13 @@ def put_du(self, du):
logger.debug("Copy DU to Google Storage")
for i in du.list().keys():
remote_path = os.path.join(str(du.id), i)
- self.__put_file(i, remote_path)
+ self._put_file(i, remote_path)
def copy_du(self, du, pd_new):
bucket_name = self.__get_bucket_name(pd_new.service_url)
gs = self.__get_api_client()[0]
- bucket = gs.buckets().get(bucket=bucket_name)
-
-
- #bucket.insert(media_body)
remote_url = pd_new.service_url + "/" + str(du.id)
@@ -125,17 +134,18 @@ def copy_du(self, du, pd_new):
def get_du(self, du, target_url):
- #du_id=du.id
- du_id="andre"
+ du_id=du.id
+ logger.debug("Get DU: " + str(du_id))
gs = self.__get_api_client()[0]
- result = gs.objects().list(bucket=self.bucket_name,
- delimiter="/",
- prefix=[du_id]).execute()
+ result = gs.objects().list(bucket=self.bucket_name, prefix=du_id).execute()
+ #delimiter="/",
+ #prefix=[du_id]).execute()
logger.debug("Result: " + str(result))
+ for i in result["items"]:
+ full_filename = i["name"]
+ self._get_file(full_filename, os.path.join(target_url, os.path.basename(full_filename)))
-
-
-
+
def remove_du(self, du):
self.__remove_directory(os.path.join(self.bucket_name, du.id))
@@ -174,7 +184,8 @@ def create_remote_directory(self, target_url):
def get_path(self, target_url):
result = urlparse.urlparse(target_url)
- target_query = result.path
+ target_path = result.path
+ return target_path
###########################################################################
@@ -198,8 +209,8 @@ def __get_api_client(self):
def __get_bucket_name(self, service_url):
- result = urlparse.urlparse(service_url)
- bucket_name = result.path.replace("/", "")
+ bucket_name = service_url.replace("gs://", "")
+ bucket_name = bucket_name.replace("/", "")
return bucket_name
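Illustration (outside the diff): the credential hand-off this adaptor now supports, sketched with the oauth2client API. It assumes gce.dat was already populated by an earlier interactive OAuth run.

from oauth2client.file import Storage
from oauth2client.client import Credentials

# Client side: load cached OAuth credentials and serialize them;
# this JSON string is what get_security_context() returns
storage = Storage('gce.dat')
credentials = storage.get()
security_context = credentials.to_json()

# Agent side: rebuild the credentials from the stored security context
# without running a new OAuth flow on the remote node
restored = Credentials.new_from_json(security_context)
assert not restored.invalid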
7 pilot/filemanagement/s3_adaptor.py
@@ -44,7 +44,12 @@ def __init__(self, service_url):
self.__state=State.New
self.s3_conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
-
+
+ def get_security_context(self):
+ """ Returns security context that needs to be available on the distributed
+ node in order to access this Pilot Data """
+ return None
+
def initialize_pilotdata(self):
# check whether directory exists
6 pilot/filemanagement/ssh_adaptor.py
@@ -38,6 +38,12 @@ def __init__(self, service_url):
self.__sftp = self.__client.open_sftp()
self.__state=State.New
+
+ def get_security_context(self):
+ """ Returns security context that needs to be available on the distributed
+ node in order to access this Pilot Data """
+ return None
+
def initialize_pilotdata(self):
# check whether directory exists
5 pilot/filemanagement/webhdfs_adaptor.py
@@ -33,6 +33,11 @@ def __init__(self, service_url):
self.HDFS_SERVICE_PORT,
self.HDFS_USER_NAME)
+ def get_security_context(self):
+ """ Returns security context that needs to be available on the distributed
+ node in order to access this Pilot Data """
+ return None
+
def initialize_pilotstore(self):
self.__webhdfs.mkdir(self.path)
65 pilot/impl/pilotdata_manager.py
@@ -35,7 +35,6 @@
#from pilot.coordination.advert import AdvertCoordinationAdaptor as CoordinationAdaptor
#from pilot.coordination.nocoord import NoCoordinationAdaptor as CoordinationAdaptor
from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor
-
from bliss.saga import Url as SAGAUrl
@@ -72,12 +71,14 @@ def __init__(self, pilot_data_service=None, pilot_data_description=None, pd_url=
In the future more SAGA/Bliss URL schemes/adaptors are supported.
"""
self.id = None
- self.url = None
+ self.url = pd_url
self.pilot_data_description = None
+ self.pilot_data_service = pilot_data_service
self.service_url=None
self.size = None
self.data_unit_description = None
self.data_units={}
+ self.security_context = None
if pd_url==None and pilot_data_service!=None: # new pd
self.id = self.PD_ID_PREFIX+str(uuid.uuid1())
@@ -86,16 +87,19 @@ def __init__(self, pilot_data_service=None, pilot_data_description=None, pd_url=
elif pd_url != None:
logger.warn("Reconnect to PilotData: %s"%pd_url)
dictionary = CoordinationAdaptor.get_pd(pd_url)
+ if dictionary.has_key("security_context"):
+ self.security_context=dictionary["security_context"]
pd_dict = eval(dictionary["pilot_data"])
for i in pd_dict:
self.__setattr__(i, pd_dict[i])
- du_dict = eval(dictionary["data_units"])
- for i in du_dict:
- du_id = DataUnit._get_du_id(i)
- self.data_units[du_id] = None # TODO Restore DataUnit
+ # A Pilot Data does not hold a direct reference to a Data Unit (only URL refs are stored)
+ #du_dict = eval(dictionary["data_units"])
+ #for i in du_dict:
+ # du_id = DataUnit._get_du_id(i)
+ # self.data_units[du_id] = None # TODO Restore DataUnit
self.__initialize_pilot_data()
-
+ CoordinationAdaptor.update_pd(self)
def cancel(self):
@@ -105,14 +109,12 @@ def cancel(self):
def url_for_du(self, du):
- if self.data_units.has_key(du.id):
- return self.service_url + "/" + str(du.id)
- return None
-
+ return self.service_url + "/" + str(du.id)
+
def submit_data_unit(self, data_unit_description):
""" creates a data unit object and initially imports data specified in data_unit_description """
- du = DataUnit(pilot_data_service=self,
+ du = DataUnit(pilot_data=self,
data_unit_description=data_unit_description)
self.data_units[du.id]=du
du.add_pilot_data(self)
@@ -121,7 +123,8 @@ def submit_data_unit(self, data_unit_description):
def list_data_units(self):
""" List all data units of PD """
- return self.data_units.values()
+ du_urls = CoordinationAdaptor.list_du(self.url)
+ return du_urls
def get_state(self):
@@ -129,6 +132,13 @@ def get_state(self):
return self.__filemanager.get_state()
+ def get_du(self, du_id):
+ """ Returns Data Unit if part of Pilot Data """
+ du_url = self.url + ":" + du_id
+ du = DataUnit(du_url=du_url)
+ return du
+
+
def wait(self):
""" Wait until PD enters a final state (Done, Canceled or Failed)."""
while 1:
@@ -152,10 +162,12 @@ def wait(self):
def export_du(self, du, target_url):
+ """ Export Data Unit to a local directory """
self.__filemanager.get_du(du, target_url)
def create_du(self, du):
+ """ Create a new Data Unit within Pilot """
self.__filemanager.create_du(du.id)
@@ -207,11 +219,14 @@ def __initialize_pilot_data(self):
self.__filemanager = GSFileAdaptor(self.service_url)
elif self.service_url.startswith("gs:"):
logger.debug("Use Google Cloud Storage backend")
- self.__filemanager = GSFileAdaptor(self.service_url)
+ self.__filemanager = GSFileAdaptor(self.service_url, self.security_context)
self.__filemanager.initialize_pilotdata()
self.__filemanager.get_pilotdata_size()
+ # Update security context
+ self.security_context = self.__filemanager.get_security_context()
+
def __get_pd_id(self, pd_url):
start = pd_url.index(self.PD_ID_PREFIX)
@@ -394,7 +409,7 @@ class DataUnit(DataUnit):
DU_ID_PREFIX="du-"
- def __init__(self, pilot_data_service=None, data_unit_description=None, du_url=None):
+ def __init__(self, pilot_data=None, data_unit_description=None, du_url=None):
"""
1.) create a new Pilot Data: pilot_data_service and data_unit_description required
2.) reconnect to an existing Pilot Data: du_url required
@@ -404,7 +419,7 @@ def __init__(self, pilot_data_service=None, data_unit_description=None, du_url=N
self.id = self.DU_ID_PREFIX + str(uuid.uuid1())
self.data_unit_description = data_unit_description
self.pilot_data=[]
- self.url = CoordinationAdaptor.add_du(pilot_data_service.url, self)
+ self.url = CoordinationAdaptor.add_du(pilot_data.url, self)
self.state = State.New
self.data_unit_items = DataUnitItem.create_data_unit_list(self, self.data_unit_description["file_urls"])
CoordinationAdaptor.update_du(self)
@@ -441,7 +456,6 @@ def list(self):
}
"""
base_urls = [i.url_for_du(self) for i in self.get_pilot_data()]
-
result_dict = {}
for i in self.data_unit_items:
result_dict[i.filename]=[os.path.join(j, i.filename) for j in base_urls]
@@ -640,7 +654,22 @@ def to_dict(self):
du_dict["id"]=self.id
return du_dict
+def __get_pd_url(du_url):
+ url = du_url[:du_url.index(":du-")]
+ return url
+
+def __get_du_id(du_url):
+ du_id = du_url[du_url.index("du-"):]
+ return du_id
+
if __name__ == "__main__":
- du = DataUnit(du_url="redis://localhost/bigdata:pds-32d63b2e-df05-11e1-a329-705681b3df0f:pd-37674138-df05-11e1-80d0-705681b3df0f:du-3b8d428c-df05-11e1-af2a-705681b3df0f")
+ du_url = "redis://localhost/bigdata:pds-f31a670c-e3f6-11e1-afaf-705681b3df0f:pd-f31c47b8-e3f6-11e1-af44-705681b3df0f:du-f4debce8-e3f6-11e1-8399-705681b3df0f"
+ pd_url = __get_pd_url(du_url)
+ du_id = __get_du_id(du_url)
+ pd = PilotData(pd_url=pd_url)
+ print str(pd.list_data_units())
+ du = pd.get_du(du_id)
+
+ #du = DataUnit(du_url="redis://localhost/bigdata:pds-32d63b2e-df05-11e1-a329-705681b3df0f:pd-37674138-df05-11e1-80d0-705681b3df0f:du-3b8d428c-df05-11e1-af2a-705681b3df0f")
logger.debug(str(du.list()))
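Illustration (outside the diff): how the security context now travels with a GS-backed Pilot Data. The PD URL below is a placeholder; a Redis coordination backend holding the PD entry is assumed.

from pilot.impl.pilotdata_manager import PilotData

# Reconnect: __init__ reads the stored dictionary, restores "security_context"
# if present, and passes it to GSFileAdaptor, which rebuilds the OAuth
# credentials instead of starting a new interactive OAuth round
pd = PilotData(pd_url="redis://localhost/bigdata:pds-1111:pd-2222")

# list_data_units() now returns DU URLs straight from CoordinationAdaptor.list_du()
print str(pd.list_data_units())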