In [1]:
import numpy as np
import copy

In [2]:
class Process:
    """Per-event resource description of a single processing step.

    A Process records the memory footprint, the time per event, the input
    and output size per event, the thread range, the fraction of events
    that pass the step and the per-event failure rate. Every quantity is
    stored on a trailing-underscore attribute and exposed through a
    ``set_x`` / ``x()`` accessor pair.
    """

    def __init__(self,
                 memory_MB=0,time_s=0,
                 input_MB=0,output_MB=0,
                 name="",
                 min_threads=1,max_threads=None,
                 pass_fraction=1.0,
                 failure_rate=0.0):
        """Populate every field through its setter.

        ``max_threads`` falls back to ``min_threads`` when it is omitted
        (see ``set_threads``).
        """
        self.set_name(name)

        self.set_memory_MB(memory_MB)
        self.set_time_s(time_s)

        self.set_input_size_MB(input_MB)
        self.set_output_size_MB(output_MB)

        self.set_threads(min_threads,max_threads)

        self.set_pass_fraction(pass_fraction)
        self.set_failure_rate(failure_rate)

    # --- identification ---------------------------------------------------

    def set_name(self,name):
        """Store the label used when printing this process."""
        self.name_ = name

    def name(self):
        """Return the label of this process."""
        return self.name_

    # --- memory and time ---------------------------------------------------

    def set_memory_MB(self,memory_MB):
        """Store the memory footprint in MB."""
        self.memory_MB_ = memory_MB

    def memory_MB(self):
        """Return the memory footprint in MB."""
        return self.memory_MB_

    def set_time_s(self,time_s):
        """Store the processing time per event in seconds."""
        self.time_s_ = time_s

    def time_s(self):
        """Return the processing time per event in seconds."""
        return self.time_s_

    # --- data volume -------------------------------------------------------

    def set_input_size_MB(self,input_MB):
        """Store the input size per event in MB."""
        self.input_size_MB_ = input_MB

    def input_size_MB(self):
        """Return the input size per event in MB."""
        return self.input_size_MB_

    def set_output_size_MB(self,output_MB):
        """Store the output size per event in MB."""
        self.output_size_MB_ = output_MB

    def output_size_MB(self):
        """Return the output size per event in MB."""
        return self.output_size_MB_

    # --- threading ---------------------------------------------------------

    def set_threads(self,min_threads,max_threads=None):
        """Store the thread range.

        Any falsy ``max_threads`` (``None`` or ``0``) means "same as
        ``min_threads``".
        """
        self.min_threads_ = min_threads
        self.max_threads_ = max_threads if max_threads else min_threads

    def min_threads(self):
        """Return the minimum number of threads."""
        return self.min_threads_

    def max_threads(self):
        """Return the maximum number of threads."""
        return self.max_threads_

    def threads(self):
        """Return the thread range as a ``(min, max)`` tuple."""
        return (self.min_threads(),self.max_threads())

    # --- filtering and reliability ----------------------------------------

    def set_pass_fraction(self,pass_fraction):
        """Store the fraction of events that pass this process."""
        self.pass_fraction_ = pass_fraction

    def pass_fraction(self):
        """Return the fraction of events that pass this process."""
        return self.pass_fraction_

    def set_failure_rate(self,failure_rate):
        """Store the per-event failure rate."""
        self.failure_rate_ = failure_rate

    def failure_rate(self):
        """Return the per-event failure rate."""
        return self.failure_rate_

    def success_rate(self):
        """Return the complement of the failure rate."""
        return 1. - self.failure_rate()

    # --- utilities ---------------------------------------------------------

    def copy(self):
        """Return a shallow copy of this process."""
        return copy.copy(self)

    def __str__(self):
        """Return a multi-line, tab-indented summary of every field."""
        pieces = (
            "Process '%s':"%self.name(),
            "\n\tMemory (MB): %f"%self.memory_MB(),
            "\n\tTime per event (s): %f"%self.time_s(),
            "\n\tInput/Output size per event (MB): %f / %f"%(self.input_size_MB(),self.output_size_MB()),
            "\n\tThreads used (min,max): (%d,%d)"%self.threads(),
            "\n\tEvent pass fraction: %f"%self.pass_fraction(),
            "\n\tFailure rate: %f"%self.failure_rate(),
        )
        return "".join(pieces)
        
