In [12]:
import threading
import time
import random
import collections
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

In [13]:
class WorkerThread(threading.Thread):
    
    def __init__(self, task, **kwargs):
        super(WorkerThread, self).__init__(**kwargs)
        self.task   = task

    def run(self):
        logging.debug("starting %s" % (self.task.file))
        self.task.run()
        logging.debug("done %s" % (self.task.file))
        

In [14]:
class Worker():
    
    def __init__(self):
        self.thr    = None

    def run_task(self, task):    
        if self.thr != None and self.thr.is_alive():
            self.thr.join()
        self.thr  = WorkerThread(task)
        self.thr.start()
        logging.debug("%s Launched" % (task.file))
            
    def is_running(self):
        if self.thr == None:
            return False
        else:
            return self.thr.is_alive()

In [15]:
class Task():
    
    def __init__(self, file):
        self.file = file
    
    def run(self):
        time.sleep(random.random()*100)
        

In [79]:
myTask = Task("Fichero 1")
wrk1 = Worker()
wrk1.run_task(myTask)
wrk1.is_running()

(Thread-30 ) starting Fichero 1
(MainThread) Fichero 1 Launched


True

In [93]:
wrk1.is_running()

False

In [16]:
class TaskQueue():
    
    def __init__(self):
        self.number_of_tasks = 0
        self.queue           = collections.deque([]) 
        
    def add_task(self, task):
        self.number_of_tasks += 1
        self.queue.append(task)
        
    def take_task(self):
        if len(self.queue) > 0:
            self.number_of_tasks -= 1
            return self.queue.popleft()
        
    def get_num_tasks(self):
        return self.number_of_tasks
    

In [95]:
myTaskQueue = TaskQueue()
print("Number of Tasks = {}".format(myTaskQueue.get_num_tasks()))
myTaskQueue.add_task(myTask)
print("Number of Tasks = {}".format(myTaskQueue.get_num_tasks()))
myTaskQueue.add_task(myTask)
print("Number of Tasks = {}".format(myTaskQueue.get_num_tasks()))
myTaskQueue.take_task()
print("Number of Tasks = {}".format(myTaskQueue.get_num_tasks()))

Number of Tasks = 0
Number of Tasks = 1
Number of Tasks = 2
Number of Tasks = 1


In [27]:
class WorkerPool():
    
    def __init__(self, max_workers):
        self.pool = []
        self.number_of_workers = 0
        self.max_workers = max_workers
        
    def add_worker(self, worker):
        if self.number_of_workers < self.max_workers:
            self.pool.append(worker)
            self.number_of_workers += 1
        
    def remove_worker(self):
        deleted = False
        while True:
            for wrk in self.pool:
                if not wrk.is_running():
                    self.pool.remove(wrk)
                    self.number_of_workers -= 1
                    deleted = True
            if deleted:
                break
        
    def get_num_workers(self):
        return self.number_of_workers

In [108]:
myWorkerPool = WorkerPool(10)
wrk1 = Worker()
wrk2 = Worker()
wrk3 = Worker()
print("Number of Workers = {}".format(myWorkerPool.get_num_workers()))
myWorkerPool.add_worker(wrk1)
print("Number of Workers = {}".format(myWorkerPool.get_num_workers()))
myWorkerPool.add_worker(wrk2)
print("Number of Workers = {}".format(myWorkerPool.get_num_workers()))
myWorkerPool.add_worker(wrk3)
print("Number of Workers = {}".format(myWorkerPool.get_num_workers()))
myWorkerPool.remove_worker()
print("Number of Workers = {}".format(myWorkerPool.get_num_workers()))
myWorkerPool.remove_worker()
print("Number of Workers = {}".format(myWorkerPool.get_num_workers()))

Number of Workers = 0
Number of Workers = 1
Number of Workers = 2
Number of Workers = 3
Number of Workers = 2
Number of Workers = 1


State vector and Reward system

We feed the state vector to our AI agent and agent choose an action based on that state. The state vector should contain valuable information. The goodness of the action taken by an agent depends on how informative the state vector is.

the state vector contains the following information.

Number of tasks in Queue
Number of Workers
Average time of task processing?


Following is the reward system:

Give a reward of +3 if the number of tasks in queue decreases
Give a reward of -3 if the number of tasks in queue increases
Give a reward of -0.1 each time we add a new Worker to the pool.
Give a reward of +0.1 each time we remove a Worker from the pool.
Give a reward of +3 if the average processing time decreases
Give a reward of -3 if the average processing time increases

