### Assistant API

- Use the Assistants API: upload an entire document and run question answering (QA) over the whole document
- https://platform.openai.com/docs/assistants/how-it-works/objects
- https://platform.openai.com/docs/api-reference/assistants

In [1]:
import os,sys,getpass
sys.path.insert(0,'../../libs')
import time
import pandas as pd 
from utils import load_json,logging,exception_handler
from llm_utils import get_oai_fees
import json
import openai
from openai import OpenAI
## Load the OpenAI API key from a local JSON file and publish it through the
## OPENAI_API_KEY environment variable, which the classes in this notebook read
## when no explicit api_key is passed to them.
key = load_json('/root/workspace/key/openai_key.json') 
os.environ['OPENAI_API_KEY'] = key['ISR']['API_KEY']
## Alternative: prompt for the key interactively instead of reading it from disk.
#os.environ["OPENAI_API_KEY"] = getpass.getpass(prompt='OpenAI API Token:')


### Create a basic file manager to manage files

In [2]:
class OpenAI_File_Manager(object):
    """Small helper around the OpenAI Files API: upload, list, look up, delete.

    Attributes:
        client: The OpenAI client object every request goes through.
    """

    def __init__(self, client=None, api_key=None):
        """Store the client, building one only when none is supplied.

        Args:
            client: An existing OpenAI client. When given, no API key is
                needed and the environment is not consulted.
            api_key: Key used to build a client when ``client`` is missing;
                falls back to the ``OPENAI_API_KEY`` environment variable.

        Raises:
            KeyError: If a client has to be built, no ``api_key`` was passed
                and ``OPENAI_API_KEY`` is not set.
        """
        if not client:
            # Resolve the key lazily so a caller-supplied client never
            # requires the environment variable to exist.
            if not api_key:
                api_key = os.environ['OPENAI_API_KEY']
            client = OpenAI(api_key=api_key)
        self.client = client

    @staticmethod
    def _as_record(obj):
        """Convert one API object to a plain dict.

        Prefers ``model_dump()`` when the object offers it and falls back to
        the older ``dict()`` method otherwise.
        """
        dump = getattr(obj, 'model_dump', None)
        return dump() if callable(dump) else obj.dict()

    def upload_file(self, up_file_path, purpose='assistants'):
        """Upload a local document and return ``(file_id, filename)``.

        Args:
            up_file_path: Path of the local file to upload.
            purpose: Value forwarded as the ``purpose`` of the upload.

        Raises:
            FileNotFoundError: If ``up_file_path`` does not exist.
        """
        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        if not os.path.exists(up_file_path):
            raise FileNotFoundError(
                "{} does not exist, nothing to upload.".format(up_file_path)
            )
        # The context manager closes the handle even if the request fails.
        with open(up_file_path, 'rb') as file_handle:
            file = self.client.files.create(
                file=file_handle,
                purpose=purpose
            )
        return (file.id, file.filename)

    def _get_file_list(self):
        """Return the list of file objects reported by the server."""
        file_list = self.client.files.list().data
        return file_list

    def _get_file_info(self):
        """Return file metadata as a DataFrame, one row per file.

        The DataFrame has no columns at all when the server reports no files.
        """
        records = [self._as_record(item) for item in self._get_file_list()]
        return pd.DataFrame(records)

    def get_files_by_ids(self, file_ids):
        """Retrieve file objects for one id or a collection of ids.

        Args:
            file_ids: A single id, or a list/tuple/set of ids.

        Returns:
            list: The retrieved objects, in iteration order of ``file_ids``.
        """
        if not isinstance(file_ids, (list, tuple, set)):
            file_ids = [file_ids]
        return [self.client.files.retrieve(f_i) for f_i in file_ids]

    def delete_files_by_ids(self, file_ids):
        """Delete the given files, reporting ids the server does not know.

        Args:
            file_ids: A single id, or a list/tuple/set of ids.

        Ids that are not in the server's file list are skipped with a printed
        notice; the ids actually deleted are printed at the end.
        """
        if not isinstance(file_ids, (list, tuple, set)):
            file_ids = [file_ids]

        # Read ids straight from the listing: this also works when the server
        # has no files (an empty DataFrame would have no 'id' column).
        known_ids = {item.id for item in self._get_file_list()}
        res = []
        for f_i in file_ids:
            if f_i in known_ids:
                self.client.files.delete(f_i)
                res.append(f_i)
            else:
                print("{} does not exist on openai server, please double check.".format(f_i))

        if res:
            print("{} has been removed from file server.".format(res))

    def get_files_info_by(self, filter_criteria=None, return_fields=('id', 'filename'), to_dict=True, to_single_list=True):
        """Filter file metadata by column values.

        Args:
            filter_criteria (dict): Maps a column name to the accepted values
                for that column (a list, or a single scalar value). ``None``
                or an empty dict applies no filter.
            return_fields: Columns to keep; a falsy value keeps every column.
            to_dict (bool): Return a list of row dicts instead of a DataFrame.
            to_single_list (bool): When exactly one field is requested and
                ``to_dict`` is true, return a flat list of that field's values.

        Returns:
            pd.DataFrame, list[dict] or list: The filtered metadata. When the
            server has no files the result is empty (``[]`` or an empty
            DataFrame) rather than an error.

        Raises:
            ValueError: If a filter column is not present in the metadata.
        """
        filter_criteria = filter_criteria or {}
        if isinstance(return_fields, str):
            wanted = [return_fields]
        else:
            # Always index with a list: a tuple would be read as a single key.
            wanted = list(return_fields) if return_fields else []

        files_info_df = self._get_file_info()
        if files_info_df.columns.empty:
            # No files at all, so there is nothing to filter or select.
            return [] if to_dict else pd.DataFrame(columns=wanted)

        for column, values in filter_criteria.items():
            if column not in files_info_df.columns:
                raise ValueError(f"Column '{column}' not found in DataFrame")
            # ``isin`` needs a list-like; wrap a lone scalar for convenience.
            if isinstance(values, str) or not hasattr(values, '__iter__'):
                values = [values]
            files_info_df = files_info_df[files_info_df[column].isin(values)]

        if wanted:
            files_info_df = files_info_df[wanted]

        if not to_dict:
            return files_info_df

        records = files_info_df.to_dict(orient='records')
        if len(wanted) == 1 and to_single_list:
            return [rec.get(wanted[0]) for rec in records]
        return records
    
    