def ChainProcesses(processes,name=""):
    """Collapse an ordered sequence of processes into one equivalent Process.

    The combined process is built as follows:

    * memory       -- the largest memory footprint in the chain.
    * time         -- the first stage's time plus, for every later stage,
                      its time weighted by the product of the pass
                      fractions of all earlier stages.
    * input size   -- the first stage's input size, raised (with a printed
                      warning) to the largest input size in the chain.
    * threads      -- the largest min_threads and the largest max_threads.
    * pass fraction-- the product of all pass fractions.
    * failure rate -- one minus the product of all success rates.
    * output size  -- the last stage's output size times the combined pass
                      fraction.

    Parameters
    ----------
    processes : sequence of Process
        Stages in execution order.
    name : str
        Label given to the combined process.

    Returns
    -------
    Process
        The combined process. An empty sequence yields a default Process
        that still carries ``name``.
    """
    if len(processes)==0:
        print("Process list length 0. Return empty Process.")
        # Keep the requested label even for the degenerate empty chain.
        return Process(name=name)

    max_memory = np.max([p.memory_MB() for p in processes])

    pass_fractions = [p.pass_fraction() for p in processes]
    # reach[i] is the product of the pass fractions of stages 0..i, i.e. the
    # weight applied to stage i+1. One cumulative product replaces
    # recomputing a growing np.prod for every stage.
    reach = np.cumprod(pass_fractions)

    time_s = processes[0].time_s()
    for weight,stage in zip(reach,processes[1:]):
        time_s += weight*stage.time_s()

    input_size_MB = processes[0].input_size_MB()
    max_input_size = np.max([p.input_size_MB() for p in processes])
    if input_size_MB < max_input_size:
        print("WARNING! First process input size (%f) is less than max input size (%f).\nSetting max to %f MB"%
              (input_size_MB,max_input_size,max_input_size))
        input_size_MB=max_input_size

    min_threads = np.max([p.min_threads() for p in processes])
    max_threads = np.max([p.max_threads() for p in processes])

    pass_fraction = np.prod(pass_fractions)
    failure_rate = 1. - np.prod([(p.success_rate()) for p in processes])

    output_size_MB = processes[-1].output_size_MB()*pass_fraction

    return Process(memory_MB=max_memory,time_s=time_s,
                   name=name,
                   input_MB=input_size_MB,output_MB=output_size_MB,
                   min_threads=min_threads,max_threads=max_threads,
                   pass_fraction=pass_fraction,failure_rate=failure_rate)

In [3]:
class DataIOInterface:
    """A named data source or sink with a fixed transfer rate in MB/s."""

    def set_name(self,name):
        """Store the label of this interface."""
        self.name_ = name
    def name(self):
        """Return the label of this interface."""
        return self.name_

    def set_rate_MBps(self,rate_MBps):
        """Store the transfer rate in MB per second."""
        self.rate_MBps_ = rate_MBps
    def rate_MBps(self):
        """Return the transfer rate in MB per second."""
        return self.rate_MBps_

    def copy(self):
        """Return a shallow copy of this interface."""
        return copy.copy(self)

    def __init__(self,
                 name="",
                 rate_MBps=np.inf):
        """Create an interface; the default rate is infinite (never a bottleneck)."""
        self.set_name(name)
        self.set_rate_MBps(rate_MBps)

    def __str__(self):
        """Return a one-line summary: label followed by the rate."""
        s = "DataIOInterface '%s'"%self.name()
        s = s+"\tRate (MBps): %f"%self.rate_MBps()
        # __str__ must return the string; without this, print() raises TypeError.
        return s
        

