In [1]:
import json
import threading
import time
import requests
import random

In [2]:
settings = {
        'max_new_tokens': 10,
        'history': {'internal': [], 'visible': []},
        'mode': 'instruct',  # Valid options: 'chat', 'chat-instruct', 'instruct'
        'character': 'Example',
        'instruction_template': 'Vicuna-v1.1',
        'your_name': 'You',

        'regenerate': False,
        '_continue': False,
        'stop_at_newline': False,
        'chat_prompt_size': 2048,
        'chat_generation_attempts': 1,
        'chat-instruct_command': 'Continue the chat dialogue below. Write a single reply for the character "'
                                 '<|character|>".\n\n<|prompt|>',

        # Generation params. If 'preset' is set to different than 'None', the values
        # in presets/preset-name.yaml are used instead of the individual numbers.
        'preset': 'None',
        'do_sample': True,
        'temperature': 0.7,
        'top_p': 0.1,
        'typical_p': 1,
        'epsilon_cutoff': 0,  # In units of 1e-4
        'eta_cutoff': 0,  # In units of 1e-4
        'tfs': 1,
        'top_a': 0,
        'repetition_penalty': 1.18,
        'top_k': 40,
        'min_length': 0,
        'no_repeat_ngram_size': 0,
        'num_beams': 1,
        'penalty_alpha': 0,
        'length_penalty': 1,
        'early_stopping': False,
        'mirostat_mode': 0,
        'mirostat_tau': 5,
        'mirostat_eta': 0.1,
        'add_bos_token': True,
        'truncation_length': 2048,
        'ban_eos_token': False,
        'skip_special_tokens': True,
        'stopping_strings': []
}

In [3]:
diagnostic_settings = {
        'max_new_tokens': 10,
        'history': {'internal': [], 'visible': []},
        'mode': 'instruct',  # Valid options: 'chat', 'chat-instruct', 'instruct'
        'character': 'Example',
        'instruction_template': 'Vicuna-v1.1',
        'your_name': 'You',

        'regenerate': False,
        '_continue': False,
        'stop_at_newline': False,
        'chat_prompt_size': 2048,
        'chat_generation_attempts': 1,
        'chat-instruct_command': 'Continue the chat dialogue below. Write a single reply for the character "'
                                 '<|character|>".\n\n<|prompt|>',
        'preset': 'None',
        'do_sample': True,
        'temperature': 0.7,
        'top_p': 0.1,
        'typical_p': 1,
        'epsilon_cutoff': 0,  # In units of 1e-4
        'eta_cutoff': 0,  # In units of 1e-4
        'tfs': 1,
        'top_a': 0,
        'repetition_penalty': 1.18,
        'top_k': 40,
        'min_length': 0,
        'no_repeat_ngram_size': 0,
        'num_beams': 1,
        'penalty_alpha': 0,
        'length_penalty': 1,
        'early_stopping': False,
        'mirostat_mode': 0,
        'mirostat_tau': 5,
        'mirostat_eta': 0.1,

        'seed': -1,
        'add_bos_token': True,
        'truncation_length': 2048,
        'ban_eos_token': False,
        'skip_special_tokens': True,
        'stopping_strings': []
}


def write_response(command):
    with open('chat/instructions.json', 'a+') as file:
        file.write(command.text.replace('\n', '\\n') + "\n")
    with open('chat/responses.json', 'a+') as file:
        file.write(command.response.replace('\n', '\\n') + "\n")


class Command:
    def __init__(self, id_, text, settings):
        self.id_ = id_
        self.text = text
        self.n_words = len(text.split())
        self.settings = settings
        self.flag_resp = False
        self.response = ""
        self.n_tokens = -1

    def def_request(self):
        return {
            'user_input': self.text,
            **self.settings
        }

    def def_prompt(self):
        return {'prompt': self.text}

    def set_response(self, response):
        self.flag_resp = True
        self.response = response['history']['visible'][0][1]
        write_response(self)
        print(len(self.text), len(self.text.split()), self.n_tokens, '\n')
        print(f'Response to command {self.id_} ({self.n_tokens} tokens) given.', '\n')

    def set_tokens(self, response):
        self.n_tokens = response['tokens']


class Resource:
    def __init__(self, type_, address, available):
        self.type = type_
        self.address = address
        host = address.split(":")[0]
        port = int(address.split(":")[1])
        self.api = f'http://{host}:{port + 1000}/api/v1/chat'
        self.ct_api = f'http://{host}:{port + 1000}/api/v1/token-count'
        self.available = available
        self.status = "free"

    def __str__(self):
        return f'{self.type}, {self.address}, {self.available}, {self.status}'