- list all submitted documents

In [3]:
# Build a file manager; with no arguments it creates its own client from OPENAI_API_KEY.
FM = OpenAI_File_Manager()


In [4]:
# Show the metadata of every file on the server as a DataFrame.
FM._get_file_info()

Unnamed: 0,id,bytes,created_at,filename,object,purpose,status,status_details
0,file-bPuZpeGbCK6bN24DVyAE4Epd,3466567,1704250068,USA_2022.pdf,file,assistants,processed,


- upload a local file 

In [5]:
# Upload a local PDF; upload_file returns the new file's id and filename.
up_file_path = '/root/workspace/data/DOCs/PDF/Belgium_2022.pdf'
fid,fname=FM.upload_file(up_file_path,purpose='assistants')
print(fid,fname)

file-3CP4PI8wViYusvF18IC0D7Th Belgium_2022.pdf


- filter and get doc info by filters 

In [6]:
## Get file ids by filename. Requesting the single field 'id' with
## to_dict=True and to_single_list=True yields a flat list of ids.
filter_criteria={'filename':['Belgium_2022.pdf']} #USA_2022.pdf
sampled_file_ids = FM.get_files_info_by(filter_criteria,return_fields=['id'],to_dict=True,to_single_list=True)
print(sampled_file_ids)

['file-3CP4PI8wViYusvF18IC0D7Th']


- delete files 

In [7]:
# A single id (not wrapped in a list) is accepted; the method wraps it itself.
FM.delete_files_by_ids(file_ids=sampled_file_ids[0])

['file-3CP4PI8wViYusvF18IC0D7Th'] has been removed from file server.