In [4]:
class Job:
    """A process run over a fixed number of events.

    Per-event quantities of the process (time, input size, output size) are
    scaled by ``n_events``; memory and pass fraction are taken from the
    process unchanged.
    """

    def set_process(self,process):
        """Store the process; a list or tuple of processes is chained first."""
        # isinstance also accepts tuples and list subclasses, which the
        # exact-type check `type(process) is list` would have stored raw.
        if isinstance(process,(list,tuple)):
            self.process_ = ChainProcesses(process)
        else:
            self.process_=process
    def process(self):
        """Return the (possibly chained) process this job runs."""
        return self.process_

    def set_n_events(self,n_events):
        """Store the number of events handled by this job."""
        self.n_events_ = n_events
    def n_events(self):
        """Return the number of events handled by this job."""
        return self.n_events_

    def memory_MB(self):
        """Return the memory footprint of the process in MB."""
        return self.process().memory_MB()
    def cpu_time_s(self):
        """Return the per-event time multiplied by the number of events."""
        return self.process().time_s()*self.n_events()
    def input_size_MB(self):
        """Return the per-event input size multiplied by the number of events."""
        return self.process().input_size_MB()*self.n_events()
    def output_size_MB(self):
        """Return the per-event output size multiplied by the number of events."""
        return self.process().output_size_MB()*self.n_events()
    def pass_fraction(self):
        """Return the pass fraction of the process."""
        return self.process().pass_fraction()
    def success_rate(self):
        """Return the per-event success rate raised to the number of events."""
        return np.power(self.process().success_rate(),self.n_events())
    def failure_rate(self):
        """Return one minus the job-level success rate."""
        return (1.-self.success_rate())

    def copy(self):
        """Return a shallow copy; the copy shares the same process object."""
        return copy.copy(self)

    def __init__(self,
                 process,
                 n_events=100):
        """Create a job from a process (or a list/tuple of processes to chain)."""
        self.set_process(process)
        self.set_n_events(n_events)

    def __str__(self):
        """Return a multi-line summary with time in hours and sizes in GB."""
        s = "Job (Process '%s'):"%self.process().name()
        s = s+"\nN_Events: %d"%self.n_events()
        s = s+"\nMemory (MB): %f"%(self.memory_MB())
        s = s+"\nCPU Time (hr): %f"%(self.cpu_time_s()/3600.)
        s = s+"\nEvent pass fraction: %f"%self.pass_fraction()
        s = s+"\nInput/Output size (GB): %f / %f"%(self.input_size_MB()/1000.,self.output_size_MB()/1000.)
        s = s+"\nFailure rate: %f"%self.failure_rate()
        return s

In [5]:
class JobInstance:
    """A job bound to one input interface and one output interface.

    Adds transfer times (size divided by interface rate) on top of the
    per-job quantities, which are derived from the job's process and its
    number of events.
    """

    def __init__(self,
                 job,
                 input_interface,
                 output_interface):
        """Store the job and the two interfaces through their setters."""
        self.set_job(job)
        self.set_input_interface(input_interface)
        self.set_output_interface(output_interface)

    # --- stored components -------------------------------------------------

    def set_job(self,job):
        """Store the job being run."""
        self.job_ = job

    def job(self):
        """Return the job being run."""
        return self.job_

    def set_input_interface(self,input_interface):
        """Store the interface the input is read from."""
        self.input_interface_ = input_interface

    def input_interface(self):
        """Return the interface the input is read from."""
        return self.input_interface_

    def set_output_interface(self,output_interface):
        """Store the interface the output is written to."""
        self.output_interface_ = output_interface

    def output_interface(self):
        """Return the interface the output is written to."""
        return self.output_interface_

    # --- transfer times ----------------------------------------------------

    def input_time_s(self):
        """Return the job input size divided by the input rate."""
        size_MB = self.job().input_size_MB()
        return size_MB / self.input_interface().rate_MBps()

    def output_time_s(self):
        """Return the job output size divided by the output rate."""
        size_MB = self.job().output_size_MB()
        return size_MB / self.output_interface().rate_MBps()

    def io_time_s(self):
        """Return input time plus output time."""
        return self.input_time_s() + self.output_time_s()

    # --- quantities derived from the job's process -------------------------

    def process(self):
        """Return the process of the underlying job."""
        return self.job().process()

    def n_events(self):
        """Return the number of events of the underlying job."""
        return self.job().n_events()

    def memory_MB(self):
        """Return the memory footprint of the process in MB."""
        return self.job().process().memory_MB()

    def cpu_time_s(self):
        """Return the per-event time multiplied by the number of events."""
        per_event_s = self.job().process().time_s()
        return per_event_s*self.job().n_events()

    def input_size_MB(self):
        """Return the per-event input size multiplied by the number of events."""
        per_event_MB = self.job().process().input_size_MB()
        return per_event_MB*self.job().n_events()

    def output_size_MB(self):
        """Return the per-event output size multiplied by the number of events."""
        per_event_MB = self.job().process().output_size_MB()
        return per_event_MB*self.job().n_events()

    def pass_fraction(self):
        """Return the pass fraction of the process."""
        return self.job().process().pass_fraction()

    def success_rate(self):
        """Return the per-event success rate raised to the number of events."""
        per_event = self.job().process().success_rate()
        return np.power(per_event,self.job().n_events())

    def failure_rate(self):
        """Return one minus the per-event success rate raised to the number of events."""
        per_event = self.job().process().success_rate()
        return (1.-np.power(per_event,self.job().n_events()))

    def total_time_s(self):
        """Return I/O time plus CPU time."""
        return self.io_time_s()+self.cpu_time_s()

    def copy(self):
        """Return a shallow copy of this instance."""
        return copy.copy(self)

    def __str__(self):
        """Return a multi-line summary with times in hours and sizes in GB."""
        source = self.input_interface()
        sink = self.output_interface()
        pieces = [
            "Job Instance (Process '%s'):"%self.process().name(),
            "\nInput interface: '%s' (Rate = %f MBps)"%(source.name(),source.rate_MBps()),
            "\nOutput interface: '%s' (Rate = %f MBps)"%(sink.name(),sink.rate_MBps()),
            "\nN_Events: %d"%self.n_events(),
            "\nMemory (MB): %f"%(self.memory_MB()),
            "\nTotal Time (hr): %f"%(self.total_time_s()/3600.),
            "\n\tInput Time (hr): %f"%(self.input_time_s()/3600.),
            "\n\tCPU Time (hr): %f"%(self.cpu_time_s()/3600.),
            "\n\tOutput Time (hr): %f"%(self.output_time_s()/3600.),
            "\nEvent pass fraction: %f"%self.pass_fraction(),
            "\nInput/Output size (GB): %f / %f"%(self.input_size_MB()/1000.,self.output_size_MB()/1000.),
            "\nFailure rate: %f"%self.failure_rate(),
        ]
        return "".join(pieces)

