In [1]:
import io
import os
import re
import sys
import math
import json
import pprint
import logging

import numpy as np
from tqdm import tqdm
from random import sample
from time import time, sleep
from string import punctuation
from functools import partial
from multiprocessing import Pool
from rouge_score import rouge_scorer
from typing import Union, Sequence, Optional

from openai import OpenAI, OpenAIError
from mistralai.client import MistralClient

In [2]:
class SelfInstruct:
  """Self-Instruct style generator of instruction/output pairs.

  Few-shot prompts are built from human-written seed tasks, sent to a
  completion model, parsed back into structured instructions, filtered for
  quality and novelty (ROUGE-L against everything already known) and
  checkpointed to ``regen.json`` in the chosen output directory.
  """

  # Words that mark an instruction as unsuitable for a text-only language model.
  _BLACKLIST = (
      "image",
      "images",
      "graph",
      "graphs",
      "picture",
      "pictures",
      "file",
      "files",
      "map",
      "maps",
      "draw",
      "plot",
      "go to",
      "video",
      "audio",
      "music",
      "flowchart",
      "diagram",
  )

  # A candidate is dropped when its best ROUGE-L F-measure exceeds this value.
  _MAX_ROUGE_SIMILARITY = 0.7

  # 4xx status codes that are still worth retrying (timeout, conflict, rate limit).
  _RETRYABLE_STATUS_CODES = (408, 409, 429)

  def __init__(
      self,
      api_key=None,
      model_provider="openai",
      model_name="text-davinci-003",
      prompt_template_path="./prompt.txt",
      seed_tasks_path="./seed_tasks.jsonl",
      num_instructions_to_generate=100,
      num_prompt_instructions=3,
      request_batch_size=5,
      num_cpus=16
    ):
    """Create the API client and store the generation settings.

    Args:
        api_key: Key handed to the provider client.
        model_provider: ``"openai"`` or ``"mistral"``.
        model_name: Model used for completion requests.
            NOTE(review): ``text-davinci-003`` appears to have been retired
            by OpenAI; confirm the default still resolves.
        prompt_template_path: Text file containing ``{num_questions}``,
            ``{topic}`` and ``{difficulty}`` placeholders.
        seed_tasks_path: JSONL file of seed tasks; each line needs the keys
            ``"instruction"`` and ``"output"``.
        num_instructions_to_generate: Generation stops once this many
            machine-generated instructions have been collected.
        num_prompt_instructions: Seed tasks sampled into every prompt.
        request_batch_size: Prompts built (and sent) per request.
        num_cpus: Worker processes used for ROUGE scoring.

    Raises:
        NotImplementedError: If ``model_provider`` is not supported.
    """
    if model_provider == "openai":
      self.client = OpenAI(api_key=api_key)
    elif model_provider == "mistral":
      # MistralClient has no `model` parameter; the model is chosen per request.
      self.client = MistralClient(api_key=api_key)
    else:
      raise NotImplementedError(f"Model provider {model_provider} not implemented.")

    self.prompt_template_path = prompt_template_path
    self.prompt_template = None
    self.seed_tasks_path = seed_tasks_path
    self.num_instructions_to_generate = num_instructions_to_generate
    self.model_provider = model_provider
    self.model_name = model_name
    self.num_prompt_instructions = num_prompt_instructions
    self.request_batch_size = request_batch_size
    self.num_cpus = num_cpus

  def configure_prompt(self, topic, difficulty):
    """Load the prompt template from disk and fill in its placeholders.

    The formatted text is stored in ``self.prompt_template``.
    """
    with open(self.prompt_template_path) as template_file:
      template = template_file.read() + "\n"

    self.prompt_template = template.format(
        num_questions=self.num_instructions_to_generate,
        topic=topic,
        difficulty=difficulty,
    )

  def encode_prompt(self, prompt_instructions):
    """Encode multiple prompt instructions into a single string.

    Args:
        prompt_instructions: Iterable of dicts with ``"instruction"`` and
            ``"output"`` keys. May be empty.

    Returns:
        The configured template followed by the numbered examples and an
        open ``"N. Instruction:"`` line for the model to continue.

    Raises:
        ValueError: If ``configure_prompt`` has not been called yet.
    """
    if self.prompt_template is None:
      raise ValueError("Prompt template is not configured; call configure_prompt() first.")

    sections = [self.prompt_template + "\n"]
    next_number = 1

    for task_dict in prompt_instructions:
      # Collapse whitespace and drop a trailing colon so examples look uniform.
      instruction = re.sub(r"\s+", " ", task_dict["instruction"]).strip().rstrip(":")
      sections.append("###\n")
      sections.append(f"{next_number}. Instruction: {instruction}\n")
      sections.append(f"{next_number}. Output: {task_dict['output']}\n")
      next_number += 1

    # Counting explicitly keeps this valid for an empty example list.
    sections.append("###\n")
    sections.append(f"{next_number}. Instruction:")
    return "".join(sections)

  @staticmethod
  def _choice_to_dict(choice):
    """Return a completion choice as a plain, mutable dict.

    The openai v1 client returns model objects that cannot be item-assigned
    or subscripted; the rest of this class works with plain dicts.
    """
    if isinstance(choice, dict):
      return dict(choice)
    return choice.model_dump()

  def openai_completion(
      self,
      prompts: Union[str, Sequence[str], Sequence[dict[str, str]], dict[str, str]],
      decoding_args=None,
      batch_size=1,
      max_instances=sys.maxsize,
      sleep_time=2,
      ):
    """Request completions for one or many prompts, retrying transient errors.

    Args:
        prompts: A single prompt or a sequence of prompts.
        decoding_args: Keyword arguments forwarded to ``completions.create``
            (e.g. ``temperature``, ``n``, ``max_tokens``). Never mutated.
        batch_size: Number of prompts sent per API request.
        max_instances: Only the first ``max_instances`` prompts are used.
        sleep_time: Seconds to wait before retrying a failed request.

    Returns:
        A list of dicts (one per completion, each with an added
        ``"total_tokens"`` entry). When ``n > 1`` the list is grouped into
        sub-lists of ``n`` consecutive completions. For a single prompt the
        one element is returned on its own instead of a one-item list.

    Raises:
        OpenAIError: For non-retryable client errors (4xx other than
            408/409/429), or when ``max_tokens`` cannot be reduced further.
    """
    decoding_args = dict(decoding_args or {})
    samples_per_prompt = decoding_args.get("n", 1)
    is_single_prompt = isinstance(prompts, (str, dict))

    if is_single_prompt:
      prompts = [prompts]

    prompts = prompts[:max_instances]
    prompt_batches = [
        prompts[start : start + batch_size]
        for start in range(0, len(prompts), batch_size)
    ]

    completions = []
    for prompt_batch in tqdm(
        prompt_batches,
        desc="prompt_batches",
        total=len(prompt_batches),
    ):
      # Per-batch copy: shrinking max_tokens for one batch must not leak into
      # later batches or into the caller's dict.
      batch_decoding_args = dict(decoding_args)

      while True:
        try:
          # batched completion requests
          completion_batch = self.client.completions.create(
              prompt=prompt_batch, model=self.model_name, **batch_decoding_args
          )
          total_tokens = completion_batch.usage.total_tokens

          for choice in completion_batch.choices:
            record = self._choice_to_dict(choice)
            record["total_tokens"] = total_tokens
            completions.append(record)
          break

        except OpenAIError as e:
          logging.warning(f"OpenAIError: {e}.")

          if "Please reduce your prompt" in str(e):
            current_max_tokens = batch_decoding_args.get("max_tokens")
            reduced_max_tokens = int(current_max_tokens * 0.8) if current_max_tokens else 0
            if reduced_max_tokens < 1:
              # Nothing left to shrink; retrying would loop forever.
              raise
            batch_decoding_args["max_tokens"] = reduced_max_tokens
            logging.warning(f"Reducing target length to {reduced_max_tokens}, Retrying...")
            continue

          status_code = getattr(e, "status_code", None)
          if (
              status_code is not None
              and 400 <= status_code < 500
              and status_code not in self._RETRYABLE_STATUS_CODES
          ):
            # e.g. invalid API key or unknown model: retrying cannot succeed.
            raise

          logging.warning("Request failed (possibly rate limited); retrying...")
          sleep(sleep_time)

    if samples_per_prompt > 1:
      # make completions a nested list, where each entry is a consecutive decoding_args.n of original entries.
      completions = [
          completions[i : i + samples_per_prompt]
          for i in range(0, len(completions), samples_per_prompt)
      ]
    if is_single_prompt:
      # Return non-tuple if only 1 input and 1 generation.
      (completions,) = completions

    return completions

  def mistral_completion(self,):
    """Placeholder for Mistral completions; currently does nothing."""
    # mistral api does not support batched requests
    pass

  @staticmethod
  def find_word_in_string(w, s):
    """Case-insensitively search ``s`` for ``w`` as a whole word.

    ``w`` is inserted into the pattern unescaped. Returns the match object
    or ``None``.
    """
    return re.compile(r"\b({0})\b".format(w), flags=re.IGNORECASE).search(s)

  def post_process_gpt3_response(self, num_prompt_instructions, response):
    """Parse one completion into a list of filtered instruction dicts.

    Args:
        num_prompt_instructions: Number of examples that were in the prompt;
            generated items are expected to be numbered from
            ``num_prompt_instructions + 1``.
        response: Mapping with ``"text"`` and ``"finish_reason"`` keys, or
            ``None``.

    Returns:
        A list of ``{"instruction", "input", "output"}`` dicts. Items are
        accepted in either ``Instruction/Input/Output`` or
        ``Instruction/Output`` form (the latter is what ``encode_prompt``
        demonstrates to the model); ``input`` is then the empty string.
    """
    if response is None:
      return []

    raw_instructions = f"{num_prompt_instructions+1}. Instruction:" + response["text"]
    raw_instructions = re.split("###", raw_instructions)
    last_position = len(raw_instructions) - 1
    instructions = []

    for position, chunk in enumerate(raw_instructions):
      # if the decoding stops due to length, the last example is likely truncated so we discard it
      if position == last_position and response["finish_reason"] == "length":
        continue

      number = position + num_prompt_instructions + 1
      # re.split with a capture group yields [prefix, label, text, label, text, ...].
      parts = re.split(rf"{number}\.\s+(Instruction|Input|Output):", chunk)
      labels = parts[1::2]

      if labels == ["Instruction", "Input", "Output"]:
        inst, task_input, output = (part.strip() for part in parts[2::2])
        if task_input.lower() == "<noinput>":
          task_input = ""
      elif labels == ["Instruction", "Output"]:
        inst, output = (part.strip() for part in parts[2::2])
        task_input = ""
      else:
        continue

      # filter out too short or too long instructions
      word_count = len(inst.split())
      if word_count <= 3 or word_count > 150:
        continue

      # filter based on keywords that are not suitable for language models.
      if any(self.find_word_in_string(word, inst) for word in self._BLACKLIST):
        continue
      # The model tends to add "write a program" to existing instructions, and it is
      # unclear whether a program or the direct result is wanted, so these are dropped.
      # Note this is not a comprehensive filtering for all programming instructions.
      if inst.startswith("Write a program"):
        continue
      # filter those starting with punctuation
      if inst[0] in punctuation:
        continue
      # filter those starting with non-english character
      if not inst[0].isascii():
        continue
      instructions.append({"instruction": inst, "input": task_input, "output": output})

    return instructions

  def _select_novel_instructions(self, scorer, candidates, all_instructions, all_instruction_tokens):
    """Keep the candidates that are not too similar to any known instruction.

    Accepted candidates are annotated with their similarity statistics and
    appended to ``all_instructions`` / ``all_instruction_tokens`` in place, so
    later candidates are also compared against them.
    """
    kept = []
    if not candidates:
      return kept

    # One pool for the whole batch instead of one per candidate.
    with Pool(self.num_cpus) as pool:
      for entry in candidates:
        # computing similarity with the pre-tokenized instructions
        new_instruction_tokens = scorer._tokenizer.tokenize(entry["instruction"])
        rouge_scores = [
            score.fmeasure
            for score in pool.map(
                partial(rouge_scorer._score_lcs, new_instruction_tokens),
                all_instruction_tokens,
            )
        ]

        if rouge_scores and max(rouge_scores) > self._MAX_ROUGE_SIMILARITY:
          continue

        # Ten most similar known instructions, most similar first.
        entry["most_similar_instructions"] = {
            all_instructions[i]: rouge_scores[i] for i in np.argsort(rouge_scores)[-10:][::-1]
        }
        entry["avg_similarity_score"] = float(np.mean(rouge_scores)) if rouge_scores else 0.0
        all_instructions.append(entry["instruction"])
        all_instruction_tokens.append(new_instruction_tokens)
        kept.append(entry)

    return kept

  def generate(self, output_dir='./'):
    """Generate instructions until the configured target count is reached.

    Progress is written to ``<output_dir>/regen.json`` after every request;
    if that file already exists its contents are loaded first and generation
    resumes from there.

    Raises:
        NotImplementedError: If the provider is not ``"openai"``.
        ValueError: If the prompt is not configured or there are fewer seed
            tasks than ``num_prompt_instructions``.
    """
    # Fail fast, before touching the filesystem or the API.
    if self.model_provider == "mistral":
      raise NotImplementedError("Mistral model provider not implemented yet")
    if self.model_provider != "openai":
      raise NotImplementedError(f"Model provider {self.model_provider} not implemented")
    if self.prompt_template is None:
      raise ValueError("Prompt template is not configured; call configure_prompt() first.")

    with open(self.seed_tasks_path, "r") as seed_file:
      # Skip blank lines (e.g. a trailing newline at the end of the file).
      seed_tasks = [json.loads(line) for line in seed_file if line.strip()]

    seed_instruction_data = [
        {"instruction": t["instruction"], "output": t["output"]}
        for t in seed_tasks
    ]

    print(f"Loaded {len(seed_instruction_data)} human-written seed instructions")

    if len(seed_instruction_data) < self.num_prompt_instructions:
      raise ValueError(
          f"Need at least {self.num_prompt_instructions} seed tasks, "
          f"found {len(seed_instruction_data)}."
      )

    os.makedirs(output_dir, exist_ok=True)
    regen_path = os.path.join(output_dir, "regen.json")
    request_idx = 0

    # load the LM-generated instructions (first run shouldn't trigger this)
    machine_instruction_data = []

    if os.path.exists(regen_path):
      machine_instruction_data = self.jload(regen_path)
      print(f"Loaded {len(machine_instruction_data)} machine-generated instructions")

    # similarities
    scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=False)

    # first we tokenize all the seed instructions and generated machine instructions
    all_instructions = [d["instruction"] for d in seed_instruction_data] + [
        d["instruction"] for d in machine_instruction_data
    ]
    all_instruction_tokens = [scorer._tokenizer.tokenize(inst) for inst in all_instructions]

    decoding_args = {
        "temperature": 0.7,
        "n": 1,
        "max_tokens": 3072,
        "top_p": 1.0,
    }

    # now let's generate new instructions!
    progress_bar = tqdm(total=self.num_instructions_to_generate)

    try:
      if machine_instruction_data:
        progress_bar.update(len(machine_instruction_data))

      while len(machine_instruction_data) < self.num_instructions_to_generate:
        request_idx += 1

        # only sampling from the seed tasks
        batch_inputs = [
            self.encode_prompt(sample(seed_instruction_data, self.num_prompt_instructions))
            for _ in range(self.request_batch_size)
        ]

        request_start = time()
        results = self.openai_completion(
            prompts=batch_inputs,
            decoding_args=decoding_args,
            batch_size=self.request_batch_size,
        )
        request_duration = time() - request_start
        process_start = time()

        instruction_data = []
        for result in results:
          instruction_data += self.post_process_gpt3_response(self.num_prompt_instructions, result)

        kept_entries = self._select_novel_instructions(
            scorer, instruction_data, all_instructions, all_instruction_tokens
        )
        machine_instruction_data.extend(kept_entries)
        progress_bar.update(len(kept_entries))

        process_duration = time() - process_start
        print(f"Request {request_idx} took {request_duration:.2f}s, processing took {process_duration:.2f}s")
        print(f"Generated {len(instruction_data)} instructions, kept {len(kept_entries)} instructions")
        self.jdump(machine_instruction_data, regen_path)
    finally:
      progress_bar.close()

  def _make_w_io_base(self, f, mode: str):
    """Return ``f`` as a writable file object, opening it if it is a path.

    Missing parent directories of a path are created first.
    """
    if not isinstance(f, io.IOBase):
      parent_dir = os.path.dirname(f)
      if parent_dir != "":
        os.makedirs(parent_dir, exist_ok=True)
      f = open(f, mode=mode)
    return f

  def _make_r_io_base(self, f, mode: str):
    """Return ``f`` as a readable file object, opening it if it is a path."""
    if not isinstance(f, io.IOBase):
      f = open(f, mode=mode)
    return f

  def jdump(self, obj, f, mode="w", indent=4, default=str):
    """Dump a str or dictionary to a file in json format.

    Args:
        obj: An object to be written (dict, list or str).
        f: A string path to the location on disk, or an open file object.
        mode: Mode for opening the file.
        indent: Indent for storing json dictionaries.
        default: A function to handle non-serializable entries; defaults to `str`.

    Raises:
        ValueError: If ``obj`` is not a dict, list or str.
    """
    f = self._make_w_io_base(f, mode)
    try:
      if isinstance(obj, (dict, list)):
        json.dump(obj, f, indent=indent, default=default)
      elif isinstance(obj, str):
        f.write(obj)
      else:
        raise ValueError(f"Unexpected type: {type(obj)}")
    finally:
      # Close even when serialization fails so the handle is not leaked.
      f.close()

  def jload(self, f, mode="r"):
    """Load a .json file (path or open file object) and return its content."""
    f = self._make_r_io_base(f, mode)
    try:
      return json.load(f)
    finally:
      f.close()