### Create a Base Assistant object to access basic functionalities 

In [8]:
class OpenAIAssistant_Base():
    """Base helper for the OpenAI Assistants (beta) API.

    Covers creating, updating, listing, retrieving and deleting assistants,
    plus a one-shot "create a thread, run it, read the reply" query.

    Attributes:
        client: The OpenAI client object every request goes through.
        assistant: The currently active assistant, or ``None`` until one is
            set via ``_set_active_assistant`` / ``create_assistant``.
        FileManager: An ``OpenAI_File_Manager`` sharing the same client.
    """

    # Statuses at which polling stops. 'requires_action' is included because
    # the run cannot progress until the caller acts on it; the others are
    # states after which the run no longer changes.
    _STOP_POLLING_STATUSES = (
        'completed', 'failed', 'requires_action',
        'cancelled', 'expired', 'incomplete',
    )

    def __init__(self, client=None,api_key=None):
        """Store the client (building one if needed) and a file manager.

        Args:
            client: An existing OpenAI client. When given, the environment is
                not consulted for a key by this constructor.
            api_key: Key used to build a client when ``client`` is missing;
                falls back to the ``OPENAI_API_KEY`` environment variable.
        """
        if not client:
            if not api_key:
                api_key = os.environ['OPENAI_API_KEY']
            client = OpenAI(api_key=api_key)
        self.client = client
        self.assistant = None

        self.FileManager = OpenAI_File_Manager(client=client,api_key=api_key)

    @staticmethod
    def _as_record(obj):
        """Convert one API object to a plain dict.

        Prefers ``model_dump()`` when the object offers it and falls back to
        the older ``dict()`` method otherwise.
        """
        dump = getattr(obj, 'model_dump', None)
        return dump() if callable(dump) else obj.dict()

    def _set_active_assistant(self,current_assistant):
        """Make ``current_assistant`` the assistant used by later calls."""
        self.assistant = current_assistant
        print('set {} as current active assistant.'.format(current_assistant.name,))

    def create_assistant(self,name,description,model="gpt-4-1106-preview",tools=None,set_to_current=True,**kwargs):
        """Create an assistant on the server and optionally activate it.

        Args:
            name: Assistant name.
            description: Assistant description.
            model: Model identifier passed to the API.
            tools: Tool specifications; ``None`` means the retrieval tool
                only (``[{"type": "retrieval"}]``).
            set_to_current: When true, the new assistant becomes the active one.
            **kwargs: Forwarded unchanged to ``assistants.create``
                (for example ``instructions``).

        Returns:
            The assistant object returned by the API.
        """
        if tools is None:
            # Fresh list per call instead of a shared mutable default.
            tools = [{"type": "retrieval"}]
        new_assistant = self.client.beta.assistants.create(
            name=name,
            description=description,
            tools=tools,
            model=model,
            **kwargs
        )

        if set_to_current:
            self._set_active_assistant(new_assistant)
            print('New assistant created and set to current')
        return new_assistant

    def update_current_assistant(self,**kwargs):
        """Update the active assistant and keep the updated object.

        Args:
            **kwargs: Fields forwarded to ``assistants.update`` (for example
                ``instructions``, ``name``, ``tools``, ``model``, ``file_ids``).

        Raises:
            RuntimeError: If no assistant is currently active.
        """
        if not self.assistant:
            # Raising a bare string is itself a TypeError; raise a real exception.
            raise RuntimeError('Current Assistant not set, please use _set_active_assistant to set current assistant.')
        self.assistant = self.client.beta.assistants.update(
            self.assistant.id,
            **kwargs
        )

    def delete_assistants_by_ids(self,as_ids):
        """Delete the given assistants, reporting ids the server does not know.

        Args:
            as_ids: A single id, or a list/tuple/set of ids.
        """
        if not isinstance(as_ids, (list, tuple, set)):
            as_ids=[as_ids]

        # Read ids straight from the listing: this also works when there are
        # no assistants (an empty DataFrame would have no 'id' column).
        known_ids = {item.id for item in self._get_assistant_list()}
        res = []
        for a_i in as_ids:
            if a_i in known_ids:
                self.client.beta.assistants.delete(a_i)
                res.append(a_i)
            else:
                print("{} does not exist on openai server, please double check.".format(a_i))

        if res:
            print("{} has been removed from openai server.".format(res))

    def _get_assistant_list(self):
        """Return the server's assistant listing, requested in descending order."""
        a_list = self.client.beta.assistants.list(
            order="desc",
        )

        return a_list

    def _get_assistant_info(self):
        """Return assistant metadata as a DataFrame, one row per assistant.

        The DataFrame has no columns at all when there are no assistants.
        """
        records = [self._as_record(item) for item in self._get_assistant_list()]
        return pd.DataFrame(records)

    def get_assistants_by_ids(self,a_ids):
        """Retrieve assistants by id.

        Args:
            a_ids: A single id, or a list/tuple/set of ids.

        Returns:
            The single assistant object when exactly one id was given,
            otherwise a list of assistant objects.
        """
        if not isinstance(a_ids, (list, tuple, set)):
            a_ids=[a_ids]
        res = [self.client.beta.assistants.retrieve(a_i) for a_i in a_ids]
        if len(res) == 1:
            res = res[0]
        return res

    def get_assistants_info_by(self,filter_criteria=None,return_fields=('id','name'),to_dict=True,to_single_list=True):
        """Filter assistant metadata by column values.

        Args:
            filter_criteria (dict): Maps a column name to the accepted values
                for that column (a list, or a single scalar value). ``None``
                or an empty dict applies no filter.
            return_fields: Columns to keep; a falsy value keeps every column.
            to_dict (bool): Return a list of row dicts instead of a DataFrame.
            to_single_list (bool): When exactly one field is requested and
                ``to_dict`` is true, return a flat list of that field's values.

        Returns:
            pd.DataFrame, list[dict] or list: The filtered metadata. When
            there are no assistants the result is empty rather than an error.

        Raises:
            ValueError: If a filter column is not present in the metadata.
        """
        filter_criteria = filter_criteria or {}
        if isinstance(return_fields, str):
            wanted = [return_fields]
        else:
            # Always index with a list: a tuple would be read as a single key.
            wanted = list(return_fields) if return_fields else []

        as_info_df = self._get_assistant_info()
        if as_info_df.columns.empty:
            # No assistants at all, so there is nothing to filter or select.
            return [] if to_dict else pd.DataFrame(columns=wanted)

        for column, values in filter_criteria.items():
            if column not in as_info_df.columns:
                raise ValueError(f"Column '{column}' not found in DataFrame")
            # ``isin`` needs a list-like; wrap a lone scalar for convenience.
            if isinstance(values, str) or not hasattr(values, '__iter__'):
                values = [values]
            as_info_df = as_info_df[as_info_df[column].isin(values)]

        if wanted:
            as_info_df = as_info_df[wanted]

        if not to_dict:
            return as_info_df

        records = as_info_df.to_dict(orient='records')
        if len(wanted) == 1 and to_single_list:
            return [rec.get(wanted[0]) for rec in records]
        return records

    def quick_run(self,user_input_dict,**kwargs):
        """Create a new thread holding one message and start a run on it.

        Args:
            user_input_dict: A message dict such as
                ``{"role": "user", "content": "..."}``.
            **kwargs: Forwarded to ``threads.create_and_run``
                (for example ``instructions``).

        Returns:
            The run object returned by the API (not yet finished).

        Raises:
            RuntimeError: If no assistant is currently active.
        """
        if not self.assistant:
            raise RuntimeError('no active assistant set, please use _set_active_assistant to activate an assistant.')
        run = self.client.beta.threads.create_and_run(
            assistant_id=self.assistant.id,
            thread={
                "messages": [
                    user_input_dict
                ]
            },
            **kwargs
        )

        return run

    def _get_finished_run(self,initial_run,timeout=None,poll_interval=1):
        """Poll a run until it reaches a status where polling should stop.

        Args:
            initial_run: Run object providing ``thread_id`` and ``id``.
            timeout: Maximum number of seconds to wait; ``None`` waits
                indefinitely.
            poll_interval: Seconds to sleep before each status request.

        Returns:
            The latest run object, whose status is one of
            ``_STOP_POLLING_STATUSES``.

        Raises:
            TimeoutError: If ``timeout`` elapses first.
        """
        start_time = time.time()
        while True:
            time.sleep(poll_interval)
            run = self.client.beta.threads.runs.retrieve(thread_id=initial_run.thread_id, run_id=initial_run.id)
            elapsed_time = time.time() - start_time
            print(f"\rElapsed time: {elapsed_time:.2f} s || Status: {run.status}    ", end="", flush=True)
            if run.status in self._STOP_POLLING_STATUSES:
                # End the in-place progress line so later output starts fresh.
                print()
                return run
            if timeout is not None and elapsed_time >= timeout:
                print()
                raise TimeoutError(
                    "run {} still '{}' after {:.2f} s".format(initial_run.id, run.status, elapsed_time)
                )

    def _parse_return_message(self,run):
        """Return the text of the first assistant message listed for the run's thread.

        Messages are scanned in the order the API returns them, so user
        messages are skipped rather than echoed back as the answer.

        Raises:
            RuntimeError: If the thread holds no assistant message with text.
        """
        return_messages = self.client.beta.threads.messages.list(
            thread_id=run.thread_id
        )
        for message in return_messages.data:
            if message.role != 'assistant':
                continue
            for part in message.content:
                # Non-text parts have no usable ``text`` payload; skip them.
                text = getattr(part, 'text', None)
                if text is not None:
                    return text.value
        raise RuntimeError(
            "no assistant text message found in thread {}".format(run.thread_id)
        )

    def quick_query(self,user_input_dict,**kwargs):
        """
        A quick query from scratch, no conversation history is used.

        Args:
            user_input_dict (dict): a message dict, e.g.
                ``{"role": "user", "content": "..."}``.
            **kwargs: forwarded to ``quick_run``.

        Returns:
            str: the text of the assistant's reply.

        Raises:
            RuntimeError: if no assistant is active, or the run stops in any
                status other than 'completed'.
        """
        init_run = self.quick_run(user_input_dict,**kwargs)
        post_run = self._get_finished_run(init_run)
        if post_run.status != 'completed':
            # Without this check the caller would silently get no real answer.
            raise RuntimeError(
                "run {} ended with status '{}' (last_error: {})".format(
                    post_run.id, post_run.status, getattr(post_run, 'last_error', None)
                )
            )
        res = self._parse_return_message(post_run)

        return res

    def quick_query_and_parse(self,**kwargs):
        """Placeholder for subclasses; always raises ``NotImplementedError``."""
        raise NotImplementedError("This function is a placeholder and needs to be implemented.")
    