In [6]:
class Sample:
    """A named collection of identical jobs.

    A sample is sized either by a total number of events, by a number of
    jobs, or by both. Totals (input size, output size, CPU time) are the
    per-job values multiplied by the number of jobs.
    """

    # Memory (MB) that counts as one slot when weighting CPU time.
    SLOT_MEMORY_MB = 2000.

    def set_name(self,name):
        """Store the label of this sample."""
        self.name_ = name
    def name(self):
        """Return the label of this sample."""
        return self.name_

    def set_job(self,job):
        """Store the job template every job in the sample follows."""
        self.job_ = job
    def job(self):
        """Return the job template."""
        return self.job_

    def set_events_and_jobs(self,n_events,n_jobs):
        """Derive n_events, n_jobs and n_events_per_job from what is given.

        * both given   -- events per job becomes ``n_events/n_jobs`` and the
                          job template is updated to that value.
        * only events  -- events per job is taken from the job template and
                          the number of jobs is ``n_events`` divided by it.
        * only jobs    -- events per job is taken from the job template and
                          the number of events is their product.
        * neither      -- a message is printed and nothing is changed.

        A message is printed whenever the stored values differ from the
        requested ones (which includes any argument passed as ``None``).
        """
        if n_events is None and n_jobs is None:
            print("Must specify either n_jobs or n_events.")
            return

        if n_events is not None and n_jobs is not None:
            #Both events and jobs specified: update events per job
            self.n_events_ = n_events
            self.n_jobs_ = n_jobs
            self.n_events_per_job_ = n_events/n_jobs
            # Pass the computed value, not the accessor method.
            self.job().set_n_events(n_events=self.n_events_per_job_)

        elif n_events is not None:
            #Only n_events is specified.
            #Calc necessary number of jobs based on job.n_events
            self.n_events_per_job_ = self.job().n_events()
            # NOTE(review): the job count is not rounded, so it can be
            # fractional when n_events is not a multiple of events per job.
            self.n_jobs_ = n_events / self.n_events_per_job_
            self.n_events_ = self.n_jobs_*self.n_events_per_job_

        else:
            #Only number of jobs is specified.
            #Calculate number of events based on job.n_events
            self.n_events_per_job_ = self.job().n_events()
            self.n_jobs_ = n_jobs
            self.n_events_ = self.n_jobs_*self.n_events_per_job_

        #Check/tell user if some recalcs happened
        if self.n_events_!=n_events or self.n_jobs_!=n_jobs:
            print("Requested n_events=",n_events," with n_jobs=",n_jobs)
            print("Setting n_events=%d with n_jobs=%d"%(self.n_events_,self.n_jobs_))

    def n_events(self):
        """Return the total number of events in the sample."""
        return self.n_events_
    def n_jobs(self):
        """Return the number of jobs in the sample."""
        return self.n_jobs_
    def n_events_per_job(self):
        """Return the number of events handled by each job."""
        return self.n_events_per_job_

    def input_size_MB(self):
        """Return the number of jobs times the per-job input size."""
        return self.n_jobs()*self.job().input_size_MB()
    def output_size_MB(self):
        """Return the number of jobs times the per-job output size."""
        return self.n_jobs()*self.job().output_size_MB()
    def cpu_time_s(self):
        """Return the number of jobs times the per-job CPU time."""
        return self.n_jobs()*self.job().cpu_time_s()
    def cpu_time_days(self):
        """Return the total CPU time converted from seconds to days."""
        return self.cpu_time_s()/3600./24.
    def slot_weight(self):
        """Return the job memory expressed in units of SLOT_MEMORY_MB."""
        return (self.job().memory_MB()/self.SLOT_MEMORY_MB)

    def compute_time_days(self,slots):
        """Return the CPU days divided by the slots available per unit of slot weight."""
        return self.cpu_time_days()/(slots / self.slot_weight())

    def __init__(self,
                 job,
                 name="",
                 n_events=None,n_jobs=None):
        """Create a sample from a job template and at least one size.

        If neither ``n_events`` nor ``n_jobs`` is given a message is printed
        and the size attributes stay ``None``.
        """
        # Define every attribute up front so the accessors never raise
        # AttributeError on an unsized sample.
        self.n_events_ = None
        self.n_jobs_ = None
        self.n_events_per_job_ = None

        self.set_name(name)
        self.set_job(job)
        # Prints the "Must specify ..." message itself when both are None.
        self.set_events_and_jobs(n_events,n_jobs)

    def __str__(self):
        """Return a multi-line summary with sizes in TB and times in hours."""
        s = "Sample '%s':"%self.name()
        s = s+"\nN_Events: %d"%self.n_events()
        s = s+"\nN_Jobs: %d"%self.n_jobs()
        s = s+"\nN_Events_per_job: %d"%self.n_events_per_job()
        s = s+"\nInput Datset Size (TB): %f"%(self.input_size_MB()/1000./1000.)
        s = s+"\nOutput Datset Size (TB): %f"%(self.output_size_MB()/1000./1000.)
        s = s+"\nCPU Time (hr): %f"%(self.cpu_time_s()/3600.)
        s = s+"\nWeighted CPU Time (hr): %f"%(self.cpu_time_s()/3600.*self.slot_weight())
        return s


