In [97]:
import math
import copy
from task import Task
from caseLoader import CaseLoader
from taskType import TaskType
import random
import time

In [98]:
MT = 10 # microtick

In [99]:
loader = CaseLoader()
cases = loader.load_test_case("inf_30_30", 0)
cases_TT = [c for c in cases if c.type == TaskType.TIME]

Searching for test cases at:
./test_cases/inf_30_30
Loaded case_group: inf_30_30 - case: 0 test case(s).


In [100]:
def get_ready(task_set, cycle, ready_list):
    for task in task_set:
        if cycle % task.period == 0:
            new_job = copy.deepcopy(task)
            new_job.deadline = new_job.deadline + cycle
            new_job.release_time = cycle
            ready_list.append(new_job)
    
    # sort on deadline
    # https://www.geeksforgeeks.org/sorting-objects-of-user-defined-class-in-python/
    ready_list = sorted(ready_list, key=lambda t: t.deadline) 
    
    return sorted(ready_list, key=lambda t: t.deadline)
        

# ts is set of TT tasks
def edf(ts):
    T = math.lcm(2000,3000,4000) # least common multiple of TT task periods. 2000 mt = s, wcrts = edf(cases)
    S = [] # schedule will be hyperperiod long. 12 000 microticks == 120 000 microsecs == 120 ms
    ready_list = []
    wcrts = {} # worst case response times

    for t in range(0, T):
        ready_list = get_ready(ts, t, ready_list)
        
        for task in ready_list:
            if task.duration > 0 and task.deadline <= t:
                return []
        
            # job done check response time gt wcrt and remove from ready list
            if task.duration == 0 and task.deadline >= t:
                response_time = t - task.release_time
                
                if task.name not in wcrts or response_time >= wcrts[task.name]:
                    wcrts[task.name] = response_time
                
                ready_list.remove(task)
                
            
        if ready_list == []:
            S.append("IDLE")
            continue
        else:
            # EDF get next job to execute 
            S.append(ready_list[0].name)
            ready_list[0].duration = ready_list[0].duration - 1
    
    if ready_list != []:
        return [], []
    
    return S, wcrts

In [101]:
s, wcrts = edf(cases_TT)

In [102]:
len(s)

12000

In [103]:
#Schedulability of ET tasks under a given polling task

#Data:
#polling task budget: Cp
#polling task period: Tp
#polling task deadline: Dp
#subset of ET tasks to check Tau^ET (T_ET)
#pi = priority
#Ci = Computation time?
#ti = task period (tuple)

#result: small_tau (Boolean)

####

def unpack(task):
    return task.priority, task.duration, task.period, task.deadline
    
def calculate_schedulabiltiy(Tp, Dp, Cp, task_periods):
    #compute delta and alpha accordingly to [2]
    Delta = Tp + Dp - 2*Cp
    alpha = Cp/Tp
    
    #hyperperiod is lcm of all task periods in T_ET (all values must be from the chosen subset of ET tasks from the .csv)
    periods = []
    for task in task_periods:
        (pi, Ci, Ti, Di) = unpack(task)
        periods.append(task.period)

    hyperperiod = math.lcm(*periods)
    
    hyperperiod = math.lcm(*[t.period for t in task_periods])
    
    

    #loop
    for task_period in task_periods:
        (pi, Ci, Ti, Di) = unpack(task_period)
        t = 0
        #initialize the response time of ti (task period) to a value exceeding the deadline
        response_time = Di + 1

        #remember we are dealing with constrained deadline tasks for the AdvPoll, hence, in the worst case arrival pattern, the intersection must lie within the hyperperiod if the task is schedulable

        while t < hyperperiod:
            #the supply at time t ([1])
            supply = alpha*(t-Delta)
            
            #compute the maximum demand at time t according to Eq. 2
            demand = 0
            for tj in task_periods:
                (pj, Cj, Tj, Dj) = tj.period, tj.duration, tj.period, tj.deadline

                if pj >= pi:
                    demand  = demand + math.ceil(t/Tj)*Cj
            
            #According to lemma 1 of [1], we are searching for the earliest time, when the supply exceeds the demand
            if supply >= demand:
                response_time = t
                break 
            
            t = t + 1
        
            if response_time >= Di:
                return False, response_time
        
    return True, response_time

#TODO: 
    # load in .csv
    # loop through properties and assign to multiple tuples in task_periods
def fetch_subset(name_of_csv):
    return "Not Implemented"

#properties of polling task
Dp = 1
Tp = 2
Cp = 3

subset = [(1,2,3,4), (5,4,3,2)]

In [104]:
cases_ET = [c for c in cases if c.type == TaskType.EVENT]

In [105]:
calculate_schedulabiltiy(500, 500, 500, cases_ET)

(True, 0)

In [106]:
def p(delta, t):
    return np.exp( -delta / t )

# simulated annealing 
def sa(s0, t0, a, stopcriterion_sec, neighborhood_f = None, cost_f = None):
    # initialize variables
    sec0 = int(time.time())
    sec_now = int(time.time())
    s_best = s0[:]
    cost_best = cost_f(s_best)
    s_cur = s0[:]
    cost_cur = cost_f(s_cur)
    t = t0
    
    # does not matter if < or <= if 0 then new solution will be selected no matter what     
    while sec_now - sec0 < stopcriterion_sec:
        s_tmp = neighborhood_f(s_cur)
        cost_tmp = cost_f(s_tmp)
        delta = cost_tmp - cost_cur
        
        # accept randomly drawn solution from current neighborhood if better or with some probability
        if delta <= 0 or p(delta, t) > rand.uniform(0.0, 1.0):
            s_cur = s_tmp
            cost_cur = cost_tmp
            
            # keep track of the best solution 
            if cost_cur < cost_best:
                s_best = s_cur
                cost_best = cost_cur
        
        # update temperature 
        t = t * a
        
        # update sec
        sec_now = int(time.time())
    
    return s_best, cost_best 

In [107]:
rand = random.Random()

# assume for now 1 PS and D=T
def random_neighbor(s):
    C = s[0]
    T = s[1]
    return [max(100, C + rand.randint(-100,100)), max(100, T + rand.randint(-100, 100))]
    
    

In [108]:
calculate_schedulabiltiy(14000, 14000, 5000, cases_ET)

(False, 3848)

In [109]:
polling_server = Task('tTT30', 1000, 1000, TaskType.TIME, 7, 1000)
cases_1 = cases_TT + [polling_server]

In [110]:
def cost_f1(args):
    tt_tasks, et_tasks, ps = *args
    schedulable, response_time = calculate_schedulabiltiy(ps.period, ps.deadline, ps.duration, et_tasks)
    if not schedulable:
        return float("inf"), {}
    
    return edf(tt_tasks + [ps])
    

#def sa(s0, t0, a, stopcriterion_sec, neighborhood_f = None, cost_f = None):
s, cost = sa([polling_server.duration, polling_server.period], 1000, 0.99, 60, random_neighbor, cost_f1)

TypeError: cost_f1() missing 2 required positional arguments: 'et_tasks' and 'ps'

SyntaxError: can't use starred expression here (492293102.py, line 1)

In [45]:
math.lcm(*[t.period for t in cases_ET])

12000

In [70]:
max(100, 0)

100

In [78]:
for i in range(100, 500):
    t = i

In [79]:
t

499

In [89]:
float('inf')

inf