In this notebook I will show you how to create an LLM-based chatbot with sentiment analysis using OpenVINO.

First, we import [OpenVINO](https://docs.openvino.ai/2023.1/api/ie_python_api/api.html):

In [None]:
import openvino as ov

For this project we will be using pre-trained models from Hugging Face. There you can find models for a variety of tasks, including the two we are interested in.

In particular we are going to use [RedPajama-INCITE-Chat-3B-v1](https://huggingface.co/togethercomputer/RedPajama-INCITE-Chat-3B-v1) for the chat component, and [distilbert-base-uncased-finetuned-sst-2-english](https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english) for the sentiment analysis. You can visit those links to read more about the models.

One of the simplest ways to work with Hugging Face models and OpenVINO is to use the [optimum](https://huggingface.co/docs/optimum/intel/index) library, which automates model download and conversion for you.

Depending on the task at hand, you will need to include different helpers from the optimum library. For text generation (chat), we will be using OVModelForCausalLM, and for Sentiment Analysis we will be using OVModelForSequenceClassification. You can see these defined here: https://huggingface.co/docs/optimum/intel/inference

In [None]:
from optimum.intel.openvino import OVModelForCausalLM, OVModelForSequenceClassification

Once you have found your models on Hugging Face, copy the model names shown at the top of each model page (see the previous links). These are the ones we will be using:


In [None]:
model_name_chat = "togethercomputer/RedPajama-INCITE-Chat-3B-v1"
model_name_sentiment = "distilbert-base-uncased-finetuned-sst-2-english"

We can now simply use the helpers for each one of the models. Under the hood these functions will download and convert the models so that they can be used directly from OpenVINO.

For each one you only need to provide either the model name as shown on the Hugging Face website, or the path of a folder that contains local model files.

In [None]:
ov_model_chat = OVModelForCausalLM.from_pretrained(model_name_chat, export=True, compile=False)
ov_model_sentiment = OVModelForSequenceClassification.from_pretrained(model_name_sentiment, export=True, compile=False)

You can explore the configuration details of each model:

In [None]:
print("Chat model:", ov_model_chat.config)
print("Sentiment model:", ov_model_sentiment.config)

In [None]:
from transformers import AutoTokenizer, AutoConfig  # Helpers that make things simpler for us.

ov_model_chat_tok = AutoTokenizer.from_pretrained(model_name_chat, trust_remote_code=False)
ov_model_sentiment_tok = AutoTokenizer.from_pretrained(model_name_sentiment, trust_remote_code=False)

These models have been converted to the OpenVINO intermediate format and can now be run offline, so let's save them in a convenient location:

In [None]:
from pathlib import Path

path_model_chat = Path("model_chat")
path_model_sentiment = Path("model_sentiment")

ov_model_chat.save_pretrained(path_model_chat)
ov_model_chat_tok.save_pretrained(path_model_chat)

ov_model_sentiment.save_pretrained(path_model_sentiment)
ov_model_sentiment_tok.save_pretrained(path_model_sentiment)

You can now have a look at those two folders. Each one contains three files representing the model (openvino_model.xml, openvino_model.bin, and config.json), plus extra files for the AutoTokenizer.
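To check what was written, a small standard-library sketch like the following can list each folder's contents. The helper name `list_model_files` is hypothetical (it is not part of optimum or OpenVINO), and the sketch assumes the folder names used above; the exact tokenizer file names depend on the model.

```python
from pathlib import Path


def list_model_files(folder):
    """Return the sorted file names found directly inside `folder` (hypothetical helper)."""
    folder = Path(folder)
    if not folder.is_dir():
        # Nothing has been saved there yet.
        return []
    return sorted(p.name for p in folder.iterdir() if p.is_file())


# Folder names assumed to match the save step above.
for name in ("model_chat", "model_sentiment"):
    print(name, list_model_files(name))
```

If a folder does not exist yet, the helper returns an empty list instead of raising an error, which makes it safe to run before the save step.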

We can now use these models. Let's start with the LLM. 

Note that we are now supplying the folder path of the model instead of the name. This means that we are now directly using the OpenVINO optimised model files we saved in the previous step.

After loading the model and the AutoTokenizer, we need to tokenize the input, which the AutoTokenizer helper makes simple. Then we can generate text by calling the generate method of the OpenVINO chat model and decoding the result:

In [None]:
ov_config = {'PERFORMANCE_HINT': 'LATENCY', 'NUM_STREAMS': '1', "CACHE_DIR": ""}
ov_model_chat_tok = AutoTokenizer.from_pretrained(path_model_chat)
ov_model_chat = OVModelForCausalLM.from_pretrained(path_model_chat, device="CPU", ov_config=ov_config, config=AutoConfig.from_pretrained(path_model_chat))

tokenizer_kwargs = {}
test_string = "2 + 2 ="
input_tokens = ov_model_chat_tok(test_string, return_tensors="pt", **tokenizer_kwargs)
answer = ov_model_chat.generate(**input_tokens, max_new_tokens=2)
print(ov_model_chat_tok.batch_decode(answer)[0])

Now it is time to test the sentiment analysis model.

By using the helper functions we can do this very easily.

We will be using the pipeline to get text classification directly.

The pipeline returns a label, either POSITIVE or NEGATIVE, and a score. I simply convert those into a single signed number that carries the same information, negating the score when the label is NEGATIVE:

In [None]:
from transformers import pipeline

ov_sentiment_config = {'PERFORMANCE_HINT': 'LATENCY', 'NUM_STREAMS': '1', "CACHE_DIR": ""}
ov_model_sentiment_tok = AutoTokenizer.from_pretrained(path_model_sentiment, trust_remote_code=False)
ov_model_sentiment = OVModelForSequenceClassification.from_pretrained(path_model_sentiment, device="CPU", ov_config=ov_sentiment_config, config=AutoConfig.from_pretrained(path_model_sentiment))
tokenizer_sentiment_kwargs = {}

pipe = pipeline("text-classification", model=ov_model_sentiment, tokenizer=ov_model_sentiment_tok)

def get_sentiment(text):    
    outputs = pipe(text)
    sentiment = outputs[0]["label"]
    sentiment_score = outputs[0]["score"]
    if(sentiment == "NEGATIVE"):
        sentiment_score *= -1
    return sentiment_score

In [None]:
get_sentiment("hello, this is great")

In [None]:
get_sentiment("I don't like this at all!")
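The label-to-sign conversion can also be tried without loading any model. The sketch below is a standalone illustration: `to_signed_score` is a hypothetical name, and the input dictionaries are hand-written stand-ins that assume the pipeline's `[{"label": ..., "score": ...}]` output shape, not real predictions.

```python
def to_signed_score(outputs):
    """Collapse a text-classification result into one signed float.

    `outputs` is assumed to look like [{"label": "POSITIVE", "score": 0.99}].
    """
    label = outputs[0]["label"]
    score = outputs[0]["score"]
    # Negative sentiment is encoded as a negative number.
    return -score if label == "NEGATIVE" else score


# Hand-written stand-ins for pipeline output (not real model predictions).
print(to_signed_score([{"label": "POSITIVE", "score": 0.75}]))  # 0.75
print(to_signed_score([{"label": "NEGATIVE", "score": 0.5}]))   # -0.5
```

The result lies between -1 and 1: values near 1 mean confidently positive, values near -1 mean confidently negative.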

Now we are ready to create a chat with sentiment analysis in it.
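Before the full cell, it may help to see the prompt format in isolation. The sketch below reuses the `<human>:` / `<bot>:` turn template from the chat cell. `build_prompt` and `TURN_TEMPLATE` are hypothetical names for this illustration, and a single template is used because the history and current-message templates in the cell are identical.

```python
# Same turn template as the chat cell; build_prompt is a hypothetical helper.
TURN_TEMPLATE = "\n<human>:{user}\n<bot>:{assistant}"


def build_prompt(history):
    """Join [user, assistant] pairs into a single prompt string."""
    return "".join(
        TURN_TEMPLATE.format(user=user, assistant=assistant)
        for user, assistant in history
    )


# The last turn has an empty assistant reply, so the prompt ends with "<bot>:"
# and the model continues from there.
history = [["Hi", "Hello!"], ["How are you?", ""]]
print(repr(build_prompt(history)))
# '\n<human>:Hi\n<bot>:Hello!\n<human>:How are you?\n<bot>:'
```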

In [None]:
from threading import Event, Thread
from uuid import uuid4
from typing import Optional, Union, Dict, Tuple, List


import gradio as gr
import torch
from transformers import (
    AutoTokenizer,
    StoppingCriteria,
    StoppingCriteriaList,
    TextIteratorStreamer,
)

#This is the expected format for the model
history_template = "\n<human>:{user}\n<bot>:{assistant}"
current_message_template = "\n<human>:{user}\n<bot>:{assistant}"
start_message = ""
stop_tokens = [29, 0]
tokenizer_kwargs = {}

def red_pijama_partial_text_processor(partial_text, new_text):
    if new_text == '<':
        return partial_text
    
    partial_text += new_text
    return partial_text.split('<bot>:')[-1]

max_new_tokens = 256

class StopOnTokens(StoppingCriteria):
    def __init__(self, token_ids):
        self.token_ids = token_ids
    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        for stop_id in self.token_ids:
            if input_ids[0][-1] == stop_id:
                return True
        return False

if stop_tokens is not None:
    if isinstance(stop_tokens[0], str):
        stop_tokens = ov_model_chat_tok.convert_tokens_to_ids(stop_tokens)
        
    stop_tokens = [StopOnTokens(stop_tokens)]

def default_partial_text_processor(partial_text:str, new_text:str):
    """
    default helper for updating the partially generated answer
    
    Params:
      partial_text: text buffer for storing previously generated text
      new_text: text update for the current step
    Returns:
      updated text string
    
    """
    partial_text += new_text
    return partial_text

text_processor = red_pijama_partial_text_processor


def convert_history_to_text(history:List[Tuple[str, str]]):
    """
    convert the history, stored as a list of (user, assistant) message pairs, into a single string that follows the model's expected conversation template
    Params:
      history: dialogue history
    Returns:
      history in text format
    """
    text = start_message + "".join(
        [
            "".join(
                [
                    history_template.format(user=item[0], assistant=item[1])
                ]
            )
            for item in history[:-1]
        ]
    )
    text += "".join(
        [
            "".join(
                [
                    current_message_template.format(user=history[-1][0], assistant=history[-1][1])
                ]
            )
        ]
    )
    return text



def user(message, history):
    """
    callback function for updating user messages in interface on submit button click
    
    Params:
      message: current message
      history: conversation history
    Returns:
      an empty string (to clear the message box) and the history with the new user message appended
    """

    # Append the user's message to the conversation history
    return "", history + [[message, ""]]


def bot(history, temperature, top_p, top_k, repetition_penalty, conversation_id):
    """
    callback function for running chatbot on submit button click
    
    Params:
      history: conversation history
      temperature: parameter that controls the level of creativity in AI-generated text.
                    By adjusting the `temperature`, you can influence the AI model's probability distribution, making the text more focused or diverse.
      top_p: parameter that controls the range of tokens considered by the AI model based on their cumulative probability.
      top_k: parameter that limits sampling to the k tokens with the highest probability.
      repetition_penalty: parameter for penalizing tokens based on how frequently they occur in the text.
      conversation_id: unique conversation identifier.
    
    """
    user_message = history[-1][0]
    user_sentiment = get_sentiment(user_message)

    # Construct the input message string for the model by concatenating the current system message and conversation history
    messages = convert_history_to_text(history)

    
    # Tokenize the messages string
    input_ids = ov_model_chat_tok(messages, return_tensors="pt", **tokenizer_kwargs).input_ids
    if input_ids.shape[1] > 2000:
        history = [history[-1]]
        messages = convert_history_to_text(history)
        input_ids = ov_model_chat_tok(messages, return_tensors="pt", **tokenizer_kwargs).input_ids
    streamer = TextIteratorStreamer(ov_model_chat_tok, timeout=30.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        input_ids=input_ids,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        do_sample=temperature > 0.0,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
        streamer=streamer,
    )
    if stop_tokens is not None:
        generate_kwargs["stopping_criteria"] = StoppingCriteriaList(stop_tokens)

    stream_complete = Event()

    def generate_and_signal_complete():
        """
        generation function for single thread
        """
        ov_model_chat.generate(**generate_kwargs)
        stream_complete.set()

    t1 = Thread(target=generate_and_signal_complete)
    t1.start()

    # Initialize an empty string to store the generated text
    partial_text = ""
    for new_text in streamer:
        partial_text = text_processor(partial_text, new_text)
        history[-1][1] = partial_text
        history[-1][1] = "**USER_SENTIMENT: " + str(user_sentiment) + "**\n" + history[-1][1]
        yield history
    

def get_uuid():
    """
    universal unique identifier for thread
    """
    return str(uuid4())


with gr.Blocks(
    theme=gr.themes.Soft(),
    css=".disclaimer {font-variant-caps: all-small-caps;}",
) as demo:
    conversation_id = gr.State(get_uuid)
    gr.Markdown(
        f"""<h1><center>OpenVINO Chatbot with Sentiment Analysis</center></h1>"""
    )
    chatbot = gr.Chatbot(height=800)
    with gr.Row():
        with gr.Column():
            msg = gr.Textbox(
                label="Chat Message Box",
                placeholder="Chat Message Box",
                show_label=False,
                container=False
            )
        with gr.Column():
            with gr.Row():
                submit = gr.Button("Submit")
                stop = gr.Button("Stop")
                clear = gr.Button("Clear")
    with gr.Row():
        with gr.Accordion("Advanced Options:", open=False):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        temperature = gr.Slider(
                            label="Temperature",
                            value=0.1,
                            minimum=0.0,
                            maximum=1.0,
                            step=0.1,
                            interactive=True,
                            info="Higher values produce more diverse outputs",
                        )
                with gr.Column():
                    with gr.Row():
                        top_p = gr.Slider(
                            label="Top-p (nucleus sampling)",
                            value=1.0,
                            minimum=0.0,
                            maximum=1,
                            step=0.01,
                            interactive=True,
                            info=(
                                "Sample from the smallest possible set of tokens whose cumulative probability "
                                "exceeds top_p. Set to 1 to disable and sample from all tokens."
                            ),
                        )
                with gr.Column():
                    with gr.Row():
                        top_k = gr.Slider(
                            label="Top-k",
                            value=50,
                            minimum=0.0,
                            maximum=200,
                            step=1,
                            interactive=True,
                            info="Sample from a shortlist of top-k tokens — 0 to disable and sample from all tokens.",
                        )
                with gr.Column():
                    with gr.Row():
                        repetition_penalty = gr.Slider(
                            label="Repetition Penalty",
                            value=1.1,
                            minimum=1.0,
                            maximum=2.0,
                            step=0.1,
                            interactive=True,
                            info="Penalize repetition — 1.0 to disable.",
                        )
    submit_event = msg.submit(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=[
            chatbot,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            conversation_id,            
        ],
        outputs=chatbot,
        queue=True,
    )
    submit_click_event = submit.click(
        fn=user,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    ).then(
        fn=bot,
        inputs=[
            chatbot,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            conversation_id,
        ],
        outputs=chatbot,
        queue=True,
    )
    stop.click(
        fn=None,
        inputs=None,
        outputs=None,
        cancels=[submit_event, submit_click_event],
        queue=False,
    )
    clear.click(lambda: None, None, chatbot, queue=False)

demo.queue(max_size=2)
# if you are launching remotely, specify server_name and server_port
#  demo.launch(server_name='your server name', server_port='server port in int')
# if you have any issue to launch on your platform, you can pass share=True to launch method:
# demo.launch(share=True)
# it creates a publicly shareable link for the interface. Read more in the docs: https://gradio.app/docs/
demo.launch()
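The `bot` callback runs generation in a background thread while the main thread iterates over the streamer and yields the growing reply. The following standard-library sketch illustrates that producer/consumer idea under simplifying assumptions: `fake_generate` and `stream_text` are hypothetical stand-ins for `ov_model_chat.generate` and the streamer loop, and this is not how `TextIteratorStreamer` is implemented.

```python
from queue import Queue
from threading import Thread

_DONE = object()  # sentinel marking the end of the stream


def fake_generate(chunks, out_queue):
    """Stand-in for model.generate: pushes text chunks, then the sentinel."""
    for chunk in chunks:
        out_queue.put(chunk)
    out_queue.put(_DONE)


def stream_text(chunks):
    """Yield the growing reply while a worker thread produces chunks."""
    out_queue = Queue()
    worker = Thread(target=fake_generate, args=(chunks, out_queue))
    worker.start()
    partial_text = ""
    while True:
        # Block until the next chunk arrives (raises queue.Empty after 30 s).
        item = out_queue.get(timeout=30.0)
        if item is _DONE:
            break
        partial_text += item
        yield partial_text
    worker.join()


for text in stream_text(["Hel", "lo", " there"]):
    print(text)
```

Each yielded value is the full reply so far, which is why the chat window can be redrawn on every step instead of waiting for generation to finish.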

In [None]:
# Run this cell to stop the Gradio interface
#demo.close()