In [7]:
class SampleInstance:
    """A sample bound to I/O interfaces plus a deadline and/or a slot cap.

    Given a requested completion time, a maximum number of slots, or both,
    ``recompute`` derives the achievable completion time and the peak number
    of slots, taking the input and output transfer times into account.
    """

    def set_sample(self,sample,recompute=True):
        """Store the sample and, unless disabled, re-derive the plan."""
        self.sample_ = sample
        if recompute: self._recompute_with_stored_constraints()
    def sample(self):
        """Return the underlying sample."""
        return self.sample_
    def name(self):
        """Return the label of the underlying sample."""
        return self.sample().name()

    def set_input_interface(self,input_interface,recompute=True):
        """Store the input interface and, unless disabled, re-derive the plan."""
        self.input_interface_=input_interface
        if recompute: self._recompute_with_stored_constraints()
    def input_interface(self):
        """Return the interface the input is read from."""
        return self.input_interface_

    def set_output_interface(self,output_interface,recompute=True):
        """Store the output interface and, unless disabled, re-derive the plan."""
        self.output_interface_=output_interface
        if recompute: self._recompute_with_stored_constraints()
    def output_interface(self):
        """Return the interface the output is written to."""
        return self.output_interface_

    def input_time_s(self):
        """Return the sample input size divided by the input rate."""
        return self.sample().input_size_MB() / self.input_interface().rate_MBps()
    def input_time_days(self):
        """Return the input transfer time converted from seconds to days."""
        return self.input_time_s()/3600./24.
    def output_time_s(self):
        """Return the sample output size divided by the output rate."""
        return self.sample().output_size_MB() / self.output_interface().rate_MBps()
    def output_time_days(self):
        """Return the output transfer time converted from seconds to days."""
        return self.output_time_s()/3600./24.
    def io_time_s(self):
        """Return input time plus output time."""
        return self.input_time_s() + self.output_time_s()

    def n_events(self):
        """Return the total number of events of the sample."""
        return self.sample().n_events()
    def n_jobs(self):
        """Return the number of jobs of the sample."""
        return self.sample().n_jobs()
    def n_events_per_job(self):
        """Return the number of events per job of the sample."""
        return self.sample().n_events_per_job()

    def input_size_MB(self):
        """Return the total input size of the sample in MB."""
        return self.sample().input_size_MB()
    def output_size_MB(self):
        """Return the total output size of the sample in MB."""
        return self.sample().output_size_MB()
    def cpu_time_s(self):
        """Return the total CPU time of the sample in seconds."""
        return self.sample().cpu_time_s()
    def cpu_time_days(self):
        """Return the total CPU time of the sample in days."""
        return self.sample().cpu_time_days()
    def slot_weight(self):
        """Return the slot weight of the sample."""
        return self.sample().slot_weight()

    def compute_time_days(self,slots=None):
        """Return the compute time for ``slots`` (default: current peak slots)."""
        if slots is None:
            return self.sample().compute_time_days(self.peak_slots_)
        else:
            return self.sample().compute_time_days(slots)

    def peak_slots(self,time_complete_days=None,max_slots=np.inf):
        """Return the peak slots; with any argument given, recompute first."""
        if time_complete_days is None and max_slots==np.inf:
            return self.peak_slots_
        self.recompute(time_complete_days,max_slots)
        return self.peak_slots_

    def time_complete_days(self):
        """Return the achievable completion time in days."""
        return self.time_complete_days_

    def set_max_slots(self,max_slots):
        """Recompute with a new slot cap, keeping the current completion time."""
        self.recompute(self.time_complete_days(),max_slots)

    def set_time_complete_days(self,days,max_slots=np.inf):
        """Recompute with a new requested completion time (and optional cap)."""
        self.recompute(days,max_slots)

    def bound(self):
        """Return "COMPUTE", "INPUT" or "OUTPUT", whichever takes longest."""
        if self.compute_time_days()>self.input_time_days() and self.compute_time_days()>self.output_time_days():
            return "COMPUTE"
        elif self.input_time_days()>self.output_time_days():
            return "INPUT"
        else:
            return "OUTPUT"

    def _slots_for_days(self,days):
        """Return the slots needed to finish the CPU work in ``days``."""
        return self.slot_weight()*((self.cpu_time_days())/days)

    def _recompute_with_stored_constraints(self):
        """Re-run recompute with the constraints passed to it most recently."""
        self.recompute(self.requested_time_complete_days_,self.max_slots_)

    def recompute(self,time_complete_days,max_slots):
        """Derive completion time and peak slots from the two constraints.

        Parameters
        ----------
        time_complete_days : float or None
            Requested completion time. It is increased (with printed
            messages) if the input or output transfer alone takes longer.
        max_slots : number
            Slot cap; ``np.inf`` for no cap. When the cap is hit, the
            completion time becomes the longest of compute, input and output
            time, and fewer slots are used if the sample is I/O bound.
        """
        # Remember the request so the setters can re-derive the plan later.
        self.requested_time_complete_days_ = time_complete_days
        self.max_slots_ = max_slots

        self.peak_slots_ = np.inf

        #If completion time set, calculate peak slots needed
        if time_complete_days is not None:
            self.time_complete_days_ = float(time_complete_days)

            #check if possible given input data rate
            if self.input_time_days()>self.time_complete_days_:
                needed_input_rate = self.input_interface().rate_MBps() * self.input_time_days()/self.time_complete_days_

                print("Time to input data exceeds desired completion time: %f > %f"%(self.input_time_days(),self.time_complete_days_))
                print("Input rate needed is %f MBps (vs. specified %f MBps)"%(needed_input_rate,self.input_interface().rate_MBps()))
                print("Increasing completion time to %f"%self.input_time_days())

                self.time_complete_days_ = self.input_time_days()

            #check if possible given output data rate
            if self.output_time_days()>self.time_complete_days_:
                needed_output_rate = self.output_interface().rate_MBps() * self.output_time_days()/self.time_complete_days_

                print("Time to output data exceeds desired completion time: %f > %f"%(self.output_time_days(),self.time_complete_days_))
                print("Output rate needed is %f MBps (vs. specified %f MBps)"%(needed_output_rate,self.output_interface().rate_MBps()))
                print("Increasing completion time to %f"%self.output_time_days())

                self.time_complete_days_ = self.output_time_days()

            self.peak_slots_ = self._slots_for_days(self.time_complete_days_)

        if self.peak_slots_ > max_slots:

            self.peak_slots_ = max_slots

            compute_time_days = self.compute_time_days(slots=self.peak_slots_)
            input_time_days = self.input_time_days()
            output_time_days = self.output_time_days()

            if compute_time_days>input_time_days and compute_time_days>output_time_days:
                self.time_complete_days_ = compute_time_days
            else:
                # I/O bound: the slower transfer sets the completion time and
                # only the slots needed to keep pace with it are used.
                self.time_complete_days_ = max(input_time_days,output_time_days)
                self.peak_slots_ = self._slots_for_days(self.time_complete_days_)

            if self.peak_slots_ < max_slots:
                print("Process is %s bound, and so uses only %d of available %d slots."%(self.bound(),self.peak_slots_,max_slots))

        return

    def __init__(self,
                 sample,
                 input_interface,
                 output_interface,
                 time_complete_days=None,max_slots=np.inf):
        """Bind a sample to its interfaces and derive the initial plan.

        If neither ``time_complete_days`` nor a finite ``max_slots`` is given
        a message is printed and no plan is derived.
        """
        # Define every attribute up front so later setter calls and
        # accessors never hit a missing attribute.
        self.peak_slots_ = np.inf
        self.time_complete_days_ = None
        self.requested_time_complete_days_ = time_complete_days
        self.max_slots_ = max_slots

        self.set_sample(sample,recompute=False)
        self.set_input_interface(input_interface,recompute=False)
        self.set_output_interface(output_interface,recompute=False)

        if time_complete_days is None and max_slots==np.inf:
            print("Must specify either time_complete_days or max_slots.")
            return

        self.recompute(time_complete_days=time_complete_days,
                       max_slots=max_slots)


    def __str__(self):
        """Return a multi-line summary including completion time, slots and bound."""
        s = "Sample '%s':"%self.name()
        s = s+"\nN_Events: %d"%self.n_events()
        s = s+"\nN_Jobs: %d"%self.n_jobs()
        s = s+"\nN_Events_per_job: %d"%self.n_events_per_job()
        s = s+"\nInput Datset Size (TB): %f"%(self.input_size_MB()/1000./1000.)
        s = s+"\nOutput Datset Size (TB): %f"%(self.output_size_MB()/1000./1000.)
        s = s+"\nCompletion Time (days): %f"%(self.time_complete_days())
        s = s+"\n\tCPU Time (hr): %f"%(self.cpu_time_s()/3600.)
        s = s+"\n\tWeighted CPU Time (hr): %f"%(self.cpu_time_s()/3600.*self.slot_weight())
        s = s+"\nPeak slots: %d"%self.peak_slots()
        s = s+"\nBound: %s"%self.bound()
        return s


