In [32]:
from skopt import Optimizer
from skopt.learning import GaussianProcessRegressor
from skopt.learning.gaussian_process.kernels import RBF, ConstantKernel, Product
from tqdm import tqdm_notebook as tqdm
from skopt import gp_minimize
from time import sleep
import docker
import random
import os
import string

### Глобальные Параметры

In [33]:
# первые n_initial_points модель не обучается
n_initial_points = 5

# число итераций цикла
n_calls = 3

# оптимизация на кубе [low_constraint, high_constraint]^dim
low_constraint, high_constraint = 2., 301.
dim = 1

# столько контейнеров вызываются для параллельной работы
batch_size = 2

# директория на сервере, хранит директории, которые будут монтироваться в контейнеры
folder_local = '/home/matyushinleonid/lhcb_ecal/feb_meeting/folder_local'
folder_local = '/home/igor/LAMBDA/lhcb_repo'

# директория для файлов input и output внутри контейнера
folder_container = '/home/nb_user/logs'

# python-клиент докера
client = docker.from_env()

# имя образа
container = "calorbuild"

# имена директорий, каждая соответствует своей копии образа
worker_names = ['first_worker', 'second_worker']

###
first_loop_legal_upper_bounds = [i for i in range(3, 301, 3)]
#second_loop_legal_upper_bounds = [i // 3 * 4 for i in first_loop_legal_upper_bounds]
#space_size = len(first_loop_legal_upper_bounds)
#total_amount_of_inner_part = [first_loop_legal_upper_bounds[i] * second_loop_legal_upper_bounds[i] \
#                              for i in range(space_size)]

def crop_number(n):
    return min(first_loop_legal_upper_bounds, key=lambda t:abs(t-n))
###

### Модель

In [34]:
kernel = Product(ConstantKernel(1), RBF(1)) + ConstantKernel(1)

model = GaussianProcessRegressor(alpha=0, 
                                 normalize_y=True, 
                                 noise='gaussian', 
                                 n_restarts_optimizer=10, 
                                 kernel=kernel)

optimizer = Optimizer([[low_constraint, high_constraint]]*dim,
                      model,
                      n_initial_points=n_initial_points,
                      acq_func='EI',
                      acq_optimizer='lbfgs',
                      random_state=None)

### Оптимизация (+ работа с контейнерами)

In [45]:
def get_folder(folder_local):
    list_dir = os.listdir(folder_local)
    for _ in range(3):
        new_folder ='{}/{}'.format(folder_local,
                                   ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10)))
        if new_folder not in list_dir:
            os.mkdir(new_folder)
            return new_folder
    raise Exception("Cannot create uniq folder")   


def write_input_file(input_data):
    job_folder = get_folder(folder_local)
    file_to_write = '{}/input.txt'.format(job_folder)    
    cropped_input_data = list(map(crop_number, input_data))
    string_to_write = ' '.join(map(str, cropped_input_data))
    with open(file_to_write, "w") as file:
        print(string_to_write,
              file=file)
    return new_folder

def create_job(job_folder):
    client.containers.run(container,
                          privileged=True,
                          remove=True,
                          detach=True,
                          hostname='dev',
                          tty=True,
                          stdin_open=True,
                          volumes={job_folder: {'bind': folder_container,
                                                     'mode': 'rw'}})
    

def read_output_file(job_folder):
    file_to_read = '{}/output.txt'.format(job_folder)
    with open(file_to_read, 'r') as myfile:
        data = myfile.read()
    return float(data)

def get_price(params, lamb=1):
    param1 = crop_number(params[0])
    param2 = param1 // 3 * 4
    return lamb * param1 * param2

In [38]:
for i in tqdm(range(n_calls)):
    X = optimizer.ask(n_points=batch_size)
    prices = []
    for i, worker_name in enumerate(worker_names):
        x = X[i]
        price = get_price(x)
        prices.append(price)
        write_input_file(worker_name, x)
        create_job(worker_name)
    
    sleep(15 * 60) # 15 min sleep
    
    Y = []
    for i, worker_name in enumerate(worker_names):
        y = read_output_file(worker_name)
        price = prices[i]
        Y.append(y + price)
        
    optimizer.tell(X, Y)

HBox(children=(IntProgress(value=0, max=3), HTML(value='')))




TypeError: write_input_file() takes 1 positional argument but 2 were given

In [6]:
optimizer.Xi, optimizer.yi

([[280.7934963888376],
  [214.45812970118018],
  [268.9186554601966],
  [193.20322796424293],
  [68.68811441603718],
  [7.705841379134213]],
 [106579.06183393415,
  60942.02138167647,
  97722.98133277889,
  49630.40136958836,
  6877.329098757192,
  682.0861652574987])

# Parallel with multiproccesing



In [43]:
from multiprocessing import Queue, Pool, Manager

In [44]:
def worker_do_job(q_in, q_out):
    while True:
        data = q_in.get()        
        in_dir = write_input_file(data)        
        create_job(in_dir)       
        result = read_output_file(in_dir)
        q_out.put((data, result))       
    return     

In [None]:
pool = Pool(batch_size) 
m = Manager()
q_in = m.Queue()
q_out = m.Queue()
pool.starmap_async(worker_do_job, [(q_in, q_out)]*batch_size)

X = optimizer.ask(n_points=batch_size)
for i in range(batch_size):
    q_in.put(X[i])

for i in tqdm(range(n_calls-batch_size)): 
    x, y = q_out.get()
    optimizer.tell(x, y)    
    q_in.put(optimizer.ask())
    
for _ in range(batch_size):
    x, y  = q_out.get()
    optimizer.tell(x, y)        
    
pool.terminate()

In [47]:
m = Manager()
q_test = m.Queue()
q_test.put((1,2))

x, y = q_test.get()
print(x)
print(y)

1
2
