In [97]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
drive_dir = '/content/drive/MyDrive/Clarifying_Questions_GPT_Research'

In [None]:
!cd {drive_dir} && pip install -r requirements.txt

In [98]:
import openai

import pickle

import random

import copy

import sys

import os

import pprint

In [5]:
with open(drive_dir+"/openai_key.key", "r") as f:
    openai.api_key = f.read()

In [6]:
# generates text using gpt-3.5 given a conversation context
def generateText(context):
  assert isinstance(context, list), f"input is not a context list, got {type(context)}"
  resp = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=context)

  return resp['choices'][0]['message']["content"]

# appends a new entry to a conversation context
def appendContext(text, context, role):
  assert role == "user" or role == "assistant", f"unexpected role, got: {role}"

  if role == "user":
    assert context[-1]["role"] == "assistant", f"incompatible adjecent role, got user"
    context.append({"role": "user", "content": text})
  else:
    assert context[-1]["role"] == "user", f"incompatible adjecent role, got assistant"
    context.append({"role": "assistant", "content": text})

  return

# generates a new question/suggestion given a conversation context
def generateQ(context):
  role = context[-1]["role"]
  assert role == "user", f"unexpected role, got: {role}"

  return generateText(context)

In [107]:
# prompt used to extract the possible responses to a question
def getOptionsPrompt(q):
  return f"Here is a question:\n{q}\n\nTask:\nThe person asking the question presents several options to an assistant. What are the options? List the options using only the words in the question itself."

# given a string that has a python list, extracts and returns said python list
def str2lst(l):
  assert '[' in l, f"string does not seem to contain a list, string: {l}"
  assert ']' in l, f"string does not seem to contain a list, string: {l}"

  l = l.split("[",1)[1]
  l = l.split("]",1)[0]

  if '"' in l:
    l = l.replace('"', '')
  elif "'" in l:
    l = l.replace("'", '')
  else:
    assert 1 == 0, f"string does not seem to contain a list, string: {l}"

  l = l.replace(", ", ',')
  l = l.replace("  ", ' ')

  return l.split(",")

# extracts the possible responses from a given question and returns them in a python list
def getOptions(q):
  context = [{"role": "user", "content":getOptionsPrompt(q)}]

  text_options = generateText(context)
  appendContext(text_options, context, "assistant")

  appendContext("Return options as a python list:", context, "user")

  return str2lst(generateText(context))

In [8]:
def isSuggestion(gen):
    return not '?' in gen and (not ' or ' in gen or '!' in gen) # this still needs a lot of work, but as a mvp it's fine

def isQuestion(gen):
    return ' or ' in gen and '?' in gen # this works surprisingly well, however might need to get more sophisticated

In [9]:
# creates new node; creates edge from parent to new node; returns new node
def expandTree(parent, ans_num, new_id, g_attempts=3):

    # creating node, where None fields will chage depending on what is generated
    node = {
        'id':  new_id, # parent['id'] + ans_num + 1, # index of this node in the tree array

        'parent_id': parent['id'], # index of the parent node in the tree array

        'idx_in_parent': ans_num, # index that represents the edge from parent that goes to this node

        'layer_num': parent['layer_num'] + 1, # depth of this node in the tree

        'flag': None, # flag indicating wether this node contains a question, suggestion, or error

        'generated_text': None, # text that was generated

        'full_context': None, # context (i.e. the history of questions and answers) that lead to this node

        'childrens_option': None, # possible answers (edges) for this node

        'childrens_done_bool': None, # wether the answers (edges) have been used to generate child nodes

        'childrens_id': None, # the indexes of the children nodes in the tree array
    }

    # creating edge from parent to node
    parent['childrens_done_bool'][ans_num] = True
    parent['childrens_id'][ans_num] = node['id']

    # adding edge, i.e. the user's chosen option, to context
    new_context = copy.copy(parent['full_context']) # can create shallow copy because pickle can handle references
    appendContext(parent['childrens_option'][ans_num], new_context, 'user')

    # try to generate a valid question, if succesfull break from loop
    for i in range(g_attempts):

        # IN PREVIOUS CODE, GENERATEQ RECEIVED CONTEXT AND CHOSEN OPTION,
        #generation = generateQ(parent['full_context'], parent['childrens_option'][ans_num])
        generation = generateQ(new_context)

        if isSuggestion(generation):
          node['flag'] = 's'
          break

        if isQuestion(generation):
          node['flag'] = 'q'
          break

    node['generated_text'] = generation
    # PREVIOUS CODE
    #node['full_context'] = f"{parent['full_context']}\nUser:{parent['childrens_option'][ans_num]}\nChatGPT:{generation}"
    appendContext(generation, new_context, 'assistant')
    node['full_context'] = new_context

    if node['flag'] != 's' and node['flag'] != 'q':
        node['flag'] = 'e'

    else:
        #node['flag'] = 'q'
        node['childrens_option'] = getOptions(generation)
        node['childrens_done_bool'] = [False for x in range(len(node['childrens_option']))]
        node['childrens_id'] = [None for x in range(len(node['childrens_option']))]

    return node