In [8]:
# PMT filter step: 5% of events pass, and the per-event output size is given
# already scaled by that same 0.05 factor (50 MB * 0.05).
icarus_data_pmtFilter = Process(name="ICARUS Data PMTFilter",
                                memory_MB=2000,
                                time_s=2,
                                input_MB=170,
                                output_MB=50*0.05,
                                pass_fraction=0.05)

In [9]:
# First reconstruction stage ("Stage0"): 170 MB in, 50 MB out per event;
# pass fraction and failure rate keep their defaults (1.0 and 0.0).
icarus_data_reco1 = Process(name="ICARUS Data Stage0",
                           memory_MB=4000,
                           time_s=120,
                           input_MB=170,
                           output_MB=50)

In [10]:
# Second reconstruction stage ("Stage1"): its 50 MB input matches the
# 50 MB output declared for Stage0.
icarus_data_reco2 = Process(name="ICARUS Data Stage1",
                           memory_MB=4000,
                           time_s=100,
                           input_MB=50,
                           output_MB=45)

In [11]:
# Combine Stage0 and Stage1 into a single equivalent process.
icarus_data_reco = ChainProcesses([icarus_data_reco1,icarus_data_reco2],"ICARUS Data Reco")

In [12]:
# Show both stages next to their chained combination for comparison.
print(icarus_data_reco1)
print(icarus_data_reco2)
print(icarus_data_reco)

