In [1]:
from pathlib import Path
from typing import List, Sequence, Union, Optional, Mapping
import enum
import logging

import openai
import tiktoken
import pandas as pd
import numpy as np

from openai.embeddings_utils import distances_from_embeddings

In [20]:
class ChatBot:

    class Language(enum.Enum):
        EN = 'en',
        IT = 'it',

    PROMPT_TEMPLATES = {
        'en': 
            "Answer the question based on the context below, and if the question can't be "
            "answered based on the context, say \"I don't know.\"\n\n"
            "Context: {context}\n\n",
        'it':
            "Rispondi alla domanda con in base al contesto sottostante e, se non è possibile "
            "rispondere alla domanda in base al contesto, dì \"Non lo so.\"\n\n"
            "Contesto: {context}\n\n"
    }

    def __init__(self, 
        kb_name: str,
        kb_language: Union[Language, str],
        openai_api_key: str,
        tokenizer_encoding: str = 'cl100k_base',
        embeddings_engine: str = 'text-embedding-ada-002',
        model_name: str = 'gpt-3.5-turbo',
        max_prompt_len: int = 1800,
        max_kb_article_len: int = 50,
        max_response_len: int = 150,
    ) -> None:
        self.logger = logging.getLogger(self.__class__.__name__)
        openai.api_key = openai_api_key
        self.kb_name = kb_name
        self.kb_language = str(kb_language)
        if self.kb_language not in self.PROMPT_TEMPLATES:
            raise ValueError(
                f'Unsupported knowledge base langue "{self.kb_language}". '
                f'Supported languages: {", ".join(self.PROMPT_TEMPLATES.keys())}'
            )
        self.data_folder = Path('data') / kb_name
        self.kb_path = self.data_folder / Path('kb.txt')
        self.embeddings_path = self.data_folder / Path('embeddings.csv')
        self.tokenizer = tiktoken.get_encoding(tokenizer_encoding)
        self.embeddings_engine_name = embeddings_engine
        self.model_name = model_name
        self.max_prompt_len = max_prompt_len
        self.max_kb_article_len = max_kb_article_len
        self.max_response_len = max_response_len
        self.history: List[Mapping[str, str]] = []
        if self.embeddings_path.is_file():
            self.logger.info('Embeddings file found, trying to load it ...')
            df = pd.read_csv(self.embeddings_path)
            df['embeddings'] = df['embeddings'].apply(eval).apply(np.array)
            self.embeddings = df
            self.logger.info('Embeddings were successfully loaded.')
        else:
            self.logger.info(
                'Embeddings file not found, creating embeddings with OpenAI services. '
                'This might take several minutes based on the knowledge base size ...'
            )
            lines = self._shortened(self.kb_path, max_tokens=self.max_kb_article_len)
            self.embeddings = self._create_embeddings(
                output_path=self.embeddings_path, 
                lines=lines
            )
            self.logger.info('Embeddings were successfully created.')

    def _split_into_many(self, text: str, max_tokens: int = 500) -> List[str]:
        ''' Function to split the text into chunks of a maximum number of tokens '''

        # Split the text into sentences
        sentences = text.split('. ')

        # Get the number of tokens for each sentence
        n_tokens = [len(self.tokenizer.encode(" " + sentence)) for sentence in sentences]
        
        chunks = []
        tokens_so_far = 0
        chunk = []

        # Loop through the sentences and tokens joined together in a tuple
        for sentence, token in zip(sentences, n_tokens):

            # If the number of tokens so far plus the number of tokens in the current sentence is 
            # greater than the max number of tokens, then add the chunk to the list of chunks and 
            # reset the chunk and tokens so far
            if tokens_so_far + token > max_tokens:
                chunks.append(". ".join(chunk) + ".")
                chunk = []
                tokens_so_far = 0

            # If the number of tokens in the current sentence is greater than the max number of 
            # tokens, go to the next sentence
            if token > max_tokens:
                continue

            # Otherwise, add the sentence to the chunk and add the number of tokens to the total
            chunk.append(sentence)
            tokens_so_far += token + 1

        return chunks
        
    def _shortened(self, kb_path, max_tokens: int = 500) -> List[str]:
        ''' Ensures each kb article is shorter than max_tokens number of tokens. '''
        shortened = []
        with open(kb_path) as f:
            for line in f:
                line = line.strip()
                # If the text is None, go to the next row
                if line is None:
                    continue

                # If the number of tokens is greater than the max number of tokens, split the text 
                # into chunks
                if len(self.tokenizer.encode(line)) > max_tokens:
                    shortened += self._split_into_many(line, max_tokens=max_tokens)
                
                # Otherwise, add the text to the list of shortened texts
                else:
                    shortened.append(line)
        return shortened
    
    def _create_embeddings(self, output_path: Union[str, Path], lines: Sequence[str]) -> pd.DataFrame:
        ''' Create embeddings for kb articles using OpenSi services. '''
        df = pd.DataFrame(lines, columns = ['text'])
        df['embeddings'] = df.text.apply(
            lambda x: openai.Embedding.create(
                input=x, 
                engine=self.embeddings_engine_name
            )['data'][0]['embedding']
        )
        df['n_tokens'] = df.text.apply(
            lambda x: len(self.tokenizer.encode(" " + x))
        )
        df.to_csv(output_path, index=False)
        return df
    
    def create_context(self, question: str) -> str:
        """
        Create a context for a question by finding the most similar context from the embeddings
        """

        # Get the embeddings for the question
        q_embeddings = openai.Embedding.create(
            input=question, 
            engine=self.embeddings_engine_name
        )['data'][0]['embedding']
        
        df = self.embeddings
        # Get the distances from the embeddings
        df['distances'] = distances_from_embeddings(
            q_embeddings, 
            df['embeddings'].values, 
            distance_metric='cosine'
        )

        returns = []
        cur_len = 0

        # Sort by distance and add the text to the context until the context is too long
        for _, row in df.sort_values('distances', ascending=True).iterrows():
            
            # Add the length of the text to the current length
            cur_len += row['n_tokens'] + 4
            
            # If the context is too long, break
            if cur_len > self.max_prompt_len:
                break
            
            # Else add it to the text that is being returned
            returns.append(row["text"])

        # Return the context
        return "\n\n###\n\n".join(returns)
    
    def answer_question(self, 
        question: str, 
        stop_sequence: Optional[str] = None,
        temperature: float = 0.0,
        debug=False
    ) -> str:
        """Answer a question based on the most similar context from the kb."""
        context = self.create_context(question)
        self.logger.debug("Context:\n%s", context)
        if debug:
            print("Context:\n{}".format(context))
        prompt = self.PROMPT_TEMPLATES[self.kb_language].format(context=context, question=question)

        messages = [
            {"role": "system", "content": prompt}
        ]
        messages += self.history
        messages += [{"role": "user", "content": question}]

        try:
            # Create a completions using the question and context        
            api_resp = openai.ChatCompletion.create(
                messages=messages,
                temperature=temperature,
                stop=stop_sequence,
                max_tokens=self.max_response_len,
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0,
                model=self.model_name,
            )
            response = api_resp["choices"][0]["message"]["content"].strip()
            self.history.append({"role": "user", "content": question})
            self.history.append({"role": "assistant", "content": response})
            return response
        except Exception as e:
            self.logger.exception(e)
            return ""