In [9]:
# Build the assistant helper; with no arguments it creates its own client from OPENAI_API_KEY.
OAI_Agent = OpenAIAssistant_Base()

- create a new assistant 

In [10]:
# Create an assistant with the default model and tools; `instructions` is not a
# named parameter of create_assistant, so it is forwarded to the API via **kwargs.
# The new assistant becomes the active one (set_to_current defaults to True).
OAI_Agent.create_assistant(name='ISR Test Bot',
                      description='this is just a test assistant',
                      instructions="You are a personal math tutor. When asked a question, write and run Python code to answer the question.")
print(OAI_Agent.assistant.name , OAI_Agent.assistant.id)

set ISR Test Bot as current active assistant.
New assistant created and set to current
ISR Test Bot asst_FVLC0kuZlTg4dRIRZ6ay6dvD


- update an assistant 

In [11]:
# Show the metadata of every assistant on the server as a DataFrame.
OAI_Agent._get_assistant_info()

Unnamed: 0,id,created_at,description,file_ids,instructions,metadata,model,name,object,tools
0,asst_FVLC0kuZlTg4dRIRZ6ay6dvD,1704470138,this is just a test assistant,[],You are a personal math tutor. When asked a qu...,{},gpt-4-1106-preview,ISR Test Bot,assistant,[{'type': 'retrieval'}]
1,asst_NRAvnu7V68Pd5DyPSXFoTs45,1703463015,,[],You are an advanced AI with access to a wide r...,{},gpt-4-1106-preview,Paper Organizer,assistant,[{'type': 'code_interpreter'}]