Process 'ICARUS Data Stage0':
	Memory (MB): 4000.000000
	Time per event (s): 120.000000
	Input/Output size per event (MB): 170.000000 / 50.000000
	Threads used (min,max): (1,1)
	Event pass fraction: 1.000000
	Failure rate: 0.000000
Process 'ICARUS Data Stage1':
	Memory (MB): 4000.000000
	Time per event (s): 100.000000
	Input/Output size per event (MB): 50.000000 / 45.000000
	Threads used (min,max): (1,1)
	Event pass fraction: 1.000000
	Failure rate: 0.000000
Process 'ICARUS Data Reco':
	Memory (MB): 4000.000000
	Time per event (s): 220.000000
	Input/Output size per event (MB): 170.000000 / 45.000000
	Threads used (min,max): (1,1)
	Event pass fraction: 1.000000
	Failure rate: 0.000000


In [13]:
# Same reconstruction chain, but with the PMT filter placed in front of it.
icarus_data_reco_filtered = ChainProcesses([icarus_data_pmtFilter,icarus_data_reco1,icarus_data_reco2],"ICARUS Data Reco, Filtered")

In [14]:
# Show the per-event summary of the filtered chain.
print(icarus_data_reco_filtered)

Process 'ICARUS Data Reco, Filtered':
	Memory (MB): 4000.000000
	Time per event (s): 13.000000
	Input/Output size per event (MB): 170.000000 / 2.250000
	Threads used (min,max): (1,1)
	Event pass fraction: 0.050000
	Failure rate: 0.000000


In [15]:
# A job that runs Stage0 alone over 50 events.
icarus_data_reco1_job = Job(icarus_data_reco1,50)

