# Question the database

The code below allows questions to be submitted against the embedded fact information in the XBRL US database.  This includes a subset of around 63,000 facts.

To start, click on Runtime->Run all above, wait for the runtime to build, then answer the prompts using your credentials from the test environment and the OpenAI key given to you.  If there are any issues, contact Marc.

View the text block near the end for more usage information.

In [130]:
!pip install openai tiktoken
from openai import OpenAI
import sys
import tiktoken
import requests
import getpass
import urllib
from urllib.parse import urlencode

# A value of None makes the credentials cell prompt for a login.
email = None
# OAuth2 token endpoint used both for the initial login and for refreshes.
url = 'https://testapi.xbrl.us/oauth2/token'


class userinfo:
    """Mutable holder for the session's access and refresh tokens."""

    def __init__(self):
        # Both tokens start out empty; the authentication code fills them in.
        self.access_token = self.refresh_token = ''


# Single shared instance read and updated by the rest of the notebook.
user = userinfo()

In [None]:

# Input setup
# Prompt whenever credentials are missing.  `password` is cleared after a
# successful login (see below), so it is checked as well: without that check,
# re-running this cell would skip the prompts and then fail while building the
# request body from a password of None.
if email is None or password is None:
    email = input("Enter your username (email).")
    password = getpass.getpass(prompt='Password: ')
    clientid = getpass.getpass(prompt='Client ID: ')
    secret = getpass.getpass(prompt='Secret: ')
    ai_key = getpass.getpass(prompt='Enter OpenAI Key: ')


# OAuth2 password-grant form fields sent to the token endpoint.
body_auth = {
    'username': email,
    'client_id': clientid,
    'client_secret': secret,
    'password': password,
    'grant_type': 'password',
    'platform': 'ipynb',
}

payload = urlencode(body_auth)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
res = requests.request("POST", url, data=payload, headers=headers)
auth_json = res.json()

if 'error' in auth_json:
    print ("\n\nThere was a problem generating an access token with these credentials. Run the second cell again to enter credentials.")
    # Resetting email makes the next run of this cell prompt again.
    email = None
    sys.exit()
else:
    user.access_token = auth_json['access_token']
    user.refresh_token = auth_json['refresh_token']
    # NOTE(review): these two names are not read anywhere in the visible
    # notebook; kept in case other cells rely on them.
    newaccess = newrefresh = ''
    # Drop the password once it is no longer needed; token refreshes only
    # use the client id, the secret and the refresh token.
    password = None
    print ("\n\nYour credentials expire in 60 minutes. After it expires, use the 'r' command to refresh them.")


In [171]:
def get_embedding(value):
    """Return the OpenAI embedding for ``value``.

    A new ``OpenAI`` client is built from the module-level ``ai_key`` on every
    call, and the ``text-embedding-3-small`` model is queried with ``value``
    as input.

    Returns:
        The ``embedding`` attribute of the first entry in the response data.
    """
    openai_client = OpenAI(api_key=ai_key)
    response = openai_client.embeddings.create(
        input=value,
        model='text-embedding-3-small',
    )
    first_item = response.data[0]
    return first_item.embedding


def get_content(question):
    """Fetch the context for a question from the XBRL US content endpoint.

    The request is authorised with the current access token.  If the first
    response reports an error, the token is refreshed (up to three attempts)
    and the request is sent once more.

    Args:
        question: Value sent as the ``question`` form field after conversion
            with ``str()``.  The question loop in this notebook passes the
            embedding of the question text.

    Returns:
        The raw response body text, or ``False`` when the token could not be
        refreshed.
    """
    search_endpoint = 'https://testapi.xbrl.us/api/v1/ai/getcontent'
    params = {"question" : str(question)}

    def post_query():
        # The header is rebuilt on every call so that a refreshed access
        # token is picked up by the retry.
        return requests.post(
            search_endpoint,
            data=params,
            headers={'Authorization' : 'Bearer {}'.format(user.access_token)},
        )

    res = post_query()
    # NOTE(review): any 'error' in the parsed response is treated as an
    # expired token; if the endpoint can return a JSON string rather than an
    # object, this is a substring test -- confirm against the API.
    if 'error' not in res.json():
        return res.text

    for _ in range(3):
        if get_refresh_token(user.refresh_token):
            break
    else:
        # All three refresh attempts failed.
        print("The credentials have expired.  Click on Runtime->Run all and reenter your credentials.")
        return False

    # Retry once with the refreshed token and hand back whatever it returns.
    res = post_query()
    return res.text