In [10]:
default_seed = "Ask me one either or question at a time to help me find out where I want to go travel"

In [11]:
def getFirstNode(seed, options_prompt, g_attempts):

    # if seed is None, set seed to a default
    if not seed:
        seed = default_seed

    # starting context that contains only the seed as an user input
    starting_context = [{"role": "user", "content": seed}]

    # try multiple times to generate a valid question, if succesfull break from loop
    flag = 'e' # flag for wether the generated text is a valid question
    for i in range(g_attempts):

      # generating the first question GPT makes given the seed
      first_q = generateQ(starting_context)

      if isSuggestion(first_q):
        flag = 's'
        break

      if isQuestion(first_q):
        flag = 'q'
        break

    # if the generated question is not valid after g_attempts tried, throw an error
    assert flag != 'e', f"first question was not valid, question: {first_q}"

    # appending first question to the context as the assistant response
    appendContext(first_q, starting_context, "assistant")

    # getting the options from the first question
    options = getOptions(first_q)

    node = {
        'id': 0,

        'parent_id': None,

        'idx_in_parent': None,

        'layer_num': 0,

        'flag': 'q',

        'generated_text': first_q,

        'full_context': starting_context,

        'childrens_option': options,

        'childrens_done_bool': [False for x in range(len(options))],

        'childrens_id': [None for x in range(len(options))]
    }

    return node

In [12]:
def nextInDPS(tree_dic):
    # get id of the next node according to dps
    r = tree_dic['dps_cor'][0]
    c = tree_dic['dps_cor'][1]
    node_id = tree_dic['ids_by_layer'][r][c]

    # if this node is unfinished, return node id
    if node_id in tree_dic['unfinished_ids']:
        return node_id

    # else, pass to this function the next node of the same depth d, or if all nodes in d done, the first node of d + 1
    else:

        if len(tree_dic['ids_by_layer'][r]) - 1 > c:
            tree_dic['dps_cor'] = [r, c+1]
        elif len(tree_dic['ids_by_layer']) - 1 > r:
            tree_dic['dps_cor'] = [r+1, 0]
        else: return None


    return nextInDPS(tree_dic)

In [13]:
# path to pickle that stores tree to load. If there's no path or the pickle is not found, start generating the tree from scratch
pkl_path = '/content/drive/MyDrive/Clarifying_Questions_GPT_Research/Trees'

In [64]:
def prepDic4Save(tree_dic):
  # turn tree_dic pickle serializable by converting the sets to lists
  tree_dic['unfinished_ids'] = list(tree_dic['unfinished_ids'])
  tree_dic['suggestion_ids'] = list(tree_dic['suggestion_ids'])
  tree_dic['error_ids'] = list(tree_dic['error_ids'])
  return

def prepDic4Use(tree_dic):
  # turn lists into sets where we need a set instead of a list
  tree_dic['unfinished_ids'] = set(tree_dic['unfinished_ids'])
  tree_dic['suggestion_ids'] = set(tree_dic['suggestion_ids'])
  tree_dic['error_ids'] = set(tree_dic['error_ids'])
  return

def loadTreePkl(filename):
    # Opening pickle file
    with open(os.path.join(pkl_path, filename), 'rb') as f:
        # Reading from pickle file
        tree_dic = pickle.load(f)
    # turn lists into sets where we need a set instead of a list
    prepDic4Use(tree_dic)
    print('Tree Loaded')

    return tree_dic

def saveTreePkl(tree_dic, filename):
    # turn tree_dic pickle serializable
    prepDic4Save(tree_dic)

     # save tree to a pickle file
    if pkl_path:
        with open(os.path.join(pkl_path, filename), "wb") as f:
            pickle.dump(tree_dic, f)
        print('Tree Saved')

    # turn lists into sets where we need a set instead of a list
    prepDic4Use(tree_dic)

    return