In [16]:
# Show the per-job totals for the Stage0-only job.
print(icarus_data_reco1_job)

Job (Process 'ICARUS Data Stage0'):
N_Events: 50
Memory (MB): 4000.000000
CPU Time (hr): 1.666667
Event pass fraction: 1.000000
Input/Output size (GB): 8.500000 / 2.500000
Failure rate: 0.000000


In [17]:
# A job that runs the unfiltered reconstruction chain over 50 events.
icarus_data_reco_job = Job(icarus_data_reco,50)

In [18]:
# Show the per-job totals for the unfiltered chain.
print(icarus_data_reco_job)

Job (Process 'ICARUS Data Reco'):
N_Events: 50
Memory (MB): 4000.000000
CPU Time (hr): 3.055556
Event pass fraction: 1.000000
Input/Output size (GB): 8.500000 / 2.250000
Failure rate: 0.000000


In [19]:
# A job that runs the filtered reconstruction chain over 50 events.
icarus_data_reco_filtered_job = Job(icarus_data_reco_filtered,50)

In [20]:
# Show the per-job totals for the filtered chain.
print(icarus_data_reco_filtered_job)

Job (Process 'ICARUS Data Reco, Filtered'):
N_Events: 50
Memory (MB): 4000.000000
CPU Time (hr): 0.180556
Event pass fraction: 0.050000
Input/Output size (GB): 8.500000 / 0.112500
Failure rate: 0.000000


In [21]:
# I/O interface labelled "FNAL Enstore", modelled at 500 MB/s.
tape_io_interface = DataIOInterface(name="FNAL Enstore",rate_MBps=500)

In [22]:
# I/O interface labelled "FNAL dCache", modelled at 3000 MB/s.
dcache_io_interface = DataIOInterface(name="FNAL dCache",rate_MBps=3000)

In [23]:
# Bind the filtered job to tape input and dCache output. Shallow copies are
# passed so the instance holds its own job and interface objects.
ji = JobInstance(job=icarus_data_reco_filtered_job.copy(),
                 input_interface=tape_io_interface.copy(),
                 output_interface=dcache_io_interface.copy())
print(ji)

Job Instance (Process 'ICARUS Data Reco, Filtered'):
Input interface: 'FNAL Enstore' (Rate = 500.000000 MBps)
Output interface: 'FNAL dCache' (Rate = 3000.000000 MBps)
N_Events: 50
Memory (MB): 4000.000000
Total Time (hr): 0.185288
	Input Time (hr): 0.004722
	CPU Time (hr): 0.180556
	Output Time (hr): 0.000010
Event pass fraction: 0.050000
Input/Output size (GB): 8.500000 / 0.112500
Failure rate: 0.000000


In [24]:
# Size the sample by total events only (10e6); the number of jobs is derived
# from the 50 events per job of the copied job template.
sample_icarus_data_reco_filtered = Sample(job=icarus_data_reco_filtered_job.copy(),name="ICARUS Reco Data Filtered",n_events=10e6)

Requested n_events= 10000000.0  with n_jobs= None
Setting n_events=10000000 with n_jobs=200000


In [25]:
# Show the sample-level totals (dataset sizes and CPU time).
print(sample_icarus_data_reco_filtered)

Sample 'ICARUS Reco Data Filtered':
N_Events: 10000000
N_Jobs: 200000
N_Events_per_job: 50
Input Datset Size (TB): 1700.000000
Output Datset Size (TB): 22.500000
CPU Time (hr): 36111.111111
Weighted CPU Time (hr): 72222.222222


In [26]:
# Ask for completion in 1 day with no slot cap; the constructor prints a
# message if the input or output transfer time forces a later completion.
si = SampleInstance(sample=sample_icarus_data_reco_filtered,
                    input_interface=tape_io_interface,
                    output_interface=dcache_io_interface,
                    time_complete_days=1,max_slots=np.inf)

Time to input data exceeds desired completion time: 39.351852 > 1.000000
Input rate needed is 19675.925926 MBps (vs. specified 500.000000 MBps)
Increasing completion time to 39.351852


In [27]:
# Show the resulting plan: completion time, peak slots and limiting factor.
print(si)

Sample 'ICARUS Reco Data Filtered':
N_Events: 10000000
N_Jobs: 200000
N_Events_per_job: 50
Input Datset Size (TB): 1700.000000
Output Datset Size (TB): 22.500000
Completion Time (days): 39.351852
	CPU Time (hr): 36111.111111
	Weighted CPU Time (hr): 72222.222222
Peak slots: 76
Bound: INPUT