def answer_question(question, content):
    """Ask the chat model to answer ``question`` using ``content`` as context.

    ``content`` is cut back to 90% of its character length repeatedly until
    its ``cl100k_base`` token count is at most 12192, then sent together with
    the question to ``gpt-3.5-turbo``.

    Args:
        question: The question text placed in the user message.
        content: The context text the answer should be based on.

    Returns:
        The stripped text of the first completion choice, or an empty string
        if the request raised (the exception is printed).
    """
    encoder = tiktoken.get_encoding("cl100k_base")
    client = OpenAI(api_key=ai_key)
    max_len = 12192

    # Trim the tail of the context until it fits the token budget.
    while len(encoder.encode(content)) > max_len:
        content = content[:int(len(content) * .9)]

    try:
        system_message = {
            "role": "system",
            "content": "Answer the question based on the context below, and if the question can't be answered based on the context, say \"I don't know\"\n\n",
        }
        user_message = {
            "role": "user",
            "content": f"Context: {content}\n\n---\n\nQuestion: {question}\nAnswer:",
        }
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[system_message, user_message],
            max_tokens=150,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            stop=None,
        )
        reply = completion.choices[0].message.content
        return reply.strip()
    except Exception as e:
        print(e)
        return ""
    

def get_refresh_token(token):
    """Exchange a refresh token for a new access/refresh token pair.

    Posts a ``refresh_token`` grant to the module-level ``url``.  On success
    the returned tokens are stored on the shared ``user`` object.

    Args:
        token: The refresh token to exchange.

    Returns:
        ``True`` when new tokens were stored, ``False`` when the response
        contains an ``'error'`` entry.
    """
    form = dict(
        client_id=''.join(clientid),
        client_secret=''.join(secret),
        grant_type='refresh_token',
        platform='ipynb',
        refresh_token=''.join(token),
    )
    reply = requests.post(url, data=form).json()

    if 'error' in reply:
        return False

    user.access_token = reply['access_token']
    user.refresh_token = reply['refresh_token']
    return True
   
def chunkstring(string, length):
    """Lazily split ``string`` into consecutive pieces of ``length`` items.

    The final piece holds whatever remains and may be shorter.

    Returns:
        A generator yielding the slices in order.
    """
    starts = range(0, len(string), length)
    return (string[begin:begin + length] for begin in starts)
    


### Instructions

Enter your questions into the prompt.  After a few seconds the Question and Answer will appear.  There are also a few commands that can be entered at the prompt.

### Commands
 
 - p : Toggle printing of the context that was generated and submitted to provide a basis for the answer
 - r : Refresh Token.  Only use when you aren't receiving a response
 - l : List the questions asked so far and display the stored answer for a selected one
 - help : Show the command menu
 - exit: Exit the application.  Questioning can be restarted by running the code block below.  If credentials need to be reentered, click Runtime->Run all.

In [None]:
# Ask Questions
print_content = False
history = {}


while True:
    question = input("Enter your question (type 'help' for menu): ")

    match question.lower():
        case 'p':
            print_content = not print_content
            print("The generated content will {}".format('print' if print_content else 'not print.'))
        case 'r':
            success = get_refresh_token(user.refresh_token)
            if success:
                print("Refreshed Token")
        case 'l':
            while True:
                valid = []
                print('Question Listing\n')
                for index, question in enumerate(history.keys()):
                    print("%d) %s" % (index+1, question))
                    valid.append(index+1)
                print('q) Return to questions')
                print('')
                ans = input('Which question do you want the answer for? (q to return to questions)')
                match ans.lower():
                    case 'q':
                        break
                    case _:
                        try:
                            int_a = int(ans)
                        except:
                            continue
                        if int_a in valid:
                            question = list(history.keys())[int_a-1]
                            print("\nQUESTION:\n{}\n\nANSWER: \n{}\n\n".format(question, history[question]))
                        else: 
                            print(valid)
        case 'exit':
            print('quitting...')
            sys.exit()
        case 'help':
            print('Help Menu')
            print('l - list question history')
            print('p - Toggle printing output')
            print('r - refresh token')
            print('exit - quit program')
            print('help - This menu')
            print('')
        case _:
            ques_embedded = get_embedding(question)
            info = get_content(ques_embedded)
            if info is not False:
                answer = answer_question(question, info)
                if print_content:
                    content_array = info.split("\\n")
                    for mline in content_array:
                        for line in chunkstring(mline, 120):
                            print("{}".format(line))
                    print("\n###\n\n")
                    #print("\nCONTENT:\n{}\n###\n\n".format(info))
                history[question] = answer
                print("\nQUESTION:\n{}\n\nANSWER: \n{}\n\n".format(question, answer))
                print("")