Instruction:
I am preparing a set of exam questions about machine learning theory. 
Generate 50 questions on machine learning theory for me in this jsonl format: 
{"instruction": "<insert exam question>", "context": "", "response": "<insert response here>"}
Each JSON object should begin on a new line.

Questions:



In [3]:
# Read the API key from the environment rather than committing it to the notebook.
# When the variable is unset, None is passed to SelfInstruct.
self_instruct = SelfInstruct(
    api_key=os.environ.get("OPENAI_API_KEY"),
    model_provider="openai",
    model_name="text-davinci-003",
    prompt_template_path="./prompt.txt",
    seed_tasks_path="/content/seed_tasks.jsonl",
    num_instructions_to_generate=100
)

# Fill the prompt template's topic/difficulty placeholders before generating.
self_instruct.configure_prompt(
    topic='Machine Learning',
    difficulty='Beginner'
)

Hello! How can I help you today? Is there something you would like to talk about or ask me? I'm here to help answer any questions you might have to the best of my ability. I can provide information on a wide range of topics, including science, history, technology, culture, and more. I can also help with tasks such as generating ideas, solving problems, and providing explanations. Let me know how I can assist you.


In [None]:
# Run generation: loops until num_instructions_to_generate instructions are collected,
# writing the collected instructions to regen.json under output_dir after each request.
self_instruct.generate(output_dir="./")