In [109]:
def generate_tree(prompts, gen_num=10, gen_attempts=3, filename=None, tick=None):

    if not tick: tick = gen_num

    try:
        tree_dic = loadTreePkl(filename)

    except:
        print('Starting New Tree')
        first_node = getFirstNode(prompts['seed'],
                                    prompts['extract_options'] if prompts['extract_options'] else "default 'extract_options' prompt", # I have not implemented this yet
                                    gen_attempts)

        tree_dic = {
        'tree': [first_node],

        'prompts': prompts,

        'current_node': 0,

        'dps_cor': [0,0],

        'unfinished_ids': {0},

        'ids_by_layer': [[0]],

        'suggestion_ids': set(),

        'error_ids': set()
        }

    for it in range(gen_num):
        print(it)

        # if the current node is None, that means the tree is completed, so exit out of the loop
        if tree_dic['current_node'] == None: break

        # get current node
        node = tree_dic['tree'][tree_dic['current_node']]

        # get edge options
        options = [idx for idx in range(len(node['childrens_option'])) if not node['childrens_done_bool'][idx]]

        # manage unfinished set
        if len(options) == 1:
            tree_dic['unfinished_ids'].remove(node['id'])
        else:
            tree_dic['unfinished_ids'].add(node['id'])

        # expand the tree
        node = expandTree(node, random.sample(options, k=1)[0], len(tree_dic['tree']), g_attempts)

        # add node to tree
        assert node['id'] == len(tree_dic['tree'])
        tree_dic['tree'].append(node)

        # add to ids_by_layers; append to end of list if this is the first node in this depth
        if len(tree_dic['ids_by_layer']) > node['layer_num']:
            tree_dic['ids_by_layer'][node['layer_num']].append(node['id'])
        else:
            tree_dic['ids_by_layer'].append([node['id']])

        # add to suggestion or error if required, sample next node from unfinished node list
        if node['flag'] == 's':
            tree_dic['suggestion_ids'].add(node['id'])
            tree_dic['current_node'] = nextInDPS(tree_dic)

        elif node['flag'] == 'e':
            tree_dic['error_ids'].add(node['id'])
            tree_dic['current_node'] = nextInDPS(tree_dic)

        else:
            tree_dic['current_node'] = node['id']

        # save to pickle
        if filename and ((it+1) % tick) == 0:
            saveTreePkl(tree_dic, filename)

    # last save into pickle
    saveTreePkl(tree_dic, filename)

    print('Done')
    return tree_dic

In [108]:
g_attempts = 3

node_gen_num = 100 # number of nodes that should be generated before stopping / could be None, setting the stopping time to the default of the next completed branch
save_tick = None # number of nodes until new pickle is saved

fname = 'GPT_test_tree.pkl'
# this is a dictionary of prompts used in the tree generation process. If the value is None, a default option is used instead
prompts = {
    'seed': "Ask me one either or question at a time to help me find out where I want to go travel. Ask me many questions before giving me a suggestion", # the seed prompt that elicits the behaviour of asking clarifying questions from the LLM; default behaviour is a prompt inside the generate_tree function
    'extract_options': None, # the prompt to extract the possible answers to a given question; default behaviour is a prompt inside the generate_tree function
    # Not implemented yet
    'is_suggestion': None, # the prompt used to deetermine wether the LLM generated a suggestion; default behaviour is to see if the string contains "?"
    'is_question': None # the prompt used to deetermine wether the LLM generated a question with options; default behaviour is to see if the string contains " or "
}

In [None]:
tree = generate_tree(prompts, gen_num=79, gen_attempts=3, filename=fname, tick=10)

In [162]:
tree = loadTreePkl(fname)

Tree Loaded


In [163]:
pprint.pprint(f"# of nodes: {len(tree['tree'])}")

'# of nodes: 300'


In [164]:
pprint.pprint(f"max depth of tree: {len(tree['ids_by_layer'])}")
# pprint.pprint(tree['ids_by_layer'])

'max depth of tree: 11'


In [165]:
pprint.pprint(f"# of unfinished nodes: {len(tree['unfinished_ids'])}")
pprint.pprint(f"# of completly explored nodes: {len(tree['tree']) - len(tree['unfinished_ids'])}")
# pprint.pprint(tree['unfinished_ids'])

'# of unfinished nodes: 163'
'# of completly explored nodes: 137'


In [166]:
pprint.pprint(f"# of completed chats: {len(tree['suggestion_ids'])}")
# pprint.pprint(tree['suggestion_ids'])

'# of completed chats: 82'


In [151]:
pprint.pprint(f"# of error nodes: {len(tree['error_ids'])}")
pprint.pprint(tree['error_ids'])

'# of error nodes: 1'
{163}


In [167]:
node_id = random.sample([i for i in range(len(tree['tree']))], k=1)[0]

node_id = random.sample(tree['suggestion_ids'], k=1)[0]

since Python 3.9 and will be removed in a subsequent version.
  node_id = random.sample(tree['suggestion_ids'], k=1)[0]


In [168]:
pprint.pprint(tree['tree'][node_id])

{'childrens_done_bool': [False],
 'childrens_id': [None],
 'childrens_option': ['Alps within your home country'],
 'flag': 's',
 'full_context': [{'content': 'Ask me one either or question at a time to help '
                              'me find out where I want to go travel. Ask me '
                              'many questions before giving me a suggestion',
                   'role': 'user'},
                  {'content': "Sure! Here's the first question: Do you prefer "
                              'beach vacations or mountain getaways?',
                   'role': 'assistant'},
                  {'content': 'mountain getaways', 'role': 'user'},
                  {'content': "Great! Here's the next question: Are you more "
                              'interested in hiking and outdoor activities or '
                              'exploring quaint towns and cultural sights?',
                   'role': 'assistant'},
                  {'content': 'Exploring quaint towns and cul