- get assistant meta info by filter criteria

In [12]:
# Look up assistant ids by name; a single return field yields a flat list of ids.
filter_criteria={'name':['ISR Test Bot']}
as_id = OAI_Agent.get_assistants_info_by(filter_criteria,return_fields=['id'])
print(as_id)

['asst_FVLC0kuZlTg4dRIRZ6ay6dvD']


- retrieve assistant by ids 

In [13]:
## retrieve by id (a single id returns the assistant object itself, not a list)
assit = OAI_Agent.get_assistants_by_ids(as_id[0])
print(assit.name, assit.id )

## set the retrieved assistant as the current active assistant
OAI_Agent._set_active_assistant(assit)

ISR Test Bot asst_FVLC0kuZlTg4dRIRZ6ay6dvD
set ISR Test Bot as current active assistant.


- delete assistants by ids 

In [14]:
# del_as_ids = as_id
# OAI_Agent.delete_assistants_by_ids(as_id)

- give the assistant instructions, a model, tools and files

In [15]:
# Find the id of the already-uploaded USA report through the agent's file manager.
filter_criteria={'filename':['USA_2022.pdf']} 
sampled_file_ids = OAI_Agent.FileManager.get_files_info_by(filter_criteria,return_fields=['id'],to_dict=True,to_single_list=True)
print(sampled_file_ids)

