In [None]:
!pip install bitsandbytes
!pip install -q datasets loralib sentencepiece accelerate
!pip install -q git+https://github.com/zphang/transformers@c3dc391
!pip install -q git+https://github.com/huggingface/peft.git
!pip install gradio

In [None]:
# Please specify a BASE_MODEL, e.g. BASE_MODEL = 'decapoda-research/llama-7b-hf'
BASE_MODEL = ""

# Please specify a LORA_WEIGHTS, e.g. LORA_WEIGHTS = 'mchl-labs/stambecco-7b-plus'
LORA_WEIGHTS = ""

In [None]:
import torch
import transformers
import gradio as gr
from peft import PeftModel
from transformers import LLaMATokenizer, LLaMAForCausalLM, GenerationConfig

assert torch.cuda.is_available(), "Change the runtime type to GPU, disable this check if you are on 'mps'"
device = "cuda"

try:
    if torch.backends.mps.is_available():
        device = "mps"
except:
    pass

tokenizer = LLaMATokenizer.from_pretrained(BASE_MODEL)

if device == "mps":
    model = LLaMAForCausalLM.from_pretrained(
        BASE_MODEL,
        device_map={"": device},
        torch_dtype=torch.float16,
    )
    model = PeftModel.from_pretrained(
        model,
        LORA_WEIGHTS,
        device_map={"": device},
        torch_dtype=torch.float16,
    )
else:
    model = LLaMAForCausalLM.from_pretrained(
        BASE_MODEL,
        load_in_8bit=True,
        device_map="auto",
        torch_dtype=torch.float16,
    )

    model = PeftModel.from_pretrained(model, LORA_WEIGHTS, torch_dtype=torch.float16)

model.eval()

In [None]:
def generate_prompt(instruction, input=None):
    if input:
        return f"""Di seguito è riportata un'istruzione che descrive un task, insieme ad un input che fornisce un contesto più ampio. Scrivi una risposta che completi adeguatamente la richiesta.
### Istruzione:
{instruction}
### Input:
{input}
### Risposta:"""
    else:
        return f"""Di seguito è riportata un'istruzione che descrive un task. Scrivi una risposta che completi adeguatamente la richiesta.
### Istruzione:
{instruction}
### Risposta:"""

In [None]:
import gc
import traceback
from queue import Queue
from threading import Thread


class Stream(transformers.StoppingCriteria):
    def __init__(self, callback_func=None):
        self.callback_func = callback_func

    def __call__(self, input_ids, scores) -> bool:
        if self.callback_func is not None:
            self.callback_func(input_ids[0])
        return False


class Iteratorize:

    """
    Transforms a function that takes a callback
    into a lazy iterator (generator).
    """

    def __init__(self, func, kwargs={}, callback=None):
        self.mfunc = func
        self.c_callback = callback
        self.q = Queue()
        self.sentinel = object()
        self.kwargs = kwargs
        self.stop_now = False

        def _callback(val):
            if self.stop_now:
                raise ValueError
            self.q.put(val)

        def gentask():
            try:
                ret = self.mfunc(callback=_callback, **self.kwargs)
            except ValueError:
                pass
            except:
                traceback.print_exc()
                pass

            self.q.put(self.sentinel)
            if self.c_callback:
                self.c_callback(ret)

        self.thread = Thread(target=gentask)
        self.thread.start()

    def __iter__(self):
        return self

    def __next__(self):
        obj = self.q.get(True, None)
        if obj is self.sentinel:
            raise StopIteration
        else:
            return obj

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop_now = True

In [None]:
model.config.pad_token_id = tokenizer.pad_token_id = 0
model.config.bos_token_id = 1
model.config.eos_token_id = 2

def evaluate(
        instruction,
        input=None,
        temperature=0.1,
        top_p=0.75,
        top_k=40,
        num_beams=4,
        max_new_tokens=128,
        stream_output=False,
        **kwargs,
    ):
        prompt = generate_prompt(instruction, input)
        inputs = tokenizer(prompt, return_tensors="pt")
        input_ids = inputs["input_ids"].to(device)
        generation_config = GenerationConfig(
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            num_beams=num_beams,
            **kwargs,
        )

        generate_params = {
            "input_ids": input_ids,
            "generation_config": generation_config,
            "return_dict_in_generate": True,
            "output_scores": True,
            "max_new_tokens": max_new_tokens,
        }

        if stream_output:
            # Stream the reply 1 token at a time.
            # This is based on the trick of using 'stopping_criteria' to create an iterator,
            # from https://github.com/oobabooga/text-generation-webui/blob/ad37f396fc8bcbab90e11ecf17c56c97bfbd4a9c/modules/text_generation.py#L216-L243.

            def generate_with_callback(callback=None, **kwargs):
                kwargs.setdefault(
                    "stopping_criteria", transformers.StoppingCriteriaList()
                )
                kwargs["stopping_criteria"].append(
                    Stream(callback_func=callback)
                )
                with torch.no_grad():
                    model.generate(**kwargs)

            def generate_with_streaming(**kwargs):
                return Iteratorize(
                    generate_with_callback, kwargs, callback=None
                )

            with generate_with_streaming(**generate_params) as generator:
                for output in generator:
                    decoded_output = tokenizer.decode(output)

                    if output[-1] in [tokenizer.eos_token_id]:
                        break

                    yield decoded_output.split("### Risposta:")[1].strip()
            return  # early return for stream_output

        # Without streaming
        with torch.no_grad():
            generation_output = model.generate(
                input_ids=input_ids,
                generation_config=generation_config,
                return_dict_in_generate=True,
                output_scores=True,
                max_new_tokens=max_new_tokens,
            )
        s = generation_output.sequences[0]
        output = tokenizer.decode(s)
        yield output.split("### Risposta:")[1].strip()

In [None]:
import warnings
warnings.filterwarnings("ignore")
g = gr.Interface(
    fn=evaluate,
    inputs=[
        gr.components.Textbox(
            lines=2, label="Instruction", placeholder="Parlami della fondazione di Roma"
        ),
        gr.components.Textbox(lines=2, label="Input", placeholder="Inserisci qui il tuo input"),
        gr.components.Slider(minimum=0, maximum=1, value=0.1, label="Temperature"),
        gr.components.Slider(minimum=0, maximum=1, value=0.75, label="Top p"),
        gr.components.Slider(minimum=0, maximum=100, step=1, value=40, label="Top k"),
        gr.components.Slider(minimum=1, maximum=4, step=1, value=4, label="Beams"),
        gr.components.Slider(
            minimum=1, maximum=512, step=1, value=128, label="Max tokens"
        ),
        gr.components.Checkbox(value=True, label="Stream output"),
    ],
    outputs=[
        gr.inputs.Textbox(
            lines=7,
            label="Output",
        )
    ],
    title="🦌 Stambecco 🇮🇹",
    description="Stambecco is a Italian Instruction-following model based on the LLaMA model. It is trained on an Italian version of the [GPT-4-LLM](https://github.com/Instruction-Tuning-with-GPT-4/GPT-4-LLM) dataset, a dataset of GPT-4 generated instruction-following data. For more information, please visit [the project's website](https://github.com/mchl-labs/stambecco).",
    )
g.queue().launch(server_name="0.0.0.0", share=True)