We also have to implement an action space. The agent will choose one of the actions from the action space and send it to the environment. Following is the action space


0 - Do nothing.
1 - Add Worker to the pool
2 - Remove Worker from the pool

The agent will send one of these numbers to the environment, and the environment performs the action corresponding to that numbers.

In [24]:
class Environment():
    
    def __init__(self, worker_pool, task_queue):
        self.worker_pool    = worker_pool
        self.task_queue     = task_queue
        self.avg_time       = 0
        
    
    def step(self, action, past_num_tasks):

        reward, done = 0, 0

        if action == 0:         # if action is 0, check status and give reward
            if self.task_queue.get_num_tasks() > past_num_tasks:
                reward -= .3        # reward of -0.3
            else:
                reward += .3
    
        if action == 1:         # if action is 1, add Worker to the Pool
            wrk = Worker()
            self.worker_pool.add_worker(wrk)
            reward -= .1        # reward of -0.1

        if action == 2:         # if action is 2, remove Worker from the Pool 
            self.worker_pool.remove_worker()
            reward += .1        # reward of -0.1


        # creating the state vector
        state = [self.worker_pool.get_num_workers(), 
                 self.task_queue.get_num_tasks(), 
                 self.avg_time]

        return reward, state, done


In [33]:
# Creramos una cola donde ir encolando las tareas
myTaskQueue = TaskQueue()
# Creamos el Pool con un tope de 10 workers
myWorkerPool = WorkerPool(10)

# Inicializamos el Pool con 5 workers
for i in range(0,5):
    wrk = Worker()
    myWorkerPool.add_worker(wrk)
    
myEnvironment = Environment(myWorkerPool, myTaskQueue)


try:
    past_num_tasks = 0
    while True:
        # Creamos una tarea cada cierto tiempo
        num = random.randint(1, 10000)
        if num % 1000 == 0:
            task = Task("Fichero")
            myEnvironment.task_queue.add_task(task)
            
        for wrk in myEnvironment.worker_pool.pool:
            if not wrk.is_running():
                if myEnvironment.task_queue.get_num_tasks() > 0:
                    wrk.run_task(myEnvironment.task_queue.take_task())

        #print("Number of Tasks   = {}".format(myEnvironment.task_queue.get_num_tasks()))
        #print("Number of Workers = {}".format(myEnvironment.worker_pool.get_num_workers()))
        
        # Comenzamos con un agente que seleccione las acciones al azar
        action = random.choice([0,1,2])
        reward, state, done = myEnvironment.step(action, past_num_tasks)
        print("reward, state, done = {}, {}, {}".format(reward, state, done))        



except KeyboardInterrupt:
    print("Press Ctrl-C to terminate while statement")
    pass


reward, state, done = 0.1, [2, 0, 0], 0
reward, state, done = 0.3, [2, 0, 0], 0
reward, state, done = 0.1, [1, 0, 0], 0
reward, state, done = 0.3, [1, 0, 0], 0
reward, state, done = 0.1, [0, 0, 0], 0
reward, state, done = -0.1, [1, 0, 0], 0
reward, state, done = 0.1, [0, 0, 0], 0
reward, state, done = 0.3, [0, 0, 0], 0
reward, state, done = 0.3, [0, 0, 0], 0
reward, state, done = 0.3, [0, 0, 0], 0
reward, state, done = 0.3, [0, 0, 0], 0
reward, state, done = -0.1, [1, 0, 0], 0
reward, state, done = -0.1, [2, 0, 0], 0
reward, state, done = 0.1, [1, 0, 0], 0
reward, state, done = 0.1, [0, 0, 0], 0
reward, state, done = -0.1, [1, 0, 0], 0
reward, state, done = -0.1, [2, 0, 0], 0
reward, state, done = 0.1, [1, 0, 0], 0
reward, state, done = 0.3, [1, 0, 0], 0
reward, state, done = -0.1, [2, 0, 0], 0
reward, state, done = 0.3, [2, 0, 0], 0
reward, state, done = 0.3, [2, 0, 0], 0
reward, state, done = 0.3, [2, 0, 0], 0
reward, state, done = 0.1, [1, 0, 0], 0
reward, state, done = 0.1, [0, 0, 

https://stackoverflow.com/questions/12868956/python-using-threads-or-a-queue-to-iterate-over-a-for-loop-that-calls-a-functi
https://towardsdatascience.com/create-your-own-reinforcement-learning-environment-beb12f4151ef