['file-bPuZpeGbCK6bN24DVyAE4Epd']


In [16]:
# Fields to change on the active assistant; the whole dict is forwarded to the
# assistants update call through update_current_assistant(**kwargs).
ass_info_dict = {
                'instructions':"You are an experienced IMF economist. Your main job is to review and analyze IMF country staff reports. Use your knowledge base to best respond to user questions",
                'name':"ISR Test Bot",
                'tools':[{"type": "retrieval"}],
                'model':"gpt-4-1106-preview",
                'file_ids':sampled_file_ids ### USA file id
                }
OAI_Agent.update_current_assistant(**ass_info_dict)

In [17]:
# NOTE: this is a plain string (it is never passed through str.format or an
# f-string), so braces are written singly; doubled braces would reach the model
# verbatim. The example is wrapped in an outer {...} so that it shows one valid
# JSON object, matching the "JSON object" wording of the instructions.
prompt_identify_risks = """
You are an economist in the IMF. Your main task is to review Country Staff reports. 
Please read carefully the document. Please determine all major macro critical downside or upside risks described in the document. Pay more attention to common macro critical risks like inflation, geopolitics, trade, climate etc. 
Please return all identified risks in a JSON object. 

Output is a JSON object with the following format:
{
"risks": [
    {"risk_name": "<risk1>", "risk_description": "<risk_description1>"}, 
    {"risk_name": "<risk2>", "risk_description": "<risk_description2>"},
    ......
    ]
}

Please proceed with the task, keeping in mind the importance of accuracy and clarity in your analysis. Return the JSON object only and nothing else. 

"""
# A single user-role message; quick_query starts a fresh thread containing only it.
user_input_dict = {"role":"user",
                    "content":prompt_identify_risks
                    }
msg = OAI_Agent.quick_query(user_input_dict)#,instructions="You are a testing bot. For every user question. Just say 'I don't know'") #quick_query(self,user_input_dict,**kwargs

Elapsed time: 55.71 s || Status: completed      

In [18]:
# Show the raw reply text returned by quick_query.
print(msg)

