In [None]:
import pandas as pd
import socket
from itertools import product
import numpy as np
import json
from typing import List

In [None]:
# Hyper-parameter search grids, one dict per model family.
# Every dict has a 'model' entry naming the model; each remaining entry maps
# a hyper-parameter name to the iterable of candidate values to try.
# Key order matters: it fixes the order in which combinations are generated.
params = [
    # --- XGBoost grid ---
    {
        'model': 'xgboost',
        'min_child_weight': [1, 5, 10],
        'gamma': [0.5, 1, 1.5, 2, 5],
        'subsample': [0.6, 0.8, 1.0],
        'colsample_bytree': [0.6, 0.8, 1.0],
        'max_depth': range(1, 11, 1),
        'n_estimators': range(50, 400, 25),
    },
    # --- LightGBM grid ---
    {
        'model': 'lightgbm',
        'num_leaves': range(10, 110, 50),
        'min_child_samples': range(100, 1250, 250),
        'max_bin': range(3, 18, 3),
        # NOTE(review): 1.2 looks outside LightGBM's documented range for
        # bagging_fraction (0 < value <= 1) -- confirm before running workers.
        'bagging_fraction': [0.6, 0.8, 1, 1.2],
        'max_depth': range(8, 17, 2),
        'min_split_gain': [0.001, 0.1],
        'min_child_weight': [30, 40, 50],
    },
    # --- CatBoost grid ---
    {
        'model': 'catboost',
        'depth': range(1, 4, 2),
        'learning_rate': [0.03, 0.001, 0.01],
        'iterations': [100],
        'l2_leaf_reg': [3, 1, 5, 10, 100],
        'border_count': [32, 5, 10, 20, 50, 100, 200],
        'ctr_border_count': [50, 5, 10, 20, 100, 200],
    },
]



def split_params(jobs_param: List[dict], n: int) -> List[str]:
    """Expand parameter grids and split the combinations into ``n`` jobs.

    Each grid is a dict holding a ``'model'`` entry plus any number of
    ``name -> iterable of candidate values`` entries. Every grid is expanded
    into the Cartesian product of its candidate values, the combinations of
    all grids are concatenated in input order, and the result is divided into
    ``n`` consecutive chunks of near-equal size.

    The input grids are left untouched, so the function can safely be called
    again with the same objects (for example when a notebook cell is re-run).

    Args:
        jobs_param: List of parameter grids.
        n: Desired number of jobs.

    Returns:
        A list of ``n`` JSON strings. Each one decodes to a list of parameter
        dicts, and every dict carries the ``'model'`` value of its grid. When
        there are fewer combinations than jobs, the trailing strings encode
        empty lists.

    Raises:
        KeyError: If a grid has no ``'model'`` entry.
        ValueError: If ``n`` is not positive (propagated from
            ``numpy.array_split``).
    """
    combos = []
    for grid in jobs_param:
        model = grid['model']
        # Filter 'model' out into a new dict instead of popping it: popping
        # would mutate the caller's grid and make a second call fail.
        axes = {name: values for name, values in grid.items() if name != 'model'}
        for values in product(*axes.values()):
            combo = dict(zip(axes, values))
            combo['model'] = model
            combos.append(combo)

    # array_split tolerates uneven division: the leading chunks receive one
    # extra element each until the remainder is used up.
    split_list = np.array_split(combos, n)
    json_list = [json.dumps(chunk.tolist()) for chunk in split_list]
    # "Params per Job" reports the size of the first (largest) chunk.
    print(f'''
            Total Number of Params = {len(combos)}
            Params per Job         = {len(split_list[0])} 
            Number of Jobs         = {n}
            ''')
    return json_list

# Number of job payloads (one queue message each) to divide the search space into.
N_JOBS = 5000
split = split_params(params, N_JOBS)


In [None]:
# This is the first job that will be delivered to our workers.
# It is a JSON string encoding a list of parameter dicts; with the grids and
# job count used above, this first job contains 6 parameter combinations.
split[0]

In [None]:
# We're using a package called Pika to communicate with our RabbitMQ deployment.
# The connection targets localhost, which relies on the broker's port having
# been forwarded to this machine. Pika defaults to port 5672.
import pika

# Single source of truth for the queue the workers consume from.
QUEUE_NAME = "task_queue"

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
try:
    channel = connection.channel()

    # Declare the queue as durable so it is not lost if the broker restarts.
    channel.queue_declare(queue=QUEUE_NAME, durable=True)

    # One message per job payload produced by split_params.
    for msg in split:
        channel.basic_publish(
            exchange="",
            routing_key=QUEUE_NAME,
            body=msg,
            properties=pika.BasicProperties(delivery_mode=2),  # make message persistent
        )
        print(f"Sent {msg}")
finally:
    # Always release the connection, even if declaring or publishing fails.
    # The is_open guard avoids masking the original error with a second one
    # when the connection has already been dropped.
    if connection.is_open:
        connection.close()