class ResourcePool(list):
    def __init__(self):
        super().__init__()
        self.available_resources = []

    def insert_resource(self, type_, address, available):
        for element in self:
            if address == element.address:
                element.available = available
                return
        self.append(Resource(type_, address, available))

    def update_avail_resources(self):
        self.available_resources = []
        for element in self:
            if element.available:
                self.available_resources.append(element)

    def get_avail_resources(self):
        return self.available_resources


def check_status(signal):
    resource_pool = signal[1]
    clst_status = {}
    while signal[0]:
        with open('cluster_management/cluster_status.json', 'r') as file:
            clst_status = json.load(file)
        for k in clst_status:
            props = k.split()
            resource_pool.insert_resource(props[0], props[1] + ":" + props[2], clst_status[k] == 'ON')
        resource_pool.update_avail_resources()
        time.sleep(1)


def req(request, resource):

    # Check for prompt overflow
    if request[0].n_words > (request[0].settings['chat_prompt_size'] + request[0].settings['max_new_tokens']) / 2:
        try:
            prompt = request[0].def_prompt()
            response = requests.post(resource.ct_api, json=request[0].def_prompt())
            if response.status_code == 200:
                result = response.json()['results'][0]
                request[0].set_tokens(result)
                if request[0].n_tokens > request[0].settings['chat_prompt_size'] + request[0].settings['max_new_tokens']:
                    request[1] = 'computed'
                    print(f'WARNING: command {request[0].id_} too long! ({request[0].n_tokens} tokens)')
            else:
                print(f'Status code: {response.status_code}')
                request[1] = 'to_be_computed'
                return
        except Exception as ex:
            print(ex)
            request[1] = 'to_be_computed'
            return

    try:
        response = requests.post(resource.api, json=request[0].def_request())
        if response.status_code == 200:
            result = response.json()['results'][0]
            request[0].set_response(result)
            request[1] = 'computed'
        else:
            print(f'Status code: {response.status_code}')
            request[1] = 'to_be_computed'
    except Exception as ex:
        print(ex)
        request[1] = 'to_be_computed'


def compute_queue(inputs, signal):
    resource_pool = signal[1]
    diagnostic_request = [Command(-1, 'example_text', diagnostic_settings), 'to_be_computed']
    all_requests = [[inp, 'to_be_computed'] for inp in inputs]
    free_resources = [x for x in resource_pool.get_avail_resources()]
    if len(free_resources) == 0:
        raise "NO RESOURCES AVAILABLE"
    busy_resources = []
    err_resources = []
    tasks = []

    while 1:
        for resource in free_resources:
            for request in all_requests:
                if request[1] == 'to_be_computed':
                    request[1] = 'computing'
                    tasks.append([threading.Thread(target=req, args=(request, resource)), resource, request])
                    tasks[-1][0].start()
                    resource.status = 'busy'
                    busy_resources.append(free_resources.pop(free_resources.index(resource)))
                    break
        for i in range(len(tasks))[::-1]:
            if not tasks[i][0].is_alive():
                resource = tasks[i][1]
                if tasks[i][-1][1] != 'computed':
                    diagnostics = threading.Thread(target=req, args=(diagnostic_request, resource))
                    diagnostics.start()
                    diagnostics.join()
                    if diagnostic_request[1] != 'computed':
                        resource.status = 'error'
                        err_resources.append(busy_resources.pop(busy_resources.index(resource)))
                        print(f"RESOURCE {resource.api} NOT RESPONDING")
                    else:
                        resource.status = 'free'
                        free_resources.append(busy_resources.pop(busy_resources.index(resource)))
                else:
                    resource.status = 'free'
                    free_resources.append(busy_resources.pop(busy_resources.index(resource)))
                tasks.pop(i)
        if sum(1 if x[1] != 'computed' else 0 for x in all_requests) == 0:
            return


In [8]:
signal = [True, ResourcePool()]
threading.Thread(target=check_status, args=(signal, )).start()

Exception in thread Thread-6 (check_status):
Traceback (most recent call last):
  File "C:\ProgramData\mambaforge_22.9.0.2\envs\c_venv\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "C:\ProgramData\mambaforge_22.9.0.2\envs\c_venv\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\cian_cl-a\AppData\Local\Temp\ipykernel_21460\1193911141.py", line 128, in check_status
  File "C:\Users\cian_cl-a\AppData\Local\Temp\ipykernel_21460\1193911141.py", line 108, in insert_resource
  File "C:\Users\cian_cl-a\AppData\Local\Temp\ipykernel_21460\1193911141.py", line 88, in __init__
ValueError: invalid literal for int() with base 10: 'None'


In [5]:
compute_queue(inputs, signal)

NameError: name 'inputs' is not defined

In [None]:
inputs = [Command(i, f"""
### Instruction:
Summarize me the following text in one line:
"you are ugly"

### Response:
""")]