In [3]:
import getpass
openai_api_key = getpass.getpass('Enter OpenAI API key:')

In [23]:
agent = ChatBot('empatair', 'it', openai_api_key, max_prompt_len=400)

In [26]:
agent.answer_question("Si possono portare animali sui voli?", debug=True)

Context:
Posso portare oggetti di valore a bordo? Sì, puoi portarli.

###

E se ho una borsa più piccola? Posso portare il mio zainetto a bordo? I passeggeri possono viaggiare con un bagaglio a mano gratuito.

###

Posso portare sia l'attrezzatura da sci sia quella per snowboard con me? No, è possibile portare solo un set di attrezzatura sportiva per passeggero per volo.

###

Viene garantito che l'attrezzatura sportiva viaggerà con me?" No, purtroppo è previsto un limite per le attrezzature sportive per ciascun volo.

###

Posso portare l'attrezzatura da sci/snowboard con me? Sì, puoi portare la tua attrezzatura da sci/snowboard come bagaglio da stiva.

###

Posso usare il mio dispositivo per l'ossigeno durante il volo? No, il contenitore per l'ossigeno non può essere portato a bordo.

###

Sono cieco. Posso portare il mio cane con me? Sì, puoi portare gratuitamente il tuo cane guida.

###

Carrozzine e passeggini devono essere imbarcati con gli altri bagagli e verranno trasportati gr

'Come ho detto in precedenza, dipende dalle politiche della compagnia aerea e dalle normative del paese di destinazione. In generale, alcune compagnie aeree consentono il trasporto di animali domestici, ma potrebbero essere previste restrizioni o costi aggiuntivi. Tuttavia, per quanto riguarda gli animali non domestici, come ad esempio animali esotici o selvatici, di solito non è consentito il loro trasporto sui voli commerciali. Ti consiglio di contattare direttamente la compagnia aerea per maggiori informazioni.'

In [12]:
agent.answer_question("Come si chiama ?")

"Mi dispiace, non ho informazioni riguardo all'età di un contadino. Tuttavia, l'età di un contadino non è rilevante per i voli aerei."