```json
"risks": [
    {
        "risk_name": "Geopolitical tensions and sanctions",
        "risk_description": "Russia’s invasion of Ukraine leads to the broadening of sanctions impacting oil, gas, and food sectors, combined with Russian countersanctions and impacts on global financial and trading systems, resulting in higher commodity prices, refugee migration, tighter financial conditions, and other adverse spillovers, particularly affecting LICs and commodity-importing EMs"
    },
    {
        "risk_name": "Rising and volatile food and energy prices",
        "risk_description": "Commodity prices are volatile and tend to increase due to supply constraints, the war in Ukraine, export restrictions, and currency depreciations, leading to short-run disruptions in the green transition, bouts of price and real sector volatility, food insecurity, social unrest, and acute crises, especially in EMDEs with limited fiscal space"
    },
    {
        "risk_name": "China's economic slowdown",

- now let's try changing the file id to see if it still works

In [19]:
# Upload a second report through the agent's own file manager.
up_file_path = '/root/workspace/data/DOCs/PDF/Indonesia_2023.pdf'
fid,fname=OAI_Agent.FileManager.upload_file(up_file_path,purpose='assistants')

In [20]:
## get a different document and use it as the reference
filter_criteria={'filename':['Indonesia_2023.pdf']} 
sampled_file_ids = OAI_Agent.FileManager.get_files_info_by(filter_criteria,return_fields=['id'],to_dict=True,to_single_list=True)
print(sampled_file_ids)


['file-PG4bC7c6m9Aa7017oMQbWoNu']


In [21]:
# Update only the assistant's file_ids, using the ids selected for Indonesia_2023.pdf.
ass_info_dict = {
                'file_ids':sampled_file_ids ### Indonesia_2023.pdf file id
                }
OAI_Agent.update_current_assistant(**ass_info_dict)
print(OAI_Agent.assistant.file_ids)

['file-PG4bC7c6m9Aa7017oMQbWoNu']


In [22]:
# List the files on the server again; both uploaded reports should appear.
OAI_Agent.FileManager._get_file_info()

Unnamed: 0,id,bytes,created_at,filename,object,purpose,status,status_details
0,file-PG4bC7c6m9Aa7017oMQbWoNu,7302895,1704470314,Indonesia_2023.pdf,file,assistants,processed,
1,file-bPuZpeGbCK6bN24DVyAE4Epd,3466567,1704250068,USA_2022.pdf,file,assistants,processed,


In [23]:
# Re-send the same user message on a fresh thread, now that the assistant's file_ids were updated.
msg = OAI_Agent.quick_query(user_input_dict)
print(msg)

Elapsed time: 38.74 s || Status: completed      ```json
{
  "risks": [
    {
      "risk_name": "Financial Shocks due to Investor Risk Appetite",
      "risk_description": "Shallow FX markets in Indonesia make it vulnerable to financial shocks caused by sudden changes in investor risk appetite, resulting in capital outflows, exchange rate depreciation, volatility, and a spike in the uncovered interest parity (UIP) premium."
    },
    {
      "risk_name": "Global Supply Shocks",
      "risk_description": "Global supply shocks can lead to higher inflation and tightening of monetary policy in advanced economies. For Indonesia, this may mean a negative impact with currency depreciation, spikes in domestic inflation, and capital outflows from emerging markets."
    },
    {
      "risk_name": "Tradeoffs from Policy Response to Inflation and Exchange Rate Shocks",
      "risk_description": "Policy responses to inflation and exchange rate shocks, such as tightening monetary policy, can creat

- system instructions:

You are an economist in the IMF. Your main task is to review Country Staff reports. You should read the document in your knowledge base carefully and follow the user's instructions to complete the task.

- User message 

Please determine all major macro critical risks described in the document. Pay more attention to common macro critical risks like inflation, macro stability, financial market volatility, debt, geopolitics, trade, climate, and other country specific risks etc. Please return all identified risks in a JSON object.

Output is a JSON object with the following format: "risks": [ {{"risk_name": "<risk1>", "risk_description": "<risk_description1>"}}, {{"risk_name": "<risk2>", "risk_description": "<risk_description2>"}}, ...... ]

Please proceed with the task, keeping in mind the importance of accuracy and clarity in your analysis. Return the JSON object only and nothing else.